[FLINK-36913][pipeline-connector][kafka] Introduce option to define custom mapping from upstream table id to downstream topic name

This closes #3805
Kunni 2 weeks ago committed by GitHub
parent 77785c1c08
commit a16abd5d24

@ -143,6 +143,13 @@ Pipeline Connector Options (Chinese docs)
<td>String</td>
<td>Custom headers for each Kafka record. Headers are separated by ',', and key and value within a header are separated by ':'. For example, we can set headers like 'key1:value1,key2:value2'.</td>
</tr>
<tr>
<td>sink.tableId-to-topic.mapping</td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Custom mapping from upstream table id to downstream Kafka topic name. Mappings are separated by `;`; within each mapping, the upstream TableId and the downstream Kafka topic name are separated by `:`. For example, we can set `sink.tableId-to-topic.mapping` to `mydb.mytable1:topic1;mydb.mytable2:topic2`.</td>
</tr>
</tbody>
</table>
</div>

@ -141,6 +141,13 @@ Pipeline Connector Options
<td>String</td>
<td>Custom headers for each Kafka record. Headers are separated by ',', and key and value within a header are separated by ':'. For example, we can set headers like 'key1:value1,key2:value2'.</td>
</tr>
<tr>
<td>sink.tableId-to-topic.mapping</td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Custom mapping from upstream tableId to downstream Kafka topic. Mappings are separated by `;`; within each mapping, the upstream tableId and the downstream Kafka topic are separated by `:`. For example, we can set `sink.tableId-to-topic.mapping` to `mydb.mytable1:topic1;mydb.mytable2:topic2`.</td>
</tr>
</tbody>
</table>
</div>
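As a usage illustration, here is a minimal sketch of exercising the new option programmatically, modeled on the factory test added in this commit. The class name `TableMappingSketch` is hypothetical, and the import paths for `FactoryHelper` and `DataSink` are assumptions based on the surrounding code.

```java
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.common.factories.FactoryHelper;
import org.apache.flink.cdc.common.sink.DataSink;
import org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkFactory;
import org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions;

import java.util.HashMap;
import java.util.Map;

public class TableMappingSketch {
    public static void main(String[] args) {
        // Route mydb.mytable1 to topic1 and mydb.mytable2 to topic2; tables that
        // match no rule fall back to using their TableId as the topic name.
        Map<String, String> options = new HashMap<>();
        options.put(
                KafkaDataSinkOptions.SINK_TABLE_ID_TO_TOPIC_MAPPING.key(),
                "mydb.mytable1:topic1;mydb.mytable2:topic2");

        DataSink dataSink =
                new KafkaDataSinkFactory()
                        .createDataSink(
                                new FactoryHelper.DefaultContext(
                                        Configuration.fromMap(options),
                                        Configuration.fromMap(new HashMap<>()),
                                        TableMappingSketch.class.getClassLoader()));
        System.out.println("Created sink: " + dataSink);
    }
}
```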

@ -53,6 +53,8 @@ public class KafkaDataSink implements DataSink {
final String customHeaders;
final String tableMapping;
public KafkaDataSink(
DeliveryGuarantee deliveryGuarantee,
Properties kafkaProperties,
@ -62,7 +64,8 @@ public class KafkaDataSink implements DataSink {
SerializationSchema<Event> valueSerialization,
String topic,
boolean addTableToHeaderEnabled,
String customHeaders) {
String customHeaders,
String tableMapping) {
this.deliveryGuarantee = deliveryGuarantee;
this.kafkaProperties = kafkaProperties;
this.partitionStrategy = partitionStrategy;
@ -72,6 +75,7 @@ public class KafkaDataSink implements DataSink {
this.topic = topic;
this.addTableToHeaderEnabled = addTableToHeaderEnabled;
this.customHeaders = customHeaders;
this.tableMapping = tableMapping;
}
@Override
@ -92,7 +96,8 @@ public class KafkaDataSink implements DataSink {
valueSerialization,
topic,
addTableToHeaderEnabled,
customHeaders))
customHeaders,
tableMapping))
.build());
}

@ -41,6 +41,7 @@ import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.PA
import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.PROPERTIES_PREFIX;
import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.SINK_ADD_TABLEID_TO_HEADER_ENABLED;
import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.SINK_CUSTOM_HEADER;
import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.SINK_TABLE_ID_TO_TOPIC_MAPPING;
import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.TOPIC;
import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.VALUE_FORMAT;
@ -92,6 +93,7 @@ public class KafkaDataSinkFactory implements DataSinkFactory {
context.getFactoryConfiguration().get(KafkaDataSinkOptions.SINK_CUSTOM_HEADER);
PartitionStrategy partitionStrategy =
context.getFactoryConfiguration().get(KafkaDataSinkOptions.PARTITION_STRATEGY);
String tableMapping = context.getFactoryConfiguration().get(SINK_TABLE_ID_TO_TOPIC_MAPPING);
return new KafkaDataSink(
deliveryGuarantee,
kafkaProperties,
@ -101,7 +103,8 @@ public class KafkaDataSinkFactory implements DataSinkFactory {
valueSerialization,
topic,
addTableToHeaderEnabled,
customHeaders);
customHeaders,
tableMapping);
}
@Override
@ -124,6 +127,7 @@ public class KafkaDataSinkFactory implements DataSinkFactory {
options.add(SINK_ADD_TABLEID_TO_HEADER_ENABLED);
options.add(SINK_CUSTOM_HEADER);
options.add(KafkaDataSinkOptions.DELIVERY_GUARANTEE);
options.add(SINK_TABLE_ID_TO_TOPIC_MAPPING);
return options;
}
}

@ -18,6 +18,7 @@
package org.apache.flink.cdc.connectors.kafka.sink;
import org.apache.flink.cdc.common.configuration.ConfigOption;
import org.apache.flink.cdc.common.configuration.description.Description;
import org.apache.flink.cdc.connectors.kafka.json.JsonSerializationType;
import org.apache.flink.connector.base.DeliveryGuarantee;
@ -29,6 +30,10 @@ public class KafkaDataSinkOptions {
// Prefix for Kafka specific properties.
public static final String PROPERTIES_PREFIX = "properties.";
public static final String DELIMITER_TABLE_MAPPINGS = ";";
public static final String DELIMITER_SELECTOR_TOPIC = ":";
public static final ConfigOption<DeliveryGuarantee> DELIVERY_GUARANTEE =
key("sink.delivery-guarantee")
.enumType(DeliveryGuarantee.class)
@ -79,4 +84,20 @@ public class KafkaDataSinkOptions {
.defaultValue("")
.withDescription(
"custom headers for each kafka record. Each header are separated by ',', separate key and value by ':'. For example, we can set headers like 'key1:value1,key2:value2'.");
public static final ConfigOption<String> SINK_TABLE_ID_TO_TOPIC_MAPPING =
key("sink.tableId-to-topic.mapping")
.stringType()
.noDefaultValue()
.withDescription(
Description.builder()
.text(
"Custom table mappings for each table from upstream tableId to downstream Kafka topic. Each mapping is separated by ")
.text(DELIMITER_TABLE_MAPPINGS)
.text(
", separate upstream tableId selectors and downstream Kafka topic by ")
.text(DELIMITER_SELECTOR_TOPIC)
.text(
". For example, we can set 'sink.tableId-to-topic.mappingg' like 'mydb.mytable1:topic1;mydb.mytable2:topic2'.")
.build());
}

@ -22,6 +22,8 @@ import org.apache.flink.cdc.common.event.ChangeEvent;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Selectors;
import org.apache.flink.cdc.connectors.kafka.utils.KafkaSinkUtils;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;
@ -57,6 +59,13 @@ public class PipelineKafkaRecordSerializationSchema
// key value pairs to be put into Kafka Record Header.
public final Map<String, String> customHeaders;
private final String mappingRuleString;
private Map<Selectors, String> selectorsToTopicMap;
// A cache to speed up TableId to Topic mapping.
private Map<TableId, String> tableIdToTopicCache;
public static final String NAMESPACE_HEADER_KEY = "namespace";
public static final String SCHEMA_NAME_HEADER_KEY = "schemaName";
@ -69,7 +78,8 @@ public class PipelineKafkaRecordSerializationSchema
SerializationSchema<Event> valueSerialization,
String unifiedTopic,
boolean addTableToHeaderEnabled,
String customHeaderString) {
String customHeaderString,
String mappingRuleString) {
this.keySerialization = keySerialization;
this.valueSerialization = checkNotNull(valueSerialization);
this.unifiedTopic = unifiedTopic;
@ -90,6 +100,7 @@ public class PipelineKafkaRecordSerializationSchema
}
}
partition = partitionStrategy.equals(PartitionStrategy.ALL_TO_ZERO) ? 0 : null;
this.mappingRuleString = mappingRuleString;
}
@Override
@ -102,7 +113,7 @@ public class PipelineKafkaRecordSerializationSchema
// skip sending SchemaChangeEvent.
return null;
}
String topic = unifiedTopic == null ? changeEvent.tableId().toString() : unifiedTopic;
String topic = inferTopicName(changeEvent.tableId());
RecordHeaders recordHeaders = new RecordHeaders();
if (addTableToHeaderEnabled) {
String namespace =
@ -128,10 +139,30 @@ public class PipelineKafkaRecordSerializationSchema
topic, partition, null, keySerialized, valueSerialized, recordHeaders);
}
private String inferTopicName(TableId tableId) {
return tableIdToTopicCache.computeIfAbsent(
tableId,
(table -> {
if (unifiedTopic != null && !unifiedTopic.isEmpty()) {
return unifiedTopic;
}
if (selectorsToTopicMap != null && !selectorsToTopicMap.isEmpty()) {
for (Map.Entry<Selectors, String> entry : selectorsToTopicMap.entrySet()) {
if (entry.getKey().isMatch(tableId)) {
return entry.getValue();
}
}
}
return table.toString();
}));
}
@Override
public void open(
SerializationSchema.InitializationContext context, KafkaSinkContext sinkContext)
throws Exception {
this.selectorsToTopicMap = KafkaSinkUtils.parseSelectorsToTopicMap(mappingRuleString);
this.tableIdToTopicCache = new HashMap<>();
valueSerialization.open(context);
}
}
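To make the topic resolution order in `inferTopicName` concrete: the unified `topic` option wins, otherwise the first mapping rule (in declaration order) whose selector matches the table is used, otherwise the table id itself becomes the topic. Below is a small standalone sketch of that precedence using the parser introduced further down; the class `TopicResolutionSketch` is hypothetical, and the `TableId.tableId(...)` factory is assumed from flink-cdc-common.

```java
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Selectors;
import org.apache.flink.cdc.connectors.kafka.utils.KafkaSinkUtils;

import java.util.Map;

public class TopicResolutionSketch {

    static String resolve(String unifiedTopic, Map<Selectors, String> rules, TableId tableId) {
        if (unifiedTopic != null && !unifiedTopic.isEmpty()) {
            return unifiedTopic; // 1. the unified topic overrides everything
        }
        for (Map.Entry<Selectors, String> entry : rules.entrySet()) {
            if (entry.getKey().isMatch(tableId)) {
                return entry.getValue(); // 2. first matching rule wins, in declaration order
            }
        }
        return tableId.toString(); // 3. fall back to the table id as the topic name
    }

    public static void main(String[] args) {
        Map<Selectors, String> rules =
                KafkaSinkUtils.parseSelectorsToTopicMap(
                        "mydb.mytable1:topic1;mydb.mytable2:topic2");
        TableId mapped = TableId.tableId("mydb", "mytable1");
        TableId unmapped = TableId.tableId("otherdb", "othertable");

        System.out.println(resolve(null, rules, mapped));         // expected: topic1
        System.out.println(resolve(null, rules, unmapped));       // expected: otherdb.othertable
        System.out.println(resolve("all_events", rules, mapped)); // expected: all_events
    }
}
```

Note that the real implementation additionally caches the result per `TableId` via `computeIfAbsent`, so the selector scan runs only once per table.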

@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.connectors.kafka.utils;
import org.apache.flink.cdc.common.schema.Selectors;
import org.apache.flink.cdc.common.utils.Preconditions;
import org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions;
import java.util.LinkedHashMap;
import java.util.Map;
import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.DELIMITER_SELECTOR_TOPIC;
import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.DELIMITER_TABLE_MAPPINGS;
/** Util class for {@link org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSink}. */
public class KafkaSinkUtils {
/** Parse the mapping text to a map from Selectors to Kafka Topic name. */
public static Map<Selectors, String> parseSelectorsToTopicMap(String mappingRuleString) {
// Keep the order.
Map<Selectors, String> result = new LinkedHashMap<>();
if (mappingRuleString == null || mappingRuleString.isEmpty()) {
return result;
}
for (String mapping : mappingRuleString.split(DELIMITER_TABLE_MAPPINGS)) {
String[] selectorsAndTopic = mapping.split(DELIMITER_SELECTOR_TOPIC);
Preconditions.checkArgument(
selectorsAndTopic.length == 2,
"Please check your configuration of "
+ KafkaDataSinkOptions.SINK_TABLE_ID_TO_TOPIC_MAPPING);
Selectors selectors =
new Selectors.SelectorsBuilder().includeTables(selectorsAndTopic[0]).build();
result.put(selectors, selectorsAndTopic[1]);
}
return result;
}
}
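A short usage sketch for the new utility; the class name `KafkaSinkUtilsExample` is hypothetical, and it assumes `Preconditions.checkArgument` throws `IllegalArgumentException` on failure, as Flink's precondition utilities usually do.

```java
import org.apache.flink.cdc.common.schema.Selectors;
import org.apache.flink.cdc.connectors.kafka.utils.KafkaSinkUtils;

import java.util.Map;

public class KafkaSinkUtilsExample {
    public static void main(String[] args) {
        // Two rules, kept in declaration order by the LinkedHashMap.
        Map<Selectors, String> rules =
                KafkaSinkUtils.parseSelectorsToTopicMap(
                        "mydb.mytable1:topic1;mydb.mytable2:topic2");
        System.out.println(rules.size()); // 2

        // Null or empty input yields an empty map, i.e. no custom routing.
        System.out.println(KafkaSinkUtils.parseSelectorsToTopicMap(null).isEmpty()); // true

        // A mapping that does not split into exactly <selector>:<topic> is rejected.
        try {
            KafkaSinkUtils.parseSelectorsToTopicMap("mydb.mytable1-topic1");
        } catch (IllegalArgumentException e) { // assumed from Preconditions.checkArgument
            System.out.println("invalid mapping rejected: " + e.getMessage());
        }
    }
}
```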

@ -39,6 +39,9 @@ public class KafkaDataSinkFactoryTest {
Assertions.assertThat(sinkFactory).isInstanceOf(KafkaDataSinkFactory.class);
Configuration conf = Configuration.fromMap(ImmutableMap.<String, String>builder().build());
conf.set(
KafkaDataSinkOptions.SINK_TABLE_ID_TO_TOPIC_MAPPING,
"mydb.mytable1:topic1;mydb.mytable2:topic2");
DataSink dataSink =
sinkFactory.createDataSink(
new FactoryHelper.DefaultContext(

@ -263,8 +263,7 @@ class KafkaDataSinkITCase extends TestLogger {
final List<ConsumerRecord<byte[], byte[]>> collectedRecords =
drainAllRecordsFromTopic(topic, false, 0);
final long recordsCount = 5;
assertThat(recordsCount).isEqualTo(collectedRecords.size());
assertThat(collectedRecords).hasSize(5);
ObjectMapper mapper =
JacksonMapperFactory.createObjectMapper()
.configure(JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN, false);
@ -325,8 +324,7 @@ class KafkaDataSinkITCase extends TestLogger {
final List<ConsumerRecord<byte[], byte[]>> collectedRecords =
drainAllRecordsFromTopic(topic, false, 0);
final long recordsCount = 5;
assertThat(recordsCount).isEqualTo(collectedRecords.size());
assertThat(collectedRecords).hasSize(5);
for (ConsumerRecord<byte[], byte[]> consumerRecord : collectedRecords) {
assertThat(
consumerRecord
@ -560,6 +558,67 @@ class KafkaDataSinkITCase extends TestLogger {
checkProducerLeak();
}
@Test
void testSinkTableMapping() throws Exception {
final StreamExecutionEnvironment env = new LocalStreamEnvironment();
env.enableCheckpointing(1000L);
env.setRestartStrategy(RestartStrategies.noRestart());
final DataStream<Event> source = env.fromData(createSourceEvents(), new EventTypeInfo());
Map<String, String> config = new HashMap<>();
config.put(
KafkaDataSinkOptions.SINK_TABLE_ID_TO_TOPIC_MAPPING.key(),
"default_namespace.default_schema_copy.\\.*:test_topic_mapping_copy;default_namespace.default_schema.\\.*:test_topic_mapping");
Properties properties = getKafkaClientConfiguration();
properties.forEach(
(key, value) ->
config.put(
KafkaDataSinkOptions.PROPERTIES_PREFIX + key.toString(),
value.toString()));
source.sinkTo(
((FlinkSinkProvider)
(new KafkaDataSinkFactory()
.createDataSink(
new FactoryHelper.DefaultContext(
Configuration.fromMap(config),
Configuration.fromMap(new HashMap<>()),
this.getClass().getClassLoader()))
.getEventSinkProvider()))
.getSink());
env.execute();
final List<ConsumerRecord<byte[], byte[]>> collectedRecords =
drainAllRecordsFromTopic("test_topic_mapping", false, 0);
assertThat(collectedRecords).hasSize(5);
ObjectMapper mapper =
JacksonMapperFactory.createObjectMapper()
.configure(JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN, false);
List<JsonNode> expected =
Arrays.asList(
mapper.readTree(
String.format(
"{\"before\":null,\"after\":{\"col1\":\"1\",\"col2\":\"1\"},\"op\":\"c\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}",
table1.getTableName())),
mapper.readTree(
String.format(
"{\"before\":null,\"after\":{\"col1\":\"2\",\"col2\":\"2\"},\"op\":\"c\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}",
table1.getTableName())),
mapper.readTree(
String.format(
"{\"before\":null,\"after\":{\"col1\":\"3\",\"col2\":\"3\"},\"op\":\"c\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}",
table1.getTableName())),
mapper.readTree(
String.format(
"{\"before\":{\"col1\":\"1\",\"newCol3\":\"1\"},\"after\":null,\"op\":\"d\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}",
table1.getTableName())),
mapper.readTree(
String.format(
"{\"before\":{\"col1\":\"2\",\"newCol3\":\"\"},\"after\":{\"col1\":\"2\",\"newCol3\":\"x\"},\"op\":\"u\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}",
table1.getTableName())));
assertThat(deserializeValues(collectedRecords)).containsAll(expected);
checkProducerLeak();
}
private List<ConsumerRecord<byte[], byte[]>> drainAllRecordsFromTopic(
String topic, boolean committed, int... partitionArr) {
Properties properties = getKafkaClientConfiguration();
