[FLINK-36750][pipeline-connector/paimon] Restore SinkWriter from state to keep the information of the previous SinkWrite when schema evolution happens

This closes #3745.
Kunni authored 2 months ago, committed by GitHub
parent 203bb9211b
commit fb2a1d0c38
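The gist of the change: when schema evolution arrives, PaimonWriter no longer drops its StoreSinkWrite and recreates it from scratch; StoreSinkWrite#replace checkpoints the in-flight state of the current write and restores it into a new write built against the refreshed table, so the information of the previous write survives the schema change. A minimal sketch of that pattern follows; it is not part of the diff below, and the State, Table, SinkWrite and SinkWriteFactory types are simplified placeholders rather than Paimon or Flink CDC APIs.

    // Sketch only: the replace-and-restore pattern; all types here are simplified placeholders.
    import java.util.List;

    final class ReplaceAndRestoreSketch {
        interface State {}
        interface Table {}

        interface SinkWrite {
            List<State> checkpoint();           // snapshot in-flight buffers / sequence numbers
            void restore(List<State> states);   // resume from a previous snapshot
            void close();
        }

        interface SinkWriteFactory {
            SinkWrite create(Table tableWithLatestSchema);
        }

        /** Swap in a write for the evolved schema without losing the old write's state. */
        static SinkWrite replace(SinkWrite current, Table newTable, SinkWriteFactory factory) {
            List<State> states = current.checkpoint(); // keep what the previous write knows
            current.close();
            SinkWrite next = factory.create(newTable); // built against the refreshed schema
            next.restore(states);                      // carry the information over
            return next;
        }
    }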

@@ -20,7 +20,6 @@ package org.apache.flink.cdc.connectors.paimon.sink.v2;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.FlushEvent;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
@@ -67,7 +66,6 @@ public class PaimonWriter<InputT>
private final Map<Identifier, StoreSinkWrite> writes;
private final ExecutorService compactExecutor;
private final MetricGroup metricGroup;
private final List<MultiTableCommittable> committables;
/** A workaround variable to trace the checkpointId in {@link StreamOperator#snapshotState}. */
private long lastCheckpointId;
@@ -83,7 +81,6 @@ public class PaimonWriter<InputT>
this.commitUser = commitUser;
this.tables = new HashMap<>();
this.writes = new HashMap<>();
this.committables = new ArrayList<>();
this.ioManager = new IOManagerAsync();
this.compactExecutor =
Executors.newSingleThreadScheduledExecutor(
@@ -94,11 +91,23 @@ public class PaimonWriter<InputT>
}
@Override
public Collection<MultiTableCommittable> prepareCommit() {
Collection<MultiTableCommittable> allCommittables = new ArrayList<>(committables);
committables.clear();
public Collection<MultiTableCommittable> prepareCommit() throws IOException {
List<MultiTableCommittable> committables = new ArrayList<>();
for (Map.Entry<Identifier, StoreSinkWrite> entry : writes.entrySet()) {
Identifier key = entry.getKey();
StoreSinkWrite write = entry.getValue();
boolean waitCompaction = false;
committables.addAll(
// Use lastCheckpointId + 1 here to avoid calling prepareCommit with the same
// checkpointId as the first round.
write.prepareCommit(waitCompaction, lastCheckpointId + 1).stream()
.map(
committable ->
MultiTableCommittable.fromCommittable(key, committable))
.collect(Collectors.toList()));
}
lastCheckpointId++;
return allCommittables;
return committables;
}
@Override
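The lastCheckpointId bookkeeping above keeps the commit identifier strictly increasing, so a writer whose state was restored never hands Paimon the identifier of the round that produced that state. A toy illustration of the invariant follows; it is not connector code and the names are made up.

    // Toy illustration: commit identifiers stay strictly increasing across prepareCommit rounds.
    final class CommitIdentifierSketch {
        private long lastCheckpointId;

        CommitIdentifierSketch(long restoredCheckpointId) {
            this.lastCheckpointId = restoredCheckpointId; // e.g. the id of the last completed round
        }

        long nextCommitIdentifier() {
            lastCheckpointId = lastCheckpointId + 1; // never reuses a previously committed id
            return lastCheckpointId;
        }

        public static void main(String[] args) {
            CommitIdentifierSketch sketch = new CommitIdentifierSketch(41);
            System.out.println(sketch.nextCommitIdentifier()); // 42
            System.out.println(sketch.nextCommitIdentifier()); // 43
        }
    }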
@@ -108,8 +117,14 @@ public class PaimonWriter<InputT>
if (paimonEvent.isShouldRefreshSchema()) {
// remove the table temporarily, then add the table with the latest schema when a
// DataChangeEvent is received.
writes.remove(tableId);
tables.remove(tableId);
try {
if (writes.containsKey(tableId)) {
writes.get(tableId).replace(getTable(tableId));
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
if (paimonEvent.getGenericRow() != null) {
FileStoreTable table;
@@ -159,36 +174,9 @@ public class PaimonWriter<InputT>
});
}
/**
* Called on checkpoint or end of input so that the writer can flush all pending data for
* at-least-once semantics.
*
* <p>Execution order: flush(boolean endOfInput)=>prepareCommit()=>snapshotState(long
* checkpointId).
*
* <p>This method will also be called when receiving {@link FlushEvent}, but we don't need to
* commit the MultiTableCommittables immediately in this case, because {@link PaimonCommitter}
* supports committing data of different schemas.
*/
@Override
public void flush(boolean endOfInput) throws IOException {
for (Map.Entry<Identifier, StoreSinkWrite> entry : writes.entrySet()) {
Identifier key = entry.getKey();
StoreSinkWrite write = entry.getValue();
boolean waitCompaction = false;
committables.addAll(
// Use lastCheckpointId + 1 here to avoid calling prepareCommit with the same
// checkpointId as the first round. Note that during schema evolution, flush and
// checkpoint are consistent; as long as data keeps coming in, this will not
// trigger any conflict issues.
write.prepareCommit(waitCompaction, lastCheckpointId + 1).stream()
.map(
committable ->
MultiTableCommittable.fromCommittable(key, committable))
.collect(Collectors.toList()));
}
public void flush(boolean endOfInput) {
// Do nothing here, as StoreSinkWrite#replace will write the buffer to files.
}
@Override
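The new flush is a no-op because the buffer is persisted by StoreSinkWrite#replace (per the comment above) and by the per-table prepareCommit calls, while the execution order described in the removed javadoc is unchanged. A simplified illustration of that per-checkpoint order follows; the Writer interface is a stand-in, not the real org.apache.flink.api.connector.sink2 contract.

    // Simplified illustration of the call order described by the removed javadoc:
    // flush(endOfInput) -> prepareCommit() -> snapshotState(checkpointId).
    import java.util.Collection;
    import java.util.List;

    final class CheckpointOrderSketch {
        interface Writer<CommT, StateT> {
            void flush(boolean endOfInput);                // now a no-op in PaimonWriter
            Collection<CommT> prepareCommit();             // builds committables per StoreSinkWrite
            List<StateT> snapshotState(long checkpointId); // writer state used for recovery
        }

        static <CommT, StateT> Collection<CommT> onCheckpoint(
                Writer<CommT, StateT> writer, long checkpointId) {
            writer.flush(false);                                     // 1. flush pending data
            Collection<CommT> committables = writer.prepareCommit(); // 2. collect committables
            writer.snapshotState(checkpointId);                      // 3. snapshot writer state
            return committables;
        }
    }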

@@ -32,6 +32,7 @@ import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.memory.HeapMemorySegmentPool;
import org.apache.paimon.memory.MemoryPoolFactory;
import org.apache.paimon.memory.MemorySegmentPool;
import org.apache.paimon.operation.FileStoreWrite;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.SinkRecord;
@@ -217,7 +218,10 @@ public class StoreSinkWriteImpl implements StoreSinkWrite {
if (commitUser == null) {
return;
}
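// Snapshot the in-flight state of the current write, rebuild the write against the
// new table (with the latest schema), then restore the snapshot so the previous
// write's information is not lost.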
List<? extends FileStoreWrite.State<?>> states = write.checkpoint();
write.close();
write = newTableWrite(newTable);
write.restore((List) states);
}
}

@@ -63,9 +63,11 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.OptionalLong;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
@@ -108,6 +110,7 @@ public class PaimonSinkITCase {
catalogOptions = new Options();
catalogOptions.setString("metastore", metastore);
catalogOptions.setString("warehouse", warehouse);
catalogOptions.setString("cache-enabled", "false");
table1 = TableId.tableId("test", "table1");
if ("hive".equals(metastore)) {
catalogOptions.setString("hadoop-conf-dir", HADOOP_CONF_DIR);
@@ -119,13 +122,14 @@ public class PaimonSinkITCase {
+ "'warehouse'='%s', "
+ "'metastore'='hive', "
+ "'hadoop-conf-dir'='%s', "
+ "'hive-conf-dir'='%s' "
+ "'hive-conf-dir'='%s', "
+ "'cache-enabled'='false' "
+ ")",
warehouse, HADOOP_CONF_DIR, HIVE_CONF_DIR));
} else {
tEnv.executeSql(
String.format(
"CREATE CATALOG paimon_catalog WITH ('type'='paimon', 'warehouse'='%s')",
"CREATE CATALOG paimon_catalog WITH ('type'='paimon', 'warehouse'='%s', 'cache-enabled'='false')",
warehouse));
}
FlinkCatalogFactory.createPaimonCatalog(catalogOptions)
@@ -325,8 +329,8 @@ public class PaimonSinkITCase {
AddColumnEvent addColumnEvent =
new AddColumnEvent(table1, Collections.singletonList(columnWithPosition));
PaimonMetadataApplier metadataApplier = new PaimonMetadataApplier(catalogOptions);
writer.write(addColumnEvent, null);
metadataApplier.applySchemaChange(addColumnEvent);
writer.write(addColumnEvent, null);
generator =
new BinaryRecordDataGenerator(
RowType.of(DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()));
@@ -409,6 +413,13 @@ public class PaimonSinkITCase {
Row.ofKind(RowKind.INSERT, "5", "5"),
Row.ofKind(RowKind.INSERT, "6", "6")),
result);
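// Check min_sequence_number across the table's data files: if schema evolution had
// recreated the writer without restoring its state, sequence numbers could restart and
// different files might share a min_sequence_number; distinct values suggest the
// previous write's state was kept.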
result = new ArrayList<>();
tEnv.sqlQuery("select min_sequence_number from paimon_catalog.test.`table1$files`")
.execute()
.collect()
.forEachRemaining(result::add);
Set<Row> deduplicated = new HashSet<>(result);
Assertions.assertEquals(result.size(), deduplicated.size());
}
@ParameterizedTest
