[FLINK-36061] Support Iceberg CDC Pipeline SinkV2

pull/3877/head
ConradJam 4 months ago committed by ConradJam
parent 3e16a66972
commit 99d965020f

@ -0,0 +1,43 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>flink-cdc-pipeline-connectors</artifactId>
<groupId>org.apache.flink</groupId>
<version>${revision}</version>
</parent>
<artifactId>flink-cdc-pipeline-connector-iceberg</artifactId>
<packaging>jar</packaging>
<name>flink-cdc-pipeline-connector-iceberg</name>
<properties>
<iceberg.version>1.7.1</iceberg.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-core</artifactId>
<version>${iceberg.version}</version>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-flink-1.19</artifactId>
<version>${iceberg.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cdc-composer</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>

@ -0,0 +1,78 @@
package org.apache.flink.cdc.connectors.iceberg.sink;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.function.HashFunctionProvider;
import org.apache.flink.cdc.common.sink.DataSink;
import org.apache.flink.cdc.common.sink.EventSinkProvider;
import org.apache.flink.cdc.common.sink.FlinkSinkProvider;
import org.apache.flink.cdc.common.sink.MetadataApplier;
import org.apache.flink.cdc.connectors.iceberg.sink.v2.IcebergEventSink;
import org.apache.flink.cdc.connectors.iceberg.sink.v2.IcebergRecordSerializer;
import java.io.Serializable;
import java.time.ZoneId;
import java.util.List;
import java.util.Map;
public class IcebergDataSink implements DataSink, Serializable {
// options for creating Iceberg catalog.
private final Map<String, String> catalogOptions;
// options for creating Iceberg table.
private final Map<String, String> tableOptions;
private final String commitUser;
private final Map<TableId, List<String>> partitionMaps;
private final IcebergRecordSerializer<Event> serializer;
private final ZoneId zoneId;
public final String schemaOperatorUid;
public IcebergDataSink(
Map<String, String> catalogOptions,
Map<String, String> tableOptions,
String commitUser,
Map<TableId, List<String>> partitionMaps,
IcebergRecordSerializer<Event> serializer,
ZoneId zoneId,
String schemaOperatorUid) {
this.catalogOptions = catalogOptions;
this.tableOptions = tableOptions;
this.commitUser = commitUser;
this.partitionMaps = partitionMaps;
this.serializer = serializer;
this.zoneId = zoneId;
this.schemaOperatorUid = schemaOperatorUid;
}
@Override
public EventSinkProvider getEventSinkProvider() {
IcebergEventSink icebergEventSink =
new IcebergEventSink(
catalogOptions, commitUser, serializer, schemaOperatorUid, zoneId);
return FlinkSinkProvider.of(icebergEventSink);
}
@Override
public MetadataApplier getMetadataApplier() {
return new IcebergMetadataApplier(catalogOptions, tableOptions, partitionMaps);
}
@Override
public HashFunctionProvider<DataChangeEvent> getDataChangeEventHashFunctionProvider() {
// TODO: provide an Iceberg-specific hash function if needed; fall back to the default for now.
return DataSink.super.getDataChangeEventHashFunctionProvider();
}
@Override
public HashFunctionProvider<DataChangeEvent> getDataChangeEventHashFunctionProvider(
int parallelism) {
return DataSink.super.getDataChangeEventHashFunctionProvider(parallelism);
}
}

@ -0,0 +1,108 @@
package org.apache.flink.cdc.connectors.iceberg.sink;
import org.apache.flink.cdc.common.configuration.ConfigOption;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.factories.DataSinkFactory;
import org.apache.flink.cdc.common.factories.FactoryHelper;
import org.apache.flink.cdc.common.pipeline.PipelineOptions;
import org.apache.flink.cdc.common.sink.DataSink;
import org.apache.flink.cdc.common.utils.Preconditions;
import org.apache.flink.cdc.connectors.iceberg.sink.v2.IcebergRecordEventSerializer;
import org.apache.flink.cdc.connectors.iceberg.sink.v2.IcebergRecordSerializer;
import org.apache.flink.table.catalog.Catalog;
import org.apache.iceberg.flink.FlinkCatalogFactory;
import java.time.ZoneId;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import static org.apache.flink.cdc.connectors.iceberg.sink.IcebergDataSinkOptions.PREFIX_CATALOG_PROPERTIES;
import static org.apache.flink.cdc.connectors.iceberg.sink.IcebergDataSinkOptions.PREFIX_TABLE_PROPERTIES;
public class IcebergDataSinkFactory implements DataSinkFactory {
public static final String IDENTIFIER = "iceberg";
@Override
public DataSink createDataSink(Context context) {
FactoryHelper.createFactoryHelper(this, context)
.validateExcept(PREFIX_TABLE_PROPERTIES, PREFIX_CATALOG_PROPERTIES);
Map<String, String> allOptions = context.getFactoryConfiguration().toMap();
Map<String, String> catalogOptions = new HashMap<>();
Map<String, String> tableOptions = new HashMap<>();
allOptions.forEach(
(key, value) -> {
if (key.startsWith(PREFIX_TABLE_PROPERTIES)) {
tableOptions.put(key.substring(PREFIX_TABLE_PROPERTIES.length()), value);
} else if (key.startsWith(IcebergDataSinkOptions.PREFIX_CATALOG_PROPERTIES)) {
catalogOptions.put(
key.substring(
IcebergDataSinkOptions.PREFIX_CATALOG_PROPERTIES.length()),
value);
}
});
FlinkCatalogFactory factory = new FlinkCatalogFactory();
try {
Catalog catalog =
factory.createCatalog(
catalogOptions.getOrDefault("default-database", "default"),
catalogOptions);
Preconditions.checkNotNull(
catalog.listDatabases(), "catalog options for Iceberg are invalid.");
} catch (Exception e) {
throw new RuntimeException("failed to create or use Iceberg catalog", e);
}
ZoneId zoneId = ZoneId.systemDefault();
if (!Objects.equals(
context.getPipelineConfiguration().get(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE),
PipelineOptions.PIPELINE_LOCAL_TIME_ZONE.defaultValue())) {
zoneId =
ZoneId.of(
context.getPipelineConfiguration()
.get(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE));
}
String commitUser =
context.getFactoryConfiguration().get(IcebergDataSinkOptions.COMMIT_USER);
IcebergRecordSerializer<Event> serializer =
new IcebergRecordEventSerializer(new HashMap<>(), zoneId);
String schemaOperatorUid =
context.getPipelineConfiguration()
.get(PipelineOptions.PIPELINE_SCHEMA_OPERATOR_UID);
return new IcebergDataSink(
catalogOptions,
tableOptions,
commitUser,
new HashMap<>(),
serializer,
zoneId,
schemaOperatorUid);
}
@Override
public String identifier() {
return IDENTIFIER;
}
@Override
public Set<ConfigOption<?>> requiredOptions() {
Set<ConfigOption<?>> options = new HashSet<>();
options.add(IcebergDataSinkOptions.METASTORE);
return options;
}
@Override
public Set<ConfigOption<?>> optionalOptions() {
Set<ConfigOption<?>> options = new HashSet<>();
options.add(IcebergDataSinkOptions.WAREHOUSE);
options.add(IcebergDataSinkOptions.URI);
options.add(IcebergDataSinkOptions.COMMIT_USER);
options.add(IcebergDataSinkOptions.PARTITION_KEY);
return options;
}
}
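For reference, a minimal, self-contained sketch (not part of this patch) of how the prefix handling in createDataSink splits user-facing options; the concrete keys below — a Hadoop catalog type, a warehouse path, and Iceberg's write.format.default table property — are illustrative assumptions:

import java.util.HashMap;
import java.util.Map;

public class PrefixSplitExample {
    public static void main(String[] args) {
        Map<String, String> allOptions = new HashMap<>();
        allOptions.put("catalog.properties.type", "hadoop");
        allOptions.put("catalog.properties.warehouse", "file:///tmp/iceberg/warehouse");
        allOptions.put("table.properties.write.format.default", "parquet");

        Map<String, String> catalogOptions = new HashMap<>();
        Map<String, String> tableOptions = new HashMap<>();
        // Same prefix stripping as IcebergDataSinkFactory#createDataSink.
        allOptions.forEach(
                (key, value) -> {
                    if (key.startsWith("table.properties.")) {
                        tableOptions.put(key.substring("table.properties.".length()), value);
                    } else if (key.startsWith("catalog.properties.")) {
                        catalogOptions.put(key.substring("catalog.properties.".length()), value);
                    }
                });
        // catalogOptions now holds {type=hadoop, warehouse=file:///tmp/iceberg/warehouse},
        // tableOptions holds {write.format.default=parquet}.
        System.out.println(catalogOptions);
        System.out.println(tableOptions);
    }
}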

@ -0,0 +1,47 @@
package org.apache.flink.cdc.connectors.iceberg.sink;
import org.apache.flink.cdc.common.configuration.ConfigOption;
import static org.apache.flink.cdc.common.configuration.ConfigOptions.key;
public class IcebergDataSinkOptions {
// prefix for passing properties for table creation.
public static final String PREFIX_TABLE_PROPERTIES = "table.properties.";
// prefix for passing properties for catalog creation.
public static final String PREFIX_CATALOG_PROPERTIES = "catalog.properties.";
public static final ConfigOption<String> COMMIT_USER =
key("commit.user")
.stringType()
.defaultValue("admin")
.withDescription("User name for committing data files.");
public static final ConfigOption<String> WAREHOUSE =
key("catalog.properties.warehouse")
.stringType()
.noDefaultValue()
.withDescription("The warehouse root path of catalog.");
public static final ConfigOption<String> METASTORE =
key("catalog.properties.metastore")
.stringType()
.noDefaultValue()
.withDescription("Metastore of iceberg catalog, supports filesystem and hive.");
public static final ConfigOption<String> URI =
key("catalog.properties.uri")
.stringType()
.noDefaultValue()
.withDescription("Uri of metastore server.");
public static final ConfigOption<String> PARTITION_KEY =
key("partition.key")
.stringType()
.defaultValue("")
.withDescription(
"Partition keys for each partitioned table, allow setting multiple primary keys for multiTables. "
+ "Tables are separated by ';', and partition keys are separated by ','. "
+ "For example, we can set partition.key of two tables by 'testdb.table1:id1,id2;testdb.table2:name'.");
}
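Note that partition.key is declared above but not yet parsed anywhere in this commit (the factory currently passes an empty partition map to IcebergDataSink). A minimal, hypothetical parser for the documented 'db.table:key1,key2;db.table2:key' format could look like this (not part of the patch):

import org.apache.flink.cdc.common.event.TableId;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PartitionKeyParser {
    /** Parses e.g. "testdb.table1:id1,id2;testdb.table2:name" into TableId -> partition keys. */
    public static Map<TableId, List<String>> parse(String partitionKey) {
        Map<TableId, List<String>> partitionMaps = new HashMap<>();
        if (partitionKey == null || partitionKey.isEmpty()) {
            return partitionMaps;
        }
        for (String tableSpec : partitionKey.split(";")) {
            String[] tableAndKeys = tableSpec.split(":");
            partitionMaps.put(
                    TableId.parse(tableAndKeys[0]),
                    new ArrayList<>(Arrays.asList(tableAndKeys[1].split(","))));
        }
        return partitionMaps;
    }
}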

@ -0,0 +1,292 @@
package org.apache.flink.cdc.connectors.iceberg.sink;
import org.apache.flink.cdc.common.event.AddColumnEvent;
import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DropColumnEvent;
import org.apache.flink.cdc.common.event.RenameColumnEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEventType;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.event.visitor.SchemaChangeEventVisitor;
import org.apache.flink.cdc.common.exceptions.SchemaEvolveException;
import org.apache.flink.cdc.common.exceptions.UnsupportedSchemaChangeEventException;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.sink.MetadataApplier;
import org.apache.flink.cdc.common.types.utils.DataTypeUtils;
import org.apache.flink.shaded.guava31.com.google.common.collect.Sets;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.UpdateSchema;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.apache.flink.cdc.common.utils.Preconditions.checkNotNull;
public class IcebergMetadataApplier implements MetadataApplier {
private static final Logger LOG = LoggerFactory.getLogger(IcebergMetadataApplier.class);
// Catalog is not serializable, so it is created lazily and kept transient.
private transient Catalog catalog;
private final Map<String, String> catalogOptions;
private final Map<TableId, List<String>> partitionMaps;
private Set<SchemaChangeEventType> enabledSchemaEvolutionTypes;
// Currently, the same table options are applied to all tables.
private final Map<String, String> tableOptions;
public IcebergMetadataApplier(Map<String, String> catalogOptions) {
this.catalogOptions = catalogOptions;
this.tableOptions = new HashMap<>();
this.partitionMaps = new HashMap<>();
this.enabledSchemaEvolutionTypes = getSupportedSchemaEvolutionTypes();
}
public IcebergMetadataApplier(
Map<String, String> catalogOptions,
Map<String, String> tableOptions,
Map<TableId, List<String>> partitionMaps) {
this.catalogOptions = catalogOptions;
this.tableOptions = tableOptions;
this.partitionMaps = partitionMaps;
this.enabledSchemaEvolutionTypes = getSupportedSchemaEvolutionTypes();
}
@Override
public void applySchemaChange(SchemaChangeEvent schemaChangeEvent)
throws SchemaEvolveException {
if (catalog == null) {
catalog = CatalogUtil.buildIcebergCatalog("cdc-iceberg-catalog", catalogOptions, null);
}
SchemaChangeEventVisitor.visit(
schemaChangeEvent,
addColumnEvent -> {
applyAddColumn(addColumnEvent);
return null;
},
alterColumnTypeEvent -> {
applyAlterColumnType(alterColumnTypeEvent);
return null;
},
createTableEvent -> {
applyCreateTable(createTableEvent);
return null;
},
dropColumnEvent -> {
applyDropColumn(dropColumnEvent);
return null;
},
dropTableEvent -> {
throw new UnsupportedSchemaChangeEventException(dropTableEvent);
},
renameColumnEvent -> {
applyRenameColumn(renameColumnEvent);
return null;
},
truncateTableEvent -> {
throw new UnsupportedSchemaChangeEventException(truncateTableEvent);
});
}
private void applyCreateTable(CreateTableEvent event) {
    try {
        // Derive the Iceberg identifier the same way as the write path
        // (IcebergRecordEventSerializer) so metadata and data target the same table.
        TableIdentifier tableIdentifier = TableIdentifier.parse(event.tableId().toString());
        org.apache.flink.cdc.common.schema.Schema cdcSchema = event.getSchema();
        // Build the Iceberg schema from the CDC schema instead of loading a table that
        // does not exist yet; primary key columns become required identifier fields.
        List<Column> columns = cdcSchema.getColumns();
        List<Types.NestedField> fields = new ArrayList<>();
        Set<Integer> identifierFieldIds = new HashSet<>();
        for (int i = 0; i < columns.size(); i++) {
            Column column = columns.get(i);
            int fieldId = i + 1;
            boolean isPrimaryKey = cdcSchema.primaryKeys().contains(column.getName());
            fields.add(
                    Types.NestedField.of(
                            fieldId,
                            !isPrimaryKey && column.getType().isNullable(),
                            column.getName(),
                            FlinkSchemaUtil.convert(
                                    DataTypeUtils.toFlinkDataType(column.getType())
                                            .getLogicalType()),
                            column.getComment()));
            if (isPrimaryKey) {
                identifierFieldIds.add(fieldId);
            }
        }
        Schema icebergSchema = new Schema(fields, identifierFieldIds);
        List<String> partitionKeys = partitionMaps.get(event.tableId());
        if (partitionKeys == null) {
            partitionKeys = cdcSchema.partitionKeys();
        }
        PartitionSpec spec = PartitionSpec.unpartitioned();
        if (partitionKeys != null && !partitionKeys.isEmpty()) {
            PartitionSpec.Builder specBuilder = PartitionSpec.builderFor(icebergSchema);
            partitionKeys.forEach(specBuilder::identity);
            spec = specBuilder.build();
        }
        catalog.createTable(tableIdentifier, icebergSchema, spec, tableOptions);
    } catch (Exception e) {
        throw new SchemaEvolveException(event, e.getMessage(), e);
    }
}
private void applyAddColumn(AddColumnEvent event) {
try {
TableIdentifier tableIdentifier = TableIdentifier.parse(event.tableId().toString());
Table table = catalog.loadTable(tableIdentifier);
applyAddColumnEventWithPosition(table, event);
} catch (Exception e) {
throw new SchemaEvolveException(event, e.getMessage(), e);
}
}
private void applyAddColumnEventWithPosition(Table table, AddColumnEvent event)
        throws SchemaEvolveException {
    try {
        // Apply all added columns in one schema transaction and commit once, so that
        // position moves can reference the newly added columns.
        UpdateSchema updateSchema = table.updateSchema();
        for (AddColumnEvent.ColumnWithPosition columnWithPosition : event.getAddedColumns()) {
            Column addColumn = columnWithPosition.getAddColumn();
            Type icebergType =
                    FlinkSchemaUtil.convert(
                            DataTypeUtils.toFlinkDataType(addColumn.getType())
                                    .getLogicalType());
            updateSchema.addColumn(addColumn.getName(), icebergType, addColumn.getComment());
            switch (columnWithPosition.getPosition()) {
                case FIRST:
                    updateSchema.moveFirst(addColumn.getName());
                    break;
                case LAST:
                    // New columns are appended at the end by default.
                    break;
                case BEFORE:
                    checkNotNull(
                            columnWithPosition.getExistedColumnName(),
                            "Existing column name must be provided for BEFORE position");
                    updateSchema.moveBefore(
                            addColumn.getName(), columnWithPosition.getExistedColumnName());
                    break;
                case AFTER:
                    checkNotNull(
                            columnWithPosition.getExistedColumnName(),
                            "Existing column name must be provided for AFTER position");
                    updateSchema.moveAfter(
                            addColumn.getName(), columnWithPosition.getExistedColumnName());
                    break;
                default:
                    throw new SchemaEvolveException(
                            event,
                            "Unknown column position: " + columnWithPosition.getPosition());
            }
        }
        updateSchema.commit();
    } catch (Exception e) {
        throw new SchemaEvolveException(event, e.getMessage(), e);
    }
}
private void applyDropColumn(DropColumnEvent event) {
try {
TableIdentifier tableIdentifier = TableIdentifier.parse(event.tableId().toString());
Table table = catalog.loadTable(tableIdentifier);
// Collect all drops into a single schema update and commit once.
UpdateSchema updateSchema = table.updateSchema();
event.getDroppedColumnNames().forEach(updateSchema::deleteColumn);
updateSchema.commit();
} catch (Exception e) {
throw new SchemaEvolveException(event, e.getMessage(), e);
}
}
private void applyRenameColumn(RenameColumnEvent event) {
try {
TableIdentifier tableIdentifier = TableIdentifier.parse(event.tableId().toString());
Table table = catalog.loadTable(tableIdentifier);
UpdateSchema updateSchema = table.updateSchema();
event.getNameMapping().forEach(updateSchema::renameColumn);
updateSchema.commit();
} catch (Exception e) {
throw new SchemaEvolveException(event, e.getMessage(), e);
}
}
private void applyAlterColumnType(AlterColumnTypeEvent event) {
try {
TableIdentifier tableIdentifier = TableIdentifier.parse(event.tableId().toString());
Table table = catalog.loadTable(tableIdentifier);
UpdateSchema updateSchema = table.updateSchema();
event.getTypeMapping()
        .forEach(
                (columnName, newType) -> {
                    Type.PrimitiveType type =
                            FlinkSchemaUtil.convert(
                                            DataTypeUtils.toFlinkDataType(newType)
                                                    .getLogicalType())
                                    .asPrimitiveType();
                    updateSchema.updateColumn(columnName, type);
                });
updateSchema.commit();
} catch (Exception e) {
throw new SchemaEvolveException(event, e.getMessage(), e);
}
}
@Override
public MetadataApplier setAcceptedSchemaEvolutionTypes(
Set<SchemaChangeEventType> schemaEvolutionTypes) {
this.enabledSchemaEvolutionTypes = schemaEvolutionTypes;
return this;
}
@Override
public boolean acceptsSchemaEvolutionType(SchemaChangeEventType schemaChangeEventType) {
return enabledSchemaEvolutionTypes.contains(schemaChangeEventType);
}
@Override
public Set<SchemaChangeEventType> getSupportedSchemaEvolutionTypes() {
return Sets.newHashSet(
SchemaChangeEventType.CREATE_TABLE,
SchemaChangeEventType.ADD_COLUMN,
SchemaChangeEventType.DROP_COLUMN,
SchemaChangeEventType.RENAME_COLUMN,
SchemaChangeEventType.ALTER_COLUMN_TYPE);
}
}
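As a rough usage sketch (not part of the patch; the catalog options, paths, and names below are placeholder assumptions), the applier needs only catalog options plus a CDC CreateTableEvent to create the corresponding Iceberg table:

import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.connectors.iceberg.sink.IcebergMetadataApplier;

import java.util.HashMap;
import java.util.Map;

public class MetadataApplierExample {
    public static void main(String[] args) throws Exception {
        // Hypothetical Hadoop catalog rooted at a local path.
        Map<String, String> catalogOptions = new HashMap<>();
        catalogOptions.put("type", "hadoop");
        catalogOptions.put("warehouse", "file:///tmp/iceberg/warehouse");

        Schema schema =
                Schema.newBuilder()
                        .physicalColumn("id", DataTypes.BIGINT())
                        .physicalColumn("name", DataTypes.STRING())
                        .primaryKey("id")
                        .build();
        CreateTableEvent createTableEvent =
                new CreateTableEvent(TableId.tableId("testdb", "table1"), schema);

        // Creates Iceberg table "testdb.table1" with "id" as a required identifier field.
        new IcebergMetadataApplier(catalogOptions).applySchemaChange(createTableEvent);
    }
}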

@ -0,0 +1,53 @@
package org.apache.flink.cdc.connectors.iceberg.sink.v2;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.GenericRecord;
public class IcebergEvent {
// Identifier of the Iceberg table to be written.
TableIdentifier tableId;
// The actual record to be written to the Iceberg table.
GenericRecord genericRow;
// If true, the table schema has changed right before this genericRow was produced.
boolean shouldRefreshSchema;
public IcebergEvent(
TableIdentifier tableId, GenericRecord genericRow, boolean shouldRefreshSchema) {
this.tableId = tableId;
this.genericRow = genericRow;
this.shouldRefreshSchema = shouldRefreshSchema;
}
public IcebergEvent(TableIdentifier tableId, GenericRecord genericRow) {
this.tableId = tableId;
this.genericRow = genericRow;
this.shouldRefreshSchema = false;
}
public TableIdentifier getTableId() {
return tableId;
}
public void setTableId(TableIdentifier tableId) {
this.tableId = tableId;
}
public GenericRecord getGenericRow() {
return genericRow;
}
public void setGenericRow(GenericRecord genericRow) {
this.genericRow = genericRow;
}
public boolean isShouldRefreshSchema() {
return shouldRefreshSchema;
}
public void setShouldRefreshSchema(boolean shouldRefreshSchema) {
this.shouldRefreshSchema = shouldRefreshSchema;
}
}

@ -0,0 +1,33 @@
package org.apache.flink.cdc.connectors.iceberg.sink.v2;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.streaming.api.connector.sink2.SupportsPreWriteTopology;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.iceberg.flink.sink.FlinkSink;
import java.time.ZoneId;
import java.util.Map;
public class IcebergEventSink extends IcebergSink implements SupportsPreWriteTopology<Event> {
public final String schemaOperatorUid;
public final ZoneId zoneId;
public IcebergEventSink(
Map<String, String> catalogOptions,
String commitUser,
IcebergRecordSerializer<Event> serializer,
String schemaOperatorUid,
ZoneId zoneId) {
super(catalogOptions, commitUser, serializer);
this.schemaOperatorUid = schemaOperatorUid;
this.zoneId = zoneId;
}
@Override
public DataStream<Event> addPreWriteTopology(DataStream<Event> dataStream) {
return dataStream;
}
}

@ -0,0 +1,63 @@
package org.apache.flink.cdc.connectors.iceberg.sink.v2;
import org.apache.flink.cdc.common.event.ChangeEvent;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.utils.SchemaUtils;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.GenericRecord;
import java.io.IOException;
import java.time.ZoneId;
import java.util.Map;
public class IcebergRecordEventSerializer implements IcebergRecordSerializer<Event> {
// Maintains the latest known schema for each TableId.
private final Map<TableId, TableSchemaInfo> schemaMaps;
// ZoneId used when converting time-related types.
private final ZoneId zoneId;
public IcebergRecordEventSerializer(Map<TableId, TableSchemaInfo> schemaMaps, ZoneId zoneId) {
this.schemaMaps = schemaMaps;
this.zoneId = zoneId;
}
@Override
public IcebergEvent serialize(Event event) throws IOException {
// Parse the qualified CDC table name, e.g. "db.table" -> namespace "db", table "table".
TableIdentifier tableId =
        TableIdentifier.parse(((ChangeEvent) event).tableId().toString());
if (event instanceof SchemaChangeEvent) {
if (event instanceof CreateTableEvent) {
CreateTableEvent createTableEvent = (CreateTableEvent) event;
schemaMaps.put(
createTableEvent.tableId(),
new TableSchemaInfo(createTableEvent.getSchema(), zoneId));
} else {
SchemaChangeEvent schemaChangeEvent = (SchemaChangeEvent) event;
schemaMaps.put(
schemaChangeEvent.tableId(),
new TableSchemaInfo(
SchemaUtils.applySchemaChangeEvent(
schemaMaps.get(schemaChangeEvent.tableId()).getSchema(),
schemaChangeEvent),
zoneId));
}
return new IcebergEvent(tableId, null, true);
} else if (event instanceof DataChangeEvent) {
DataChangeEvent dataChangeEvent = (DataChangeEvent) event;
GenericRecord genericRecord =
IcebergWriterHelper.convertEventToGenericRow(
dataChangeEvent,
schemaMaps.get(dataChangeEvent.tableId()).getFieldGetters());
return new IcebergEvent(tableId, genericRecord, false);
} else {
throw new IllegalArgumentException(
"failed to convert Input into IcebergEvent, unsupported event: " + event);
}
}
}
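A small sketch of the serializer contract (not part of the patch; table and column names are made up): serializing a CreateTableEvent caches the schema and returns an IcebergEvent flagged with shouldRefreshSchema, while later DataChangeEvents for the same table reuse the cached field getters:

import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.connectors.iceberg.sink.v2.IcebergEvent;
import org.apache.flink.cdc.connectors.iceberg.sink.v2.IcebergRecordEventSerializer;

import java.time.ZoneId;
import java.util.HashMap;

public class SerializerExample {
    public static void main(String[] args) throws Exception {
        IcebergRecordEventSerializer serializer =
                new IcebergRecordEventSerializer(new HashMap<>(), ZoneId.of("UTC"));

        Schema schema =
                Schema.newBuilder()
                        .physicalColumn("id", DataTypes.BIGINT())
                        .physicalColumn("name", DataTypes.STRING())
                        .primaryKey("id")
                        .build();
        CreateTableEvent createTableEvent =
                new CreateTableEvent(TableId.tableId("testdb", "table1"), schema);

        IcebergEvent icebergEvent = serializer.serialize(createTableEvent);
        // Prints "true": downstream writers should refresh the table schema before writing rows.
        System.out.println(icebergEvent.isShouldRefreshSchema());
    }
}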

@ -0,0 +1,8 @@
package org.apache.flink.cdc.connectors.iceberg.sink.v2;
import java.io.IOException;
import java.io.Serializable;
public interface IcebergRecordSerializer<Input> extends Serializable {
IcebergEvent serialize(Input t) throws IOException;
}

@ -0,0 +1,74 @@
package org.apache.flink.cdc.connectors.iceberg.sink.v2;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.api.connector.sink2.CommitterInitContext;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.connector.sink2.SupportsCommitter;
import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.connector.sink2.SupportsPreWriteTopology;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.iceberg.catalog.ImmutableTableCommit;
import java.io.IOException;
import java.util.Map;
public class IcebergSink
implements Sink<Event>,
SupportsPreWriteTopology<Event>,
SupportsCommitter<ImmutableTableCommit> {
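// NOTE: committer, committable serializer, and writer creation below are still
// placeholders (they return null) and need a real implementation.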
// Default commit user used when none is provided.
public static final String DEFAULT_COMMIT_USER = "admin";
protected final Map<String, String> catalogOptions;
protected final String commitUser;
private final IcebergRecordSerializer<Event> serializer;
public IcebergSink(
Map<String, String> catalogOptions, IcebergRecordSerializer<Event> serializer) {
this.catalogOptions = catalogOptions;
this.serializer = serializer;
commitUser = DEFAULT_COMMIT_USER;
}
public IcebergSink(
Map<String, String> catalogOptions,
String commitUser,
IcebergRecordSerializer<Event> serializer) {
this.catalogOptions = catalogOptions;
this.commitUser = commitUser;
this.serializer = serializer;
}
@Override
public DataStream<Event> addPreWriteTopology(DataStream<Event> dataStream) {
// Pass the stream through unchanged; no extra pre-write topology is needed by default.
return dataStream;
}
@Override
public Committer<ImmutableTableCommit> createCommitter(
CommitterInitContext committerInitContext) throws IOException {
return null;
}
@Override
public SimpleVersionedSerializer<ImmutableTableCommit> getCommittableSerializer() {
return null;
}
@Override
public SinkWriter<Event> createWriter(InitContext initContext) throws IOException {
return null;
}
@Override
public SinkWriter<Event> createWriter(WriterInitContext context) throws IOException {
return Sink.super.createWriter(context);
}
}

@ -0,0 +1,31 @@
package org.apache.flink.cdc.connectors.iceberg.sink.v2;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.connector.sink2.CommittingSinkWriter;
import org.apache.iceberg.catalog.ImmutableTableCommit;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
public class IcebergWriter<InputT> implements CommittingSinkWriter<InputT, ImmutableTableCommit> {
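// NOTE: this writer is an empty skeleton; write/flush/prepareCommit still need to be
// implemented to actually buffer and commit Iceberg data files.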
@Override
public Collection<ImmutableTableCommit> prepareCommit()
throws IOException, InterruptedException {
return Collections.emptyList();
}
@Override
public void write(InputT inputT, Context context) throws IOException, InterruptedException {}
@Override
public void flush(boolean b) throws IOException, InterruptedException {}
@Override
public void writeWatermark(Watermark watermark) throws IOException, InterruptedException {}
@Override
public void close() throws Exception {}
}

@ -0,0 +1,177 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.connectors.iceberg.sink.v2;
import org.apache.flink.cdc.common.data.DecimalData;
import org.apache.flink.cdc.common.data.RecordData;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypeChecks;
import org.apache.flink.types.RowKind;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.flink.data.StructRowData;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.DateTimeUtil;
import org.apache.iceberg.util.DecimalUtil;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.List;
import static org.apache.flink.cdc.common.types.DataTypeChecks.getFieldCount;
public class IcebergWriterHelper {
public static List<RecordData.FieldGetter> createFieldGetters(Schema schema, ZoneId zoneId) {
List<Column> columns = schema.getColumns();
List<RecordData.FieldGetter> fieldGetters = new ArrayList<>(columns.size());
for (int i = 0; i < columns.size(); i++) {
fieldGetters.add(createFieldGetter(columns.get(i).getType(), i, zoneId));
}
return fieldGetters;
}
// Modeled after Iceberg's RowDataWrapper: build a getter that extracts one field in the
// representation Iceberg expects.
private static RecordData.FieldGetter createFieldGetter(
DataType fieldType, int fieldPos, ZoneId zoneId) {
final RecordData.FieldGetter fieldGetter;
// ordered by type root definition
switch (fieldType.getTypeRoot()) {
case CHAR:
case VARCHAR:
fieldGetter = row -> row.getString(fieldPos).toString();
break;
case BOOLEAN:
fieldGetter = row -> row.getBoolean(fieldPos);
break;
case BINARY:
case VARBINARY:
fieldGetter = row -> row.getBinary(fieldPos);
break;
case DECIMAL:
final int decimalPrecision = DataTypeChecks.getPrecision(fieldType);
final int decimalScale = DataTypeChecks.getScale(fieldType);
fieldGetter =
row -> {
DecimalData decimalData =
row.getDecimal(fieldPos, decimalPrecision, decimalScale);
return DecimalUtil.toReusedFixLengthBytes(
decimalPrecision,
decimalScale,
decimalData.toBigDecimal(),
new byte[TypeUtil.decimalRequiredBytes(decimalPrecision)]);
};
break;
case TINYINT:
fieldGetter = row -> row.getByte(fieldPos);
break;
case SMALLINT:
fieldGetter = row -> row.getShort(fieldPos);
break;
case BIGINT:
fieldGetter = row -> row.getLong(fieldPos);
break;
case FLOAT:
fieldGetter = row -> row.getFloat(fieldPos);
break;
case DOUBLE:
fieldGetter = row -> row.getDouble(fieldPos);
break;
case INTEGER:
case DATE:
fieldGetter = row -> row.getInt(fieldPos);
break;
case TIME_WITHOUT_TIME_ZONE:
// Time in RecordData is in milliseconds (Integer), while Iceberg's time is in
// microseconds (Long).
fieldGetter = row -> ((long) row.getInt(fieldPos)) * 1_000;
break;
case TIMESTAMP_WITHOUT_TIME_ZONE:
fieldGetter =
(row) -> {
LocalDateTime localDateTime =
row.getTimestamp(
fieldPos,
DataTypeChecks.getPrecision(fieldType))
.toLocalDateTime();
return DateTimeUtil.microsFromTimestamp(localDateTime);
};
break;
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
case TIMESTAMP_WITH_TIME_ZONE:
fieldGetter =
row ->
DateTimeUtil.microsFromInstant(
row.getLocalZonedTimestampData(
fieldPos,
DataTypeChecks.getPrecision(fieldType))
.toInstant());
break;
case ROW:
final int rowFieldCount = getFieldCount(fieldType);
fieldGetter = row -> row.getRow(fieldPos, rowFieldCount);
break;
default:
throw new IllegalArgumentException(
"Unsupported type: " + fieldType.getTypeRoot());
}
if (!fieldType.isNullable()) {
return fieldGetter;
}
return row -> {
if (row.isNullAt(fieldPos)) {
return null;
}
return fieldGetter.getFieldOrNull(row);
};
}
public static GenericRecord convertEventToGenericRow(
DataChangeEvent dataChangeEvent, List<RecordData.FieldGetter> fieldGetters) {
StructRowData structRowData;
GenericRecord genericRow = null;
RecordData recordData;
switch (dataChangeEvent.op()) {
case INSERT:
case UPDATE:
case REPLACE:
{
recordData = dataChangeEvent.after();
structRowData = new StructRowData(Types.StructType.of(), RowKind.INSERT);
break;
}
case DELETE:
{
recordData = dataChangeEvent.before();
structRowData = new StructRowData(Types.StructType.of(), RowKind.DELETE);
break;
}
default:
throw new IllegalArgumentException("don't support type of " + dataChangeEvent.op());
}
// TODO: the record must first be created from the target table's Iceberg schema
// (e.g. GenericRecord.create(schema)) and then populated by position with
// fieldGetters.get(i).getFieldOrNull(recordData). The schema (and the row kind held
// by structRowData) is not wired through to this helper yet, so fail explicitly
// instead of dereferencing the uninitialized record.
throw new UnsupportedOperationException(
"Converting DataChangeEvent to GenericRecord is not implemented yet.");
}
}
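For reference, a minimal sketch (not part of the patch; column names are illustrative) of how the field getters above are intended to be used — one getter per column, each returning the value in the representation Iceberg expects (for example, TIME values widened from milliseconds to microseconds):

import org.apache.flink.cdc.common.data.RecordData;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.connectors.iceberg.sink.v2.IcebergWriterHelper;

import java.time.ZoneId;
import java.util.List;

public class FieldGetterExample {
    public static void main(String[] args) {
        Schema schema =
                Schema.newBuilder()
                        .physicalColumn("id", DataTypes.BIGINT())
                        .physicalColumn("name", DataTypes.VARCHAR(32))
                        .build();
        List<RecordData.FieldGetter> fieldGetters =
                IcebergWriterHelper.createFieldGetters(schema, ZoneId.of("UTC"));
        // fieldGetters.get(0).getFieldOrNull(recordData) would return the Long id value,
        // fieldGetters.get(1).getFieldOrNull(recordData) the name as a java.lang.String.
        System.out.println(fieldGetters.size()); // 2
    }
}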

@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.connectors.iceberg.sink.v2;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.shaded.guava31.com.google.common.hash.Hashing;
import static java.nio.charset.StandardCharsets.UTF_8;
/** Generating {@link OperatorID} for communication between Flink operators. */
@Internal
public class OperatorIDGenerator {
private final String transformationUid;
public OperatorIDGenerator(String transformationUid) {
this.transformationUid = transformationUid;
}
/**
* Generate {@link OperatorID}.
*
* <p>Operator ID generation is an internal implementation inside Flink, happening during the
* stream graph generating phase, so our algorithm of generating operator ID should be exactly
* the same as in Flink, in order to make sure that operators can reach out each other on the
* cluster.
*
* @see
* org.apache.flink.streaming.api.graph.StreamGraphHasherV2#traverseStreamGraphAndGenerateHashes
* the algorithm of generating operator ID in Flink
*/
public OperatorID generate() {
byte[] hash =
Hashing.murmur3_128(0)
.newHasher()
.putString(transformationUid, UTF_8)
.hash()
.asBytes();
return new OperatorID(hash);
}
}
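A one-line usage sketch (not part of the patch; the uid string is a placeholder): given the pipeline's schema operator uid, the sink side can recompute the very OperatorID that Flink assigns during stream graph generation and use it to address that operator:

import org.apache.flink.cdc.connectors.iceberg.sink.v2.OperatorIDGenerator;
import org.apache.flink.runtime.jobgraph.OperatorID;

public class OperatorIdExample {
    public static void main(String[] args) {
        // In a pipeline this uid comes from PipelineOptions.PIPELINE_SCHEMA_OPERATOR_UID.
        OperatorID operatorId = new OperatorIDGenerator("schema-operator-uid").generate();
        System.out.println(operatorId);
    }
}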

@ -0,0 +1,27 @@
package org.apache.flink.cdc.connectors.iceberg.sink.v2;
import org.apache.flink.cdc.common.data.RecordData;
import org.apache.flink.cdc.common.schema.Schema;
import java.time.ZoneId;
import java.util.List;
public class TableSchemaInfo {
private final Schema schema;
private final List<RecordData.FieldGetter> fieldGetters;
public TableSchemaInfo(Schema schema, ZoneId zoneId) {
this.schema = schema;
this.fieldGetters = IcebergWriterHelper.createFieldGetters(schema, zoneId);
}
public Schema getSchema() {
return schema;
}
public List<RecordData.FieldGetter> getFieldGetters() {
return fieldGetters;
}
}

@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.flink.cdc.connectors.iceberg.sink.IcebergDataSinkFactory

@ -37,6 +37,7 @@ limitations under the License.
<module>flink-cdc-pipeline-connector-elasticsearch</module>
<module>flink-cdc-pipeline-connector-oceanbase</module>
<module>flink-cdc-pipeline-connector-maxcompute</module>
<module>flink-cdc-pipeline-connector-iceberg</module>
</modules>
<dependencies>
