[FLINK-36790][cdc-connector][paimon] Set waitCompaction to true in PaimonWriter to avoid CME problem

This closes  #3760

Co-authored-by: wuzhiping <wuzhiping.007@bytedance.com>
pull/3830/merge
stayrascal 4 weeks ago committed by GitHub
parent ddb5f00df5
commit dca94bdd71
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -96,7 +96,7 @@ public class PaimonWriter<InputT>
for (Map.Entry<Identifier, StoreSinkWrite> entry : writes.entrySet()) { for (Map.Entry<Identifier, StoreSinkWrite> entry : writes.entrySet()) {
Identifier key = entry.getKey(); Identifier key = entry.getKey();
StoreSinkWrite write = entry.getValue(); StoreSinkWrite write = entry.getValue();
boolean waitCompaction = false; boolean waitCompaction = true;
committables.addAll( committables.addAll(
// here we set it to lastCheckpointId+1 to // here we set it to lastCheckpointId+1 to
// avoid prepareCommit the same checkpointId with the first round. // avoid prepareCommit the same checkpointId with the first round.

@ -58,7 +58,7 @@ import org.apache.paimon.options.Options;
import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource; import org.junit.jupiter.params.provider.CsvSource;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
@ -140,7 +140,7 @@ public class PaimonSinkITCase {
.dropDatabase(TEST_DATABASE, true, true); .dropDatabase(TEST_DATABASE, true, true);
} }
private List<Event> createTestEvents() throws SchemaEvolveException { private List<Event> createTestEvents(boolean enableDeleteVectors) throws SchemaEvolveException {
List<Event> testEvents = new ArrayList<>(); List<Event> testEvents = new ArrayList<>();
// create table // create table
Schema schema = Schema schema =
@ -149,6 +149,7 @@ public class PaimonSinkITCase {
.physicalColumn("col2", DataTypes.STRING()) .physicalColumn("col2", DataTypes.STRING())
.primaryKey("col1") .primaryKey("col1")
.option("bucket", "1") .option("bucket", "1")
.option("deletion-vectors.enabled", String.valueOf(enableDeleteVectors))
.build(); .build();
CreateTableEvent createTableEvent = new CreateTableEvent(table1, schema); CreateTableEvent createTableEvent = new CreateTableEvent(table1, schema);
testEvents.add(createTableEvent); testEvents.add(createTableEvent);
@ -180,8 +181,8 @@ public class PaimonSinkITCase {
} }
@ParameterizedTest @ParameterizedTest
@ValueSource(strings = {"filesystem", "hive"}) @CsvSource({"filesystem, true", "filesystem, false", "hive, true", "hive, false"})
public void testSinkWithDataChange(String metastore) public void testSinkWithDataChange(String metastore, boolean enableDeleteVector)
throws IOException, InterruptedException, Catalog.DatabaseNotEmptyException, throws IOException, InterruptedException, Catalog.DatabaseNotEmptyException,
Catalog.DatabaseNotExistException, SchemaEvolveException { Catalog.DatabaseNotExistException, SchemaEvolveException {
initialize(metastore); initialize(metastore);
@ -192,7 +193,7 @@ public class PaimonSinkITCase {
Committer<MultiTableCommittable> committer = paimonSink.createCommitter(); Committer<MultiTableCommittable> committer = paimonSink.createCommitter();
// insert // insert
for (Event event : createTestEvents()) { for (Event event : createTestEvents(enableDeleteVector)) {
writer.write(event, null); writer.write(event, null);
} }
writer.flush(false); writer.flush(false);
@ -215,7 +216,7 @@ public class PaimonSinkITCase {
// delete // delete
Event event = Event event =
DataChangeEvent.deleteEvent( DataChangeEvent.deleteEvent(
TableId.tableId("test", "table1"), table1,
generator.generate( generator.generate(
new Object[] { new Object[] {
BinaryStringData.fromString("1"), BinaryStringData.fromString("1"),
@ -240,7 +241,7 @@ public class PaimonSinkITCase {
// update // update
event = event =
DataChangeEvent.updateEvent( DataChangeEvent.updateEvent(
TableId.tableId("test", "table1"), table1,
generator.generate( generator.generate(
new Object[] { new Object[] {
BinaryStringData.fromString("2"), BinaryStringData.fromString("2"),
@ -273,17 +274,19 @@ public class PaimonSinkITCase {
.collect() .collect()
.forEachRemaining(result::add); .forEachRemaining(result::add);
// Each commit will generate one sequence number(equal to checkpointId). // Each commit will generate one sequence number(equal to checkpointId).
Assertions.assertEquals( List<Row> expected =
Arrays.asList( enableDeleteVector
? Collections.singletonList(Row.ofKind(RowKind.INSERT, 3L))
: Arrays.asList(
Row.ofKind(RowKind.INSERT, 1L), Row.ofKind(RowKind.INSERT, 1L),
Row.ofKind(RowKind.INSERT, 2L), Row.ofKind(RowKind.INSERT, 2L),
Row.ofKind(RowKind.INSERT, 3L)), Row.ofKind(RowKind.INSERT, 3L));
result); Assertions.assertEquals(expected, result);
} }
@ParameterizedTest @ParameterizedTest
@ValueSource(strings = {"filesystem", "hive"}) @CsvSource({"filesystem, true", "filesystem, false", "hive, true", "hive, false"})
public void testSinkWithSchemaChange(String metastore) public void testSinkWithSchemaChange(String metastore, boolean enableDeleteVector)
throws IOException, InterruptedException, Catalog.DatabaseNotEmptyException, throws IOException, InterruptedException, Catalog.DatabaseNotEmptyException,
Catalog.DatabaseNotExistException, SchemaEvolveException { Catalog.DatabaseNotExistException, SchemaEvolveException {
initialize(metastore); initialize(metastore);
@ -294,7 +297,7 @@ public class PaimonSinkITCase {
Committer<MultiTableCommittable> committer = paimonSink.createCommitter(); Committer<MultiTableCommittable> committer = paimonSink.createCommitter();
// 1. receive only DataChangeEvents during one checkpoint // 1. receive only DataChangeEvents during one checkpoint
for (Event event : createTestEvents()) { for (Event event : createTestEvents(enableDeleteVector)) {
writer.write(event, null); writer.write(event, null);
} }
writer.flush(false); writer.flush(false);
@ -427,8 +430,8 @@ public class PaimonSinkITCase {
} }
@ParameterizedTest @ParameterizedTest
@ValueSource(strings = {"filesystem", "hive"}) @CsvSource({"filesystem, true", "filesystem, false", "hive, true", "hive, false"})
public void testSinkWithMultiTables(String metastore) public void testSinkWithMultiTables(String metastore, boolean enableDeleteVector)
throws IOException, InterruptedException, Catalog.DatabaseNotEmptyException, throws IOException, InterruptedException, Catalog.DatabaseNotEmptyException,
Catalog.DatabaseNotExistException, SchemaEvolveException { Catalog.DatabaseNotExistException, SchemaEvolveException {
initialize(metastore); initialize(metastore);
@ -437,7 +440,7 @@ public class PaimonSinkITCase {
catalogOptions, new PaimonRecordEventSerializer(ZoneId.systemDefault())); catalogOptions, new PaimonRecordEventSerializer(ZoneId.systemDefault()));
PaimonWriter<Event> writer = paimonSink.createWriter(new MockInitContext()); PaimonWriter<Event> writer = paimonSink.createWriter(new MockInitContext());
Committer<MultiTableCommittable> committer = paimonSink.createCommitter(); Committer<MultiTableCommittable> committer = paimonSink.createCommitter();
List<Event> testEvents = createTestEvents(); List<Event> testEvents = createTestEvents(enableDeleteVector);
// create table // create table
TableId table2 = TableId.tableId("test", "table2"); TableId table2 = TableId.tableId("test", "table2");
Schema schema = Schema schema =
@ -492,8 +495,8 @@ public class PaimonSinkITCase {
} }
@ParameterizedTest @ParameterizedTest
@ValueSource(strings = {"filesystem", "hive"}) @CsvSource({"filesystem, true", "filesystem, false", "hive, true", "hive, false"})
public void testDuplicateCommitAfterRestore(String metastore) public void testDuplicateCommitAfterRestore(String metastore, boolean enableDeleteVector)
throws IOException, InterruptedException, Catalog.DatabaseNotEmptyException, throws IOException, InterruptedException, Catalog.DatabaseNotEmptyException,
Catalog.DatabaseNotExistException, SchemaEvolveException { Catalog.DatabaseNotExistException, SchemaEvolveException {
initialize(metastore); initialize(metastore);
@ -504,7 +507,7 @@ public class PaimonSinkITCase {
Committer<MultiTableCommittable> committer = paimonSink.createCommitter(); Committer<MultiTableCommittable> committer = paimonSink.createCommitter();
// insert // insert
for (Event event : createTestEvents()) { for (Event event : createTestEvents(enableDeleteVector)) {
writer.write(event, null); writer.write(event, null);
} }
writer.flush(false); writer.flush(false);
@ -553,8 +556,13 @@ public class PaimonSinkITCase {
.execute() .execute()
.collect() .collect()
.forEachRemaining(result::add); .forEachRemaining(result::add);
if (enableDeleteVector) {
// Each APPEND will trigger COMPACT once enable deletion-vectors.
Assertions.assertEquals(16, result.size());
} else {
// 8 APPEND and 1 COMPACT // 8 APPEND and 1 COMPACT
Assertions.assertEquals(result.size(), 9); Assertions.assertEquals(9, result.size());
}
result.clear(); result.clear();
tEnv.sqlQuery("select * from paimon_catalog.test.`table1`") tEnv.sqlQuery("select * from paimon_catalog.test.`table1`")

Loading…
Cancel
Save