[FLINK-37217][mysql] Fix `MySqlErrorHandler` TableNotFoundException Unable to obtain table correctly

pull/3892/head
huyuanfeng 1 week ago
parent 3e16a66972
commit 99766c6e71

@ -32,6 +32,7 @@ import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@ -60,12 +61,9 @@ public class MySqlErrorHandler extends ErrorHandler {
@Override
public void setProducerThrowable(Throwable producerThrowable) {
if (isTableNotFoundException(producerThrowable)) {
Matcher matcher =
NOT_FOUND_TABLE_MSG_PATTERN.matcher(producerThrowable.getCause().getMessage());
String databaseName = matcher.group(1);
String tableName = matcher.group(2);
TableId tableId = new TableId(databaseName, null, tableName);
Optional<TableId> notFoundTable = extractNotFoundTableId(producerThrowable);
if (notFoundTable.isPresent()) {
TableId tableId = notFoundTable.get();
if (context.getSchema().schemaFor(tableId) == null) {
LOG.warn("Schema for table " + tableId + " is null");
return;
@ -86,14 +84,20 @@ public class MySqlErrorHandler extends ErrorHandler {
super.setProducerThrowable(producerThrowable);
}
private boolean isTableNotFoundException(Throwable t) {
private Optional<TableId> extractNotFoundTableId(Throwable t) {
if (!(t.getCause() instanceof DebeziumException)) {
return false;
return Optional.empty();
}
DebeziumException e = (DebeziumException) t.getCause();
String detailMessage = e.getMessage();
Matcher matcher = NOT_FOUND_TABLE_MSG_PATTERN.matcher(detailMessage);
return matcher.find();
if (matcher.find()) {
String databaseName = matcher.group(1);
String tableName = matcher.group(2);
return Optional.of(new TableId(databaseName, null, tableName));
} else {
return Optional.empty();
}
}
private boolean isSchemaOutOfSyncException(Throwable t) {

Loading…
Cancel
Save