From 6d6881663a02c50f900739bde2be8bca288f4970 Mon Sep 17 00:00:00 2001 From: Hang Ruan Date: Wed, 21 Jun 2023 11:59:59 +0800 Subject: [PATCH] [postgres] Backfill task will be able to end when there is not new change data but read the ending lsn --- .../PostgresStreamingChangeEventSource.java | 530 ++++++++++ .../PostgresReplicationConnection.java | 917 +++++++++++++++++ .../postgres/PostgresSQLSourceTest.java | 922 ++++++++++++++++++ .../connectors/postgres/PostgresTestBase.java | 35 +- .../source/PostgresSourceExampleTest.java | 31 +- .../postgres/source/PostgresSourceITCase.java | 15 +- .../table/PostgreSQLConnectorITCase.java | 187 +++- .../postgres/testutils/UniqueDatabase.java | 5 +- 8 files changed, 2551 insertions(+), 91 deletions(-) create mode 100644 flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java create mode 100644 flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java create mode 100644 flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/PostgresSQLSourceTest.java diff --git a/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java b/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java new file mode 100644 index 000000000..4b97321be --- /dev/null +++ b/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java @@ -0,0 +1,530 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.postgresql; + +import io.debezium.DebeziumException; +import io.debezium.connector.postgresql.connection.LogicalDecodingMessage; +import io.debezium.connector.postgresql.connection.Lsn; +import io.debezium.connector.postgresql.connection.PostgresConnection; +import io.debezium.connector.postgresql.connection.PostgresReplicationConnection; +import io.debezium.connector.postgresql.connection.ReplicationConnection; +import io.debezium.connector.postgresql.connection.ReplicationMessage.Operation; +import io.debezium.connector.postgresql.connection.ReplicationStream; +import io.debezium.connector.postgresql.connection.WalPositionLocator; +import io.debezium.connector.postgresql.spi.Snapshotter; +import io.debezium.heartbeat.Heartbeat; +import io.debezium.pipeline.ErrorHandler; +import io.debezium.pipeline.source.spi.StreamingChangeEventSource; +import io.debezium.relational.TableId; +import io.debezium.util.Clock; +import io.debezium.util.DelayStrategy; +import io.debezium.util.ElapsedTimeStrategy; +import io.debezium.util.Threads; +import org.apache.kafka.connect.errors.ConnectException; +import org.postgresql.core.BaseConnection; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.SQLException; +import java.util.Map; +import java.util.Objects; +import java.util.OptionalLong; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Copied from Debezium 1.9.7. + * + *

Line 150~151 : set the ending lsn for the replication connection. + */ +public class PostgresStreamingChangeEventSource + implements StreamingChangeEventSource { + + private static final String KEEP_ALIVE_THREAD_NAME = "keep-alive"; + + /** + * Number of received events without sending anything to Kafka which will trigger a "WAL backlog + * growing" warning. + */ + private static final int GROWING_WAL_WARNING_LOG_INTERVAL = 10_000; + + private static final Logger LOGGER = + LoggerFactory.getLogger(PostgresStreamingChangeEventSource.class); + + // PGOUTPUT decoder sends the messages with larger time gaps than other decoders + // We thus try to read the message multiple times before we make poll pause + private static final int THROTTLE_NO_MESSAGE_BEFORE_PAUSE = 5; + + private final PostgresConnection connection; + private final PostgresEventDispatcher dispatcher; + private final ErrorHandler errorHandler; + private final Clock clock; + private final PostgresSchema schema; + private final PostgresConnectorConfig connectorConfig; + private final PostgresTaskContext taskContext; + private final ReplicationConnection replicationConnection; + private final AtomicReference replicationStream = new AtomicReference<>(); + private final Snapshotter snapshotter; + private final DelayStrategy pauseNoMessage; + private final ElapsedTimeStrategy connectionProbeTimer; + + // Offset committing is an asynchronous operation. + // When connector is restarted we cannot be sure about timing of recovery, offset committing + // etc. + // as this is driven by Kafka Connect. This might be a root cause of DBZ-5163. + // This flag will ensure that LSN is flushed only if we are really in message processing mode. + private volatile boolean lsnFlushingAllowed = false; + + /** + * The minimum of (number of event received since the last event sent to Kafka, number of event + * received since last WAL growing warning issued). 
+ */ + private long numberOfEventsSinceLastEventSentOrWalGrowingWarning = 0; + + private Lsn lastCompletelyProcessedLsn; + + public PostgresStreamingChangeEventSource( + PostgresConnectorConfig connectorConfig, + Snapshotter snapshotter, + PostgresConnection connection, + PostgresEventDispatcher dispatcher, + ErrorHandler errorHandler, + Clock clock, + PostgresSchema schema, + PostgresTaskContext taskContext, + ReplicationConnection replicationConnection) { + this.connectorConfig = connectorConfig; + this.connection = connection; + this.dispatcher = dispatcher; + this.errorHandler = errorHandler; + this.clock = clock; + this.schema = schema; + pauseNoMessage = + DelayStrategy.constant(taskContext.getConfig().getPollInterval().toMillis()); + this.taskContext = taskContext; + this.snapshotter = snapshotter; + this.replicationConnection = replicationConnection; + this.connectionProbeTimer = + ElapsedTimeStrategy.constant( + Clock.system(), connectorConfig.statusUpdateInterval()); + } + + @Override + public void init() { + // refresh the schema so we have a latest view of the DB tables + try { + taskContext.refreshSchema(connection, true); + } catch (SQLException e) { + throw new DebeziumException("Error while executing initial schema load", e); + } + } + + @Override + public void execute( + ChangeEventSourceContext context, + PostgresPartition partition, + PostgresOffsetContext offsetContext) + throws InterruptedException { + if (!snapshotter.shouldStream()) { + LOGGER.info("Streaming is not enabled in correct configuration"); + return; + } + + lsnFlushingAllowed = false; + + // replication slot could exist at the time of starting Debezium so we will stream from the + // position in the slot + // instead of the last position in the database + boolean hasStartLsnStoredInContext = offsetContext != null; + + if (!hasStartLsnStoredInContext) { + offsetContext = + PostgresOffsetContext.initialContext(connectorConfig, connection, clock); + } + + try { + final WalPositionLocator walPosition; + + ((PostgresReplicationConnection) replicationConnection) + .setEndingPos(offsetContext.getStreamingStoppingLsn()); + if (hasStartLsnStoredInContext) { + // start streaming from the last recorded position in the offset + final Lsn lsn = + offsetContext.lastCompletelyProcessedLsn() != null + ? offsetContext.lastCompletelyProcessedLsn() + : offsetContext.lsn(); + LOGGER.info("Retrieved latest position from stored offset '{}'", lsn); + walPosition = new WalPositionLocator(offsetContext.lastCommitLsn(), lsn); + replicationStream.compareAndSet( + null, replicationConnection.startStreaming(lsn, walPosition)); + } else { + LOGGER.info( + "No previous LSN found in Kafka, streaming from the latest xlogpos or flushed LSN..."); + walPosition = new WalPositionLocator(); + replicationStream.compareAndSet( + null, replicationConnection.startStreaming(walPosition)); + } + // for large dbs, the refresh of schema can take too much time + // such that the connection times out. We must enable keep + // alive to ensure that it doesn't time out + ReplicationStream stream = this.replicationStream.get(); + stream.startKeepAlive( + Threads.newSingleThreadExecutor( + PostgresConnector.class, + connectorConfig.getLogicalName(), + KEEP_ALIVE_THREAD_NAME)); + + init(); + + // If we need to do a pre-snapshot streaming catch up, we should allow the snapshot + // transaction to persist + // but normally we want to start streaming without any open transactions. 
+ if (!isInPreSnapshotCatchUpStreaming(offsetContext)) { + connection.commit(); + } + + this.lastCompletelyProcessedLsn = replicationStream.get().startLsn(); + + if (walPosition.searchingEnabled()) { + searchWalPosition(context, stream, walPosition); + try { + if (!isInPreSnapshotCatchUpStreaming(offsetContext)) { + connection.commit(); + } + } catch (Exception e) { + LOGGER.info("Commit failed while preparing for reconnect", e); + } + walPosition.enableFiltering(); + stream.stopKeepAlive(); + replicationConnection.reconnect(); + replicationStream.set( + replicationConnection.startStreaming( + walPosition.getLastEventStoredLsn(), walPosition)); + stream = this.replicationStream.get(); + stream.startKeepAlive( + Threads.newSingleThreadExecutor( + PostgresConnector.class, + connectorConfig.getLogicalName(), + KEEP_ALIVE_THREAD_NAME)); + } + processMessages(context, partition, offsetContext, stream); + } catch (Throwable e) { + errorHandler.setProducerThrowable(e); + } finally { + if (replicationConnection != null) { + LOGGER.debug("stopping streaming..."); + // stop the keep alive thread, this also shuts down the + // executor pool + ReplicationStream stream = replicationStream.get(); + if (stream != null) { + stream.stopKeepAlive(); + } + // TODO author=Horia Chiorean date=08/11/2016 description=Ideally we'd close the + // stream, but it's not reliable atm (see javadoc) + // replicationStream.close(); + // close the connection - this should also disconnect the current stream even if + // it's blocking + try { + if (!isInPreSnapshotCatchUpStreaming(offsetContext)) { + connection.commit(); + } + replicationConnection.close(); + } catch (Exception e) { + LOGGER.debug("Exception while closing the connection", e); + } + replicationStream.set(null); + } + } + } + + private void processMessages( + ChangeEventSourceContext context, + PostgresPartition partition, + PostgresOffsetContext offsetContext, + final ReplicationStream stream) + throws SQLException, InterruptedException { + LOGGER.info("Processing messages"); + int noMessageIterations = 0; + while (context.isRunning() + && (offsetContext.getStreamingStoppingLsn() == null + || (lastCompletelyProcessedLsn.compareTo( + offsetContext.getStreamingStoppingLsn()) + < 0))) { + + boolean receivedMessage = + stream.readPending( + message -> { + final Lsn lsn = stream.lastReceivedLsn(); + + if (message.isLastEventForLsn()) { + lastCompletelyProcessedLsn = lsn; + } + + // Tx BEGIN/END event + if (message.isTransactionalMessage()) { + if (!connectorConfig.shouldProvideTransactionMetadata()) { + LOGGER.trace("Received transactional message {}", message); + // Don't skip on BEGIN message as it would flush LSN for the + // whole transaction + // too early + if (message.getOperation() == Operation.COMMIT) { + commitMessage(partition, offsetContext, lsn); + } + return; + } + + offsetContext.updateWalPosition( + lsn, + lastCompletelyProcessedLsn, + message.getCommitTime(), + toLong(message.getTransactionId()), + taskContext.getSlotXmin(connection), + null); + if (message.getOperation() == Operation.BEGIN) { + dispatcher.dispatchTransactionStartedEvent( + partition, + toString(message.getTransactionId()), + offsetContext); + } else if (message.getOperation() == Operation.COMMIT) { + commitMessage(partition, offsetContext, lsn); + dispatcher.dispatchTransactionCommittedEvent( + partition, offsetContext); + } + maybeWarnAboutGrowingWalBacklog(true); + } else if (message.getOperation() == Operation.MESSAGE) { + offsetContext.updateWalPosition( + lsn, + 
lastCompletelyProcessedLsn, + message.getCommitTime(), + toLong(message.getTransactionId()), + taskContext.getSlotXmin(connection)); + + // non-transactional message that will not be followed by a + // COMMIT message + if (message.isLastEventForLsn()) { + commitMessage(partition, offsetContext, lsn); + } + + dispatcher.dispatchLogicalDecodingMessage( + partition, + offsetContext, + clock.currentTimeAsInstant().toEpochMilli(), + (LogicalDecodingMessage) message); + + maybeWarnAboutGrowingWalBacklog(true); + } + // DML event + else { + TableId tableId = null; + if (message.getOperation() != Operation.NOOP) { + tableId = PostgresSchema.parse(message.getTable()); + Objects.requireNonNull(tableId); + } + + offsetContext.updateWalPosition( + lsn, + lastCompletelyProcessedLsn, + message.getCommitTime(), + toLong(message.getTransactionId()), + taskContext.getSlotXmin(connection), + tableId); + + boolean dispatched = + message.getOperation() != Operation.NOOP + && dispatcher.dispatchDataChangeEvent( + partition, + tableId, + new PostgresChangeRecordEmitter( + partition, + offsetContext, + clock, + connectorConfig, + schema, + connection, + tableId, + message)); + + maybeWarnAboutGrowingWalBacklog(dispatched); + } + }); + + probeConnectionIfNeeded(); + + if (receivedMessage) { + noMessageIterations = 0; + lsnFlushingAllowed = true; + } else { + if (offsetContext.hasCompletelyProcessedPosition()) { + dispatcher.dispatchHeartbeatEvent(partition, offsetContext); + } + noMessageIterations++; + if (noMessageIterations >= THROTTLE_NO_MESSAGE_BEFORE_PAUSE) { + noMessageIterations = 0; + pauseNoMessage.sleepWhen(true); + } + } + if (!isInPreSnapshotCatchUpStreaming(offsetContext)) { + // During catch up streaming, the streaming phase needs to hold a transaction open + // so that + // the phase can stream event up to a specific lsn and the snapshot that occurs + // after the catch up + // streaming will not lose the current view of data. Since we need to hold the + // transaction open + // for the snapshot, this block must not commit during catch up streaming. 
+ connection.commit(); + } + } + } + + private void searchWalPosition( + ChangeEventSourceContext context, + final ReplicationStream stream, + final WalPositionLocator walPosition) + throws SQLException, InterruptedException { + AtomicReference resumeLsn = new AtomicReference<>(); + int noMessageIterations = 0; + + LOGGER.info("Searching for WAL resume position"); + while (context.isRunning() && resumeLsn.get() == null) { + + boolean receivedMessage = + stream.readPending( + message -> { + final Lsn lsn = stream.lastReceivedLsn(); + resumeLsn.set(walPosition.resumeFromLsn(lsn, message).orElse(null)); + }); + + if (receivedMessage) { + noMessageIterations = 0; + } else { + noMessageIterations++; + if (noMessageIterations >= THROTTLE_NO_MESSAGE_BEFORE_PAUSE) { + noMessageIterations = 0; + pauseNoMessage.sleepWhen(true); + } + } + + probeConnectionIfNeeded(); + } + LOGGER.info("WAL resume position '{}' discovered", resumeLsn.get()); + } + + private void probeConnectionIfNeeded() throws SQLException { + if (connectionProbeTimer.hasElapsed()) { + connection.prepareQuery("SELECT 1"); + connection.commit(); + } + } + + private void commitMessage( + PostgresPartition partition, PostgresOffsetContext offsetContext, final Lsn lsn) + throws SQLException, InterruptedException { + lastCompletelyProcessedLsn = lsn; + offsetContext.updateCommitPosition(lsn, lastCompletelyProcessedLsn); + maybeWarnAboutGrowingWalBacklog(false); + dispatcher.dispatchHeartbeatEvent(partition, offsetContext); + } + + /** + * If we receive change events but all of them get filtered out, we cannot commit any new offset + * with Apache Kafka. This in turn means no LSN is ever acknowledged with the replication slot, + * causing any ever growing WAL backlog. + * + *

This situation typically occurs if there are changes on the database server (e.g. in an + * excluded database), but none of them is in table.include.list. To prevent this, heartbeats + * can be used, as they allow us to commit offsets even when not propagating any "real" + * change event. + * + *

The purpose of this method is to detect this situation and log a warning every {@link + * #GROWING_WAL_WARNING_LOG_INTERVAL} filtered events. + * + * @param dispatched Whether an event was sent to the broker or not + */ + private void maybeWarnAboutGrowingWalBacklog(boolean dispatched) { + if (dispatched) { + numberOfEventsSinceLastEventSentOrWalGrowingWarning = 0; + } else { + numberOfEventsSinceLastEventSentOrWalGrowingWarning++; + } + + if (numberOfEventsSinceLastEventSentOrWalGrowingWarning > GROWING_WAL_WARNING_LOG_INTERVAL + && !dispatcher.heartbeatsEnabled()) { + LOGGER.warn( + "Received {} events which were all filtered out, so no offset could be committed. " + + "This prevents the replication slot from acknowledging the processed WAL offsets, " + + "causing a growing backlog of non-removeable WAL segments on the database server. " + + "Consider to either adjust your filter configuration or enable heartbeat events " + + "(via the {} option) to avoid this situation.", + numberOfEventsSinceLastEventSentOrWalGrowingWarning, + Heartbeat.HEARTBEAT_INTERVAL_PROPERTY_NAME); + + numberOfEventsSinceLastEventSentOrWalGrowingWarning = 0; + } + } + + @Override + public void commitOffset(Map offset) { + try { + ReplicationStream replicationStream = this.replicationStream.get(); + final Lsn commitLsn = + Lsn.valueOf((Long) offset.get(PostgresOffsetContext.LAST_COMMIT_LSN_KEY)); + final Lsn changeLsn = + Lsn.valueOf( + (Long) + offset.get( + PostgresOffsetContext + .LAST_COMPLETELY_PROCESSED_LSN_KEY)); + final Lsn lsn = (commitLsn != null) ? commitLsn : changeLsn; + + if (replicationStream != null && lsn != null) { + if (!lsnFlushingAllowed) { + LOGGER.info( + "Received offset commit request on '{}', but ignoring it. LSN flushing is not allowed yet", + lsn); + return; + } + + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Flushing LSN to server: {}", lsn); + } + // tell the server the point up to which we've processed data, so it can be free to + // recycle WAL segments + replicationStream.flushLsn(lsn); + } else { + LOGGER.debug("Streaming has already stopped, ignoring commit callback..."); + } + } catch (SQLException e) { + throw new ConnectException(e); + } + } + + /** + * Returns whether the current streaming phase is running a catch up streaming phase that runs + * before a snapshot. This is useful for transaction management. + * + *

During pre-snapshot catch up streaming, we open the snapshot transaction early and hold + * the transaction open throughout the pre snapshot catch up streaming phase so that we know + * where to stop streaming and can start the snapshot phase at a consistent location. This is + * opposed the regular streaming, where we do not a lingering open transaction. + * + * @return true if the current streaming phase is performing catch up streaming + */ + private boolean isInPreSnapshotCatchUpStreaming(PostgresOffsetContext offsetContext) { + return offsetContext.getStreamingStoppingLsn() != null; + } + + private Long toLong(OptionalLong l) { + return l.isPresent() ? Long.valueOf(l.getAsLong()) : null; + } + + private String toString(OptionalLong l) { + return l.isPresent() ? String.valueOf(l.getAsLong()) : null; + } + + @FunctionalInterface + public static interface PgConnectionSupplier { + BaseConnection get() throws SQLException; + } +} diff --git a/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java b/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java new file mode 100644 index 000000000..364cb59bd --- /dev/null +++ b/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java @@ -0,0 +1,917 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ + +package io.debezium.connector.postgresql.connection; + +import io.debezium.DebeziumException; +import io.debezium.connector.postgresql.PostgresConnectorConfig; +import io.debezium.connector.postgresql.PostgresSchema; +import io.debezium.connector.postgresql.TypeRegistry; +import io.debezium.connector.postgresql.spi.SlotCreationResult; +import io.debezium.jdbc.JdbcConfiguration; +import io.debezium.jdbc.JdbcConnection; +import io.debezium.jdbc.JdbcConnectionException; +import io.debezium.relational.RelationalTableFilters; +import io.debezium.relational.TableId; +import io.debezium.util.Clock; +import io.debezium.util.Metronome; +import org.apache.kafka.connect.errors.ConnectException; +import org.postgresql.core.BaseConnection; +import org.postgresql.core.ServerVersion; +import org.postgresql.replication.PGReplicationStream; +import org.postgresql.replication.fluent.logical.ChainedLogicalStreamBuilder; +import org.postgresql.util.PSQLException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.ByteBuffer; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.SQLWarning; +import java.sql.Statement; +import java.time.Duration; +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.Optional; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.BiFunction; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static java.lang.Math.toIntExact; + +/** + * Copied from Debezium 1.9.7. + * + *

The {@link ReplicationConnection} created from {@code createReplicationStream} will hang when + * the WAL contains only keep-alive messages. Add support for setting an ending LSN so that the stream can stop instead of hanging. + * + *

Lines 82, 694~695: add endingPos and its setter. + * + *

Line 554~559, 578~583: ReplicationStream from {@code createReplicationStream} will stop when + * endingPos reached. + */ +public class PostgresReplicationConnection extends JdbcConnection implements ReplicationConnection { + + private static Logger LOGGER = LoggerFactory.getLogger(PostgresReplicationConnection.class); + + private final String slotName; + private final String publicationName; + private final RelationalTableFilters tableFilter; + private final PostgresConnectorConfig.AutoCreateMode publicationAutocreateMode; + private final PostgresConnectorConfig.LogicalDecoder plugin; + private final boolean dropSlotOnClose; + private final PostgresConnectorConfig connectorConfig; + private final Duration statusUpdateInterval; + private final MessageDecoder messageDecoder; + private final TypeRegistry typeRegistry; + private final Properties streamParams; + + private Lsn defaultStartingPos; + private SlotCreationResult slotCreationInfo; + private boolean hasInitedSlot; + + private Lsn endingPos; + + /** + * Creates a new replication connection with the given params. + * + * @param config the JDBC configuration for the connection; may not be null + * @param slotName the name of the DB slot for logical replication; may not be null + * @param publicationName the name of the DB publication for logical replication; may not be + * null + * @param tableFilter the tables to watch of the DB publication for logical replication; may not + * be null + * @param publicationAutocreateMode the mode for publication autocreation; may not be null + * @param plugin decoder matching the server side plug-in used for streaming changes; may not be + * null + * @param dropSlotOnClose whether the replication slot should be dropped once the connection is + * closed + * @param statusUpdateInterval the interval at which the replication connection should + * periodically send status + * @param doSnapshot whether the connector is doing snapshot + * @param jdbcConnection general POstgreSQL JDBC connection + * @param typeRegistry registry with PostgreSQL types + * @param streamParams additional parameters to pass to the replication stream + * @param schema the schema; must not be null + *

updates to the server + */ + private PostgresReplicationConnection( + PostgresConnectorConfig config, + String slotName, + String publicationName, + RelationalTableFilters tableFilter, + PostgresConnectorConfig.AutoCreateMode publicationAutocreateMode, + PostgresConnectorConfig.LogicalDecoder plugin, + boolean dropSlotOnClose, + boolean doSnapshot, + Duration statusUpdateInterval, + PostgresConnection jdbcConnection, + TypeRegistry typeRegistry, + Properties streamParams, + PostgresSchema schema) { + super( + addDefaultSettings(config.getJdbcConfig()), + PostgresConnection.FACTORY, + null, + null, + "\"", + "\""); + + this.connectorConfig = config; + this.slotName = slotName; + this.publicationName = publicationName; + this.tableFilter = tableFilter; + this.publicationAutocreateMode = publicationAutocreateMode; + this.plugin = plugin; + this.dropSlotOnClose = dropSlotOnClose; + this.statusUpdateInterval = statusUpdateInterval; + this.messageDecoder = + plugin.messageDecoder(new MessageDecoderContext(config, schema), jdbcConnection); + this.typeRegistry = typeRegistry; + this.streamParams = streamParams; + this.slotCreationInfo = null; + this.hasInitedSlot = false; + } + + private static JdbcConfiguration addDefaultSettings(JdbcConfiguration configuration) { + // first copy the parent's default settings... + // then set some additional replication specific settings + return JdbcConfiguration.adapt( + PostgresConnection.addDefaultSettings( + configuration, PostgresConnection.CONNECTION_STREAMING) + .edit() + .with("replication", "database") + .with( + "preferQueryMode", + "simple") // replication protocol only supports simple query mode + .build()); + } + + private ServerInfo.ReplicationSlot getSlotInfo() throws SQLException, InterruptedException { + try (PostgresConnection connection = + new PostgresConnection( + connectorConfig.getJdbcConfig(), PostgresConnection.CONNECTION_SLOT_INFO)) { + return connection.readReplicationSlotInfo(slotName, plugin.getPostgresPluginName()); + } + } + + protected void initPublication() { + String createPublicationStmt; + String tableFilterString = null; + if (PostgresConnectorConfig.LogicalDecoder.PGOUTPUT.equals(plugin)) { + LOGGER.info("Initializing PgOutput logical decoder publication"); + try { + // Unless the autocommit is disabled the SELECT publication query will stay running + Connection conn = pgConnection(); + conn.setAutoCommit(false); + + String selectPublication = + String.format( + "SELECT COUNT(1) FROM pg_publication WHERE pubname = '%s'", + publicationName); + try (Statement stmt = conn.createStatement(); + ResultSet rs = stmt.executeQuery(selectPublication)) { + if (rs.next()) { + Long count = rs.getLong(1); + // Close eagerly as the transaction might stay running + if (count == 0L) { + LOGGER.info( + "Creating new publication '{}' for plugin '{}'", + publicationName, + plugin); + switch (publicationAutocreateMode) { + case DISABLED: + throw new ConnectException( + "Publication autocreation is disabled, please create one and restart the connector."); + case ALL_TABLES: + createPublicationStmt = + String.format( + "CREATE PUBLICATION %s FOR ALL TABLES;", + publicationName); + LOGGER.info( + "Creating Publication with statement '{}'", + createPublicationStmt); + // Publication doesn't exist, create it. 
+ stmt.execute(createPublicationStmt); + break; + case FILTERED: + try { + Set tablesToCapture = determineCapturedTables(); + tableFilterString = + tablesToCapture.stream() + .map(TableId::toDoubleQuotedString) + .collect(Collectors.joining(", ")); + if (tableFilterString.isEmpty()) { + throw new DebeziumException( + String.format( + "No table filters found for filtered publication %s", + publicationName)); + } + createPublicationStmt = + String.format( + "CREATE PUBLICATION %s FOR TABLE %s;", + publicationName, tableFilterString); + LOGGER.info( + "Creating Publication with statement '{}'", + createPublicationStmt); + // Publication doesn't exist, create it but restrict to the + // tableFilter. + stmt.execute(createPublicationStmt); + } catch (Exception e) { + throw new ConnectException( + String.format( + "Unable to create filtered publication %s for %s", + publicationName, tableFilterString), + e); + } + break; + } + } else { + LOGGER.trace( + "A logical publication named '{}' for plugin '{}' and database '{}' is already active on the server " + + "and will be used by the plugin", + publicationName, + plugin, + database()); + } + } + } + conn.commit(); + conn.setAutoCommit(true); + } catch (SQLException e) { + throw new JdbcConnectionException(e); + } + } + } + + private Set determineCapturedTables() throws Exception { + Set allTableIds = + this.connect() + .readTableNames( + pgConnection().getCatalog(), null, null, new String[] {"TABLE"}); + + Set capturedTables = new HashSet<>(); + + for (TableId tableId : allTableIds) { + if (tableFilter.dataCollectionFilter().isIncluded(tableId)) { + LOGGER.trace("Adding table {} to the list of captured tables", tableId); + capturedTables.add(tableId); + } else { + LOGGER.trace( + "Ignoring table {} as it's not included in the filter configuration", + tableId); + } + } + + return capturedTables.stream() + .sorted() + .collect(Collectors.toCollection(LinkedHashSet::new)); + } + + protected void initReplicationSlot() throws SQLException, InterruptedException { + ServerInfo.ReplicationSlot slotInfo = getSlotInfo(); + + boolean shouldCreateSlot = ServerInfo.ReplicationSlot.INVALID == slotInfo; + try { + // there's no info for this plugin and slot so create a new slot + if (shouldCreateSlot) { + this.createReplicationSlot(); + } + + // replication connection does not support parsing of SQL statements so we need to + // create + // the connection without executing on connect statements - see JDBC opt + // preferQueryMode=simple + pgConnection(); + final String identifySystemStatement = "IDENTIFY_SYSTEM"; + LOGGER.debug( + "running '{}' to validate replication connection", identifySystemStatement); + final Lsn xlogStart = + queryAndMap( + identifySystemStatement, + rs -> { + if (!rs.next()) { + throw new IllegalStateException( + "The DB connection is not a valid replication connection"); + } + String xlogpos = rs.getString("xlogpos"); + LOGGER.debug("received latest xlogpos '{}'", xlogpos); + return Lsn.valueOf(xlogpos); + }); + + if (slotCreationInfo != null) { + this.defaultStartingPos = slotCreationInfo.startLsn(); + } else if (shouldCreateSlot || !slotInfo.hasValidFlushedLsn()) { + // this is a new slot or we weren't able to read a valid flush LSN pos, so we always + // start from the xlog pos that was reported + this.defaultStartingPos = xlogStart; + } else { + Lsn latestFlushedLsn = slotInfo.latestFlushedLsn(); + this.defaultStartingPos = + latestFlushedLsn.compareTo(xlogStart) < 0 ? 
latestFlushedLsn : xlogStart; + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("found previous flushed LSN '{}'", latestFlushedLsn); + } + } + hasInitedSlot = true; + } catch (SQLException e) { + throw new JdbcConnectionException(e); + } + } + + // Temporary replication slots is a new feature of PostgreSQL 10 + private boolean useTemporarySlot() throws SQLException { + // Temporary replication slots cannot be used due to connection restart + // when finding WAL position + // return dropSlotOnClose && pgConnection().haveMinimumServerVersion(ServerVersion.v10); + return false; + } + + /** + * creating a replication connection and starting to stream involves a few steps: 1. we create + * the connection and ensure that a. the slot exists b. the slot isn't currently being used 2. + * we query to get our potential start position in the slot (lsn) 3. we try and start streaming, + * depending on our options (such as in wal2json) this may fail, which can result in the + * connection being killed and we need to start the process over if we are using a temporary + * slot 4. actually start the streamer + * + *

This method takes care of all of these and this method queries for a default starting + * position If you know where you are starting from you should call {@link #startStreaming(Lsn, + * WalPositionLocator)}, this method delegates to that method + * + * @return + * @throws SQLException + * @throws InterruptedException + */ + @Override + public ReplicationStream startStreaming(WalPositionLocator walPosition) + throws SQLException, InterruptedException { + return startStreaming(null, walPosition); + } + + @Override + public ReplicationStream startStreaming(Lsn offset, WalPositionLocator walPosition) + throws SQLException, InterruptedException { + initConnection(); + + connect(); + if (offset == null || !offset.isValid()) { + offset = defaultStartingPos; + } + Lsn lsn = offset; + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("starting streaming from LSN '{}'", lsn); + } + + final int maxRetries = connectorConfig.maxRetries(); + final Duration delay = connectorConfig.retryDelay(); + int tryCount = 0; + while (true) { + try { + return createReplicationStream(lsn, walPosition); + } catch (Exception e) { + String message = "Failed to start replication stream at " + lsn; + if (++tryCount > maxRetries) { + if (e.getMessage().matches(".*replication slot .* is active.*")) { + message += + "; when setting up multiple connectors for the same database host, please make sure to use a distinct replication slot name for each."; + } + throw new DebeziumException(message, e); + } else { + LOGGER.warn( + message + ", waiting for {} ms and retrying, attempt number {} over {}", + delay, + tryCount, + maxRetries); + final Metronome metronome = Metronome.sleeper(delay, Clock.SYSTEM); + metronome.pause(); + } + } + } + } + + @Override + public void initConnection() throws SQLException, InterruptedException { + // See https://www.postgresql.org/docs/current/logical-replication-quick-setup.html + // For pgoutput specifically, the publication must be created before the slot. + initPublication(); + if (!hasInitedSlot) { + initReplicationSlot(); + } + } + + @Override + public Optional createReplicationSlot() throws SQLException { + // note that some of these options are only supported in Postgres 9.4+, additionally + // the options are not yet exported by the jdbc api wrapper, therefore, we just do + // this ourselves but eventually this should be moved back to the jdbc API + // see https://github.com/pgjdbc/pgjdbc/issues/1305 + + LOGGER.debug("Creating new replication slot '{}' for plugin '{}'", slotName, plugin); + String tempPart = ""; + // Exported snapshots are supported in Postgres 9.4+ + boolean canExportSnapshot = pgConnection().haveMinimumServerVersion(ServerVersion.v9_4); + if ((dropSlotOnClose) && !canExportSnapshot) { + LOGGER.warn( + "A slot marked as temporary or with an exported snapshot was created, " + + "but not on a supported version of Postgres, ignoring!"); + } + if (useTemporarySlot()) { + tempPart = "TEMPORARY"; + } + + // See https://www.postgresql.org/docs/current/logical-replication-quick-setup.html + // For pgoutput specifically, the publication must be created prior to the slot. 
+ initPublication(); + + try (Statement stmt = pgConnection().createStatement()) { + String createCommand = + String.format( + "CREATE_REPLICATION_SLOT \"%s\" %s LOGICAL %s", + slotName, tempPart, plugin.getPostgresPluginName()); + LOGGER.info("Creating replication slot with command {}", createCommand); + stmt.execute(createCommand); + // when we are in Postgres 9.4+, we can parse the slot creation info, + // otherwise, it returns nothing + if (canExportSnapshot) { + this.slotCreationInfo = parseSlotCreation(stmt.getResultSet()); + } + + return Optional.ofNullable(slotCreationInfo); + } + } + + protected BaseConnection pgConnection() throws SQLException { + return (BaseConnection) connection(false); + } + + private SlotCreationResult parseSlotCreation(ResultSet rs) { + try { + if (rs.next()) { + String slotName = rs.getString("slot_name"); + String startPoint = rs.getString("consistent_point"); + String snapName = rs.getString("snapshot_name"); + String pluginName = rs.getString("output_plugin"); + + return new SlotCreationResult(slotName, startPoint, snapName, pluginName); + } else { + throw new ConnectException("No replication slot found"); + } + } catch (SQLException ex) { + throw new ConnectException("Unable to parse create_replication_slot response", ex); + } + } + + private ReplicationStream createReplicationStream( + final Lsn startLsn, WalPositionLocator walPosition) + throws SQLException, InterruptedException { + PGReplicationStream s; + + try { + try { + s = + startPgReplicationStream( + startLsn, + plugin.forceRds() + ? messageDecoder::optionsWithoutMetadata + : messageDecoder::optionsWithMetadata); + messageDecoder.setContainsMetadata(plugin.forceRds() ? false : true); + } catch (PSQLException e) { + LOGGER.debug( + "Could not register for streaming, retrying without optional options", e); + + // re-init the slot after a failed start of slot, as this + // may have closed the slot + if (useTemporarySlot()) { + initReplicationSlot(); + } + + s = + startPgReplicationStream( + startLsn, + plugin.forceRds() + ? messageDecoder::optionsWithoutMetadata + : messageDecoder::optionsWithMetadata); + messageDecoder.setContainsMetadata(plugin.forceRds() ? false : true); + } + } catch (PSQLException e) { + if (e.getMessage().matches("(?s)ERROR: option .* is unknown.*")) { + // It is possible we are connecting to an old wal2json plug-in + LOGGER.warn( + "Could not register for streaming with metadata in messages, falling back to messages without metadata"); + + // re-init the slot after a failed start of slot, as this + // may have closed the slot + if (useTemporarySlot()) { + initReplicationSlot(); + } + + s = startPgReplicationStream(startLsn, messageDecoder::optionsWithoutMetadata); + messageDecoder.setContainsMetadata(false); + } else if (e.getMessage() + .matches("(?s)ERROR: requested WAL segment .* has already been removed.*")) { + LOGGER.error("Cannot rewind to last processed WAL position", e); + throw new ConnectException( + "The offset to start reading from has been removed from the database write-ahead log. 
Create a new snapshot and consider setting of PostgreSQL parameter wal_keep_segments = 0."); + } else { + throw e; + } + } + + final PGReplicationStream stream = s; + + return new ReplicationStream() { + + private static final int CHECK_WARNINGS_AFTER_COUNT = 100; + private int warningCheckCounter = CHECK_WARNINGS_AFTER_COUNT; + private ExecutorService keepAliveExecutor = null; + private AtomicBoolean keepAliveRunning; + private final Metronome metronome = + Metronome.sleeper(statusUpdateInterval, Clock.SYSTEM); + + // make sure this is volatile since multiple threads may be interested in this value + private volatile Lsn lastReceivedLsn; + + @Override + public void read(ReplicationMessageProcessor processor) + throws SQLException, InterruptedException { + processWarnings(false); + ByteBuffer read = stream.read(); + final Lsn lastReceiveLsn = Lsn.valueOf(stream.getLastReceiveLSN()); + LOGGER.trace( + "Streaming requested from LSN {}, received LSN {}", + startLsn, + lastReceiveLsn); + if (reachEnd(lastReceivedLsn)) { + lastReceivedLsn = Lsn.valueOf(stream.getLastReceiveLSN()); + LOGGER.trace("Received message at LSN {}", lastReceivedLsn); + processor.process(new ReplicationMessage.NoopMessage(null, null)); + return; + } + if (messageDecoder.shouldMessageBeSkipped( + read, lastReceiveLsn, startLsn, walPosition)) { + return; + } + deserializeMessages(read, processor); + } + + @Override + public boolean readPending(ReplicationMessageProcessor processor) + throws SQLException, InterruptedException { + processWarnings(false); + ByteBuffer read = stream.readPending(); + final Lsn lastReceiveLsn = Lsn.valueOf(stream.getLastReceiveLSN()); + LOGGER.trace( + "Streaming requested from LSN {}, received LSN {}", + startLsn, + lastReceiveLsn); + + if (reachEnd(lastReceiveLsn)) { + lastReceivedLsn = Lsn.valueOf(stream.getLastReceiveLSN()); + LOGGER.trace("Received message at LSN {}", lastReceivedLsn); + processor.process(new ReplicationMessage.NoopMessage(null, null)); + return true; + } + + if (read == null) { + return false; + } + + if (messageDecoder.shouldMessageBeSkipped( + read, lastReceiveLsn, startLsn, walPosition)) { + return true; + } + + deserializeMessages(read, processor); + + return true; + } + + private void deserializeMessages( + ByteBuffer buffer, ReplicationMessageProcessor processor) + throws SQLException, InterruptedException { + lastReceivedLsn = Lsn.valueOf(stream.getLastReceiveLSN()); + LOGGER.trace("Received message at LSN {}", lastReceivedLsn); + messageDecoder.processMessage(buffer, processor, typeRegistry); + } + + @Override + public void close() throws SQLException { + processWarnings(true); + stream.close(); + } + + @Override + public void flushLsn(Lsn lsn) throws SQLException { + doFlushLsn(lsn); + } + + private void doFlushLsn(Lsn lsn) throws SQLException { + stream.setFlushedLSN(lsn.asLogSequenceNumber()); + stream.setAppliedLSN(lsn.asLogSequenceNumber()); + + stream.forceUpdateStatus(); + } + + @Override + public Lsn lastReceivedLsn() { + return lastReceivedLsn; + } + + @Override + public void startKeepAlive(ExecutorService service) { + if (keepAliveExecutor == null) { + keepAliveExecutor = service; + keepAliveRunning = new AtomicBoolean(true); + keepAliveExecutor.submit( + () -> { + while (keepAliveRunning.get()) { + try { + LOGGER.trace( + "Forcing status update with replication stream"); + stream.forceUpdateStatus(); + metronome.pause(); + } catch (Exception exp) { + throw new RuntimeException( + "received unexpected exception will perform keep alive", + exp); + } + } + 
}); + } + } + + @Override + public void stopKeepAlive() { + if (keepAliveExecutor != null) { + keepAliveRunning.set(false); + keepAliveExecutor.shutdownNow(); + keepAliveExecutor = null; + } + } + + private void processWarnings(final boolean forced) throws SQLException { + if (--warningCheckCounter == 0 || forced) { + warningCheckCounter = CHECK_WARNINGS_AFTER_COUNT; + for (SQLWarning w = connection().getWarnings(); + w != null; + w = w.getNextWarning()) { + LOGGER.debug( + "Server-side message: '{}', state = {}, code = {}", + w.getMessage(), + w.getSQLState(), + w.getErrorCode()); + } + connection().clearWarnings(); + } + } + + @Override + public Lsn startLsn() { + return startLsn; + } + + private boolean reachEnd(Lsn receivedLsn) { + if (receivedLsn == null) { + return false; + } + return endingPos != null + && (!endingPos.isNonStopping()) + && endingPos.compareTo(receivedLsn) < 0; + } + }; + } + + public void setEndingPos(Lsn endingPos) { + this.endingPos = endingPos; + } + + private PGReplicationStream startPgReplicationStream( + final Lsn lsn, + BiFunction< + ChainedLogicalStreamBuilder, + Function, + ChainedLogicalStreamBuilder> + configurator) + throws SQLException { + assert lsn != null; + ChainedLogicalStreamBuilder streamBuilder = + pgConnection() + .getReplicationAPI() + .replicationStream() + .logical() + .withSlotName("\"" + slotName + "\"") + .withStartPosition(lsn.asLogSequenceNumber()) + .withSlotOptions(streamParams); + streamBuilder = configurator.apply(streamBuilder, this::hasMinimumVersion); + + if (statusUpdateInterval != null && statusUpdateInterval.toMillis() > 0) { + streamBuilder.withStatusInterval( + toIntExact(statusUpdateInterval.toMillis()), TimeUnit.MILLISECONDS); + } + + PGReplicationStream stream = streamBuilder.start(); + + // TODO DBZ-508 get rid of this + // Needed by tests when connections are opened and closed in a fast sequence + try { + Thread.sleep(10); + } catch (Exception e) { + } + stream.forceUpdateStatus(); + return stream; + } + + private Boolean hasMinimumVersion(int version) { + try { + return pgConnection().haveMinimumServerVersion(version); + } catch (SQLException e) { + throw new DebeziumException(e); + } + } + + @Override + public synchronized void close() { + close(true); + } + + public synchronized void close(boolean dropSlot) { + try { + LOGGER.debug("Closing message decoder"); + messageDecoder.close(); + } catch (Throwable e) { + LOGGER.error("Unexpected error while closing message decoder", e); + } + + try { + LOGGER.debug("Closing replication connection"); + super.close(); + } catch (Throwable e) { + LOGGER.error("Unexpected error while closing Postgres connection", e); + } + if (dropSlotOnClose && dropSlot) { + // we're dropping the replication slot via a regular - i.e. 
not a replication - + // connection + try (PostgresConnection connection = + new PostgresConnection( + connectorConfig.getJdbcConfig(), + PostgresConnection.CONNECTION_DROP_SLOT)) { + connection.dropReplicationSlot(slotName); + } catch (Throwable e) { + LOGGER.error("Unexpected error while dropping replication slot", e); + } + } + } + + @Override + public void reconnect() throws SQLException { + close(false); + // Don't re-execute initial commands on reconnection + connection(false); + } + + protected static class ReplicationConnectionBuilder implements Builder { + + private final PostgresConnectorConfig config; + private String slotName = DEFAULT_SLOT_NAME; + private String publicationName = DEFAULT_PUBLICATION_NAME; + private RelationalTableFilters tableFilter; + private PostgresConnectorConfig.AutoCreateMode publicationAutocreateMode = + PostgresConnectorConfig.AutoCreateMode.ALL_TABLES; + private PostgresConnectorConfig.LogicalDecoder plugin = + PostgresConnectorConfig.LogicalDecoder.DECODERBUFS; + private boolean dropSlotOnClose = DEFAULT_DROP_SLOT_ON_CLOSE; + private Duration statusUpdateIntervalVal; + private boolean doSnapshot; + private TypeRegistry typeRegistry; + private PostgresSchema schema; + private Properties slotStreamParams = new Properties(); + private PostgresConnection jdbcConnection; + + protected ReplicationConnectionBuilder(PostgresConnectorConfig config) { + assert config != null; + this.config = config; + } + + @Override + public ReplicationConnectionBuilder withSlot(final String slotName) { + assert slotName != null; + this.slotName = slotName; + return this; + } + + @Override + public Builder withPublication(String publicationName) { + assert publicationName != null; + this.publicationName = publicationName; + return this; + } + + @Override + public Builder withTableFilter(RelationalTableFilters tableFilter) { + assert tableFilter != null; + this.tableFilter = tableFilter; + return this; + } + + @Override + public Builder withPublicationAutocreateMode( + PostgresConnectorConfig.AutoCreateMode publicationAutocreateMode) { + assert publicationName != null; + this.publicationAutocreateMode = publicationAutocreateMode; + return this; + } + + @Override + public ReplicationConnectionBuilder withPlugin( + final PostgresConnectorConfig.LogicalDecoder plugin) { + assert plugin != null; + this.plugin = plugin; + return this; + } + + @Override + public ReplicationConnectionBuilder dropSlotOnClose(final boolean dropSlotOnClose) { + this.dropSlotOnClose = dropSlotOnClose; + return this; + } + + @Override + public ReplicationConnectionBuilder streamParams(final String slotStreamParams) { + if (slotStreamParams != null && !slotStreamParams.isEmpty()) { + this.slotStreamParams = new Properties(); + String[] paramsWithValues = slotStreamParams.split(";"); + for (String paramsWithValue : paramsWithValues) { + String[] paramAndValue = paramsWithValue.split("="); + if (paramAndValue.length == 2) { + this.slotStreamParams.setProperty(paramAndValue[0], paramAndValue[1]); + } else { + LOGGER.warn( + "The following STREAM_PARAMS value is invalid: {}", + paramsWithValue); + } + } + } + return this; + } + + @Override + public ReplicationConnectionBuilder statusUpdateInterval( + final Duration statusUpdateInterval) { + this.statusUpdateIntervalVal = statusUpdateInterval; + return this; + } + + @Override + public Builder doSnapshot(boolean doSnapshot) { + this.doSnapshot = doSnapshot; + return this; + } + + @Override + public Builder jdbcMetadataConnection(PostgresConnection 
jdbcConnection) { + this.jdbcConnection = jdbcConnection; + return this; + } + + @Override + public ReplicationConnection build() { + assert plugin != null : "Decoding plugin name is not set"; + return new PostgresReplicationConnection( + config, + slotName, + publicationName, + tableFilter, + publicationAutocreateMode, + plugin, + dropSlotOnClose, + doSnapshot, + statusUpdateIntervalVal, + jdbcConnection, + typeRegistry, + slotStreamParams, + schema); + } + + @Override + public Builder withTypeRegistry(TypeRegistry typeRegistry) { + this.typeRegistry = typeRegistry; + return this; + } + + @Override + public Builder withSchema(PostgresSchema schema) { + this.schema = schema; + return this; + } + } +} diff --git a/flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/PostgresSQLSourceTest.java b/flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/PostgresSQLSourceTest.java new file mode 100644 index 000000000..450bb6912 --- /dev/null +++ b/flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/PostgresSQLSourceTest.java @@ -0,0 +1,922 @@ +/* + * Copyright 2022 Ververica Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.connectors.postgres; + +import org.apache.flink.api.common.state.BroadcastState; +import org.apache.flink.api.common.state.KeyedStateStore; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.OperatorStateStore; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.testutils.CheckedThread; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.MockStreamingRuntimeContext; +import org.apache.flink.util.Collector; +import org.apache.flink.util.Preconditions; + +import com.jayway.jsonpath.JsonPath; +import com.ververica.cdc.connectors.utils.TestSourceContext; +import com.ververica.cdc.debezium.DebeziumDeserializationSchema; +import com.ververica.cdc.debezium.DebeziumSourceFunction; +import org.apache.kafka.connect.source.SourceRecord; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.PostgreSQLContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.lifecycle.Startables; +import org.testcontainers.utility.DockerImageName; + +import java.nio.charset.StandardCharsets; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.time.Duration; +import 
java.util.ArrayList; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.OptionalLong; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; + +import static com.ververica.cdc.connectors.utils.AssertUtils.assertDelete; +import static com.ververica.cdc.connectors.utils.AssertUtils.assertInsert; +import static com.ververica.cdc.connectors.utils.AssertUtils.assertRead; +import static com.ververica.cdc.connectors.utils.AssertUtils.assertUpdate; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.testcontainers.containers.PostgreSQLContainer.POSTGRESQL_PORT; + +/** Tests for {@link PostgreSQLSource} which also heavily tests {@link DebeziumSourceFunction}. */ +public class PostgresSQLSourceTest extends PostgresTestBase { + private static final Logger LOG = LoggerFactory.getLogger(PostgresSQLSourceTest.class); + private static final String SLOT_NAME = "flink"; + // These tests only passes at the docker postgres:9.6 + private static final PostgreSQLContainer POSTGRES_CONTAINER_OLD = + new PostgreSQLContainer<>( + DockerImageName.parse("debezium/postgres:9.6") + .asCompatibleSubstituteFor("postgres")) + .withDatabaseName(DEFAULT_DB) + .withUsername("postgres") + .withPassword("postgres") + .withLogConsumer(new Slf4jLogConsumer(LOG)); + + @BeforeClass + public static void startAll() { + LOG.info("Starting containers..."); + Startables.deepStart(Stream.of(POSTGRES_CONTAINER_OLD)).join(); + LOG.info("Containers are started."); + } + + @Before + public void before() { + initializePostgresTable(POSTGRES_CONTAINER_OLD, "inventory"); + } + + @Test + public void testConsumingAllEvents() throws Exception { + DebeziumSourceFunction source = createPostgreSqlSourceWithHeartbeatDisabled(); + TestSourceContext sourceContext = new TestSourceContext<>(); + + setupSource(source); + + try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER_OLD); + Statement statement = connection.createStatement()) { + // start the source + final CheckedThread runThread = + new CheckedThread() { + @Override + public void go() throws Exception { + source.run(sourceContext); + } + }; + runThread.start(); + + List records = drain(sourceContext, 9); + assertEquals(9, records.size()); + for (int i = 0; i < records.size(); i++) { + assertRead(records.get(i), "id", 101 + i); + } + + statement.execute( + "INSERT INTO inventory.products VALUES (default,'robot','Toy robot',1.304)"); // 110 + records = drain(sourceContext, 1); + assertInsert(records.get(0), "id", 110); + + statement.execute( + "INSERT INTO inventory.products VALUES (1001,'roy','old robot',1234.56)"); // 1001 + records = drain(sourceContext, 1); + assertInsert(records.get(0), "id", 1001); + + // --------------------------------------------------------------------------------------------------------------- + // Changing the primary key of a row should result in 2 events: INSERT, DELETE + // (TOMBSTONE is dropped) + // --------------------------------------------------------------------------------------------------------------- + statement.execute( + "UPDATE inventory.products SET id=2001, description='really old robot' WHERE id=1001"); + records = 
drain(sourceContext, 2); + assertDelete(records.get(0), "id", 1001); + assertInsert(records.get(1), "id", 2001); + + // --------------------------------------------------------------------------------------------------------------- + // Simple UPDATE (with no schema changes) + // --------------------------------------------------------------------------------------------------------------- + statement.execute("UPDATE inventory.products SET weight=1345.67 WHERE id=2001"); + records = drain(sourceContext, 1); + assertUpdate(records.get(0), "id", 2001); + + // --------------------------------------------------------------------------------------------------------------- + // Change our schema with a fully-qualified name; we should still see this event + // --------------------------------------------------------------------------------------------------------------- + // Add a column with default to the 'products' table and explicitly update one record + // ... + statement.execute( + "ALTER TABLE inventory.products ADD COLUMN volume FLOAT, ADD COLUMN alias VARCHAR(30) NULL"); + statement.execute("UPDATE inventory.products SET volume=13.5 WHERE id=2001"); + records = drain(sourceContext, 1); + assertUpdate(records.get(0), "id", 2001); + + // cleanup + source.cancel(); + source.close(); + runThread.sync(); + } + } + + @Test + public void testCheckpointAndRestore() throws Exception { + final TestingListState offsetState = new TestingListState<>(); + final TestingListState historyState = new TestingListState<>(); + int prevLsn = 0; + { + // --------------------------------------------------------------------------- + // Step-1: start the source from empty state + // --------------------------------------------------------------------------- + final DebeziumSourceFunction source = + createPostgreSqlSourceWithHeartbeatDisabled(); + // we use blocking context to block the source to emit before last snapshot record + final BlockingSourceContext sourceContext = + new BlockingSourceContext<>(8); + // setup source with empty state + setupSource(source, false, offsetState, historyState, true, 0, 1); + + final CheckedThread runThread = + new CheckedThread() { + @Override + public void go() throws Exception { + source.run(sourceContext); + } + }; + runThread.start(); + + // wait until consumer is started + int received = drain(sourceContext, 2).size(); + assertEquals(2, received); + + // we can't perform checkpoint during DB snapshot + assertFalse( + waitForCheckpointLock( + sourceContext.getCheckpointLock(), Duration.ofSeconds(3))); + + // unblock the source context to continue the processing + sourceContext.blocker.release(); + // wait until the source finishes the database snapshot + List records = drain(sourceContext, 9 - received); + assertEquals(9, records.size() + received); + + // state is still empty + assertEquals(0, offsetState.list.size()); + assertEquals(0, historyState.list.size()); + + // --------------------------------------------------------------------------- + // Step-2: trigger checkpoint-1 after snapshot finished + // --------------------------------------------------------------------------- + synchronized (sourceContext.getCheckpointLock()) { + // trigger checkpoint-1 + source.snapshotState(new StateSnapshotContextSynchronousImpl(101, 101)); + } + + assertEquals(1, offsetState.list.size()); + String state = new String(offsetState.list.get(0), StandardCharsets.UTF_8); + assertEquals("postgres_cdc_source", JsonPath.read(state, "$.sourcePartition.server")); + assertEquals("557", 
JsonPath.read(state, "$.sourceOffset.txId").toString()); + assertEquals( + "true", JsonPath.read(state, "$.sourceOffset.last_snapshot_record").toString()); + assertEquals("true", JsonPath.read(state, "$.sourceOffset.snapshot").toString()); + assertTrue(state.contains("ts_usec")); + int lsn = JsonPath.read(state, "$.sourceOffset.lsn"); + assertTrue(lsn > prevLsn); + prevLsn = lsn; + + source.cancel(); + source.close(); + runThread.sync(); + } + + { + // --------------------------------------------------------------------------- + // Step-3: restore the source from state + // --------------------------------------------------------------------------- + final DebeziumSourceFunction source2 = + createPostgreSqlSourceWithHeartbeatDisabled(); + final TestSourceContext sourceContext2 = new TestSourceContext<>(); + setupSource(source2, true, offsetState, historyState, true, 0, 1); + final CheckedThread runThread2 = + new CheckedThread() { + @Override + public void go() throws Exception { + source2.run(sourceContext2); + } + }; + runThread2.start(); + + // make sure there is no more events + assertFalse(waitForAvailableRecords(Duration.ofSeconds(5), sourceContext2)); + + try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER_OLD); + Statement statement = connection.createStatement()) { + + statement.execute( + "INSERT INTO inventory.products VALUES (default,'robot','Toy robot',1.304)"); // 110 + List records = drain(sourceContext2, 1); + assertEquals(1, records.size()); + assertInsert(records.get(0), "id", 110); + + // --------------------------------------------------------------------------- + // Step-4: trigger checkpoint-2 during DML operations + // --------------------------------------------------------------------------- + synchronized (sourceContext2.getCheckpointLock()) { + // trigger checkpoint-1 + source2.snapshotState(new StateSnapshotContextSynchronousImpl(138, 138)); + } + + assertEquals(1, offsetState.list.size()); + String state = new String(offsetState.list.get(0), StandardCharsets.UTF_8); + assertEquals( + "postgres_cdc_source", JsonPath.read(state, "$.sourcePartition.server")); + assertEquals("558", JsonPath.read(state, "$.sourceOffset.txId").toString()); + assertTrue(state.contains("ts_usec")); + assertFalse(state.contains("snapshot")); + int lsn = JsonPath.read(state, "$.sourceOffset.lsn"); + assertTrue(lsn > prevLsn); + prevLsn = lsn; + + // execute 2 more DMLs to have more binlog + statement.execute( + "INSERT INTO inventory.products VALUES (1001,'roy','old robot',1234.56)"); // 1001 + statement.execute("UPDATE inventory.products SET weight=1345.67 WHERE id=1001"); + } + + // cancel the source + source2.cancel(); + source2.close(); + runThread2.sync(); + } + + { + // --------------------------------------------------------------------------- + // Step-5: restore the source from checkpoint-2 + // --------------------------------------------------------------------------- + final DebeziumSourceFunction source3 = + createPostgreSqlSourceWithHeartbeatDisabled(); + final TestSourceContext sourceContext3 = new TestSourceContext<>(); + setupSource(source3, true, offsetState, historyState, true, 0, 1); + + // restart the source + final CheckedThread runThread3 = + new CheckedThread() { + @Override + public void go() throws Exception { + source3.run(sourceContext3); + } + }; + runThread3.start(); + + // consume the unconsumed binlog + List records = drain(sourceContext3, 2); + assertInsert(records.get(0), "id", 1001); + assertUpdate(records.get(1), "id", 1001); + + // make 
sure there is no more events + assertFalse(waitForAvailableRecords(Duration.ofSeconds(3), sourceContext3)); + + // can continue to receive new events + try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER_OLD); + Statement statement = connection.createStatement()) { + statement.execute("DELETE FROM inventory.products WHERE id=1001"); + } + records = drain(sourceContext3, 1); + assertDelete(records.get(0), "id", 1001); + + // --------------------------------------------------------------------------- + // Step-6: trigger checkpoint-2 to make sure we can continue to to further checkpoints + // --------------------------------------------------------------------------- + synchronized (sourceContext3.getCheckpointLock()) { + // checkpoint 3 + source3.snapshotState(new StateSnapshotContextSynchronousImpl(233, 233)); + } + assertEquals(1, offsetState.list.size()); + String state = new String(offsetState.list.get(0), StandardCharsets.UTF_8); + assertEquals("postgres_cdc_source", JsonPath.read(state, "$.sourcePartition.server")); + assertEquals("561", JsonPath.read(state, "$.sourceOffset.txId").toString()); + assertTrue(state.contains("ts_usec")); + assertFalse(state.contains("snapshot")); + int lsn = JsonPath.read(state, "$.sourceOffset.lsn"); + assertTrue(lsn > prevLsn); + + source3.cancel(); + source3.close(); + runThread3.sync(); + } + + { + // --------------------------------------------------------------------------- + // Step-7: restore the source from checkpoint-3 + // --------------------------------------------------------------------------- + final DebeziumSourceFunction source4 = + createPostgreSqlSourceWithHeartbeatDisabled(); + final TestSourceContext sourceContext4 = new TestSourceContext<>(); + setupSource(source4, true, offsetState, historyState, true, 0, 1); + + // restart the source + final CheckedThread runThread4 = + new CheckedThread() { + @Override + public void go() throws Exception { + source4.run(sourceContext4); + } + }; + runThread4.start(); + + // make sure there is no more events + assertFalse(waitForAvailableRecords(Duration.ofSeconds(5), sourceContext4)); + + // --------------------------------------------------------------------------- + // Step-8: trigger checkpoint-2 to make sure we can continue to to further checkpoints + // --------------------------------------------------------------------------- + synchronized (sourceContext4.getCheckpointLock()) { + // checkpoint 3 + source4.snapshotState(new StateSnapshotContextSynchronousImpl(254, 254)); + } + assertEquals(1, offsetState.list.size()); + String state = new String(offsetState.list.get(0), StandardCharsets.UTF_8); + assertEquals("postgres_cdc_source", JsonPath.read(state, "$.sourcePartition.server")); + assertEquals("561", JsonPath.read(state, "$.sourceOffset.txId").toString()); + assertTrue(state.contains("ts_usec")); + assertFalse(state.contains("snapshot")); + int lsn = JsonPath.read(state, "$.sourceOffset.lsn"); + assertTrue(lsn > prevLsn); + prevLsn = lsn; + + source4.cancel(); + source4.close(); + runThread4.sync(); + } + + { + // --------------------------------------------------------------------------- + // Step-9: insert partial and alter table + // --------------------------------------------------------------------------- + final DebeziumSourceFunction source5 = + createPostgreSqlSourceWithHeartbeatDisabled(); + final TestSourceContext sourceContext5 = new TestSourceContext<>(); + setupSource(source5, true, offsetState, historyState, true, 0, 1); + + // restart the source + final 
CheckedThread runThread5 = + new CheckedThread() { + @Override + public void go() throws Exception { + source5.run(sourceContext5); + } + }; + runThread5.start(); + + try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER_OLD); + Statement statement = connection.createStatement()) { + + statement.execute( + "INSERT INTO inventory.products(id, description, weight) VALUES (default, 'Go go go', 111.1)"); + statement.execute( + "ALTER TABLE inventory.products ADD comment_col VARCHAR(100) DEFAULT 'cdc'"); + List records = drain(sourceContext5, 1); + assertInsert(records.get(0), "id", 111); + } + + // --------------------------------------------------------------------------- + // Step-10: trigger checkpoint-4 + // --------------------------------------------------------------------------- + synchronized (sourceContext5.getCheckpointLock()) { + // trigger checkpoint-4 + source5.snapshotState(new StateSnapshotContextSynchronousImpl(300, 300)); + } + assertEquals(1, offsetState.list.size()); + String state = new String(offsetState.list.get(0), StandardCharsets.UTF_8); + assertEquals("postgres_cdc_source", JsonPath.read(state, "$.sourcePartition.server")); + assertEquals("562", JsonPath.read(state, "$.sourceOffset.txId").toString()); + assertTrue(state.contains("ts_usec")); + assertFalse(state.contains("snapshot")); + int pos = JsonPath.read(state, "$.sourceOffset.lsn"); + assertTrue(pos > prevLsn); + + source5.cancel(); + source5.close(); + runThread5.sync(); + } + + { + // --------------------------------------------------------------------------- + // Step-11: restore from the checkpoint-4 and insert the partial value + // --------------------------------------------------------------------------- + final DebeziumSourceFunction source6 = + createPostgreSqlSourceWithHeartbeatDisabled(); + final TestSourceContext sourceContext6 = new TestSourceContext<>(); + setupSource(source6, true, offsetState, historyState, true, 0, 1); + + // restart the source + final CheckedThread runThread6 = + new CheckedThread() { + @Override + public void go() throws Exception { + source6.run(sourceContext6); + } + }; + runThread6.start(); + try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER_OLD); + Statement statement = connection.createStatement()) { + + statement.execute( + "INSERT INTO inventory.products(id, description, weight) VALUES (default, 'Run!', 22.2)"); + List records = drain(sourceContext6, 1); + assertInsert(records.get(0), "id", 112); + } + + source6.cancel(); + source6.close(); + runThread6.sync(); + } + } + + @Test + public void testFlushLsn() throws Exception { + final TestingListState offsetState = new TestingListState<>(); + final TestingListState historyState = new TestingListState<>(); + final LinkedHashSet flushLsn = new LinkedHashSet<>(); + { + // --------------------------------------------------------------------------- + // Step-1: start the source from empty state + // --------------------------------------------------------------------------- + final DebeziumSourceFunction source = + createPostgreSqlSourceWithHeartbeatEnabled(); + final TestSourceContext sourceContext = new TestSourceContext<>(); + // setup source with empty state + setupSource(source, false, offsetState, historyState, true, 0, 1); + + final CheckedThread runThread = + new CheckedThread() { + @Override + public void go() throws Exception { + source.run(sourceContext); + } + }; + runThread.start(); + + // wait until consumer is started + int received = drain(sourceContext, 9).size(); + 
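// ---------------------------------------------------------------------------------------------------------------
// Editor's note (not part of the patch): the ids asserted in Step-9 and Step-11 above (111 and 112) come
// from the products id sequence in the database, not from connector state: the earlier default insert
// consumed 110, the explicit ids 1001 and 2001 do not advance the sequence, so the next two default
// inserts receive 111 and 112 even though the source has been restarted several times in between.
// ---------------------------------------------------------------------------------------------------------------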
assertEquals(9, received); + + // --------------------------------------------------------------------------- + // Step-2: trigger checkpoint-1 after snapshot finished + // --------------------------------------------------------------------------- + synchronized (sourceContext.getCheckpointLock()) { + // trigger checkpoint-1 + source.snapshotState(new StateSnapshotContextSynchronousImpl(101, 101)); + } + source.notifyCheckpointComplete(101); + assertTrue(flushLsn.add(getConfirmedFlushLsn())); + + batchInsertAndCheckpoint(5, source, sourceContext, 201); + assertEquals(1, source.getPendingOffsetsToCommit().size()); + source.notifyCheckpointComplete(201); + assertEquals(0, source.getPendingOffsetsToCommit().size()); + assertTrue(flushLsn.add(getConfirmedFlushLsn())); + + batchInsertAndCheckpoint(1, source, sourceContext, 301); + // do not notify checkpoint complete to see the LSN is not advanced. + assertFalse(flushLsn.add(getConfirmedFlushLsn())); + + // make sure there is no more events + assertFalse(waitForAvailableRecords(Duration.ofSeconds(3), sourceContext)); + + source.cancel(); + source.close(); + runThread.sync(); + } + + { + // --------------------------------------------------------------------------- + // Step-3: restore the source from state + // --------------------------------------------------------------------------- + final DebeziumSourceFunction source2 = + createPostgreSqlSourceWithHeartbeatEnabled(); + final TestSourceContext sourceContext2 = new TestSourceContext<>(); + // setup source with empty state + setupSource(source2, true, offsetState, historyState, true, 0, 1); + + final CheckedThread runThread = + new CheckedThread() { + @Override + public void go() throws Exception { + source2.run(sourceContext2); + } + }; + runThread.start(); + + assertFalse(flushLsn.add(getConfirmedFlushLsn())); + + batchInsertAndCheckpoint(0, source2, sourceContext2, 401); + Thread.sleep(3_000); // waiting heartbeat events, we have set 1s heartbeat interval + // trigger checkpoint once again to make sure ChangeConsumer is initialized + batchInsertAndCheckpoint(0, source2, sourceContext2, 402); + source2.notifyCheckpointComplete(402); + assertTrue(flushLsn.add(getConfirmedFlushLsn())); + + // verify LSN is advanced even if there is no changes on the table + try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER_OLD); + Statement statement = connection.createStatement()) { + // we have to do some transactions which is not related to the monitored table + statement.execute("CREATE TABLE dummy (a int)"); + } + Thread.sleep(3_000); + batchInsertAndCheckpoint(0, source2, sourceContext2, 404); + source2.notifyCheckpointComplete(404); + assertTrue(flushLsn.add(getConfirmedFlushLsn())); + + batchInsertAndCheckpoint(3, source2, sourceContext2, 501); + batchInsertAndCheckpoint(2, source2, sourceContext2, 502); + batchInsertAndCheckpoint(1, source2, sourceContext2, 503); + assertEquals(3, source2.getPendingOffsetsToCommit().size()); + source2.notifyCheckpointComplete(503); + assertTrue(flushLsn.add(getConfirmedFlushLsn())); + assertEquals(0, source2.getPendingOffsetsToCommit().size()); + + // make sure there is no more events + assertFalse(waitForAvailableRecords(Duration.ofSeconds(3), sourceContext2)); + + source2.cancel(); + source2.close(); + runThread.sync(); + } + + assertEquals(5, flushLsn.size()); + } + + private void batchInsertAndCheckpoint( + int num, + DebeziumSourceFunction source, + TestSourceContext sourceContext, + long checkpointId) + throws Exception { + try (Connection 
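// ---------------------------------------------------------------------------------------------------------------
// Editor's note (sketch, not part of the patch): testFlushLsn tracks progress by adding the textual
// confirmed_flush_lsn values to a LinkedHashSet, i.e. it only checks that the value changed. If a stricter
// check were ever wanted, pgjdbc's LogSequenceNumber can turn the "X/Y" text form into a long so the
// advance can be asserted to be monotonic. This is an optional alternative, not what the test above does.
// ---------------------------------------------------------------------------------------------------------------
import org.postgresql.replication.LogSequenceNumber;

class LsnComparisonSketch {
    /** Returns true if {@code next} is strictly ahead of {@code previous}; inputs use the "X/Y" text form. */
    static boolean advanced(String previous, String next) {
        long prev = LogSequenceNumber.valueOf(previous).asLong();
        long curr = LogSequenceNumber.valueOf(next).asLong();
        return curr > prev;
    }

    public static void main(String[] args) {
        System.out.println(advanced("0/16B6C50", "0/16B6D88")); // true
    }
}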
connection = getJdbcConnection(POSTGRES_CONTAINER_OLD); + Statement statement = connection.createStatement()) { + for (int i = 0; i < num; i++) { + statement.execute( + "INSERT INTO inventory.products VALUES (default,'dummy','My Dummy',1.1)"); + } + } + assertEquals(num, drain(sourceContext, num).size()); + synchronized (sourceContext.getCheckpointLock()) { + // trigger checkpoint-1 + source.snapshotState( + new StateSnapshotContextSynchronousImpl(checkpointId, checkpointId)); + } + } + + // ------------------------------------------------------------------------------------------ + // Utilities + // ------------------------------------------------------------------------------------------ + + private DebeziumSourceFunction createPostgreSqlSourceWithHeartbeatDisabled() { + return createPostgreSqlSource(0); + } + + private DebeziumSourceFunction createPostgreSqlSourceWithHeartbeatEnabled() { + return createPostgreSqlSource(1000); + } + + private DebeziumSourceFunction createPostgreSqlSource(int heartbeatInterval) { + Properties properties = new Properties(); + properties.setProperty("heartbeat.interval.ms", String.valueOf(heartbeatInterval)); + return PostgreSQLSource.builder() + .hostname(POSTGRES_CONTAINER_OLD.getHost()) + .port(POSTGRES_CONTAINER_OLD.getMappedPort(POSTGRESQL_PORT)) + .database(POSTGRES_CONTAINER_OLD.getDatabaseName()) + .username(POSTGRES_CONTAINER_OLD.getUsername()) + .password(POSTGRES_CONTAINER_OLD.getPassword()) + .schemaList("inventory") + .tableList("inventory.products") + .deserializer(new ForwardDeserializeSchema()) + .slotName(SLOT_NAME) + .debeziumProperties(properties) + .build(); + } + + private String getConfirmedFlushLsn() throws SQLException { + try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER_OLD); + Statement statement = connection.createStatement()) { + ResultSet rs = + statement.executeQuery( + String.format( + "select * from pg_replication_slots where slot_name = '%s' and database = '%s' and plugin = '%s'", + SLOT_NAME, + POSTGRES_CONTAINER_OLD.getDatabaseName(), + "decoderbufs")); + if (rs.next()) { + return rs.getString("confirmed_flush_lsn"); + } else { + fail("No replication slot info available"); + } + return null; + } + } + + private List drain(TestSourceContext sourceContext, int expectedRecordCount) + throws Exception { + List allRecords = new ArrayList<>(); + LinkedBlockingQueue> queue = sourceContext.getCollectedOutputs(); + while (allRecords.size() < expectedRecordCount) { + StreamRecord record = queue.poll(100, TimeUnit.SECONDS); + if (record != null) { + allRecords.add(record.getValue()); + } else { + throw new RuntimeException( + "Can't receive " + expectedRecordCount + " elements before timeout."); + } + } + + return allRecords; + } + + private boolean waitForCheckpointLock(Object checkpointLock, Duration timeout) + throws Exception { + final Semaphore semaphore = new Semaphore(0); + ExecutorService executor = Executors.newSingleThreadExecutor(); + executor.execute( + () -> { + synchronized (checkpointLock) { + semaphore.release(); + } + }); + boolean result = semaphore.tryAcquire(timeout.toMillis(), TimeUnit.MILLISECONDS); + executor.shutdownNow(); + return result; + } + + /** + * Wait for a maximum amount of time until the first record is available. 
+ * + * @param timeout the maximum amount of time to wait; must not be negative + * @return {@code true} if records are available, or {@code false} if the timeout occurred and + * no records are available + */ + private boolean waitForAvailableRecords(Duration timeout, TestSourceContext sourceContext) + throws InterruptedException { + long now = System.currentTimeMillis(); + long stop = now + timeout.toMillis(); + while (System.currentTimeMillis() < stop) { + if (!sourceContext.getCollectedOutputs().isEmpty()) { + break; + } + Thread.sleep(10); // save CPU + } + return !sourceContext.getCollectedOutputs().isEmpty(); + } + + private static void setupSource(DebeziumSourceFunction source) throws Exception { + setupSource( + source, false, null, null, + true, // enable checkpointing; auto commit should be ignored + 0, 1); + } + + private static void setupSource( + DebeziumSourceFunction source, + boolean isRestored, + ListState restoredOffsetState, + ListState restoredHistoryState, + boolean isCheckpointingEnabled, + int subtaskIndex, + int totalNumSubtasks) + throws Exception { + + // run setup procedure in operator life cycle + source.setRuntimeContext( + new MockStreamingRuntimeContext( + isCheckpointingEnabled, totalNumSubtasks, subtaskIndex)); + source.initializeState( + new MockFunctionInitializationContext( + isRestored, + new MockOperatorStateStore(restoredOffsetState, restoredHistoryState))); + source.open(new Configuration()); + } + + private static class ForwardDeserializeSchema + implements DebeziumDeserializationSchema { + + private static final long serialVersionUID = 2975058057832211228L; + + @Override + public void deserialize(SourceRecord record, Collector out) throws Exception { + out.collect(record); + } + + @Override + public TypeInformation getProducedType() { + return TypeInformation.of(SourceRecord.class); + } + } + + private static class MockOperatorStateStore implements OperatorStateStore { + + private final ListState restoredOffsetListState; + private final ListState restoredHistoryListState; + + private MockOperatorStateStore( + ListState restoredOffsetListState, ListState restoredHistoryListState) { + this.restoredOffsetListState = restoredOffsetListState; + this.restoredHistoryListState = restoredHistoryListState; + } + + @Override + @SuppressWarnings("unchecked") + public ListState getUnionListState(ListStateDescriptor stateDescriptor) + throws Exception { + if (stateDescriptor.getName().equals(DebeziumSourceFunction.OFFSETS_STATE_NAME)) { + return (ListState) restoredOffsetListState; + } else if (stateDescriptor + .getName() + .equals(DebeziumSourceFunction.HISTORY_RECORDS_STATE_NAME)) { + return (ListState) restoredHistoryListState; + } else { + throw new IllegalStateException("Unknown state."); + } + } + + @Override + public BroadcastState getBroadcastState( + MapStateDescriptor stateDescriptor) throws Exception { + throw new UnsupportedOperationException(); + } + + @Override + public ListState getListState(ListStateDescriptor stateDescriptor) + throws Exception { + throw new UnsupportedOperationException(); + } + + @Override + public Set getRegisteredStateNames() { + throw new UnsupportedOperationException(); + } + + @Override + public Set getRegisteredBroadcastStateNames() { + throw new UnsupportedOperationException(); + } + } + + private static class MockFunctionInitializationContext + implements FunctionInitializationContext { + + private final boolean isRestored; + private final OperatorStateStore operatorStateStore; + + private 
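// ---------------------------------------------------------------------------------------------------------------
// Editor's note (not part of the patch): setupSource(...) above drives the SourceFunction through the same
// operator life cycle Flink would, using the mock classes defined in this test. For readers skimming the
// file, the order is:
//   1. source.setRuntimeContext(new MockStreamingRuntimeContext(isCheckpointingEnabled, totalNumSubtasks, subtaskIndex));
//   2. source.initializeState(new MockFunctionInitializationContext(isRestored, stateStore));
//   3. source.open(new Configuration());
//   4. source.run(sourceContext) on a CheckedThread, snapshotState(...) under the checkpoint lock,
//      then source.cancel(), source.close() and runThread.sync().
// ---------------------------------------------------------------------------------------------------------------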
MockFunctionInitializationContext( + boolean isRestored, OperatorStateStore operatorStateStore) { + this.isRestored = isRestored; + this.operatorStateStore = operatorStateStore; + } + + @Override + public boolean isRestored() { + return isRestored; + } + + @Override + public OptionalLong getRestoredCheckpointId() { + throw new UnsupportedOperationException(); + } + + @Override + public OperatorStateStore getOperatorStateStore() { + return operatorStateStore; + } + + @Override + public KeyedStateStore getKeyedStateStore() { + throw new UnsupportedOperationException(); + } + } + + private static class BlockingSourceContext extends TestSourceContext { + + private final Semaphore blocker = new Semaphore(0); + private final int expectedCount; + private int currentCount = 0; + + private BlockingSourceContext(int expectedCount) { + this.expectedCount = expectedCount; + } + + @Override + public void collect(T t) { + super.collect(t); + currentCount++; + if (currentCount == expectedCount) { + try { + // block the source to emit records + blocker.acquire(); + } catch (InterruptedException e) { + // ignore + } + } + } + } + + private static final class TestingListState implements ListState { + + private final List list = new ArrayList<>(); + private boolean clearCalled = false; + + @Override + public void clear() { + list.clear(); + clearCalled = true; + } + + @Override + public Iterable get() throws Exception { + return list; + } + + @Override + public void add(T value) throws Exception { + Preconditions.checkNotNull(value, "You cannot add null to a ListState."); + list.add(value); + } + + public List getList() { + return list; + } + + boolean isClearCalled() { + return clearCalled; + } + + @Override + public void update(List values) throws Exception { + clear(); + + addAll(values); + } + + @Override + public void addAll(List values) throws Exception { + if (values != null) { + values.forEach( + v -> Preconditions.checkNotNull(v, "You cannot add null to a ListState.")); + + list.addAll(values); + } + } + } +} diff --git a/flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/PostgresTestBase.java b/flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/PostgresTestBase.java index 8b496ea9d..06fe1b3c1 100644 --- a/flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/PostgresTestBase.java +++ b/flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/PostgresTestBase.java @@ -39,6 +39,7 @@ import java.sql.Statement; import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.Random; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -58,9 +59,9 @@ public abstract class PostgresTestBase extends AbstractTestBase { // use newer version of postgresql image to support pgoutput plugin // when testing postgres 13, only 13-alpine supports both amd64 and arm64 protected static final DockerImageName PG_IMAGE = - DockerImageName.parse("debezium/postgres:13").asCompatibleSubstituteFor("postgres"); + DockerImageName.parse("debezium/postgres:9.6").asCompatibleSubstituteFor("postgres"); - public static final PostgreSQLContainer POSTGERS_CONTAINER = + public static final PostgreSQLContainer POSTGRES_CONTAINER = new PostgreSQLContainer<>(PG_IMAGE) .withDatabaseName(DEFAULT_DB) .withUsername("postgres") @@ -72,41 +73,43 @@ public abstract class PostgresTestBase extends AbstractTestBase { // default "fsync=off", "-c", - // to ensure that 
the slot becomes inactive during the failover - "wal_sender_timeout=1000", - "-c", "max_replication_slots=20"); @BeforeClass public static void startContainers() { LOG.info("Starting containers..."); - Startables.deepStart(Stream.of(POSTGERS_CONTAINER)).join(); + Startables.deepStart(Stream.of(POSTGRES_CONTAINER)).join(); LOG.info("Containers are started."); } - protected Connection getJdbcConnection() throws SQLException { + protected Connection getJdbcConnection(PostgreSQLContainer container) throws SQLException { return DriverManager.getConnection( - POSTGERS_CONTAINER.getJdbcUrl(), - POSTGERS_CONTAINER.getUsername(), - POSTGERS_CONTAINER.getPassword()); + container.getJdbcUrl(), container.getUsername(), container.getPassword()); } - public static Connection getJdbcConnection(String databaseName) throws SQLException { + public static Connection getJdbcConnection(PostgreSQLContainer container, String databaseName) + throws SQLException { return DriverManager.getConnection( - POSTGERS_CONTAINER.withDatabaseName(databaseName).getJdbcUrl(), - POSTGERS_CONTAINER.getUsername(), - POSTGERS_CONTAINER.getPassword()); + container.withDatabaseName(databaseName).getJdbcUrl(), + container.getUsername(), + container.getPassword()); + } + + public static String getSlotName() { + final Random random = new Random(); + int id = random.nextInt(10000); + return "flink_" + id; } /** * Executes a JDBC statement using the default jdbc config without autocommitting the * connection. */ - protected void initializePostgresTable(String sqlFile) { + protected void initializePostgresTable(PostgreSQLContainer container, String sqlFile) { final String ddlFile = String.format("ddl/%s.sql", sqlFile); final URL ddlTestFile = PostgresTestBase.class.getClassLoader().getResource(ddlFile); assertNotNull("Cannot locate " + ddlFile, ddlTestFile); - try (Connection connection = getJdbcConnection(); + try (Connection connection = getJdbcConnection(container); Statement statement = connection.createStatement()) { final List statements = Arrays.stream( diff --git a/flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/source/PostgresSourceExampleTest.java b/flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/source/PostgresSourceExampleTest.java index 849e12ef1..8b70a069c 100644 --- a/flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/source/PostgresSourceExampleTest.java +++ b/flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/source/PostgresSourceExampleTest.java @@ -83,11 +83,11 @@ public class PostgresSourceExampleTest extends PostgresTestBase { // 9 records in the inventory.products table private final UniqueDatabase inventoryDatabase = new UniqueDatabase( - POSTGERS_CONTAINER, + POSTGRES_CONTAINER, DB_NAME_PREFIX, SCHEMA_NAME, - POSTGERS_CONTAINER.getUsername(), - POSTGERS_CONTAINER.getPassword()); + POSTGRES_CONTAINER.getUsername(), + POSTGRES_CONTAINER.getPassword()); @Test @Ignore("Test ignored because it won't stop and is used for manual test") @@ -100,13 +100,13 @@ public class PostgresSourceExampleTest extends PostgresTestBase { JdbcIncrementalSource postgresIncrementalSource = PostgresSourceBuilder.PostgresIncrementalSource.builder() - .hostname(POSTGERS_CONTAINER.getHost()) - .port(POSTGERS_CONTAINER.getMappedPort(POSTGRESQL_PORT)) + .hostname(POSTGRES_CONTAINER.getHost()) + .port(POSTGRES_CONTAINER.getMappedPort(POSTGRESQL_PORT)) .database(inventoryDatabase.getDatabaseName()) .schemaList(SCHEMA_NAME) 
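// ---------------------------------------------------------------------------------------------------------------
// Editor's note (sketch, not part of the patch): getSlotName() above hands every test its own replication
// slot, which avoids "replication slot ... already exists" failures when several tests share the same
// container. A random suffix out of 10000 can still collide in a long test session; an assumed variant that
// combines a timestamp with the random part stays unique and well within PostgreSQL's 63-character
// identifier limit. Illustrative only, not used by the patch.
// ---------------------------------------------------------------------------------------------------------------
import java.util.concurrent.ThreadLocalRandom;

class SlotNameSketch {
    static String uniqueSlotName() {
        // slot names may only contain lower case letters, digits and underscores
        return "flink_" + System.currentTimeMillis() + "_" + ThreadLocalRandom.current().nextInt(10000);
    }
}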
.tableList(TABLE_ID) - .username(POSTGERS_CONTAINER.getUsername()) - .password(POSTGERS_CONTAINER.getPassword()) + .username(POSTGRES_CONTAINER.getUsername()) + .password(POSTGRES_CONTAINER.getPassword()) .slotName(SLOT_NAME) .decodingPluginName(PLUGIN_NAME) .deserializer(deserializer) @@ -130,6 +130,7 @@ public class PostgresSourceExampleTest extends PostgresTestBase { } @Test + @Ignore public void testConsumingAllEvents() throws Exception { final DataType dataType = DataTypes.ROW( @@ -144,13 +145,13 @@ public class PostgresSourceExampleTest extends PostgresTestBase { JdbcIncrementalSource postgresIncrementalSource = PostgresSourceBuilder.PostgresIncrementalSource.builder() - .hostname(POSTGERS_CONTAINER.getHost()) - .port(POSTGERS_CONTAINER.getMappedPort(POSTGRESQL_PORT)) + .hostname(POSTGRES_CONTAINER.getHost()) + .port(POSTGRES_CONTAINER.getMappedPort(POSTGRESQL_PORT)) .database(inventoryDatabase.getDatabaseName()) .schemaList(SCHEMA_NAME) .tableList(TABLE_ID) - .username(POSTGERS_CONTAINER.getUsername()) - .password(POSTGERS_CONTAINER.getPassword()) + .username(POSTGRES_CONTAINER.getUsername()) + .password(POSTGRES_CONTAINER.getPassword()) .slotName(SLOT_NAME) .decodingPluginName(PLUGIN_NAME) .deserializer(buildRowDataDebeziumDeserializeSchema(dataType)) @@ -247,8 +248,8 @@ public class PostgresSourceExampleTest extends PostgresTestBase { private PostgresConnection getConnection() throws SQLException { Map properties = new HashMap<>(); - properties.put("hostname", POSTGERS_CONTAINER.getHost()); - properties.put("port", String.valueOf(POSTGERS_CONTAINER.getMappedPort(POSTGRESQL_PORT))); + properties.put("hostname", POSTGRES_CONTAINER.getHost()); + properties.put("port", String.valueOf(POSTGRES_CONTAINER.getMappedPort(POSTGRESQL_PORT))); properties.put("dbname", inventoryDatabase.getDatabaseName()); properties.put("user", inventoryDatabase.getUsername()); properties.put("password", inventoryDatabase.getPassword()); @@ -295,9 +296,9 @@ public class PostgresSourceExampleTest extends PostgresTestBase { SlotState slotState = connection.getReplicationSlotState(SLOT_NAME, PLUGIN_NAME); while (slotState == null) { - log.info("slot state is null, wait a little bit"); + log.info("Waiting until the replication slot is ready ..."); try { - Thread.sleep(1000L); + Thread.sleep(2000L); } catch (InterruptedException e) { e.printStackTrace(); } diff --git a/flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/source/PostgresSourceITCase.java b/flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/source/PostgresSourceITCase.java index 0132c0b37..b5e7638af 100644 --- a/flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/source/PostgresSourceITCase.java +++ b/flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/source/PostgresSourceITCase.java @@ -46,7 +46,6 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Random; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -86,11 +85,11 @@ public class PostgresSourceITCase extends PostgresTestBase { private final UniqueDatabase customDatabase = new UniqueDatabase( - POSTGERS_CONTAINER, + POSTGRES_CONTAINER, DB_NAME_PREFIX, SCHEMA_NAME, - POSTGERS_CONTAINER.getUsername(), - POSTGERS_CONTAINER.getPassword()); + POSTGRES_CONTAINER.getUsername(), + POSTGRES_CONTAINER.getPassword()); /** First part stream 
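// ---------------------------------------------------------------------------------------------------------------
// Editor's note (sketch, not part of the patch): the wait loop changed above has no upper bound, so a slot
// that never appears would hang the test. A bounded variant of the same wait, reusing the
// getReplicationSlotState(slotName, pluginName) call the test already makes, could look like this; the
// 60 second budget in the usage comment is an assumption, not something the patch prescribes.
// ---------------------------------------------------------------------------------------------------------------
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.connector.postgresql.spi.SlotState;

import java.sql.SQLException;

class SlotStateWaitSketch {
    /** Waits until the slot state becomes available, e.g. waitForSlotState(conn, slot, plugin, 60_000L). */
    static SlotState waitForSlotState(
            PostgresConnection connection, String slotName, String pluginName, long timeoutMillis)
            throws SQLException, InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        SlotState slotState = connection.getReplicationSlotState(slotName, pluginName);
        while (slotState == null) {
            if (System.currentTimeMillis() > deadline) {
                throw new IllegalStateException(
                        "Replication slot " + slotName + " was not created within " + timeoutMillis + " ms");
            }
            Thread.sleep(2000L);
            slotState = connection.getReplicationSlotState(slotName, pluginName);
        }
        return slotState;
    }
}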
events, which is made by {@link #makeFirstPartStreamEvents}. */ private final List firstPartStreamEvents = @@ -365,17 +364,13 @@ public class PostgresSourceITCase extends PostgresTestBase { expectedStreamData.addAll(firstPartStreamEvents); expectedStreamData.addAll(secondPartStreamEvents); } + // wait for the stream reading + Thread.sleep(2000L); assertEqualsInAnyOrder(expectedStreamData, fetchRows(iterator, expectedStreamData.size())); assertTrue(!hasNextData(iterator)); } - private String getSlotName() { - final Random random = new Random(); - int id = random.nextInt(10000); - return "flink_" + id; - } - private void sleepMs(long millis) { try { Thread.sleep(millis); diff --git a/flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java b/flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java index 58716b1df..7946936ab 100644 --- a/flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java +++ b/flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java @@ -16,6 +16,7 @@ package com.ververica.cdc.connectors.postgres.table; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.TableResult; @@ -27,7 +28,6 @@ import org.apache.flink.util.ExceptionUtils; import com.ververica.cdc.connectors.postgres.PostgresTestBase; import org.junit.Before; import org.junit.ClassRule; -import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -47,10 +47,8 @@ import static org.junit.Assert.assertTrue; import static org.testcontainers.containers.PostgreSQLContainer.POSTGRESQL_PORT; /** Integration tests for PostgreSQL Table source. 
*/ -@Ignore @RunWith(Parameterized.class) public class PostgreSQLConnectorITCase extends PostgresTestBase { - private static final String SLOT_NAME = "flinktest"; private final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); @@ -74,6 +72,7 @@ public class PostgreSQLConnectorITCase extends PostgresTestBase { @Before public void before() { TestValuesTableFactory.clearAllData(); + env.setRestartStrategy(RestartStrategies.noRestart()); if (parallelismSnapshot) { env.setParallelism(4); env.enableCheckpointing(200); @@ -85,7 +84,7 @@ public class PostgreSQLConnectorITCase extends PostgresTestBase { @Test public void testConsumingAllEvents() throws SQLException, ExecutionException, InterruptedException { - initializePostgresTable("inventory"); + initializePostgresTable(POSTGRES_CONTAINER, "inventory"); String sourceDDL = String.format( "CREATE TABLE debezium_source (" @@ -105,15 +104,15 @@ public class PostgreSQLConnectorITCase extends PostgresTestBase { + " 'scan.incremental.snapshot.enabled' = '%s'," + " 'slot.name' = '%s'" + ")", - POSTGERS_CONTAINER.getHost(), - POSTGERS_CONTAINER.getMappedPort(POSTGRESQL_PORT), - POSTGERS_CONTAINER.getUsername(), - POSTGERS_CONTAINER.getPassword(), - POSTGERS_CONTAINER.getDatabaseName(), + POSTGRES_CONTAINER.getHost(), + POSTGRES_CONTAINER.getMappedPort(POSTGRESQL_PORT), + POSTGRES_CONTAINER.getUsername(), + POSTGRES_CONTAINER.getPassword(), + POSTGRES_CONTAINER.getDatabaseName(), "inventory", "products", parallelismSnapshot, - SLOT_NAME); + getSlotName()); String sinkDDL = "CREATE TABLE sink (" + " name STRING," @@ -134,7 +133,11 @@ public class PostgreSQLConnectorITCase extends PostgresTestBase { waitForSnapshotStarted("sink"); - try (Connection connection = getJdbcConnection(); + // wait a bit to make sure the replication slot is ready + Thread.sleep(5000); + + // generate WAL + try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER); Statement statement = connection.createStatement()) { statement.execute( @@ -191,9 +194,82 @@ public class PostgreSQLConnectorITCase extends PostgresTestBase { result.getJobClient().get().cancel().get(); } + @Test + public void testStartupFromLatestOffset() throws Exception { + if (!parallelismSnapshot) { + return; + } + initializePostgresTable(POSTGRES_CONTAINER, "inventory"); + String sourceDDL = + String.format( + "CREATE TABLE debezium_source (" + + " id INT NOT NULL," + + " name STRING," + + " description STRING," + + " weight DECIMAL(10,3)," + + " PRIMARY KEY (id) NOT ENFORCED" + + ") WITH (" + + " 'connector' = 'postgres-cdc'," + + " 'hostname' = '%s'," + + " 'port' = '%s'," + + " 'username' = '%s'," + + " 'password' = '%s'," + + " 'database-name' = '%s'," + + " 'schema-name' = '%s'," + + " 'table-name' = '%s'," + + " 'scan.incremental.snapshot.enabled' = '%s'," + + " 'slot.name' = '%s'," + + " 'scan.startup.mode' = 'latest-offset'" + + ")", + POSTGRES_CONTAINER.getHost(), + POSTGRES_CONTAINER.getMappedPort(POSTGRESQL_PORT), + POSTGRES_CONTAINER.getUsername(), + POSTGRES_CONTAINER.getPassword(), + POSTGRES_CONTAINER.getDatabaseName(), + "inventory", + "products", + parallelismSnapshot, + getSlotName()); + String sinkDDL = + "CREATE TABLE sink " + + " WITH (" + + " 'connector' = 'values'," + + " 'sink-insert-only' = 'false'" + + ") LIKE debezium_source (EXCLUDING OPTIONS)"; + tEnv.executeSql(sourceDDL); + tEnv.executeSql(sinkDDL); + + // async submit job + TableResult result = tEnv.executeSql("INSERT INTO sink SELECT * FROM debezium_source"); + // wait for the source 
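// ---------------------------------------------------------------------------------------------------------------
// Editor's note (sketch, not part of the patch): the Thread.sleep(5000) calls added in this file give the
// replication slot time to come up before WAL is generated. If the fixed sleeps ever prove flaky, one
// assumed alternative is to poll pg_replication_slots until the connector's slot shows up as active; the
// helper below is illustrative only and not used by these tests.
// ---------------------------------------------------------------------------------------------------------------
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

class SlotReadinessSketch {
    /** Polls pg_replication_slots until the given slot is active or the timeout elapses. */
    static boolean waitForActiveSlot(Connection connection, String slotName, long timeoutMillis)
            throws SQLException, InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        String sql = "SELECT active FROM pg_replication_slots WHERE slot_name = ?";
        while (System.currentTimeMillis() < deadline) {
            try (PreparedStatement statement = connection.prepareStatement(sql)) {
                statement.setString(1, slotName);
                try (ResultSet rs = statement.executeQuery()) {
                    if (rs.next() && rs.getBoolean("active")) {
                        return true;
                    }
                }
            }
            Thread.sleep(200);
        }
        return false;
    }
}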
startup, we don't have a better way to wait it, use sleep for now + Thread.sleep(10000L); + + try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER); + Statement statement = connection.createStatement()) { + statement.execute( + "INSERT INTO inventory.products VALUES (default,'jacket','water resistent white wind breaker',0.2);"); // 110 + statement.execute( + "INSERT INTO inventory.products VALUES (default,'scooter','Big 2-wheel scooter ',5.18);"); + statement.execute( + "UPDATE inventory.products SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;"); + statement.execute("UPDATE inventory.products SET weight='5.17' WHERE id=111;"); + statement.execute("DELETE FROM inventory.products WHERE id=111;"); + } + + waitForSinkSize("sink", 5); + + String[] expected = + new String[] {"110,jacket,new water resistent white wind breaker,0.500"}; + + List actual = TestValuesTableFactory.getResults("sink"); + assertThat(actual, containsInAnyOrder(expected)); + + result.getJobClient().get().cancel().get(); + } + @Test public void testExceptionForReplicaIdentity() throws Exception { - initializePostgresTable("replica_identity"); + initializePostgresTable(POSTGRES_CONTAINER, "replica_identity"); String sourceDDL = String.format( "CREATE TABLE debezium_source (" @@ -213,15 +289,15 @@ public class PostgreSQLConnectorITCase extends PostgresTestBase { + " 'scan.incremental.snapshot.enabled' = '%s'," + " 'slot.name' = '%s'" + ")", - POSTGERS_CONTAINER.getHost(), - POSTGERS_CONTAINER.getMappedPort(POSTGRESQL_PORT), - POSTGERS_CONTAINER.getUsername(), - POSTGERS_CONTAINER.getPassword(), - POSTGERS_CONTAINER.getDatabaseName(), + POSTGRES_CONTAINER.getHost(), + POSTGRES_CONTAINER.getMappedPort(POSTGRESQL_PORT), + POSTGRES_CONTAINER.getUsername(), + POSTGRES_CONTAINER.getPassword(), + POSTGRES_CONTAINER.getDatabaseName(), "inventory", "products", parallelismSnapshot, - "replica_identity_slot"); + getSlotName()); String sinkDDL = "CREATE TABLE sink (" + " name STRING," @@ -241,7 +317,11 @@ public class PostgreSQLConnectorITCase extends PostgresTestBase { "INSERT INTO sink SELECT name, SUM(weight) FROM debezium_source GROUP BY name"); waitForSnapshotStarted("sink"); - try (Connection connection = getJdbcConnection(); + // wait a bit to make sure the replication slot is ready + Thread.sleep(5000); + + // generate WAL + try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER); Statement statement = connection.createStatement()) { statement.execute( @@ -271,7 +351,7 @@ public class PostgreSQLConnectorITCase extends PostgresTestBase { @Test public void testAllTypes() throws Throwable { - initializePostgresTable("column_type_test"); + initializePostgresTable(POSTGRES_CONTAINER, "column_type_test"); String sourceDDL = String.format( @@ -309,15 +389,15 @@ public class PostgreSQLConnectorITCase extends PostgresTestBase { + " 'scan.incremental.snapshot.enabled' = '%s'," + " 'slot.name' = '%s'" + ")", - POSTGERS_CONTAINER.getHost(), - POSTGERS_CONTAINER.getMappedPort(POSTGRESQL_PORT), - POSTGERS_CONTAINER.getUsername(), - POSTGERS_CONTAINER.getPassword(), - POSTGERS_CONTAINER.getDatabaseName(), + POSTGRES_CONTAINER.getHost(), + POSTGRES_CONTAINER.getMappedPort(POSTGRESQL_PORT), + POSTGRES_CONTAINER.getUsername(), + POSTGRES_CONTAINER.getPassword(), + POSTGRES_CONTAINER.getDatabaseName(), "inventory", "full_types", parallelismSnapshot, - SLOT_NAME); + getSlotName()); String sinkDDL = "CREATE TABLE sink (" + " id INTEGER NOT NULL," @@ -340,7 +420,8 @@ public class 
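// ---------------------------------------------------------------------------------------------------------------
// Editor's note (not part of the patch): in testStartupFromLatestOffset above, the sink is declared LIKE the
// source, so it keeps the PRIMARY KEY on id and is written in upsert fashion. Row 111 is inserted and later
// deleted, and row 110 is updated once, so the materialized result read via
// TestValuesTableFactory.getResults("sink") is expected to contain only the final version of row 110
// ("110,jacket,new water resistent white wind breaker,0.500").
// ---------------------------------------------------------------------------------------------------------------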
PostgreSQLConnectorITCase extends PostgresTestBase { + " time_c TIME(0)," + " default_numeric_c DECIMAL," + " geography_c STRING," - + " geometry_c STRING" + + " geometry_c STRING," + + " PRIMARY KEY (id) NOT ENFORCED" + ") WITH (" + " 'connector' = 'values'," + " 'sink-insert-only' = 'false'" @@ -351,9 +432,12 @@ public class PostgreSQLConnectorITCase extends PostgresTestBase { // async submit job TableResult result = tEnv.executeSql("INSERT INTO sink SELECT * FROM full_types"); - waitForSnapshotStarted("sink"); + waitForSinkSize("sink", 1); + // wait a bit to make sure the replication slot is ready + Thread.sleep(5000); - try (Connection connection = getJdbcConnection(); + // generate WAL + try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER); Statement statement = connection.createStatement()) { statement.execute("UPDATE inventory.full_types SET small_c=0 WHERE id=1;"); } @@ -363,8 +447,8 @@ public class PostgreSQLConnectorITCase extends PostgresTestBase { List expected = Arrays.asList( "+I(1,[50],32767,65535,2147483647,5.5,6.6,123.12345,404.4,true,Hello World,a,abc,abcd..xyz,2020-07-17T18:00:22.123,2020-07-17T18:00:22.123456,2020-07-17,18:00:22,500,{\"hexewkb\":\"0105000020e610000001000000010200000002000000a779c7293a2465400b462575025a46c0c66d3480b7fc6440c3d32b65195246c0\",\"srid\":4326},{\"hexewkb\":\"0101000020730c00001c7c613255de6540787aa52c435c42c0\",\"srid\":3187})", - "-U(1,[50],32767,65535,2147483647,5.5,6.6,123.12345,404.4,true,Hello World,a,abc,abcd..xyz,2020-07-17T18:00:22.123,2020-07-17T18:00:22.123456,2020-07-17,18:00:22,500,{\"hexewkb\":\"0105000020e610000001000000010200000002000000a779c7293a2465400b462575025a46c0c66d3480b7fc6440c3d32b65195246c0\",\"srid\":4326},{\"hexewkb\":\"0101000020730c00001c7c613255de6540787aa52c435c42c0\",\"srid\":3187})", - "+U(1,[50],0,65535,2147483647,5.5,6.6,123.12345,404.4,true,Hello World,a,abc,abcd..xyz,2020-07-17T18:00:22.123,2020-07-17T18:00:22.123456,2020-07-17,18:00:22,500,{\"hexewkb\":\"0105000020e610000001000000010200000002000000a779c7293a2465400b462575025a46c0c66d3480b7fc6440c3d32b65195246c0\",\"srid\":4326},{\"hexewkb\":\"0101000020730c00001c7c613255de6540787aa52c435c42c0\",\"srid\":3187})"); + "-D(1,[50],32767,65535,2147483647,5.5,6.6,123.12345,404.4,true,Hello World,a,abc,abcd..xyz,2020-07-17T18:00:22.123,2020-07-17T18:00:22.123456,2020-07-17,18:00:22,500,{\"hexewkb\":\"0105000020e610000001000000010200000002000000a779c7293a2465400b462575025a46c0c66d3480b7fc6440c3d32b65195246c0\",\"srid\":4326},{\"hexewkb\":\"0101000020730c00001c7c613255de6540787aa52c435c42c0\",\"srid\":3187})", + "+I(1,[50],0,65535,2147483647,5.5,6.6,123.12345,404.4,true,Hello World,a,abc,abcd..xyz,2020-07-17T18:00:22.123,2020-07-17T18:00:22.123456,2020-07-17,18:00:22,500,{\"hexewkb\":\"0105000020e610000001000000010200000002000000a779c7293a2465400b462575025a46c0c66d3480b7fc6440c3d32b65195246c0\",\"srid\":4326},{\"hexewkb\":\"0101000020730c00001c7c613255de6540787aa52c435c42c0\",\"srid\":3187})"); List actual = TestValuesTableFactory.getRawResults("sink"); assertEquals(expected, actual); @@ -373,7 +457,7 @@ public class PostgreSQLConnectorITCase extends PostgresTestBase { @Test public void testMetadataColumns() throws Throwable { - initializePostgresTable("inventory"); + initializePostgresTable(POSTGRES_CONTAINER, "inventory"); String sourceDDL = String.format( "CREATE TABLE debezium_source (" @@ -397,15 +481,15 @@ public class PostgreSQLConnectorITCase extends PostgresTestBase { + " 'scan.incremental.snapshot.enabled' = '%s'," + " 'slot.name' = '%s'" + 
")", - POSTGERS_CONTAINER.getHost(), - POSTGERS_CONTAINER.getMappedPort(POSTGRESQL_PORT), - POSTGERS_CONTAINER.getUsername(), - POSTGERS_CONTAINER.getPassword(), - POSTGERS_CONTAINER.getDatabaseName(), + POSTGRES_CONTAINER.getHost(), + POSTGRES_CONTAINER.getMappedPort(POSTGRESQL_PORT), + POSTGRES_CONTAINER.getUsername(), + POSTGRES_CONTAINER.getPassword(), + POSTGRES_CONTAINER.getDatabaseName(), "inventory", "products", parallelismSnapshot, - "meta_data_slot"); + getSlotName()); String sinkDDL = "CREATE TABLE sink (" @@ -429,8 +513,11 @@ public class PostgreSQLConnectorITCase extends PostgresTestBase { TableResult result = tEnv.executeSql("INSERT INTO sink SELECT * FROM debezium_source"); waitForSnapshotStarted("sink"); + // wait a bit to make sure the replication slot is ready + Thread.sleep(5000); - try (Connection connection = getJdbcConnection(); + // generate WAL + try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER); Statement statement = connection.createStatement()) { statement.execute( @@ -476,7 +563,7 @@ public class PostgreSQLConnectorITCase extends PostgresTestBase { @Test public void testUpsertMode() throws Exception { - initializePostgresTable("replica_identity"); + initializePostgresTable(POSTGRES_CONTAINER, "replica_identity"); String sourceDDL = String.format( "CREATE TABLE debezium_source (" @@ -498,14 +585,14 @@ public class PostgreSQLConnectorITCase extends PostgresTestBase { + " 'scan.incremental.snapshot.enabled' = '%s'," + " 'changelog-mode' = '%s'" + ")", - POSTGERS_CONTAINER.getHost(), - POSTGERS_CONTAINER.getMappedPort(POSTGRESQL_PORT), - POSTGERS_CONTAINER.getUsername(), - POSTGERS_CONTAINER.getPassword(), - POSTGERS_CONTAINER.getDatabaseName(), + POSTGRES_CONTAINER.getHost(), + POSTGRES_CONTAINER.getMappedPort(POSTGRESQL_PORT), + POSTGRES_CONTAINER.getUsername(), + POSTGRES_CONTAINER.getPassword(), + POSTGRES_CONTAINER.getDatabaseName(), "inventory", "products", - "replica_identity_slot", + getSlotName(), parallelismSnapshot, "upsert"); String sinkDDL = @@ -527,7 +614,11 @@ public class PostgreSQLConnectorITCase extends PostgresTestBase { "INSERT INTO sink SELECT name, SUM(weight) FROM debezium_source GROUP BY name"); waitForSnapshotStarted("sink"); - try (Connection connection = getJdbcConnection(); + // wait a bit to make sure the replication slot is ready + Thread.sleep(5000); + + // generate WAL + try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER); Statement statement = connection.createStatement()) { statement.execute( @@ -586,7 +677,7 @@ public class PostgreSQLConnectorITCase extends PostgresTestBase { private static void waitForSnapshotStarted(String sinkName) throws InterruptedException { while (sinkSize(sinkName) == 0) { - Thread.sleep(100); + Thread.sleep(300); } } diff --git a/flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/testutils/UniqueDatabase.java b/flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/testutils/UniqueDatabase.java index 7f4d5e8c4..ef41142c0 100644 --- a/flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/testutils/UniqueDatabase.java +++ b/flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/testutils/UniqueDatabase.java @@ -108,7 +108,7 @@ public class UniqueDatabase { private void createDatabase(String databaseName) throws SQLException { try (Connection connection = - PostgresTestBase.getJdbcConnection(PostgresTestBase.DEFAULT_DB)) { + PostgresTestBase.getJdbcConnection(container, 
PostgresTestBase.DEFAULT_DB)) { try (Statement statement = connection.createStatement()) { statement.execute("CREATE DATABASE " + databaseName); } @@ -123,7 +123,8 @@ public class UniqueDatabase { try { createDatabase(databaseName); - try (Connection connection = PostgresTestBase.getJdbcConnection(databaseName); + try (Connection connection = + PostgresTestBase.getJdbcConnection(container, databaseName); Statement statement = connection.createStatement()) { final List statements = Arrays.stream(