[FLINK-36474][route] Support merging timestamp columns when routing

This closes #3636.
yuxiqian 3 months ago committed by GitHub
parent e692e7b4bc
commit 84f97ed4c1
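
The gist of the change: SchemaUtils.inferWiderType now widens timestamp columns instead of rejecting them. A minimal sketch of the resulting behavior, assuming only the factory methods and inferWiderType shown in the hunks below (the wrapper class and main method are illustrative, not part of the change):

import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.common.utils.SchemaUtils;

public class TimestampWideningSketch {
    public static void main(String[] args) {
        // Same timestamp kind: keep the kind, take the larger precision.
        DataType ts = SchemaUtils.inferWiderType(DataTypes.TIMESTAMP(0), DataTypes.TIMESTAMP(9));       // TIMESTAMP(9)
        DataType tz = SchemaUtils.inferWiderType(DataTypes.TIMESTAMP_TZ(3), DataTypes.TIMESTAMP_TZ(7)); // TIMESTAMP_TZ(7)

        // Mixed timestamp kinds (TIMESTAMP / TIMESTAMP_TZ / TIMESTAMP_LTZ) fall back to
        // plain TIMESTAMP with maximum precision; zoned values are later rewritten using
        // the pipeline's local time zone (see SchemaOperator#castToTimestamp below).
        DataType mixed = SchemaUtils.inferWiderType(DataTypes.TIMESTAMP_LTZ(), DataTypes.TIMESTAMP_TZ()); // TIMESTAMP(9)

        System.out.println(ts + ", " + tz + ", " + mixed);
    }
}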

@@ -33,6 +33,9 @@ import org.apache.flink.cdc.common.types.DataTypeFamily;
import org.apache.flink.cdc.common.types.DataTypeRoot;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.common.types.DecimalType;
import org.apache.flink.cdc.common.types.LocalZonedTimestampType;
import org.apache.flink.cdc.common.types.TimestampType;
import org.apache.flink.cdc.common.types.ZonedTimestampType;
import javax.annotation.Nullable;
@@ -176,6 +179,24 @@ public class SchemaUtils {
if (lType.equals(rType)) {
// identical type
mergedType = rType;
} else if (lType instanceof TimestampType && rType instanceof TimestampType) {
return DataTypes.TIMESTAMP(
Math.max(
((TimestampType) lType).getPrecision(),
((TimestampType) rType).getPrecision()));
} else if (lType instanceof ZonedTimestampType && rType instanceof ZonedTimestampType) {
return DataTypes.TIMESTAMP_TZ(
Math.max(
((ZonedTimestampType) lType).getPrecision(),
((ZonedTimestampType) rType).getPrecision()));
} else if (lType instanceof LocalZonedTimestampType
&& rType instanceof LocalZonedTimestampType) {
return DataTypes.TIMESTAMP_LTZ(
Math.max(
((LocalZonedTimestampType) lType).getPrecision(),
((LocalZonedTimestampType) rType).getPrecision()));
} else if (lType.is(DataTypeFamily.TIMESTAMP) && rType.is(DataTypeFamily.TIMESTAMP)) {
return DataTypes.TIMESTAMP(TimestampType.MAX_PRECISION);
} else if (lType.is(DataTypeFamily.INTEGER_NUMERIC)
&& rType.is(DataTypeFamily.INTEGER_NUMERIC)) {
mergedType = DataTypes.BIGINT();
@@ -185,7 +206,7 @@ public class SchemaUtils {
} else if (lType.is(DataTypeFamily.APPROXIMATE_NUMERIC)
&& rType.is(DataTypeFamily.APPROXIMATE_NUMERIC)) {
mergedType = DataTypes.DOUBLE();
} else if (lType.is(DataTypeRoot.DECIMAL) && rType.is(DataTypeRoot.DECIMAL)) {
} else if (lType instanceof DecimalType && rType instanceof DecimalType) {
// Merge two decimal types
DecimalType lhsDecimal = (DecimalType) lType;
DecimalType rhsDecimal = (DecimalType) rType;
@@ -195,7 +216,7 @@ public class SchemaUtils {
rhsDecimal.getPrecision() - rhsDecimal.getScale());
int resultScale = Math.max(lhsDecimal.getScale(), rhsDecimal.getScale());
mergedType = DataTypes.DECIMAL(resultIntDigits + resultScale, resultScale);
} else if (lType.is(DataTypeRoot.DECIMAL) && rType.is(DataTypeFamily.EXACT_NUMERIC)) {
} else if (lType instanceof DecimalType && rType.is(DataTypeFamily.EXACT_NUMERIC)) {
// Merge decimal and int
DecimalType lhsDecimal = (DecimalType) lType;
mergedType =
@@ -204,7 +225,7 @@ public class SchemaUtils {
lhsDecimal.getPrecision(),
lhsDecimal.getScale() + getNumericPrecision(rType)),
lhsDecimal.getScale());
} else if (rType.is(DataTypeRoot.DECIMAL) && lType.is(DataTypeFamily.EXACT_NUMERIC)) {
} else if (rType instanceof DecimalType && lType.is(DataTypeFamily.EXACT_NUMERIC)) {
// Merge decimal and int
DecimalType rhsDecimal = (DecimalType) rType;
mergedType =

@@ -291,6 +291,35 @@ public class SchemaUtilsTest {
DataTypes.INT().nullable(), DataTypes.INT().nullable()))
.isEqualTo(DataTypes.INT().nullable());
// Test merging temporal types
Assertions.assertThat(
SchemaUtils.inferWiderType(DataTypes.TIMESTAMP(9), DataTypes.TIMESTAMP(6)))
.isEqualTo(DataTypes.TIMESTAMP(9));
Assertions.assertThat(
SchemaUtils.inferWiderType(
DataTypes.TIMESTAMP_TZ(3), DataTypes.TIMESTAMP_TZ(7)))
.isEqualTo(DataTypes.TIMESTAMP_TZ(7));
Assertions.assertThat(
SchemaUtils.inferWiderType(
DataTypes.TIMESTAMP_LTZ(2), DataTypes.TIMESTAMP_LTZ(1)))
.isEqualTo(DataTypes.TIMESTAMP_LTZ(2));
Assertions.assertThat(
SchemaUtils.inferWiderType(
DataTypes.TIMESTAMP_LTZ(), DataTypes.TIMESTAMP()))
.isEqualTo(DataTypes.TIMESTAMP(9));
Assertions.assertThat(
SchemaUtils.inferWiderType(DataTypes.TIMESTAMP_TZ(), DataTypes.TIMESTAMP()))
.isEqualTo(DataTypes.TIMESTAMP(9));
Assertions.assertThat(
SchemaUtils.inferWiderType(
DataTypes.TIMESTAMP_LTZ(), DataTypes.TIMESTAMP_TZ()))
.isEqualTo(DataTypes.TIMESTAMP(9));
// incompatible type merges test
Assertions.assertThatThrownBy(
() -> SchemaUtils.inferWiderType(DataTypes.INT(), DataTypes.DOUBLE()))

@@ -18,6 +18,7 @@
package org.apache.flink.cdc.composer.flink;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.pipeline.PipelineOptions;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
@@ -88,17 +89,19 @@ public class FlinkPipelineComposer implements PipelineComposer {
@Override
public PipelineExecution compose(PipelineDef pipelineDef) {
int parallelism = pipelineDef.getConfig().get(PipelineOptions.PIPELINE_PARALLELISM);
Configuration pipelineDefConfig = pipelineDef.getConfig();
int parallelism = pipelineDefConfig.get(PipelineOptions.PIPELINE_PARALLELISM);
env.getConfig().setParallelism(parallelism);
SchemaChangeBehavior schemaChangeBehavior =
pipelineDef.getConfig().get(PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR);
pipelineDefConfig.get(PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR);
// Build Source Operator
DataSourceTranslator sourceTranslator = new DataSourceTranslator();
DataStream<Event> stream =
sourceTranslator.translate(
pipelineDef.getSource(), env, pipelineDef.getConfig(), parallelism);
pipelineDef.getSource(), env, pipelineDefConfig, parallelism);
// Build PreTransformOperator for processing Schema Event
TransformTranslator transformTranslator = new TransformTranslator();
@@ -110,10 +113,9 @@ public class FlinkPipelineComposer implements PipelineComposer {
SchemaOperatorTranslator schemaOperatorTranslator =
new SchemaOperatorTranslator(
schemaChangeBehavior,
pipelineDef.getConfig().get(PipelineOptions.PIPELINE_SCHEMA_OPERATOR_UID),
pipelineDef
.getConfig()
.get(PipelineOptions.PIPELINE_SCHEMA_OPERATOR_RPC_TIMEOUT));
pipelineDefConfig.get(PipelineOptions.PIPELINE_SCHEMA_OPERATOR_UID),
pipelineDefConfig.get(PipelineOptions.PIPELINE_SCHEMA_OPERATOR_RPC_TIMEOUT),
pipelineDefConfig.get(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE));
OperatorIDGenerator schemaOperatorIDGenerator =
new OperatorIDGenerator(schemaOperatorTranslator.getSchemaOperatorUid());
@@ -122,13 +124,13 @@ public class FlinkPipelineComposer implements PipelineComposer {
transformTranslator.translatePostTransform(
stream,
pipelineDef.getTransforms(),
pipelineDef.getConfig().get(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE),
pipelineDefConfig.get(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE),
pipelineDef.getUdfs());
// Build DataSink in advance as schema operator requires MetadataApplier
DataSinkTranslator sinkTranslator = new DataSinkTranslator();
DataSink dataSink =
sinkTranslator.createDataSink(pipelineDef.getSink(), pipelineDef.getConfig(), env);
sinkTranslator.createDataSink(pipelineDef.getSink(), pipelineDefConfig, env);
stream =
schemaOperatorTranslator.translate(
@@ -157,7 +159,7 @@ public class FlinkPipelineComposer implements PipelineComposer {
addFrameworkJars();
return new FlinkPipelineExecution(
env, pipelineDef.getConfig().get(PipelineOptions.PIPELINE_NAME), isBlocking);
env, pipelineDefConfig.get(PipelineOptions.PIPELINE_NAME), isBlocking);
}
private void addFrameworkJars() {

@@ -19,7 +19,6 @@ package org.apache.flink.cdc.composer.flink.translator;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.route.RouteRule;
import org.apache.flink.cdc.common.sink.MetadataApplier;
@@ -39,16 +38,18 @@ import java.util.List;
public class SchemaOperatorTranslator {
private final SchemaChangeBehavior schemaChangeBehavior;
private final String schemaOperatorUid;
private final Duration rpcTimeOut;
private final String timezone;
public SchemaOperatorTranslator(
SchemaChangeBehavior schemaChangeBehavior,
String schemaOperatorUid,
Duration rpcTimeOut) {
Duration rpcTimeOut,
String timezone) {
this.schemaChangeBehavior = schemaChangeBehavior;
this.schemaOperatorUid = schemaOperatorUid;
this.rpcTimeOut = rpcTimeOut;
this.timezone = timezone;
}
public DataStream<Event> translate(
@@ -56,7 +57,8 @@ public class SchemaOperatorTranslator {
int parallelism,
MetadataApplier metadataApplier,
List<RouteDef> routes) {
return addSchemaOperator(input, parallelism, metadataApplier, routes, schemaChangeBehavior);
return addSchemaOperator(
input, parallelism, metadataApplier, routes, schemaChangeBehavior, timezone);
}
public String getSchemaOperatorUid() {
@@ -68,7 +70,8 @@ public class SchemaOperatorTranslator {
int parallelism,
MetadataApplier metadataApplier,
List<RouteDef> routes,
SchemaChangeBehavior schemaChangeBehavior) {
SchemaChangeBehavior schemaChangeBehavior,
String timezone) {
List<RouteRule> routingRules = new ArrayList<>();
for (RouteDef route : routes) {
routingRules.add(
@@ -82,27 +85,12 @@
"SchemaOperator",
new EventTypeInfo(),
new SchemaOperatorFactory(
metadataApplier, routingRules, rpcTimeOut, schemaChangeBehavior));
metadataApplier,
routingRules,
rpcTimeOut,
schemaChangeBehavior,
timezone));
stream.uid(schemaOperatorUid).setParallelism(parallelism);
return stream;
}
private DataStream<Event> dropSchemaChangeEvent(DataStream<Event> input, int parallelism) {
return input.filter(event -> !(event instanceof SchemaChangeEvent))
.setParallelism(parallelism);
}
private DataStream<Event> exceptionOnSchemaChange(DataStream<Event> input, int parallelism) {
return input.map(
event -> {
if (event instanceof SchemaChangeEvent) {
throw new RuntimeException(
String.format(
"Aborting execution as the pipeline encountered a schema change event: %s",
event));
}
return event;
})
.setParallelism(parallelism);
}
}
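
Downstream translators now receive the pipeline's local time zone as a fourth constructor argument. A construction sketch, assuming the values used elsewhere in this diff (the wrapper class and the RPC timeout literal are examples only):

import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.composer.flink.translator.SchemaOperatorTranslator;

import java.time.Duration;

class SchemaOperatorTranslatorUsage {
    static SchemaOperatorTranslator build() {
        // Behavior, operator uid, RPC timeout and, new in this change, the time zone
        // used when coercing zoned timestamps into a merged TIMESTAMP column.
        return new SchemaOperatorTranslator(
                SchemaChangeBehavior.EVOLVE,
                "$$_schema_operator_$$",
                Duration.ofSeconds(30),
                "America/New_York");
    }
}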

@@ -18,6 +18,10 @@
package org.apache.flink.cdc.composer.flink;
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
import org.apache.flink.cdc.common.data.TimestampData;
import org.apache.flink.cdc.common.data.ZonedTimestampData;
import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
import org.apache.flink.cdc.common.data.binary.BinaryStringData;
import org.apache.flink.cdc.common.event.AddColumnEvent;
import org.apache.flink.cdc.common.event.CreateTableEvent;
@@ -57,10 +61,16 @@ import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.flink.cdc.connectors.values.source.ValuesDataSourceHelper.TABLE_1;
import static org.apache.flink.cdc.connectors.values.source.ValuesDataSourceHelper.TABLE_2;
@@ -1092,4 +1102,199 @@ class FlinkPipelineComposerITCase {
"DataChangeEvent{tableId=replaced_namespace.replaced_schema.table1, before=[1, 1], after=[], op=DELETE, meta=()}",
"DataChangeEvent{tableId=replaced_namespace.replaced_schema.table1, before=[2, 2], after=[2, x], op=UPDATE, meta=()}");
}
@ParameterizedTest
@EnumSource
void testMergingTemporalTypesWithPromotedPrecisions(ValuesDataSink.SinkApi sinkApi)
throws Exception {
FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
// Setup value source
Configuration sourceConfig = new Configuration();
sourceConfig.set(
ValuesDataSourceOptions.EVENT_SET_ID,
ValuesDataSourceHelper.EventSetId.CUSTOM_SOURCE_EVENTS);
List<Event> events = generateTemporalColumnEvents("default_table_");
ValuesDataSourceHelper.setSourceEvents(Collections.singletonList(events));
SourceDef sourceDef =
new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig);
// Setup value sink
Configuration sinkConfig = new Configuration();
sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi);
SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig);
// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
pipelineConfig.set(
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
pipelineConfig.set(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE, "America/New_York");
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
sinkDef,
Arrays.asList(
new RouteDef(
"default_namespace.default_schema.default_table_ts_\\.*",
"default_namespace.default_schema.default_table_timestamp_merged",
null,
"Merge timestamp columns with different precision"),
new RouteDef(
"default_namespace.default_schema.default_table_tz_\\.*",
"default_namespace.default_schema.default_table_zoned_timestamp_merged",
null,
"Merge timestamp_tz columns with different precision"),
new RouteDef(
"default_namespace.default_schema.default_table_ltz_\\.*",
"default_namespace.default_schema.default_table_local_zoned_timestamp_merged",
null,
"Merge timestamp_ltz columns with different precision"),
new RouteDef(
"default_namespace.default_schema.default_table_\\.*",
"default_namespace.default_schema.default_everything_merged",
null,
"Merge all timestamp family columns with different precision")),
Collections.emptyList(),
Collections.emptyList(),
pipelineConfig);
// Execute the pipeline
PipelineExecution execution = composer.compose(pipelineDef);
execution.execute();
// Check the order and content of all received events
String[] outputEvents = outCaptor.toString().trim().split("\n");
String[] expected =
Stream.of(
// Merging timestamp with different precision
"CreateTableEvent{tableId={}_table_timestamp_merged, schema=columns={`id` INT,`name` STRING,`age` INT,`birthday` TIMESTAMP(0)}, primaryKeys=id, options=()}",
"DataChangeEvent{tableId={}_table_timestamp_merged, before=[], after=[1, Alice, 17, 2020-01-01T14:28:57], op=INSERT, meta=()}",
"AlterColumnTypeEvent{tableId={}_table_timestamp_merged, typeMapping={birthday=TIMESTAMP(9)}, oldTypeMapping={birthday=TIMESTAMP(0)}}",
"DataChangeEvent{tableId={}_table_timestamp_merged, before=[], after=[2, Alice, 17, 2020-01-01T14:28:57.123456789], op=INSERT, meta=()}",
"DataChangeEvent{tableId={}_table_timestamp_merged, before=[], after=[101, Zen, 19, 2020-01-01T14:28:57], op=INSERT, meta=()}",
"DataChangeEvent{tableId={}_table_timestamp_merged, before=[], after=[102, Zen, 19, 2020-01-01T14:28:57.123456789], op=INSERT, meta=()}",
// Merging zoned timestamp with different precision
"CreateTableEvent{tableId={}_table_zoned_timestamp_merged, schema=columns={`id` INT,`name` STRING,`age` INT,`birthday` TIMESTAMP(0) WITH TIME ZONE}, primaryKeys=id, options=()}",
"DataChangeEvent{tableId={}_table_zoned_timestamp_merged, before=[], after=[3, Alice, 17, 2020-01-01T14:28:57Z], op=INSERT, meta=()}",
"AlterColumnTypeEvent{tableId={}_table_zoned_timestamp_merged, typeMapping={birthday=TIMESTAMP(9) WITH TIME ZONE}, oldTypeMapping={birthday=TIMESTAMP(0) WITH TIME ZONE}}",
"DataChangeEvent{tableId={}_table_zoned_timestamp_merged, before=[], after=[4, Alice, 17, 2020-01-01T14:28:57.123456789Z], op=INSERT, meta=()}",
"DataChangeEvent{tableId={}_table_zoned_timestamp_merged, before=[], after=[103, Zen, 19, 2020-01-01T14:28:57Z], op=INSERT, meta=()}",
"DataChangeEvent{tableId={}_table_zoned_timestamp_merged, before=[], after=[104, Zen, 19, 2020-01-01T14:28:57.123456789Z], op=INSERT, meta=()}",
// Merging local-zoned timestamp with different precision
"CreateTableEvent{tableId={}_table_local_zoned_timestamp_merged, schema=columns={`id` INT,`name` STRING,`age` INT,`birthday` TIMESTAMP_LTZ(0)}, primaryKeys=id, options=()}",
"DataChangeEvent{tableId={}_table_local_zoned_timestamp_merged, before=[], after=[5, Alice, 17, 2020-01-01T14:28:57], op=INSERT, meta=()}",
"AlterColumnTypeEvent{tableId={}_table_local_zoned_timestamp_merged, typeMapping={birthday=TIMESTAMP_LTZ(9)}, oldTypeMapping={birthday=TIMESTAMP_LTZ(0)}}",
"DataChangeEvent{tableId={}_table_local_zoned_timestamp_merged, before=[], after=[6, Alice, 17, 2020-01-01T14:28:57.123456789], op=INSERT, meta=()}",
"DataChangeEvent{tableId={}_table_local_zoned_timestamp_merged, before=[], after=[105, Zen, 19, 2020-01-01T14:28:57], op=INSERT, meta=()}",
"DataChangeEvent{tableId={}_table_local_zoned_timestamp_merged, before=[], after=[106, Zen, 19, 2020-01-01T14:28:57.123456789], op=INSERT, meta=()}",
// Merging all
"CreateTableEvent{tableId={}_everything_merged, schema=columns={`id` INT,`name` STRING,`age` INT,`birthday` TIMESTAMP(0)}, primaryKeys=id, options=()}",
"DataChangeEvent{tableId={}_everything_merged, before=[], after=[1, Alice, 17, 2020-01-01T14:28:57], op=INSERT, meta=()}",
"AlterColumnTypeEvent{tableId={}_everything_merged, typeMapping={birthday=TIMESTAMP(9)}, oldTypeMapping={birthday=TIMESTAMP(0)}}",
"DataChangeEvent{tableId={}_everything_merged, before=[], after=[2, Alice, 17, 2020-01-01T14:28:57.123456789], op=INSERT, meta=()}",
"DataChangeEvent{tableId={}_everything_merged, before=[], after=[3, Alice, 17, 2020-01-01T09:28:57], op=INSERT, meta=()}",
"DataChangeEvent{tableId={}_everything_merged, before=[], after=[4, Alice, 17, 2020-01-01T09:28:57.123456789], op=INSERT, meta=()}",
"DataChangeEvent{tableId={}_everything_merged, before=[], after=[5, Alice, 17, 2020-01-01T09:28:57], op=INSERT, meta=()}",
"DataChangeEvent{tableId={}_everything_merged, before=[], after=[6, Alice, 17, 2020-01-01T09:28:57.123456789], op=INSERT, meta=()}",
"DataChangeEvent{tableId={}_everything_merged, before=[], after=[101, Zen, 19, 2020-01-01T14:28:57], op=INSERT, meta=()}",
"DataChangeEvent{tableId={}_everything_merged, before=[], after=[102, Zen, 19, 2020-01-01T14:28:57.123456789], op=INSERT, meta=()}",
"DataChangeEvent{tableId={}_everything_merged, before=[], after=[103, Zen, 19, 2020-01-01T09:28:57], op=INSERT, meta=()}",
"DataChangeEvent{tableId={}_everything_merged, before=[], after=[104, Zen, 19, 2020-01-01T09:28:57.123456789], op=INSERT, meta=()}",
"DataChangeEvent{tableId={}_everything_merged, before=[], after=[105, Zen, 19, 2020-01-01T09:28:57], op=INSERT, meta=()}",
"DataChangeEvent{tableId={}_everything_merged, before=[], after=[106, Zen, 19, 2020-01-01T09:28:57.123456789], op=INSERT, meta=()}")
.map(s -> s.replace("{}", "default_namespace.default_schema.default"))
.toArray(String[]::new);
assertThat(outputEvents).containsExactlyInAnyOrder(expected);
}
private List<Event> generateTemporalColumnEvents(String tableNamePrefix) {
List<Event> events = new ArrayList<>();
// Initialize schemas
List<String> names = Arrays.asList("ts_0", "ts_9", "tz_0", "tz_9", "ltz_0", "ltz_9");
List<DataType> types =
Arrays.asList(
DataTypes.TIMESTAMP(0),
DataTypes.TIMESTAMP(9),
DataTypes.TIMESTAMP_TZ(0),
DataTypes.TIMESTAMP_TZ(9),
DataTypes.TIMESTAMP_LTZ(0),
DataTypes.TIMESTAMP_LTZ(9));
Instant lowPrecisionTimestamp = Instant.parse("2020-01-01T14:28:57Z");
Instant highPrecisionTimestamp = Instant.parse("2020-01-01T14:28:57.123456789Z");
List<Object> values =
Arrays.asList(
TimestampData.fromLocalDateTime(
LocalDateTime.ofInstant(lowPrecisionTimestamp, ZoneId.of("UTC"))),
TimestampData.fromLocalDateTime(
LocalDateTime.ofInstant(highPrecisionTimestamp, ZoneId.of("UTC"))),
ZonedTimestampData.fromZonedDateTime(
ZonedDateTime.ofInstant(lowPrecisionTimestamp, ZoneId.of("UTC"))),
ZonedTimestampData.fromZonedDateTime(
ZonedDateTime.ofInstant(highPrecisionTimestamp, ZoneId.of("UTC"))),
LocalZonedTimestampData.fromInstant(lowPrecisionTimestamp),
LocalZonedTimestampData.fromInstant(highPrecisionTimestamp));
List<Schema> schemas =
types.stream()
.map(
temporalColumnType ->
Schema.newBuilder()
.physicalColumn("id", DataTypes.INT())
.physicalColumn("name", DataTypes.STRING())
.physicalColumn("age", DataTypes.INT())
.physicalColumn("birthday", temporalColumnType)
.primaryKey("id")
.build())
.collect(Collectors.toList());
for (int i = 0; i < names.size(); i++) {
TableId generatedTableId =
TableId.tableId(
"default_namespace", "default_schema", tableNamePrefix + names.get(i));
Schema generatedSchema = schemas.get(i);
events.add(new CreateTableEvent(generatedTableId, generatedSchema));
events.add(
DataChangeEvent.insertEvent(
generatedTableId,
generate(generatedSchema, 1 + i, "Alice", 17, values.get(i))));
}
for (int i = 0; i < names.size(); i++) {
TableId generatedTableId =
TableId.tableId(
"default_namespace", "default_schema", tableNamePrefix + names.get(i));
Schema generatedSchema = schemas.get(i);
events.add(
DataChangeEvent.insertEvent(
generatedTableId,
generate(generatedSchema, 101 + i, "Zen", 19, values.get(i))));
}
return events;
}
BinaryRecordData generate(Schema schema, Object... fields) {
return (new BinaryRecordDataGenerator(schema.getColumnDataTypes().toArray(new DataType[0])))
.generate(
Arrays.stream(fields)
.map(
e ->
(e instanceof String)
? BinaryStringData.fromString((String) e)
: e)
.toArray());
}
}

@@ -417,7 +417,8 @@ public class DorisMetadataApplierITCase extends DorisSinkTestBase {
new SchemaOperatorTranslator(
SchemaChangeBehavior.EVOLVE,
"$$_schema_operator_$$",
DEFAULT_SCHEMA_OPERATOR_RPC_TIMEOUT);
DEFAULT_SCHEMA_OPERATOR_RPC_TIMEOUT,
"UTC");
OperatorIDGenerator schemaOperatorIDGenerator =
new OperatorIDGenerator(schemaOperatorTranslator.getSchemaOperatorUid());

@@ -366,7 +366,8 @@ public class StarRocksMetadataApplierITCase extends StarRocksSinkTestBase {
new SchemaOperatorTranslator(
SchemaChangeBehavior.EVOLVE,
"$$_schema_operator_$$",
DEFAULT_SCHEMA_OPERATOR_RPC_TIMEOUT);
DEFAULT_SCHEMA_OPERATOR_RPC_TIMEOUT,
"UTC");
OperatorIDGenerator schemaOperatorIDGenerator =
new OperatorIDGenerator(schemaOperatorTranslator.getSchemaOperatorUid());

@@ -115,7 +115,7 @@ public class SchemaEvolveE2eITCase extends PipelineTestEnvironment {
false,
Collections.emptyList(),
Arrays.asList(
"java.lang.IllegalStateException: Incompatible types: \"INT\" and \"DOUBLE\"",
"java.lang.IllegalStateException: Incompatible types found for column `age`: \"INT\" and \"DOUBLE\"",
"org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy"));
}

@@ -115,7 +115,7 @@ public class SchemaEvolvingTransformE2eITCase extends PipelineTestEnvironment {
false,
Collections.emptyList(),
Arrays.asList(
"java.lang.IllegalStateException: Incompatible types: \"INT\" and \"DOUBLE\"",
"java.lang.IllegalStateException: Incompatible types found for column `age`: \"INT\" and \"DOUBLE\"",
"org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy"));
}

@@ -20,8 +20,11 @@ package org.apache.flink.cdc.runtime.operators.schema;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.annotation.VisibleForTesting;
import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
import org.apache.flink.cdc.common.data.RecordData;
import org.apache.flink.cdc.common.data.StringData;
import org.apache.flink.cdc.common.data.TimestampData;
import org.apache.flink.cdc.common.data.ZonedTimestampData;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.DropTableEvent;
import org.apache.flink.cdc.common.event.Event;
@@ -72,6 +75,8 @@ import javax.annotation.Nullable;
import java.io.Serializable;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
@@ -97,6 +102,8 @@ public class SchemaOperator extends AbstractStreamOperator<Event>
private final List<RouteRule> routingRules;
private final String timezone;
/**
* Storing route source table selector, sink table name (before symbol replacement), and replace
* symbol in a tuple.
@@ -127,6 +134,7 @@ public class SchemaOperator extends AbstractStreamOperator<Event>
this.chainingStrategy = ChainingStrategy.ALWAYS;
this.rpcTimeOutInMillis = DEFAULT_SCHEMA_OPERATOR_RPC_TIMEOUT.toMillis();
this.schemaChangeBehavior = SchemaChangeBehavior.EVOLVE;
this.timezone = "UTC";
}
@VisibleForTesting
@@ -135,8 +143,10 @@ public class SchemaOperator extends AbstractStreamOperator<Event>
this.chainingStrategy = ChainingStrategy.ALWAYS;
this.rpcTimeOutInMillis = rpcTimeOut.toMillis();
this.schemaChangeBehavior = SchemaChangeBehavior.EVOLVE;
this.timezone = "UTC";
}
@VisibleForTesting
public SchemaOperator(
List<RouteRule> routingRules,
Duration rpcTimeOut,
@@ -145,6 +155,19 @@ public class SchemaOperator extends AbstractStreamOperator<Event>
this.chainingStrategy = ChainingStrategy.ALWAYS;
this.rpcTimeOutInMillis = rpcTimeOut.toMillis();
this.schemaChangeBehavior = schemaChangeBehavior;
this.timezone = "UTC";
}
public SchemaOperator(
List<RouteRule> routingRules,
Duration rpcTimeOut,
SchemaChangeBehavior schemaChangeBehavior,
String timezone) {
this.routingRules = routingRules;
this.chainingStrategy = ChainingStrategy.ALWAYS;
this.rpcTimeOutInMillis = rpcTimeOut.toMillis();
this.schemaChangeBehavior = schemaChangeBehavior;
this.timezone = timezone;
}
@Override
@@ -372,7 +395,11 @@ public class SchemaOperator extends AbstractStreamOperator<Event>
} else {
fieldGetters.add(
new TypeCoercionFieldGetter(
column.getType(), fieldGetter, tolerantMode));
originalSchema.getColumn(columnName).get().getType(),
column.getType(),
fieldGetter,
tolerantMode,
timezone));
}
}
}
@@ -541,17 +568,23 @@ public class SchemaOperator extends AbstractStreamOperator<Event>
}
private static class TypeCoercionFieldGetter implements RecordData.FieldGetter {
private final DataType originalType;
private final DataType destinationType;
private final RecordData.FieldGetter originalFieldGetter;
private final boolean tolerantMode;
private final String timezone;
public TypeCoercionFieldGetter(
DataType originalType,
DataType destinationType,
RecordData.FieldGetter originalFieldGetter,
boolean tolerantMode) {
boolean tolerantMode,
String timezone) {
this.originalType = originalType;
this.destinationType = destinationType;
this.originalFieldGetter = originalFieldGetter;
this.tolerantMode = tolerantMode;
this.timezone = timezone;
}
private Object fail(IllegalArgumentException e) throws IllegalArgumentException {
@@ -609,6 +642,21 @@ public class SchemaOperator extends AbstractStreamOperator<Event>
+ "Currently only CHAR / VARCHAR can be accepted by a STRING column",
originalField.getClass())));
}
} else if (destinationType.is(DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE)
&& originalType.is(DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE)) {
// For now, TimestampData / ZonedTimestampData / LocalZonedTimestampData do not vary
// their internal representation with precision, so there's no need to do any
// precision conversion here.
return originalField;
} else if (destinationType.is(DataTypeRoot.TIMESTAMP_WITH_TIME_ZONE)
&& originalType.is(DataTypeRoot.TIMESTAMP_WITH_TIME_ZONE)) {
return originalField;
} else if (destinationType.is(DataTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE)
&& originalType.is(DataTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE)) {
return originalField;
} else if (destinationType.is(DataTypeFamily.TIMESTAMP)
&& originalType.is(DataTypeFamily.TIMESTAMP)) {
return castToTimestamp(originalField, timezone);
} else {
return fail(
new IllegalArgumentException(
@@ -624,4 +672,23 @@ public class SchemaOperator extends AbstractStreamOperator<Event>
// Needless to do anything, since AbstractStreamOperator#snapshotState and #processElement
// is guaranteed not to be mixed together.
}
private static TimestampData castToTimestamp(Object object, String timezone) {
if (object == null) {
return null;
}
if (object instanceof LocalZonedTimestampData) {
return TimestampData.fromLocalDateTime(
LocalDateTime.ofInstant(
((LocalZonedTimestampData) object).toInstant(), ZoneId.of(timezone)));
} else if (object instanceof ZonedTimestampData) {
return TimestampData.fromLocalDateTime(
LocalDateTime.ofInstant(
((ZonedTimestampData) object).toInstant(), ZoneId.of(timezone)));
} else {
throw new IllegalArgumentException(
String.format(
"Unable to implicitly coerce object `%s` as a TIMESTAMP.", object));
}
}
}
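
When a zoned or local-zoned value lands in a column that was merged down to a plain TIMESTAMP, castToTimestamp re-reads the instant in the configured zone. An illustrative sketch of that conversion using only the data classes from this diff (the wrapper class and main method are not part of the change):

import org.apache.flink.cdc.common.data.TimestampData;
import org.apache.flink.cdc.common.data.ZonedTimestampData;

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;

public class ZonedToPlainTimestampSketch {
    public static void main(String[] args) {
        // A TIMESTAMP_TZ value, as produced by the test source above.
        ZonedTimestampData zoned =
                ZonedTimestampData.fromZonedDateTime(
                        ZonedDateTime.ofInstant(
                                Instant.parse("2020-01-01T14:28:57Z"), ZoneId.of("UTC")));

        // With PIPELINE_LOCAL_TIME_ZONE set to America/New_York, the instant is
        // re-read in that zone and stored without zone information, yielding the
        // local date-time 2020-01-01T09:28:57 seen in the merged-table output.
        TimestampData plain =
                TimestampData.fromLocalDateTime(
                        LocalDateTime.ofInstant(zoned.toInstant(), ZoneId.of("America/New_York")));

        System.out.println(plain);
    }
}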

@@ -47,8 +47,9 @@ public class SchemaOperatorFactory extends SimpleOperatorFactory<Event>
MetadataApplier metadataApplier,
List<RouteRule> routingRules,
Duration rpcTimeOut,
SchemaChangeBehavior schemaChangeBehavior) {
super(new SchemaOperator(routingRules, rpcTimeOut, schemaChangeBehavior));
SchemaChangeBehavior schemaChangeBehavior,
String timezone) {
super(new SchemaOperator(routingRules, rpcTimeOut, schemaChangeBehavior, timezone));
this.metadataApplier = metadataApplier;
this.routingRules = routingRules;
this.schemaChangeBehavior = schemaChangeBehavior;

@@ -31,9 +31,8 @@ import org.apache.flink.cdc.common.schema.PhysicalColumn;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.schema.Selectors;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypeFamily;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.common.utils.ChangeEventUtils;
import org.apache.flink.cdc.common.utils.SchemaUtils;
import org.apache.flink.cdc.runtime.serializer.TableIdSerializer;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
@@ -254,7 +253,9 @@ public class SchemaDerivation {
// Check type compatibility
DataType widerType =
getWiderType(
existedColumnInDerivedTable.getType(), dataType);
columnName,
existedColumnInDerivedTable.getType(),
dataType);
if (!widerType.equals(existedColumnInDerivedTable.getType())) {
typeDifference.put(
existedColumnInDerivedTable.getName(), widerType);
@@ -289,6 +290,7 @@ public class SchemaDerivation {
.equals(addedColumn.getAddColumn().getType())) {
DataType widerType =
getWiderType(
existedColumnInDerivedTable.getName(),
existedColumnInDerivedTable.getType(),
addedColumn.getAddColumn().getType());
if (!widerType.equals(existedColumnInDerivedTable.getType())) {
@@ -325,7 +327,10 @@ public class SchemaDerivation {
Column existedColumnInDerivedTable = optionalColumnInDerivedTable.get();
if (!existedColumnInDerivedTable.getType().equals(column.getType())) {
DataType widerType =
getWiderType(existedColumnInDerivedTable.getType(), column.getType());
getWiderType(
existedColumnInDerivedTable.getName(),
existedColumnInDerivedTable.getType(),
column.getType());
if (!widerType.equals(existedColumnInDerivedTable.getType())) {
newTypeMapping.put(existedColumnInDerivedTable.getName(), widerType);
}
@@ -343,23 +348,14 @@ public class SchemaDerivation {
return schemaChangeEvents;
}
private DataType getWiderType(DataType thisType, DataType thatType) {
if (thisType.equals(thatType)) {
return thisType;
}
if (thisType.is(DataTypeFamily.INTEGER_NUMERIC)
&& thatType.is(DataTypeFamily.INTEGER_NUMERIC)) {
return DataTypes.BIGINT();
}
if (thisType.is(DataTypeFamily.CHARACTER_STRING)
&& thatType.is(DataTypeFamily.CHARACTER_STRING)) {
return DataTypes.STRING();
}
if (thisType.is(DataTypeFamily.APPROXIMATE_NUMERIC)
&& thatType.is(DataTypeFamily.APPROXIMATE_NUMERIC)) {
return DataTypes.DOUBLE();
private DataType getWiderType(String columnName, DataType thisType, DataType thatType) {
try {
return SchemaUtils.inferWiderType(thisType, thatType);
} catch (IllegalStateException e) {
throw new IllegalStateException(
String.format(
"Incompatible types found for column `%s`: \"%s\" and \"%s\"",
columnName, thisType, thatType));
}
throw new IllegalStateException(
String.format("Incompatible types: \"%s\" and \"%s\"", thisType, thatType));
}
}

@@ -381,7 +381,7 @@ class SchemaDerivationTest {
schemaDerivation.applySchemaChange(
new CreateTableEvent(TABLE_2, INCOMPATIBLE_SCHEMA)))
.isInstanceOf(IllegalStateException.class)
.hasMessage("Incompatible types: \"INT\" and \"STRING\"");
.hasMessage("Incompatible types found for column `age`: \"INT\" and \"STRING\"");
}
@Test
