|
|
|
@ -30,6 +30,8 @@ import org.apache.flink.core.io.InputStatus;
|
|
|
|
|
import org.apache.flink.metrics.MetricGroup;
|
|
|
|
|
import org.apache.flink.table.api.DataTypes;
|
|
|
|
|
import org.apache.flink.table.types.DataType;
|
|
|
|
|
import org.apache.flink.table.types.logical.LogicalType;
|
|
|
|
|
import org.apache.flink.table.types.logical.RowType;
|
|
|
|
|
import org.apache.flink.util.Collector;
|
|
|
|
|
import org.apache.flink.util.Preconditions;
|
|
|
|
|
|
|
|
|
@ -93,6 +95,7 @@ import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.isHigh
|
|
|
|
|
import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.isSchemaChangeEvent;
|
|
|
|
|
import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.isWatermarkEvent;
|
|
|
|
|
import static java.lang.String.format;
|
|
|
|
|
import static org.apache.flink.core.io.InputStatus.MORE_AVAILABLE;
|
|
|
|
|
import static org.apache.flink.util.Preconditions.checkState;
|
|
|
|
|
import static org.junit.Assert.assertEquals;
|
|
|
|
|
import static org.junit.Assert.assertFalse;
|
|
|
|
@ -107,6 +110,107 @@ public class MySqlSourceReaderTest extends MySqlSourceTestBase {
|
|
|
|
|
private final UniqueDatabase inventoryDatabase =
|
|
|
|
|
new UniqueDatabase(MYSQL_CONTAINER, "inventory", "mysqluser", "mysqlpw");
|
|
|
|
|
|
|
|
|
|
@Test
|
|
|
|
|
public void testFinishedUnackedSplitsUsingStateFromSnapshotPhase() throws Exception {
|
|
|
|
|
customerDatabase.createAndInitialize();
|
|
|
|
|
final MySqlSourceConfig sourceConfig = getConfig(new String[] {"customers"});
|
|
|
|
|
final DataType dataType =
|
|
|
|
|
DataTypes.ROW(
|
|
|
|
|
DataTypes.FIELD("id", DataTypes.BIGINT()),
|
|
|
|
|
DataTypes.FIELD("name", DataTypes.STRING()),
|
|
|
|
|
DataTypes.FIELD("address", DataTypes.STRING()),
|
|
|
|
|
DataTypes.FIELD("phone_number", DataTypes.STRING()));
|
|
|
|
|
List<MySqlSplit> snapshotSplits;
|
|
|
|
|
try (MySqlConnection jdbc = DebeziumUtils.createMySqlConnection(sourceConfig)) {
|
|
|
|
|
Map<TableId, TableChanges.TableChange> tableSchemas =
|
|
|
|
|
TableDiscoveryUtils.discoverSchemaForCapturedTables(
|
|
|
|
|
new MySqlPartition(
|
|
|
|
|
sourceConfig.getMySqlConnectorConfig().getLogicalName()),
|
|
|
|
|
sourceConfig,
|
|
|
|
|
jdbc);
|
|
|
|
|
TableId tableId = new TableId(customerDatabase.getDatabaseName(), null, "customers");
|
|
|
|
|
RowType splitType =
|
|
|
|
|
RowType.of(
|
|
|
|
|
new LogicalType[] {DataTypes.INT().getLogicalType()},
|
|
|
|
|
new String[] {"id"});
|
|
|
|
|
snapshotSplits =
|
|
|
|
|
Arrays.asList(
|
|
|
|
|
new MySqlSnapshotSplit(
|
|
|
|
|
tableId,
|
|
|
|
|
tableId + ":0",
|
|
|
|
|
splitType,
|
|
|
|
|
null,
|
|
|
|
|
new Integer[] {200},
|
|
|
|
|
null,
|
|
|
|
|
tableSchemas),
|
|
|
|
|
new MySqlSnapshotSplit(
|
|
|
|
|
tableId,
|
|
|
|
|
tableId + ":1",
|
|
|
|
|
splitType,
|
|
|
|
|
new Integer[] {200},
|
|
|
|
|
new Integer[] {1500},
|
|
|
|
|
null,
|
|
|
|
|
tableSchemas),
|
|
|
|
|
new MySqlSnapshotSplit(
|
|
|
|
|
tableId,
|
|
|
|
|
tableId + ":2",
|
|
|
|
|
splitType,
|
|
|
|
|
new Integer[] {1500},
|
|
|
|
|
null,
|
|
|
|
|
null,
|
|
|
|
|
tableSchemas));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Step 1: start source reader and assign snapshot splits
|
|
|
|
|
MySqlSourceReader<SourceRecord> reader = createReader(sourceConfig, -1);
|
|
|
|
|
reader.start();
|
|
|
|
|
reader.addSplits(snapshotSplits);
|
|
|
|
|
|
|
|
|
|
String[] expectedRecords =
|
|
|
|
|
new String[] {
|
|
|
|
|
"+I[111, user_6, Shanghai, 123567891234]",
|
|
|
|
|
"+I[110, user_5, Shanghai, 123567891234]",
|
|
|
|
|
"+I[101, user_1, Shanghai, 123567891234]",
|
|
|
|
|
"+I[103, user_3, Shanghai, 123567891234]",
|
|
|
|
|
"+I[102, user_2, Shanghai, 123567891234]",
|
|
|
|
|
"+I[118, user_7, Shanghai, 123567891234]",
|
|
|
|
|
"+I[121, user_8, Shanghai, 123567891234]",
|
|
|
|
|
"+I[123, user_9, Shanghai, 123567891234]",
|
|
|
|
|
"+I[109, user_4, Shanghai, 123567891234]",
|
|
|
|
|
"+I[1009, user_10, Shanghai, 123567891234]",
|
|
|
|
|
"+I[1011, user_12, Shanghai, 123567891234]",
|
|
|
|
|
"+I[1010, user_11, Shanghai, 123567891234]",
|
|
|
|
|
"+I[1013, user_14, Shanghai, 123567891234]",
|
|
|
|
|
"+I[1012, user_13, Shanghai, 123567891234]",
|
|
|
|
|
"+I[1015, user_16, Shanghai, 123567891234]",
|
|
|
|
|
"+I[1014, user_15, Shanghai, 123567891234]",
|
|
|
|
|
"+I[1017, user_18, Shanghai, 123567891234]",
|
|
|
|
|
"+I[1016, user_17, Shanghai, 123567891234]",
|
|
|
|
|
"+I[1019, user_20, Shanghai, 123567891234]",
|
|
|
|
|
"+I[1018, user_19, Shanghai, 123567891234]",
|
|
|
|
|
"+I[2000, user_21, Shanghai, 123567891234]"
|
|
|
|
|
};
|
|
|
|
|
// Step 2: wait the snapshot splits finished reading
|
|
|
|
|
Thread.sleep(5000L);
|
|
|
|
|
List<String> actualRecords = consumeRecords(reader, dataType);
|
|
|
|
|
assertEqualsInAnyOrder(Arrays.asList(expectedRecords), actualRecords);
|
|
|
|
|
|
|
|
|
|
// Step 3: snapshot reader's state
|
|
|
|
|
List<MySqlSplit> splitsState = reader.snapshotState(1L);
|
|
|
|
|
|
|
|
|
|
// Step 4: restart reader from a restored state
|
|
|
|
|
MySqlSourceReader<SourceRecord> restartReader = createReader(sourceConfig, -1);
|
|
|
|
|
restartReader.start();
|
|
|
|
|
restartReader.addSplits(splitsState);
|
|
|
|
|
|
|
|
|
|
// Step 5: check the finished unacked splits between original reader and restarted reader
|
|
|
|
|
assertEquals(3, reader.getFinishedUnackedSplits().size());
|
|
|
|
|
assertMapEquals(
|
|
|
|
|
restartReader.getFinishedUnackedSplits(), reader.getFinishedUnackedSplits());
|
|
|
|
|
reader.close();
|
|
|
|
|
restartReader.close();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Test
|
|
|
|
|
public void testBinlogReadFailoverCrossTransaction() throws Exception {
|
|
|
|
|
customerDatabase.createAndInitialize();
|
|
|
|
@ -411,8 +515,9 @@ public class MySqlSourceReaderTest extends MySqlSourceTestBase {
|
|
|
|
|
MySqlSourceReader<SourceRecord> sourceReader, DataType recordType) throws Exception {
|
|
|
|
|
// Poll all the n records of the single split.
|
|
|
|
|
final SimpleReaderOutput output = new SimpleReaderOutput();
|
|
|
|
|
while (output.getResults().size() == 0) {
|
|
|
|
|
sourceReader.pollNext(output);
|
|
|
|
|
InputStatus status = MORE_AVAILABLE;
|
|
|
|
|
while (MORE_AVAILABLE == status || output.getResults().size() == 0) {
|
|
|
|
|
status = sourceReader.pollNext(output);
|
|
|
|
|
}
|
|
|
|
|
final RecordsFormatter formatter = new RecordsFormatter(recordType);
|
|
|
|
|
return formatter.format(output.getResults());
|
|
|
|
|