[mysql] Forbid using problematic startup options earliest-offset/specific-offset/timestamp

pull/303/head
Leonard Xu 4 years ago committed by Leonard Xu
parent 2f644007e2
commit 7b5f6aab9c

@ -33,7 +33,7 @@ import org.apache.kafka.connect.source.SourceRecord;
* A dispatcher to dispatch watermark signal events. * A dispatcher to dispatch watermark signal events.
* *
* <p>The watermark signal event is used to describe the start point and end point of a split scan. * <p>The watermark signal event is used to describe the start point and end point of a split scan.
* The Watermark Signal Algorithms is inspired by https://arxiv.org/pdf/2010.12597v1.pdf. * The Watermark Signal Algorithm is inspired by https://arxiv.org/pdf/2010.12597v1.pdf.
*/ */
public class SignalEventDispatcher { public class SignalEventDispatcher {

@ -58,7 +58,7 @@ import static com.ververica.cdc.connectors.mysql.source.MySqlSourceOptions.SCAN_
import static com.ververica.cdc.connectors.mysql.source.MySqlSourceOptions.getServerIdForSubTask; import static com.ververica.cdc.connectors.mysql.source.MySqlSourceOptions.getServerIdForSubTask;
/** /**
* The MySQL CDC Source based on FLIP-27 and Watermark Signal Algorithms which supports parallel * The MySQL CDC Source based on FLIP-27 and Watermark Signal Algorithm which supports parallel
* reading snapshot of table and then continue to capture data change from binlog. * reading snapshot of table and then continue to capture data change from binlog.
* *
* <pre> * <pre>

@ -151,31 +151,27 @@ public class MySqlTableSourceFactory implements DynamicTableSourceFactory {
case SCAN_STARTUP_MODE_VALUE_INITIAL: case SCAN_STARTUP_MODE_VALUE_INITIAL:
return StartupOptions.initial(); return StartupOptions.initial();
case SCAN_STARTUP_MODE_VALUE_EARLIEST:
return StartupOptions.earliest();
case SCAN_STARTUP_MODE_VALUE_LATEST: case SCAN_STARTUP_MODE_VALUE_LATEST:
return StartupOptions.latest(); return StartupOptions.latest();
case SCAN_STARTUP_MODE_VALUE_EARLIEST:
case SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSET: case SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSET:
String offsetFile = config.get(SCAN_STARTUP_SPECIFIC_OFFSET_FILE);
int offsetPos = config.get(SCAN_STARTUP_SPECIFIC_OFFSET_POS);
return StartupOptions.specificOffset(offsetFile, offsetPos);
case SCAN_STARTUP_MODE_VALUE_TIMESTAMP: case SCAN_STARTUP_MODE_VALUE_TIMESTAMP:
long millis = config.get(SCAN_STARTUP_TIMESTAMP_MILLIS); throw new ValidationException(
return StartupOptions.timestamp(millis); String.format(
"Unsupported option value '%s', the options [%s, %s, %s] are not supported correctly, please do not use them until they're correctly supported",
modeString,
SCAN_STARTUP_MODE_VALUE_EARLIEST,
SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSET,
SCAN_STARTUP_MODE_VALUE_TIMESTAMP));
default: default:
throw new ValidationException( throw new ValidationException(
String.format( String.format(
"Invalid value for option '%s'. Supported values are [%s, %s, %s, %s, %s], but was: %s", "Invalid value for option '%s'. Supported values are [%s, %s], but was: %s",
SCAN_STARTUP_MODE.key(), SCAN_STARTUP_MODE.key(),
SCAN_STARTUP_MODE_VALUE_INITIAL, SCAN_STARTUP_MODE_VALUE_INITIAL,
SCAN_STARTUP_MODE_VALUE_EARLIEST,
SCAN_STARTUP_MODE_VALUE_LATEST, SCAN_STARTUP_MODE_VALUE_LATEST,
SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSET,
SCAN_STARTUP_MODE_VALUE_TIMESTAMP,
modeString)); modeString));
} }
} }

@ -30,6 +30,7 @@ import org.apache.flink.util.CloseableIterator;
import com.ververica.cdc.connectors.mysql.MySqlTestBase; import com.ververica.cdc.connectors.mysql.MySqlTestBase;
import com.ververica.cdc.connectors.mysql.source.utils.UniqueDatabase; import com.ververica.cdc.connectors.mysql.source.utils.UniqueDatabase;
import org.junit.Before; import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.junit.runners.Parameterized; import org.junit.runners.Parameterized;
@ -377,6 +378,7 @@ public class MySqlConnectorITCase extends MySqlTestBase {
result.getJobClient().get().cancel().get(); result.getJobClient().get().cancel().get();
} }
@Ignore
@Test @Test
public void testStartupFromSpecificOffset() throws Exception { public void testStartupFromSpecificOffset() throws Exception {
if (incrementalSnapshot) { if (incrementalSnapshot) {
@ -466,6 +468,7 @@ public class MySqlConnectorITCase extends MySqlTestBase {
result.getJobClient().get().cancel().get(); result.getJobClient().get().cancel().get();
} }
@Ignore
@Test @Test
public void testStartupFromEarliestOffset() throws Exception { public void testStartupFromEarliestOffset() throws Exception {
if (incrementalSnapshot) { if (incrementalSnapshot) {
@ -551,6 +554,7 @@ public class MySqlConnectorITCase extends MySqlTestBase {
} }
@Test @Test
@Ignore
public void testStartupFromTimestamp() throws Exception { public void testStartupFromTimestamp() throws Exception {
if (incrementalSnapshot) { if (incrementalSnapshot) {
// not support yet // not support yet

@ -223,35 +223,26 @@ public class MySqlTableSourceFactoryTest {
@Test @Test
public void testStartupFromSpecificOffset() { public void testStartupFromSpecificOffset() {
try {
final String offsetFile = "mysql-bin.000003"; final String offsetFile = "mysql-bin.000003";
final int offsetPos = 100203; final int offsetPos = 100203;
Map<String, String> options = getAllOptions(); Map<String, String> properties = getAllOptions();
options.put("port", "3307"); properties.put("port", "3307");
options.put("server-id", "4321"); properties.put("server-id", "4321");
options.put("scan.startup.mode", "specific-offset"); properties.put("scan.startup.mode", "specific-offset");
options.put("scan.startup.specific-offset.file", offsetFile); properties.put("scan.startup.specific-offset.file", offsetFile);
options.put("scan.startup.specific-offset.pos", String.valueOf(offsetPos)); properties.put("scan.startup.specific-offset.pos", String.valueOf(offsetPos));
DynamicTableSource actualSource = createTableSource(options); createTableSource(properties);
MySqlTableSource expectedSource = fail("exception expected");
new MySqlTableSource( } catch (Throwable t) {
TableSchemaUtils.getPhysicalSchema(fromResolvedSchema(SCHEMA)), assertTrue(
3307, ExceptionUtils.findThrowableWithMessage(
MY_LOCALHOST, t,
MY_DATABASE, "Unsupported option value 'specific-offset', the options [earliest-offset, specific-offset, timestamp] are not supported correctly, please do not use them until they're correctly supported")
MY_TABLE, .isPresent());
MY_USERNAME, }
MY_PASSWORD,
ZoneId.of("UTC"),
PROPERTIES,
"4321",
false,
SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE.defaultValue(),
SCAN_SNAPSHOT_FETCH_SIZE.defaultValue(),
CONNECT_TIMEOUT.defaultValue(),
StartupOptions.specificOffset(offsetFile, offsetPos));
assertEquals(expectedSource, actualSource);
} }
@Test @Test
@ -283,29 +274,35 @@ public class MySqlTableSourceFactoryTest {
@Test @Test
public void testStartupFromEarliestOffset() { public void testStartupFromEarliestOffset() {
try {
Map<String, String> properties = getAllOptions(); Map<String, String> properties = getAllOptions();
properties.put("scan.startup.mode", "earliest-offset"); properties.put("scan.startup.mode", "earliest-offset");
createTableSource(properties);
fail("exception expected");
} catch (Throwable t) {
assertTrue(
ExceptionUtils.findThrowableWithMessage(
t,
"Unsupported option value 'earliest-offset', the options [earliest-offset, specific-offset, timestamp] are not supported correctly, please do not use them until they're correctly supported")
.isPresent());
}
}
// validation for source @Test
DynamicTableSource actualSource = createTableSource(properties); public void testStartupFromSpecificTimestamp() {
MySqlTableSource expectedSource = try {
new MySqlTableSource( Map<String, String> properties = getAllOptions();
TableSchemaUtils.getPhysicalSchema(fromResolvedSchema(SCHEMA)), properties.put("scan.startup.mode", "timestamp");
3306, properties.put("scan.startup.timestamp-millis", "0");
MY_LOCALHOST, createTableSource(properties);
MY_DATABASE, fail("exception expected");
MY_TABLE, } catch (Throwable t) {
MY_USERNAME, assertTrue(
MY_PASSWORD, ExceptionUtils.findThrowableWithMessage(
ZoneId.of("UTC"), t,
PROPERTIES, "Unsupported option value 'timestamp', the options [earliest-offset, specific-offset, timestamp] are not supported correctly, please do not use them until they're correctly supported")
null, .isPresent());
false, }
SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE.defaultValue(),
SCAN_SNAPSHOT_FETCH_SIZE.defaultValue(),
CONNECT_TIMEOUT.defaultValue(),
StartupOptions.earliest());
assertEquals(expectedSource, actualSource);
} }
@Test @Test
@ -407,7 +404,7 @@ public class MySqlTableSourceFactoryTest {
} catch (Throwable t) { } catch (Throwable t) {
String msg = String msg =
"Invalid value for option 'scan.startup.mode'. Supported values are " "Invalid value for option 'scan.startup.mode'. Supported values are "
+ "[initial, earliest-offset, latest-offset, specific-offset, timestamp], " + "[initial, latest-offset], "
+ "but was: abc"; + "but was: abc";
assertTrue(ExceptionUtils.findThrowableWithMessage(t, msg).isPresent()); assertTrue(ExceptionUtils.findThrowableWithMessage(t, msg).isPresent());
} }

Loading…
Cancel
Save