[FLINK-36754][transform] Projection should be treated as an asterisk when projection expression is empty or null

This closes #3749
pull/3623/head
MOBIN 2 weeks ago committed by GitHub
parent e2b8f70e0d
commit d83ccbeeb4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -20,7 +20,6 @@ package org.apache.flink.cdc.composer.definition;
import org.apache.flink.cdc.common.utils.StringUtils;
import java.util.Objects;
import java.util.Optional;
/**
* Definition of a transformation.
@ -75,24 +74,24 @@ public class TransformDef {
return sourceTable;
}
public Optional<String> getProjection() {
return Optional.ofNullable(projection);
public String getProjection() {
return projection;
}
public boolean isValidProjection() {
return !StringUtils.isNullOrWhitespaceOnly(projection);
}
public Optional<String> getFilter() {
return Optional.ofNullable(filter);
public String getFilter() {
return filter;
}
public boolean isValidFilter() {
return !StringUtils.isNullOrWhitespaceOnly(filter);
}
public Optional<String> getDescription() {
return Optional.ofNullable(description);
public String getDescription() {
return description;
}
public String getPrimaryKeys() {

@ -59,8 +59,8 @@ public class TransformTranslator {
for (TransformDef transform : transforms) {
preTransformFunctionBuilder.addTransform(
transform.getSourceTable(),
transform.getProjection().orElse(null),
transform.getFilter().orElse(null),
transform.getProjection(),
transform.getFilter(),
transform.getPrimaryKeys(),
transform.getPartitionKeys(),
transform.getTableOptions(),
@ -98,8 +98,8 @@ public class TransformTranslator {
if (transform.isValidProjection() || transform.isValidFilter()) {
postTransformFunctionBuilder.addTransform(
transform.getSourceTable(),
transform.isValidProjection() ? transform.getProjection().get() : null,
transform.isValidFilter() ? transform.getFilter().get() : null,
transform.getProjection(),
transform.getFilter(),
transform.getPrimaryKeys(),
transform.getPartitionKeys(),
transform.getTableOptions(),

@ -77,6 +77,7 @@ import java.util.stream.Stream;
import static org.apache.flink.configuration.CoreOptions.ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Integration test for {@link FlinkPipelineComposer}. */
class FlinkPipelineTransformITCase {
@ -196,8 +197,8 @@ class FlinkPipelineTransformITCase {
null,
null)),
Arrays.asList(
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT}, primaryKeys=id, options=()}",
"CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT,`name` VARCHAR(255),`age` TINYINT,`description` STRING}, primaryKeys=id, options=()}",
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT NOT NULL,`name` STRING,`age` INT}, primaryKeys=id, options=()}",
"CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT NOT NULL,`name` VARCHAR(255),`age` TINYINT,`description` STRING}, primaryKeys=id, options=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[3, Carol, 15, student], op=INSERT, meta=()}"));
}
@ -218,9 +219,9 @@ class FlinkPipelineTransformITCase {
null,
null)),
Arrays.asList(
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT}, primaryKeys=id, options=()}",
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT NOT NULL,`name` STRING,`age` INT}, primaryKeys=id, options=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, Alice, 18], op=INSERT, meta=()}",
"CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT,`name` VARCHAR(255),`age` TINYINT,`description` STRING}, primaryKeys=id, options=()}",
"CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT NOT NULL,`name` VARCHAR(255),`age` TINYINT,`description` STRING}, primaryKeys=id, options=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[3, Carol, 15, student], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[4, Derrida, 25, student], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, Derrida, 25, student], after=[], op=DELETE, meta=()}"));
@ -370,6 +371,151 @@ class FlinkPipelineTransformITCase {
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, DERRIDA, 25, student], after=[], op=DELETE, meta=()}"));
}
@ParameterizedTest
@EnumSource
@Disabled("to be fixed in FLINK-37132")
void testMultiTransformSchemaColumnsCompatibilityWithNullProjection(
ValuesDataSink.SinkApi sinkApi) {
TransformDef nullProjection =
new TransformDef(
"default_namespace.default_schema.mytable2",
null,
"age < 18",
null,
null,
null,
null,
null);
assertThatThrownBy(
() ->
runGenericTransformTest(
sinkApi,
Arrays.asList(
nullProjection,
new TransformDef(
"default_namespace.default_schema.mytable2",
// reference part column
"id,UPPER(name) AS name",
"age >= 18",
null,
null,
null,
null,
null)),
Collections.emptyList()))
.rootCause()
.isExactlyInstanceOf(IllegalStateException.class)
.hasMessage(
"Unable to merge schema columns={`id` BIGINT,`name` VARCHAR(255),`age` TINYINT,`description` STRING}, primaryKeys=id, options=() "
+ "and columns={`id` BIGINT,`name` STRING}, primaryKeys=id, options=() with different column counts.");
}
@ParameterizedTest
@EnumSource
@Disabled("to be fixed in FLINK-37132")
void testMultiTransformSchemaColumnsCompatibilityWithEmptyProjection(
ValuesDataSink.SinkApi sinkApi) {
TransformDef emptyProjection =
new TransformDef(
"default_namespace.default_schema.mytable2",
"",
"age < 18",
null,
null,
null,
null,
null);
assertThatThrownBy(
() ->
runGenericTransformTest(
sinkApi,
Arrays.asList(
emptyProjection,
new TransformDef(
"default_namespace.default_schema.mytable2",
// reference part column
"id,UPPER(name) AS name",
"age >= 18",
null,
null,
null,
null,
null)),
Collections.emptyList()))
.rootCause()
.isExactlyInstanceOf(IllegalStateException.class)
.hasMessage(
"Unable to merge schema columns={`id` BIGINT,`name` VARCHAR(255),`age` TINYINT,`description` STRING}, primaryKeys=id, options=() "
+ "and columns={`id` BIGINT,`name` STRING}, primaryKeys=id, options=() with different column counts.");
}
@ParameterizedTest
@EnumSource
void testMultiTransformWithNullEmptyAsteriskProjections(ValuesDataSink.SinkApi sinkApi)
throws Exception {
TransformDef nullProjection =
new TransformDef(
"default_namespace.default_schema.mytable2",
null,
"age < 18",
null,
null,
null,
null,
null);
TransformDef emptyProjection =
new TransformDef(
"default_namespace.default_schema.mytable2",
"",
"age < 18",
null,
null,
null,
null,
null);
TransformDef asteriskProjection =
new TransformDef(
"default_namespace.default_schema.mytable2",
"*",
"age < 18",
null,
null,
null,
null,
null);
runGenericTransformTest(
sinkApi,
Arrays.asList(
// Setting projection as null, '', or * should be equivalent
nullProjection,
emptyProjection,
asteriskProjection,
new TransformDef(
"default_namespace.default_schema.mytable2",
// reference all column
"id,UPPER(name) AS name,age,description",
"age >= 18",
null,
null,
null,
null,
null)),
Arrays.asList(
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT}, primaryKeys=id, options=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, Alice, 18], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Bob, 20], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, Bob, 20], after=[2, Bob, 30], op=UPDATE, meta=()}",
"CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT NOT NULL,`name` STRING,`age` TINYINT,`description` STRING}, primaryKeys=id, options=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[3, Carol, 15, student], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[4, DERRIDA, 25, student], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, DERRIDA, 25, student], after=[], op=DELETE, meta=()}"));
}
/** This tests if transform generates metadata info correctly. */
@ParameterizedTest
@EnumSource
@ -1447,7 +1593,7 @@ class FlinkPipelineTransformITCase {
assertThat(outputEvents)
.containsExactly(
// Initial stage
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT}, primaryKeys=id, options=()}",
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT NOT NULL,`name` STRING,`age` INT}, primaryKeys=id, options=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Barcarolle, 22], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[3, Cecily, 23], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[3, Cecily, 23], after=[3, Colin, 24], op=UPDATE, meta=()}",

@ -389,17 +389,8 @@ public class PreTransformOperator extends AbstractStreamOperator<Event>
if (!transform.getSelectors().isMatch(tableId)) {
continue;
}
if (!transform.getProjection().isPresent()) {
processProjectionTransform(tableId, tableSchema, referencedColumnsSet, null);
hasMatchTransform = true;
} else {
TransformProjection transformProjection = transform.getProjection().get();
if (transformProjection.isValid()) {
processProjectionTransform(
tableId, tableSchema, referencedColumnsSet, transform);
hasMatchTransform = true;
}
}
processProjectionTransform(tableId, tableSchema, referencedColumnsSet, transform);
hasMatchTransform = true;
}
if (!hasMatchTransform) {
processProjectionTransform(tableId, tableSchema, referencedColumnsSet, null);

@ -18,6 +18,7 @@
package org.apache.flink.cdc.runtime.operators.transform;
import org.apache.flink.cdc.common.source.SupportedMetadataColumn;
import org.apache.flink.cdc.common.utils.StringUtils;
import javax.annotation.Nullable;
@ -48,7 +49,7 @@ public class TransformRule implements Serializable {
@Nullable String postTransformConverter,
SupportedMetadataColumn[] supportedMetadataColumns) {
this.tableInclusions = tableInclusions;
this.projection = projection;
this.projection = StringUtils.isNullOrWhitespaceOnly(projection) ? "*" : projection;
this.filter = normalizeFilter(projection, filter);
this.primaryKey = primaryKey;
this.partitionKey = partitionKey;

Loading…
Cancel
Save