diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriter.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriter.java
index d7b682b47..aa4dd2cb2 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriter.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriter.java
@@ -20,7 +20,6 @@ package org.apache.flink.cdc.connectors.paimon.sink.v2;
 import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
 import org.apache.flink.cdc.common.event.DataChangeEvent;
-import org.apache.flink.cdc.common.event.FlushEvent;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
@@ -67,7 +66,6 @@ public class PaimonWriter
     private final Map<Identifier, StoreSinkWrite> writes;
     private final ExecutorService compactExecutor;
     private final MetricGroup metricGroup;
-    private final List<MultiTableCommittable> committables;
 
     /** A workaround variable trace the checkpointId in {@link StreamOperator#snapshotState}. */
     private long lastCheckpointId;
@@ -83,7 +81,6 @@
         this.commitUser = commitUser;
         this.tables = new HashMap<>();
         this.writes = new HashMap<>();
-        this.committables = new ArrayList<>();
         this.ioManager = new IOManagerAsync();
         this.compactExecutor =
                 Executors.newSingleThreadScheduledExecutor(
@@ -94,11 +91,23 @@
     }
 
     @Override
-    public Collection<MultiTableCommittable> prepareCommit() {
-        Collection<MultiTableCommittable> allCommittables = new ArrayList<>(committables);
-        committables.clear();
+    public Collection<MultiTableCommittable> prepareCommit() throws IOException {
+        List<MultiTableCommittable> committables = new ArrayList<>();
+        for (Map.Entry<Identifier, StoreSinkWrite> entry : writes.entrySet()) {
+            Identifier key = entry.getKey();
+            StoreSinkWrite write = entry.getValue();
+            boolean waitCompaction = false;
+            committables.addAll(
+                    // here we set it to lastCheckpointId+1 to
+                    // avoid prepareCommit the same checkpointId with the first round.
+                    write.prepareCommit(waitCompaction, lastCheckpointId + 1).stream()
+                            .map(
+                                    committable ->
+                                            MultiTableCommittable.fromCommittable(key, committable))
+                            .collect(Collectors.toList()));
+        }
         lastCheckpointId++;
-        return allCommittables;
+        return committables;
     }
 
     @Override
@@ -108,8 +117,14 @@
         if (paimonEvent.isShouldRefreshSchema()) {
             // remove the table temporarily, then add the table with latest schema when received
             // DataChangeEvent.
-            writes.remove(tableId);
             tables.remove(tableId);
+            try {
+                if (writes.containsKey(tableId)) {
+                    writes.get(tableId).replace(getTable(tableId));
+                }
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
         }
         if (paimonEvent.getGenericRow() != null) {
             FileStoreTable table;
@@ -159,36 +174,9 @@
                         });
     }
 
-    /**
-     * Called on checkpoint or end of input so that the writer to flush all pending data for
-     * at-least-once.
-     *
-     * <pre>
-     *     Execution order: flush(boolean endOfInput)=>prepareCommit()=>snapshotState(long
-     * checkpointId).
-     * </pre>
-     *
-     * <p>this method will also be called when receiving {@link FlushEvent}, but we don't need to
-     * commit the MultiTableCommittables immediately in this case, because {@link PaimonCommitter}
-     * support committing data of different schemas.
-     */
     @Override
-    public void flush(boolean endOfInput) throws IOException {
-        for (Map.Entry<Identifier, StoreSinkWrite> entry : writes.entrySet()) {
-            Identifier key = entry.getKey();
-            StoreSinkWrite write = entry.getValue();
-            boolean waitCompaction = false;
-            committables.addAll(
-                    // here we set it to lastCheckpointId+1 to
-                    // avoid prepareCommit the same checkpointId with the first round.
-                    // One thing to note is that during schema evolution, flush and checkpoint are
-                    // consistent,
-                    // but as long as there is data coming in, it will not trigger any conflict
-                    // issues
-                    write.prepareCommit(waitCompaction, lastCheckpointId + 1).stream()
-                            .map(
-                                    committable ->
-                                            MultiTableCommittable.fromCommittable(key, committable))
-                            .collect(Collectors.toList()));
-        }
+    public void flush(boolean endOfInput) {
+        // do nothing as StoreSinkWrite#replace will write buffer to file.
     }
 
     @Override
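Note on the PaimonWriter change above: committables are no longer buffered in a field by flush(); prepareCommit() now asks every StoreSinkWrite for its committables at checkpoint time, and a schema refresh swaps the underlying write via StoreSinkWrite#replace instead of removing it. Below is a minimal sketch of the per-checkpoint call order that the removed Javadoc described (flush => prepareCommit => snapshotState), written against the Flink sink v2 interfaces the class already imports; the harness class and method are illustrative and not part of the connector.

```java
import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;

import java.util.Collection;

// Illustrative harness only: it mimics the order in which the Flink runtime drives a
// precommitting sink writer at a checkpoint barrier. With the change above, flush()
// becomes a no-op and the committables are produced inside prepareCommit() instead.
class CheckpointSequenceSketch {
    static <CommT> Collection<CommT> collectCommittables(
            TwoPhaseCommittingSink.PrecommittingSinkWriter<?, CommT> writer) throws Exception {
        writer.flush(false); // 1. flush(endOfInput): no longer gathers committables
        return writer.prepareCommit(); // 2. prepareCommit(): per-table committables gathered here
        // 3. snapshotState(checkpointId) follows on stateful writers; PaimonWriter uses it to
        //    advance lastCheckpointId.
    }
}
```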

diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/StoreSinkWriteImpl.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/StoreSinkWriteImpl.java
index 941189a0a..fb7489542 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/StoreSinkWriteImpl.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/StoreSinkWriteImpl.java
@@ -32,6 +32,7 @@ import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.memory.HeapMemorySegmentPool;
 import org.apache.paimon.memory.MemoryPoolFactory;
 import org.apache.paimon.memory.MemorySegmentPool;
+import org.apache.paimon.operation.FileStoreWrite;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.CommitMessage;
 import org.apache.paimon.table.sink.SinkRecord;
@@ -217,7 +218,10 @@
         if (commitUser == null) {
             return;
         }
+
+        List<? extends FileStoreWrite.State<?>> states = write.checkpoint();
         write.close();
         write = newTableWrite(newTable);
+        write.restore((List) states);
     }
 }
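The added lines in StoreSinkWriteImpl#replace carry the in-memory write state across the swap: checkpoint the old write, close it, create a write for the refreshed table, then restore the captured state, so records buffered before a schema change are not dropped. A minimal sketch of that hand-off pattern, assuming a hypothetical StatefulWrite interface rather than Paimon's actual TableWrite/FileStoreWrite types:

```java
import java.util.List;
import java.util.function.Supplier;

// Hypothetical interface standing in for Paimon's write objects; only the hand-off
// pattern mirrors the replace() change above.
interface StatefulWrite<S> extends AutoCloseable {
    List<S> checkpoint(); // snapshot of buffered, not-yet-committed write state

    void restore(List<S> states); // re-attach that state to a freshly created write
}

class ReplaceKeepingStateSketch {
    static <S> StatefulWrite<S> replaceKeepingState(
            StatefulWrite<S> oldWrite, Supplier<StatefulWrite<S>> newWriteFactory) throws Exception {
        List<S> states = oldWrite.checkpoint(); // 1. capture pending state from the old write
        oldWrite.close(); // 2. release the old write
        StatefulWrite<S> newWrite = newWriteFactory.get(); // 3. build a write for the new schema
        newWrite.restore(states); // 4. hand the pending state over
        return newWrite;
    }
}
```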
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java
index c337b8cce..094620b7b 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java
@@ -63,9 +63,11 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Objects;
 import java.util.OptionalLong;
+import java.util.Set;
 import java.util.UUID;
 import java.util.stream.Collectors;
@@ -108,6 +110,7 @@ public class PaimonSinkITCase {
         catalogOptions = new Options();
         catalogOptions.setString("metastore", metastore);
         catalogOptions.setString("warehouse", warehouse);
+        catalogOptions.setString("cache-enabled", "false");
         table1 = TableId.tableId("test", "table1");
         if ("hive".equals(metastore)) {
             catalogOptions.setString("hadoop-conf-dir", HADOOP_CONF_DIR);
@@ -119,13 +122,14 @@ public class PaimonSinkITCase {
                                 + "'warehouse'='%s', "
                                 + "'metastore'='hive', "
                                 + "'hadoop-conf-dir'='%s', "
-                                + "'hive-conf-dir'='%s' "
+                                + "'hive-conf-dir'='%s', "
+                                + "'cache-enabled'='false' "
                                 + ")",
                         warehouse, HADOOP_CONF_DIR, HIVE_CONF_DIR));
         } else {
             tEnv.executeSql(
                     String.format(
-                            "CREATE CATALOG paimon_catalog WITH ('type'='paimon', 'warehouse'='%s')",
+                            "CREATE CATALOG paimon_catalog WITH ('type'='paimon', 'warehouse'='%s', 'cache-enabled'='false')",
                             warehouse));
         }
         FlinkCatalogFactory.createPaimonCatalog(catalogOptions)
@@ -325,8 +329,8 @@
         AddColumnEvent addColumnEvent =
                 new AddColumnEvent(table1, Collections.singletonList(columnWithPosition));
         PaimonMetadataApplier metadataApplier = new PaimonMetadataApplier(catalogOptions);
-        writer.write(addColumnEvent, null);
         metadataApplier.applySchemaChange(addColumnEvent);
+        writer.write(addColumnEvent, null);
         generator =
                 new BinaryRecordDataGenerator(
                         RowType.of(DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()));
@@ -409,6 +413,13 @@
                         Row.ofKind(RowKind.INSERT, "5", "5"),
                         Row.ofKind(RowKind.INSERT, "6", "6")),
                 result);
+        result = new ArrayList<>();
+        tEnv.sqlQuery("select min_sequence_number from paimon_catalog.test.`table1$files`")
+                .execute()
+                .collect()
+                .forEachRemaining(result::add);
+        Set<Row> deduplicated = new HashSet<>(result);
+        Assertions.assertEquals(result.size(), deduplicated.size());
     }
 
     @ParameterizedTest
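The test adjustments do two things: the catalogs are created with 'cache-enabled' = 'false', presumably so that tables fetched after PaimonMetadataApplier has applied a schema change are re-read rather than served from a stale cache, and the min_sequence_number column of the table1$files system table is asserted to contain no duplicates, which would otherwise indicate the same records having been written into more than one data file. A minimal sketch of building such an uncached catalog programmatically; the warehouse path and metastore value below are placeholders, not values from the test.

```java
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.flink.FlinkCatalogFactory;
import org.apache.paimon.options.Options;

// Sketch only: mirrors the catalog options used in the ITCase, with placeholder values.
class UncachedCatalogSketch {
    static Catalog openUncachedCatalog() {
        Options catalogOptions = new Options();
        catalogOptions.setString("metastore", "filesystem"); // the ITCase also runs with "hive"
        catalogOptions.setString("warehouse", "/tmp/paimon-warehouse"); // placeholder path
        catalogOptions.setString("cache-enabled", "false"); // always re-read table metadata
        return FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
    }
}
```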