From 0470fdbbc13a4281083488274e891a74cc8e81a8 Mon Sep 17 00:00:00 2001
From: Kunni
Date: Mon, 19 Aug 2024 21:20:01 +0800
Subject: [PATCH] [FLINK-36082][pipeline-connector][kafka] Fix lambda
 NotSerializableException in KafkaDataSink

This closes #3549

---
 .../connectors/kafka/sink/KafkaDataSink.java | 35 +-----
 .../kafka/sink/KafkaMetaDataApplier.java     | 57 +++++++++++++++++++
 2 files changed, 58 insertions(+), 34 deletions(-)
 create mode 100644 flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaMetaDataApplier.java

diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSink.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSink.java
index 2dfc021b3..51e8b180d 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSink.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSink.java
@@ -19,9 +19,6 @@ package org.apache.flink.cdc.connectors.kafka.sink;
 
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.cdc.common.event.Event;
-import org.apache.flink.cdc.common.event.SchemaChangeEvent;
-import org.apache.flink.cdc.common.event.SchemaChangeEventType;
-import org.apache.flink.cdc.common.event.SchemaChangeEventTypeFamily;
 import org.apache.flink.cdc.common.sink.DataSink;
 import org.apache.flink.cdc.common.sink.EventSinkProvider;
 import org.apache.flink.cdc.common.sink.FlinkSinkProvider;
@@ -33,10 +30,7 @@ import org.apache.flink.connector.kafka.sink.KafkaSinkBuilder;
 import org.apache.kafka.clients.producer.ProducerConfig;
 
 import java.time.ZoneId;
-import java.util.Arrays;
 import java.util.Properties;
-import java.util.Set;
-import java.util.stream.Collectors;
 
 /** A {@link DataSink} for "Kafka" connector. */
 public class KafkaDataSink implements DataSink {
@@ -104,33 +98,6 @@ public class KafkaDataSink implements DataSink {
 
     @Override
     public MetadataApplier getMetadataApplier() {
-        return new MetadataApplier() {
-
-            private Set<SchemaChangeEventType> enabledEventTypes =
-                    Arrays.stream(SchemaChangeEventTypeFamily.ALL).collect(Collectors.toSet());
-
-            @Override
-            public MetadataApplier setAcceptedSchemaEvolutionTypes(
-                    Set<SchemaChangeEventType> schemaEvolutionTypes) {
-                enabledEventTypes = schemaEvolutionTypes;
-                return this;
-            }
-
-            @Override
-            public boolean acceptsSchemaEvolutionType(SchemaChangeEventType schemaChangeEventType) {
-                return enabledEventTypes.contains(schemaChangeEventType);
-            }
-
-            @Override
-            public Set<SchemaChangeEventType> getSupportedSchemaEvolutionTypes() {
-                // All schema change events are supported.
-                return Arrays.stream(SchemaChangeEventTypeFamily.ALL).collect(Collectors.toSet());
-            }
-
-            @Override
-            public void applySchemaChange(SchemaChangeEvent schemaChangeEvent) {
-                // simply do nothing here because Kafka do not maintain the schemas.
-            }
-        };
+        return new KafkaMetaDataApplier();
     }
 }
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaMetaDataApplier.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaMetaDataApplier.java
new file mode 100644
index 000000000..839dd9ea9
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaMetaDataApplier.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.kafka.sink;
+
+import org.apache.flink.cdc.common.event.SchemaChangeEvent;
+import org.apache.flink.cdc.common.event.SchemaChangeEventType;
+import org.apache.flink.cdc.common.event.SchemaChangeEventTypeFamily;
+import org.apache.flink.cdc.common.sink.MetadataApplier;
+
+import java.util.Arrays;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/** A {@link MetadataApplier} that supports schema evolution for {@link KafkaDataSink}. */
+public class KafkaMetaDataApplier implements MetadataApplier {
+
+    private Set<SchemaChangeEventType> enabledEventTypes =
+            Arrays.stream(SchemaChangeEventTypeFamily.ALL).collect(Collectors.toSet());
+
+    @Override
+    public MetadataApplier setAcceptedSchemaEvolutionTypes(
+            Set<SchemaChangeEventType> schemaEvolutionTypes) {
+        enabledEventTypes = schemaEvolutionTypes;
+        return this;
+    }
+
+    @Override
+    public boolean acceptsSchemaEvolutionType(SchemaChangeEventType schemaChangeEventType) {
+        return enabledEventTypes.contains(schemaChangeEventType);
+    }
+
+    @Override
+    public Set<SchemaChangeEventType> getSupportedSchemaEvolutionTypes() {
+        // All schema change events are supported.
+        return Arrays.stream(SchemaChangeEventTypeFamily.ALL).collect(Collectors.toSet());
+    }
+
+    @Override
+    public void applySchemaChange(SchemaChangeEvent schemaChangeEvent) {
+        // Simply do nothing here, because Kafka does not maintain the schemas.
+    }
+}
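
Why the original code failed: an anonymous inner class declared in an instance method typically compiles with a hidden reference to its enclosing instance (the synthetic this$0 field), so the anonymous MetadataApplier returned by getMetadataApplier() dragged the non-serializable KafkaDataSink along whenever Flink serialized the applier, triggering java.io.NotSerializableException. Extracting the applier into the standalone top-level class KafkaMetaDataApplier removes that hidden reference. The sketch below reproduces the pattern with plain JDK serialization and no Flink dependencies; every name in it (AnonymousCapturePitfall, Applier, Sink, StandaloneApplier) is a hypothetical stand-in, not a Flink CDC class.

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Reproduces the NotSerializableException pattern this patch fixes. */
public class AnonymousCapturePitfall {

    /** Stand-in for MetadataApplier, which extends Serializable in Flink CDC. */
    interface Applier extends Serializable {
        void apply(String schemaChange);
    }

    /** Stand-in for KafkaDataSink: holds state and is not Serializable itself. */
    static class Sink {
        final String topic = "events";

        Applier anonymousApplier() {
            // The anonymous class compiles to Sink$1 with a hidden this$0 field
            // pointing at the enclosing Sink, so serializing the applier also
            // tries to serialize the Sink and fails. (The field access below
            // forces the capture regardless of javac version; older compilers
            // capture the enclosing instance even when it is unused, which is
            // how the real no-op applier hit this.)
            return new Applier() {
                @Override
                public void apply(String schemaChange) {
                    System.out.println("ignored for " + topic);
                }
            };
        }

        Applier standaloneApplier() {
            // Mirrors the fix: a class with no enclosing-instance reference.
            return new StandaloneApplier();
        }
    }

    /** Analogue of KafkaMetaDataApplier: no hidden reference to the sink. */
    static class StandaloneApplier implements Applier {
        @Override
        public void apply(String schemaChange) {
            // Intentionally a no-op, like the real applier.
        }
    }

    static void trySerialize(String label, Applier applier) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(applier);
            System.out.println(label + ": serialized OK");
        } catch (NotSerializableException e) {
            System.out.println(label + ": NotSerializableException: " + e.getMessage());
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        Sink sink = new Sink();
        trySerialize("anonymous applier ", sink.anonymousApplier()); // fails: captures Sink
        trySerialize("standalone applier", sink.standaloneApplier()); // succeeds
    }
}

Running the sketch prints a NotSerializableException naming Sink for the anonymous variant and "serialized OK" for the standalone one. The same reasoning applies to lambdas and local classes that capture non-serializable enclosing state, which is why extracting such callbacks into top-level (or static nested) classes is the conventional fix.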