diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/utils/RecordUtils.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/utils/RecordUtils.java index b0f50f7b2..a7111a659 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/utils/RecordUtils.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/utils/RecordUtils.java @@ -171,20 +171,13 @@ public class RecordUtils { binlog.key(), binlog.valueSchema(), envelope.read(updateAfter, source, ts)); - normalizedBinlogRecords.add(record); + snapshotRecords.put(key, record); break; case DELETE: - if (snapshotRecords.containsKey(key)) { - snapshotRecords.remove(key); - } else { - throw new IllegalStateException( - String.format( - "The delete record %s doesn't exists in table split %s.", - binlog, split)); - } + snapshotRecords.remove(key); break; case CREATE: - normalizedBinlogRecords.add(binlog); + snapshotRecords.put(key, binlog); break; case READ: throw new IllegalStateException( diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java index 3d07f4c6d..2ea3bb6be 100644 --- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java +++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java @@ -26,6 +26,8 @@ import org.apache.flink.api.connector.source.SourceReaderContext; import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; import org.apache.flink.connector.testutils.source.reader.TestingReaderContext; +import org.apache.flink.connector.testutils.source.reader.TestingReaderOutput; +import org.apache.flink.core.io.InputStatus; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.types.DataType; import org.apache.flink.util.Collector; @@ -33,10 +35,12 @@ import org.apache.flink.util.Collector; import com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils; import com.ververica.cdc.connectors.mysql.source.MySqlSourceTestBase; import com.ververica.cdc.connectors.mysql.source.assigners.MySqlBinlogSplitAssigner; +import com.ververica.cdc.connectors.mysql.source.assigners.MySqlSnapshotSplitAssigner; import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfig; import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory; import com.ververica.cdc.connectors.mysql.source.metrics.MySqlSourceReaderMetrics; import com.ververica.cdc.connectors.mysql.source.split.MySqlBinlogSplit; +import com.ververica.cdc.connectors.mysql.source.split.MySqlSnapshotSplit; import com.ververica.cdc.connectors.mysql.source.split.MySqlSplit; import com.ververica.cdc.connectors.mysql.source.utils.TableDiscoveryUtils; import com.ververica.cdc.connectors.mysql.table.StartupOptions; @@ -50,20 +54,32 @@ import io.debezium.relational.history.TableChanges; import org.apache.kafka.connect.source.SourceRecord; import org.junit.Test; +import java.sql.Connection; import java.sql.SQLException; +import java.sql.Statement; import java.time.ZoneId; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.fail; /** Tests for {@link MySqlSourceReader}. */ public class MySqlSourceReaderTest extends MySqlSourceTestBase { private final UniqueDatabase customerDatabase = new UniqueDatabase(MYSQL_CONTAINER, "customer", "mysqluser", "mysqlpw"); + private final UniqueDatabase inventoryDatabase = + new UniqueDatabase(MYSQL_CONTAINER, "inventory", "mysqluser", "mysqlpw"); @Test public void testBinlogReadFailoverCrossTransaction() throws Exception { @@ -126,10 +142,110 @@ public class MySqlSourceReaderTest extends MySqlSourceTestBase { restartReader.close(); } + @Test + public void testNoDuplicateRecordsWhenKeepUpdating() throws Exception { + inventoryDatabase.createAndInitialize(); + String tableName = inventoryDatabase.getDatabaseName() + ".products"; + // use default split size which is large to make sure we only have one snapshot split + final MySqlSourceConfig sourceConfig = + new MySqlSourceConfigFactory() + .startupOptions(StartupOptions.initial()) + .databaseList(inventoryDatabase.getDatabaseName()) + .tableList(tableName) + .includeSchemaChanges(false) + .hostname(MYSQL_CONTAINER.getHost()) + .port(MYSQL_CONTAINER.getDatabasePort()) + .username(customerDatabase.getUsername()) + .password(customerDatabase.getPassword()) + .serverTimeZone(ZoneId.of("UTC").toString()) + .createConfig(0); + final MySqlSnapshotSplitAssigner assigner = + new MySqlSnapshotSplitAssigner( + sourceConfig, + DEFAULT_PARALLELISM, + Collections.singletonList(TableId.parse(tableName)), + false); + assigner.open(); + MySqlSnapshotSplit snapshotSplit = (MySqlSnapshotSplit) assigner.getNext().get(); + // should contain only one split + assertFalse(assigner.getNext().isPresent()); + // and the split is a full range one + assertNull(snapshotSplit.getSplitStart()); + assertNull(snapshotSplit.getSplitEnd()); + + final AtomicBoolean finishReading = new AtomicBoolean(false); + final CountDownLatch updatingExecuted = new CountDownLatch(1); + TestingReaderContext testingReaderContext = new TestingReaderContext(); + MySqlSourceReader reader = createReader(sourceConfig, testingReaderContext); + reader.start(); + + Thread updateWorker = + new Thread( + () -> { + try (Connection connection = inventoryDatabase.getJdbcConnection(); + Statement statement = connection.createStatement()) { + boolean flagSet = false; + while (!finishReading.get()) { + statement.execute( + "UPDATE products SET description='" + + UUID.randomUUID().toString() + + "' WHERE id=101"); + if (!flagSet) { + updatingExecuted.countDown(); + flagSet = true; + } + } + } catch (Exception throwables) { + throwables.printStackTrace(); + } + }); + + // start to keep updating the products table + updateWorker.start(); + // wait until the updating executed + updatingExecuted.await(); + // start to read chunks of the products table + reader.addSplits(Collections.singletonList(snapshotSplit)); + reader.notifyNoMoreSplits(); + + TestingReaderOutput output = new TestingReaderOutput<>(); + while (true) { + InputStatus status = reader.pollNext(output); + if (status == InputStatus.END_OF_INPUT) { + break; + } + if (status == InputStatus.NOTHING_AVAILABLE) { + reader.isAvailable().get(); + } + } + // stop the updating worker + finishReading.set(true); + updateWorker.join(); + + // check the result + ArrayList emittedRecords = output.getEmittedRecords(); + Map recordByKey = new HashMap<>(); + for (SourceRecord record : emittedRecords) { + SourceRecord existed = recordByKey.get(record.key()); + if (existed != null) { + fail( + String.format( + "The emitted record contains duplicate records on key\n%s\n%s\n", + existed, record)); + } else { + recordByKey.put(record.key(), record); + } + } + } + private MySqlSourceReader createReader(MySqlSourceConfig configuration) { + return createReader(configuration, new TestingReaderContext()); + } + + private MySqlSourceReader createReader( + MySqlSourceConfig configuration, SourceReaderContext readerContext) { final FutureCompletingBlockingQueue> elementsQueue = new FutureCompletingBlockingQueue<>(); - final SourceReaderContext readerContext = new TestingReaderContext(); final MySqlRecordEmitter recordEmitter = new MySqlRecordEmitter<>( new ForwardDeserializeSchema(),