|
|
|
@ -84,6 +84,8 @@ public class PaimonSinkITCase {
|
|
|
|
|
|
|
|
|
|
private BinaryRecordDataGenerator generator;
|
|
|
|
|
|
|
|
|
|
private static int checkpointId = 1;
|
|
|
|
|
|
|
|
|
|
public static final String TEST_DATABASE = "test";
|
|
|
|
|
private static final String HADOOP_CONF_DIR =
|
|
|
|
|
Objects.requireNonNull(
|
|
|
|
@ -188,6 +190,7 @@ public class PaimonSinkITCase {
|
|
|
|
|
writer.flush(false);
|
|
|
|
|
Collection<Committer.CommitRequest<MultiTableCommittable>> commitRequests =
|
|
|
|
|
writer.prepareCommit().stream()
|
|
|
|
|
.map(this::correctCheckpointId)
|
|
|
|
|
.map(MockCommitRequestImpl::new)
|
|
|
|
|
.collect(Collectors.toList());
|
|
|
|
|
committer.commit(commitRequests);
|
|
|
|
@ -214,6 +217,7 @@ public class PaimonSinkITCase {
|
|
|
|
|
writer.flush(false);
|
|
|
|
|
commitRequests =
|
|
|
|
|
writer.prepareCommit().stream()
|
|
|
|
|
.map(this::correctCheckpointId)
|
|
|
|
|
.map(MockCommitRequestImpl::new)
|
|
|
|
|
.collect(Collectors.toList());
|
|
|
|
|
committer.commit(commitRequests);
|
|
|
|
@ -243,6 +247,7 @@ public class PaimonSinkITCase {
|
|
|
|
|
writer.flush(false);
|
|
|
|
|
commitRequests =
|
|
|
|
|
writer.prepareCommit().stream()
|
|
|
|
|
.map(this::correctCheckpointId)
|
|
|
|
|
.map(MockCommitRequestImpl::new)
|
|
|
|
|
.collect(Collectors.toList());
|
|
|
|
|
committer.commit(commitRequests);
|
|
|
|
@ -274,6 +279,7 @@ public class PaimonSinkITCase {
|
|
|
|
|
writer.flush(false);
|
|
|
|
|
Collection<Committer.CommitRequest<MultiTableCommittable>> commitRequests =
|
|
|
|
|
writer.prepareCommit().stream()
|
|
|
|
|
.map(this::correctCheckpointId)
|
|
|
|
|
.map(MockCommitRequestImpl::new)
|
|
|
|
|
.collect(Collectors.toList());
|
|
|
|
|
committer.commit(commitRequests);
|
|
|
|
@ -324,6 +330,7 @@ public class PaimonSinkITCase {
|
|
|
|
|
writer.flush(false);
|
|
|
|
|
commitRequests =
|
|
|
|
|
writer.prepareCommit().stream()
|
|
|
|
|
.map(this::correctCheckpointId)
|
|
|
|
|
.map(MockCommitRequestImpl::new)
|
|
|
|
|
.collect(Collectors.toList());
|
|
|
|
|
committer.commit(commitRequests);
|
|
|
|
@ -371,6 +378,7 @@ public class PaimonSinkITCase {
|
|
|
|
|
writer.flush(false);
|
|
|
|
|
commitRequests =
|
|
|
|
|
writer.prepareCommit().stream()
|
|
|
|
|
.map(this::correctCheckpointId)
|
|
|
|
|
.map(MockCommitRequestImpl::new)
|
|
|
|
|
.collect(Collectors.toList());
|
|
|
|
|
committer.commit(commitRequests);
|
|
|
|
@ -433,6 +441,7 @@ public class PaimonSinkITCase {
|
|
|
|
|
writer.flush(false);
|
|
|
|
|
Collection<Committer.CommitRequest<MultiTableCommittable>> commitRequests =
|
|
|
|
|
writer.prepareCommit().stream()
|
|
|
|
|
.map(this::correctCheckpointId)
|
|
|
|
|
.map(MockCommitRequestImpl::new)
|
|
|
|
|
.collect(Collectors.toList());
|
|
|
|
|
committer.commit(commitRequests);
|
|
|
|
@ -454,6 +463,99 @@ public class PaimonSinkITCase {
|
|
|
|
|
Collections.singletonList(Row.ofKind(RowKind.INSERT, "1", "1")), result);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@ParameterizedTest
|
|
|
|
|
@ValueSource(strings = {"filesystem", "hive"})
|
|
|
|
|
public void testDuplicateCommitAfterRestore(String metastore)
|
|
|
|
|
throws IOException, InterruptedException, Catalog.DatabaseNotEmptyException,
|
|
|
|
|
Catalog.DatabaseNotExistException, SchemaEvolveException {
|
|
|
|
|
initialize(metastore);
|
|
|
|
|
PaimonSink<Event> paimonSink =
|
|
|
|
|
new PaimonSink<>(
|
|
|
|
|
catalogOptions, new PaimonRecordEventSerializer(ZoneId.systemDefault()));
|
|
|
|
|
PaimonWriter<Event> writer = paimonSink.createWriter(new MockInitContext());
|
|
|
|
|
Committer<MultiTableCommittable> committer = paimonSink.createCommitter();
|
|
|
|
|
|
|
|
|
|
// insert
|
|
|
|
|
for (Event event : createTestEvents()) {
|
|
|
|
|
writer.write(event, null);
|
|
|
|
|
}
|
|
|
|
|
writer.flush(false);
|
|
|
|
|
Collection<Committer.CommitRequest<MultiTableCommittable>> commitRequests =
|
|
|
|
|
writer.prepareCommit().stream()
|
|
|
|
|
.map(this::correctCheckpointId)
|
|
|
|
|
.map(MockCommitRequestImpl::new)
|
|
|
|
|
.collect(Collectors.toList());
|
|
|
|
|
committer.commit(commitRequests);
|
|
|
|
|
|
|
|
|
|
// We add a loop for restore 7 times
|
|
|
|
|
for (int i = 2; i < 9; i++) {
|
|
|
|
|
// We've two steps in checkpoint: 1. snapshotState(ckp); 2.
|
|
|
|
|
// notifyCheckpointComplete(ckp).
|
|
|
|
|
// It's possible that flink job will restore from a checkpoint with only step#1 finished
|
|
|
|
|
// and
|
|
|
|
|
// step#2 not.
|
|
|
|
|
// CommitterOperator will try to re-commit recovered transactions.
|
|
|
|
|
committer.commit(commitRequests);
|
|
|
|
|
List<DataChangeEvent> events =
|
|
|
|
|
Arrays.asList(
|
|
|
|
|
DataChangeEvent.insertEvent(
|
|
|
|
|
table1,
|
|
|
|
|
generator.generate(
|
|
|
|
|
new Object[] {
|
|
|
|
|
BinaryStringData.fromString(Integer.toString(i)),
|
|
|
|
|
BinaryStringData.fromString(Integer.toString(i))
|
|
|
|
|
})));
|
|
|
|
|
Assertions.assertDoesNotThrow(
|
|
|
|
|
() -> {
|
|
|
|
|
for (Event event : events) {
|
|
|
|
|
writer.write(event, null);
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
writer.flush(false);
|
|
|
|
|
// Checkpoint id start from 1
|
|
|
|
|
committer.commit(
|
|
|
|
|
writer.prepareCommit().stream()
|
|
|
|
|
.map(this::correctCheckpointId)
|
|
|
|
|
.map(MockCommitRequestImpl::new)
|
|
|
|
|
.collect(Collectors.toList()));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
List<Row> result = new ArrayList<>();
|
|
|
|
|
tEnv.sqlQuery("select * from paimon_catalog.test.`table1$snapshots`")
|
|
|
|
|
.execute()
|
|
|
|
|
.collect()
|
|
|
|
|
.forEachRemaining(result::add);
|
|
|
|
|
// 8 APPEND and 1 COMPACT
|
|
|
|
|
Assertions.assertEquals(result.size(), 9);
|
|
|
|
|
result.clear();
|
|
|
|
|
|
|
|
|
|
tEnv.sqlQuery("select * from paimon_catalog.test.`table1`")
|
|
|
|
|
.execute()
|
|
|
|
|
.collect()
|
|
|
|
|
.forEachRemaining(result::add);
|
|
|
|
|
Assertions.assertEquals(
|
|
|
|
|
Arrays.asList(
|
|
|
|
|
Row.ofKind(RowKind.INSERT, "1", "1"),
|
|
|
|
|
Row.ofKind(RowKind.INSERT, "2", "2"),
|
|
|
|
|
Row.ofKind(RowKind.INSERT, "3", "3"),
|
|
|
|
|
Row.ofKind(RowKind.INSERT, "4", "4"),
|
|
|
|
|
Row.ofKind(RowKind.INSERT, "5", "5"),
|
|
|
|
|
Row.ofKind(RowKind.INSERT, "6", "6"),
|
|
|
|
|
Row.ofKind(RowKind.INSERT, "7", "7"),
|
|
|
|
|
Row.ofKind(RowKind.INSERT, "8", "8")),
|
|
|
|
|
result);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private MultiTableCommittable correctCheckpointId(MultiTableCommittable committable) {
|
|
|
|
|
// update the right checkpointId for MultiTableCommittable
|
|
|
|
|
return new MultiTableCommittable(
|
|
|
|
|
committable.getDatabase(),
|
|
|
|
|
committable.getTable(),
|
|
|
|
|
checkpointId++,
|
|
|
|
|
committable.kind(),
|
|
|
|
|
committable.wrappedCommittable());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private static class MockCommitRequestImpl<CommT> extends CommitRequestImpl<CommT> {
|
|
|
|
|
|
|
|
|
|
protected MockCommitRequestImpl(CommT committable) {
|
|
|
|
|