[hotfix][cdc-runtime] Fix schema registry hanging in multiple parallelism

pull/3582/head
yuxiqian 7 months ago committed by Leonard Xu
parent dacbe7c34a
commit 803d438657

@ -301,7 +301,7 @@ class FlinkPipelineTransformITCase {
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[3rd, 6, Fiona, 26, 3], after=[], op=DELETE, meta=()}",
// Alter column type stage
"AlterColumnTypeEvent{tableId=default_namespace.default_schema.mytable1, typeMapping={gender=INT, name=VARCHAR(17), age=DOUBLE}, oldTypeMapping={gender=TINYINT, name=STRING, age=INT}}",
"AlterColumnTypeEvent{tableId=default_namespace.default_schema.mytable1, nameMapping={gender=INT, name=VARCHAR(17), age=DOUBLE}}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[4th, 7, Gem, 19.0, -1], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[5th, 8, Helen, 18.0, -2], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[5th, 8, Helen, 18.0, -2], after=[5th, 8, Harry, 18.0, -3], op=UPDATE, meta=()}",
@ -387,7 +387,7 @@ class FlinkPipelineTransformITCase {
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[3rd, 6, Fiona, 26, 3], after=[], op=DELETE, meta=()}",
// Alter column type stage
"AlterColumnTypeEvent{tableId=default_namespace.default_schema.mytable1, typeMapping={gender=INT, name=VARCHAR(17), age=DOUBLE}, oldTypeMapping={gender=TINYINT, name=STRING, age=INT}}",
"AlterColumnTypeEvent{tableId=default_namespace.default_schema.mytable1, nameMapping={gender=INT, name=VARCHAR(17), age=DOUBLE}}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[4th, 7, Gem, 19.0, -1], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[5th, 8, Helen, 18.0, -2], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[5th, 8, Helen, 18.0, -2], after=[5th, 8, Harry, 18.0, -3], op=UPDATE, meta=()}",
@ -473,7 +473,7 @@ class FlinkPipelineTransformITCase {
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[5, Eve, 5 -> Eve], after=[5, Eva, 5 -> Eva], op=UPDATE, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[6, Fiona, 6 -> Fiona], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[6, Fiona, 6 -> Fiona], after=[], op=DELETE, meta=()}",
"AlterColumnTypeEvent{tableId=default_namespace.default_schema.mytable1, typeMapping={name=VARCHAR(17)}, oldTypeMapping={name=STRING}}",
"AlterColumnTypeEvent{tableId=default_namespace.default_schema.mytable1, nameMapping={name=VARCHAR(17)}}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[7, Gem, 7 -> Gem], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[8, Helen, 8 -> Helen], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[8, Helen, 8 -> Helen], after=[8, Harry, 8 -> Harry], op=UPDATE, meta=()}",
@ -559,7 +559,7 @@ class FlinkPipelineTransformITCase {
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[3rd, 6, Fiona, 26, 3, 6 -> Fiona], after=[], op=DELETE, meta=()}",
// Alter column type stage
"AlterColumnTypeEvent{tableId=default_namespace.default_schema.mytable1, typeMapping={gender=INT, name=VARCHAR(17), age=DOUBLE}, oldTypeMapping={gender=TINYINT, name=STRING, age=INT}}",
"AlterColumnTypeEvent{tableId=default_namespace.default_schema.mytable1, nameMapping={gender=INT, name=VARCHAR(17), age=DOUBLE}}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[4th, 7, Gem, 19.0, -1, 7 -> Gem], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[5th, 8, Helen, 18.0, -2, 8 -> Helen], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[5th, 8, Helen, 18.0, -2, 8 -> Helen], after=[5th, 8, Harry, 18.0, -3, 8 -> Harry], op=UPDATE, meta=()}",
@ -651,7 +651,7 @@ class FlinkPipelineTransformITCase {
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[6 -> Fiona, 3rd, 6, Fiona, 26, 3], after=[], op=DELETE, meta=()}",
// Alter column type stage
"AlterColumnTypeEvent{tableId=default_namespace.default_schema.mytable1, typeMapping={gender=INT, name=VARCHAR(17), age=DOUBLE}, oldTypeMapping={gender=TINYINT, name=STRING, age=INT}}",
"AlterColumnTypeEvent{tableId=default_namespace.default_schema.mytable1, nameMapping={gender=INT, name=VARCHAR(17), age=DOUBLE}}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[7 -> Gem, 4th, 7, Gem, 19.0, -1], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[8 -> Helen, 5th, 8, Helen, 18.0, -2], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[8 -> Helen, 5th, 8, Helen, 18.0, -2], after=[8 -> Harry, 5th, 8, Harry, 18.0, -3], op=UPDATE, meta=()}",

@ -296,22 +296,11 @@ public class MysqlE2eITCase extends PipelineTestEnvironment {
stat.execute("ALTER TABLE products DROP COLUMN new_column;");
stat.execute(
"INSERT INTO products VALUES (default,'evangelion','Eva',2.1728, null, null, null);"); // 114
// Test TruncateTableEvent
stat.execute("TRUNCATE TABLE products;");
// Test DropTableEvent. It's all over.
stat.execute("DROP TABLE products;");
} catch (SQLException e) {
LOG.error("Update table for CDC failed.", e);
throw e;
}
waitUntilSpecificEvent(
String.format(
"DropTableEvent{tableId=%s.products}",
mysqlInventoryDatabase.getDatabaseName()));
validateResult(
"DataChangeEvent{tableId=%s.products, before=[106, hammer, 16oz carpenter's hammer, 1.0, null, null, null], after=[106, hammer, 18oz carpenter hammer, 1.0, null, null, null], op=UPDATE, meta=()}",
"DataChangeEvent{tableId=%s.products, before=[107, rocks, box of assorted rocks, 5.3, null, null, null], after=[107, rocks, box of assorted rocks, 5.1, null, null, null], op=UPDATE, meta=()}",
@ -321,14 +310,12 @@ public class MysqlE2eITCase extends PipelineTestEnvironment {
"DataChangeEvent{tableId=%s.products, before=[110, jacket, water resistent white wind breaker, 0.2, null, null, null, 1], after=[110, jacket, new water resistent white wind breaker, 0.5, null, null, null, 1], op=UPDATE, meta=()}",
"DataChangeEvent{tableId=%s.products, before=[111, scooter, Big 2-wheel scooter , 5.18, null, null, null, 1], after=[111, scooter, Big 2-wheel scooter , 5.17, null, null, null, 1], op=UPDATE, meta=()}",
"DataChangeEvent{tableId=%s.products, before=[111, scooter, Big 2-wheel scooter , 5.17, null, null, null, 1], after=[], op=DELETE, meta=()}",
"AlterColumnTypeEvent{tableId=%s.products, typeMapping={new_col=BIGINT}, oldTypeMapping={new_col=INT}}",
"AlterColumnTypeEvent{tableId=%s.products, nameMapping={new_col=BIGINT}}",
"DataChangeEvent{tableId=%s.products, before=[], after=[112, derrida, forever 21, 2.1728, null, null, null, 2147483649], op=INSERT, meta=()}",
"RenameColumnEvent{tableId=%s.products, nameMapping={new_col=new_column}}",
"DataChangeEvent{tableId=%s.products, before=[], after=[113, dynazenon, SSSS, 2.1728, null, null, null, 2147483649], op=INSERT, meta=()}",
"DropColumnEvent{tableId=%s.products, droppedColumnNames=[new_column]}",
"DataChangeEvent{tableId=%s.products, before=[], after=[114, evangelion, Eva, 2.1728, null, null, null], op=INSERT, meta=()}",
"TruncateTableEvent{tableId=%s.products}",
"DropTableEvent{tableId=%s.products}");
"DataChangeEvent{tableId=%s.products, before=[], after=[114, evangelion, Eva, 2.1728, null, null, null], op=INSERT, meta=()}");
}
private void validateResult(String... expectedEvents) throws Exception {

@ -97,13 +97,11 @@ public class SchemaEvolvingTransformE2eITCase extends PipelineTestEnvironment {
Arrays.asList(
"AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}",
"DataChangeEvent{tableId=%s.members, before=[], after=[1012 -> Eve, 1012, Eve, 17, 0, 1024144, age < 20], op=INSERT, meta=()}",
"AlterColumnTypeEvent{tableId=%s.members, typeMapping={age=DOUBLE}, oldTypeMapping={age=INT}}",
"AlterColumnTypeEvent{tableId=%s.members, nameMapping={age=DOUBLE}}",
"RenameColumnEvent{tableId=%s.members, nameMapping={gender=biological_sex}}",
"DropColumnEvent{tableId=%s.members, droppedColumnNames=[biological_sex]}",
"DataChangeEvent{tableId=%s.members, before=[], after=[1013 -> Fiona, 1013, Fiona, 16.0, 1026169, age < 20], op=INSERT, meta=()}",
"TruncateTableEvent{tableId=%s.members}",
"DataChangeEvent{tableId=%s.members, before=[], after=[1014 -> Gem, 1014, Gem, 17.0, 1028196, age < 20], op=INSERT, meta=()}",
"DropTableEvent{tableId=%s.members}"));
"DataChangeEvent{tableId=%s.members, before=[], after=[1014 -> Gem, 1014, Gem, 17.0, 1028196, age < 20], op=INSERT, meta=()}"));
}
@Test
@ -186,10 +184,9 @@ public class SchemaEvolvingTransformE2eITCase extends PipelineTestEnvironment {
Arrays.asList(
"AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=LAST, existedColumnName=null}]}",
"DataChangeEvent{tableId=%s.members, before=[], after=[1012 -> Eve, 1012, Eve, 17, 1024144, age < 20, 0], op=INSERT, meta=()}",
"AlterColumnTypeEvent{tableId=%s.members, typeMapping={age=DOUBLE}, oldTypeMapping={age=INT}}",
"AlterColumnTypeEvent{tableId=%s.members, nameMapping={age=DOUBLE}}",
"AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`biological_sex` TINYINT, position=LAST, existedColumnName=null}]}",
"DataChangeEvent{tableId=%s.members, before=[], after=[1013 -> Fiona, 1013, Fiona, 16.0, 1026169, age < 20, null, null], op=INSERT, meta=()}",
"TruncateTableEvent{tableId=%s.members}",
"DataChangeEvent{tableId=%s.members, before=[], after=[1014 -> Gem, 1014, Gem, 17.0, 1028196, age < 20, null, null], op=INSERT, meta=()}"));
}
@ -204,14 +201,12 @@ public class SchemaEvolvingTransformE2eITCase extends PipelineTestEnvironment {
Arrays.asList(
"AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}",
"DataChangeEvent{tableId=%s.members, before=[], after=[1012 -> Eve, 1012, Eve, 17, 0, 1024144, age < 20], op=INSERT, meta=()}",
"AlterColumnTypeEvent{tableId=%s.members, typeMapping={age=DOUBLE}, oldTypeMapping={age=INT}}",
"AlterColumnTypeEvent{tableId=%s.members, nameMapping={age=DOUBLE}}",
"RenameColumnEvent{tableId=%s.members, nameMapping={gender=biological_sex}}",
"DataChangeEvent{tableId=%s.members, before=[], after=[1013 -> Fiona, 1013, Fiona, 16.0, null, 1026169, age < 20], op=INSERT, meta=()}",
"TruncateTableEvent{tableId=%s.members}",
"DataChangeEvent{tableId=%s.members, before=[], after=[1014 -> Gem, 1014, Gem, 17.0, null, 1028196, age < 20], op=INSERT, meta=()}"),
Arrays.asList(
"Ignored schema change DropColumnEvent{tableId=%s.members, droppedColumnNames=[biological_sex]} to table %s.members.",
"Ignored schema change DropTableEvent{tableId=%s.members} to table %s.members."));
Collections.singletonList(
"Ignored schema change DropColumnEvent{tableId=%s.members, droppedColumnNames=[biological_sex]} to table %s.members."));
}
@Test
@ -352,13 +347,6 @@ public class SchemaEvolvingTransformE2eITCase extends PipelineTestEnvironment {
// triggers DropColumnEvent
stmt.execute("ALTER TABLE members DROP COLUMN biological_sex");
stmt.execute("INSERT INTO members VALUES (1013, 'Fiona', 16);");
// triggers TruncateTableEvent
stmt.execute("TRUNCATE TABLE members;");
stmt.execute("INSERT INTO members VALUES (1014, 'Gem', 17);");
// triggers DropTableEvent
stmt.execute("DROP TABLE members;");
}
List<String> expectedTmEvents =

@ -912,7 +912,7 @@ public class TransformE2eITCase extends PipelineTestEnvironment {
validateEvents(
"AddColumnEvent{tableId=%s.TABLEALPHA, addedColumns=[ColumnWithPosition{column=`LAST` VARCHAR(17), position=AFTER, existedColumnName=NAMEALPHA}]}",
"DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[3008, 8, 8, 80, 17, Jazz, Last, id -> 3008], op=INSERT, meta=()}",
"AlterColumnTypeEvent{tableId=%s.TABLEALPHA, typeMapping={CODENAME=DOUBLE}, oldTypeMapping={CODENAME=TINYINT}}",
"AlterColumnTypeEvent{tableId=%s.TABLEALPHA, nameMapping={CODENAME=DOUBLE}}",
"RenameColumnEvent{tableId=%s.TABLEALPHA, nameMapping={CODENAME=CODE_NAME}}",
"RenameColumnEvent{tableId=%s.TABLEALPHA, nameMapping={CODE_NAME=CODE_NAME_EX}}",
"DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[3009, 9, 9.0, 90, 18, Keka, Finale, id -> 3009], op=INSERT, meta=()}",
@ -1019,7 +1019,7 @@ public class TransformE2eITCase extends PipelineTestEnvironment {
"AddColumnEvent{tableId=%s.TABLEALPHA, addedColumns=[ColumnWithPosition{column=`CODENAME` TINYINT, position=AFTER, existedColumnName=VERSION}]}",
"AddColumnEvent{tableId=%s.TABLEALPHA, addedColumns=[ColumnWithPosition{column=`FIRST` VARCHAR(17), position=BEFORE, existedColumnName=ID}]}",
"DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[3008 <- id, First, 3008, 8, 8, 80, 17, Jazz], op=INSERT, meta=()}",
"AlterColumnTypeEvent{tableId=%s.TABLEALPHA, typeMapping={CODENAME=DOUBLE}, oldTypeMapping={CODENAME=TINYINT}}",
"AlterColumnTypeEvent{tableId=%s.TABLEALPHA, nameMapping={CODENAME=DOUBLE}}",
"RenameColumnEvent{tableId=%s.TABLEALPHA, nameMapping={CODENAME=CODE_NAME}}",
"RenameColumnEvent{tableId=%s.TABLEALPHA, nameMapping={CODE_NAME=CODE_NAME_EX}}",
"DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[3009 <- id, 1st, 3009, 9, 9.0, 90, 18, Keka], op=INSERT, meta=()}",

@ -19,10 +19,12 @@ package org.apache.flink.cdc.runtime.operators.schema.coordinator;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.event.AddColumnEvent;
import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DropColumnEvent;
import org.apache.flink.cdc.common.event.RenameColumnEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.event.visitor.SchemaChangeEventVisitor;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
@ -106,87 +108,73 @@ public class SchemaManager {
/**
 * Checks whether the given original schema change event has already been applied to the
 * latest known original schema of its table, i.e. whether applying it again would be
 * redundant.
 *
 * <p>This de-duplication matters when the pipeline runs with parallelism greater than one,
 * since the same schema change request may be reported by multiple upstream subtasks.
 *
 * @param event the original (upstream) schema change event to check
 * @return {@code true} if the event's effects are already visible in the latest original
 *     schema of {@code event.tableId()}, {@code false} otherwise
 * @throws RuntimeException if the event is not one of the supported schema change event
 *     types handled below (e.g. truncate/drop table events are intentionally not handled
 *     here — TODO confirm against callers)
 */
public final boolean isOriginalSchemaChangeEventRedundant(SchemaChangeEvent event) {
    TableId tableId = event.tableId();
    Optional<Schema> latestSchema = getLatestOriginalSchema(tableId);
    if (event instanceof AddColumnEvent) {
        AddColumnEvent addColumnEvent = (AddColumnEvent) event;
        // It has not been applied if the schema does not even exist yet.
        if (!latestSchema.isPresent()) {
            return false;
        }
        List<Column> existedColumns = latestSchema.get().getColumns();
        // It has been applied only if every added column is already present.
        for (AddColumnEvent.ColumnWithPosition column : addColumnEvent.getAddedColumns()) {
            if (!existedColumns.contains(column.getAddColumn())) {
                return false;
            }
        }
        return true;
    } else if (event instanceof AlterColumnTypeEvent) {
        AlterColumnTypeEvent alterColumnTypeEvent = (AlterColumnTypeEvent) event;
        // It has not been applied if the schema does not even exist yet.
        if (!latestSchema.isPresent()) {
            return false;
        }
        Schema schema = latestSchema.get();
        // It has been applied only if every column already carries the expected type.
        for (Map.Entry<String, DataType> entry :
                alterColumnTypeEvent.getTypeMapping().entrySet()) {
            if (!schema.getColumn(entry.getKey()).isPresent()
                    || !schema.getColumn(entry.getKey())
                            .get()
                            .getType()
                            .equals(entry.getValue())) {
                return false;
            }
        }
        return true;
    } else if (event instanceof CreateTableEvent) {
        // It has been applied if such a table already exists.
        return latestSchema.isPresent();
    } else if (event instanceof DropColumnEvent) {
        DropColumnEvent dropColumnEvent = (DropColumnEvent) event;
        // It has not been applied if the schema does not even exist yet.
        if (!latestSchema.isPresent()) {
            return false;
        }
        List<String> existedColumnNames = latestSchema.get().getColumnNames();
        // It has been applied only if none of the dropped columns still exist.
        return dropColumnEvent.getDroppedColumnNames().stream()
                .noneMatch(existedColumnNames::contains);
    } else if (event instanceof RenameColumnEvent) {
        RenameColumnEvent renameColumnEvent = (RenameColumnEvent) event;
        // It has not been applied if the schema does not even exist yet.
        if (!latestSchema.isPresent()) {
            return false;
        }
        List<String> existedColumnNames = latestSchema.get().getColumnNames();
        // It has been applied only if all old names are gone and all new names exist.
        for (Map.Entry<String, String> entry : renameColumnEvent.getNameMapping().entrySet()) {
            if (existedColumnNames.contains(entry.getKey())
                    || !existedColumnNames.contains(entry.getValue())) {
                return false;
            }
        }
        return true;
    } else {
        throw new RuntimeException("Unknown schema change event: " + event);
    }
}
public final boolean schemaExists(

@ -104,6 +104,12 @@ public class SchemaRegistry implements OperatorCoordinator, CoordinationRequestH
private SchemaChangeBehavior schemaChangeBehavior;
/**
* Current parallelism. Use this to verify if Schema Registry has collected enough flush success
* events from sink operators.
*/
private int currentParallelism;
public SchemaRegistry(
String operatorName,
OperatorCoordinator.Context context,
@ -135,7 +141,9 @@ public class SchemaRegistry implements OperatorCoordinator, CoordinationRequestH
/**
 * Starts this schema registry coordinator: clears previously recorded failure reasons and
 * captures the operator's current parallelism, which is later used to verify that flush
 * success events have been collected from every sink subtask before applying a schema
 * change.
 */
public void start() throws Exception {
    LOG.info("Starting SchemaRegistry for {}.", operatorName);
    this.failedReasons.clear();
    // Snapshot parallelism at start-up; flushSuccess() compares registered sink writers
    // against this value so a schema change is never applied before all subtasks report.
    this.currentParallelism = context.currentParallelism();
    LOG.info(
            "Started SchemaRegistry for {}. Parallelism: {}", operatorName, currentParallelism);
}
@Override
@ -155,7 +163,9 @@ public class SchemaRegistry implements OperatorCoordinator, CoordinationRequestH
flushSuccessEvent.getSubtask(),
flushSuccessEvent.getTableId().toString());
requestHandler.flushSuccess(
flushSuccessEvent.getTableId(), flushSuccessEvent.getSubtask());
flushSuccessEvent.getTableId(),
flushSuccessEvent.getSubtask(),
currentParallelism);
} else if (event instanceof SinkWriterRegisterEvent) {
requestHandler.registerSinkWriter(((SinkWriterRegisterEvent) event).getSubtask());
} else {

@ -25,7 +25,6 @@ import org.apache.flink.cdc.common.event.DropColumnEvent;
import org.apache.flink.cdc.common.event.RenameColumnEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEventType;
import org.apache.flink.cdc.common.event.SchemaChangeEventWithPreSchema;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.exceptions.UnsupportedSchemaChangeEventException;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
@ -48,11 +47,11 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
@ -103,8 +102,8 @@ public class SchemaRegistryRequestHandler implements Closeable {
this.schemaDerivation = schemaDerivation;
this.schemaChangeBehavior = schemaChangeBehavior;
this.activeSinkWriters = new HashSet<>();
this.flushedSinkWriters = new HashSet<>();
this.activeSinkWriters = ConcurrentHashMap.newKeySet();
this.flushedSinkWriters = ConcurrentHashMap.newKeySet();
this.schemaChangeThreadPool = Executors.newSingleThreadExecutor();
this.currentDerivedSchemaChangeEvents = new ArrayList<>();
@ -122,7 +121,7 @@ public class SchemaRegistryRequestHandler implements Closeable {
SchemaChangeRequest request) {
if (schemaChangeStatus.compareAndSet(RequestStatus.IDLE, RequestStatus.WAITING_FOR_FLUSH)) {
LOG.info(
"Received schema change event request {} from table {}. Start to buffer requests for others.",
"Received schema change event request {} from table {}. SchemaChangeStatus switched from IDLE to WAITING_FOR_FLUSH, other requests will be blocked.",
request.getSchemaChangeEvent(),
request.getTableId().toString());
SchemaChangeEvent event = request.getSchemaChangeEvent();
@ -134,7 +133,11 @@ public class SchemaRegistryRequestHandler implements Closeable {
Preconditions.checkState(
schemaChangeStatus.compareAndSet(
RequestStatus.WAITING_FOR_FLUSH, RequestStatus.IDLE),
"Illegal schemaChangeStatus state: should still in WAITING_FOR_FLUSH state if event was duplicated.");
"Illegal schemaChangeStatus state: should still in WAITING_FOR_FLUSH state if event was duplicated, not "
+ schemaChangeStatus.get());
LOG.info(
"SchemaChangeStatus switched from WAITING_FOR_FLUSH to IDLE for request {} due to duplicated request.",
request);
return CompletableFuture.completedFuture(wrap(SchemaChangeResponse.duplicate()));
}
schemaManager.applyOriginalSchemaChange(event);
@ -149,22 +152,13 @@ public class SchemaRegistryRequestHandler implements Closeable {
Preconditions.checkState(
schemaChangeStatus.compareAndSet(
RequestStatus.WAITING_FOR_FLUSH, RequestStatus.IDLE),
"Illegal schemaChangeStatus state: should still in WAITING_FOR_FLUSH state if event was ignored.");
"Illegal schemaChangeStatus state: should still in WAITING_FOR_FLUSH state if event was ignored, not "
+ schemaChangeStatus.get());
LOG.info(
"SchemaChangeStatus switched from WAITING_FOR_FLUSH to IDLE for request {} due to ignored request.",
request);
return CompletableFuture.completedFuture(wrap(SchemaChangeResponse.ignored()));
}
// Backfill pre-schema info for sink applying
derivedSchemaChangeEvents.forEach(
e -> {
if (e instanceof SchemaChangeEventWithPreSchema) {
SchemaChangeEventWithPreSchema pe = (SchemaChangeEventWithPreSchema) e;
if (!pe.hasPreSchema()) {
schemaManager
.getLatestEvolvedSchema(pe.tableId())
.ifPresent(pe::fillPreSchema);
}
}
});
currentDerivedSchemaChangeEvents = new ArrayList<>(derivedSchemaChangeEvents);
return CompletableFuture.completedFuture(
wrap(SchemaChangeResponse.accepted(derivedSchemaChangeEvents)));
@ -220,7 +214,11 @@ public class SchemaRegistryRequestHandler implements Closeable {
}
Preconditions.checkState(
schemaChangeStatus.compareAndSet(RequestStatus.APPLYING, RequestStatus.FINISHED),
"Illegal schemaChangeStatus state: should be APPLYING before applySchemaChange finishes");
"Illegal schemaChangeStatus state: should be APPLYING before applySchemaChange finishes, not "
+ schemaChangeStatus.get());
LOG.info(
"SchemaChangeStatus switched from APPLYING to FINISHED for request {}.",
currentDerivedSchemaChangeEvents);
}
/**
@ -239,13 +237,21 @@ public class SchemaRegistryRequestHandler implements Closeable {
* @param tableId the subtask in SchemaOperator and table that the FlushEvent is about
* @param sinkSubtask the sink subtask succeed flushing
*/
public void flushSuccess(TableId tableId, int sinkSubtask) {
public void flushSuccess(TableId tableId, int sinkSubtask, int parallelism) {
flushedSinkWriters.add(sinkSubtask);
if (activeSinkWriters.size() < parallelism) {
LOG.info(
"Not all active sink writers have been registered. Current {}, expected {}.",
activeSinkWriters.size(),
parallelism);
return;
}
if (flushedSinkWriters.equals(activeSinkWriters)) {
Preconditions.checkState(
schemaChangeStatus.compareAndSet(
RequestStatus.WAITING_FOR_FLUSH, RequestStatus.APPLYING),
"Illegal schemaChangeStatus state: should be WAITING_FOR_FLUSH before collecting enough FlushEvents");
"Illegal schemaChangeStatus state: should be WAITING_FOR_FLUSH before collecting enough FlushEvents, not "
+ schemaChangeStatus);
LOG.info(
"All sink subtask have flushed for table {}. Start to apply schema change.",
tableId.toString());
@ -259,6 +265,10 @@ public class SchemaRegistryRequestHandler implements Closeable {
!schemaChangeStatus.get().equals(RequestStatus.IDLE),
"Illegal schemaChangeStatus: should not be IDLE before getting schema change request results.");
if (schemaChangeStatus.compareAndSet(RequestStatus.FINISHED, RequestStatus.IDLE)) {
LOG.info(
"SchemaChangeStatus switched from FINISHED to IDLE for request {}",
currentDerivedSchemaChangeEvents);
// This request has been finished, return it and prepare for the next request
List<SchemaChangeEvent> finishedEvents = clearCurrentSchemaChangeRequest();
return CompletableFuture.supplyAsync(
@ -379,10 +389,6 @@ public class SchemaRegistryRequestHandler implements Closeable {
}
return events;
}
case DROP_TABLE:
// We don't drop any tables in Lenient mode.
LOG.info("A drop table event {} has been ignored in Lenient mode.", event);
return Collections.emptyList();
default:
return Collections.singletonList(event);
}

Loading…