[mysql] Optimize the checkpoint be optional under single parallelism (#365)

Co-authored-by: Leonard Xu <xbjtdcq@gmailcom>
pull/367/head
Leonard Xu 3 years ago committed by GitHub
parent d40765eb32
commit a26096910e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -130,10 +130,11 @@ public class MySqlParallelSource<T>
public SplitEnumerator<MySqlSplit, PendingSplitsState> createEnumerator(
SplitEnumeratorContext<MySqlSplit> enumContext) {
MySqlValidator validator = new MySqlValidator(config);
final int currentParallelism = enumContext.currentParallelism();
final MySqlSplitAssigner splitAssigner =
startupMode.equals("initial")
? new MySqlHybridSplitAssigner(config)
? new MySqlHybridSplitAssigner(config, currentParallelism)
: new MySqlBinlogSplitAssigner(config);
return new MySqlSourceEnumerator(enumContext, splitAssigner, validator);
@ -144,9 +145,11 @@ public class MySqlParallelSource<T>
SplitEnumeratorContext<MySqlSplit> enumContext, PendingSplitsState checkpoint) {
MySqlValidator validator = new MySqlValidator(config);
final MySqlSplitAssigner splitAssigner;
final int currentParallelism = enumContext.currentParallelism();
if (checkpoint instanceof HybridPendingSplitsState) {
splitAssigner =
new MySqlHybridSplitAssigner(config, (HybridPendingSplitsState) checkpoint);
new MySqlHybridSplitAssigner(
config, currentParallelism, (HybridPendingSplitsState) checkpoint);
} else if (checkpoint instanceof BinlogPendingSplitsState) {
splitAssigner =
new MySqlBinlogSplitAssigner(config, (BinlogPendingSplitsState) checkpoint);

@ -50,15 +50,17 @@ public class MySqlHybridSplitAssigner implements MySqlSplitAssigner {
private final MySqlSnapshotSplitAssigner snapshotSplitAssigner;
public MySqlHybridSplitAssigner(Configuration configuration) {
this(new MySqlSnapshotSplitAssigner(configuration), false);
public MySqlHybridSplitAssigner(Configuration configuration, int currentParallelism) {
this(new MySqlSnapshotSplitAssigner(configuration, currentParallelism), false);
}
public MySqlHybridSplitAssigner(
Configuration configuration, HybridPendingSplitsState checkpoint) {
Configuration configuration,
int currentParallelism,
HybridPendingSplitsState checkpoint) {
this(
new MySqlSnapshotSplitAssigner(
configuration, checkpoint.getSnapshotPendingSplits()),
configuration, currentParallelism, checkpoint.getSnapshotPendingSplits()),
checkpoint.isBinlogSplitAssigned());
}

@ -69,6 +69,7 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
private boolean assignerFinished;
private final Configuration configuration;
private final int currentParallelism;
private final LinkedList<TableId> remainingTables;
private final RelationalTableFilters tableFilters;
private final int chunkSize;
@ -78,9 +79,10 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
@Nullable private Long checkpointIdToFinish;
public MySqlSnapshotSplitAssigner(Configuration configuration) {
public MySqlSnapshotSplitAssigner(Configuration configuration, int currentParallelism) {
this(
configuration,
currentParallelism,
new ArrayList<>(),
new ArrayList<>(),
new HashMap<>(),
@ -89,9 +91,12 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
}
public MySqlSnapshotSplitAssigner(
Configuration configuration, SnapshotPendingSplitsState checkpoint) {
Configuration configuration,
int currentParallelism,
SnapshotPendingSplitsState checkpoint) {
this(
configuration,
currentParallelism,
checkpoint.getAlreadyProcessedTables(),
checkpoint.getRemainingSplits(),
checkpoint.getAssignedSplits(),
@ -101,12 +106,14 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
private MySqlSnapshotSplitAssigner(
Configuration configuration,
int currentParallelism,
List<TableId> alreadyProcessedTables,
List<MySqlSnapshotSplit> remainingSplits,
Map<String, MySqlSnapshotSplit> assignedSplits,
Map<String, BinlogOffset> splitFinishedOffsets,
boolean assignerFinished) {
this.configuration = configuration;
this.currentParallelism = currentParallelism;
this.alreadyProcessedTables = alreadyProcessedTables;
this.remainingSplits = remainingSplits;
this.assignedSplits = assignedSplits;
@ -166,8 +173,17 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
public void onFinishedSplits(Map<String, BinlogOffset> splitFinishedOffsets) {
this.splitFinishedOffsets.putAll(splitFinishedOffsets);
if (allSplitsFinished()) {
LOG.info(
"Snapshot split assigner received all splits finished, waiting for a complete checkpoint to mark the assigner finished.");
// Skip the waiting checkpoint when current parallelism is 1 which means we do not need
// to care about the global output data order of snapshot splits and binlog split.
if (currentParallelism == 1) {
assignerFinished = true;
LOG.info(
"Snapshot split assigner received all splits finished and the job parallelism is 1, snapshot split assigner is turn into finished status.");
} else {
LOG.info(
"Snapshot split assigner received all splits finished, waiting for a complete checkpoint to mark the assigner finished.");
}
}
}

@ -75,6 +75,7 @@ import static org.junit.Assert.assertEquals;
/** Tests for {@link BinlogSplitReader}. */
public class BinlogSplitReaderTest extends MySqlTestBase {
private static final int currentParallelism = 4;
private final UniqueDatabase customerDatabase =
new UniqueDatabase(MYSQL_CONTAINER, "customer", "mysqluser", "mysqlpw");
@ -586,7 +587,8 @@ public class BinlogSplitReaderTest extends MySqlTestBase {
}
private List<MySqlSnapshotSplit> getMySqlSplits(Configuration configuration, RowType pkType) {
final MySqlSnapshotSplitAssigner assigner = new MySqlSnapshotSplitAssigner(configuration);
final MySqlSnapshotSplitAssigner assigner =
new MySqlSnapshotSplitAssigner(configuration, currentParallelism);
assigner.open();
List<MySqlSnapshotSplit> mySqlSplits = new ArrayList<>();
while (true) {

@ -60,6 +60,7 @@ import static org.junit.Assert.assertEquals;
/** Tests for {@link SnapshotSplitReader}. */
public class SnapshotSplitReaderTest extends MySqlTestBase {
private static final int currentParallelism = 4;
private static final UniqueDatabase customerDatabase =
new UniqueDatabase(MYSQL_CONTAINER, "customer", "mysqluser", "mysqlpw");
@ -277,7 +278,8 @@ public class SnapshotSplitReaderTest extends MySqlTestBase {
}
private List<MySqlSplit> getMySqlSplits(Configuration configuration, RowType pkType) {
final MySqlSnapshotSplitAssigner assigner = new MySqlSnapshotSplitAssigner(configuration);
final MySqlSnapshotSplitAssigner assigner =
new MySqlSnapshotSplitAssigner(configuration, currentParallelism);
assigner.open();
List<MySqlSplit> mySqlSplitList = new ArrayList<>();
while (true) {

@ -55,6 +55,7 @@ import static org.junit.Assert.assertEquals;
/** Tests for {@link MySqlHybridSplitAssigner}. */
public class MySqlHybridSplitAssignerTest extends MySqlTestBase {
private static final int currentParallelism = 4;
private static final UniqueDatabase customerDatabase =
new UniqueDatabase(MYSQL_CONTAINER, "customer", "mysqluser", "mysqlpw");
@ -114,7 +115,7 @@ public class MySqlHybridSplitAssignerTest extends MySqlTestBase {
HybridPendingSplitsState checkpoint =
new HybridPendingSplitsState(snapshotPendingSplitsState, false);
final MySqlHybridSplitAssigner assigner =
new MySqlHybridSplitAssigner(configuration, checkpoint);
new MySqlHybridSplitAssigner(configuration, currentParallelism, checkpoint);
// step 2. Get the MySqlBinlogSplit after all snapshot splits finished
Optional<MySqlSplit> binlogSplit = assigner.getNext();

@ -48,6 +48,7 @@ import static org.junit.Assert.fail;
/** Tests for {@link MySqlSnapshotSplitAssigner}. */
public class MySqlSnapshotSplitAssignerTest extends MySqlTestBase {
private static final int currentParallelism = 4;
private static final UniqueDatabase customerDatabase =
new UniqueDatabase(MYSQL_CONTAINER, "customer", "mysqluser", "mysqlpw");
@ -121,7 +122,8 @@ public class MySqlSnapshotSplitAssignerTest extends MySqlTestBase {
.collect(Collectors.toList());
configuration.setString("table.whitelist", String.join(",", captureTableIds));
final MySqlSnapshotSplitAssigner assigner = new MySqlSnapshotSplitAssigner(configuration);
final MySqlSnapshotSplitAssigner assigner =
new MySqlSnapshotSplitAssigner(configuration, currentParallelism);
assigner.open();
List<MySqlSplit> sqlSplits = new ArrayList<>();

@ -206,6 +206,94 @@ public class MySqlConnectorITCase extends MySqlTestBase {
result.getJobClient().get().cancel().get();
}
@Test
public void testCheckpointIsOptionalUnderSingleParallelism() throws Exception {
if (incrementalSnapshot) {
env.setParallelism(1);
// check the checkpoint is optional when parallelism is 1
env.getCheckpointConfig().disableCheckpointing();
} else {
return;
}
inventoryDatabase.createAndInitialize();
String sourceDDL =
String.format(
"CREATE TABLE debezium_source ("
+ " `id` INT NOT NULL,"
+ " name STRING,"
+ " description STRING,"
+ " weight DECIMAL(10,3),"
+ " primary key (`id`) not enforced"
+ ") WITH ("
+ " 'connector' = 'mysql-cdc',"
+ " 'hostname' = '%s',"
+ " 'port' = '%s',"
+ " 'username' = '%s',"
+ " 'password' = '%s',"
+ " 'database-name' = '%s',"
+ " 'table-name' = '%s',"
+ " 'debezium.internal.implementation' = '%s',"
+ " 'scan.incremental.snapshot.enabled' = '%s',"
+ " 'server-id' = '%s',"
+ " 'scan.incremental.snapshot.chunk.size' = '%s'"
+ ")",
MYSQL_CONTAINER.getHost(),
MYSQL_CONTAINER.getDatabasePort(),
inventoryDatabase.getUsername(),
inventoryDatabase.getPassword(),
inventoryDatabase.getDatabaseName(),
"products",
getDezImplementation(),
incrementalSnapshot,
getServerId(),
getSplitSize());
tEnv.executeSql(sourceDDL);
// async submit job
TableResult result = tEnv.executeSql("SELECT * FROM debezium_source");
CloseableIterator<Row> iterator = result.collect();
String[] expectedSnapshot =
new String[] {
"+I[101, scooter, Small 2-wheel scooter, 3.140]",
"+I[102, car battery, 12V car battery, 8.100]",
"+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.800]",
"+I[104, hammer, 12oz carpenter's hammer, 0.750]",
"+I[105, hammer, 14oz carpenter's hammer, 0.875]",
"+I[106, hammer, 16oz carpenter's hammer, 1.000]",
"+I[107, rocks, box of assorted rocks, 5.300]",
"+I[108, jacket, water resistent black wind breaker, 0.100]",
"+I[109, spare tire, 24 inch spare tire, 22.200]"
};
assertThat(
fetchRows(iterator, expectedSnapshot.length), containsInAnyOrder(expectedSnapshot));
try (Connection connection = inventoryDatabase.getJdbcConnection();
Statement statement = connection.createStatement()) {
statement.execute(
"INSERT INTO products VALUES (default,'jacket','water resistent white wind breaker',0.2);"); // 110
statement.execute(
"INSERT INTO products VALUES (default,'scooter','Big 2-wheel scooter ',5.18);");
statement.execute(
"UPDATE products SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;");
statement.execute("UPDATE products SET weight='5.17' WHERE id=111;");
statement.execute("DELETE FROM products WHERE id=111;");
}
String[] expectedBinlog =
new String[] {
"+I[110, jacket, water resistent white wind breaker, 0.200]",
"+I[111, scooter, Big 2-wheel scooter , 5.180]",
"-U[110, jacket, water resistent white wind breaker, 0.200]",
"+U[110, jacket, new water resistent white wind breaker, 0.500]",
"-U[111, scooter, Big 2-wheel scooter , 5.180]",
"+U[111, scooter, Big 2-wheel scooter , 5.170]",
"-D[111, scooter, Big 2-wheel scooter , 5.170]"
};
assertThat(fetchRows(iterator, expectedBinlog.length), containsInAnyOrder(expectedBinlog));
result.getJobClient().get().cancel().get();
}
@Test
public void testAllTypes() throws Throwable {
fullTypesDatabase.createAndInitialize();

Loading…
Cancel
Save