[hotfix][mysql] Avoid schema isn't known error when scanNewlyAddedTable enabled (#965)

pull/933/head
Junwang Zhao 3 years ago committed by GitHub
parent a96e70a996
commit 2d47b15782
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.mysql.debezium.task.context;
import io.debezium.DebeziumException;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.mysql.MySqlConnector;
import io.debezium.connector.mysql.MySqlTaskContext;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.relational.TableId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* A subclass implementation of {@link ErrorHandler} which filter some {@link DebeziumException}, we
* use this class instead of {@link io.debezium.connector.mysql.MySqlErrorHandler}.
*/
public class MySqlErrorHandler extends ErrorHandler {
private static final Logger LOG = LoggerFactory.getLogger(MySqlErrorHandler.class);
private static final Pattern NOT_FOUND_TABLE_MSG_PATTERN =
Pattern.compile(
"Encountered change event for table (.+)\\.(.+) whose schema isn't known to this connector");
MySqlTaskContext context;
public MySqlErrorHandler(
String logicalName, ChangeEventQueue<?> queue, MySqlTaskContext context) {
super(MySqlConnector.class, logicalName, queue);
this.context = context;
}
@Override
protected boolean isRetriable(Throwable throwable) {
return false;
}
@Override
public void setProducerThrowable(Throwable producerThrowable) {
if (producerThrowable.getCause() instanceof DebeziumException) {
DebeziumException e = (DebeziumException) producerThrowable.getCause();
String detailMessage = e.getMessage();
Matcher matcher = NOT_FOUND_TABLE_MSG_PATTERN.matcher(detailMessage);
if (matcher.find()) {
String databaseName = matcher.group(1);
String tableName = matcher.group(2);
TableId tableId = new TableId(databaseName, null, tableName);
if (context.getSchema().schemaFor(tableId) == null) {
LOG.warn("Schema for table " + tableId + " is null");
return;
}
}
}
super.setProducerThrowable(producerThrowable);
}
}

@ -31,7 +31,6 @@ import io.debezium.connector.mysql.MySqlChangeEventSourceMetricsFactory;
import io.debezium.connector.mysql.MySqlConnection;
import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.connector.mysql.MySqlDatabaseSchema;
import io.debezium.connector.mysql.MySqlErrorHandler;
import io.debezium.connector.mysql.MySqlOffsetContext;
import io.debezium.connector.mysql.MySqlStreamingChangeEventSourceMetrics;
import io.debezium.connector.mysql.MySqlTopicSelector;
@ -153,7 +152,8 @@ public class StatefulTaskContext {
this.streamingChangeEventSourceMetrics =
changeEventSourceMetricsFactory.getStreamingMetrics(
taskContext, queue, metadataProvider);
this.errorHandler = new MySqlErrorHandler(connectorConfig.getLogicalName(), queue);
this.errorHandler =
new MySqlErrorHandler(connectorConfig.getLogicalName(), queue, taskContext);
}
private void validateAndLoadDatabaseHistory(

@ -138,7 +138,23 @@ public class MySqlSourceITCase extends MySqlSourceTestBase {
@Test
public void testNewlyAddedTableForExistsPipelineOnce() throws Exception {
testNewlyAddedTableOneByOne(
1, FailoverType.NONE, FailoverPhase.NEVER, "address_hangzhou", "address_beijing");
1,
FailoverType.NONE,
FailoverPhase.NEVER,
false,
"address_hangzhou",
"address_beijing");
}
@Test
public void testNewlyAddedTableForExistsPipelineOnceWithAheadBinlog() throws Exception {
testNewlyAddedTableOneByOne(
1,
FailoverType.NONE,
FailoverPhase.NEVER,
true,
"address_hangzhou",
"address_beijing");
}
@Test
@ -147,6 +163,19 @@ public class MySqlSourceITCase extends MySqlSourceTestBase {
DEFAULT_PARALLELISM,
FailoverType.NONE,
FailoverPhase.NEVER,
false,
"address_hangzhou",
"address_beijing",
"address_shanghai");
}
@Test
public void testNewlyAddedTableForExistsPipelineTwiceWithAheadBinlog() throws Exception {
testNewlyAddedTableOneByOne(
DEFAULT_PARALLELISM,
FailoverType.NONE,
FailoverPhase.NEVER,
true,
"address_hangzhou",
"address_beijing",
"address_shanghai");
@ -155,7 +184,24 @@ public class MySqlSourceITCase extends MySqlSourceTestBase {
@Test
public void testNewlyAddedTableForExistsPipelineSingleParallelism() throws Exception {
testNewlyAddedTableOneByOne(
1, FailoverType.NONE, FailoverPhase.NEVER, "address_hangzhou", "address_beijing");
1,
FailoverType.NONE,
FailoverPhase.NEVER,
false,
"address_hangzhou",
"address_beijing");
}
@Test
public void testNewlyAddedTableForExistsPipelineSingleParallelismWithAheadBinlog()
throws Exception {
testNewlyAddedTableOneByOne(
1,
FailoverType.NONE,
FailoverPhase.NEVER,
true,
"address_hangzhou",
"address_beijing");
}
@Test
@ -164,6 +210,18 @@ public class MySqlSourceITCase extends MySqlSourceTestBase {
DEFAULT_PARALLELISM,
FailoverType.JM,
FailoverPhase.SNAPSHOT,
false,
"address_hangzhou",
"address_beijing");
}
@Test
public void testJobManagerFailoverForNewlyAddedTableWithAheadBinlog() throws Exception {
testNewlyAddedTableOneByOne(
DEFAULT_PARALLELISM,
FailoverType.JM,
FailoverPhase.SNAPSHOT,
true,
"address_hangzhou",
"address_beijing");
}
@ -171,7 +229,23 @@ public class MySqlSourceITCase extends MySqlSourceTestBase {
@Test
public void testTaskManagerFailoverForNewlyAddedTable() throws Exception {
testNewlyAddedTableOneByOne(
1, FailoverType.TM, FailoverPhase.BINLOG, "address_hangzhou", "address_beijing");
1,
FailoverType.TM,
FailoverPhase.BINLOG,
false,
"address_hangzhou",
"address_beijing");
}
@Test
public void testTaskManagerFailoverForNewlyAddedTableWithAheadBinlog() throws Exception {
testNewlyAddedTableOneByOne(
1,
FailoverType.TM,
FailoverPhase.BINLOG,
false,
"address_hangzhou",
"address_beijing");
}
private void testMySqlParallelSource(
@ -304,6 +378,7 @@ public class MySqlSourceITCase extends MySqlSourceTestBase {
int parallelism,
FailoverType failoverType,
FailoverPhase failoverPhase,
boolean makeBinlogBeforeCapture,
String... captureAddressTables)
throws Exception {
@ -324,6 +399,10 @@ public class MySqlSourceITCase extends MySqlSourceTestBase {
Arrays.asList(captureAddressTables)
.subList(0, round + 1)
.toArray(new String[0]);
String newlyAddedTable = captureAddressTables[round];
if (makeBinlogBeforeCapture) {
makeBinlogBeforeCaptureForAddressTable(getConnection(), newlyAddedTable);
}
StreamExecutionEnvironment env =
getStreamExecutionEnvironment(finishedSavePointPath, parallelism);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
@ -346,7 +425,6 @@ public class MySqlSourceITCase extends MySqlSourceTestBase {
JobClient jobClient = tableResult.getJobClient().get();
// step 2: assert fetched snapshot data in this round
String newlyAddedTable = captureAddressTables[round];
String cityName = newlyAddedTable.split("_")[1];
List<String> expectedSnapshotDataThisRound =
Arrays.asList(
@ -359,6 +437,22 @@ public class MySqlSourceITCase extends MySqlSourceTestBase {
format(
"+I[%s, 417022095255614379, China, %s, %s West Town address 3]",
newlyAddedTable, cityName, cityName));
if (makeBinlogBeforeCapture) {
expectedSnapshotDataThisRound =
Arrays.asList(
format(
"+I[%s, 416874195632735147, China, %s, %s West Town address 1]",
newlyAddedTable, cityName, cityName),
format(
"+I[%s, 416927583791428523, China, %s, %s West Town address 2]",
newlyAddedTable, cityName, cityName),
format(
"+I[%s, 417022095255614379, China, %s, %s West Town address 3]",
newlyAddedTable, cityName, cityName),
format(
"+I[%s, 417022095255614381, China, %s, %s West Town address 5]",
newlyAddedTable, cityName, cityName));
}
// trigger failover after some snapshot data read finished
if (failoverPhase == FailoverPhase.SNAPSHOT
@ -620,6 +714,23 @@ public class MySqlSourceITCase extends MySqlSourceTestBase {
}
}
private void makeBinlogBeforeCaptureForAddressTable(JdbcConnection connection, String tableName)
throws SQLException {
try {
connection.setAutoCommit(false);
// make binlog before the capture of the table
String tableId = customDatabase.getDatabaseName() + "." + tableName;
String cityName = tableName.split("_")[1];
connection.execute(
format(
"INSERT INTO %s VALUES(417022095255614381, 'China','%s','%s West Town address 5')",
tableId, cityName, cityName));
connection.commit();
} finally {
connection.close();
}
}
private MySqlConnection getConnection() {
Map<String, String> properties = new HashMap<>();
properties.put("database.hostname", MYSQL_CONTAINER.getHost());

Loading…
Cancel
Save