diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlHybridSplitAssigner.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlHybridSplitAssigner.java index 6ec7ce5b6..1f6f396ef 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlHybridSplitAssigner.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlHybridSplitAssigner.java @@ -149,11 +149,11 @@ public class MySqlHybridSplitAssigner implements MySqlSplitAssigner { final List finishedSnapshotSplitInfos = new ArrayList<>(); final Map tableSchemas = new HashMap<>(); - BinlogOffset minBinlogOffset = BinlogOffset.INITIAL_OFFSET; + BinlogOffset minBinlogOffset = null; for (MySqlSnapshotSplit split : assignedSnapshotSplit) { // find the min binlog offset BinlogOffset binlogOffset = splitFinishedOffsets.get(split.splitId()); - if (binlogOffset.compareTo(minBinlogOffset) < 0) { + if (minBinlogOffset == null || binlogOffset.compareTo(minBinlogOffset) < 0) { minBinlogOffset = binlogOffset; } finishedSnapshotSplitInfos.add( @@ -171,7 +171,7 @@ public class MySqlHybridSplitAssigner implements MySqlSplitAssigner { return new MySqlBinlogSplit( BINLOG_SPLIT_ID, lastSnapshotSplit.getSplitKeyType(), - minBinlogOffset, + minBinlogOffset == null ? BinlogOffset.INITIAL_OFFSET : minBinlogOffset, BinlogOffset.NO_STOPPING_OFFSET, finishedSnapshotSplitInfos, tableSchemas); diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java index 4e3af8802..0c74bc579 100644 --- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java +++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java @@ -61,9 +61,9 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.UUID; import java.util.stream.Collectors; -import static com.ververica.cdc.connectors.mysql.debezium.EmbeddedFlinkDatabaseHistory.DATABASE_HISTORY_INSTANCE_NAME; import static com.ververica.cdc.connectors.mysql.source.MySqlSourceOptions.SCAN_STARTUP_MODE; import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getSnapshotSplitInfo; import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getStartingOffsetOfBinlogSplit; @@ -613,7 +613,7 @@ public class BinlogSplitReaderTest extends MySqlTestBase { properties.put("database.serverTimezone", ZoneId.of("UTC").toString()); properties.put("snapshot.mode", "initial"); properties.put("database.history", EmbeddedFlinkDatabaseHistory.class.getCanonicalName()); - properties.put("database.history.instance.name", DATABASE_HISTORY_INSTANCE_NAME); + properties.put("database.history.instance.name", UUID.randomUUID().toString()); List captureTableIds = Arrays.stream(captureTables) .map(tableName -> customerDatabase.getDatabaseName() + "." + tableName) diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReaderTest.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReaderTest.java index 50d37326e..90bfb71b0 100644 --- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReaderTest.java +++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReaderTest.java @@ -52,9 +52,9 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.UUID; import java.util.stream.Collectors; -import static com.ververica.cdc.connectors.mysql.debezium.EmbeddedFlinkDatabaseHistory.DATABASE_HISTORY_INSTANCE_NAME; import static org.junit.Assert.assertEquals; /** Tests for {@link SnapshotSplitReader}. */ @@ -301,11 +301,11 @@ public class SnapshotSplitReaderTest extends MySqlTestBase { properties.put("database.password", customerDatabase.getPassword()); properties.put("database.whitelist", customerDatabase.getDatabaseName()); properties.put("database.history.skip.unparseable.ddl", "true"); - properties.put("server-id-range", "1001-1002"); + properties.put("server-id", "1001-1002"); properties.put("database.serverTimezone", ZoneId.of("UTC").toString()); properties.put("snapshot.mode", "initial"); properties.put("database.history", EmbeddedFlinkDatabaseHistory.class.getCanonicalName()); - properties.put("database.history.instance.name", DATABASE_HISTORY_INSTANCE_NAME); + properties.put("database.history.instance.name", UUID.randomUUID().toString()); List captureTableIds = Arrays.stream(captureTables) .map(tableName -> customerDatabase.getDatabaseName() + "." + tableName) diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlHybridSplitAssignerTest.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlHybridSplitAssignerTest.java new file mode 100644 index 000000000..229d782dd --- /dev/null +++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlHybridSplitAssignerTest.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.connectors.mysql.source.assigners; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.types.logical.RowType; + +import org.apache.flink.shaded.guava18.com.google.common.collect.Lists; + +import com.ververica.cdc.connectors.mysql.MySqlTestBase; +import com.ververica.cdc.connectors.mysql.debezium.EmbeddedFlinkDatabaseHistory; +import com.ververica.cdc.connectors.mysql.source.assigners.state.HybridPendingSplitsState; +import com.ververica.cdc.connectors.mysql.source.assigners.state.SnapshotPendingSplitsState; +import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset; +import com.ververica.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo; +import com.ververica.cdc.connectors.mysql.source.split.MySqlBinlogSplit; +import com.ververica.cdc.connectors.mysql.source.split.MySqlSnapshotSplit; +import com.ververica.cdc.connectors.mysql.source.split.MySqlSplit; +import com.ververica.cdc.connectors.mysql.source.utils.UniqueDatabase; +import io.debezium.relational.TableId; +import io.debezium.relational.history.TableChanges.TableChange; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.time.ZoneId; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; +import java.util.stream.Collectors; + +import static org.junit.Assert.assertEquals; + +/** Tests for {@link MySqlHybridSplitAssigner}. */ +public class MySqlHybridSplitAssignerTest extends MySqlTestBase { + + private static final UniqueDatabase customerDatabase = + new UniqueDatabase(MYSQL_CONTAINER, "customer", "mysqluser", "mysqlpw"); + + @BeforeClass + public static void init() { + customerDatabase.createAndInitialize(); + } + + @Test + public void testAssignMySqlBinlogSplitAfterAllSnapshotSplitsFinished() { + + Configuration configuration = getConfig(); + final String captureTable = "customers"; + List captureTableIds = + Arrays.stream(new String[] {captureTable}) + .map(tableName -> customerDatabase.getDatabaseName() + "." + tableName) + .collect(Collectors.toList()); + configuration.setString("table.whitelist", String.join(",", captureTableIds)); + + // Step 1. Mock MySqlHybridSplitAssigner Object + TableId tableId = new TableId(null, customerDatabase.getDatabaseName(), captureTable); + RowType splitKeyType = + (RowType) DataTypes.ROW(DataTypes.FIELD("id", DataTypes.BIGINT())).getLogicalType(); + + List alreadyProcessedTables = Lists.newArrayList(tableId); + List remainingSplits = new ArrayList<>(); + + Map assignedSplits = new HashMap<>(); + Map splitFinishedOffsets = new HashMap<>(); + + for (int i = 0; i < 5; i++) { + String splitId = customerDatabase.getDatabaseName() + "." + captureTable + ":" + i; + Object[] splitStart = i == 0 ? null : new Object[] {i * 2}; + Object[] splitEnd = new Object[] {i * 2 + 2}; + BinlogOffset highWatermark = new BinlogOffset("mysql-bin.00001", i + 1); + Map tableSchemas = new HashMap<>(); + MySqlSnapshotSplit sqlSnapshotSplit = + new MySqlSnapshotSplit( + tableId, + splitId, + splitKeyType, + splitStart, + splitEnd, + highWatermark, + tableSchemas); + assignedSplits.put(splitId, sqlSnapshotSplit); + splitFinishedOffsets.put(splitId, highWatermark); + } + + SnapshotPendingSplitsState snapshotPendingSplitsState = + new SnapshotPendingSplitsState( + alreadyProcessedTables, + remainingSplits, + assignedSplits, + splitFinishedOffsets, + true); + HybridPendingSplitsState checkpoint = + new HybridPendingSplitsState(snapshotPendingSplitsState, false); + final MySqlHybridSplitAssigner assigner = + new MySqlHybridSplitAssigner(configuration, checkpoint); + + // step 2. Get the MySqlBinlogSplit after all snapshot splits finished + Optional binlogSplit = assigner.getNext(); + MySqlBinlogSplit mySqlBinlogSplit = binlogSplit.get().asBinlogSplit(); + + final List finishedSnapshotSplitInfos = new ArrayList<>(); + final List assignedSnapshotSplit = + assignedSplits.values().stream() + .sorted(Comparator.comparing(MySqlSplit::splitId)) + .collect(Collectors.toList()); + for (MySqlSnapshotSplit split : assignedSnapshotSplit) { + finishedSnapshotSplitInfos.add( + new FinishedSnapshotSplitInfo( + split.getTableId(), + split.splitId(), + split.getSplitStart(), + split.getSplitEnd(), + split.getHighWatermark())); + } + + MySqlBinlogSplit expected = + new MySqlBinlogSplit( + "binlog-split", + splitKeyType, + new BinlogOffset("mysql-bin.00001", 1), + BinlogOffset.NO_STOPPING_OFFSET, + finishedSnapshotSplitInfos, + new HashMap<>()); + assertEquals(expected, mySqlBinlogSplit); + } + + private Configuration getConfig() { + Map properties = new HashMap<>(); + properties.put("database.server.name", "embedded-test"); + properties.put("database.hostname", MYSQL_CONTAINER.getHost()); + properties.put("database.whitelist", customerDatabase.getDatabaseName()); + properties.put("database.port", String.valueOf(MYSQL_CONTAINER.getDatabasePort())); + properties.put("database.user", customerDatabase.getUsername()); + properties.put("database.password", customerDatabase.getPassword()); + properties.put("database.history.skip.unparseable.ddl", "true"); + properties.put("database.serverTimezone", ZoneId.of("UTC").toString()); + properties.put("snapshot.mode", "initial"); + properties.put("database.history", EmbeddedFlinkDatabaseHistory.class.getCanonicalName()); + properties.put("database.history.instance.name", UUID.randomUUID().toString()); + return Configuration.fromMap(properties); + } +} diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java index 14ed8db5c..a61754991 100644 --- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java +++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java @@ -36,9 +36,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.UUID; import java.util.stream.Collectors; -import static com.ververica.cdc.connectors.mysql.debezium.EmbeddedFlinkDatabaseHistory.DATABASE_HISTORY_INSTANCE_NAME; import static org.apache.flink.core.testutils.FlinkMatchers.containsMessage; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; @@ -266,12 +266,11 @@ public class MySqlSnapshotSplitAssignerTest extends MySqlTestBase { properties.put("database.user", customerDatabase.getUsername()); properties.put("database.password", customerDatabase.getPassword()); properties.put("database.history.skip.unparseable.ddl", "true"); - properties.put("server-id.range", "1001,1004"); properties.put("scan.snapshot.fetch.size", "2"); properties.put("database.serverTimezone", ZoneId.of("UTC").toString()); properties.put("snapshot.mode", "initial"); properties.put("database.history", EmbeddedFlinkDatabaseHistory.class.getCanonicalName()); - properties.put("database.history.instance.name", DATABASE_HISTORY_INSTANCE_NAME); + properties.put("database.history.instance.name", UUID.randomUUID().toString()); return Configuration.fromMap(properties); } }