[postgres] Backfill task can end when there is no new change data but the ending LSN has been read

pull/2205/head
Hang Ruan 2 years ago
parent 1a69cef424
commit 6d6881663a
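How the change is meant to work (a sketch inferred from the files below, not part of the commit message): the streaming source hands the stopping LSN stored in the offset context to the replication connection, and the replication stream reports a no-op message once a received LSN passes that position, so the backfill task can finish even when no new change data arrives.

// Sketch of the wiring, using the methods added in this commit; the connector plumbing
// that supplies replicationConnection and offsetContext is omitted here.
((PostgresReplicationConnection) replicationConnection)
        .setEndingPos(offsetContext.getStreamingStoppingLsn());
// From here on, ReplicationStream.read()/readPending() turn messages beyond the ending LSN
// into a NoopMessage, and PostgresStreamingChangeEventSource.processMessages() leaves its
// loop once lastCompletelyProcessedLsn passes the stopping LSN.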

@@ -0,0 +1,530 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.postgresql;
import io.debezium.DebeziumException;
import io.debezium.connector.postgresql.connection.LogicalDecodingMessage;
import io.debezium.connector.postgresql.connection.Lsn;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.connector.postgresql.connection.PostgresReplicationConnection;
import io.debezium.connector.postgresql.connection.ReplicationConnection;
import io.debezium.connector.postgresql.connection.ReplicationMessage.Operation;
import io.debezium.connector.postgresql.connection.ReplicationStream;
import io.debezium.connector.postgresql.connection.WalPositionLocator;
import io.debezium.connector.postgresql.spi.Snapshotter;
import io.debezium.heartbeat.Heartbeat;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.source.spi.StreamingChangeEventSource;
import io.debezium.relational.TableId;
import io.debezium.util.Clock;
import io.debezium.util.DelayStrategy;
import io.debezium.util.ElapsedTimeStrategy;
import io.debezium.util.Threads;
import org.apache.kafka.connect.errors.ConnectException;
import org.postgresql.core.BaseConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.SQLException;
import java.util.Map;
import java.util.Objects;
import java.util.OptionalLong;
import java.util.concurrent.atomic.AtomicReference;
/**
* Copied from Debezium 1.9.7.
*
* <p>Line 150~151: set the ending LSN for the replication connection.
*/
public class PostgresStreamingChangeEventSource
implements StreamingChangeEventSource<PostgresPartition, PostgresOffsetContext> {
private static final String KEEP_ALIVE_THREAD_NAME = "keep-alive";
/**
* Number of received events without sending anything to Kafka which will trigger a "WAL backlog
* growing" warning.
*/
private static final int GROWING_WAL_WARNING_LOG_INTERVAL = 10_000;
private static final Logger LOGGER =
LoggerFactory.getLogger(PostgresStreamingChangeEventSource.class);
// PGOUTPUT decoder sends the messages with larger time gaps than other decoders
// We thus try to read the message multiple times before we make poll pause
private static final int THROTTLE_NO_MESSAGE_BEFORE_PAUSE = 5;
private final PostgresConnection connection;
private final PostgresEventDispatcher<TableId> dispatcher;
private final ErrorHandler errorHandler;
private final Clock clock;
private final PostgresSchema schema;
private final PostgresConnectorConfig connectorConfig;
private final PostgresTaskContext taskContext;
private final ReplicationConnection replicationConnection;
private final AtomicReference<ReplicationStream> replicationStream = new AtomicReference<>();
private final Snapshotter snapshotter;
private final DelayStrategy pauseNoMessage;
private final ElapsedTimeStrategy connectionProbeTimer;
// Offset committing is an asynchronous operation.
// When connector is restarted we cannot be sure about timing of recovery, offset committing
// etc.
// as this is driven by Kafka Connect. This might be a root cause of DBZ-5163.
// This flag will ensure that LSN is flushed only if we are really in message processing mode.
private volatile boolean lsnFlushingAllowed = false;
/**
* The minimum of (number of events received since the last event sent to Kafka, number of
* events received since the last WAL growing warning was issued).
*/
private long numberOfEventsSinceLastEventSentOrWalGrowingWarning = 0;
private Lsn lastCompletelyProcessedLsn;
public PostgresStreamingChangeEventSource(
PostgresConnectorConfig connectorConfig,
Snapshotter snapshotter,
PostgresConnection connection,
PostgresEventDispatcher<TableId> dispatcher,
ErrorHandler errorHandler,
Clock clock,
PostgresSchema schema,
PostgresTaskContext taskContext,
ReplicationConnection replicationConnection) {
this.connectorConfig = connectorConfig;
this.connection = connection;
this.dispatcher = dispatcher;
this.errorHandler = errorHandler;
this.clock = clock;
this.schema = schema;
pauseNoMessage =
DelayStrategy.constant(taskContext.getConfig().getPollInterval().toMillis());
this.taskContext = taskContext;
this.snapshotter = snapshotter;
this.replicationConnection = replicationConnection;
this.connectionProbeTimer =
ElapsedTimeStrategy.constant(
Clock.system(), connectorConfig.statusUpdateInterval());
}
@Override
public void init() {
// refresh the schema so we have a latest view of the DB tables
try {
taskContext.refreshSchema(connection, true);
} catch (SQLException e) {
throw new DebeziumException("Error while executing initial schema load", e);
}
}
@Override
public void execute(
ChangeEventSourceContext context,
PostgresPartition partition,
PostgresOffsetContext offsetContext)
throws InterruptedException {
if (!snapshotter.shouldStream()) {
LOGGER.info("Streaming is not enabled in correct configuration");
return;
}
lsnFlushingAllowed = false;
// replication slot could exist at the time of starting Debezium so we will stream from the
// position in the slot
// instead of the last position in the database
boolean hasStartLsnStoredInContext = offsetContext != null;
if (!hasStartLsnStoredInContext) {
offsetContext =
PostgresOffsetContext.initialContext(connectorConfig, connection, clock);
}
try {
final WalPositionLocator walPosition;
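// Modification to the copied Debezium class (see class javadoc): hand the stopping LSN, if
// any, to the replication connection so the stream can end once that position has been read,
// even when no further change data arrives.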
((PostgresReplicationConnection) replicationConnection)
.setEndingPos(offsetContext.getStreamingStoppingLsn());
if (hasStartLsnStoredInContext) {
// start streaming from the last recorded position in the offset
final Lsn lsn =
offsetContext.lastCompletelyProcessedLsn() != null
? offsetContext.lastCompletelyProcessedLsn()
: offsetContext.lsn();
LOGGER.info("Retrieved latest position from stored offset '{}'", lsn);
walPosition = new WalPositionLocator(offsetContext.lastCommitLsn(), lsn);
replicationStream.compareAndSet(
null, replicationConnection.startStreaming(lsn, walPosition));
} else {
LOGGER.info(
"No previous LSN found in Kafka, streaming from the latest xlogpos or flushed LSN...");
walPosition = new WalPositionLocator();
replicationStream.compareAndSet(
null, replicationConnection.startStreaming(walPosition));
}
// for large dbs, the refresh of schema can take too much time
// such that the connection times out. We must enable keep
// alive to ensure that it doesn't time out
ReplicationStream stream = this.replicationStream.get();
stream.startKeepAlive(
Threads.newSingleThreadExecutor(
PostgresConnector.class,
connectorConfig.getLogicalName(),
KEEP_ALIVE_THREAD_NAME));
init();
// If we need to do a pre-snapshot streaming catch up, we should allow the snapshot
// transaction to persist
// but normally we want to start streaming without any open transactions.
if (!isInPreSnapshotCatchUpStreaming(offsetContext)) {
connection.commit();
}
this.lastCompletelyProcessedLsn = replicationStream.get().startLsn();
if (walPosition.searchingEnabled()) {
searchWalPosition(context, stream, walPosition);
try {
if (!isInPreSnapshotCatchUpStreaming(offsetContext)) {
connection.commit();
}
} catch (Exception e) {
LOGGER.info("Commit failed while preparing for reconnect", e);
}
walPosition.enableFiltering();
stream.stopKeepAlive();
replicationConnection.reconnect();
replicationStream.set(
replicationConnection.startStreaming(
walPosition.getLastEventStoredLsn(), walPosition));
stream = this.replicationStream.get();
stream.startKeepAlive(
Threads.newSingleThreadExecutor(
PostgresConnector.class,
connectorConfig.getLogicalName(),
KEEP_ALIVE_THREAD_NAME));
}
processMessages(context, partition, offsetContext, stream);
} catch (Throwable e) {
errorHandler.setProducerThrowable(e);
} finally {
if (replicationConnection != null) {
LOGGER.debug("stopping streaming...");
// stop the keep alive thread, this also shuts down the
// executor pool
ReplicationStream stream = replicationStream.get();
if (stream != null) {
stream.stopKeepAlive();
}
// TODO author=Horia Chiorean date=08/11/2016 description=Ideally we'd close the
// stream, but it's not reliable atm (see javadoc)
// replicationStream.close();
// close the connection - this should also disconnect the current stream even if
// it's blocking
try {
if (!isInPreSnapshotCatchUpStreaming(offsetContext)) {
connection.commit();
}
replicationConnection.close();
} catch (Exception e) {
LOGGER.debug("Exception while closing the connection", e);
}
replicationStream.set(null);
}
}
}
private void processMessages(
ChangeEventSourceContext context,
PostgresPartition partition,
PostgresOffsetContext offsetContext,
final ReplicationStream stream)
throws SQLException, InterruptedException {
LOGGER.info("Processing messages");
int noMessageIterations = 0;
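// The loop below exits once lastCompletelyProcessedLsn reaches the stopping LSN (when one is
// set). Together with the NoopMessage that the modified replication stream emits past the
// ending position, this is what lets the backfill task finish without new change data.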
while (context.isRunning()
&& (offsetContext.getStreamingStoppingLsn() == null
|| (lastCompletelyProcessedLsn.compareTo(
offsetContext.getStreamingStoppingLsn())
< 0))) {
boolean receivedMessage =
stream.readPending(
message -> {
final Lsn lsn = stream.lastReceivedLsn();
if (message.isLastEventForLsn()) {
lastCompletelyProcessedLsn = lsn;
}
// Tx BEGIN/END event
if (message.isTransactionalMessage()) {
if (!connectorConfig.shouldProvideTransactionMetadata()) {
LOGGER.trace("Received transactional message {}", message);
// Don't skip on BEGIN message as it would flush LSN for the
// whole transaction
// too early
if (message.getOperation() == Operation.COMMIT) {
commitMessage(partition, offsetContext, lsn);
}
return;
}
offsetContext.updateWalPosition(
lsn,
lastCompletelyProcessedLsn,
message.getCommitTime(),
toLong(message.getTransactionId()),
taskContext.getSlotXmin(connection),
null);
if (message.getOperation() == Operation.BEGIN) {
dispatcher.dispatchTransactionStartedEvent(
partition,
toString(message.getTransactionId()),
offsetContext);
} else if (message.getOperation() == Operation.COMMIT) {
commitMessage(partition, offsetContext, lsn);
dispatcher.dispatchTransactionCommittedEvent(
partition, offsetContext);
}
maybeWarnAboutGrowingWalBacklog(true);
} else if (message.getOperation() == Operation.MESSAGE) {
offsetContext.updateWalPosition(
lsn,
lastCompletelyProcessedLsn,
message.getCommitTime(),
toLong(message.getTransactionId()),
taskContext.getSlotXmin(connection));
// non-transactional message that will not be followed by a
// COMMIT message
if (message.isLastEventForLsn()) {
commitMessage(partition, offsetContext, lsn);
}
dispatcher.dispatchLogicalDecodingMessage(
partition,
offsetContext,
clock.currentTimeAsInstant().toEpochMilli(),
(LogicalDecodingMessage) message);
maybeWarnAboutGrowingWalBacklog(true);
}
// DML event
else {
TableId tableId = null;
if (message.getOperation() != Operation.NOOP) {
tableId = PostgresSchema.parse(message.getTable());
Objects.requireNonNull(tableId);
}
offsetContext.updateWalPosition(
lsn,
lastCompletelyProcessedLsn,
message.getCommitTime(),
toLong(message.getTransactionId()),
taskContext.getSlotXmin(connection),
tableId);
boolean dispatched =
message.getOperation() != Operation.NOOP
&& dispatcher.dispatchDataChangeEvent(
partition,
tableId,
new PostgresChangeRecordEmitter(
partition,
offsetContext,
clock,
connectorConfig,
schema,
connection,
tableId,
message));
maybeWarnAboutGrowingWalBacklog(dispatched);
}
});
probeConnectionIfNeeded();
if (receivedMessage) {
noMessageIterations = 0;
lsnFlushingAllowed = true;
} else {
if (offsetContext.hasCompletelyProcessedPosition()) {
dispatcher.dispatchHeartbeatEvent(partition, offsetContext);
}
noMessageIterations++;
if (noMessageIterations >= THROTTLE_NO_MESSAGE_BEFORE_PAUSE) {
noMessageIterations = 0;
pauseNoMessage.sleepWhen(true);
}
}
if (!isInPreSnapshotCatchUpStreaming(offsetContext)) {
// During catch up streaming, the streaming phase needs to hold a transaction open so that
// the phase can stream events up to a specific LSN and the snapshot that occurs after the
// catch up streaming will not lose the current view of data. Since we need to hold the
// transaction open for the snapshot, this block must not commit during catch up streaming.
connection.commit();
}
}
}
private void searchWalPosition(
ChangeEventSourceContext context,
final ReplicationStream stream,
final WalPositionLocator walPosition)
throws SQLException, InterruptedException {
AtomicReference<Lsn> resumeLsn = new AtomicReference<>();
int noMessageIterations = 0;
LOGGER.info("Searching for WAL resume position");
while (context.isRunning() && resumeLsn.get() == null) {
boolean receivedMessage =
stream.readPending(
message -> {
final Lsn lsn = stream.lastReceivedLsn();
resumeLsn.set(walPosition.resumeFromLsn(lsn, message).orElse(null));
});
if (receivedMessage) {
noMessageIterations = 0;
} else {
noMessageIterations++;
if (noMessageIterations >= THROTTLE_NO_MESSAGE_BEFORE_PAUSE) {
noMessageIterations = 0;
pauseNoMessage.sleepWhen(true);
}
}
probeConnectionIfNeeded();
}
LOGGER.info("WAL resume position '{}' discovered", resumeLsn.get());
}
private void probeConnectionIfNeeded() throws SQLException {
if (connectionProbeTimer.hasElapsed()) {
connection.prepareQuery("SELECT 1");
connection.commit();
}
}
private void commitMessage(
PostgresPartition partition, PostgresOffsetContext offsetContext, final Lsn lsn)
throws SQLException, InterruptedException {
lastCompletelyProcessedLsn = lsn;
offsetContext.updateCommitPosition(lsn, lastCompletelyProcessedLsn);
maybeWarnAboutGrowingWalBacklog(false);
dispatcher.dispatchHeartbeatEvent(partition, offsetContext);
}
/**
* If we receive change events but all of them get filtered out, we cannot commit any new offset
* with Apache Kafka. This in turn means no LSN is ever acknowledged with the replication slot,
* causing an ever-growing WAL backlog.
*
* <p>This situation typically occurs if there are changes on the database server (e.g. in an
* excluded database), but none of them is in table.include.list. To prevent this, heartbeats
* can be used, as they will allow us to commit offsets also when not propagating any "real"
* change event.
*
* <p>The purpose of this method is to detect this situation and log a warning every {@link
* #GROWING_WAL_WARNING_LOG_INTERVAL} filtered events.
*
* @param dispatched Whether an event was sent to the broker or not
*/
private void maybeWarnAboutGrowingWalBacklog(boolean dispatched) {
if (dispatched) {
numberOfEventsSinceLastEventSentOrWalGrowingWarning = 0;
} else {
numberOfEventsSinceLastEventSentOrWalGrowingWarning++;
}
if (numberOfEventsSinceLastEventSentOrWalGrowingWarning > GROWING_WAL_WARNING_LOG_INTERVAL
&& !dispatcher.heartbeatsEnabled()) {
LOGGER.warn(
"Received {} events which were all filtered out, so no offset could be committed. "
+ "This prevents the replication slot from acknowledging the processed WAL offsets, "
+ "causing a growing backlog of non-removable WAL segments on the database server. "
+ "Consider either adjusting your filter configuration or enabling heartbeat events "
+ "(via the {} option) to avoid this situation.",
numberOfEventsSinceLastEventSentOrWalGrowingWarning,
Heartbeat.HEARTBEAT_INTERVAL_PROPERTY_NAME);
numberOfEventsSinceLastEventSentOrWalGrowingWarning = 0;
}
}
@Override
public void commitOffset(Map<String, ?> offset) {
try {
ReplicationStream replicationStream = this.replicationStream.get();
final Lsn commitLsn =
Lsn.valueOf((Long) offset.get(PostgresOffsetContext.LAST_COMMIT_LSN_KEY));
final Lsn changeLsn =
Lsn.valueOf(
(Long)
offset.get(
PostgresOffsetContext
.LAST_COMPLETELY_PROCESSED_LSN_KEY));
final Lsn lsn = (commitLsn != null) ? commitLsn : changeLsn;
if (replicationStream != null && lsn != null) {
if (!lsnFlushingAllowed) {
LOGGER.info(
"Received offset commit request on '{}', but ignoring it. LSN flushing is not allowed yet",
lsn);
return;
}
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Flushing LSN to server: {}", lsn);
}
// tell the server the point up to which we've processed data, so it can be free to
// recycle WAL segments
replicationStream.flushLsn(lsn);
} else {
LOGGER.debug("Streaming has already stopped, ignoring commit callback...");
}
} catch (SQLException e) {
throw new ConnectException(e);
}
}
/**
* Returns whether the current streaming phase is running a catch up streaming phase that runs
* before a snapshot. This is useful for transaction management.
*
* <p>During pre-snapshot catch up streaming, we open the snapshot transaction early and hold
* the transaction open throughout the pre snapshot catch up streaming phase so that we know
* where to stop streaming and can start the snapshot phase at a consistent location. This is
* opposed to regular streaming, where we do not have a lingering open transaction.
*
* @return true if the current streaming phase is performing catch up streaming
*/
private boolean isInPreSnapshotCatchUpStreaming(PostgresOffsetContext offsetContext) {
return offsetContext.getStreamingStoppingLsn() != null;
}
private Long toLong(OptionalLong l) {
return l.isPresent() ? Long.valueOf(l.getAsLong()) : null;
}
private String toString(OptionalLong l) {
return l.isPresent() ? String.valueOf(l.getAsLong()) : null;
}
@FunctionalInterface
public static interface PgConnectionSupplier {
BaseConnection get() throws SQLException;
}
}

@@ -0,0 +1,917 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.postgresql.connection;
import io.debezium.DebeziumException;
import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.connector.postgresql.PostgresSchema;
import io.debezium.connector.postgresql.TypeRegistry;
import io.debezium.connector.postgresql.spi.SlotCreationResult;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.jdbc.JdbcConnectionException;
import io.debezium.relational.RelationalTableFilters;
import io.debezium.relational.TableId;
import io.debezium.util.Clock;
import io.debezium.util.Metronome;
import org.apache.kafka.connect.errors.ConnectException;
import org.postgresql.core.BaseConnection;
import org.postgresql.core.ServerVersion;
import org.postgresql.replication.PGReplicationStream;
import org.postgresql.replication.fluent.logical.ChainedLogicalStreamBuilder;
import org.postgresql.util.PSQLException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLWarning;
import java.sql.Statement;
import java.time.Duration;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import static java.lang.Math.toIntExact;
/**
* Copied from Debezium 1.9.7.
*
* <p>The {@link ReplicationConnection} created from {@code createReplicationStream} will hang when
* the WAL only contains keep-alive messages. Support setting an ending LSN so the stream stops
* instead of hanging.
*
* <p>Line 82, 694~695: add endingPos and its setter.
*
* <p>Line 554~559, 578~583: the ReplicationStream returned by {@code createReplicationStream} will
* stop when the endingPos is reached.
*/
public class PostgresReplicationConnection extends JdbcConnection implements ReplicationConnection {
private static Logger LOGGER = LoggerFactory.getLogger(PostgresReplicationConnection.class);
private final String slotName;
private final String publicationName;
private final RelationalTableFilters tableFilter;
private final PostgresConnectorConfig.AutoCreateMode publicationAutocreateMode;
private final PostgresConnectorConfig.LogicalDecoder plugin;
private final boolean dropSlotOnClose;
private final PostgresConnectorConfig connectorConfig;
private final Duration statusUpdateInterval;
private final MessageDecoder messageDecoder;
private final TypeRegistry typeRegistry;
private final Properties streamParams;
private Lsn defaultStartingPos;
private SlotCreationResult slotCreationInfo;
private boolean hasInitedSlot;
private Lsn endingPos;
/**
* Creates a new replication connection with the given params.
*
* @param config the JDBC configuration for the connection; may not be null
* @param slotName the name of the DB slot for logical replication; may not be null
* @param publicationName the name of the DB publication for logical replication; may not be
* null
* @param tableFilter the tables to watch of the DB publication for logical replication; may not
* be null
* @param publicationAutocreateMode the mode for publication autocreation; may not be null
* @param plugin decoder matching the server side plug-in used for streaming changes; may not be
* null
* @param dropSlotOnClose whether the replication slot should be dropped once the connection is
* closed
* @param statusUpdateInterval the interval at which the replication connection should
*     periodically send status updates to the server
* @param doSnapshot whether the connector is doing a snapshot
* @param jdbcConnection general PostgreSQL JDBC connection
* @param typeRegistry registry with PostgreSQL types
* @param streamParams additional parameters to pass to the replication stream
* @param schema the schema; must not be null
*/
private PostgresReplicationConnection(
PostgresConnectorConfig config,
String slotName,
String publicationName,
RelationalTableFilters tableFilter,
PostgresConnectorConfig.AutoCreateMode publicationAutocreateMode,
PostgresConnectorConfig.LogicalDecoder plugin,
boolean dropSlotOnClose,
boolean doSnapshot,
Duration statusUpdateInterval,
PostgresConnection jdbcConnection,
TypeRegistry typeRegistry,
Properties streamParams,
PostgresSchema schema) {
super(
addDefaultSettings(config.getJdbcConfig()),
PostgresConnection.FACTORY,
null,
null,
"\"",
"\"");
this.connectorConfig = config;
this.slotName = slotName;
this.publicationName = publicationName;
this.tableFilter = tableFilter;
this.publicationAutocreateMode = publicationAutocreateMode;
this.plugin = plugin;
this.dropSlotOnClose = dropSlotOnClose;
this.statusUpdateInterval = statusUpdateInterval;
this.messageDecoder =
plugin.messageDecoder(new MessageDecoderContext(config, schema), jdbcConnection);
this.typeRegistry = typeRegistry;
this.streamParams = streamParams;
this.slotCreationInfo = null;
this.hasInitedSlot = false;
}
private static JdbcConfiguration addDefaultSettings(JdbcConfiguration configuration) {
// first copy the parent's default settings...
// then set some additional replication specific settings
return JdbcConfiguration.adapt(
PostgresConnection.addDefaultSettings(
configuration, PostgresConnection.CONNECTION_STREAMING)
.edit()
.with("replication", "database")
.with(
"preferQueryMode",
"simple") // replication protocol only supports simple query mode
.build());
}
private ServerInfo.ReplicationSlot getSlotInfo() throws SQLException, InterruptedException {
try (PostgresConnection connection =
new PostgresConnection(
connectorConfig.getJdbcConfig(), PostgresConnection.CONNECTION_SLOT_INFO)) {
return connection.readReplicationSlotInfo(slotName, plugin.getPostgresPluginName());
}
}
protected void initPublication() {
String createPublicationStmt;
String tableFilterString = null;
if (PostgresConnectorConfig.LogicalDecoder.PGOUTPUT.equals(plugin)) {
LOGGER.info("Initializing PgOutput logical decoder publication");
try {
// Unless the autocommit is disabled the SELECT publication query will stay running
Connection conn = pgConnection();
conn.setAutoCommit(false);
String selectPublication =
String.format(
"SELECT COUNT(1) FROM pg_publication WHERE pubname = '%s'",
publicationName);
try (Statement stmt = conn.createStatement();
ResultSet rs = stmt.executeQuery(selectPublication)) {
if (rs.next()) {
Long count = rs.getLong(1);
// Close eagerly as the transaction might stay running
if (count == 0L) {
LOGGER.info(
"Creating new publication '{}' for plugin '{}'",
publicationName,
plugin);
switch (publicationAutocreateMode) {
case DISABLED:
throw new ConnectException(
"Publication autocreation is disabled, please create one and restart the connector.");
case ALL_TABLES:
createPublicationStmt =
String.format(
"CREATE PUBLICATION %s FOR ALL TABLES;",
publicationName);
LOGGER.info(
"Creating Publication with statement '{}'",
createPublicationStmt);
// Publication doesn't exist, create it.
stmt.execute(createPublicationStmt);
break;
case FILTERED:
try {
Set<TableId> tablesToCapture = determineCapturedTables();
tableFilterString =
tablesToCapture.stream()
.map(TableId::toDoubleQuotedString)
.collect(Collectors.joining(", "));
if (tableFilterString.isEmpty()) {
throw new DebeziumException(
String.format(
"No table filters found for filtered publication %s",
publicationName));
}
createPublicationStmt =
String.format(
"CREATE PUBLICATION %s FOR TABLE %s;",
publicationName, tableFilterString);
LOGGER.info(
"Creating Publication with statement '{}'",
createPublicationStmt);
// Publication doesn't exist, create it but restrict to the
// tableFilter.
stmt.execute(createPublicationStmt);
} catch (Exception e) {
throw new ConnectException(
String.format(
"Unable to create filtered publication %s for %s",
publicationName, tableFilterString),
e);
}
break;
}
} else {
LOGGER.trace(
"A logical publication named '{}' for plugin '{}' and database '{}' is already active on the server "
+ "and will be used by the plugin",
publicationName,
plugin,
database());
}
}
}
conn.commit();
conn.setAutoCommit(true);
} catch (SQLException e) {
throw new JdbcConnectionException(e);
}
}
}
private Set<TableId> determineCapturedTables() throws Exception {
Set<TableId> allTableIds =
this.connect()
.readTableNames(
pgConnection().getCatalog(), null, null, new String[] {"TABLE"});
Set<TableId> capturedTables = new HashSet<>();
for (TableId tableId : allTableIds) {
if (tableFilter.dataCollectionFilter().isIncluded(tableId)) {
LOGGER.trace("Adding table {} to the list of captured tables", tableId);
capturedTables.add(tableId);
} else {
LOGGER.trace(
"Ignoring table {} as it's not included in the filter configuration",
tableId);
}
}
return capturedTables.stream()
.sorted()
.collect(Collectors.toCollection(LinkedHashSet::new));
}
protected void initReplicationSlot() throws SQLException, InterruptedException {
ServerInfo.ReplicationSlot slotInfo = getSlotInfo();
boolean shouldCreateSlot = ServerInfo.ReplicationSlot.INVALID == slotInfo;
try {
// there's no info for this plugin and slot so create a new slot
if (shouldCreateSlot) {
this.createReplicationSlot();
}
// replication connection does not support parsing of SQL statements so we need to
// create
// the connection without executing on connect statements - see JDBC opt
// preferQueryMode=simple
pgConnection();
final String identifySystemStatement = "IDENTIFY_SYSTEM";
LOGGER.debug(
"running '{}' to validate replication connection", identifySystemStatement);
final Lsn xlogStart =
queryAndMap(
identifySystemStatement,
rs -> {
if (!rs.next()) {
throw new IllegalStateException(
"The DB connection is not a valid replication connection");
}
String xlogpos = rs.getString("xlogpos");
LOGGER.debug("received latest xlogpos '{}'", xlogpos);
return Lsn.valueOf(xlogpos);
});
if (slotCreationInfo != null) {
this.defaultStartingPos = slotCreationInfo.startLsn();
} else if (shouldCreateSlot || !slotInfo.hasValidFlushedLsn()) {
// this is a new slot or we weren't able to read a valid flush LSN pos, so we always
// start from the xlog pos that was reported
this.defaultStartingPos = xlogStart;
} else {
Lsn latestFlushedLsn = slotInfo.latestFlushedLsn();
this.defaultStartingPos =
latestFlushedLsn.compareTo(xlogStart) < 0 ? latestFlushedLsn : xlogStart;
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("found previous flushed LSN '{}'", latestFlushedLsn);
}
}
hasInitedSlot = true;
} catch (SQLException e) {
throw new JdbcConnectionException(e);
}
}
// Temporary replication slots are a new feature of PostgreSQL 10
private boolean useTemporarySlot() throws SQLException {
// Temporary replication slots cannot be used due to connection restart
// when finding WAL position
// return dropSlotOnClose && pgConnection().haveMinimumServerVersion(ServerVersion.v10);
return false;
}
/**
* Creating a replication connection and starting to stream involves a few steps:
*
* <ol>
*   <li>create the connection and ensure that (a) the slot exists and (b) the slot isn't
*       currently being used;
*   <li>query to get our potential start position in the slot (LSN);
*   <li>try to start streaming; depending on our options (such as in wal2json) this may fail,
*       which can result in the connection being killed, and we need to start the process over
*       if we are using a temporary slot;
*   <li>actually start the streamer.
* </ol>
*
* <p>This method takes care of all of these steps and queries for a default starting position.
* If you know where you are starting from, you should call {@link #startStreaming(Lsn,
* WalPositionLocator)}; this method delegates to that one.
*
* @return
* @throws SQLException
* @throws InterruptedException
*/
@Override
public ReplicationStream startStreaming(WalPositionLocator walPosition)
throws SQLException, InterruptedException {
return startStreaming(null, walPosition);
}
@Override
public ReplicationStream startStreaming(Lsn offset, WalPositionLocator walPosition)
throws SQLException, InterruptedException {
initConnection();
connect();
if (offset == null || !offset.isValid()) {
offset = defaultStartingPos;
}
Lsn lsn = offset;
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("starting streaming from LSN '{}'", lsn);
}
final int maxRetries = connectorConfig.maxRetries();
final Duration delay = connectorConfig.retryDelay();
int tryCount = 0;
while (true) {
try {
return createReplicationStream(lsn, walPosition);
} catch (Exception e) {
String message = "Failed to start replication stream at " + lsn;
if (++tryCount > maxRetries) {
if (e.getMessage().matches(".*replication slot .* is active.*")) {
message +=
"; when setting up multiple connectors for the same database host, please make sure to use a distinct replication slot name for each.";
}
throw new DebeziumException(message, e);
} else {
LOGGER.warn(
message + ", waiting for {} ms and retrying, attempt number {} over {}",
delay,
tryCount,
maxRetries);
final Metronome metronome = Metronome.sleeper(delay, Clock.SYSTEM);
metronome.pause();
}
}
}
}
@Override
public void initConnection() throws SQLException, InterruptedException {
// See https://www.postgresql.org/docs/current/logical-replication-quick-setup.html
// For pgoutput specifically, the publication must be created before the slot.
initPublication();
if (!hasInitedSlot) {
initReplicationSlot();
}
}
@Override
public Optional<SlotCreationResult> createReplicationSlot() throws SQLException {
// note that some of these options are only supported in Postgres 9.4+, additionally
// the options are not yet exported by the jdbc api wrapper, therefore, we just do
// this ourselves but eventually this should be moved back to the jdbc API
// see https://github.com/pgjdbc/pgjdbc/issues/1305
LOGGER.debug("Creating new replication slot '{}' for plugin '{}'", slotName, plugin);
String tempPart = "";
// Exported snapshots are supported in Postgres 9.4+
boolean canExportSnapshot = pgConnection().haveMinimumServerVersion(ServerVersion.v9_4);
if ((dropSlotOnClose) && !canExportSnapshot) {
LOGGER.warn(
"A slot marked as temporary or with an exported snapshot was created, "
+ "but not on a supported version of Postgres, ignoring!");
}
if (useTemporarySlot()) {
tempPart = "TEMPORARY";
}
// See https://www.postgresql.org/docs/current/logical-replication-quick-setup.html
// For pgoutput specifically, the publication must be created prior to the slot.
initPublication();
try (Statement stmt = pgConnection().createStatement()) {
String createCommand =
String.format(
"CREATE_REPLICATION_SLOT \"%s\" %s LOGICAL %s",
slotName, tempPart, plugin.getPostgresPluginName());
LOGGER.info("Creating replication slot with command {}", createCommand);
stmt.execute(createCommand);
// when we are in Postgres 9.4+, we can parse the slot creation info,
// otherwise, it returns nothing
if (canExportSnapshot) {
this.slotCreationInfo = parseSlotCreation(stmt.getResultSet());
}
return Optional.ofNullable(slotCreationInfo);
}
}
protected BaseConnection pgConnection() throws SQLException {
return (BaseConnection) connection(false);
}
private SlotCreationResult parseSlotCreation(ResultSet rs) {
try {
if (rs.next()) {
String slotName = rs.getString("slot_name");
String startPoint = rs.getString("consistent_point");
String snapName = rs.getString("snapshot_name");
String pluginName = rs.getString("output_plugin");
return new SlotCreationResult(slotName, startPoint, snapName, pluginName);
} else {
throw new ConnectException("No replication slot found");
}
} catch (SQLException ex) {
throw new ConnectException("Unable to parse create_replication_slot response", ex);
}
}
private ReplicationStream createReplicationStream(
final Lsn startLsn, WalPositionLocator walPosition)
throws SQLException, InterruptedException {
PGReplicationStream s;
try {
try {
s =
startPgReplicationStream(
startLsn,
plugin.forceRds()
? messageDecoder::optionsWithoutMetadata
: messageDecoder::optionsWithMetadata);
messageDecoder.setContainsMetadata(plugin.forceRds() ? false : true);
} catch (PSQLException e) {
LOGGER.debug(
"Could not register for streaming, retrying without optional options", e);
// re-init the slot after a failed start of slot, as this
// may have closed the slot
if (useTemporarySlot()) {
initReplicationSlot();
}
s =
startPgReplicationStream(
startLsn,
plugin.forceRds()
? messageDecoder::optionsWithoutMetadata
: messageDecoder::optionsWithMetadata);
messageDecoder.setContainsMetadata(plugin.forceRds() ? false : true);
}
} catch (PSQLException e) {
if (e.getMessage().matches("(?s)ERROR: option .* is unknown.*")) {
// It is possible we are connecting to an old wal2json plug-in
LOGGER.warn(
"Could not register for streaming with metadata in messages, falling back to messages without metadata");
// re-init the slot after a failed start of slot, as this
// may have closed the slot
if (useTemporarySlot()) {
initReplicationSlot();
}
s = startPgReplicationStream(startLsn, messageDecoder::optionsWithoutMetadata);
messageDecoder.setContainsMetadata(false);
} else if (e.getMessage()
.matches("(?s)ERROR: requested WAL segment .* has already been removed.*")) {
LOGGER.error("Cannot rewind to last processed WAL position", e);
throw new ConnectException(
"The offset to start reading from has been removed from the database write-ahead log. Create a new snapshot and consider setting of PostgreSQL parameter wal_keep_segments = 0.");
} else {
throw e;
}
}
final PGReplicationStream stream = s;
return new ReplicationStream() {
private static final int CHECK_WARNINGS_AFTER_COUNT = 100;
private int warningCheckCounter = CHECK_WARNINGS_AFTER_COUNT;
private ExecutorService keepAliveExecutor = null;
private AtomicBoolean keepAliveRunning;
private final Metronome metronome =
Metronome.sleeper(statusUpdateInterval, Clock.SYSTEM);
// make sure this is volatile since multiple threads may be interested in this value
private volatile Lsn lastReceivedLsn;
@Override
public void read(ReplicationMessageProcessor processor)
throws SQLException, InterruptedException {
processWarnings(false);
ByteBuffer read = stream.read();
final Lsn lastReceiveLsn = Lsn.valueOf(stream.getLastReceiveLSN());
LOGGER.trace(
"Streaming requested from LSN {}, received LSN {}",
startLsn,
lastReceiveLsn);
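// Modification: once the ending LSN has been passed, surface a NoopMessage so the caller can
// advance lastCompletelyProcessedLsn and stop streaming instead of waiting for more WAL data.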
if (reachEnd(lastReceiveLsn)) {
lastReceivedLsn = Lsn.valueOf(stream.getLastReceiveLSN());
LOGGER.trace("Received message at LSN {}", lastReceivedLsn);
processor.process(new ReplicationMessage.NoopMessage(null, null));
return;
}
if (messageDecoder.shouldMessageBeSkipped(
read, lastReceiveLsn, startLsn, walPosition)) {
return;
}
deserializeMessages(read, processor);
}
@Override
public boolean readPending(ReplicationMessageProcessor processor)
throws SQLException, InterruptedException {
processWarnings(false);
ByteBuffer read = stream.readPending();
final Lsn lastReceiveLsn = Lsn.valueOf(stream.getLastReceiveLSN());
LOGGER.trace(
"Streaming requested from LSN {}, received LSN {}",
startLsn,
lastReceiveLsn);
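// Modification, mirroring read(): report a NoopMessage once the ending LSN has been passed.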
if (reachEnd(lastReceiveLsn)) {
lastReceivedLsn = Lsn.valueOf(stream.getLastReceiveLSN());
LOGGER.trace("Received message at LSN {}", lastReceivedLsn);
processor.process(new ReplicationMessage.NoopMessage(null, null));
return true;
}
if (read == null) {
return false;
}
if (messageDecoder.shouldMessageBeSkipped(
read, lastReceiveLsn, startLsn, walPosition)) {
return true;
}
deserializeMessages(read, processor);
return true;
}
private void deserializeMessages(
ByteBuffer buffer, ReplicationMessageProcessor processor)
throws SQLException, InterruptedException {
lastReceivedLsn = Lsn.valueOf(stream.getLastReceiveLSN());
LOGGER.trace("Received message at LSN {}", lastReceivedLsn);
messageDecoder.processMessage(buffer, processor, typeRegistry);
}
@Override
public void close() throws SQLException {
processWarnings(true);
stream.close();
}
@Override
public void flushLsn(Lsn lsn) throws SQLException {
doFlushLsn(lsn);
}
private void doFlushLsn(Lsn lsn) throws SQLException {
stream.setFlushedLSN(lsn.asLogSequenceNumber());
stream.setAppliedLSN(lsn.asLogSequenceNumber());
stream.forceUpdateStatus();
}
@Override
public Lsn lastReceivedLsn() {
return lastReceivedLsn;
}
@Override
public void startKeepAlive(ExecutorService service) {
if (keepAliveExecutor == null) {
keepAliveExecutor = service;
keepAliveRunning = new AtomicBoolean(true);
keepAliveExecutor.submit(
() -> {
while (keepAliveRunning.get()) {
try {
LOGGER.trace(
"Forcing status update with replication stream");
stream.forceUpdateStatus();
metronome.pause();
} catch (Exception exp) {
throw new RuntimeException(
"received unexpected exception while performing keep alive",
exp);
}
}
});
}
}
@Override
public void stopKeepAlive() {
if (keepAliveExecutor != null) {
keepAliveRunning.set(false);
keepAliveExecutor.shutdownNow();
keepAliveExecutor = null;
}
}
private void processWarnings(final boolean forced) throws SQLException {
if (--warningCheckCounter == 0 || forced) {
warningCheckCounter = CHECK_WARNINGS_AFTER_COUNT;
for (SQLWarning w = connection().getWarnings();
w != null;
w = w.getNextWarning()) {
LOGGER.debug(
"Server-side message: '{}', state = {}, code = {}",
w.getMessage(),
w.getSQLState(),
w.getErrorCode());
}
connection().clearWarnings();
}
}
@Override
public Lsn startLsn() {
return startLsn;
}
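// Added helper: true only when an ending position was set (non-null and not the special
// non-stopping LSN) and the received LSN lies beyond it.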
private boolean reachEnd(Lsn receivedLsn) {
if (receivedLsn == null) {
return false;
}
return endingPos != null
&& (!endingPos.isNonStopping())
&& endingPos.compareTo(receivedLsn) < 0;
}
};
}
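// Added setter: the streaming change event source calls this with the stopping LSN before
// starting the stream, so reachEnd() above can bound the stream.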
public void setEndingPos(Lsn endingPos) {
this.endingPos = endingPos;
}
private PGReplicationStream startPgReplicationStream(
final Lsn lsn,
BiFunction<
ChainedLogicalStreamBuilder,
Function<Integer, Boolean>,
ChainedLogicalStreamBuilder>
configurator)
throws SQLException {
assert lsn != null;
ChainedLogicalStreamBuilder streamBuilder =
pgConnection()
.getReplicationAPI()
.replicationStream()
.logical()
.withSlotName("\"" + slotName + "\"")
.withStartPosition(lsn.asLogSequenceNumber())
.withSlotOptions(streamParams);
streamBuilder = configurator.apply(streamBuilder, this::hasMinimumVersion);
if (statusUpdateInterval != null && statusUpdateInterval.toMillis() > 0) {
streamBuilder.withStatusInterval(
toIntExact(statusUpdateInterval.toMillis()), TimeUnit.MILLISECONDS);
}
PGReplicationStream stream = streamBuilder.start();
// TODO DBZ-508 get rid of this
// Needed by tests when connections are opened and closed in a fast sequence
try {
Thread.sleep(10);
} catch (Exception e) {
}
stream.forceUpdateStatus();
return stream;
}
private Boolean hasMinimumVersion(int version) {
try {
return pgConnection().haveMinimumServerVersion(version);
} catch (SQLException e) {
throw new DebeziumException(e);
}
}
@Override
public synchronized void close() {
close(true);
}
public synchronized void close(boolean dropSlot) {
try {
LOGGER.debug("Closing message decoder");
messageDecoder.close();
} catch (Throwable e) {
LOGGER.error("Unexpected error while closing message decoder", e);
}
try {
LOGGER.debug("Closing replication connection");
super.close();
} catch (Throwable e) {
LOGGER.error("Unexpected error while closing Postgres connection", e);
}
if (dropSlotOnClose && dropSlot) {
// we're dropping the replication slot via a regular - i.e. not a replication -
// connection
try (PostgresConnection connection =
new PostgresConnection(
connectorConfig.getJdbcConfig(),
PostgresConnection.CONNECTION_DROP_SLOT)) {
connection.dropReplicationSlot(slotName);
} catch (Throwable e) {
LOGGER.error("Unexpected error while dropping replication slot", e);
}
}
}
@Override
public void reconnect() throws SQLException {
close(false);
// Don't re-execute initial commands on reconnection
connection(false);
}
protected static class ReplicationConnectionBuilder implements Builder {
private final PostgresConnectorConfig config;
private String slotName = DEFAULT_SLOT_NAME;
private String publicationName = DEFAULT_PUBLICATION_NAME;
private RelationalTableFilters tableFilter;
private PostgresConnectorConfig.AutoCreateMode publicationAutocreateMode =
PostgresConnectorConfig.AutoCreateMode.ALL_TABLES;
private PostgresConnectorConfig.LogicalDecoder plugin =
PostgresConnectorConfig.LogicalDecoder.DECODERBUFS;
private boolean dropSlotOnClose = DEFAULT_DROP_SLOT_ON_CLOSE;
private Duration statusUpdateIntervalVal;
private boolean doSnapshot;
private TypeRegistry typeRegistry;
private PostgresSchema schema;
private Properties slotStreamParams = new Properties();
private PostgresConnection jdbcConnection;
protected ReplicationConnectionBuilder(PostgresConnectorConfig config) {
assert config != null;
this.config = config;
}
@Override
public ReplicationConnectionBuilder withSlot(final String slotName) {
assert slotName != null;
this.slotName = slotName;
return this;
}
@Override
public Builder withPublication(String publicationName) {
assert publicationName != null;
this.publicationName = publicationName;
return this;
}
@Override
public Builder withTableFilter(RelationalTableFilters tableFilter) {
assert tableFilter != null;
this.tableFilter = tableFilter;
return this;
}
@Override
public Builder withPublicationAutocreateMode(
PostgresConnectorConfig.AutoCreateMode publicationAutocreateMode) {
assert publicationAutocreateMode != null;
this.publicationAutocreateMode = publicationAutocreateMode;
return this;
}
@Override
public ReplicationConnectionBuilder withPlugin(
final PostgresConnectorConfig.LogicalDecoder plugin) {
assert plugin != null;
this.plugin = plugin;
return this;
}
@Override
public ReplicationConnectionBuilder dropSlotOnClose(final boolean dropSlotOnClose) {
this.dropSlotOnClose = dropSlotOnClose;
return this;
}
@Override
public ReplicationConnectionBuilder streamParams(final String slotStreamParams) {
if (slotStreamParams != null && !slotStreamParams.isEmpty()) {
this.slotStreamParams = new Properties();
String[] paramsWithValues = slotStreamParams.split(";");
for (String paramsWithValue : paramsWithValues) {
String[] paramAndValue = paramsWithValue.split("=");
if (paramAndValue.length == 2) {
this.slotStreamParams.setProperty(paramAndValue[0], paramAndValue[1]);
} else {
LOGGER.warn(
"The following STREAM_PARAMS value is invalid: {}",
paramsWithValue);
}
}
}
return this;
}
@Override
public ReplicationConnectionBuilder statusUpdateInterval(
final Duration statusUpdateInterval) {
this.statusUpdateIntervalVal = statusUpdateInterval;
return this;
}
@Override
public Builder doSnapshot(boolean doSnapshot) {
this.doSnapshot = doSnapshot;
return this;
}
@Override
public Builder jdbcMetadataConnection(PostgresConnection jdbcConnection) {
this.jdbcConnection = jdbcConnection;
return this;
}
@Override
public ReplicationConnection build() {
assert plugin != null : "Decoding plugin name is not set";
return new PostgresReplicationConnection(
config,
slotName,
publicationName,
tableFilter,
publicationAutocreateMode,
plugin,
dropSlotOnClose,
doSnapshot,
statusUpdateIntervalVal,
jdbcConnection,
typeRegistry,
slotStreamParams,
schema);
}
@Override
public Builder withTypeRegistry(TypeRegistry typeRegistry) {
this.typeRegistry = typeRegistry;
return this;
}
@Override
public Builder withSchema(PostgresSchema schema) {
this.schema = schema;
return this;
}
}
}

@@ -0,0 +1,922 @@
/*
* Copyright 2022 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.postgres;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.KeyedStateStore;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.MockStreamingRuntimeContext;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;
import com.jayway.jsonpath.JsonPath;
import com.ververica.cdc.connectors.utils.TestSourceContext;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;
import org.testcontainers.utility.DockerImageName;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.OptionalLong;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import static com.ververica.cdc.connectors.utils.AssertUtils.assertDelete;
import static com.ververica.cdc.connectors.utils.AssertUtils.assertInsert;
import static com.ververica.cdc.connectors.utils.AssertUtils.assertRead;
import static com.ververica.cdc.connectors.utils.AssertUtils.assertUpdate;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.testcontainers.containers.PostgreSQLContainer.POSTGRESQL_PORT;
/** Tests for {@link PostgreSQLSource} which also heavily tests {@link DebeziumSourceFunction}. */
public class PostgresSQLSourceTest extends PostgresTestBase {
private static final Logger LOG = LoggerFactory.getLogger(PostgresSQLSourceTest.class);
private static final String SLOT_NAME = "flink";
// These tests only pass with the debezium/postgres:9.6 docker image
private static final PostgreSQLContainer<?> POSTGRES_CONTAINER_OLD =
new PostgreSQLContainer<>(
DockerImageName.parse("debezium/postgres:9.6")
.asCompatibleSubstituteFor("postgres"))
.withDatabaseName(DEFAULT_DB)
.withUsername("postgres")
.withPassword("postgres")
.withLogConsumer(new Slf4jLogConsumer(LOG));
@BeforeClass
public static void startAll() {
LOG.info("Starting containers...");
Startables.deepStart(Stream.of(POSTGRES_CONTAINER_OLD)).join();
LOG.info("Containers are started.");
}
@Before
public void before() {
initializePostgresTable(POSTGRES_CONTAINER_OLD, "inventory");
}
@Test
public void testConsumingAllEvents() throws Exception {
DebeziumSourceFunction<SourceRecord> source = createPostgreSqlSourceWithHeartbeatDisabled();
TestSourceContext<SourceRecord> sourceContext = new TestSourceContext<>();
setupSource(source);
try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER_OLD);
Statement statement = connection.createStatement()) {
// start the source
final CheckedThread runThread =
new CheckedThread() {
@Override
public void go() throws Exception {
source.run(sourceContext);
}
};
runThread.start();
List<SourceRecord> records = drain(sourceContext, 9);
assertEquals(9, records.size());
for (int i = 0; i < records.size(); i++) {
assertRead(records.get(i), "id", 101 + i);
}
statement.execute(
"INSERT INTO inventory.products VALUES (default,'robot','Toy robot',1.304)"); // 110
records = drain(sourceContext, 1);
assertInsert(records.get(0), "id", 110);
statement.execute(
"INSERT INTO inventory.products VALUES (1001,'roy','old robot',1234.56)"); // 1001
records = drain(sourceContext, 1);
assertInsert(records.get(0), "id", 1001);
// ---------------------------------------------------------------------------------------------------------------
// Changing the primary key of a row should result in 2 events: DELETE, INSERT
// (TOMBSTONE is dropped)
// ---------------------------------------------------------------------------------------------------------------
statement.execute(
"UPDATE inventory.products SET id=2001, description='really old robot' WHERE id=1001");
records = drain(sourceContext, 2);
assertDelete(records.get(0), "id", 1001);
assertInsert(records.get(1), "id", 2001);
// ---------------------------------------------------------------------------------------------------------------
// Simple UPDATE (with no schema changes)
// ---------------------------------------------------------------------------------------------------------------
statement.execute("UPDATE inventory.products SET weight=1345.67 WHERE id=2001");
records = drain(sourceContext, 1);
assertUpdate(records.get(0), "id", 2001);
// ---------------------------------------------------------------------------------------------------------------
// Change our schema with a fully-qualified name; we should still see this event
// ---------------------------------------------------------------------------------------------------------------
// Add a column with default to the 'products' table and explicitly update one record
// ...
statement.execute(
"ALTER TABLE inventory.products ADD COLUMN volume FLOAT, ADD COLUMN alias VARCHAR(30) NULL");
statement.execute("UPDATE inventory.products SET volume=13.5 WHERE id=2001");
records = drain(sourceContext, 1);
assertUpdate(records.get(0), "id", 2001);
// cleanup
source.cancel();
source.close();
runThread.sync();
}
}
@Test
public void testCheckpointAndRestore() throws Exception {
final TestingListState<byte[]> offsetState = new TestingListState<>();
final TestingListState<String> historyState = new TestingListState<>();
int prevLsn = 0;
{
// ---------------------------------------------------------------------------
// Step-1: start the source from empty state
// ---------------------------------------------------------------------------
final DebeziumSourceFunction<SourceRecord> source =
createPostgreSqlSourceWithHeartbeatDisabled();
// we use a blocking context so that the source is blocked before emitting the last snapshot record
final BlockingSourceContext<SourceRecord> sourceContext =
new BlockingSourceContext<>(8);
// setup source with empty state
setupSource(source, false, offsetState, historyState, true, 0, 1);
final CheckedThread runThread =
new CheckedThread() {
@Override
public void go() throws Exception {
source.run(sourceContext);
}
};
runThread.start();
// wait until consumer is started
int received = drain(sourceContext, 2).size();
assertEquals(2, received);
// we can't perform checkpoint during DB snapshot
assertFalse(
waitForCheckpointLock(
sourceContext.getCheckpointLock(), Duration.ofSeconds(3)));
// unblock the source context to continue the processing
sourceContext.blocker.release();
// wait until the source finishes the database snapshot
List<SourceRecord> records = drain(sourceContext, 9 - received);
assertEquals(9, records.size() + received);
// state is still empty
assertEquals(0, offsetState.list.size());
assertEquals(0, historyState.list.size());
// ---------------------------------------------------------------------------
// Step-2: trigger checkpoint-1 after snapshot finished
// ---------------------------------------------------------------------------
synchronized (sourceContext.getCheckpointLock()) {
// trigger checkpoint-1
source.snapshotState(new StateSnapshotContextSynchronousImpl(101, 101));
}
assertEquals(1, offsetState.list.size());
String state = new String(offsetState.list.get(0), StandardCharsets.UTF_8);
assertEquals("postgres_cdc_source", JsonPath.read(state, "$.sourcePartition.server"));
assertEquals("557", JsonPath.read(state, "$.sourceOffset.txId").toString());
assertEquals(
"true", JsonPath.read(state, "$.sourceOffset.last_snapshot_record").toString());
assertEquals("true", JsonPath.read(state, "$.sourceOffset.snapshot").toString());
assertTrue(state.contains("ts_usec"));
int lsn = JsonPath.read(state, "$.sourceOffset.lsn");
assertTrue(lsn > prevLsn);
prevLsn = lsn;
source.cancel();
source.close();
runThread.sync();
}
{
// ---------------------------------------------------------------------------
// Step-3: restore the source from state
// ---------------------------------------------------------------------------
final DebeziumSourceFunction<SourceRecord> source2 =
createPostgreSqlSourceWithHeartbeatDisabled();
final TestSourceContext<SourceRecord> sourceContext2 = new TestSourceContext<>();
setupSource(source2, true, offsetState, historyState, true, 0, 1);
final CheckedThread runThread2 =
new CheckedThread() {
@Override
public void go() throws Exception {
source2.run(sourceContext2);
}
};
runThread2.start();
// make sure there are no more events
assertFalse(waitForAvailableRecords(Duration.ofSeconds(5), sourceContext2));
try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER_OLD);
Statement statement = connection.createStatement()) {
statement.execute(
"INSERT INTO inventory.products VALUES (default,'robot','Toy robot',1.304)"); // 110
List<SourceRecord> records = drain(sourceContext2, 1);
assertEquals(1, records.size());
assertInsert(records.get(0), "id", 110);
// ---------------------------------------------------------------------------
// Step-4: trigger checkpoint-2 during DML operations
// ---------------------------------------------------------------------------
synchronized (sourceContext2.getCheckpointLock()) {
// trigger checkpoint-2
source2.snapshotState(new StateSnapshotContextSynchronousImpl(138, 138));
}
assertEquals(1, offsetState.list.size());
String state = new String(offsetState.list.get(0), StandardCharsets.UTF_8);
assertEquals(
"postgres_cdc_source", JsonPath.read(state, "$.sourcePartition.server"));
assertEquals("558", JsonPath.read(state, "$.sourceOffset.txId").toString());
assertTrue(state.contains("ts_usec"));
assertFalse(state.contains("snapshot"));
int lsn = JsonPath.read(state, "$.sourceOffset.lsn");
assertTrue(lsn > prevLsn);
prevLsn = lsn;
// execute 2 more DMLs to generate more WAL entries
statement.execute(
"INSERT INTO inventory.products VALUES (1001,'roy','old robot',1234.56)"); // 1001
statement.execute("UPDATE inventory.products SET weight=1345.67 WHERE id=1001");
}
// cancel the source
source2.cancel();
source2.close();
runThread2.sync();
}
{
// ---------------------------------------------------------------------------
// Step-5: restore the source from checkpoint-2
// ---------------------------------------------------------------------------
final DebeziumSourceFunction<SourceRecord> source3 =
createPostgreSqlSourceWithHeartbeatDisabled();
final TestSourceContext<SourceRecord> sourceContext3 = new TestSourceContext<>();
setupSource(source3, true, offsetState, historyState, true, 0, 1);
// restart the source
final CheckedThread runThread3 =
new CheckedThread() {
@Override
public void go() throws Exception {
source3.run(sourceContext3);
}
};
runThread3.start();
// consume the unconsumed WAL events
List<SourceRecord> records = drain(sourceContext3, 2);
assertInsert(records.get(0), "id", 1001);
assertUpdate(records.get(1), "id", 1001);
// make sure there are no more events
assertFalse(waitForAvailableRecords(Duration.ofSeconds(3), sourceContext3));
// can continue to receive new events
try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER_OLD);
Statement statement = connection.createStatement()) {
statement.execute("DELETE FROM inventory.products WHERE id=1001");
}
records = drain(sourceContext3, 1);
assertDelete(records.get(0), "id", 1001);
// ---------------------------------------------------------------------------
// Step-6: trigger checkpoint-3 to make sure we can continue to further checkpoints
// ---------------------------------------------------------------------------
synchronized (sourceContext3.getCheckpointLock()) {
// checkpoint 3
source3.snapshotState(new StateSnapshotContextSynchronousImpl(233, 233));
}
assertEquals(1, offsetState.list.size());
String state = new String(offsetState.list.get(0), StandardCharsets.UTF_8);
assertEquals("postgres_cdc_source", JsonPath.read(state, "$.sourcePartition.server"));
assertEquals("561", JsonPath.read(state, "$.sourceOffset.txId").toString());
assertTrue(state.contains("ts_usec"));
assertFalse(state.contains("snapshot"));
int lsn = JsonPath.read(state, "$.sourceOffset.lsn");
assertTrue(lsn > prevLsn);
source3.cancel();
source3.close();
runThread3.sync();
}
{
// ---------------------------------------------------------------------------
// Step-7: restore the source from checkpoint-3
// ---------------------------------------------------------------------------
final DebeziumSourceFunction<SourceRecord> source4 =
createPostgreSqlSourceWithHeartbeatDisabled();
final TestSourceContext<SourceRecord> sourceContext4 = new TestSourceContext<>();
setupSource(source4, true, offsetState, historyState, true, 0, 1);
// restart the source
final CheckedThread runThread4 =
new CheckedThread() {
@Override
public void go() throws Exception {
source4.run(sourceContext4);
}
};
runThread4.start();
// make sure there are no more events
assertFalse(waitForAvailableRecords(Duration.ofSeconds(5), sourceContext4));
// ---------------------------------------------------------------------------
// Step-8: trigger another checkpoint to make sure we can continue to further checkpoints
// ---------------------------------------------------------------------------
synchronized (sourceContext4.getCheckpointLock()) {
// trigger another checkpoint
source4.snapshotState(new StateSnapshotContextSynchronousImpl(254, 254));
}
assertEquals(1, offsetState.list.size());
String state = new String(offsetState.list.get(0), StandardCharsets.UTF_8);
assertEquals("postgres_cdc_source", JsonPath.read(state, "$.sourcePartition.server"));
assertEquals("561", JsonPath.read(state, "$.sourceOffset.txId").toString());
assertTrue(state.contains("ts_usec"));
assertFalse(state.contains("snapshot"));
int lsn = JsonPath.read(state, "$.sourceOffset.lsn");
assertTrue(lsn > prevLsn);
prevLsn = lsn;
source4.cancel();
source4.close();
runThread4.sync();
}
{
// ---------------------------------------------------------------------------
// Step-9: insert partial and alter table
// ---------------------------------------------------------------------------
final DebeziumSourceFunction<SourceRecord> source5 =
createPostgreSqlSourceWithHeartbeatDisabled();
final TestSourceContext<SourceRecord> sourceContext5 = new TestSourceContext<>();
setupSource(source5, true, offsetState, historyState, true, 0, 1);
// restart the source
final CheckedThread runThread5 =
new CheckedThread() {
@Override
public void go() throws Exception {
source5.run(sourceContext5);
}
};
runThread5.start();
try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER_OLD);
Statement statement = connection.createStatement()) {
statement.execute(
"INSERT INTO inventory.products(id, description, weight) VALUES (default, 'Go go go', 111.1)");
statement.execute(
"ALTER TABLE inventory.products ADD comment_col VARCHAR(100) DEFAULT 'cdc'");
List<SourceRecord> records = drain(sourceContext5, 1);
assertInsert(records.get(0), "id", 111);
}
// ---------------------------------------------------------------------------
// Step-10: trigger checkpoint-4
// ---------------------------------------------------------------------------
synchronized (sourceContext5.getCheckpointLock()) {
// trigger checkpoint-4
source5.snapshotState(new StateSnapshotContextSynchronousImpl(300, 300));
}
assertEquals(1, offsetState.list.size());
String state = new String(offsetState.list.get(0), StandardCharsets.UTF_8);
assertEquals("postgres_cdc_source", JsonPath.read(state, "$.sourcePartition.server"));
assertEquals("562", JsonPath.read(state, "$.sourceOffset.txId").toString());
assertTrue(state.contains("ts_usec"));
assertFalse(state.contains("snapshot"));
int pos = JsonPath.read(state, "$.sourceOffset.lsn");
assertTrue(pos > prevLsn);
source5.cancel();
source5.close();
runThread5.sync();
}
{
// ---------------------------------------------------------------------------
// Step-11: restore from the checkpoint-4 and insert the partial value
// ---------------------------------------------------------------------------
final DebeziumSourceFunction<SourceRecord> source6 =
createPostgreSqlSourceWithHeartbeatDisabled();
final TestSourceContext<SourceRecord> sourceContext6 = new TestSourceContext<>();
setupSource(source6, true, offsetState, historyState, true, 0, 1);
// restart the source
final CheckedThread runThread6 =
new CheckedThread() {
@Override
public void go() throws Exception {
source6.run(sourceContext6);
}
};
runThread6.start();
try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER_OLD);
Statement statement = connection.createStatement()) {
statement.execute(
"INSERT INTO inventory.products(id, description, weight) VALUES (default, 'Run!', 22.2)");
List<SourceRecord> records = drain(sourceContext6, 1);
assertInsert(records.get(0), "id", 112);
}
source6.cancel();
source6.close();
runThread6.sync();
}
}
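/**
 * Verifies that the confirmed_flush_lsn of the replication slot only advances after a
 * checkpoint has been acknowledged via notifyCheckpointComplete, and that this holds across
 * restarts of the source.
 */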
@Test
public void testFlushLsn() throws Exception {
final TestingListState<byte[]> offsetState = new TestingListState<>();
final TestingListState<String> historyState = new TestingListState<>();
final LinkedHashSet<String> flushLsn = new LinkedHashSet<>();
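// tracks the distinct confirmed_flush_lsn values observed on the replication slot;
// add() returns false when the LSN has not advanced since the last observation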
{
// ---------------------------------------------------------------------------
// Step-1: start the source from empty state
// ---------------------------------------------------------------------------
final DebeziumSourceFunction<SourceRecord> source =
createPostgreSqlSourceWithHeartbeatEnabled();
final TestSourceContext<SourceRecord> sourceContext = new TestSourceContext<>();
// setup source with empty state
setupSource(source, false, offsetState, historyState, true, 0, 1);
final CheckedThread runThread =
new CheckedThread() {
@Override
public void go() throws Exception {
source.run(sourceContext);
}
};
runThread.start();
// wait until the consumer is started and the snapshot records are received
int received = drain(sourceContext, 9).size();
assertEquals(9, received);
// ---------------------------------------------------------------------------
// Step-2: trigger checkpoint-1 after snapshot finished
// ---------------------------------------------------------------------------
synchronized (sourceContext.getCheckpointLock()) {
// trigger checkpoint-1
source.snapshotState(new StateSnapshotContextSynchronousImpl(101, 101));
}
source.notifyCheckpointComplete(101);
assertTrue(flushLsn.add(getConfirmedFlushLsn()));
batchInsertAndCheckpoint(5, source, sourceContext, 201);
assertEquals(1, source.getPendingOffsetsToCommit().size());
source.notifyCheckpointComplete(201);
assertEquals(0, source.getPendingOffsetsToCommit().size());
assertTrue(flushLsn.add(getConfirmedFlushLsn()));
batchInsertAndCheckpoint(1, source, sourceContext, 301);
// do not notify checkpoint complete, to verify that the LSN is not advanced
assertFalse(flushLsn.add(getConfirmedFlushLsn()));
// make sure there are no more events
assertFalse(waitForAvailableRecords(Duration.ofSeconds(3), sourceContext));
source.cancel();
source.close();
runThread.sync();
}
{
// ---------------------------------------------------------------------------
// Step-3: restore the source from state
// ---------------------------------------------------------------------------
final DebeziumSourceFunction<SourceRecord> source2 =
createPostgreSqlSourceWithHeartbeatEnabled();
final TestSourceContext<SourceRecord> sourceContext2 = new TestSourceContext<>();
// restore the source from the captured offset state
setupSource(source2, true, offsetState, historyState, true, 0, 1);
final CheckedThread runThread =
new CheckedThread() {
@Override
public void go() throws Exception {
source2.run(sourceContext2);
}
};
runThread.start();
assertFalse(flushLsn.add(getConfirmedFlushLsn()));
batchInsertAndCheckpoint(0, source2, sourceContext2, 401);
Thread.sleep(3_000); // wait for heartbeat events; the heartbeat interval is set to 1s
// trigger checkpoint once again to make sure ChangeConsumer is initialized
batchInsertAndCheckpoint(0, source2, sourceContext2, 402);
source2.notifyCheckpointComplete(402);
assertTrue(flushLsn.add(getConfirmedFlushLsn()));
// verify the LSN is advanced even if there are no changes on the monitored table
try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER_OLD);
Statement statement = connection.createStatement()) {
// we have to do some transactions which are not related to the monitored table
statement.execute("CREATE TABLE dummy (a int)");
}
Thread.sleep(3_000);
batchInsertAndCheckpoint(0, source2, sourceContext2, 404);
source2.notifyCheckpointComplete(404);
assertTrue(flushLsn.add(getConfirmedFlushLsn()));
batchInsertAndCheckpoint(3, source2, sourceContext2, 501);
batchInsertAndCheckpoint(2, source2, sourceContext2, 502);
batchInsertAndCheckpoint(1, source2, sourceContext2, 503);
assertEquals(3, source2.getPendingOffsetsToCommit().size());
source2.notifyCheckpointComplete(503);
assertTrue(flushLsn.add(getConfirmedFlushLsn()));
assertEquals(0, source2.getPendingOffsetsToCommit().size());
// make sure there are no more events
assertFalse(waitForAvailableRecords(Duration.ofSeconds(3), sourceContext2));
source2.cancel();
source2.close();
runThread.sync();
}
assertEquals(5, flushLsn.size());
}
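/**
 * Inserts {@code num} rows into inventory.products, drains them from the source context and
 * then triggers a checkpoint with the given checkpoint id.
 */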
private void batchInsertAndCheckpoint(
int num,
DebeziumSourceFunction<SourceRecord> source,
TestSourceContext<SourceRecord> sourceContext,
long checkpointId)
throws Exception {
try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER_OLD);
Statement statement = connection.createStatement()) {
for (int i = 0; i < num; i++) {
statement.execute(
"INSERT INTO inventory.products VALUES (default,'dummy','My Dummy',1.1)");
}
}
assertEquals(num, drain(sourceContext, num).size());
synchronized (sourceContext.getCheckpointLock()) {
// trigger a checkpoint with the given checkpoint id
source.snapshotState(
new StateSnapshotContextSynchronousImpl(checkpointId, checkpointId));
}
}
// ------------------------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------------------------
private DebeziumSourceFunction<SourceRecord> createPostgreSqlSourceWithHeartbeatDisabled() {
return createPostgreSqlSource(0);
}
private DebeziumSourceFunction<SourceRecord> createPostgreSqlSourceWithHeartbeatEnabled() {
return createPostgreSqlSource(1000);
}
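/**
 * Builds a PostgreSQL source against the test container with the given heartbeat interval;
 * an interval of 0 disables heartbeats.
 */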
private DebeziumSourceFunction<SourceRecord> createPostgreSqlSource(int heartbeatInterval) {
Properties properties = new Properties();
properties.setProperty("heartbeat.interval.ms", String.valueOf(heartbeatInterval));
return PostgreSQLSource.<SourceRecord>builder()
.hostname(POSTGRES_CONTAINER_OLD.getHost())
.port(POSTGRES_CONTAINER_OLD.getMappedPort(POSTGRESQL_PORT))
.database(POSTGRES_CONTAINER_OLD.getDatabaseName())
.username(POSTGRES_CONTAINER_OLD.getUsername())
.password(POSTGRES_CONTAINER_OLD.getPassword())
.schemaList("inventory")
.tableList("inventory.products")
.deserializer(new ForwardDeserializeSchema())
.slotName(SLOT_NAME)
.debeziumProperties(properties)
.build();
}
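/**
 * Queries pg_replication_slots for the confirmed_flush_lsn of the test slot; fails if the
 * slot does not exist.
 */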
private String getConfirmedFlushLsn() throws SQLException {
try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER_OLD);
Statement statement = connection.createStatement()) {
ResultSet rs =
statement.executeQuery(
String.format(
"select * from pg_replication_slots where slot_name = '%s' and database = '%s' and plugin = '%s'",
SLOT_NAME,
POSTGRES_CONTAINER_OLD.getDatabaseName(),
"decoderbufs"));
if (rs.next()) {
return rs.getString("confirmed_flush_lsn");
} else {
fail("No replication slot info available");
}
return null;
}
}
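/**
 * Polls the collected outputs until the expected number of records has been received,
 * throwing an exception if a record does not arrive in time.
 */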
private <T> List<T> drain(TestSourceContext<T> sourceContext, int expectedRecordCount)
throws Exception {
List<T> allRecords = new ArrayList<>();
LinkedBlockingQueue<StreamRecord<T>> queue = sourceContext.getCollectedOutputs();
while (allRecords.size() < expectedRecordCount) {
StreamRecord<T> record = queue.poll(100, TimeUnit.SECONDS);
if (record != null) {
allRecords.add(record.getValue());
} else {
throw new RuntimeException(
"Can't receive " + expectedRecordCount + " elements before timeout.");
}
}
return allRecords;
}
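/** Returns {@code true} if the checkpoint lock can be acquired within the given timeout. */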
private boolean waitForCheckpointLock(Object checkpointLock, Duration timeout)
throws Exception {
final Semaphore semaphore = new Semaphore(0);
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.execute(
() -> {
synchronized (checkpointLock) {
semaphore.release();
}
});
boolean result = semaphore.tryAcquire(timeout.toMillis(), TimeUnit.MILLISECONDS);
executor.shutdownNow();
return result;
}
/**
* Wait for a maximum amount of time until the first record is available.
*
* @param timeout the maximum amount of time to wait; must not be negative
* @return {@code true} if records are available, or {@code false} if the timeout occurred and
* no records are available
*/
private boolean waitForAvailableRecords(Duration timeout, TestSourceContext<?> sourceContext)
throws InterruptedException {
long now = System.currentTimeMillis();
long stop = now + timeout.toMillis();
while (System.currentTimeMillis() < stop) {
if (!sourceContext.getCollectedOutputs().isEmpty()) {
break;
}
Thread.sleep(10); // save CPU
}
return !sourceContext.getCollectedOutputs().isEmpty();
}
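/** Sets up the source with empty state, checkpointing enabled and a single subtask. */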
private static <T> void setupSource(DebeziumSourceFunction<T> source) throws Exception {
setupSource(
source, false, null, null,
true, // enable checkpointing; auto commit should be ignored
0, 1);
}
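/**
 * Runs the operator life cycle against the given source: sets the runtime context,
 * initializes (or restores) the operator state and opens the function.
 */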
private static <T, S1, S2> void setupSource(
DebeziumSourceFunction<T> source,
boolean isRestored,
ListState<S1> restoredOffsetState,
ListState<S2> restoredHistoryState,
boolean isCheckpointingEnabled,
int subtaskIndex,
int totalNumSubtasks)
throws Exception {
// run setup procedure in operator life cycle
source.setRuntimeContext(
new MockStreamingRuntimeContext(
isCheckpointingEnabled, totalNumSubtasks, subtaskIndex));
source.initializeState(
new MockFunctionInitializationContext(
isRestored,
new MockOperatorStateStore(restoredOffsetState, restoredHistoryState)));
source.open(new Configuration());
}
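/** A deserialization schema that forwards the raw Debezium SourceRecord without conversion. */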
private static class ForwardDeserializeSchema
implements DebeziumDeserializationSchema<SourceRecord> {
private static final long serialVersionUID = 2975058057832211228L;
@Override
public void deserialize(SourceRecord record, Collector<SourceRecord> out) throws Exception {
out.collect(record);
}
@Override
public TypeInformation<SourceRecord> getProducedType() {
return TypeInformation.of(SourceRecord.class);
}
}
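/**
 * Operator state store backed by the restored offset and history list states; only union
 * list state is supported.
 */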
private static class MockOperatorStateStore implements OperatorStateStore {
private final ListState<?> restoredOffsetListState;
private final ListState<?> restoredHistoryListState;
private MockOperatorStateStore(
ListState<?> restoredOffsetListState, ListState<?> restoredHistoryListState) {
this.restoredOffsetListState = restoredOffsetListState;
this.restoredHistoryListState = restoredHistoryListState;
}
@Override
@SuppressWarnings("unchecked")
public <S> ListState<S> getUnionListState(ListStateDescriptor<S> stateDescriptor)
throws Exception {
if (stateDescriptor.getName().equals(DebeziumSourceFunction.OFFSETS_STATE_NAME)) {
return (ListState<S>) restoredOffsetListState;
} else if (stateDescriptor
.getName()
.equals(DebeziumSourceFunction.HISTORY_RECORDS_STATE_NAME)) {
return (ListState<S>) restoredHistoryListState;
} else {
throw new IllegalStateException("Unknown state.");
}
}
@Override
public <K, V> BroadcastState<K, V> getBroadcastState(
MapStateDescriptor<K, V> stateDescriptor) throws Exception {
throw new UnsupportedOperationException();
}
@Override
public <S> ListState<S> getListState(ListStateDescriptor<S> stateDescriptor)
throws Exception {
throw new UnsupportedOperationException();
}
@Override
public Set<String> getRegisteredStateNames() {
throw new UnsupportedOperationException();
}
@Override
public Set<String> getRegisteredBroadcastStateNames() {
throw new UnsupportedOperationException();
}
}
private static class MockFunctionInitializationContext
implements FunctionInitializationContext {
private final boolean isRestored;
private final OperatorStateStore operatorStateStore;
private MockFunctionInitializationContext(
boolean isRestored, OperatorStateStore operatorStateStore) {
this.isRestored = isRestored;
this.operatorStateStore = operatorStateStore;
}
@Override
public boolean isRestored() {
return isRestored;
}
@Override
public OptionalLong getRestoredCheckpointId() {
throw new UnsupportedOperationException();
}
@Override
public OperatorStateStore getOperatorStateStore() {
return operatorStateStore;
}
@Override
public KeyedStateStore getKeyedStateStore() {
throw new UnsupportedOperationException();
}
}
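/**
 * A TestSourceContext that blocks the collecting thread once the expected number of records
 * has been emitted.
 */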
private static class BlockingSourceContext<T> extends TestSourceContext<T> {
private final Semaphore blocker = new Semaphore(0);
private final int expectedCount;
private int currentCount = 0;
private BlockingSourceContext(int expectedCount) {
this.expectedCount = expectedCount;
}
@Override
public void collect(T t) {
super.collect(t);
currentCount++;
if (currentCount == expectedCount) {
try {
// block the source thread from emitting further records
blocker.acquire();
} catch (InterruptedException e) {
// ignore
}
}
}
}
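/** Simple in-memory ListState used to capture the checkpointed offset and history state. */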
private static final class TestingListState<T> implements ListState<T> {
private final List<T> list = new ArrayList<>();
private boolean clearCalled = false;
@Override
public void clear() {
list.clear();
clearCalled = true;
}
@Override
public Iterable<T> get() throws Exception {
return list;
}
@Override
public void add(T value) throws Exception {
Preconditions.checkNotNull(value, "You cannot add null to a ListState.");
list.add(value);
}
public List<T> getList() {
return list;
}
boolean isClearCalled() {
return clearCalled;
}
@Override
public void update(List<T> values) throws Exception {
clear();
addAll(values);
}
@Override
public void addAll(List<T> values) throws Exception {
if (values != null) {
values.forEach(
v -> Preconditions.checkNotNull(v, "You cannot add null to a ListState."));
list.addAll(values);
}
}
}
}

@ -39,6 +39,7 @@ import java.sql.Statement;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@ -58,9 +59,9 @@ public abstract class PostgresTestBase extends AbstractTestBase {
// use newer version of postgresql image to support pgoutput plugin
// when testing postgres 13, only 13-alpine supports both amd64 and arm64
protected static final DockerImageName PG_IMAGE =
DockerImageName.parse("debezium/postgres:13").asCompatibleSubstituteFor("postgres");
DockerImageName.parse("debezium/postgres:9.6").asCompatibleSubstituteFor("postgres");
public static final PostgreSQLContainer<?> POSTGERS_CONTAINER =
public static final PostgreSQLContainer<?> POSTGRES_CONTAINER =
new PostgreSQLContainer<>(PG_IMAGE)
.withDatabaseName(DEFAULT_DB)
.withUsername("postgres")
@ -72,41 +73,43 @@ public abstract class PostgresTestBase extends AbstractTestBase {
// default
"fsync=off",
"-c",
// to ensure that the slot becomes inactive during the failover
"wal_sender_timeout=1000",
"-c",
"max_replication_slots=20");
@BeforeClass
public static void startContainers() {
LOG.info("Starting containers...");
Startables.deepStart(Stream.of(POSTGERS_CONTAINER)).join();
Startables.deepStart(Stream.of(POSTGRES_CONTAINER)).join();
LOG.info("Containers are started.");
}
protected Connection getJdbcConnection() throws SQLException {
protected Connection getJdbcConnection(PostgreSQLContainer container) throws SQLException {
return DriverManager.getConnection(
POSTGERS_CONTAINER.getJdbcUrl(),
POSTGERS_CONTAINER.getUsername(),
POSTGERS_CONTAINER.getPassword());
container.getJdbcUrl(), container.getUsername(), container.getPassword());
}
public static Connection getJdbcConnection(String databaseName) throws SQLException {
public static Connection getJdbcConnection(PostgreSQLContainer container, String databaseName)
throws SQLException {
return DriverManager.getConnection(
POSTGERS_CONTAINER.withDatabaseName(databaseName).getJdbcUrl(),
POSTGERS_CONTAINER.getUsername(),
POSTGERS_CONTAINER.getPassword());
container.withDatabaseName(databaseName).getJdbcUrl(),
container.getUsername(),
container.getPassword());
}
public static String getSlotName() {
final Random random = new Random();
int id = random.nextInt(10000);
return "flink_" + id;
}
/**
* Executes a JDBC statement using the default jdbc config without autocommitting the
* connection.
*/
protected void initializePostgresTable(String sqlFile) {
protected void initializePostgresTable(PostgreSQLContainer container, String sqlFile) {
final String ddlFile = String.format("ddl/%s.sql", sqlFile);
final URL ddlTestFile = PostgresTestBase.class.getClassLoader().getResource(ddlFile);
assertNotNull("Cannot locate " + ddlFile, ddlTestFile);
try (Connection connection = getJdbcConnection();
try (Connection connection = getJdbcConnection(container);
Statement statement = connection.createStatement()) {
final List<String> statements =
Arrays.stream(

@ -83,11 +83,11 @@ public class PostgresSourceExampleTest extends PostgresTestBase {
// 9 records in the inventory.products table
private final UniqueDatabase inventoryDatabase =
new UniqueDatabase(
POSTGERS_CONTAINER,
POSTGRES_CONTAINER,
DB_NAME_PREFIX,
SCHEMA_NAME,
POSTGERS_CONTAINER.getUsername(),
POSTGERS_CONTAINER.getPassword());
POSTGRES_CONTAINER.getUsername(),
POSTGRES_CONTAINER.getPassword());
@Test
@Ignore("Test ignored because it won't stop and is used for manual test")
@ -100,13 +100,13 @@ public class PostgresSourceExampleTest extends PostgresTestBase {
JdbcIncrementalSource<String> postgresIncrementalSource =
PostgresSourceBuilder.PostgresIncrementalSource.<String>builder()
.hostname(POSTGERS_CONTAINER.getHost())
.port(POSTGERS_CONTAINER.getMappedPort(POSTGRESQL_PORT))
.hostname(POSTGRES_CONTAINER.getHost())
.port(POSTGRES_CONTAINER.getMappedPort(POSTGRESQL_PORT))
.database(inventoryDatabase.getDatabaseName())
.schemaList(SCHEMA_NAME)
.tableList(TABLE_ID)
.username(POSTGERS_CONTAINER.getUsername())
.password(POSTGERS_CONTAINER.getPassword())
.username(POSTGRES_CONTAINER.getUsername())
.password(POSTGRES_CONTAINER.getPassword())
.slotName(SLOT_NAME)
.decodingPluginName(PLUGIN_NAME)
.deserializer(deserializer)
@ -130,6 +130,7 @@ public class PostgresSourceExampleTest extends PostgresTestBase {
}
@Test
@Ignore
public void testConsumingAllEvents() throws Exception {
final DataType dataType =
DataTypes.ROW(
@ -144,13 +145,13 @@ public class PostgresSourceExampleTest extends PostgresTestBase {
JdbcIncrementalSource<RowData> postgresIncrementalSource =
PostgresSourceBuilder.PostgresIncrementalSource.<RowData>builder()
.hostname(POSTGERS_CONTAINER.getHost())
.port(POSTGERS_CONTAINER.getMappedPort(POSTGRESQL_PORT))
.hostname(POSTGRES_CONTAINER.getHost())
.port(POSTGRES_CONTAINER.getMappedPort(POSTGRESQL_PORT))
.database(inventoryDatabase.getDatabaseName())
.schemaList(SCHEMA_NAME)
.tableList(TABLE_ID)
.username(POSTGERS_CONTAINER.getUsername())
.password(POSTGERS_CONTAINER.getPassword())
.username(POSTGRES_CONTAINER.getUsername())
.password(POSTGRES_CONTAINER.getPassword())
.slotName(SLOT_NAME)
.decodingPluginName(PLUGIN_NAME)
.deserializer(buildRowDataDebeziumDeserializeSchema(dataType))
@ -247,8 +248,8 @@ public class PostgresSourceExampleTest extends PostgresTestBase {
private PostgresConnection getConnection() throws SQLException {
Map<String, String> properties = new HashMap<>();
properties.put("hostname", POSTGERS_CONTAINER.getHost());
properties.put("port", String.valueOf(POSTGERS_CONTAINER.getMappedPort(POSTGRESQL_PORT)));
properties.put("hostname", POSTGRES_CONTAINER.getHost());
properties.put("port", String.valueOf(POSTGRES_CONTAINER.getMappedPort(POSTGRESQL_PORT)));
properties.put("dbname", inventoryDatabase.getDatabaseName());
properties.put("user", inventoryDatabase.getUsername());
properties.put("password", inventoryDatabase.getPassword());
@ -295,9 +296,9 @@ public class PostgresSourceExampleTest extends PostgresTestBase {
SlotState slotState = connection.getReplicationSlotState(SLOT_NAME, PLUGIN_NAME);
while (slotState == null) {
log.info("slot state is null, wait a little bit");
log.info("Waiting until the replication slot is ready ...");
try {
Thread.sleep(1000L);
Thread.sleep(2000L);
} catch (InterruptedException e) {
e.printStackTrace();
}

@ -46,7 +46,6 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@ -86,11 +85,11 @@ public class PostgresSourceITCase extends PostgresTestBase {
private final UniqueDatabase customDatabase =
new UniqueDatabase(
POSTGERS_CONTAINER,
POSTGRES_CONTAINER,
DB_NAME_PREFIX,
SCHEMA_NAME,
POSTGERS_CONTAINER.getUsername(),
POSTGERS_CONTAINER.getPassword());
POSTGRES_CONTAINER.getUsername(),
POSTGRES_CONTAINER.getPassword());
/** First part stream events, which is made by {@link #makeFirstPartStreamEvents}. */
private final List<String> firstPartStreamEvents =
@ -365,17 +364,13 @@ public class PostgresSourceITCase extends PostgresTestBase {
expectedStreamData.addAll(firstPartStreamEvents);
expectedStreamData.addAll(secondPartStreamEvents);
}
// wait for the stream reading
Thread.sleep(2000L);
assertEqualsInAnyOrder(expectedStreamData, fetchRows(iterator, expectedStreamData.size()));
assertTrue(!hasNextData(iterator));
}
private String getSlotName() {
final Random random = new Random();
int id = random.nextInt(10000);
return "flink_" + id;
}
private void sleepMs(long millis) {
try {
Thread.sleep(millis);

@ -16,6 +16,7 @@
package com.ververica.cdc.connectors.postgres.table;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableResult;
@ -27,7 +28,6 @@ import org.apache.flink.util.ExceptionUtils;
import com.ververica.cdc.connectors.postgres.PostgresTestBase;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@ -47,10 +47,8 @@ import static org.junit.Assert.assertTrue;
import static org.testcontainers.containers.PostgreSQLContainer.POSTGRESQL_PORT;
/** Integration tests for PostgreSQL Table source. */
@Ignore
@RunWith(Parameterized.class)
public class PostgreSQLConnectorITCase extends PostgresTestBase {
private static final String SLOT_NAME = "flinktest";
private final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
@ -74,6 +72,7 @@ public class PostgreSQLConnectorITCase extends PostgresTestBase {
@Before
public void before() {
TestValuesTableFactory.clearAllData();
env.setRestartStrategy(RestartStrategies.noRestart());
if (parallelismSnapshot) {
env.setParallelism(4);
env.enableCheckpointing(200);
@ -85,7 +84,7 @@ public class PostgreSQLConnectorITCase extends PostgresTestBase {
@Test
public void testConsumingAllEvents()
throws SQLException, ExecutionException, InterruptedException {
initializePostgresTable("inventory");
initializePostgresTable(POSTGRES_CONTAINER, "inventory");
String sourceDDL =
String.format(
"CREATE TABLE debezium_source ("
@ -105,15 +104,15 @@ public class PostgreSQLConnectorITCase extends PostgresTestBase {
+ " 'scan.incremental.snapshot.enabled' = '%s',"
+ " 'slot.name' = '%s'"
+ ")",
POSTGERS_CONTAINER.getHost(),
POSTGERS_CONTAINER.getMappedPort(POSTGRESQL_PORT),
POSTGERS_CONTAINER.getUsername(),
POSTGERS_CONTAINER.getPassword(),
POSTGERS_CONTAINER.getDatabaseName(),
POSTGRES_CONTAINER.getHost(),
POSTGRES_CONTAINER.getMappedPort(POSTGRESQL_PORT),
POSTGRES_CONTAINER.getUsername(),
POSTGRES_CONTAINER.getPassword(),
POSTGRES_CONTAINER.getDatabaseName(),
"inventory",
"products",
parallelismSnapshot,
SLOT_NAME);
getSlotName());
String sinkDDL =
"CREATE TABLE sink ("
+ " name STRING,"
@ -134,7 +133,11 @@ public class PostgreSQLConnectorITCase extends PostgresTestBase {
waitForSnapshotStarted("sink");
try (Connection connection = getJdbcConnection();
// wait a bit to make sure the replication slot is ready
Thread.sleep(5000);
// generate WAL
try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER);
Statement statement = connection.createStatement()) {
statement.execute(
@ -191,9 +194,82 @@ public class PostgreSQLConnectorITCase extends PostgresTestBase {
result.getJobClient().get().cancel().get();
}
@Test
public void testStartupFromLatestOffset() throws Exception {
if (!parallelismSnapshot) {
return;
}
initializePostgresTable(POSTGRES_CONTAINER, "inventory");
String sourceDDL =
String.format(
"CREATE TABLE debezium_source ("
+ " id INT NOT NULL,"
+ " name STRING,"
+ " description STRING,"
+ " weight DECIMAL(10,3),"
+ " PRIMARY KEY (id) NOT ENFORCED"
+ ") WITH ("
+ " 'connector' = 'postgres-cdc',"
+ " 'hostname' = '%s',"
+ " 'port' = '%s',"
+ " 'username' = '%s',"
+ " 'password' = '%s',"
+ " 'database-name' = '%s',"
+ " 'schema-name' = '%s',"
+ " 'table-name' = '%s',"
+ " 'scan.incremental.snapshot.enabled' = '%s',"
+ " 'slot.name' = '%s',"
+ " 'scan.startup.mode' = 'latest-offset'"
+ ")",
POSTGRES_CONTAINER.getHost(),
POSTGRES_CONTAINER.getMappedPort(POSTGRESQL_PORT),
POSTGRES_CONTAINER.getUsername(),
POSTGRES_CONTAINER.getPassword(),
POSTGRES_CONTAINER.getDatabaseName(),
"inventory",
"products",
parallelismSnapshot,
getSlotName());
String sinkDDL =
"CREATE TABLE sink "
+ " WITH ("
+ " 'connector' = 'values',"
+ " 'sink-insert-only' = 'false'"
+ ") LIKE debezium_source (EXCLUDING OPTIONS)";
tEnv.executeSql(sourceDDL);
tEnv.executeSql(sinkDDL);
// async submit job
TableResult result = tEnv.executeSql("INSERT INTO sink SELECT * FROM debezium_source");
// wait for the source to start up; we don't have a better way to wait for it, so use sleep for now
Thread.sleep(10000L);
try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER);
Statement statement = connection.createStatement()) {
statement.execute(
"INSERT INTO inventory.products VALUES (default,'jacket','water resistent white wind breaker',0.2);"); // 110
statement.execute(
"INSERT INTO inventory.products VALUES (default,'scooter','Big 2-wheel scooter ',5.18);");
statement.execute(
"UPDATE inventory.products SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;");
statement.execute("UPDATE inventory.products SET weight='5.17' WHERE id=111;");
statement.execute("DELETE FROM inventory.products WHERE id=111;");
}
waitForSinkSize("sink", 5);
String[] expected =
new String[] {"110,jacket,new water resistent white wind breaker,0.500"};
List<String> actual = TestValuesTableFactory.getResults("sink");
assertThat(actual, containsInAnyOrder(expected));
result.getJobClient().get().cancel().get();
}
@Test
public void testExceptionForReplicaIdentity() throws Exception {
initializePostgresTable("replica_identity");
initializePostgresTable(POSTGRES_CONTAINER, "replica_identity");
String sourceDDL =
String.format(
"CREATE TABLE debezium_source ("
@ -213,15 +289,15 @@ public class PostgreSQLConnectorITCase extends PostgresTestBase {
+ " 'scan.incremental.snapshot.enabled' = '%s',"
+ " 'slot.name' = '%s'"
+ ")",
POSTGERS_CONTAINER.getHost(),
POSTGERS_CONTAINER.getMappedPort(POSTGRESQL_PORT),
POSTGERS_CONTAINER.getUsername(),
POSTGERS_CONTAINER.getPassword(),
POSTGERS_CONTAINER.getDatabaseName(),
POSTGRES_CONTAINER.getHost(),
POSTGRES_CONTAINER.getMappedPort(POSTGRESQL_PORT),
POSTGRES_CONTAINER.getUsername(),
POSTGRES_CONTAINER.getPassword(),
POSTGRES_CONTAINER.getDatabaseName(),
"inventory",
"products",
parallelismSnapshot,
"replica_identity_slot");
getSlotName());
String sinkDDL =
"CREATE TABLE sink ("
+ " name STRING,"
@ -241,7 +317,11 @@ public class PostgreSQLConnectorITCase extends PostgresTestBase {
"INSERT INTO sink SELECT name, SUM(weight) FROM debezium_source GROUP BY name");
waitForSnapshotStarted("sink");
try (Connection connection = getJdbcConnection();
// wait a bit to make sure the replication slot is ready
Thread.sleep(5000);
// generate WAL
try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER);
Statement statement = connection.createStatement()) {
statement.execute(
@ -271,7 +351,7 @@ public class PostgreSQLConnectorITCase extends PostgresTestBase {
@Test
public void testAllTypes() throws Throwable {
initializePostgresTable("column_type_test");
initializePostgresTable(POSTGRES_CONTAINER, "column_type_test");
String sourceDDL =
String.format(
@ -309,15 +389,15 @@ public class PostgreSQLConnectorITCase extends PostgresTestBase {
+ " 'scan.incremental.snapshot.enabled' = '%s',"
+ " 'slot.name' = '%s'"
+ ")",
POSTGERS_CONTAINER.getHost(),
POSTGERS_CONTAINER.getMappedPort(POSTGRESQL_PORT),
POSTGERS_CONTAINER.getUsername(),
POSTGERS_CONTAINER.getPassword(),
POSTGERS_CONTAINER.getDatabaseName(),
POSTGRES_CONTAINER.getHost(),
POSTGRES_CONTAINER.getMappedPort(POSTGRESQL_PORT),
POSTGRES_CONTAINER.getUsername(),
POSTGRES_CONTAINER.getPassword(),
POSTGRES_CONTAINER.getDatabaseName(),
"inventory",
"full_types",
parallelismSnapshot,
SLOT_NAME);
getSlotName());
String sinkDDL =
"CREATE TABLE sink ("
+ " id INTEGER NOT NULL,"
@ -340,7 +420,8 @@ public class PostgreSQLConnectorITCase extends PostgresTestBase {
+ " time_c TIME(0),"
+ " default_numeric_c DECIMAL,"
+ " geography_c STRING,"
+ " geometry_c STRING"
+ " geometry_c STRING,"
+ " PRIMARY KEY (id) NOT ENFORCED"
+ ") WITH ("
+ " 'connector' = 'values',"
+ " 'sink-insert-only' = 'false'"
@ -351,9 +432,12 @@ public class PostgreSQLConnectorITCase extends PostgresTestBase {
// async submit job
TableResult result = tEnv.executeSql("INSERT INTO sink SELECT * FROM full_types");
waitForSnapshotStarted("sink");
waitForSinkSize("sink", 1);
// wait a bit to make sure the replication slot is ready
Thread.sleep(5000);
try (Connection connection = getJdbcConnection();
// generate WAL
try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER);
Statement statement = connection.createStatement()) {
statement.execute("UPDATE inventory.full_types SET small_c=0 WHERE id=1;");
}
@ -363,8 +447,8 @@ public class PostgreSQLConnectorITCase extends PostgresTestBase {
List<String> expected =
Arrays.asList(
"+I(1,[50],32767,65535,2147483647,5.5,6.6,123.12345,404.4,true,Hello World,a,abc,abcd..xyz,2020-07-17T18:00:22.123,2020-07-17T18:00:22.123456,2020-07-17,18:00:22,500,{\"hexewkb\":\"0105000020e610000001000000010200000002000000a779c7293a2465400b462575025a46c0c66d3480b7fc6440c3d32b65195246c0\",\"srid\":4326},{\"hexewkb\":\"0101000020730c00001c7c613255de6540787aa52c435c42c0\",\"srid\":3187})",
"-U(1,[50],32767,65535,2147483647,5.5,6.6,123.12345,404.4,true,Hello World,a,abc,abcd..xyz,2020-07-17T18:00:22.123,2020-07-17T18:00:22.123456,2020-07-17,18:00:22,500,{\"hexewkb\":\"0105000020e610000001000000010200000002000000a779c7293a2465400b462575025a46c0c66d3480b7fc6440c3d32b65195246c0\",\"srid\":4326},{\"hexewkb\":\"0101000020730c00001c7c613255de6540787aa52c435c42c0\",\"srid\":3187})",
"+U(1,[50],0,65535,2147483647,5.5,6.6,123.12345,404.4,true,Hello World,a,abc,abcd..xyz,2020-07-17T18:00:22.123,2020-07-17T18:00:22.123456,2020-07-17,18:00:22,500,{\"hexewkb\":\"0105000020e610000001000000010200000002000000a779c7293a2465400b462575025a46c0c66d3480b7fc6440c3d32b65195246c0\",\"srid\":4326},{\"hexewkb\":\"0101000020730c00001c7c613255de6540787aa52c435c42c0\",\"srid\":3187})");
"-D(1,[50],32767,65535,2147483647,5.5,6.6,123.12345,404.4,true,Hello World,a,abc,abcd..xyz,2020-07-17T18:00:22.123,2020-07-17T18:00:22.123456,2020-07-17,18:00:22,500,{\"hexewkb\":\"0105000020e610000001000000010200000002000000a779c7293a2465400b462575025a46c0c66d3480b7fc6440c3d32b65195246c0\",\"srid\":4326},{\"hexewkb\":\"0101000020730c00001c7c613255de6540787aa52c435c42c0\",\"srid\":3187})",
"+I(1,[50],0,65535,2147483647,5.5,6.6,123.12345,404.4,true,Hello World,a,abc,abcd..xyz,2020-07-17T18:00:22.123,2020-07-17T18:00:22.123456,2020-07-17,18:00:22,500,{\"hexewkb\":\"0105000020e610000001000000010200000002000000a779c7293a2465400b462575025a46c0c66d3480b7fc6440c3d32b65195246c0\",\"srid\":4326},{\"hexewkb\":\"0101000020730c00001c7c613255de6540787aa52c435c42c0\",\"srid\":3187})");
List<String> actual = TestValuesTableFactory.getRawResults("sink");
assertEquals(expected, actual);
@ -373,7 +457,7 @@ public class PostgreSQLConnectorITCase extends PostgresTestBase {
@Test
public void testMetadataColumns() throws Throwable {
initializePostgresTable("inventory");
initializePostgresTable(POSTGRES_CONTAINER, "inventory");
String sourceDDL =
String.format(
"CREATE TABLE debezium_source ("
@ -397,15 +481,15 @@ public class PostgreSQLConnectorITCase extends PostgresTestBase {
+ " 'scan.incremental.snapshot.enabled' = '%s',"
+ " 'slot.name' = '%s'"
+ ")",
POSTGERS_CONTAINER.getHost(),
POSTGERS_CONTAINER.getMappedPort(POSTGRESQL_PORT),
POSTGERS_CONTAINER.getUsername(),
POSTGERS_CONTAINER.getPassword(),
POSTGERS_CONTAINER.getDatabaseName(),
POSTGRES_CONTAINER.getHost(),
POSTGRES_CONTAINER.getMappedPort(POSTGRESQL_PORT),
POSTGRES_CONTAINER.getUsername(),
POSTGRES_CONTAINER.getPassword(),
POSTGRES_CONTAINER.getDatabaseName(),
"inventory",
"products",
parallelismSnapshot,
"meta_data_slot");
getSlotName());
String sinkDDL =
"CREATE TABLE sink ("
@ -429,8 +513,11 @@ public class PostgreSQLConnectorITCase extends PostgresTestBase {
TableResult result = tEnv.executeSql("INSERT INTO sink SELECT * FROM debezium_source");
waitForSnapshotStarted("sink");
// wait a bit to make sure the replication slot is ready
Thread.sleep(5000);
try (Connection connection = getJdbcConnection();
// generate WAL
try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER);
Statement statement = connection.createStatement()) {
statement.execute(
@ -476,7 +563,7 @@ public class PostgreSQLConnectorITCase extends PostgresTestBase {
@Test
public void testUpsertMode() throws Exception {
initializePostgresTable("replica_identity");
initializePostgresTable(POSTGRES_CONTAINER, "replica_identity");
String sourceDDL =
String.format(
"CREATE TABLE debezium_source ("
@ -498,14 +585,14 @@ public class PostgreSQLConnectorITCase extends PostgresTestBase {
+ " 'scan.incremental.snapshot.enabled' = '%s',"
+ " 'changelog-mode' = '%s'"
+ ")",
POSTGERS_CONTAINER.getHost(),
POSTGERS_CONTAINER.getMappedPort(POSTGRESQL_PORT),
POSTGERS_CONTAINER.getUsername(),
POSTGERS_CONTAINER.getPassword(),
POSTGERS_CONTAINER.getDatabaseName(),
POSTGRES_CONTAINER.getHost(),
POSTGRES_CONTAINER.getMappedPort(POSTGRESQL_PORT),
POSTGRES_CONTAINER.getUsername(),
POSTGRES_CONTAINER.getPassword(),
POSTGRES_CONTAINER.getDatabaseName(),
"inventory",
"products",
"replica_identity_slot",
getSlotName(),
parallelismSnapshot,
"upsert");
String sinkDDL =
@ -527,7 +614,11 @@ public class PostgreSQLConnectorITCase extends PostgresTestBase {
"INSERT INTO sink SELECT name, SUM(weight) FROM debezium_source GROUP BY name");
waitForSnapshotStarted("sink");
try (Connection connection = getJdbcConnection();
// wait a bit to make sure the replication slot is ready
Thread.sleep(5000);
// generate WAL
try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER);
Statement statement = connection.createStatement()) {
statement.execute(
@ -586,7 +677,7 @@ public class PostgreSQLConnectorITCase extends PostgresTestBase {
private static void waitForSnapshotStarted(String sinkName) throws InterruptedException {
while (sinkSize(sinkName) == 0) {
Thread.sleep(100);
Thread.sleep(300);
}
}

@ -108,7 +108,7 @@ public class UniqueDatabase {
private void createDatabase(String databaseName) throws SQLException {
try (Connection connection =
PostgresTestBase.getJdbcConnection(PostgresTestBase.DEFAULT_DB)) {
PostgresTestBase.getJdbcConnection(container, PostgresTestBase.DEFAULT_DB)) {
try (Statement statement = connection.createStatement()) {
statement.execute("CREATE DATABASE " + databaseName);
}
@ -123,7 +123,8 @@ public class UniqueDatabase {
try {
createDatabase(databaseName);
try (Connection connection = PostgresTestBase.getJdbcConnection(databaseName);
try (Connection connection =
PostgresTestBase.getJdbcConnection(container, databaseName);
Statement statement = connection.createStatement()) {
final List<String> statements =
Arrays.stream(
