[FLINK-35442][cdc-connect][kafka] Add key.format and partition.strategy options to ensure that changes of the same record are sent to the same partition. (#3522)

Kunni committed 6 months ago via GitHub
parent 4bf5a395a5
commit e0d6d1d1a8

@ -87,6 +87,20 @@ Pipeline 连接器配置项
<td>String</td>
<td>Sink 的名称。 </td>
</tr>
<tr>
<td>partition.strategy</td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>定义发送数据到 Kafka 分区的策略,可以设置的选项有 `all-to-zero`(将所有数据发送到 0 号分区)以及 `hash-by-key`(根据主键的哈希值分发数据),默认值为 `all-to-zero`。</td>
</tr>
<tr>
<td>key.format</td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>用于序列化 Kafka 消息的键部分数据的格式。可以设置的选项有 `csv` 以及 `json`,默认值为 `json`。</td>
</tr>
<tr>
<td>value.format</td>
<td>optional</td>

@ -85,6 +85,20 @@ Pipeline Connector Options
<td>String</td>
<td>The name of the sink.</td>
</tr>
<tr>
<td>partition.strategy</td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Defines the strategy for sending records to Kafka partitions. Available options are `all-to-zero` (sending all records to partition 0) and `hash-by-key` (distributing records by the hash of their primary keys); the default is `all-to-zero`.</td>
</tr>
<tr>
<td>key.format</td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Defines the format used to serialize the key part of Kafka messages. Available options are `csv` and `json`; the default is `json`.</td>
</tr>
<tr>
<td>value.format</td>
<td>optional</td>
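To see the two new options above in context, here is a minimal configuration sketch. It only shows plain key/value entries as the sink factory would receive them; the `properties.` prefix, broker address, and topic name are illustrative assumptions, not values taken from this change.

```java
import java.util.HashMap;
import java.util.Map;

// Minimal sketch: setting the new Kafka sink options as plain configuration entries.
// "localhost:9092" and "cdc-events" are hypothetical placeholders.
public class KafkaSinkOptionsSketch {
    public static void main(String[] args) {
        Map<String, String> sinkConfig = new HashMap<>();
        sinkConfig.put("properties.bootstrap.servers", "localhost:9092"); // pass-through Kafka client property (assumed prefix)
        sinkConfig.put("topic", "cdc-events");
        sinkConfig.put("key.format", "json");                // `json` (default) or `csv`
        sinkConfig.put("partition.strategy", "hash-by-key"); // `all-to-zero` (default) or `hash-by-key`
        sinkConfig.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```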

@ -45,6 +45,14 @@ limitations under the License.
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- Test dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>

@ -25,6 +25,7 @@ import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.data.binary.BinaryStringData;
@ -39,27 +40,60 @@ import static org.apache.flink.cdc.common.types.DataTypeChecks.getScale;
/** maintain the {@link SerializationSchema} of a specific {@link TableId}. */
public class TableSchemaInfo {
private final TableId tableId;
private final Schema schema;
private final List<Integer> primaryKeyColumnIndexes;
private final List<RecordData.FieldGetter> fieldGetters;
private final SerializationSchema<RowData> serializationSchema;
public TableSchemaInfo(
Schema schema, SerializationSchema<RowData> serializationSchema, ZoneId zoneId) {
TableId tableId,
Schema schema,
SerializationSchema<RowData> serializationSchema,
ZoneId zoneId) {
this.tableId = tableId;
this.schema = schema;
this.serializationSchema = serializationSchema;
this.fieldGetters = createFieldGetters(schema, zoneId);
primaryKeyColumnIndexes = new ArrayList<>();
for (int keyIndex = 0; keyIndex < schema.primaryKeys().size(); keyIndex++) {
for (int columnIndex = 0; columnIndex < schema.getColumnCount(); columnIndex++) {
if (schema.getColumns()
.get(columnIndex)
.getName()
.equals(schema.primaryKeys().get(keyIndex))) {
primaryKeyColumnIndexes.add(columnIndex);
break;
}
}
}
}
/** Convert to {@link RowData}, which will be passed to the serializationSchema. */
public RowData getRowDataFromRecordData(RecordData recordData) {
public RowData getRowDataFromRecordData(RecordData recordData, boolean primaryKeyOnly) {
if (primaryKeyOnly) {
GenericRowData genericRowData = new GenericRowData(primaryKeyColumnIndexes.size() + 1);
genericRowData.setField(0, StringData.fromString(tableId.toString()));
for (int i = 0; i < primaryKeyColumnIndexes.size(); i++) {
genericRowData.setField(
i + 1,
fieldGetters
.get(primaryKeyColumnIndexes.get(i))
.getFieldOrNull(recordData));
}
return genericRowData;
} else {
GenericRowData genericRowData = new GenericRowData(recordData.getArity());
for (int i = 0; i < recordData.getArity(); i++) {
genericRowData.setField(i, fieldGetters.get(i).getFieldOrNull(recordData));
}
return genericRowData;
}
}
private static List<RecordData.FieldGetter> createFieldGetters(Schema schema, ZoneId zoneId) {
List<RecordData.FieldGetter> fieldGetters = new ArrayList<>(schema.getColumns().size());
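To make the `primaryKeyOnly` path above concrete: the key row always starts with the fully qualified table id, followed by one field per primary-key column. A minimal sketch of that layout, assuming a hypothetical table `db.tbl` whose single primary key `id` currently holds 42:

```java
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.StringData;

// Sketch: the RowData produced by getRowDataFromRecordData(recordData, true)
// for a table `db.tbl` whose single primary key `id` currently holds 42.
public class KeyRowLayoutSketch {
    public static void main(String[] args) {
        GenericRowData keyRow = new GenericRowData(2);
        keyRow.setField(0, StringData.fromString("db.tbl")); // field 0: fully qualified table id
        keyRow.setField(1, 42L);                             // fields 1..n: primary-key values
        System.out.println(keyRow);
    }
}
```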

@ -127,7 +127,8 @@ public class CanalJsonSerializationSchema implements SerializationSchema<Event>
}
jsonSerializers.put(
schemaChangeEvent.tableId(),
new TableSchemaInfo(schema, jsonSerializer, zoneId));
new TableSchemaInfo(
schemaChangeEvent.tableId(), schema, jsonSerializer, zoneId));
return null;
}
@ -153,7 +154,8 @@ public class CanalJsonSerializationSchema implements SerializationSchema<Event>
new RowData[] {
jsonSerializers
.get(dataChangeEvent.tableId())
.getRowDataFromRecordData((dataChangeEvent.after()))
.getRowDataFromRecordData(
dataChangeEvent.after(), false)
}));
reuseGenericRowData.setField(2, OP_INSERT);
return jsonSerializers
@ -168,7 +170,7 @@ public class CanalJsonSerializationSchema implements SerializationSchema<Event>
jsonSerializers
.get(dataChangeEvent.tableId())
.getRowDataFromRecordData(
(dataChangeEvent.before()))
dataChangeEvent.before(), false)
}));
reuseGenericRowData.setField(1, null);
reuseGenericRowData.setField(2, OP_DELETE);
@ -185,7 +187,7 @@ public class CanalJsonSerializationSchema implements SerializationSchema<Event>
jsonSerializers
.get(dataChangeEvent.tableId())
.getRowDataFromRecordData(
(dataChangeEvent.before()))
dataChangeEvent.before(), false)
}));
reuseGenericRowData.setField(
1,
@ -193,7 +195,8 @@ public class CanalJsonSerializationSchema implements SerializationSchema<Event>
new RowData[] {
jsonSerializers
.get(dataChangeEvent.tableId())
.getRowDataFromRecordData((dataChangeEvent.after()))
.getRowDataFromRecordData(
dataChangeEvent.after(), false)
}));
reuseGenericRowData.setField(2, OP_UPDATE);
return jsonSerializers

@ -126,7 +126,8 @@ public class DebeziumJsonSerializationSchema implements SerializationSchema<Even
}
jsonSerializers.put(
schemaChangeEvent.tableId(),
new TableSchemaInfo(schema, jsonSerializer, zoneId));
new TableSchemaInfo(
schemaChangeEvent.tableId(), schema, jsonSerializer, zoneId));
return null;
}
@ -144,7 +145,7 @@ public class DebeziumJsonSerializationSchema implements SerializationSchema<Even
1,
jsonSerializers
.get(dataChangeEvent.tableId())
.getRowDataFromRecordData(dataChangeEvent.after()));
.getRowDataFromRecordData(dataChangeEvent.after(), false));
reuseGenericRowData.setField(2, OP_INSERT);
return jsonSerializers
.get(dataChangeEvent.tableId())
@ -155,7 +156,7 @@ public class DebeziumJsonSerializationSchema implements SerializationSchema<Even
0,
jsonSerializers
.get(dataChangeEvent.tableId())
.getRowDataFromRecordData(dataChangeEvent.before()));
.getRowDataFromRecordData(dataChangeEvent.before(), false));
reuseGenericRowData.setField(1, null);
reuseGenericRowData.setField(2, OP_DELETE);
return jsonSerializers
@ -168,12 +169,12 @@ public class DebeziumJsonSerializationSchema implements SerializationSchema<Even
0,
jsonSerializers
.get(dataChangeEvent.tableId())
.getRowDataFromRecordData(dataChangeEvent.before()));
.getRowDataFromRecordData(dataChangeEvent.before(), false));
reuseGenericRowData.setField(
1,
jsonSerializers
.get(dataChangeEvent.tableId())
.getRowDataFromRecordData(dataChangeEvent.after()));
.getRowDataFromRecordData(dataChangeEvent.after(), false));
reuseGenericRowData.setField(2, OP_UPDATE);
return jsonSerializers
.get(dataChangeEvent.tableId())

@ -0,0 +1,117 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.connectors.kafka.serialization;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.cdc.common.data.RecordData;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.OperationType;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.DataField;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.common.types.utils.DataTypeUtils;
import org.apache.flink.cdc.common.utils.SchemaUtils;
import org.apache.flink.cdc.connectors.kafka.json.TableSchemaInfo;
import org.apache.flink.formats.csv.CsvRowDataSerializationSchema;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import java.time.ZoneId;
import java.util.HashMap;
import java.util.Map;
/** A {@link SerializationSchema} to convert {@link Event} into bytes in CSV format. */
public class CsvSerializationSchema implements SerializationSchema<Event> {
private static final long serialVersionUID = 1L;
/**
* A map of {@link TableId} and its {@link SerializationSchema} used to serialize the CSV-encoded key.
*/
private final Map<TableId, TableSchemaInfo> csvSerializers;
private final ZoneId zoneId;
private InitializationContext context;
public CsvSerializationSchema(ZoneId zoneId) {
this.zoneId = zoneId;
csvSerializers = new HashMap<>();
}
@Override
public void open(InitializationContext context) {
this.context = context;
}
@Override
public byte[] serialize(Event event) {
if (event instanceof SchemaChangeEvent) {
Schema schema;
SchemaChangeEvent schemaChangeEvent = (SchemaChangeEvent) event;
if (event instanceof CreateTableEvent) {
CreateTableEvent createTableEvent = (CreateTableEvent) event;
schema = createTableEvent.getSchema();
} else {
schema =
SchemaUtils.applySchemaChangeEvent(
csvSerializers.get(schemaChangeEvent.tableId()).getSchema(),
schemaChangeEvent);
}
CsvRowDataSerializationSchema csvSerializer = buildSerializationForPrimaryKey(schema);
try {
csvSerializer.open(context);
} catch (Exception e) {
throw new RuntimeException(e);
}
csvSerializers.put(
schemaChangeEvent.tableId(),
new TableSchemaInfo(
schemaChangeEvent.tableId(), schema, csvSerializer, zoneId));
return null;
}
DataChangeEvent dataChangeEvent = (DataChangeEvent) event;
RecordData recordData =
dataChangeEvent.op().equals(OperationType.DELETE)
? dataChangeEvent.before()
: dataChangeEvent.after();
TableSchemaInfo tableSchemaInfo = csvSerializers.get(dataChangeEvent.tableId());
return tableSchemaInfo
.getSerializationSchema()
.serialize(tableSchemaInfo.getRowDataFromRecordData(recordData, true));
}
private CsvRowDataSerializationSchema buildSerializationForPrimaryKey(Schema schema) {
DataField[] fields = new DataField[schema.primaryKeys().size() + 1];
fields[0] = DataTypes.FIELD("TableId", DataTypes.STRING());
for (int i = 0; i < schema.primaryKeys().size(); i++) {
Column column = schema.getColumn(schema.primaryKeys().get(i)).get();
fields[i + 1] = DataTypes.FIELD(column.getName(), column.getType());
}
// the row should never be null
DataType dataType = DataTypes.ROW(fields).notNull();
LogicalType rowType = DataTypeUtils.toFlinkDataType(dataType).getLogicalType();
return new CsvRowDataSerializationSchema.Builder((RowType) rowType).build();
}
}
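For reference, the key row type built by `buildSerializationForPrimaryKey` above, for a table whose only primary key column is `col1 STRING` (the column name is illustrative), is equivalent to this sketch:

```java
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypes;

// Sketch: the key row type is the table id field plus one field per primary-key column.
public class KeyRowTypeSketch {
    public static void main(String[] args) {
        DataType keyRowType =
                DataTypes.ROW(
                                DataTypes.FIELD("TableId", DataTypes.STRING()),
                                DataTypes.FIELD("col1", DataTypes.STRING()))
                        .notNull();
        System.out.println(keyRowType);
    }
}
```

With the CSV key format, such a row serializes to a line like `"default_namespace.default_schema.table1",1`, which is what `CsvSerializationSchemaTest` further down asserts.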

@ -0,0 +1,141 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.connectors.kafka.serialization;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.cdc.common.data.RecordData;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.OperationType;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.DataField;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.common.types.utils.DataTypeUtils;
import org.apache.flink.cdc.common.utils.SchemaUtils;
import org.apache.flink.cdc.connectors.kafka.json.TableSchemaInfo;
import org.apache.flink.formats.common.TimestampFormat;
import org.apache.flink.formats.json.JsonFormatOptions;
import org.apache.flink.formats.json.JsonRowDataSerializationSchema;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import java.time.ZoneId;
import java.util.HashMap;
import java.util.Map;
/** A {@link SerializationSchema} to convert {@link Event} into bytes in JSON format. */
public class JsonSerializationSchema implements SerializationSchema<Event> {
private static final long serialVersionUID = 1L;
/**
* A map of {@link TableId} and its {@link SerializationSchema} used to serialize the JSON-encoded key.
*/
private final Map<TableId, TableSchemaInfo> jsonSerializers;
private final TimestampFormat timestampFormat;
private final JsonFormatOptions.MapNullKeyMode mapNullKeyMode;
private final String mapNullKeyLiteral;
private final boolean encodeDecimalAsPlainNumber;
private final ZoneId zoneId;
private InitializationContext context;
public JsonSerializationSchema(
TimestampFormat timestampFormat,
JsonFormatOptions.MapNullKeyMode mapNullKeyMode,
String mapNullKeyLiteral,
ZoneId zoneId,
boolean encodeDecimalAsPlainNumber) {
this.timestampFormat = timestampFormat;
this.mapNullKeyMode = mapNullKeyMode;
this.mapNullKeyLiteral = mapNullKeyLiteral;
this.encodeDecimalAsPlainNumber = encodeDecimalAsPlainNumber;
this.zoneId = zoneId;
jsonSerializers = new HashMap<>();
}
@Override
public void open(InitializationContext context) {
this.context = context;
}
@Override
public byte[] serialize(Event event) {
if (event instanceof SchemaChangeEvent) {
Schema schema;
SchemaChangeEvent schemaChangeEvent = (SchemaChangeEvent) event;
if (event instanceof CreateTableEvent) {
CreateTableEvent createTableEvent = (CreateTableEvent) event;
schema = createTableEvent.getSchema();
} else {
schema =
SchemaUtils.applySchemaChangeEvent(
jsonSerializers.get(schemaChangeEvent.tableId()).getSchema(),
schemaChangeEvent);
}
JsonRowDataSerializationSchema jsonSerializer = buildSerializationForPrimaryKey(schema);
try {
jsonSerializer.open(context);
} catch (Exception e) {
throw new RuntimeException(e);
}
jsonSerializers.put(
schemaChangeEvent.tableId(),
new TableSchemaInfo(
schemaChangeEvent.tableId(), schema, jsonSerializer, zoneId));
return null;
}
DataChangeEvent dataChangeEvent = (DataChangeEvent) event;
RecordData recordData =
dataChangeEvent.op().equals(OperationType.DELETE)
? dataChangeEvent.before()
: dataChangeEvent.after();
TableSchemaInfo tableSchemaInfo = jsonSerializers.get(dataChangeEvent.tableId());
return tableSchemaInfo
.getSerializationSchema()
.serialize(tableSchemaInfo.getRowDataFromRecordData(recordData, true));
}
private JsonRowDataSerializationSchema buildSerializationForPrimaryKey(Schema schema) {
DataField[] fields = new DataField[schema.primaryKeys().size() + 1];
fields[0] = DataTypes.FIELD("TableId", DataTypes.STRING());
for (int i = 0; i < schema.primaryKeys().size(); i++) {
Column column = schema.getColumn(schema.primaryKeys().get(i)).get();
fields[i + 1] = DataTypes.FIELD(column.getName(), column.getType());
}
// the row should never be null
DataType dataType = DataTypes.ROW(fields).notNull();
LogicalType rowType = DataTypeUtils.toFlinkDataType(dataType).getLogicalType();
return new JsonRowDataSerializationSchema(
(RowType) rowType,
timestampFormat,
mapNullKeyMode,
mapNullKeyLiteral,
encodeDecimalAsPlainNumber);
}
}

@ -29,7 +29,6 @@ import org.apache.flink.cdc.common.sink.MetadataApplier;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.sink.KafkaSinkBuilder;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.kafka.clients.producer.ProducerConfig;
@ -46,10 +45,12 @@ public class KafkaDataSink implements DataSink {
final DeliveryGuarantee deliveryGuarantee;
final FlinkKafkaPartitioner<Event> partitioner;
private final PartitionStrategy partitionStrategy;
final ZoneId zoneId;
final SerializationSchema<Event> keySerialization;
final SerializationSchema<Event> valueSerialization;
final String topic;
@ -61,16 +62,18 @@ public class KafkaDataSink implements DataSink {
public KafkaDataSink(
DeliveryGuarantee deliveryGuarantee,
Properties kafkaProperties,
FlinkKafkaPartitioner<Event> partitioner,
PartitionStrategy partitionStrategy,
ZoneId zoneId,
SerializationSchema<Event> keySerialization,
SerializationSchema<Event> valueSerialization,
String topic,
boolean addTableToHeaderEnabled,
String customHeaders) {
this.deliveryGuarantee = deliveryGuarantee;
this.kafkaProperties = kafkaProperties;
this.partitioner = partitioner;
this.partitionStrategy = partitionStrategy;
this.zoneId = zoneId;
this.keySerialization = keySerialization;
this.valueSerialization = valueSerialization;
this.topic = topic;
this.addTableToHeaderEnabled = addTableToHeaderEnabled;
@ -90,7 +93,8 @@ public class KafkaDataSink implements DataSink {
.setKafkaProducerConfig(kafkaProperties)
.setRecordSerializer(
new PipelineKafkaRecordSerializationSchema(
partitioner,
partitionStrategy,
keySerialization,
valueSerialization,
topic,
addTableToHeaderEnabled,

@ -28,7 +28,6 @@ import org.apache.flink.cdc.connectors.kafka.json.ChangeLogJsonFormatFactory;
import org.apache.flink.cdc.connectors.kafka.json.JsonSerializationType;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
import java.time.ZoneId;
import java.util.HashSet;
@ -37,6 +36,8 @@ import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.KEY_FORMAT;
import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.PARTITION_STRATEGY;
import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.PROPERTIES_PREFIX;
import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.SINK_ADD_TABLEID_TO_HEADER_ENABLED;
import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.SINK_CUSTOM_HEADER;
@ -65,6 +66,9 @@ public class KafkaDataSinkFactory implements DataSinkFactory {
context.getPipelineConfiguration()
.get(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE));
}
KeyFormat keyFormat = context.getFactoryConfiguration().get(KEY_FORMAT);
SerializationSchema<Event> keySerialization =
KeySerializationFactory.createSerializationSchema(configuration, keyFormat, zoneId);
JsonSerializationType jsonSerializationType =
context.getFactoryConfiguration().get(KafkaDataSinkOptions.VALUE_FORMAT);
SerializationSchema<Event> valueSerialization =
@ -86,11 +90,14 @@ public class KafkaDataSinkFactory implements DataSinkFactory {
.get(KafkaDataSinkOptions.SINK_ADD_TABLEID_TO_HEADER_ENABLED);
String customHeaders =
context.getFactoryConfiguration().get(KafkaDataSinkOptions.SINK_CUSTOM_HEADER);
PartitionStrategy partitionStrategy =
context.getFactoryConfiguration().get(KafkaDataSinkOptions.PARTITION_STRATEGY);
return new KafkaDataSink(
deliveryGuarantee,
kafkaProperties,
new FlinkFixedPartitioner<>(),
partitionStrategy,
zoneId,
keySerialization,
valueSerialization,
topic,
addTableToHeaderEnabled,
@ -110,7 +117,9 @@ public class KafkaDataSinkFactory implements DataSinkFactory {
@Override
public Set<ConfigOption<?>> optionalOptions() {
Set<ConfigOption<?>> options = new HashSet<>();
options.add(KEY_FORMAT);
options.add(VALUE_FORMAT);
options.add(PARTITION_STRATEGY);
options.add(TOPIC);
options.add(SINK_ADD_TABLEID_TO_HEADER_ENABLED);
options.add(SINK_CUSTOM_HEADER);

@ -35,6 +35,22 @@ public class KafkaDataSinkOptions {
.defaultValue(DeliveryGuarantee.AT_LEAST_ONCE)
.withDescription("Optional delivery guarantee when committing.");
public static final ConfigOption<PartitionStrategy> PARTITION_STRATEGY =
key("partition.strategy")
.enumType(PartitionStrategy.class)
.defaultValue(PartitionStrategy.ALL_TO_ZERO)
.withDescription(
"Defines the strategy for sending record to kafka topic, "
+ "available options are `all-to-zero` and `hash-by-key`, default option is `all-to-zero`.");
public static final ConfigOption<KeyFormat> KEY_FORMAT =
key("key.format")
.enumType(KeyFormat.class)
.defaultValue(KeyFormat.JSON)
.withDescription(
"Defines the format identifier for encoding key data, "
+ "available options are `csv` and `json`, default option is `json`.");
public static final ConfigOption<JsonSerializationType> VALUE_FORMAT =
key("value.format")
.enumType(JsonSerializationType.class)

@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.connectors.kafka.sink;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;
/** Enum of supported formats for serializing the key of a {@link ProducerRecord}. */
public enum KeyFormat {
JSON("json"),
CSV("csv");
private final String value;
KeyFormat(String value) {
this.value = value;
}
@Override
public String toString() {
return value;
}
}

@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.connectors.kafka.sink;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.connectors.kafka.serialization.CsvSerializationSchema;
import org.apache.flink.cdc.connectors.kafka.serialization.JsonSerializationSchema;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.formats.common.TimestampFormat;
import org.apache.flink.formats.json.JsonFormatOptions;
import org.apache.flink.formats.json.JsonFormatOptionsUtil;
import java.time.ZoneId;
import static org.apache.flink.formats.json.JsonFormatOptions.ENCODE_DECIMAL_AS_PLAIN_NUMBER;
import static org.apache.flink.formats.json.debezium.DebeziumJsonFormatOptions.JSON_MAP_NULL_KEY_LITERAL;
/**
* Format factory for providing configured instances of {@link SerializationSchema} to convert
* {@link Event} to bytes.
*/
public class KeySerializationFactory {
/**
* Creates a configured instance of {@link SerializationSchema} to convert {@link Event} to
* bytes.
*/
public static SerializationSchema<Event> createSerializationSchema(
ReadableConfig formatOptions, KeyFormat keyFormat, ZoneId zoneId) {
switch (keyFormat) {
case JSON:
{
TimestampFormat timestampFormat =
JsonFormatOptionsUtil.getTimestampFormat(formatOptions);
JsonFormatOptions.MapNullKeyMode mapNullKeyMode =
JsonFormatOptionsUtil.getMapNullKeyMode(formatOptions);
String mapNullKeyLiteral = formatOptions.get(JSON_MAP_NULL_KEY_LITERAL);
final boolean encodeDecimalAsPlainNumber =
formatOptions.get(ENCODE_DECIMAL_AS_PLAIN_NUMBER);
return new JsonSerializationSchema(
timestampFormat,
mapNullKeyMode,
mapNullKeyLiteral,
zoneId,
encodeDecimalAsPlainNumber);
}
case CSV:
{
return new CsvSerializationSchema(zoneId);
}
default:
{
throw new IllegalArgumentException("UnSupport key format of " + keyFormat);
}
}
}
}
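A minimal usage sketch of the factory, mirroring how the key-serialization unit tests further down obtain and open a key serializer (`MockInitializationContext` is the test-scope helper used there):

```java
import java.time.ZoneId;

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.connectors.kafka.json.MockInitializationContext;
import org.apache.flink.cdc.connectors.kafka.sink.KeyFormat;
import org.apache.flink.cdc.connectors.kafka.sink.KeySerializationFactory;
import org.apache.flink.configuration.Configuration;

// Sketch: build a CSV key serializer and open it before serializing events.
public class KeySerializationSketch {
    public static SerializationSchema<Event> buildCsvKeySerializer() throws Exception {
        SerializationSchema<Event> keySerializer =
                KeySerializationFactory.createSerializationSchema(
                        new Configuration(), KeyFormat.CSV, ZoneId.systemDefault());
        keySerializer.open(new MockInitializationContext());
        return keySerializer;
    }
}
```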

@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.connectors.kafka.sink;
import org.apache.kafka.clients.producer.ProducerRecord;
/** Partition strategy for assigning a {@link ProducerRecord} to a Kafka partition. */
public enum PartitionStrategy {
/** All {@link ProducerRecord} will be sent to partition 0. */
ALL_TO_ZERO("all-to-zero"),
/** Each {@link ProducerRecord} will be sent to a partition determined by the hash of its primary key. */
HASH_BY_KEY("hash-by-key");
private final String value;
PartitionStrategy(String value) {
this.value = value;
}
@Override
public String toString() {
return value;
}
}
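With `hash-by-key`, the updated `PipelineKafkaRecordSerializationSchema` further down leaves the record's partition null and attaches the serialized key, so the Kafka producer's partitioner picks the partition from the key bytes. A rough sketch of the expected placement, assuming the stock default partitioner (murmur2 hash of the key); this illustrates the mechanism and is not code from this commit:

```java
import org.apache.kafka.common.utils.Utils;

// Sketch: expected partition for a keyed record with a null partition,
// as chosen by Kafka's default partitioner (murmur2 hash of the key bytes).
public class HashByKeySketch {
    public static int expectedPartition(byte[] keyBytes, int numPartitions) {
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }
}
```

Since the key contains only the table id and the primary-key values, all changes of the same record land on the same partition, which is what the commit title promises.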

@ -23,14 +23,11 @@ import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import javax.annotation.Nullable;
import java.util.HashMap;
import java.util.Map;
@ -46,7 +43,11 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
*/
public class PipelineKafkaRecordSerializationSchema
implements KafkaRecordSerializationSchema<Event> {
private final FlinkKafkaPartitioner<Event> partitioner;
private final Integer partition;
private final SerializationSchema<Event> keySerialization;
private final SerializationSchema<Event> valueSerialization;
private final String unifiedTopic;
@ -63,12 +64,13 @@ public class PipelineKafkaRecordSerializationSchema
public static final String TABLE_NAME_HEADER_KEY = "tableName";
PipelineKafkaRecordSerializationSchema(
@Nullable FlinkKafkaPartitioner<Event> partitioner,
PartitionStrategy partitionStrategy,
SerializationSchema<Event> keySerialization,
SerializationSchema<Event> valueSerialization,
String unifiedTopic,
boolean addTableToHeaderEnabled,
String customHeaderString) {
this.partitioner = partitioner;
this.keySerialization = keySerialization;
this.valueSerialization = checkNotNull(valueSerialization);
this.unifiedTopic = unifiedTopic;
this.addTableToHeaderEnabled = addTableToHeaderEnabled;
@ -87,12 +89,14 @@ public class PipelineKafkaRecordSerializationSchema
}
}
}
partition = partitionStrategy.equals(PartitionStrategy.ALL_TO_ZERO) ? 0 : null;
}
@Override
public ProducerRecord<byte[], byte[]> serialize(
Event event, KafkaSinkContext context, Long timestamp) {
ChangeEvent changeEvent = (ChangeEvent) event;
final byte[] keySerialized = keySerialization.serialize(event);
final byte[] valueSerialized = valueSerialization.serialize(event);
if (event instanceof SchemaChangeEvent) {
// skip sending SchemaChangeEvent.
@ -121,37 +125,13 @@ public class PipelineKafkaRecordSerializationSchema
}
}
return new ProducerRecord<>(
topic,
extractPartition(
changeEvent, valueSerialized, context.getPartitionsForTopic(topic)),
null,
null,
valueSerialized,
recordHeaders);
topic, partition, null, keySerialized, valueSerialized, recordHeaders);
}
@Override
public void open(
SerializationSchema.InitializationContext context, KafkaSinkContext sinkContext)
throws Exception {
if (partitioner != null) {
partitioner.open(
sinkContext.getParallelInstanceId(),
sinkContext.getNumberOfParallelInstances());
}
valueSerialization.open(context);
}
private Integer extractPartition(
ChangeEvent changeEvent, byte[] valueSerialized, int[] partitions) {
if (partitioner != null) {
return partitioner.partition(
changeEvent,
null,
valueSerialized,
changeEvent.tableId().toString(),
partitions);
}
return null;
}
}

@ -22,6 +22,7 @@ import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
import org.apache.flink.cdc.common.data.TimestampData;
import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
import org.apache.flink.cdc.common.data.binary.BinaryStringData;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
@ -95,7 +96,9 @@ public class TableSchemaInfoTest {
"null_string", org.apache.flink.cdc.common.types.DataTypes.STRING())
.primaryKey("col1")
.build();
TableSchemaInfo tableSchemaInfo = new TableSchemaInfo(schema, null, ZoneId.of("UTC+8"));
TableSchemaInfo tableSchemaInfo =
new TableSchemaInfo(
TableId.parse("testDatabase.testTable"), schema, null, ZoneId.of("UTC+8"));
Object[] testData =
new Object[] {
BinaryStringData.fromString("pk"),
@ -159,6 +162,6 @@ public class TableSchemaInfoTest {
org.apache.flink.table.data.TimestampData.fromInstant(
Instant.parse("2023-01-01T08:00:00.000Z")),
null),
tableSchemaInfo.getRowDataFromRecordData(recordData));
tableSchemaInfo.getRowDataFromRecordData(recordData, false));
}
}

@ -0,0 +1,126 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.connectors.kafka.serialization;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.cdc.common.data.binary.BinaryStringData;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.common.types.RowType;
import org.apache.flink.cdc.connectors.kafka.json.MockInitializationContext;
import org.apache.flink.cdc.connectors.kafka.sink.KeyFormat;
import org.apache.flink.cdc.connectors.kafka.sink.KeySerializationFactory;
import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.jackson.JacksonMapperFactory;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.time.ZoneId;
/** Tests for {@link CsvSerializationSchema}. */
public class CsvSerializationSchemaTest {
public static final TableId TABLE_1 =
TableId.tableId("default_namespace", "default_schema", "table1");
@Test
public void testSerialize() throws Exception {
ObjectMapper mapper =
JacksonMapperFactory.createObjectMapper()
.configure(JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN, false);
SerializationSchema<Event> serializationSchema =
KeySerializationFactory.createSerializationSchema(
new Configuration(), KeyFormat.CSV, ZoneId.systemDefault());
serializationSchema.open(new MockInitializationContext());
// create table
Schema schema =
Schema.newBuilder()
.physicalColumn("col1", DataTypes.STRING())
.physicalColumn("col2", DataTypes.STRING())
.primaryKey("col1")
.build();
CreateTableEvent createTableEvent = new CreateTableEvent(TABLE_1, schema);
Assertions.assertNull(serializationSchema.serialize(createTableEvent));
// insert
BinaryRecordDataGenerator generator =
new BinaryRecordDataGenerator(RowType.of(DataTypes.STRING(), DataTypes.STRING()));
DataChangeEvent insertEvent1 =
DataChangeEvent.insertEvent(
TABLE_1,
generator.generate(
new Object[] {
BinaryStringData.fromString("1"),
BinaryStringData.fromString("1")
}));
String expected = "\"default_namespace.default_schema.table1\",1";
String actual = new String(serializationSchema.serialize(insertEvent1));
Assertions.assertEquals(expected, actual);
DataChangeEvent insertEvent2 =
DataChangeEvent.insertEvent(
TABLE_1,
generator.generate(
new Object[] {
BinaryStringData.fromString("2"),
BinaryStringData.fromString("2")
}));
expected = "\"default_namespace.default_schema.table1\",2";
actual = new String(serializationSchema.serialize(insertEvent2));
Assertions.assertEquals(expected, actual);
DataChangeEvent deleteEvent =
DataChangeEvent.deleteEvent(
TABLE_1,
generator.generate(
new Object[] {
BinaryStringData.fromString("2"),
BinaryStringData.fromString("2")
}));
expected = "\"default_namespace.default_schema.table1\",2";
actual = new String(serializationSchema.serialize(deleteEvent));
Assertions.assertEquals(expected, actual);
DataChangeEvent updateEvent =
DataChangeEvent.updateEvent(
TABLE_1,
generator.generate(
new Object[] {
BinaryStringData.fromString("1"),
BinaryStringData.fromString("1")
}),
generator.generate(
new Object[] {
BinaryStringData.fromString("1"),
BinaryStringData.fromString("x")
}));
expected = "\"default_namespace.default_schema.table1\",1";
actual = new String(serializationSchema.serialize(updateEvent));
Assertions.assertEquals(expected, actual);
}
}

@ -0,0 +1,135 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.connectors.kafka.serialization;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.cdc.common.data.binary.BinaryStringData;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.common.types.RowType;
import org.apache.flink.cdc.connectors.kafka.json.MockInitializationContext;
import org.apache.flink.cdc.connectors.kafka.sink.KeyFormat;
import org.apache.flink.cdc.connectors.kafka.sink.KeySerializationFactory;
import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.jackson.JacksonMapperFactory;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.time.ZoneId;
/** Tests for {@link JsonSerializationSchema}. */
public class JsonSerializationSchemaTest {
public static final TableId TABLE_1 =
TableId.tableId("default_namespace", "default_schema", "table1");
@Test
public void testSerialize() throws Exception {
ObjectMapper mapper =
JacksonMapperFactory.createObjectMapper()
.configure(JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN, false);
SerializationSchema<Event> serializationSchema =
KeySerializationFactory.createSerializationSchema(
new Configuration(), KeyFormat.JSON, ZoneId.systemDefault());
serializationSchema.open(new MockInitializationContext());
// create table
Schema schema =
Schema.newBuilder()
.physicalColumn("col1", DataTypes.STRING())
.physicalColumn("col2", DataTypes.STRING())
.primaryKey("col1")
.build();
CreateTableEvent createTableEvent = new CreateTableEvent(TABLE_1, schema);
Assertions.assertNull(serializationSchema.serialize(createTableEvent));
// insert
BinaryRecordDataGenerator generator =
new BinaryRecordDataGenerator(RowType.of(DataTypes.STRING(), DataTypes.STRING()));
DataChangeEvent insertEvent1 =
DataChangeEvent.insertEvent(
TABLE_1,
generator.generate(
new Object[] {
BinaryStringData.fromString("1"),
BinaryStringData.fromString("1")
}));
JsonNode expected =
mapper.readTree(
"{\"TableId\":\"default_namespace.default_schema.table1\",\"col1\":\"1\"}");
JsonNode actual = mapper.readTree(serializationSchema.serialize(insertEvent1));
Assertions.assertEquals(expected, actual);
DataChangeEvent insertEvent2 =
DataChangeEvent.insertEvent(
TABLE_1,
generator.generate(
new Object[] {
BinaryStringData.fromString("2"),
BinaryStringData.fromString("2")
}));
expected =
mapper.readTree(
"{\"TableId\":\"default_namespace.default_schema.table1\",\"col1\":\"2\"}");
actual = mapper.readTree(serializationSchema.serialize(insertEvent2));
Assertions.assertEquals(expected, actual);
DataChangeEvent deleteEvent =
DataChangeEvent.deleteEvent(
TABLE_1,
generator.generate(
new Object[] {
BinaryStringData.fromString("2"),
BinaryStringData.fromString("2")
}));
expected =
mapper.readTree(
"{\"TableId\":\"default_namespace.default_schema.table1\",\"col1\":\"2\"}");
actual = mapper.readTree(serializationSchema.serialize(deleteEvent));
Assertions.assertEquals(expected, actual);
DataChangeEvent updateEvent =
DataChangeEvent.updateEvent(
TABLE_1,
generator.generate(
new Object[] {
BinaryStringData.fromString("1"),
BinaryStringData.fromString("1")
}),
generator.generate(
new Object[] {
BinaryStringData.fromString("1"),
BinaryStringData.fromString("x")
}));
expected =
mapper.readTree(
"{\"TableId\":\"default_namespace.default_schema.table1\",\"col1\":\"1\"}");
actual = mapper.readTree(serializationSchema.serialize(updateEvent));
Assertions.assertEquals(expected, actual);
}
}

@ -18,6 +18,7 @@
package org.apache.flink.cdc.connectors.kafka.sink;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.common.data.binary.BinaryStringData;
import org.apache.flink.cdc.common.event.AddColumnEvent;
@ -67,9 +68,11 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
@ -259,7 +262,7 @@ class KafkaDataSinkITCase extends TestLogger {
env.execute();
final List<ConsumerRecord<byte[], byte[]>> collectedRecords =
drainAllRecordsFromTopic(topic, false);
drainAllRecordsFromTopic(topic, false, 0);
final long recordsCount = 5;
assertThat(recordsCount).isEqualTo(collectedRecords.size());
ObjectMapper mapper =
@ -321,7 +324,7 @@ class KafkaDataSinkITCase extends TestLogger {
env.execute();
final List<ConsumerRecord<byte[], byte[]>> collectedRecords =
drainAllRecordsFromTopic(topic, false);
drainAllRecordsFromTopic(topic, false, 0);
final long recordsCount = 5;
assertThat(recordsCount).isEqualTo(collectedRecords.size());
for (ConsumerRecord<byte[], byte[]> consumerRecord : collectedRecords) {
@ -363,6 +366,104 @@ class KafkaDataSinkITCase extends TestLogger {
checkProducerLeak();
}
@Test
void testHashByKeyPartitionStrategyUsingJson() throws Exception {
final StreamExecutionEnvironment env = new LocalStreamEnvironment();
env.enableCheckpointing(1000L);
env.setRestartStrategy(RestartStrategies.noRestart());
final DataStream<Event> source =
env.fromCollection(createSourceEvents(), new EventTypeInfo());
Map<String, String> config = new HashMap<>();
Properties properties = getKafkaClientConfiguration();
properties.forEach(
(key, value) ->
config.put(
KafkaDataSinkOptions.PROPERTIES_PREFIX + key.toString(),
value.toString()));
config.put(KafkaDataSinkOptions.KEY_FORMAT.key(), KeyFormat.JSON.toString());
config.put(
KafkaDataSinkOptions.PARTITION_STRATEGY.key(),
PartitionStrategy.HASH_BY_KEY.toString());
config.put(
KafkaDataSinkOptions.VALUE_FORMAT.key(),
JsonSerializationType.CANAL_JSON.toString());
source.sinkTo(
((FlinkSinkProvider)
(new KafkaDataSinkFactory()
.createDataSink(
new FactoryHelper.DefaultContext(
Configuration.fromMap(config),
Configuration.fromMap(new HashMap<>()),
this.getClass().getClassLoader()))
.getEventSinkProvider()))
.getSink());
env.execute();
final List<ConsumerRecord<byte[], byte[]>> collectedRecords =
drainAllRecordsFromTopic(topic, false);
final long recordsCount = 5;
assertThat(recordsCount).isEqualTo(collectedRecords.size());
for (ConsumerRecord<byte[], byte[]> consumerRecord : collectedRecords) {
assertThat(
consumerRecord
.headers()
.headers(
PipelineKafkaRecordSerializationSchema
.TABLE_NAME_HEADER_KEY)
.iterator())
.isExhausted();
}
ObjectMapper mapper =
JacksonMapperFactory.createObjectMapper()
.configure(JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN, false);
List<Tuple2<JsonNode, JsonNode>> expected =
Arrays.asList(
Tuple2.of(
mapper.readTree(
String.format(
"{\"TableId\":\"%s\",\"col1\":\"1\"}",
table1.toString())),
mapper.readTree(
String.format(
"{\"old\":null,\"data\":[{\"col1\":\"1\",\"col2\":\"1\"}],\"type\":\"INSERT\",\"database\":\"default_schema\",\"table\":\"%s\",\"pkNames\":[\"col1\"]})",
table1.getTableName()))),
Tuple2.of(
mapper.readTree(
String.format(
"{\"TableId\":\"%s\",\"col1\":\"2\"}",
table1.toString())),
mapper.readTree(
String.format(
"{\"old\":null,\"data\":[{\"col1\":\"2\",\"col2\":\"2\"}],\"type\":\"INSERT\",\"database\":\"default_schema\",\"table\":\"%s\",\"pkNames\":[\"col1\"]})",
table1.getTableName()))),
Tuple2.of(
mapper.readTree(
String.format(
"{\"TableId\":\"%s\",\"col1\":\"3\"}",
table1.toString())),
mapper.readTree(
String.format(
"{\"old\":null,\"data\":[{\"col1\":\"3\",\"col2\":\"3\"}],\"type\":\"INSERT\",\"database\":\"default_schema\",\"table\":\"%s\",\"pkNames\":[\"col1\"]})",
table1.getTableName()))),
Tuple2.of(
mapper.readTree(
String.format(
"{\"TableId\":\"%s\",\"col1\":\"1\"}",
table1.toString())),
mapper.readTree(
String.format(
"{\"old\":[{\"col1\":\"1\",\"newCol3\":\"1\"}],\"data\":null,\"type\":\"DELETE\",\"database\":\"default_schema\",\"table\":\"%s\",\"pkNames\":[\"col1\"]})",
table1.getTableName()))),
Tuple2.of(
mapper.readTree(
String.format(
"{\"TableId\":\"%s\",\"col1\":\"2\"}",
table1.toString())),
mapper.readTree(
String.format(
"{\"old\":[{\"col1\":\"2\",\"newCol3\":\"\"}],\"data\":[{\"col1\":\"2\",\"newCol3\":\"x\"}],\"type\":\"UPDATE\",\"database\":\"default_schema\",\"table\":\"%s\",\"pkNames\":[\"col1\"]}",
table1.getTableName()))));
assertThat(deserializeKeyValues(collectedRecords)).containsAll(expected);
checkProducerLeak();
}
@Test
void testTopicAndHeaderOption() throws Exception {
final StreamExecutionEnvironment env = new LocalStreamEnvironment();
@ -392,7 +493,7 @@ class KafkaDataSinkITCase extends TestLogger {
env.execute();
final List<ConsumerRecord<byte[], byte[]>> collectedRecords =
drainAllRecordsFromTopic("test_topic", false);
drainAllRecordsFromTopic("test_topic", false, 0);
final long recordsCount = 5;
assertThat(recordsCount).isEqualTo(collectedRecords.size());
for (ConsumerRecord<byte[], byte[]> consumerRecord : collectedRecords) {
@ -460,9 +561,13 @@ class KafkaDataSinkITCase extends TestLogger {
}
private List<ConsumerRecord<byte[], byte[]>> drainAllRecordsFromTopic(
String topic, boolean committed) {
String topic, boolean committed, int... partitionArr) {
Properties properties = getKafkaClientConfiguration();
return KafkaUtil.drainAllRecordsFromTopic(topic, properties, committed);
Set<Integer> partitions = new HashSet<>();
for (int partition : partitionArr) {
partitions.add(partition);
}
return KafkaUtil.drainAllRecordsFromTopic(topic, properties, committed, partitions);
}
private void checkProducerLeak() throws InterruptedException {
@ -486,6 +591,18 @@ class KafkaDataSinkITCase extends TestLogger {
+ leaks.stream().map(this::format).collect(Collectors.joining("\n\n")));
}
private static List<Tuple2<JsonNode, JsonNode>> deserializeKeyValues(
List<ConsumerRecord<byte[], byte[]>> records) throws IOException {
ObjectMapper mapper =
JacksonMapperFactory.createObjectMapper()
.configure(JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN, false);
List<Tuple2<JsonNode, JsonNode>> result = new ArrayList<>();
for (ConsumerRecord<byte[], byte[]> record : records) {
result.add(Tuple2.of(mapper.readTree(record.key()), mapper.readTree(record.value())));
}
return result;
}
private static List<JsonNode> deserializeValues(List<ConsumerRecord<byte[], byte[]>> records)
throws IOException {
ObjectMapper mapper =

@ -115,13 +115,14 @@ public class KafkaUtil {
* @throws KafkaException
*/
public static List<ConsumerRecord<byte[], byte[]>> drainAllRecordsFromTopic(
String topic, Properties properties, boolean committed) throws KafkaException {
String topic, Properties properties, boolean committed, Set<Integer> partitions)
throws KafkaException {
final Properties consumerConfig = new Properties();
consumerConfig.putAll(properties);
consumerConfig.put(
ConsumerConfig.ISOLATION_LEVEL_CONFIG,
committed ? "read_committed" : "read_uncommitted");
return drainAllRecordsFromTopic(topic, consumerConfig);
return drainAllRecordsFromTopic(topic, consumerConfig, partitions);
}
/**
@ -137,13 +138,17 @@ public class KafkaUtil {
* @throws KafkaException
*/
public static List<ConsumerRecord<byte[], byte[]>> drainAllRecordsFromTopic(
String topic, Properties properties) throws KafkaException {
String topic, Properties properties, Set<Integer> partitions) throws KafkaException {
final Properties consumerConfig = new Properties();
consumerConfig.putAll(properties);
consumerConfig.put("key.deserializer", ByteArrayDeserializer.class.getName());
consumerConfig.put("value.deserializer", ByteArrayDeserializer.class.getName());
try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerConfig)) {
Set<TopicPartition> topicPartitions = getAllPartitions(consumer, topic);
if (!partitions.isEmpty()) {
topicPartitions.removeIf(
topicPartition -> !partitions.contains(topicPartition.partition()));
}
Map<TopicPartition, Long> endOffsets = consumer.endOffsets(topicPartitions);
consumer.assign(topicPartitions);
consumer.seekToBeginning(topicPartitions);
