|
|
|
@ -28,8 +28,6 @@ import io.debezium.connector.postgresql.connection.PostgresConnection;
|
|
|
|
|
import io.debezium.connector.postgresql.connection.PostgresReplicationConnection;
|
|
|
|
|
import io.debezium.connector.postgresql.spi.SlotState;
|
|
|
|
|
|
|
|
|
|
import java.sql.SQLException;
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* The Postgres source enumerator that enumerates receive the split request and assign the split to
|
|
|
|
|
* source readers.
|
|
|
|
@ -60,32 +58,25 @@ public class PostgresSourceEnumerator extends IncrementalSourceEnumerator {
|
|
|
|
|
* reading the globalStreamSplit to catch all data changes.
|
|
|
|
|
*/
|
|
|
|
|
private void createSlotForGlobalStreamSplit() {
|
|
|
|
|
SlotState slotInfo = null;
|
|
|
|
|
try (PostgresConnection connection = postgresDialect.openJdbcConnection()) {
|
|
|
|
|
slotInfo =
|
|
|
|
|
SlotState slotInfo =
|
|
|
|
|
connection.getReplicationSlotState(
|
|
|
|
|
postgresDialect.getSlotName(), postgresDialect.getPluginName());
|
|
|
|
|
} catch (SQLException e) {
|
|
|
|
|
throw new RuntimeException(
|
|
|
|
|
String.format(
|
|
|
|
|
"Fail to get the replication slot info, the slot name is %s.",
|
|
|
|
|
postgresDialect.getSlotName()),
|
|
|
|
|
e);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// skip creating the replication slot when the slot exists.
|
|
|
|
|
if (slotInfo != null) {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
// skip creating the replication slot when the slot exists.
|
|
|
|
|
if (slotInfo != null) {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
PostgresReplicationConnection replicationConnection =
|
|
|
|
|
postgresDialect.openPostgresReplicationConnection();
|
|
|
|
|
postgresDialect.openPostgresReplicationConnection(connection);
|
|
|
|
|
replicationConnection.createReplicationSlot();
|
|
|
|
|
replicationConnection.close(false);
|
|
|
|
|
|
|
|
|
|
} catch (Throwable t) {
|
|
|
|
|
throw new FlinkRuntimeException(
|
|
|
|
|
"Create Slot For Global Stream Split failed due to ", t);
|
|
|
|
|
String.format(
|
|
|
|
|
"Fail to get or create slot for global stream split, the slot name is %s. Due to: ",
|
|
|
|
|
postgresDialect.getSlotName()),
|
|
|
|
|
t);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|