@@ -18,6 +18,10 @@
package org.apache.flink.cdc.composer.flink;

import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
import org.apache.flink.cdc.common.data.TimestampData;
import org.apache.flink.cdc.common.data.ZonedTimestampData;
import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
import org.apache.flink.cdc.common.data.binary.BinaryStringData;
import org.apache.flink.cdc.common.event.AddColumnEvent;
import org.apache.flink.cdc.common.event.CreateTableEvent;
@@ -57,10 +61,16 @@ import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;

import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.flink.cdc.connectors.values.source.ValuesDataSourceHelper.TABLE_1;
import static org.apache.flink.cdc.connectors.values.source.ValuesDataSourceHelper.TABLE_2;
@@ -1092,4 +1102,199 @@ class FlinkPipelineComposerITCase {
                        "DataChangeEvent{tableId=replaced_namespace.replaced_schema.table1, before=[1, 1], after=[], op=DELETE, meta=()}",
                        "DataChangeEvent{tableId=replaced_namespace.replaced_schema.table1, before=[2, 2], after=[2, x], op=UPDATE, meta=()}");
    }

    @ParameterizedTest
    @EnumSource
    void testMergingTemporalTypesWithPromotedPrecisions(ValuesDataSink.SinkApi sinkApi)
            throws Exception {
        FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();

        // Setup value source
        Configuration sourceConfig = new Configuration();
        sourceConfig.set(
                ValuesDataSourceOptions.EVENT_SET_ID,
                ValuesDataSourceHelper.EventSetId.CUSTOM_SOURCE_EVENTS);

        List<Event> events = generateTemporalColumnEvents("default_table_");
        ValuesDataSourceHelper.setSourceEvents(Collections.singletonList(events));

        SourceDef sourceDef =
                new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig);

        // Setup value sink
        Configuration sinkConfig = new Configuration();
        sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi);
        SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig);

        // Setup pipeline
        Configuration pipelineConfig = new Configuration();
        pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
        pipelineConfig.set(
                PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
        pipelineConfig.set(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE, "America/New_York");
        PipelineDef pipelineDef =
                new PipelineDef(
                        sourceDef,
                        sinkDef,
                        Arrays.asList(
                                new RouteDef(
                                        "default_namespace.default_schema.default_table_ts_\\.*",
                                        "default_namespace.default_schema.default_table_timestamp_merged",
                                        null,
                                        "Merge timestamp columns with different precision"),
                                new RouteDef(
                                        "default_namespace.default_schema.default_table_tz_\\.*",
                                        "default_namespace.default_schema.default_table_zoned_timestamp_merged",
                                        null,
                                        "Merge timestamp_tz columns with different precision"),
                                new RouteDef(
                                        "default_namespace.default_schema.default_table_ltz_\\.*",
                                        "default_namespace.default_schema.default_table_local_zoned_timestamp_merged",
                                        null,
                                        "Merge timestamp_ltz columns with different precision"),
                                new RouteDef(
                                        "default_namespace.default_schema.default_table_\\.*",
                                        "default_namespace.default_schema.default_everything_merged",
                                        null,
                                        "Merge all timestamp family columns with different precision")),
                        Collections.emptyList(),
                        Collections.emptyList(),
                        pipelineConfig);

        // Execute the pipeline
        PipelineExecution execution = composer.compose(pipelineDef);

        execution.execute();

        // Check the order and content of all received events
        String[] outputEvents = outCaptor.toString().trim().split("\n");
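
        // Note: each merged table is created with the lowest precision seen first; routing the
        // higher-precision source table into it then produces an AlterColumnTypeEvent that
        // widens the column before the higher-precision rows arrive.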
        String[] expected =
                Stream.of(
                                // Merging timestamp with different precision
                                "CreateTableEvent{tableId={}_table_timestamp_merged, schema=columns={`id` INT,`name` STRING,`age` INT,`birthday` TIMESTAMP(0)}, primaryKeys=id, options=()}",
                                "DataChangeEvent{tableId={}_table_timestamp_merged, before=[], after=[1, Alice, 17, 2020-01-01T14:28:57], op=INSERT, meta=()}",
                                "AlterColumnTypeEvent{tableId={}_table_timestamp_merged, nameMapping={birthday=TIMESTAMP(9)}}",
                                "DataChangeEvent{tableId={}_table_timestamp_merged, before=[], after=[2, Alice, 17, 2020-01-01T14:28:57.123456789], op=INSERT, meta=()}",
                                "DataChangeEvent{tableId={}_table_timestamp_merged, before=[], after=[101, Zen, 19, 2020-01-01T14:28:57], op=INSERT, meta=()}",
                                "DataChangeEvent{tableId={}_table_timestamp_merged, before=[], after=[102, Zen, 19, 2020-01-01T14:28:57.123456789], op=INSERT, meta=()}",

                                // Merging zoned timestamp with different precision
                                "CreateTableEvent{tableId={}_table_zoned_timestamp_merged, schema=columns={`id` INT,`name` STRING,`age` INT,`birthday` TIMESTAMP(0) WITH TIME ZONE}, primaryKeys=id, options=()}",
                                "DataChangeEvent{tableId={}_table_zoned_timestamp_merged, before=[], after=[3, Alice, 17, 2020-01-01T14:28:57Z], op=INSERT, meta=()}",
                                "AlterColumnTypeEvent{tableId={}_table_zoned_timestamp_merged, nameMapping={birthday=TIMESTAMP(9) WITH TIME ZONE}}",
                                "DataChangeEvent{tableId={}_table_zoned_timestamp_merged, before=[], after=[4, Alice, 17, 2020-01-01T14:28:57.123456789Z], op=INSERT, meta=()}",
                                "DataChangeEvent{tableId={}_table_zoned_timestamp_merged, before=[], after=[103, Zen, 19, 2020-01-01T14:28:57Z], op=INSERT, meta=()}",
                                "DataChangeEvent{tableId={}_table_zoned_timestamp_merged, before=[], after=[104, Zen, 19, 2020-01-01T14:28:57.123456789Z], op=INSERT, meta=()}",

                                // Merging local-zoned timestamp with different precision
                                "CreateTableEvent{tableId={}_table_local_zoned_timestamp_merged, schema=columns={`id` INT,`name` STRING,`age` INT,`birthday` TIMESTAMP_LTZ(0)}, primaryKeys=id, options=()}",
                                "DataChangeEvent{tableId={}_table_local_zoned_timestamp_merged, before=[], after=[5, Alice, 17, 2020-01-01T14:28:57], op=INSERT, meta=()}",
                                "AlterColumnTypeEvent{tableId={}_table_local_zoned_timestamp_merged, nameMapping={birthday=TIMESTAMP_LTZ(9)}}",
                                "DataChangeEvent{tableId={}_table_local_zoned_timestamp_merged, before=[], after=[6, Alice, 17, 2020-01-01T14:28:57.123456789], op=INSERT, meta=()}",
                                "DataChangeEvent{tableId={}_table_local_zoned_timestamp_merged, before=[], after=[105, Zen, 19, 2020-01-01T14:28:57], op=INSERT, meta=()}",
                                "DataChangeEvent{tableId={}_table_local_zoned_timestamp_merged, before=[], after=[106, Zen, 19, 2020-01-01T14:28:57.123456789], op=INSERT, meta=()}",
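
                                // Note: once merged into a plain TIMESTAMP column below, zoned and
                                // local-zoned values are rendered in the pipeline local time zone
                                // (America/New_York, UTC-5 on 2020-01-01), so 14:28:57 UTC shows
                                // up as 09:28:57.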
                                // Merging all
                                "CreateTableEvent{tableId={}_everything_merged, schema=columns={`id` INT,`name` STRING,`age` INT,`birthday` TIMESTAMP(0)}, primaryKeys=id, options=()}",
                                "DataChangeEvent{tableId={}_everything_merged, before=[], after=[1, Alice, 17, 2020-01-01T14:28:57], op=INSERT, meta=()}",
                                "AlterColumnTypeEvent{tableId={}_everything_merged, nameMapping={birthday=TIMESTAMP(9)}}",
                                "DataChangeEvent{tableId={}_everything_merged, before=[], after=[2, Alice, 17, 2020-01-01T14:28:57.123456789], op=INSERT, meta=()}",
                                "DataChangeEvent{tableId={}_everything_merged, before=[], after=[3, Alice, 17, 2020-01-01T09:28:57], op=INSERT, meta=()}",
                                "DataChangeEvent{tableId={}_everything_merged, before=[], after=[4, Alice, 17, 2020-01-01T09:28:57.123456789], op=INSERT, meta=()}",
                                "DataChangeEvent{tableId={}_everything_merged, before=[], after=[5, Alice, 17, 2020-01-01T09:28:57], op=INSERT, meta=()}",
                                "DataChangeEvent{tableId={}_everything_merged, before=[], after=[6, Alice, 17, 2020-01-01T09:28:57.123456789], op=INSERT, meta=()}",
                                "DataChangeEvent{tableId={}_everything_merged, before=[], after=[101, Zen, 19, 2020-01-01T14:28:57], op=INSERT, meta=()}",
                                "DataChangeEvent{tableId={}_everything_merged, before=[], after=[102, Zen, 19, 2020-01-01T14:28:57.123456789], op=INSERT, meta=()}",
                                "DataChangeEvent{tableId={}_everything_merged, before=[], after=[103, Zen, 19, 2020-01-01T09:28:57], op=INSERT, meta=()}",
                                "DataChangeEvent{tableId={}_everything_merged, before=[], after=[104, Zen, 19, 2020-01-01T09:28:57.123456789], op=INSERT, meta=()}",
                                "DataChangeEvent{tableId={}_everything_merged, before=[], after=[105, Zen, 19, 2020-01-01T09:28:57], op=INSERT, meta=()}",
                                "DataChangeEvent{tableId={}_everything_merged, before=[], after=[106, Zen, 19, 2020-01-01T09:28:57.123456789], op=INSERT, meta=()}")
                        .map(s -> s.replace("{}", "default_namespace.default_schema.default"))
                        .toArray(String[]::new);

        assertThat(outputEvents).containsExactlyInAnyOrder(expected);
    }
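
    /**
     * Creates one source table per temporal type and precision (ts_0, ts_9, tz_0, tz_9, ltz_0 and
     * ltz_9) and inserts two rows into each of them.
     */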
    private List<Event> generateTemporalColumnEvents(String tableNamePrefix) {
        List<Event> events = new ArrayList<>();

        // Initialize schemas
        List<String> names = Arrays.asList("ts_0", "ts_9", "tz_0", "tz_9", "ltz_0", "ltz_9");

        List<DataType> types =
                Arrays.asList(
                        DataTypes.TIMESTAMP(0),
                        DataTypes.TIMESTAMP(9),
                        DataTypes.TIMESTAMP_TZ(0),
                        DataTypes.TIMESTAMP_TZ(9),
                        DataTypes.TIMESTAMP_LTZ(0),
                        DataTypes.TIMESTAMP_LTZ(9));

        Instant lowPrecisionTimestamp = Instant.parse("2020-01-01T14:28:57Z");
        Instant highPrecisionTimestamp = Instant.parse("2020-01-01T14:28:57.123456789Z");

        List<Object> values =
                Arrays.asList(
                        TimestampData.fromLocalDateTime(
                                LocalDateTime.ofInstant(lowPrecisionTimestamp, ZoneId.of("UTC"))),
                        TimestampData.fromLocalDateTime(
                                LocalDateTime.ofInstant(highPrecisionTimestamp, ZoneId.of("UTC"))),
                        ZonedTimestampData.fromZonedDateTime(
                                ZonedDateTime.ofInstant(lowPrecisionTimestamp, ZoneId.of("UTC"))),
                        ZonedTimestampData.fromZonedDateTime(
                                ZonedDateTime.ofInstant(highPrecisionTimestamp, ZoneId.of("UTC"))),
                        LocalZonedTimestampData.fromInstant(lowPrecisionTimestamp),
                        LocalZonedTimestampData.fromInstant(highPrecisionTimestamp));

        List<Schema> schemas =
                types.stream()
                        .map(
                                temporalColumnType ->
                                        Schema.newBuilder()
                                                .physicalColumn("id", DataTypes.INT())
                                                .physicalColumn("name", DataTypes.STRING())
                                                .physicalColumn("age", DataTypes.INT())
                                                .physicalColumn("birthday", temporalColumnType)
                                                .primaryKey("id")
                                                .build())
                        .collect(Collectors.toList());

        for (int i = 0; i < names.size(); i++) {
            TableId generatedTableId =
                    TableId.tableId(
                            "default_namespace", "default_schema", tableNamePrefix + names.get(i));
            Schema generatedSchema = schemas.get(i);
            events.add(new CreateTableEvent(generatedTableId, generatedSchema));
            events.add(
                    DataChangeEvent.insertEvent(
                            generatedTableId,
                            generate(generatedSchema, 1 + i, "Alice", 17, values.get(i))));
        }

        for (int i = 0; i < names.size(); i++) {
            TableId generatedTableId =
                    TableId.tableId(
                            "default_namespace", "default_schema", tableNamePrefix + names.get(i));
            Schema generatedSchema = schemas.get(i);
            events.add(
                    DataChangeEvent.insertEvent(
                            generatedTableId,
                            generate(generatedSchema, 101 + i, "Zen", 19, values.get(i))));
        }

        return events;
    }
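
    /**
     * Packs the given field values into a BinaryRecordData row matching the schema, wrapping plain
     * String values as BinaryStringData.
     */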
    BinaryRecordData generate(Schema schema, Object... fields) {
        return (new BinaryRecordDataGenerator(schema.getColumnDataTypes().toArray(new DataType[0])))
                .generate(
                        Arrays.stream(fields)
                                .map(
                                        e ->
                                                (e instanceof String)
                                                        ? BinaryStringData.fromString((String) e)
                                                        : e)
                                .toArray());
    }
}