[mysql] Keep the global order of between snapshot records and binlog records

pull/275/head
Leonard Xu 4 years ago committed by Jark Wu
parent 8638720d71
commit e894b0a69e
No known key found for this signature in database
GPG Key ID: 85BACB5AEFAE3202

@ -117,7 +117,7 @@ public class MySqlSourceOptions {
.durationType()
.defaultValue(Duration.ofSeconds(30))
.withDescription(
"The maximum time in milliseconds that the connector should wait after trying to connect to the MySQL database server before timing out.");
"The maximum time that the connector should wait after trying to connect to the MySQL database server before timing out.");
public static final ConfigOption<String> SCAN_STARTUP_MODE =
ConfigOptions.key("scan.startup.mode")

@ -36,7 +36,7 @@ import static com.alibaba.ververica.cdc.connectors.mysql.source.offset.BinlogOff
import static com.alibaba.ververica.cdc.connectors.mysql.source.utils.SplitKeyUtils.validateAndGetSplitKeyType;
/**
* A split assigner that assign the binlog split for all capture tables.
* A split assigner that assign the binlog split for all captured tables.
*
* <p>This assigner is used for latest-offset startup mode.
*/
@ -52,10 +52,6 @@ public class MySqlBinlogSplitAssigner extends MySqlSplitAssigner {
super(configuration, definedPkType, alreadyProcessedTables, remainingSplits);
}
public void open() {
super.open();
}
/**
* Gets the binlog split for latest-offset.
*
@ -91,8 +87,4 @@ public class MySqlBinlogSplitAssigner extends MySqlSplitAssigner {
}
}
}
public void close() {
super.close();
}
}

@ -20,6 +20,7 @@ package com.alibaba.ververica.cdc.connectors.mysql.source.assigner;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import com.alibaba.ververica.cdc.connectors.mysql.source.MySqlSourceOptions;
@ -85,6 +86,10 @@ public class MySqlSnapshotSplitAssigner extends MySqlSplitAssigner {
public void open() {
super.open();
// TODO: skip to scan table lists if we are already in binlog phase
alreadyProcessedTables.forEach(capturedTables::remove);
this.remainingTables.addAll(capturedTables);
this.currentTableSplitSeq = 0;
}
@ -175,6 +180,10 @@ public class MySqlSnapshotSplitAssigner extends MySqlSplitAssigner {
"Interrupted when analyze splits for table {}, exception {}",
tableId,
e);
throw new FlinkRuntimeException(
String.format(
"Interrupted when analyze splits for table %s", tableId),
e);
}
LOG.info("Has analyze {} splits for table {} ", splitCnt, tableId);
}
@ -314,33 +323,4 @@ public class MySqlSnapshotSplitAssigner extends MySqlSplitAssigner {
currentTableSplitSeq++;
return splitId;
}
public void close() {
if (jdbc != null) {
try {
jdbc.close();
} catch (SQLException e) {
LOG.error("Close jdbc connection error", e);
}
}
}
/**
* Adds a set of splits to this assigner. This happens for example when some split processing
* failed and the splits need to be re-added, or when new splits got discovered.
*/
public void addSplits(Collection<MySqlSplit> splits) {
remainingSplits.addAll(splits);
}
public Collection<TableId> getAlreadyProcessedTables() {
return alreadyProcessedTables;
}
/**
* Gets the remaining splits for {@link #alreadyProcessedTables} that this assigner has pending.
*/
public Collection<MySqlSplit> remainingSplits() {
return remainingSplits;
}
}

@ -86,22 +86,9 @@ public abstract class MySqlSplitAssigner {
return getClass() == MySqlBinlogSplitAssigner.class;
}
/** Casts this assigner into a {@link MySqlSnapshotSplitAssigner}. */
@SuppressWarnings("unchecked")
public final MySqlSnapshotSplitAssigner asSnapshotSplitAssigner() {
return (MySqlSnapshotSplitAssigner) this;
}
/** Casts this assigner into a {@link MySqlBinlogSplitAssigner}. */
@SuppressWarnings("unchecked")
public final MySqlBinlogSplitAssigner asBinlogSplitAssigner() {
return (MySqlBinlogSplitAssigner) this;
}
public void open() {
initJdbcConnection();
this.tableFilters = getTableFilters();
// TODO: skip to scan table lists if we are already in binlog phase
this.capturedTables = getCapturedTables();
this.databaseSchema = StatefulTaskContext.getMySqlDatabaseSchema(configuration, jdbc);
}

@ -66,6 +66,10 @@ public class BinlogOffset implements Comparable<BinlogOffset>, Serializable {
return this.compareTo(that) >= 0;
}
public boolean isBefore(BinlogOffset that) {
return this.compareTo(that) > 0;
}
@Override
public String toString() {
return filename + ":" + position;

@ -31,6 +31,8 @@ import com.alibaba.ververica.cdc.connectors.mysql.source.events.EnumeratorAckEve
import com.alibaba.ververica.cdc.connectors.mysql.source.events.EnumeratorRequestReportEvent;
import com.alibaba.ververica.cdc.connectors.mysql.source.events.SourceReaderReportEvent;
import com.alibaba.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
import com.alibaba.ververica.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo;
import com.alibaba.ververica.cdc.connectors.mysql.source.split.MySqlBinlogSplit;
import com.alibaba.ververica.cdc.connectors.mysql.source.split.MySqlBinlogSplitState;
import com.alibaba.ververica.cdc.connectors.mysql.source.split.MySqlSnapshotSplit;
import com.alibaba.ververica.cdc.connectors.mysql.source.split.MySqlSnapshotSplitState;
@ -40,11 +42,11 @@ import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import static org.apache.flink.util.Preconditions.checkState;
@ -55,7 +57,12 @@ public class MySqlSourceReader<T>
private static final Logger LOG = LoggerFactory.getLogger(MySqlSourceReader.class);
private final Map<String, MySqlSnapshotSplit> finishedUnAckedSplits;
private final HashMap<String, MySqlSnapshotSplit> finishedUnAckedSplits;
// before the binlog split start, wait at least one complete checkpoint to
// ensure the order of snapshot records and binlog records in parallel tasks
private final HashMap<MySqlBinlogSplit, Integer> binlogSplitWithCheckpointCnt;
private final HashMap<String, BinlogOffset> binlogSplitWithMinHighWatermark;
private final int subtaskId;
public MySqlSourceReader(
@ -71,6 +78,8 @@ public class MySqlSourceReader<T>
config,
context);
this.finishedUnAckedSplits = new HashMap<>();
this.binlogSplitWithCheckpointCnt = new HashMap<>();
this.binlogSplitWithMinHighWatermark = new HashMap<>();
this.subtaskId = context.getIndexOfSubtask();
}
@ -97,6 +106,11 @@ public class MySqlSourceReader<T>
// add finished snapshot splits that didn't receive ack yet
stateSplits.addAll(finishedUnAckedSplits.values());
// add the initial binlog to state
if (binlogSplitWithCheckpointCnt.size() > 0) {
stateSplits.addAll(binlogSplitWithCheckpointCnt.keySet());
}
return stateSplits;
}
@ -116,28 +130,35 @@ public class MySqlSourceReader<T>
@Override
public void addSplits(List<MySqlSplit> splits) {
for (MySqlSplit split : splits) {
if (split.isSnapshotSplit()) {
if (split.asSnapshotSplit().isSnapshotReadFinished()) {
this.finishedUnAckedSplits.put(split.splitId(), split.asSnapshotSplit());
} else {
// add all un-finished snapshot splits to SourceReaderBase
super.addSplits(Collections.singletonList(split));
}
} else {
if (hasBeenReadBinlogSplit(split.asBinlogSplit())) {
// add binlog split has been read, the case restores from state
super.addSplits(Collections.singletonList(split));
} else {
// receive new binlog split, check need wait or not
if (!binlogSplitWithCheckpointCnt.containsKey(split)) {
// wait
binlogSplitWithCheckpointCnt.put(split.asBinlogSplit(), 0);
} else if (binlogSplitWithCheckpointCnt.get(split) > 0) {
// the binlog split has wait more thant one checkpoint
// submit it
super.addSplits(Collections.singletonList(split));
}
}
}
}
// case for restore from state, notify split enumerator if there're finished snapshot splits
// and has not report
splits.stream()
.filter(
split ->
split.isSnapshotSplit()
&& split.asSnapshotSplit().isSnapshotReadFinished())
.forEach(
split ->
this.finishedUnAckedSplits.put(
split.splitId(), split.asSnapshotSplit()));
reportFinishedSnapshotSplitsIfNeed();
// add all un-finished splits(including binlog split) to SourceReaderBase
super.addSplits(
splits.stream()
.filter(
split ->
!(split.isSnapshotSplit()
&& split.asSnapshotSplit()
.isSnapshotReadFinished()))
.collect(Collectors.toList()));
}
@Override
@ -179,8 +200,52 @@ public class MySqlSourceReader<T>
}
}
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
if (binlogSplitWithCheckpointCnt.size() > 0) {
MySqlBinlogSplit mySqlBinlogSplit =
binlogSplitWithCheckpointCnt.keySet().iterator().next();
binlogSplitWithCheckpointCnt.put(
mySqlBinlogSplit, binlogSplitWithCheckpointCnt.get(mySqlBinlogSplit) + 1);
if (binlogSplitWithCheckpointCnt.get(mySqlBinlogSplit) > 0) {
this.addSplits(Collections.singletonList(mySqlBinlogSplit));
binlogSplitWithCheckpointCnt.clear();
}
}
}
@Override
protected MySqlSplit toSplitType(String splitId, MySqlSplitState splitState) {
return splitState.toMySqlSplit();
}
private boolean hasBeenReadBinlogSplit(MySqlBinlogSplit binlogSplit) {
if (binlogSplit.getFinishedSnapshotSplitInfos().isEmpty()) {
// the latest-offset mode, do not need to wait checkpoint
return true;
} else {
BinlogOffset minHighWatermark = getMinHighWatermark(binlogSplit);
return binlogSplit.getStartingOffset().isBefore(minHighWatermark);
}
}
private BinlogOffset getMinHighWatermark(MySqlBinlogSplit binlogSplit) {
final String binlogSplitId = binlogSplit.splitId();
if (binlogSplitWithMinHighWatermark.containsKey(binlogSplitId)) {
return binlogSplitWithMinHighWatermark.get(binlogSplitId);
} else {
BinlogOffset minBinlogOffset = BinlogOffset.INITIAL_OFFSET;
for (FinishedSnapshotSplitInfo snapshotSplitInfo :
binlogSplit.getFinishedSnapshotSplitInfos()) {
// find the min HighWatermark of all finished snapshot splits
if (snapshotSplitInfo.getHighWatermark().compareTo(minBinlogOffset) < 0) {
minBinlogOffset = snapshotSplitInfo.getHighWatermark();
}
}
binlogSplitWithMinHighWatermark.put(binlogSplitId, minBinlogOffset);
return minBinlogOffset;
}
}
}

@ -21,6 +21,7 @@ package com.alibaba.ververica.cdc.connectors.mysql.source.utils;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;
import io.debezium.relational.Column;
import io.debezium.relational.Table;
import java.util.Arrays;
@ -63,8 +64,8 @@ public class SplitKeyUtils {
public static boolean splitKeyIsAutoIncremented(RowType splitKeyType, Table actualTable) {
final String splitKeyName = unquoteColumnName(splitKeyType.getFieldNames().get(0));
return !actualTable.primaryKeyColumnNames().isEmpty()
&& actualTable.isAutoIncremented(splitKeyName);
final Column column = actualTable.columnWithName(splitKeyName);
return column != null && column.isAutoIncremented();
}
private static void validatePrimaryKey(

@ -51,6 +51,9 @@ import java.util.Optional;
import java.util.Properties;
import static com.alibaba.ververica.cdc.connectors.mysql.source.MySqlSourceOptions.DATABASE_SERVER_NAME;
import static com.alibaba.ververica.cdc.connectors.mysql.source.MySqlSourceOptions.SCAN_SNAPSHOT_CHUNK_SIZE;
import static com.alibaba.ververica.cdc.connectors.mysql.source.MySqlSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
import static com.alibaba.ververica.cdc.connectors.mysql.source.MySqlSourceOptions.SERVER_ID;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
@ -184,9 +187,9 @@ public class MySqlTableSource implements ScanTableSource {
* The server id is required, it will be replaced to 'database.server.id' when build {@Link
* MySQLSplitReader}
*/
properties.put("server-id", serverId);
properties.put("scan.split.size", String.valueOf(splitSize));
properties.put("scan.fetch.size", String.valueOf(fetchSize));
properties.put(SERVER_ID.key(), serverId);
properties.put(SCAN_SNAPSHOT_CHUNK_SIZE.key(), String.valueOf(splitSize));
properties.put(SCAN_SNAPSHOT_FETCH_SIZE.key(), String.valueOf(fetchSize));
properties.put("connect.timeout.ms", String.valueOf(connectTimeout.toMillis()));
if (database != null) {

@ -24,31 +24,26 @@ import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.utils.LegacyRowResource;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import com.alibaba.ververica.cdc.connectors.mysql.MySqlTestBase;
import com.alibaba.ververica.cdc.connectors.mysql.source.utils.UniqueDatabase;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import static com.alibaba.ververica.cdc.connectors.mysql.MySqlSourceTest.currentMySQLLatestOffset;
import static org.apache.flink.api.common.JobStatus.RUNNING;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
/** Integration tests for MySQL binlog SQL source. */
@ -67,14 +62,13 @@ public class MySqlConnectorITCase extends MySqlTestBase {
StreamTableEnvironment.create(
env,
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());
// the debezium mysql connector use legacy implementation or not
private final boolean useLegacyDezMySQL;
// enable the parallelRead(i.e: The new source MySQLParallelSource)
private final boolean parallelRead;
@ClassRule public static LegacyRowResource usesLegacyRows = LegacyRowResource.INSTANCE;
public MySqlConnectorITCase(boolean useLegacyDezMySQL, boolean parallelRead) {
this.useLegacyDezMySQL = useLegacyDezMySQL;
this.parallelRead = parallelRead;
@ -102,17 +96,16 @@ public class MySqlConnectorITCase extends MySqlTestBase {
}
@Test
public void testConsumingAllEvents()
throws SQLException, ExecutionException, InterruptedException {
public void testConsumingAllEvents() throws Exception {
inventoryDatabase.createAndInitialize();
String sourceDDL =
String.format(
"CREATE TABLE debezium_source ("
+ " id INT NOT NULL,"
+ " `id` INT NOT NULL,"
+ " name STRING,"
+ " description STRING,"
+ " weight DECIMAL(10,3),"
+ " primary key (id) not enforced"
+ " primary key (`id`) not enforced"
+ ") WITH ("
+ " 'connector' = 'mysql-cdc',"
+ " 'hostname' = '%s',"
@ -198,18 +191,17 @@ public class MySqlConnectorITCase extends MySqlTestBase {
String[] expected =
new String[] {
"scooter,3.140",
"car battery,8.100",
"12-pack drill bits,0.800",
"hammer,2.625",
"rocks,5.100",
"jacket,0.600",
"spare tire,22.200"
"+I[scooter, 3.140]",
"+I[car battery, 8.100]",
"+I[12-pack drill bits, 0.800]",
"+I[hammer, 2.625]",
"+I[rocks, 5.100]",
"+I[jacket, 0.600]",
"+I[spare tire, 22.200]"
};
List<String> actual = TestValuesTableFactory.getResults("sink");
assertThat(actual, containsInAnyOrder(expected));
result.getJobClient().get().cancel().get();
}
@ -265,41 +257,12 @@ public class MySqlConnectorITCase extends MySqlTestBase {
parallelRead,
getServerId(),
getSplitSize());
String sinkDDL =
"CREATE TABLE sink (\n"
+ " id INT NOT NULL,\n"
+ " tiny_c TINYINT,\n"
+ " tiny_un_c SMALLINT ,\n"
+ " small_c SMALLINT,\n"
+ " small_un_c INT,\n"
+ " int_c INT ,\n"
+ " int_un_c BIGINT,\n"
+ " int11_c BIGINT,\n"
+ " big_c BIGINT,\n"
+ " varchar_c STRING,\n"
+ " char_c STRING,\n"
+ " float_c FLOAT,\n"
+ " double_c DOUBLE,\n"
+ " decimal_c DECIMAL(8, 4),\n"
+ " numeric_c DECIMAL(6, 0),\n"
+ " boolean_c BOOLEAN,\n"
+ " date_c DATE,\n"
+ " time_c TIME(0),\n"
+ " datetime3_c TIMESTAMP(3),\n"
+ " datetime6_c TIMESTAMP(6),\n"
+ " timestamp_c TIMESTAMP(0),\n"
+ " file_uuid STRING\n"
+ ") WITH ("
+ " 'connector' = 'values',"
+ " 'sink-insert-only' = 'false'"
+ ")";
tEnv.executeSql(sourceDDL);
tEnv.executeSql(sinkDDL);
// async submit job
TableResult result =
tEnv.executeSql(
"INSERT INTO sink SELECT id,\n"
"SELECT id,\n"
+ "tiny_c,\n"
+ "tiny_un_c,\n"
+ "small_c,\n"
@ -322,7 +285,8 @@ public class MySqlConnectorITCase extends MySqlTestBase {
+ "timestamp_c,\n"
+ "TO_BASE64(DECODE(file_uuid, 'UTF-8')) FROM full_types");
waitForSnapshotStarted("sink");
CloseableIterator<Row> iterator = result.collect();
waitForSnapshotStarted(iterator);
try (Connection connection = fullTypesDatabase.getJdbcConnection();
Statement statement = connection.createStatement()) {
@ -331,22 +295,85 @@ public class MySqlConnectorITCase extends MySqlTestBase {
"UPDATE full_types SET timestamp_c = '2020-07-17 18:33:22' WHERE id=1;");
}
waitForSinkSize("sink", 3);
List<String> expected =
Arrays.asList(
"+I(1,127,255,32767,65535,2147483647,4294967295,2147483647,9223372036854775807,Hello World,abc,"
+ "123.102,404.4443,123.4567,346,true,2020-07-17,18:00:22,2020-07-17T18:00:22.123,"
+ "2020-07-17T18:00:22.123456,2020-07-17T18:00:22,ZRrvv70IOQ9I77+977+977+9Nu+/vT57dAA=)",
"-U(1,127,255,32767,65535,2147483647,4294967295,2147483647,9223372036854775807,Hello World,abc,"
+ "123.102,404.4443,123.4567,346,true,2020-07-17,18:00:22,2020-07-17T18:00:22.123,"
+ "2020-07-17T18:00:22.123456,2020-07-17T18:00:22,ZRrvv70IOQ9I77+977+977+9Nu+/vT57dAA=)",
"+U(1,127,255,32767,65535,2147483647,4294967295,2147483647,9223372036854775807,Hello World,abc,"
+ "123.102,404.4443,123.4567,346,true,2020-07-17,18:00:22,2020-07-17T18:00:22.123,"
+ "2020-07-17T18:00:22.123456,2020-07-17T18:33:22,ZRrvv70IOQ9I77+977+977+9Nu+/vT57dAA=)");
List<String> actual = TestValuesTableFactory.getRawResults("sink");
assertEquals(expected, actual);
String[] expected =
new String[] {
"+I[1, 127, 255, 32767, 65535, 2147483647, 4294967295, 2147483647, 9223372036854775807, Hello World, abc, 123.102, 404.4443, 123.4567, 346, true, 2020-07-17, 18:00:22, 2020-07-17T18:00:22.123, 2020-07-17T18:00:22.123456, 2020-07-17T18:00:22, ZRrvv70IOQ9I77+977+977+9Nu+/vT57dAA=]",
"-U[1, 127, 255, 32767, 65535, 2147483647, 4294967295, 2147483647, 9223372036854775807, Hello World, abc, 123.102, 404.4443, 123.4567, 346, true, 2020-07-17, 18:00:22, 2020-07-17T18:00:22.123, 2020-07-17T18:00:22.123456, 2020-07-17T18:00:22, ZRrvv70IOQ9I77+977+977+9Nu+/vT57dAA=]",
"+U[1, 127, 255, 32767, 65535, 2147483647, 4294967295, 2147483647, 9223372036854775807, Hello World, abc, 123.102, 404.4443, 123.4567, 346, true, 2020-07-17, 18:00:22, 2020-07-17T18:00:22.123, 2020-07-17T18:00:22.123456, 2020-07-17T18:33:22, ZRrvv70IOQ9I77+977+977+9Nu+/vT57dAA=]"
};
assertThat(fetchRows(result.collect(), 3), containsInAnyOrder(expected));
result.getJobClient().get().cancel().get();
}
@Test
public void testStartupFromLatestOffset() throws Exception {
inventoryDatabase.createAndInitialize();
String sourceDDL =
String.format(
"CREATE TABLE debezium_source ("
+ " id INT NOT NULL,"
+ " name STRING,"
+ " description STRING,"
+ " weight DECIMAL(10,3),"
+ " primary key(id) not enforced"
+ ") WITH ("
+ " 'connector' = 'mysql-cdc',"
+ " 'hostname' = '%s',"
+ " 'port' = '%s',"
+ " 'username' = '%s',"
+ " 'password' = '%s',"
+ " 'database-name' = '%s',"
+ " 'table-name' = '%s',"
+ " 'scan.startup.mode' = 'latest-offset',"
+ " 'scan.snapshot.parallel-read' = '%s',"
+ " 'server-id' = '%s',"
+ " 'debezium.internal.implementation' = '%s'"
+ ")",
MYSQL_CONTAINER.getHost(),
MYSQL_CONTAINER.getDatabasePort(),
inventoryDatabase.getUsername(),
inventoryDatabase.getPassword(),
inventoryDatabase.getDatabaseName(),
"products",
parallelRead,
getServerId(),
getDezImplementation());
tEnv.executeSql(sourceDDL);
// async submit job
TableResult result = tEnv.executeSql("SELECT * FROM debezium_source");
// wait for the source startup, we don't have a better way to wait it, use sleep for now
while (result.getJobClient().get().getJobStatus().get() != RUNNING) {
Thread.sleep(5000L);
}
CloseableIterator<Row> iterator = result.collect();
try (Connection connection = inventoryDatabase.getJdbcConnection();
Statement statement = connection.createStatement()) {
statement.execute(
"INSERT INTO products VALUES (default,'jacket','water resistent white wind breaker',0.2);"); // 110
statement.execute(
"INSERT INTO products VALUES (default,'scooter','Big 2-wheel scooter ',5.18);");
statement.execute(
"UPDATE products SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;");
statement.execute("UPDATE products SET weight='5.17' WHERE id=111;");
statement.execute("DELETE FROM products WHERE id=111;");
}
String[] expected =
new String[] {
"+I[110, jacket, water resistent white wind breaker, 0.200]",
"+I[111, scooter, Big 2-wheel scooter , 5.180]",
"-U[110, jacket, water resistent white wind breaker, 0.200]",
"+U[110, jacket, new water resistent white wind breaker, 0.500]",
"-U[111, scooter, Big 2-wheel scooter , 5.180]",
"+U[111, scooter, Big 2-wheel scooter , 5.170]",
"-D[111, scooter, Big 2-wheel scooter , 5.170]"
};
assertThat(fetchRows(iterator, expected.length), containsInAnyOrder(expected));
result.getJobClient().get().cancel().get();
}
@ -431,7 +458,7 @@ public class MySqlConnectorITCase extends MySqlTestBase {
waitForSinkSize("sink", 7);
String[] expected =
new String[] {"110,jacket,new water resistent white wind breaker,0.500"};
new String[] {"+I[110, jacket, new water resistent white wind breaker, 0.500]"};
List<String> actual = TestValuesTableFactory.getResults("sink");
assertThat(actual, containsInAnyOrder(expected));
@ -505,16 +532,16 @@ public class MySqlConnectorITCase extends MySqlTestBase {
String[] expected =
new String[] {
"101,scooter,Small 2-wheel scooter,3.140",
"102,car battery,12V car battery,8.100",
"103,12-pack drill bits,12-pack of drill bits with sizes ranging from #40 to #3,0.800",
"104,hammer,12oz carpenter's hammer,0.750",
"105,hammer,14oz carpenter's hammer,0.875",
"106,hammer,18oz carpenter hammer,1.000",
"107,rocks,box of assorted rocks,5.100",
"108,jacket,water resistent black wind breaker,0.100",
"109,spare tire,24 inch spare tire,22.200",
"110,jacket,new water resistent white wind breaker,0.500"
"+I[101, scooter, Small 2-wheel scooter, 3.140]",
"+I[102, car battery, 12V car battery, 8.100]",
"+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.800]",
"+I[104, hammer, 12oz carpenter's hammer, 0.750]",
"+I[105, hammer, 14oz carpenter's hammer, 0.875]",
"+I[108, jacket, water resistent black wind breaker, 0.100]",
"+I[109, spare tire, 24 inch spare tire, 22.200]",
"+I[106, hammer, 18oz carpenter hammer, 1.000]",
"+I[107, rocks, box of assorted rocks, 5.100]",
"+I[110, jacket, new water resistent white wind breaker, 0.500]"
};
List<String> actual = TestValuesTableFactory.getResults("sink");
@ -523,85 +550,6 @@ public class MySqlConnectorITCase extends MySqlTestBase {
result.getJobClient().get().cancel().get();
}
@Test
public void testStartupFromLatestOffset() throws Exception {
inventoryDatabase.createAndInitialize();
String sourceDDL =
String.format(
"CREATE TABLE debezium_source ("
+ " id INT NOT NULL,"
+ " name STRING,"
+ " description STRING,"
+ " weight DECIMAL(10,3),"
+ " primary key(id) not enforced"
+ ") WITH ("
+ " 'connector' = 'mysql-cdc',"
+ " 'hostname' = '%s',"
+ " 'port' = '%s',"
+ " 'username' = '%s',"
+ " 'password' = '%s',"
+ " 'database-name' = '%s',"
+ " 'table-name' = '%s',"
+ " 'scan.startup.mode' = 'latest-offset',"
+ " 'scan.snapshot.parallel-read' = '%s',"
+ " 'server-id' = '%s',"
+ " 'debezium.internal.implementation' = '%s'"
+ ")",
MYSQL_CONTAINER.getHost(),
MYSQL_CONTAINER.getDatabasePort(),
inventoryDatabase.getUsername(),
inventoryDatabase.getPassword(),
inventoryDatabase.getDatabaseName(),
"products",
parallelRead,
getServerId(),
getDezImplementation());
tEnv.executeSql(sourceDDL);
// async submit job
TableResult result = tEnv.executeSql("SELECT * FROM debezium_source");
// wait for the source startup, we don't have a better way to wait it, use sleep for now
while (result.getJobClient().get().getJobStatus().get() != RUNNING) {
Thread.sleep(5000L);
}
try (Connection connection = inventoryDatabase.getJdbcConnection();
Statement statement = connection.createStatement()) {
statement.execute(
"INSERT INTO products VALUES (default,'jacket','water resistent white wind breaker',0.2);"); // 110
statement.execute(
"INSERT INTO products VALUES (default,'scooter','Big 2-wheel scooter ',5.18);");
statement.execute(
"UPDATE products SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;");
statement.execute("UPDATE products SET weight='5.17' WHERE id=111;");
statement.execute("DELETE FROM products WHERE id=111;");
}
String[] expected =
new String[] {
"110,jacket,water resistent white wind breaker,0.200",
"111,scooter,Big 2-wheel scooter ,5.180",
"110,jacket,water resistent white wind breaker,0.200",
"110,jacket,new water resistent white wind breaker,0.500",
"111,scooter,Big 2-wheel scooter ,5.180",
"111,scooter,Big 2-wheel scooter ,5.170",
"111,scooter,Big 2-wheel scooter ,5.170"
};
assertThat(fetchRows(result.collect(), 7), containsInAnyOrder(expected));
result.getJobClient().get().cancel().get();
}
private static List<String> fetchRows(Iterator<Row> iter, int size) {
List<String> rows = new ArrayList<>(size);
while (size > 0 && iter.hasNext()) {
Row row = iter.next();
rows.add(row.toString());
size--;
}
return rows;
}
@Test
public void testStartupFromTimestamp() throws Exception {
if (parallelRead) {
@ -668,7 +616,7 @@ public class MySqlConnectorITCase extends MySqlTestBase {
waitForSinkSize("sink", 7);
String[] expected =
new String[] {"110,jacket,new water resistent white wind breaker,0.500"};
new String[] {"+I[110, jacket, new water resistent white wind breaker, 0.500]"};
List<String> actual = TestValuesTableFactory.getResults("sink");
assertThat(actual, containsInAnyOrder(expected));
@ -722,4 +670,20 @@ public class MySqlConnectorITCase extends MySqlTestBase {
}
}
}
private static List<String> fetchRows(Iterator<Row> iter, int size) {
List<String> rows = new ArrayList<>(size);
while (size > 0 && iter.hasNext()) {
Row row = iter.next();
rows.add(row.toString());
size--;
}
return rows;
}
private static void waitForSnapshotStarted(CloseableIterator<Row> iterator) throws Exception {
while (!iterator.hasNext()) {
Thread.sleep(100);
}
}
}

Loading…
Cancel
Save