[mysql] Fix duplicate records being output when the table keeps being updated during snapshot read (#592)

Signed-off-by: 元组 <zhaojunwang.zjw@alibaba-inc.com>
(cherry picked from commit cf50566f34)
Branch: pull/661/head
Authored by Tuple 3 years ago; committed by Jark Wu
parent 492700bbbf
commit 6d3078e87f
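
Why the duplicates appeared: while a snapshot split is read, the binlog events captured between the split's low and high watermark have to be merged back into the snapshot rows. Before this change, UPDATE and CREATE events were appended to normalizedBinlogRecords on top of the row image already sitting in snapshotRecords, so a row modified while the chunk was being read could be emitted twice. The first hunk below upserts those events into the keyed snapshotRecords map instead, so the last image per primary key wins. A minimal sketch of that idea follows; Event, Op and normalize are illustrative stand-ins rather than the connector's API (the real logic in RecordUtils operates on Kafka Connect SourceRecords):

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch only: models the upsert-based normalization introduced by the fix.
final class SnapshotNormalizeSketch {

    enum Op { CREATE, UPDATE, DELETE }

    static final class Event {
        final Object key;   // primary-key image of the row
        final Op op;        // operation carried by the binlog event
        final Object value; // row image after the operation (null for DELETE)

        Event(Object key, Op op, Object value) {
            this.key = key;
            this.op = op;
            this.value = value;
        }
    }

    // Merges binlog events observed during the snapshot into the snapshot rows,
    // keyed by primary key, so each key is emitted at most once.
    static Map<Object, Object> normalize(
            Map<Object, Object> snapshotRecords, List<Event> binlogEvents) {
        Map<Object, Object> byKey = new LinkedHashMap<>(snapshotRecords);
        for (Event event : binlogEvents) {
            switch (event.op) {
                case CREATE:
                case UPDATE:
                    // upsert instead of appending to a second output list;
                    // appending is what produced the duplicate records
                    byKey.put(event.key, event.value);
                    break;
                case DELETE:
                    // the key may legally be absent, e.g. the row was deleted
                    // before the snapshot query ran, so no existence check
                    byKey.remove(event.key);
                    break;
            }
        }
        return byKey; // at most one record per primary key
    }
}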

@@ -171,20 +171,13 @@ public class RecordUtils {
                                 binlog.key(),
                                 binlog.valueSchema(),
                                 envelope.read(updateAfter, source, ts));
-                normalizedBinlogRecords.add(record);
+                snapshotRecords.put(key, record);
                 break;
             case DELETE:
-                if (snapshotRecords.containsKey(key)) {
-                    snapshotRecords.remove(key);
-                } else {
-                    throw new IllegalStateException(
-                            String.format(
-                                    "The delete record %s doesn't exists in table split %s.",
-                                    binlog, split));
-                }
+                snapshotRecords.remove(key);
                 break;
             case CREATE:
-                normalizedBinlogRecords.add(binlog);
+                snapshotRecords.put(key, binlog);
                 break;
             case READ:
                 throw new IllegalStateException(
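
Note on the DELETE branch above: the old code treated a missing key as corruption and threw IllegalStateException, but that state is legal. A DELETE event inside the watermark window can refer to a row the snapshot query never saw, for example one that was inserted and deleted again before the query ran, so the fix downgrades the check to a plain remove, which is simply a no-op for such keys.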

@@ -26,6 +26,8 @@ import org.apache.flink.api.connector.source.SourceReaderContext;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
 import org.apache.flink.connector.testutils.source.reader.TestingReaderContext;
+import org.apache.flink.connector.testutils.source.reader.TestingReaderOutput;
+import org.apache.flink.core.io.InputStatus;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.util.Collector;
@@ -33,10 +35,12 @@ import org.apache.flink.util.Collector;
 import com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils;
 import com.ververica.cdc.connectors.mysql.source.MySqlSourceTestBase;
 import com.ververica.cdc.connectors.mysql.source.assigners.MySqlBinlogSplitAssigner;
+import com.ververica.cdc.connectors.mysql.source.assigners.MySqlSnapshotSplitAssigner;
 import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfig;
 import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory;
 import com.ververica.cdc.connectors.mysql.source.metrics.MySqlSourceReaderMetrics;
 import com.ververica.cdc.connectors.mysql.source.split.MySqlBinlogSplit;
+import com.ververica.cdc.connectors.mysql.source.split.MySqlSnapshotSplit;
 import com.ververica.cdc.connectors.mysql.source.split.MySqlSplit;
 import com.ververica.cdc.connectors.mysql.source.utils.TableDiscoveryUtils;
 import com.ververica.cdc.connectors.mysql.table.StartupOptions;
@@ -50,20 +54,32 @@ import io.debezium.relational.history.TableChanges;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.junit.Test;
 
 import java.sql.Connection;
 import java.sql.SQLException;
+import java.sql.Statement;
 import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
 
 /** Tests for {@link MySqlSourceReader}. */
 public class MySqlSourceReaderTest extends MySqlSourceTestBase {
 
     private final UniqueDatabase customerDatabase =
             new UniqueDatabase(MYSQL_CONTAINER, "customer", "mysqluser", "mysqlpw");
+    private final UniqueDatabase inventoryDatabase =
+            new UniqueDatabase(MYSQL_CONTAINER, "inventory", "mysqluser", "mysqlpw");
 
     @Test
     public void testBinlogReadFailoverCrossTransaction() throws Exception {
@@ -126,10 +142,110 @@ public class MySqlSourceReaderTest extends MySqlSourceTestBase {
         restartReader.close();
     }
 
+    @Test
+    public void testNoDuplicateRecordsWhenKeepUpdating() throws Exception {
+        inventoryDatabase.createAndInitialize();
+        String tableName = inventoryDatabase.getDatabaseName() + ".products";
+        // use default split size which is large to make sure we only have one snapshot split
+        final MySqlSourceConfig sourceConfig =
+                new MySqlSourceConfigFactory()
+                        .startupOptions(StartupOptions.initial())
+                        .databaseList(inventoryDatabase.getDatabaseName())
+                        .tableList(tableName)
+                        .includeSchemaChanges(false)
+                        .hostname(MYSQL_CONTAINER.getHost())
+                        .port(MYSQL_CONTAINER.getDatabasePort())
+                        .username(customerDatabase.getUsername())
+                        .password(customerDatabase.getPassword())
+                        .serverTimeZone(ZoneId.of("UTC").toString())
+                        .createConfig(0);
+        final MySqlSnapshotSplitAssigner assigner =
+                new MySqlSnapshotSplitAssigner(
+                        sourceConfig,
+                        DEFAULT_PARALLELISM,
+                        Collections.singletonList(TableId.parse(tableName)),
+                        false);
+        assigner.open();
+        MySqlSnapshotSplit snapshotSplit = (MySqlSnapshotSplit) assigner.getNext().get();
+        // should contain only one split
+        assertFalse(assigner.getNext().isPresent());
+        // and the split is a full range one
+        assertNull(snapshotSplit.getSplitStart());
+        assertNull(snapshotSplit.getSplitEnd());
+
+        final AtomicBoolean finishReading = new AtomicBoolean(false);
+        final CountDownLatch updatingExecuted = new CountDownLatch(1);
+        TestingReaderContext testingReaderContext = new TestingReaderContext();
+        MySqlSourceReader<SourceRecord> reader = createReader(sourceConfig, testingReaderContext);
+        reader.start();
+
+        Thread updateWorker =
+                new Thread(
+                        () -> {
+                            try (Connection connection = inventoryDatabase.getJdbcConnection();
+                                    Statement statement = connection.createStatement()) {
+                                boolean flagSet = false;
+                                while (!finishReading.get()) {
+                                    statement.execute(
+                                            "UPDATE products SET description='"
+                                                    + UUID.randomUUID().toString()
+                                                    + "' WHERE id=101");
+                                    if (!flagSet) {
+                                        updatingExecuted.countDown();
+                                        flagSet = true;
+                                    }
+                                }
+                            } catch (Exception throwables) {
+                                throwables.printStackTrace();
+                            }
+                        });
+
+        // start to keep updating the products table
+        updateWorker.start();
+        // wait until the updating executed
+        updatingExecuted.await();
+        // start to read chunks of the products table
+        reader.addSplits(Collections.singletonList(snapshotSplit));
+        reader.notifyNoMoreSplits();
+
+        TestingReaderOutput<SourceRecord> output = new TestingReaderOutput<>();
+        while (true) {
+            InputStatus status = reader.pollNext(output);
+            if (status == InputStatus.END_OF_INPUT) {
+                break;
+            }
+            if (status == InputStatus.NOTHING_AVAILABLE) {
+                reader.isAvailable().get();
+            }
+        }
+
+        // stop the updating worker
+        finishReading.set(true);
+        updateWorker.join();
+
+        // check the result
+        ArrayList<SourceRecord> emittedRecords = output.getEmittedRecords();
+        Map<Object, SourceRecord> recordByKey = new HashMap<>();
+        for (SourceRecord record : emittedRecords) {
+            SourceRecord existed = recordByKey.get(record.key());
+            if (existed != null) {
+                fail(
+                        String.format(
+                                "The emitted record contains duplicate records on key\n%s\n%s\n",
+                                existed, record));
+            } else {
+                recordByKey.put(record.key(), record);
+            }
+        }
+    }
+
     private MySqlSourceReader<SourceRecord> createReader(MySqlSourceConfig configuration) {
+        return createReader(configuration, new TestingReaderContext());
+    }
+
+    private MySqlSourceReader<SourceRecord> createReader(
+            MySqlSourceConfig configuration, SourceReaderContext readerContext) {
         final FutureCompletingBlockingQueue<RecordsWithSplitIds<SourceRecord>> elementsQueue =
                 new FutureCompletingBlockingQueue<>();
-        final SourceReaderContext readerContext = new TestingReaderContext();
         final MySqlRecordEmitter<SourceRecord> recordEmitter =
                 new MySqlRecordEmitter<>(
                         new ForwardDeserializeSchema(),
