diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java index 54ae2c225..ebec5e160 100644 --- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java +++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java @@ -41,6 +41,7 @@ import org.apache.flink.cdc.composer.definition.SourceDef; import org.apache.flink.cdc.composer.definition.TransformDef; import org.apache.flink.cdc.connectors.values.ValuesDatabase; import org.apache.flink.cdc.connectors.values.factory.ValuesDataFactory; +import org.apache.flink.cdc.connectors.values.sink.ValuesDataSink; import org.apache.flink.cdc.connectors.values.sink.ValuesDataSinkOptions; import org.apache.flink.cdc.connectors.values.source.ValuesDataSourceHelper; import org.apache.flink.cdc.connectors.values.source.ValuesDataSourceOptions; @@ -52,8 +53,11 @@ import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; import java.io.ByteArrayOutputStream; import java.io.PrintStream; @@ -116,6 +120,472 @@ class FlinkPipelineTransformITCase { System.setOut(standardOut); } + /** This tests if we can append calculated columns based on existing columns. 
*/
+    @ParameterizedTest
+    @EnumSource
+    void testCalculatedColumns(ValuesDataSink.SinkApi sinkApi) throws Exception {
+        // Keep every upstream column and append a concatenated key plus a doubled age.
+        TransformDef appendDerivedColumns =
+                new TransformDef(
+                        "default_namespace.default_schema.\\.*",
+                        "*, id || name AS uid, age * 2 AS double_age",
+                        null,
+                        null,
+                        null,
+                        null,
+                        null);
+
+        List<String> expectedEvents =
+                Arrays.asList(
+                        "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT,`uid` STRING,`double_age` INT}, primaryKeys=id, options=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, Alice, 18, 1Alice, 36], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Bob, 20, 2Bob, 40], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, Bob, 20, 2Bob, 40], after=[2, Bob, 30, 2Bob, 60], op=UPDATE, meta=()}",
+                        "CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT,`name` VARCHAR(255),`age` TINYINT,`description` STRING,`uid` STRING,`double_age` INT}, primaryKeys=id, options=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[3, Carol, 15, student, 3Carol, 30], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[4, Derrida, 25, student, 4Derrida, 50], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, Derrida, 25, student, 4Derrida, 50], after=[], op=DELETE, meta=()}");
+
+        runGenericTransformTest(
+                sinkApi, Collections.singletonList(appendDerivedColumns), expectedEvents);
+    }
+
+    /** This tests if we can reference a column more than once in projection expressions.
*/
+    @ParameterizedTest
+    @EnumSource
+    void testMultipleReferencedColumnsInProjection(ValuesDataSink.SinkApi sinkApi)
+            throws Exception {
+        // The single computed column reads `age` three times.
+        TransformDef cubicAgeRule =
+                new TransformDef(
+                        "default_namespace.default_schema.\\.*",
+                        "*, CAST(age * age * age AS INT) AS cubic_age",
+                        null,
+                        null,
+                        null,
+                        null,
+                        null);
+
+        List<String> expectedEvents =
+                Arrays.asList(
+                        "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT,`cubic_age` INT}, primaryKeys=id, options=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, Alice, 18, 5832], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Bob, 20, 8000], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, Bob, 20, 8000], after=[2, Bob, 30, 27000], op=UPDATE, meta=()}",
+                        "CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT,`name` VARCHAR(255),`age` TINYINT,`description` STRING,`cubic_age` INT}, primaryKeys=id, options=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[3, Carol, 15, student, 3375], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[4, Derrida, 25, student, 15625], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, Derrida, 25, student, 15625], after=[], op=DELETE, meta=()}");
+
+        runGenericTransformTest(sinkApi, Collections.singletonList(cubicAgeRule), expectedEvents);
+    }
+
+    /** This tests if we can reference a column more than once in filtering expressions.
*/
+    @ParameterizedTest
+    @EnumSource
+    void testMultipleReferencedColumnsInFilter(ValuesDataSink.SinkApi sinkApi) throws Exception {
+        // The filter reads `id` twice; only the row with id 3 satisfies both bounds.
+        TransformDef idWindowFilter =
+                new TransformDef(
+                        "default_namespace.default_schema.\\.*",
+                        null,
+                        "id > 2 AND id < 4",
+                        null,
+                        null,
+                        null,
+                        null);
+
+        List<String> expectedEvents =
+                Arrays.asList(
+                        "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT}, primaryKeys=id, options=()}",
+                        "CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT,`name` VARCHAR(255),`age` TINYINT,`description` STRING}, primaryKeys=id, options=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[3, Carol, 15, student], op=INSERT, meta=()}");
+
+        runGenericTransformTest(
+                sinkApi, Collections.singletonList(idWindowFilter), expectedEvents);
+    }
+
+    /** Verifies that a filtering rule drops the source records that do not satisfy it. */
+    @ParameterizedTest
+    @EnumSource
+    void testFilteringRules(ValuesDataSink.SinkApi sinkApi) throws Exception {
+        // "Bob" is the only name of three characters or fewer, so its events are absent below.
+        TransformDef longNameFilter =
+                new TransformDef(
+                        "default_namespace.default_schema.\\.*",
+                        null,
+                        "CHAR_LENGTH(name) > 3",
+                        null,
+                        null,
+                        null,
+                        null);
+
+        List<String> expectedEvents =
+                Arrays.asList(
+                        "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT}, primaryKeys=id, options=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, Alice, 18], op=INSERT, meta=()}",
+                        "CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT,`name` VARCHAR(255),`age` TINYINT,`description` STRING}, primaryKeys=id, options=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[3, Carol, 15, student], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[4, Derrida, 25, student], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, Derrida, 25, student], after=[], op=DELETE, meta=()}");
+
+        runGenericTransformTest(
+                sinkApi, Collections.singletonList(longNameFilter), expectedEvents);
+    }
+
+    /**
+     * Verifies that several transform rules on the same tables can classify source records, each
+     * rule tagging the records its own filter accepts.
+     */
+    @ParameterizedTest
+    @EnumSource
+    void testMultipleDispatchTransform(ValuesDataSink.SinkApi sinkApi) throws Exception {
+        TransformDef youngRule =
+                new TransformDef(
+                        "default_namespace.default_schema.\\.*",
+                        "*, 'YOUNG' AS category",
+                        "age < 20",
+                        null,
+                        null,
+                        null,
+                        null);
+        TransformDef oldRule =
+                new TransformDef(
+                        "default_namespace.default_schema.\\.*",
+                        "*, 'OLD' AS category",
+                        "age >= 20",
+                        null,
+                        null,
+                        null,
+                        null);
+
+        List<String> expectedEvents =
+                Arrays.asList(
+                        "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT,`category` STRING}, primaryKeys=id, options=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, Alice, 18, YOUNG], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Bob, 20, OLD], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, Bob, 20, OLD], after=[2, Bob, 30, OLD], op=UPDATE, meta=()}",
+                        "CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT,`name` VARCHAR(255),`age` TINYINT,`description` STRING,`category` STRING}, primaryKeys=id, options=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[3, Carol, 15, student, YOUNG], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[4, Derrida, 25, student, OLD], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, Derrida, 25, student, OLD], after=[], op=DELETE, meta=()}");
+
+        runGenericTransformTest(sinkApi, Arrays.asList(youngRule, oldRule), expectedEvents);
+    }
+
+    /** This tests if transform generates metadata info
correctly. */
+    @ParameterizedTest
+    @EnumSource
+    void testMetadataInfo(ValuesDataSink.SinkApi sinkApi) throws Exception {
+        // A wildcard projection plus the trailing metadata arguments; the expected
+        // CreateTableEvents below carry them as primaryKeys, partitionKeys and options.
+        TransformDef metadataRule =
+                new TransformDef(
+                        "default_namespace.default_schema.\\.*",
+                        "*",
+                        null,
+                        "id,name",
+                        "id",
+                        "replication_num=1,bucket=17",
+                        "Just a Transform Block");
+
+        List<String> expectedEvents =
+                Arrays.asList(
+                        "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT}, primaryKeys=id;name, partitionKeys=id, options=({bucket=17, replication_num=1})}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, Alice, 18], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Bob, 20], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, Bob, 20], after=[2, Bob, 30], op=UPDATE, meta=()}",
+                        "CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT,`name` VARCHAR(255),`age` TINYINT,`description` STRING}, primaryKeys=id;name, partitionKeys=id, options=({bucket=17, replication_num=1})}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[3, Carol, 15, student], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[4, Derrida, 25, student], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, Derrida, 25, student], after=[], op=DELETE, meta=()}");
+
+        runGenericTransformTest(sinkApi, Collections.singletonList(metadataRule), expectedEvents);
+    }
+
+    /**
+     * This tests if transform generates metadata info correctly without specifying projection /
+     * filtering rules.
+     */
+    @ParameterizedTest
+    @EnumSource
+    @Disabled("This doesn't work until FLINK-35982 got fixed.")
+    void testMetadataInfoWithoutChangingSchema(ValuesDataSink.SinkApi sinkApi) throws Exception {
+        runGenericTransformTest(
+                sinkApi,
+                Collections.singletonList(
+                        new TransformDef(
+                                "default_namespace.default_schema.\\.*",
+                                null,
+                                null,
+                                "id,name",
+                                "id",
+                                "replication_num=1,bucket=17",
+                                "Just a Transform Block")),
+                // Neither a projection nor a filter is given, so the column types must match the
+                // source fixture exactly: mytable2.name is VARCHAR(255), not STRING.
+                Arrays.asList(
+                        "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT}, primaryKeys=id;name, partitionKeys=id, options=({bucket=17, replication_num=1})}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, Alice, 18], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Bob, 20], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, Bob, 20], after=[2, Bob, 30], op=UPDATE, meta=()}",
+                        "CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT,`name` VARCHAR(255),`age` TINYINT,`description` STRING}, primaryKeys=id;name, partitionKeys=id, options=({bucket=17, replication_num=1})}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[3, Carol, 15, student], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[4, Derrida, 25, student], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, Derrida, 25, student], after=[], op=DELETE, meta=()}"));
+    }
+
+    /** This tests if projection rule could reference metadata info correctly.
*/
+    @ParameterizedTest
+    @EnumSource
+    void testMetadataColumn(ValuesDataSink.SinkApi sinkApi) throws Exception {
+        // Explicit column list (no wildcard), so mytable2's `description` is not projected.
+        TransformDef metadataColumnsRule =
+                new TransformDef(
+                        "default_namespace.default_schema.\\.*",
+                        "id, name, age, __namespace_name__, __schema_name__, __table_name__",
+                        null,
+                        null,
+                        null,
+                        null,
+                        null);
+
+        List<String> expectedEvents =
+                Arrays.asList(
+                        "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT,`__namespace_name__` STRING NOT NULL,`__schema_name__` STRING NOT NULL,`__table_name__` STRING NOT NULL}, primaryKeys=id, options=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, Alice, 18, default_namespace, default_schema, mytable1], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Bob, 20, default_namespace, default_schema, mytable1], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, Bob, 20, default_namespace, default_schema, mytable1], after=[2, Bob, 30, default_namespace, default_schema, mytable1], op=UPDATE, meta=()}",
+                        "CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT,`name` VARCHAR(255),`age` TINYINT,`__namespace_name__` STRING NOT NULL,`__schema_name__` STRING NOT NULL,`__table_name__` STRING NOT NULL}, primaryKeys=id, options=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[3, Carol, 15, default_namespace, default_schema, mytable2], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[4, Derrida, 25, default_namespace, default_schema, mytable2], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, Derrida, 25, default_namespace, default_schema, mytable2], after=[], op=DELETE, meta=()}");
+
+        runGenericTransformTest(
+                sinkApi, Collections.singletonList(metadataColumnsRule), expectedEvents);
+    }
+
+    /** This
tests if projection rule could reference metadata info correctly with wildcard (*). */
+    @ParameterizedTest
+    @EnumSource
+    void testMetadataColumnWithWildcard(ValuesDataSink.SinkApi sinkApi) throws Exception {
+        // Wildcard first, then the three metadata identifiers appended after the source columns.
+        TransformDef wildcardWithMetadataRule =
+                new TransformDef(
+                        "default_namespace.default_schema.\\.*",
+                        "*, __namespace_name__, __schema_name__, __table_name__",
+                        null,
+                        null,
+                        null,
+                        null,
+                        null);
+
+        List<String> expectedEvents =
+                Arrays.asList(
+                        "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT,`__namespace_name__` STRING NOT NULL,`__schema_name__` STRING NOT NULL,`__table_name__` STRING NOT NULL}, primaryKeys=id, options=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, Alice, 18, default_namespace, default_schema, mytable1], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Bob, 20, default_namespace, default_schema, mytable1], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, Bob, 20, default_namespace, default_schema, mytable1], after=[2, Bob, 30, default_namespace, default_schema, mytable1], op=UPDATE, meta=()}",
+                        "CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT,`name` VARCHAR(255),`age` TINYINT,`description` STRING,`__namespace_name__` STRING NOT NULL,`__schema_name__` STRING NOT NULL,`__table_name__` STRING NOT NULL}, primaryKeys=id, options=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[3, Carol, 15, student, default_namespace, default_schema, mytable2], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[4, Derrida, 25, student, default_namespace, default_schema, mytable2], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, Derrida, 25, student, default_namespace, default_schema, mytable2], after=[], op=DELETE, meta=()}");
+
+        runGenericTransformTest(
+                sinkApi, Collections.singletonList(wildcardWithMetadataRule), expectedEvents);
+    }
+
+    /**
+     * Verifies that a quoted string containing the metadata column names is treated as a plain
+     * string literal rather than as metadata column identifiers.
+     */
+    @ParameterizedTest
+    @EnumSource
+    void testUsingMetadataColumnLiteralWithWildcard(ValuesDataSink.SinkApi sinkApi)
+            throws Exception {
+        TransformDef literalRule =
+                new TransformDef(
+                        "default_namespace.default_schema.\\.*",
+                        "*, '__namespace_name____schema_name____table_name__' AS string_literal",
+                        null,
+                        null,
+                        null,
+                        null,
+                        null);
+
+        List<String> expectedEvents =
+                Arrays.asList(
+                        "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT,`string_literal` STRING}, primaryKeys=id, options=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, Alice, 18, __namespace_name____schema_name____table_name__], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Bob, 20, __namespace_name____schema_name____table_name__], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, Bob, 20, __namespace_name____schema_name____table_name__], after=[2, Bob, 30, __namespace_name____schema_name____table_name__], op=UPDATE, meta=()}",
+                        "CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT,`name` VARCHAR(255),`age` TINYINT,`description` STRING,`string_literal` STRING}, primaryKeys=id, options=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[3, Carol, 15, student, __namespace_name____schema_name____table_name__], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[4, Derrida, 25, student, __namespace_name____schema_name____table_name__], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, Derrida, 25, student, __namespace_name____schema_name____table_name__], after=[], op=DELETE, meta=()}");
+
+        runGenericTransformTest(sinkApi, Collections.singletonList(literalRule), expectedEvents);
+    }
+
+    /** Verifies the results of the built-in comparison operators and predicates. */
+    @ParameterizedTest
+    @EnumSource
+    void testBuiltinComparisonFunctions(ValuesDataSink.SinkApi sinkApi) throws Exception {
+        // One boolean output column per comparison construct, col1 through col14.
+        String projection =
+                "*, "
+                        + "id = 2 AS col1, id <> 3 AS col2, id > 2 as col3, "
+                        + "id >= 2 as col4, id < 3 as col5, id <= 4 as col6, "
+                        + "name IS NULL as col7, name IS NOT NULL as col8, "
+                        + "id BETWEEN 1 AND 3 as col9, id NOT BETWEEN 2 AND 4 as col10, "
+                        + "name LIKE 'li' as col11, name LIKE 'ro' as col12, "
+                        + "CAST(id AS INT) IN (1, 3, 5) as col13, name IN ('Bob', 'Derrida') AS col14";
+        TransformDef comparisonRule =
+                new TransformDef(
+                        "default_namespace.default_schema.\\.*",
+                        projection,
+                        null,
+                        null,
+                        null,
+                        null,
+                        null);
+
+        List<String> expectedEvents =
+                Arrays.asList(
+                        "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT,`col1` BOOLEAN,`col2` BOOLEAN,`col3` BOOLEAN,`col4` BOOLEAN,`col5` BOOLEAN,`col6` BOOLEAN,`col7` BOOLEAN,`col8` BOOLEAN,`col9` BOOLEAN,`col10` BOOLEAN,`col11` BOOLEAN,`col12` BOOLEAN,`col13` BOOLEAN,`col14` BOOLEAN}, primaryKeys=id, options=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, Alice, 18, false, true, false, false, true, true, false, true, true, true, true, false, true, false], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Bob, 20, true, true, false, true, true, true, false, true, true, false, false, false, false, true], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, Bob, 20, true, true, false, true, true, true, false, true, true, false, false, false, false, true], after=[2, Bob, 30, true, true, false, true, true, true, false, true, true, false, false, false, false, true], op=UPDATE, meta=()}",
+                        "CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT,`name` VARCHAR(255),`age` TINYINT,`description` STRING,`col1` BOOLEAN,`col2` BOOLEAN,`col3` BOOLEAN,`col4` BOOLEAN,`col5` BOOLEAN,`col6` BOOLEAN,`col7` BOOLEAN,`col8` BOOLEAN,`col9` BOOLEAN,`col10` BOOLEAN,`col11` BOOLEAN,`col12` BOOLEAN,`col13` BOOLEAN,`col14` BOOLEAN}, primaryKeys=id, options=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[3, Carol, 15, student, false, true, true, true, false, true, false, true, true, false, false, true, true, false], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[4, Derrida, 25, student, false, true, true, true, false, true, false, true, false, false, false, false, false, true], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, Derrida, 25, student, false, true, true, true, false, true, false, true, false, false, false, false, false, true], after=[], op=DELETE, meta=()}");
+
+        runGenericTransformTest(
+                sinkApi, Collections.singletonList(comparisonRule), expectedEvents);
+    }
+
+    /** This tests if built-in logical functions work as expected.
*/
+    @ParameterizedTest
+    @EnumSource
+    void testBuiltinLogicalFunctions(ValuesDataSink.SinkApi sinkApi) throws Exception {
+        // Output columns are col1, col2, col4..col10; there is no col3 in this projection.
+        String projection =
+                "*, "
+                        + "id = 2 OR true as col1, id <> 3 OR false as col2, "
+                        + "name = 'Alice' AND true as col4, name <> 'Bob' AND false as col5, "
+                        + "NOT id = 1 as col6, id = 3 IS FALSE as col7, "
+                        + "name = 'Derrida' IS TRUE as col8, "
+                        + "name <> 'Carol' IS NOT FALSE as col9, "
+                        + "name <> 'Eve' IS NOT TRUE as col10";
+        TransformDef logicalRule =
+                new TransformDef(
+                        "default_namespace.default_schema.\\.*",
+                        projection,
+                        null,
+                        null,
+                        null,
+                        null,
+                        null);
+
+        List<String> expectedEvents =
+                Arrays.asList(
+                        "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT,`col1` BOOLEAN,`col2` BOOLEAN,`col4` BOOLEAN,`col5` BOOLEAN,`col6` BOOLEAN,`col7` BOOLEAN,`col8` BOOLEAN,`col9` BOOLEAN,`col10` BOOLEAN}, primaryKeys=id, options=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, Alice, 18, true, true, true, false, false, true, false, true, false], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Bob, 20, true, true, false, false, true, true, false, true, false], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, Bob, 20, true, true, false, false, true, true, false, true, false], after=[2, Bob, 30, true, true, false, false, true, true, false, true, false], op=UPDATE, meta=()}",
+                        "CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT,`name` VARCHAR(255),`age` TINYINT,`description` STRING,`col1` BOOLEAN,`col2` BOOLEAN,`col4` BOOLEAN,`col5` BOOLEAN,`col6` BOOLEAN,`col7` BOOLEAN,`col8` BOOLEAN,`col9` BOOLEAN,`col10` BOOLEAN}, primaryKeys=id, options=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[3, Carol, 15, student, true, true, false, false, true, true, false, false, false], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[4, Derrida, 25, student, true, true, false, false, true, true, true, true, false], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, Derrida, 25, student, true, true, false, false, true, true, true, true, false], after=[], op=DELETE, meta=()}");
+
+        runGenericTransformTest(sinkApi, Collections.singletonList(logicalRule), expectedEvents);
+    }
+
+    /** Verifies the results and result types of the built-in arithmetic functions. */
+    @ParameterizedTest
+    @EnumSource
+    void testBuiltinArithmeticFunctions(ValuesDataSink.SinkApi sinkApi) throws Exception {
+        // UUID() has no fixed value, so only its length is projected (col10, expected 36).
+        String projection =
+                "*, "
+                        + "id + 17 AS col1, id - 17 AS col2, id * 17 AS col3, "
+                        + "CAST(id AS DOUBLE) / 1.7 AS col4, "
+                        + "CAST(id AS INT) % 3 AS col5, ABS(id - 17) AS col6, "
+                        + "CEIL(CAST(id AS DOUBLE) / 1.7) AS col7, "
+                        + "FLOOR(CAST(id AS DOUBLE) / 1.7) AS col8, "
+                        + "ROUND(CAST(id AS DOUBLE) / 1.7) AS col9, "
+                        + "CHAR_LENGTH(UUID()) AS col10";
+        TransformDef arithmeticRule =
+                new TransformDef(
+                        "default_namespace.default_schema.\\.*",
+                        projection,
+                        null,
+                        null,
+                        null,
+                        null,
+                        null);
+
+        List<String> expectedEvents =
+                Arrays.asList(
+                        "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT,`col1` INT,`col2` INT,`col3` INT,`col4` DOUBLE,`col5` INT,`col6` INT,`col7` DOUBLE,`col8` DOUBLE,`col9` DOUBLE,`col10` INT}, primaryKeys=id, options=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, Alice, 18, 18, -16, 17, 0.5882352941176471, 1, 16, 1.0, 0.0, 1.0, 36], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Bob, 20, 19, -15, 34, 1.1764705882352942, 2, 15, 2.0, 1.0, 1.0, 36], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, Bob, 20, 19, -15, 34, 1.1764705882352942, 2, 15, 2.0, 1.0, 1.0, 36], after=[2, Bob, 30, 19, -15, 34, 1.1764705882352942, 2, 15, 2.0, 1.0, 1.0, 36], op=UPDATE, meta=()}",
+                        "CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT,`name` VARCHAR(255),`age` TINYINT,`description` STRING,`col1` BIGINT,`col2` BIGINT,`col3` BIGINT,`col4` DOUBLE,`col5` INT,`col6` BIGINT,`col7` DOUBLE,`col8` DOUBLE,`col9` DOUBLE,`col10` INT}, primaryKeys=id, options=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[3, Carol, 15, student, 20, -14, 51, 1.7647058823529411, 0, 14, 2.0, 1.0, 2.0, 36], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[4, Derrida, 25, student, 21, -13, 68, 2.3529411764705883, 1, 13, 3.0, 2.0, 2.0, 36], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, Derrida, 25, student, 21, -13, 68, 2.3529411764705883, 1, 13, 3.0, 2.0, 2.0, 36], after=[], op=DELETE, meta=()}");
+
+        runGenericTransformTest(
+                sinkApi, Collections.singletonList(arithmeticRule), expectedEvents);
+    }
+
+    /** This tests if built-in string functions work as expected.
*/
+    @ParameterizedTest
+    @EnumSource
+    void testBuiltinStringFunctions(ValuesDataSink.SinkApi sinkApi) throws Exception {
+        // One output column per string function, col1 through col10.
+        String projection =
+                "*, "
+                        + "'Dear ' || name AS col1, "
+                        + "CHAR_LENGTH(name) AS col2, "
+                        + "UPPER(name) AS col3, "
+                        + "LOWER(name) AS col4, "
+                        + "TRIM(name) AS col5, "
+                        + "REGEXP_REPLACE(name, 'Al|Bo', '**') AS col6, "
+                        + "SUBSTR(name, 0, 1) AS col7, "
+                        + "SUBSTR(name, 2, 1) AS col8, "
+                        + "SUBSTR(name, 3) AS col9, "
+                        + "CONCAT(name, ' - ', CAST(id AS VARCHAR)) AS col10";
+        TransformDef stringRule =
+                new TransformDef(
+                        "default_namespace.default_schema.\\.*",
+                        projection,
+                        null,
+                        null,
+                        null,
+                        null,
+                        null);
+
+        List<String> expectedEvents =
+                Arrays.asList(
+                        "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT,`col1` STRING,`col2` INT,`col3` STRING,`col4` STRING,`col5` STRING,`col6` STRING,`col7` STRING,`col8` STRING,`col9` STRING,`col10` STRING}, primaryKeys=id, options=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, Alice, 18, Dear Alice, 5, ALICE, alice, Alice, **ice, A, i, ce, Alice - 1], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Bob, 20, Dear Bob, 3, BOB, bob, Bob, **b, B, b, , Bob - 2], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, Bob, 20, Dear Bob, 3, BOB, bob, Bob, **b, B, b, , Bob - 2], after=[2, Bob, 30, Dear Bob, 3, BOB, bob, Bob, **b, B, b, , Bob - 2], op=UPDATE, meta=()}",
+                        "CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT,`name` VARCHAR(255),`age` TINYINT,`description` STRING,`col1` STRING,`col2` INT,`col3` STRING,`col4` STRING,`col5` STRING,`col6` STRING,`col7` STRING,`col8` STRING,`col9` STRING,`col10` STRING}, primaryKeys=id, options=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[3, Carol, 15, student, Dear Carol, 5, CAROL, carol, Carol, Carol, C, r, ol, Carol - 3], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[4, Derrida, 25, student, Dear Derrida, 7, DERRIDA, derrida, Derrida, Derrida, D, r, rida, Derrida - 4], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, Derrida, 25, student, Dear Derrida, 7, DERRIDA, derrida, Derrida, Derrida, D, r, rida, Derrida - 4], after=[], op=DELETE, meta=()}");
+
+        runGenericTransformTest(sinkApi, Collections.singletonList(stringRule), expectedEvents);
+    }
+
+    /**
+     * Placeholder for comparing {@code SUBSTR(...)} with {@code SUBSTRING(... FROM ... FOR ...)}.
+     * The test is disabled and its expected output has not been filled in yet.
+     */
+    @ParameterizedTest
+    @EnumSource
+    @Disabled("SUBSTRING ... FROM ... FOR ... isn't available until we close FLINK-35985.")
+    void testSubstringFunctions(ValuesDataSink.SinkApi sinkApi) throws Exception {
+        String projection =
+                "*, "
+                        + "SUBSTR(name, 0, 1) AS col1, "
+                        + "SUBSTR(name, 2, 1) AS col2, "
+                        + "SUBSTR(name, 3) AS col3, "
+                        + "SUBSTRING(name FROM 0 FOR 1) AS col4, "
+                        + "SUBSTRING(name FROM 2 FOR 1) AS col5, "
+                        + "SUBSTRING(name FROM 3) AS col6";
+        TransformDef substringRule =
+                new TransformDef(
+                        "default_namespace.default_schema.\\.*",
+                        projection,
+                        null,
+                        null,
+                        null,
+                        null,
+                        null);
+
+        // Placeholder expectations, kept as-is until the test can be enabled.
+        List<String> expectedEvents = Arrays.asList("To", "be", "added");
+
+        runGenericTransformTest(sinkApi, Collections.singletonList(substringRule), expectedEvents);
+    }
+
+    /** This tests if built-in conditional functions work as expected.
*/
+    @ParameterizedTest
+    @EnumSource
+    @Disabled("This case will not run until we close FLINK-35986.")
+    void testConditionalFunctions(ValuesDataSink.SinkApi sinkApi) throws Exception {
+        // Covers both CASE forms, COALESCE and (nested) IF in one projection.
+        String projection =
+                "*, "
+                        + "CASE UPPER(name)"
+                        + " WHEN 'ALICE' THEN 'A - Alice'"
+                        + " WHEN 'BOB' THEN 'B - Bob'"
+                        + " WHEN 'CAROL' THEN 'C - Carol'"
+                        + " ELSE 'D - Derrida' END AS col1, "
+                        + "CASE"
+                        + " WHEN id = 1 THEN '1 - One'"
+                        + " WHEN id = 2 THEN '2 - Two'"
+                        + " WHEN id = 3 THEN '3 - Three'"
+                        + " ELSE '4 - Four' END AS col2, "
+                        + "COALESCE(name, 'FALLBACK') AS col3, "
+                        + "COALESCE(NULL, NULL, id, 42, NULL) AS col4, "
+                        + "COALESCE(NULL, NULL, NULL, NULL, NULL) AS col5, "
+                        + "IF(TRUE, 'true', 'false') AS col6, "
+                        + "IF(id < 3, 'ID < 3', 'ID >= 3') AS col7, "
+                        + "IF(name = 'Alice', IF(id = 1, 'YES', 'NO'), 'NO') AS col8";
+        TransformDef conditionalRule =
+                new TransformDef(
+                        "default_namespace.default_schema.\\.*",
+                        projection,
+                        null,
+                        null,
+                        null,
+                        null,
+                        null);
+
+        // Placeholder expectations, kept as-is until the test can be enabled.
+        List<String> expectedEvents = Arrays.asList("Foo", "Bar", "Baz");
+
+        runGenericTransformTest(
+                sinkApi, Collections.singletonList(conditionalRule), expectedEvents);
+    }
+
+    /** This tests if transform temporal functions works as expected.
*/ @Test void testTransformWithTemporalFunction() throws Exception { FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster(); @@ -144,6 +614,117 @@ class FlinkPipelineTransformITCase { .primaryKey("id") .build(); + List events = getTestEvents(table1Schema, table2Schema, myTable1, myTable2); + + ValuesDataSourceHelper.setSourceEvents(Collections.singletonList(events)); + + SourceDef sourceDef = + new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig); + + // Setup value sink + Configuration sinkConfig = new Configuration(); + sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true); + SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig); + + // Setup pipeline + Configuration pipelineConfig = new Configuration(); + pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); + pipelineConfig.set(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE, "America/Los_Angeles"); + PipelineDef pipelineDef = + new PipelineDef( + sourceDef, + sinkDef, + Collections.emptyList(), + Collections.singletonList( + new TransformDef( + "default_namespace.default_schema.\\.*", + "*, LOCALTIME as lcl_t, CURRENT_TIME as cur_t, CAST(CURRENT_TIMESTAMP AS TIMESTAMP) as cur_ts, CAST(NOW() AS TIMESTAMP) as now_ts, LOCALTIMESTAMP as lcl_ts, CURRENT_DATE as cur_dt", + null, + null, + null, + null, + null)), + Collections.emptyList(), + pipelineConfig); + + // Execute the pipeline + PipelineExecution execution = composer.compose(pipelineDef); + execution.execute(); + + // Check the order and content of all received events + String[] outputEvents = outCaptor.toString().trim().split("\n"); + + Arrays.stream(outputEvents).forEach(this::extractDataLines); + }
+
+    /**
+     * Runs the shared two-table fixture through a pipeline that uses the given transform rules
+     * and checks the text collected in {@code outCaptor} line by line.
+     *
+     * @param sinkApi value handed to the values sink through {@code SINK_API}
+     * @param transformDefs transform rules installed on the pipeline
+     * @param expectedResults the output lines that must appear, exactly and in this order
+     * @throws Exception if composing or executing the pipeline fails
+     */
+    void runGenericTransformTest(
+            ValuesDataSink.SinkApi sinkApi,
+            List<TransformDef> transformDefs,
+            List<String> expectedResults)
+            throws Exception {
+        FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
+
+        // Setup value source
+        Configuration sourceConfig = new Configuration();
+        sourceConfig.set(
+                ValuesDataSourceOptions.EVENT_SET_ID,
+                ValuesDataSourceHelper.EventSetId.CUSTOM_SOURCE_EVENTS);
+
+        TableId myTable1 = TableId.tableId("default_namespace", "default_schema", "mytable1");
+        TableId myTable2 = TableId.tableId("default_namespace", "default_schema", "mytable2");
+        Schema table1Schema =
+                Schema.newBuilder()
+                        .physicalColumn("id", DataTypes.INT())
+                        .physicalColumn("name", DataTypes.STRING())
+                        .physicalColumn("age", DataTypes.INT())
+                        .primaryKey("id")
+                        .build();
+        Schema table2Schema =
+                Schema.newBuilder()
+                        .physicalColumn("id", DataTypes.BIGINT())
+                        .physicalColumn("name", DataTypes.VARCHAR(255))
+                        .physicalColumn("age", DataTypes.TINYINT())
+                        .physicalColumn("description", DataTypes.STRING())
+                        .primaryKey("id")
+                        .build();
+
+        // Passed straight through so that no raw-typed local has to be declared here.
+        ValuesDataSourceHelper.setSourceEvents(
+                Collections.singletonList(
+                        getTestEvents(table1Schema, table2Schema, myTable1, myTable2)));
+
+        SourceDef sourceDef =
+                new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig);
+
+        // Setup value sink
+        Configuration sinkConfig = new Configuration();
+        sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
+        sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi);
+        SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig);
+
+        // Setup pipeline
+        Configuration pipelineConfig = new Configuration();
+        pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+        PipelineDef pipelineDef =
+                new PipelineDef(
+                        sourceDef,
+                        sinkDef,
+                        Collections.emptyList(),
+                        transformDefs,
+                        Collections.emptyList(),
+                        pipelineConfig);
+
+        // Execute the pipeline
+        PipelineExecution execution = composer.compose(pipelineDef);
+        execution.execute();
+
+        // Check the order and content of all received events
+        String[] outputEvents = outCaptor.toString().trim().split("\n");
+
+        // With a typed list, toArray yields a String[] that matches containsExactly(String...).
+        assertThat(outputEvents).containsExactly(expectedResults.toArray(new String[0]));
+    }
+
+    private static 
List getTestEvents( + Schema table1Schema, Schema table2Schema, TableId myTable1, TableId myTable2) { List events = new ArrayList<>(); BinaryRecordDataGenerator table1dataGenerator = new BinaryRecordDataGenerator( @@ -200,48 +781,7 @@ class FlinkPipelineTransformITCase { (byte) 25, BinaryStringData.fromString("student") }))); - - ValuesDataSourceHelper.setSourceEvents(Collections.singletonList(events)); - - SourceDef sourceDef = - new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig); - - // Setup value sink - Configuration sinkConfig = new Configuration(); - sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true); - SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig); - - // Setup pipeline - Configuration pipelineConfig = new Configuration(); - pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); - pipelineConfig.set( - PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); - pipelineConfig.set(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE, "America/Los_Angeles"); - PipelineDef pipelineDef = - new PipelineDef( - sourceDef, - sinkDef, - Collections.emptyList(), - Collections.singletonList( - new TransformDef( - "default_namespace.default_schema.\\.*", - "*, LOCALTIME as lcl_t, CURRENT_TIME as cur_t, CAST(CURRENT_TIMESTAMP AS TIMESTAMP) as cur_ts, CAST(NOW() AS TIMESTAMP) as now_ts, LOCALTIMESTAMP as lcl_ts, CURRENT_DATE as cur_dt", - null, - null, - null, - null, - null)), - Collections.emptyList(), - pipelineConfig); - - // Execute the pipeline - PipelineExecution execution = composer.compose(pipelineDef); - execution.execute(); - - // Check the order and content of all received events - String[] outputEvents = outCaptor.toString().trim().split("\n"); - - Arrays.stream(outputEvents).forEach(this::extractDataLines); + return events; } @Test