diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/TransformDef.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/TransformDef.java index ff54eb173..c081178b8 100644 --- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/TransformDef.java +++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/TransformDef.java @@ -20,7 +20,6 @@ package org.apache.flink.cdc.composer.definition; import org.apache.flink.cdc.common.utils.StringUtils; import java.util.Objects; -import java.util.Optional; /** * Definition of a transformation. @@ -75,24 +74,24 @@ public class TransformDef { return sourceTable; } - public Optional getProjection() { - return Optional.ofNullable(projection); + public String getProjection() { + return projection; } public boolean isValidProjection() { return !StringUtils.isNullOrWhitespaceOnly(projection); } - public Optional getFilter() { - return Optional.ofNullable(filter); + public String getFilter() { + return filter; } public boolean isValidFilter() { return !StringUtils.isNullOrWhitespaceOnly(filter); } - public Optional getDescription() { - return Optional.ofNullable(description); + public String getDescription() { + return description; } public String getPrimaryKeys() { diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/TransformTranslator.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/TransformTranslator.java index c7fb15541..b49cdc114 100644 --- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/TransformTranslator.java +++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/TransformTranslator.java @@ -59,8 +59,8 @@ public class TransformTranslator { for (TransformDef transform : transforms) { preTransformFunctionBuilder.addTransform( transform.getSourceTable(), - transform.getProjection().orElse(null), - transform.getFilter().orElse(null), + transform.getProjection(), + transform.getFilter(), transform.getPrimaryKeys(), transform.getPartitionKeys(), transform.getTableOptions(), @@ -98,8 +98,8 @@ public class TransformTranslator { if (transform.isValidProjection() || transform.isValidFilter()) { postTransformFunctionBuilder.addTransform( transform.getSourceTable(), - transform.isValidProjection() ? transform.getProjection().get() : null, - transform.isValidFilter() ? transform.getFilter().get() : null, + transform.getProjection(), + transform.getFilter(), transform.getPrimaryKeys(), transform.getPartitionKeys(), transform.getTableOptions(), diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java index 6364b6981..2ac394add 100644 --- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java +++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java @@ -77,6 +77,7 @@ import java.util.stream.Stream; import static org.apache.flink.configuration.CoreOptions.ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Integration test for {@link FlinkPipelineComposer}. */ class FlinkPipelineTransformITCase { @@ -196,8 +197,8 @@ class FlinkPipelineTransformITCase { null, null)), Arrays.asList( - "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT}, primaryKeys=id, options=()}", - "CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT,`name` VARCHAR(255),`age` TINYINT,`description` STRING}, primaryKeys=id, options=()}", + "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT NOT NULL,`name` STRING,`age` INT}, primaryKeys=id, options=()}", + "CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT NOT NULL,`name` VARCHAR(255),`age` TINYINT,`description` STRING}, primaryKeys=id, options=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[3, Carol, 15, student], op=INSERT, meta=()}")); } @@ -218,9 +219,9 @@ class FlinkPipelineTransformITCase { null, null)), Arrays.asList( - "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT}, primaryKeys=id, options=()}", + "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT NOT NULL,`name` STRING,`age` INT}, primaryKeys=id, options=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, Alice, 18], op=INSERT, meta=()}", - "CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT,`name` VARCHAR(255),`age` TINYINT,`description` STRING}, primaryKeys=id, options=()}", + "CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT NOT NULL,`name` VARCHAR(255),`age` TINYINT,`description` STRING}, primaryKeys=id, options=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[3, Carol, 15, student], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[4, Derrida, 25, student], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, Derrida, 25, student], after=[], op=DELETE, meta=()}")); @@ -370,6 +371,151 @@ class FlinkPipelineTransformITCase { "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, DERRIDA, 25, student], after=[], op=DELETE, meta=()}")); } + @ParameterizedTest + @EnumSource + @Disabled("to be fixed in FLINK-37132") + void testMultiTransformSchemaColumnsCompatibilityWithNullProjection( + ValuesDataSink.SinkApi sinkApi) { + TransformDef nullProjection = + new TransformDef( + "default_namespace.default_schema.mytable2", + null, + "age < 18", + null, + null, + null, + null, + null); + + assertThatThrownBy( + () -> + runGenericTransformTest( + sinkApi, + Arrays.asList( + nullProjection, + new TransformDef( + "default_namespace.default_schema.mytable2", + // reference part column + "id,UPPER(name) AS name", + "age >= 18", + null, + null, + null, + null, + null)), + Collections.emptyList())) + .rootCause() + .isExactlyInstanceOf(IllegalStateException.class) + .hasMessage( + "Unable to merge schema columns={`id` BIGINT,`name` VARCHAR(255),`age` TINYINT,`description` STRING}, primaryKeys=id, options=() " + + "and columns={`id` BIGINT,`name` STRING}, primaryKeys=id, options=() with different column counts."); + } + + @ParameterizedTest + @EnumSource + @Disabled("to be fixed in FLINK-37132") + void testMultiTransformSchemaColumnsCompatibilityWithEmptyProjection( + ValuesDataSink.SinkApi sinkApi) { + TransformDef emptyProjection = + new TransformDef( + "default_namespace.default_schema.mytable2", + "", + "age < 18", + null, + null, + null, + null, + null); + + assertThatThrownBy( + () -> + runGenericTransformTest( + sinkApi, + Arrays.asList( + emptyProjection, + new TransformDef( + "default_namespace.default_schema.mytable2", + // reference part column + "id,UPPER(name) AS name", + "age >= 18", + null, + null, + null, + null, + null)), + Collections.emptyList())) + .rootCause() + .isExactlyInstanceOf(IllegalStateException.class) + .hasMessage( + "Unable to merge schema columns={`id` BIGINT,`name` VARCHAR(255),`age` TINYINT,`description` STRING}, primaryKeys=id, options=() " + + "and columns={`id` BIGINT,`name` STRING}, primaryKeys=id, options=() with different column counts."); + } + + @ParameterizedTest + @EnumSource + void testMultiTransformWithNullEmptyAsteriskProjections(ValuesDataSink.SinkApi sinkApi) + throws Exception { + TransformDef nullProjection = + new TransformDef( + "default_namespace.default_schema.mytable2", + null, + "age < 18", + null, + null, + null, + null, + null); + + TransformDef emptyProjection = + new TransformDef( + "default_namespace.default_schema.mytable2", + "", + "age < 18", + null, + null, + null, + null, + null); + + TransformDef asteriskProjection = + new TransformDef( + "default_namespace.default_schema.mytable2", + "*", + "age < 18", + null, + null, + null, + null, + null); + + runGenericTransformTest( + sinkApi, + Arrays.asList( + // Setting projection as null, '', or * should be equivalent + nullProjection, + emptyProjection, + asteriskProjection, + new TransformDef( + "default_namespace.default_schema.mytable2", + // reference all column + "id,UPPER(name) AS name,age,description", + "age >= 18", + null, + null, + null, + null, + null)), + Arrays.asList( + "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT}, primaryKeys=id, options=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, Alice, 18], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Bob, 20], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, Bob, 20], after=[2, Bob, 30], op=UPDATE, meta=()}", + "CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT NOT NULL,`name` STRING,`age` TINYINT,`description` STRING}, primaryKeys=id, options=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[3, Carol, 15, student], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[4, DERRIDA, 25, student], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, DERRIDA, 25, student], after=[], op=DELETE, meta=()}")); + } + /** This tests if transform generates metadata info correctly. */ @ParameterizedTest @EnumSource @@ -1447,7 +1593,7 @@ class FlinkPipelineTransformITCase { assertThat(outputEvents) .containsExactly( // Initial stage - "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT}, primaryKeys=id, options=()}", + "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT NOT NULL,`name` STRING,`age` INT}, primaryKeys=id, options=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Barcarolle, 22], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[3, Cecily, 23], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[3, Cecily, 23], after=[3, Colin, 24], op=UPDATE, meta=()}", diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java index fd1d1d8c5..845fd4df1 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java @@ -389,17 +389,8 @@ public class PreTransformOperator extends AbstractStreamOperator if (!transform.getSelectors().isMatch(tableId)) { continue; } - if (!transform.getProjection().isPresent()) { - processProjectionTransform(tableId, tableSchema, referencedColumnsSet, null); - hasMatchTransform = true; - } else { - TransformProjection transformProjection = transform.getProjection().get(); - if (transformProjection.isValid()) { - processProjectionTransform( - tableId, tableSchema, referencedColumnsSet, transform); - hasMatchTransform = true; - } - } + processProjectionTransform(tableId, tableSchema, referencedColumnsSet, transform); + hasMatchTransform = true; } if (!hasMatchTransform) { processProjectionTransform(tableId, tableSchema, referencedColumnsSet, null); diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformRule.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformRule.java index 31a245257..d4459e5dd 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformRule.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformRule.java @@ -18,6 +18,7 @@ package org.apache.flink.cdc.runtime.operators.transform; import org.apache.flink.cdc.common.source.SupportedMetadataColumn; +import org.apache.flink.cdc.common.utils.StringUtils; import javax.annotation.Nullable; @@ -48,7 +49,7 @@ public class TransformRule implements Serializable { @Nullable String postTransformConverter, SupportedMetadataColumn[] supportedMetadataColumns) { this.tableInclusions = tableInclusions; - this.projection = projection; + this.projection = StringUtils.isNullOrWhitespaceOnly(projection) ? "*" : projection; this.filter = normalizeFilter(projection, filter); this.primaryKey = primaryKey; this.partitionKey = partitionKey;