[mysql] Support wide tables with a huge number of columns

pull/450/head
Leonard Xu 3 years ago committed by GitHub
parent bbd2870b08
commit 004d1768ac
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -33,6 +33,7 @@ import io.debezium.relational.history.JsonTableChangeSerializer;
import io.debezium.relational.history.TableChanges.TableChange;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@ -133,7 +134,7 @@ public final class MySqlSplitSerializer implements SimpleVersionedSerializer<MyS
Object[] splitBoundaryStart = serializedStringToRow(in.readUTF());
Object[] splitBoundaryEnd = serializedStringToRow(in.readUTF());
BinlogOffset highWatermark = readBinlogPosition(version, in);
Map<TableId, TableChange> tableSchemas = readTableSchemas(in);
Map<TableId, TableChange> tableSchemas = readTableSchemas(version, in);
return new MySqlSnapshotSplit(
tableId,
@ -150,7 +151,7 @@ public final class MySqlSplitSerializer implements SimpleVersionedSerializer<MyS
BinlogOffset endingOffset = readBinlogPosition(version, in);
List<FinishedSnapshotSplitInfo> finishedSplitsInfo =
readFinishedSplitsInfo(version, in);
Map<TableId, TableChange> tableChangeMap = readTableSchemas(in);
Map<TableId, TableChange> tableChangeMap = readTableSchemas(version, in);
in.releaseArrays();
return new MySqlBinlogSplit(
splitId,
@ -172,18 +173,36 @@ public final class MySqlSplitSerializer implements SimpleVersionedSerializer<MyS
out.writeInt(size);
for (Map.Entry<TableId, TableChange> entry : tableSchemas.entrySet()) {
out.writeUTF(entry.getKey().toString());
out.writeUTF(documentWriter.write(jsonSerializer.toDocument(entry.getValue())));
final String tableChangeStr =
documentWriter.write(jsonSerializer.toDocument(entry.getValue()));
final byte[] tableChangeBytes = tableChangeStr.getBytes(StandardCharsets.UTF_8);
out.writeInt(tableChangeBytes.length);
out.write(tableChangeBytes);
}
}
private static Map<TableId, TableChange> readTableSchemas(DataInputDeserializer in)
private static Map<TableId, TableChange> readTableSchemas(int version, DataInputDeserializer in)
throws IOException {
DocumentReader documentReader = DocumentReader.defaultReader();
Map<TableId, TableChange> tableSchemas = new HashMap<>();
final int size = in.readInt();
for (int i = 0; i < size; i++) {
TableId tableId = TableId.parse(in.readUTF());
Document document = documentReader.read(in.readUTF());
final String tableChangeStr;
switch (version) {
case 1:
tableChangeStr = in.readUTF();
break;
case 2:
final int len = in.readInt();
final byte[] bytes = new byte[len];
in.read(bytes);
tableChangeStr = new String(bytes, StandardCharsets.UTF_8);
break;
default:
throw new IOException("Unknown version: " + version);
}
Document document = documentReader.read(tableChangeStr);
TableChange tableChange = JsonTableChangeSerializer.fromDocument(document, true);
tableSchemas.put(tableId, tableChange);
}

@ -402,6 +402,78 @@ public class MySqlConnectorITCase extends MySqlParallelSourceTestBase {
result.getJobClient().get().cancel().get();
}
/**
 * Reads a 500-column table through the mysql-cdc source and checks the emitted changelog.
 *
 * <p>The test creates {@code wide_table} with columns {@code col0 .. col499}, inserts a single
 * row whose values are {@code 0 .. 499}, starts a {@code SELECT *} job, waits until the
 * snapshot has started, then updates {@code col1} and expects the initial insert plus the
 * update-before/update-after pair.
 */
@Test
public void testWideTable() throws Exception {
    final int tableColumnCount = 500;
    fullTypesDatabase.createAndInitialize();

    // The same column list is used for both the MySQL DDL and the Flink source DDL.
    final String columnsDdl = buildColumnsDDL("col", 0, tableColumnCount, "BIGINT");

    final String createWideTable =
            "CREATE TABLE wide_table(" + columnsDdl + " PRIMARY KEY (col0) " + ")";
    final String insertSingleRow =
            "INSERT INTO wide_table values("
                    + getIntegerSeqString(0, tableColumnCount)
                    + ")";
    try (Connection connection = fullTypesDatabase.getJdbcConnection();
            Statement statement = connection.createStatement()) {
        statement.execute(String.format("USE %s", fullTypesDatabase.getDatabaseName()));
        statement.execute(createWideTable);
        statement.execute(insertSingleRow);
    }

    final String sourceDdlTemplate =
            "CREATE TABLE wide_table (\n"
                    + columnsDdl
                    + " primary key (`col0`) not enforced"
                    + ") WITH ("
                    + " 'connector' = 'mysql-cdc',"
                    + " 'hostname' = '%s',"
                    + " 'port' = '%s',"
                    + " 'username' = '%s',"
                    + " 'password' = '%s',"
                    + " 'database-name' = '%s',"
                    + " 'table-name' = '%s',"
                    + " 'debezium.internal.implementation' = '%s',"
                    + " 'scan.incremental.snapshot.enabled' = '%s',"
                    + " 'server-id' = '%s',"
                    + " 'scan.incremental.snapshot.chunk.size' = '%s'"
                    + ")";
    String sourceDDL =
            String.format(
                    sourceDdlTemplate,
                    MYSQL_CONTAINER.getHost(),
                    MYSQL_CONTAINER.getDatabasePort(),
                    fullTypesDatabase.getUsername(),
                    fullTypesDatabase.getPassword(),
                    fullTypesDatabase.getDatabaseName(),
                    "wide_table",
                    getDezImplementation(),
                    incrementalSnapshot,
                    getServerId(),
                    getSplitSize());
    tEnv.executeSql(sourceDDL);

    // async submit job
    TableResult result = tEnv.executeSql("SELECT * FROM wide_table");

    // Only issue the UPDATE once the snapshot phase has started.
    CloseableIterator<Row> snapshotRows = result.collect();
    waitForSnapshotStarted(snapshotRows);

    try (Connection connection = fullTypesDatabase.getJdbcConnection();
            Statement statement = connection.createStatement()) {
        statement.execute("UPDATE wide_table SET col1 = 1024 WHERE col0=0;");
    }

    // Values of col2 .. col499 are untouched by the update and shared by all expected rows.
    final String untouchedValues = getIntegerSeqString(2, tableColumnCount);
    String[] expected =
            new String[] {
                "+I[0, 1, " + untouchedValues + "]",
                "-U[0, 1, " + untouchedValues + "]",
                "+U[0, 1024, " + untouchedValues + "]"
            };

    assertEqualsInAnyOrder(
            Arrays.asList(expected), fetchRows(result.collect(), expected.length));
    result.getJobClient().get().cancel().get();
}
@Test
public void testStartupFromLatestOffset() throws Exception {
inventoryDatabase.createAndInitialize();
@ -817,6 +889,24 @@ public class MySqlConnectorITCase extends MySqlParallelSourceTestBase {
return 0;
}
/**
 * Builds a run of column definitions of the form {@code <prefix><index> <type>,}.
 *
 * <p>One definition is produced for every index in {@code [start, end)}; each definition,
 * including the last one, is terminated by a comma so that further DDL can be appended.
 *
 * @param columnPrefix text placed before the column index to form the column name
 * @param start first column index (inclusive)
 * @param end last column index (exclusive)
 * @param dataType type name written after each column name
 * @return the concatenated column definitions, or an empty string when the range is empty
 */
private static String buildColumnsDDL(
        String columnPrefix, int start, int end, String dataType) {
    final StringBuilder columns = new StringBuilder();
    int index = start;
    while (index < end) {
        columns.append(columnPrefix);
        columns.append(index);
        columns.append(' ');
        columns.append(dataType);
        columns.append(',');
        index++;
    }
    return columns.toString();
}
/**
 * Renders the integers in {@code [start, end)} as a {@code ", "}-separated sequence.
 *
 * <p>For example {@code getIntegerSeqString(2, 5)} yields {@code "2, 3, 4"}. An empty range
 * ({@code start >= end}) yields an empty string instead of a stray {@code end - 1} value.
 *
 * @param start first integer of the sequence (inclusive)
 * @param end upper bound of the sequence (exclusive)
 * @return the joined sequence, or an empty string when the range contains no integers
 */
private static String getIntegerSeqString(int start, int end) {
    // Guard the empty range: without it the last element (end - 1) would be emitted
    // even though it lies outside [start, end).
    if (start >= end) {
        return "";
    }
    StringBuilder stringBuilder = new StringBuilder();
    stringBuilder.append(start);
    for (int i = start + 1; i < end; i++) {
        stringBuilder.append(", ").append(i);
    }
    return stringBuilder.toString();
}
private static void waitForSnapshotStarted(String sinkName) throws InterruptedException {
while (sinkSize(sinkName) == 0) {
Thread.sleep(100);

Loading…
Cancel
Save