From 27001d8421f9d00a1972c0182b4ac0b5e78d99d5 Mon Sep 17 00:00:00 2001 From: Leonard Xu Date: Wed, 15 Sep 2021 17:42:23 +0800 Subject: [PATCH] [mysql] Use min/max of split key and row count to describe the distribution of table this closes #392 --- .../mysql/source/assigners/ChunkSplitter.java | 77 +++++++++++-- .../mysql/source/utils/ObjectUtils.java | 26 +++++ .../mysql/source/utils/StatementUtils.java | 20 ++++ .../MySqlSnapshotSplitAssignerTest.java | 18 +++ .../mysql/table/MySqlConnectorITCase.java | 104 +++++++++++++++--- .../src/test/resources/ddl/customer.sql | 18 +++ 6 files changed, 236 insertions(+), 27 deletions(-) diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/ChunkSplitter.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/ChunkSplitter.java index dad4ccaf7..a31ecfe7f 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/ChunkSplitter.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/ChunkSplitter.java @@ -36,6 +36,7 @@ import io.debezium.relational.history.TableChanges.TableChange; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.math.BigDecimal; import java.sql.SQLException; import java.time.Duration; import java.util.ArrayList; @@ -46,9 +47,11 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import static com.ververica.cdc.connectors.mysql.source.utils.StatementUtils.queryApproximateRowCnt; import static com.ververica.cdc.connectors.mysql.source.utils.StatementUtils.queryMin; import static com.ververica.cdc.connectors.mysql.source.utils.StatementUtils.queryMinMax; import static com.ververica.cdc.connectors.mysql.source.utils.StatementUtils.queryNextChunkMax; +import static java.math.BigDecimal.ROUND_CEILING; import static org.apache.flink.table.api.DataTypes.FIELD; import static org.apache.flink.table.api.DataTypes.ROW; @@ -59,6 +62,12 @@ import static org.apache.flink.table.api.DataTypes.ROW; class ChunkSplitter { private static final Logger LOG = LoggerFactory.getLogger(ChunkSplitter.class); + /** + * The maximum evenly distribution factor used to judge the data in table is evenly distributed + * or not, the factor could be calculated by MAX(id) - MIN(id) + 1 / rowCount. + */ + public static final Double MAX_EVENLY_DISTRIBUTION_FACTOR = 2.0d; + private final MySqlConnection jdbc; private final MySqlSchema mySqlSchema; private final int chunkSize; @@ -128,7 +137,7 @@ class ChunkSplitter { } final List chunks; - if (splitColumnEvenlyDistributed(splitColumn)) { + if (isSplitColumnEvenlyDistributed(jdbc, tableId, splitColumn, min, max, chunkSize)) { // use evenly-sized chunks which is much efficient chunks = splitEvenlySizedChunks(min, max); } else { @@ -224,19 +233,63 @@ class ChunkSplitter { // ------------------------------------------------------------------------------------------ /** Checks whether split column is evenly distributed across its range. */ - private static boolean splitColumnEvenlyDistributed(Column splitColumn) { - // only column is auto-incremental are recognized as evenly distributed. - // TODO: we may use MAX,MIN,COUNT to calculate the distribution in the future. - if (splitColumn.isAutoIncremented()) { - DataType flinkType = MySqlTypeUtils.fromDbzColumn(splitColumn); - LogicalTypeRoot typeRoot = flinkType.getLogicalType().getTypeRoot(); - // currently, we only support split column with type BIGINT, INT, DECIMAL - return typeRoot == LogicalTypeRoot.BIGINT - || typeRoot == LogicalTypeRoot.INTEGER - || typeRoot == LogicalTypeRoot.DECIMAL; - } else { + private static boolean isSplitColumnEvenlyDistributed( + MySqlConnection jdbc, + TableId tableId, + Column splitColumn, + Object min, + Object max, + int chunkSize) + throws SQLException { + DataType flinkType = MySqlTypeUtils.fromDbzColumn(splitColumn); + LogicalTypeRoot typeRoot = flinkType.getLogicalType().getTypeRoot(); + + // currently, we only support the optimization that split column with type BIGINT, INT, + // DECIMAL + if (!(typeRoot == LogicalTypeRoot.BIGINT + || typeRoot == LogicalTypeRoot.INTEGER + || typeRoot == LogicalTypeRoot.DECIMAL)) { return false; } + + if (ObjectUtils.minus(max, min).compareTo(BigDecimal.valueOf(chunkSize)) <= 0) { + return true; + } + + // only column is numeric and evenly distribution factor is less than + // MAX_EVENLY_DISTRIBUTION_FACTOR will be treated as evenly distributed. + final long approximateRowCnt = queryApproximateRowCnt(jdbc, tableId); + final double evenlyDistributionFactor = + calculateEvenlyDistributionFactor(min, max, approximateRowCnt); + LOG.info( + "The evenly distribution factor for table {} is {}", + tableId, + evenlyDistributionFactor); + return evenlyDistributionFactor <= MAX_EVENLY_DISTRIBUTION_FACTOR; + } + + /** + * Returns the evenly distribution factor of the table data. + * + * @param min the min value of the split column + * @param max the max value of the split column + * @param approximateRowCnt the approximate row count of the table. + */ + private static double calculateEvenlyDistributionFactor( + Object min, Object max, long approximateRowCnt) { + if (!min.getClass().equals(max.getClass())) { + throw new IllegalStateException( + String.format( + "Unsupported operation type, the MIN value type %s is different with MAX value type %s.", + min.getClass().getSimpleName(), max.getClass().getSimpleName())); + } + if (approximateRowCnt == 0) { + return Double.MAX_VALUE; + } + BigDecimal difference = ObjectUtils.minus(max, min); + // factor = max - min + 1 / rowCount + final BigDecimal subRowCnt = difference.add(BigDecimal.valueOf(1)); + return subRowCnt.divide(new BigDecimal(approximateRowCnt), 2, ROUND_CEILING).doubleValue(); } private static String splitId(TableId tableId, int chunkId) { diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/utils/ObjectUtils.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/utils/ObjectUtils.java index 44eb6eb3a..2d26b76c9 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/utils/ObjectUtils.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/utils/ObjectUtils.java @@ -42,6 +42,32 @@ public class ObjectUtils { } } + /** Returns the difference {@code BigDecimal} whose value is {@code (minuend - subtrahend)}. */ + public static BigDecimal minus(Object minuend, Object subtrahend) { + if (!minuend.getClass().equals(subtrahend.getClass())) { + throw new IllegalStateException( + String.format( + "Unsupported operand type, the minuend type %s is different with subtrahend type %s.", + minuend.getClass().getSimpleName(), + subtrahend.getClass().getSimpleName())); + } + if (minuend instanceof Integer) { + return BigDecimal.valueOf((int) minuend - (int) subtrahend); + } else if (minuend instanceof Long) { + return BigDecimal.valueOf((long) minuend - (long) subtrahend); + } else if (minuend instanceof BigInteger) { + return new BigDecimal( + ((BigInteger) minuend).subtract((BigInteger) subtrahend).toString()); + } else if (minuend instanceof BigDecimal) { + return ((BigDecimal) minuend).subtract((BigDecimal) subtrahend); + } else { + throw new UnsupportedOperationException( + String.format( + "Unsupported type %s for numeric minus.", + minuend.getClass().getSimpleName())); + } + } + /** * Compares two comparable objects. * diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/utils/StatementUtils.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/utils/StatementUtils.java index 8580ea008..1824df031 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/utils/StatementUtils.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/utils/StatementUtils.java @@ -58,6 +58,26 @@ public class StatementUtils { }); } + public static long queryApproximateRowCnt(MySqlConnection jdbc, TableId tableId) + throws SQLException { + // The statement used to get approximate row count which is less + // accurate than COUNT(*), but is more efficient for large table. + final String useDatabaseStatement = String.format("USE %s;", quote(tableId.catalog())); + final String rowCountQuery = String.format("SHOW TABLE STATUS LIKE '%s';", tableId.table()); + jdbc.executeWithoutCommitting(useDatabaseStatement); + return jdbc.queryAndMap( + rowCountQuery, + rs -> { + if (!rs.next() || rs.getMetaData().getColumnCount() < 5) { + throw new SQLException( + String.format( + "No result returned after running query [%s]", + rowCountQuery)); + } + return rs.getLong(5); + }); + } + public static Object queryMin( MySqlConnection jdbc, TableId tableId, String columnName, Object excludedLowerBound) throws SQLException { diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java index 558bd5cc8..6ace49d20 100644 --- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java +++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java @@ -72,6 +72,13 @@ public class MySqlSnapshotSplitAssignerTest extends MySqlTestBase { assertEquals(expected, splits); } + @Test + public void testAssignTableWhoseRowCntLessSplitSize() { + List expected = Arrays.asList("customers null null"); + List splits = getTestAssignSnapshotSplits(2000, new String[] {"customers"}); + assertEquals(expected, splits); + } + @Test public void testAssignMultipleTableSplits() { List expected = @@ -103,6 +110,17 @@ public class MySqlSnapshotSplitAssignerTest extends MySqlTestBase { assertEquals(expected, splits); } + @Test + public void testAssignSnapshotSplitsWithRandomPrimaryKey() { + List expected = + Arrays.asList( + "address null [417111867899200427]", + "address [417111867899200427] [417420106184475563]", + "address [417420106184475563] null"); + List splits = getTestAssignSnapshotSplits(4, new String[] {"address"}); + assertEquals(expected, splits); + } + @Test public void testAssignSnapshotSplitsWithDecimalKey() { List expected = diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlConnectorITCase.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlConnectorITCase.java index 10e72d8ab..99cdf8394 100644 --- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlConnectorITCase.java +++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlConnectorITCase.java @@ -51,11 +51,17 @@ import static org.junit.Assert.assertThat; @RunWith(Parameterized.class) public class MySqlConnectorITCase extends MySqlTestBase { + private static final String TEST_USER = "mysqluser"; + private static final String TEST_PASSWORD = "mysqlpw"; + private final UniqueDatabase inventoryDatabase = - new UniqueDatabase(MYSQL_CONTAINER, "inventory", "mysqluser", "mysqlpw"); + new UniqueDatabase(MYSQL_CONTAINER, "inventory", TEST_USER, TEST_PASSWORD); private final UniqueDatabase fullTypesDatabase = - new UniqueDatabase(MYSQL_CONTAINER, "column_type_test", "mysqluser", "mysqlpw"); + new UniqueDatabase(MYSQL_CONTAINER, "column_type_test", TEST_USER, TEST_PASSWORD); + + private final UniqueDatabase customerDatabase = + new UniqueDatabase(MYSQL_CONTAINER, "customer", TEST_USER, TEST_PASSWORD); private final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); @@ -122,8 +128,8 @@ public class MySqlConnectorITCase extends MySqlTestBase { + ")", MYSQL_CONTAINER.getHost(), MYSQL_CONTAINER.getDatabasePort(), - inventoryDatabase.getUsername(), - inventoryDatabase.getPassword(), + TEST_USER, + TEST_PASSWORD, inventoryDatabase.getDatabaseName(), "products", getDezImplementation(), @@ -239,8 +245,8 @@ public class MySqlConnectorITCase extends MySqlTestBase { + ")", MYSQL_CONTAINER.getHost(), MYSQL_CONTAINER.getDatabasePort(), - inventoryDatabase.getUsername(), - inventoryDatabase.getPassword(), + TEST_USER, + TEST_PASSWORD, inventoryDatabase.getDatabaseName(), "products", getDezImplementation(), @@ -391,7 +397,7 @@ public class MySqlConnectorITCase extends MySqlTestBase { "+U[1, 127, 255, 32767, 65535, 2147483647, 4294967295, 2147483647, 9223372036854775807, Hello World, abc, 123.102, 404.4443, 123.4567, 346, true, 2020-07-17, 18:00:22, 2020-07-17T18:00:22.123, 2020-07-17T18:00:22.123456, 2020-07-17T18:33:22, ZRrvv70IOQ9I77+977+977+9Nu+/vT57dAA=]" }; - assertThat(fetchRows(result.collect(), 3), containsInAnyOrder(expected)); + assertThat(fetchRows(result.collect(), expected.length), containsInAnyOrder(expected)); result.getJobClient().get().cancel().get(); } @@ -421,8 +427,8 @@ public class MySqlConnectorITCase extends MySqlTestBase { + ")", MYSQL_CONTAINER.getHost(), MYSQL_CONTAINER.getDatabasePort(), - inventoryDatabase.getUsername(), - inventoryDatabase.getPassword(), + TEST_USER, + TEST_PASSWORD, inventoryDatabase.getDatabaseName(), "products", incrementalSnapshot, @@ -466,6 +472,74 @@ public class MySqlConnectorITCase extends MySqlTestBase { result.getJobClient().get().cancel().get(); } + @Test + public void testPrimaryKeyWithSnowflakeAlgorithm() throws Exception { + customerDatabase.createAndInitialize(); + String sourceDDL = + String.format( + "CREATE TABLE address (" + + " `id` DECIMAL(20, 0) NOT NULL," + + " country STRING," + + " city STRING," + + " detail_address STRING," + + " primary key (`id`) not enforced" + + ") WITH (" + + " 'connector' = 'mysql-cdc'," + + " 'hostname' = '%s'," + + " 'port' = '%s'," + + " 'username' = '%s'," + + " 'password' = '%s'," + + " 'database-name' = '%s'," + + " 'table-name' = '%s'," + + " 'debezium.internal.implementation' = '%s'," + + " 'scan.incremental.snapshot.enabled' = '%s'," + + " 'server-id' = '%s'," + + " 'scan.incremental.snapshot.chunk.size' = '%s'" + + ")", + MYSQL_CONTAINER.getHost(), + MYSQL_CONTAINER.getDatabasePort(), + customerDatabase.getUsername(), + customerDatabase.getPassword(), + customerDatabase.getDatabaseName(), + "address", + getDezImplementation(), + incrementalSnapshot, + getServerId(), + getSplitSize()); + tEnv.executeSql(sourceDDL); + // async submit job + TableResult result = + tEnv.executeSql( + "SELECT id,\n" + "country,\n" + "city,\n" + "detail_address FROM address"); + + CloseableIterator iterator = result.collect(); + waitForSnapshotStarted(iterator); + + try (Connection connection = customerDatabase.getJdbcConnection(); + Statement statement = connection.createStatement()) { + statement.execute("UPDATE address SET city = 'Hangzhou' WHERE id=416927583791428523;"); + statement.execute( + "INSERT INTO address VALUES(418257940021724075, 'Germany', 'Berlin', 'West Town address 3')"); + } + + String[] expected = + new String[] { + "+I[417271541558096811, America, New York, East Town address 2]", + "+I[417272886855938987, America, New York, East Town address 3]", + "+I[417111867899200427, America, New York, East Town address 1]", + "+I[417420106184475563, Germany, Berlin, West Town address 1]", + "+I[418161258277847979, Germany, Berlin, West Town address 2]", + "+I[416874195632735147, China, Beijing, West Town address 1]", + "+I[416927583791428523, China, Beijing, West Town address 2]", + "+I[417022095255614379, China, Beijing, West Town address 3]", + "-U[416927583791428523, China, Beijing, West Town address 2]", + "+U[416927583791428523, China, Hangzhou, West Town address 2]", + "+I[418257940021724075, Germany, Berlin, West Town address 3]", + }; + assertThat(fetchRows(result.collect(), expected.length), containsInAnyOrder(expected)); + result.getJobClient().get().cancel().get(); + } + @Ignore @Test public void testStartupFromSpecificOffset() throws Exception { @@ -507,8 +581,8 @@ public class MySqlConnectorITCase extends MySqlTestBase { + ")", MYSQL_CONTAINER.getHost(), MYSQL_CONTAINER.getDatabasePort(), - inventoryDatabase.getUsername(), - inventoryDatabase.getPassword(), + TEST_USER, + TEST_PASSWORD, inventoryDatabase.getDatabaseName(), "products", offset.f0, @@ -585,8 +659,8 @@ public class MySqlConnectorITCase extends MySqlTestBase { + ")", MYSQL_CONTAINER.getHost(), MYSQL_CONTAINER.getDatabasePort(), - inventoryDatabase.getUsername(), - inventoryDatabase.getPassword(), + TEST_USER, + TEST_PASSWORD, inventoryDatabase.getDatabaseName(), "products", incrementalSnapshot, @@ -671,8 +745,8 @@ public class MySqlConnectorITCase extends MySqlTestBase { + ")", MYSQL_CONTAINER.getHost(), MYSQL_CONTAINER.getDatabasePort(), - inventoryDatabase.getUsername(), - inventoryDatabase.getPassword(), + TEST_USER, + TEST_PASSWORD, inventoryDatabase.getDatabaseName(), "products", System.currentTimeMillis(), diff --git a/flink-connector-mysql-cdc/src/test/resources/ddl/customer.sql b/flink-connector-mysql-cdc/src/test/resources/ddl/customer.sql index fb63e3315..3234b6c01 100644 --- a/flink-connector-mysql-cdc/src/test/resources/ddl/customer.sql +++ b/flink-connector-mysql-cdc/src/test/resources/ddl/customer.sql @@ -172,3 +172,21 @@ insert into shopping_cart_dec VALUES (123456.123, 'KIND_001', 'user_1', 'my shopping cart'), (124456.456, 'KIND_002', 'user_1', 'my shopping cart'), (125489.6789, 'KIND_003', 'user_1', 'my shopping cart'); + +-- create table whose primary key are produced by snowflake algorithm +CREATE TABLE address ( + id BIGINT UNSIGNED NOT NULL PRIMARY KEY, + country VARCHAR(255) NOT NULL, + city VARCHAR(255) NOT NULL, + detail_address VARCHAR(1024) +); + +INSERT INTO address +VALUES (416874195632735147, 'China', 'Beijing', 'West Town address 1'), + (416927583791428523, 'China', 'Beijing', 'West Town address 2'), + (417022095255614379, 'China', 'Beijing', 'West Town address 3'), + (417111867899200427, 'America', 'New York', 'East Town address 1'), + (417271541558096811, 'America', 'New York', 'East Town address 2'), + (417272886855938987, 'America', 'New York', 'East Town address 3'), + (417420106184475563, 'Germany', 'Berlin', 'West Town address 1'), + (418161258277847979, 'Germany', 'Berlin', 'West Town address 2');