@ -41,6 +41,7 @@ import org.apache.flink.cdc.composer.definition.SourceDef;
import org.apache.flink.cdc.composer.definition.TransformDef ;
import org.apache.flink.cdc.composer.definition.TransformDef ;
import org.apache.flink.cdc.connectors.values.ValuesDatabase ;
import org.apache.flink.cdc.connectors.values.ValuesDatabase ;
import org.apache.flink.cdc.connectors.values.factory.ValuesDataFactory ;
import org.apache.flink.cdc.connectors.values.factory.ValuesDataFactory ;
import org.apache.flink.cdc.connectors.values.sink.ValuesDataSink ;
import org.apache.flink.cdc.connectors.values.sink.ValuesDataSinkOptions ;
import org.apache.flink.cdc.connectors.values.sink.ValuesDataSinkOptions ;
import org.apache.flink.cdc.connectors.values.source.ValuesDataSourceHelper ;
import org.apache.flink.cdc.connectors.values.source.ValuesDataSourceHelper ;
import org.apache.flink.cdc.connectors.values.source.ValuesDataSourceOptions ;
import org.apache.flink.cdc.connectors.values.source.ValuesDataSourceOptions ;
@ -52,8 +53,11 @@ import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap;
import org.junit.jupiter.api.AfterEach ;
import org.junit.jupiter.api.AfterEach ;
import org.junit.jupiter.api.BeforeEach ;
import org.junit.jupiter.api.BeforeEach ;
import org.junit.jupiter.api.Disabled ;
import org.junit.jupiter.api.Test ;
import org.junit.jupiter.api.Test ;
import org.junit.jupiter.api.extension.RegisterExtension ;
import org.junit.jupiter.api.extension.RegisterExtension ;
import org.junit.jupiter.params.ParameterizedTest ;
import org.junit.jupiter.params.provider.EnumSource ;
import java.io.ByteArrayOutputStream ;
import java.io.ByteArrayOutputStream ;
import java.io.PrintStream ;
import java.io.PrintStream ;
@ -116,6 +120,472 @@ class FlinkPipelineTransformITCase {
System . setOut ( standardOut ) ;
System . setOut ( standardOut ) ;
}
}
/**
 * Verifies that a projection can keep every source column ({@code *}) and append columns
 * computed from them: {@code uid} (concatenation of {@code id} and {@code name}) and {@code
 * double_age} ({@code age * 2}).
 */
@ParameterizedTest
@EnumSource
void testCalculatedColumns(ValuesDataSink.SinkApi sinkApi) throws Exception {
    // Single rule applied to every table matched by the pattern; projection only.
    List<TransformDef> rules =
            Collections.singletonList(
                    new TransformDef(
                            "default_namespace.default_schema.\\.*",
                            "*, id || name AS uid, age * 2 AS double_age",
                            null,
                            null,
                            null,
                            null,
                            null));

    // Both derived columns are appended after the original columns of each table.
    List<String> expected =
            Arrays.asList(
                    "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT,`uid` STRING,`double_age` INT}, primaryKeys=id, options=()}",
                    "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, Alice, 18, 1Alice, 36], op=INSERT, meta=()}",
                    "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Bob, 20, 2Bob, 40], op=INSERT, meta=()}",
                    "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, Bob, 20, 2Bob, 40], after=[2, Bob, 30, 2Bob, 60], op=UPDATE, meta=()}",
                    "CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT,`name` VARCHAR(255),`age` TINYINT,`description` STRING,`uid` STRING,`double_age` INT}, primaryKeys=id, options=()}",
                    "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[3, Carol, 15, student, 3Carol, 30], op=INSERT, meta=()}",
                    "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[4, Derrida, 25, student, 4Derrida, 50], op=INSERT, meta=()}",
                    "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, Derrida, 25, student, 4Derrida, 50], after=[], op=DELETE, meta=()}");

    runGenericTransformTest(sinkApi, rules, expected);
}
/**
 * Verifies that one projection expression may refer to the same column several times; here
 * {@code age} appears three times in {@code age * age * age}.
 */
@ParameterizedTest
@EnumSource
void testMultipleReferencedColumnsInProjection(ValuesDataSink.SinkApi sinkApi)
        throws Exception {
    List<TransformDef> rules =
            Collections.singletonList(
                    new TransformDef(
                            "default_namespace.default_schema.\\.*",
                            "*, CAST(age * age * age AS INT) AS cubic_age",
                            null,
                            null,
                            null,
                            null,
                            null));

    // cubic_age is age cubed, e.g. 18 -> 5832 and 30 -> 27000.
    List<String> expected =
            Arrays.asList(
                    "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT,`cubic_age` INT}, primaryKeys=id, options=()}",
                    "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, Alice, 18, 5832], op=INSERT, meta=()}",
                    "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Bob, 20, 8000], op=INSERT, meta=()}",
                    "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, Bob, 20, 8000], after=[2, Bob, 30, 27000], op=UPDATE, meta=()}",
                    "CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT,`name` VARCHAR(255),`age` TINYINT,`description` STRING,`cubic_age` INT}, primaryKeys=id, options=()}",
                    "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[3, Carol, 15, student, 3375], op=INSERT, meta=()}",
                    "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[4, Derrida, 25, student, 15625], op=INSERT, meta=()}",
                    "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, Derrida, 25, student, 15625], after=[], op=DELETE, meta=()}");

    runGenericTransformTest(sinkApi, rules, expected);
}
/**
 * Verifies that a filter expression may refer to the same column several times; here {@code
 * id} is used on both sides of an {@code AND}.
 */
@ParameterizedTest
@EnumSource
void testMultipleReferencedColumnsInFilter(ValuesDataSink.SinkApi sinkApi) throws Exception {
    // Filter-only rule: no projection, so schemas are expected to pass through unchanged.
    List<TransformDef> rules =
            Collections.singletonList(
                    new TransformDef(
                            "default_namespace.default_schema.\\.*",
                            null,
                            "id > 2 AND id < 4",
                            null,
                            null,
                            null,
                            null));

    // Only the row with id = 3 is expected to survive; both CreateTableEvents still appear.
    List<String> expected =
            Arrays.asList(
                    "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT}, primaryKeys=id, options=()}",
                    "CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT,`name` VARCHAR(255),`age` TINYINT,`description` STRING}, primaryKeys=id, options=()}",
                    "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[3, Carol, 15, student], op=INSERT, meta=()}");

    runGenericTransformTest(sinkApi, rules, expected);
}
/**
 * Verifies that a filter rule drops source records that do not satisfy it; here only rows
 * whose {@code name} is longer than three characters are expected in the output.
 */
@ParameterizedTest
@EnumSource
void testFilteringRules(ValuesDataSink.SinkApi sinkApi) throws Exception {
    List<TransformDef> rules =
            Collections.singletonList(
                    new TransformDef(
                            "default_namespace.default_schema.\\.*",
                            null,
                            "CHAR_LENGTH(name) > 3",
                            null,
                            null,
                            null,
                            null));

    // "Bob" (three characters) is absent from the expected events, including its UPDATE.
    List<String> expected =
            Arrays.asList(
                    "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT}, primaryKeys=id, options=()}",
                    "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, Alice, 18], op=INSERT, meta=()}",
                    "CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT,`name` VARCHAR(255),`age` TINYINT,`description` STRING}, primaryKeys=id, options=()}",
                    "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[3, Carol, 15, student], op=INSERT, meta=()}",
                    "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[4, Derrida, 25, student], op=INSERT, meta=()}",
                    "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, Derrida, 25, student], after=[], op=DELETE, meta=()}");

    runGenericTransformTest(sinkApi, rules, expected);
}
/ * *
* This tests if transform rule could be used to classify source records based on filtering
* rules .
* /
@ParameterizedTest
@EnumSource
void testMultipleDispatchTransform ( ValuesDataSink . SinkApi sinkApi ) throws Exception {
runGenericTransformTest (
sinkApi ,
Arrays . asList (
new TransformDef (
"default_namespace.default_schema.\\.*" ,
"*, 'YOUNG' AS category" ,
"age < 20" ,
null ,
null ,
null ,
null ) ,
new TransformDef (
"default_namespace.default_schema.\\.*" ,
"*, 'OLD' AS category" ,
"age >= 20" ,
null ,
null ,
null ,
null ) ) ,
Arrays . asList (
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT,`category` STRING}, primaryKeys=id, options=()}" ,
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, Alice, 18, YOUNG], op=INSERT, meta=()}" ,
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Bob, 20, OLD], op=INSERT, meta=()}" ,
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, Bob, 20, OLD], after=[2, Bob, 30, OLD], op=UPDATE, meta=()}" ,
"CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT,`name` VARCHAR(255),`age` TINYINT,`description` STRING,`category` STRING}, primaryKeys=id, options=()}" ,
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[3, Carol, 15, student, YOUNG], op=INSERT, meta=()}" ,
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[4, Derrida, 25, student, OLD], op=INSERT, meta=()}" ,
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, Derrida, 25, student, OLD], after=[], op=DELETE, meta=()}" ) ) ;
}
/**
 * Verifies that primary keys, partition keys and table options given on a transform rule show
 * up in the emitted {@code CreateTableEvent}s when a {@code *} projection is present.
 */
@ParameterizedTest
@EnumSource
void testMetadataInfo(ValuesDataSink.SinkApi sinkApi) throws Exception {
    // Arguments after the null filter: primary keys, partition keys, table options,
    // description (order as reflected by the expected CreateTableEvents below).
    List<TransformDef> rules =
            Collections.singletonList(
                    new TransformDef(
                            "default_namespace.default_schema.\\.*",
                            "*",
                            null,
                            "id,name",
                            "id",
                            "replication_num=1,bucket=17",
                            "Just a Transform Block"));

    // Data rows are unchanged; only the schema metadata differs from the source.
    List<String> expected =
            Arrays.asList(
                    "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT}, primaryKeys=id;name, partitionKeys=id, options=({bucket=17, replication_num=1})}",
                    "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, Alice, 18], op=INSERT, meta=()}",
                    "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Bob, 20], op=INSERT, meta=()}",
                    "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, Bob, 20], after=[2, Bob, 30], op=UPDATE, meta=()}",
                    "CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT,`name` VARCHAR(255),`age` TINYINT,`description` STRING}, primaryKeys=id;name, partitionKeys=id, options=({bucket=17, replication_num=1})}",
                    "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[3, Carol, 15, student], op=INSERT, meta=()}",
                    "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[4, Derrida, 25, student], op=INSERT, meta=()}",
                    "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, Derrida, 25, student], after=[], op=DELETE, meta=()}");

    runGenericTransformTest(sinkApi, rules, expected);
}
/ * *
* This tests if transform generates metadata info correctly without specifying projection /
* filtering rules .
* /
@ParameterizedTest
@EnumSource
@Disabled ( "This doesn't work until FLINK-35982 got fixed." )
void testMetadataInfoWithoutChangingSchema ( ValuesDataSink . SinkApi sinkApi ) throws Exception {
runGenericTransformTest (
sinkApi ,
Collections . singletonList (
new TransformDef (
"default_namespace.default_schema.\\.*" ,
null ,
null ,
"id,name" ,
"id" ,
"replication_num=1,bucket=17" ,
"Just a Transform Block" ) ) ,
Arrays . asList (
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT}, primaryKeys=id;name, partitionKeys=id, options=({bucket=17, replication_num=1})}" ,
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, Alice, 18], op=INSERT, meta=()}" ,
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Bob, 20], op=INSERT, meta=()}" ,
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, Bob, 20], after=[2, Bob, 30], op=UPDATE, meta=()}" ,
"CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT,`name` STRING,`age` TINYINT,`description` STRING}, primaryKeys=id;name, partitionKeys=id, options=({bucket=17, replication_num=1})}" ,
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[3, Carol, 15, student], op=INSERT, meta=()}" ,
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[4, Derrida, 25, student], op=INSERT, meta=()}" ,
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, Derrida, 25, student], after=[], op=DELETE, meta=()}" ) ) ;
}
/**
 * Verifies that a projection listing explicit columns can also select the metadata columns
 * {@code __namespace_name__}, {@code __schema_name__} and {@code __table_name__}.
 */
@ParameterizedTest
@EnumSource
void testMetadataColumn(ValuesDataSink.SinkApi sinkApi) throws Exception {
    // Explicit column list: mytable2's `description` is not selected and is absent below.
    List<TransformDef> rules =
            Collections.singletonList(
                    new TransformDef(
                            "default_namespace.default_schema.\\.*",
                            "id, name, age, __namespace_name__, __schema_name__, __table_name__",
                            null,
                            null,
                            null,
                            null,
                            null));

    List<String> expected =
            Arrays.asList(
                    "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT,`__namespace_name__` STRING NOT NULL,`__schema_name__` STRING NOT NULL,`__table_name__` STRING NOT NULL}, primaryKeys=id, options=()}",
                    "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, Alice, 18, default_namespace, default_schema, mytable1], op=INSERT, meta=()}",
                    "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Bob, 20, default_namespace, default_schema, mytable1], op=INSERT, meta=()}",
                    "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, Bob, 20, default_namespace, default_schema, mytable1], after=[2, Bob, 30, default_namespace, default_schema, mytable1], op=UPDATE, meta=()}",
                    "CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT,`name` VARCHAR(255),`age` TINYINT,`__namespace_name__` STRING NOT NULL,`__schema_name__` STRING NOT NULL,`__table_name__` STRING NOT NULL}, primaryKeys=id, options=()}",
                    "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[3, Carol, 15, default_namespace, default_schema, mytable2], op=INSERT, meta=()}",
                    "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[4, Derrida, 25, default_namespace, default_schema, mytable2], op=INSERT, meta=()}",
                    "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, Derrida, 25, default_namespace, default_schema, mytable2], after=[], op=DELETE, meta=()}");

    runGenericTransformTest(sinkApi, rules, expected);
}
/**
 * Verifies that the metadata columns {@code __namespace_name__}, {@code __schema_name__} and
 * {@code __table_name__} can be appended after a wildcard ({@code *}) projection.
 */
@ParameterizedTest
@EnumSource
void testMetadataColumnWithWildcard(ValuesDataSink.SinkApi sinkApi) throws Exception {
    List<TransformDef> rules =
            Collections.singletonList(
                    new TransformDef(
                            "default_namespace.default_schema.\\.*",
                            "*, __namespace_name__, __schema_name__, __table_name__",
                            null,
                            null,
                            null,
                            null,
                            null));

    // The wildcard keeps every source column (including mytable2's `description`).
    List<String> expected =
            Arrays.asList(
                    "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT,`__namespace_name__` STRING NOT NULL,`__schema_name__` STRING NOT NULL,`__table_name__` STRING NOT NULL}, primaryKeys=id, options=()}",
                    "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, Alice, 18, default_namespace, default_schema, mytable1], op=INSERT, meta=()}",
                    "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Bob, 20, default_namespace, default_schema, mytable1], op=INSERT, meta=()}",
                    "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, Bob, 20, default_namespace, default_schema, mytable1], after=[2, Bob, 30, default_namespace, default_schema, mytable1], op=UPDATE, meta=()}",
                    "CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT,`name` VARCHAR(255),`age` TINYINT,`description` STRING,`__namespace_name__` STRING NOT NULL,`__schema_name__` STRING NOT NULL,`__table_name__` STRING NOT NULL}, primaryKeys=id, options=()}",
                    "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[3, Carol, 15, student, default_namespace, default_schema, mytable2], op=INSERT, meta=()}",
                    "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[4, Derrida, 25, student, default_namespace, default_schema, mytable2], op=INSERT, meta=()}",
                    "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, Derrida, 25, student, default_namespace, default_schema, mytable2], after=[], op=DELETE, meta=()}");

    runGenericTransformTest(sinkApi, rules, expected);
}
/ * *
* This tests if transform operator could distinguish metadata column identifiers and string
* literals .
* /
@ParameterizedTest
@EnumSource
void testUsingMetadataColumnLiteralWithWildcard ( ValuesDataSink . SinkApi sinkApi )
throws Exception {
runGenericTransformTest (
sinkApi ,
Collections . singletonList (
new TransformDef (
"default_namespace.default_schema.\\.*" ,
"*, '__namespace_name____schema_name____table_name__' AS string_literal" ,
null ,
null ,
null ,
null ,
null ) ) ,
Arrays . asList (
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT,`string_literal` STRING}, primaryKeys=id, options=()}" ,
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, Alice, 18, __namespace_name____schema_name____table_name__], op=INSERT, meta=()}" ,
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Bob, 20, __namespace_name____schema_name____table_name__], op=INSERT, meta=()}" ,
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, Bob, 20, __namespace_name____schema_name____table_name__], after=[2, Bob, 30, __namespace_name____schema_name____table_name__], op=UPDATE, meta=()}" ,
"CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT,`name` VARCHAR(255),`age` TINYINT,`description` STRING,`string_literal` STRING}, primaryKeys=id, options=()}" ,
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[3, Carol, 15, student, __namespace_name____schema_name____table_name__], op=INSERT, meta=()}" ,
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[4, Derrida, 25, student, __namespace_name____schema_name____table_name__], op=INSERT, meta=()}" ,
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, Derrida, 25, student, __namespace_name____schema_name____table_name__], after=[], op=DELETE, meta=()}" ) ) ;
}
/**
 * Verifies the built-in comparison operators in projections: {@code =}, {@code <>}, ordering
 * comparisons, {@code IS [NOT] NULL}, {@code [NOT] BETWEEN}, {@code LIKE} and {@code IN}.
 */
@ParameterizedTest
@EnumSource
void testBuiltinComparisonFunctions(ValuesDataSink.SinkApi sinkApi) throws Exception {
    // Fourteen boolean columns (col1..col14), one per comparison under test.
    String projection =
            "*, "
                    + "id = 2 AS col1, id <> 3 AS col2, id > 2 as col3, "
                    + "id >= 2 as col4, id < 3 as col5, id <= 4 as col6, "
                    + "name IS NULL as col7, name IS NOT NULL as col8, "
                    + "id BETWEEN 1 AND 3 as col9, id NOT BETWEEN 2 AND 4 as col10, "
                    + "name LIKE 'li' as col11, name LIKE 'ro' as col12, "
                    + "CAST(id AS INT) IN (1, 3, 5) as col13, name IN ('Bob', 'Derrida') AS col14";

    List<TransformDef> rules =
            Collections.singletonList(
                    new TransformDef(
                            "default_namespace.default_schema.\\.*",
                            projection,
                            null,
                            null,
                            null,
                            null,
                            null));

    List<String> expected =
            Arrays.asList(
                    "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT,`col1` BOOLEAN,`col2` BOOLEAN,`col3` BOOLEAN,`col4` BOOLEAN,`col5` BOOLEAN,`col6` BOOLEAN,`col7` BOOLEAN,`col8` BOOLEAN,`col9` BOOLEAN,`col10` BOOLEAN,`col11` BOOLEAN,`col12` BOOLEAN,`col13` BOOLEAN,`col14` BOOLEAN}, primaryKeys=id, options=()}",
                    "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, Alice, 18, false, true, false, false, true, true, false, true, true, true, true, false, true, false], op=INSERT, meta=()}",
                    "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Bob, 20, true, true, false, true, true, true, false, true, true, false, false, false, false, true], op=INSERT, meta=()}",
                    "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, Bob, 20, true, true, false, true, true, true, false, true, true, false, false, false, false, true], after=[2, Bob, 30, true, true, false, true, true, true, false, true, true, false, false, false, false, true], op=UPDATE, meta=()}",
                    "CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT,`name` VARCHAR(255),`age` TINYINT,`description` STRING,`col1` BOOLEAN,`col2` BOOLEAN,`col3` BOOLEAN,`col4` BOOLEAN,`col5` BOOLEAN,`col6` BOOLEAN,`col7` BOOLEAN,`col8` BOOLEAN,`col9` BOOLEAN,`col10` BOOLEAN,`col11` BOOLEAN,`col12` BOOLEAN,`col13` BOOLEAN,`col14` BOOLEAN}, primaryKeys=id, options=()}",
                    "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[3, Carol, 15, student, false, true, true, true, false, true, false, true, true, false, false, true, true, false], op=INSERT, meta=()}",
                    "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[4, Derrida, 25, student, false, true, true, true, false, true, false, true, false, false, false, false, false, true], op=INSERT, meta=()}",
                    "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, Derrida, 25, student, false, true, true, true, false, true, false, true, false, false, false, false, false, true], after=[], op=DELETE, meta=()}");

    runGenericTransformTest(sinkApi, rules, expected);
}
/**
 * Verifies the built-in logical operators in projections: {@code OR}, {@code AND}, {@code
 * NOT}, and the {@code IS [NOT] TRUE} / {@code IS [NOT] FALSE} predicates.
 */
@ParameterizedTest
@EnumSource
void testBuiltinLogicalFunctions(ValuesDataSink.SinkApi sinkApi) throws Exception {
    // Note: the aliases go col1, col2, col4..col10 -- there is no col3, and the expected
    // schemas below mirror that numbering.
    String projection =
            "*, "
                    + "id = 2 OR true as col1, id <> 3 OR false as col2, "
                    + "name = 'Alice' AND true as col4, name <> 'Bob' AND false as col5, "
                    + "NOT id = 1 as col6, id = 3 IS FALSE as col7, "
                    + "name = 'Derrida' IS TRUE as col8, "
                    + "name <> 'Carol' IS NOT FALSE as col9, "
                    + "name <> 'Eve' IS NOT TRUE as col10";

    List<TransformDef> rules =
            Collections.singletonList(
                    new TransformDef(
                            "default_namespace.default_schema.\\.*",
                            projection,
                            null,
                            null,
                            null,
                            null,
                            null));

    List<String> expected =
            Arrays.asList(
                    "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT,`col1` BOOLEAN,`col2` BOOLEAN,`col4` BOOLEAN,`col5` BOOLEAN,`col6` BOOLEAN,`col7` BOOLEAN,`col8` BOOLEAN,`col9` BOOLEAN,`col10` BOOLEAN}, primaryKeys=id, options=()}",
                    "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, Alice, 18, true, true, true, false, false, true, false, true, false], op=INSERT, meta=()}",
                    "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Bob, 20, true, true, false, false, true, true, false, true, false], op=INSERT, meta=()}",
                    "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, Bob, 20, true, true, false, false, true, true, false, true, false], after=[2, Bob, 30, true, true, false, false, true, true, false, true, false], op=UPDATE, meta=()}",
                    "CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT,`name` VARCHAR(255),`age` TINYINT,`description` STRING,`col1` BOOLEAN,`col2` BOOLEAN,`col4` BOOLEAN,`col5` BOOLEAN,`col6` BOOLEAN,`col7` BOOLEAN,`col8` BOOLEAN,`col9` BOOLEAN,`col10` BOOLEAN}, primaryKeys=id, options=()}",
                    "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[3, Carol, 15, student, true, true, false, false, true, true, false, false, false], op=INSERT, meta=()}",
                    "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[4, Derrida, 25, student, true, true, false, false, true, true, true, true, false], op=INSERT, meta=()}",
                    "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, Derrida, 25, student, true, true, false, false, true, true, true, true, false], after=[], op=DELETE, meta=()}");

    runGenericTransformTest(sinkApi, rules, expected);
}
/**
 * Verifies the built-in arithmetic operators and functions in projections: {@code + - * / %},
 * {@code ABS}, {@code CEIL}, {@code FLOOR}, {@code ROUND}, plus the length of a generated
 * {@code UUID()}.
 */
@ParameterizedTest
@EnumSource
void testBuiltinArithmeticFunctions(ValuesDataSink.SinkApi sinkApi) throws Exception {
    // UUID() is only checked through CHAR_LENGTH (expected 36) because its value varies.
    String projection =
            "*, "
                    + "id + 17 AS col1, id - 17 AS col2, id * 17 AS col3, "
                    + "CAST(id AS DOUBLE) / 1.7 AS col4, "
                    + "CAST(id AS INT) % 3 AS col5, ABS(id - 17) AS col6, "
                    + "CEIL(CAST(id AS DOUBLE) / 1.7) AS col7, "
                    + "FLOOR(CAST(id AS DOUBLE) / 1.7) AS col8, "
                    + "ROUND(CAST(id AS DOUBLE) / 1.7) AS col9, "
                    + "CHAR_LENGTH(UUID()) AS col10";

    List<TransformDef> rules =
            Collections.singletonList(
                    new TransformDef(
                            "default_namespace.default_schema.\\.*",
                            projection,
                            null,
                            null,
                            null,
                            null,
                            null));

    // Result types follow the id column: INT for mytable1, BIGINT for mytable2.
    List<String> expected =
            Arrays.asList(
                    "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT,`col1` INT,`col2` INT,`col3` INT,`col4` DOUBLE,`col5` INT,`col6` INT,`col7` DOUBLE,`col8` DOUBLE,`col9` DOUBLE,`col10` INT}, primaryKeys=id, options=()}",
                    "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, Alice, 18, 18, -16, 17, 0.5882352941176471, 1, 16, 1.0, 0.0, 1.0, 36], op=INSERT, meta=()}",
                    "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Bob, 20, 19, -15, 34, 1.1764705882352942, 2, 15, 2.0, 1.0, 1.0, 36], op=INSERT, meta=()}",
                    "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, Bob, 20, 19, -15, 34, 1.1764705882352942, 2, 15, 2.0, 1.0, 1.0, 36], after=[2, Bob, 30, 19, -15, 34, 1.1764705882352942, 2, 15, 2.0, 1.0, 1.0, 36], op=UPDATE, meta=()}",
                    "CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT,`name` VARCHAR(255),`age` TINYINT,`description` STRING,`col1` BIGINT,`col2` BIGINT,`col3` BIGINT,`col4` DOUBLE,`col5` INT,`col6` BIGINT,`col7` DOUBLE,`col8` DOUBLE,`col9` DOUBLE,`col10` INT}, primaryKeys=id, options=()}",
                    "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[3, Carol, 15, student, 20, -14, 51, 1.7647058823529411, 0, 14, 2.0, 1.0, 2.0, 36], op=INSERT, meta=()}",
                    "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[4, Derrida, 25, student, 21, -13, 68, 2.3529411764705883, 1, 13, 3.0, 2.0, 2.0, 36], op=INSERT, meta=()}",
                    "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, Derrida, 25, student, 21, -13, 68, 2.3529411764705883, 1, 13, 3.0, 2.0, 2.0, 36], after=[], op=DELETE, meta=()}");

    runGenericTransformTest(sinkApi, rules, expected);
}
/**
 * Verifies the built-in string functions in projections: {@code ||}, {@code CHAR_LENGTH},
 * {@code UPPER}, {@code LOWER}, {@code TRIM}, {@code REGEXP_REPLACE}, {@code SUBSTR} and
 * {@code CONCAT}.
 */
@ParameterizedTest
@EnumSource
void testBuiltinStringFunctions(ValuesDataSink.SinkApi sinkApi) throws Exception {
    String projection =
            "*, "
                    + "'Dear ' || name AS col1, "
                    + "CHAR_LENGTH(name) AS col2, "
                    + "UPPER(name) AS col3, "
                    + "LOWER(name) AS col4, "
                    + "TRIM(name) AS col5, "
                    + "REGEXP_REPLACE(name, 'Al|Bo', '**') AS col6, "
                    + "SUBSTR(name, 0, 1) AS col7, "
                    + "SUBSTR(name, 2, 1) AS col8, "
                    + "SUBSTR(name, 3) AS col9, "
                    + "CONCAT(name, ' - ', CAST(id AS VARCHAR)) AS col10";

    List<TransformDef> rules =
            Collections.singletonList(
                    new TransformDef(
                            "default_namespace.default_schema.\\.*",
                            projection,
                            null,
                            null,
                            null,
                            null,
                            null));

    // For "Bob", SUBSTR(name, 3) yields an empty string, hence the ", ," in its rows.
    List<String> expected =
            Arrays.asList(
                    "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT,`col1` STRING,`col2` INT,`col3` STRING,`col4` STRING,`col5` STRING,`col6` STRING,`col7` STRING,`col8` STRING,`col9` STRING,`col10` STRING}, primaryKeys=id, options=()}",
                    "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, Alice, 18, Dear Alice, 5, ALICE, alice, Alice, **ice, A, i, ce, Alice - 1], op=INSERT, meta=()}",
                    "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Bob, 20, Dear Bob, 3, BOB, bob, Bob, **b, B, b, , Bob - 2], op=INSERT, meta=()}",
                    "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, Bob, 20, Dear Bob, 3, BOB, bob, Bob, **b, B, b, , Bob - 2], after=[2, Bob, 30, Dear Bob, 3, BOB, bob, Bob, **b, B, b, , Bob - 2], op=UPDATE, meta=()}",
                    "CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT,`name` VARCHAR(255),`age` TINYINT,`description` STRING,`col1` STRING,`col2` INT,`col3` STRING,`col4` STRING,`col5` STRING,`col6` STRING,`col7` STRING,`col8` STRING,`col9` STRING,`col10` STRING}, primaryKeys=id, options=()}",
                    "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[3, Carol, 15, student, Dear Carol, 5, CAROL, carol, Carol, Carol, C, r, ol, Carol - 3], op=INSERT, meta=()}",
                    "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[4, Derrida, 25, student, Dear Derrida, 7, DERRIDA, derrida, Derrida, Derrida, D, r, rida, Derrida - 4], op=INSERT, meta=()}",
                    "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, Derrida, 25, student, Dear Derrida, 7, DERRIDA, derrida, Derrida, Derrida, D, r, rida, Derrida - 4], after=[], op=DELETE, meta=()}");

    runGenericTransformTest(sinkApi, rules, expected);
}
/**
 * Exercises {@code SUBSTR(...)} next to the {@code SUBSTRING(... FROM ... FOR ...)} syntax.
 * Currently disabled; see the {@code @Disabled} reason.
 */
@ParameterizedTest
@EnumSource
@Disabled("SUBSTRING ... FROM ... FOR ... isn't available until we close FLINK-35985.")
void testSubstringFunctions(ValuesDataSink.SinkApi sinkApi) throws Exception {
    String projection =
            "*, "
                    + "SUBSTR(name, 0, 1) AS col1, "
                    + "SUBSTR(name, 2, 1) AS col2, "
                    + "SUBSTR(name, 3) AS col3, "
                    + "SUBSTRING(name FROM 0 FOR 1) AS col4, "
                    + "SUBSTRING(name FROM 2 FOR 1) AS col5, "
                    + "SUBSTRING(name FROM 3) AS col6";

    List<TransformDef> rules =
            Collections.singletonList(
                    new TransformDef(
                            "default_namespace.default_schema.\\.*",
                            projection,
                            null,
                            null,
                            null,
                            null,
                            null));

    // NOTE(review): these look like placeholder expectations, presumably to be replaced
    // with real events when the test is enabled -- confirm.
    List<String> expected = Arrays.asList("To", "be", "added");

    runGenericTransformTest(sinkApi, rules, expected);
}
/**
 * Exercises the built-in conditional functions in projections: simple and searched {@code
 * CASE}, {@code COALESCE} and (nested) {@code IF}. Currently disabled; see the {@code
 * @Disabled} reason.
 */
@ParameterizedTest
@EnumSource
@Disabled("This case will not run until we close FLINK-35986.")
void testConditionalFunctions(ValuesDataSink.SinkApi sinkApi) throws Exception {
    String projection =
            "*, "
                    + "CASE UPPER(name)"
                    + " WHEN 'ALICE' THEN 'A - Alice'"
                    + " WHEN 'BOB' THEN 'B - Bob'"
                    + " WHEN 'CAROL' THEN 'C - Carol'"
                    + " ELSE 'D - Derrida' END AS col1, "
                    + "CASE"
                    + " WHEN id = 1 THEN '1 - One'"
                    + " WHEN id = 2 THEN '2 - Two'"
                    + " WHEN id = 3 THEN '3 - Three'"
                    + " ELSE '4 - Four' END AS col2, "
                    + "COALESCE(name, 'FALLBACK') AS col3, "
                    + "COALESCE(NULL, NULL, id, 42, NULL) AS col4, "
                    + "COALESCE(NULL, NULL, NULL, NULL, NULL) AS col5, "
                    + "IF(TRUE, 'true', 'false') AS col6, "
                    + "IF(id < 3, 'ID < 3', 'ID >= 3') AS col7, "
                    + "IF(name = 'Alice', IF(id = 1, 'YES', 'NO'), 'NO') AS col8";

    List<TransformDef> rules =
            Collections.singletonList(
                    new TransformDef(
                            "default_namespace.default_schema.\\.*",
                            projection,
                            null,
                            null,
                            null,
                            null,
                            null));

    // NOTE(review): these look like placeholder expectations, presumably to be replaced
    // with real events when the test is enabled -- confirm.
    List<String> expected = Arrays.asList("Foo", "Bar", "Baz");

    runGenericTransformTest(sinkApi, rules, expected);
}
/** This tests if transform temporal functions works as expected. */
@Test
@Test
void testTransformWithTemporalFunction ( ) throws Exception {
void testTransformWithTemporalFunction ( ) throws Exception {
FlinkPipelineComposer composer = FlinkPipelineComposer . ofMiniCluster ( ) ;
FlinkPipelineComposer composer = FlinkPipelineComposer . ofMiniCluster ( ) ;
@ -144,6 +614,117 @@ class FlinkPipelineTransformITCase {
. primaryKey ( "id" )
. primaryKey ( "id" )
. build ( ) ;
. build ( ) ;
List < Event > events = getTestEvents ( table1Schema , table2Schema , myTable1 , myTable2 ) ;
ValuesDataSourceHelper . setSourceEvents ( Collections . singletonList ( events ) ) ;
SourceDef sourceDef =
new SourceDef ( ValuesDataFactory . IDENTIFIER , "Value Source" , sourceConfig ) ;
// Setup value sink
Configuration sinkConfig = new Configuration ( ) ;
sinkConfig . set ( ValuesDataSinkOptions . MATERIALIZED_IN_MEMORY , true ) ;
SinkDef sinkDef = new SinkDef ( ValuesDataFactory . IDENTIFIER , "Value Sink" , sinkConfig ) ;
// Setup pipeline
Configuration pipelineConfig = new Configuration ( ) ;
pipelineConfig . set ( PipelineOptions . PIPELINE_PARALLELISM , 1 ) ;
pipelineConfig . set ( PipelineOptions . PIPELINE_LOCAL_TIME_ZONE , "America/Los_Angeles" ) ;
PipelineDef pipelineDef =
new PipelineDef (
sourceDef ,
sinkDef ,
Collections . emptyList ( ) ,
Collections . singletonList (
new TransformDef (
"default_namespace.default_schema.\\.*" ,
"*, LOCALTIME as lcl_t, CURRENT_TIME as cur_t, CAST(CURRENT_TIMESTAMP AS TIMESTAMP) as cur_ts, CAST(NOW() AS TIMESTAMP) as now_ts, LOCALTIMESTAMP as lcl_ts, CURRENT_DATE as cur_dt" ,
null ,
null ,
null ,
null ,
null ) ) ,
Collections . emptyList ( ) ,
pipelineConfig ) ;
// Execute the pipeline
PipelineExecution execution = composer . compose ( pipelineDef ) ;
execution . execute ( ) ;
// Check the order and content of all received events
String [ ] outputEvents = outCaptor . toString ( ) . trim ( ) . split ( "\n" ) ;
Arrays . stream ( outputEvents ) . forEach ( this : : extractDataLines ) ;
}
/**
 * Shared driver for transform tests: builds a values-source to values-sink pipeline around the
 * given transform rules, executes it, and compares the text collected in {@code outCaptor}
 * line by line with the expected results.
 *
 * @param sinkApi sink API variant set on the values sink configuration
 * @param transformDefs transform definitions handed to the pipeline definition
 * @param expectedResults lines the captured output must contain, exactly and in this order
 * @throws Exception propagated from composing or executing the pipeline
 */
void runGenericTransformTest(
        ValuesDataSink.SinkApi sinkApi,
        List<TransformDef> transformDefs,
        List<String> expectedResults)
        throws Exception {
    FlinkPipelineComposer pipelineComposer = FlinkPipelineComposer.ofMiniCluster();

    // Source side: the values source replays the custom events registered further below.
    Configuration valuesSourceConfig = new Configuration();
    valuesSourceConfig.set(
            ValuesDataSourceOptions.EVENT_SET_ID,
            ValuesDataSourceHelper.EventSetId.CUSTOM_SOURCE_EVENTS);

    TableId firstTableId = TableId.tableId("default_namespace", "default_schema", "mytable1");
    TableId secondTableId = TableId.tableId("default_namespace", "default_schema", "mytable2");

    Schema firstTableSchema =
            Schema.newBuilder()
                    .physicalColumn("id", DataTypes.INT())
                    .physicalColumn("name", DataTypes.STRING())
                    .physicalColumn("age", DataTypes.INT())
                    .primaryKey("id")
                    .build();
    // The second table uses different column types and carries an extra "description" column.
    Schema secondTableSchema =
            Schema.newBuilder()
                    .physicalColumn("id", DataTypes.BIGINT())
                    .physicalColumn("name", DataTypes.VARCHAR(255))
                    .physicalColumn("age", DataTypes.TINYINT())
                    .physicalColumn("description", DataTypes.STRING())
                    .primaryKey("id")
                    .build();

    List<Event> sourceEvents =
            getTestEvents(firstTableSchema, secondTableSchema, firstTableId, secondTableId);
    ValuesDataSourceHelper.setSourceEvents(Collections.singletonList(sourceEvents));

    // Sink side: in-memory materialization plus the sink API variant under test.
    Configuration valuesSinkConfig = new Configuration();
    valuesSinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
    valuesSinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi);

    // Pipeline-level options: parallelism fixed to 1.
    Configuration pipelineOptions = new Configuration();
    pipelineOptions.set(PipelineOptions.PIPELINE_PARALLELISM, 1);

    PipelineDef pipeline =
            new PipelineDef(
                    new SourceDef(
                            ValuesDataFactory.IDENTIFIER, "Value Source", valuesSourceConfig),
                    new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", valuesSinkConfig),
                    Collections.emptyList(),
                    transformDefs,
                    Collections.emptyList(),
                    pipelineOptions);

    // Compose and run the pipeline.
    pipelineComposer.compose(pipeline).execute();

    // Verify both the order and the content of everything that was emitted.
    String[] actualLines = outCaptor.toString().trim().split("\n");
    String[] expectedLines = expectedResults.toArray(new String[0]);
    assertThat(actualLines).containsExactly(expectedLines);
}
private static List < Event > getTestEvents (
Schema table1Schema , Schema table2Schema , TableId myTable1 , TableId myTable2 ) {
List < Event > events = new ArrayList < > ( ) ;
List < Event > events = new ArrayList < > ( ) ;
BinaryRecordDataGenerator table1dataGenerator =
BinaryRecordDataGenerator table1dataGenerator =
new BinaryRecordDataGenerator (
new BinaryRecordDataGenerator (
@ -200,48 +781,7 @@ class FlinkPipelineTransformITCase {
( byte ) 25 ,
( byte ) 25 ,
BinaryStringData . fromString ( "student" )
BinaryStringData . fromString ( "student" )
} ) ) ) ;
} ) ) ) ;
return events ;
ValuesDataSourceHelper . setSourceEvents ( Collections . singletonList ( events ) ) ;
SourceDef sourceDef =
new SourceDef ( ValuesDataFactory . IDENTIFIER , "Value Source" , sourceConfig ) ;
// Setup value sink
Configuration sinkConfig = new Configuration ( ) ;
sinkConfig . set ( ValuesDataSinkOptions . MATERIALIZED_IN_MEMORY , true ) ;
SinkDef sinkDef = new SinkDef ( ValuesDataFactory . IDENTIFIER , "Value Sink" , sinkConfig ) ;
// Setup pipeline
Configuration pipelineConfig = new Configuration ( ) ;
pipelineConfig . set ( PipelineOptions . PIPELINE_PARALLELISM , 1 ) ;
pipelineConfig . set (
PipelineOptions . PIPELINE_SCHEMA_CHANGE_BEHAVIOR , SchemaChangeBehavior . EVOLVE ) ;
pipelineConfig . set ( PipelineOptions . PIPELINE_LOCAL_TIME_ZONE , "America/Los_Angeles" ) ;
PipelineDef pipelineDef =
new PipelineDef (
sourceDef ,
sinkDef ,
Collections . emptyList ( ) ,
Collections . singletonList (
new TransformDef (
"default_namespace.default_schema.\\.*" ,
"*, LOCALTIME as lcl_t, CURRENT_TIME as cur_t, CAST(CURRENT_TIMESTAMP AS TIMESTAMP) as cur_ts, CAST(NOW() AS TIMESTAMP) as now_ts, LOCALTIMESTAMP as lcl_ts, CURRENT_DATE as cur_dt" ,
null ,
null ,
null ,
null ,
null ) ) ,
Collections . emptyList ( ) ,
pipelineConfig ) ;
// Execute the pipeline
PipelineExecution execution = composer . compose ( pipelineDef ) ;
execution . execute ( ) ;
// Check the order and content of all received events
String [ ] outputEvents = outCaptor . toString ( ) . trim ( ) . split ( "\n" ) ;
Arrays . stream ( outputEvents ) . forEach ( this : : extractDataLines ) ;
}
}
@Test
@Test