feat: write iceberg rowData for iceberg writer

pull/3877/head
ConradJam 1 week ago
parent 99d965020f
commit 6750f5e943

@@ -8,7 +8,7 @@ import org.apache.flink.cdc.common.sink.DataSink;
 import org.apache.flink.cdc.common.sink.EventSinkProvider;
 import org.apache.flink.cdc.common.sink.FlinkSinkProvider;
 import org.apache.flink.cdc.common.sink.MetadataApplier;
-import org.apache.flink.cdc.connectors.iceberg.sink.v2.IcebergEventSink;
+import org.apache.flink.cdc.connectors.iceberg.sink.v2.IcebergEventSinkV2;
 import org.apache.flink.cdc.connectors.iceberg.sink.v2.IcebergRecordSerializer;

 import java.io.Serializable;
@@ -53,8 +53,8 @@ public class IcebergDataSink implements DataSink, Serializable {

     @Override
     public EventSinkProvider getEventSinkProvider() {
-        IcebergEventSink icebergEventSink =
-                new IcebergEventSink(
+        IcebergEventSinkV2 icebergEventSink =
+                new IcebergEventSinkV2(
                         tableOptions, commitUser, serializer, schemaOperatorUid, zoneId);
         return FlinkSinkProvider.of(icebergEventSink);
     }

@@ -1,7 +1,8 @@
 package org.apache.flink.cdc.connectors.iceberg.sink.v2;

+import org.apache.flink.table.data.GenericRowData;
+
 import org.apache.iceberg.catalog.TableIdentifier;
-import org.apache.iceberg.data.GenericRecord;

 public class IcebergEvent {
@@ -9,19 +10,19 @@ public class IcebergEvent {
     TableIdentifier tableId;

     // The actual record to be written to iceberg table.
-    GenericRecord genericRow;
+    GenericRowData genericRow;

     // if true, means that table schema has changed right before this genericRow.
     boolean shouldRefreshSchema;

     public IcebergEvent(
-            TableIdentifier tableId, GenericRecord genericRow, boolean shouldRefreshSchema) {
+            TableIdentifier tableId, GenericRowData genericRow, boolean shouldRefreshSchema) {
         this.tableId = tableId;
         this.genericRow = genericRow;
         this.shouldRefreshSchema = shouldRefreshSchema;
     }

-    public IcebergEvent(TableIdentifier tableId, GenericRecord genericRow) {
+    public IcebergEvent(TableIdentifier tableId, GenericRowData genericRow) {
         this.tableId = tableId;
         this.genericRow = genericRow;
         this.shouldRefreshSchema = false;
@@ -35,11 +36,11 @@ public class IcebergEvent {
         this.tableId = tableId;
     }

-    public GenericRecord getGenericRow() {
+    public GenericRowData getGenericRow() {
         return genericRow;
     }

-    public void setGenericRow(GenericRecord genericRow) {
+    public void setGenericRow(GenericRowData genericRow) {
         this.genericRow = genericRow;
     }

@@ -3,18 +3,17 @@ package org.apache.flink.cdc.connectors.iceberg.sink.v2;
 import org.apache.flink.cdc.common.event.Event;
 import org.apache.flink.streaming.api.connector.sink2.SupportsPreWriteTopology;
 import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.iceberg.flink.sink.FlinkSink;

 import java.time.ZoneId;
 import java.util.Map;

-public class IcebergEventSink extends IcebergSink implements SupportsPreWriteTopology<Event> {
+public class IcebergEventSinkV2 extends IcebergSinkV2 implements SupportsPreWriteTopology<Event> {

     public final String schemaOperatorUid;
     public final ZoneId zoneId;

-    public IcebergEventSink(
+    public IcebergEventSinkV2(
             Map<String, String> catalogOptions,
             String commitUser,
             IcebergRecordSerializer<Event> serializer,
@@ -27,7 +26,6 @@ public class IcebergEventSink extends IcebergSink implements SupportsPreWriteTop
     @Override
     public DataStream<Event> addPreWriteTopology(DataStream<Event> dataStream) {
         return dataStream;
     }
 }

@@ -7,9 +7,9 @@ import org.apache.flink.cdc.common.event.Event;
 import org.apache.flink.cdc.common.event.SchemaChangeEvent;
 import org.apache.flink.cdc.common.event.TableId;
 import org.apache.flink.cdc.common.utils.SchemaUtils;
+import org.apache.flink.table.data.GenericRowData;

 import org.apache.iceberg.catalog.TableIdentifier;
-import org.apache.iceberg.data.GenericRecord;

 import java.io.IOException;
 import java.time.ZoneId;
@@ -50,8 +50,8 @@ public class IcebergRecordEventSerializer implements IcebergRecordSerializer<Eve
             return new IcebergEvent(tableId, null, true);
         } else if (event instanceof DataChangeEvent) {
             DataChangeEvent dataChangeEvent = (DataChangeEvent) event;
-            GenericRecord genericRecord =
-                    IcebergWriterHelper.convertEventToGenericRow(
+            GenericRowData genericRecord =
+                    IcebergWriterHelper.convertEventToRow(
                             dataChangeEvent,
                             schemaMaps.get(dataChangeEvent.tableId()).getFieldGetters());
             return new IcebergEvent(tableId, genericRecord, false);
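Note: a minimal sketch of how the serializer output is consumed downstream, assuming the imports already present in this file; `serializer` and `event` are illustrative placeholders, not identifiers from this change:

    // Sketch only: the two branches an IcebergEvent can take after serialization.
    IcebergEvent icebergEvent = serializer.serialize(event);
    if (icebergEvent.isShouldRefreshSchema()) {
        // Schema-change path: no row payload; the writer refreshes the Iceberg table schema.
    } else {
        // Data-change path: a GenericRowData whose RowKind reflects the CDC operation.
        GenericRowData row = icebergEvent.getGenericRow();
    }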

@@ -16,7 +16,7 @@ import org.apache.iceberg.catalog.ImmutableTableCommit;
 import java.io.IOException;
 import java.util.Map;

-public class IcebergSink
+public class IcebergSinkV2
         implements Sink<Event>,
                 SupportsPreWriteTopology<Event>,
                 SupportsCommitter<ImmutableTableCommit> {
@@ -30,14 +30,14 @@ public class IcebergSink
     private final IcebergRecordSerializer<Event> serializer;

-    public IcebergSink(
+    public IcebergSinkV2(
             Map<String, String> catalogOptions, IcebergRecordSerializer<Event> serializer) {
         this.catalogOptions = catalogOptions;
         this.serializer = serializer;
         commitUser = DEFAULT_COMMIT_USER;
     }

-    public IcebergSink(
+    public IcebergSinkV2(
             Map<String, String> catalogOptions,
             String commitUser,
             IcebergRecordSerializer<Event> serializer) {

@@ -2,30 +2,126 @@ package org.apache.flink.cdc.connectors.iceberg.sink.v2;
 import org.apache.flink.api.common.eventtime.Watermark;
 import org.apache.flink.api.connector.sink2.CommittingSinkWriter;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;

-import org.apache.iceberg.catalog.ImmutableTableCommit;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.sink.RowDataTaskWriterFactory;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.io.WriteResult;
+import org.apache.iceberg.util.PropertyUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;

-public class IcebergWriter<InputT> implements CommittingSinkWriter<InputT, ImmutableTableCommit> {
+public class IcebergWriter<InputT> implements CommittingSinkWriter<InputT, WriteResult> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(IcebergWriter.class);
+
+    private final Table table;
+    private final String commitUser;
+    private final MetricGroup metricGroup;
+    private final Map<String, TaskWriter<RowData>> writes;
+    private final IcebergRecordSerializer<InputT> serializer;
+
+    private long lastCheckpointId;
+
+    public IcebergWriter(
+            Table table,
+            MetricGroup metricGroup,
+            String commitUser,
+            IcebergRecordSerializer<InputT> serializer,
+            long lastCheckpointId) {
+        this.table = table;
+        this.commitUser = commitUser;
+        this.metricGroup = metricGroup;
+        this.serializer = serializer;
+        this.writes = new HashMap<>();
+        this.lastCheckpointId = lastCheckpointId;
+    }

     @Override
-    public Collection<ImmutableTableCommit> prepareCommit()
-            throws IOException, InterruptedException {
-        return Collections.emptyList();
+    public Collection<WriteResult> prepareCommit() throws IOException, InterruptedException {
+        List<WriteResult> committables = new ArrayList<>();
+        for (Map.Entry<String, TaskWriter<RowData>> entry : writes.entrySet()) {
+            String key = entry.getKey();
+            TaskWriter<RowData> writer = entry.getValue();
+            WriteResult result = writer.complete();
+            committables.add(result);
+            writes.put(key, getTaskWriter());
+            LOG.info(
+                    "Iceberg writer flushed {} data files and {} delete files",
+                    result.dataFiles().length,
+                    result.deleteFiles().length);
+        }
+        return committables;
     }

     @Override
-    public void write(InputT inputT, Context context) throws IOException, InterruptedException {}
+    public void write(InputT inputT, Context context) throws IOException, InterruptedException {
+        IcebergEvent icebergEvent = serializer.serialize(inputT);
+        String tableId = icebergEvent.getTableId().name();
+
+        // Handle schema changes (if any)
+        if (icebergEvent.isShouldRefreshSchema()) {
+            // In case of schema changes, refresh the table
+            try {
+                table.refresh();
+            } catch (Exception e) {
+                throw new IOException("Failed to refresh Iceberg table schema", e);
+            }
+        }
+
+        // Write the data to Iceberg
+        if (icebergEvent.getGenericRow() != null) {
+            TaskWriter<RowData> writer = writes.computeIfAbsent(tableId, id -> getTaskWriter());
+            try {
+                writer.write(icebergEvent.getGenericRow());
+            } catch (Exception e) {
+                throw new IOException("Failed to write event to Iceberg", e);
+            }
+        }
+    }
+
+    private TaskWriter<RowData> getTaskWriter() {
+        String formatString =
+                PropertyUtil.propertyAsString(
+                        table.properties(),
+                        TableProperties.DEFAULT_FILE_FORMAT,
+                        TableProperties.DEFAULT_FILE_FORMAT_DEFAULT);
+        FileFormat format = FileFormat.fromString(formatString);
+        RowType flinkSchema = FlinkSchemaUtil.convert(table.schema());
+        RowDataTaskWriterFactory taskWriterFactory =
+                new RowDataTaskWriterFactory(
+                        table, flinkSchema, Long.MAX_VALUE, format, table.properties(), null, true);
+        return taskWriterFactory.create();
+    }

     @Override
-    public void flush(boolean b) throws IOException, InterruptedException {}
+    public void flush(boolean b) throws IOException, InterruptedException {
+        // flush is used to handle flush/endOfInput, so no action is taken here.
+    }

     @Override
     public void writeWatermark(Watermark watermark) throws IOException, InterruptedException {}

     @Override
-    public void close() throws Exception {}
+    public void close() throws Exception {
+        for (TaskWriter<RowData> writer : writes.values()) {
+            if (writer != null) {
+                writer.close();
+            }
+        }
+    }
 }
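Note: a rough sketch of the per-checkpoint cycle the new writer implements, exercised directly against Iceberg's TaskWriter API. The factory arguments mirror getTaskWriter() above; `table` and `row` are placeholders, and the explicit initialize() call is an assumption about TaskWriterFactory usage, not something this change shows:

    // Sketch only: one TaskWriter per table; write(...) appends rows, prepareCommit() flushes them.
    RowType flinkSchema = FlinkSchemaUtil.convert(table.schema());
    RowDataTaskWriterFactory factory =
            new RowDataTaskWriterFactory(
                    table, flinkSchema, Long.MAX_VALUE, FileFormat.PARQUET, table.properties(), null, true);
    factory.initialize(0, 0); // assumed: task/attempt ids supplied before create()
    TaskWriter<RowData> writer = factory.create();
    writer.write(row);                        // per incoming DataChangeEvent
    WriteResult result = writer.complete();   // at prepareCommit(); becomes the committable
    writer.close();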

@@ -24,12 +24,10 @@ import org.apache.flink.cdc.common.schema.Column;
 import org.apache.flink.cdc.common.schema.Schema;
 import org.apache.flink.cdc.common.types.DataType;
 import org.apache.flink.cdc.common.types.DataTypeChecks;
+import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.types.RowKind;

-import org.apache.iceberg.data.GenericRecord;
-import org.apache.iceberg.flink.data.StructRowData;
 import org.apache.iceberg.types.TypeUtil;
-import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.DateTimeUtil;
 import org.apache.iceberg.util.DecimalUtil;
@@ -145,10 +143,9 @@ public class IcebergWriterHelper {
         };
     }

-    public static GenericRecord convertEventToGenericRow(
+    public static GenericRowData convertEventToRow(
             DataChangeEvent dataChangeEvent, List<RecordData.FieldGetter> fieldGetters) {
-        StructRowData structRowData;
-        GenericRecord genericRow = null;
+        GenericRowData genericRow;
         RecordData recordData;
         switch (dataChangeEvent.op()) {
             case INSERT:
@@ -156,21 +153,20 @@ public class IcebergWriterHelper {
             case REPLACE:
                 {
                     recordData = dataChangeEvent.after();
-                    structRowData = new StructRowData(Types.StructType.of(), RowKind.INSERT);
+                    genericRow = new GenericRowData(RowKind.INSERT, recordData.getArity());
                     break;
                 }
             case DELETE:
                 {
                     recordData = dataChangeEvent.before();
-                    structRowData = new StructRowData(Types.StructType.of(), RowKind.DELETE);
+                    genericRow = new GenericRowData(RowKind.DELETE, recordData.getArity());
                     break;
                 }
             default:
                 throw new IllegalArgumentException("don't support type of " + dataChangeEvent.op());
         }
         for (int i = 0; i < recordData.getArity(); i++) {
-            // todo : how to set this row to
-            genericRow.setField(null, fieldGetters.get(i).getFieldOrNull(recordData));
+            genericRow.setField(i, fieldGetters.get(i).getFieldOrNull(recordData));
         }
         return genericRow;
     }
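Note: a small illustration of what convertEventToRow now produces, assuming a hypothetical two-column (id BIGINT, name STRING) table; Flink's GenericRowData carries the change type in its RowKind, and StringData is Flink's internal string representation:

    // Sketch only: the row a DELETE event would be converted into from its "before" image.
    GenericRowData deleted = new GenericRowData(RowKind.DELETE, 2);
    deleted.setField(0, 1L);                                 // id
    deleted.setField(1, StringData.fromString("old-name"));  // name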
