From 6dd1c11fe13f29fcaf202c35234040f44383e3bf Mon Sep 17 00:00:00 2001 From: yuxiqian <34335406+yuxiqian@users.noreply.github.com> Date: Thu, 22 Aug 2024 19:19:21 +0800 Subject: [PATCH] [FLINK-36114][cdc-runtime] Make SchemaRegistryRequestHandler thread safe by blocking subsequent schemaChangeEvent This closes #3563. Co-authored-by: Hongshun Wang (cherry picked from commit ee843e2f246c300ecfeaf5cfd529693fadf2625f) --- .../reader/MySqlPipelineRecordEmitter.java | 62 ++-- .../mysql/source/MySqlFullTypesITCase.java | 26 +- .../mysql/source/MySqlPipelineITCase.java | 30 +- .../mysql/testutils/MySqSourceTestUtils.java | 19 + .../pipeline/tests/MySqlToDorisE2eITCase.java | 10 +- .../cdc/pipeline/tests/MysqlE2eITCase.java | 253 ++++++++----- .../cdc/pipeline/tests/RouteE2eITCase.java | 35 +- .../pipeline/tests/SchemaEvolveE2eITCase.java | 10 +- .../pipeline/tests/TransformE2eITCase.java | 44 ++- .../cdc/pipeline/tests/UdfE2eITCase.java | 6 +- .../tests/utils/PipelineTestEnvironment.java | 2 + .../operators/schema/SchemaOperator.java | 74 ++-- .../schema/coordinator/SchemaManager.java | 96 +++++ .../schema/coordinator/SchemaRegistry.java | 10 +- .../SchemaRegistryRequestHandler.java | 342 ++++++++---------- .../event/RefreshPendingListsRequest.java | 27 -- .../event/RefreshPendingListsResponse.java | 26 -- .../schema/event/ReleaseUpstreamRequest.java | 32 -- .../event/SchemaChangeProcessingResponse.java | 4 +- .../schema/event/SchemaChangeResponse.java | 75 +++- ...e.java => SchemaChangeResultResponse.java} | 8 +- .../operators/EventOperatorTestHarness.java | 1 - 22 files changed, 711 insertions(+), 481 deletions(-) delete mode 100644 flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/RefreshPendingListsRequest.java delete mode 100644 flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/RefreshPendingListsResponse.java delete mode 100644 flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/ReleaseUpstreamRequest.java rename flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/{ReleaseUpstreamResponse.java => SchemaChangeResultResponse.java} (87%) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java index 3f7581347..909ed6c5b 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java @@ -28,6 +28,7 @@ import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig; import org.apache.flink.cdc.connectors.mysql.source.metrics.MySqlSourceReaderMetrics; import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplitState; import org.apache.flink.cdc.connectors.mysql.table.StartupMode; +import org.apache.flink.cdc.connectors.mysql.utils.MySqlSchemaUtils; import org.apache.flink.cdc.connectors.mysql.utils.MySqlTypeUtils; import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema; import 
org.apache.flink.connector.base.source.reader.RecordEmitter; @@ -68,7 +69,7 @@ public class MySqlPipelineRecordEmitter extends MySqlRecordEmitter { // Used when startup mode is not initial private boolean alreadySendCreateTableForBinlogSplit = false; - private final List createTableEventCache; + private List createTableEventCache; public MySqlPipelineRecordEmitter( DebeziumDeserializationSchema debeziumDeserializationSchema, @@ -80,23 +81,7 @@ public class MySqlPipelineRecordEmitter extends MySqlRecordEmitter { sourceConfig.isIncludeSchemaChanges()); this.sourceConfig = sourceConfig; this.alreadySendCreateTableTables = new HashSet<>(); - this.createTableEventCache = new ArrayList<>(); - - if (!sourceConfig.getStartupOptions().startupMode.equals(StartupMode.INITIAL)) { - try (JdbcConnection jdbc = openJdbcConnection(sourceConfig)) { - List capturedTableIds = listTables(jdbc, sourceConfig.getTableFilters()); - for (TableId tableId : capturedTableIds) { - Schema schema = getSchema(jdbc, tableId); - createTableEventCache.add( - new CreateTableEvent( - org.apache.flink.cdc.common.event.TableId.tableId( - tableId.catalog(), tableId.table()), - schema)); - } - } catch (SQLException e) { - throw new RuntimeException("Cannot start emitter to fetch table schema.", e); - } - } + this.createTableEventCache = generateCreateTableEvent(sourceConfig); } @Override @@ -104,6 +89,8 @@ public class MySqlPipelineRecordEmitter extends MySqlRecordEmitter { SourceRecord element, SourceOutput output, MySqlSplitState splitState) throws Exception { if (isLowWatermarkEvent(element) && splitState.isSnapshotSplitState()) { + // In the Snapshot phase of INITIAL startup mode, we lazily send CreateTableEvent to + // downstream to avoid checkpoint timeout. TableId tableId = splitState.asSnapshotSplitState().toMySqlSplit().getTableId(); if (!alreadySendCreateTableTables.contains(tableId)) { try (JdbcConnection jdbc = openJdbcConnection(sourceConfig)) { @@ -111,11 +98,24 @@ public class MySqlPipelineRecordEmitter extends MySqlRecordEmitter { alreadySendCreateTableTables.add(tableId); } } - } else if (splitState.isBinlogSplitState() - && !alreadySendCreateTableForBinlogSplit - && !sourceConfig.getStartupOptions().startupMode.equals(StartupMode.INITIAL)) { - createTableEventCache.forEach(output::collect); + } else if (splitState.isBinlogSplitState() && !alreadySendCreateTableForBinlogSplit) { alreadySendCreateTableForBinlogSplit = true; + if (sourceConfig.getStartupOptions().startupMode.equals(StartupMode.INITIAL)) { + // In the Snapshot -> Binlog transition of INITIAL startup mode, ensure all table + // schemas have been sent to downstream. We use the previously cached schema instead of + // re-requesting the latest schema because there might be some pending schema change events + // in the queue, and that may accidentally emit the evolved schema before the corresponding + // schema change events. + createTableEventCache.stream() + .filter( + event -> + !alreadySendCreateTableTables.contains( + MySqlSchemaUtils.toDbzTableId(event.tableId()))) + .forEach(output::collect); + } else { + // In Binlog only mode, we simply emit all schemas at once.
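+                // Since non-INITIAL startup modes have no snapshot phase, no CreateTableEvent
+                // has been emitted before this point, so the cache can be flushed as a whole.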
+ createTableEventCache.forEach(output::collect); + } } super.processElement(element, output, splitState); } @@ -233,4 +233,22 @@ public class MySqlPipelineRecordEmitter extends MySqlRecordEmitter { } return mySqlAntlrDdlParser; } + + private List generateCreateTableEvent(MySqlSourceConfig sourceConfig) { + try (JdbcConnection jdbc = openJdbcConnection(sourceConfig)) { + List createTableEventCache = new ArrayList<>(); + List capturedTableIds = listTables(jdbc, sourceConfig.getTableFilters()); + for (TableId tableId : capturedTableIds) { + Schema schema = getSchema(jdbc, tableId); + createTableEventCache.add( + new CreateTableEvent( + org.apache.flink.cdc.common.event.TableId.tableId( + tableId.catalog(), tableId.table()), + schema)); + } + return createTableEventCache; + } catch (SQLException e) { + throw new RuntimeException("Cannot start emitter to fetch table schema.", e); + } + } } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlFullTypesITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlFullTypesITCase.java index 91351dabf..2bd1ae689 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlFullTypesITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlFullTypesITCase.java @@ -295,8 +295,9 @@ public class MySqlFullTypesITCase extends MySqlSourceTestBase { .executeAndCollect(); // skip CreateTableEvent - List snapshotResults = MySqSourceTestUtils.fetchResults(iterator, 2); - RecordData snapshotRecord = ((DataChangeEvent) snapshotResults.get(1)).after(); + List snapshotResults = + MySqSourceTestUtils.fetchResultsAndCreateTableEvent(iterator, 1).f0; + RecordData snapshotRecord = ((DataChangeEvent) snapshotResults.get(0)).after(); Assertions.assertThat(RecordDataTestUtils.recordFields(snapshotRecord, recordType)) .isEqualTo(expectedSnapshot); @@ -306,7 +307,8 @@ public class MySqlFullTypesITCase extends MySqlSourceTestBase { statement.execute("UPDATE precision_types SET time_6_c = null WHERE id = 1;"); } - List streamResults = MySqSourceTestUtils.fetchResults(iterator, 1); + List streamResults = + MySqSourceTestUtils.fetchResultsAndCreateTableEvent(iterator, 1).f0; RecordData streamRecord = ((DataChangeEvent) streamResults.get(0)).after(); Assertions.assertThat(RecordDataTestUtils.recordFields(streamRecord, recordType)) .isEqualTo(expectedStreamRecord); @@ -397,8 +399,9 @@ public class MySqlFullTypesITCase extends MySqlSourceTestBase { }; // skip CreateTableEvent - List snapshotResults = MySqSourceTestUtils.fetchResults(iterator, 2); - RecordData snapshotRecord = ((DataChangeEvent) snapshotResults.get(1)).after(); + List snapshotResults = + MySqSourceTestUtils.fetchResultsAndCreateTableEvent(iterator, 1).f0; + RecordData snapshotRecord = ((DataChangeEvent) snapshotResults.get(0)).after(); Assertions.assertThat(RecordDataTestUtils.recordFields(snapshotRecord, COMMON_TYPES)) .isEqualTo(expectedSnapshot); @@ -412,7 +415,8 @@ public class MySqlFullTypesITCase extends MySqlSourceTestBase { expectedSnapshot[44] = BinaryStringData.fromString("{\"key1\":\"value1\"}"); Object[] expectedStreamRecord = expectedSnapshot; - List streamResults = 
MySqSourceTestUtils.fetchResults(iterator, 1); + List streamResults = + MySqSourceTestUtils.fetchResultsAndCreateTableEvent(iterator, 1).f0; RecordData streamRecord = ((DataChangeEvent) streamResults.get(0)).after(); Assertions.assertThat(RecordDataTestUtils.recordFields(streamRecord, COMMON_TYPES)) .isEqualTo(expectedStreamRecord); @@ -437,9 +441,10 @@ public class MySqlFullTypesITCase extends MySqlSourceTestBase { "Event-Source") .executeAndCollect(); - // skip CreateTableEvent - List snapshotResults = MySqSourceTestUtils.fetchResults(iterator, 2); - RecordData snapshotRecord = ((DataChangeEvent) snapshotResults.get(1)).after(); + // skip CreateTableEvents + List snapshotResults = + MySqSourceTestUtils.fetchResultsAndCreateTableEvent(iterator, 1).f0; + RecordData snapshotRecord = ((DataChangeEvent) snapshotResults.get(0)).after(); Assertions.assertThat(RecordDataTestUtils.recordFields(snapshotRecord, recordType)) .isEqualTo(expectedSnapshot); @@ -450,7 +455,8 @@ public class MySqlFullTypesITCase extends MySqlSourceTestBase { "UPDATE time_types SET time_6_c = null, timestamp_def_c = default WHERE id = 1;"); } - List streamResults = MySqSourceTestUtils.fetchResults(iterator, 1); + List streamResults = + MySqSourceTestUtils.fetchResultsAndCreateTableEvent(iterator, 1).f0; RecordData streamRecord = ((DataChangeEvent) streamResults.get(0)).after(); Assertions.assertThat(RecordDataTestUtils.recordFields(streamRecord, recordType)) .isEqualTo(expectedStreamRecord); diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java index bd2c059bb..f019e98d8 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java @@ -60,6 +60,7 @@ import java.sql.Statement; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.Iterator; import java.util.List; import java.util.stream.Stream; @@ -223,12 +224,33 @@ public class MySqlPipelineITCase extends MySqlSourceTestBase { BinaryStringData.fromString("c-21") }))); } + // In this configuration, several subtasks might emit their corresponding CreateTableEvent + // to downstream. Since it is not possible to predict how many CreateTableEvents we should + // expect, we simply filter them out from the expected set and assert there's at least one.
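+        // fetchResultsExcept (defined below) drops every event that equals the given side
+        // event and asserts afterwards that at least one such side event was observed.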
List actual = - fetchResults(events, 1 + expectedSnapshot.size() + expectedBinlog.size()); - assertThat(actual.get(0)).isEqualTo(createTableEvent); - assertThat(actual.subList(1, 10)) + fetchResultsExcept( + events, expectedSnapshot.size() + expectedBinlog.size(), createTableEvent); assertThat(actual.subList(0, expectedSnapshot.size())) .containsExactlyInAnyOrder(expectedSnapshot.toArray(new Event[0])); - assertThat(actual.subList(10, actual.size())).isEqualTo(expectedBinlog); + assertThat(actual.subList(expectedSnapshot.size(), actual.size())) + .isEqualTo(expectedBinlog); + } + + private static List fetchResultsExcept(Iterator iter, int size, T sideEvent) { + List result = new ArrayList<>(size); + List sideResults = new ArrayList<>(); + while (size > 0 && iter.hasNext()) { + T event = iter.next(); + if (!event.equals(sideEvent)) { + result.add(event); + size--; + } else { + sideResults.add(sideEvent); + } + } + // Also ensure we've received at least one side event. + assertThat(sideResults).isNotEmpty(); + return result; } @Test diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/testutils/MySqSourceTestUtils.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/testutils/MySqSourceTestUtils.java index 0708cd8cd..a76838d66 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/testutils/MySqSourceTestUtils.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/testutils/MySqSourceTestUtils.java @@ -17,6 +17,9 @@ package org.apache.flink.cdc.connectors.mysql.testutils; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.cdc.common.event.CreateTableEvent; + import java.util.ArrayList; import java.util.Iterator; import java.util.List; @@ -38,6 +41,22 @@ public class MySqSourceTestUtils { return result; } + public static Tuple2, List> fetchResultsAndCreateTableEvent( Iterator iter, int size) { + List result = new ArrayList<>(size); + List createTableEvents = new ArrayList<>(); + while (size > 0 && iter.hasNext()) { + T event = iter.next(); + if (event instanceof CreateTableEvent) { + createTableEvents.add((CreateTableEvent) event); + } else { + result.add(event); + size--; + } + } + return Tuple2.of(result, createTableEvents); + } + public static String getServerId(int parallelism) { final Random random = new Random(); int serverId = random.nextInt(100) + 5400; diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToDorisE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToDorisE2eITCase.java index 0e58c5ed2..508eebdb7 100644 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToDorisE2eITCase.java +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToDorisE2eITCase.java @@ -194,12 +194,13 @@ public class MySqlToDorisE2eITCase extends PipelineTestEnvironment { + " table.create.properties.replication_num: 1\n" + "\n" + "pipeline:\n" - + " parallelism: 1", + + " parallelism: %d", MYSQL_TEST_USER, MYSQL_TEST_PASSWORD,
mysqlInventoryDatabase.getDatabaseName(), DORIS.getUsername(), - DORIS.getPassword()); + DORIS.getPassword(), + parallelism); Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar"); Path dorisCdcConnector = TestUtils.getResource("doris-cdc-pipeline-connector.jar"); Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar"); @@ -324,13 +325,14 @@ public class MySqlToDorisE2eITCase extends PipelineTestEnvironment { + " projection: \\*, 'fine' AS FINE\n" + " filter: id <> 3 AND id <> 4\n" + "pipeline:\n" - + " parallelism: 1", + + " parallelism: %d", MYSQL_TEST_USER, MYSQL_TEST_PASSWORD, complexDataTypesDatabase.getDatabaseName(), DORIS.getUsername(), DORIS.getPassword(), - complexDataTypesDatabase.getDatabaseName()); + complexDataTypesDatabase.getDatabaseName(), + parallelism); Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar"); Path dorisCdcConnector = TestUtils.getResource("doris-cdc-pipeline-connector.jar"); Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar"); diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java index 917873bb8..1f730be62 100644 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java @@ -39,8 +39,6 @@ import java.sql.DriverManager; import java.sql.SQLException; import java.sql.Statement; import java.time.Duration; -import java.util.Arrays; -import java.util.List; import java.util.concurrent.TimeoutException; /** End-to-end tests for mysql cdc pipeline job. 
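Covers both the initial snapshot stage and the incremental binlog reading stage, including schema change events.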
*/ @@ -104,11 +102,12 @@ public class MysqlE2eITCase extends PipelineTestEnvironment { + " type: values\n" + "\n" + "pipeline:\n" - + " parallelism: 1", + + " parallelism: %d", INTER_CONTAINER_MYSQL_ALIAS, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD, - mysqlInventoryDatabase.getDatabaseName()); + mysqlInventoryDatabase.getDatabaseName(), + parallelism); Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar"); Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar"); Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar"); @@ -123,54 +122,24 @@ public class MysqlE2eITCase extends PipelineTestEnvironment { String.format( "DataChangeEvent{tableId=%s.products, before=[], after=[109, spare tire, 24 inch spare tire, 22.2, null, null, null], op=INSERT, meta=()}", mysqlInventoryDatabase.getDatabaseName())); - List expectedEvents = - Arrays.asList( - String.format( - "CreateTableEvent{tableId=%s.customers, schema=columns={`id` INT NOT NULL,`name` VARCHAR(255) NOT NULL 'flink',`address` VARCHAR(1024),`phone_number` VARCHAR(512)}, primaryKeys=id, options=()}", - mysqlInventoryDatabase.getDatabaseName()), - String.format( - "DataChangeEvent{tableId=%s.customers, before=[], after=[104, user_4, Shanghai, 123567891234], op=INSERT, meta=()}", - mysqlInventoryDatabase.getDatabaseName()), - String.format( - "DataChangeEvent{tableId=%s.customers, before=[], after=[103, user_3, Shanghai, 123567891234], op=INSERT, meta=()}", - mysqlInventoryDatabase.getDatabaseName()), - String.format( - "DataChangeEvent{tableId=%s.customers, before=[], after=[102, user_2, Shanghai, 123567891234], op=INSERT, meta=()}", - mysqlInventoryDatabase.getDatabaseName()), - String.format( - "DataChangeEvent{tableId=%s.customers, before=[], after=[101, user_1, Shanghai, 123567891234], op=INSERT, meta=()}", - mysqlInventoryDatabase.getDatabaseName()), - String.format( - "CreateTableEvent{tableId=%s.products, schema=columns={`id` INT NOT NULL,`name` VARCHAR(255) NOT NULL 'flink',`description` VARCHAR(512),`weight` FLOAT,`enum_c` STRING 'red',`json_c` STRING,`point_c` STRING}, primaryKeys=id, options=()}", - mysqlInventoryDatabase.getDatabaseName()), - String.format( - "DataChangeEvent{tableId=%s.products, before=[], after=[109, spare tire, 24 inch spare tire, 22.2, null, null, null], op=INSERT, meta=()}", - mysqlInventoryDatabase.getDatabaseName()), - String.format( - "DataChangeEvent{tableId=%s.products, before=[], after=[107, rocks, box of assorted rocks, 5.3, null, null, null], op=INSERT, meta=()}", - mysqlInventoryDatabase.getDatabaseName()), - String.format( - "DataChangeEvent{tableId=%s.products, before=[], after=[108, jacket, water resistent black wind breaker, 0.1, null, null, null], op=INSERT, meta=()}", - mysqlInventoryDatabase.getDatabaseName()), - String.format( - "DataChangeEvent{tableId=%s.products, before=[], after=[105, hammer, 14oz carpenter's hammer, 0.875, red, {\"k1\": \"v1\", \"k2\": \"v2\"}, {\"coordinates\":[5,5],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=()}", - mysqlInventoryDatabase.getDatabaseName()), - String.format( - "DataChangeEvent{tableId=%s.products, before=[], after=[106, hammer, 16oz carpenter's hammer, 1.0, null, null, null], op=INSERT, meta=()}", - mysqlInventoryDatabase.getDatabaseName()), - String.format( - "DataChangeEvent{tableId=%s.products, before=[], after=[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8, red, {\"key3\": \"value3\"}, {\"coordinates\":[3,3],\"type\":\"Point\",\"srid\":0}], op=INSERT, 
meta=()}", - mysqlInventoryDatabase.getDatabaseName()), - String.format( - "DataChangeEvent{tableId=%s.products, before=[], after=[104, hammer, 12oz carpenter's hammer, 0.75, white, {\"key4\": \"value4\"}, {\"coordinates\":[4,4],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=()}", - mysqlInventoryDatabase.getDatabaseName()), - String.format( - "DataChangeEvent{tableId=%s.products, before=[], after=[101, scooter, Small 2-wheel scooter, 3.14, red, {\"key1\": \"value1\"}, {\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=()}", - mysqlInventoryDatabase.getDatabaseName()), - String.format( - "DataChangeEvent{tableId=%s.products, before=[], after=[102, car battery, 12V car battery, 8.1, white, {\"key2\": \"value2\"}, {\"coordinates\":[2,2],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=()}", - mysqlInventoryDatabase.getDatabaseName())); - validateResult(expectedEvents); + + validateResult( + "CreateTableEvent{tableId=%s.customers, schema=columns={`id` INT NOT NULL,`name` VARCHAR(255) NOT NULL 'flink',`address` VARCHAR(1024),`phone_number` VARCHAR(512)}, primaryKeys=id, options=()}", + "DataChangeEvent{tableId=%s.customers, before=[], after=[104, user_4, Shanghai, 123567891234], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.customers, before=[], after=[103, user_3, Shanghai, 123567891234], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.customers, before=[], after=[102, user_2, Shanghai, 123567891234], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.customers, before=[], after=[101, user_1, Shanghai, 123567891234], op=INSERT, meta=()}", + "CreateTableEvent{tableId=%s.products, schema=columns={`id` INT NOT NULL,`name` VARCHAR(255) NOT NULL 'flink',`description` VARCHAR(512),`weight` FLOAT,`enum_c` STRING 'red',`json_c` STRING,`point_c` STRING}, primaryKeys=id, options=()}", + "DataChangeEvent{tableId=%s.products, before=[], after=[109, spare tire, 24 inch spare tire, 22.2, null, null, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.products, before=[], after=[107, rocks, box of assorted rocks, 5.3, null, null, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.products, before=[], after=[108, jacket, water resistent black wind breaker, 0.1, null, null, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.products, before=[], after=[105, hammer, 14oz carpenter's hammer, 0.875, red, {\"k1\": \"v1\", \"k2\": \"v2\"}, {\"coordinates\":[5,5],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.products, before=[], after=[106, hammer, 16oz carpenter's hammer, 1.0, null, null, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.products, before=[], after=[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8, red, {\"key3\": \"value3\"}, {\"coordinates\":[3,3],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.products, before=[], after=[104, hammer, 12oz carpenter's hammer, 0.75, white, {\"key4\": \"value4\"}, {\"coordinates\":[4,4],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.products, before=[], after=[101, scooter, Small 2-wheel scooter, 3.14, red, {\"key1\": \"value1\"}, {\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.products, before=[], after=[102, car battery, 12V car battery, 8.1, white, {\"key2\": \"value2\"}, {\"coordinates\":[2,2],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=()}"); + LOG.info("Begin incremental reading 
stage."); // generate binlogs String mysqlJdbcUrl = @@ -212,38 +181,160 @@ public class MysqlE2eITCase extends PipelineTestEnvironment { "DataChangeEvent{tableId=%s.products, before=[111, scooter, Big 2-wheel scooter , 5.17, null, null, null, 1], after=[], op=DELETE, meta=()}", mysqlInventoryDatabase.getDatabaseName())); - expectedEvents = - Arrays.asList( - String.format( - "DataChangeEvent{tableId=%s.products, before=[106, hammer, 16oz carpenter's hammer, 1.0, null, null, null], after=[106, hammer, 18oz carpenter hammer, 1.0, null, null, null], op=UPDATE, meta=()}", - mysqlInventoryDatabase.getDatabaseName()), - String.format( - "DataChangeEvent{tableId=%s.products, before=[107, rocks, box of assorted rocks, 5.3, null, null, null], after=[107, rocks, box of assorted rocks, 5.1, null, null, null], op=UPDATE, meta=()}", - mysqlInventoryDatabase.getDatabaseName()), - String.format( - "AddColumnEvent{tableId=%s.products, addedColumns=[ColumnWithPosition{column=`new_col` INT, position=LAST, existedColumnName=null}]}", - mysqlInventoryDatabase.getDatabaseName()), - String.format( - "DataChangeEvent{tableId=%s.products, before=[], after=[110, jacket, water resistent white wind breaker, 0.2, null, null, null, 1], op=INSERT, meta=()}", - mysqlInventoryDatabase.getDatabaseName()), - String.format( - "DataChangeEvent{tableId=%s.products, before=[], after=[111, scooter, Big 2-wheel scooter , 5.18, null, null, null, 1], op=INSERT, meta=()}", - mysqlInventoryDatabase.getDatabaseName()), - String.format( - "DataChangeEvent{tableId=%s.products, before=[110, jacket, water resistent white wind breaker, 0.2, null, null, null, 1], after=[110, jacket, new water resistent white wind breaker, 0.5, null, null, null, 1], op=UPDATE, meta=()}", - mysqlInventoryDatabase.getDatabaseName()), - String.format( - "DataChangeEvent{tableId=%s.products, before=[111, scooter, Big 2-wheel scooter , 5.18, null, null, null, 1], after=[111, scooter, Big 2-wheel scooter , 5.17, null, null, null, 1], op=UPDATE, meta=()}", - mysqlInventoryDatabase.getDatabaseName()), - String.format( - "DataChangeEvent{tableId=%s.products, before=[111, scooter, Big 2-wheel scooter , 5.17, null, null, null, 1], after=[], op=DELETE, meta=()}", - mysqlInventoryDatabase.getDatabaseName())); - validateResult(expectedEvents); + validateResult( + "DataChangeEvent{tableId=%s.products, before=[106, hammer, 16oz carpenter's hammer, 1.0, null, null, null], after=[106, hammer, 18oz carpenter hammer, 1.0, null, null, null], op=UPDATE, meta=()}", + "DataChangeEvent{tableId=%s.products, before=[107, rocks, box of assorted rocks, 5.3, null, null, null], after=[107, rocks, box of assorted rocks, 5.1, null, null, null], op=UPDATE, meta=()}", + "AddColumnEvent{tableId=%s.products, addedColumns=[ColumnWithPosition{column=`new_col` INT, position=LAST, existedColumnName=null}]}", + "DataChangeEvent{tableId=%s.products, before=[], after=[110, jacket, water resistent white wind breaker, 0.2, null, null, null, 1], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.products, before=[], after=[111, scooter, Big 2-wheel scooter , 5.18, null, null, null, 1], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.products, before=[110, jacket, water resistent white wind breaker, 0.2, null, null, null, 1], after=[110, jacket, new water resistent white wind breaker, 0.5, null, null, null, 1], op=UPDATE, meta=()}", + "DataChangeEvent{tableId=%s.products, before=[111, scooter, Big 2-wheel scooter , 5.18, null, null, null, 1], after=[111, scooter, Big 2-wheel scooter , 5.17, null, 
null, null, 1], op=UPDATE, meta=()}", + "DataChangeEvent{tableId=%s.products, before=[111, scooter, Big 2-wheel scooter , 5.17, null, null, null, 1], after=[], op=DELETE, meta=()}"); + } + + @Test + public void testSchemaChangeEvents() throws Exception { + String pipelineJob = + String.format( + "source:\n" + + " type: mysql\n" + + " hostname: %s\n" + + " port: 3306\n" + + " username: %s\n" + + " password: %s\n" + + " tables: %s.\\.*\n" + + " server-id: 5400-5404\n" + + " server-time-zone: UTC\n" + + "\n" + + "sink:\n" + + " type: values\n" + + "\n" + + "pipeline:\n" + + " parallelism: %d", + INTER_CONTAINER_MYSQL_ALIAS, + MYSQL_TEST_USER, + MYSQL_TEST_PASSWORD, + mysqlInventoryDatabase.getDatabaseName(), + parallelism); + Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar"); + Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar"); + Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar"); + submitPipelineJob(pipelineJob, mysqlCdcJar, valuesCdcJar, mysqlDriverJar); + waitUntilJobRunning(Duration.ofSeconds(30)); + LOG.info("Pipeline job is running"); + waitUntilSpecificEvent( + String.format( + "DataChangeEvent{tableId=%s.customers, before=[], after=[104, user_4, Shanghai, 123567891234], op=INSERT, meta=()}", + mysqlInventoryDatabase.getDatabaseName())); + waitUntilSpecificEvent( + String.format( + "DataChangeEvent{tableId=%s.products, before=[], after=[109, spare tire, 24 inch spare tire, 22.2, null, null, null], op=INSERT, meta=()}", + mysqlInventoryDatabase.getDatabaseName())); + + validateResult( + "CreateTableEvent{tableId=%s.customers, schema=columns={`id` INT NOT NULL,`name` VARCHAR(255) NOT NULL 'flink',`address` VARCHAR(1024),`phone_number` VARCHAR(512)}, primaryKeys=id, options=()}", + "DataChangeEvent{tableId=%s.customers, before=[], after=[104, user_4, Shanghai, 123567891234], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.customers, before=[], after=[103, user_3, Shanghai, 123567891234], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.customers, before=[], after=[102, user_2, Shanghai, 123567891234], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.customers, before=[], after=[101, user_1, Shanghai, 123567891234], op=INSERT, meta=()}", + "CreateTableEvent{tableId=%s.products, schema=columns={`id` INT NOT NULL,`name` VARCHAR(255) NOT NULL 'flink',`description` VARCHAR(512),`weight` FLOAT,`enum_c` STRING 'red',`json_c` STRING,`point_c` STRING}, primaryKeys=id, options=()}", + "DataChangeEvent{tableId=%s.products, before=[], after=[109, spare tire, 24 inch spare tire, 22.2, null, null, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.products, before=[], after=[107, rocks, box of assorted rocks, 5.3, null, null, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.products, before=[], after=[108, jacket, water resistent black wind breaker, 0.1, null, null, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.products, before=[], after=[105, hammer, 14oz carpenter's hammer, 0.875, red, {\"k1\": \"v1\", \"k2\": \"v2\"}, {\"coordinates\":[5,5],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.products, before=[], after=[106, hammer, 16oz carpenter's hammer, 1.0, null, null, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.products, before=[], after=[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8, red, {\"key3\": \"value3\"}, {\"coordinates\":[3,3],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=()}", + 
"DataChangeEvent{tableId=%s.products, before=[], after=[104, hammer, 12oz carpenter's hammer, 0.75, white, {\"key4\": \"value4\"}, {\"coordinates\":[4,4],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.products, before=[], after=[101, scooter, Small 2-wheel scooter, 3.14, red, {\"key1\": \"value1\"}, {\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.products, before=[], after=[102, car battery, 12V car battery, 8.1, white, {\"key2\": \"value2\"}, {\"coordinates\":[2,2],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=()}"); + + LOG.info("Begin incremental reading stage."); + // generate binlogs + String mysqlJdbcUrl = + String.format( + "jdbc:mysql://%s:%s/%s", + MYSQL.getHost(), + MYSQL.getDatabasePort(), + mysqlInventoryDatabase.getDatabaseName()); + try (Connection conn = + DriverManager.getConnection( + mysqlJdbcUrl, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD); + Statement stat = conn.createStatement()) { + stat.execute("UPDATE products SET description='18oz carpenter hammer' WHERE id=106;"); + stat.execute("UPDATE products SET weight='5.1' WHERE id=107;"); + + // Perform DDL changes after the binlog is generated + waitUntilSpecificEvent( + String.format( + "DataChangeEvent{tableId=%s.products, before=[106, hammer, 16oz carpenter's hammer, 1.0, null, null, null], after=[106, hammer, 18oz carpenter hammer, 1.0, null, null, null], op=UPDATE, meta=()}", + mysqlInventoryDatabase.getDatabaseName())); + + LOG.info("Begin schema evolution stage."); + + // Test AddColumnEvent + stat.execute("ALTER TABLE products ADD COLUMN new_col INT;"); + stat.execute( + "INSERT INTO products VALUES (default,'jacket','water resistent white wind breaker',0.2, null, null, null, 1);"); // 110 + stat.execute( + "INSERT INTO products VALUES (default,'scooter','Big 2-wheel scooter ',5.18, null, null, null, 1);"); // 111 + stat.execute( + "UPDATE products SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;"); + stat.execute("UPDATE products SET weight='5.17' WHERE id=111;"); + stat.execute("DELETE FROM products WHERE id=111;"); + + // Test AlterColumnTypeEvent + stat.execute("ALTER TABLE products MODIFY COLUMN new_col BIGINT;"); + stat.execute( + "INSERT INTO products VALUES (default,'derrida','forever 21',2.1728, null, null, null, 2147483649);"); // 112 + + // Test RenameColumnEvent + stat.execute("ALTER TABLE products RENAME COLUMN new_col TO new_column;"); + stat.execute( + "INSERT INTO products VALUES (default,'dynazenon','SSSS',2.1728, null, null, null, 2147483649);"); // 113 + + // Test DropColumnEvent + stat.execute("ALTER TABLE products DROP COLUMN new_column;"); + stat.execute( + "INSERT INTO products VALUES (default,'evangelion','Eva',2.1728, null, null, null);"); // 114 + + // Test TruncateTableEvent + stat.execute("TRUNCATE TABLE products;"); + + // Test DropTableEvent. It's all over. 
+ stat.execute("DROP TABLE products;"); + } catch (SQLException e) { + LOG.error("Update table for CDC failed.", e); + throw e; + } + + waitUntilSpecificEvent( + String.format( + "DropTableEvent{tableId=%s.products}", + mysqlInventoryDatabase.getDatabaseName())); + + validateResult( + "DataChangeEvent{tableId=%s.products, before=[106, hammer, 16oz carpenter's hammer, 1.0, null, null, null], after=[106, hammer, 18oz carpenter hammer, 1.0, null, null, null], op=UPDATE, meta=()}", + "DataChangeEvent{tableId=%s.products, before=[107, rocks, box of assorted rocks, 5.3, null, null, null], after=[107, rocks, box of assorted rocks, 5.1, null, null, null], op=UPDATE, meta=()}", + "AddColumnEvent{tableId=%s.products, addedColumns=[ColumnWithPosition{column=`new_col` INT, position=LAST, existedColumnName=null}]}", + "DataChangeEvent{tableId=%s.products, before=[], after=[110, jacket, water resistent white wind breaker, 0.2, null, null, null, 1], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.products, before=[], after=[111, scooter, Big 2-wheel scooter , 5.18, null, null, null, 1], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.products, before=[110, jacket, water resistent white wind breaker, 0.2, null, null, null, 1], after=[110, jacket, new water resistent white wind breaker, 0.5, null, null, null, 1], op=UPDATE, meta=()}", + "DataChangeEvent{tableId=%s.products, before=[111, scooter, Big 2-wheel scooter , 5.18, null, null, null, 1], after=[111, scooter, Big 2-wheel scooter , 5.17, null, null, null, 1], op=UPDATE, meta=()}", + "DataChangeEvent{tableId=%s.products, before=[111, scooter, Big 2-wheel scooter , 5.17, null, null, null, 1], after=[], op=DELETE, meta=()}", + "AlterColumnTypeEvent{tableId=%s.products, typeMapping={new_col=BIGINT}, oldTypeMapping={new_col=INT}}", + "DataChangeEvent{tableId=%s.products, before=[], after=[112, derrida, forever 21, 2.1728, null, null, null, 2147483649], op=INSERT, meta=()}", + "RenameColumnEvent{tableId=%s.products, nameMapping={new_col=new_column}}", + "DataChangeEvent{tableId=%s.products, before=[], after=[113, dynazenon, SSSS, 2.1728, null, null, null, 2147483649], op=INSERT, meta=()}", + "DropColumnEvent{tableId=%s.products, droppedColumnNames=[new_column]}", + "DataChangeEvent{tableId=%s.products, before=[], after=[114, evangelion, Eva, 2.1728, null, null, null], op=INSERT, meta=()}", + "TruncateTableEvent{tableId=%s.products}", + "DropTableEvent{tableId=%s.products}"); } - private void validateResult(List expectedEvents) throws Exception { + private void validateResult(String... 
expectedEvents) throws Exception { + String dbName = mysqlInventoryDatabase.getDatabaseName(); for (String event : expectedEvents) { - waitUntilSpecificEvent(event); + waitUntilSpecificEvent(String.format(event, dbName, dbName)); } } diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/RouteE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/RouteE2eITCase.java index ab7ed178e..f4ea4d81a 100644 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/RouteE2eITCase.java +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/RouteE2eITCase.java @@ -154,11 +154,12 @@ public class RouteE2eITCase extends PipelineTestEnvironment { + " type: values\n" + "\n" + "pipeline:\n" - + " parallelism: 1", + + " parallelism: %d", INTER_CONTAINER_MYSQL_ALIAS, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD, - routeTestDatabase.getDatabaseName()); + routeTestDatabase.getDatabaseName(), + parallelism); Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar"); Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar"); Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar"); @@ -248,13 +249,14 @@ public class RouteE2eITCase extends PipelineTestEnvironment { + " sink-table: %s.ALL\n" + "\n" + "pipeline:\n" - + " parallelism: 1", + + " parallelism: %d", INTER_CONTAINER_MYSQL_ALIAS, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD, routeTestDatabase.getDatabaseName(), routeTestDatabase.getDatabaseName(), - routeTestDatabase.getDatabaseName()); + routeTestDatabase.getDatabaseName(), + parallelism); Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar"); Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar"); Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar"); @@ -331,13 +333,14 @@ public class RouteE2eITCase extends PipelineTestEnvironment { + " sink-table: NEW_%s.ALPHABET\n" + "\n" + "pipeline:\n" - + " parallelism: 1", + + " parallelism: %d", INTER_CONTAINER_MYSQL_ALIAS, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD, routeTestDatabase.getDatabaseName(), routeTestDatabase.getDatabaseName(), - routeTestDatabase.getDatabaseName()); + routeTestDatabase.getDatabaseName(), + parallelism); Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar"); Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar"); Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar"); @@ -428,7 +431,7 @@ public class RouteE2eITCase extends PipelineTestEnvironment { + " sink-table: NEW_%s.BETAGAMM\n" + "\n" + "pipeline:\n" - + " parallelism: 1", + + " parallelism: %d", INTER_CONTAINER_MYSQL_ALIAS, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD, @@ -436,7 +439,8 @@ public class RouteE2eITCase extends PipelineTestEnvironment { routeTestDatabase.getDatabaseName(), routeTestDatabase.getDatabaseName(), routeTestDatabase.getDatabaseName(), - routeTestDatabase.getDatabaseName()); + routeTestDatabase.getDatabaseName(), + parallelism); Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar"); Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar"); Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar"); @@ -535,7 +539,7 @@ public class RouteE2eITCase extends PipelineTestEnvironment { + " sink-table: NEW_%s.TABLEC\n" + "\n" + "pipeline:\n" - + " parallelism: 1", + + " 
parallelism: %d", INTER_CONTAINER_MYSQL_ALIAS, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD, @@ -545,7 +549,8 @@ public class RouteE2eITCase extends PipelineTestEnvironment { routeTestDatabase.getDatabaseName(), routeTestDatabase.getDatabaseName(), routeTestDatabase.getDatabaseName(), - routeTestDatabase.getDatabaseName()); + routeTestDatabase.getDatabaseName(), + parallelism); Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar"); Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar"); Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar"); @@ -647,14 +652,15 @@ public class RouteE2eITCase extends PipelineTestEnvironment { + " sink-table: %s.ALL\n" + "\n" + "pipeline:\n" - + " parallelism: 1", + + " parallelism: %d", INTER_CONTAINER_MYSQL_ALIAS, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD, routeTestDatabase.getDatabaseName(), routeTestDatabase.getDatabaseName(), routeTestDatabase.getDatabaseName(), - routeTestDatabase.getDatabaseName()); + routeTestDatabase.getDatabaseName(), + parallelism); Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar"); Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar"); Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar"); @@ -733,13 +739,14 @@ public class RouteE2eITCase extends PipelineTestEnvironment { + " replace-symbol: <>\n" + "\n" + "pipeline:\n" - + " parallelism: 1", + + " parallelism: %d", INTER_CONTAINER_MYSQL_ALIAS, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD, routeTestDatabase.getDatabaseName(), routeTestDatabase.getDatabaseName(), - routeTestDatabase.getDatabaseName()); + routeTestDatabase.getDatabaseName(), + parallelism); Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar"); Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar"); Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar"); diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java index c84a8824f..0a8d7c483 100644 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java @@ -226,11 +226,12 @@ public class SchemaEvolveE2eITCase extends PipelineTestEnvironment { + "\n" + "pipeline:\n" + " schema.change.behavior: unexpected\n" - + " parallelism: 1", + + " parallelism: %d", INTER_CONTAINER_MYSQL_ALIAS, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD, - schemaEvolveDatabase.getDatabaseName()); + schemaEvolveDatabase.getDatabaseName(), + parallelism); Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar"); Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar"); Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar"); @@ -296,13 +297,14 @@ public class SchemaEvolveE2eITCase extends PipelineTestEnvironment { + "\n" + "pipeline:\n" + " schema.change.behavior: %s\n" - + " parallelism: 1", + + " parallelism: %d", INTER_CONTAINER_MYSQL_ALIAS, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD, dbName, mergeTable ? 
"(members|new_members)" : "members", - behavior); + behavior, + parallelism); Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar"); Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar"); Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar"); diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java index 6f612ca95..ae516fc06 100644 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java @@ -118,7 +118,7 @@ public class TransformE2eITCase extends PipelineTestEnvironment { + " - source-table: %s.TABLEBETA\n" + " projection: ID, VERSION\n" + "pipeline:\n" - + " parallelism: 1", + + " parallelism: %d", INTER_CONTAINER_MYSQL_ALIAS, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD, @@ -126,7 +126,8 @@ public class TransformE2eITCase extends PipelineTestEnvironment { transformTestDatabase.getDatabaseName(), transformTestDatabase.getDatabaseName(), transformTestDatabase.getDatabaseName(), - transformTestDatabase.getDatabaseName()); + transformTestDatabase.getDatabaseName(), + parallelism); Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar"); Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar"); Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar"); @@ -200,13 +201,14 @@ public class TransformE2eITCase extends PipelineTestEnvironment { + " filter: ID <= 1008\n" + "\n" + "pipeline:\n" - + " parallelism: 1", + + " parallelism: %d", INTER_CONTAINER_MYSQL_ALIAS, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD, transformTestDatabase.getDatabaseName(), transformTestDatabase.getDatabaseName(), - transformTestDatabase.getDatabaseName()); + transformTestDatabase.getDatabaseName(), + parallelism); Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar"); Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar"); Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar"); @@ -287,7 +289,7 @@ public class TransformE2eITCase extends PipelineTestEnvironment { + " - source-table: %s.TABLEBETA\n" + " projection: ID, CONCAT('v', VERSION) AS VERSION, LOWER(NAMEBETA) AS NAME\n" + "pipeline:\n" - + " parallelism: 1", + + " parallelism: %d", INTER_CONTAINER_MYSQL_ALIAS, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD, @@ -295,7 +297,8 @@ public class TransformE2eITCase extends PipelineTestEnvironment { transformTestDatabase.getDatabaseName(), transformTestDatabase.getDatabaseName(), transformTestDatabase.getDatabaseName(), - transformTestDatabase.getDatabaseName()); + transformTestDatabase.getDatabaseName(), + parallelism); Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar"); Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar"); Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar"); @@ -365,13 +368,14 @@ public class TransformE2eITCase extends PipelineTestEnvironment { + " - source-table: %s.TABLEBETA\n" + " projection: \\*, CONCAT('v', VERSION) AS VERSION, LOWER(NAMEBETA) AS NAME\n" + "pipeline:\n" - + " parallelism: 1", + + " parallelism: %d", INTER_CONTAINER_MYSQL_ALIAS, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD, 
transformTestDatabase.getDatabaseName(), transformTestDatabase.getDatabaseName(), - transformTestDatabase.getDatabaseName()); + transformTestDatabase.getDatabaseName(), + parallelism); Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar"); Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar"); Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar"); @@ -446,13 +450,14 @@ public class TransformE2eITCase extends PipelineTestEnvironment { + " - source-table: %s.TABLEBETA\n" + " projection: \\*, __namespace_name__ || '.' || __schema_name__ || '.' || __table_name__ AS identifier_name, __data_event_type__ AS type\n" + "pipeline:\n" - + " parallelism: 1", + + " parallelism: %d", INTER_CONTAINER_MYSQL_ALIAS, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD, transformTestDatabase.getDatabaseName(), transformTestDatabase.getDatabaseName(), - transformTestDatabase.getDatabaseName()); + transformTestDatabase.getDatabaseName(), + parallelism); Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar"); Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar"); Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar"); @@ -530,12 +535,13 @@ public class TransformE2eITCase extends PipelineTestEnvironment { + " - source-table: %s.TABLE\\.*\n" + " projection: \\*, ID + 1000 as UID, VERSION AS NEWVERSION\n" + "pipeline:\n" - + " parallelism: 1", + + " parallelism: %d", INTER_CONTAINER_MYSQL_ALIAS, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD, transformTestDatabase.getDatabaseName(), - transformTestDatabase.getDatabaseName()); + transformTestDatabase.getDatabaseName(), + parallelism); Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar"); Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar"); Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar"); @@ -613,13 +619,14 @@ public class TransformE2eITCase extends PipelineTestEnvironment { + " - source-table: %s.TABLEBETA\n" + " projection: ID, CAST(VERSION AS DOUBLE) + 100 AS VERSION, CAST(AGEBETA AS VARCHAR) || ' - ' || NAMEBETA AS IDENTIFIER\n" + "pipeline:\n" - + " parallelism: 1", + + " parallelism: %d", INTER_CONTAINER_MYSQL_ALIAS, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD, transformTestDatabase.getDatabaseName(), transformTestDatabase.getDatabaseName(), - transformTestDatabase.getDatabaseName()); + transformTestDatabase.getDatabaseName(), + parallelism); Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar"); Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar"); Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar"); @@ -708,13 +715,14 @@ public class TransformE2eITCase extends PipelineTestEnvironment { + " projection: ID, LOCALTIME as lcl_t, CURRENT_TIME as cur_t, CAST(CURRENT_TIMESTAMP AS TIMESTAMP) as cur_ts, CAST(NOW() AS TIMESTAMP) as now_ts, LOCALTIMESTAMP as lcl_ts, CURRENT_DATE as cur_dt\n" + "\n" + "pipeline:\n" - + " parallelism: 1\n" + + " parallelism: %d\n" + " local-time-zone: America/Los_Angeles", INTER_CONTAINER_MYSQL_ALIAS, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD, transformTestDatabase.getDatabaseName(), - transformTestDatabase.getDatabaseName()); + transformTestDatabase.getDatabaseName(), + parallelism); Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar"); Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar"); Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar"); @@ -781,7 +789,9 @@ public class 
TransformE2eITCase extends PipelineTestEnvironment { } boolean extractDataLines(String line) { - if (!line.startsWith("DataChangeEvent{")) { + // In multiple parallelism mode, a prefix with subTaskId (like '1> ') is prepended. + // We should skip it before extracting data fields. + if (!line.startsWith("DataChangeEvent{", 3)) { return false; } Stream.of("before", "after") diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/UdfE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/UdfE2eITCase.java index 938e2d98e..9f5b98298 100644 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/UdfE2eITCase.java +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/UdfE2eITCase.java @@ -128,7 +128,7 @@ public class UdfE2eITCase extends PipelineTestEnvironment { + " projection: ID, VERSION, answer() AS ANS, typeof(ID) AS TYP\n" + "\n" + "pipeline:\n" - + " parallelism: 1\n" + + " parallelism: %d\n" + " user-defined-function:\n" + " - name: addone\n" + " classpath: org.apache.flink.cdc.udf.examples.%s.AddOneFunctionClass\n" @@ -146,6 +146,7 @@ transformRenameDatabase.getDatabaseName(), transformRenameDatabase.getDatabaseName(), transformRenameDatabase.getDatabaseName(), + parallelism, language, language, language, @@ -267,7 +268,7 @@ public class UdfE2eITCase extends PipelineTestEnvironment { + " projection: ID, VERSION, typeof(ID) AS TYP\n" + "\n" + "pipeline:\n" - + " parallelism: 1\n" + + " parallelism: %d\n" + " user-defined-function:\n" + " - name: addone\n" + " classpath: org.apache.flink.udf.examples.%s.AddOneFunctionClass\n" @@ -281,6 +282,7 @@ transformRenameDatabase.getDatabaseName(), transformRenameDatabase.getDatabaseName(), transformRenameDatabase.getDatabaseName(), + parallelism, language, language, language); diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java index d1c0bb7e7..530b34163 100644 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java @@ -67,6 +67,8 @@ public abstract class PipelineTestEnvironment extends TestLogger { @Parameterized.Parameter public String flinkVersion; + public Integer parallelism = 4; + // ------------------------------------------------------------------------------------------ // Flink Variables // ------------------------------------------------------------------------------------------ diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java index a700fd39c..1b4f50e89 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java @@ -39,20 +39,17 @@ import 
org.apache.flink.cdc.common.types.DataTypeRoot; import org.apache.flink.cdc.common.utils.ChangeEventUtils; import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry; import org.apache.flink.cdc.runtime.operators.schema.event.CoordinationResponseUtils; -import org.apache.flink.cdc.runtime.operators.schema.event.RefreshPendingListsRequest; -import org.apache.flink.cdc.runtime.operators.schema.event.ReleaseUpstreamRequest; -import org.apache.flink.cdc.runtime.operators.schema.event.ReleaseUpstreamResponse; import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeProcessingResponse; import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeRequest; import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeResponse; import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeResultRequest; +import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeResultResponse; import org.apache.flink.cdc.runtime.operators.schema.metrics.SchemaOperatorMetrics; import org.apache.flink.cdc.runtime.operators.sink.SchemaEvolutionClient; import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator; import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway; import org.apache.flink.runtime.operators.coordination.CoordinationRequest; import org.apache.flink.runtime.operators.coordination.CoordinationResponse; -import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.ChainingStrategy; @@ -120,6 +117,7 @@ public class SchemaOperator extends AbstractStreamOperator private final SchemaChangeBehavior schemaChangeBehavior; private transient SchemaOperatorMetrics schemaOperatorMetrics; + private transient int subTaskId; @VisibleForTesting public SchemaOperator(List routingRules) { @@ -153,6 +151,7 @@ public class SchemaOperator extends AbstractStreamOperator schemaOperatorMetrics = new SchemaOperatorMetrics( getRuntimeContext().getMetricGroup(), schemaChangeBehavior); + subTaskId = getRuntimeContext().getIndexOfThisSubtask(); } @Override @@ -218,17 +217,6 @@ public class SchemaOperator extends AbstractStreamOperator }); } - @Override - public void initializeState(StateInitializationContext context) throws Exception { - if (context.isRestored()) { - // Multiple operators may appear during a restart process, - // only clear the pendingSchemaChanges when the first operator starts. - if (getRuntimeContext().getIndexOfThisSubtask() == 0) { - sendRequestToCoordinator(new RefreshPendingListsRequest()); - } - } - } - /** * This method is guaranteed to not be called concurrently with other methods of the operator. 
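* Any waiting on the schema registry coordinator performed while handling a schema change
* therefore happens on the single task thread, which is what serializes schema change
* requests issued by this subtask.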
*/ @@ -248,7 +236,11 @@ public class SchemaOperator extends AbstractStreamOperator private void processSchemaChangeEvents(SchemaChangeEvent event) throws InterruptedException, TimeoutException, ExecutionException { TableId tableId = event.tableId(); - LOG.info("Table {} received SchemaChangeEvent and start to be blocked.", tableId); + LOG.info( + "{}> Table {} received SchemaChangeEvent {} and starts to be blocked.", + subTaskId, + tableId, + event); handleSchemaChangeEvent(tableId, event); // Update caches originalSchema.put(tableId, getLatestOriginalSchema(tableId)); @@ -412,36 +404,62 @@ public class SchemaOperator extends AbstractStreamOperator schemaChangeEvent)); } - // The request will need to send a FlushEvent or block until flushing finished + // The request will block if another schema change event is being handled SchemaChangeResponse response = requestSchemaChange(tableId, schemaChangeEvent); - if (!response.getSchemaChangeEvents().isEmpty()) { - LOG.info( - "Sending the FlushEvent for table {} in subtask {}.", - tableId, - getRuntimeContext().getIndexOfThisSubtask()); + if (response.isAccepted()) { + LOG.info("{}> Sending the FlushEvent for table {}.", subTaskId, tableId); output.collect(new StreamRecord<>(new FlushEvent(tableId))); List<SchemaChangeEvent> expectedSchemaChangeEvents = response.getSchemaChangeEvents(); schemaOperatorMetrics.increaseSchemaChangeEvents(expectedSchemaChangeEvents.size()); // The request will block until flushing finished in each sink writer - ReleaseUpstreamResponse schemaEvolveResponse = requestReleaseUpstream(); + SchemaChangeResultResponse schemaEvolveResponse = requestSchemaChangeResult(); List<SchemaChangeEvent> finishedSchemaChangeEvents = schemaEvolveResponse.getFinishedSchemaChangeEvents(); // Update evolved schema changes based on apply results finishedSchemaChangeEvents.forEach(e -> output.collect(new StreamRecord<>(e))); + } else if (response.isDuplicate()) { + LOG.info( + "{}> Schema change event {} has been handled in another subTask already.", + subTaskId, + schemaChangeEvent); + } else if (response.isIgnored()) { + LOG.info( + "{}> Schema change event {} has been ignored. 
No schema evolution needed.", + subTaskId, + schemaChangeEvent); + } else { + throw new IllegalStateException("Unexpected response status " + response); + } } private SchemaChangeResponse requestSchemaChange( - TableId tableId, SchemaChangeEvent schemaChangeEvent) { - return sendRequestToCoordinator(new SchemaChangeRequest(tableId, schemaChangeEvent)); + TableId tableId, SchemaChangeEvent schemaChangeEvent) + throws InterruptedException, TimeoutException { + long schemaEvolveTimeOutMillis = System.currentTimeMillis() + rpcTimeOutInMillis; + while (true) { + SchemaChangeResponse response = + sendRequestToCoordinator(new SchemaChangeRequest(tableId, schemaChangeEvent)); + if (response.isRegistryBusy()) { + if (System.currentTimeMillis() < schemaEvolveTimeOutMillis) { + LOG.info( + "{}> Schema Registry is busy now, waiting before retrying...", + subTaskId); + Thread.sleep(1000); + } else { + throw new TimeoutException("Timeout when requesting schema change"); + } + } else { + return response; + } + } } - private ReleaseUpstreamResponse requestReleaseUpstream() + private SchemaChangeResultResponse requestSchemaChangeResult() throws InterruptedException, TimeoutException { CoordinationResponse coordinationResponse = - sendRequestToCoordinator(new ReleaseUpstreamRequest()); + sendRequestToCoordinator(new SchemaChangeResultRequest()); long nextRpcTimeOutMillis = System.currentTimeMillis() + rpcTimeOutInMillis; while (coordinationResponse instanceof SchemaChangeProcessingResponse) { if (System.currentTimeMillis() < nextRpcTimeOutMillis) { @@ -451,7 +469,7 @@ public class SchemaOperator extends AbstractStreamOperator throw new TimeoutException("TimeOut when requesting release upstream"); } } - return ((ReleaseUpstreamResponse) coordinationResponse); + return ((SchemaChangeResultResponse) coordinationResponse); } private diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaManager.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaManager.java index b87ef5b12..60e70b7ed 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaManager.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaManager.java @@ -18,11 +18,15 @@ package org.apache.flink.cdc.runtime.operators.schema.coordinator; import org.apache.flink.cdc.common.annotation.Internal; +import org.apache.flink.cdc.common.event.AddColumnEvent; import org.apache.flink.cdc.common.event.CreateTableEvent; import org.apache.flink.cdc.common.event.SchemaChangeEvent; import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.event.visitor.SchemaChangeEventVisitor; import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior; +import org.apache.flink.cdc.common.schema.Column; import org.apache.flink.cdc.common.schema.Schema; +import org.apache.flink.cdc.common.types.DataType; import org.apache.flink.cdc.common.utils.SchemaUtils; import org.apache.flink.cdc.runtime.serializer.TableIdSerializer; import org.apache.flink.cdc.runtime.serializer.schema.SchemaSerializer; @@ -39,6 +43,7 @@ import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.NoSuchElementException; import java.util.Objects; @@ -93,6 +98,97 @@ public class SchemaManager { return behavior; } + /** + * This function checks if the
given schema change event has been applied already. If so, it + * will be ignored to avoid sending duplicate evolved schema change events to the sink metadata + * applier. + */ + public final boolean isOriginalSchemaChangeEventRedundant(SchemaChangeEvent event) { + TableId tableId = event.tableId(); + Optional<Schema> latestSchema = getLatestOriginalSchema(tableId); + return Boolean.TRUE.equals( + SchemaChangeEventVisitor.visit( + event, + addColumnEvent -> { + // It has not been applied if schema does not even exist + if (!latestSchema.isPresent()) { + return false; + } + List<Column> existedColumns = latestSchema.get().getColumns(); + + // It has been applied only if all columns are present in existedColumns + for (AddColumnEvent.ColumnWithPosition column : + addColumnEvent.getAddedColumns()) { + if (!existedColumns.contains(column.getAddColumn())) { + return false; + } + } + return true; + }, + alterColumnTypeEvent -> { + // It has not been applied if schema does not even exist + if (!latestSchema.isPresent()) { + return false; + } + Schema schema = latestSchema.get(); + + // It has been applied only if all column types are set as expected + for (Map.Entry<String, DataType> entry : + alterColumnTypeEvent.getTypeMapping().entrySet()) { + if (!schema.getColumn(entry.getKey()).isPresent() + || !schema.getColumn(entry.getKey()) + .get() + .getType() + .equals(entry.getValue())) { + return false; + } + } + return true; + }, + createTableEvent -> { + // It has been applied if such table already exists + return latestSchema.isPresent(); + }, + dropColumnEvent -> { + // It has not been applied if schema does not even exist + if (!latestSchema.isPresent()) { + return false; + } + List<String> existedColumnNames = latestSchema.get().getColumnNames(); + + // It has been applied only if the corresponding columns no longer exist + return dropColumnEvent.getDroppedColumnNames().stream() + .noneMatch(existedColumnNames::contains); + }, + dropTableEvent -> { + // It has been applied if such table does not exist + return !latestSchema.isPresent(); + }, + renameColumnEvent -> { + // It has not been applied if schema does not even exist + if (!latestSchema.isPresent()) { + return false; + } + List<String> existedColumnNames = latestSchema.get().getColumnNames(); + + // It has been applied only if all previous names do not exist, and all + // new names already exist + for (Map.Entry<String, String> entry : + renameColumnEvent.getNameMapping().entrySet()) { + if (existedColumnNames.contains(entry.getKey()) + || !existedColumnNames.contains(entry.getValue())) { + return false; + } + } + return true; + }, + truncateTableEvent -> { + // We have no way to ensure if a TruncateTableEvent has been applied + // before. Just assume it's not.
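+ // (Worst case, a redundant TruncateTableEvent is applied twice; truncating an
+ // already-empty table is a no-op.)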
+ return false; + })); + } + public final boolean schemaExists( Map<TableId, SortedMap<Integer, Schema>> schemaMap, TableId tableId) { return schemaMap.containsKey(tableId) && !schemaMap.get(tableId).isEmpty(); diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java index 9087ae4b3..8ea3a1f93 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java @@ -29,8 +29,6 @@ import org.apache.flink.cdc.runtime.operators.schema.event.GetEvolvedSchemaReque import org.apache.flink.cdc.runtime.operators.schema.event.GetEvolvedSchemaResponse; import org.apache.flink.cdc.runtime.operators.schema.event.GetOriginalSchemaRequest; import org.apache.flink.cdc.runtime.operators.schema.event.GetOriginalSchemaResponse; -import org.apache.flink.cdc.runtime.operators.schema.event.RefreshPendingListsRequest; -import org.apache.flink.cdc.runtime.operators.schema.event.ReleaseUpstreamRequest; import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeRequest; import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeResultRequest; import org.apache.flink.cdc.runtime.operators.schema.event.SinkWriterRegisterEvent; @@ -201,18 +199,14 @@ public class SchemaRegistry implements OperatorCoordinator, CoordinationRequestH if (request instanceof SchemaChangeRequest) { SchemaChangeRequest schemaChangeRequest = (SchemaChangeRequest) request; return requestHandler.handleSchemaChangeRequest(schemaChangeRequest); - } else if (request instanceof ReleaseUpstreamRequest) { - return requestHandler.handleReleaseUpstreamRequest(); + } else if (request instanceof SchemaChangeResultRequest) { + return requestHandler.getSchemaChangeResult(); } else if (request instanceof GetEvolvedSchemaRequest) { return CompletableFuture.completedFuture( wrap(handleGetEvolvedSchemaRequest(((GetEvolvedSchemaRequest) request)))); } else if (request instanceof GetOriginalSchemaRequest) { return CompletableFuture.completedFuture( wrap(handleGetOriginalSchemaRequest((GetOriginalSchemaRequest) request))); - } else if (request instanceof SchemaChangeResultRequest) { - return requestHandler.getSchemaChangeResult(); - } else if (request instanceof RefreshPendingListsRequest) { - return requestHandler.refreshPendingLists(); } else { throw new IllegalArgumentException( "Unrecognized CoordinationRequest type: " + request); diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java index 60280ce48..99019a6b4 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java @@ -25,6 +25,7 @@ import org.apache.flink.cdc.common.event.DropColumnEvent; import org.apache.flink.cdc.common.event.RenameColumnEvent; import org.apache.flink.cdc.common.event.SchemaChangeEvent; import org.apache.flink.cdc.common.event.SchemaChangeEventType; +import org.apache.flink.cdc.common.event.SchemaChangeEventWithPreSchema; import 
org.apache.flink.cdc.common.event.TableId; import org.apache.flink.cdc.common.exceptions.UnsupportedSchemaChangeEventException; import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior; @@ -32,41 +33,36 @@ import org.apache.flink.cdc.common.schema.Column; import org.apache.flink.cdc.common.schema.Schema; import org.apache.flink.cdc.common.sink.MetadataApplier; import org.apache.flink.cdc.common.types.DataType; -import org.apache.flink.cdc.runtime.operators.schema.event.RefreshPendingListsResponse; -import org.apache.flink.cdc.runtime.operators.schema.event.ReleaseUpstreamRequest; -import org.apache.flink.cdc.runtime.operators.schema.event.ReleaseUpstreamResponse; +import org.apache.flink.cdc.common.utils.Preconditions; import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeProcessingResponse; import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeRequest; import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeResponse; +import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeResultResponse; import org.apache.flink.runtime.operators.coordination.CoordinationResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.annotation.concurrent.NotThreadSafe; - import java.io.Closeable; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import java.util.stream.Stream; -import static org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistryRequestHandler.RequestStatus.RECEIVED_RELEASE_REQUEST; import static org.apache.flink.cdc.runtime.operators.schema.event.CoordinationResponseUtils.wrap; /** A handler to deal with all requests and events for {@link SchemaRegistry}. */ @Internal -@NotThreadSafe public class SchemaRegistryRequestHandler implements Closeable { private static final Logger LOG = LoggerFactory.getLogger(SchemaRegistryRequestHandler.class); @@ -80,21 +76,18 @@ public class SchemaRegistryRequestHandler implements Closeable { private final SchemaDerivation schemaDerivation; /** - * Not applied SchemaChangeRequest before receiving all flush success events for its table from - * sink writers. + * Atomic reference indicating whether the current RequestHandler can accept more schema change requests for now. */ - private final List<PendingSchemaChange> pendingSchemaChanges; + private final AtomicReference<RequestStatus> schemaChangeStatus; - private final List<SchemaChangeEvent> finishedSchemaChanges; - private final List<SchemaChangeEvent> ignoredSchemaChanges; + private volatile Throwable currentChangeException; + private volatile List<SchemaChangeEvent> currentDerivedSchemaChangeEvents; + private volatile List<SchemaChangeEvent> currentFinishedSchemaChanges; + private volatile List<SchemaChangeEvent> currentIgnoredSchemaChanges; /** Sink writers which have sent flush success events for the request. */ private final Set<Integer> flushedSinkWriters; - /** Status of the execution of current schema change request. */ - private volatile boolean isSchemaChangeApplying; /** Actual exception if failed to apply schema change. */ - private volatile Throwable schemaChangeException; /** Executor service to execute schema change. 
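* Backed by Executors.newSingleThreadExecutor() in the constructor, so at most one schema change is being applied at any given time.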
*/ private final ExecutorService schemaChangeThreadPool; @@ -106,16 +99,81 @@ public class SchemaRegistryRequestHandler implements Closeable { SchemaDerivation schemaDerivation, SchemaChangeBehavior schemaChangeBehavior) { this.metadataApplier = metadataApplier; - this.activeSinkWriters = new HashSet<>(); - this.flushedSinkWriters = new HashSet<>(); - this.pendingSchemaChanges = new LinkedList<>(); - this.finishedSchemaChanges = new LinkedList<>(); - this.ignoredSchemaChanges = new LinkedList<>(); this.schemaManager = schemaManager; this.schemaDerivation = schemaDerivation; - this.schemaChangeThreadPool = Executors.newSingleThreadExecutor(); - this.isSchemaChangeApplying = false; this.schemaChangeBehavior = schemaChangeBehavior; + + this.activeSinkWriters = new HashSet<>(); + this.flushedSinkWriters = new HashSet<>(); + this.schemaChangeThreadPool = Executors.newSingleThreadExecutor(); + + this.currentDerivedSchemaChangeEvents = new ArrayList<>(); + this.currentFinishedSchemaChanges = new ArrayList<>(); + this.currentIgnoredSchemaChanges = new ArrayList<>(); + this.schemaChangeStatus = new AtomicReference<>(RequestStatus.IDLE); + } + + /** + * Handle the {@link SchemaChangeRequest} and wait for all sink subtasks to finish flushing. + * + * @param request the received SchemaChangeRequest + */ + public CompletableFuture<CoordinationResponse> handleSchemaChangeRequest( + SchemaChangeRequest request) { + if (schemaChangeStatus.compareAndSet(RequestStatus.IDLE, RequestStatus.WAITING_FOR_FLUSH)) { + LOG.info( + "Received schema change event request {} from table {}. Other requests will be blocked for now.", + request.getSchemaChangeEvent(), + request.getTableId().toString()); + SchemaChangeEvent event = request.getSchemaChangeEvent(); + + // If this schema change event has been requested by another subTask, ignore it. + if (schemaManager.isOriginalSchemaChangeEventRedundant(event)) { + LOG.info("Event {} has been addressed before, ignoring it.", event); + clearCurrentSchemaChangeRequest(); + Preconditions.checkState( + schemaChangeStatus.compareAndSet( + RequestStatus.WAITING_FOR_FLUSH, RequestStatus.IDLE), + "Illegal schemaChangeStatus state: should still be in WAITING_FOR_FLUSH state if the event was duplicated."); + return CompletableFuture.completedFuture(wrap(SchemaChangeResponse.duplicate())); + } + schemaManager.applyOriginalSchemaChange(event); + List<SchemaChangeEvent> derivedSchemaChangeEvents = + calculateDerivedSchemaChangeEvents(request.getSchemaChangeEvent()); + + // If this schema change event is filtered out by LENIENT mode or merging table route + // strategies, ignore it. 
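+ // (Filtering shows up as derivedSchemaChangeEvents being empty, which is checked below.)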
+ if (derivedSchemaChangeEvents.isEmpty()) { + LOG.info("Event {} is omitted from sending to downstream, ignoring it.", event); + clearCurrentSchemaChangeRequest(); + Preconditions.checkState( + schemaChangeStatus.compareAndSet( + RequestStatus.WAITING_FOR_FLUSH, RequestStatus.IDLE), + "Illegal schemaChangeStatus state: should still in WAITING_FOR_FLUSH state if event was ignored."); + return CompletableFuture.completedFuture(wrap(SchemaChangeResponse.ignored())); + } + + // Backfill pre-schema info for sink applying + derivedSchemaChangeEvents.forEach( + e -> { + if (e instanceof SchemaChangeEventWithPreSchema) { + SchemaChangeEventWithPreSchema pe = (SchemaChangeEventWithPreSchema) e; + if (!pe.hasPreSchema()) { + schemaManager + .getLatestEvolvedSchema(pe.tableId()) + .ifPresent(pe::fillPreSchema); + } + } + }); + currentDerivedSchemaChangeEvents = new ArrayList<>(derivedSchemaChangeEvents); + return CompletableFuture.completedFuture( + wrap(SchemaChangeResponse.accepted(derivedSchemaChangeEvents))); + } else { + LOG.info( + "Schema Registry is busy processing a schema change request, could not handle request {} for now.", + request); + return CompletableFuture.completedFuture(wrap(SchemaChangeResponse.busy())); + } } /** @@ -126,27 +184,22 @@ public class SchemaRegistryRequestHandler implements Closeable { */ private void applySchemaChange( TableId tableId, List derivedSchemaChangeEvents) { - isSchemaChangeApplying = true; - schemaChangeException = null; - finishedSchemaChanges.clear(); - ignoredSchemaChanges.clear(); - for (SchemaChangeEvent changeEvent : derivedSchemaChangeEvents) { if (changeEvent.getType() != SchemaChangeEventType.CREATE_TABLE) { if (schemaChangeBehavior == SchemaChangeBehavior.IGNORE) { - ignoredSchemaChanges.add(changeEvent); + currentIgnoredSchemaChanges.add(changeEvent); continue; } } if (!metadataApplier.acceptsSchemaEvolutionType(changeEvent.getType())) { LOG.info("Ignored schema change {} to table {}.", changeEvent, tableId); - ignoredSchemaChanges.add(changeEvent); + currentIgnoredSchemaChanges.add(changeEvent); } else { try { metadataApplier.applySchemaChange(changeEvent); - LOG.debug("Applied schema change {} to table {}.", changeEvent, tableId); + LOG.info("Applied schema change {} to table {}.", changeEvent, tableId); schemaManager.applyEvolvedSchemaChange(changeEvent); - finishedSchemaChanges.add(changeEvent); + currentFinishedSchemaChanges.add(changeEvent); } catch (Throwable t) { LOG.error( "Failed to apply schema change {} to table {}. Caused by: {}", @@ -154,7 +207,7 @@ public class SchemaRegistryRequestHandler implements Closeable { tableId, t); if (!shouldIgnoreException(t)) { - schemaChangeException = t; + currentChangeException = t; break; } else { LOG.warn( @@ -165,62 +218,9 @@ public class SchemaRegistryRequestHandler implements Closeable { } } } - - PendingSchemaChange waitFlushSuccess = pendingSchemaChanges.get(0); - if (RECEIVED_RELEASE_REQUEST.equals(waitFlushSuccess.getStatus())) { - startNextSchemaChangeRequest(); - } - isSchemaChangeApplying = false; - } - - /** - * Handle the {@link SchemaChangeRequest} and wait for all sink subtasks flushing. - * - * @param request the received SchemaChangeRequest - */ - public CompletableFuture handleSchemaChangeRequest( - SchemaChangeRequest request) { - if (pendingSchemaChanges.isEmpty()) { - LOG.info( - "Received schema change event request from table {}. 
Start to buffer requests for others.", - request.getTableId().toString()); - if (request.getSchemaChangeEvent() instanceof CreateTableEvent - && schemaManager.originalSchemaExists(request.getTableId())) { - return CompletableFuture.completedFuture( - wrap(new SchemaChangeResponse(Collections.emptyList()))); - } - schemaManager.applyOriginalSchemaChange(request.getSchemaChangeEvent()); - List derivedSchemaChangeEvents = - calculateDerivedSchemaChangeEvents(request.getSchemaChangeEvent()); - CompletableFuture response = - CompletableFuture.completedFuture( - wrap(new SchemaChangeResponse(derivedSchemaChangeEvents))); - if (!derivedSchemaChangeEvents.isEmpty()) { - PendingSchemaChange pendingSchemaChange = - new PendingSchemaChange(request, response); - pendingSchemaChange.derivedSchemaChangeEvents = derivedSchemaChangeEvents; - pendingSchemaChanges.add(pendingSchemaChange); - pendingSchemaChanges.get(0).startToWaitForReleaseRequest(); - } - return response; - } else { - LOG.info("There are already processing requests. Wait for processing."); - CompletableFuture response = new CompletableFuture<>(); - pendingSchemaChanges.add(new PendingSchemaChange(request, response)); - return response; - } - } - - /** Handle the {@link ReleaseUpstreamRequest} and wait for all sink subtasks flushing. */ - public CompletableFuture handleReleaseUpstreamRequest() { - CompletableFuture response = - pendingSchemaChanges.get(0).getResponseFuture(); - if (response.isDone() && !isSchemaChangeApplying) { - startNextSchemaChangeRequest(); - } else { - pendingSchemaChanges.get(0).receiveReleaseRequest(); - } - return response; + Preconditions.checkState( + schemaChangeStatus.compareAndSet(RequestStatus.APPLYING, RequestStatus.FINISHED), + "Illegal schemaChangeStatus state: should be APPLYING before applySchemaChange finishes"); } /** @@ -239,76 +239,33 @@ public class SchemaRegistryRequestHandler implements Closeable { * @param tableId the subtask in SchemaOperator and table that the FlushEvent is about * @param sinkSubtask the sink subtask succeed flushing */ - public void flushSuccess(TableId tableId, int sinkSubtask) throws InterruptedException { + public void flushSuccess(TableId tableId, int sinkSubtask) { flushedSinkWriters.add(sinkSubtask); if (flushedSinkWriters.equals(activeSinkWriters)) { + Preconditions.checkState( + schemaChangeStatus.compareAndSet( + RequestStatus.WAITING_FOR_FLUSH, RequestStatus.APPLYING), + "Illegal schemaChangeStatus state: should be WAITING_FOR_FLUSH before collecting enough FlushEvents"); LOG.info( "All sink subtask have flushed for table {}. 
Start to apply schema change.", tableId.toString()); - PendingSchemaChange waitFlushSuccess = pendingSchemaChanges.get(0); schemaChangeThreadPool.submit( - () -> applySchemaChange(tableId, waitFlushSuccess.derivedSchemaChangeEvents)); - Thread.sleep(1000); - - if (schemaChangeException != null) { - throw new RuntimeException("Failed to apply schema change.", schemaChangeException); - } - - if (isSchemaChangeApplying) { - waitFlushSuccess - .getResponseFuture() - .complete(wrap(new SchemaChangeProcessingResponse())); - } else { - waitFlushSuccess - .getResponseFuture() - .complete(wrap(new ReleaseUpstreamResponse(finishedSchemaChanges))); - } + () -> applySchemaChange(tableId, currentDerivedSchemaChangeEvents)); } } - private void startNextSchemaChangeRequest() { - pendingSchemaChanges.remove(0); - flushedSinkWriters.clear(); - while (!pendingSchemaChanges.isEmpty()) { - PendingSchemaChange pendingSchemaChange = pendingSchemaChanges.get(0); - SchemaChangeRequest request = pendingSchemaChange.changeRequest; - if (request.getSchemaChangeEvent() instanceof CreateTableEvent - && schemaManager.evolvedSchemaExists(request.getTableId())) { - pendingSchemaChange - .getResponseFuture() - .complete(wrap(new SchemaChangeResponse(Collections.emptyList()))); - pendingSchemaChanges.remove(0); - } else { - List<SchemaChangeEvent> derivedSchemaChangeEvents = - calculateDerivedSchemaChangeEvents(request.getSchemaChangeEvent()); - pendingSchemaChange - .getResponseFuture() - .complete(wrap(new SchemaChangeResponse(derivedSchemaChangeEvents))); - if (!derivedSchemaChangeEvents.isEmpty()) { - pendingSchemaChange.derivedSchemaChangeEvents = derivedSchemaChangeEvents; - pendingSchemaChange.startToWaitForReleaseRequest(); - break; - } - } - } - } - - public CompletableFuture<CoordinationResponse> refreshPendingLists() { - pendingSchemaChanges.clear(); - flushedSinkWriters.clear(); - return CompletableFuture.completedFuture(wrap(new RefreshPendingListsResponse())); - } - public CompletableFuture<CoordinationResponse> getSchemaChangeResult() { - if (schemaChangeException != null) { - throw new RuntimeException("Failed to apply schema change.", schemaChangeException); - } - - if (isSchemaChangeApplying) { - return CompletableFuture.supplyAsync(() -> wrap(new SchemaChangeProcessingResponse())); - } else { + Preconditions.checkState( + !schemaChangeStatus.get().equals(RequestStatus.IDLE), + "Illegal schemaChangeStatus: should not be IDLE before getting schema change request results."); + if (schemaChangeStatus.compareAndSet(RequestStatus.FINISHED, RequestStatus.IDLE)) { + // This request has finished; return its result and prepare for the next request + List<SchemaChangeEvent> finishedEvents = clearCurrentSchemaChangeRequest(); return CompletableFuture.supplyAsync( - () -> wrap(new ReleaseUpstreamResponse(finishedSchemaChanges))); + () -> wrap(new SchemaChangeResultResponse(finishedEvents))); + } else { + // Still working on the schema change request, waiting for it to finish + return CompletableFuture.supplyAsync(() -> wrap(new SchemaChangeProcessingResponse())); } } @@ -422,63 +379,64 @@ } return events; } + case DROP_TABLE: + // We don't drop any tables in Lenient mode. 
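+ // (Presumably to avoid destructive changes downstream; the evolved table is left intact.)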
+ LOG.info("A drop table event {} has been ignored in Lenient mode.", event); + return Collections.emptyList(); default: return Collections.singletonList(event); } } private boolean shouldIgnoreException(Throwable throwable) { - // In IGNORE mode, will never try to apply schema change events - // In EVOLVE and and LENIENT mode, such failure will not be tolerated + // In EVOLVE and LENIENT mode, such failure will not be tolerated // In EXCEPTION mode, an exception will be thrown once captured return (throwable instanceof UnsupportedSchemaChangeEventException) && (schemaChangeBehavior == SchemaChangeBehavior.TRY_EVOLVE); } - private static class PendingSchemaChange { - private final SchemaChangeRequest changeRequest; - private List derivedSchemaChangeEvents; - private CompletableFuture responseFuture; - private RequestStatus status; - - public PendingSchemaChange( - SchemaChangeRequest changeRequest, - CompletableFuture responseFuture) { - this.changeRequest = changeRequest; - this.responseFuture = responseFuture; - this.status = RequestStatus.PENDING; - } - - public SchemaChangeRequest getChangeRequest() { - return changeRequest; - } - - public CompletableFuture getResponseFuture() { - return responseFuture; - } - - public RequestStatus getStatus() { - return status; - } - - public void startToWaitForReleaseRequest() { - if (!responseFuture.isDone()) { - throw new IllegalStateException( - "Cannot start to wait for flush success before the SchemaChangeRequest is done."); - } - this.responseFuture = new CompletableFuture<>(); - this.status = RequestStatus.WAIT_RELEASE_REQUEST; - } - - public void receiveReleaseRequest() { - this.status = RECEIVED_RELEASE_REQUEST; + private List clearCurrentSchemaChangeRequest() { + if (currentChangeException != null) { + throw new RuntimeException("Failed to apply schema change.", currentChangeException); } + List finishedSchemaChanges = + new ArrayList<>(currentFinishedSchemaChanges); + flushedSinkWriters.clear(); + currentDerivedSchemaChangeEvents.clear(); + currentFinishedSchemaChanges.clear(); + currentIgnoredSchemaChanges.clear(); + currentChangeException = null; + return finishedSchemaChanges; } - enum RequestStatus { - PENDING, - WAIT_RELEASE_REQUEST, - RECEIVED_RELEASE_REQUEST + // Schema change event state could transfer in the following way: + // + // -------- B -------- + // | | + // v | + // -------- --------------------- + // | IDLE | --- A --> | WAITING_FOR_FLUSH | + // -------- --------------------- + // ^ | + // E C + // \ v + // ------------ ------------ + // | FINISHED | <-- D -- | APPLYING | + // ------------ ------------ + // + // A: When a request came to an idling request handler. + // B: When current request is duplicate or ignored by LENIENT / routed table merging + // strategies. + // C: When schema registry collected enough flush success events, and actually started to apply + // schema changes. + // D: When schema change application finishes (successfully or with exceptions) + // E: When current schema change request result has been retrieved by SchemaOperator, and ready + // for the next request. 
+ private enum RequestStatus { + IDLE, + WAITING_FOR_FLUSH, + APPLYING, + FINISHED } } diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/RefreshPendingListsRequest.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/RefreshPendingListsRequest.java deleted file mode 100644 index a0496c935..000000000 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/RefreshPendingListsRequest.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.cdc.runtime.operators.schema.event; - -import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistryRequestHandler; -import org.apache.flink.runtime.operators.coordination.CoordinationRequest; - -/** Request to refresh the pendingSchemaChanges of {@link SchemaRegistryRequestHandler}. */ -public class RefreshPendingListsRequest implements CoordinationRequest { - - private static final long serialVersionUID = 1L; -} diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/RefreshPendingListsResponse.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/RefreshPendingListsResponse.java deleted file mode 100644 index ff0221deb..000000000 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/RefreshPendingListsResponse.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.cdc.runtime.operators.schema.event; - -import org.apache.flink.runtime.operators.coordination.CoordinationResponse; - -/** Response to refresh the pendingSchemaChanges of {@link RefreshPendingListsRequest}. 
*/ -public class RefreshPendingListsResponse implements CoordinationResponse { - - private static final long serialVersionUID = 1L; -} diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/ReleaseUpstreamRequest.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/ReleaseUpstreamRequest.java deleted file mode 100644 index 3da85c184..000000000 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/ReleaseUpstreamRequest.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.cdc.runtime.operators.schema.event; - -import org.apache.flink.cdc.common.event.FlushEvent; -import org.apache.flink.cdc.runtime.operators.schema.SchemaOperator; -import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry; -import org.apache.flink.runtime.operators.coordination.CoordinationRequest; - -/** - * The request from {@link SchemaOperator} to {@link SchemaRegistry} to request to release upstream - * after sending {@link FlushEvent}. - */ -public class ReleaseUpstreamRequest implements CoordinationRequest { - - private static final long serialVersionUID = 1L; -} diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeProcessingResponse.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeProcessingResponse.java index b679ab9e5..bf8d84f35 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeProcessingResponse.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeProcessingResponse.java @@ -23,8 +23,8 @@ import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry; import org.apache.flink.runtime.operators.coordination.CoordinationResponse; /** - * The response for {@link SchemaChangeResultRequest} or {@link ReleaseUpstreamRequest} from {@link - * SchemaRegistry} to {@link SchemaOperator} if not apply {@link SchemaChangeEvent} in time. + * The response for {@link SchemaChangeResultRequest} from {@link SchemaRegistry} to {@link + * SchemaOperator} when the {@link SchemaChangeEvent} has not been applied in time. 
*/ public class SchemaChangeProcessingResponse implements CoordinationResponse { diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeResponse.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeResponse.java index 142de431e..63d57139b 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeResponse.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeResponse.java @@ -22,6 +22,7 @@ import org.apache.flink.cdc.runtime.operators.schema.SchemaOperator; import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry; import org.apache.flink.runtime.operators.coordination.CoordinationResponse; +import java.util.Collections; import java.util.List; import java.util.Objects; @@ -38,8 +39,44 @@ public class SchemaChangeResponse implements CoordinationResponse { */ private final List<SchemaChangeEvent> schemaChangeEvents; - public SchemaChangeResponse(List<SchemaChangeEvent> schemaChangeEvents) { + private final ResponseCode responseCode; + + public static SchemaChangeResponse accepted(List<SchemaChangeEvent> schemaChangeEvents) { + return new SchemaChangeResponse(schemaChangeEvents, ResponseCode.ACCEPTED); + } + + public static SchemaChangeResponse busy() { + return new SchemaChangeResponse(Collections.emptyList(), ResponseCode.BUSY); + } + + public static SchemaChangeResponse duplicate() { + return new SchemaChangeResponse(Collections.emptyList(), ResponseCode.DUPLICATE); + } + + public static SchemaChangeResponse ignored() { + return new SchemaChangeResponse(Collections.emptyList(), ResponseCode.IGNORED); + } + + private SchemaChangeResponse( + List<SchemaChangeEvent> schemaChangeEvents, ResponseCode responseCode) { this.schemaChangeEvents = schemaChangeEvents; + this.responseCode = responseCode; + } + + public boolean isAccepted() { + return ResponseCode.ACCEPTED.equals(responseCode); + } + + public boolean isRegistryBusy() { + return ResponseCode.BUSY.equals(responseCode); + } + + public boolean isDuplicate() { + return ResponseCode.DUPLICATE.equals(responseCode); + } + + public boolean isIgnored() { + return ResponseCode.IGNORED.equals(responseCode); } public List<SchemaChangeEvent> getSchemaChangeEvents() { @@ -55,11 +92,43 @@ public class SchemaChangeResponse implements CoordinationResponse { return false; } SchemaChangeResponse response = (SchemaChangeResponse) o; - return schemaChangeEvents.equals(response.schemaChangeEvents); + return Objects.equals(schemaChangeEvents, response.schemaChangeEvents) + && responseCode == response.responseCode; } @Override public int hashCode() { - return Objects.hash(schemaChangeEvents); + return Objects.hash(schemaChangeEvents, responseCode); + } + + @Override + public String toString() { + return "SchemaChangeResponse{" + + "schemaChangeEvents=" + + schemaChangeEvents + + ", responseCode=" + + responseCode + + '}'; + } + + /** + * Schema Change Response status code. + * + * <p>- Accepted: Requested schema change request has been accepted exclusively. Any other + * schema change requests will be blocked.
+ * + * <p>- Busy: Schema registry is currently busy processing another schema change request.
+ * + * <p>- Duplicate: This schema change request has been submitted before, possibly by another + * parallel subTask.
+ * + * <p>- Ignored: This schema change request has been assessed, but no actual evolution is + * required. Possibly caused by LENIENT mode or merging table strategies. + */ + public enum ResponseCode { + ACCEPTED, + BUSY, + DUPLICATE, + IGNORED } } diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/ReleaseUpstreamResponse.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeResultResponse.java similarity index 87% rename from flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/ReleaseUpstreamResponse.java rename to flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeResultResponse.java index bea880962..7039ef086 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/ReleaseUpstreamResponse.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeResultResponse.java @@ -26,10 +26,10 @@ import java.util.List; import java.util.Objects; /** - * The response for {@link ReleaseUpstreamRequest} from {@link SchemaRegistry} to {@link + * The response for {@link SchemaChangeResultRequest} from {@link SchemaRegistry} to {@link * SchemaOperator}. */ -public class ReleaseUpstreamResponse implements CoordinationResponse { +public class SchemaChangeResultResponse implements CoordinationResponse { private static final long serialVersionUID = 1L; @@ -39,7 +39,7 @@ public class ReleaseUpstreamResponse implements CoordinationResponse { */ private final List<SchemaChangeEvent> finishedSchemaChangeEvents; - public ReleaseUpstreamResponse(List<SchemaChangeEvent> finishedSchemaChangeEvents) { + public SchemaChangeResultResponse(List<SchemaChangeEvent> finishedSchemaChangeEvents) { this.finishedSchemaChangeEvents = finishedSchemaChangeEvents; } @@ -63,7 +63,7 @@ public class ReleaseUpstreamResponse implements CoordinationResponse { if (object == null || getClass() != object.getClass()) { return false; } - ReleaseUpstreamResponse that = (ReleaseUpstreamResponse) object; + SchemaChangeResultResponse that = (SchemaChangeResultResponse) object; return Objects.equals(finishedSchemaChangeEvents, that.finishedSchemaChangeEvents); } diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/EventOperatorTestHarness.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/EventOperatorTestHarness.java index 354304599..60952b424 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/EventOperatorTestHarness.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/EventOperatorTestHarness.java @@ -163,7 +163,6 @@ public class EventOperatorTestHarness, E ex } public void registerTableSchema(TableId tableId, Schema schema) { - schemaRegistry.handleApplyOriginalSchemaChangeEvent(new CreateTableEvent(tableId, schema)); schemaRegistry.handleCoordinationRequest( new SchemaChangeRequest(tableId, new CreateTableEvent(tableId, schema))); schemaRegistry.handleApplyEvolvedSchemaChangeRequest(new CreateTableEvent(tableId, schema));
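To make the new handshake easier to trace end to end, the whole request/poll protocol condenses into the following self-contained sketch. This is an editorial illustration rather than code from the patch: the 1s backoff, the RPC deadline, and the CAS transitions mirror SchemaOperator#requestSchemaChange and SchemaRegistryRequestHandler above, while the class and method names here are hypothetical.

    import java.util.concurrent.TimeoutException;
    import java.util.concurrent.atomic.AtomicReference;

    /** Minimal model of the coordinator handshake; all names are illustrative. */
    public final class SchemaChangeProtocolSketch {
        enum Status { IDLE, WAITING_FOR_FLUSH, APPLYING, FINISHED }
        enum Code { ACCEPTED, BUSY, DUPLICATE, IGNORED }

        private final AtomicReference<Status> status = new AtomicReference<>(Status.IDLE);

        /** Registry side: only one subtask wins the IDLE -> WAITING_FOR_FLUSH race (transition A). */
        Code handleRequest(boolean redundant) {
            if (!status.compareAndSet(Status.IDLE, Status.WAITING_FOR_FLUSH)) {
                return Code.BUSY;
            }
            if (redundant) {
                // Transition B: roll back to IDLE without emitting a FlushEvent.
                status.set(Status.IDLE);
                return Code.DUPLICATE;
            }
            return Code.ACCEPTED;
        }

        /** Operator side: retry with a fixed 1s backoff until accepted or the deadline passes. */
        Code requestWithRetry(long timeoutMillis, boolean redundant)
                throws InterruptedException, TimeoutException {
            long deadline = System.currentTimeMillis() + timeoutMillis;
            while (true) {
                Code code = handleRequest(redundant);
                if (code != Code.BUSY) {
                    return code;
                }
                if (System.currentTimeMillis() >= deadline) {
                    throw new TimeoutException("Timeout when requesting schema change");
                }
                Thread.sleep(1000);
            }
        }

        public static void main(String[] args) throws Exception {
            SchemaChangeProtocolSketch sketch = new SchemaChangeProtocolSketch();
            System.out.println(sketch.requestWithRetry(30_000, false)); // ACCEPTED
            System.out.println(sketch.handleRequest(false)); // BUSY: handler is no longer IDLE
        }
    }

Under these assumptions, a second subtask that races the first one simply receives BUSY and keeps polling, which is exactly how the patch serializes concurrent schema change requests without the old pending-list bookkeeping.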