[FLINK-36701][cdc-runtime] Obtain latest evolvedSchema when SinkDataWriterOperator handles FlushEvent from failover

This closes #3802

Co-authored-by: jzjsnow <snow.jiangzj@gmail.com>
parent dd865bf440
commit 630e0d70c2

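For context: the core of this change is that a FlushEvent now records which sink tables it flushes and which schema change triggered it, so a sink operator whose schema cache was lost during a failover can fetch the latest evolved schema from the registry before applying the pending change. Below is a minimal, hypothetical sketch (not part of this commit) of the enriched event surface; the constructor and accessors mirror the FlushEvent diff that follows.

import org.apache.flink.cdc.common.event.FlushEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEventType;
import org.apache.flink.cdc.common.event.TableId;

import java.util.Collections;

// Hypothetical illustration: build the enriched FlushEvent and read back the
// two new fields that sink operators rely on after failover.
public class FlushEventSketch {
    public static void main(String[] args) {
        FlushEvent event =
                new FlushEvent(
                        0, // sourceSubTaskId that initiated the flush
                        Collections.singletonList(TableId.tableId("db", "customers")),
                        SchemaChangeEventType.ADD_COLUMN);
        System.out.println(event.getTableIds()); // [db.customers]
        System.out.println(event.getSchemaChangeEventType()); // ADD_COLUMN
    }
}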
FlushEvent.java
@@ -17,6 +17,7 @@
 package org.apache.flink.cdc.common.event;

+import java.util.List;
 import java.util.Objects;

 /**
@@ -24,18 +25,36 @@ import java.util.Objects;
  * start flushing.
  */
 public class FlushEvent implements Event {

+    /** The sink table(s) that need to be flushed. */
+    private final List<TableId> tableIds;
+
     /** Which subTask ID this FlushEvent was initiated from. */
     private final int sourceSubTaskId;

-    public FlushEvent(int sourceSubTaskId) {
+    /** Which type of schema change event caused this FlushEvent. */
+    private final SchemaChangeEventType schemaChangeEventType;
+
+    public FlushEvent(
+            int sourceSubTaskId,
+            List<TableId> tableIds,
+            SchemaChangeEventType schemaChangeEventType) {
+        this.tableIds = tableIds;
         this.sourceSubTaskId = sourceSubTaskId;
+        this.schemaChangeEventType = schemaChangeEventType;
+    }
+
+    public List<TableId> getTableIds() {
+        return tableIds;
     }

     public int getSourceSubTaskId() {
         return sourceSubTaskId;
     }

+    public SchemaChangeEventType getSchemaChangeEventType() {
+        return schemaChangeEventType;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
@@ -45,7 +64,9 @@ public class FlushEvent implements Event {
             return false;
         }
         FlushEvent that = (FlushEvent) o;
-        return sourceSubTaskId == that.sourceSubTaskId;
+        return sourceSubTaskId == that.sourceSubTaskId
+                && Objects.equals(tableIds, that.tableIds)
+                && Objects.equals(schemaChangeEventType, that.schemaChangeEventType);
     }

     @Override
@@ -55,6 +76,13 @@ public class FlushEvent implements Event {

     @Override
     public String toString() {
-        return "FlushEvent{" + "sourceSubTaskId=" + sourceSubTaskId + '}';
+        return "FlushEvent{"
+                + "sourceSubTaskId="
+                + sourceSubTaskId
+                + ", tableIds="
+                + tableIds
+                + ", schemaChangeEventType="
+                + schemaChangeEventType
+                + '}';
     }
 }

BucketAssignOperator.java
@@ -124,7 +124,10 @@ public class BucketAssignOperator extends AbstractStreamOperator<Event>
             output.collect(
                     new StreamRecord<>(
                             new BucketWrapperFlushEvent(
-                                    currentTaskNumber, ((FlushEvent) event).getSourceSubTaskId())));
+                                    currentTaskNumber,
+                                    ((FlushEvent) event).getSourceSubTaskId(),
+                                    ((FlushEvent) event).getTableIds(),
+                                    ((FlushEvent) event).getSchemaChangeEventType())));
             return;
         }

BucketWrapperEventSerializer.java
@@ -21,7 +21,10 @@ import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.cdc.common.event.ChangeEvent;
 import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.event.SchemaChangeEventType;
+import org.apache.flink.cdc.common.event.TableId;
 import org.apache.flink.cdc.runtime.serializer.EnumSerializer;
+import org.apache.flink.cdc.runtime.serializer.ListSerializer;
 import org.apache.flink.cdc.runtime.serializer.TableIdSerializer;
 import org.apache.flink.cdc.runtime.serializer.TypeSerializerSingleton;
 import org.apache.flink.cdc.runtime.serializer.event.EventSerializer;
@@ -40,8 +43,10 @@ public class BucketWrapperEventSerializer extends TypeSerializerSingleton<Event>

     private final EventSerializer eventSerializer = EventSerializer.INSTANCE;

-    private final TableIdSerializer tableIdSerializer = TableIdSerializer.INSTANCE;
+    private final ListSerializer<TableId> tableIdListSerializer =
+            new ListSerializer<>(TableIdSerializer.INSTANCE);
+
+    private final EnumSerializer<SchemaChangeEventType> schemaChangeEventTypeEnumSerializer =
+            new EnumSerializer<>(SchemaChangeEventType.class);

     /** Sharable instance of the TableIdSerializer. */
     public static final BucketWrapperEventSerializer INSTANCE = new BucketWrapperEventSerializer();
@@ -82,6 +87,9 @@ public class BucketWrapperEventSerializer extends TypeSerializerSingleton<Event>
             BucketWrapperFlushEvent bucketWrapperFlushEvent = (BucketWrapperFlushEvent) event;
             dataOutputView.writeInt(bucketWrapperFlushEvent.getBucket());
             dataOutputView.writeInt(bucketWrapperFlushEvent.getSourceSubTaskId());
+            tableIdListSerializer.serialize(bucketWrapperFlushEvent.getTableIds(), dataOutputView);
+            schemaChangeEventTypeEnumSerializer.serialize(
+                    bucketWrapperFlushEvent.getSchemaChangeEventType(), dataOutputView);
         }
     }
@@ -89,7 +97,11 @@ public class BucketWrapperEventSerializer extends TypeSerializerSingleton<Event>
     public Event deserialize(DataInputView source) throws IOException {
         EventClass eventClass = enumSerializer.deserialize(source);
         if (eventClass.equals(EventClass.BUCKET_WRAPPER_FLUSH_EVENT)) {
-            return new BucketWrapperFlushEvent(source.readInt(), source.readInt());
+            return new BucketWrapperFlushEvent(
+                    source.readInt(),
+                    source.readInt(),
+                    tableIdListSerializer.deserialize(source),
+                    schemaChangeEventTypeEnumSerializer.deserialize(source));
         } else {
             return new BucketWrapperChangeEvent(
                     source.readInt(), (ChangeEvent) eventSerializer.deserialize(source));

BucketWrapperFlushEvent.java
@@ -18,7 +18,10 @@
 package org.apache.flink.cdc.connectors.paimon.sink.v2.bucket;

 import org.apache.flink.cdc.common.event.FlushEvent;
+import org.apache.flink.cdc.common.event.SchemaChangeEventType;
+import org.apache.flink.cdc.common.event.TableId;

+import java.util.List;
 import java.util.Objects;

 /** A wrapper class for {@link FlushEvent} to attach bucket id. */
@@ -26,8 +29,12 @@ public class BucketWrapperFlushEvent extends FlushEvent implements BucketWrapper

     private final int bucket;

-    public BucketWrapperFlushEvent(int bucket, int subTaskId) {
-        super(subTaskId);
+    public BucketWrapperFlushEvent(
+            int bucket,
+            int subTaskId,
+            List<TableId> tableIds,
+            SchemaChangeEventType schemaChangeEventType) {
+        super(subTaskId, tableIds, schemaChangeEventType);
         this.bucket = bucket;
     }

OceanBaseE2eITCase.java
@@ -180,6 +180,6 @@ public class OceanBaseE2eITCase extends FlinkContainerTestEnvironment {
                 expectResult,
                 "ob_products_sink",
                 new String[] {"id", "name", "description", "weight", "enum_c", "json_c"},
-                60000L);
+                300000L);
     }
 }

SchemaOperator.java
@@ -150,6 +150,7 @@ public class SchemaOperator extends AbstractStreamOperator<Event>
             // Then, notify this information to the coordinator
             requestSchemaChange(
+                    tableId,
                     new SchemaChangeRequest(sourcePartition, subTaskId, schemaChangeEvent));
             schemaOperatorMetrics.increaseFinishedSchemaChangeEvents(1);
         } else if (event instanceof DataChangeEvent) {
@@ -188,9 +189,15 @@
         }
     }

-    private void requestSchemaChange(SchemaChangeRequest schemaChangeRequest) {
+    private void requestSchemaChange(
+            TableId sourceTableId, SchemaChangeRequest schemaChangeRequest) {
         LOG.info("{}> Sent FlushEvent to downstream...", subTaskId);
-        output.collect(new StreamRecord<>(new FlushEvent(subTaskId)));
+        output.collect(
+                new StreamRecord<>(
+                        new FlushEvent(
+                                subTaskId,
+                                tableIdRouter.route(sourceTableId),
+                                schemaChangeRequest.getSchemaChangeEvent().getType())));

         LOG.info("{}> Sending evolve request...", subTaskId);
         SchemaChangeResponse response = sendRequestToCoordinator(schemaChangeRequest);

SchemaOperator.java
@@ -164,8 +164,10 @@ public class SchemaOperator extends AbstractStreamOperator<Event>
         schemaOperatorMetrics.increaseSchemaChangeEvents(1);

         // First, send FlushEvent or it might be blocked later
+        List<TableId> sinkTables = router.route(tableId);
         LOG.info("{}> Sending the FlushEvent.", subTaskId);
-        output.collect(new StreamRecord<>(new FlushEvent(subTaskId)));
+        output.collect(
+                new StreamRecord<>(new FlushEvent(subTaskId, sinkTables, originalEvent.getType())));

         // Then, queue to request schema change to SchemaCoordinator.
         SchemaChangeResponse response = requestSchemaChange(tableId, originalEvent);

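Note on ordering in both SchemaOperator variants: the FlushEvent must reach the sink before the coordinator is asked to evolve the schema, otherwise the sink could block on unflushed data while the coordinator waits for flush acknowledgements. A condensed, hypothetical sketch of that contract (the Consumer stands in for output.collect(new StreamRecord<>(...)); only types shown in the diffs above are assumed):

import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.FlushEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;

import java.util.List;
import java.util.function.Consumer;

// Hypothetical condensation of the flush-then-request ordering.
final class FlushOrderingSketch {
    static void flushThenRequest(
            int subTaskId,
            List<TableId> sinkTables, // e.g. the result of router.route(sourceTableId)
            SchemaChangeEvent pendingChange,
            Consumer<Event> emitter,
            Runnable sendRequestToCoordinator) {
        // 1. Flush first, tagged with the affected sink tables and change type.
        emitter.accept(new FlushEvent(subTaskId, sinkTables, pendingChange.getType()));
        // 2. Only then request the schema change from the coordinator.
        sendRequestToCoordinator.run();
    }
}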
DataSinkFunctionOperator.java
@@ -22,6 +22,7 @@ import org.apache.flink.cdc.common.event.ChangeEvent;
 import org.apache.flink.cdc.common.event.CreateTableEvent;
 import org.apache.flink.cdc.common.event.Event;
 import org.apache.flink.cdc.common.event.FlushEvent;
+import org.apache.flink.cdc.common.event.SchemaChangeEventType;
 import org.apache.flink.cdc.common.event.TableId;
 import org.apache.flink.cdc.common.schema.Schema;
 import org.apache.flink.runtime.jobgraph.OperatorID;
@@ -112,6 +113,20 @@ public class DataSinkFunctionOperator extends StreamSink<Event> {
     // ----------------------------- Helper functions -------------------------------

     private void handleFlushEvent(FlushEvent event) throws Exception {
         userFunction.finish();
+        if (event.getSchemaChangeEventType() != SchemaChangeEventType.CREATE_TABLE) {
+            event.getTableIds().stream()
+                    .filter(tableId -> !processedTableIds.contains(tableId))
+                    .forEach(
+                            tableId -> {
+                                LOG.info("Table {} has not been processed", tableId);
+                                try {
+                                    emitLatestSchema(tableId);
+                                } catch (Exception e) {
+                                    throw new RuntimeException(e);
+                                }
+                                processedTableIds.add(tableId);
+                            });
+        }
         schemaEvolutionClient.notifyFlushSuccess(
                 getRuntimeContext().getIndexOfThisSubtask(), event.getSourceSubTaskId());
     }

DataSinkWriterOperator.java
@@ -25,6 +25,7 @@ import org.apache.flink.cdc.common.event.ChangeEvent;
 import org.apache.flink.cdc.common.event.CreateTableEvent;
 import org.apache.flink.cdc.common.event.Event;
 import org.apache.flink.cdc.common.event.FlushEvent;
+import org.apache.flink.cdc.common.event.SchemaChangeEventType;
 import org.apache.flink.cdc.common.event.TableId;
 import org.apache.flink.cdc.common.schema.Schema;
 import org.apache.flink.runtime.jobgraph.OperatorID;
@@ -198,6 +199,20 @@ public class DataSinkWriterOperator<CommT> extends AbstractStreamOperator<Commit
     private void handleFlushEvent(FlushEvent event) throws Exception {
         copySinkWriter.flush(false);
+        if (event.getSchemaChangeEventType() != SchemaChangeEventType.CREATE_TABLE) {
+            event.getTableIds().stream()
+                    .filter(tableId -> !processedTableIds.contains(tableId))
+                    .forEach(
+                            tableId -> {
+                                LOG.info("Table {} has not been processed", tableId);
+                                try {
+                                    emitLatestSchema(tableId);
+                                } catch (Exception e) {
+                                    throw new RuntimeException(e);
+                                }
+                                processedTableIds.add(tableId);
+                            });
+        }
         schemaEvolutionClient.notifyFlushSuccess(
                 getRuntimeContext().getIndexOfThisSubtask(), event.getSourceSubTaskId());
     }

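Why the CREATE_TABLE guard in both handleFlushEvent bodies: when the flush was triggered by a CreateTableEvent, that event arrives next and registers the schema itself, so nothing needs recovering; for every other change type, any flushed table the writer has not seen since the restart must get its latest evolved schema re-emitted first. A hypothetical helper distilling that guard (same semantics as the diffs above, assuming processedTableIds tracks tables seen since startup):

import org.apache.flink.cdc.common.event.FlushEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEventType;
import org.apache.flink.cdc.common.event.TableId;

import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical helper: returns the tables whose latest evolved schema must be
// re-emitted before the pending schema change is applied.
final class FlushRecoverySketch {
    static List<TableId> tablesNeedingSchemaRecovery(FlushEvent event, Set<TableId> processed) {
        if (event.getSchemaChangeEventType() == SchemaChangeEventType.CREATE_TABLE) {
            // The CreateTableEvent that follows registers the schema itself.
            return Collections.emptyList();
        }
        return event.getTableIds().stream()
                .filter(tableId -> !processed.contains(tableId))
                .collect(Collectors.toList());
    }
}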
EventSerializer.java
@@ -24,7 +24,10 @@ import org.apache.flink.cdc.common.event.DataChangeEvent;
 import org.apache.flink.cdc.common.event.Event;
 import org.apache.flink.cdc.common.event.FlushEvent;
 import org.apache.flink.cdc.common.event.SchemaChangeEvent;
+import org.apache.flink.cdc.common.event.SchemaChangeEventType;
+import org.apache.flink.cdc.common.event.TableId;
 import org.apache.flink.cdc.runtime.serializer.EnumSerializer;
+import org.apache.flink.cdc.runtime.serializer.ListSerializer;
 import org.apache.flink.cdc.runtime.serializer.TableIdSerializer;
 import org.apache.flink.cdc.runtime.serializer.TypeSerializerSingleton;
 import org.apache.flink.core.memory.DataInputView;
@@ -42,11 +45,14 @@ public final class EventSerializer extends TypeSerializerSingleton<Event> {
     private final SchemaChangeEventSerializer schemaChangeEventSerializer =
             SchemaChangeEventSerializer.INSTANCE;

-    private final TableIdSerializer tableIdSerializer = TableIdSerializer.INSTANCE;
+    private final ListSerializer<TableId> listSerializer =
+            new ListSerializer<>(TableIdSerializer.INSTANCE);

     private final EnumSerializer<EventClass> enumSerializer =
             new EnumSerializer<>(EventClass.class);

     private final TypeSerializer<DataChangeEvent> dataChangeEventSerializer =
             DataChangeEventSerializer.INSTANCE;

+    private final EnumSerializer<SchemaChangeEventType> schemaChangeEventTypeEnumSerializer =
+            new EnumSerializer<>(SchemaChangeEventType.class);
+
     @Override
     public boolean isImmutableType() {
@@ -62,7 +68,11 @@ public final class EventSerializer extends TypeSerializerSingleton<Event> {
     public Event copy(Event from) {
         if (from instanceof FlushEvent) {
             FlushEvent flushEvent = (FlushEvent) from;
-            return new FlushEvent(((FlushEvent) from).getSourceSubTaskId());
+            return new FlushEvent(
+                    flushEvent.getSourceSubTaskId(),
+                    listSerializer.copy(((FlushEvent) from).getTableIds()),
+                    schemaChangeEventTypeEnumSerializer.copy(
+                            flushEvent.getSchemaChangeEventType()));
         } else if (from instanceof SchemaChangeEvent) {
             return schemaChangeEventSerializer.copy((SchemaChangeEvent) from);
         } else if (from instanceof DataChangeEvent) {
@@ -86,6 +96,9 @@ public final class EventSerializer extends TypeSerializerSingleton<Event> {
         if (record instanceof FlushEvent) {
             enumSerializer.serialize(EventClass.FLUSH_EVENT, target);
             target.writeInt(((FlushEvent) record).getSourceSubTaskId());
+            listSerializer.serialize(((FlushEvent) record).getTableIds(), target);
+            schemaChangeEventTypeEnumSerializer.serialize(
+                    ((FlushEvent) record).getSchemaChangeEventType(), target);
         } else if (record instanceof SchemaChangeEvent) {
             enumSerializer.serialize(EventClass.SCHEME_CHANGE_EVENT, target);
             schemaChangeEventSerializer.serialize((SchemaChangeEvent) record, target);
@@ -102,7 +115,10 @@ public final class EventSerializer extends TypeSerializerSingleton<Event> {
         EventClass eventClass = enumSerializer.deserialize(source);
         switch (eventClass) {
             case FLUSH_EVENT:
-                return new FlushEvent(source.readInt());
+                return new FlushEvent(
+                        source.readInt(),
+                        listSerializer.deserialize(source),
+                        schemaChangeEventTypeEnumSerializer.deserialize(source));
             case DATA_CHANGE_EVENT:
                 return dataChangeEventSerializer.deserialize(source);
             case SCHEME_CHANGE_EVENT:

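FlushEvents cross the partitioning boundary, so the two new fields have to survive (de)serialization; that is also why equals() was extended in FlushEvent above. A hypothetical round-trip check (assumes flink-cdc on the classpath and uses Flink's DataOutputSerializer/DataInputDeserializer):

import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.FlushEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEventType;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.runtime.serializer.event.EventSerializer;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;

import java.util.Collections;

// Hypothetical round-trip sketch: serialize, deserialize, compare.
public class FlushEventRoundTrip {
    public static void main(String[] args) throws Exception {
        Event original =
                new FlushEvent(
                        1,
                        Collections.singletonList(TableId.tableId("db", "t")),
                        SchemaChangeEventType.ADD_COLUMN);

        DataOutputSerializer out = new DataOutputSerializer(64);
        EventSerializer.INSTANCE.serialize(original, out);

        Event copy =
                EventSerializer.INSTANCE.deserialize(
                        new DataInputDeserializer(out.getCopyOfBuffer()));
        System.out.println(original.equals(copy)); // expected: true
    }
}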
SchemaEvolveTest.java
@@ -181,16 +181,24 @@ public class SchemaEvolveTest extends SchemaTestBase {
                                 }))
                 .map(StreamRecord::getValue)
                 .containsExactly(
-                        new FlushEvent(0),
+                        new FlushEvent(
+                                0, Collections.singletonList(TABLE_ID), createTableEvent.getType()),
                         createTableEvent,
                         genInsert(TABLE_ID, "ISFS", 1, "Alice", 17.1828f, "Hello"),
-                        new FlushEvent(0),
+                        new FlushEvent(
+                                0, Collections.singletonList(TABLE_ID), addColumnEvent.getType()),
                         addColumnEventAtLast,
                         genInsert(TABLE_ID, "ISFSB", 2, "Bob", 31.415926f, "Bye-bye", false),
-                        new FlushEvent(0),
+                        new FlushEvent(
+                                0,
+                                Collections.singletonList(TABLE_ID),
+                                renameColumnEvent.getType()),
                         appendRenamedColumnAtLast,
                         genInsert(TABLE_ID, "ISFSBS", 3, "Cicada", 123.456f, null, true, "Ok"),
-                        new FlushEvent(0),
+                        new FlushEvent(
+                                0,
+                                Collections.singletonList(TABLE_ID),
+                                alterColumnTypeEvent.getType()),
                         alterColumnTypeEventWithBackfill,
                         genInsert(
                                 TABLE_ID,
@@ -201,11 +209,16 @@ public class SchemaEvolveTest extends SchemaTestBase {
                                 null,
                                 false,
                                 "Nah"),
-                        new FlushEvent(0),
+                        new FlushEvent(
+                                0, Collections.singletonList(TABLE_ID), dropColumnEvent.getType()),
                         genInsert(TABLE_ID, "ISDSBS", 5, "Eve", 1.414, null, true, null),
-                        new FlushEvent(0),
+                        new FlushEvent(
+                                0,
+                                Collections.singletonList(TABLE_ID),
+                                truncateTableEvent.getType()),
                         genInsert(TABLE_ID, "ISDSBS", 6, "Ferris", 0.001, null, false, null),
-                        new FlushEvent(0));
+                        new FlushEvent(
+                                0, Collections.singletonList(TABLE_ID), dropTableEvent.getType()));
     }

     @Test
@@ -308,7 +321,8 @@ public class SchemaEvolveTest extends SchemaTestBase {
                                 }))
                 .map(StreamRecord::getValue)
                 .containsExactly(
-                        new FlushEvent(0),
+                        new FlushEvent(
+                                0, Collections.singletonList(TABLE_ID), createTableEvent.getType()),
                         createTableEvent,
                         genInsert(TABLE_ID, "ISFS", 1, "Alice", 17.1828f, "Hello"),
                         genInsert(TABLE_ID, "ISFS", 2, "Bob", 31.415926f, "Bye-bye"),

SchemaEvolveTest.java
@@ -112,7 +112,11 @@ public class SchemaEvolveTest {
         Assertions.assertThat(
                         ListUtils.union(
-                                Collections.singletonList(new FlushEvent(0)),
+                                Collections.singletonList(
+                                        new FlushEvent(
+                                                0,
+                                                Collections.singletonList(tableId),
+                                                SchemaChangeEventType.CREATE_TABLE)),
                                 createAndInsertDataEvents))
                 .isEqualTo(
                         harness.getOutputRecords().stream()
@@ -172,7 +176,12 @@
                                 .collect(Collectors.toList()))
                 .isEqualTo(
                         ListUtils.union(
-                                Collections.singletonList(new FlushEvent(0)), addColumnEvents));
+                                Collections.singletonList(
+                                        new FlushEvent(
+                                                0,
+                                                Collections.singletonList(tableId),
+                                                SchemaChangeEventType.ADD_COLUMN)),
+                                addColumnEvents));

         Schema schemaV2 =
                 Schema.newBuilder()
@@ -230,7 +239,11 @@
                                 .collect(Collectors.toList()))
                 .isEqualTo(
                         ListUtils.union(
-                                Collections.singletonList(new FlushEvent(0)),
+                                Collections.singletonList(
+                                        new FlushEvent(
+                                                0,
+                                                Collections.singletonList(tableId),
+                                                SchemaChangeEventType.RENAME_COLUMN)),
                                 renameColumnEvents));

         Schema schemaV3 =
@@ -275,7 +288,11 @@
                                 .collect(Collectors.toList()))
                 .isEqualTo(
                         ListUtils.union(
-                                Collections.singletonList(new FlushEvent(0)),
+                                Collections.singletonList(
+                                        new FlushEvent(
+                                                0,
+                                                Collections.singletonList(tableId),
+                                                SchemaChangeEventType.ALTER_COLUMN_TYPE)),
                                 alterColumnTypeEvents));

         Schema schemaV4 =
@@ -311,7 +328,11 @@
                                 .collect(Collectors.toList()))
                 .isEqualTo(
                         ListUtils.union(
-                                Collections.singletonList(new FlushEvent(0)),
+                                Collections.singletonList(
+                                        new FlushEvent(
+                                                0,
+                                                Collections.singletonList(tableId),
+                                                SchemaChangeEventType.DROP_COLUMN)),
                                 dropColumnEvents));

         Schema schemaV5 =
@@ -373,7 +394,11 @@
                                 .collect(Collectors.toList()))
                 .isEqualTo(
                         ListUtils.union(
-                                Collections.singletonList(new FlushEvent(0)),
+                                Collections.singletonList(
+                                        new FlushEvent(
+                                                0,
+                                                Collections.singletonList(tableId),
+                                                SchemaChangeEventType.CREATE_TABLE)),
                                 createAndInsertDataEvents));

         Assertions.assertThat(harness.getLatestOriginalSchema(tableId)).isEqualTo(schemaV1);
@@ -429,7 +454,12 @@
                                 .collect(Collectors.toList()))
                 .isEqualTo(
                         ListUtils.union(
-                                Collections.singletonList(new FlushEvent(0)), addColumnEvents));
+                                Collections.singletonList(
+                                        new FlushEvent(
+                                                0,
+                                                Collections.singletonList(tableId),
+                                                SchemaChangeEventType.ADD_COLUMN)),
+                                addColumnEvents));

         Schema schemaV2 =
                 Schema.newBuilder()
@@ -487,7 +517,11 @@
                                 .collect(Collectors.toList()))
                 .isEqualTo(
                         ListUtils.union(
-                                Collections.singletonList(new FlushEvent(0)),
+                                Collections.singletonList(
+                                        new FlushEvent(
+                                                0,
+                                                Collections.singletonList(tableId),
+                                                SchemaChangeEventType.RENAME_COLUMN)),
                                 renameColumnEvents));

         Schema schemaV3 =
@@ -532,7 +566,11 @@
                                 .collect(Collectors.toList()))
                 .isEqualTo(
                         ListUtils.union(
-                                Collections.singletonList(new FlushEvent(0)),
+                                Collections.singletonList(
+                                        new FlushEvent(
+                                                0,
+                                                Collections.singletonList(tableId),
+                                                SchemaChangeEventType.ALTER_COLUMN_TYPE)),
                                 alterColumnTypeEvents));

         Schema schemaV4 =
@@ -568,7 +606,11 @@
                                 .collect(Collectors.toList()))
                 .isEqualTo(
                         ListUtils.union(
-                                Collections.singletonList(new FlushEvent(0)),
+                                Collections.singletonList(
+                                        new FlushEvent(
+                                                0,
+                                                Collections.singletonList(tableId),
+                                                SchemaChangeEventType.DROP_COLUMN)),
                                 dropColumnEvents));

         Schema schemaV5 =
@@ -630,7 +672,11 @@
                                 .collect(Collectors.toList()))
                 .isEqualTo(
                         ListUtils.union(
-                                Collections.singletonList(new FlushEvent(0)),
+                                Collections.singletonList(
+                                        new FlushEvent(
+                                                0,
+                                                Collections.singletonList(tableId),
+                                                SchemaChangeEventType.CREATE_TABLE)),
                                 createAndInsertDataEvents));

         Assertions.assertThat(harness.getLatestOriginalSchema(tableId)).isEqualTo(schemaV1);
@@ -720,7 +766,11 @@
                                 .collect(Collectors.toList()))
                 .isEqualTo(
                         ListUtils.union(
-                                Collections.singletonList(new FlushEvent(0)),
+                                Collections.singletonList(
+                                        new FlushEvent(
+                                                0,
+                                                Collections.singletonList(tableId),
+                                                SchemaChangeEventType.CREATE_TABLE)),
                                 createAndInsertDataEvents));

         Assertions.assertThat(harness.getLatestOriginalSchema(tableId)).isEqualTo(schemaV1);
@@ -772,7 +822,10 @@
         List<Event> expectedEvents =
                 Arrays.asList(
-                        new FlushEvent(0),
+                        new FlushEvent(
+                                0,
+                                Collections.singletonList(tableId),
+                                SchemaChangeEventType.ADD_COLUMN),
                         DataChangeEvent.insertEvent(
                                 tableId,
                                 buildRecord(INT, 4, STRING, "Derrida", SMALLINT, (short) 20)),
@@ -840,7 +893,10 @@
         List<Event> expectedEvents =
                 Arrays.asList(
-                        new FlushEvent(0),
+                        new FlushEvent(
+                                0,
+                                Collections.singletonList(tableId),
+                                SchemaChangeEventType.RENAME_COLUMN),
                         DataChangeEvent.insertEvent(
                                 tableId,
                                 buildRecord(INT, 6, STRING, null, SMALLINT, (short) 22)),
@@ -889,7 +945,10 @@
         List<Event> expectedEvents =
                 Arrays.asList(
-                        new FlushEvent(0),
+                        new FlushEvent(
+                                0,
+                                Collections.singletonList(tableId),
+                                SchemaChangeEventType.ALTER_COLUMN_TYPE),
                         DataChangeEvent.insertEvent(
                                 tableId, buildRecord(INT, 8, STRING, null, SMALLINT, null)),
                         DataChangeEvent.insertEvent(
@@ -930,7 +989,10 @@
         List<Event> expectedEvents =
                 Arrays.asList(
-                        new FlushEvent(0),
+                        new FlushEvent(
+                                0,
+                                Collections.singletonList(tableId),
+                                SchemaChangeEventType.DROP_COLUMN),
                         DataChangeEvent.insertEvent(
                                 tableId, buildRecord(INT, 12, STRING, null, DOUBLE, null)),
                         DataChangeEvent.insertEvent(
@@ -1006,7 +1068,11 @@
                                 .collect(Collectors.toList()))
                 .isEqualTo(
                         ListUtils.union(
-                                Collections.singletonList(new FlushEvent(0)),
+                                Collections.singletonList(
+                                        new FlushEvent(
+                                                0,
+                                                Collections.singletonList(tableId),
+                                                SchemaChangeEventType.CREATE_TABLE)),
                                 createAndInsertDataEvents));

         Assertions.assertThat(harness.getLatestOriginalSchema(tableId)).isEqualTo(schemaV1);
@@ -1105,7 +1171,11 @@
                                 .collect(Collectors.toList()))
                 .isEqualTo(
                         ListUtils.union(
-                                Collections.singletonList(new FlushEvent(0)),
+                                Collections.singletonList(
+                                        new FlushEvent(
+                                                0,
+                                                Collections.singletonList(tableId),
+                                                SchemaChangeEventType.CREATE_TABLE)),
                                 createAndInsertDataEvents));

         Assertions.assertThat(harness.getLatestOriginalSchema(tableId)).isEqualTo(schemaV1);
@@ -1156,7 +1226,11 @@
         processEvent(schemaOperator, addColumnEvents);

         List<Event> expectedEvents = new ArrayList<>();
-        expectedEvents.add(new FlushEvent(0));
+        expectedEvents.add(
+                new FlushEvent(
+                        0,
+                        Collections.singletonList(tableId),
+                        SchemaChangeEventType.ADD_COLUMN));
         expectedEvents.addAll(addColumnEvents);

         Assertions.assertThat(
@@ -1218,7 +1292,11 @@
         processEvent(schemaOperator, renameColumnEvents);

         List<Event> expectedEvents = new ArrayList<>();
-        expectedEvents.add(new FlushEvent(0));
+        expectedEvents.add(
+                new FlushEvent(
+                        0,
+                        Collections.singletonList(tableId),
+                        SchemaChangeEventType.RENAME_COLUMN));
         expectedEvents.addAll(renameColumnEvents);

         Assertions.assertThat(
@@ -1263,7 +1341,10 @@
         List<Event> expectedEvents =
                 Arrays.asList(
-                        new FlushEvent(0),
+                        new FlushEvent(
+                                0,
+                                Collections.singletonList(tableId),
+                                SchemaChangeEventType.ALTER_COLUMN_TYPE),
                         DataChangeEvent.insertEvent(
                                 tableId,
                                 buildRecord(
@@ -1321,7 +1402,10 @@
         List<Event> expectedEvents =
                 Arrays.asList(
-                        new FlushEvent(0),
+                        new FlushEvent(
+                                0,
+                                Collections.singletonList(tableId),
+                                SchemaChangeEventType.DROP_COLUMN),
                         DataChangeEvent.insertEvent(
                                 tableId,
                                 buildRecord(
@@ -1417,7 +1501,11 @@
                                 .collect(Collectors.toList()))
                 .isEqualTo(
                         ListUtils.union(
-                                Collections.singletonList(new FlushEvent(0)),
+                                Collections.singletonList(
+                                        new FlushEvent(
+                                                0,
+                                                Collections.singletonList(tableId),
+                                                SchemaChangeEventType.CREATE_TABLE)),
                                 createAndInsertDataEvents));

         Assertions.assertThat(harness.getLatestOriginalSchema(tableId)).isEqualTo(schemaV1);
@@ -1468,7 +1556,11 @@
         processEvent(schemaOperator, addColumnEvents);

         List<Event> expectedEvents = new ArrayList<>();
-        expectedEvents.add(new FlushEvent(0));
+        expectedEvents.add(
+                new FlushEvent(
+                        0,
+                        Collections.singletonList(tableId),
+                        SchemaChangeEventType.ADD_COLUMN));
         expectedEvents.addAll(addColumnEvents);
@@ -1531,7 +1623,11 @@
         processEvent(schemaOperator, renameColumnEvents);

         List<Event> expectedEvents = new ArrayList<>();
-        expectedEvents.add(new FlushEvent(0));
+        expectedEvents.add(
+                new FlushEvent(
+                        0,
+                        Collections.singletonList(tableId),
+                        SchemaChangeEventType.RENAME_COLUMN));
         expectedEvents.addAll(renameColumnEvents);

         Assertions.assertThat(
@@ -1576,7 +1672,10 @@
         List<Event> expectedEvents =
                 Arrays.asList(
-                        new FlushEvent(0),
+                        new FlushEvent(
+                                0,
+                                Collections.singletonList(tableId),
+                                SchemaChangeEventType.ALTER_COLUMN_TYPE),
                         DataChangeEvent.insertEvent(
                                 tableId,
                                 buildRecord(
@@ -1633,7 +1732,11 @@
         processEvent(schemaOperator, dropColumnEvents);

         FlushEvent result;
-        result = new FlushEvent(0);
+        result =
+                new FlushEvent(
+                        0,
+                        Collections.singletonList(tableId),
+                        SchemaChangeEventType.DROP_COLUMN);
         List<Event> expectedEvents =
                 Arrays.asList(
                         result,
@@ -1718,7 +1821,11 @@
         FlushEvent result;
         TableId tableId1 = tableId;
         Event schemaChangeEvent = createAndInsertDataEvents.get(0);
-        result = new FlushEvent(0);
+        result =
+                new FlushEvent(
+                        0,
+                        Collections.singletonList(tableId),
+                        SchemaChangeEventType.CREATE_TABLE);
         Assertions.assertThat(
                 harness.getOutputRecords().stream()
                         .map(StreamRecord::getValue)
@@ -1780,7 +1887,12 @@
                                 .collect(Collectors.toList()))
                 .isEqualTo(
                         ListUtils.union(
-                                Collections.singletonList(new FlushEvent(0)), addColumnEvents));
+                                Collections.singletonList(
+                                        new FlushEvent(
+                                                0,
+                                                Collections.singletonList(tableId),
+                                                SchemaChangeEventType.ADD_COLUMN)),
+                                addColumnEvents));

         Schema schemaV2 =
                 Schema.newBuilder()
@@ -1885,7 +1997,11 @@
         int subTaskId = 0;
         TableId tableId1 = tableId;
         Event schemaChangeEvent = renameColumnEvents.get(0);
-        result = new FlushEvent(subTaskId);
+        result =
+                new FlushEvent(
+                        subTaskId,
+                        Collections.singletonList(tableId),
+                        SchemaChangeEventType.RENAME_COLUMN);
         Assertions.assertThat(
                 harness.getOutputRecords().stream()
                         .map(StreamRecord::getValue)
@@ -1966,7 +2082,11 @@
                                 .collect(Collectors.toList()))
                 .isEqualTo(
                         ListUtils.union(
-                                Collections.singletonList(new FlushEvent(0)),
+                                Collections.singletonList(
+                                        new FlushEvent(
+                                                0,
+                                                Collections.singletonList(tableId),
+                                                SchemaChangeEventType.ALTER_COLUMN_TYPE)),
                                 lenientAlterColumnTypeEvents));

         Schema schemaV4 =
@@ -2027,7 +2147,11 @@
                                 .collect(Collectors.toList()))
                 .isEqualTo(
                         ListUtils.union(
-                                Collections.singletonList(new FlushEvent(0)),
+                                Collections.singletonList(
+                                        new FlushEvent(
+                                                0,
+                                                Collections.singletonList(tableId),
+                                                SchemaChangeEventType.DROP_COLUMN)),
                                 lenientDropColumnEvents));

         Schema schemaV5 =
@@ -2108,7 +2232,11 @@
         FlushEvent result;
         TableId tableId1 = tableId;
         Event schemaChangeEvent = createAndInsertDataEvents.get(0);
-        result = new FlushEvent(0);
+        result =
+                new FlushEvent(
+                        0,
+                        Collections.singletonList(tableId),
+                        SchemaChangeEventType.CREATE_TABLE);
         Assertions.assertThat(
                 harness.getOutputRecords().stream()
                         .map(StreamRecord::getValue)
@@ -2156,7 +2284,11 @@
                                 .collect(Collectors.toList()))
                 .isEqualTo(
                         ListUtils.union(
-                                Collections.singletonList(new FlushEvent(0)),
+                                Collections.singletonList(
+                                        new FlushEvent(
+                                                0,
+                                                Collections.singletonList(tableId),
+                                                SchemaChangeEventType.DROP_COLUMN)),
                                 lenientDropColumnEvents));

         Schema schemaV2 =
@@ -2271,7 +2403,11 @@
                                 .collect(Collectors.toList()))
                 .isEqualTo(
                         ListUtils.union(
-                                Collections.singletonList(new FlushEvent(0)),
+                                Collections.singletonList(
+                                        new FlushEvent(
+                                                0,
+                                                Collections.singletonList(tableId),
+                                                SchemaChangeEventType.ADD_COLUMN)),
                                 lenientAddColumnEvents));

         Schema schemaV3 =
@@ -2389,7 +2525,11 @@
                                 .collect(Collectors.toList()))
                 .isEqualTo(
                         ListUtils.union(
-                                Collections.singletonList(new FlushEvent(0)),
+                                Collections.singletonList(
+                                        new FlushEvent(
+                                                0,
+                                                Collections.singletonList(tableId),
+                                                SchemaChangeEventType.RENAME_COLUMN)),
                                 lenientRenameColumnEvents));

         Schema schemaV4 =

DataSinkOperatorAdapter.java (new file)
@@ -0,0 +1,167 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.runtime.operators.sink;
import org.apache.flink.cdc.common.event.ChangeEvent;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.FlushEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEventType;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
/**
* The DataSinkOperatorAdapter class acts as an adapter for testing the core schema evolution
* process in both {@link DataSinkWriterOperator} and {@link DataSinkFunctionOperator}.
*/
public class DataSinkOperatorAdapter extends AbstractStreamOperator<Event>
implements OneInputStreamOperator<Event, Event>, BoundedOneInput {
private SchemaEvolutionClient schemaEvolutionClient;
private final OperatorID schemaOperatorID;
/** A set of {@link TableId} for which a {@link CreateTableEvent} has already been processed. */
private final Set<TableId> processedTableIds;
public DataSinkOperatorAdapter() {
this.schemaOperatorID = new OperatorID();
this.processedTableIds = new HashSet<>();
this.chainingStrategy = ChainingStrategy.ALWAYS;
}
@Override
public void setup(
StreamTask<?, ?> containingTask,
StreamConfig config,
Output<StreamRecord<Event>> output) {
super.setup(containingTask, config, output);
schemaEvolutionClient =
new SchemaEvolutionClient(
containingTask.getEnvironment().getOperatorCoordinatorEventGateway(),
schemaOperatorID);
}
@Override
public void open() throws Exception {}
@Override
public void initializeState(StateInitializationContext context) throws Exception {
schemaEvolutionClient.registerSubtask(getRuntimeContext().getIndexOfThisSubtask());
}
@Override
public void snapshotState(StateSnapshotContext context) {}
@Override
public void processWatermark(Watermark mark) {}
@Override
public void processWatermarkStatus(WatermarkStatus watermarkStatus) {}
@Override
public void processElement(StreamRecord<Event> element) throws Exception {
Event event = element.getValue();
// FlushEvent triggers flush
if (event instanceof FlushEvent) {
handleFlushEvent(((FlushEvent) event));
return;
}
// CreateTableEvent marks the table as processed directly
if (event instanceof CreateTableEvent) {
processedTableIds.add(((CreateTableEvent) event).tableId());
// replace FlinkWriterOperator/StreamSink and emit the event for testing
output.collect(element);
return;
}
// Check whether the table has been processed before emitting any other event, because we
// have to make sure that the sink has a view of the full schema before it processes any
// change events, including schema changes.
ChangeEvent changeEvent = (ChangeEvent) event;
if (!processedTableIds.contains(changeEvent.tableId())) {
emitLatestSchema(changeEvent.tableId());
processedTableIds.add(changeEvent.tableId());
}
processedTableIds.add(changeEvent.tableId());
output.collect(element);
}
@Override
public void prepareSnapshotPreBarrier(long checkpointId) {}
@Override
public void close() throws Exception {}
@Override
public void endInput() {}
// ----------------------------- Helper functions -------------------------------
private void handleFlushEvent(FlushEvent event) throws Exception {
// omit copySinkWriter/userFunction flush from testing
if (event.getSchemaChangeEventType() != SchemaChangeEventType.CREATE_TABLE) {
event.getTableIds().stream()
.filter(tableId -> !processedTableIds.contains(tableId))
.forEach(
tableId -> {
LOG.info("Table {} has not been processed", tableId);
try {
emitLatestSchema(tableId);
} catch (Exception e) {
throw new RuntimeException(e);
}
processedTableIds.add(tableId);
});
}
schemaEvolutionClient.notifyFlushSuccess(
getRuntimeContext().getIndexOfThisSubtask(), event.getSourceSubTaskId());
}
private void emitLatestSchema(TableId tableId) throws Exception {
Optional<Schema> schema = schemaEvolutionClient.getLatestEvolvedSchema(tableId);
if (schema.isPresent()) {
// request and process a CreateTableEvent because the SinkWriter needs to retrieve
// the Schema to deserialize RecordData after resuming the job.
output.collect(new StreamRecord<>(new CreateTableEvent(tableId, schema.get())));
processedTableIds.add(tableId);
} else {
throw new RuntimeException(
"Could not find schema message from SchemaRegistry for " + tableId);
}
}
}

DataSinkOperatorWithSchemaEvolveTest.java (new file)
@@ -0,0 +1,330 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.runtime.operators.sink;
import org.apache.flink.cdc.common.data.binary.BinaryStringData;
import org.apache.flink.cdc.common.event.AddColumnEvent;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.FlushEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.common.types.RowType;
import org.apache.flink.cdc.runtime.operators.schema.regular.event.SchemaChangeResponse;
import org.apache.flink.cdc.runtime.testutils.operators.RegularEventOperatorTestHarness;
import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
/**
* Unit tests for the DataSinkOperator ({@link DataSinkWriterOperator}/{@link
* DataSinkFunctionOperator}) handling schema evolution events.
*/
public class DataSinkOperatorWithSchemaEvolveTest {
private static final TableId CUSTOMERS_TABLEID =
TableId.tableId("my_company", "my_branch", "customers");
private static final Schema CUSTOMERS_SCHEMA =
Schema.newBuilder()
.physicalColumn("col1", DataTypes.STRING())
.physicalColumn("col2", DataTypes.STRING())
.primaryKey("col1")
.build();
private static final Schema CUSTOMERS_LATEST_SCHEMA =
Schema.newBuilder()
.physicalColumn("col1", DataTypes.STRING())
.physicalColumn("col2", DataTypes.STRING())
.physicalColumn("col3", DataTypes.STRING())
.primaryKey("col1")
.build();
public RegularEventOperatorTestHarness<DataSinkOperatorAdapter, Event> setupHarness(
DataSinkOperatorAdapter dataSinkWriterOperator) throws Exception {
RegularEventOperatorTestHarness<DataSinkOperatorAdapter, Event> harness =
RegularEventOperatorTestHarness.withDuration(
dataSinkWriterOperator, 1, Duration.ofSeconds(3));
// Initialization
harness.open();
return harness;
}
private FlushEvent createFlushEvent(TableId tableId, SchemaChangeEvent postEvent) {
return new FlushEvent(0, Collections.singletonList(tableId), postEvent.getType());
}
private void processSchemaChangeEvent(
DataSinkOperatorAdapter dataSinkWriterOperator,
RegularEventOperatorTestHarness<DataSinkOperatorAdapter, Event> schemaOperatorHarness,
TableId tableId,
SchemaChangeEvent event)
throws Exception {
// Create the flush event to process before the schema change event
FlushEvent flushEvent = createFlushEvent(tableId, event);
// Send schema change request to coordinator
schemaOperatorHarness.requestSchemaChangeEvent(tableId, event);
// Send flush event to SinkWriterOperator
dataSinkWriterOperator.processElement(new StreamRecord<>(flushEvent));
// Wait for coordinator to complete the schema change and get the finished schema change
// events
SchemaChangeResponse schemaEvolveResponse =
schemaOperatorHarness.requestSchemaChangeResult(tableId, event);
List<SchemaChangeEvent> finishedSchemaChangeEvents =
schemaEvolveResponse.getAppliedSchemaChangeEvents();
// Send the schema change events to SinkWriterOperator
finishedSchemaChangeEvents.forEach(
e -> {
try {
dataSinkWriterOperator.processElement(new StreamRecord<>(e));
} catch (Exception ex) {
throw new RuntimeException(ex);
}
});
}
private void processDataChangeEvent(
DataSinkOperatorAdapter dataSinkWriterOperator, DataChangeEvent event)
throws Exception {
// Send the data change event to SinkWriterOperator
dataSinkWriterOperator.processElement(new StreamRecord<>(event));
}
private void assertOutputEvents(
RegularEventOperatorTestHarness<DataSinkOperatorAdapter, Event>
dataSinkWriterOperatorHarness,
List<Event> expectedEvents) {
Assertions.assertThat(
dataSinkWriterOperatorHarness.getOutputRecords().stream()
.map(StreamRecord::getValue)
.collect(Collectors.toList()))
.isEqualTo(expectedEvents);
}
/**
* This case tests the schema evolution process for handling schema change events by a sink
* operator under normal conditions.
*/
@Test
public void testSchemaChangeEvent() throws Exception {
DataSinkOperatorAdapter dataSinkWriterOperator = new DataSinkOperatorAdapter();
try (RegularEventOperatorTestHarness<DataSinkOperatorAdapter, Event>
dataSinkWriterOperatorHarness = setupHarness(dataSinkWriterOperator)) {
// Create CreateTableEvent
CreateTableEvent createTableEvent =
new CreateTableEvent(CUSTOMERS_TABLEID, CUSTOMERS_SCHEMA);
// test processing CreateTableEvent
processSchemaChangeEvent(
dataSinkWriterOperator,
dataSinkWriterOperatorHarness,
CUSTOMERS_TABLEID,
createTableEvent);
assertOutputEvents(
dataSinkWriterOperatorHarness, Collections.singletonList(createTableEvent));
dataSinkWriterOperatorHarness.clearOutputRecords();
// Add column
AddColumnEvent.ColumnWithPosition columnWithPosition =
new AddColumnEvent.ColumnWithPosition(
Column.physicalColumn("col3", DataTypes.STRING()));
AddColumnEvent addColumnEvent =
new AddColumnEvent(
CUSTOMERS_TABLEID, Collections.singletonList(columnWithPosition));
// test processing AddColumnEvent
processSchemaChangeEvent(
dataSinkWriterOperator,
dataSinkWriterOperatorHarness,
CUSTOMERS_TABLEID,
addColumnEvent);
assertOutputEvents(
dataSinkWriterOperatorHarness, Collections.singletonList(addColumnEvent));
dataSinkWriterOperatorHarness.clearOutputRecords();
// Insert
BinaryRecordDataGenerator recordDataGenerator =
new BinaryRecordDataGenerator(
((RowType) CUSTOMERS_LATEST_SCHEMA.toRowDataType()));
DataChangeEvent insertEvent =
DataChangeEvent.insertEvent(
CUSTOMERS_TABLEID,
recordDataGenerator.generate(
new Object[] {
new BinaryStringData("1"),
new BinaryStringData("2"),
new BinaryStringData("3"),
}));
// test processing DataChangeEvent
processDataChangeEvent(dataSinkWriterOperator, insertEvent);
assertOutputEvents(
dataSinkWriterOperatorHarness, Collections.singletonList(insertEvent));
dataSinkWriterOperatorHarness.clearOutputRecords();
}
}
/**
* This case tests the schema evolution process for handling schema change events by a sink
* operator after failover.
*/
@Test
public void testSchemaChangeEventAfterFailover() throws Exception {
DataSinkOperatorAdapter dataSinkWriterOperator = new DataSinkOperatorAdapter();
try (RegularEventOperatorTestHarness<DataSinkOperatorAdapter, Event>
dataSinkWriterOperatorHarness = setupHarness(dataSinkWriterOperator)) {
dataSinkWriterOperatorHarness.registerOriginalSchema(
CUSTOMERS_TABLEID, CUSTOMERS_SCHEMA);
dataSinkWriterOperatorHarness.registerEvolvedSchema(
CUSTOMERS_TABLEID, CUSTOMERS_SCHEMA);
// Add column
AddColumnEvent.ColumnWithPosition columnWithPosition =
new AddColumnEvent.ColumnWithPosition(
Column.physicalColumn("col3", DataTypes.STRING()));
AddColumnEvent addColumnEvent =
new AddColumnEvent(
CUSTOMERS_TABLEID, Collections.singletonList(columnWithPosition));
// test AddColumnEvent
processSchemaChangeEvent(
dataSinkWriterOperator,
dataSinkWriterOperatorHarness,
CUSTOMERS_TABLEID,
addColumnEvent);
assertOutputEvents(
dataSinkWriterOperatorHarness,
Arrays.asList(
new CreateTableEvent(CUSTOMERS_TABLEID, CUSTOMERS_SCHEMA),
addColumnEvent));
dataSinkWriterOperatorHarness.clearOutputRecords();
// Insert
BinaryRecordDataGenerator recordDataGenerator =
new BinaryRecordDataGenerator(
((RowType) CUSTOMERS_LATEST_SCHEMA.toRowDataType()));
DataChangeEvent insertEvent =
DataChangeEvent.insertEvent(
CUSTOMERS_TABLEID,
recordDataGenerator.generate(
new Object[] {
new BinaryStringData("1"),
new BinaryStringData("2"),
new BinaryStringData("3"),
}));
// test DataChangeEvent
processDataChangeEvent(dataSinkWriterOperator, insertEvent);
assertOutputEvents(
dataSinkWriterOperatorHarness, Collections.singletonList(insertEvent));
dataSinkWriterOperatorHarness.clearOutputRecords();
}
}
/**
* This case tests the schema evolution process for handling data change events by a sink
* operator under normal conditions.
*/
@Test
public void testDataChangeEvent() throws Exception {
DataSinkOperatorAdapter dataSinkWriterOperator = new DataSinkOperatorAdapter();
try (RegularEventOperatorTestHarness<DataSinkOperatorAdapter, Event>
dataSinkWriterOperatorHarness = setupHarness(dataSinkWriterOperator)) {
// Create CreateTableEvent
CreateTableEvent createTableEvent =
new CreateTableEvent(CUSTOMERS_TABLEID, CUSTOMERS_SCHEMA);
// test CreateTableEvent
processSchemaChangeEvent(
dataSinkWriterOperator,
dataSinkWriterOperatorHarness,
CUSTOMERS_TABLEID,
createTableEvent);
assertOutputEvents(
dataSinkWriterOperatorHarness, Collections.singletonList(createTableEvent));
dataSinkWriterOperatorHarness.clearOutputRecords();
// Insert
BinaryRecordDataGenerator recordDataGenerator =
new BinaryRecordDataGenerator(((RowType) CUSTOMERS_SCHEMA.toRowDataType()));
DataChangeEvent insertEvent =
DataChangeEvent.insertEvent(
CUSTOMERS_TABLEID,
recordDataGenerator.generate(
new Object[] {
new BinaryStringData("1"), new BinaryStringData("2")
}));
// test DataChangeEvent
processDataChangeEvent(dataSinkWriterOperator, insertEvent);
assertOutputEvents(
dataSinkWriterOperatorHarness, Collections.singletonList(insertEvent));
dataSinkWriterOperatorHarness.clearOutputRecords();
}
}
/**
* This case tests the data change process for handling schema change events by a sink operator
* after failover.
*/
@Test
public void testDataChangeEventAfterFailover() throws Exception {
DataSinkOperatorAdapter dataSinkWriterOperator = new DataSinkOperatorAdapter();
try (RegularEventOperatorTestHarness<DataSinkOperatorAdapter, Event>
dataSinkWriterOperatorHarness = setupHarness(dataSinkWriterOperator)) {
dataSinkWriterOperatorHarness.registerOriginalSchema(
CUSTOMERS_TABLEID, CUSTOMERS_SCHEMA);
dataSinkWriterOperatorHarness.registerEvolvedSchema(
CUSTOMERS_TABLEID, CUSTOMERS_SCHEMA);
// Insert
BinaryRecordDataGenerator recordDataGenerator =
new BinaryRecordDataGenerator(
((RowType) CUSTOMERS_LATEST_SCHEMA.toRowDataType()));
DataChangeEvent insertEvent =
DataChangeEvent.insertEvent(
CUSTOMERS_TABLEID,
recordDataGenerator.generate(
new Object[] {
new BinaryStringData("1"),
new BinaryStringData("2"),
new BinaryStringData("3"),
}));
// test DataChangeEvent
processDataChangeEvent(dataSinkWriterOperator, insertEvent);
assertOutputEvents(
dataSinkWriterOperatorHarness,
Arrays.asList(
new CreateTableEvent(CUSTOMERS_TABLEID, CUSTOMERS_SCHEMA),
insertEvent));
dataSinkWriterOperatorHarness.clearOutputRecords();
}
}
}

@@ -21,6 +21,7 @@ import org.apache.flink.cdc.common.data.binary.BinaryStringData;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.FlushEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEventType;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.sink.DefaultDataChangeEventHashFunctionProvider;

@@ -33,6 +34,8 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.junit.jupiter.api.Test;

import java.util.Collections;

import static org.assertj.core.api.Assertions.assertThat;

/** Unit test for {@link RegularPrePartitionOperator}. */

@@ -80,7 +83,11 @@ class PrePartitionOperatorTest {
        // FlushEvent
        RegularPrePartitionOperator operator = testHarness.getOperator();
        FlushEvent flushEvent =
                new FlushEvent(
                        0,
                        Collections.singletonList(CUSTOMERS),
                        SchemaChangeEventType.CREATE_TABLE);
        operator.processElement(new StreamRecord<>(flushEvent));
        assertThat(testHarness.getOutputRecords()).hasSize(DOWNSTREAM_PARALLELISM);
        for (int i = 0; i < DOWNSTREAM_PARALLELISM; i++) {
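
The hasSize(DOWNSTREAM_PARALLELISM) assertion above relies on the pre-partition operator fanning each FlushEvent out to every downstream subtask. A minimal sketch of that fan-out follows; downstreamParallelism and wrap are invented names, since the operator's actual members may differ.

// One FlushEvent in, one copy per downstream subtask out, so every sink writer
// flushes its buffers before the schema change is applied.
void broadcastEvent(Event toBroadcast) {
    for (int i = 0; i < downstreamParallelism; i++) {
        // `wrap` stands in for however the event is tagged with its target subtask.
        output.collect(new StreamRecord<>(wrap(toBroadcast, i)));
    }
}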

@@ -20,9 +20,12 @@ package org.apache.flink.cdc.runtime.serializer.event;

import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.FlushEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEventType;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.runtime.serializer.SerializerTestBase;

import java.util.Arrays;
import java.util.Collections;
import java.util.stream.Stream;

/** A test for the {@link EventSerializer}. */

@@ -44,7 +47,20 @@ public class EventSerializerTest extends SerializerTestBase<Event> {
    @Override
    protected Event[] getTestData() {
        Event[] flushEvents =
                new Event[] {
                    new FlushEvent(1, Collections.emptyList(), SchemaChangeEventType.CREATE_TABLE),
                    new FlushEvent(
                            2,
                            Collections.singletonList(TableId.tableId("schema", "table")),
                            SchemaChangeEventType.CREATE_TABLE),
                    new FlushEvent(
                            3,
                            Arrays.asList(
                                    TableId.tableId("schema", "table"),
                                    TableId.tableId("namespace", "schema", "table")),
                            SchemaChangeEventType.CREATE_TABLE)
                };
        Event[] dataChangeEvents = new DataChangeEventSerializerTest().getTestData();
        Event[] schemaChangeEvents = new SchemaChangeEventSerializerTest().getTestData();
        return Stream.concat(
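
These serializer fixtures guard the property the fix depends on: the new tableIds and schemaChangeEventType fields must survive serialization, or a restored job would flush with stale metadata. A self-contained round-trip sketch of that check follows; it assumes EventSerializer exposes an INSTANCE singleton like the other serializers in this module.

import java.util.Collections;

import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.FlushEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEventType;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.runtime.serializer.event.EventSerializer;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;

class FlushEventRoundTripSketch {
    public static void main(String[] args) throws Exception {
        FlushEvent original =
                new FlushEvent(
                        1,
                        Collections.singletonList(TableId.tableId("schema", "table")),
                        SchemaChangeEventType.CREATE_TABLE);
        DataOutputSerializer out = new DataOutputSerializer(64);
        EventSerializer.INSTANCE.serialize(original, out);
        Event restored =
                EventSerializer.INSTANCE.deserialize(
                        new DataInputDeserializer(out.getCopyOfBuffer()));
        // equals() now compares tableIds and schemaChangeEventType as well.
        if (!original.equals(restored)) {
            throw new AssertionError("FlushEvent did not survive the round trip");
        }
    }
}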

@@ -20,11 +20,14 @@ package org.apache.flink.cdc.runtime.serializer.event;

import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.FlushEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEventType;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.runtime.partitioning.PartitioningEvent;
import org.apache.flink.cdc.runtime.serializer.SerializerTestBase;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

@@ -48,7 +51,20 @@ class PartitioningEventSerializerTest extends SerializerTestBase<PartitioningEve
    @Override
    protected PartitioningEvent[] getTestData() {
        Event[] flushEvents =
                new Event[] {
                    new FlushEvent(1, Collections.emptyList(), SchemaChangeEventType.CREATE_TABLE),
                    new FlushEvent(
                            2,
                            Collections.singletonList(TableId.tableId("schema", "table")),
                            SchemaChangeEventType.CREATE_TABLE),
                    new FlushEvent(
                            3,
                            Arrays.asList(
                                    TableId.tableId("schema", "table"),
                                    TableId.tableId("namespace", "schema", "table")),
                            SchemaChangeEventType.CREATE_TABLE)
                };
        Event[] dataChangeEvents = new DataChangeEventSerializerTest().getTestData();
        Event[] schemaChangeEvents = new SchemaChangeEventSerializerTest().getTestData();

@@ -19,11 +19,13 @@ package org.apache.flink.cdc.runtime.testutils.operators;

import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.FlushEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEventType;
import org.apache.flink.cdc.common.event.SchemaChangeEventTypeFamily;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.runtime.operators.schema.common.CoordinationResponseUtils;
import org.apache.flink.cdc.runtime.operators.schema.common.event.FlushSuccessEvent;
import org.apache.flink.cdc.runtime.operators.schema.common.event.GetEvolvedSchemaRequest;
import org.apache.flink.cdc.runtime.operators.schema.common.event.GetEvolvedSchemaResponse;

@@ -31,6 +33,8 @@ import org.apache.flink.cdc.runtime.operators.schema.common.event.GetOriginalSch
import org.apache.flink.cdc.runtime.operators.schema.common.event.GetOriginalSchemaResponse;
import org.apache.flink.cdc.runtime.operators.schema.common.event.SinkWriterRegisterEvent;
import org.apache.flink.cdc.runtime.operators.schema.regular.SchemaCoordinator;
import org.apache.flink.cdc.runtime.operators.schema.regular.event.SchemaChangeRequest;
import org.apache.flink.cdc.runtime.operators.schema.regular.event.SchemaChangeResponse;
import org.apache.flink.cdc.runtime.operators.sink.SchemaEvolutionClient;
import org.apache.flink.cdc.runtime.testutils.schema.CollectingMetadataApplier;
import org.apache.flink.cdc.runtime.testutils.schema.TestingSchemaRegistryGateway;

@@ -50,6 +54,9 @@ import org.apache.flink.streaming.util.MockStreamConfig;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.SerializedValue;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;

@@ -57,9 +64,12 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedList;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

import static org.apache.flink.cdc.common.pipeline.PipelineOptions.DEFAULT_SCHEMA_OPERATOR_RPC_TIMEOUT;
import static org.apache.flink.cdc.runtime.operators.schema.common.CoordinationResponseUtils.unwrap;

/**
@@ -75,6 +85,8 @@ import static org.apache.flink.cdc.runtime.operators.schema.common.CoordinationR
 */
public class RegularEventOperatorTestHarness<OP extends AbstractStreamOperator<E>, E extends Event>
        implements AutoCloseable {
    private static final Logger LOG =
            LoggerFactory.getLogger(RegularEventOperatorTestHarness.class);

    public static final OperatorID SCHEMA_OPERATOR_ID = new OperatorID(15213L, 15513L);
    public static final OperatorID SINK_OPERATOR_ID = new OperatorID(15214L, 15514L);

@@ -212,6 +224,47 @@ public class RegularEventOperatorTestHarness<OP extends AbstractStreamOperator<E
        schemaRegistry.emplaceEvolvedSchema(tableId, schema);
    }

    public void registerOriginalSchema(TableId tableId, Schema schema) {
        schemaRegistry.emplaceOriginalSchema(tableId, schema);
    }

    public void registerEvolvedSchema(TableId tableId, Schema schema) {
        schemaRegistry.emplaceEvolvedSchema(tableId, schema);
    }

    public SchemaChangeResponse requestSchemaChangeEvent(TableId tableId, SchemaChangeEvent event)
            throws ExecutionException, InterruptedException {
        return CoordinationResponseUtils.unwrap(
                schemaRegistry
                        .handleCoordinationRequest(new SchemaChangeRequest(tableId, event, 0))
                        .get());
    }

    public SchemaChangeResponse requestSchemaChangeResult(TableId tableId, SchemaChangeEvent event)
            throws ExecutionException, InterruptedException, TimeoutException {
        long rpcTimeOutInMillis = DEFAULT_SCHEMA_OPERATOR_RPC_TIMEOUT.toMillis();
        long deadline = System.currentTimeMillis() + rpcTimeOutInMillis;
        while (true) {
            LOG.info("request schema change result");
            SchemaChangeResponse response = requestSchemaChangeEvent(tableId, event);
            if (System.currentTimeMillis() < deadline) {
                if (response.isRegistryBusy()) {
                    LOG.info("{}> Schema Registry is busy now, waiting for next request...", 0);
                    Thread.sleep(1000);
                } else if (response.isWaitingForFlush()) {
                    LOG.info(
                            "{}> Schema change event has not collected enough flush success events from writers, waiting...",
                            0);
                    Thread.sleep(1000);
                } else {
                    return response;
                }
            } else {
                throw new TimeoutException("Timeout when requesting schema change.");
            }
        }
    }

    public Schema getLatestOriginalSchema(TableId tableId) throws Exception {
        return ((GetOriginalSchemaResponse)
                unwrap(
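
A hypothetical call site for the new polling helper, reusing the table and schema constants from the tests above:

// Keeps re-sending the request while the coordinator reports busy or
// waiting-for-flush; throws TimeoutException once
// DEFAULT_SCHEMA_OPERATOR_RPC_TIMEOUT elapses.
SchemaChangeResponse response =
        harness.requestSchemaChangeResult(
                CUSTOMERS_TABLEID, new CreateTableEvent(CUSTOMERS_TABLEID, CUSTOMERS_SCHEMA));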
