From e894b0a69e259bb84957f57de3322f9ba669028b Mon Sep 17 00:00:00 2001 From: Leonard Xu Date: Sat, 24 Jul 2021 05:37:04 +0800 Subject: [PATCH] [mysql] Keep the global order of between snapshot records and binlog records --- .../mysql/source/MySqlSourceOptions.java | 2 +- .../assigner/MySqlBinlogSplitAssigner.java | 10 +- .../assigner/MySqlSnapshotSplitAssigner.java | 38 +-- .../source/assigner/MySqlSplitAssigner.java | 13 - .../mysql/source/offset/BinlogOffset.java | 4 + .../source/reader/MySqlSourceReader.java | 107 +++++-- .../mysql/source/utils/SplitKeyUtils.java | 5 +- .../mysql/table/MySqlTableSource.java | 9 +- .../mysql/table/MySqlConnectorITCase.java | 278 ++++++++---------- 9 files changed, 231 insertions(+), 235 deletions(-) diff --git a/flink-connector-mysql-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/mysql/source/MySqlSourceOptions.java b/flink-connector-mysql-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/mysql/source/MySqlSourceOptions.java index 8ed210f1d..d19127c21 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/mysql/source/MySqlSourceOptions.java +++ b/flink-connector-mysql-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/mysql/source/MySqlSourceOptions.java @@ -117,7 +117,7 @@ public class MySqlSourceOptions { .durationType() .defaultValue(Duration.ofSeconds(30)) .withDescription( - "The maximum time in milliseconds that the connector should wait after trying to connect to the MySQL database server before timing out."); + "The maximum time that the connector should wait after trying to connect to the MySQL database server before timing out."); public static final ConfigOption SCAN_STARTUP_MODE = ConfigOptions.key("scan.startup.mode") diff --git a/flink-connector-mysql-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/mysql/source/assigner/MySqlBinlogSplitAssigner.java b/flink-connector-mysql-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/mysql/source/assigner/MySqlBinlogSplitAssigner.java index 7ffdb04b7..2963fab73 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/mysql/source/assigner/MySqlBinlogSplitAssigner.java +++ b/flink-connector-mysql-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/mysql/source/assigner/MySqlBinlogSplitAssigner.java @@ -36,7 +36,7 @@ import static com.alibaba.ververica.cdc.connectors.mysql.source.offset.BinlogOff import static com.alibaba.ververica.cdc.connectors.mysql.source.utils.SplitKeyUtils.validateAndGetSplitKeyType; /** - * A split assigner that assign the binlog split for all capture tables. + * A split assigner that assign the binlog split for all captured tables. * *

This assigner is used for latest-offset startup mode. */ @@ -52,10 +52,6 @@ public class MySqlBinlogSplitAssigner extends MySqlSplitAssigner { super(configuration, definedPkType, alreadyProcessedTables, remainingSplits); } - public void open() { - super.open(); - } - /** * Gets the binlog split for latest-offset. * @@ -91,8 +87,4 @@ public class MySqlBinlogSplitAssigner extends MySqlSplitAssigner { } } } - - public void close() { - super.close(); - } } diff --git a/flink-connector-mysql-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/mysql/source/assigner/MySqlSnapshotSplitAssigner.java b/flink-connector-mysql-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/mysql/source/assigner/MySqlSnapshotSplitAssigner.java index 291a9cf21..db0216fb0 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/mysql/source/assigner/MySqlSnapshotSplitAssigner.java +++ b/flink-connector-mysql-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/mysql/source/assigner/MySqlSnapshotSplitAssigner.java @@ -20,6 +20,7 @@ package com.alibaba.ververica.cdc.connectors.mysql.source.assigner; import org.apache.flink.configuration.Configuration; import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.Preconditions; import com.alibaba.ververica.cdc.connectors.mysql.source.MySqlSourceOptions; @@ -85,6 +86,10 @@ public class MySqlSnapshotSplitAssigner extends MySqlSplitAssigner { public void open() { super.open(); + // TODO: skip to scan table lists if we are already in binlog phase + alreadyProcessedTables.forEach(capturedTables::remove); + this.remainingTables.addAll(capturedTables); + this.currentTableSplitSeq = 0; } @@ -175,6 +180,10 @@ public class MySqlSnapshotSplitAssigner extends MySqlSplitAssigner { "Interrupted when analyze splits for table {}, exception {}", tableId, e); + throw new FlinkRuntimeException( + String.format( + "Interrupted when analyze splits for table %s", tableId), + e); } LOG.info("Has analyze {} splits for table {} ", splitCnt, tableId); } @@ -314,33 +323,4 @@ public class MySqlSnapshotSplitAssigner extends MySqlSplitAssigner { currentTableSplitSeq++; return splitId; } - - public void close() { - if (jdbc != null) { - try { - jdbc.close(); - } catch (SQLException e) { - LOG.error("Close jdbc connection error", e); - } - } - } - - /** - * Adds a set of splits to this assigner. This happens for example when some split processing - * failed and the splits need to be re-added, or when new splits got discovered. - */ - public void addSplits(Collection splits) { - remainingSplits.addAll(splits); - } - - public Collection getAlreadyProcessedTables() { - return alreadyProcessedTables; - } - - /** - * Gets the remaining splits for {@link #alreadyProcessedTables} that this assigner has pending. - */ - public Collection remainingSplits() { - return remainingSplits; - } } diff --git a/flink-connector-mysql-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/mysql/source/assigner/MySqlSplitAssigner.java b/flink-connector-mysql-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/mysql/source/assigner/MySqlSplitAssigner.java index 334222685..d2f9ba4d5 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/mysql/source/assigner/MySqlSplitAssigner.java +++ b/flink-connector-mysql-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/mysql/source/assigner/MySqlSplitAssigner.java @@ -86,22 +86,9 @@ public abstract class MySqlSplitAssigner { return getClass() == MySqlBinlogSplitAssigner.class; } - /** Casts this assigner into a {@link MySqlSnapshotSplitAssigner}. */ - @SuppressWarnings("unchecked") - public final MySqlSnapshotSplitAssigner asSnapshotSplitAssigner() { - return (MySqlSnapshotSplitAssigner) this; - } - - /** Casts this assigner into a {@link MySqlBinlogSplitAssigner}. */ - @SuppressWarnings("unchecked") - public final MySqlBinlogSplitAssigner asBinlogSplitAssigner() { - return (MySqlBinlogSplitAssigner) this; - } - public void open() { initJdbcConnection(); this.tableFilters = getTableFilters(); - // TODO: skip to scan table lists if we are already in binlog phase this.capturedTables = getCapturedTables(); this.databaseSchema = StatefulTaskContext.getMySqlDatabaseSchema(configuration, jdbc); } diff --git a/flink-connector-mysql-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/mysql/source/offset/BinlogOffset.java b/flink-connector-mysql-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/mysql/source/offset/BinlogOffset.java index dab389590..5149a9806 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/mysql/source/offset/BinlogOffset.java +++ b/flink-connector-mysql-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/mysql/source/offset/BinlogOffset.java @@ -66,6 +66,10 @@ public class BinlogOffset implements Comparable, Serializable { return this.compareTo(that) >= 0; } + public boolean isBefore(BinlogOffset that) { + return this.compareTo(that) > 0; + } + @Override public String toString() { return filename + ":" + position; diff --git a/flink-connector-mysql-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/mysql/source/reader/MySqlSourceReader.java b/flink-connector-mysql-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/mysql/source/reader/MySqlSourceReader.java index f46db3247..fd0c5b75c 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/mysql/source/reader/MySqlSourceReader.java +++ b/flink-connector-mysql-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/mysql/source/reader/MySqlSourceReader.java @@ -31,6 +31,8 @@ import com.alibaba.ververica.cdc.connectors.mysql.source.events.EnumeratorAckEve import com.alibaba.ververica.cdc.connectors.mysql.source.events.EnumeratorRequestReportEvent; import com.alibaba.ververica.cdc.connectors.mysql.source.events.SourceReaderReportEvent; import com.alibaba.ververica.cdc.connectors.mysql.source.offset.BinlogOffset; +import com.alibaba.ververica.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo; +import com.alibaba.ververica.cdc.connectors.mysql.source.split.MySqlBinlogSplit; import com.alibaba.ververica.cdc.connectors.mysql.source.split.MySqlBinlogSplitState; import com.alibaba.ververica.cdc.connectors.mysql.source.split.MySqlSnapshotSplit; import com.alibaba.ververica.cdc.connectors.mysql.source.split.MySqlSnapshotSplitState; @@ -40,11 +42,11 @@ import org.apache.kafka.connect.source.SourceRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.function.Supplier; -import java.util.stream.Collectors; import static org.apache.flink.util.Preconditions.checkState; @@ -55,7 +57,12 @@ public class MySqlSourceReader private static final Logger LOG = LoggerFactory.getLogger(MySqlSourceReader.class); - private final Map finishedUnAckedSplits; + private final HashMap finishedUnAckedSplits; + // before the binlog split start, wait at least one complete checkpoint to + // ensure the order of snapshot records and binlog records in parallel tasks + private final HashMap binlogSplitWithCheckpointCnt; + private final HashMap binlogSplitWithMinHighWatermark; + private final int subtaskId; public MySqlSourceReader( @@ -71,6 +78,8 @@ public class MySqlSourceReader config, context); this.finishedUnAckedSplits = new HashMap<>(); + this.binlogSplitWithCheckpointCnt = new HashMap<>(); + this.binlogSplitWithMinHighWatermark = new HashMap<>(); this.subtaskId = context.getIndexOfSubtask(); } @@ -97,6 +106,11 @@ public class MySqlSourceReader // add finished snapshot splits that didn't receive ack yet stateSplits.addAll(finishedUnAckedSplits.values()); + + // add the initial binlog to state + if (binlogSplitWithCheckpointCnt.size() > 0) { + stateSplits.addAll(binlogSplitWithCheckpointCnt.keySet()); + } return stateSplits; } @@ -116,28 +130,35 @@ public class MySqlSourceReader @Override public void addSplits(List splits) { + for (MySqlSplit split : splits) { + if (split.isSnapshotSplit()) { + if (split.asSnapshotSplit().isSnapshotReadFinished()) { + this.finishedUnAckedSplits.put(split.splitId(), split.asSnapshotSplit()); + } else { + // add all un-finished snapshot splits to SourceReaderBase + super.addSplits(Collections.singletonList(split)); + } + } else { + if (hasBeenReadBinlogSplit(split.asBinlogSplit())) { + // add binlog split has been read, the case restores from state + super.addSplits(Collections.singletonList(split)); + } else { + // receive new binlog split, check need wait or not + if (!binlogSplitWithCheckpointCnt.containsKey(split)) { + // wait + binlogSplitWithCheckpointCnt.put(split.asBinlogSplit(), 0); + } else if (binlogSplitWithCheckpointCnt.get(split) > 0) { + // the binlog split has wait more thant one checkpoint + // submit it + super.addSplits(Collections.singletonList(split)); + } + } + } + } + // case for restore from state, notify split enumerator if there're finished snapshot splits // and has not report - splits.stream() - .filter( - split -> - split.isSnapshotSplit() - && split.asSnapshotSplit().isSnapshotReadFinished()) - .forEach( - split -> - this.finishedUnAckedSplits.put( - split.splitId(), split.asSnapshotSplit())); reportFinishedSnapshotSplitsIfNeed(); - - // add all un-finished splits(including binlog split) to SourceReaderBase - super.addSplits( - splits.stream() - .filter( - split -> - !(split.isSnapshotSplit() - && split.asSnapshotSplit() - .isSnapshotReadFinished())) - .collect(Collectors.toList())); } @Override @@ -179,8 +200,52 @@ public class MySqlSourceReader } } + @Override + public void notifyCheckpointComplete(long checkpointId) throws Exception { + if (binlogSplitWithCheckpointCnt.size() > 0) { + MySqlBinlogSplit mySqlBinlogSplit = + binlogSplitWithCheckpointCnt.keySet().iterator().next(); + + binlogSplitWithCheckpointCnt.put( + mySqlBinlogSplit, binlogSplitWithCheckpointCnt.get(mySqlBinlogSplit) + 1); + + if (binlogSplitWithCheckpointCnt.get(mySqlBinlogSplit) > 0) { + this.addSplits(Collections.singletonList(mySqlBinlogSplit)); + binlogSplitWithCheckpointCnt.clear(); + } + } + } + @Override protected MySqlSplit toSplitType(String splitId, MySqlSplitState splitState) { return splitState.toMySqlSplit(); } + + private boolean hasBeenReadBinlogSplit(MySqlBinlogSplit binlogSplit) { + if (binlogSplit.getFinishedSnapshotSplitInfos().isEmpty()) { + // the latest-offset mode, do not need to wait checkpoint + return true; + } else { + BinlogOffset minHighWatermark = getMinHighWatermark(binlogSplit); + return binlogSplit.getStartingOffset().isBefore(minHighWatermark); + } + } + + private BinlogOffset getMinHighWatermark(MySqlBinlogSplit binlogSplit) { + final String binlogSplitId = binlogSplit.splitId(); + if (binlogSplitWithMinHighWatermark.containsKey(binlogSplitId)) { + return binlogSplitWithMinHighWatermark.get(binlogSplitId); + } else { + BinlogOffset minBinlogOffset = BinlogOffset.INITIAL_OFFSET; + for (FinishedSnapshotSplitInfo snapshotSplitInfo : + binlogSplit.getFinishedSnapshotSplitInfos()) { + // find the min HighWatermark of all finished snapshot splits + if (snapshotSplitInfo.getHighWatermark().compareTo(minBinlogOffset) < 0) { + minBinlogOffset = snapshotSplitInfo.getHighWatermark(); + } + } + binlogSplitWithMinHighWatermark.put(binlogSplitId, minBinlogOffset); + return minBinlogOffset; + } + } } diff --git a/flink-connector-mysql-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/mysql/source/utils/SplitKeyUtils.java b/flink-connector-mysql-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/mysql/source/utils/SplitKeyUtils.java index 088f0bb3f..77b7bdea2 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/mysql/source/utils/SplitKeyUtils.java +++ b/flink-connector-mysql-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/mysql/source/utils/SplitKeyUtils.java @@ -21,6 +21,7 @@ package com.alibaba.ververica.cdc.connectors.mysql.source.utils; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.util.Preconditions; +import io.debezium.relational.Column; import io.debezium.relational.Table; import java.util.Arrays; @@ -63,8 +64,8 @@ public class SplitKeyUtils { public static boolean splitKeyIsAutoIncremented(RowType splitKeyType, Table actualTable) { final String splitKeyName = unquoteColumnName(splitKeyType.getFieldNames().get(0)); - return !actualTable.primaryKeyColumnNames().isEmpty() - && actualTable.isAutoIncremented(splitKeyName); + final Column column = actualTable.columnWithName(splitKeyName); + return column != null && column.isAutoIncremented(); } private static void validatePrimaryKey( diff --git a/flink-connector-mysql-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/mysql/table/MySqlTableSource.java b/flink-connector-mysql-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/mysql/table/MySqlTableSource.java index 10cc9ad47..2bd1fce40 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/mysql/table/MySqlTableSource.java +++ b/flink-connector-mysql-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/mysql/table/MySqlTableSource.java @@ -51,6 +51,9 @@ import java.util.Optional; import java.util.Properties; import static com.alibaba.ververica.cdc.connectors.mysql.source.MySqlSourceOptions.DATABASE_SERVER_NAME; +import static com.alibaba.ververica.cdc.connectors.mysql.source.MySqlSourceOptions.SCAN_SNAPSHOT_CHUNK_SIZE; +import static com.alibaba.ververica.cdc.connectors.mysql.source.MySqlSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE; +import static com.alibaba.ververica.cdc.connectors.mysql.source.MySqlSourceOptions.SERVER_ID; import static org.apache.flink.util.Preconditions.checkNotNull; /** @@ -184,9 +187,9 @@ public class MySqlTableSource implements ScanTableSource { * The server id is required, it will be replaced to 'database.server.id' when build {@Link * MySQLSplitReader} */ - properties.put("server-id", serverId); - properties.put("scan.split.size", String.valueOf(splitSize)); - properties.put("scan.fetch.size", String.valueOf(fetchSize)); + properties.put(SERVER_ID.key(), serverId); + properties.put(SCAN_SNAPSHOT_CHUNK_SIZE.key(), String.valueOf(splitSize)); + properties.put(SCAN_SNAPSHOT_FETCH_SIZE.key(), String.valueOf(fetchSize)); properties.put("connect.timeout.ms", String.valueOf(connectTimeout.toMillis())); if (database != null) { diff --git a/flink-connector-mysql-cdc/src/test/java/com/alibaba/ververica/cdc/connectors/mysql/table/MySqlConnectorITCase.java b/flink-connector-mysql-cdc/src/test/java/com/alibaba/ververica/cdc/connectors/mysql/table/MySqlConnectorITCase.java index dfb1fe521..8efe84379 100644 --- a/flink-connector-mysql-cdc/src/test/java/com/alibaba/ververica/cdc/connectors/mysql/table/MySqlConnectorITCase.java +++ b/flink-connector-mysql-cdc/src/test/java/com/alibaba/ververica/cdc/connectors/mysql/table/MySqlConnectorITCase.java @@ -24,31 +24,26 @@ import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.planner.factories.TestValuesTableFactory; -import org.apache.flink.table.utils.LegacyRowResource; import org.apache.flink.types.Row; +import org.apache.flink.util.CloseableIterator; import com.alibaba.ververica.cdc.connectors.mysql.MySqlTestBase; import com.alibaba.ververica.cdc.connectors.mysql.source.utils.UniqueDatabase; import org.junit.Before; -import org.junit.ClassRule; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import java.sql.Connection; -import java.sql.SQLException; import java.sql.Statement; import java.util.ArrayList; -import java.util.Arrays; import java.util.Iterator; import java.util.List; import java.util.Random; -import java.util.concurrent.ExecutionException; import static com.alibaba.ververica.cdc.connectors.mysql.MySqlSourceTest.currentMySQLLatestOffset; import static org.apache.flink.api.common.JobStatus.RUNNING; import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; /** Integration tests for MySQL binlog SQL source. */ @@ -67,14 +62,13 @@ public class MySqlConnectorITCase extends MySqlTestBase { StreamTableEnvironment.create( env, EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()); + // the debezium mysql connector use legacy implementation or not private final boolean useLegacyDezMySQL; // enable the parallelRead(i.e: The new source MySQLParallelSource) private final boolean parallelRead; - @ClassRule public static LegacyRowResource usesLegacyRows = LegacyRowResource.INSTANCE; - public MySqlConnectorITCase(boolean useLegacyDezMySQL, boolean parallelRead) { this.useLegacyDezMySQL = useLegacyDezMySQL; this.parallelRead = parallelRead; @@ -102,17 +96,16 @@ public class MySqlConnectorITCase extends MySqlTestBase { } @Test - public void testConsumingAllEvents() - throws SQLException, ExecutionException, InterruptedException { + public void testConsumingAllEvents() throws Exception { inventoryDatabase.createAndInitialize(); String sourceDDL = String.format( "CREATE TABLE debezium_source (" - + " id INT NOT NULL," + + " `id` INT NOT NULL," + " name STRING," + " description STRING," + " weight DECIMAL(10,3)," - + " primary key (id) not enforced" + + " primary key (`id`) not enforced" + ") WITH (" + " 'connector' = 'mysql-cdc'," + " 'hostname' = '%s'," @@ -198,18 +191,17 @@ public class MySqlConnectorITCase extends MySqlTestBase { String[] expected = new String[] { - "scooter,3.140", - "car battery,8.100", - "12-pack drill bits,0.800", - "hammer,2.625", - "rocks,5.100", - "jacket,0.600", - "spare tire,22.200" + "+I[scooter, 3.140]", + "+I[car battery, 8.100]", + "+I[12-pack drill bits, 0.800]", + "+I[hammer, 2.625]", + "+I[rocks, 5.100]", + "+I[jacket, 0.600]", + "+I[spare tire, 22.200]" }; List actual = TestValuesTableFactory.getResults("sink"); assertThat(actual, containsInAnyOrder(expected)); - result.getJobClient().get().cancel().get(); } @@ -265,41 +257,12 @@ public class MySqlConnectorITCase extends MySqlTestBase { parallelRead, getServerId(), getSplitSize()); - String sinkDDL = - "CREATE TABLE sink (\n" - + " id INT NOT NULL,\n" - + " tiny_c TINYINT,\n" - + " tiny_un_c SMALLINT ,\n" - + " small_c SMALLINT,\n" - + " small_un_c INT,\n" - + " int_c INT ,\n" - + " int_un_c BIGINT,\n" - + " int11_c BIGINT,\n" - + " big_c BIGINT,\n" - + " varchar_c STRING,\n" - + " char_c STRING,\n" - + " float_c FLOAT,\n" - + " double_c DOUBLE,\n" - + " decimal_c DECIMAL(8, 4),\n" - + " numeric_c DECIMAL(6, 0),\n" - + " boolean_c BOOLEAN,\n" - + " date_c DATE,\n" - + " time_c TIME(0),\n" - + " datetime3_c TIMESTAMP(3),\n" - + " datetime6_c TIMESTAMP(6),\n" - + " timestamp_c TIMESTAMP(0),\n" - + " file_uuid STRING\n" - + ") WITH (" - + " 'connector' = 'values'," - + " 'sink-insert-only' = 'false'" - + ")"; tEnv.executeSql(sourceDDL); - tEnv.executeSql(sinkDDL); // async submit job TableResult result = tEnv.executeSql( - "INSERT INTO sink SELECT id,\n" + "SELECT id,\n" + "tiny_c,\n" + "tiny_un_c,\n" + "small_c,\n" @@ -322,7 +285,8 @@ public class MySqlConnectorITCase extends MySqlTestBase { + "timestamp_c,\n" + "TO_BASE64(DECODE(file_uuid, 'UTF-8')) FROM full_types"); - waitForSnapshotStarted("sink"); + CloseableIterator iterator = result.collect(); + waitForSnapshotStarted(iterator); try (Connection connection = fullTypesDatabase.getJdbcConnection(); Statement statement = connection.createStatement()) { @@ -331,22 +295,85 @@ public class MySqlConnectorITCase extends MySqlTestBase { "UPDATE full_types SET timestamp_c = '2020-07-17 18:33:22' WHERE id=1;"); } - waitForSinkSize("sink", 3); - - List expected = - Arrays.asList( - "+I(1,127,255,32767,65535,2147483647,4294967295,2147483647,9223372036854775807,Hello World,abc," - + "123.102,404.4443,123.4567,346,true,2020-07-17,18:00:22,2020-07-17T18:00:22.123," - + "2020-07-17T18:00:22.123456,2020-07-17T18:00:22,ZRrvv70IOQ9I77+977+977+9Nu+/vT57dAA=)", - "-U(1,127,255,32767,65535,2147483647,4294967295,2147483647,9223372036854775807,Hello World,abc," - + "123.102,404.4443,123.4567,346,true,2020-07-17,18:00:22,2020-07-17T18:00:22.123," - + "2020-07-17T18:00:22.123456,2020-07-17T18:00:22,ZRrvv70IOQ9I77+977+977+9Nu+/vT57dAA=)", - "+U(1,127,255,32767,65535,2147483647,4294967295,2147483647,9223372036854775807,Hello World,abc," - + "123.102,404.4443,123.4567,346,true,2020-07-17,18:00:22,2020-07-17T18:00:22.123," - + "2020-07-17T18:00:22.123456,2020-07-17T18:33:22,ZRrvv70IOQ9I77+977+977+9Nu+/vT57dAA=)"); - List actual = TestValuesTableFactory.getRawResults("sink"); - assertEquals(expected, actual); + String[] expected = + new String[] { + "+I[1, 127, 255, 32767, 65535, 2147483647, 4294967295, 2147483647, 9223372036854775807, Hello World, abc, 123.102, 404.4443, 123.4567, 346, true, 2020-07-17, 18:00:22, 2020-07-17T18:00:22.123, 2020-07-17T18:00:22.123456, 2020-07-17T18:00:22, ZRrvv70IOQ9I77+977+977+9Nu+/vT57dAA=]", + "-U[1, 127, 255, 32767, 65535, 2147483647, 4294967295, 2147483647, 9223372036854775807, Hello World, abc, 123.102, 404.4443, 123.4567, 346, true, 2020-07-17, 18:00:22, 2020-07-17T18:00:22.123, 2020-07-17T18:00:22.123456, 2020-07-17T18:00:22, ZRrvv70IOQ9I77+977+977+9Nu+/vT57dAA=]", + "+U[1, 127, 255, 32767, 65535, 2147483647, 4294967295, 2147483647, 9223372036854775807, Hello World, abc, 123.102, 404.4443, 123.4567, 346, true, 2020-07-17, 18:00:22, 2020-07-17T18:00:22.123, 2020-07-17T18:00:22.123456, 2020-07-17T18:33:22, ZRrvv70IOQ9I77+977+977+9Nu+/vT57dAA=]" + }; + assertThat(fetchRows(result.collect(), 3), containsInAnyOrder(expected)); + result.getJobClient().get().cancel().get(); + } + + @Test + public void testStartupFromLatestOffset() throws Exception { + inventoryDatabase.createAndInitialize(); + String sourceDDL = + String.format( + "CREATE TABLE debezium_source (" + + " id INT NOT NULL," + + " name STRING," + + " description STRING," + + " weight DECIMAL(10,3)," + + " primary key(id) not enforced" + + ") WITH (" + + " 'connector' = 'mysql-cdc'," + + " 'hostname' = '%s'," + + " 'port' = '%s'," + + " 'username' = '%s'," + + " 'password' = '%s'," + + " 'database-name' = '%s'," + + " 'table-name' = '%s'," + + " 'scan.startup.mode' = 'latest-offset'," + + " 'scan.snapshot.parallel-read' = '%s'," + + " 'server-id' = '%s'," + + " 'debezium.internal.implementation' = '%s'" + + ")", + MYSQL_CONTAINER.getHost(), + MYSQL_CONTAINER.getDatabasePort(), + inventoryDatabase.getUsername(), + inventoryDatabase.getPassword(), + inventoryDatabase.getDatabaseName(), + "products", + parallelRead, + getServerId(), + getDezImplementation()); + tEnv.executeSql(sourceDDL); + + // async submit job + TableResult result = tEnv.executeSql("SELECT * FROM debezium_source"); + + // wait for the source startup, we don't have a better way to wait it, use sleep for now + while (result.getJobClient().get().getJobStatus().get() != RUNNING) { + Thread.sleep(5000L); + } + CloseableIterator iterator = result.collect(); + + try (Connection connection = inventoryDatabase.getJdbcConnection(); + Statement statement = connection.createStatement()) { + + statement.execute( + "INSERT INTO products VALUES (default,'jacket','water resistent white wind breaker',0.2);"); // 110 + statement.execute( + "INSERT INTO products VALUES (default,'scooter','Big 2-wheel scooter ',5.18);"); + statement.execute( + "UPDATE products SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;"); + statement.execute("UPDATE products SET weight='5.17' WHERE id=111;"); + statement.execute("DELETE FROM products WHERE id=111;"); + } + + String[] expected = + new String[] { + "+I[110, jacket, water resistent white wind breaker, 0.200]", + "+I[111, scooter, Big 2-wheel scooter , 5.180]", + "-U[110, jacket, water resistent white wind breaker, 0.200]", + "+U[110, jacket, new water resistent white wind breaker, 0.500]", + "-U[111, scooter, Big 2-wheel scooter , 5.180]", + "+U[111, scooter, Big 2-wheel scooter , 5.170]", + "-D[111, scooter, Big 2-wheel scooter , 5.170]" + }; + assertThat(fetchRows(iterator, expected.length), containsInAnyOrder(expected)); result.getJobClient().get().cancel().get(); } @@ -431,7 +458,7 @@ public class MySqlConnectorITCase extends MySqlTestBase { waitForSinkSize("sink", 7); String[] expected = - new String[] {"110,jacket,new water resistent white wind breaker,0.500"}; + new String[] {"+I[110, jacket, new water resistent white wind breaker, 0.500]"}; List actual = TestValuesTableFactory.getResults("sink"); assertThat(actual, containsInAnyOrder(expected)); @@ -505,16 +532,16 @@ public class MySqlConnectorITCase extends MySqlTestBase { String[] expected = new String[] { - "101,scooter,Small 2-wheel scooter,3.140", - "102,car battery,12V car battery,8.100", - "103,12-pack drill bits,12-pack of drill bits with sizes ranging from #40 to #3,0.800", - "104,hammer,12oz carpenter's hammer,0.750", - "105,hammer,14oz carpenter's hammer,0.875", - "106,hammer,18oz carpenter hammer,1.000", - "107,rocks,box of assorted rocks,5.100", - "108,jacket,water resistent black wind breaker,0.100", - "109,spare tire,24 inch spare tire,22.200", - "110,jacket,new water resistent white wind breaker,0.500" + "+I[101, scooter, Small 2-wheel scooter, 3.140]", + "+I[102, car battery, 12V car battery, 8.100]", + "+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.800]", + "+I[104, hammer, 12oz carpenter's hammer, 0.750]", + "+I[105, hammer, 14oz carpenter's hammer, 0.875]", + "+I[108, jacket, water resistent black wind breaker, 0.100]", + "+I[109, spare tire, 24 inch spare tire, 22.200]", + "+I[106, hammer, 18oz carpenter hammer, 1.000]", + "+I[107, rocks, box of assorted rocks, 5.100]", + "+I[110, jacket, new water resistent white wind breaker, 0.500]" }; List actual = TestValuesTableFactory.getResults("sink"); @@ -523,85 +550,6 @@ public class MySqlConnectorITCase extends MySqlTestBase { result.getJobClient().get().cancel().get(); } - @Test - public void testStartupFromLatestOffset() throws Exception { - inventoryDatabase.createAndInitialize(); - String sourceDDL = - String.format( - "CREATE TABLE debezium_source (" - + " id INT NOT NULL," - + " name STRING," - + " description STRING," - + " weight DECIMAL(10,3)," - + " primary key(id) not enforced" - + ") WITH (" - + " 'connector' = 'mysql-cdc'," - + " 'hostname' = '%s'," - + " 'port' = '%s'," - + " 'username' = '%s'," - + " 'password' = '%s'," - + " 'database-name' = '%s'," - + " 'table-name' = '%s'," - + " 'scan.startup.mode' = 'latest-offset'," - + " 'scan.snapshot.parallel-read' = '%s'," - + " 'server-id' = '%s'," - + " 'debezium.internal.implementation' = '%s'" - + ")", - MYSQL_CONTAINER.getHost(), - MYSQL_CONTAINER.getDatabasePort(), - inventoryDatabase.getUsername(), - inventoryDatabase.getPassword(), - inventoryDatabase.getDatabaseName(), - "products", - parallelRead, - getServerId(), - getDezImplementation()); - tEnv.executeSql(sourceDDL); - - // async submit job - TableResult result = tEnv.executeSql("SELECT * FROM debezium_source"); - // wait for the source startup, we don't have a better way to wait it, use sleep for now - while (result.getJobClient().get().getJobStatus().get() != RUNNING) { - Thread.sleep(5000L); - } - - try (Connection connection = inventoryDatabase.getJdbcConnection(); - Statement statement = connection.createStatement()) { - - statement.execute( - "INSERT INTO products VALUES (default,'jacket','water resistent white wind breaker',0.2);"); // 110 - statement.execute( - "INSERT INTO products VALUES (default,'scooter','Big 2-wheel scooter ',5.18);"); - statement.execute( - "UPDATE products SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;"); - statement.execute("UPDATE products SET weight='5.17' WHERE id=111;"); - statement.execute("DELETE FROM products WHERE id=111;"); - } - - String[] expected = - new String[] { - "110,jacket,water resistent white wind breaker,0.200", - "111,scooter,Big 2-wheel scooter ,5.180", - "110,jacket,water resistent white wind breaker,0.200", - "110,jacket,new water resistent white wind breaker,0.500", - "111,scooter,Big 2-wheel scooter ,5.180", - "111,scooter,Big 2-wheel scooter ,5.170", - "111,scooter,Big 2-wheel scooter ,5.170" - }; - assertThat(fetchRows(result.collect(), 7), containsInAnyOrder(expected)); - result.getJobClient().get().cancel().get(); - } - - private static List fetchRows(Iterator iter, int size) { - List rows = new ArrayList<>(size); - while (size > 0 && iter.hasNext()) { - Row row = iter.next(); - rows.add(row.toString()); - size--; - } - return rows; - } - @Test public void testStartupFromTimestamp() throws Exception { if (parallelRead) { @@ -668,7 +616,7 @@ public class MySqlConnectorITCase extends MySqlTestBase { waitForSinkSize("sink", 7); String[] expected = - new String[] {"110,jacket,new water resistent white wind breaker,0.500"}; + new String[] {"+I[110, jacket, new water resistent white wind breaker, 0.500]"}; List actual = TestValuesTableFactory.getResults("sink"); assertThat(actual, containsInAnyOrder(expected)); @@ -722,4 +670,20 @@ public class MySqlConnectorITCase extends MySqlTestBase { } } } + + private static List fetchRows(Iterator iter, int size) { + List rows = new ArrayList<>(size); + while (size > 0 && iter.hasNext()) { + Row row = iter.next(); + rows.add(row.toString()); + size--; + } + return rows; + } + + private static void waitForSnapshotStarted(CloseableIterator iterator) throws Exception { + while (!iterator.hasNext()) { + Thread.sleep(100); + } + } }