From 2d47b15782a08e10eb560997f18ac1cd1a5cb62b Mon Sep 17 00:00:00 2001
From: Junwang Zhao
Date: Mon, 21 Mar 2022 22:32:35 +0800
Subject: [PATCH] [hotfix][mysql] Avoid schema isn't known error when scanNewlyAddedTable enabled (#965)

---
 .../task/context/MySqlErrorHandler.java       |  94 ++++++++++++++
 .../task/context/StatefulTaskContext.java     |   4 +-
 .../mysql/source/MySqlSourceITCase.java       | 119 +++++++++++++++++-
 3 files changed, 211 insertions(+), 6 deletions(-)
 create mode 100644 flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/task/context/MySqlErrorHandler.java

diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/task/context/MySqlErrorHandler.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/task/context/MySqlErrorHandler.java
new file mode 100644
index 000000000..f5ddee095
--- /dev/null
+++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/task/context/MySqlErrorHandler.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ververica.cdc.connectors.mysql.debezium.task.context;
+
+import io.debezium.DebeziumException;
+import io.debezium.connector.base.ChangeEventQueue;
+import io.debezium.connector.mysql.MySqlConnector;
+import io.debezium.connector.mysql.MySqlTaskContext;
+import io.debezium.pipeline.ErrorHandler;
+import io.debezium.relational.TableId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * An {@link ErrorHandler} that filters out some {@link DebeziumException}s. It is used instead
+ * of {@link io.debezium.connector.mysql.MySqlErrorHandler}.
+ *
+ * <p>A producer failure is dropped (logged at WARN only) when its cause is a {@link
+ * DebeziumException} reporting a change event for a table "whose schema isn't known to this
+ * connector" and the task's schema indeed has no entry for that table. Every other failure is
+ * passed on to {@link ErrorHandler#setProducerThrowable(Throwable)}.
+ */
+public class MySqlErrorHandler extends ErrorHandler {
+    private static final Logger LOG = LoggerFactory.getLogger(MySqlErrorHandler.class);
+
+    /** Matches the unknown-schema message: group 1 is the database, group 2 the table. */
+    private static final Pattern NOT_FOUND_TABLE_MSG_PATTERN =
+            Pattern.compile(
+                    "Encountered change event for table (.+)\\.(.+) whose schema isn't known to this connector");
+
+    /** Task context whose schema decides whether an unknown-schema failure is ignored. */
+    private final MySqlTaskContext context;
+
+    /**
+     * Creates the error handler.
+     *
+     * @param logicalName forwarded to the {@link ErrorHandler} constructor
+     * @param queue forwarded to the {@link ErrorHandler} constructor
+     * @param context task context used to look up table schemas
+     */
+    public MySqlErrorHandler(
+            String logicalName, ChangeEventQueue<?> queue, MySqlTaskContext context) {
+        super(MySqlConnector.class, logicalName, queue);
+        this.context = context;
+    }
+
+    /** Always returns {@code false}: no failure is treated as retriable. */
+    @Override
+    protected boolean isRetriable(Throwable throwable) {
+        return false;
+    }
+
+    /**
+     * Records a producer failure unless it is an unknown-schema error for a table that the task's
+     * schema has no entry for, in which case it is only logged.
+     *
+     * @param producerThrowable the failure raised by the producer
+     */
+    @Override
+    public void setProducerThrowable(Throwable producerThrowable) {
+        Throwable cause = producerThrowable.getCause();
+        // getMessage() may be null; Pattern.matcher(null) would throw from inside the handler.
+        if (cause instanceof DebeziumException && cause.getMessage() != null) {
+            Matcher matcher = NOT_FOUND_TABLE_MSG_PATTERN.matcher(cause.getMessage());
+            if (matcher.find()) {
+                TableId tableId = new TableId(matcher.group(1), null, matcher.group(2));
+                if (context.getSchema().schemaFor(tableId) == null) {
+                    LOG.warn("Schema for table {} is null", tableId);
+                    return;
+                }
+            }
+        }
+        super.setProducerThrowable(producerThrowable);
+    }
+}
diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/task/context/StatefulTaskContext.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/task/context/StatefulTaskContext.java
index 39f22d953..89adf1eef 100644
--- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/task/context/StatefulTaskContext.java
+++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/task/context/StatefulTaskContext.java
@@ -31,7 +31,6 @@ import io.debezium.connector.mysql.MySqlChangeEventSourceMetricsFactory;
 import io.debezium.connector.mysql.MySqlConnection;
 import io.debezium.connector.mysql.MySqlConnectorConfig;
 import io.debezium.connector.mysql.MySqlDatabaseSchema;
-import io.debezium.connector.mysql.MySqlErrorHandler;
 import io.debezium.connector.mysql.MySqlOffsetContext;
 import io.debezium.connector.mysql.MySqlStreamingChangeEventSourceMetrics;
 import io.debezium.connector.mysql.MySqlTopicSelector;
@@ -153,7 +152,8 @@ public class StatefulTaskContext {
         this.streamingChangeEventSourceMetrics =
                 changeEventSourceMetricsFactory.getStreamingMetrics(
                         taskContext, queue, metadataProvider);
-        this.errorHandler = new MySqlErrorHandler(connectorConfig.getLogicalName(), queue);
+        this.errorHandler =
+                new MySqlErrorHandler(connectorConfig.getLogicalName(), queue, taskContext);
     }
 
     private void validateAndLoadDatabaseHistory(
diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlSourceITCase.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlSourceITCase.java
index 1bfc8d953..4aec4b202 100644
--- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlSourceITCase.java
+++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlSourceITCase.java
@@ -138,7 +138,23 @@ public class MySqlSourceITCase extends MySqlSourceTestBase {
     @Test
     public void testNewlyAddedTableForExistsPipelineOnce() throws Exception {
         testNewlyAddedTableOneByOne(
-                1, FailoverType.NONE, FailoverPhase.NEVER, "address_hangzhou", "address_beijing");
+                1,
+                FailoverType.NONE,
+                FailoverPhase.NEVER,
+                false,
+                "address_hangzhou",
+                "address_beijing");
+    }
+
+    @Test
+    public void testNewlyAddedTableForExistsPipelineOnceWithAheadBinlog() throws Exception {
+        testNewlyAddedTableOneByOne(
+                1,
+                FailoverType.NONE,
+                FailoverPhase.NEVER,
+                true,
+                "address_hangzhou",
+                "address_beijing");
     }
 
     @Test
@@ -147,6 +163,19 @@ public class MySqlSourceITCase extends MySqlSourceTestBase {
                 DEFAULT_PARALLELISM,
                 FailoverType.NONE,
                 FailoverPhase.NEVER,
+                false,
+                "address_hangzhou",
+                "address_beijing",
+                "address_shanghai");
+    }
+
+    @Test
+    public void testNewlyAddedTableForExistsPipelineTwiceWithAheadBinlog() throws Exception {
+        testNewlyAddedTableOneByOne(
+                DEFAULT_PARALLELISM,
+                FailoverType.NONE,
+                FailoverPhase.NEVER,
+                true,
                 "address_hangzhou",
                 "address_beijing",
                 "address_shanghai");
@@ -155,7 +184,24 @@ public class MySqlSourceITCase extends MySqlSourceTestBase {
     @Test
     public void testNewlyAddedTableForExistsPipelineSingleParallelism() throws Exception {
         testNewlyAddedTableOneByOne(
-                1, FailoverType.NONE, FailoverPhase.NEVER, "address_hangzhou", "address_beijing");
+                1,
+                FailoverType.NONE,
+                FailoverPhase.NEVER,
+                false,
+                "address_hangzhou",
+                "address_beijing");
+    }
+
+    @Test
+    public void testNewlyAddedTableForExistsPipelineSingleParallelismWithAheadBinlog()
+            throws Exception {
+        testNewlyAddedTableOneByOne(
+                1,
+                FailoverType.NONE,
+                FailoverPhase.NEVER,
+                true,
+                "address_hangzhou",
+                "address_beijing");
     }
 
     @Test
@@ -164,6 +210,18 @@ public class MySqlSourceITCase extends MySqlSourceTestBase {
                 DEFAULT_PARALLELISM,
                 FailoverType.JM,
                 FailoverPhase.SNAPSHOT,
+                false,
+                "address_hangzhou",
+                "address_beijing");
+    }
+
+    @Test
+    public void testJobManagerFailoverForNewlyAddedTableWithAheadBinlog() throws Exception {
+        testNewlyAddedTableOneByOne(
+                DEFAULT_PARALLELISM,
+                FailoverType.JM,
+                FailoverPhase.SNAPSHOT,
+                true,
                 "address_hangzhou",
                 "address_beijing");
     }
@@ -171,7 +229,23 @@ public class MySqlSourceITCase extends MySqlSourceTestBase {
     @Test
     public void testTaskManagerFailoverForNewlyAddedTable() throws Exception {
         testNewlyAddedTableOneByOne(
-                1, FailoverType.TM, FailoverPhase.BINLOG, "address_hangzhou", "address_beijing");
+                1,
+                FailoverType.TM,
+                FailoverPhase.BINLOG,
+                false,
+                "address_hangzhou",
+                "address_beijing");
+    }
+
+    @Test
+    public void testTaskManagerFailoverForNewlyAddedTableWithAheadBinlog() throws Exception {
+        testNewlyAddedTableOneByOne(
+                1,
+                FailoverType.TM,
+                FailoverPhase.BINLOG,
+                true,
+                "address_hangzhou",
+                "address_beijing");
     }
 
     private void testMySqlParallelSource(
@@ -304,6 +378,7 @@ public class MySqlSourceITCase extends MySqlSourceTestBase {
             int parallelism,
             FailoverType failoverType,
             FailoverPhase failoverPhase,
+            boolean makeBinlogBeforeCapture,
             String... captureAddressTables)
             throws Exception {
 
@@ -324,6 +399,10 @@ public class MySqlSourceITCase extends MySqlSourceTestBase {
                     Arrays.asList(captureAddressTables)
                             .subList(0, round + 1)
                             .toArray(new String[0]);
+            String newlyAddedTable = captureAddressTables[round];
+            if (makeBinlogBeforeCapture) {
+                makeBinlogBeforeCaptureForAddressTable(getConnection(), newlyAddedTable);
+            }
             StreamExecutionEnvironment env =
                     getStreamExecutionEnvironment(finishedSavePointPath, parallelism);
             StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
@@ -346,7 +425,6 @@ public class MySqlSourceITCase extends MySqlSourceTestBase {
             JobClient jobClient = tableResult.getJobClient().get();
 
             // step 2: assert fetched snapshot data in this round
-            String newlyAddedTable = captureAddressTables[round];
             String cityName = newlyAddedTable.split("_")[1];
             List<String> expectedSnapshotDataThisRound =
                     Arrays.asList(
@@ -359,6 +437,22 @@ public class MySqlSourceITCase extends MySqlSourceTestBase {
                             format(
                                     "+I[%s, 417022095255614379, China, %s, %s West Town address 3]",
                                     newlyAddedTable, cityName, cityName));
+            if (makeBinlogBeforeCapture) {
+                expectedSnapshotDataThisRound =
+                        Arrays.asList(
+                                format(
+                                        "+I[%s, 416874195632735147, China, %s, %s West Town address 1]",
+                                        newlyAddedTable, cityName, cityName),
+                                format(
+                                        "+I[%s, 416927583791428523, China, %s, %s West Town address 2]",
+                                        newlyAddedTable, cityName, cityName),
+                                format(
+                                        "+I[%s, 417022095255614379, China, %s, %s West Town address 3]",
+                                        newlyAddedTable, cityName, cityName),
+                                format(
+                                        "+I[%s, 417022095255614381, China, %s, %s West Town address 5]",
+                                        newlyAddedTable, cityName, cityName));
+            }
 
             // trigger failover after some snapshot data read finished
             if (failoverPhase == FailoverPhase.SNAPSHOT
@@ -620,6 +714,23 @@ public class MySqlSourceITCase extends MySqlSourceTestBase {
         }
     }
 
+    private void makeBinlogBeforeCaptureForAddressTable(JdbcConnection connection, String tableName)
+            throws SQLException {
+        try {
+            connection.setAutoCommit(false);
+            // insert one extra row now so that its binlog event exists before the table is captured
+            String tableId = customDatabase.getDatabaseName() + "." + tableName;
+            String cityName = tableName.split("_")[1];
+            connection.execute(
+                    format(
+                            "INSERT INTO %s VALUES(417022095255614381, 'China','%s','%s West Town address 5')",
+                            tableId, cityName, cityName));
+            connection.commit();
+        } finally {
+            connection.close();
+        }
+    }
+
     private MySqlConnection getConnection() {
         Map<String, String> properties = new HashMap<>();
         properties.put("database.hostname", MYSQL_CONTAINER.getHost());