From a16abd5d243fdf045cf1ed1be734b14dcc11458b Mon Sep 17 00:00:00 2001 From: Kunni Date: Thu, 16 Jan 2025 20:23:38 +0800 Subject: [PATCH] [FLINK-36913][pipeline-connector][kafka] Introduce option to define custom mapping from upstream table id to downstream topic name This closes #3805 --- .../connectors/pipeline-connectors/kafka.md | 7 ++ .../connectors/pipeline-connectors/kafka.md | 7 ++ .../connectors/kafka/sink/KafkaDataSink.java | 9 ++- .../kafka/sink/KafkaDataSinkFactory.java | 6 +- .../kafka/sink/KafkaDataSinkOptions.java | 21 ++++++ ...ipelineKafkaRecordSerializationSchema.java | 35 +++++++++- .../kafka/utils/KafkaSinkUtils.java | 52 ++++++++++++++ .../kafka/sink/KafkaDataSinkFactoryTest.java | 3 + .../kafka/sink/KafkaDataSinkITCase.java | 67 +++++++++++++++++-- 9 files changed, 198 insertions(+), 9 deletions(-) create mode 100644 flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/utils/KafkaSinkUtils.java diff --git a/docs/content.zh/docs/connectors/pipeline-connectors/kafka.md b/docs/content.zh/docs/connectors/pipeline-connectors/kafka.md index 66467b4d5..136eb4d4d 100644 --- a/docs/content.zh/docs/connectors/pipeline-connectors/kafka.md +++ b/docs/content.zh/docs/connectors/pipeline-connectors/kafka.md @@ -143,6 +143,13 @@ Pipeline 连接器配置项 String Kafka 记录自定义的 Header。每个 Header 使用 ','分割, 键值使用 ':' 分割。举例来说，可以使用这种方式 'key1:value1,key2:value2'。 + + sink.tableId-to-topic.mapping + optional + (none) + String + 自定义的上游表名到下游 Kafka Topic 名的映射关系。 每个映射关系由 `;` 分割，上游表的 TableId 和下游 Kafka 的 Topic 名由 `:` 分割。 举个例子，我们可以配置 `sink.tableId-to-topic.mapping` 的值为 `mydb.mytable1:topic1;mydb.mytable2:topic2`。 + diff --git a/docs/content/docs/connectors/pipeline-connectors/kafka.md b/docs/content/docs/connectors/pipeline-connectors/kafka.md index 6bb94bc37..619ade45c 100644 --- a/docs/content/docs/connectors/pipeline-connectors/kafka.md +++ b/docs/content/docs/connectors/pipeline-connectors/kafka.md @@ -141,6 +141,13 @@ Pipeline Connector Options String custom headers for each kafka record. Each header are separated by ',', separate key and value by ':'. For example, we can set headers like 'key1:value1,key2:value2'. + + sink.tableId-to-topic.mapping + optional + (none) + String + Custom mapping from each upstream tableId to a downstream Kafka topic. Mappings are separated by `;`, and within each mapping the upstream tableId and the downstream Kafka topic are separated by `:`. For example, we can set `sink.tableId-to-topic.mapping` to `mydb.mytable1:topic1;mydb.mytable2:topic2`.
+ diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSink.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSink.java index 51e8b180d..1970a9b82 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSink.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSink.java @@ -53,6 +53,8 @@ public class KafkaDataSink implements DataSink { final String customHeaders; + final String tableMapping; + public KafkaDataSink( DeliveryGuarantee deliveryGuarantee, Properties kafkaProperties, @@ -62,7 +64,8 @@ public class KafkaDataSink implements DataSink { SerializationSchema valueSerialization, String topic, boolean addTableToHeaderEnabled, - String customHeaders) { + String customHeaders, + String tableMapping) { this.deliveryGuarantee = deliveryGuarantee; this.kafkaProperties = kafkaProperties; this.partitionStrategy = partitionStrategy; @@ -72,6 +75,7 @@ public class KafkaDataSink implements DataSink { this.topic = topic; this.addTableToHeaderEnabled = addTableToHeaderEnabled; this.customHeaders = customHeaders; + this.tableMapping = tableMapping; } @Override @@ -92,7 +96,8 @@ public class KafkaDataSink implements DataSink { valueSerialization, topic, addTableToHeaderEnabled, - customHeaders)) + customHeaders, + tableMapping)) .build()); } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkFactory.java index d89812913..53c821517 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkFactory.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkFactory.java @@ -41,6 +41,7 @@ import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.PA import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.PROPERTIES_PREFIX; import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.SINK_ADD_TABLEID_TO_HEADER_ENABLED; import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.SINK_CUSTOM_HEADER; +import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.SINK_TABLE_ID_TO_TOPIC_MAPPING; import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.TOPIC; import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.VALUE_FORMAT; @@ -92,6 +93,7 @@ public class KafkaDataSinkFactory implements DataSinkFactory { context.getFactoryConfiguration().get(KafkaDataSinkOptions.SINK_CUSTOM_HEADER); PartitionStrategy partitionStrategy = context.getFactoryConfiguration().get(KafkaDataSinkOptions.PARTITION_STRATEGY); + String tableMapping = context.getFactoryConfiguration().get(SINK_TABLE_ID_TO_TOPIC_MAPPING); return new KafkaDataSink( deliveryGuarantee, kafkaProperties, @@ -101,7 +103,8 
@@ public class KafkaDataSinkFactory implements DataSinkFactory { valueSerialization, topic, addTableToHeaderEnabled, - customHeaders); + customHeaders, + tableMapping); } @Override @@ -124,6 +127,7 @@ public class KafkaDataSinkFactory implements DataSinkFactory { options.add(SINK_ADD_TABLEID_TO_HEADER_ENABLED); options.add(SINK_CUSTOM_HEADER); options.add(KafkaDataSinkOptions.DELIVERY_GUARANTEE); + options.add(SINK_TABLE_ID_TO_TOPIC_MAPPING); return options; } } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkOptions.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkOptions.java index ca82f5c80..1b7bbf9bd 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkOptions.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkOptions.java @@ -18,6 +18,7 @@ package org.apache.flink.cdc.connectors.kafka.sink; import org.apache.flink.cdc.common.configuration.ConfigOption; +import org.apache.flink.cdc.common.configuration.description.Description; import org.apache.flink.cdc.connectors.kafka.json.JsonSerializationType; import org.apache.flink.connector.base.DeliveryGuarantee; @@ -29,6 +30,10 @@ public class KafkaDataSinkOptions { // Prefix for Kafka specific properties. public static final String PROPERTIES_PREFIX = "properties."; + public static final String DELIMITER_TABLE_MAPPINGS = ";"; + + public static final String DELIMITER_SELECTOR_TOPIC = ":"; + public static final ConfigOption DELIVERY_GUARANTEE = key("sink.delivery-guarantee") .enumType(DeliveryGuarantee.class) @@ -79,4 +84,20 @@ public class KafkaDataSinkOptions { .defaultValue("") .withDescription( "custom headers for each kafka record. Each header are separated by ',', separate key and value by ':'. For example, we can set headers like 'key1:value1,key2:value2'."); + + public static final ConfigOption SINK_TABLE_ID_TO_TOPIC_MAPPING = + key("sink.tableId-to-topic.mapping") + .stringType() + .noDefaultValue() + .withDescription( + Description.builder() + .text( + "Custom table mappings for each table from upstream tableId to downstream Kafka topic. Each mapping is separated by ") + .text(DELIMITER_TABLE_MAPPINGS) + .text( + ", separate upstream tableId selectors and downstream Kafka topic by ") + .text(DELIMITER_SELECTOR_TOPIC) + .text( + ". 
For example, we can set 'sink.tableId-to-topic.mapping' to 'mydb.mytable1:topic1;mydb.mytable2:topic2'.") + .build()); } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/PipelineKafkaRecordSerializationSchema.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/PipelineKafkaRecordSerializationSchema.java index 85e5e3f19..6a65f836a 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/PipelineKafkaRecordSerializationSchema.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/PipelineKafkaRecordSerializationSchema.java @@ -22,6 +22,8 @@ import org.apache.flink.cdc.common.event.ChangeEvent; import org.apache.flink.cdc.common.event.Event; import org.apache.flink.cdc.common.event.SchemaChangeEvent; import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.schema.Selectors; +import org.apache.flink.cdc.connectors.kafka.utils.KafkaSinkUtils; import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema; import org.apache.kafka.clients.producer.ProducerRecord; @@ -57,6 +59,13 @@ public class PipelineKafkaRecordSerializationSchema // key value pairs to be put into Kafka Record Header. public final Map customHeaders; + private final String mappingRuleString; + + private Map selectorsToTopicMap; + + // A cache to speed up TableId to Topic mapping. + private Map tableIdToTopicCache; + public static final String NAMESPACE_HEADER_KEY = "namespace"; public static final String SCHEMA_NAME_HEADER_KEY = "schemaName"; @@ -69,7 +78,8 @@ public class PipelineKafkaRecordSerializationSchema SerializationSchema valueSerialization, String unifiedTopic, boolean addTableToHeaderEnabled, - String customHeaderString) { + String customHeaderString, + String mappingRuleString) { this.keySerialization = keySerialization; this.valueSerialization = checkNotNull(valueSerialization); this.unifiedTopic = unifiedTopic; @@ -90,6 +100,7 @@ public class PipelineKafkaRecordSerializationSchema } } partition = partitionStrategy.equals(PartitionStrategy.ALL_TO_ZERO) ? 0 : null; + this.mappingRuleString = mappingRuleString; } @Override @@ -102,7 +113,7 @@ public class PipelineKafkaRecordSerializationSchema // skip sending SchemaChangeEvent. return null; } - String topic = unifiedTopic == null ? 
changeEvent.tableId().toString() : unifiedTopic; + String topic = inferTopicName(changeEvent.tableId()); RecordHeaders recordHeaders = new RecordHeaders(); if (addTableToHeaderEnabled) { String namespace = @@ -128,10 +139,30 @@ public class PipelineKafkaRecordSerializationSchema topic, partition, null, keySerialized, valueSerialized, recordHeaders); } + private String inferTopicName(TableId tableId) { + return tableIdToTopicCache.computeIfAbsent( + tableId, + (table -> { + if (unifiedTopic != null && !unifiedTopic.isEmpty()) { + return unifiedTopic; + } + if (selectorsToTopicMap != null && !selectorsToTopicMap.isEmpty()) { + for (Map.Entry entry : selectorsToTopicMap.entrySet()) { + if (entry.getKey().isMatch(tableId)) { + return entry.getValue(); + } + } + } + return table.toString(); + })); + } + @Override public void open( SerializationSchema.InitializationContext context, KafkaSinkContext sinkContext) throws Exception { + this.selectorsToTopicMap = KafkaSinkUtils.parseSelectorsToTopicMap(mappingRuleString); + this.tableIdToTopicCache = new HashMap<>(); valueSerialization.open(context); } } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/utils/KafkaSinkUtils.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/utils/KafkaSinkUtils.java new file mode 100644 index 000000000..b013bba9f --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/utils/KafkaSinkUtils.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.kafka.utils; + +import org.apache.flink.cdc.common.schema.Selectors; +import org.apache.flink.cdc.common.utils.Preconditions; +import org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions; + +import java.util.LinkedHashMap; +import java.util.Map; + +import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.DELIMITER_SELECTOR_TOPIC; +import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.DELIMITER_TABLE_MAPPINGS; + +/** Util class for {@link org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSink}. */ +public class KafkaSinkUtils { + + /** Parse the mapping text to a map from Selectors to Kafka Topic name. */ + public static Map parseSelectorsToTopicMap(String mappingRuleString) { + // Keep the order. 
+ Map result = new LinkedHashMap<>(); + if (mappingRuleString == null || mappingRuleString.isEmpty()) { + return result; + } + for (String mapping : mappingRuleString.split(DELIMITER_TABLE_MAPPINGS)) { + String[] selectorsAndTopic = mapping.split(DELIMITER_SELECTOR_TOPIC); + Preconditions.checkArgument( + selectorsAndTopic.length == 2, + "Please check your configuration of " + + KafkaDataSinkOptions.SINK_TABLE_ID_TO_TOPIC_MAPPING); + Selectors selectors = + new Selectors.SelectorsBuilder().includeTables(selectorsAndTopic[0]).build(); + result.put(selectors, selectorsAndTopic[1]); + } + return result; + } +} diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkFactoryTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkFactoryTest.java index f0736dd0b..c473afb30 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkFactoryTest.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkFactoryTest.java @@ -39,6 +39,9 @@ public class KafkaDataSinkFactoryTest { Assertions.assertThat(sinkFactory).isInstanceOf(KafkaDataSinkFactory.class); Configuration conf = Configuration.fromMap(ImmutableMap.builder().build()); + conf.set( + KafkaDataSinkOptions.SINK_TABLE_ID_TO_TOPIC_MAPPING, + "mydb.mytable1:topic1;mydb.mytable2:topic2"); DataSink dataSink = sinkFactory.createDataSink( new FactoryHelper.DefaultContext( diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkITCase.java index ddce0a9e4..bb5d4b880 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkITCase.java @@ -263,8 +263,7 @@ class KafkaDataSinkITCase extends TestLogger { final List> collectedRecords = drainAllRecordsFromTopic(topic, false, 0); - final long recordsCount = 5; - assertThat(recordsCount).isEqualTo(collectedRecords.size()); + assertThat(collectedRecords).hasSize(5); ObjectMapper mapper = JacksonMapperFactory.createObjectMapper() .configure(JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN, false); @@ -325,8 +324,7 @@ class KafkaDataSinkITCase extends TestLogger { final List> collectedRecords = drainAllRecordsFromTopic(topic, false, 0); - final long recordsCount = 5; - assertThat(recordsCount).isEqualTo(collectedRecords.size()); + assertThat(collectedRecords).hasSize(5); for (ConsumerRecord consumerRecord : collectedRecords) { assertThat( consumerRecord @@ -560,6 +558,67 @@ class KafkaDataSinkITCase extends TestLogger { checkProducerLeak(); } + @Test + void testSinkTableMapping() throws Exception { + final StreamExecutionEnvironment env = new LocalStreamEnvironment(); + env.enableCheckpointing(1000L); + 
env.setRestartStrategy(RestartStrategies.noRestart()); + final DataStream source = env.fromData(createSourceEvents(), new EventTypeInfo()); + Map config = new HashMap<>(); + config.put( + KafkaDataSinkOptions.SINK_TABLE_ID_TO_TOPIC_MAPPING.key(), + "default_namespace.default_schema_copy.\\.*:test_topic_mapping_copy;default_namespace.default_schema.\\.*:test_topic_mapping"); + Properties properties = getKafkaClientConfiguration(); + properties.forEach( + (key, value) -> + config.put( + KafkaDataSinkOptions.PROPERTIES_PREFIX + key.toString(), + value.toString())); + source.sinkTo( + ((FlinkSinkProvider) + (new KafkaDataSinkFactory() + .createDataSink( + new FactoryHelper.DefaultContext( + Configuration.fromMap(config), + Configuration.fromMap(new HashMap<>()), + this.getClass().getClassLoader())) + .getEventSinkProvider())) + .getSink()); + env.execute(); + + final List> collectedRecords = + drainAllRecordsFromTopic("test_topic_mapping", false, 0); + assertThat(collectedRecords).hasSize(5); + ObjectMapper mapper = + JacksonMapperFactory.createObjectMapper() + .configure(JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN, false); + List expected = + Arrays.asList( + mapper.readTree( + String.format( + "{\"before\":null,\"after\":{\"col1\":\"1\",\"col2\":\"1\"},\"op\":\"c\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}", + table1.getTableName())), + mapper.readTree( + String.format( + "{\"before\":null,\"after\":{\"col1\":\"2\",\"col2\":\"2\"},\"op\":\"c\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}", + table1.getTableName())), + mapper.readTree( + String.format( + "{\"before\":null,\"after\":{\"col1\":\"3\",\"col2\":\"3\"},\"op\":\"c\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}", + table1.getTableName())), + mapper.readTree( + String.format( + "{\"before\":{\"col1\":\"1\",\"newCol3\":\"1\"},\"after\":null,\"op\":\"d\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}", + table1.getTableName())), + mapper.readTree( + String.format( + "{\"before\":{\"col1\":\"2\",\"newCol3\":\"\"},\"after\":{\"col1\":\"2\",\"newCol3\":\"x\"},\"op\":\"u\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}", + table1.getTableName()))); + assertThat(deserializeValues(collectedRecords)).containsAll(expected); + checkProducerLeak(); + } + private List> drainAllRecordsFromTopic( String topic, boolean committed, int... partitionArr) { Properties properties = getKafkaClientConfiguration();
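To illustrate the topic-resolution semantics introduced by this patch, the following minimal sketch (not part of the patch itself) shows how the option value is parsed and applied. It reuses KafkaSinkUtils.parseSelectorsToTopicMap and Selectors#isMatch from this change; the class name TopicMappingSketch and the sample table and topic names are illustrative assumptions, and the lookup mirrors PipelineKafkaRecordSerializationSchema#inferTopicName for the case where the unified 'topic' option is not set, assuming the two-part selector syntax from the documentation example.

    import org.apache.flink.cdc.common.event.TableId;
    import org.apache.flink.cdc.common.schema.Selectors;
    import org.apache.flink.cdc.connectors.kafka.utils.KafkaSinkUtils;

    import java.util.Map;

    // Illustrative sketch only: mirrors the first-match-wins lookup used by the sink.
    public class TopicMappingSketch {
        public static void main(String[] args) {
            // Parse the option value into an ordered Selectors -> topic map,
            // as the serialization schema does in open().
            Map<Selectors, String> selectorsToTopic =
                    KafkaSinkUtils.parseSelectorsToTopicMap(
                            "mydb.mytable1:topic1;mydb.mytable2:topic2");

            TableId tableId = TableId.tableId("mydb", "mytable1");

            // First matching selector wins; otherwise fall back to the tableId string,
            // as inferTopicName does when no unified 'topic' is configured.
            String topic =
                    selectorsToTopic.entrySet().stream()
                            .filter(entry -> entry.getKey().isMatch(tableId))
                            .map(Map.Entry::getValue)
                            .findFirst()
                            .orElse(tableId.toString());

            System.out.println(topic); // expected to print "topic1"
        }
    }

The LinkedHashMap returned by parseSelectorsToTopicMap preserves the declaration order of the mappings, and inferTopicName caches the resolved topic per TableId, so the selector scan runs only once per table.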