|
|
|
@ -50,6 +50,7 @@ import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
|
|
|
|
|
import org.apache.kafka.connect.source.SourceRecord;
|
|
|
|
|
|
|
|
|
|
import java.util.Optional;
|
|
|
|
|
import java.util.UUID;
|
|
|
|
|
import java.util.function.Supplier;
|
|
|
|
|
|
|
|
|
|
import static com.ververica.cdc.connectors.mysql.debezium.EmbeddedFlinkDatabaseHistory.DATABASE_HISTORY_INSTANCE_NAME;
|
|
|
|
@ -78,12 +79,16 @@ public class MySqlParallelSource<T>
|
|
|
|
|
private final DebeziumDeserializationSchema<T> deserializationSchema;
|
|
|
|
|
private final Configuration config;
|
|
|
|
|
private final String startupMode;
|
|
|
|
|
private final String historyInstanceName;
|
|
|
|
|
|
|
|
|
|
public MySqlParallelSource(
|
|
|
|
|
DebeziumDeserializationSchema<T> deserializationSchema, Configuration config) {
|
|
|
|
|
this.deserializationSchema = deserializationSchema;
|
|
|
|
|
this.config = config;
|
|
|
|
|
this.startupMode = config.get(SCAN_STARTUP_MODE);
|
|
|
|
|
this.historyInstanceName =
|
|
|
|
|
config.toMap()
|
|
|
|
|
.getOrDefault(DATABASE_HISTORY_INSTANCE_NAME, UUID.randomUUID().toString());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
@ -117,9 +122,7 @@ public class MySqlParallelSource<T>
|
|
|
|
|
// set the DatabaseHistory name for each reader, will used by debezium reader
|
|
|
|
|
readerConfiguration.setString(
|
|
|
|
|
DATABASE_HISTORY_INSTANCE_NAME,
|
|
|
|
|
config.toMap().get(DATABASE_HISTORY_INSTANCE_NAME)
|
|
|
|
|
+ "_"
|
|
|
|
|
+ readerContext.getIndexOfSubtask());
|
|
|
|
|
historyInstanceName + "_" + readerContext.getIndexOfSubtask());
|
|
|
|
|
return readerConfiguration;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|