diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/FlushEvent.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/FlushEvent.java
index 5fd249b7b..cb8310b02 100644
--- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/FlushEvent.java
+++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/FlushEvent.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.cdc.common.event;
 
+import java.util.List;
 import java.util.Objects;
 
 /**
@@ -24,18 +25,36 @@ import java.util.Objects;
  * start flushing.
  */
 public class FlushEvent implements Event {
+    /** The sink table(s) that need to be flushed. */
+    private final List<TableId> tableIds;
 
     /** Which subTask ID this FlushEvent was initiated from. */
     private final int sourceSubTaskId;
 
-    public FlushEvent(int sourceSubTaskId) {
+    /** Which type of schema change event caused this FlushEvent. */
+    private final SchemaChangeEventType schemaChangeEventType;
+
+    public FlushEvent(
+            int sourceSubTaskId,
+            List<TableId> tableIds,
+            SchemaChangeEventType schemaChangeEventType) {
+        this.tableIds = tableIds;
         this.sourceSubTaskId = sourceSubTaskId;
+        this.schemaChangeEventType = schemaChangeEventType;
+    }
+
+    public List<TableId> getTableIds() {
+        return tableIds;
     }
 
     public int getSourceSubTaskId() {
         return sourceSubTaskId;
     }
 
+    public SchemaChangeEventType getSchemaChangeEventType() {
+        return schemaChangeEventType;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
@@ -45,7 +64,9 @@ public class FlushEvent implements Event {
             return false;
         }
         FlushEvent that = (FlushEvent) o;
-        return sourceSubTaskId == that.sourceSubTaskId;
+        return sourceSubTaskId == that.sourceSubTaskId
+                && Objects.equals(tableIds, that.tableIds)
+                && Objects.equals(schemaChangeEventType, that.schemaChangeEventType);
     }
 
     @Override
@@ -55,6 +76,13 @@
 
     @Override
     public String toString() {
-        return "FlushEvent{" + "sourceSubTaskId=" + sourceSubTaskId + '}';
+        return "FlushEvent{"
+                + "sourceSubTaskId="
+                + sourceSubTaskId
+                + ", tableIds="
+                + tableIds
+                + ", schemaChangeEventType="
+                + schemaChangeEventType
+                + '}';
     }
 }
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketAssignOperator.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketAssignOperator.java
index b528f53aa..21bc14e0e 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketAssignOperator.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketAssignOperator.java
@@ -124,7 +124,10 @@ public class BucketAssignOperator extends AbstractStreamOperator<Event>
             output.collect(
                     new StreamRecord<>(
                             new BucketWrapperFlushEvent(
-                                    currentTaskNumber, ((FlushEvent) event).getSourceSubTaskId())));
+                                    currentTaskNumber,
+                                    ((FlushEvent) event).getSourceSubTaskId(),
+                                    ((FlushEvent) event).getTableIds(),
+                                    ((FlushEvent) event).getSchemaChangeEventType())));
             return;
         }
 
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapperEventSerializer.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapperEventSerializer.java
index 95e0f6d3f..6ff1573e1 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapperEventSerializer.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapperEventSerializer.java
@@ -21,7 +21,10 @@ import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.cdc.common.event.ChangeEvent;
 import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.event.SchemaChangeEventType;
+import org.apache.flink.cdc.common.event.TableId;
 import org.apache.flink.cdc.runtime.serializer.EnumSerializer;
+import org.apache.flink.cdc.runtime.serializer.ListSerializer;
 import org.apache.flink.cdc.runtime.serializer.TableIdSerializer;
 import org.apache.flink.cdc.runtime.serializer.TypeSerializerSingleton;
 import org.apache.flink.cdc.runtime.serializer.event.EventSerializer;
@@ -40,8 +43,10 @@ public class BucketWrapperEventSerializer extends TypeSerializerSingleton<Event>
 
     private final EventSerializer eventSerializer = EventSerializer.INSTANCE;
 
-    private final TableIdSerializer tableIdSerializer = TableIdSerializer.INSTANCE;
-
+    private final ListSerializer<TableId> tableIdListSerializer =
+            new ListSerializer<>(TableIdSerializer.INSTANCE);
+    private final EnumSerializer<SchemaChangeEventType> schemaChangeEventTypeEnumSerializer =
+            new EnumSerializer<>(SchemaChangeEventType.class);
     /** Sharable instance of the TableIdSerializer. */
     public static final BucketWrapperEventSerializer INSTANCE =
             new BucketWrapperEventSerializer();
@@ -82,6 +87,9 @@
             BucketWrapperFlushEvent bucketWrapperFlushEvent = (BucketWrapperFlushEvent) event;
             dataOutputView.writeInt(bucketWrapperFlushEvent.getBucket());
             dataOutputView.writeInt(bucketWrapperFlushEvent.getSourceSubTaskId());
+            tableIdListSerializer.serialize(bucketWrapperFlushEvent.getTableIds(), dataOutputView);
+            schemaChangeEventTypeEnumSerializer.serialize(
+                    bucketWrapperFlushEvent.getSchemaChangeEventType(), dataOutputView);
         }
     }
 
@@ -89,7 +97,11 @@
     public Event deserialize(DataInputView source) throws IOException {
         EventClass eventClass = enumSerializer.deserialize(source);
         if (eventClass.equals(EventClass.BUCKET_WRAPPER_FLUSH_EVENT)) {
-            return new BucketWrapperFlushEvent(source.readInt(), source.readInt());
+            return new BucketWrapperFlushEvent(
+                    source.readInt(),
+                    source.readInt(),
+                    tableIdListSerializer.deserialize(source),
+                    schemaChangeEventTypeEnumSerializer.deserialize(source));
         } else {
             return new BucketWrapperChangeEvent(
                     source.readInt(), (ChangeEvent) eventSerializer.deserialize(source));
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapperFlushEvent.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapperFlushEvent.java
index 065871676..25e9f9152 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapperFlushEvent.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapperFlushEvent.java
@@ -18,7 +18,10 @@
 package org.apache.flink.cdc.connectors.paimon.sink.v2.bucket;
 
 import org.apache.flink.cdc.common.event.FlushEvent;
+import org.apache.flink.cdc.common.event.SchemaChangeEventType;
+import org.apache.flink.cdc.common.event.TableId;
 
+import java.util.List;
 import java.util.Objects;
 
 /** A wrapper class for {@link FlushEvent} to attach bucket id. */
@@ -26,8 +29,12 @@ public class BucketWrapperFlushEvent extends FlushEvent implements BucketWrapper
 
     private final int bucket;
 
-    public BucketWrapperFlushEvent(int bucket, int subTaskId) {
-        super(subTaskId);
+    public BucketWrapperFlushEvent(
+            int bucket,
+            int subTaskId,
+            List<TableId> tableIds,
+            SchemaChangeEventType schemaChangeEventType) {
+        super(subTaskId, tableIds, schemaChangeEventType);
         this.bucket = bucket;
     }
 
diff --git a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/OceanBaseE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/OceanBaseE2eITCase.java
index 2cd2f88eb..e47795ba9 100644
--- a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/OceanBaseE2eITCase.java
+++ b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/OceanBaseE2eITCase.java
@@ -180,6 +180,6 @@ public class OceanBaseE2eITCase extends FlinkContainerTestEnvironment {
                 expectResult,
                 "ob_products_sink",
                 new String[] {"id", "name", "description", "weight", "enum_c", "json_c"},
-                60000L);
+                300000L);
     }
 }
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaOperator.java
index eca0193c8..ab856ba5f 100755
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaOperator.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaOperator.java
@@ -150,6 +150,7 @@ public class SchemaOperator extends AbstractStreamOperator<Event>
 
             // Then, notify this information to the coordinator
             requestSchemaChange(
+                    tableId,
                     new SchemaChangeRequest(sourcePartition, subTaskId, schemaChangeEvent));
             schemaOperatorMetrics.increaseFinishedSchemaChangeEvents(1);
         } else if (event instanceof DataChangeEvent) {
@@ -188,9 +189,15 @@
         }
     }
 
-    private void requestSchemaChange(SchemaChangeRequest schemaChangeRequest) {
+    private void requestSchemaChange(
+            TableId sourceTableId, SchemaChangeRequest schemaChangeRequest) {
         LOG.info("{}> Sent FlushEvent to downstream...", subTaskId);
-        output.collect(new StreamRecord<>(new FlushEvent(subTaskId)));
+        output.collect(
+                new StreamRecord<>(
+                        new FlushEvent(
+                                subTaskId,
+                                tableIdRouter.route(sourceTableId),
+                                schemaChangeRequest.getSchemaChangeEvent().getType())));
 
         LOG.info("{}> Sending evolve request...", subTaskId);
         SchemaChangeResponse response = sendRequestToCoordinator(schemaChangeRequest);
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperator.java
index 9ea79d729..e5039543e 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperator.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperator.java
@@ -164,8 +164,10 @@
         schemaOperatorMetrics.increaseSchemaChangeEvents(1);
 
         // First, send FlushEvent or it might be blocked later
+        List<TableId> sinkTables = router.route(tableId);
         LOG.info("{}> Sending the FlushEvent.", subTaskId);
-        output.collect(new StreamRecord<>(new FlushEvent(subTaskId)));
+        output.collect(
+                new StreamRecord<>(new FlushEvent(subTaskId, sinkTables, originalEvent.getType())));
 
         // Then, queue to request schema change to SchemaCoordinator.
         SchemaChangeResponse response = requestSchemaChange(tableId, originalEvent);
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkFunctionOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkFunctionOperator.java
index c0c6c5b6e..1e9f27a09 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkFunctionOperator.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkFunctionOperator.java
@@ -22,6 +22,7 @@ import org.apache.flink.cdc.common.event.ChangeEvent;
 import org.apache.flink.cdc.common.event.CreateTableEvent;
 import org.apache.flink.cdc.common.event.Event;
 import org.apache.flink.cdc.common.event.FlushEvent;
+import org.apache.flink.cdc.common.event.SchemaChangeEventType;
 import org.apache.flink.cdc.common.event.TableId;
 import org.apache.flink.cdc.common.schema.Schema;
 import org.apache.flink.runtime.jobgraph.OperatorID;
@@ -112,6 +113,20 @@ public class DataSinkFunctionOperator extends StreamSink<Event> {
     // ----------------------------- Helper functions -------------------------------
     private void handleFlushEvent(FlushEvent event) throws Exception {
         userFunction.finish();
+        if (event.getSchemaChangeEventType() != SchemaChangeEventType.CREATE_TABLE) {
+            event.getTableIds().stream()
+                    .filter(tableId -> !processedTableIds.contains(tableId))
+                    .forEach(
+                            tableId -> {
+                                LOG.info("Table {} has not been processed", tableId);
+                                try {
+                                    emitLatestSchema(tableId);
+                                } catch (Exception e) {
+                                    throw new RuntimeException(e);
+                                }
+                                processedTableIds.add(tableId);
+                            });
+        }
         schemaEvolutionClient.notifyFlushSuccess(
                 getRuntimeContext().getIndexOfThisSubtask(), event.getSourceSubTaskId());
     }
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkWriterOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkWriterOperator.java
index 9c438f51c..472ed85ee 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkWriterOperator.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkWriterOperator.java
@@ -25,6 +25,7 @@ import org.apache.flink.cdc.common.event.ChangeEvent;
 import org.apache.flink.cdc.common.event.CreateTableEvent;
 import org.apache.flink.cdc.common.event.Event;
 import org.apache.flink.cdc.common.event.FlushEvent;
+import org.apache.flink.cdc.common.event.SchemaChangeEventType;
 import org.apache.flink.cdc.common.event.TableId;
 import org.apache.flink.cdc.common.schema.Schema;
 import org.apache.flink.runtime.jobgraph.OperatorID;
@@ -198,6 +199,20 @@ public class DataSinkWriterOperator extends AbstractStreamOperator
+        if (event.getSchemaChangeEventType() != SchemaChangeEventType.CREATE_TABLE) {
+            event.getTableIds().stream()
+                    .filter(tableId -> !processedTableIds.contains(tableId))
+                    .forEach(
+                            tableId -> {
+                                LOG.info("Table {} has not been processed", tableId);
+                                try {
+                                    emitLatestSchema(tableId);
+                                } catch (Exception e) {
+                                    throw new RuntimeException(e);
+                                }
+                                processedTableIds.add(tableId);
+                            });
+        }
         schemaEvolutionClient.notifyFlushSuccess(
                 getRuntimeContext().getIndexOfThisSubtask(), event.getSourceSubTaskId());
     }
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/event/EventSerializer.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/event/EventSerializer.java
index 8a17589fc..bea776c88 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/event/EventSerializer.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/event/EventSerializer.java
@@ -24,7 +24,10 @@ import org.apache.flink.cdc.common.event.DataChangeEvent;
 import org.apache.flink.cdc.common.event.Event;
 import org.apache.flink.cdc.common.event.FlushEvent;
 import org.apache.flink.cdc.common.event.SchemaChangeEvent;
+import org.apache.flink.cdc.common.event.SchemaChangeEventType;
+import org.apache.flink.cdc.common.event.TableId;
 import org.apache.flink.cdc.runtime.serializer.EnumSerializer;
+import org.apache.flink.cdc.runtime.serializer.ListSerializer;
 import org.apache.flink.cdc.runtime.serializer.TableIdSerializer;
 import org.apache.flink.cdc.runtime.serializer.TypeSerializerSingleton;
 import org.apache.flink.core.memory.DataInputView;
@@ -42,11 +45,14 @@ public final class EventSerializer extends TypeSerializerSingleton<Event> {
 
     private final SchemaChangeEventSerializer schemaChangeEventSerializer =
             SchemaChangeEventSerializer.INSTANCE;
-    private final TableIdSerializer tableIdSerializer = TableIdSerializer.INSTANCE;
+    private final ListSerializer<TableId> listSerializer =
+            new ListSerializer<>(TableIdSerializer.INSTANCE);
     private final EnumSerializer<EventClass> enumSerializer =
             new EnumSerializer<>(EventClass.class);
     private final TypeSerializer<DataChangeEvent> dataChangeEventSerializer =
             DataChangeEventSerializer.INSTANCE;
+    private final EnumSerializer<SchemaChangeEventType> schemaChangeEventTypeEnumSerializer =
+            new EnumSerializer<>(SchemaChangeEventType.class);
 
     @Override
     public boolean isImmutableType() {
@@ -62,7 +68,11 @@
     public Event copy(Event from) {
         if (from instanceof FlushEvent) {
             FlushEvent flushEvent = (FlushEvent) from;
-            return new FlushEvent(((FlushEvent) from).getSourceSubTaskId());
+            return new FlushEvent(
+                    flushEvent.getSourceSubTaskId(),
+                    listSerializer.copy(((FlushEvent) from).getTableIds()),
+                    schemaChangeEventTypeEnumSerializer.copy(
+                            flushEvent.getSchemaChangeEventType()));
         } else if (from instanceof SchemaChangeEvent) {
             return schemaChangeEventSerializer.copy((SchemaChangeEvent) from);
         } else if (from instanceof DataChangeEvent) {
@@ -86,6 +96,9 @@
         if (record instanceof FlushEvent) {
             enumSerializer.serialize(EventClass.FLUSH_EVENT, target);
             target.writeInt(((FlushEvent) record).getSourceSubTaskId());
+            listSerializer.serialize(((FlushEvent) record).getTableIds(), target);
+            schemaChangeEventTypeEnumSerializer.serialize(
+                    ((FlushEvent) record).getSchemaChangeEventType(), target);
         } else if (record instanceof SchemaChangeEvent) {
             enumSerializer.serialize(EventClass.SCHEME_CHANGE_EVENT, target);
             schemaChangeEventSerializer.serialize((SchemaChangeEvent) record, target);
@@ -102,7 +115,10 @@
         EventClass eventClass = enumSerializer.deserialize(source);
         switch (eventClass) {
             case FLUSH_EVENT:
-                return new FlushEvent(source.readInt());
+                return new FlushEvent(
+                        source.readInt(),
+                        listSerializer.deserialize(source),
+                        schemaChangeEventTypeEnumSerializer.deserialize(source));
             case DATA_CHANGE_EVENT:
                 return dataChangeEventSerializer.deserialize(source);
             case SCHEME_CHANGE_EVENT:
diff --git
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaEvolveTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaEvolveTest.java index adba67428..1b4de204c 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaEvolveTest.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaEvolveTest.java @@ -181,16 +181,24 @@ public class SchemaEvolveTest extends SchemaTestBase { })) .map(StreamRecord::getValue) .containsExactly( - new FlushEvent(0), + new FlushEvent( + 0, Collections.singletonList(TABLE_ID), createTableEvent.getType()), createTableEvent, genInsert(TABLE_ID, "ISFS", 1, "Alice", 17.1828f, "Hello"), - new FlushEvent(0), + new FlushEvent( + 0, Collections.singletonList(TABLE_ID), addColumnEvent.getType()), addColumnEventAtLast, genInsert(TABLE_ID, "ISFSB", 2, "Bob", 31.415926f, "Bye-bye", false), - new FlushEvent(0), + new FlushEvent( + 0, + Collections.singletonList(TABLE_ID), + renameColumnEvent.getType()), appendRenamedColumnAtLast, genInsert(TABLE_ID, "ISFSBS", 3, "Cicada", 123.456f, null, true, "Ok"), - new FlushEvent(0), + new FlushEvent( + 0, + Collections.singletonList(TABLE_ID), + alterColumnTypeEvent.getType()), alterColumnTypeEventWithBackfill, genInsert( TABLE_ID, @@ -201,11 +209,16 @@ public class SchemaEvolveTest extends SchemaTestBase { null, false, "Nah"), - new FlushEvent(0), + new FlushEvent( + 0, Collections.singletonList(TABLE_ID), dropColumnEvent.getType()), genInsert(TABLE_ID, "ISDSBS", 5, "Eve", 1.414, null, true, null), - new FlushEvent(0), + new FlushEvent( + 0, + Collections.singletonList(TABLE_ID), + truncateTableEvent.getType()), genInsert(TABLE_ID, "ISDSBS", 6, "Ferris", 0.001, null, false, null), - new FlushEvent(0)); + new FlushEvent( + 0, Collections.singletonList(TABLE_ID), dropTableEvent.getType())); } @Test @@ -308,7 +321,8 @@ public class SchemaEvolveTest extends SchemaTestBase { })) .map(StreamRecord::getValue) .containsExactly( - new FlushEvent(0), + new FlushEvent( + 0, Collections.singletonList(TABLE_ID), createTableEvent.getType()), createTableEvent, genInsert(TABLE_ID, "ISFS", 1, "Alice", 17.1828f, "Hello"), genInsert(TABLE_ID, "ISFS", 2, "Bob", 31.415926f, "Bye-bye"), diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaEvolveTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaEvolveTest.java index 53895d56c..6f9377552 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaEvolveTest.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaEvolveTest.java @@ -112,7 +112,11 @@ public class SchemaEvolveTest { Assertions.assertThat( ListUtils.union( - Collections.singletonList(new FlushEvent(0)), + Collections.singletonList( + new FlushEvent( + 0, + Collections.singletonList(tableId), + SchemaChangeEventType.CREATE_TABLE)), createAndInsertDataEvents)) .isEqualTo( harness.getOutputRecords().stream() @@ -172,7 +176,12 @@ public class SchemaEvolveTest { .collect(Collectors.toList())) .isEqualTo( ListUtils.union( - Collections.singletonList(new FlushEvent(0)), addColumnEvents)); + Collections.singletonList( + new FlushEvent( + 0, + Collections.singletonList(tableId), + SchemaChangeEventType.ADD_COLUMN)), + addColumnEvents)); Schema schemaV2 = 
Schema.newBuilder() @@ -230,7 +239,11 @@ public class SchemaEvolveTest { .collect(Collectors.toList())) .isEqualTo( ListUtils.union( - Collections.singletonList(new FlushEvent(0)), + Collections.singletonList( + new FlushEvent( + 0, + Collections.singletonList(tableId), + SchemaChangeEventType.RENAME_COLUMN)), renameColumnEvents)); Schema schemaV3 = @@ -275,7 +288,11 @@ public class SchemaEvolveTest { .collect(Collectors.toList())) .isEqualTo( ListUtils.union( - Collections.singletonList(new FlushEvent(0)), + Collections.singletonList( + new FlushEvent( + 0, + Collections.singletonList(tableId), + SchemaChangeEventType.ALTER_COLUMN_TYPE)), alterColumnTypeEvents)); Schema schemaV4 = @@ -311,7 +328,11 @@ public class SchemaEvolveTest { .collect(Collectors.toList())) .isEqualTo( ListUtils.union( - Collections.singletonList(new FlushEvent(0)), + Collections.singletonList( + new FlushEvent( + 0, + Collections.singletonList(tableId), + SchemaChangeEventType.DROP_COLUMN)), dropColumnEvents)); Schema schemaV5 = @@ -373,7 +394,11 @@ public class SchemaEvolveTest { .collect(Collectors.toList())) .isEqualTo( ListUtils.union( - Collections.singletonList(new FlushEvent(0)), + Collections.singletonList( + new FlushEvent( + 0, + Collections.singletonList(tableId), + SchemaChangeEventType.CREATE_TABLE)), createAndInsertDataEvents)); Assertions.assertThat(harness.getLatestOriginalSchema(tableId)).isEqualTo(schemaV1); @@ -429,7 +454,12 @@ public class SchemaEvolveTest { .collect(Collectors.toList())) .isEqualTo( ListUtils.union( - Collections.singletonList(new FlushEvent(0)), addColumnEvents)); + Collections.singletonList( + new FlushEvent( + 0, + Collections.singletonList(tableId), + SchemaChangeEventType.ADD_COLUMN)), + addColumnEvents)); Schema schemaV2 = Schema.newBuilder() @@ -487,7 +517,11 @@ public class SchemaEvolveTest { .collect(Collectors.toList())) .isEqualTo( ListUtils.union( - Collections.singletonList(new FlushEvent(0)), + Collections.singletonList( + new FlushEvent( + 0, + Collections.singletonList(tableId), + SchemaChangeEventType.RENAME_COLUMN)), renameColumnEvents)); Schema schemaV3 = @@ -532,7 +566,11 @@ public class SchemaEvolveTest { .collect(Collectors.toList())) .isEqualTo( ListUtils.union( - Collections.singletonList(new FlushEvent(0)), + Collections.singletonList( + new FlushEvent( + 0, + Collections.singletonList(tableId), + SchemaChangeEventType.ALTER_COLUMN_TYPE)), alterColumnTypeEvents)); Schema schemaV4 = @@ -568,7 +606,11 @@ public class SchemaEvolveTest { .collect(Collectors.toList())) .isEqualTo( ListUtils.union( - Collections.singletonList(new FlushEvent(0)), + Collections.singletonList( + new FlushEvent( + 0, + Collections.singletonList(tableId), + SchemaChangeEventType.DROP_COLUMN)), dropColumnEvents)); Schema schemaV5 = @@ -630,7 +672,11 @@ public class SchemaEvolveTest { .collect(Collectors.toList())) .isEqualTo( ListUtils.union( - Collections.singletonList(new FlushEvent(0)), + Collections.singletonList( + new FlushEvent( + 0, + Collections.singletonList(tableId), + SchemaChangeEventType.CREATE_TABLE)), createAndInsertDataEvents)); Assertions.assertThat(harness.getLatestOriginalSchema(tableId)).isEqualTo(schemaV1); @@ -720,7 +766,11 @@ public class SchemaEvolveTest { .collect(Collectors.toList())) .isEqualTo( ListUtils.union( - Collections.singletonList(new FlushEvent(0)), + Collections.singletonList( + new FlushEvent( + 0, + Collections.singletonList(tableId), + SchemaChangeEventType.CREATE_TABLE)), createAndInsertDataEvents)); 
Assertions.assertThat(harness.getLatestOriginalSchema(tableId)).isEqualTo(schemaV1); @@ -772,7 +822,10 @@ public class SchemaEvolveTest { List expectedEvents = Arrays.asList( - new FlushEvent(0), + new FlushEvent( + 0, + Collections.singletonList(tableId), + SchemaChangeEventType.ADD_COLUMN), DataChangeEvent.insertEvent( tableId, buildRecord(INT, 4, STRING, "Derrida", SMALLINT, (short) 20)), @@ -840,7 +893,10 @@ public class SchemaEvolveTest { List expectedEvents = Arrays.asList( - new FlushEvent(0), + new FlushEvent( + 0, + Collections.singletonList(tableId), + SchemaChangeEventType.RENAME_COLUMN), DataChangeEvent.insertEvent( tableId, buildRecord(INT, 6, STRING, null, SMALLINT, (short) 22)), @@ -889,7 +945,10 @@ public class SchemaEvolveTest { List expectedEvents = Arrays.asList( - new FlushEvent(0), + new FlushEvent( + 0, + Collections.singletonList(tableId), + SchemaChangeEventType.ALTER_COLUMN_TYPE), DataChangeEvent.insertEvent( tableId, buildRecord(INT, 8, STRING, null, SMALLINT, null)), DataChangeEvent.insertEvent( @@ -930,7 +989,10 @@ public class SchemaEvolveTest { List expectedEvents = Arrays.asList( - new FlushEvent(0), + new FlushEvent( + 0, + Collections.singletonList(tableId), + SchemaChangeEventType.DROP_COLUMN), DataChangeEvent.insertEvent( tableId, buildRecord(INT, 12, STRING, null, DOUBLE, null)), DataChangeEvent.insertEvent( @@ -1006,7 +1068,11 @@ public class SchemaEvolveTest { .collect(Collectors.toList())) .isEqualTo( ListUtils.union( - Collections.singletonList(new FlushEvent(0)), + Collections.singletonList( + new FlushEvent( + 0, + Collections.singletonList(tableId), + SchemaChangeEventType.CREATE_TABLE)), createAndInsertDataEvents)); Assertions.assertThat(harness.getLatestOriginalSchema(tableId)).isEqualTo(schemaV1); @@ -1105,7 +1171,11 @@ public class SchemaEvolveTest { .collect(Collectors.toList())) .isEqualTo( ListUtils.union( - Collections.singletonList(new FlushEvent(0)), + Collections.singletonList( + new FlushEvent( + 0, + Collections.singletonList(tableId), + SchemaChangeEventType.CREATE_TABLE)), createAndInsertDataEvents)); Assertions.assertThat(harness.getLatestOriginalSchema(tableId)).isEqualTo(schemaV1); @@ -1156,7 +1226,11 @@ public class SchemaEvolveTest { processEvent(schemaOperator, addColumnEvents); List expectedEvents = new ArrayList<>(); - expectedEvents.add(new FlushEvent(0)); + expectedEvents.add( + new FlushEvent( + 0, + Collections.singletonList(tableId), + SchemaChangeEventType.ADD_COLUMN)); expectedEvents.addAll(addColumnEvents); Assertions.assertThat( @@ -1218,7 +1292,11 @@ public class SchemaEvolveTest { processEvent(schemaOperator, renameColumnEvents); List expectedEvents = new ArrayList<>(); - expectedEvents.add(new FlushEvent(0)); + expectedEvents.add( + new FlushEvent( + 0, + Collections.singletonList(tableId), + SchemaChangeEventType.RENAME_COLUMN)); expectedEvents.addAll(renameColumnEvents); Assertions.assertThat( @@ -1263,7 +1341,10 @@ public class SchemaEvolveTest { List expectedEvents = Arrays.asList( - new FlushEvent(0), + new FlushEvent( + 0, + Collections.singletonList(tableId), + SchemaChangeEventType.ALTER_COLUMN_TYPE), DataChangeEvent.insertEvent( tableId, buildRecord( @@ -1321,7 +1402,10 @@ public class SchemaEvolveTest { List expectedEvents = Arrays.asList( - new FlushEvent(0), + new FlushEvent( + 0, + Collections.singletonList(tableId), + SchemaChangeEventType.DROP_COLUMN), DataChangeEvent.insertEvent( tableId, buildRecord( @@ -1417,7 +1501,11 @@ public class SchemaEvolveTest { .collect(Collectors.toList())) .isEqualTo( 
ListUtils.union( - Collections.singletonList(new FlushEvent(0)), + Collections.singletonList( + new FlushEvent( + 0, + Collections.singletonList(tableId), + SchemaChangeEventType.CREATE_TABLE)), createAndInsertDataEvents)); Assertions.assertThat(harness.getLatestOriginalSchema(tableId)).isEqualTo(schemaV1); @@ -1468,7 +1556,11 @@ public class SchemaEvolveTest { processEvent(schemaOperator, addColumnEvents); List expectedEvents = new ArrayList<>(); - expectedEvents.add(new FlushEvent(0)); + expectedEvents.add( + new FlushEvent( + 0, + Collections.singletonList(tableId), + SchemaChangeEventType.ADD_COLUMN)); expectedEvents.addAll(addColumnEvents); @@ -1531,7 +1623,11 @@ public class SchemaEvolveTest { processEvent(schemaOperator, renameColumnEvents); List expectedEvents = new ArrayList<>(); - expectedEvents.add(new FlushEvent(0)); + expectedEvents.add( + new FlushEvent( + 0, + Collections.singletonList(tableId), + SchemaChangeEventType.RENAME_COLUMN)); expectedEvents.addAll(renameColumnEvents); Assertions.assertThat( @@ -1576,7 +1672,10 @@ public class SchemaEvolveTest { List expectedEvents = Arrays.asList( - new FlushEvent(0), + new FlushEvent( + 0, + Collections.singletonList(tableId), + SchemaChangeEventType.ALTER_COLUMN_TYPE), DataChangeEvent.insertEvent( tableId, buildRecord( @@ -1633,7 +1732,11 @@ public class SchemaEvolveTest { processEvent(schemaOperator, dropColumnEvents); FlushEvent result; - result = new FlushEvent(0); + result = + new FlushEvent( + 0, + Collections.singletonList(tableId), + SchemaChangeEventType.DROP_COLUMN); List expectedEvents = Arrays.asList( result, @@ -1718,7 +1821,11 @@ public class SchemaEvolveTest { FlushEvent result; TableId tableId1 = tableId; Event schemaChangeEvent = createAndInsertDataEvents.get(0); - result = new FlushEvent(0); + result = + new FlushEvent( + 0, + Collections.singletonList(tableId), + SchemaChangeEventType.CREATE_TABLE); Assertions.assertThat( harness.getOutputRecords().stream() .map(StreamRecord::getValue) @@ -1780,7 +1887,12 @@ public class SchemaEvolveTest { .collect(Collectors.toList())) .isEqualTo( ListUtils.union( - Collections.singletonList(new FlushEvent(0)), addColumnEvents)); + Collections.singletonList( + new FlushEvent( + 0, + Collections.singletonList(tableId), + SchemaChangeEventType.ADD_COLUMN)), + addColumnEvents)); Schema schemaV2 = Schema.newBuilder() @@ -1885,7 +1997,11 @@ public class SchemaEvolveTest { int subTaskId = 0; TableId tableId1 = tableId; Event schemaChangeEvent = renameColumnEvents.get(0); - result = new FlushEvent(subTaskId); + result = + new FlushEvent( + subTaskId, + Collections.singletonList(tableId), + SchemaChangeEventType.RENAME_COLUMN); Assertions.assertThat( harness.getOutputRecords().stream() .map(StreamRecord::getValue) @@ -1966,7 +2082,11 @@ public class SchemaEvolveTest { .collect(Collectors.toList())) .isEqualTo( ListUtils.union( - Collections.singletonList(new FlushEvent(0)), + Collections.singletonList( + new FlushEvent( + 0, + Collections.singletonList(tableId), + SchemaChangeEventType.ALTER_COLUMN_TYPE)), lenientAlterColumnTypeEvents)); Schema schemaV4 = @@ -2027,7 +2147,11 @@ public class SchemaEvolveTest { .collect(Collectors.toList())) .isEqualTo( ListUtils.union( - Collections.singletonList(new FlushEvent(0)), + Collections.singletonList( + new FlushEvent( + 0, + Collections.singletonList(tableId), + SchemaChangeEventType.DROP_COLUMN)), lenientDropColumnEvents)); Schema schemaV5 = @@ -2108,7 +2232,11 @@ public class SchemaEvolveTest { FlushEvent result; TableId tableId1 = tableId; 
Event schemaChangeEvent = createAndInsertDataEvents.get(0); - result = new FlushEvent(0); + result = + new FlushEvent( + 0, + Collections.singletonList(tableId), + SchemaChangeEventType.CREATE_TABLE); Assertions.assertThat( harness.getOutputRecords().stream() .map(StreamRecord::getValue) @@ -2156,7 +2284,11 @@ public class SchemaEvolveTest { .collect(Collectors.toList())) .isEqualTo( ListUtils.union( - Collections.singletonList(new FlushEvent(0)), + Collections.singletonList( + new FlushEvent( + 0, + Collections.singletonList(tableId), + SchemaChangeEventType.DROP_COLUMN)), lenientDropColumnEvents)); Schema schemaV2 = @@ -2271,7 +2403,11 @@ public class SchemaEvolveTest { .collect(Collectors.toList())) .isEqualTo( ListUtils.union( - Collections.singletonList(new FlushEvent(0)), + Collections.singletonList( + new FlushEvent( + 0, + Collections.singletonList(tableId), + SchemaChangeEventType.ADD_COLUMN)), lenientAddColumnEvents)); Schema schemaV3 = @@ -2389,7 +2525,11 @@ public class SchemaEvolveTest { .collect(Collectors.toList())) .isEqualTo( ListUtils.union( - Collections.singletonList(new FlushEvent(0)), + Collections.singletonList( + new FlushEvent( + 0, + Collections.singletonList(tableId), + SchemaChangeEventType.RENAME_COLUMN)), lenientRenameColumnEvents)); Schema schemaV4 = diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkOperatorAdapter.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkOperatorAdapter.java new file mode 100644 index 000000000..8fd60f632 --- /dev/null +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkOperatorAdapter.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.cdc.runtime.operators.sink; + +import org.apache.flink.cdc.common.event.ChangeEvent; +import org.apache.flink.cdc.common.event.CreateTableEvent; +import org.apache.flink.cdc.common.event.Event; +import org.apache.flink.cdc.common.event.FlushEvent; +import org.apache.flink.cdc.common.event.SchemaChangeEventType; +import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.schema.Schema; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.BoundedOneInput; +import org.apache.flink.streaming.api.operators.ChainingStrategy; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.StreamTask; +import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus; + +import java.util.HashSet; +import java.util.Optional; +import java.util.Set; + +/** + * The DataSinkOperatorAdapter class acts as an adapter for testing the core schema evolution + * process in both {@link DataSinkWriterOperator} and {@link DataSinkFunctionOperator}. + */ +public class DataSinkOperatorAdapter extends AbstractStreamOperator + implements OneInputStreamOperator, BoundedOneInput { + + private SchemaEvolutionClient schemaEvolutionClient; + + private final OperatorID schemaOperatorID; + + /** A set of {@link TableId} that already processed {@link CreateTableEvent}. 
*/ + private final Set processedTableIds; + + public DataSinkOperatorAdapter() { + this.schemaOperatorID = new OperatorID(); + this.processedTableIds = new HashSet<>(); + this.chainingStrategy = ChainingStrategy.ALWAYS; + } + + @Override + public void setup( + StreamTask containingTask, + StreamConfig config, + Output> output) { + super.setup(containingTask, config, output); + schemaEvolutionClient = + new SchemaEvolutionClient( + containingTask.getEnvironment().getOperatorCoordinatorEventGateway(), + schemaOperatorID); + } + + @Override + public void open() throws Exception {} + + @Override + public void initializeState(StateInitializationContext context) throws Exception { + schemaEvolutionClient.registerSubtask(getRuntimeContext().getIndexOfThisSubtask()); + } + + @Override + public void snapshotState(StateSnapshotContext context) {} + + @Override + public void processWatermark(Watermark mark) {} + + @Override + public void processWatermarkStatus(WatermarkStatus watermarkStatus) {} + + @Override + public void processElement(StreamRecord element) throws Exception { + Event event = element.getValue(); + + // FlushEvent triggers flush + if (event instanceof FlushEvent) { + handleFlushEvent(((FlushEvent) event)); + return; + } + + // CreateTableEvent marks the table as processed directly + if (event instanceof CreateTableEvent) { + processedTableIds.add(((CreateTableEvent) event).tableId()); + // replace FlinkWriterOperator/StreamSink and emit the event for testing + output.collect(element); + return; + } + + // Check if the table is processed before emitting all other events, because we have to make + // sure that sink have a view of the full schema before processing any change events, + // including schema changes. + ChangeEvent changeEvent = (ChangeEvent) event; + if (!processedTableIds.contains(changeEvent.tableId())) { + emitLatestSchema(changeEvent.tableId()); + processedTableIds.add(changeEvent.tableId()); + } + processedTableIds.add(changeEvent.tableId()); + output.collect(element); + } + + @Override + public void prepareSnapshotPreBarrier(long checkpointId) {} + + @Override + public void close() throws Exception {} + + @Override + public void endInput() {} + + // ----------------------------- Helper functions ------------------------------- + + private void handleFlushEvent(FlushEvent event) throws Exception { + // omit copySinkWriter/userFunction flush from testing + if (event.getSchemaChangeEventType() != SchemaChangeEventType.CREATE_TABLE) { + event.getTableIds().stream() + .filter(tableId -> !processedTableIds.contains(tableId)) + .forEach( + tableId -> { + LOG.info("Table {} has not been processed", tableId); + try { + emitLatestSchema(tableId); + } catch (Exception e) { + throw new RuntimeException(e); + } + processedTableIds.add(tableId); + }); + } + schemaEvolutionClient.notifyFlushSuccess( + getRuntimeContext().getIndexOfThisSubtask(), event.getSourceSubTaskId()); + } + + private void emitLatestSchema(TableId tableId) throws Exception { + Optional schema = schemaEvolutionClient.getLatestEvolvedSchema(tableId); + if (schema.isPresent()) { + // request and process CreateTableEvent because SinkWriter need to retrieve + // Schema to deserialize RecordData after resuming job. 
+ output.collect(new StreamRecord<>(new CreateTableEvent(tableId, schema.get()))); + processedTableIds.add(tableId); + } else { + throw new RuntimeException( + "Could not find schema message from SchemaRegistry for " + tableId); + } + } +} diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkOperatorWithSchemaEvolveTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkOperatorWithSchemaEvolveTest.java new file mode 100644 index 000000000..fbeb32506 --- /dev/null +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkOperatorWithSchemaEvolveTest.java @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.runtime.operators.sink; + +import org.apache.flink.cdc.common.data.binary.BinaryStringData; +import org.apache.flink.cdc.common.event.AddColumnEvent; +import org.apache.flink.cdc.common.event.CreateTableEvent; +import org.apache.flink.cdc.common.event.DataChangeEvent; +import org.apache.flink.cdc.common.event.Event; +import org.apache.flink.cdc.common.event.FlushEvent; +import org.apache.flink.cdc.common.event.SchemaChangeEvent; +import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.schema.Column; +import org.apache.flink.cdc.common.schema.Schema; +import org.apache.flink.cdc.common.types.DataTypes; +import org.apache.flink.cdc.common.types.RowType; +import org.apache.flink.cdc.runtime.operators.schema.regular.event.SchemaChangeResponse; +import org.apache.flink.cdc.runtime.testutils.operators.RegularEventOperatorTestHarness; +import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; + +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +/** + * Unit tests for the DataSinkOperator ({@link DataSinkWriterOperator}/{@link + * DataSinkFunctionOperator} handling schema evolution events. 
+ */ +public class DataSinkOperatorWithSchemaEvolveTest { + private static final TableId CUSTOMERS_TABLEID = + TableId.tableId("my_company", "my_branch", "customers"); + private static final Schema CUSTOMERS_SCHEMA = + Schema.newBuilder() + .physicalColumn("col1", DataTypes.STRING()) + .physicalColumn("col2", DataTypes.STRING()) + .primaryKey("col1") + .build(); + private static final Schema CUSTOMERS_LATEST_SCHEMA = + Schema.newBuilder() + .physicalColumn("col1", DataTypes.STRING()) + .physicalColumn("col2", DataTypes.STRING()) + .physicalColumn("col3", DataTypes.STRING()) + .primaryKey("col1") + .build(); + + public RegularEventOperatorTestHarness setupHarness( + DataSinkOperatorAdapter dataSinkWriterOperator) throws Exception { + RegularEventOperatorTestHarness harness = + RegularEventOperatorTestHarness.withDuration( + dataSinkWriterOperator, 1, Duration.ofSeconds(3)); + // Initialization + harness.open(); + return harness; + } + + private FlushEvent createFlushEvent(TableId tableId, SchemaChangeEvent postEvent) { + return new FlushEvent(0, Collections.singletonList(tableId), postEvent.getType()); + } + + private void processSchemaChangeEvent( + DataSinkOperatorAdapter dataSinkWriterOperator, + RegularEventOperatorTestHarness schemaOperatorHarness, + TableId tableId, + SchemaChangeEvent event) + throws Exception { + // Create the flush event to process before the schema change event + FlushEvent flushEvent = createFlushEvent(tableId, event); + + // Send schema change request to coordinator + schemaOperatorHarness.requestSchemaChangeEvent(tableId, event); + + // Send flush event to SinkWriterOperator + dataSinkWriterOperator.processElement(new StreamRecord<>(flushEvent)); + + // Wait for coordinator to complete the schema change and get the finished schema change + // events + SchemaChangeResponse schemaEvolveResponse = + schemaOperatorHarness.requestSchemaChangeResult(tableId, event); + List finishedSchemaChangeEvents = + schemaEvolveResponse.getAppliedSchemaChangeEvents(); + + // Send the schema change events to SinkWriterOperator + finishedSchemaChangeEvents.forEach( + e -> { + try { + dataSinkWriterOperator.processElement(new StreamRecord<>(e)); + } catch (Exception ex) { + throw new RuntimeException(ex); + } + }); + } + + private void processDataChangeEvent( + DataSinkOperatorAdapter dataSinkWriterOperator, DataChangeEvent event) + throws Exception { + // Send the data change event to SinkWriterOperator + dataSinkWriterOperator.processElement(new StreamRecord<>(event)); + } + + private void assertOutputEvents( + RegularEventOperatorTestHarness + dataSinkWriterOperatorHarness, + List expectedEvents) { + Assertions.assertThat( + dataSinkWriterOperatorHarness.getOutputRecords().stream() + .map(StreamRecord::getValue) + .collect(Collectors.toList())) + .isEqualTo(expectedEvents); + } + + /** + * This case tests the schema evolution process for handling schema change events by a sink + * operator under normal conditions. 
+ */ + @Test + public void testSchemaChangeEvent() throws Exception { + DataSinkOperatorAdapter dataSinkWriterOperator = new DataSinkOperatorAdapter(); + try (RegularEventOperatorTestHarness + dataSinkWriterOperatorHarness = setupHarness(dataSinkWriterOperator)) { + // Create CreateTableEvent + CreateTableEvent createTableEvent = + new CreateTableEvent(CUSTOMERS_TABLEID, CUSTOMERS_SCHEMA); + // test processing CreateTableEvent + processSchemaChangeEvent( + dataSinkWriterOperator, + dataSinkWriterOperatorHarness, + CUSTOMERS_TABLEID, + createTableEvent); + assertOutputEvents( + dataSinkWriterOperatorHarness, Collections.singletonList(createTableEvent)); + dataSinkWriterOperatorHarness.clearOutputRecords(); + + // Add column + AddColumnEvent.ColumnWithPosition columnWithPosition = + new AddColumnEvent.ColumnWithPosition( + Column.physicalColumn("col3", DataTypes.STRING())); + AddColumnEvent addColumnEvent = + new AddColumnEvent( + CUSTOMERS_TABLEID, Collections.singletonList(columnWithPosition)); + + // test processing AddColumnEvent + processSchemaChangeEvent( + dataSinkWriterOperator, + dataSinkWriterOperatorHarness, + CUSTOMERS_TABLEID, + addColumnEvent); + assertOutputEvents( + dataSinkWriterOperatorHarness, Collections.singletonList(addColumnEvent)); + dataSinkWriterOperatorHarness.clearOutputRecords(); + + // Insert + BinaryRecordDataGenerator recordDataGenerator = + new BinaryRecordDataGenerator( + ((RowType) CUSTOMERS_LATEST_SCHEMA.toRowDataType())); + DataChangeEvent insertEvent = + DataChangeEvent.insertEvent( + CUSTOMERS_TABLEID, + recordDataGenerator.generate( + new Object[] { + new BinaryStringData("1"), + new BinaryStringData("2"), + new BinaryStringData("3"), + })); + + // test processing DataChangeEvent + processDataChangeEvent(dataSinkWriterOperator, insertEvent); + assertOutputEvents( + dataSinkWriterOperatorHarness, Collections.singletonList(insertEvent)); + dataSinkWriterOperatorHarness.clearOutputRecords(); + } + } + + /** + * This case tests the schema evolution process for handling schema change events by a sink + * operator after failover. 
+ */ + @Test + public void testSchemaChangeEventAfterFailover() throws Exception { + DataSinkOperatorAdapter dataSinkWriterOperator = new DataSinkOperatorAdapter(); + try (RegularEventOperatorTestHarness + dataSinkWriterOperatorHarness = setupHarness(dataSinkWriterOperator)) { + dataSinkWriterOperatorHarness.registerOriginalSchema( + CUSTOMERS_TABLEID, CUSTOMERS_SCHEMA); + dataSinkWriterOperatorHarness.registerEvolvedSchema( + CUSTOMERS_TABLEID, CUSTOMERS_SCHEMA); + + // Add column + AddColumnEvent.ColumnWithPosition columnWithPosition = + new AddColumnEvent.ColumnWithPosition( + Column.physicalColumn("col3", DataTypes.STRING())); + AddColumnEvent addColumnEvent = + new AddColumnEvent( + CUSTOMERS_TABLEID, Collections.singletonList(columnWithPosition)); + + // test AddColumnEvent + processSchemaChangeEvent( + dataSinkWriterOperator, + dataSinkWriterOperatorHarness, + CUSTOMERS_TABLEID, + addColumnEvent); + assertOutputEvents( + dataSinkWriterOperatorHarness, + Arrays.asList( + new CreateTableEvent(CUSTOMERS_TABLEID, CUSTOMERS_SCHEMA), + addColumnEvent)); + dataSinkWriterOperatorHarness.clearOutputRecords(); + + // Insert + BinaryRecordDataGenerator recordDataGenerator = + new BinaryRecordDataGenerator( + ((RowType) CUSTOMERS_LATEST_SCHEMA.toRowDataType())); + DataChangeEvent insertEvent = + DataChangeEvent.insertEvent( + CUSTOMERS_TABLEID, + recordDataGenerator.generate( + new Object[] { + new BinaryStringData("1"), + new BinaryStringData("2"), + new BinaryStringData("3"), + })); + + // test DataChangeEvent + processDataChangeEvent(dataSinkWriterOperator, insertEvent); + assertOutputEvents( + dataSinkWriterOperatorHarness, Collections.singletonList(insertEvent)); + dataSinkWriterOperatorHarness.clearOutputRecords(); + } + } + + /** + * This case tests the schema evolution process for handling data change events by a sink + * operator under normal conditions. + */ + @Test + public void testDataChangeEvent() throws Exception { + DataSinkOperatorAdapter dataSinkWriterOperator = new DataSinkOperatorAdapter(); + try (RegularEventOperatorTestHarness + dataSinkWriterOperatorHarness = setupHarness(dataSinkWriterOperator)) { + // Create CreateTableEvent + CreateTableEvent createTableEvent = + new CreateTableEvent(CUSTOMERS_TABLEID, CUSTOMERS_SCHEMA); + // test CreateTableEvent + processSchemaChangeEvent( + dataSinkWriterOperator, + dataSinkWriterOperatorHarness, + CUSTOMERS_TABLEID, + createTableEvent); + assertOutputEvents( + dataSinkWriterOperatorHarness, Collections.singletonList(createTableEvent)); + dataSinkWriterOperatorHarness.clearOutputRecords(); + + // Insert + BinaryRecordDataGenerator recordDataGenerator = + new BinaryRecordDataGenerator(((RowType) CUSTOMERS_SCHEMA.toRowDataType())); + DataChangeEvent insertEvent = + DataChangeEvent.insertEvent( + CUSTOMERS_TABLEID, + recordDataGenerator.generate( + new Object[] { + new BinaryStringData("1"), new BinaryStringData("2") + })); + + // test DataChangeEvent + processDataChangeEvent(dataSinkWriterOperator, insertEvent); + assertOutputEvents( + dataSinkWriterOperatorHarness, Collections.singletonList(insertEvent)); + dataSinkWriterOperatorHarness.clearOutputRecords(); + } + } + + /** + * This case tests the data change process for handling schema change events by a sink operator + * after failover. 
+ */ + @Test + public void testDataChangeEventAfterFailover() throws Exception { + DataSinkOperatorAdapter dataSinkWriterOperator = new DataSinkOperatorAdapter(); + try (RegularEventOperatorTestHarness + dataSinkWriterOperatorHarness = setupHarness(dataSinkWriterOperator)) { + dataSinkWriterOperatorHarness.registerOriginalSchema( + CUSTOMERS_TABLEID, CUSTOMERS_SCHEMA); + dataSinkWriterOperatorHarness.registerEvolvedSchema( + CUSTOMERS_TABLEID, CUSTOMERS_SCHEMA); + + // Insert + BinaryRecordDataGenerator recordDataGenerator = + new BinaryRecordDataGenerator( + ((RowType) CUSTOMERS_LATEST_SCHEMA.toRowDataType())); + DataChangeEvent insertEvent = + DataChangeEvent.insertEvent( + CUSTOMERS_TABLEID, + recordDataGenerator.generate( + new Object[] { + new BinaryStringData("1"), + new BinaryStringData("2"), + new BinaryStringData("3"), + })); + + // test DataChangeEvent + processDataChangeEvent(dataSinkWriterOperator, insertEvent); + assertOutputEvents( + dataSinkWriterOperatorHarness, + Arrays.asList( + new CreateTableEvent(CUSTOMERS_TABLEID, CUSTOMERS_SCHEMA), + insertEvent)); + dataSinkWriterOperatorHarness.clearOutputRecords(); + } + } +} diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/partitioning/PrePartitionOperatorTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/partitioning/PrePartitionOperatorTest.java index 7c0617c23..ce44c26e5 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/partitioning/PrePartitionOperatorTest.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/partitioning/PrePartitionOperatorTest.java @@ -21,6 +21,7 @@ import org.apache.flink.cdc.common.data.binary.BinaryStringData; import org.apache.flink.cdc.common.event.CreateTableEvent; import org.apache.flink.cdc.common.event.DataChangeEvent; import org.apache.flink.cdc.common.event.FlushEvent; +import org.apache.flink.cdc.common.event.SchemaChangeEventType; import org.apache.flink.cdc.common.event.TableId; import org.apache.flink.cdc.common.schema.Schema; import org.apache.flink.cdc.common.sink.DefaultDataChangeEventHashFunctionProvider; @@ -33,6 +34,8 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.junit.jupiter.api.Test; +import java.util.Collections; + import static org.assertj.core.api.Assertions.assertThat; /** Unit test for {@link RegularPrePartitionOperator}. 
*/ @@ -80,7 +83,11 @@ class PrePartitionOperatorTest { // FlushEvent RegularPrePartitionOperator operator = testHarness.getOperator(); - FlushEvent flushEvent = new FlushEvent(0); + FlushEvent flushEvent = + new FlushEvent( + 0, + Collections.singletonList(CUSTOMERS), + SchemaChangeEventType.CREATE_TABLE); operator.processElement(new StreamRecord<>(flushEvent)); assertThat(testHarness.getOutputRecords()).hasSize(DOWNSTREAM_PARALLELISM); for (int i = 0; i < DOWNSTREAM_PARALLELISM; i++) { diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/event/EventSerializerTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/event/EventSerializerTest.java index 7983ab737..5249af155 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/event/EventSerializerTest.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/event/EventSerializerTest.java @@ -20,9 +20,12 @@ package org.apache.flink.cdc.runtime.serializer.event; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.cdc.common.event.Event; import org.apache.flink.cdc.common.event.FlushEvent; +import org.apache.flink.cdc.common.event.SchemaChangeEventType; +import org.apache.flink.cdc.common.event.TableId; import org.apache.flink.cdc.runtime.serializer.SerializerTestBase; import java.util.Arrays; +import java.util.Collections; import java.util.stream.Stream; /** A test for the {@link EventSerializer}. */ @@ -44,7 +47,20 @@ public class EventSerializerTest extends SerializerTestBase { @Override protected Event[] getTestData() { - Event[] flushEvents = new Event[] {new FlushEvent(1), new FlushEvent(2), new FlushEvent(3)}; + Event[] flushEvents = + new Event[] { + new FlushEvent(1, Collections.emptyList(), SchemaChangeEventType.CREATE_TABLE), + new FlushEvent( + 2, + Collections.singletonList(TableId.tableId("schema", "table")), + SchemaChangeEventType.CREATE_TABLE), + new FlushEvent( + 3, + Arrays.asList( + TableId.tableId("schema", "table"), + TableId.tableId("namespace", "schema", "table")), + SchemaChangeEventType.CREATE_TABLE) + }; Event[] dataChangeEvents = new DataChangeEventSerializerTest().getTestData(); Event[] schemaChangeEvents = new SchemaChangeEventSerializerTest().getTestData(); return Stream.concat( diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/event/PartitioningEventSerializerTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/event/PartitioningEventSerializerTest.java index 777ec22f3..59338c6c1 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/event/PartitioningEventSerializerTest.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/event/PartitioningEventSerializerTest.java @@ -20,11 +20,14 @@ package org.apache.flink.cdc.runtime.serializer.event; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.cdc.common.event.Event; import org.apache.flink.cdc.common.event.FlushEvent; +import org.apache.flink.cdc.common.event.SchemaChangeEventType; +import org.apache.flink.cdc.common.event.TableId; import org.apache.flink.cdc.runtime.partitioning.PartitioningEvent; import org.apache.flink.cdc.runtime.serializer.SerializerTestBase; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.stream.Collectors; @@ -48,7 +51,20 @@ class PartitioningEventSerializerTest 
extends SerializerTestBase, E extends Event> implements AutoCloseable { + private static final Logger LOG = + LoggerFactory.getLogger(RegularEventOperatorTestHarness.class); public static final OperatorID SCHEMA_OPERATOR_ID = new OperatorID(15213L, 15513L); public static final OperatorID SINK_OPERATOR_ID = new OperatorID(15214L, 15514L); @@ -212,6 +224,47 @@ public class RegularEventOperatorTestHarness Schema Registry is busy now, waiting for next request...", 0); + Thread.sleep(1000); + } else if (response.isWaitingForFlush()) { + LOG.info( + "{}> Schema change event has not collected enough flush success events from writers, waiting...", + 0); + Thread.sleep(1000); + } else { + return response; + } + } else { + throw new TimeoutException("Timeout when requesting schema change."); + } + } + } + public Schema getLatestOriginalSchema(TableId tableId) throws Exception { return ((GetOriginalSchemaResponse) unwrap(