[mysql] Adopt streaming merge during snapshotSplit scan to avoid OOM (#1219)

This closes #1219
pull/1354/head
ehui 3 years ago committed by GitHub
parent be4249156e
commit 1fdf28fe49
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -41,6 +41,7 @@ import io.debezium.pipeline.DataChangeEvent;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.pipeline.spi.SnapshotResult;
import io.debezium.util.SchemaNameAdjuster;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -48,14 +49,23 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.normalizedSplitRecords;
import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.formatMessageTimestamp;
import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getSplitKey;
import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.isDataChangeRecord;
import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.isHighWatermarkEvent;
import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.isLowWatermarkEvent;
import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.splitKeyRangeContains;
import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.upsertBinlog;
import static org.apache.flink.util.Preconditions.checkState;
/**
* A snapshot reader that reads data from Table in split level, the split is assigned by primary key
@ -103,6 +113,7 @@ public class SnapshotSplitReader implements DebeziumReader<SourceRecord, MySqlSp
statefulTaskContext.getConnection(),
statefulTaskContext.getDispatcher(),
statefulTaskContext.getTopicSelector(),
statefulTaskContext.getSnapshotReceiver(),
StatefulTaskContext.getClock(),
currentSnapshotSplit);
executor.submit(
@ -229,23 +240,53 @@ public class SnapshotSplitReader implements DebeziumReader<SourceRecord, MySqlSp
// data input: [low watermark event][snapshot events][high watermark event][binlog
// events][binlog-end event]
// data output: [low watermark event][normalized events][high watermark event]
boolean reachBinlogStart = false;
boolean reachBinlogEnd = false;
final List<SourceRecord> sourceRecords = new ArrayList<>();
SourceRecord lowWatermark = null;
SourceRecord highWatermark = null;
Map<Struct, SourceRecord> snapshotRecords = new HashMap<>();
while (!reachBinlogEnd) {
checkReadException();
List<DataChangeEvent> batch = queue.poll();
for (DataChangeEvent event : batch) {
sourceRecords.add(event.getRecord());
if (RecordUtils.isEndWatermarkEvent(event.getRecord())) {
SourceRecord record = event.getRecord();
if (lowWatermark == null) {
lowWatermark = record;
assertLowWatermark(lowWatermark);
continue;
}
if (highWatermark == null && isHighWatermarkEvent(record)) {
highWatermark = record;
// snapshot events capture end and begin to capture binlog events
reachBinlogStart = true;
continue;
}
if (reachBinlogStart && RecordUtils.isEndWatermarkEvent(record)) {
// capture to end watermark events, stop the loop
reachBinlogEnd = true;
break;
}
if (!reachBinlogStart) {
snapshotRecords.put((Struct) record.key(), record);
} else {
if (isRequiredBinlogRecord(record)) {
// upsert binlog events through the record key
upsertBinlog(snapshotRecords, record);
}
}
}
}
// snapshot split return its data once
hasNextElement.set(false);
return normalizedSplitRecords(currentSnapshotSplit, sourceRecords, nameAdjuster)
.iterator();
final List<SourceRecord> normalizedRecords = new ArrayList<>();
normalizedRecords.add(lowWatermark);
normalizedRecords.addAll(formatMessageTimestamp(snapshotRecords.values()));
normalizedRecords.add(highWatermark);
return normalizedRecords.iterator();
}
// the data has been polled, no more data
reachEnd.compareAndSet(false, true);
@ -262,6 +303,24 @@ public class SnapshotSplitReader implements DebeziumReader<SourceRecord, MySqlSp
}
}
private void assertLowWatermark(SourceRecord lowWatermark) {
checkState(
isLowWatermarkEvent(lowWatermark),
String.format(
"The first record should be low watermark signal event, but actual is %s",
lowWatermark));
}
private boolean isRequiredBinlogRecord(SourceRecord record) {
if (isDataChangeRecord(record)) {
Object[] key =
getSplitKey(currentSnapshotSplit.getSplitKeyType(), record, nameAdjuster);
return splitKeyRangeContains(
key, currentSnapshotSplit.getSplitStart(), currentSnapshotSplit.getSplitEnd());
}
return false;
}
@Override
public void close() {
try {

@ -76,6 +76,7 @@ public class MySqlSnapshotSplitReadTask
private final Clock clock;
private final MySqlSnapshotSplit snapshotSplit;
private final TopicSelector<TableId> topicSelector;
private final EventDispatcher.SnapshotReceiver snapshotReceiver;
private final SnapshotChangeEventSourceMetrics snapshotChangeEventSourceMetrics;
public MySqlSnapshotSplitReadTask(
@ -85,6 +86,7 @@ public class MySqlSnapshotSplitReadTask
MySqlConnection jdbcConnection,
EventDispatcherImpl<TableId> dispatcher,
TopicSelector<TableId> topicSelector,
EventDispatcher.SnapshotReceiver snapshotReceiver,
Clock clock,
MySqlSnapshotSplit snapshotSplit) {
super(connectorConfig, snapshotChangeEventSourceMetrics);
@ -95,6 +97,7 @@ public class MySqlSnapshotSplitReadTask
this.clock = clock;
this.snapshotSplit = snapshotSplit;
this.topicSelector = topicSelector;
this.snapshotReceiver = snapshotReceiver;
this.snapshotChangeEventSourceMetrics = snapshotChangeEventSourceMetrics;
}
@ -188,8 +191,6 @@ public class MySqlSnapshotSplitReadTask
RelationalSnapshotChangeEventSource.RelationalSnapshotContext snapshotContext,
TableId tableId)
throws Exception {
EventDispatcher.SnapshotReceiver snapshotReceiver =
dispatcher.getSnapshotChangeEventReceiver();
LOG.debug("Snapshotting table {}", tableId);
createDataEventsForTable(
snapshotContext, snapshotReceiver, databaseSchema.tableFor(tableId));

@ -39,6 +39,7 @@ import io.debezium.connector.mysql.MySqlTopicSelector;
import io.debezium.data.Envelope;
import io.debezium.pipeline.DataChangeEvent;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.metrics.SnapshotChangeEventSourceMetrics;
import io.debezium.pipeline.metrics.StreamingChangeEventSourceMetrics;
import io.debezium.pipeline.source.spi.EventMetadataProvider;
@ -68,6 +69,7 @@ import static com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset.BINL
public class StatefulTaskContext {
private static final Logger LOG = LoggerFactory.getLogger(StatefulTaskContext.class);
private static final int DEFAULT_BINLOG_QUEUE_SIZE_IN_SNAPSHOT_SCAN = 1024;
private static final Clock clock = Clock.SYSTEM;
private final MySqlSourceConfig sourceConfig;
@ -84,6 +86,7 @@ public class StatefulTaskContext {
private SnapshotChangeEventSourceMetrics snapshotChangeEventSourceMetrics;
private StreamingChangeEventSourceMetrics streamingChangeEventSourceMetrics;
private EventDispatcherImpl<TableId> dispatcher;
private EventDispatcher.SnapshotReceiver snapshotReceiver;
private SignalEventDispatcher signalEventDispatcher;
private ChangeEventQueue<DataChangeEvent> queue;
private ErrorHandler errorHandler;
@ -120,7 +123,7 @@ public class StatefulTaskContext {
final int queueSize =
mySqlSplit.isSnapshotSplit()
? Integer.MAX_VALUE
? sourceConfig.getSplitSize() + DEFAULT_BINLOG_QUEUE_SIZE_IN_SNAPSHOT_SCAN
: connectorConfig.getMaxQueueSize();
this.queue =
new ChangeEventQueue.Builder<DataChangeEvent>()
@ -146,6 +149,8 @@ public class StatefulTaskContext {
metadataProvider,
schemaNameAdjuster);
this.snapshotReceiver = dispatcher.getSnapshotChangeEventReceiver();
this.signalEventDispatcher =
new SignalEventDispatcher(
offsetContext.getPartition(), topicSelector.getPrimaryTopic(), queue);
@ -352,6 +357,10 @@ public class StatefulTaskContext {
return dispatcher;
}
public EventDispatcher.SnapshotReceiver getSnapshotReceiver() {
return snapshotReceiver;
}
public SignalEventDispatcher getSignalEventDispatcher() {
return signalEventDispatcher;
}

@ -38,7 +38,6 @@ import java.io.IOException;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
@ -53,7 +52,6 @@ import static com.ververica.cdc.connectors.mysql.debezium.dispatcher.SignalEvent
import static com.ververica.cdc.connectors.mysql.debezium.dispatcher.SignalEventDispatcher.WATERMARK_KIND;
import static io.debezium.connector.AbstractSourceInfo.DATABASE_NAME_KEY;
import static io.debezium.connector.AbstractSourceInfo.TABLE_NAME_KEY;
import static org.apache.flink.util.Preconditions.checkState;
/** Utility class to deal record. */
public class RecordUtils {
@ -75,125 +73,50 @@ public class RecordUtils {
return row;
}
/**
* Normalize the records of snapshot split which represents the split records state on high
* watermark. data input: [low watermark event] [snapshot events ] [high watermark event]
* [binlog events] [binlog-end event] data output: [low watermark event] [normalized events]
* [high watermark event]
*/
public static List<SourceRecord> normalizedSplitRecords(
MySqlSnapshotSplit snapshotSplit,
List<SourceRecord> sourceRecords,
SchemaNameAdjuster nameAdjuster) {
List<SourceRecord> normalizedRecords = new ArrayList<>();
Map<Struct, SourceRecord> snapshotRecords = new HashMap<>();
List<SourceRecord> binlogRecords = new ArrayList<>();
if (!sourceRecords.isEmpty()) {
SourceRecord lowWatermark = sourceRecords.get(0);
checkState(
isLowWatermarkEvent(lowWatermark),
String.format(
"The first record should be low watermark signal event, but is %s",
lowWatermark));
SourceRecord highWatermark = null;
int i = 1;
for (; i < sourceRecords.size(); i++) {
SourceRecord sourceRecord = sourceRecords.get(i);
if (!isHighWatermarkEvent(sourceRecord)) {
snapshotRecords.put((Struct) sourceRecord.key(), sourceRecord);
} else {
highWatermark = sourceRecord;
i++;
/** upsert binlog events to snapshot events collection. */
public static void upsertBinlog(
Map<Struct, SourceRecord> snapshotRecords, SourceRecord binlogRecord) {
Struct key = (Struct) binlogRecord.key();
Struct value = (Struct) binlogRecord.value();
if (value != null) {
Envelope.Operation operation =
Envelope.Operation.forCode(value.getString(Envelope.FieldName.OPERATION));
switch (operation) {
case CREATE:
case UPDATE:
Envelope envelope = Envelope.fromSchema(binlogRecord.valueSchema());
Struct source = value.getStruct(Envelope.FieldName.SOURCE);
Struct after = value.getStruct(Envelope.FieldName.AFTER);
Instant fetchTs =
Instant.ofEpochMilli((Long) source.get(Envelope.FieldName.TIMESTAMP));
SourceRecord record =
new SourceRecord(
binlogRecord.sourcePartition(),
binlogRecord.sourceOffset(),
binlogRecord.topic(),
binlogRecord.kafkaPartition(),
binlogRecord.keySchema(),
binlogRecord.key(),
binlogRecord.valueSchema(),
envelope.read(after, source, fetchTs));
snapshotRecords.put(key, record);
break;
}
}
if (i < sourceRecords.size() - 1) {
List<SourceRecord> allBinlogRecords =
sourceRecords.subList(i, sourceRecords.size() - 1);
for (SourceRecord binlog : allBinlogRecords) {
if (isDataChangeRecord(binlog)) {
Object[] key =
getSplitKey(snapshotSplit.getSplitKeyType(), binlog, nameAdjuster);
if (splitKeyRangeContains(
key, snapshotSplit.getSplitStart(), snapshotSplit.getSplitEnd())) {
binlogRecords.add(binlog);
}
}
}
}
checkState(
isHighWatermarkEvent(highWatermark),
String.format(
"The last record should be high watermark signal event, but is %s",
highWatermark));
normalizedRecords =
upsertBinlog(lowWatermark, highWatermark, snapshotRecords, binlogRecords);
}
return normalizedRecords;
}
private static List<SourceRecord> upsertBinlog(
SourceRecord lowWatermarkEvent,
SourceRecord highWatermarkEvent,
Map<Struct, SourceRecord> snapshotRecords,
List<SourceRecord> binlogRecords) {
// upsert binlog events to snapshot events of split
if (!binlogRecords.isEmpty()) {
for (SourceRecord binlog : binlogRecords) {
Struct key = (Struct) binlog.key();
Struct value = (Struct) binlog.value();
if (value != null) {
Envelope.Operation operation =
Envelope.Operation.forCode(
value.getString(Envelope.FieldName.OPERATION));
switch (operation) {
case CREATE:
case UPDATE:
Envelope envelope = Envelope.fromSchema(binlog.valueSchema());
Struct source = value.getStruct(Envelope.FieldName.SOURCE);
Struct after = value.getStruct(Envelope.FieldName.AFTER);
Instant fetchTs =
Instant.ofEpochMilli(
(Long) source.get(Envelope.FieldName.TIMESTAMP));
SourceRecord record =
new SourceRecord(
binlog.sourcePartition(),
binlog.sourceOffset(),
binlog.topic(),
binlog.kafkaPartition(),
binlog.keySchema(),
binlog.key(),
binlog.valueSchema(),
envelope.read(after, source, fetchTs));
snapshotRecords.put(key, record);
break;
case DELETE:
snapshotRecords.remove(key);
break;
case READ:
throw new IllegalStateException(
String.format(
"Binlog record shouldn't use READ operation, the the record is %s.",
binlog));
}
}
case DELETE:
snapshotRecords.remove(key);
break;
case READ:
throw new IllegalStateException(
String.format(
"Binlog record shouldn't use READ operation, the the record is %s.",
binlogRecord));
}
}
final List<SourceRecord> normalizedRecords = new ArrayList<>();
normalizedRecords.add(lowWatermarkEvent);
normalizedRecords.addAll(formatMessageTimestamp(snapshotRecords.values()));
normalizedRecords.add(highWatermarkEvent);
return normalizedRecords;
}
/**
* Format message timestamp(source.ts_ms) value to 0L for all records read in snapshot phase.
*/
private static List<SourceRecord> formatMessageTimestamp(
public static List<SourceRecord> formatMessageTimestamp(
Collection<SourceRecord> snapshotRecords) {
return snapshotRecords.stream()
.map(

@ -34,16 +34,25 @@ import com.ververica.cdc.connectors.mysql.source.split.MySqlSplit;
import com.ververica.cdc.connectors.mysql.testutils.RecordsFormatter;
import com.ververica.cdc.connectors.mysql.testutils.UniqueDatabase;
import io.debezium.connector.mysql.MySqlConnection;
import io.debezium.data.Envelope;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.relational.TableId;
import io.debezium.schema.DataCollectionSchema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.header.ConnectHeaders;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.BeforeClass;
import org.junit.Test;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import static org.junit.Assert.assertTrue;
@ -69,6 +78,8 @@ public class SnapshotSplitReaderTest extends MySqlSourceTestBase {
@Test
public void testReadSingleSnapshotSplit() throws Exception {
MySqlSourceConfig sourceConfig = getConfig(new String[] {"customers_even_dist"}, 4);
StatefulTaskContext statefulTaskContext =
new StatefulTaskContext(sourceConfig, binaryLogClient, mySqlConnection);
final DataType dataType =
DataTypes.ROW(
DataTypes.FIELD("id", DataTypes.BIGINT()),
@ -84,13 +95,16 @@ public class SnapshotSplitReaderTest extends MySqlSourceTestBase {
"+I[103, user_3, Shanghai, 123567891234]",
"+I[104, user_4, Shanghai, 123567891234]"
};
List<String> actual = readTableSnapshotSplits(mySqlSplits, sourceConfig, 1, dataType);
List<String> actual =
readTableSnapshotSplits(mySqlSplits, statefulTaskContext, 1, dataType);
assertEqualsInAnyOrder(Arrays.asList(expected), actual);
}
@Test
public void testReadAllSnapshotSplitsForOneTable() throws Exception {
MySqlSourceConfig sourceConfig = getConfig(new String[] {"customers_even_dist"}, 4);
StatefulTaskContext statefulTaskContext =
new StatefulTaskContext(sourceConfig, binaryLogClient, mySqlConnection);
final DataType dataType =
DataTypes.ROW(
@ -114,13 +128,16 @@ public class SnapshotSplitReaderTest extends MySqlSourceTestBase {
"+I[110, user_10, Shanghai, 123567891234]"
};
List<String> actual =
readTableSnapshotSplits(mySqlSplits, sourceConfig, mySqlSplits.size(), dataType);
readTableSnapshotSplits(
mySqlSplits, statefulTaskContext, mySqlSplits.size(), dataType);
assertEqualsInAnyOrder(Arrays.asList(expected), actual);
}
@Test
public void testReadAllSplitForTableWithSingleLine() throws Exception {
MySqlSourceConfig sourceConfig = getConfig(new String[] {"customer_card_single_line"}, 10);
StatefulTaskContext statefulTaskContext =
new StatefulTaskContext(sourceConfig, binaryLogClient, mySqlConnection);
final DataType dataType =
DataTypes.ROW(
@ -131,7 +148,8 @@ public class SnapshotSplitReaderTest extends MySqlSourceTestBase {
List<MySqlSplit> mySqlSplits = getMySqlSplits(sourceConfig);
String[] expected = new String[] {"+I[20001, LEVEL_1, user_1, user with level 1]"};
List<String> actual =
readTableSnapshotSplits(mySqlSplits, sourceConfig, mySqlSplits.size(), dataType);
readTableSnapshotSplits(
mySqlSplits, statefulTaskContext, mySqlSplits.size(), dataType);
assertEqualsInAnyOrder(Arrays.asList(expected), actual);
}
@ -139,6 +157,8 @@ public class SnapshotSplitReaderTest extends MySqlSourceTestBase {
public void testReadAllSnapshotSplitsForTables() throws Exception {
MySqlSourceConfig sourceConfig =
getConfig(new String[] {"customer_card", "customer_card_single_line"}, 10);
StatefulTaskContext statefulTaskContext =
new StatefulTaskContext(sourceConfig, binaryLogClient, mySqlConnection);
DataType dataType =
DataTypes.ROW(
@ -172,7 +192,8 @@ public class SnapshotSplitReaderTest extends MySqlSourceTestBase {
"+I[50003, LEVEL_1, user_14, user with level 1]"
};
List<String> actual =
readTableSnapshotSplits(mySqlSplits, sourceConfig, mySqlSplits.size(), dataType);
readTableSnapshotSplits(
mySqlSplits, statefulTaskContext, mySqlSplits.size(), dataType);
assertEqualsInAnyOrder(Arrays.asList(expected), actual);
}
@ -180,6 +201,8 @@ public class SnapshotSplitReaderTest extends MySqlSourceTestBase {
public void testThrowRuntimeExceptionInSnapshotScan() throws Exception {
MySqlSourceConfig sourceConfig =
getConfig(new String[] {"customer_card", "customers_1"}, 10);
StatefulTaskContext statefulTaskContext =
new StatefulTaskContext(sourceConfig, binaryLogClient, mySqlConnection);
DataType dataType =
DataTypes.ROW(
@ -196,7 +219,7 @@ public class SnapshotSplitReaderTest extends MySqlSourceTestBase {
String exceptionMessage = String.format("Snapshotting of table %s failed.", tableToDrop);
try {
readTableSnapshotSplits(mySqlSplits, sourceConfig, mySqlSplits.size(), dataType);
readTableSnapshotSplits(mySqlSplits, statefulTaskContext, mySqlSplits.size(), dataType);
fail("Should fail.");
} catch (Exception e) {
assertTrue(e instanceof FlinkRuntimeException);
@ -204,15 +227,169 @@ public class SnapshotSplitReaderTest extends MySqlSourceTestBase {
}
}
@Test
public void testChangingDataInSnapshotScan() throws Exception {
String tableName = "customers_even_dist";
MySqlSourceConfig sourceConfig = getConfig(new String[] {tableName}, 10);
String tableId = customerDatabase.getDatabaseName() + "." + tableName;
String[] changingDataSql =
new String[] {
"UPDATE " + tableId + " SET address = 'Hangzhou' where id = 103",
"DELETE FROM " + tableId + " where id = 102",
"INSERT INTO " + tableId + " VALUES(102, 'user_2','Shanghai','123567891234')",
"UPDATE " + tableId + " SET address = 'Shanghai' where id = 103"
};
StatefulTaskContext statefulTaskContext =
new MakeBinlogEventTaskContext(
sourceConfig,
binaryLogClient,
mySqlConnection,
() -> executeSql(sourceConfig, changingDataSql));
final DataType dataType =
DataTypes.ROW(
DataTypes.FIELD("id", DataTypes.BIGINT()),
DataTypes.FIELD("name", DataTypes.STRING()),
DataTypes.FIELD("address", DataTypes.STRING()),
DataTypes.FIELD("phone_number", DataTypes.STRING()));
List<MySqlSplit> mySqlSplits = getMySqlSplits(sourceConfig);
String[] expected =
new String[] {
"+I[101, user_1, Shanghai, 123567891234]",
"+I[102, user_2, Shanghai, 123567891234]",
"+I[103, user_3, Shanghai, 123567891234]",
"+I[104, user_4, Shanghai, 123567891234]",
"+I[105, user_5, Shanghai, 123567891234]",
"+I[106, user_6, Shanghai, 123567891234]",
"+I[107, user_7, Shanghai, 123567891234]",
"+I[108, user_8, Shanghai, 123567891234]",
"+I[109, user_9, Shanghai, 123567891234]",
"+I[110, user_10, Shanghai, 123567891234]"
};
List<String> actual =
readTableSnapshotSplits(
mySqlSplits, statefulTaskContext, mySqlSplits.size(), dataType);
assertEqualsInAnyOrder(Arrays.asList(expected), actual);
}
@Test
public void testInsertDataInSnapshotScan() throws Exception {
String tableName = "customers_even_dist";
MySqlSourceConfig sourceConfig = getConfig(new String[] {tableName}, 10);
String tableId = customerDatabase.getDatabaseName() + "." + tableName;
String[] insertDataSql =
new String[] {
"INSERT INTO " + tableId + " VALUES(111, 'user_11','Shanghai','123567891234')",
"INSERT INTO " + tableId + " VALUES(112, 'user_12','Shanghai','123567891234')",
};
String[] recoveryDataSql =
new String[] {
"DELETE FROM " + tableId + " where id = 111",
"DELETE FROM " + tableId + " where id = 112",
};
StatefulTaskContext statefulTaskContext =
new MakeBinlogEventTaskContext(
sourceConfig,
binaryLogClient,
mySqlConnection,
() -> executeSql(sourceConfig, insertDataSql));
final DataType dataType =
DataTypes.ROW(
DataTypes.FIELD("id", DataTypes.BIGINT()),
DataTypes.FIELD("name", DataTypes.STRING()),
DataTypes.FIELD("address", DataTypes.STRING()),
DataTypes.FIELD("phone_number", DataTypes.STRING()));
List<MySqlSplit> mySqlSplits = getMySqlSplits(sourceConfig);
String[] expected =
new String[] {
"+I[101, user_1, Shanghai, 123567891234]",
"+I[102, user_2, Shanghai, 123567891234]",
"+I[103, user_3, Shanghai, 123567891234]",
"+I[104, user_4, Shanghai, 123567891234]",
"+I[105, user_5, Shanghai, 123567891234]",
"+I[106, user_6, Shanghai, 123567891234]",
"+I[107, user_7, Shanghai, 123567891234]",
"+I[108, user_8, Shanghai, 123567891234]",
"+I[109, user_9, Shanghai, 123567891234]",
"+I[110, user_10, Shanghai, 123567891234]",
"+I[111, user_11, Shanghai, 123567891234]",
"+I[112, user_12, Shanghai, 123567891234]"
};
List<String> actual =
readTableSnapshotSplits(
mySqlSplits, statefulTaskContext, mySqlSplits.size(), dataType);
assertEqualsInAnyOrder(Arrays.asList(expected), actual);
executeSql(sourceConfig, recoveryDataSql);
}
@Test
public void testDeleteDataInSnapshotScan() throws Exception {
String tableName = "customers_even_dist";
MySqlSourceConfig sourceConfig = getConfig(new String[] {tableName}, 10);
String tableId = customerDatabase.getDatabaseName() + "." + tableName;
String[] deleteDataSql =
new String[] {
"DELETE FROM " + tableId + " where id = 101",
"DELETE FROM " + tableId + " where id = 102",
};
String[] recoveryDataSql =
new String[] {
"INSERT INTO " + tableId + " VALUES(101, 'user_1','Shanghai','123567891234')",
"INSERT INTO " + tableId + " VALUES(102, 'user_2','Shanghai','123567891234')",
};
StatefulTaskContext statefulTaskContext =
new MakeBinlogEventTaskContext(
sourceConfig,
binaryLogClient,
mySqlConnection,
() -> executeSql(sourceConfig, deleteDataSql));
final DataType dataType =
DataTypes.ROW(
DataTypes.FIELD("id", DataTypes.BIGINT()),
DataTypes.FIELD("name", DataTypes.STRING()),
DataTypes.FIELD("address", DataTypes.STRING()),
DataTypes.FIELD("phone_number", DataTypes.STRING()));
List<MySqlSplit> mySqlSplits = getMySqlSplits(sourceConfig);
String[] expected =
new String[] {
"+I[103, user_3, Shanghai, 123567891234]",
"+I[104, user_4, Shanghai, 123567891234]",
"+I[105, user_5, Shanghai, 123567891234]",
"+I[106, user_6, Shanghai, 123567891234]",
"+I[107, user_7, Shanghai, 123567891234]",
"+I[108, user_8, Shanghai, 123567891234]",
"+I[109, user_9, Shanghai, 123567891234]",
"+I[110, user_10, Shanghai, 123567891234]",
};
List<String> actual =
readTableSnapshotSplits(
mySqlSplits, statefulTaskContext, mySqlSplits.size(), dataType);
assertEqualsInAnyOrder(Arrays.asList(expected), actual);
executeSql(sourceConfig, recoveryDataSql);
}
private List<String> readTableSnapshotSplits(
List<MySqlSplit> mySqlSplits,
MySqlSourceConfig sourceConfig,
StatefulTaskContext statefulTaskContext,
int scanSplitsNum,
DataType dataType)
throws Exception {
StatefulTaskContext statefulTaskContext =
new StatefulTaskContext(sourceConfig, binaryLogClient, mySqlConnection);
SnapshotSplitReader snapshotSplitReader = new SnapshotSplitReader(statefulTaskContext, 0);
List<SourceRecord> result = new ArrayList<>();
@ -284,4 +461,57 @@ public class SnapshotSplitReaderTest extends MySqlSourceTestBase {
.password(customerDatabase.getPassword())
.createConfig(0);
}
private boolean executeSql(MySqlSourceConfig sourceConfig, String[] sqlStatements) {
JdbcConnection connection = DebeziumUtils.openJdbcConnection(sourceConfig);
try {
connection.setAutoCommit(false);
connection.execute(sqlStatements);
connection.commit();
} catch (SQLException e) {
LOG.error("Failed to execute sql statements.", e);
return false;
}
return true;
}
class MakeBinlogEventTaskContext extends StatefulTaskContext {
private Supplier<Boolean> makeBinlogFunction;
public MakeBinlogEventTaskContext(
MySqlSourceConfig sourceConfig,
BinaryLogClient binaryLogClient,
MySqlConnection connection,
Supplier<Boolean> makeBinlogFunction) {
super(sourceConfig, binaryLogClient, connection);
this.makeBinlogFunction = makeBinlogFunction;
}
@Override
public EventDispatcher.SnapshotReceiver getSnapshotReceiver() {
EventDispatcher.SnapshotReceiver snapshotReceiver = super.getSnapshotReceiver();
return new EventDispatcher.SnapshotReceiver() {
@Override
public void changeRecord(
DataCollectionSchema schema,
Envelope.Operation operation,
Object key,
Struct value,
OffsetContext offset,
ConnectHeaders headers)
throws InterruptedException {
snapshotReceiver.changeRecord(schema, operation, key, value, offset, headers);
}
@Override
public void completeSnapshot() throws InterruptedException {
snapshotReceiver.completeSnapshot();
// make binlog events
makeBinlogFunction.get();
}
};
}
}
}

Loading…
Cancel
Save