|
|
|
@ -59,7 +59,6 @@ import java.util.concurrent.ExecutorService;
|
|
|
|
|
import java.util.concurrent.Executors;
|
|
|
|
|
import java.util.concurrent.ThreadFactory;
|
|
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
|
import java.util.stream.Collectors;
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* The {@link DebeziumSourceFunction} is a streaming data source that pulls captured change data
|
|
|
|
@ -302,11 +301,6 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T> implements
|
|
|
|
|
// history instance name to initialize FlinkDatabaseHistory
|
|
|
|
|
properties.setProperty(FlinkDatabaseHistory.DATABASE_HISTORY_INSTANCE_NAME, engineInstanceName);
|
|
|
|
|
|
|
|
|
|
// dump the properties
|
|
|
|
|
String propsString = properties.entrySet().stream()
|
|
|
|
|
.map(t -> "\t" + t.getKey().toString() + " = " + t.getValue().toString() + "\n")
|
|
|
|
|
.collect(Collectors.joining());
|
|
|
|
|
LOG.info("Debezium Properties:\n{}", propsString);
|
|
|
|
|
this.debeziumConsumer = new DebeziumChangeConsumer<>(
|
|
|
|
|
sourceContext,
|
|
|
|
|
deserializer,
|
|
|
|
|