From cf50566f34277a8185926945301f2cb451295e68 Mon Sep 17 00:00:00 2001
From: Tuple <zhaojunwang.zjw@alibaba-inc.com>
Date: Fri, 26 Nov 2021 19:49:52 +0800
Subject: [PATCH] [mysql] Fix duplicate records being output when the table
 keeps updating during snapshot read (#592)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: 元组 <zhaojunwang.zjw@alibaba-inc.com>
---
 .../mysql/source/utils/RecordUtils.java       |  13 +-
 .../source/reader/MySqlSourceReaderTest.java  | 118 +++++++++++++++++-
 2 files changed, 120 insertions(+), 11 deletions(-)

diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/utils/RecordUtils.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/utils/RecordUtils.java
index b0f50f7b2..a7111a659 100644
--- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/utils/RecordUtils.java
+++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/utils/RecordUtils.java
@@ -171,20 +171,13 @@ public class RecordUtils {
                                             binlog.key(),
                                             binlog.valueSchema(),
                                             envelope.read(updateAfter, source, ts));
-                            normalizedBinlogRecords.add(record);
+                            snapshotRecords.put(key, record);
                             break;
                         case DELETE:
-                            if (snapshotRecords.containsKey(key)) {
-                                snapshotRecords.remove(key);
-                            } else {
-                                throw new IllegalStateException(
-                                        String.format(
-                                                "The delete record %s doesn't exists in table split %s.",
-                                                binlog, split));
-                            }
+                            snapshotRecords.remove(key);
                             break;
                         case CREATE:
-                            normalizedBinlogRecords.add(binlog);
+                            snapshotRecords.put(key, binlog);
                             break;
                         case READ:
                             throw new IllegalStateException(
diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java
index 3d07f4c6d..2ea3bb6be 100644
--- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java
+++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java
@@ -26,6 +26,8 @@ import org.apache.flink.api.connector.source.SourceReaderContext;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
 import org.apache.flink.connector.testutils.source.reader.TestingReaderContext;
+import org.apache.flink.connector.testutils.source.reader.TestingReaderOutput;
+import org.apache.flink.core.io.InputStatus;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.util.Collector;
@@ -33,10 +35,12 @@ import org.apache.flink.util.Collector;
 import com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils;
 import com.ververica.cdc.connectors.mysql.source.MySqlSourceTestBase;
 import com.ververica.cdc.connectors.mysql.source.assigners.MySqlBinlogSplitAssigner;
+import com.ververica.cdc.connectors.mysql.source.assigners.MySqlSnapshotSplitAssigner;
 import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfig;
 import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory;
 import com.ververica.cdc.connectors.mysql.source.metrics.MySqlSourceReaderMetrics;
 import com.ververica.cdc.connectors.mysql.source.split.MySqlBinlogSplit;
+import com.ververica.cdc.connectors.mysql.source.split.MySqlSnapshotSplit;
 import com.ververica.cdc.connectors.mysql.source.split.MySqlSplit;
 import com.ververica.cdc.connectors.mysql.source.utils.TableDiscoveryUtils;
 import com.ververica.cdc.connectors.mysql.table.StartupOptions;
@@ -50,20 +54,32 @@ import io.debezium.relational.history.TableChanges;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.junit.Test;
 
+import java.sql.Connection;
 import java.sql.SQLException;
+import java.sql.Statement;
 import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
 
 /** Tests for {@link MySqlSourceReader}. */
 public class MySqlSourceReaderTest extends MySqlSourceTestBase {
 
     private final UniqueDatabase customerDatabase =
             new UniqueDatabase(MYSQL_CONTAINER, "customer", "mysqluser", "mysqlpw");
+    private final UniqueDatabase inventoryDatabase =
+            new UniqueDatabase(MYSQL_CONTAINER, "inventory", "mysqluser", "mysqlpw");
 
     @Test
     public void testBinlogReadFailoverCrossTransaction() throws Exception {
@@ -126,10 +142,110 @@ public class MySqlSourceReaderTest extends MySqlSourceTestBase {
         restartReader.close();
     }
 
+    @Test
+    public void testNoDuplicateRecordsWhenKeepUpdating() throws Exception {
+        inventoryDatabase.createAndInitialize();
+        String tableName = inventoryDatabase.getDatabaseName() + ".products";
+        // use default split size which is large to make sure we only have one snapshot split
+        final MySqlSourceConfig sourceConfig =
+                new MySqlSourceConfigFactory()
+                        .startupOptions(StartupOptions.initial())
+                        .databaseList(inventoryDatabase.getDatabaseName())
+                        .tableList(tableName)
+                        .includeSchemaChanges(false)
+                        .hostname(MYSQL_CONTAINER.getHost())
+                        .port(MYSQL_CONTAINER.getDatabasePort())
+                        .username(customerDatabase.getUsername())
+                        .password(customerDatabase.getPassword())
+                        .serverTimeZone(ZoneId.of("UTC").toString())
+                        .createConfig(0);
+        final MySqlSnapshotSplitAssigner assigner =
+                new MySqlSnapshotSplitAssigner(
+                        sourceConfig,
+                        DEFAULT_PARALLELISM,
+                        Collections.singletonList(TableId.parse(tableName)),
+                        false);
+        assigner.open();
+        MySqlSnapshotSplit snapshotSplit = (MySqlSnapshotSplit) assigner.getNext().get();
+        // should contain only one split
+        assertFalse(assigner.getNext().isPresent());
+        // and the split is a full range one
+        assertNull(snapshotSplit.getSplitStart());
+        assertNull(snapshotSplit.getSplitEnd());
+
+        final AtomicBoolean finishReading = new AtomicBoolean(false);
+        final CountDownLatch updatingExecuted = new CountDownLatch(1);
+        TestingReaderContext testingReaderContext = new TestingReaderContext();
+        MySqlSourceReader<SourceRecord> reader = createReader(sourceConfig, testingReaderContext);
+        reader.start();
+
+        Thread updateWorker =
+                new Thread(
+                        () -> {
+                            try (Connection connection = inventoryDatabase.getJdbcConnection();
+                                    Statement statement = connection.createStatement()) {
+                                boolean flagSet = false;
+                                while (!finishReading.get()) {
+                                    statement.execute(
+                                            "UPDATE products SET  description='"
+                                                    + UUID.randomUUID().toString()
+                                                    + "' WHERE id=101");
+                                    if (!flagSet) {
+                                        updatingExecuted.countDown();
+                                        flagSet = true;
+                                    }
+                                }
+                            } catch (Exception throwables) {
+                                throwables.printStackTrace();
+                            }
+                        });
+
+        // start to keep updating the products table
+        updateWorker.start();
+        // wait until the updating executed
+        updatingExecuted.await();
+        // start to read chunks of the products table
+        reader.addSplits(Collections.singletonList(snapshotSplit));
+        reader.notifyNoMoreSplits();
+
+        TestingReaderOutput<SourceRecord> output = new TestingReaderOutput<>();
+        while (true) {
+            InputStatus status = reader.pollNext(output);
+            if (status == InputStatus.END_OF_INPUT) {
+                break;
+            }
+            if (status == InputStatus.NOTHING_AVAILABLE) {
+                reader.isAvailable().get();
+            }
+        }
+        // stop the updating worker
+        finishReading.set(true);
+        updateWorker.join();
+
+        // check the result
+        ArrayList<SourceRecord> emittedRecords = output.getEmittedRecords();
+        Map<Object, SourceRecord> recordByKey = new HashMap<>();
+        for (SourceRecord record : emittedRecords) {
+            SourceRecord existed = recordByKey.get(record.key());
+            if (existed != null) {
+                fail(
+                        String.format(
+                                "The emitted record contains duplicate records on key\n%s\n%s\n",
+                                existed, record));
+            } else {
+                recordByKey.put(record.key(), record);
+            }
+        }
+    }
+
     private MySqlSourceReader<SourceRecord> createReader(MySqlSourceConfig configuration) {
+        return createReader(configuration, new TestingReaderContext());
+    }
+
+    private MySqlSourceReader<SourceRecord> createReader(
+            MySqlSourceConfig configuration, SourceReaderContext readerContext) {
         final FutureCompletingBlockingQueue<RecordsWithSplitIds<SourceRecord>> elementsQueue =
                 new FutureCompletingBlockingQueue<>();
-        final SourceReaderContext readerContext = new TestingReaderContext();
         final MySqlRecordEmitter<SourceRecord> recordEmitter =
                 new MySqlRecordEmitter<>(
                         new ForwardDeserializeSchema(),