diff --git a/docs/content.zh/docs/connectors/pipeline-connectors/kafka.md b/docs/content.zh/docs/connectors/pipeline-connectors/kafka.md
index aa9dc8879..66467b4d5 100644
--- a/docs/content.zh/docs/connectors/pipeline-connectors/kafka.md
+++ b/docs/content.zh/docs/connectors/pipeline-connectors/kafka.md
@@ -87,6 +87,20 @@ Pipeline 连接器配置项
String |
Sink 的名称。 |
+
+ partition.strategy |
+ optional |
+ (none) |
+ String |
+ Defines the strategy for distributing records to Kafka partitions. Available options are `all-to-zero` (send all records to partition 0) and `hash-by-key` (distribute records by the hash of their primary key); the default is `all-to-zero`. |
+
+
+ key.format |
+ optional |
+ (none) |
+ String |
+ The format used to serialize the key part of Kafka messages. Available options are `csv` and `json`; the default is `json`. |
+
value.format |
optional |
diff --git a/docs/content/docs/connectors/pipeline-connectors/kafka.md b/docs/content/docs/connectors/pipeline-connectors/kafka.md
index 57f690666..6bb94bc37 100644
--- a/docs/content/docs/connectors/pipeline-connectors/kafka.md
+++ b/docs/content/docs/connectors/pipeline-connectors/kafka.md
@@ -85,6 +85,20 @@ Pipeline Connector Options
String |
The name of the sink. |
+
+ partition.strategy |
+ optional |
+ (none) |
+ String |
+ Defines the strategy for distributing records to Kafka partitions. Available options are `all-to-zero` (sending all records to partition 0) and `hash-by-key` (distributing records by the hash of their primary key); the default is `all-to-zero`. |
+
+
+ key.format |
+ optional |
+ (none) |
+ String |
+ Defines the format used to encode the key part of Kafka messages. Available options are `csv` and `json`; the default is `json`. |
+
value.format |
optional |
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/pom.xml b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/pom.xml
index 5be032a76..2614f594f 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/pom.xml
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/pom.xml
@@ -45,6 +45,14 @@ limitations under the License.
provided
+
+ org.apache.flink
+ flink-csv
+ ${flink.version}
+ provided
+
+
+
org.apache.flink
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/TableSchemaInfo.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/TableSchemaInfo.java
index a8480d77d..fe764ea85 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/TableSchemaInfo.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/TableSchemaInfo.java
@@ -25,6 +25,7 @@ import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.data.binary.BinaryStringData;
@@ -39,26 +40,59 @@ import static org.apache.flink.cdc.common.types.DataTypeChecks.getScale;
/** maintain the {@link SerializationSchema} of a specific {@link TableId}. */
public class TableSchemaInfo {
+ private final TableId tableId;
+
private final Schema schema;
+ private final List<Integer> primaryKeyColumnIndexes;
+
private final List<RecordData.FieldGetter> fieldGetters;
private final SerializationSchema<RowData> serializationSchema;
public TableSchemaInfo(
- Schema schema, SerializationSchema serializationSchema, ZoneId zoneId) {
+ TableId tableId,
+ Schema schema,
+ SerializationSchema serializationSchema,
+ ZoneId zoneId) {
+ this.tableId = tableId;
this.schema = schema;
this.serializationSchema = serializationSchema;
this.fieldGetters = createFieldGetters(schema, zoneId);
+ primaryKeyColumnIndexes = new ArrayList<>();
+ for (int keyIndex = 0; keyIndex < schema.primaryKeys().size(); keyIndex++) {
+ for (int columnIndex = 0; columnIndex < schema.getColumnCount(); columnIndex++) {
+ if (schema.getColumns()
+ .get(columnIndex)
+ .getName()
+ .equals(schema.primaryKeys().get(keyIndex))) {
+ primaryKeyColumnIndexes.add(columnIndex);
+ break;
+ }
+ }
+ }
}
/** Converts {@link RecordData} to {@link RowData}, which will be passed to the serializationSchema. */
- public RowData getRowDataFromRecordData(RecordData recordData) {
- GenericRowData genericRowData = new GenericRowData(recordData.getArity());
- for (int i = 0; i < recordData.getArity(); i++) {
- genericRowData.setField(i, fieldGetters.get(i).getFieldOrNull(recordData));
+ public RowData getRowDataFromRecordData(RecordData recordData, boolean primaryKeyOnly) {
+ if (primaryKeyOnly) {
+ GenericRowData genericRowData = new GenericRowData(primaryKeyColumnIndexes.size() + 1);
+ genericRowData.setField(0, StringData.fromString(tableId.toString()));
+ for (int i = 0; i < primaryKeyColumnIndexes.size(); i++) {
+ genericRowData.setField(
+ i + 1,
+ fieldGetters
+ .get(primaryKeyColumnIndexes.get(i))
+ .getFieldOrNull(recordData));
+ }
+ return genericRowData;
+ } else {
+ GenericRowData genericRowData = new GenericRowData(recordData.getArity());
+ for (int i = 0; i < recordData.getArity(); i++) {
+ genericRowData.setField(i, fieldGetters.get(i).getFieldOrNull(recordData));
+ }
+ return genericRowData;
}
- return genericRowData;
}
private static List<RecordData.FieldGetter> createFieldGetters(Schema schema, ZoneId zoneId) {
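Note on the new `primaryKeyOnly` path above: the row handed to the key serializer always carries the table id as field 0, followed by the primary-key values in schema order. A minimal sketch of that shape using only Flink's `GenericRowData` and `StringData` (the class name and sample values are illustrative, not part of this patch):

```java
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.StringData;

// Illustrative sketch: mirrors getRowDataFromRecordData(recordData, true), where field 0 is
// the table id and the remaining fields are the primary-key values.
public class KeyRowSketch {
    static GenericRowData keyRow(String tableId, Object... primaryKeyValues) {
        GenericRowData row = new GenericRowData(primaryKeyValues.length + 1);
        row.setField(0, StringData.fromString(tableId));
        for (int i = 0; i < primaryKeyValues.length; i++) {
            row.setField(i + 1, primaryKeyValues[i]);
        }
        return row;
    }

    public static void main(String[] args) {
        // A table with primary key col1 = "1" yields a two-field row: [table id, "1"].
        GenericRowData row =
                keyRow("default_namespace.default_schema.table1", StringData.fromString("1"));
        System.out.println(row.getArity()); // 2
    }
}
```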
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchema.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchema.java
index 0a145cab7..d0c617975 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchema.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchema.java
@@ -127,7 +127,8 @@ public class CanalJsonSerializationSchema implements SerializationSchema
}
jsonSerializers.put(
schemaChangeEvent.tableId(),
- new TableSchemaInfo(schema, jsonSerializer, zoneId));
+ new TableSchemaInfo(
+ schemaChangeEvent.tableId(), schema, jsonSerializer, zoneId));
return null;
}
@@ -153,7 +154,8 @@ public class CanalJsonSerializationSchema implements SerializationSchema
new RowData[] {
jsonSerializers
.get(dataChangeEvent.tableId())
- .getRowDataFromRecordData((dataChangeEvent.after()))
+ .getRowDataFromRecordData(
+ dataChangeEvent.after(), false)
}));
reuseGenericRowData.setField(2, OP_INSERT);
return jsonSerializers
@@ -168,7 +170,7 @@ public class CanalJsonSerializationSchema implements SerializationSchema
jsonSerializers
.get(dataChangeEvent.tableId())
.getRowDataFromRecordData(
- (dataChangeEvent.before()))
+ dataChangeEvent.before(), false)
}));
reuseGenericRowData.setField(1, null);
reuseGenericRowData.setField(2, OP_DELETE);
@@ -185,7 +187,7 @@ public class CanalJsonSerializationSchema implements SerializationSchema
jsonSerializers
.get(dataChangeEvent.tableId())
.getRowDataFromRecordData(
- (dataChangeEvent.before()))
+ dataChangeEvent.before(), false)
}));
reuseGenericRowData.setField(
1,
@@ -193,7 +195,8 @@ public class CanalJsonSerializationSchema implements SerializationSchema
new RowData[] {
jsonSerializers
.get(dataChangeEvent.tableId())
- .getRowDataFromRecordData((dataChangeEvent.after()))
+ .getRowDataFromRecordData(
+ dataChangeEvent.after(), false)
}));
reuseGenericRowData.setField(2, OP_UPDATE);
return jsonSerializers
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchema.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchema.java
index ce8afc0db..15cecbc4f 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchema.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchema.java
@@ -126,7 +126,8 @@ public class DebeziumJsonSerializationSchema implements SerializationSchema {
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * A map of {@link TableId} and its {@link SerializationSchema} used to serialize the key data of each table in CSV format.
+ */
+ private final Map<TableId, TableSchemaInfo> csvSerializers;
+
+ private final ZoneId zoneId;
+
+ private InitializationContext context;
+
+ public CsvSerializationSchema(ZoneId zoneId) {
+ this.zoneId = zoneId;
+ csvSerializers = new HashMap<>();
+ }
+
+ @Override
+ public void open(InitializationContext context) {
+ this.context = context;
+ }
+
+ @Override
+ public byte[] serialize(Event event) {
+ if (event instanceof SchemaChangeEvent) {
+ Schema schema;
+ SchemaChangeEvent schemaChangeEvent = (SchemaChangeEvent) event;
+ if (event instanceof CreateTableEvent) {
+ CreateTableEvent createTableEvent = (CreateTableEvent) event;
+ schema = createTableEvent.getSchema();
+ } else {
+ schema =
+ SchemaUtils.applySchemaChangeEvent(
+ csvSerializers.get(schemaChangeEvent.tableId()).getSchema(),
+ schemaChangeEvent);
+ }
+ CsvRowDataSerializationSchema csvSerializer = buildSerializationForPrimaryKey(schema);
+ try {
+ csvSerializer.open(context);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ csvSerializers.put(
+ schemaChangeEvent.tableId(),
+ new TableSchemaInfo(
+ schemaChangeEvent.tableId(), schema, csvSerializer, zoneId));
+ return null;
+ }
+ DataChangeEvent dataChangeEvent = (DataChangeEvent) event;
+ RecordData recordData =
+ dataChangeEvent.op().equals(OperationType.DELETE)
+ ? dataChangeEvent.before()
+ : dataChangeEvent.after();
+ TableSchemaInfo tableSchemaInfo = csvSerializers.get(dataChangeEvent.tableId());
+ return tableSchemaInfo
+ .getSerializationSchema()
+ .serialize(tableSchemaInfo.getRowDataFromRecordData(recordData, true));
+ }
+
+ private CsvRowDataSerializationSchema buildSerializationForPrimaryKey(Schema schema) {
+ DataField[] fields = new DataField[schema.primaryKeys().size() + 1];
+ fields[0] = DataTypes.FIELD("TableId", DataTypes.STRING());
+ for (int i = 0; i < schema.primaryKeys().size(); i++) {
+ Column column = schema.getColumn(schema.primaryKeys().get(i)).get();
+ fields[i + 1] = DataTypes.FIELD(column.getName(), column.getType());
+ }
+ // the row should never be null
+ DataType dataType = DataTypes.ROW(fields).notNull();
+ LogicalType rowType = DataTypeUtils.toFlinkDataType(dataType).getLogicalType();
+ return new CsvRowDataSerializationSchema.Builder((RowType) rowType).build();
+ }
+}
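For reference, `buildSerializationForPrimaryKey` above assembles a NOT NULL row type of `[TableId, <primary-key columns>]` and hands it to Flink's CSV serializer. A hedged sketch of the equivalent row type built directly with the Flink Table API; the production code derives it from the CDC `Schema` via `DataTypeUtils`, and the column name below is only an example:

```java
import org.apache.flink.formats.csv.CsvRowDataSerializationSchema;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;

// Illustrative sketch of the CSV key row type for a table with primary key (col1 STRING).
public class CsvKeyRowTypeSketch {
    public static void main(String[] args) {
        DataType keyType =
                DataTypes.ROW(
                                DataTypes.FIELD("TableId", DataTypes.STRING()),
                                DataTypes.FIELD("col1", DataTypes.STRING()))
                        .notNull(); // the key row itself is never null
        CsvRowDataSerializationSchema csvSerializer =
                new CsvRowDataSerializationSchema.Builder((RowType) keyType.getLogicalType())
                        .build();
        // Once opened, serializing the key row of (table1, col1 = "1") yields a line like:
        // "default_namespace.default_schema.table1",1
        System.out.println(keyType);
    }
}
```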
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/serialization/JsonSerializationSchema.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/serialization/JsonSerializationSchema.java
new file mode 100644
index 000000000..5425d444e
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/serialization/JsonSerializationSchema.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.kafka.serialization;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.cdc.common.data.RecordData;
+import org.apache.flink.cdc.common.event.CreateTableEvent;
+import org.apache.flink.cdc.common.event.DataChangeEvent;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.event.OperationType;
+import org.apache.flink.cdc.common.event.SchemaChangeEvent;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.Column;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.types.DataField;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.common.types.utils.DataTypeUtils;
+import org.apache.flink.cdc.common.utils.SchemaUtils;
+import org.apache.flink.cdc.connectors.kafka.json.TableSchemaInfo;
+import org.apache.flink.formats.common.TimestampFormat;
+import org.apache.flink.formats.json.JsonFormatOptions;
+import org.apache.flink.formats.json.JsonRowDataSerializationSchema;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.time.ZoneId;
+import java.util.HashMap;
+import java.util.Map;
+
+/** A {@link SerializationSchema} to convert {@link Event} into bytes in JSON format. */
+public class JsonSerializationSchema implements SerializationSchema<Event> {
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * A map of {@link TableId} and its {@link SerializationSchema} used to serialize the key data of each table in JSON format.
+ */
+ private final Map<TableId, TableSchemaInfo> jsonSerializers;
+
+ private final TimestampFormat timestampFormat;
+
+ private final JsonFormatOptions.MapNullKeyMode mapNullKeyMode;
+
+ private final String mapNullKeyLiteral;
+
+ private final boolean encodeDecimalAsPlainNumber;
+
+ private final ZoneId zoneId;
+
+ private InitializationContext context;
+
+ public JsonSerializationSchema(
+ TimestampFormat timestampFormat,
+ JsonFormatOptions.MapNullKeyMode mapNullKeyMode,
+ String mapNullKeyLiteral,
+ ZoneId zoneId,
+ boolean encodeDecimalAsPlainNumber) {
+ this.timestampFormat = timestampFormat;
+ this.mapNullKeyMode = mapNullKeyMode;
+ this.mapNullKeyLiteral = mapNullKeyLiteral;
+ this.encodeDecimalAsPlainNumber = encodeDecimalAsPlainNumber;
+ this.zoneId = zoneId;
+ jsonSerializers = new HashMap<>();
+ }
+
+ @Override
+ public void open(InitializationContext context) {
+ this.context = context;
+ }
+
+ @Override
+ public byte[] serialize(Event event) {
+ if (event instanceof SchemaChangeEvent) {
+ Schema schema;
+ SchemaChangeEvent schemaChangeEvent = (SchemaChangeEvent) event;
+ if (event instanceof CreateTableEvent) {
+ CreateTableEvent createTableEvent = (CreateTableEvent) event;
+ schema = createTableEvent.getSchema();
+ } else {
+ schema =
+ SchemaUtils.applySchemaChangeEvent(
+ jsonSerializers.get(schemaChangeEvent.tableId()).getSchema(),
+ schemaChangeEvent);
+ }
+ JsonRowDataSerializationSchema jsonSerializer = buildSerializationForPrimaryKey(schema);
+ try {
+ jsonSerializer.open(context);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ jsonSerializers.put(
+ schemaChangeEvent.tableId(),
+ new TableSchemaInfo(
+ schemaChangeEvent.tableId(), schema, jsonSerializer, zoneId));
+ return null;
+ }
+ DataChangeEvent dataChangeEvent = (DataChangeEvent) event;
+ RecordData recordData =
+ dataChangeEvent.op().equals(OperationType.DELETE)
+ ? dataChangeEvent.before()
+ : dataChangeEvent.after();
+ TableSchemaInfo tableSchemaInfo = jsonSerializers.get(dataChangeEvent.tableId());
+ return tableSchemaInfo
+ .getSerializationSchema()
+ .serialize(tableSchemaInfo.getRowDataFromRecordData(recordData, true));
+ }
+
+ private JsonRowDataSerializationSchema buildSerializationForPrimaryKey(Schema schema) {
+ DataField[] fields = new DataField[schema.primaryKeys().size() + 1];
+ fields[0] = DataTypes.FIELD("TableId", DataTypes.STRING());
+ for (int i = 0; i < schema.primaryKeys().size(); i++) {
+ Column column = schema.getColumn(schema.primaryKeys().get(i)).get();
+ fields[i + 1] = DataTypes.FIELD(column.getName(), column.getType());
+ }
+ // the row should never be null
+ DataType dataType = DataTypes.ROW(fields).notNull();
+ LogicalType rowType = DataTypeUtils.toFlinkDataType(dataType).getLogicalType();
+ return new JsonRowDataSerializationSchema(
+ (RowType) rowType,
+ timestampFormat,
+ mapNullKeyMode,
+ mapNullKeyLiteral,
+ encodeDecimalAsPlainNumber);
+ }
+}
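The JSON key emitted for a data-change record is a flat object holding the table id plus the primary-key fields, matching the expectations in the new tests further below. A small Jackson sketch of that expected layout (the real bytes are produced by `JsonRowDataSerializationSchema`; this only illustrates the shape):

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

// Illustrative sketch of the JSON key for a table whose primary key is col1 = "1".
public class JsonKeyShapeSketch {
    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        ObjectNode key = mapper.createObjectNode();
        key.put("TableId", "default_namespace.default_schema.table1");
        key.put("col1", "1");
        // Prints: {"TableId":"default_namespace.default_schema.table1","col1":"1"}
        System.out.println(mapper.writeValueAsString(key));
    }
}
```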
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSink.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSink.java
index 2ab264c5a..2dfc021b3 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSink.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSink.java
@@ -29,7 +29,6 @@ import org.apache.flink.cdc.common.sink.MetadataApplier;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.sink.KafkaSinkBuilder;
-import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.kafka.clients.producer.ProducerConfig;
@@ -46,10 +45,12 @@ public class KafkaDataSink implements DataSink {
final DeliveryGuarantee deliveryGuarantee;
- final FlinkKafkaPartitioner<Event> partitioner;
+ private final PartitionStrategy partitionStrategy;
final ZoneId zoneId;
+ final SerializationSchema<Event> keySerialization;
+
final SerializationSchema<Event> valueSerialization;
final String topic;
@@ -61,16 +62,18 @@ public class KafkaDataSink implements DataSink {
public KafkaDataSink(
DeliveryGuarantee deliveryGuarantee,
Properties kafkaProperties,
- FlinkKafkaPartitioner<Event> partitioner,
+ PartitionStrategy partitionStrategy,
ZoneId zoneId,
+ SerializationSchema<Event> keySerialization,
SerializationSchema<Event> valueSerialization,
String topic,
boolean addTableToHeaderEnabled,
String customHeaders) {
this.deliveryGuarantee = deliveryGuarantee;
this.kafkaProperties = kafkaProperties;
- this.partitioner = partitioner;
+ this.partitionStrategy = partitionStrategy;
this.zoneId = zoneId;
+ this.keySerialization = keySerialization;
this.valueSerialization = valueSerialization;
this.topic = topic;
this.addTableToHeaderEnabled = addTableToHeaderEnabled;
@@ -90,7 +93,8 @@ public class KafkaDataSink implements DataSink {
.setKafkaProducerConfig(kafkaProperties)
.setRecordSerializer(
new PipelineKafkaRecordSerializationSchema(
- partitioner,
+ partitionStrategy,
+ keySerialization,
valueSerialization,
topic,
addTableToHeaderEnabled,
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkFactory.java
index 243991d1a..d89812913 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkFactory.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkFactory.java
@@ -28,7 +28,6 @@ import org.apache.flink.cdc.connectors.kafka.json.ChangeLogJsonFormatFactory;
import org.apache.flink.cdc.connectors.kafka.json.JsonSerializationType;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.DeliveryGuarantee;
-import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
import java.time.ZoneId;
import java.util.HashSet;
@@ -37,6 +36,8 @@ import java.util.Objects;
import java.util.Properties;
import java.util.Set;
+import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.KEY_FORMAT;
+import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.PARTITION_STRATEGY;
import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.PROPERTIES_PREFIX;
import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.SINK_ADD_TABLEID_TO_HEADER_ENABLED;
import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.SINK_CUSTOM_HEADER;
@@ -65,6 +66,9 @@ public class KafkaDataSinkFactory implements DataSinkFactory {
context.getPipelineConfiguration()
.get(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE));
}
+ KeyFormat keyFormat = context.getFactoryConfiguration().get(KEY_FORMAT);
+ SerializationSchema<Event> keySerialization =
+ KeySerializationFactory.createSerializationSchema(configuration, keyFormat, zoneId);
JsonSerializationType jsonSerializationType =
context.getFactoryConfiguration().get(KafkaDataSinkOptions.VALUE_FORMAT);
SerializationSchema<Event> valueSerialization =
@@ -86,11 +90,14 @@ public class KafkaDataSinkFactory implements DataSinkFactory {
.get(KafkaDataSinkOptions.SINK_ADD_TABLEID_TO_HEADER_ENABLED);
String customHeaders =
context.getFactoryConfiguration().get(KafkaDataSinkOptions.SINK_CUSTOM_HEADER);
+ PartitionStrategy partitionStrategy =
+ context.getFactoryConfiguration().get(KafkaDataSinkOptions.PARTITION_STRATEGY);
return new KafkaDataSink(
deliveryGuarantee,
kafkaProperties,
- new FlinkFixedPartitioner<>(),
+ partitionStrategy,
zoneId,
+ keySerialization,
valueSerialization,
topic,
addTableToHeaderEnabled,
@@ -110,7 +117,9 @@ public class KafkaDataSinkFactory implements DataSinkFactory {
@Override
public Set<ConfigOption<?>> optionalOptions() {
Set<ConfigOption<?>> options = new HashSet<>();
+ options.add(KEY_FORMAT);
options.add(VALUE_FORMAT);
+ options.add(PARTITION_STRATEGY);
options.add(TOPIC);
options.add(SINK_ADD_TABLEID_TO_HEADER_ENABLED);
options.add(SINK_CUSTOM_HEADER);
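With `KEY_FORMAT` and `PARTITION_STRATEGY` registered as optional options above, they can be supplied to the sink as flat configuration keys, in the same way the new integration test builds its configuration map. An illustrative sketch only; the broker address and topic name are assumed placeholders:

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sink configuration using the two new option keys added in this change.
public class KafkaSinkOptionsExample {
    public static void main(String[] args) {
        Map<String, String> sinkConfig = new HashMap<>();
        sinkConfig.put("properties.bootstrap.servers", "localhost:9092"); // assumed broker address
        sinkConfig.put("topic", "example_topic"); // assumed topic name
        sinkConfig.put("key.format", "json"); // or "csv"
        sinkConfig.put("partition.strategy", "hash-by-key"); // or "all-to-zero" (the default)
        System.out.println(sinkConfig);
    }
}
```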
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkOptions.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkOptions.java
index e55e149a9..ca82f5c80 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkOptions.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkOptions.java
@@ -35,6 +35,22 @@ public class KafkaDataSinkOptions {
.defaultValue(DeliveryGuarantee.AT_LEAST_ONCE)
.withDescription("Optional delivery guarantee when committing.");
+ public static final ConfigOption<PartitionStrategy> PARTITION_STRATEGY =
+ key("partition.strategy")
+ .enumType(PartitionStrategy.class)
+ .defaultValue(PartitionStrategy.ALL_TO_ZERO)
+ .withDescription(
+ "Defines the strategy for distributing records to Kafka partitions, "
+ + "available options are `all-to-zero` and `hash-by-key`, the default is `all-to-zero`.");
+
+ public static final ConfigOption<KeyFormat> KEY_FORMAT =
+ key("key.format")
+ .enumType(KeyFormat.class)
+ .defaultValue(KeyFormat.JSON)
+ .withDescription(
+ "Defines the format used to encode the key part of Kafka messages, "
+ + "available options are `csv` and `json`, the default is `json`.");
+
public static final ConfigOption<JsonSerializationType> VALUE_FORMAT =
key("value.format")
.enumType(JsonSerializationType.class)
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KeyFormat.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KeyFormat.java
new file mode 100644
index 000000000..7c065d47f
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KeyFormat.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.kafka.sink;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+/** Key formats used to build the {@link SerializationSchema} for the key of a {@link ProducerRecord}. */
+public enum KeyFormat {
+ JSON("json"),
+
+ CSV("csv");
+
+ private final String value;
+
+ KeyFormat(String value) {
+ this.value = value;
+ }
+
+ @Override
+ public String toString() {
+ return value;
+ }
+}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KeySerializationFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KeySerializationFactory.java
new file mode 100644
index 000000000..76132d8e5
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KeySerializationFactory.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.kafka.sink;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.connectors.kafka.serialization.CsvSerializationSchema;
+import org.apache.flink.cdc.connectors.kafka.serialization.JsonSerializationSchema;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.formats.common.TimestampFormat;
+import org.apache.flink.formats.json.JsonFormatOptions;
+import org.apache.flink.formats.json.JsonFormatOptionsUtil;
+
+import java.time.ZoneId;
+
+import static org.apache.flink.formats.json.JsonFormatOptions.ENCODE_DECIMAL_AS_PLAIN_NUMBER;
+import static org.apache.flink.formats.json.debezium.DebeziumJsonFormatOptions.JSON_MAP_NULL_KEY_LITERAL;
+
+/**
+ * Format factory for providing configured instances of {@link SerializationSchema} to convert
+ * {@link Event} to bytes.
+ */
+public class KeySerializationFactory {
+
+ /**
+ * Creates a configured instance of {@link SerializationSchema} to convert {@link Event} to
+ * bytes.
+ */
+ public static SerializationSchema<Event> createSerializationSchema(
+ ReadableConfig formatOptions, KeyFormat keyFormat, ZoneId zoneId) {
+ switch (keyFormat) {
+ case JSON:
+ {
+ TimestampFormat timestampFormat =
+ JsonFormatOptionsUtil.getTimestampFormat(formatOptions);
+ JsonFormatOptions.MapNullKeyMode mapNullKeyMode =
+ JsonFormatOptionsUtil.getMapNullKeyMode(formatOptions);
+ String mapNullKeyLiteral = formatOptions.get(JSON_MAP_NULL_KEY_LITERAL);
+
+ final boolean encodeDecimalAsPlainNumber =
+ formatOptions.get(ENCODE_DECIMAL_AS_PLAIN_NUMBER);
+ return new JsonSerializationSchema(
+ timestampFormat,
+ mapNullKeyMode,
+ mapNullKeyLiteral,
+ zoneId,
+ encodeDecimalAsPlainNumber);
+ }
+ case CSV:
+ {
+ return new CsvSerializationSchema(zoneId);
+ }
+ default:
+ {
+ throw new IllegalArgumentException("Unsupported key format: " + keyFormat);
+ }
+ }
+ }
+}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/PartitionStrategy.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/PartitionStrategy.java
new file mode 100644
index 000000000..7638c6635
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/PartitionStrategy.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.kafka.sink;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+/** Partition strategy for routing a {@link ProducerRecord} to a Kafka partition. */
+public enum PartitionStrategy {
+
+ /** All {@link ProducerRecord}s will be sent to partition 0. */
+ ALL_TO_ZERO("all-to-zero"),
+
+ /** Each {@link ProducerRecord} will be sent to the partition determined by the hash of its primary key. */
+ HASH_BY_KEY("hash-by-key");
+
+ private final String value;
+
+ PartitionStrategy(String value) {
+ this.value = value;
+ }
+
+ @Override
+ public String toString() {
+ return value;
+ }
+}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/PipelineKafkaRecordSerializationSchema.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/PipelineKafkaRecordSerializationSchema.java
index bb27753d9..85e5e3f19 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/PipelineKafkaRecordSerializationSchema.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/PipelineKafkaRecordSerializationSchema.java
@@ -23,14 +23,11 @@ import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
-import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
-import javax.annotation.Nullable;
-
import java.util.HashMap;
import java.util.Map;
@@ -46,7 +43,11 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
*/
public class PipelineKafkaRecordSerializationSchema
implements KafkaRecordSerializationSchema<Event> {
- private final FlinkKafkaPartitioner<Event> partitioner;
+
+ private final Integer partition;
+
+ private final SerializationSchema<Event> keySerialization;
+
private final SerializationSchema<Event> valueSerialization;
private final String unifiedTopic;
@@ -63,12 +64,13 @@ public class PipelineKafkaRecordSerializationSchema
public static final String TABLE_NAME_HEADER_KEY = "tableName";
PipelineKafkaRecordSerializationSchema(
- @Nullable FlinkKafkaPartitioner<Event> partitioner,
+ PartitionStrategy partitionStrategy,
+ SerializationSchema<Event> keySerialization,
SerializationSchema<Event> valueSerialization,
String unifiedTopic,
boolean addTableToHeaderEnabled,
String customHeaderString) {
- this.partitioner = partitioner;
+ this.keySerialization = keySerialization;
this.valueSerialization = checkNotNull(valueSerialization);
this.unifiedTopic = unifiedTopic;
this.addTableToHeaderEnabled = addTableToHeaderEnabled;
@@ -87,12 +89,14 @@ public class PipelineKafkaRecordSerializationSchema
}
}
}
+ partition = partitionStrategy.equals(PartitionStrategy.ALL_TO_ZERO) ? 0 : null;
}
@Override
public ProducerRecord<byte[], byte[]> serialize(
Event event, KafkaSinkContext context, Long timestamp) {
ChangeEvent changeEvent = (ChangeEvent) event;
+ final byte[] keySerialized = keySerialization.serialize(event);
final byte[] valueSerialized = valueSerialization.serialize(event);
if (event instanceof SchemaChangeEvent) {
// skip sending SchemaChangeEvent.
@@ -121,37 +125,13 @@ public class PipelineKafkaRecordSerializationSchema
}
}
return new ProducerRecord<>(
- topic,
- extractPartition(
- changeEvent, valueSerialized, context.getPartitionsForTopic(topic)),
- null,
- null,
- valueSerialized,
- recordHeaders);
+ topic, partition, null, keySerialized, valueSerialized, recordHeaders);
}
@Override
public void open(
SerializationSchema.InitializationContext context, KafkaSinkContext sinkContext)
throws Exception {
- if (partitioner != null) {
- partitioner.open(
- sinkContext.getParallelInstanceId(),
- sinkContext.getNumberOfParallelInstances());
- }
valueSerialization.open(context);
}
-
- private Integer extractPartition(
- ChangeEvent changeEvent, byte[] valueSerialized, int[] partitions) {
- if (partitioner != null) {
- return partitioner.partition(
- changeEvent,
- null,
- valueSerialized,
- changeEvent.tableId().toString(),
- partitions);
- }
- return null;
- }
}
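Design note on the partitioning change above: with `all-to-zero` the partition is pinned to 0, while with `hash-by-key` the `ProducerRecord` is created with a `null` partition and a non-null serialized key, so the Kafka producer's default partitioner picks the partition from a hash of the key bytes. A rough stand-in illustration of that key-hash routing (Kafka itself uses murmur2; `Arrays.hashCode` below is only a placeholder for the idea):

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Rough illustration: records with the same key bytes always land on the same partition.
public class KeyHashRoutingSketch {
    static int choosePartition(byte[] keyBytes, int numPartitions) {
        // Mask the sign bit so the result is a valid, non-negative partition index.
        return (Arrays.hashCode(keyBytes) & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        byte[] key =
                "{\"TableId\":\"db.table1\",\"col1\":\"1\"}".getBytes(StandardCharsets.UTF_8);
        System.out.println(choosePartition(key, 3));
    }
}
```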
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/TableSchemaInfoTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/TableSchemaInfoTest.java
index 73f8ad64d..2c2feaae7 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/TableSchemaInfoTest.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/TableSchemaInfoTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
import org.apache.flink.cdc.common.data.TimestampData;
import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
import org.apache.flink.cdc.common.data.binary.BinaryStringData;
+import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
@@ -95,7 +96,9 @@ public class TableSchemaInfoTest {
"null_string", org.apache.flink.cdc.common.types.DataTypes.STRING())
.primaryKey("col1")
.build();
- TableSchemaInfo tableSchemaInfo = new TableSchemaInfo(schema, null, ZoneId.of("UTC+8"));
+ TableSchemaInfo tableSchemaInfo =
+ new TableSchemaInfo(
+ TableId.parse("testDatabase.testTable"), schema, null, ZoneId.of("UTC+8"));
Object[] testData =
new Object[] {
BinaryStringData.fromString("pk"),
@@ -159,6 +162,6 @@ public class TableSchemaInfoTest {
org.apache.flink.table.data.TimestampData.fromInstant(
Instant.parse("2023-01-01T08:00:00.000Z")),
null),
- tableSchemaInfo.getRowDataFromRecordData(recordData));
+ tableSchemaInfo.getRowDataFromRecordData(recordData, false));
}
}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/serialization/CsvSerializationSchemaTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/serialization/CsvSerializationSchemaTest.java
new file mode 100644
index 000000000..92b302f7e
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/serialization/CsvSerializationSchemaTest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.kafka.serialization;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.cdc.common.data.binary.BinaryStringData;
+import org.apache.flink.cdc.common.event.CreateTableEvent;
+import org.apache.flink.cdc.common.event.DataChangeEvent;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.common.types.RowType;
+import org.apache.flink.cdc.connectors.kafka.json.MockInitializationContext;
+import org.apache.flink.cdc.connectors.kafka.sink.KeyFormat;
+import org.apache.flink.cdc.connectors.kafka.sink.KeySerializationFactory;
+import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.time.ZoneId;
+
+/** Tests for {@link CsvSerializationSchema}. */
+public class CsvSerializationSchemaTest {
+
+ public static final TableId TABLE_1 =
+ TableId.tableId("default_namespace", "default_schema", "table1");
+
+ @Test
+ public void testSerialize() throws Exception {
+ ObjectMapper mapper =
+ JacksonMapperFactory.createObjectMapper()
+ .configure(JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN, false);
+ SerializationSchema<Event> serializationSchema =
+ KeySerializationFactory.createSerializationSchema(
+ new Configuration(), KeyFormat.CSV, ZoneId.systemDefault());
+ serializationSchema.open(new MockInitializationContext());
+
+ // create table
+ Schema schema =
+ Schema.newBuilder()
+ .physicalColumn("col1", DataTypes.STRING())
+ .physicalColumn("col2", DataTypes.STRING())
+ .primaryKey("col1")
+ .build();
+ CreateTableEvent createTableEvent = new CreateTableEvent(TABLE_1, schema);
+ Assertions.assertNull(serializationSchema.serialize(createTableEvent));
+
+ // insert
+ BinaryRecordDataGenerator generator =
+ new BinaryRecordDataGenerator(RowType.of(DataTypes.STRING(), DataTypes.STRING()));
+ DataChangeEvent insertEvent1 =
+ DataChangeEvent.insertEvent(
+ TABLE_1,
+ generator.generate(
+ new Object[] {
+ BinaryStringData.fromString("1"),
+ BinaryStringData.fromString("1")
+ }));
+ String expected = "\"default_namespace.default_schema.table1\",1";
+ String actual = new String(serializationSchema.serialize(insertEvent1));
+ Assertions.assertEquals(expected, actual);
+
+ DataChangeEvent insertEvent2 =
+ DataChangeEvent.insertEvent(
+ TABLE_1,
+ generator.generate(
+ new Object[] {
+ BinaryStringData.fromString("2"),
+ BinaryStringData.fromString("2")
+ }));
+ expected = "\"default_namespace.default_schema.table1\",2";
+ actual = new String(serializationSchema.serialize(insertEvent2));
+ Assertions.assertEquals(expected, actual);
+
+ DataChangeEvent deleteEvent =
+ DataChangeEvent.deleteEvent(
+ TABLE_1,
+ generator.generate(
+ new Object[] {
+ BinaryStringData.fromString("2"),
+ BinaryStringData.fromString("2")
+ }));
+ expected = "\"default_namespace.default_schema.table1\",2";
+ actual = new String(serializationSchema.serialize(deleteEvent));
+ Assertions.assertEquals(expected, actual);
+
+ DataChangeEvent updateEvent =
+ DataChangeEvent.updateEvent(
+ TABLE_1,
+ generator.generate(
+ new Object[] {
+ BinaryStringData.fromString("1"),
+ BinaryStringData.fromString("1")
+ }),
+ generator.generate(
+ new Object[] {
+ BinaryStringData.fromString("1"),
+ BinaryStringData.fromString("x")
+ }));
+ expected = "\"default_namespace.default_schema.table1\",1";
+ actual = new String(serializationSchema.serialize(updateEvent));
+ Assertions.assertEquals(expected, actual);
+ }
+}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/serialization/JsonSerializationSchemaTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/serialization/JsonSerializationSchemaTest.java
new file mode 100644
index 000000000..d21f1fb5f
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/serialization/JsonSerializationSchemaTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.kafka.serialization;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.cdc.common.data.binary.BinaryStringData;
+import org.apache.flink.cdc.common.event.CreateTableEvent;
+import org.apache.flink.cdc.common.event.DataChangeEvent;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.common.types.RowType;
+import org.apache.flink.cdc.connectors.kafka.json.MockInitializationContext;
+import org.apache.flink.cdc.connectors.kafka.sink.KeyFormat;
+import org.apache.flink.cdc.connectors.kafka.sink.KeySerializationFactory;
+import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.time.ZoneId;
+
+/** Tests for {@link JsonSerializationSchema}. */
+public class JsonSerializationSchemaTest {
+
+ public static final TableId TABLE_1 =
+ TableId.tableId("default_namespace", "default_schema", "table1");
+
+ @Test
+ public void testSerialize() throws Exception {
+ ObjectMapper mapper =
+ JacksonMapperFactory.createObjectMapper()
+ .configure(JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN, false);
+ SerializationSchema<Event> serializationSchema =
+ KeySerializationFactory.createSerializationSchema(
+ new Configuration(), KeyFormat.JSON, ZoneId.systemDefault());
+ serializationSchema.open(new MockInitializationContext());
+
+ // create table
+ Schema schema =
+ Schema.newBuilder()
+ .physicalColumn("col1", DataTypes.STRING())
+ .physicalColumn("col2", DataTypes.STRING())
+ .primaryKey("col1")
+ .build();
+ CreateTableEvent createTableEvent = new CreateTableEvent(TABLE_1, schema);
+ Assertions.assertNull(serializationSchema.serialize(createTableEvent));
+
+ // insert
+ BinaryRecordDataGenerator generator =
+ new BinaryRecordDataGenerator(RowType.of(DataTypes.STRING(), DataTypes.STRING()));
+ DataChangeEvent insertEvent1 =
+ DataChangeEvent.insertEvent(
+ TABLE_1,
+ generator.generate(
+ new Object[] {
+ BinaryStringData.fromString("1"),
+ BinaryStringData.fromString("1")
+ }));
+ JsonNode expected =
+ mapper.readTree(
+ "{\"TableId\":\"default_namespace.default_schema.table1\",\"col1\":\"1\"}");
+ JsonNode actual = mapper.readTree(serializationSchema.serialize(insertEvent1));
+ Assertions.assertEquals(expected, actual);
+
+ DataChangeEvent insertEvent2 =
+ DataChangeEvent.insertEvent(
+ TABLE_1,
+ generator.generate(
+ new Object[] {
+ BinaryStringData.fromString("2"),
+ BinaryStringData.fromString("2")
+ }));
+ expected =
+ mapper.readTree(
+ "{\"TableId\":\"default_namespace.default_schema.table1\",\"col1\":\"2\"}");
+ actual = mapper.readTree(serializationSchema.serialize(insertEvent2));
+ Assertions.assertEquals(expected, actual);
+
+ DataChangeEvent deleteEvent =
+ DataChangeEvent.deleteEvent(
+ TABLE_1,
+ generator.generate(
+ new Object[] {
+ BinaryStringData.fromString("2"),
+ BinaryStringData.fromString("2")
+ }));
+ expected =
+ mapper.readTree(
+ "{\"TableId\":\"default_namespace.default_schema.table1\",\"col1\":\"2\"}");
+ actual = mapper.readTree(serializationSchema.serialize(deleteEvent));
+ Assertions.assertEquals(expected, actual);
+
+ DataChangeEvent updateEvent =
+ DataChangeEvent.updateEvent(
+ TABLE_1,
+ generator.generate(
+ new Object[] {
+ BinaryStringData.fromString("1"),
+ BinaryStringData.fromString("1")
+ }),
+ generator.generate(
+ new Object[] {
+ BinaryStringData.fromString("1"),
+ BinaryStringData.fromString("x")
+ }));
+ expected =
+ mapper.readTree(
+ "{\"TableId\":\"default_namespace.default_schema.table1\",\"col1\":\"1\"}");
+ actual = mapper.readTree(serializationSchema.serialize(updateEvent));
+ Assertions.assertEquals(expected, actual);
+ }
+}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkITCase.java
index 1ef10603b..7936c0ef3 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkITCase.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkITCase.java
@@ -18,6 +18,7 @@
package org.apache.flink.cdc.connectors.kafka.sink;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.common.data.binary.BinaryStringData;
import org.apache.flink.cdc.common.event.AddColumnEvent;
@@ -67,9 +68,11 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
@@ -259,7 +262,7 @@ class KafkaDataSinkITCase extends TestLogger {
env.execute();
final List<ConsumerRecord<byte[], byte[]>> collectedRecords =
- drainAllRecordsFromTopic(topic, false);
+ drainAllRecordsFromTopic(topic, false, 0);
final long recordsCount = 5;
assertThat(recordsCount).isEqualTo(collectedRecords.size());
ObjectMapper mapper =
@@ -321,7 +324,7 @@ class KafkaDataSinkITCase extends TestLogger {
env.execute();
final List<ConsumerRecord<byte[], byte[]>> collectedRecords =
- drainAllRecordsFromTopic(topic, false);
+ drainAllRecordsFromTopic(topic, false, 0);
final long recordsCount = 5;
assertThat(recordsCount).isEqualTo(collectedRecords.size());
for (ConsumerRecord<byte[], byte[]> consumerRecord : collectedRecords) {
@@ -363,6 +366,104 @@ class KafkaDataSinkITCase extends TestLogger {
checkProducerLeak();
}
+ @Test
+ void testHashByKeyPartitionStrategyUsingJson() throws Exception {
+ final StreamExecutionEnvironment env = new LocalStreamEnvironment();
+ env.enableCheckpointing(1000L);
+ env.setRestartStrategy(RestartStrategies.noRestart());
+ final DataStream<Event> source =
+ env.fromCollection(createSourceEvents(), new EventTypeInfo());
+ Map<String, String> config = new HashMap<>();
+ Properties properties = getKafkaClientConfiguration();
+ properties.forEach(
+ (key, value) ->
+ config.put(
+ KafkaDataSinkOptions.PROPERTIES_PREFIX + key.toString(),
+ value.toString()));
+ config.put(KafkaDataSinkOptions.KEY_FORMAT.key(), KeyFormat.JSON.toString());
+ config.put(
+ KafkaDataSinkOptions.VALUE_FORMAT.key(),
+ JsonSerializationType.CANAL_JSON.toString());
+ source.sinkTo(
+ ((FlinkSinkProvider)
+ (new KafkaDataSinkFactory()
+ .createDataSink(
+ new FactoryHelper.DefaultContext(
+ Configuration.fromMap(config),
+ Configuration.fromMap(new HashMap<>()),
+ this.getClass().getClassLoader()))
+ .getEventSinkProvider()))
+ .getSink());
+ env.execute();
+
+ final List<ConsumerRecord<byte[], byte[]>> collectedRecords =
+ drainAllRecordsFromTopic(topic, false);
+ final long recordsCount = 5;
+ assertThat(recordsCount).isEqualTo(collectedRecords.size());
+ for (ConsumerRecord<byte[], byte[]> consumerRecord : collectedRecords) {
+ assertThat(
+ consumerRecord
+ .headers()
+ .headers(
+ PipelineKafkaRecordSerializationSchema
+ .TABLE_NAME_HEADER_KEY)
+ .iterator())
+ .isExhausted();
+ }
+ ObjectMapper mapper =
+ JacksonMapperFactory.createObjectMapper()
+ .configure(JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN, false);
+ List<Tuple2<JsonNode, JsonNode>> expected =
+ Arrays.asList(
+ Tuple2.of(
+ mapper.readTree(
+ String.format(
+ "{\"TableId\":\"%s\",\"col1\":\"1\"}",
+ table1.toString())),
+ mapper.readTree(
+ String.format(
+ "{\"old\":null,\"data\":[{\"col1\":\"1\",\"col2\":\"1\"}],\"type\":\"INSERT\",\"database\":\"default_schema\",\"table\":\"%s\",\"pkNames\":[\"col1\"]})",
+ table1.getTableName()))),
+ Tuple2.of(
+ mapper.readTree(
+ String.format(
+ "{\"TableId\":\"%s\",\"col1\":\"2\"}",
+ table1.toString())),
+ mapper.readTree(
+ String.format(
+ "{\"old\":null,\"data\":[{\"col1\":\"2\",\"col2\":\"2\"}],\"type\":\"INSERT\",\"database\":\"default_schema\",\"table\":\"%s\",\"pkNames\":[\"col1\"]})",
+ table1.getTableName()))),
+ Tuple2.of(
+ mapper.readTree(
+ String.format(
+ "{\"TableId\":\"%s\",\"col1\":\"3\"}",
+ table1.toString())),
+ mapper.readTree(
+ String.format(
+ "{\"old\":null,\"data\":[{\"col1\":\"3\",\"col2\":\"3\"}],\"type\":\"INSERT\",\"database\":\"default_schema\",\"table\":\"%s\",\"pkNames\":[\"col1\"]})",
+ table1.getTableName()))),
+ Tuple2.of(
+ mapper.readTree(
+ String.format(
+ "{\"TableId\":\"%s\",\"col1\":\"1\"}",
+ table1.toString())),
+ mapper.readTree(
+ String.format(
+ "{\"old\":[{\"col1\":\"1\",\"newCol3\":\"1\"}],\"data\":null,\"type\":\"DELETE\",\"database\":\"default_schema\",\"table\":\"%s\",\"pkNames\":[\"col1\"]})",
+ table1.getTableName()))),
+ Tuple2.of(
+ mapper.readTree(
+ String.format(
+ "{\"TableId\":\"%s\",\"col1\":\"2\"}",
+ table1.toString())),
+ mapper.readTree(
+ String.format(
+ "{\"old\":[{\"col1\":\"2\",\"newCol3\":\"\"}],\"data\":[{\"col1\":\"2\",\"newCol3\":\"x\"}],\"type\":\"UPDATE\",\"database\":\"default_schema\",\"table\":\"%s\",\"pkNames\":[\"col1\"]}",
+ table1.getTableName()))));
+ assertThat(deserializeKeyValues(collectedRecords)).containsAll(expected);
+ checkProducerLeak();
+ }
+
@Test
void testTopicAndHeaderOption() throws Exception {
final StreamExecutionEnvironment env = new LocalStreamEnvironment();
@@ -392,7 +493,7 @@ class KafkaDataSinkITCase extends TestLogger {
env.execute();
final List<ConsumerRecord<byte[], byte[]>> collectedRecords =
- drainAllRecordsFromTopic("test_topic", false);
+ drainAllRecordsFromTopic("test_topic", false, 0);
final long recordsCount = 5;
assertThat(recordsCount).isEqualTo(collectedRecords.size());
for (ConsumerRecord consumerRecord : collectedRecords) {
@@ -460,9 +561,13 @@ class KafkaDataSinkITCase extends TestLogger {
}
private List<ConsumerRecord<byte[], byte[]>> drainAllRecordsFromTopic(
- String topic, boolean committed) {
+ String topic, boolean committed, int... partitionArr) {
Properties properties = getKafkaClientConfiguration();
- return KafkaUtil.drainAllRecordsFromTopic(topic, properties, committed);
+ Set<Integer> partitions = new HashSet<>();
+ for (int partition : partitionArr) {
+ partitions.add(partition);
+ }
+ return KafkaUtil.drainAllRecordsFromTopic(topic, properties, committed, partitions);
}
private void checkProducerLeak() throws InterruptedException {
@@ -486,6 +591,18 @@ class KafkaDataSinkITCase extends TestLogger {
+ leaks.stream().map(this::format).collect(Collectors.joining("\n\n")));
}
+ private static List<Tuple2<JsonNode, JsonNode>> deserializeKeyValues(
+ List<ConsumerRecord<byte[], byte[]>> records) throws IOException {
+ ObjectMapper mapper =
+ JacksonMapperFactory.createObjectMapper()
+ .configure(JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN, false);
+ List<Tuple2<JsonNode, JsonNode>> result = new ArrayList<>();
+ for (ConsumerRecord<byte[], byte[]> record : records) {
+ result.add(Tuple2.of(mapper.readTree(record.key()), mapper.readTree(record.value())));
+ }
+ return result;
+ }
+
private static List<JsonNode> deserializeValues(List<ConsumerRecord<byte[], byte[]>> records)
throws IOException {
ObjectMapper mapper =
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaUtil.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaUtil.java
index d1f36f657..775b5b8a0 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaUtil.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaUtil.java
@@ -115,13 +115,14 @@ public class KafkaUtil {
* @throws KafkaException
*/
public static List<ConsumerRecord<byte[], byte[]>> drainAllRecordsFromTopic(
- String topic, Properties properties, boolean committed) throws KafkaException {
+ String topic, Properties properties, boolean committed, Set<Integer> partitions)
+ throws KafkaException {
final Properties consumerConfig = new Properties();
consumerConfig.putAll(properties);
consumerConfig.put(
ConsumerConfig.ISOLATION_LEVEL_CONFIG,
committed ? "read_committed" : "read_uncommitted");
- return drainAllRecordsFromTopic(topic, consumerConfig);
+ return drainAllRecordsFromTopic(topic, consumerConfig, partitions);
}
/**
@@ -137,13 +138,17 @@ public class KafkaUtil {
* @throws KafkaException
*/
public static List<ConsumerRecord<byte[], byte[]>> drainAllRecordsFromTopic(
- String topic, Properties properties) throws KafkaException {
+ String topic, Properties properties, Set<Integer> partitions) throws KafkaException {
final Properties consumerConfig = new Properties();
consumerConfig.putAll(properties);
consumerConfig.put("key.deserializer", ByteArrayDeserializer.class.getName());
consumerConfig.put("value.deserializer", ByteArrayDeserializer.class.getName());
try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerConfig)) {
Set<TopicPartition> topicPartitions = getAllPartitions(consumer, topic);
+ if (!partitions.isEmpty()) {
+ topicPartitions.removeIf(
+ topicPartition -> !partitions.contains(topicPartition.partition()));
+ }
Map<TopicPartition, Long> endOffsets = consumer.endOffsets(topicPartitions);
consumer.assign(topicPartitions);
consumer.seekToBeginning(topicPartitions);