From 0cdfd36c0810e612754e57df4f2d0aa7ed06dafe Mon Sep 17 00:00:00 2001 From: gongzhongqiang Date: Tue, 28 Nov 2023 20:47:57 +0800 Subject: [PATCH] [3.0][cdc-composer] Introduce pipeline.name config to support custom flink job name (#2768) Co-authored-by: Leonard Xu This closes #2769. --- .../cdc/common/pipeline/PipelineOptions.java | 7 +++++++ .../java/com/ververica/cdc/common/schema/Schema.java | 8 ++++++-- .../ververica/cdc/common/utils/StringUtf8Utils.java | 11 +++-------- .../cdc/composer/flink/FlinkPipelineComposer.java | 3 ++- 4 files changed, 18 insertions(+), 11 deletions(-) diff --git a/flink-cdc-common/src/main/java/com/ververica/cdc/common/pipeline/PipelineOptions.java b/flink-cdc-common/src/main/java/com/ververica/cdc/common/pipeline/PipelineOptions.java index e009bc940..838e0c159 100644 --- a/flink-cdc-common/src/main/java/com/ververica/cdc/common/pipeline/PipelineOptions.java +++ b/flink-cdc-common/src/main/java/com/ververica/cdc/common/pipeline/PipelineOptions.java @@ -27,6 +27,13 @@ import static com.ververica.cdc.common.configuration.description.TextElement.tex /** Predefined pipeline configuration options. */ @PublicEvolving public class PipelineOptions { + + public static final ConfigOption PIPELINE_NAME = + ConfigOptions.key("pipeline.name") + .stringType() + .defaultValue("Flink CDC Pipeline Job") + .withDescription("The name of the pipeline"); + public static final ConfigOption GLOBAL_PARALLELISM = ConfigOptions.key("pipeline.global.parallelism") .intType() diff --git a/flink-cdc-common/src/main/java/com/ververica/cdc/common/schema/Schema.java b/flink-cdc-common/src/main/java/com/ververica/cdc/common/schema/Schema.java index 3635c86bd..64bc53384 100644 --- a/flink-cdc-common/src/main/java/com/ververica/cdc/common/schema/Schema.java +++ b/flink-cdc-common/src/main/java/com/ververica/cdc/common/schema/Schema.java @@ -29,6 +29,7 @@ import javax.annotation.Nullable; import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -172,6 +173,7 @@ public class Schema implements Serializable { for (Column col : columns) { nameToColumns.put(col.getName(), col); } + nameToColumns = Collections.unmodifiableMap(nameToColumns); } } } @@ -210,14 +212,16 @@ public class Schema implements Serializable { public static final class Builder { private List columns; - private List primaryKeys = new ArrayList<>(); - private Map options = new HashMap<>(); + private List primaryKeys; + private final Map options; private @Nullable String comment; // Used to check duplicate columns private final Set columnNames; public Builder() { + this.primaryKeys = new ArrayList<>(); + this.options = new HashMap<>(); this.columns = new ArrayList<>(); this.columnNames = new HashSet<>(); } diff --git a/flink-cdc-common/src/main/java/com/ververica/cdc/common/utils/StringUtf8Utils.java b/flink-cdc-common/src/main/java/com/ververica/cdc/common/utils/StringUtf8Utils.java index 522ae53e4..326754f3c 100644 --- a/flink-cdc-common/src/main/java/com/ververica/cdc/common/utils/StringUtf8Utils.java +++ b/flink-cdc-common/src/main/java/com/ververica/cdc/common/utils/StringUtf8Utils.java @@ -18,7 +18,6 @@ package com.ververica.cdc.common.utils; import com.ververica.cdc.common.annotation.Internal; -import java.io.UnsupportedEncodingException; import java.nio.charset.StandardCharsets; import java.util.Arrays; @@ -112,13 +111,9 @@ public final class StringUtf8Utils { } public static int defaultEncodeUTF8(String str, byte[] bytes) { - try { - byte[] buffer = str.getBytes("UTF-8"); - System.arraycopy(buffer, 0, bytes, 0, buffer.length); - return buffer.length; - } catch (UnsupportedEncodingException e) { - throw new RuntimeException("encodeUTF8 error", e); - } + byte[] buffer = str.getBytes(StandardCharsets.UTF_8); + System.arraycopy(buffer, 0, bytes, 0, buffer.length); + return buffer.length; } public static String decodeUTF8(byte[] input, int offset, int byteLen) { diff --git a/flink-cdc-composer/src/main/java/com/ververica/cdc/composer/flink/FlinkPipelineComposer.java b/flink-cdc-composer/src/main/java/com/ververica/cdc/composer/flink/FlinkPipelineComposer.java index 44b20f6d7..b277e15d0 100644 --- a/flink-cdc-composer/src/main/java/com/ververica/cdc/composer/flink/FlinkPipelineComposer.java +++ b/flink-cdc-composer/src/main/java/com/ververica/cdc/composer/flink/FlinkPipelineComposer.java @@ -96,7 +96,8 @@ public class FlinkPipelineComposer implements PipelineComposer { DataSinkTranslator sinkTranslator = new DataSinkTranslator(); sinkTranslator.translate(stream, dataSink, schemaOperatorIDGenerator.generate()); - return new FlinkPipelineExecution(env, "CDC Job", isBlocking); + return new FlinkPipelineExecution( + env, pipelineDef.getConfig().get(PipelineOptions.PIPELINE_NAME), isBlocking); } private DataSink createDataSink(SinkDef sinkDef) {