[FLINK-36114][cdc-runtime] Make SchemaRegistryRequestHandler thread safe by blocking subsequent schemaChangeEvent

This closes #3563.

Co-authored-by: Hongshun Wang <loserwang1024@gmail.com>
Authored by yuxiqian, committed via GitHub
parent 565032e3af
commit ee843e2f24
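
For orientation before the diffs: the new protocol makes SchemaOperator retry while the registry is busy, then poll for the result once its request is accepted. Below is a minimal, self-contained sketch of that handshake; class names like ToyRegistry are hypothetical stand-ins, not the actual classes.

    import java.util.concurrent.TimeoutException;
    import java.util.concurrent.atomic.AtomicReference;

    public class HandshakeSketch {
        enum Status { IDLE, BUSY }

        // Toy stand-in for SchemaRegistryRequestHandler: only one request may hold the slot.
        static class ToyRegistry {
            final AtomicReference<Status> status = new AtomicReference<>(Status.IDLE);
            boolean tryAccept() { return status.compareAndSet(Status.IDLE, Status.BUSY); }
            void finish() { status.set(Status.IDLE); }
        }

        // Toy stand-in for SchemaOperator#requestSchemaChange: retry until accepted or deadline.
        static void requestWithRetry(ToyRegistry registry, long timeoutMillis)
                throws InterruptedException, TimeoutException {
            long deadline = System.currentTimeMillis() + timeoutMillis;
            while (!registry.tryAccept()) {
                if (System.currentTimeMillis() >= deadline) {
                    throw new TimeoutException("TimeOut when requesting schema change");
                }
                Thread.sleep(1000); // same fixed back-off as the real operator
            }
        }

        public static void main(String[] args) throws Exception {
            ToyRegistry registry = new ToyRegistry();
            requestWithRetry(registry, 5_000); // acquires the busy slot
            registry.finish(); // releases it for the next subtask's request
        }
    }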

@ -28,6 +28,7 @@ import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig;
import org.apache.flink.cdc.connectors.mysql.source.metrics.MySqlSourceReaderMetrics;
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplitState;
import org.apache.flink.cdc.connectors.mysql.table.StartupMode;
import org.apache.flink.cdc.connectors.mysql.utils.MySqlSchemaUtils;
import org.apache.flink.cdc.connectors.mysql.utils.MySqlTypeUtils;
import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.flink.connector.base.source.reader.RecordEmitter;
@ -68,7 +69,7 @@ public class MySqlPipelineRecordEmitter extends MySqlRecordEmitter<Event> {
// Used when startup mode is not initial
private boolean alreadySendCreateTableForBinlogSplit = false;
private final List<CreateTableEvent> createTableEventCache;
private List<CreateTableEvent> createTableEventCache;
public MySqlPipelineRecordEmitter(
DebeziumDeserializationSchema<Event> debeziumDeserializationSchema,
@ -80,23 +81,7 @@ public class MySqlPipelineRecordEmitter extends MySqlRecordEmitter<Event> {
sourceConfig.isIncludeSchemaChanges());
this.sourceConfig = sourceConfig;
this.alreadySendCreateTableTables = new HashSet<>();
this.createTableEventCache = new ArrayList<>();
if (!sourceConfig.getStartupOptions().startupMode.equals(StartupMode.INITIAL)) {
try (JdbcConnection jdbc = openJdbcConnection(sourceConfig)) {
List<TableId> capturedTableIds = listTables(jdbc, sourceConfig.getTableFilters());
for (TableId tableId : capturedTableIds) {
Schema schema = getSchema(jdbc, tableId);
createTableEventCache.add(
new CreateTableEvent(
org.apache.flink.cdc.common.event.TableId.tableId(
tableId.catalog(), tableId.table()),
schema));
}
} catch (SQLException e) {
throw new RuntimeException("Cannot start emitter to fetch table schema.", e);
}
}
this.createTableEventCache = generateCreateTableEvent(sourceConfig);
}
@Override
@ -104,6 +89,8 @@ public class MySqlPipelineRecordEmitter extends MySqlRecordEmitter<Event> {
SourceRecord element, SourceOutput<Event> output, MySqlSplitState splitState)
throws Exception {
if (isLowWatermarkEvent(element) && splitState.isSnapshotSplitState()) {
// In the Snapshot phase of INITIAL startup mode, we lazily send CreateTableEvent
// downstream to avoid checkpoint timeout.
TableId tableId = splitState.asSnapshotSplitState().toMySqlSplit().getTableId();
if (!alreadySendCreateTableTables.contains(tableId)) {
try (JdbcConnection jdbc = openJdbcConnection(sourceConfig)) {
@ -111,11 +98,24 @@ public class MySqlPipelineRecordEmitter extends MySqlRecordEmitter<Event> {
alreadySendCreateTableTables.add(tableId);
}
}
} else if (splitState.isBinlogSplitState()
&& !alreadySendCreateTableForBinlogSplit
&& !sourceConfig.getStartupOptions().startupMode.equals(StartupMode.INITIAL)) {
createTableEventCache.forEach(output::collect);
} else if (splitState.isBinlogSplitState() && !alreadySendCreateTableForBinlogSplit) {
alreadySendCreateTableForBinlogSplit = true;
if (sourceConfig.getStartupOptions().startupMode.equals(StartupMode.INITIAL)) {
// In the Snapshot -> Binlog transition of INITIAL startup mode, ensure all table
// schemas have been sent downstream. We use the previously cached schema instead of
// re-requesting the latest schema, because there might be pending schema change events
// in the queue, and re-requesting may accidentally emit an evolved schema before the
// corresponding schema change events.
createTableEventCache.stream()
.filter(
event ->
!alreadySendCreateTableTables.contains(
MySqlSchemaUtils.toDbzTableId(event.tableId())))
.forEach(output::collect);
} else {
// In binlog-only mode, we simply emit all cached schemas at once.
createTableEventCache.forEach(output::collect);
}
}
super.processElement(element, output, splitState);
}
@ -233,4 +233,22 @@ public class MySqlPipelineRecordEmitter extends MySqlRecordEmitter<Event> {
}
return mySqlAntlrDdlParser;
}
private List<CreateTableEvent> generateCreateTableEvent(MySqlSourceConfig sourceConfig) {
try (JdbcConnection jdbc = openJdbcConnection(sourceConfig)) {
List<CreateTableEvent> createTableEventCache = new ArrayList<>();
List<TableId> capturedTableIds = listTables(jdbc, sourceConfig.getTableFilters());
for (TableId tableId : capturedTableIds) {
Schema schema = getSchema(jdbc, tableId);
createTableEventCache.add(
new CreateTableEvent(
org.apache.flink.cdc.common.event.TableId.tableId(
tableId.catalog(), tableId.table()),
schema));
}
return createTableEventCache;
} catch (SQLException e) {
throw new RuntimeException("Cannot start emitter to fetch table schema.", e);
}
}
}
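
The emitter change boils down to: build the CreateTableEvent cache once at construction, then emit from the cache while skipping tables already sent during the snapshot phase. A minimal sketch of that filtering logic, using plain strings in place of the real event and table ID types:

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;
    import java.util.function.Consumer;

    public class EmitCachedSketch {
        static void emitForBinlogSplit(
                List<String> cachedTables, // cached via generateCreateTableEvent at construction
                Set<String> alreadySentTables, // tables emitted during the snapshot phase
                boolean initialMode,
                Consumer<String> output) {
            if (initialMode) {
                // Snapshot -> Binlog transition: only emit what the snapshot phase missed.
                cachedTables.stream().filter(t -> !alreadySentTables.contains(t)).forEach(output);
            } else {
                // Binlog-only mode: emit everything at once.
                cachedTables.forEach(output);
            }
        }

        public static void main(String[] args) {
            emitForBinlogSplit(
                    Arrays.asList("db.a", "db.b"),
                    new HashSet<>(Collections.singleton("db.a")),
                    true,
                    t -> System.out.println("CreateTableEvent for " + t)); // prints db.b only
        }
    }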

@ -295,8 +295,9 @@ public class MySqlFullTypesITCase extends MySqlSourceTestBase {
.executeAndCollect();
// skip CreateTableEvent
List<Event> snapshotResults = MySqSourceTestUtils.fetchResults(iterator, 2);
RecordData snapshotRecord = ((DataChangeEvent) snapshotResults.get(1)).after();
List<Event> snapshotResults =
MySqSourceTestUtils.fetchResultsAndCreateTableEvent(iterator, 1).f0;
RecordData snapshotRecord = ((DataChangeEvent) snapshotResults.get(0)).after();
Assertions.assertThat(RecordDataTestUtils.recordFields(snapshotRecord, recordType))
.isEqualTo(expectedSnapshot);
@ -306,7 +307,8 @@ public class MySqlFullTypesITCase extends MySqlSourceTestBase {
statement.execute("UPDATE precision_types SET time_6_c = null WHERE id = 1;");
}
List<Event> streamResults = MySqSourceTestUtils.fetchResults(iterator, 1);
List<Event> streamResults =
MySqSourceTestUtils.fetchResultsAndCreateTableEvent(iterator, 1).f0;
RecordData streamRecord = ((DataChangeEvent) streamResults.get(0)).after();
Assertions.assertThat(RecordDataTestUtils.recordFields(streamRecord, recordType))
.isEqualTo(expectedStreamRecord);
@ -397,8 +399,9 @@ public class MySqlFullTypesITCase extends MySqlSourceTestBase {
};
// skip CreateTableEvent
List<Event> snapshotResults = MySqSourceTestUtils.fetchResults(iterator, 2);
RecordData snapshotRecord = ((DataChangeEvent) snapshotResults.get(1)).after();
List<Event> snapshotResults =
MySqSourceTestUtils.fetchResultsAndCreateTableEvent(iterator, 1).f0;
RecordData snapshotRecord = ((DataChangeEvent) snapshotResults.get(0)).after();
Assertions.assertThat(RecordDataTestUtils.recordFields(snapshotRecord, COMMON_TYPES))
.isEqualTo(expectedSnapshot);
@ -412,7 +415,8 @@ public class MySqlFullTypesITCase extends MySqlSourceTestBase {
expectedSnapshot[44] = BinaryStringData.fromString("{\"key1\":\"value1\"}");
Object[] expectedStreamRecord = expectedSnapshot;
List<Event> streamResults = MySqSourceTestUtils.fetchResults(iterator, 1);
List<Event> streamResults =
MySqSourceTestUtils.fetchResultsAndCreateTableEvent(iterator, 1).f0;
RecordData streamRecord = ((DataChangeEvent) streamResults.get(0)).after();
Assertions.assertThat(RecordDataTestUtils.recordFields(streamRecord, COMMON_TYPES))
.isEqualTo(expectedStreamRecord);
@ -437,9 +441,10 @@ public class MySqlFullTypesITCase extends MySqlSourceTestBase {
"Event-Source")
.executeAndCollect();
// skip CreateTableEvent
List<Event> snapshotResults = MySqSourceTestUtils.fetchResults(iterator, 2);
RecordData snapshotRecord = ((DataChangeEvent) snapshotResults.get(1)).after();
// skip CreateTableEvents
List<Event> snapshotResults =
MySqSourceTestUtils.fetchResultsAndCreateTableEvent(iterator, 1).f0;
RecordData snapshotRecord = ((DataChangeEvent) snapshotResults.get(0)).after();
Assertions.assertThat(RecordDataTestUtils.recordFields(snapshotRecord, recordType))
.isEqualTo(expectedSnapshot);
@ -450,7 +455,8 @@ public class MySqlFullTypesITCase extends MySqlSourceTestBase {
"UPDATE time_types SET time_6_c = null, timestamp_def_c = default WHERE id = 1;");
}
List<Event> streamResults = MySqSourceTestUtils.fetchResults(iterator, 1);
List<Event> streamResults =
MySqSourceTestUtils.fetchResultsAndCreateTableEvent(iterator, 1).f0;
RecordData streamRecord = ((DataChangeEvent) streamResults.get(0)).after();
Assertions.assertThat(RecordDataTestUtils.recordFields(streamRecord, recordType))
.isEqualTo(expectedStreamRecord);

@ -62,6 +62,7 @@ import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@ -226,12 +227,33 @@ public class MySqlPipelineITCase extends MySqlSourceTestBase {
BinaryStringData.fromString("c-21")
})));
}
// In this configuration, several subtasks might emit their corresponding CreateTableEvent
// downstream. Since it is not possible to predict how many CreateTableEvents to expect,
// we simply filter them out of the expected sets, and assert there is at least one.
List<Event> actual =
fetchResults(events, 1 + expectedSnapshot.size() + expectedBinlog.size());
assertThat(actual.get(0)).isEqualTo(createTableEvent);
assertThat(actual.subList(1, 10))
fetchResultsExcept(
events, expectedSnapshot.size() + expectedBinlog.size(), createTableEvent);
assertThat(actual.subList(0, expectedSnapshot.size()))
.containsExactlyInAnyOrder(expectedSnapshot.toArray(new Event[0]));
assertThat(actual.subList(10, actual.size())).isEqualTo(expectedBinlog);
assertThat(actual.subList(expectedSnapshot.size(), actual.size()))
.isEqualTo(expectedBinlog);
}
private static <T> List<T> fetchResultsExcept(Iterator<T> iter, int size, T sideEvent) {
List<T> result = new ArrayList<>(size);
List<T> sideResults = new ArrayList<>();
while (size > 0 && iter.hasNext()) {
T event = iter.next();
if (!event.equals(sideEvent)) {
result.add(event);
size--;
} else {
sideResults.add(sideEvent);
}
}
// Also ensure we've received at least one side event.
assertThat(sideResults).isNotEmpty();
return result;
}
@Test

@ -17,6 +17,9 @@
package org.apache.flink.cdc.connectors.mysql.testutils;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
@ -38,6 +41,22 @@ public class MySqSourceTestUtils {
return result;
}
public static <T> Tuple2<List<T>, List<CreateTableEvent>> fetchResultsAndCreateTableEvent(
Iterator<T> iter, int size) {
List<T> result = new ArrayList<>(size);
List<CreateTableEvent> createTableEvents = new ArrayList<>();
while (size > 0 && iter.hasNext()) {
T event = iter.next();
if (event instanceof CreateTableEvent) {
createTableEvents.add((CreateTableEvent) event);
} else {
result.add(event);
size--;
}
}
return Tuple2.of(result, createTableEvents);
}
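
A hypothetical call site for the new helper, mirroring the ITCase changes above (assumes the flink-cdc test classes are on the classpath and the same package as MySqSourceTestUtils): data change events land in f0, while interleaved CreateTableEvents are collected into f1 for separate assertions.

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.cdc.common.event.CreateTableEvent;
    import org.apache.flink.cdc.common.event.Event;

    import java.util.Iterator;
    import java.util.List;

    class FetchUsageSketch {
        static void consume(Iterator<Event> iterator) {
            Tuple2<List<Event>, List<CreateTableEvent>> fetched =
                    MySqSourceTestUtils.fetchResultsAndCreateTableEvent(iterator, 1);
            List<Event> dataChanges = fetched.f0; // exactly one non-CreateTableEvent
            List<CreateTableEvent> createTables = fetched.f1; // however many were interleaved
        }
    }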
public static String getServerId(int parallelism) {
final Random random = new Random();
int serverId = random.nextInt(100) + 5400;

@ -194,12 +194,13 @@ public class MySqlToDorisE2eITCase extends PipelineTestEnvironment {
+ " table.create.properties.replication_num: 1\n"
+ "\n"
+ "pipeline:\n"
+ " parallelism: 1",
+ " parallelism: %d",
MYSQL_TEST_USER,
MYSQL_TEST_PASSWORD,
mysqlInventoryDatabase.getDatabaseName(),
DORIS.getUsername(),
DORIS.getPassword());
DORIS.getPassword(),
parallelism);
Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
Path dorisCdcConnector = TestUtils.getResource("doris-cdc-pipeline-connector.jar");
Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
@ -324,13 +325,14 @@ public class MySqlToDorisE2eITCase extends PipelineTestEnvironment {
+ " projection: \\*, 'fine' AS FINE\n"
+ " filter: id <> 3 AND id <> 4\n"
+ "pipeline:\n"
+ " parallelism: 1",
+ " parallelism: %d",
MYSQL_TEST_USER,
MYSQL_TEST_PASSWORD,
complexDataTypesDatabase.getDatabaseName(),
DORIS.getUsername(),
DORIS.getPassword(),
complexDataTypesDatabase.getDatabaseName());
complexDataTypesDatabase.getDatabaseName(),
parallelism);
Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
Path dorisCdcConnector = TestUtils.getResource("doris-cdc-pipeline-connector.jar");
Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");

@ -102,11 +102,12 @@ public class MysqlE2eITCase extends PipelineTestEnvironment {
+ " type: values\n"
+ "\n"
+ "pipeline:\n"
+ " parallelism: 1",
+ " parallelism: %d",
INTER_CONTAINER_MYSQL_ALIAS,
MYSQL_TEST_USER,
MYSQL_TEST_PASSWORD,
mysqlInventoryDatabase.getDatabaseName());
mysqlInventoryDatabase.getDatabaseName(),
parallelism);
Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar");
Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
@ -209,11 +210,12 @@ public class MysqlE2eITCase extends PipelineTestEnvironment {
+ " type: values\n"
+ "\n"
+ "pipeline:\n"
+ " parallelism: 1",
+ " parallelism: %d",
INTER_CONTAINER_MYSQL_ALIAS,
MYSQL_TEST_USER,
MYSQL_TEST_PASSWORD,
mysqlInventoryDatabase.getDatabaseName());
mysqlInventoryDatabase.getDatabaseName(),
parallelism);
Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar");
Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");

@ -154,11 +154,12 @@ public class RouteE2eITCase extends PipelineTestEnvironment {
+ " type: values\n"
+ "\n"
+ "pipeline:\n"
+ " parallelism: 1",
+ " parallelism: %d",
INTER_CONTAINER_MYSQL_ALIAS,
MYSQL_TEST_USER,
MYSQL_TEST_PASSWORD,
routeTestDatabase.getDatabaseName());
routeTestDatabase.getDatabaseName(),
parallelism);
Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar");
Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
@ -248,13 +249,14 @@ public class RouteE2eITCase extends PipelineTestEnvironment {
+ " sink-table: %s.ALL\n"
+ "\n"
+ "pipeline:\n"
+ " parallelism: 1",
+ " parallelism: %d",
INTER_CONTAINER_MYSQL_ALIAS,
MYSQL_TEST_USER,
MYSQL_TEST_PASSWORD,
routeTestDatabase.getDatabaseName(),
routeTestDatabase.getDatabaseName(),
routeTestDatabase.getDatabaseName());
routeTestDatabase.getDatabaseName(),
parallelism);
Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar");
Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
@ -331,13 +333,14 @@ public class RouteE2eITCase extends PipelineTestEnvironment {
+ " sink-table: NEW_%s.ALPHABET\n"
+ "\n"
+ "pipeline:\n"
+ " parallelism: 1",
+ " parallelism: %d",
INTER_CONTAINER_MYSQL_ALIAS,
MYSQL_TEST_USER,
MYSQL_TEST_PASSWORD,
routeTestDatabase.getDatabaseName(),
routeTestDatabase.getDatabaseName(),
routeTestDatabase.getDatabaseName());
routeTestDatabase.getDatabaseName(),
parallelism);
Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar");
Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
@ -428,7 +431,7 @@ public class RouteE2eITCase extends PipelineTestEnvironment {
+ " sink-table: NEW_%s.BETAGAMM\n"
+ "\n"
+ "pipeline:\n"
+ " parallelism: 1",
+ " parallelism: %d",
INTER_CONTAINER_MYSQL_ALIAS,
MYSQL_TEST_USER,
MYSQL_TEST_PASSWORD,
@ -436,7 +439,8 @@ public class RouteE2eITCase extends PipelineTestEnvironment {
routeTestDatabase.getDatabaseName(),
routeTestDatabase.getDatabaseName(),
routeTestDatabase.getDatabaseName(),
routeTestDatabase.getDatabaseName());
routeTestDatabase.getDatabaseName(),
parallelism);
Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar");
Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
@ -535,7 +539,7 @@ public class RouteE2eITCase extends PipelineTestEnvironment {
+ " sink-table: NEW_%s.TABLEC\n"
+ "\n"
+ "pipeline:\n"
+ " parallelism: 1",
+ " parallelism: %d",
INTER_CONTAINER_MYSQL_ALIAS,
MYSQL_TEST_USER,
MYSQL_TEST_PASSWORD,
@ -545,7 +549,8 @@ public class RouteE2eITCase extends PipelineTestEnvironment {
routeTestDatabase.getDatabaseName(),
routeTestDatabase.getDatabaseName(),
routeTestDatabase.getDatabaseName(),
routeTestDatabase.getDatabaseName());
routeTestDatabase.getDatabaseName(),
parallelism);
Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar");
Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
@ -647,14 +652,15 @@ public class RouteE2eITCase extends PipelineTestEnvironment {
+ " sink-table: %s.ALL\n"
+ "\n"
+ "pipeline:\n"
+ " parallelism: 1",
+ " parallelism: %d",
INTER_CONTAINER_MYSQL_ALIAS,
MYSQL_TEST_USER,
MYSQL_TEST_PASSWORD,
routeTestDatabase.getDatabaseName(),
routeTestDatabase.getDatabaseName(),
routeTestDatabase.getDatabaseName(),
routeTestDatabase.getDatabaseName());
routeTestDatabase.getDatabaseName(),
parallelism);
Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar");
Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
@ -733,13 +739,14 @@ public class RouteE2eITCase extends PipelineTestEnvironment {
+ " replace-symbol: <>\n"
+ "\n"
+ "pipeline:\n"
+ " parallelism: 1",
+ " parallelism: %d",
INTER_CONTAINER_MYSQL_ALIAS,
MYSQL_TEST_USER,
MYSQL_TEST_PASSWORD,
routeTestDatabase.getDatabaseName(),
routeTestDatabase.getDatabaseName(),
routeTestDatabase.getDatabaseName());
routeTestDatabase.getDatabaseName(),
parallelism);
Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar");
Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");

@ -240,11 +240,12 @@ public class SchemaEvolveE2eITCase extends PipelineTestEnvironment {
+ "\n"
+ "pipeline:\n"
+ " schema.change.behavior: unexpected\n"
+ " parallelism: 1",
+ " parallelism: %d",
INTER_CONTAINER_MYSQL_ALIAS,
MYSQL_TEST_USER,
MYSQL_TEST_PASSWORD,
schemaEvolveDatabase.getDatabaseName());
schemaEvolveDatabase.getDatabaseName(),
parallelism);
Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar");
Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
@ -310,13 +311,14 @@ public class SchemaEvolveE2eITCase extends PipelineTestEnvironment {
+ "\n"
+ "pipeline:\n"
+ " schema.change.behavior: %s\n"
+ " parallelism: 1",
+ " parallelism: %d",
INTER_CONTAINER_MYSQL_ALIAS,
MYSQL_TEST_USER,
MYSQL_TEST_PASSWORD,
dbName,
mergeTable ? "(members|new_members)" : "members",
behavior);
behavior,
parallelism);
Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar");
Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");

@ -118,7 +118,7 @@ public class TransformE2eITCase extends PipelineTestEnvironment {
+ " - source-table: %s.TABLEBETA\n"
+ " projection: ID, VERSION\n"
+ "pipeline:\n"
+ " parallelism: 1",
+ " parallelism: %d",
INTER_CONTAINER_MYSQL_ALIAS,
MYSQL_TEST_USER,
MYSQL_TEST_PASSWORD,
@ -126,7 +126,8 @@ public class TransformE2eITCase extends PipelineTestEnvironment {
transformTestDatabase.getDatabaseName(),
transformTestDatabase.getDatabaseName(),
transformTestDatabase.getDatabaseName(),
transformTestDatabase.getDatabaseName());
transformTestDatabase.getDatabaseName(),
parallelism);
Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar");
Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
@ -200,13 +201,14 @@ public class TransformE2eITCase extends PipelineTestEnvironment {
+ " filter: ID <= 1008\n"
+ "\n"
+ "pipeline:\n"
+ " parallelism: 1",
+ " parallelism: %d",
INTER_CONTAINER_MYSQL_ALIAS,
MYSQL_TEST_USER,
MYSQL_TEST_PASSWORD,
transformTestDatabase.getDatabaseName(),
transformTestDatabase.getDatabaseName(),
transformTestDatabase.getDatabaseName());
transformTestDatabase.getDatabaseName(),
parallelism);
Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar");
Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
@ -287,7 +289,7 @@ public class TransformE2eITCase extends PipelineTestEnvironment {
+ " - source-table: %s.TABLEBETA\n"
+ " projection: ID, CONCAT('v', VERSION) AS VERSION, LOWER(NAMEBETA) AS NAME\n"
+ "pipeline:\n"
+ " parallelism: 1",
+ " parallelism: %d",
INTER_CONTAINER_MYSQL_ALIAS,
MYSQL_TEST_USER,
MYSQL_TEST_PASSWORD,
@ -295,7 +297,8 @@ public class TransformE2eITCase extends PipelineTestEnvironment {
transformTestDatabase.getDatabaseName(),
transformTestDatabase.getDatabaseName(),
transformTestDatabase.getDatabaseName(),
transformTestDatabase.getDatabaseName());
transformTestDatabase.getDatabaseName(),
parallelism);
Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar");
Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
@ -365,13 +368,14 @@ public class TransformE2eITCase extends PipelineTestEnvironment {
+ " - source-table: %s.TABLEBETA\n"
+ " projection: \\*, CONCAT('v', VERSION) AS VERSION, LOWER(NAMEBETA) AS NAME\n"
+ "pipeline:\n"
+ " parallelism: 1",
+ " parallelism: %d",
INTER_CONTAINER_MYSQL_ALIAS,
MYSQL_TEST_USER,
MYSQL_TEST_PASSWORD,
transformTestDatabase.getDatabaseName(),
transformTestDatabase.getDatabaseName(),
transformTestDatabase.getDatabaseName());
transformTestDatabase.getDatabaseName(),
parallelism);
Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar");
Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
@ -446,13 +450,14 @@ public class TransformE2eITCase extends PipelineTestEnvironment {
+ " - source-table: %s.TABLEBETA\n"
+ " projection: \\*, __namespace_name__ || '.' || __schema_name__ || '.' || __table_name__ AS identifier_name, __data_event_type__ AS type\n"
+ "pipeline:\n"
+ " parallelism: 1",
+ " parallelism: %d",
INTER_CONTAINER_MYSQL_ALIAS,
MYSQL_TEST_USER,
MYSQL_TEST_PASSWORD,
transformTestDatabase.getDatabaseName(),
transformTestDatabase.getDatabaseName(),
transformTestDatabase.getDatabaseName());
transformTestDatabase.getDatabaseName(),
parallelism);
Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar");
Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
@ -530,12 +535,13 @@ public class TransformE2eITCase extends PipelineTestEnvironment {
+ " - source-table: %s.TABLE\\.*\n"
+ " projection: \\*, ID + 1000 as UID, VERSION AS NEWVERSION\n"
+ "pipeline:\n"
+ " parallelism: 1",
+ " parallelism: %d",
INTER_CONTAINER_MYSQL_ALIAS,
MYSQL_TEST_USER,
MYSQL_TEST_PASSWORD,
transformTestDatabase.getDatabaseName(),
transformTestDatabase.getDatabaseName());
transformTestDatabase.getDatabaseName(),
parallelism);
Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar");
Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
@ -613,13 +619,14 @@ public class TransformE2eITCase extends PipelineTestEnvironment {
+ " - source-table: %s.TABLEBETA\n"
+ " projection: ID, CAST(VERSION AS DOUBLE) + 100 AS VERSION, CAST(AGEBETA AS VARCHAR) || ' - ' || NAMEBETA AS IDENTIFIER\n"
+ "pipeline:\n"
+ " parallelism: 1",
+ " parallelism: %d",
INTER_CONTAINER_MYSQL_ALIAS,
MYSQL_TEST_USER,
MYSQL_TEST_PASSWORD,
transformTestDatabase.getDatabaseName(),
transformTestDatabase.getDatabaseName(),
transformTestDatabase.getDatabaseName());
transformTestDatabase.getDatabaseName(),
parallelism);
Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar");
Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
@ -708,13 +715,14 @@ public class TransformE2eITCase extends PipelineTestEnvironment {
+ " projection: ID, LOCALTIME as lcl_t, CURRENT_TIME as cur_t, CAST(CURRENT_TIMESTAMP AS TIMESTAMP) as cur_ts, CAST(NOW() AS TIMESTAMP) as now_ts, LOCALTIMESTAMP as lcl_ts, CURRENT_DATE as cur_dt\n"
+ "\n"
+ "pipeline:\n"
+ " parallelism: 1\n"
+ " parallelism: %d\n"
+ " local-time-zone: America/Los_Angeles",
INTER_CONTAINER_MYSQL_ALIAS,
MYSQL_TEST_USER,
MYSQL_TEST_PASSWORD,
transformTestDatabase.getDatabaseName(),
transformTestDatabase.getDatabaseName());
transformTestDatabase.getDatabaseName(),
parallelism);
Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar");
Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
@ -781,7 +789,9 @@ public class TransformE2eITCase extends PipelineTestEnvironment {
}
boolean extractDataLines(String line) {
if (!line.startsWith("DataChangeEvent{")) {
// When parallelism > 1, a subtask ID prefix (like '1> ') is prepended to each line.
// Trim it before extracting data fields.
if (!line.startsWith("DataChangeEvent{", 3)) {
return false;
}
Stream.of("before", "after")
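
Note that startsWith("DataChangeEvent{", 3) assumes a fixed-width, single-digit subtask prefix such as "1> ", which holds for the default parallelism of 4. A more defensive variant (a sketch, not what the test does) strips any such prefix with a regex:

    class PrefixStripSketch {
        // Strips an optional "N> " subtask prefix of any width before matching.
        static boolean isDataChangeLine(String line) {
            return line.replaceFirst("^\\d+> ", "").startsWith("DataChangeEvent{");
        }

        public static void main(String[] args) {
            System.out.println(isDataChangeLine("12> DataChangeEvent{...}")); // true
            System.out.println(isDataChangeLine("some other log line")); // false
        }
    }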

@ -128,7 +128,7 @@ public class UdfE2eITCase extends PipelineTestEnvironment {
+ " projection: ID, VERSION, answer() AS ANS, typeof(ID) AS TYP\n"
+ "\n"
+ "pipeline:\n"
+ " parallelism: 1\n"
+ " parallelism: %d\n"
+ " user-defined-function:\n"
+ " - name: addone\n"
+ " classpath: org.apache.flink.cdc.udf.examples.%s.AddOneFunctionClass\n"
@ -146,6 +146,7 @@ public class UdfE2eITCase extends PipelineTestEnvironment {
transformRenameDatabase.getDatabaseName(),
transformRenameDatabase.getDatabaseName(),
transformRenameDatabase.getDatabaseName(),
parallelism,
language,
language,
language,
@ -267,7 +268,7 @@ public class UdfE2eITCase extends PipelineTestEnvironment {
+ " projection: ID, VERSION, typeof(ID) AS TYP\n"
+ "\n"
+ "pipeline:\n"
+ " parallelism: 1\n"
+ " parallelism: %d\n"
+ " user-defined-function:\n"
+ " - name: addone\n"
+ " classpath: org.apache.flink.udf.examples.%s.AddOneFunctionClass\n"
@ -281,6 +282,7 @@ public class UdfE2eITCase extends PipelineTestEnvironment {
transformRenameDatabase.getDatabaseName(),
transformRenameDatabase.getDatabaseName(),
transformRenameDatabase.getDatabaseName(),
parallelism,
language,
language,
language);

@ -67,6 +67,8 @@ public abstract class PipelineTestEnvironment extends TestLogger {
@Parameterized.Parameter public String flinkVersion;
public Integer parallelism = 4;
// ------------------------------------------------------------------------------------------
// Flink Variables
// ------------------------------------------------------------------------------------------
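
Each `parallelism: 1` literal in the E2E YAML definitions above becomes a `%d` placeholder fed by this field. A minimal illustration of the pattern, with a hypothetical YAML fragment:

    public class YamlFormatSketch {
        public static void main(String[] args) {
            int parallelism = 4; // PipelineTestEnvironment's new default
            String pipelineJob =
                    String.format(
                            "source:\n"
                                    + "  type: mysql\n"
                                    + "pipeline:\n"
                                    + "  parallelism: %d",
                            parallelism);
            System.out.println(pipelineJob); // last line reads "  parallelism: 4"
        }
    }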

@ -39,20 +39,17 @@ import org.apache.flink.cdc.common.types.DataTypeRoot;
import org.apache.flink.cdc.common.utils.ChangeEventUtils;
import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry;
import org.apache.flink.cdc.runtime.operators.schema.event.CoordinationResponseUtils;
import org.apache.flink.cdc.runtime.operators.schema.event.RefreshPendingListsRequest;
import org.apache.flink.cdc.runtime.operators.schema.event.ReleaseUpstreamRequest;
import org.apache.flink.cdc.runtime.operators.schema.event.ReleaseUpstreamResponse;
import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeProcessingResponse;
import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeRequest;
import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeResponse;
import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeResultRequest;
import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeResultResponse;
import org.apache.flink.cdc.runtime.operators.schema.metrics.SchemaOperatorMetrics;
import org.apache.flink.cdc.runtime.operators.sink.SchemaEvolutionClient;
import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
@ -120,6 +117,7 @@ public class SchemaOperator extends AbstractStreamOperator<Event>
private final SchemaChangeBehavior schemaChangeBehavior;
private transient SchemaOperatorMetrics schemaOperatorMetrics;
private transient int subTaskId;
@VisibleForTesting
public SchemaOperator(List<RouteRule> routingRules) {
@ -153,6 +151,7 @@ public class SchemaOperator extends AbstractStreamOperator<Event>
schemaOperatorMetrics =
new SchemaOperatorMetrics(
getRuntimeContext().getMetricGroup(), schemaChangeBehavior);
subTaskId = getRuntimeContext().getIndexOfThisSubtask();
}
@Override
@ -218,17 +217,6 @@ public class SchemaOperator extends AbstractStreamOperator<Event>
});
}
@Override
public void initializeState(StateInitializationContext context) throws Exception {
if (context.isRestored()) {
// Multiple operators may appear during a restart process,
// only clear the pendingSchemaChanges when the first operator starts.
if (getRuntimeContext().getIndexOfThisSubtask() == 0) {
sendRequestToCoordinator(new RefreshPendingListsRequest());
}
}
}
/**
* This method is guaranteed to not be called concurrently with other methods of the operator.
*/
@ -248,7 +236,11 @@ public class SchemaOperator extends AbstractStreamOperator<Event>
private void processSchemaChangeEvents(SchemaChangeEvent event)
throws InterruptedException, TimeoutException, ExecutionException {
TableId tableId = event.tableId();
LOG.info("Table {} received SchemaChangeEvent and start to be blocked.", tableId);
LOG.info(
"{}> Table {} received SchemaChangeEvent {} and start to be blocked.",
subTaskId,
tableId,
event);
handleSchemaChangeEvent(tableId, event);
// Update caches
originalSchema.put(tableId, getLatestOriginalSchema(tableId));
@ -412,36 +404,62 @@ public class SchemaOperator extends AbstractStreamOperator<Event>
schemaChangeEvent));
}
// The request will need to send a FlushEvent or block until flushing finished
// The request will block if another schema change event is being handled
SchemaChangeResponse response = requestSchemaChange(tableId, schemaChangeEvent);
if (!response.getSchemaChangeEvents().isEmpty()) {
LOG.info(
"Sending the FlushEvent for table {} in subtask {}.",
tableId,
getRuntimeContext().getIndexOfThisSubtask());
if (response.isAccepted()) {
LOG.info("{}> Sending the FlushEvent for table {}.", subTaskId, tableId);
output.collect(new StreamRecord<>(new FlushEvent(tableId)));
List<SchemaChangeEvent> expectedSchemaChangeEvents = response.getSchemaChangeEvents();
schemaOperatorMetrics.increaseSchemaChangeEvents(expectedSchemaChangeEvents.size());
// The request will block until flushing finished in each sink writer
ReleaseUpstreamResponse schemaEvolveResponse = requestReleaseUpstream();
SchemaChangeResultResponse schemaEvolveResponse = requestSchemaChangeResult();
List<SchemaChangeEvent> finishedSchemaChangeEvents =
schemaEvolveResponse.getFinishedSchemaChangeEvents();
// Update evolved schema changes based on apply results
finishedSchemaChangeEvents.forEach(e -> output.collect(new StreamRecord<>(e)));
} else if (response.isDuplicate()) {
LOG.info(
"{}> Schema change event {} has been handled in another subTask already.",
subTaskId,
schemaChangeEvent);
} else if (response.isIgnored()) {
LOG.info(
"{}> Schema change event {} has been ignored. No schema evolution needed.",
subTaskId,
schemaChangeEvent);
} else {
throw new IllegalStateException("Unexpected response status " + response);
}
}
private SchemaChangeResponse requestSchemaChange(
TableId tableId, SchemaChangeEvent schemaChangeEvent) {
return sendRequestToCoordinator(new SchemaChangeRequest(tableId, schemaChangeEvent));
TableId tableId, SchemaChangeEvent schemaChangeEvent)
throws InterruptedException, TimeoutException {
long schemaEvolveTimeOutMillis = System.currentTimeMillis() + rpcTimeOutInMillis;
while (true) {
SchemaChangeResponse response =
sendRequestToCoordinator(new SchemaChangeRequest(tableId, schemaChangeEvent));
if (response.isRegistryBusy()) {
if (System.currentTimeMillis() < schemaEvolveTimeOutMillis) {
LOG.info(
"{}> Schema Registry is busy now, waiting for next request...",
subTaskId);
Thread.sleep(1000);
} else {
throw new TimeoutException("TimeOut when requesting schema change");
}
} else {
return response;
}
}
}
private ReleaseUpstreamResponse requestReleaseUpstream()
private SchemaChangeResultResponse requestSchemaChangeResult()
throws InterruptedException, TimeoutException {
CoordinationResponse coordinationResponse =
sendRequestToCoordinator(new ReleaseUpstreamRequest());
sendRequestToCoordinator(new SchemaChangeResultRequest());
long nextRpcTimeOutMillis = System.currentTimeMillis() + rpcTimeOutInMillis;
while (coordinationResponse instanceof SchemaChangeProcessingResponse) {
if (System.currentTimeMillis() < nextRpcTimeOutMillis) {
@ -451,7 +469,7 @@ public class SchemaOperator extends AbstractStreamOperator<Event>
throw new TimeoutException("TimeOut when requesting release upstream");
}
}
return ((ReleaseUpstreamResponse) coordinationResponse);
return ((SchemaChangeResultResponse) coordinationResponse);
}
private <REQUEST extends CoordinationRequest, RESPONSE extends CoordinationResponse>

@ -18,11 +18,15 @@
package org.apache.flink.cdc.runtime.operators.schema.coordinator;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.event.AddColumnEvent;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.event.visitor.SchemaChangeEventVisitor;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.utils.SchemaUtils;
import org.apache.flink.cdc.runtime.serializer.TableIdSerializer;
import org.apache.flink.cdc.runtime.serializer.schema.SchemaSerializer;
@ -39,6 +43,7 @@ import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
@ -93,6 +98,97 @@ public class SchemaManager {
return behavior;
}
/**
* This method checks whether the given schema change event has already been applied. If so,
* it will be ignored to avoid sending duplicate evolved schema change events to the sink
* metadata applier.
*/
public final boolean isOriginalSchemaChangeEventRedundant(SchemaChangeEvent event) {
TableId tableId = event.tableId();
Optional<Schema> latestSchema = getLatestOriginalSchema(tableId);
return Boolean.TRUE.equals(
SchemaChangeEventVisitor.visit(
event,
addColumnEvent -> {
// It has not been applied if schema does not even exist
if (!latestSchema.isPresent()) {
return false;
}
List<Column> existedColumns = latestSchema.get().getColumns();
// It has been applied only if all columns are present in existedColumns
for (AddColumnEvent.ColumnWithPosition column :
addColumnEvent.getAddedColumns()) {
if (!existedColumns.contains(column.getAddColumn())) {
return false;
}
}
return true;
},
alterColumnTypeEvent -> {
// It has not been applied if schema does not even exist
if (!latestSchema.isPresent()) {
return false;
}
Schema schema = latestSchema.get();
// It has been applied only if all column types are set as expected
for (Map.Entry<String, DataType> entry :
alterColumnTypeEvent.getTypeMapping().entrySet()) {
if (!schema.getColumn(entry.getKey()).isPresent()
|| !schema.getColumn(entry.getKey())
.get()
.getType()
.equals(entry.getValue())) {
return false;
}
}
return true;
},
createTableEvent -> {
// It has been applied if such table already exists
return latestSchema.isPresent();
},
dropColumnEvent -> {
// It has not been applied if schema does not even exist
if (!latestSchema.isPresent()) {
return false;
}
List<String> existedColumnNames = latestSchema.get().getColumnNames();
// It has been applied only if none of the dropped columns still exist
return dropColumnEvent.getDroppedColumnNames().stream()
.noneMatch(existedColumnNames::contains);
},
dropTableEvent -> {
// It has been applied if such table does not exist
return !latestSchema.isPresent();
},
renameColumnEvent -> {
// It has not been applied if schema does not even exist
if (!latestSchema.isPresent()) {
return false;
}
List<String> existedColumnNames = latestSchema.get().getColumnNames();
// It has been applied only if all previous names do not exist, and all
// new names already exist
for (Map.Entry<String, String> entry :
renameColumnEvent.getNameMapping().entrySet()) {
if (existedColumnNames.contains(entry.getKey())
|| !existedColumnNames.contains(entry.getValue())) {
return false;
}
}
return true;
},
truncateTableEvent -> {
// We have no way to ensure if a TruncateTableEvent has been applied
// before. Just assume it's not.
return false;
}));
}
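
The visitor above reduces to one rule per event type, comparing the event against the latest known original schema. A self-contained toy of the two column cases, using plain collections instead of the real event classes:

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    public class RedundancySketch {
        // An AddColumnEvent is redundant only if every added column already exists.
        static boolean addIsRedundant(Set<String> existing, List<String> added) {
            return existing.containsAll(added);
        }

        // A DropColumnEvent is redundant only if none of the dropped columns still exist.
        static boolean dropIsRedundant(Set<String> existing, List<String> dropped) {
            return dropped.stream().noneMatch(existing::contains);
        }

        public static void main(String[] args) {
            Set<String> schema = new HashSet<>(Arrays.asList("id", "name"));
            System.out.println(addIsRedundant(schema, Collections.singletonList("name"))); // true
            System.out.println(addIsRedundant(schema, Collections.singletonList("age"))); // false
            System.out.println(dropIsRedundant(schema, Collections.singletonList("age"))); // true
        }
    }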
public final boolean schemaExists(
Map<TableId, SortedMap<Integer, Schema>> schemaMap, TableId tableId) {
return schemaMap.containsKey(tableId) && !schemaMap.get(tableId).isEmpty();

@ -29,8 +29,6 @@ import org.apache.flink.cdc.runtime.operators.schema.event.GetEvolvedSchemaReque
import org.apache.flink.cdc.runtime.operators.schema.event.GetEvolvedSchemaResponse;
import org.apache.flink.cdc.runtime.operators.schema.event.GetOriginalSchemaRequest;
import org.apache.flink.cdc.runtime.operators.schema.event.GetOriginalSchemaResponse;
import org.apache.flink.cdc.runtime.operators.schema.event.RefreshPendingListsRequest;
import org.apache.flink.cdc.runtime.operators.schema.event.ReleaseUpstreamRequest;
import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeRequest;
import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeResultRequest;
import org.apache.flink.cdc.runtime.operators.schema.event.SinkWriterRegisterEvent;
@ -201,18 +199,14 @@ public class SchemaRegistry implements OperatorCoordinator, CoordinationRequestH
if (request instanceof SchemaChangeRequest) {
SchemaChangeRequest schemaChangeRequest = (SchemaChangeRequest) request;
return requestHandler.handleSchemaChangeRequest(schemaChangeRequest);
} else if (request instanceof ReleaseUpstreamRequest) {
return requestHandler.handleReleaseUpstreamRequest();
} else if (request instanceof SchemaChangeResultRequest) {
return requestHandler.getSchemaChangeResult();
} else if (request instanceof GetEvolvedSchemaRequest) {
return CompletableFuture.completedFuture(
wrap(handleGetEvolvedSchemaRequest(((GetEvolvedSchemaRequest) request))));
} else if (request instanceof GetOriginalSchemaRequest) {
return CompletableFuture.completedFuture(
wrap(handleGetOriginalSchemaRequest((GetOriginalSchemaRequest) request)));
} else if (request instanceof SchemaChangeResultRequest) {
return requestHandler.getSchemaChangeResult();
} else if (request instanceof RefreshPendingListsRequest) {
return requestHandler.refreshPendingLists();
} else {
throw new IllegalArgumentException(
"Unrecognized CoordinationRequest type: " + request);

@ -33,41 +33,36 @@ import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.sink.MetadataApplier;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.runtime.operators.schema.event.RefreshPendingListsResponse;
import org.apache.flink.cdc.runtime.operators.schema.event.ReleaseUpstreamRequest;
import org.apache.flink.cdc.runtime.operators.schema.event.ReleaseUpstreamResponse;
import org.apache.flink.cdc.common.utils.Preconditions;
import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeProcessingResponse;
import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeRequest;
import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeResponse;
import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeResultResponse;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.concurrent.NotThreadSafe;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistryRequestHandler.RequestStatus.RECEIVED_RELEASE_REQUEST;
import static org.apache.flink.cdc.runtime.operators.schema.event.CoordinationResponseUtils.wrap;
/** A handler to deal with all requests and events for {@link SchemaRegistry}. */
@Internal
@NotThreadSafe
public class SchemaRegistryRequestHandler implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(SchemaRegistryRequestHandler.class);
@ -81,21 +76,18 @@ public class SchemaRegistryRequestHandler implements Closeable {
private final SchemaDerivation schemaDerivation;
/**
* Not applied SchemaChangeRequest before receiving all flush success events for its table from
* sink writers.
* Atomic flag indicating whether this request handler can currently accept more schema changes.
*/
private final List<PendingSchemaChange> pendingSchemaChanges;
private final AtomicReference<RequestStatus> schemaChangeStatus;
private final List<SchemaChangeEvent> finishedSchemaChanges;
private final List<SchemaChangeEvent> ignoredSchemaChanges;
private volatile Throwable currentChangeException;
private volatile List<SchemaChangeEvent> currentDerivedSchemaChangeEvents;
private volatile List<SchemaChangeEvent> currentFinishedSchemaChanges;
private volatile List<SchemaChangeEvent> currentIgnoredSchemaChanges;
/** Sink writers which have sent flush success events for the request. */
private final Set<Integer> flushedSinkWriters;
/** Status of the execution of current schema change request. */
private volatile boolean isSchemaChangeApplying;
/** Actual exception if failed to apply schema change. */
private volatile Throwable schemaChangeException;
/** Executor service to execute schema change. */
private final ExecutorService schemaChangeThreadPool;
@ -107,16 +99,81 @@ public class SchemaRegistryRequestHandler implements Closeable {
SchemaDerivation schemaDerivation,
SchemaChangeBehavior schemaChangeBehavior) {
this.metadataApplier = metadataApplier;
this.activeSinkWriters = new HashSet<>();
this.flushedSinkWriters = new HashSet<>();
this.pendingSchemaChanges = new LinkedList<>();
this.finishedSchemaChanges = new LinkedList<>();
this.ignoredSchemaChanges = new LinkedList<>();
this.schemaManager = schemaManager;
this.schemaDerivation = schemaDerivation;
this.schemaChangeThreadPool = Executors.newSingleThreadExecutor();
this.isSchemaChangeApplying = false;
this.schemaChangeBehavior = schemaChangeBehavior;
this.activeSinkWriters = new HashSet<>();
this.flushedSinkWriters = new HashSet<>();
this.schemaChangeThreadPool = Executors.newSingleThreadExecutor();
this.currentDerivedSchemaChangeEvents = new ArrayList<>();
this.currentFinishedSchemaChanges = new ArrayList<>();
this.currentIgnoredSchemaChanges = new ArrayList<>();
this.schemaChangeStatus = new AtomicReference<>(RequestStatus.IDLE);
}
/**
* Handle the {@link SchemaChangeRequest} and wait for all sink subtasks to flush.
*
* @param request the received SchemaChangeRequest
*/
public CompletableFuture<CoordinationResponse> handleSchemaChangeRequest(
SchemaChangeRequest request) {
if (schemaChangeStatus.compareAndSet(RequestStatus.IDLE, RequestStatus.WAITING_FOR_FLUSH)) {
LOG.info(
"Received schema change event request {} from table {}. Start to buffer requests for others.",
request.getSchemaChangeEvent(),
request.getTableId().toString());
SchemaChangeEvent event = request.getSchemaChangeEvent();
// If this schema change event has been requested by another subTask, ignore it.
if (schemaManager.isOriginalSchemaChangeEventRedundant(event)) {
LOG.info("Event {} has been addressed before, ignoring it.", event);
clearCurrentSchemaChangeRequest();
Preconditions.checkState(
schemaChangeStatus.compareAndSet(
RequestStatus.WAITING_FOR_FLUSH, RequestStatus.IDLE),
"Illegal schemaChangeStatus state: should still in WAITING_FOR_FLUSH state if event was duplicated.");
return CompletableFuture.completedFuture(wrap(SchemaChangeResponse.duplicate()));
}
schemaManager.applyOriginalSchemaChange(event);
List<SchemaChangeEvent> derivedSchemaChangeEvents =
calculateDerivedSchemaChangeEvents(request.getSchemaChangeEvent());
// If this schema change event is filtered out by LENIENT mode or merging table route
// strategies, ignore it.
if (derivedSchemaChangeEvents.isEmpty()) {
LOG.info("Event {} is omitted from sending to downstream, ignoring it.", event);
clearCurrentSchemaChangeRequest();
Preconditions.checkState(
schemaChangeStatus.compareAndSet(
RequestStatus.WAITING_FOR_FLUSH, RequestStatus.IDLE),
"Illegal schemaChangeStatus state: should still in WAITING_FOR_FLUSH state if event was ignored.");
return CompletableFuture.completedFuture(wrap(SchemaChangeResponse.ignored()));
}
// Backfill pre-schema info for sink applying
derivedSchemaChangeEvents.forEach(
e -> {
if (e instanceof SchemaChangeEventWithPreSchema) {
SchemaChangeEventWithPreSchema pe = (SchemaChangeEventWithPreSchema) e;
if (!pe.hasPreSchema()) {
schemaManager
.getLatestEvolvedSchema(pe.tableId())
.ifPresent(pe::fillPreSchema);
}
}
});
currentDerivedSchemaChangeEvents = new ArrayList<>(derivedSchemaChangeEvents);
return CompletableFuture.completedFuture(
wrap(SchemaChangeResponse.accepted(derivedSchemaChangeEvents)));
} else {
LOG.info(
"Schema Registry is busy processing a schema change request, could not handle request {} for now.",
request);
return CompletableFuture.completedFuture(wrap(SchemaChangeResponse.busy()));
}
}
/**
@ -127,27 +184,22 @@ public class SchemaRegistryRequestHandler implements Closeable {
*/
private void applySchemaChange(
TableId tableId, List<SchemaChangeEvent> derivedSchemaChangeEvents) {
isSchemaChangeApplying = true;
schemaChangeException = null;
finishedSchemaChanges.clear();
ignoredSchemaChanges.clear();
for (SchemaChangeEvent changeEvent : derivedSchemaChangeEvents) {
if (changeEvent.getType() != SchemaChangeEventType.CREATE_TABLE) {
if (schemaChangeBehavior == SchemaChangeBehavior.IGNORE) {
ignoredSchemaChanges.add(changeEvent);
currentIgnoredSchemaChanges.add(changeEvent);
continue;
}
}
if (!metadataApplier.acceptsSchemaEvolutionType(changeEvent.getType())) {
LOG.info("Ignored schema change {} to table {}.", changeEvent, tableId);
ignoredSchemaChanges.add(changeEvent);
currentIgnoredSchemaChanges.add(changeEvent);
} else {
try {
metadataApplier.applySchemaChange(changeEvent);
LOG.debug("Applied schema change {} to table {}.", changeEvent, tableId);
LOG.info("Applied schema change {} to table {}.", changeEvent, tableId);
schemaManager.applyEvolvedSchemaChange(changeEvent);
finishedSchemaChanges.add(changeEvent);
currentFinishedSchemaChanges.add(changeEvent);
} catch (Throwable t) {
LOG.error(
"Failed to apply schema change {} to table {}. Caused by: {}",
@ -155,7 +207,7 @@ public class SchemaRegistryRequestHandler implements Closeable {
tableId,
t);
if (!shouldIgnoreException(t)) {
schemaChangeException = t;
currentChangeException = t;
break;
} else {
LOG.warn(
@ -166,74 +218,9 @@ public class SchemaRegistryRequestHandler implements Closeable {
}
}
}
PendingSchemaChange waitFlushSuccess = pendingSchemaChanges.get(0);
if (RECEIVED_RELEASE_REQUEST.equals(waitFlushSuccess.getStatus())) {
startNextSchemaChangeRequest();
}
isSchemaChangeApplying = false;
}
/**
* Handle the {@link SchemaChangeRequest} and wait for all sink subtasks flushing.
*
* @param request the received SchemaChangeRequest
*/
public CompletableFuture<CoordinationResponse> handleSchemaChangeRequest(
SchemaChangeRequest request) {
if (pendingSchemaChanges.isEmpty()) {
LOG.info(
"Received schema change event request from table {}. Start to buffer requests for others.",
request.getTableId().toString());
SchemaChangeEvent event = request.getSchemaChangeEvent();
if (event instanceof CreateTableEvent
&& schemaManager.originalSchemaExists(request.getTableId())) {
return CompletableFuture.completedFuture(
wrap(new SchemaChangeResponse(Collections.emptyList())));
}
schemaManager.applyOriginalSchemaChange(event);
List<SchemaChangeEvent> derivedSchemaChangeEvents =
calculateDerivedSchemaChangeEvents(request.getSchemaChangeEvent());
derivedSchemaChangeEvents.forEach(
e -> {
if (e instanceof SchemaChangeEventWithPreSchema) {
SchemaChangeEventWithPreSchema pe = (SchemaChangeEventWithPreSchema) e;
if (!pe.hasPreSchema()) {
schemaManager
.getLatestEvolvedSchema(pe.tableId())
.ifPresent(pe::fillPreSchema);
}
}
});
CompletableFuture<CoordinationResponse> response =
CompletableFuture.completedFuture(
wrap(new SchemaChangeResponse(derivedSchemaChangeEvents)));
if (!derivedSchemaChangeEvents.isEmpty()) {
PendingSchemaChange pendingSchemaChange =
new PendingSchemaChange(request, response);
pendingSchemaChange.derivedSchemaChangeEvents = derivedSchemaChangeEvents;
pendingSchemaChanges.add(pendingSchemaChange);
pendingSchemaChanges.get(0).startToWaitForReleaseRequest();
}
return response;
} else {
LOG.info("There are already processing requests. Wait for processing.");
CompletableFuture<CoordinationResponse> response = new CompletableFuture<>();
pendingSchemaChanges.add(new PendingSchemaChange(request, response));
return response;
}
}
/** Handle the {@link ReleaseUpstreamRequest} and wait for all sink subtasks flushing. */
public CompletableFuture<CoordinationResponse> handleReleaseUpstreamRequest() {
CompletableFuture<CoordinationResponse> response =
pendingSchemaChanges.get(0).getResponseFuture();
if (response.isDone() && !isSchemaChangeApplying) {
startNextSchemaChangeRequest();
} else {
pendingSchemaChanges.get(0).receiveReleaseRequest();
}
return response;
Preconditions.checkState(
schemaChangeStatus.compareAndSet(RequestStatus.APPLYING, RequestStatus.FINISHED),
"Illegal schemaChangeStatus state: should be APPLYING before applySchemaChange finishes");
}
/**
@ -252,76 +239,33 @@ public class SchemaRegistryRequestHandler implements Closeable {
* @param tableId the table that the FlushEvent is about
* @param sinkSubtask the sink subtask that succeeded in flushing
*/
public void flushSuccess(TableId tableId, int sinkSubtask) throws InterruptedException {
public void flushSuccess(TableId tableId, int sinkSubtask) {
flushedSinkWriters.add(sinkSubtask);
if (flushedSinkWriters.equals(activeSinkWriters)) {
Preconditions.checkState(
schemaChangeStatus.compareAndSet(
RequestStatus.WAITING_FOR_FLUSH, RequestStatus.APPLYING),
"Illegal schemaChangeStatus state: should be WAITING_FOR_FLUSH before collecting enough FlushEvents");
LOG.info(
"All sink subtask have flushed for table {}. Start to apply schema change.",
tableId.toString());
PendingSchemaChange waitFlushSuccess = pendingSchemaChanges.get(0);
schemaChangeThreadPool.submit(
() -> applySchemaChange(tableId, waitFlushSuccess.derivedSchemaChangeEvents));
Thread.sleep(1000);
if (schemaChangeException != null) {
throw new RuntimeException("Failed to apply schema change.", schemaChangeException);
}
if (isSchemaChangeApplying) {
waitFlushSuccess
.getResponseFuture()
.complete(wrap(new SchemaChangeProcessingResponse()));
} else {
waitFlushSuccess
.getResponseFuture()
.complete(wrap(new ReleaseUpstreamResponse(finishedSchemaChanges)));
}
() -> applySchemaChange(tableId, currentDerivedSchemaChangeEvents));
}
}
private void startNextSchemaChangeRequest() {
pendingSchemaChanges.remove(0);
flushedSinkWriters.clear();
while (!pendingSchemaChanges.isEmpty()) {
PendingSchemaChange pendingSchemaChange = pendingSchemaChanges.get(0);
SchemaChangeRequest request = pendingSchemaChange.changeRequest;
if (request.getSchemaChangeEvent() instanceof CreateTableEvent
&& schemaManager.evolvedSchemaExists(request.getTableId())) {
pendingSchemaChange
.getResponseFuture()
.complete(wrap(new SchemaChangeResponse(Collections.emptyList())));
pendingSchemaChanges.remove(0);
} else {
List<SchemaChangeEvent> derivedSchemaChangeEvents =
calculateDerivedSchemaChangeEvents(request.getSchemaChangeEvent());
pendingSchemaChange
.getResponseFuture()
.complete(wrap(new SchemaChangeResponse(derivedSchemaChangeEvents)));
if (!derivedSchemaChangeEvents.isEmpty()) {
pendingSchemaChange.derivedSchemaChangeEvents = derivedSchemaChangeEvents;
pendingSchemaChange.startToWaitForReleaseRequest();
break;
}
}
}
}
public CompletableFuture<CoordinationResponse> refreshPendingLists() {
pendingSchemaChanges.clear();
flushedSinkWriters.clear();
return CompletableFuture.completedFuture(wrap(new RefreshPendingListsResponse()));
}
public CompletableFuture<CoordinationResponse> getSchemaChangeResult() {
if (schemaChangeException != null) {
throw new RuntimeException("Failed to apply schema change.", schemaChangeException);
}
if (isSchemaChangeApplying) {
return CompletableFuture.supplyAsync(() -> wrap(new SchemaChangeProcessingResponse()));
} else {
Preconditions.checkState(
!schemaChangeStatus.get().equals(RequestStatus.IDLE),
"Illegal schemaChangeStatus: should not be IDLE before getting schema change request results.");
if (schemaChangeStatus.compareAndSet(RequestStatus.FINISHED, RequestStatus.IDLE)) {
// This request has been finished, return it and prepare for the next request
List<SchemaChangeEvent> finishedEvents = clearCurrentSchemaChangeRequest();
return CompletableFuture.supplyAsync(
() -> wrap(new ReleaseUpstreamResponse(finishedSchemaChanges)));
() -> wrap(new SchemaChangeResultResponse(finishedEvents)));
} else {
// Still working on the schema change request; keep waiting
return CompletableFuture.supplyAsync(() -> wrap(new SchemaChangeProcessingResponse()));
}
}
@ -445,57 +389,54 @@ public class SchemaRegistryRequestHandler implements Closeable {
}
private boolean shouldIgnoreException(Throwable throwable) {
// In IGNORE mode, will never try to apply schema change events
// In EVOLVE and and LENIENT mode, such failure will not be tolerated
// In EVOLVE and LENIENT mode, such failure will not be tolerated
// In EXCEPTION mode, an exception will be thrown once captured
return (throwable instanceof UnsupportedSchemaChangeEventException)
&& (schemaChangeBehavior == SchemaChangeBehavior.TRY_EVOLVE);
}
private static class PendingSchemaChange {
private final SchemaChangeRequest changeRequest;
private List<SchemaChangeEvent> derivedSchemaChangeEvents;
private CompletableFuture<CoordinationResponse> responseFuture;
private RequestStatus status;
public PendingSchemaChange(
SchemaChangeRequest changeRequest,
CompletableFuture<CoordinationResponse> responseFuture) {
this.changeRequest = changeRequest;
this.responseFuture = responseFuture;
this.status = RequestStatus.PENDING;
}
public SchemaChangeRequest getChangeRequest() {
return changeRequest;
}
public CompletableFuture<CoordinationResponse> getResponseFuture() {
return responseFuture;
}
public RequestStatus getStatus() {
return status;
}
public void startToWaitForReleaseRequest() {
if (!responseFuture.isDone()) {
throw new IllegalStateException(
"Cannot start to wait for flush success before the SchemaChangeRequest is done.");
}
this.responseFuture = new CompletableFuture<>();
this.status = RequestStatus.WAIT_RELEASE_REQUEST;
}
public void receiveReleaseRequest() {
this.status = RECEIVED_RELEASE_REQUEST;
private List<SchemaChangeEvent> clearCurrentSchemaChangeRequest() {
if (currentChangeException != null) {
throw new RuntimeException("Failed to apply schema change.", currentChangeException);
}
List<SchemaChangeEvent> finishedSchemaChanges =
new ArrayList<>(currentFinishedSchemaChanges);
flushedSinkWriters.clear();
currentDerivedSchemaChangeEvents.clear();
currentFinishedSchemaChanges.clear();
currentIgnoredSchemaChanges.clear();
currentChangeException = null;
return finishedSchemaChanges;
}
enum RequestStatus {
PENDING,
WAIT_RELEASE_REQUEST,
RECEIVED_RELEASE_REQUEST
// Schema change event state could transfer in the following way:
//
// -------- B --------
// | |
// v |
// -------- ---------------------
// | IDLE | --- A --> | WAITING_FOR_FLUSH |
// -------- ---------------------
// ^ |
// E C
// \ v
// ------------ ------------
// | FINISHED | <-- D -- | APPLYING |
// ------------ ------------
//
// A: When a request arrives at an idle request handler.
// B: When the current request is a duplicate, or is ignored due to LENIENT / routed table
// merging strategies.
// C: When the schema registry has collected enough flush success events and actually starts
// to apply schema changes.
// D: When schema change application finishes (successfully or with exceptions).
// E: When the current schema change request result has been retrieved by SchemaOperator, and
// the handler is ready for the next request.
private enum RequestStatus {
IDLE,
WAITING_FOR_FLUSH,
APPLYING,
FINISHED
}
}
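The diagram above maps directly onto the AtomicReference<RequestStatus> guarding getSchemaChangeResult. A self-contained sketch of the five transitions follows; only the FINISHED -> IDLE compareAndSet (transition E) appears verbatim in the patch, the other method names are illustrative.

    import java.util.concurrent.atomic.AtomicReference;

    // Illustrative reproduction of the state transition guards A-E.
    class RequestStatusMachine {
        enum RequestStatus { IDLE, WAITING_FOR_FLUSH, APPLYING, FINISHED }

        private final AtomicReference<RequestStatus> status =
                new AtomicReference<>(RequestStatus.IDLE);

        boolean tryAcceptRequest() {  // A: request arrives at an idle handler
            return status.compareAndSet(RequestStatus.IDLE, RequestStatus.WAITING_FOR_FLUSH);
        }

        boolean tryShortCircuit() {   // B: duplicate or ignored request, back to idle
            return status.compareAndSet(RequestStatus.WAITING_FOR_FLUSH, RequestStatus.IDLE);
        }

        boolean tryStartApplying() {  // C: enough flush successes collected
            return status.compareAndSet(RequestStatus.WAITING_FOR_FLUSH, RequestStatus.APPLYING);
        }

        void finishApplying() {       // D: application done, successfully or not
            status.set(RequestStatus.FINISHED);
        }

        boolean tryRelease() {        // E: result fetched, ready for the next request
            return status.compareAndSet(RequestStatus.FINISHED, RequestStatus.IDLE);
        }
    }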

@ -1,27 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.runtime.operators.schema.event;
import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistryRequestHandler;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
/** Request to refresh the pendingSchemaChanges of {@link SchemaRegistryRequestHandler}. */
public class RefreshPendingListsRequest implements CoordinationRequest {
private static final long serialVersionUID = 1L;
}

@ -1,26 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.runtime.operators.schema.event;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
/** Response to refresh the pendingSchemaChanges of {@link RefreshPendingListsRequest}. */
public class RefreshPendingListsResponse implements CoordinationResponse {
private static final long serialVersionUID = 1L;
}

@ -1,32 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.runtime.operators.schema.event;
import org.apache.flink.cdc.common.event.FlushEvent;
import org.apache.flink.cdc.runtime.operators.schema.SchemaOperator;
import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
/**
* The request from {@link SchemaOperator} to {@link SchemaRegistry} to request to release upstream
* after sending {@link FlushEvent}.
*/
public class ReleaseUpstreamRequest implements CoordinationRequest {
private static final long serialVersionUID = 1L;
}

@ -23,8 +23,8 @@ import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
/**
* The response for {@link SchemaChangeResultRequest} or {@link ReleaseUpstreamRequest} from {@link
* SchemaRegistry} to {@link SchemaOperator} if not apply {@link SchemaChangeEvent} in time.
* The response for {@link SchemaChangeResultRequest} from {@link SchemaRegistry} to {@link
* SchemaOperator} when the requested {@link SchemaChangeEvent} has not been applied in time.
*/
public class SchemaChangeProcessingResponse implements CoordinationResponse {

@ -22,6 +22,7 @@ import org.apache.flink.cdc.runtime.operators.schema.SchemaOperator;
import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
@ -38,8 +39,44 @@ public class SchemaChangeResponse implements CoordinationResponse {
*/
private final List<SchemaChangeEvent> schemaChangeEvents;
public SchemaChangeResponse(List<SchemaChangeEvent> schemaChangeEvents) {
private final ResponseCode responseCode;
public static SchemaChangeResponse accepted(List<SchemaChangeEvent> schemaChangeEvents) {
return new SchemaChangeResponse(schemaChangeEvents, ResponseCode.ACCEPTED);
}
public static SchemaChangeResponse busy() {
return new SchemaChangeResponse(Collections.emptyList(), ResponseCode.BUSY);
}
public static SchemaChangeResponse duplicate() {
return new SchemaChangeResponse(Collections.emptyList(), ResponseCode.DUPLICATE);
}
public static SchemaChangeResponse ignored() {
return new SchemaChangeResponse(Collections.emptyList(), ResponseCode.IGNORED);
}
private SchemaChangeResponse(
List<SchemaChangeEvent> schemaChangeEvents, ResponseCode responseCode) {
this.schemaChangeEvents = schemaChangeEvents;
this.responseCode = responseCode;
}
public boolean isAccepted() {
return ResponseCode.ACCEPTED.equals(responseCode);
}
public boolean isRegistryBusy() {
return ResponseCode.BUSY.equals(responseCode);
}
public boolean isDuplicate() {
return ResponseCode.DUPLICATE.equals(responseCode);
}
public boolean isIgnored() {
return ResponseCode.IGNORED.equals(responseCode);
}
public List<SchemaChangeEvent> getSchemaChangeEvents() {
@ -55,11 +92,43 @@ public class SchemaChangeResponse implements CoordinationResponse {
return false;
}
SchemaChangeResponse response = (SchemaChangeResponse) o;
return schemaChangeEvents.equals(response.schemaChangeEvents);
return Objects.equals(schemaChangeEvents, response.schemaChangeEvents)
&& responseCode == response.responseCode;
}
@Override
public int hashCode() {
return Objects.hash(schemaChangeEvents);
return Objects.hash(schemaChangeEvents, responseCode);
}
@Override
public String toString() {
return "SchemaChangeResponse{"
+ "schemaChangeEvents="
+ schemaChangeEvents
+ ", responseCode="
+ responseCode
+ '}';
}
/**
* Schema Change Response status code.
*
* <p>- Accepted: The schema change request has been accepted exclusively. Any other schema
* change requests will be blocked.
*
* <p>- Busy: Schema registry is currently busy processing another schema change request.
*
* <p>- Duplicate: This schema change request has been submitted before, possibly by another
* parallel subtask.
*
* <p>- Ignored: This schema change request has been assessed, but no actual evolution is
* required; possibly caused by LENIENT mode or table merging strategies.
*/
public enum ResponseCode {
ACCEPTED,
BUSY,
DUPLICATE,
IGNORED
}
}
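A minimal sketch of how a caller such as SchemaOperator might branch on these factory-built responses. The retry policy, requestSchemaChange, flushDownstreamAndAwaitResult, and RETRY_INTERVAL_MS are stand-ins; only the isAccepted/isRegistryBusy/isDuplicate/isIgnored accessors come from the class above.

    // Hypothetical client-side handling of a SchemaChangeResponse.
    void handleSchemaChange(TableId tableId, SchemaChangeEvent event) throws Exception {
        SchemaChangeResponse response = requestSchemaChange(tableId, event);
        while (response.isRegistryBusy()) {
            // Another subtask holds the registry exclusively; back off and resubmit.
            Thread.sleep(RETRY_INTERVAL_MS);
            response = requestSchemaChange(tableId, event);
        }
        if (response.isAccepted()) {
            // Block upstream, flush downstream, then poll for the final result.
            flushDownstreamAndAwaitResult(response.getSchemaChangeEvents());
        }
        // isDuplicate() / isIgnored(): nothing to evolve, resume data events.
    }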

@ -26,10 +26,10 @@ import java.util.List;
import java.util.Objects;
/**
* The response for {@link ReleaseUpstreamRequest} from {@link SchemaRegistry} to {@link
* The response for {@link SchemaChangeResultRequest} from {@link SchemaRegistry} to {@link
* SchemaOperator}.
*/
public class ReleaseUpstreamResponse implements CoordinationResponse {
public class SchemaChangeResultResponse implements CoordinationResponse {
private static final long serialVersionUID = 1L;
@ -39,7 +39,7 @@ public class ReleaseUpstreamResponse implements CoordinationResponse {
*/
private final List<SchemaChangeEvent> finishedSchemaChangeEvents;
public ReleaseUpstreamResponse(List<SchemaChangeEvent> finishedSchemaChangeEvents) {
public SchemaChangeResultResponse(List<SchemaChangeEvent> finishedSchemaChangeEvents) {
this.finishedSchemaChangeEvents = finishedSchemaChangeEvents;
}
@ -63,7 +63,7 @@ public class ReleaseUpstreamResponse implements CoordinationResponse {
if (object == null || getClass() != object.getClass()) {
return false;
}
ReleaseUpstreamResponse that = (ReleaseUpstreamResponse) object;
SchemaChangeResultResponse that = (SchemaChangeResultResponse) object;
return Objects.equals(finishedSchemaChangeEvents, that.finishedSchemaChangeEvents);
}
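Taken together with getSchemaChangeResult above, result retrieval is a poll loop: the registry answers with SchemaChangeProcessingResponse while the status is APPLYING, and with SchemaChangeResultResponse once FINISHED. A minimal sketch follows; sendRequestToCoordinator and POLL_INTERVAL_MS are stand-ins, and the getter is assumed to mirror the finishedSchemaChangeEvents field.

    // Hypothetical polling loop on the SchemaOperator side.
    List<SchemaChangeEvent> awaitSchemaChangeResult() throws Exception {
        while (true) {
            CoordinationResponse response =
                    sendRequestToCoordinator(new SchemaChangeResultRequest());
            if (response instanceof SchemaChangeResultResponse) {
                // FINISHED: registry has applied (or given up on) all events.
                return ((SchemaChangeResultResponse) response).getFinishedSchemaChangeEvents();
            }
            // SchemaChangeProcessingResponse: still APPLYING, poll again shortly.
            Thread.sleep(POLL_INTERVAL_MS);
        }
    }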

@ -163,7 +163,6 @@ public class EventOperatorTestHarness<OP extends AbstractStreamOperator<E>, E ex
}
public void registerTableSchema(TableId tableId, Schema schema) {
schemaRegistry.handleApplyOriginalSchemaChangeEvent(new CreateTableEvent(tableId, schema));
schemaRegistry.handleCoordinationRequest(
new SchemaChangeRequest(tableId, new CreateTableEvent(tableId, schema)));
schemaRegistry.handleApplyEvolvedSchemaChangeRequest(new CreateTableEvent(tableId, schema));
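For context, a hedged sketch of how a test might call the updated registerTableSchema before pushing events through the harness; the database, table, and column names are invented for illustration, while TableId.tableId and the Schema builder are the existing flink-cdc-common APIs.

    // Hypothetical test setup: register a schema with the registry first.
    TableId tableId = TableId.tableId("test_db", "test_table");
    Schema schema =
            Schema.newBuilder()
                    .physicalColumn("id", DataTypes.INT())
                    .physicalColumn("name", DataTypes.STRING())
                    .primaryKey("id")
                    .build();
    harness.registerTableSchema(tableId, schema);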
