From 6750f5e9437d780b5ac32919e1414996683494ef Mon Sep 17 00:00:00 2001
From: ConradJam
Date: Thu, 30 Jan 2025 13:28:43 +0800
Subject: [PATCH] feat: write iceberg rowData for iceberg writer

---
 .../iceberg/sink/IcebergDataSink.java         |   6 +-
 .../iceberg/sink/v2/IcebergEvent.java         |  13 +-
 ...EventSink.java => IcebergEventSinkV2.java} |   6 +-
 .../sink/v2/IcebergRecordEventSerializer.java |   6 +-
 .../{IcebergSink.java => IcebergSinkV2.java}  |   6 +-
 .../iceberg/sink/v2/IcebergWriter.java        | 114 ++++++++++++++++--
 .../iceberg/sink/v2/IcebergWriterHelper.java  |  16 +--
 7 files changed, 129 insertions(+), 38 deletions(-)
 rename flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/{IcebergEventSink.java => IcebergEventSinkV2.java} (83%)
 rename flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/{IcebergSink.java => IcebergSinkV2.java} (96%)

diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSink.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSink.java
index 3268bd11b..f092f0b3d 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSink.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSink.java
@@ -8,7 +8,7 @@ import org.apache.flink.cdc.common.sink.DataSink;
 import org.apache.flink.cdc.common.sink.EventSinkProvider;
 import org.apache.flink.cdc.common.sink.FlinkSinkProvider;
 import org.apache.flink.cdc.common.sink.MetadataApplier;
-import org.apache.flink.cdc.connectors.iceberg.sink.v2.IcebergEventSink;
+import org.apache.flink.cdc.connectors.iceberg.sink.v2.IcebergEventSinkV2;
 import org.apache.flink.cdc.connectors.iceberg.sink.v2.IcebergRecordSerializer;
 
 import java.io.Serializable;
@@ -53,8 +53,8 @@ public class IcebergDataSink implements DataSink, Serializable {
 
     @Override
     public EventSinkProvider getEventSinkProvider() {
-        IcebergEventSink icebergEventSink =
-                new IcebergEventSink(
+        IcebergEventSinkV2 icebergEventSink =
+                new IcebergEventSinkV2(
                         tableOptions, commitUser, serializer, schemaOperatorUid, zoneId);
         return FlinkSinkProvider.of(icebergEventSink);
     }
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergEvent.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergEvent.java
index 147f36a3a..2036a3f1f 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergEvent.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergEvent.java
@@ -1,7 +1,8 @@
 package org.apache.flink.cdc.connectors.iceberg.sink.v2;
 
+import org.apache.flink.table.data.GenericRowData;
+
 import org.apache.iceberg.catalog.TableIdentifier;
-import org.apache.iceberg.data.GenericRecord;
 
 public class IcebergEvent {
 
@@ -9,19 +10,19 @@ public class IcebergEvent {
     TableIdentifier tableId;
 
     // The actual record to be written to iceberg table.
-    GenericRecord genericRow;
+    GenericRowData genericRow;
 
     // if true, means that table schema has changed right before this genericRow.
     boolean shouldRefreshSchema;
 
     public IcebergEvent(
-            TableIdentifier tableId, GenericRecord genericRow, boolean shouldRefreshSchema) {
+            TableIdentifier tableId, GenericRowData genericRow, boolean shouldRefreshSchema) {
         this.tableId = tableId;
         this.genericRow = genericRow;
         this.shouldRefreshSchema = shouldRefreshSchema;
     }
 
-    public IcebergEvent(TableIdentifier tableId, GenericRecord genericRow) {
+    public IcebergEvent(TableIdentifier tableId, GenericRowData genericRow) {
         this.tableId = tableId;
         this.genericRow = genericRow;
         this.shouldRefreshSchema = false;
@@ -35,11 +36,11 @@ public class IcebergEvent {
         this.tableId = tableId;
     }
 
-    public GenericRecord getGenericRow() {
+    public GenericRowData getGenericRow() {
         return genericRow;
     }
 
-    public void setGenericRow(GenericRecord genericRow) {
+    public void setGenericRow(GenericRowData genericRow) {
         this.genericRow = genericRow;
     }
 
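Note: GenericRowData is Flink's internal row representation, so field values must use
Flink's internal data classes (StringData for strings, TimestampData for timestamps,
boxed primitives for numeric types). A minimal construction sketch for the new event
shape; the table and column values are made up for illustration and are not part of
this patch:

    import org.apache.flink.table.data.GenericRowData;
    import org.apache.flink.table.data.StringData;
    import org.apache.flink.types.RowKind;
    import org.apache.iceberg.catalog.TableIdentifier;

    class IcebergEventExample {
        static IcebergEvent insertEvent() {
            // arity 2: one BIGINT column, one STRING column
            GenericRowData row = new GenericRowData(RowKind.INSERT, 2);
            row.setField(0, 1L);
            row.setField(1, StringData.fromString("alice"));
            // the two-argument constructor defaults shouldRefreshSchema to false
            return new IcebergEvent(TableIdentifier.of("db", "tbl"), row);
        }
    }
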
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergEventSink.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergEventSinkV2.java
similarity index 83%
rename from flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergEventSink.java
rename to flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergEventSinkV2.java
index b3fd10911..e4c6363c2 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergEventSink.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergEventSinkV2.java
@@ -3,18 +3,17 @@ package org.apache.flink.cdc.connectors.iceberg.sink.v2;
 import org.apache.flink.cdc.common.event.Event;
 import org.apache.flink.streaming.api.connector.sink2.SupportsPreWriteTopology;
 import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.iceberg.flink.sink.FlinkSink;
 
 import java.time.ZoneId;
 import java.util.Map;
 
-public class IcebergEventSink extends IcebergSink implements SupportsPreWriteTopology<Event> {
+public class IcebergEventSinkV2 extends IcebergSinkV2 implements SupportsPreWriteTopology<Event> {
 
     public final String schemaOperatorUid;
 
     public final ZoneId zoneId;
 
-    public IcebergEventSink(
+    public IcebergEventSinkV2(
             Map<String, String> catalogOptions,
             String commitUser,
             IcebergRecordSerializer<Event> serializer,
@@ -27,7 +26,6 @@ public class IcebergEventSink extends IcebergSink implements SupportsPreWriteTopology<Event> {
 
     @Override
     public DataStream<Event> addPreWriteTopology(DataStream<Event> dataStream) {
-
         return dataStream;
     }
 }
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergRecordEventSerializer.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergRecordEventSerializer.java
index 0378d7e50..875eef25c 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergRecordEventSerializer.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergRecordEventSerializer.java
@@ -7,9 +7,9 @@ import org.apache.flink.cdc.common.event.Event;
 import org.apache.flink.cdc.common.event.SchemaChangeEvent;
 import org.apache.flink.cdc.common.event.TableId;
 import org.apache.flink.cdc.common.utils.SchemaUtils;
+import org.apache.flink.table.data.GenericRowData;
 
 import org.apache.iceberg.catalog.TableIdentifier;
-import org.apache.iceberg.data.GenericRecord;
 
 import java.io.IOException;
 import java.time.ZoneId;
@@ -50,8 +50,8 @@ public class IcebergRecordEventSerializer implements IcebergRecordSerializer<Event> {
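The body of this last hunk did not survive in this excerpt. Given the helper rename at
the end of the patch (convertEventToGenericRow to convertEventToRow), it most likely
rewires the data-change branch of serialize(...). A hedged sketch of that dispatch,
where tableIdentifierOf(...) and fieldGettersFor(...) stand in for the serializer's
internal schema bookkeeping and are not real methods in this patch:

    // Schema changes carry no row but tell the writer to refresh table metadata;
    // data changes are converted to GenericRowData with the renamed helper.
    public IcebergEvent serialize(Event event) throws IOException {
        if (event instanceof SchemaChangeEvent) {
            return new IcebergEvent(tableIdentifierOf(event), null, true);
        }
        DataChangeEvent dataChangeEvent = (DataChangeEvent) event;
        return new IcebergEvent(
                tableIdentifierOf(event),
                IcebergWriterHelper.convertEventToRow(
                        dataChangeEvent, fieldGettersFor(dataChangeEvent.tableId())),
                false);
    }

This matches the writer further down, which checks isShouldRefreshSchema() and skips
writing when getGenericRow() is null.
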
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergSink.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergSinkV2.java
similarity index 96%
rename from flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergSink.java
rename to flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergSinkV2.java
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergSink.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergSinkV2.java
-public class IcebergSink
+public class IcebergSinkV2
         implements Sink<Event>, SupportsPreWriteTopology<Event>, SupportsCommitter<WriteResult> {
@@ -30,14 +30,14 @@ public class IcebergSink
 
     private final IcebergRecordSerializer<Event> serializer;
 
-    public IcebergSink(
+    public IcebergSinkV2(
             Map<String, String> catalogOptions, IcebergRecordSerializer<Event> serializer) {
         this.catalogOptions = catalogOptions;
         this.serializer = serializer;
         commitUser = DEFAULT_COMMIT_USER;
     }
 
-    public IcebergSink(
+    public IcebergSinkV2(
             Map<String, String> catalogOptions,
             String commitUser,
             IcebergRecordSerializer<Event> serializer) {
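SupportsCommitter splits the exactly-once path in two: the writer below produces
WriteResult committables at each checkpoint, and a committer publishes them as an
Iceberg snapshot once the checkpoint completes. The committer itself is not part of
this patch; a minimal sketch of what one would do with a single committable, using
Iceberg's RowDelta API:

    import org.apache.iceberg.DataFile;
    import org.apache.iceberg.DeleteFile;
    import org.apache.iceberg.RowDelta;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.io.WriteResult;

    class CommitSketch {
        // Publishes the data files and delete files of one checkpoint atomically.
        static void commit(Table table, WriteResult result) {
            RowDelta rowDelta = table.newRowDelta();
            for (DataFile dataFile : result.dataFiles()) {
                rowDelta.addRows(dataFile);
            }
            for (DeleteFile deleteFile : result.deleteFiles()) {
                rowDelta.addDeletes(deleteFile);
            }
            rowDelta.commit(); // one Iceberg snapshot per checkpoint
        }
    }
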
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriter.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriter.java
index 71e093c26..0c20840fc 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriter.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriter.java
@@ -2,30 +2,126 @@ package org.apache.flink.cdc.connectors.iceberg.sink.v2;
 
 import org.apache.flink.api.common.eventtime.Watermark;
 import org.apache.flink.api.connector.sink2.CommittingSinkWriter;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
 
-import org.apache.iceberg.catalog.ImmutableTableCommit;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.sink.RowDataTaskWriterFactory;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.io.WriteResult;
+import org.apache.iceberg.util.PropertyUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
-public class IcebergWriter<InputT> implements CommittingSinkWriter<InputT, ImmutableTableCommit> {
+public class IcebergWriter<InputT> implements CommittingSinkWriter<InputT, WriteResult> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(IcebergWriter.class);
+
+    private final Table table;
+    private final String commitUser;
+    private final MetricGroup metricGroup;
+    private final Map<String, TaskWriter<RowData>> writes;
+    private final IcebergRecordSerializer<InputT> serializer;
+
+    private long lastCheckpointId;
+
+    public IcebergWriter(
+            Table table,
+            MetricGroup metricGroup,
+            String commitUser,
+            IcebergRecordSerializer<InputT> serializer,
+            long lastCheckpointId) {
+        this.table = table;
+        this.commitUser = commitUser;
+        this.metricGroup = metricGroup;
+        this.serializer = serializer;
+        this.writes = new HashMap<>();
+        this.lastCheckpointId = lastCheckpointId;
+    }
 
     @Override
-    public Collection<ImmutableTableCommit> prepareCommit()
-            throws IOException, InterruptedException {
-        return Collections.emptyList();
+    public Collection<WriteResult> prepareCommit() throws IOException, InterruptedException {
+        List<WriteResult> committables = new ArrayList<>();
+        for (Map.Entry<String, TaskWriter<RowData>> entry : writes.entrySet()) {
+            String key = entry.getKey();
+            TaskWriter<RowData> writer = entry.getValue();
+            WriteResult result = writer.complete();
+            committables.add(result);
+            writes.put(key, getTaskWriter());
+            LOG.info(
+                    "Iceberg writer flushed {} data files and {} delete files",
+                    result.dataFiles().length,
+                    result.deleteFiles().length);
+        }
+        return committables;
     }
 
     @Override
-    public void write(InputT inputT, Context context) throws IOException, InterruptedException {}
+    public void write(InputT inputT, Context context) throws IOException, InterruptedException {
+        IcebergEvent icebergEvent = serializer.serialize(inputT);
+        String tableId = icebergEvent.getTableId().name();
+
+        // Handle schema changes (if any)
+        if (icebergEvent.isShouldRefreshSchema()) {
+            // In case of schema changes, refresh the table
+            try {
+                table.refresh();
+            } catch (Exception e) {
+                throw new IOException("Failed to refresh Iceberg table schema", e);
+            }
+        }
+
+        // Write the data to Iceberg
+        if (icebergEvent.getGenericRow() != null) {
+            TaskWriter<RowData> writer = writes.computeIfAbsent(tableId, id -> getTaskWriter());
+            try {
+                writer.write(icebergEvent.getGenericRow());
+            } catch (Exception e) {
+                throw new IOException("Failed to write event to Iceberg", e);
+            }
+        }
+    }
+
+    private TaskWriter<RowData> getTaskWriter() {
+        String formatString =
+                PropertyUtil.propertyAsString(
+                        table.properties(),
+                        TableProperties.DEFAULT_FILE_FORMAT,
+                        TableProperties.DEFAULT_FILE_FORMAT_DEFAULT);
+        FileFormat format = FileFormat.fromString(formatString);
+        RowType flinkSchema = FlinkSchemaUtil.convert(table.schema());
+        RowDataTaskWriterFactory taskWriterFactory =
+                new RowDataTaskWriterFactory(
+                        table, flinkSchema, Long.MAX_VALUE, format, table.properties(), null, true);
+        return taskWriterFactory.create();
+    }
 
     @Override
-    public void flush(boolean b) throws IOException, InterruptedException {}
+    public void flush(boolean b) throws IOException, InterruptedException {
+        // flush is used to handle flush/endOfInput, so no action is taken here.
+    }
 
     @Override
     public void writeWatermark(Watermark watermark) throws IOException, InterruptedException {}
 
     @Override
-    public void close() throws Exception {}
+    public void close() throws Exception {
+        for (TaskWriter<RowData> writer : writes.values()) {
+            if (writer != null) {
+                writer.close();
+            }
+        }
+    }
 }
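Two things here are worth double-checking against Iceberg's TaskWriterFactory
contract: initialize(taskId, attemptId) must be invoked before create(), which
getTaskWriter() above skips, and passing null equality field ids appears to make the
factory fall back to append-only writers, in which case the upsert flag has no effect
and DELETE rows would not produce delete files. For reference, the TaskWriter
lifecycle in isolation; the file format and target file size are hard-coded in this
sketch, while the patch derives them from table properties:

    import java.io.IOException;
    import org.apache.flink.table.data.RowData;
    import org.apache.iceberg.FileFormat;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.TableProperties;
    import org.apache.iceberg.flink.FlinkSchemaUtil;
    import org.apache.iceberg.flink.sink.RowDataTaskWriterFactory;
    import org.apache.iceberg.io.TaskWriter;
    import org.apache.iceberg.io.WriteResult;

    class TaskWriterLifecycle {
        static WriteResult writeOnce(Table table, RowData row, int subtaskId, int attemptId)
                throws IOException {
            RowDataTaskWriterFactory factory =
                    new RowDataTaskWriterFactory(
                            table,
                            FlinkSchemaUtil.convert(table.schema()),
                            TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT,
                            FileFormat.PARQUET,
                            table.properties(),
                            null,   // equality field ids; null disables equality deletes
                            false); // upsert mode off in this sketch
            factory.initialize(subtaskId, attemptId); // required before create()
            TaskWriter<RowData> writer = factory.create();
            writer.write(row);        // called once per record in practice
            return writer.complete(); // seals the open files and returns their metadata
        }
    }
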
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriterHelper.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriterHelper.java
index 9ead72db0..fef0f333c 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriterHelper.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriterHelper.java
@@ -24,12 +24,10 @@ import org.apache.flink.cdc.common.schema.Column;
 import org.apache.flink.cdc.common.schema.Schema;
 import org.apache.flink.cdc.common.types.DataType;
 import org.apache.flink.cdc.common.types.DataTypeChecks;
+import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.types.RowKind;
 
-import org.apache.iceberg.data.GenericRecord;
-import org.apache.iceberg.flink.data.StructRowData;
 import org.apache.iceberg.types.TypeUtil;
-import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.DateTimeUtil;
 import org.apache.iceberg.util.DecimalUtil;
 
@@ -145,10 +143,9 @@ public class IcebergWriterHelper {
         };
     }
 
-    public static GenericRecord convertEventToGenericRow(
+    public static GenericRowData convertEventToRow(
             DataChangeEvent dataChangeEvent, List<RecordData.FieldGetter> fieldGetters) {
-        StructRowData structRowData;
-        GenericRecord genericRow = null;
+        GenericRowData genericRow;
         RecordData recordData;
         switch (dataChangeEvent.op()) {
             case INSERT:
@@ -156,21 +153,20 @@ public class IcebergWriterHelper {
             case UPDATE:
             case REPLACE:
                 {
                     recordData = dataChangeEvent.after();
-                    structRowData = new StructRowData(Types.StructType.of(), RowKind.INSERT);
+                    genericRow = new GenericRowData(RowKind.INSERT, recordData.getArity());
                     break;
                 }
             case DELETE:
                 {
                     recordData = dataChangeEvent.before();
-                    structRowData = new StructRowData(Types.StructType.of(), RowKind.DELETE);
+                    genericRow = new GenericRowData(RowKind.DELETE, recordData.getArity());
                     break;
                 }
             default:
                 throw new IllegalArgumentException("don't support type of " + dataChangeEvent.op());
         }
         for (int i = 0; i < recordData.getArity(); i++) {
-            // todo : how to set this row to
-            genericRow.setField(null, fieldGetters.get(i).getFieldOrNull(recordData));
+            genericRow.setField(i, fieldGetters.get(i).getFieldOrNull(recordData));
         }
         return genericRow;
     }
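The renamed helper now fills GenericRowData positionally, resolving the earlier todo.
A small end-to-end sketch of the conversion path; the schema, column names, and
surrounding class are illustrative only, and RecordData construction is left to the
caller:

    import java.util.List;
    import org.apache.flink.cdc.common.data.RecordData;
    import org.apache.flink.cdc.common.event.DataChangeEvent;
    import org.apache.flink.cdc.common.event.TableId;
    import org.apache.flink.cdc.common.schema.Schema;
    import org.apache.flink.cdc.common.types.DataTypes;
    import org.apache.flink.cdc.common.utils.SchemaUtils;
    import org.apache.flink.table.data.GenericRowData;

    class ConvertSketch {
        // Converts an insert event; row kind and arity come from the event itself.
        static GenericRowData toRow(TableId tableId, RecordData after) {
            Schema schema =
                    Schema.newBuilder()
                            .physicalColumn("id", DataTypes.BIGINT())
                            .physicalColumn("name", DataTypes.STRING())
                            .build();
            List<RecordData.FieldGetter> getters = SchemaUtils.createFieldGetters(schema);
            DataChangeEvent event = DataChangeEvent.insertEvent(tableId, after);
            return IcebergWriterHelper.convertEventToRow(event, getters);
        }
    }

One caveat worth verifying: the CDC FieldGetters return flink-cdc's internal types
(for example org.apache.flink.cdc.common.data.StringData), while the Iceberg task
writers read GenericRowData through Flink's org.apache.flink.table.data classes; if
the two type systems are not bridged, string and timestamp columns may fail at write
time.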