From 5efa34958b913bcf24eb5678cd33d6e731e23f97 Mon Sep 17 00:00:00 2001 From: Qingsheng Ren Date: Wed, 9 Nov 2022 18:47:53 +0800 Subject: [PATCH] [mysql] Improve error message under the case that start reading from earliest but schema change happened before (#1724) --- docs/content/connectors/mysql-cdc(ZH).md | 4 +- docs/content/connectors/mysql-cdc.md | 5 +- .../task/context/MySqlErrorHandler.java | 74 +++++++++++++++---- .../task/context/StatefulTaskContext.java | 3 +- .../exception/SchemaOutOfSyncException.java | 27 +++++++ .../reader/BinlogSplitReaderTest.java | 42 +++++++++++ 6 files changed, 138 insertions(+), 17 deletions(-) create mode 100644 flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/task/context/exception/SchemaOutOfSyncException.java diff --git a/docs/content/connectors/mysql-cdc(ZH).md b/docs/content/connectors/mysql-cdc(ZH).md index 62e6ad93f..f9f997015 100644 --- a/docs/content/connectors/mysql-cdc(ZH).md +++ b/docs/content/connectors/mysql-cdc(ZH).md @@ -589,8 +589,10 @@ CREATE TABLE mysql_source (...) WITH ( ) ``` -**注意**:MySQL source 会在 checkpoint 时将当前位点以 INFO 级别打印到日志中,日志前缀为 "Binlog offset on checkpoint {checkpoint-id}"。 +**注意**: +1. MySQL source 会在 checkpoint 时将当前位点以 INFO 级别打印到日志中,日志前缀为 "Binlog offset on checkpoint {checkpoint-id}"。 该日志可以帮助将作业从某个 checkpoint 的位点开始启动的场景。 +2. 如果捕获变更的表曾经发生过表结构变化,从最早位点、特定位点或时间戳启动可能会发生错误,因为 Debezium 读取器会在内部保存当前的最新表结构,结构不匹配的早期数据无法被正确解析。 ### DataStream Source diff --git a/docs/content/connectors/mysql-cdc.md b/docs/content/connectors/mysql-cdc.md index 4da3f5a72..ce07ce77d 100644 --- a/docs/content/connectors/mysql-cdc.md +++ b/docs/content/connectors/mysql-cdc.md @@ -597,8 +597,11 @@ CREATE TABLE mysql_source (...) WITH ( ) ``` -**Note:** MySQL source will print the current binlog position into logs with INFO level on checkpoint, with the prefix +**Notes:** +1. MySQL source will print the current binlog position into logs with INFO level on checkpoint, with the prefix "Binlog offset on checkpoint {checkpoint-id}". It could be useful if you want to restart the job from a specific checkpointed position. +2. If schema of capturing tables was changed previously, starting with earliest offset, specific offset or timestamp +could fail as the Debezium reader keeps the current latest table schema internally and earlier records with unmatched schema cannot be correctly parsed. ### DataStream Source diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/task/context/MySqlErrorHandler.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/task/context/MySqlErrorHandler.java index 3fe6ec2ac..8579395d1 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/task/context/MySqlErrorHandler.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/task/context/MySqlErrorHandler.java @@ -16,12 +16,17 @@ package com.ververica.cdc.connectors.mysql.debezium.task.context; +import com.ververica.cdc.connectors.mysql.debezium.task.context.exception.SchemaOutOfSyncException; +import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfig; +import com.ververica.cdc.connectors.mysql.table.StartupMode; import io.debezium.DebeziumException; import io.debezium.connector.base.ChangeEventQueue; import io.debezium.connector.mysql.MySqlConnector; import io.debezium.connector.mysql.MySqlTaskContext; import io.debezium.pipeline.ErrorHandler; import io.debezium.relational.TableId; +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.kafka.connect.errors.ConnectException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,12 +43,17 @@ public class MySqlErrorHandler extends ErrorHandler { Pattern.compile( "Encountered change event for table (.+)\\.(.+) whose schema isn't known to this connector"); - MySqlTaskContext context; + private final MySqlTaskContext context; + private final MySqlSourceConfig sourceConfig; public MySqlErrorHandler( - String logicalName, ChangeEventQueue queue, MySqlTaskContext context) { + String logicalName, + ChangeEventQueue queue, + MySqlTaskContext context, + MySqlSourceConfig sourceConfig) { super(MySqlConnector.class, logicalName, queue); this.context = context; + this.sourceConfig = sourceConfig; } @Override @@ -53,20 +63,56 @@ public class MySqlErrorHandler extends ErrorHandler { @Override public void setProducerThrowable(Throwable producerThrowable) { - if (producerThrowable.getCause() instanceof DebeziumException) { - DebeziumException e = (DebeziumException) producerThrowable.getCause(); - String detailMessage = e.getMessage(); - Matcher matcher = NOT_FOUND_TABLE_MSG_PATTERN.matcher(detailMessage); - if (matcher.find()) { - String databaseName = matcher.group(1); - String tableName = matcher.group(2); - TableId tableId = new TableId(databaseName, null, tableName); - if (context.getSchema().schemaFor(tableId) == null) { - LOG.warn("Schema for table " + tableId + " is null"); - return; - } + if (isTableNotFoundException(producerThrowable)) { + Matcher matcher = + NOT_FOUND_TABLE_MSG_PATTERN.matcher(producerThrowable.getCause().getMessage()); + String databaseName = matcher.group(1); + String tableName = matcher.group(2); + TableId tableId = new TableId(databaseName, null, tableName); + if (context.getSchema().schemaFor(tableId) == null) { + LOG.warn("Schema for table " + tableId + " is null"); + return; } } + + if (isSchemaOutOfSyncException(producerThrowable)) { + super.setProducerThrowable( + new SchemaOutOfSyncException( + "Internal schema representation is probably out of sync with real database schema. " + + "The reason could be that the table schema was changed after the starting " + + "binlog offset, which is not supported when startup mode is set to " + + sourceConfig.getStartupOptions().startupMode, + producerThrowable)); + return; + } + super.setProducerThrowable(producerThrowable); } + + private boolean isTableNotFoundException(Throwable t) { + if (!(t.getCause() instanceof DebeziumException)) { + return false; + } + DebeziumException e = (DebeziumException) t.getCause(); + String detailMessage = e.getMessage(); + Matcher matcher = NOT_FOUND_TABLE_MSG_PATTERN.matcher(detailMessage); + return matcher.find(); + } + + private boolean isSchemaOutOfSyncException(Throwable t) { + Throwable rootCause = ExceptionUtils.getRootCause(t); + return rootCause instanceof ConnectException + && rootCause + .getMessage() + .endsWith( + "internal schema representation is probably out of sync with real database schema") + && isSettingStartingOffset(); + } + + private boolean isSettingStartingOffset() { + StartupMode startupMode = sourceConfig.getStartupOptions().startupMode; + return startupMode == StartupMode.EARLIEST_OFFSET + || startupMode == StartupMode.TIMESTAMP + || startupMode == StartupMode.SPECIFIC_OFFSETS; + } } diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/task/context/StatefulTaskContext.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/task/context/StatefulTaskContext.java index fd1ade927..80c10bd45 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/task/context/StatefulTaskContext.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/task/context/StatefulTaskContext.java @@ -165,7 +165,8 @@ public class StatefulTaskContext { changeEventSourceMetricsFactory.getStreamingMetrics( taskContext, queue, metadataProvider); this.errorHandler = - new MySqlErrorHandler(connectorConfig.getLogicalName(), queue, taskContext); + new MySqlErrorHandler( + connectorConfig.getLogicalName(), queue, taskContext, sourceConfig); } private void validateAndLoadDatabaseHistory( diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/task/context/exception/SchemaOutOfSyncException.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/task/context/exception/SchemaOutOfSyncException.java new file mode 100644 index 000000000..ca9f2014c --- /dev/null +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/task/context/exception/SchemaOutOfSyncException.java @@ -0,0 +1,27 @@ +/* + * Copyright 2022 Ververica Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.connectors.mysql.debezium.task.context.exception; + +/** + * A wrapper class for clearly show the possible reason of a schema-out-of-sync exception thrown + * inside Debezium. + */ +public class SchemaOutOfSyncException extends Exception { + public SchemaOutOfSyncException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java index e1f776bf9..264f9ce73 100644 --- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java +++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java @@ -19,11 +19,13 @@ package com.ververica.cdc.connectors.mysql.debezium.reader; import org.apache.flink.core.testutils.CommonTestUtils; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.types.DataType; +import org.apache.flink.util.ExceptionUtils; import com.github.shyiko.mysql.binlog.BinaryLogClient; import com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils; import com.ververica.cdc.connectors.mysql.debezium.dispatcher.SignalEventDispatcher; import com.ververica.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext; +import com.ververica.cdc.connectors.mysql.debezium.task.context.exception.SchemaOutOfSyncException; import com.ververica.cdc.connectors.mysql.source.MySqlSourceTestBase; import com.ververica.cdc.connectors.mysql.source.assigners.MySqlBinlogSplitAssigner; import com.ververica.cdc.connectors.mysql.source.assigners.MySqlSnapshotSplitAssigner; @@ -65,7 +67,9 @@ import java.util.stream.Collectors; import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getSnapshotSplitInfo; import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getStartingOffsetOfBinlogSplit; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; /** Tests for {@link BinlogSplitReader}. */ @@ -363,6 +367,43 @@ public class BinlogSplitReaderTest extends MySqlSourceTestBase { assertEqualsInOrder(Arrays.asList(expected), actual); } + @Test + public void testReadBinlogFromEarliestOffsetAfterSchemaChange() throws Exception { + customerDatabase.createAndInitialize(); + MySqlSourceConfig sourceConfig = + getConfig(StartupOptions.earliest(), new String[] {"customers"}); + binaryLogClient = DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration()); + mySqlConnection = DebeziumUtils.createMySqlConnection(sourceConfig); + String tableId = customerDatabase.qualifiedTableName("customers"); + DataType dataType = + DataTypes.ROW( + DataTypes.FIELD("id", DataTypes.BIGINT()), + DataTypes.FIELD("name", DataTypes.STRING()), + DataTypes.FIELD("address", DataTypes.STRING()), + DataTypes.FIELD("phone_number", DataTypes.STRING())); + + // Add a column to the table + addColumnToTable(mySqlConnection, tableId); + + // Create reader and submit splits + MySqlBinlogSplit split = createBinlogSplit(sourceConfig); + BinlogSplitReader reader = createBinlogReader(sourceConfig); + reader.submitSplit(split); + + // An exception is expected here because the table schema is changed, which is not allowed + // under earliest startup mode. + Throwable throwable = + assertThrows(Throwable.class, () -> readBinlogSplits(dataType, reader, 1)); + Optional schemaOutOfSyncException = + ExceptionUtils.findThrowable(throwable, SchemaOutOfSyncException.class); + assertTrue(schemaOutOfSyncException.isPresent()); + assertEquals( + "Internal schema representation is probably out of sync with real database schema. " + + "The reason could be that the table schema was changed after the starting " + + "binlog offset, which is not supported when startup mode is set to EARLIEST_OFFSET", + schemaOutOfSyncException.get().getMessage()); + } + @Test public void testReadBinlogFromBinlogFilePosition() throws Exception { // Preparations @@ -1002,5 +1043,6 @@ public class BinlogSplitReaderTest extends MySqlSourceTestBase { private void addColumnToTable(JdbcConnection connection, String tableId) throws Exception { connection.execute( "ALTER TABLE " + tableId + " ADD COLUMN new_int_column INT DEFAULT 15213"); + connection.commit(); } }