diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlParallelSource.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlParallelSource.java index 2e528eb9c..aa5a8e5f9 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlParallelSource.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlParallelSource.java @@ -50,6 +50,7 @@ import com.ververica.cdc.debezium.DebeziumDeserializationSchema; import org.apache.kafka.connect.source.SourceRecord; import java.util.Optional; +import java.util.UUID; import java.util.function.Supplier; import static com.ververica.cdc.connectors.mysql.debezium.EmbeddedFlinkDatabaseHistory.DATABASE_HISTORY_INSTANCE_NAME; @@ -78,12 +79,16 @@ public class MySqlParallelSource private final DebeziumDeserializationSchema deserializationSchema; private final Configuration config; private final String startupMode; + private final String historyInstanceName; public MySqlParallelSource( DebeziumDeserializationSchema deserializationSchema, Configuration config) { this.deserializationSchema = deserializationSchema; this.config = config; this.startupMode = config.get(SCAN_STARTUP_MODE); + this.historyInstanceName = + config.toMap() + .getOrDefault(DATABASE_HISTORY_INSTANCE_NAME, UUID.randomUUID().toString()); } @Override @@ -117,9 +122,7 @@ public class MySqlParallelSource // set the DatabaseHistory name for each reader, will used by debezium reader readerConfiguration.setString( DATABASE_HISTORY_INSTANCE_NAME, - config.toMap().get(DATABASE_HISTORY_INSTANCE_NAME) - + "_" - + readerContext.getIndexOfSubtask()); + historyInstanceName + "_" + readerContext.getIndexOfSubtask()); return readerConfiguration; }