[mysql] Use min/max of split key and row count to describe the distribution of table

this closes  #392
pull/459/head
Leonard Xu 4 years ago
parent 851dcc66b4
commit 27001d8421

@ -36,6 +36,7 @@ import io.debezium.relational.history.TableChanges.TableChange;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.math.BigDecimal;
import java.sql.SQLException; import java.sql.SQLException;
import java.time.Duration; import java.time.Duration;
import java.util.ArrayList; import java.util.ArrayList;
@ -46,9 +47,11 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
import static com.ververica.cdc.connectors.mysql.source.utils.StatementUtils.queryApproximateRowCnt;
import static com.ververica.cdc.connectors.mysql.source.utils.StatementUtils.queryMin; import static com.ververica.cdc.connectors.mysql.source.utils.StatementUtils.queryMin;
import static com.ververica.cdc.connectors.mysql.source.utils.StatementUtils.queryMinMax; import static com.ververica.cdc.connectors.mysql.source.utils.StatementUtils.queryMinMax;
import static com.ververica.cdc.connectors.mysql.source.utils.StatementUtils.queryNextChunkMax; import static com.ververica.cdc.connectors.mysql.source.utils.StatementUtils.queryNextChunkMax;
import static java.math.BigDecimal.ROUND_CEILING;
import static org.apache.flink.table.api.DataTypes.FIELD; import static org.apache.flink.table.api.DataTypes.FIELD;
import static org.apache.flink.table.api.DataTypes.ROW; import static org.apache.flink.table.api.DataTypes.ROW;
@ -59,6 +62,12 @@ import static org.apache.flink.table.api.DataTypes.ROW;
class ChunkSplitter { class ChunkSplitter {
private static final Logger LOG = LoggerFactory.getLogger(ChunkSplitter.class); private static final Logger LOG = LoggerFactory.getLogger(ChunkSplitter.class);
/**
* The maximum evenly distribution factor used to judge whether the data in a table is evenly
* distributed or not; the factor is calculated as (MAX(id) - MIN(id) + 1) / rowCount.
*/
public static final Double MAX_EVENLY_DISTRIBUTION_FACTOR = 2.0d;
private final MySqlConnection jdbc; private final MySqlConnection jdbc;
private final MySqlSchema mySqlSchema; private final MySqlSchema mySqlSchema;
private final int chunkSize; private final int chunkSize;
@ -128,7 +137,7 @@ class ChunkSplitter {
} }
final List<ChunkRange> chunks; final List<ChunkRange> chunks;
if (splitColumnEvenlyDistributed(splitColumn)) { if (isSplitColumnEvenlyDistributed(jdbc, tableId, splitColumn, min, max, chunkSize)) {
// use evenly-sized chunks which is much efficient // use evenly-sized chunks which is much efficient
chunks = splitEvenlySizedChunks(min, max); chunks = splitEvenlySizedChunks(min, max);
} else { } else {
@ -224,19 +233,63 @@ class ChunkSplitter {
// ------------------------------------------------------------------------------------------ // ------------------------------------------------------------------------------------------
/** Checks whether split column is evenly distributed across its range. */ /** Checks whether split column is evenly distributed across its range. */
private static boolean splitColumnEvenlyDistributed(Column splitColumn) { private static boolean isSplitColumnEvenlyDistributed(
// only column is auto-incremental are recognized as evenly distributed. MySqlConnection jdbc,
// TODO: we may use MAX,MIN,COUNT to calculate the distribution in the future. TableId tableId,
if (splitColumn.isAutoIncremented()) { Column splitColumn,
DataType flinkType = MySqlTypeUtils.fromDbzColumn(splitColumn); Object min,
LogicalTypeRoot typeRoot = flinkType.getLogicalType().getTypeRoot(); Object max,
// currently, we only support split column with type BIGINT, INT, DECIMAL int chunkSize)
return typeRoot == LogicalTypeRoot.BIGINT throws SQLException {
|| typeRoot == LogicalTypeRoot.INTEGER DataType flinkType = MySqlTypeUtils.fromDbzColumn(splitColumn);
|| typeRoot == LogicalTypeRoot.DECIMAL; LogicalTypeRoot typeRoot = flinkType.getLogicalType().getTypeRoot();
} else {
// currently, we only support the optimization that split column with type BIGINT, INT,
// DECIMAL
if (!(typeRoot == LogicalTypeRoot.BIGINT
|| typeRoot == LogicalTypeRoot.INTEGER
|| typeRoot == LogicalTypeRoot.DECIMAL)) {
return false; return false;
} }
if (ObjectUtils.minus(max, min).compareTo(BigDecimal.valueOf(chunkSize)) <= 0) {
return true;
}
// only a numeric column whose evenly distribution factor is not greater than
// MAX_EVENLY_DISTRIBUTION_FACTOR will be treated as evenly distributed.
final long approximateRowCnt = queryApproximateRowCnt(jdbc, tableId);
final double evenlyDistributionFactor =
calculateEvenlyDistributionFactor(min, max, approximateRowCnt);
LOG.info(
"The evenly distribution factor for table {} is {}",
tableId,
evenlyDistributionFactor);
return evenlyDistributionFactor <= MAX_EVENLY_DISTRIBUTION_FACTOR;
}
/**
* Returns the evenly distribution factor of the table data, computed as
* {@code (max - min + 1) / approximateRowCnt} with the quotient kept to two decimal places and
* rounded towards positive infinity.
*
* <p>Returns {@code Double.MAX_VALUE} when {@code approximateRowCnt} is zero.
*
* @param min the min value of the split column
* @param max the max value of the split column
* @param approximateRowCnt the approximate row count of the table.
* @return the distribution factor as a {@code double}
* @throws IllegalStateException if {@code min} and {@code max} are instances of different classes
*/
private static double calculateEvenlyDistributionFactor(
Object min, Object max, long approximateRowCnt) {
// both bounds must share one runtime class before they can be subtracted
if (!min.getClass().equals(max.getClass())) {
throw new IllegalStateException(
String.format(
"Unsupported operation type, the MIN value type %s is different with MAX value type %s.",
min.getClass().getSimpleName(), max.getClass().getSimpleName()));
}
// avoid dividing by zero: the largest double can never pass a "factor <= threshold" check
if (approximateRowCnt == 0) {
return Double.MAX_VALUE;
}
BigDecimal difference = ObjectUtils.minus(max, min);
// factor = (max - min + 1) / rowCount
final BigDecimal subRowCnt = difference.add(BigDecimal.valueOf(1));
// scale 2 with ROUND_CEILING: keep two decimals, rounding up
return subRowCnt.divide(new BigDecimal(approximateRowCnt), 2, ROUND_CEILING).doubleValue();
} }
private static String splitId(TableId tableId, int chunkId) { private static String splitId(TableId tableId, int chunkId) {

@ -42,6 +42,32 @@ public class ObjectUtils {
} }
} }
/**
 * Returns the difference {@code BigDecimal} whose value is {@code (minuend - subtrahend)}.
 *
 * <p>The subtraction is always carried out in arbitrary-precision arithmetic, so the result is
 * exact even when the difference of two {@code Integer} or {@code Long} operands does not fit
 * into the primitive type (for example {@code Long.MAX_VALUE - Long.MIN_VALUE}).
 *
 * @param minuend the value to subtract from; must be an {@code Integer}, {@code Long}, {@code
 *     BigInteger} or {@code BigDecimal}
 * @param subtrahend the value to subtract; must be of the same class as {@code minuend}
 * @return the exact difference as a {@code BigDecimal}
 * @throws IllegalStateException if the two operands are instances of different classes
 * @throws UnsupportedOperationException if the operand class is not one of the supported
 *     numeric types
 */
public static BigDecimal minus(Object minuend, Object subtrahend) {
    if (!minuend.getClass().equals(subtrahend.getClass())) {
        throw new IllegalStateException(
                String.format(
                        "Unsupported operand type, the minuend type %s is different with subtrahend type %s.",
                        minuend.getClass().getSimpleName(),
                        subtrahend.getClass().getSimpleName()));
    }
    if (minuend instanceof Integer) {
        // widen before subtracting: int - int may overflow silently
        return BigDecimal.valueOf((int) minuend).subtract(BigDecimal.valueOf((int) subtrahend));
    } else if (minuend instanceof Long) {
        // widen before subtracting: long - long may overflow silently
        return BigDecimal.valueOf((long) minuend)
                .subtract(BigDecimal.valueOf((long) subtrahend));
    } else if (minuend instanceof BigInteger) {
        return new BigDecimal(((BigInteger) minuend).subtract((BigInteger) subtrahend));
    } else if (minuend instanceof BigDecimal) {
        return ((BigDecimal) minuend).subtract((BigDecimal) subtrahend);
    } else {
        throw new UnsupportedOperationException(
                String.format(
                        "Unsupported type %s for numeric minus.",
                        minuend.getClass().getSimpleName()));
    }
}
/** /**
* Compares two comparable objects. * Compares two comparable objects.
* *

@ -58,6 +58,26 @@ public class StatementUtils {
}); });
} }
/**
 * Returns the approximate row count of the given table, taken from the fifth column of the
 * {@code SHOW TABLE STATUS} output.
 *
 * <p>The table name is used as a {@code LIKE} pattern, in which {@code _} and {@code %} act as
 * wildcards, so the statement may return rows for several tables. The row whose first column
 * equals the table name exactly is preferred; if no row matches exactly, the first returned
 * row is used.
 *
 * <p>Note that this method issues a {@code USE} statement on the given connection.
 *
 * @param jdbc the connection used to run the statements
 * @param tableId the table whose row count is requested
 * @return the approximate number of rows in the table
 * @throws SQLException if the query returns no row or fewer than five columns, or if running a
 *     statement fails
 */
public static long queryApproximateRowCnt(MySqlConnection jdbc, TableId tableId)
        throws SQLException {
    // The statement used to get approximate row count which is less
    // accurate than COUNT(*), but is more efficient for large table.
    final String useDatabaseStatement = String.format("USE %s;", quote(tableId.catalog()));
    final String rowCountQuery = String.format("SHOW TABLE STATUS LIKE '%s';", tableId.table());
    jdbc.executeWithoutCommitting(useDatabaseStatement);
    return jdbc.queryAndMap(
            rowCountQuery,
            rs -> {
                final String noResultMessage =
                        String.format(
                                "No result returned after running query [%s]",
                                rowCountQuery);
                if (rs.getMetaData().getColumnCount() < 5) {
                    throw new SQLException(noResultMessage);
                }
                boolean sawRow = false;
                long firstRowCnt = 0L;
                while (rs.next()) {
                    final String name = rs.getString(1);
                    final long rowCnt = rs.getLong(5);
                    // an exact name match wins over rows matched only through wildcards
                    if (tableId.table().equals(name)) {
                        return rowCnt;
                    }
                    if (!sawRow) {
                        sawRow = true;
                        firstRowCnt = rowCnt;
                    }
                }
                if (!sawRow) {
                    throw new SQLException(noResultMessage);
                }
                // no exact match (e.g. differing letter case): keep the first row
                return firstRowCnt;
            });
}
public static Object queryMin( public static Object queryMin(
MySqlConnection jdbc, TableId tableId, String columnName, Object excludedLowerBound) MySqlConnection jdbc, TableId tableId, String columnName, Object excludedLowerBound)
throws SQLException { throws SQLException {

@ -72,6 +72,13 @@ public class MySqlSnapshotSplitAssignerTest extends MySqlTestBase {
assertEquals(expected, splits); assertEquals(expected, splits);
} }
/**
 * Requests splits for the {@code customers} table with a split size of 2000 and expects exactly
 * one split whose start and end are both {@code null}.
 */
@Test
public void testAssignTableWhoseRowCntLessSplitSize() {
    final int splitSize = 2000;
    final String[] capturedTables = {"customers"};
    final List<String> actualSplits = getTestAssignSnapshotSplits(splitSize, capturedTables);
    assertEquals(Arrays.asList("customers null null"), actualSplits);
}
@Test @Test
public void testAssignMultipleTableSplits() { public void testAssignMultipleTableSplits() {
List<String> expected = List<String> expected =
@ -103,6 +110,17 @@ public class MySqlSnapshotSplitAssignerTest extends MySqlTestBase {
assertEquals(expected, splits); assertEquals(expected, splits);
} }
/**
 * Requests splits for the {@code address} table with a split size of 4 and expects three
 * splits bounded by existing key values rather than by evenly spaced numbers.
 */
@Test
public void testAssignSnapshotSplitsWithRandomPrimaryKey() {
    final String[] capturedTables = {"address"};
    final List<String> actualSplits = getTestAssignSnapshotSplits(4, capturedTables);
    final List<String> expectedSplits =
            Arrays.asList(
                    "address null [417111867899200427]",
                    "address [417111867899200427] [417420106184475563]",
                    "address [417420106184475563] null");
    assertEquals(expectedSplits, actualSplits);
}
@Test @Test
public void testAssignSnapshotSplitsWithDecimalKey() { public void testAssignSnapshotSplitsWithDecimalKey() {
List<String> expected = List<String> expected =

@ -51,11 +51,17 @@ import static org.junit.Assert.assertThat;
@RunWith(Parameterized.class) @RunWith(Parameterized.class)
public class MySqlConnectorITCase extends MySqlTestBase { public class MySqlConnectorITCase extends MySqlTestBase {
private static final String TEST_USER = "mysqluser";
private static final String TEST_PASSWORD = "mysqlpw";
private final UniqueDatabase inventoryDatabase = private final UniqueDatabase inventoryDatabase =
new UniqueDatabase(MYSQL_CONTAINER, "inventory", "mysqluser", "mysqlpw"); new UniqueDatabase(MYSQL_CONTAINER, "inventory", TEST_USER, TEST_PASSWORD);
private final UniqueDatabase fullTypesDatabase = private final UniqueDatabase fullTypesDatabase =
new UniqueDatabase(MYSQL_CONTAINER, "column_type_test", "mysqluser", "mysqlpw"); new UniqueDatabase(MYSQL_CONTAINER, "column_type_test", TEST_USER, TEST_PASSWORD);
private final UniqueDatabase customerDatabase =
new UniqueDatabase(MYSQL_CONTAINER, "customer", TEST_USER, TEST_PASSWORD);
private final StreamExecutionEnvironment env = private final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment(); StreamExecutionEnvironment.getExecutionEnvironment();
@ -122,8 +128,8 @@ public class MySqlConnectorITCase extends MySqlTestBase {
+ ")", + ")",
MYSQL_CONTAINER.getHost(), MYSQL_CONTAINER.getHost(),
MYSQL_CONTAINER.getDatabasePort(), MYSQL_CONTAINER.getDatabasePort(),
inventoryDatabase.getUsername(), TEST_USER,
inventoryDatabase.getPassword(), TEST_PASSWORD,
inventoryDatabase.getDatabaseName(), inventoryDatabase.getDatabaseName(),
"products", "products",
getDezImplementation(), getDezImplementation(),
@ -239,8 +245,8 @@ public class MySqlConnectorITCase extends MySqlTestBase {
+ ")", + ")",
MYSQL_CONTAINER.getHost(), MYSQL_CONTAINER.getHost(),
MYSQL_CONTAINER.getDatabasePort(), MYSQL_CONTAINER.getDatabasePort(),
inventoryDatabase.getUsername(), TEST_USER,
inventoryDatabase.getPassword(), TEST_PASSWORD,
inventoryDatabase.getDatabaseName(), inventoryDatabase.getDatabaseName(),
"products", "products",
getDezImplementation(), getDezImplementation(),
@ -391,7 +397,7 @@ public class MySqlConnectorITCase extends MySqlTestBase {
"+U[1, 127, 255, 32767, 65535, 2147483647, 4294967295, 2147483647, 9223372036854775807, Hello World, abc, 123.102, 404.4443, 123.4567, 346, true, 2020-07-17, 18:00:22, 2020-07-17T18:00:22.123, 2020-07-17T18:00:22.123456, 2020-07-17T18:33:22, ZRrvv70IOQ9I77+977+977+9Nu+/vT57dAA=]" "+U[1, 127, 255, 32767, 65535, 2147483647, 4294967295, 2147483647, 9223372036854775807, Hello World, abc, 123.102, 404.4443, 123.4567, 346, true, 2020-07-17, 18:00:22, 2020-07-17T18:00:22.123, 2020-07-17T18:00:22.123456, 2020-07-17T18:33:22, ZRrvv70IOQ9I77+977+977+9Nu+/vT57dAA=]"
}; };
assertThat(fetchRows(result.collect(), 3), containsInAnyOrder(expected)); assertThat(fetchRows(result.collect(), expected.length), containsInAnyOrder(expected));
result.getJobClient().get().cancel().get(); result.getJobClient().get().cancel().get();
} }
@ -421,8 +427,8 @@ public class MySqlConnectorITCase extends MySqlTestBase {
+ ")", + ")",
MYSQL_CONTAINER.getHost(), MYSQL_CONTAINER.getHost(),
MYSQL_CONTAINER.getDatabasePort(), MYSQL_CONTAINER.getDatabasePort(),
inventoryDatabase.getUsername(), TEST_USER,
inventoryDatabase.getPassword(), TEST_PASSWORD,
inventoryDatabase.getDatabaseName(), inventoryDatabase.getDatabaseName(),
"products", "products",
incrementalSnapshot, incrementalSnapshot,
@ -466,6 +472,74 @@ public class MySqlConnectorITCase extends MySqlTestBase {
result.getJobClient().get().cancel().get(); result.getJobClient().get().cancel().get();
} }
/**
* Reads the {@code address} table of the customer database through the mysql-cdc connector,
* applies one UPDATE and one INSERT while the job is running, and verifies that the collected
* changelog holds the eight initial rows plus the update pair and the inserted row.
*
* <p>The primary keys are large, sparse numbers (snowflake-style ids per the test name), so
* this exercises chunk splitting on a key that is not densely distributed.
*/
@Test
public void testPrimaryKeyWithSnowflakeAlgorithm() throws Exception {
customerDatabase.createAndInitialize();
// source table definition; the placeholders are filled from the test container and
// the parameters of this test run
String sourceDDL =
String.format(
"CREATE TABLE address ("
+ " `id` DECIMAL(20, 0) NOT NULL,"
+ " country STRING,"
+ " city STRING,"
+ " detail_address STRING,"
+ " primary key (`id`) not enforced"
+ ") WITH ("
+ " 'connector' = 'mysql-cdc',"
+ " 'hostname' = '%s',"
+ " 'port' = '%s',"
+ " 'username' = '%s',"
+ " 'password' = '%s',"
+ " 'database-name' = '%s',"
+ " 'table-name' = '%s',"
+ " 'debezium.internal.implementation' = '%s',"
+ " 'scan.incremental.snapshot.enabled' = '%s',"
+ " 'server-id' = '%s',"
+ " 'scan.incremental.snapshot.chunk.size' = '%s'"
+ ")",
MYSQL_CONTAINER.getHost(),
MYSQL_CONTAINER.getDatabasePort(),
customerDatabase.getUsername(),
customerDatabase.getPassword(),
customerDatabase.getDatabaseName(),
"address",
getDezImplementation(),
incrementalSnapshot,
getServerId(),
getSplitSize());
tEnv.executeSql(sourceDDL);
// async submit job
TableResult result =
tEnv.executeSql(
"SELECT id,\n" + "country,\n" + "city,\n" + "detail_address FROM address");
CloseableIterator<Row> iterator = result.collect();
// presumably blocks until the snapshot phase has produced output; helper is not visible here
waitForSnapshotStarted(iterator);
// change the table while the job is running so that change events are emitted as well
try (Connection connection = customerDatabase.getJdbcConnection();
Statement statement = connection.createStatement()) {
statement.execute("UPDATE address SET city = 'Hangzhou' WHERE id=416927583791428523;");
statement.execute(
"INSERT INTO address VALUES(418257940021724075, 'Germany', 'Berlin', 'West Town address 3')");
}
// eight initial rows (+I), the update as a -U/+U pair, and the newly inserted row (+I)
String[] expected =
new String[] {
"+I[417271541558096811, America, New York, East Town address 2]",
"+I[417272886855938987, America, New York, East Town address 3]",
"+I[417111867899200427, America, New York, East Town address 1]",
"+I[417420106184475563, Germany, Berlin, West Town address 1]",
"+I[418161258277847979, Germany, Berlin, West Town address 2]",
"+I[416874195632735147, China, Beijing, West Town address 1]",
"+I[416927583791428523, China, Beijing, West Town address 2]",
"+I[417022095255614379, China, Beijing, West Town address 3]",
"-U[416927583791428523, China, Beijing, West Town address 2]",
"+U[416927583791428523, China, Hangzhou, West Town address 2]",
"+I[418257940021724075, Germany, Berlin, West Town address 3]",
};
// NOTE(review): collect() is called a second time here instead of reusing `iterator`;
// confirm both calls read from the same underlying result stream
assertThat(fetchRows(result.collect(), expected.length), containsInAnyOrder(expected));
result.getJobClient().get().cancel().get();
}
@Ignore @Ignore
@Test @Test
public void testStartupFromSpecificOffset() throws Exception { public void testStartupFromSpecificOffset() throws Exception {
@ -507,8 +581,8 @@ public class MySqlConnectorITCase extends MySqlTestBase {
+ ")", + ")",
MYSQL_CONTAINER.getHost(), MYSQL_CONTAINER.getHost(),
MYSQL_CONTAINER.getDatabasePort(), MYSQL_CONTAINER.getDatabasePort(),
inventoryDatabase.getUsername(), TEST_USER,
inventoryDatabase.getPassword(), TEST_PASSWORD,
inventoryDatabase.getDatabaseName(), inventoryDatabase.getDatabaseName(),
"products", "products",
offset.f0, offset.f0,
@ -585,8 +659,8 @@ public class MySqlConnectorITCase extends MySqlTestBase {
+ ")", + ")",
MYSQL_CONTAINER.getHost(), MYSQL_CONTAINER.getHost(),
MYSQL_CONTAINER.getDatabasePort(), MYSQL_CONTAINER.getDatabasePort(),
inventoryDatabase.getUsername(), TEST_USER,
inventoryDatabase.getPassword(), TEST_PASSWORD,
inventoryDatabase.getDatabaseName(), inventoryDatabase.getDatabaseName(),
"products", "products",
incrementalSnapshot, incrementalSnapshot,
@ -671,8 +745,8 @@ public class MySqlConnectorITCase extends MySqlTestBase {
+ ")", + ")",
MYSQL_CONTAINER.getHost(), MYSQL_CONTAINER.getHost(),
MYSQL_CONTAINER.getDatabasePort(), MYSQL_CONTAINER.getDatabasePort(),
inventoryDatabase.getUsername(), TEST_USER,
inventoryDatabase.getPassword(), TEST_PASSWORD,
inventoryDatabase.getDatabaseName(), inventoryDatabase.getDatabaseName(),
"products", "products",
System.currentTimeMillis(), System.currentTimeMillis(),

@ -172,3 +172,21 @@ insert into shopping_cart_dec
VALUES (123456.123, 'KIND_001', 'user_1', 'my shopping cart'), VALUES (123456.123, 'KIND_001', 'user_1', 'my shopping cart'),
(124456.456, 'KIND_002', 'user_1', 'my shopping cart'), (124456.456, 'KIND_002', 'user_1', 'my shopping cart'),
(125489.6789, 'KIND_003', 'user_1', 'my shopping cart'); (125489.6789, 'KIND_003', 'user_1', 'my shopping cart');
-- create a table whose primary keys are produced by the snowflake algorithm
CREATE TABLE address (
id BIGINT UNSIGNED NOT NULL PRIMARY KEY,
country VARCHAR(255) NOT NULL,
city VARCHAR(255) NOT NULL,
detail_address VARCHAR(1024)
);
INSERT INTO address
VALUES (416874195632735147, 'China', 'Beijing', 'West Town address 1'),
(416927583791428523, 'China', 'Beijing', 'West Town address 2'),
(417022095255614379, 'China', 'Beijing', 'West Town address 3'),
(417111867899200427, 'America', 'New York', 'East Town address 1'),
(417271541558096811, 'America', 'New York', 'East Town address 2'),
(417272886855938987, 'America', 'New York', 'East Town address 3'),
(417420106184475563, 'Germany', 'Berlin', 'West Town address 1'),
(418161258277847979, 'Germany', 'Berlin', 'West Town address 2');

Loading…
Cancel
Save