[mysql] Support "initial", "earliest-offset", "latest-offset" startup mode options

release-1.2
Jark Wu 4 years ago
parent c47526737e
commit 5fcfe8ca05
No known key found for this signature in database
GPG Key ID: 85BACB5AEFAE3202

@ -18,7 +18,6 @@
package com.alibaba.ververica.cdc.connectors.mysql; package com.alibaba.ververica.cdc.connectors.mysql;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupMode;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions; import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema; import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction; import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
@ -182,24 +181,39 @@ public class MySQLSource {
props.setProperty("database.serverTimezone", serverTimeZone); props.setProperty("database.serverTimezone", serverTimeZone);
} }
final DebeziumOffset specificOffset; DebeziumOffset specificOffset = null;
if (startupOptions.startupMode == StartupMode.SPECIFIC_OFFSETS) { switch (startupOptions.startupMode) {
// if binlog offset is specified, 'snapshot.mode=schema_only_recovery' must be configured case INITIAL:
// 'schema_only_recovery' only snapshots the schemas, not the data, props.setProperty("snapshot.mode", "initial");
// and continue binlog reading from the restored offset break;
props.setProperty("snapshot.mode", "schema_only_recovery");
case EARLIEST_OFFSET:
specificOffset = new DebeziumOffset(); props.setProperty("snapshot.mode", "never");
Map<String, String> sourcePartition = new HashMap<>(); break;
sourcePartition.put("server", DATABASE_SERVER_NAME);
specificOffset.setSourcePartition(sourcePartition); case LATEST_OFFSET:
props.setProperty("snapshot.mode", "schema_only");
Map<String, Object> sourceOffset = new HashMap<>(); break;
sourceOffset.put("file", startupOptions.specificOffsetFile);
sourceOffset.put("pos", startupOptions.specificOffsetPos); case SPECIFIC_OFFSETS:
specificOffset.setSourceOffset(sourceOffset); // if binlog offset is specified, 'snapshot.mode=schema_only_recovery' must
} else { // be configured. It only snapshots the schemas, not the data,
specificOffset = null; // and continue binlog reading from the specified offset
props.setProperty("snapshot.mode", "schema_only_recovery");
specificOffset = new DebeziumOffset();
Map<String, String> sourcePartition = new HashMap<>();
sourcePartition.put("server", DATABASE_SERVER_NAME);
specificOffset.setSourcePartition(sourcePartition);
Map<String, Object> sourceOffset = new HashMap<>();
sourceOffset.put("file", startupOptions.specificOffsetFile);
sourceOffset.put("pos", startupOptions.specificOffsetPos);
specificOffset.setSourceOffset(sourceOffset);
break;
default:
throw new UnsupportedOperationException();
} }
if (dbzProperties != null) { if (dbzProperties != null) {

@ -347,6 +347,133 @@ public class MySQLConnectorITCase extends MySQLTestBase {
result.getJobClient().get().cancel().get(); result.getJobClient().get().cancel().get();
} }
@Test
public void testStartupFromEarliestOffset() throws Exception {
inventoryDatabase.createAndInitialize();
String sourceDDL = String.format(
"CREATE TABLE debezium_source (" +
" id INT NOT NULL," +
" name STRING," +
" description STRING," +
" weight DECIMAL(10,3)" +
") WITH (" +
" 'connector' = 'mysql-cdc'," +
" 'hostname' = '%s'," +
" 'port' = '%s'," +
" 'username' = '%s'," +
" 'password' = '%s'," +
" 'database-name' = '%s'," +
" 'table-name' = '%s'," +
" 'scan.startup.mode' = 'earliest-offset'" +
")",
MYSQL_CONTAINER.getHost(),
MYSQL_CONTAINER.getDatabasePort(),
inventoryDatabase.getUsername(),
inventoryDatabase.getPassword(),
inventoryDatabase.getDatabaseName(),
"products");
String sinkDDL = "CREATE TABLE sink " +
" WITH (" +
" 'connector' = 'values'," +
" 'sink-insert-only' = 'false'" +
") LIKE debezium_source (EXCLUDING OPTIONS)";
tEnv.executeSql(sourceDDL);
tEnv.executeSql(sinkDDL);
try (Connection connection = inventoryDatabase.getJdbcConnection();
Statement statement = connection.createStatement()) {
statement.execute("UPDATE products SET description='18oz carpenter hammer' WHERE id=106;");
statement.execute("UPDATE products SET weight='5.1' WHERE id=107;");
statement.execute("INSERT INTO products VALUES (default,'jacket','water resistent white wind breaker',0.2);"); // 110
statement.execute("INSERT INTO products VALUES (default,'scooter','Big 2-wheel scooter ',5.18);");
statement.execute("UPDATE products SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;");
statement.execute("UPDATE products SET weight='5.17' WHERE id=111;");
statement.execute("DELETE FROM products WHERE id=111;");
}
// async submit job
TableResult result = tEnv.executeSql("INSERT INTO sink SELECT * FROM debezium_source");
waitForSinkSize("sink", 20);
String[] expected = new String[]{
"101,scooter,Small 2-wheel scooter,3.140",
"102,car battery,12V car battery,8.100",
"103,12-pack drill bits,12-pack of drill bits with sizes ranging from #40 to #3,0.800",
"104,hammer,12oz carpenter's hammer,0.750",
"105,hammer,14oz carpenter's hammer,0.875",
"106,hammer,18oz carpenter hammer,1.000",
"107,rocks,box of assorted rocks,5.100",
"108,jacket,water resistent black wind breaker,0.100",
"109,spare tire,24 inch spare tire,22.200",
"110,jacket,new water resistent white wind breaker,0.500"
};
List<String> actual = TestValuesTableFactory.getResults("sink");
assertThat(actual, containsInAnyOrder(expected));
result.getJobClient().get().cancel().get();
}
@Test
public void testStartupFromLatestOffset() throws Exception {
inventoryDatabase.createAndInitialize();
String sourceDDL = String.format(
"CREATE TABLE debezium_source (" +
" id INT NOT NULL," +
" name STRING," +
" description STRING," +
" weight DECIMAL(10,3)" +
") WITH (" +
" 'connector' = 'mysql-cdc'," +
" 'hostname' = '%s'," +
" 'port' = '%s'," +
" 'username' = '%s'," +
" 'password' = '%s'," +
" 'database-name' = '%s'," +
" 'table-name' = '%s'," +
" 'scan.startup.mode' = 'latest-offset'" +
")",
MYSQL_CONTAINER.getHost(),
MYSQL_CONTAINER.getDatabasePort(),
inventoryDatabase.getUsername(),
inventoryDatabase.getPassword(),
inventoryDatabase.getDatabaseName(),
"products");
String sinkDDL = "CREATE TABLE sink " +
" WITH (" +
" 'connector' = 'values'," +
" 'sink-insert-only' = 'false'" +
") LIKE debezium_source (EXCLUDING OPTIONS)";
tEnv.executeSql(sourceDDL);
tEnv.executeSql(sinkDDL);
// async submit job
TableResult result = tEnv.executeSql("INSERT INTO sink SELECT * FROM debezium_source");
// wait for the source startup, we don't have a better way to wait it, use sleep for now
Thread.sleep(5000L);
try (Connection connection = inventoryDatabase.getJdbcConnection();
Statement statement = connection.createStatement()) {
statement.execute("INSERT INTO products VALUES (default,'jacket','water resistent white wind breaker',0.2);"); // 110
statement.execute("INSERT INTO products VALUES (default,'scooter','Big 2-wheel scooter ',5.18);");
statement.execute("UPDATE products SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;");
statement.execute("UPDATE products SET weight='5.17' WHERE id=111;");
statement.execute("DELETE FROM products WHERE id=111;");
}
waitForSinkSize("sink", 7);
String[] expected = new String[]{"110,jacket,new water resistent white wind breaker,0.500"};
List<String> actual = TestValuesTableFactory.getResults("sink");
assertThat(actual, containsInAnyOrder(expected));
result.getJobClient().get().cancel().get();
}
// ------------------------------------------------------------------------------------ // ------------------------------------------------------------------------------------
private static void waitForSnapshotStarted(String sinkName) throws InterruptedException { private static void waitForSnapshotStarted(String sinkName) throws InterruptedException {

@ -139,6 +139,75 @@ public class MySQLTableSourceFactoryTest {
assertEquals(expectedSource, actualSource); assertEquals(expectedSource, actualSource);
} }
@Test
public void testStartupFromInitial() {
Map<String, String> properties = getAllOptions();
properties.put("scan.startup.mode", "initial");
// validation for source
DynamicTableSource actualSource = createTableSource(properties);
MySQLTableSource expectedSource = new MySQLTableSource(
TableSchemaUtils.getPhysicalSchema(SCHEMA),
3306,
MY_LOCALHOST,
MY_DATABASE,
MY_TABLE,
MY_USERNAME,
MY_PASSWORD,
ZoneId.of("UTC"),
PROPERTIES,
null,
StartupOptions.initial()
);
assertEquals(expectedSource, actualSource);
}
@Test
public void testStartupFromEarliestOffset() {
Map<String, String> properties = getAllOptions();
properties.put("scan.startup.mode", "earliest-offset");
// validation for source
DynamicTableSource actualSource = createTableSource(properties);
MySQLTableSource expectedSource = new MySQLTableSource(
TableSchemaUtils.getPhysicalSchema(SCHEMA),
3306,
MY_LOCALHOST,
MY_DATABASE,
MY_TABLE,
MY_USERNAME,
MY_PASSWORD,
ZoneId.of("UTC"),
PROPERTIES,
null,
StartupOptions.earliest()
);
assertEquals(expectedSource, actualSource);
}
@Test
public void testStartupFromLatestOffset() {
Map<String, String> properties = getAllOptions();
properties.put("scan.startup.mode", "latest-offset");
// validation for source
DynamicTableSource actualSource = createTableSource(properties);
MySQLTableSource expectedSource = new MySQLTableSource(
TableSchemaUtils.getPhysicalSchema(SCHEMA),
3306,
MY_LOCALHOST,
MY_DATABASE,
MY_TABLE,
MY_USERNAME,
MY_PASSWORD,
ZoneId.of("UTC"),
PROPERTIES,
null,
StartupOptions.latest()
);
assertEquals(expectedSource, actualSource);
}
@Test @Test
public void testValidation() { public void testValidation() {
// validate illegal port // validate illegal port

Loading…
Cancel
Save