@ -32,6 +32,8 @@ import org.apache.flink.cdc.runtime.testutils.operators.RegularEventOperatorTest
import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator ;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord ;
import org.apache.calcite.runtime.CalciteContextException ;
import org.apache.calcite.sql.validate.SqlValidatorException ;
import org.assertj.core.api.Assertions ;
import org.junit.jupiter.api.Test ;
import org.testcontainers.shaded.com.google.common.collect.ImmutableMap ;
@ -276,6 +278,38 @@ public class PostTransformOperatorTest {
. primaryKey ( "col1" )
. build ( ) ;
private static final TableId COMPARE_TABLEID =
TableId . tableId ( "my_company" , "my_branch" , "compare_table" ) ;
private static final Schema COMPARE_SCHEMA =
Schema . newBuilder ( )
. physicalColumn ( "col1" , DataTypes . STRING ( ) . notNull ( ) )
. physicalColumn ( "numerical_equal" , DataTypes . BOOLEAN ( ) )
. physicalColumn ( "string_equal" , DataTypes . BOOLEAN ( ) )
. physicalColumn ( "time_equal" , DataTypes . BOOLEAN ( ) )
. physicalColumn ( "timestamp_equal" , DataTypes . BOOLEAN ( ) )
. physicalColumn ( "date_equal" , DataTypes . BOOLEAN ( ) )
. primaryKey ( "col1" )
. build ( ) ;
private static final TableId COMPARE_DATA_TABLEID =
TableId . tableId ( "my_company" , "my_branch" , "compare_data_table" ) ;
private static final Schema COMPARE_DATA_SCHEMA =
Schema . newBuilder ( )
. physicalColumn ( "id" , DataTypes . INT ( ) . notNull ( ) )
. physicalColumn ( "c1" , DataTypes . FLOAT ( ) . nullable ( ) )
. physicalColumn ( "c2" , DataTypes . DOUBLE ( ) . nullable ( ) )
. physicalColumn ( "c3" , DataTypes . TIMESTAMP ( ) . nullable ( ) )
. primaryKey ( "id" )
. build ( ) ;
private static final Schema EXPECTD_COMPARE_DATA_SCHEMA =
Schema . newBuilder ( )
. physicalColumn ( "id" , DataTypes . INT ( ) . notNull ( ) )
. physicalColumn ( "float_equal" , DataTypes . BOOLEAN ( ) )
. physicalColumn ( "double_equal" , DataTypes . BOOLEAN ( ) )
. physicalColumn ( "timestamp_equal" , DataTypes . BOOLEAN ( ) )
. primaryKey ( "id" )
. build ( ) ;
void testDataChangeEventTransform ( ) throws Exception {
PostTransformOperator transform =
@ -1961,6 +1995,191 @@ public class PostTransformOperatorTest {
transformFunctionEventEventOperatorTestHarness . close ( ) ;
void testCompareTransform ( ) throws Exception {
PostTransformOperator transform =
PostTransformOperator . newBuilder ( )
. addTransform (
COMPARE_TABLEID . identifier ( ) ,
"col1, 2.1 > 1 as numerical_equal,"
+ " '2024-01-01 00:00:00' < '2024-08-01 00:00:00' as string_equal,"
+ " LOCALTIME <= CURRENT_TIME as time_equal,"
+ " TO_TIMESTAMP('2024-01-01 00:00:00') <= TO_TIMESTAMP('2024-08-01 00:00:00') as timestamp_equal,"
"2 > 1" )
. addTimezone ( "UTC" )
. build ( ) ;
RegularEventOperatorTestHarness < PostTransformOperator , Event >
transformFunctionEventEventOperatorTestHarness =
RegularEventOperatorTestHarness . with ( transform , 1 ) ;
// Initialization
transformFunctionEventEventOperatorTestHarness . open ( ) ;
// Create table
CreateTableEvent createTableEvent = new CreateTableEvent ( COMPARE_TABLEID , COMPARE_SCHEMA ) ;
BinaryRecordDataGenerator recordDataGenerator =
new BinaryRecordDataGenerator ( ( ( RowType ) COMPARE_SCHEMA . toRowDataType ( ) ) ) ;
// Insert
DataChangeEvent insertEvent =
DataChangeEvent . insertEvent (
recordDataGenerator . generate (
new Object [ ] {
new BinaryStringData ( "1" ) , null , null , null , null , null
} ) ) ;
DataChangeEvent insertEventExpect =
DataChangeEvent . insertEvent (
recordDataGenerator . generate (
new Object [ ] {
new BinaryStringData ( "1" ) , true , true , true , true , true
} ) ) ;
transform . processElement ( new StreamRecord < > ( createTableEvent ) ) ;
Assertions . assertThat (
transformFunctionEventEventOperatorTestHarness . getOutputRecords ( ) . poll ( ) )
. isEqualTo (
new StreamRecord < > ( new CreateTableEvent ( COMPARE_TABLEID , COMPARE_SCHEMA ) ) ) ;
transform . processElement ( new StreamRecord < > ( insertEvent ) ) ;
Assertions . assertThat (
transformFunctionEventEventOperatorTestHarness . getOutputRecords ( ) . poll ( ) )
. isEqualTo ( new StreamRecord < > ( insertEventExpect ) ) ;
void testCompareErrorTransform ( ) throws Exception {
PostTransformOperator transform =
PostTransformOperator . newBuilder ( )
. addTransform (
COMPARE_TABLEID . identifier ( ) ,
"col1, 2.1 > TO_TIMESTAMP('2024-01-01 00:00:00') as numerical_equal,"
+ " '2024-01-01 00:00:00' < '2024-08-01 00:00:00' as string_equal,"
+ " LOCALTIME <= CURRENT_TIME as time_equal,"
+ " TO_TIMESTAMP('2024-01-01 00:00:00') <= TO_TIMESTAMP('2024-08-01 00:00:00') as timestamp_equal,"
"2 > 1" )
. addTimezone ( "UTC" )
. build ( ) ;
RegularEventOperatorTestHarness < PostTransformOperator , Event >
transformFunctionEventEventOperatorTestHarness =
RegularEventOperatorTestHarness . with ( transform , 1 ) ;
// Initialization
transformFunctionEventEventOperatorTestHarness . open ( ) ;
// Create table
CreateTableEvent createTableEvent = new CreateTableEvent ( COMPARE_TABLEID , COMPARE_SCHEMA ) ;
BinaryRecordDataGenerator recordDataGenerator =
new BinaryRecordDataGenerator ( ( ( RowType ) COMPARE_SCHEMA . toRowDataType ( ) ) ) ;
// Insert
DataChangeEvent insertEvent =
DataChangeEvent . insertEvent (
recordDataGenerator . generate (
new Object [ ] {
new BinaryStringData ( "1" ) , null , null , null , null , null
} ) ) ;
DataChangeEvent insertEventExpect =
DataChangeEvent . insertEvent (
recordDataGenerator . generate (
new Object [ ] {
new BinaryStringData ( "1" ) , true , true , true , true , true
} ) ) ;
Assertions . assertThatThrownBy (
( ) - > {
transform . processElement ( new StreamRecord < > ( createTableEvent ) ) ;
} )
. isExactlyInstanceOf ( CalciteContextException . class )
. hasRootCauseInstanceOf ( SqlValidatorException . class )
. hasRootCauseMessage (
"Cannot apply '>' to arguments of type '<DECIMAL(2, 1)> > <TIMESTAMP(3)>'. Supported form(s): '<COMPARABLE_TYPE> > <COMPARABLE_TYPE>'" ) ;
void testCompareDataTransform ( ) throws Exception {
PostTransformOperator transform =
PostTransformOperator . newBuilder ( )
. addTransform (
COMPARE_DATA_TABLEID . identifier ( ) ,
"id, c1 < 5 as float_equal, c2 > 2.5 as double_equal, c3 <= TO_TIMESTAMP('2024-01-01 00:00:00') as timestamp_equal" ,
null )
. addTimezone ( "UTC" )
. build ( ) ;
RegularEventOperatorTestHarness < PostTransformOperator , Event >
transformFunctionEventEventOperatorTestHarness =
RegularEventOperatorTestHarness . with ( transform , 1 ) ;
// Initialization
transformFunctionEventEventOperatorTestHarness . open ( ) ;
// Create table
CreateTableEvent createTableEvent =
BinaryRecordDataGenerator recordDataGenerator =
new BinaryRecordDataGenerator ( ( ( RowType ) COMPARE_DATA_SCHEMA . toRowDataType ( ) ) ) ;
BinaryRecordDataGenerator expectRecordDataGenerator =
new BinaryRecordDataGenerator (
( ( RowType ) EXPECTD_COMPARE_DATA_SCHEMA . toRowDataType ( ) ) ) ;
// Insert
DataChangeEvent insertEvent1 =
DataChangeEvent . insertEvent (
recordDataGenerator . generate (
new Object [ ] {
1 ,
new Float ( 4f ) ,
new Double ( 3.5d ) ,
TimestampData . fromMillis ( 1672502400000L )
} ) ) ;
DataChangeEvent insertEventExpect1 =
DataChangeEvent . insertEvent (
expectRecordDataGenerator . generate ( new Object [ ] { 1 , true , true , true } ) ) ;
DataChangeEvent insertEvent2 =
DataChangeEvent . insertEvent (
recordDataGenerator . generate (
new Object [ ] {
2 ,
new Float ( 10 f ) ,
new Double ( 0d ) ,
TimestampData . fromMillis ( 1730390400000L )
} ) ) ;
DataChangeEvent insertEventExpect2 =
DataChangeEvent . insertEvent (
expectRecordDataGenerator . generate ( new Object [ ] { 2 , false , false , false } ) ) ;
DataChangeEvent insertEvent3 =
DataChangeEvent . insertEvent (
recordDataGenerator . generate ( new Object [ ] { 3 , null , null , null } ) ) ;
DataChangeEvent insertEventExpect3 =
DataChangeEvent . insertEvent (
expectRecordDataGenerator . generate ( new Object [ ] { 3 , false , false , false } ) ) ;
transform . processElement ( new StreamRecord < > ( createTableEvent ) ) ;
Assertions . assertThat (
transformFunctionEventEventOperatorTestHarness . getOutputRecords ( ) . poll ( ) )
. isEqualTo (
new StreamRecord < > (
new CreateTableEvent (
transform . processElement ( new StreamRecord < > ( insertEvent1 ) ) ;
Assertions . assertThat (
transformFunctionEventEventOperatorTestHarness . getOutputRecords ( ) . poll ( ) )
. isEqualTo ( new StreamRecord < > ( insertEventExpect1 ) ) ;
transform . processElement ( new StreamRecord < > ( insertEvent2 ) ) ;
Assertions . assertThat (
transformFunctionEventEventOperatorTestHarness . getOutputRecords ( ) . poll ( ) )
. isEqualTo ( new StreamRecord < > ( insertEventExpect2 ) ) ;
transform . processElement ( new StreamRecord < > ( insertEvent3 ) ) ;
Assertions . assertThat (
transformFunctionEventEventOperatorTestHarness . getOutputRecords ( ) . poll ( ) )
. isEqualTo ( new StreamRecord < > ( insertEventExpect3 ) ) ;
void testBuildInFunctionTransform ( ) throws Exception {
testExpressionConditionTransform (