From 7b5f6aab9c9a3a0c7e3cc818e581addf3820fd33 Mon Sep 17 00:00:00 2001 From: Leonard Xu Date: Wed, 11 Aug 2021 10:20:09 +0800 Subject: [PATCH] [mysql] Forbid using problematic startup options earliest-offset/specific-offset/timestamp --- .../dispatcher/SignalEventDispatcher.java | 2 +- .../mysql/source/MySqlParallelSource.java | 2 +- .../mysql/table/MySqlTableSourceFactory.java | 22 ++--- .../mysql/table/MySqlConnectorITCase.java | 4 + .../table/MySqlTableSourceFactoryTest.java | 97 +++++++++---------- 5 files changed, 62 insertions(+), 65 deletions(-) diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/dispatcher/SignalEventDispatcher.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/dispatcher/SignalEventDispatcher.java index 75adc6f62..103dc07c6 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/dispatcher/SignalEventDispatcher.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/dispatcher/SignalEventDispatcher.java @@ -33,7 +33,7 @@ import org.apache.kafka.connect.source.SourceRecord; * A dispatcher to dispatch watermark signal events. * *

The watermark signal event is used to describe the start point and end point of a split scan. - * The Watermark Signal Algorithms is inspired by https://arxiv.org/pdf/2010.12597v1.pdf. + * The Watermark Signal Algorithm is inspired by https://arxiv.org/pdf/2010.12597v1.pdf. */ public class SignalEventDispatcher { diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlParallelSource.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlParallelSource.java index f0313296a..2e528eb9c 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlParallelSource.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlParallelSource.java @@ -58,7 +58,7 @@ import static com.ververica.cdc.connectors.mysql.source.MySqlSourceOptions.SCAN_ import static com.ververica.cdc.connectors.mysql.source.MySqlSourceOptions.getServerIdForSubTask; /** - * The MySQL CDC Source based on FLIP-27 and Watermark Signal Algorithms which supports parallel + * The MySQL CDC Source based on FLIP-27 and Watermark Signal Algorithm which supports parallel * reading snapshot of table and then continue to capture data change from binlog. * *

diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactory.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactory.java
index cd10c4ec7..72e690b47 100644
--- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactory.java
+++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactory.java
@@ -151,31 +151,27 @@ public class MySqlTableSourceFactory implements DynamicTableSourceFactory {
             case SCAN_STARTUP_MODE_VALUE_INITIAL:
                 return StartupOptions.initial();
 
-            case SCAN_STARTUP_MODE_VALUE_EARLIEST:
-                return StartupOptions.earliest();
-
             case SCAN_STARTUP_MODE_VALUE_LATEST:
                 return StartupOptions.latest();
 
+            case SCAN_STARTUP_MODE_VALUE_EARLIEST:
             case SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSET:
-                String offsetFile = config.get(SCAN_STARTUP_SPECIFIC_OFFSET_FILE);
-                int offsetPos = config.get(SCAN_STARTUP_SPECIFIC_OFFSET_POS);
-                return StartupOptions.specificOffset(offsetFile, offsetPos);
-
             case SCAN_STARTUP_MODE_VALUE_TIMESTAMP:
-                long millis = config.get(SCAN_STARTUP_TIMESTAMP_MILLIS);
-                return StartupOptions.timestamp(millis);
+                throw new ValidationException(
+                        String.format(
+                                "Unsupported option value '%s', the options [%s, %s, %s] are not supported correctly, please do not use them until they're correctly supported",
+                                modeString,
+                                SCAN_STARTUP_MODE_VALUE_EARLIEST,
+                                SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSET,
+                                SCAN_STARTUP_MODE_VALUE_TIMESTAMP));
 
             default:
                 throw new ValidationException(
                         String.format(
-                                "Invalid value for option '%s'. Supported values are [%s, %s, %s, %s, %s], but was: %s",
+                                "Invalid value for option '%s'. Supported values are [%s, %s], but was: %s",
                                 SCAN_STARTUP_MODE.key(),
                                 SCAN_STARTUP_MODE_VALUE_INITIAL,
-                                SCAN_STARTUP_MODE_VALUE_EARLIEST,
                                 SCAN_STARTUP_MODE_VALUE_LATEST,
-                                SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSET,
-                                SCAN_STARTUP_MODE_VALUE_TIMESTAMP,
                                 modeString));
         }
     }
diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlConnectorITCase.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlConnectorITCase.java
index c3dd123bf..99ff010d5 100644
--- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlConnectorITCase.java
+++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlConnectorITCase.java
@@ -30,6 +30,7 @@ import org.apache.flink.util.CloseableIterator;
 import com.ververica.cdc.connectors.mysql.MySqlTestBase;
 import com.ververica.cdc.connectors.mysql.source.utils.UniqueDatabase;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -377,6 +378,7 @@ public class MySqlConnectorITCase extends MySqlTestBase {
         result.getJobClient().get().cancel().get();
     }
 
+    @Ignore
     @Test
     public void testStartupFromSpecificOffset() throws Exception {
         if (incrementalSnapshot) {
@@ -466,6 +468,7 @@ public class MySqlConnectorITCase extends MySqlTestBase {
         result.getJobClient().get().cancel().get();
     }
 
+    @Ignore
     @Test
     public void testStartupFromEarliestOffset() throws Exception {
         if (incrementalSnapshot) {
@@ -551,6 +554,7 @@ public class MySqlConnectorITCase extends MySqlTestBase {
     }
 
     @Test
+    @Ignore
     public void testStartupFromTimestamp() throws Exception {
         if (incrementalSnapshot) {
             // not support yet
diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java
index 599adfe57..cf17aec9b 100644
--- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java
+++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java
@@ -223,35 +223,26 @@ public class MySqlTableSourceFactoryTest {
 
     @Test
     public void testStartupFromSpecificOffset() {
-        final String offsetFile = "mysql-bin.000003";
-        final int offsetPos = 100203;
+        try {
+            final String offsetFile = "mysql-bin.000003";
+            final int offsetPos = 100203;
 
-        Map options = getAllOptions();
-        options.put("port", "3307");
-        options.put("server-id", "4321");
-        options.put("scan.startup.mode", "specific-offset");
-        options.put("scan.startup.specific-offset.file", offsetFile);
-        options.put("scan.startup.specific-offset.pos", String.valueOf(offsetPos));
+            Map properties = getAllOptions();
+            properties.put("port", "3307");
+            properties.put("server-id", "4321");
+            properties.put("scan.startup.mode", "specific-offset");
+            properties.put("scan.startup.specific-offset.file", offsetFile);
+            properties.put("scan.startup.specific-offset.pos", String.valueOf(offsetPos));
 
-        DynamicTableSource actualSource = createTableSource(options);
-        MySqlTableSource expectedSource =
-                new MySqlTableSource(
-                        TableSchemaUtils.getPhysicalSchema(fromResolvedSchema(SCHEMA)),
-                        3307,
-                        MY_LOCALHOST,
-                        MY_DATABASE,
-                        MY_TABLE,
-                        MY_USERNAME,
-                        MY_PASSWORD,
-                        ZoneId.of("UTC"),
-                        PROPERTIES,
-                        "4321",
-                        false,
-                        SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE.defaultValue(),
-                        SCAN_SNAPSHOT_FETCH_SIZE.defaultValue(),
-                        CONNECT_TIMEOUT.defaultValue(),
-                        StartupOptions.specificOffset(offsetFile, offsetPos));
-        assertEquals(expectedSource, actualSource);
+            createTableSource(properties);
+            fail("exception expected");
+        } catch (Throwable t) {
+            assertTrue(
+                    ExceptionUtils.findThrowableWithMessage(
+                                    t,
+                                    "Unsupported option value 'specific-offset', the options [earliest-offset, specific-offset, timestamp] are not supported correctly, please do not use them until they're correctly supported")
+                            .isPresent());
+        }
     }
 
     @Test
@@ -283,29 +274,35 @@ public class MySqlTableSourceFactoryTest {
 
     @Test
     public void testStartupFromEarliestOffset() {
-        Map properties = getAllOptions();
-        properties.put("scan.startup.mode", "earliest-offset");
+        try {
+            Map properties = getAllOptions();
+            properties.put("scan.startup.mode", "earliest-offset");
+            createTableSource(properties);
+            fail("exception expected");
+        } catch (Throwable t) {
+            assertTrue(
+                    ExceptionUtils.findThrowableWithMessage(
+                                    t,
+                                    "Unsupported option value 'earliest-offset', the options [earliest-offset, specific-offset, timestamp] are not supported correctly, please do not use them until they're correctly supported")
+                            .isPresent());
+        }
+    }
 
-        // validation for source
-        DynamicTableSource actualSource = createTableSource(properties);
-        MySqlTableSource expectedSource =
-                new MySqlTableSource(
-                        TableSchemaUtils.getPhysicalSchema(fromResolvedSchema(SCHEMA)),
-                        3306,
-                        MY_LOCALHOST,
-                        MY_DATABASE,
-                        MY_TABLE,
-                        MY_USERNAME,
-                        MY_PASSWORD,
-                        ZoneId.of("UTC"),
-                        PROPERTIES,
-                        null,
-                        false,
-                        SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE.defaultValue(),
-                        SCAN_SNAPSHOT_FETCH_SIZE.defaultValue(),
-                        CONNECT_TIMEOUT.defaultValue(),
-                        StartupOptions.earliest());
-        assertEquals(expectedSource, actualSource);
+    @Test
+    public void testStartupFromSpecificTimestamp() {
+        try {
+            Map properties = getAllOptions();
+            properties.put("scan.startup.mode", "timestamp");
+            properties.put("scan.startup.timestamp-millis", "0");
+            createTableSource(properties);
+            fail("exception expected");
+        } catch (Throwable t) {
+            assertTrue(
+                    ExceptionUtils.findThrowableWithMessage(
+                                    t,
+                                    "Unsupported option value 'timestamp', the options [earliest-offset, specific-offset, timestamp] are not supported correctly, please do not use them until they're correctly supported")
+                            .isPresent());
+        }
     }
 
     @Test
@@ -407,7 +404,7 @@ public class MySqlTableSourceFactoryTest {
         } catch (Throwable t) {
             String msg =
                     "Invalid value for option 'scan.startup.mode'. Supported values are "
-                            + "[initial, earliest-offset, latest-offset, specific-offset, timestamp], "
+                            + "[initial, latest-offset], "
                             + "but was: abc";
             assertTrue(ExceptionUtils.findThrowableWithMessage(t, msg).isPresent());
         }