diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java index 928a3b9db..ba9dd4212 100644 --- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java @@ -32,6 +32,9 @@ import org.apache.flink.cdc.common.types.DataTypeFamily; import org.apache.flink.cdc.common.types.DataTypeRoot; import org.apache.flink.cdc.common.types.DataTypes; import org.apache.flink.cdc.common.types.DecimalType; +import org.apache.flink.cdc.common.types.LocalZonedTimestampType; +import org.apache.flink.cdc.common.types.TimestampType; +import org.apache.flink.cdc.common.types.ZonedTimestampType; import javax.annotation.Nullable; @@ -175,6 +178,24 @@ public class SchemaUtils { if (lType.equals(rType)) { // identical type mergedType = rType; + } else if (lType instanceof TimestampType && rType instanceof TimestampType) { + return DataTypes.TIMESTAMP( + Math.max( + ((TimestampType) lType).getPrecision(), + ((TimestampType) rType).getPrecision())); + } else if (lType instanceof ZonedTimestampType && rType instanceof ZonedTimestampType) { + return DataTypes.TIMESTAMP_TZ( + Math.max( + ((ZonedTimestampType) lType).getPrecision(), + ((ZonedTimestampType) rType).getPrecision())); + } else if (lType instanceof LocalZonedTimestampType + && rType instanceof LocalZonedTimestampType) { + return DataTypes.TIMESTAMP_LTZ( + Math.max( + ((LocalZonedTimestampType) lType).getPrecision(), + ((LocalZonedTimestampType) rType).getPrecision())); + } else if (lType.is(DataTypeFamily.TIMESTAMP) && rType.is(DataTypeFamily.TIMESTAMP)) { + return DataTypes.TIMESTAMP(TimestampType.MAX_PRECISION); } else if (lType.is(DataTypeFamily.INTEGER_NUMERIC) && rType.is(DataTypeFamily.INTEGER_NUMERIC)) { mergedType = DataTypes.BIGINT(); @@ -184,7 +205,7 @@ public class SchemaUtils { } else if (lType.is(DataTypeFamily.APPROXIMATE_NUMERIC) && rType.is(DataTypeFamily.APPROXIMATE_NUMERIC)) { mergedType = DataTypes.DOUBLE(); - } else if (lType.is(DataTypeRoot.DECIMAL) && rType.is(DataTypeRoot.DECIMAL)) { + } else if (lType instanceof DecimalType && rType instanceof DecimalType) { // Merge two decimal types DecimalType lhsDecimal = (DecimalType) lType; DecimalType rhsDecimal = (DecimalType) rType; @@ -194,7 +215,7 @@ public class SchemaUtils { rhsDecimal.getPrecision() - rhsDecimal.getScale()); int resultScale = Math.max(lhsDecimal.getScale(), rhsDecimal.getScale()); mergedType = DataTypes.DECIMAL(resultIntDigits + resultScale, resultScale); - } else if (lType.is(DataTypeRoot.DECIMAL) && rType.is(DataTypeFamily.EXACT_NUMERIC)) { + } else if (lType instanceof DecimalType && rType.is(DataTypeFamily.EXACT_NUMERIC)) { // Merge decimal and int DecimalType lhsDecimal = (DecimalType) lType; mergedType = @@ -203,7 +224,7 @@ public class SchemaUtils { lhsDecimal.getPrecision(), lhsDecimal.getScale() + getNumericPrecision(rType)), lhsDecimal.getScale()); - } else if (rType.is(DataTypeRoot.DECIMAL) && lType.is(DataTypeFamily.EXACT_NUMERIC)) { + } else if (rType instanceof DecimalType && lType.is(DataTypeFamily.EXACT_NUMERIC)) { // Merge decimal and int DecimalType rhsDecimal = (DecimalType) rType; mergedType = diff --git a/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaUtilsTest.java b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaUtilsTest.java index 8a508a890..a3aff294b 100644 --- 
a/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaUtilsTest.java +++ b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaUtilsTest.java @@ -291,6 +291,35 @@ public class SchemaUtilsTest { DataTypes.INT().nullable(), DataTypes.INT().nullable())) .isEqualTo(DataTypes.INT().nullable()); + // Test merging temporal types + Assertions.assertThat( + SchemaUtils.inferWiderType(DataTypes.TIMESTAMP(9), DataTypes.TIMESTAMP(6))) + .isEqualTo(DataTypes.TIMESTAMP(9)); + + Assertions.assertThat( + SchemaUtils.inferWiderType( + DataTypes.TIMESTAMP_TZ(3), DataTypes.TIMESTAMP_TZ(7))) + .isEqualTo(DataTypes.TIMESTAMP_TZ(7)); + + Assertions.assertThat( + SchemaUtils.inferWiderType( + DataTypes.TIMESTAMP_LTZ(2), DataTypes.TIMESTAMP_LTZ(1))) + .isEqualTo(DataTypes.TIMESTAMP_LTZ(2)); + + Assertions.assertThat( + SchemaUtils.inferWiderType( + DataTypes.TIMESTAMP_LTZ(), DataTypes.TIMESTAMP())) + .isEqualTo(DataTypes.TIMESTAMP(9)); + + Assertions.assertThat( + SchemaUtils.inferWiderType(DataTypes.TIMESTAMP_TZ(), DataTypes.TIMESTAMP())) + .isEqualTo(DataTypes.TIMESTAMP(9)); + + Assertions.assertThat( + SchemaUtils.inferWiderType( + DataTypes.TIMESTAMP_LTZ(), DataTypes.TIMESTAMP_TZ())) + .isEqualTo(DataTypes.TIMESTAMP(9)); + // incompatible type merges test Assertions.assertThatThrownBy( () -> SchemaUtils.inferWiderType(DataTypes.INT(), DataTypes.DOUBLE())) diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java index b3f17b2e5..114035fe9 100644 --- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java +++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java @@ -18,6 +18,7 @@ package org.apache.flink.cdc.composer.flink; import org.apache.flink.cdc.common.annotation.Internal; +import org.apache.flink.cdc.common.configuration.Configuration; import org.apache.flink.cdc.common.event.Event; import org.apache.flink.cdc.common.pipeline.PipelineOptions; import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior; @@ -88,17 +89,19 @@ public class FlinkPipelineComposer implements PipelineComposer { @Override public PipelineExecution compose(PipelineDef pipelineDef) { - int parallelism = pipelineDef.getConfig().get(PipelineOptions.PIPELINE_PARALLELISM); + Configuration pipelineDefConfig = pipelineDef.getConfig(); + + int parallelism = pipelineDefConfig.get(PipelineOptions.PIPELINE_PARALLELISM); env.getConfig().setParallelism(parallelism); SchemaChangeBehavior schemaChangeBehavior = - pipelineDef.getConfig().get(PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR); + pipelineDefConfig.get(PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR); // Build Source Operator DataSourceTranslator sourceTranslator = new DataSourceTranslator(); DataStream stream = sourceTranslator.translate( - pipelineDef.getSource(), env, pipelineDef.getConfig(), parallelism); + pipelineDef.getSource(), env, pipelineDefConfig, parallelism); // Build PreTransformOperator for processing Schema Event TransformTranslator transformTranslator = new TransformTranslator(); @@ -110,10 +113,9 @@ public class FlinkPipelineComposer implements PipelineComposer { SchemaOperatorTranslator schemaOperatorTranslator = new SchemaOperatorTranslator( schemaChangeBehavior, - pipelineDef.getConfig().get(PipelineOptions.PIPELINE_SCHEMA_OPERATOR_UID), - pipelineDef - .getConfig() - 
.get(PipelineOptions.PIPELINE_SCHEMA_OPERATOR_RPC_TIMEOUT)); + pipelineDefConfig.get(PipelineOptions.PIPELINE_SCHEMA_OPERATOR_UID), + pipelineDefConfig.get(PipelineOptions.PIPELINE_SCHEMA_OPERATOR_RPC_TIMEOUT), + pipelineDefConfig.get(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE)); OperatorIDGenerator schemaOperatorIDGenerator = new OperatorIDGenerator(schemaOperatorTranslator.getSchemaOperatorUid()); @@ -122,13 +124,13 @@ public class FlinkPipelineComposer implements PipelineComposer { transformTranslator.translatePostTransform( stream, pipelineDef.getTransforms(), - pipelineDef.getConfig().get(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE), + pipelineDefConfig.get(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE), pipelineDef.getUdfs()); // Build DataSink in advance as schema operator requires MetadataApplier DataSinkTranslator sinkTranslator = new DataSinkTranslator(); DataSink dataSink = - sinkTranslator.createDataSink(pipelineDef.getSink(), pipelineDef.getConfig(), env); + sinkTranslator.createDataSink(pipelineDef.getSink(), pipelineDefConfig, env); stream = schemaOperatorTranslator.translate( @@ -157,7 +159,7 @@ public class FlinkPipelineComposer implements PipelineComposer { addFrameworkJars(); return new FlinkPipelineExecution( - env, pipelineDef.getConfig().get(PipelineOptions.PIPELINE_NAME), isBlocking); + env, pipelineDefConfig.get(PipelineOptions.PIPELINE_NAME), isBlocking); } private void addFrameworkJars() { diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/SchemaOperatorTranslator.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/SchemaOperatorTranslator.java index c965c88a6..c5cadcd1e 100644 --- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/SchemaOperatorTranslator.java +++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/SchemaOperatorTranslator.java @@ -19,7 +19,6 @@ package org.apache.flink.cdc.composer.flink.translator; import org.apache.flink.cdc.common.annotation.Internal; import org.apache.flink.cdc.common.event.Event; -import org.apache.flink.cdc.common.event.SchemaChangeEvent; import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior; import org.apache.flink.cdc.common.route.RouteRule; import org.apache.flink.cdc.common.sink.MetadataApplier; @@ -39,16 +38,19 @@ import java.util.List; public class SchemaOperatorTranslator { private final SchemaChangeBehavior schemaChangeBehavior; private final String schemaOperatorUid; private final Duration rpcTimeOut; + private final String timezone; public SchemaOperatorTranslator( SchemaChangeBehavior schemaChangeBehavior, String schemaOperatorUid, - Duration rpcTimeOut) { + Duration rpcTimeOut, + String timezone) { this.schemaChangeBehavior = schemaChangeBehavior; this.schemaOperatorUid = schemaOperatorUid; this.rpcTimeOut = rpcTimeOut; + this.timezone = timezone; } public DataStream translate( @@ -56,7 +57,8 @@ public class SchemaOperatorTranslator { int parallelism, MetadataApplier metadataApplier, List routes) { - return addSchemaOperator(input, parallelism, metadataApplier, routes, schemaChangeBehavior); + return addSchemaOperator( + input, parallelism, metadataApplier, routes, schemaChangeBehavior, timezone); } public String getSchemaOperatorUid() { @@ -68,7 +70,8 @@ public class SchemaOperatorTranslator { int parallelism, MetadataApplier metadataApplier, List routes, - SchemaChangeBehavior schemaChangeBehavior) { + SchemaChangeBehavior schemaChangeBehavior, + String timezone) { List
routingRules = new ArrayList<>(); for (RouteDef route : routes) { routingRules.add( @@ -82,27 +85,12 @@ public class SchemaOperatorTranslator { "SchemaOperator", new EventTypeInfo(), new SchemaOperatorFactory( - metadataApplier, routingRules, rpcTimeOut, schemaChangeBehavior)); + metadataApplier, + routingRules, + rpcTimeOut, + schemaChangeBehavior, + timezone)); stream.uid(schemaOperatorUid).setParallelism(parallelism); return stream; } - - private DataStream dropSchemaChangeEvent(DataStream input, int parallelism) { - return input.filter(event -> !(event instanceof SchemaChangeEvent)) - .setParallelism(parallelism); - } - - private DataStream exceptionOnSchemaChange(DataStream input, int parallelism) { - return input.map( - event -> { - if (event instanceof SchemaChangeEvent) { - throw new RuntimeException( - String.format( - "Aborting execution as the pipeline encountered a schema change event: %s", - event)); - } - return event; - }) - .setParallelism(parallelism); - } } diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java index 4e16d2380..0c25c17ca 100644 --- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java +++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java @@ -18,6 +18,10 @@ package org.apache.flink.cdc.composer.flink; import org.apache.flink.cdc.common.configuration.Configuration; +import org.apache.flink.cdc.common.data.LocalZonedTimestampData; +import org.apache.flink.cdc.common.data.TimestampData; +import org.apache.flink.cdc.common.data.ZonedTimestampData; +import org.apache.flink.cdc.common.data.binary.BinaryRecordData; import org.apache.flink.cdc.common.data.binary.BinaryStringData; import org.apache.flink.cdc.common.event.AddColumnEvent; import org.apache.flink.cdc.common.event.CreateTableEvent; @@ -57,10 +61,16 @@ import org.testcontainers.shaded.com.google.common.collect.ImmutableMap; import java.io.ByteArrayOutputStream; import java.io.PrintStream; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.ZonedDateTime; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.apache.flink.cdc.connectors.values.source.ValuesDataSourceHelper.TABLE_1; import static org.apache.flink.cdc.connectors.values.source.ValuesDataSourceHelper.TABLE_2; @@ -1092,4 +1102,199 @@ class FlinkPipelineComposerITCase { "DataChangeEvent{tableId=replaced_namespace.replaced_schema.table1, before=[1, 1], after=[], op=DELETE, meta=()}", "DataChangeEvent{tableId=replaced_namespace.replaced_schema.table1, before=[2, 2], after=[2, x], op=UPDATE, meta=()}"); } + + @ParameterizedTest + @EnumSource + void testMergingTemporalTypesWithPromotedPrecisions(ValuesDataSink.SinkApi sinkApi) + throws Exception { + FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster(); + + // Setup value source + Configuration sourceConfig = new Configuration(); + sourceConfig.set( + ValuesDataSourceOptions.EVENT_SET_ID, + ValuesDataSourceHelper.EventSetId.CUSTOM_SOURCE_EVENTS); + + List events = generateTemporalColumnEvents("default_table_"); + ValuesDataSourceHelper.setSourceEvents(Collections.singletonList(events)); + + SourceDef sourceDef = + new 
SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig); + + // Setup value sink + Configuration sinkConfig = new Configuration(); + sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi); + SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig); + + // Setup pipeline + Configuration pipelineConfig = new Configuration(); + pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); + pipelineConfig.set( + PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); + pipelineConfig.set(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE, "America/New_York"); + PipelineDef pipelineDef = + new PipelineDef( + sourceDef, + sinkDef, + Arrays.asList( + new RouteDef( + "default_namespace.default_schema.default_table_ts_\\.*", + "default_namespace.default_schema.default_table_timestamp_merged", + null, + "Merge timestamp columns with different precision"), + new RouteDef( + "default_namespace.default_schema.default_table_tz_\\.*", + "default_namespace.default_schema.default_table_zoned_timestamp_merged", + null, + "Merge timestamp_tz columns with different precision"), + new RouteDef( + "default_namespace.default_schema.default_table_ltz_\\.*", + "default_namespace.default_schema.default_table_local_zoned_timestamp_merged", + null, + "Merge timestamp_ltz columns with different precision"), + new RouteDef( + "default_namespace.default_schema.default_table_\\.*", + "default_namespace.default_schema.default_everything_merged", + null, + "Merge all timestamp family columns with different precision")), + Collections.emptyList(), + Collections.emptyList(), + pipelineConfig); + + // Execute the pipeline + PipelineExecution execution = composer.compose(pipelineDef); + + execution.execute(); + + // Check the order and content of all received events + String[] outputEvents = outCaptor.toString().trim().split("\n"); + + String[] expected = + Stream.of( + // Merging timestamp with different precision + "CreateTableEvent{tableId={}_table_timestamp_merged, schema=columns={`id` INT,`name` STRING,`age` INT,`birthday` TIMESTAMP(0)}, primaryKeys=id, options=()}", + "DataChangeEvent{tableId={}_table_timestamp_merged, before=[], after=[1, Alice, 17, 2020-01-01T14:28:57], op=INSERT, meta=()}", + "AlterColumnTypeEvent{tableId={}_table_timestamp_merged, nameMapping={birthday=TIMESTAMP(9)}}", + "DataChangeEvent{tableId={}_table_timestamp_merged, before=[], after=[2, Alice, 17, 2020-01-01T14:28:57.123456789], op=INSERT, meta=()}", + "DataChangeEvent{tableId={}_table_timestamp_merged, before=[], after=[101, Zen, 19, 2020-01-01T14:28:57], op=INSERT, meta=()}", + "DataChangeEvent{tableId={}_table_timestamp_merged, before=[], after=[102, Zen, 19, 2020-01-01T14:28:57.123456789], op=INSERT, meta=()}", + + // Merging zoned timestamp with different precision + "CreateTableEvent{tableId={}_table_zoned_timestamp_merged, schema=columns={`id` INT,`name` STRING,`age` INT,`birthday` TIMESTAMP(0) WITH TIME ZONE}, primaryKeys=id, options=()}", + "DataChangeEvent{tableId={}_table_zoned_timestamp_merged, before=[], after=[3, Alice, 17, 2020-01-01T14:28:57Z], op=INSERT, meta=()}", + "AlterColumnTypeEvent{tableId={}_table_zoned_timestamp_merged, nameMapping={birthday=TIMESTAMP(9) WITH TIME ZONE}}", + "DataChangeEvent{tableId={}_table_zoned_timestamp_merged, before=[], after=[4, Alice, 17, 2020-01-01T14:28:57.123456789Z], op=INSERT, meta=()}", + "DataChangeEvent{tableId={}_table_zoned_timestamp_merged, before=[], after=[103, Zen, 19, 2020-01-01T14:28:57Z], op=INSERT, meta=()}", + 
"DataChangeEvent{tableId={}_table_zoned_timestamp_merged, before=[], after=[104, Zen, 19, 2020-01-01T14:28:57.123456789Z], op=INSERT, meta=()}", + + // Merging local-zoned timestamp with different precision + "CreateTableEvent{tableId={}_table_local_zoned_timestamp_merged, schema=columns={`id` INT,`name` STRING,`age` INT,`birthday` TIMESTAMP_LTZ(0)}, primaryKeys=id, options=()}", + "DataChangeEvent{tableId={}_table_local_zoned_timestamp_merged, before=[], after=[5, Alice, 17, 2020-01-01T14:28:57], op=INSERT, meta=()}", + "AlterColumnTypeEvent{tableId={}_table_local_zoned_timestamp_merged, nameMapping={birthday=TIMESTAMP_LTZ(9)}}", + "DataChangeEvent{tableId={}_table_local_zoned_timestamp_merged, before=[], after=[6, Alice, 17, 2020-01-01T14:28:57.123456789], op=INSERT, meta=()}", + "DataChangeEvent{tableId={}_table_local_zoned_timestamp_merged, before=[], after=[105, Zen, 19, 2020-01-01T14:28:57], op=INSERT, meta=()}", + "DataChangeEvent{tableId={}_table_local_zoned_timestamp_merged, before=[], after=[106, Zen, 19, 2020-01-01T14:28:57.123456789], op=INSERT, meta=()}", + + // Merging all + "CreateTableEvent{tableId={}_everything_merged, schema=columns={`id` INT,`name` STRING,`age` INT,`birthday` TIMESTAMP(0)}, primaryKeys=id, options=()}", + "DataChangeEvent{tableId={}_everything_merged, before=[], after=[1, Alice, 17, 2020-01-01T14:28:57], op=INSERT, meta=()}", + "AlterColumnTypeEvent{tableId={}_everything_merged, nameMapping={birthday=TIMESTAMP(9)}}", + "DataChangeEvent{tableId={}_everything_merged, before=[], after=[2, Alice, 17, 2020-01-01T14:28:57.123456789], op=INSERT, meta=()}", + "DataChangeEvent{tableId={}_everything_merged, before=[], after=[3, Alice, 17, 2020-01-01T09:28:57], op=INSERT, meta=()}", + "DataChangeEvent{tableId={}_everything_merged, before=[], after=[4, Alice, 17, 2020-01-01T09:28:57.123456789], op=INSERT, meta=()}", + "DataChangeEvent{tableId={}_everything_merged, before=[], after=[5, Alice, 17, 2020-01-01T09:28:57], op=INSERT, meta=()}", + "DataChangeEvent{tableId={}_everything_merged, before=[], after=[6, Alice, 17, 2020-01-01T09:28:57.123456789], op=INSERT, meta=()}", + "DataChangeEvent{tableId={}_everything_merged, before=[], after=[101, Zen, 19, 2020-01-01T14:28:57], op=INSERT, meta=()}", + "DataChangeEvent{tableId={}_everything_merged, before=[], after=[102, Zen, 19, 2020-01-01T14:28:57.123456789], op=INSERT, meta=()}", + "DataChangeEvent{tableId={}_everything_merged, before=[], after=[103, Zen, 19, 2020-01-01T09:28:57], op=INSERT, meta=()}", + "DataChangeEvent{tableId={}_everything_merged, before=[], after=[104, Zen, 19, 2020-01-01T09:28:57.123456789], op=INSERT, meta=()}", + "DataChangeEvent{tableId={}_everything_merged, before=[], after=[105, Zen, 19, 2020-01-01T09:28:57], op=INSERT, meta=()}", + "DataChangeEvent{tableId={}_everything_merged, before=[], after=[106, Zen, 19, 2020-01-01T09:28:57.123456789], op=INSERT, meta=()}") + .map(s -> s.replace("{}", "default_namespace.default_schema.default")) + .toArray(String[]::new); + + assertThat(outputEvents).containsExactlyInAnyOrder(expected); + } + + private List generateTemporalColumnEvents(String tableNamePrefix) { + List events = new ArrayList<>(); + + // Initialize schemas + List names = Arrays.asList("ts_0", "ts_9", "tz_0", "tz_9", "ltz_0", "ltz_9"); + + List types = + Arrays.asList( + DataTypes.TIMESTAMP(0), + DataTypes.TIMESTAMP(9), + DataTypes.TIMESTAMP_TZ(0), + DataTypes.TIMESTAMP_TZ(9), + DataTypes.TIMESTAMP_LTZ(0), + DataTypes.TIMESTAMP_LTZ(9)); + + Instant lowPrecisionTimestamp = 
Instant.parse("2020-01-01T14:28:57Z"); + Instant highPrecisionTimestamp = Instant.parse("2020-01-01T14:28:57.123456789Z"); + + List values = + Arrays.asList( + TimestampData.fromLocalDateTime( + LocalDateTime.ofInstant(lowPrecisionTimestamp, ZoneId.of("UTC"))), + TimestampData.fromLocalDateTime( + LocalDateTime.ofInstant(highPrecisionTimestamp, ZoneId.of("UTC"))), + ZonedTimestampData.fromZonedDateTime( + ZonedDateTime.ofInstant(lowPrecisionTimestamp, ZoneId.of("UTC"))), + ZonedTimestampData.fromZonedDateTime( + ZonedDateTime.ofInstant(highPrecisionTimestamp, ZoneId.of("UTC"))), + LocalZonedTimestampData.fromInstant(lowPrecisionTimestamp), + LocalZonedTimestampData.fromInstant(highPrecisionTimestamp)); + + List schemas = + types.stream() + .map( + temporalColumnType -> + Schema.newBuilder() + .physicalColumn("id", DataTypes.INT()) + .physicalColumn("name", DataTypes.STRING()) + .physicalColumn("age", DataTypes.INT()) + .physicalColumn("birthday", temporalColumnType) + .primaryKey("id") + .build()) + .collect(Collectors.toList()); + + for (int i = 0; i < names.size(); i++) { + TableId generatedTableId = + TableId.tableId( + "default_namespace", "default_schema", tableNamePrefix + names.get(i)); + Schema generatedSchema = schemas.get(i); + events.add(new CreateTableEvent(generatedTableId, generatedSchema)); + events.add( + DataChangeEvent.insertEvent( + generatedTableId, + generate(generatedSchema, 1 + i, "Alice", 17, values.get(i)))); + } + + for (int i = 0; i < names.size(); i++) { + TableId generatedTableId = + TableId.tableId( + "default_namespace", "default_schema", tableNamePrefix + names.get(i)); + Schema generatedSchema = schemas.get(i); + events.add( + DataChangeEvent.insertEvent( + generatedTableId, + generate(generatedSchema, 101 + i, "Zen", 19, values.get(i)))); + } + + return events; + } + + BinaryRecordData generate(Schema schema, Object... fields) { + return (new BinaryRecordDataGenerator(schema.getColumnDataTypes().toArray(new DataType[0]))) + .generate( + Arrays.stream(fields) + .map( + e -> + (e instanceof String) + ? 
BinaryStringData.fromString((String) e) + : e) + .toArray()); + } } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplierITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplierITCase.java index 438cc3781..84eeebbca 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplierITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplierITCase.java @@ -417,7 +417,8 @@ public class DorisMetadataApplierITCase extends DorisSinkTestBase { new SchemaOperatorTranslator( SchemaChangeBehavior.EVOLVE, "$$_schema_operator_$$", - DEFAULT_SCHEMA_OPERATOR_RPC_TIMEOUT); + DEFAULT_SCHEMA_OPERATOR_RPC_TIMEOUT, + "UTC"); OperatorIDGenerator schemaOperatorIDGenerator = new OperatorIDGenerator(schemaOperatorTranslator.getSchemaOperatorUid()); diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java index c5e833e54..8b433ae4f 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java @@ -366,7 +366,8 @@ public class StarRocksMetadataApplierITCase extends StarRocksSinkTestBase { new SchemaOperatorTranslator( SchemaChangeBehavior.EVOLVE, "$$_schema_operator_$$", - DEFAULT_SCHEMA_OPERATOR_RPC_TIMEOUT); + DEFAULT_SCHEMA_OPERATOR_RPC_TIMEOUT, + "UTC"); OperatorIDGenerator schemaOperatorIDGenerator = new OperatorIDGenerator(schemaOperatorTranslator.getSchemaOperatorUid()); diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java index 462708085..59814f198 100644 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java @@ -112,7 +112,7 @@ public class SchemaEvolveE2eITCase extends PipelineTestEnvironment { false, Collections.emptyList(), Arrays.asList( - "java.lang.IllegalStateException: Incompatible types: \"INT\" and \"DOUBLE\"", + "java.lang.IllegalStateException: Incompatible types found for column `age`: \"INT\" and \"DOUBLE\"", "org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy")); } diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolvingTransformE2eITCase.java 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolvingTransformE2eITCase.java index edcac1bbe..eaeb96312 100644 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolvingTransformE2eITCase.java +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolvingTransformE2eITCase.java @@ -113,7 +113,7 @@ public class SchemaEvolvingTransformE2eITCase extends PipelineTestEnvironment { false, Collections.emptyList(), Arrays.asList( - "java.lang.IllegalStateException: Incompatible types: \"INT\" and \"DOUBLE\"", + "java.lang.IllegalStateException: Incompatible types found for column `age`: \"INT\" and \"DOUBLE\"", "org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy")); } diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java index a1bdd7885..50d4e3192 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java @@ -20,8 +20,11 @@ package org.apache.flink.cdc.runtime.operators.schema; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.cdc.common.annotation.Internal; import org.apache.flink.cdc.common.annotation.VisibleForTesting; +import org.apache.flink.cdc.common.data.LocalZonedTimestampData; import org.apache.flink.cdc.common.data.RecordData; import org.apache.flink.cdc.common.data.StringData; +import org.apache.flink.cdc.common.data.TimestampData; +import org.apache.flink.cdc.common.data.ZonedTimestampData; import org.apache.flink.cdc.common.event.DataChangeEvent; import org.apache.flink.cdc.common.event.Event; import org.apache.flink.cdc.common.event.FlushEvent; @@ -71,6 +74,8 @@ import javax.annotation.Nullable; import java.io.Serializable; import java.time.Duration; +import java.time.LocalDateTime; +import java.time.ZoneId; import java.util.ArrayList; import java.util.List; import java.util.Optional; @@ -96,6 +101,8 @@ public class SchemaOperator extends AbstractStreamOperator private final List routingRules; + private final String timezone; + /** * Storing route source table selector, sink table name (before symbol replacement), and replace * symbol in a tuple. 
@@ -126,6 +133,7 @@ public class SchemaOperator extends AbstractStreamOperator this.chainingStrategy = ChainingStrategy.ALWAYS; this.rpcTimeOutInMillis = DEFAULT_SCHEMA_OPERATOR_RPC_TIMEOUT.toMillis(); this.schemaChangeBehavior = SchemaChangeBehavior.EVOLVE; + this.timezone = "UTC"; } @VisibleForTesting @@ -134,8 +142,10 @@ public class SchemaOperator extends AbstractStreamOperator this.chainingStrategy = ChainingStrategy.ALWAYS; this.rpcTimeOutInMillis = rpcTimeOut.toMillis(); this.schemaChangeBehavior = SchemaChangeBehavior.EVOLVE; + this.timezone = "UTC"; } + @VisibleForTesting public SchemaOperator( List routingRules, Duration rpcTimeOut, @@ -144,6 +154,19 @@ public class SchemaOperator extends AbstractStreamOperator this.chainingStrategy = ChainingStrategy.ALWAYS; this.rpcTimeOutInMillis = rpcTimeOut.toMillis(); this.schemaChangeBehavior = schemaChangeBehavior; + this.timezone = "UTC"; + } + + public SchemaOperator( + List routingRules, + Duration rpcTimeOut, + SchemaChangeBehavior schemaChangeBehavior, + String timezone) { + this.routingRules = routingRules; + this.chainingStrategy = ChainingStrategy.ALWAYS; + this.rpcTimeOutInMillis = rpcTimeOut.toMillis(); + this.schemaChangeBehavior = schemaChangeBehavior; + this.timezone = timezone; } @Override @@ -365,7 +388,11 @@ public class SchemaOperator extends AbstractStreamOperator } else { fieldGetters.add( new TypeCoercionFieldGetter( - column.getType(), fieldGetter, tolerantMode)); + originalSchema.getColumn(columnName).get().getType(), + column.getType(), + fieldGetter, + tolerantMode, + timezone)); } } } @@ -534,17 +561,23 @@ public class SchemaOperator extends AbstractStreamOperator } private static class TypeCoercionFieldGetter implements RecordData.FieldGetter { + private final DataType originalType; private final DataType destinationType; private final RecordData.FieldGetter originalFieldGetter; private final boolean tolerantMode; + private final String timezone; public TypeCoercionFieldGetter( + DataType originalType, DataType destinationType, RecordData.FieldGetter originalFieldGetter, - boolean tolerantMode) { + boolean tolerantMode, + String timezone) { + this.originalType = originalType; this.destinationType = destinationType; this.originalFieldGetter = originalFieldGetter; this.tolerantMode = tolerantMode; + this.timezone = timezone; } private Object fail(IllegalArgumentException e) throws IllegalArgumentException { @@ -602,6 +635,21 @@ public class SchemaOperator extends AbstractStreamOperator + "Currently only CHAR / VARCHAR can be accepted by a STRING column", originalField.getClass()))); } + } else if (destinationType.is(DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE) + && originalType.is(DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE)) { + // For now, TimestampData / ZonedTimestampData / LocalZonedTimestampData has no + // difference in its internal representation, so there's no need to do any precision + // conversion. 
+ return originalField; + } else if (destinationType.is(DataTypeRoot.TIMESTAMP_WITH_TIME_ZONE) + && originalType.is(DataTypeRoot.TIMESTAMP_WITH_TIME_ZONE)) { + return originalField; + } else if (destinationType.is(DataTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE) + && originalType.is(DataTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE)) { + return originalField; + } else if (destinationType.is(DataTypeFamily.TIMESTAMP) + && originalType.is(DataTypeFamily.TIMESTAMP)) { + return castToTimestamp(originalField, timezone); } else { return fail( new IllegalArgumentException( @@ -617,4 +665,23 @@ public class SchemaOperator extends AbstractStreamOperator // Needless to do anything, since AbstractStreamOperator#snapshotState and #processElement // is guaranteed not to be mixed together. } + + private static TimestampData castToTimestamp(Object object, String timezone) { + if (object == null) { + return null; + } + if (object instanceof LocalZonedTimestampData) { + return TimestampData.fromLocalDateTime( + LocalDateTime.ofInstant( + ((LocalZonedTimestampData) object).toInstant(), ZoneId.of(timezone))); + } else if (object instanceof ZonedTimestampData) { + return TimestampData.fromLocalDateTime( + LocalDateTime.ofInstant( + ((ZonedTimestampData) object).toInstant(), ZoneId.of(timezone))); + } else { + throw new IllegalArgumentException( + String.format( + "Unable to implicitly coerce object `%s` as a TIMESTAMP.", object)); + } + } } diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperatorFactory.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperatorFactory.java index 7cd35a20d..367f65597 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperatorFactory.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperatorFactory.java @@ -47,8 +47,9 @@ public class SchemaOperatorFactory extends SimpleOperatorFactory MetadataApplier metadataApplier, List routingRules, Duration rpcTimeOut, - SchemaChangeBehavior schemaChangeBehavior) { - super(new SchemaOperator(routingRules, rpcTimeOut, schemaChangeBehavior)); + SchemaChangeBehavior schemaChangeBehavior, + String timezone) { + super(new SchemaOperator(routingRules, rpcTimeOut, schemaChangeBehavior, timezone)); this.metadataApplier = metadataApplier; this.routingRules = routingRules; this.schemaChangeBehavior = schemaChangeBehavior; diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaDerivation.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaDerivation.java index e4b547b21..1bb197b45 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaDerivation.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaDerivation.java @@ -31,9 +31,8 @@ import org.apache.flink.cdc.common.schema.PhysicalColumn; import org.apache.flink.cdc.common.schema.Schema; import org.apache.flink.cdc.common.schema.Selectors; import org.apache.flink.cdc.common.types.DataType; -import org.apache.flink.cdc.common.types.DataTypeFamily; -import org.apache.flink.cdc.common.types.DataTypes; import org.apache.flink.cdc.common.utils.ChangeEventUtils; +import org.apache.flink.cdc.common.utils.SchemaUtils; import org.apache.flink.cdc.runtime.serializer.TableIdSerializer; import 
org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; @@ -247,7 +246,9 @@ public class SchemaDerivation { // Check type compatibility DataType widerType = getWiderType( - existedColumnInDerivedTable.getType(), dataType); + columnName, + existedColumnInDerivedTable.getType(), + dataType); if (!widerType.equals(existedColumnInDerivedTable.getType())) { typeDifference.put( existedColumnInDerivedTable.getName(), widerType); @@ -282,6 +283,7 @@ public class SchemaDerivation { .equals(addedColumn.getAddColumn().getType())) { DataType widerType = getWiderType( + existedColumnInDerivedTable.getName(), existedColumnInDerivedTable.getType(), addedColumn.getAddColumn().getType()); if (!widerType.equals(existedColumnInDerivedTable.getType())) { @@ -318,7 +320,10 @@ public class SchemaDerivation { Column existedColumnInDerivedTable = optionalColumnInDerivedTable.get(); if (!existedColumnInDerivedTable.getType().equals(column.getType())) { DataType widerType = - getWiderType(existedColumnInDerivedTable.getType(), column.getType()); + getWiderType( + existedColumnInDerivedTable.getName(), + existedColumnInDerivedTable.getType(), + column.getType()); if (!widerType.equals(existedColumnInDerivedTable.getType())) { newTypeMapping.put(existedColumnInDerivedTable.getName(), widerType); } @@ -336,23 +341,14 @@ public class SchemaDerivation { return schemaChangeEvents; } - private DataType getWiderType(DataType thisType, DataType thatType) { - if (thisType.equals(thatType)) { - return thisType; - } - if (thisType.is(DataTypeFamily.INTEGER_NUMERIC) - && thatType.is(DataTypeFamily.INTEGER_NUMERIC)) { - return DataTypes.BIGINT(); - } - if (thisType.is(DataTypeFamily.CHARACTER_STRING) - && thatType.is(DataTypeFamily.CHARACTER_STRING)) { - return DataTypes.STRING(); - } - if (thisType.is(DataTypeFamily.APPROXIMATE_NUMERIC) - && thatType.is(DataTypeFamily.APPROXIMATE_NUMERIC)) { - return DataTypes.DOUBLE(); + private DataType getWiderType(String columnName, DataType thisType, DataType thatType) { + try { + return SchemaUtils.inferWiderType(thisType, thatType); + } catch (IllegalStateException e) { + throw new IllegalStateException( + String.format( + "Incompatible types found for column `%s`: \"%s\" and \"%s\"", + columnName, thisType, thatType)); } - throw new IllegalStateException( - String.format("Incompatible types: \"%s\" and \"%s\"", thisType, thatType)); } } diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaDerivationTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaDerivationTest.java index 9a2d1cfb4..05d74ac7d 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaDerivationTest.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaDerivationTest.java @@ -381,7 +381,7 @@ class SchemaDerivationTest { schemaDerivation.applySchemaChange( new CreateTableEvent(TABLE_2, INCOMPATIBLE_SCHEMA))) .isInstanceOf(IllegalStateException.class) - .hasMessage("Incompatible types: \"INT\" and \"STRING\""); + .hasMessage("Incompatible types found for column `age`: \"INT\" and \"STRING\""); } @Test
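
Note on the expected values in FlinkPipelineComposerITCase above: once TIMESTAMP, TIMESTAMP_TZ and TIMESTAMP_LTZ columns are routed into one table, the widened column is plain TIMESTAMP, and the new castToTimestamp path in SchemaOperator renders zone-aware values as wall-clock time in the configured pipeline.local-time-zone (America/New_York in the test), which is why the zoned inputs 2020-01-01T14:28:57Z appear as 2020-01-01T09:28:57 in the _everything_merged table, while values that were already zone-less pass through unchanged. The following is a minimal standalone sketch of that java.time conversion, assuming nothing beyond the JDK; the class name and main wrapper are illustrative and not part of this patch.

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;

// Hypothetical sketch: mirrors the Instant -> LocalDateTime conversion that
// SchemaOperator#castToTimestamp performs when a TIMESTAMP_TZ / TIMESTAMP_LTZ
// value is coerced into a merged TIMESTAMP column.
public class TimestampCoercionSketch {
    public static void main(String[] args) {
        // Zone-aware source value (both ZonedTimestampData and
        // LocalZonedTimestampData expose an Instant).
        Instant zonedInput = Instant.parse("2020-01-01T14:28:57Z");

        // pipeline.local-time-zone as configured in the test above.
        LocalDateTime coerced =
                LocalDateTime.ofInstant(zonedInput, ZoneId.of("America/New_York"));

        System.out.println(coerced); // prints 2020-01-01T09:28:57
    }
}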