diff --git a/flink-connector-mysql-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/mysql/MySQLSource.java b/flink-connector-mysql-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/mysql/MySQLSource.java index e67fe5462..126b58c3a 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/mysql/MySQLSource.java +++ b/flink-connector-mysql-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/mysql/MySQLSource.java @@ -18,7 +18,6 @@ package com.alibaba.ververica.cdc.connectors.mysql; -import com.alibaba.ververica.cdc.connectors.mysql.table.StartupMode; import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions; import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema; import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction; @@ -182,24 +181,39 @@ public class MySQLSource { props.setProperty("database.serverTimezone", serverTimeZone); } - final DebeziumOffset specificOffset; - if (startupOptions.startupMode == StartupMode.SPECIFIC_OFFSETS) { - // if binlog offset is specified, 'snapshot.mode=schema_only_recovery' must be configured - // 'schema_only_recovery' only snapshots the schemas, not the data, - // and continue binlog reading from the restored offset - props.setProperty("snapshot.mode", "schema_only_recovery"); - - specificOffset = new DebeziumOffset(); - Map sourcePartition = new HashMap<>(); - sourcePartition.put("server", DATABASE_SERVER_NAME); - specificOffset.setSourcePartition(sourcePartition); - - Map sourceOffset = new HashMap<>(); - sourceOffset.put("file", startupOptions.specificOffsetFile); - sourceOffset.put("pos", startupOptions.specificOffsetPos); - specificOffset.setSourceOffset(sourceOffset); - } else { - specificOffset = null; + DebeziumOffset specificOffset = null; + switch (startupOptions.startupMode) { + case INITIAL: + props.setProperty("snapshot.mode", "initial"); + break; + + case EARLIEST_OFFSET: + props.setProperty("snapshot.mode", "never"); + break; + + case LATEST_OFFSET: + props.setProperty("snapshot.mode", "schema_only"); + break; + + case SPECIFIC_OFFSETS: + // if binlog offset is specified, 'snapshot.mode=schema_only_recovery' must + // be configured. It only snapshots the schemas, not the data, + // and continue binlog reading from the specified offset + props.setProperty("snapshot.mode", "schema_only_recovery"); + + specificOffset = new DebeziumOffset(); + Map sourcePartition = new HashMap<>(); + sourcePartition.put("server", DATABASE_SERVER_NAME); + specificOffset.setSourcePartition(sourcePartition); + + Map sourceOffset = new HashMap<>(); + sourceOffset.put("file", startupOptions.specificOffsetFile); + sourceOffset.put("pos", startupOptions.specificOffsetPos); + specificOffset.setSourceOffset(sourceOffset); + break; + + default: + throw new UnsupportedOperationException(); } if (dbzProperties != null) { diff --git a/flink-connector-mysql-cdc/src/test/java/com/alibaba/ververica/cdc/connectors/mysql/table/MySQLConnectorITCase.java b/flink-connector-mysql-cdc/src/test/java/com/alibaba/ververica/cdc/connectors/mysql/table/MySQLConnectorITCase.java index 00e4df5fa..2284b5df9 100644 --- a/flink-connector-mysql-cdc/src/test/java/com/alibaba/ververica/cdc/connectors/mysql/table/MySQLConnectorITCase.java +++ b/flink-connector-mysql-cdc/src/test/java/com/alibaba/ververica/cdc/connectors/mysql/table/MySQLConnectorITCase.java @@ -347,6 +347,133 @@ public class MySQLConnectorITCase extends MySQLTestBase { result.getJobClient().get().cancel().get(); } + @Test + public void testStartupFromEarliestOffset() throws Exception { + inventoryDatabase.createAndInitialize(); + String sourceDDL = String.format( + "CREATE TABLE debezium_source (" + + " id INT NOT NULL," + + " name STRING," + + " description STRING," + + " weight DECIMAL(10,3)" + + ") WITH (" + + " 'connector' = 'mysql-cdc'," + + " 'hostname' = '%s'," + + " 'port' = '%s'," + + " 'username' = '%s'," + + " 'password' = '%s'," + + " 'database-name' = '%s'," + + " 'table-name' = '%s'," + + " 'scan.startup.mode' = 'earliest-offset'" + + ")", + MYSQL_CONTAINER.getHost(), + MYSQL_CONTAINER.getDatabasePort(), + inventoryDatabase.getUsername(), + inventoryDatabase.getPassword(), + inventoryDatabase.getDatabaseName(), + "products"); + String sinkDDL = "CREATE TABLE sink " + + " WITH (" + + " 'connector' = 'values'," + + " 'sink-insert-only' = 'false'" + + ") LIKE debezium_source (EXCLUDING OPTIONS)"; + tEnv.executeSql(sourceDDL); + tEnv.executeSql(sinkDDL); + + try (Connection connection = inventoryDatabase.getJdbcConnection(); + Statement statement = connection.createStatement()) { + + statement.execute("UPDATE products SET description='18oz carpenter hammer' WHERE id=106;"); + statement.execute("UPDATE products SET weight='5.1' WHERE id=107;"); + statement.execute("INSERT INTO products VALUES (default,'jacket','water resistent white wind breaker',0.2);"); // 110 + statement.execute("INSERT INTO products VALUES (default,'scooter','Big 2-wheel scooter ',5.18);"); + statement.execute("UPDATE products SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;"); + statement.execute("UPDATE products SET weight='5.17' WHERE id=111;"); + statement.execute("DELETE FROM products WHERE id=111;"); + } + + // async submit job + TableResult result = tEnv.executeSql("INSERT INTO sink SELECT * FROM debezium_source"); + + waitForSinkSize("sink", 20); + + String[] expected = new String[]{ + "101,scooter,Small 2-wheel scooter,3.140", + "102,car battery,12V car battery,8.100", + "103,12-pack drill bits,12-pack of drill bits with sizes ranging from #40 to #3,0.800", + "104,hammer,12oz carpenter's hammer,0.750", + "105,hammer,14oz carpenter's hammer,0.875", + "106,hammer,18oz carpenter hammer,1.000", + "107,rocks,box of assorted rocks,5.100", + "108,jacket,water resistent black wind breaker,0.100", + "109,spare tire,24 inch spare tire,22.200", + "110,jacket,new water resistent white wind breaker,0.500" + }; + + List actual = TestValuesTableFactory.getResults("sink"); + assertThat(actual, containsInAnyOrder(expected)); + + result.getJobClient().get().cancel().get(); + } + + @Test + public void testStartupFromLatestOffset() throws Exception { + inventoryDatabase.createAndInitialize(); + String sourceDDL = String.format( + "CREATE TABLE debezium_source (" + + " id INT NOT NULL," + + " name STRING," + + " description STRING," + + " weight DECIMAL(10,3)" + + ") WITH (" + + " 'connector' = 'mysql-cdc'," + + " 'hostname' = '%s'," + + " 'port' = '%s'," + + " 'username' = '%s'," + + " 'password' = '%s'," + + " 'database-name' = '%s'," + + " 'table-name' = '%s'," + + " 'scan.startup.mode' = 'latest-offset'" + + ")", + MYSQL_CONTAINER.getHost(), + MYSQL_CONTAINER.getDatabasePort(), + inventoryDatabase.getUsername(), + inventoryDatabase.getPassword(), + inventoryDatabase.getDatabaseName(), + "products"); + String sinkDDL = "CREATE TABLE sink " + + " WITH (" + + " 'connector' = 'values'," + + " 'sink-insert-only' = 'false'" + + ") LIKE debezium_source (EXCLUDING OPTIONS)"; + tEnv.executeSql(sourceDDL); + tEnv.executeSql(sinkDDL); + + // async submit job + TableResult result = tEnv.executeSql("INSERT INTO sink SELECT * FROM debezium_source"); + // wait for the source startup, we don't have a better way to wait it, use sleep for now + Thread.sleep(5000L); + + try (Connection connection = inventoryDatabase.getJdbcConnection(); + Statement statement = connection.createStatement()) { + + statement.execute("INSERT INTO products VALUES (default,'jacket','water resistent white wind breaker',0.2);"); // 110 + statement.execute("INSERT INTO products VALUES (default,'scooter','Big 2-wheel scooter ',5.18);"); + statement.execute("UPDATE products SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;"); + statement.execute("UPDATE products SET weight='5.17' WHERE id=111;"); + statement.execute("DELETE FROM products WHERE id=111;"); + } + + waitForSinkSize("sink", 7); + + String[] expected = new String[]{"110,jacket,new water resistent white wind breaker,0.500"}; + + List actual = TestValuesTableFactory.getResults("sink"); + assertThat(actual, containsInAnyOrder(expected)); + + result.getJobClient().get().cancel().get(); + } + // ------------------------------------------------------------------------------------ private static void waitForSnapshotStarted(String sinkName) throws InterruptedException { diff --git a/flink-connector-mysql-cdc/src/test/java/com/alibaba/ververica/cdc/connectors/mysql/table/MySQLTableSourceFactoryTest.java b/flink-connector-mysql-cdc/src/test/java/com/alibaba/ververica/cdc/connectors/mysql/table/MySQLTableSourceFactoryTest.java index 78745701f..5fdd1cedf 100644 --- a/flink-connector-mysql-cdc/src/test/java/com/alibaba/ververica/cdc/connectors/mysql/table/MySQLTableSourceFactoryTest.java +++ b/flink-connector-mysql-cdc/src/test/java/com/alibaba/ververica/cdc/connectors/mysql/table/MySQLTableSourceFactoryTest.java @@ -139,6 +139,75 @@ public class MySQLTableSourceFactoryTest { assertEquals(expectedSource, actualSource); } + @Test + public void testStartupFromInitial() { + Map properties = getAllOptions(); + properties.put("scan.startup.mode", "initial"); + + // validation for source + DynamicTableSource actualSource = createTableSource(properties); + MySQLTableSource expectedSource = new MySQLTableSource( + TableSchemaUtils.getPhysicalSchema(SCHEMA), + 3306, + MY_LOCALHOST, + MY_DATABASE, + MY_TABLE, + MY_USERNAME, + MY_PASSWORD, + ZoneId.of("UTC"), + PROPERTIES, + null, + StartupOptions.initial() + ); + assertEquals(expectedSource, actualSource); + } + + @Test + public void testStartupFromEarliestOffset() { + Map properties = getAllOptions(); + properties.put("scan.startup.mode", "earliest-offset"); + + // validation for source + DynamicTableSource actualSource = createTableSource(properties); + MySQLTableSource expectedSource = new MySQLTableSource( + TableSchemaUtils.getPhysicalSchema(SCHEMA), + 3306, + MY_LOCALHOST, + MY_DATABASE, + MY_TABLE, + MY_USERNAME, + MY_PASSWORD, + ZoneId.of("UTC"), + PROPERTIES, + null, + StartupOptions.earliest() + ); + assertEquals(expectedSource, actualSource); + } + + @Test + public void testStartupFromLatestOffset() { + Map properties = getAllOptions(); + properties.put("scan.startup.mode", "latest-offset"); + + // validation for source + DynamicTableSource actualSource = createTableSource(properties); + MySQLTableSource expectedSource = new MySQLTableSource( + TableSchemaUtils.getPhysicalSchema(SCHEMA), + 3306, + MY_LOCALHOST, + MY_DATABASE, + MY_TABLE, + MY_USERNAME, + MY_PASSWORD, + ZoneId.of("UTC"), + PROPERTIES, + null, + StartupOptions.latest() + ); + assertEquals(expectedSource, actualSource); + } + @Test public void testValidation() { // validate illegal port