[3.0][cdc-composer] Introduce pipeline.name config to support custom flink job name (#2768)

Co-authored-by: Leonard Xu <leonard@apache.org>
This closes #2769.
commit 0cdfd36c08 (parent 57e4793b21)
Authored by gongzhongqiang, committed via GitHub

@@ -27,6 +27,13 @@ import static com.ververica.cdc.common.configuration.description.TextElement.tex
 /** Predefined pipeline configuration options. */
 @PublicEvolving
 public class PipelineOptions {
+    public static final ConfigOption<String> PIPELINE_NAME =
+            ConfigOptions.key("pipeline.name")
+                    .stringType()
+                    .defaultValue("Flink CDC Pipeline Job")
+                    .withDescription("The name of the pipeline");
+
     public static final ConfigOption<Integer> GLOBAL_PARALLELISM =
             ConfigOptions.key("pipeline.global.parallelism")
                     .intType()
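
The new option gives pipelines a configurable job name with a built-in default: an explicit setting wins, otherwise "Flink CDC Pipeline Job" is used. The toy sketch below illustrates that defaulting behaviour with a plain Map lookup; the class and method names are made up for illustration and this is not the real com.ververica.cdc.common.configuration.Configuration implementation.

import java.util.HashMap;
import java.util.Map;

// Toy illustration of ConfigOption-style defaulting; names are hypothetical.
public class PipelineNameResolutionDemo {

    private static final String PIPELINE_NAME_KEY = "pipeline.name";
    private static final String PIPELINE_NAME_DEFAULT = "Flink CDC Pipeline Job";

    // Returns the user-supplied job name, or the default when pipeline.name is unset.
    static String resolveJobName(Map<String, String> pipelineConfig) {
        return pipelineConfig.getOrDefault(PIPELINE_NAME_KEY, PIPELINE_NAME_DEFAULT);
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        System.out.println(resolveJobName(config));           // Flink CDC Pipeline Job
        config.put(PIPELINE_NAME_KEY, "mysql-to-doris-sync");
        System.out.println(resolveJobName(config));           // mysql-to-doris-sync
    }
}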

@@ -29,6 +29,7 @@ import javax.annotation.Nullable;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -172,6 +173,7 @@ public class Schema implements Serializable {
                 for (Column col : columns) {
                     nameToColumns.put(col.getName(), col);
                 }
+                nameToColumns = Collections.unmodifiableMap(nameToColumns);
             }
         }
     }
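
The added Collections.unmodifiableMap call freezes the lazily built name-to-column index once it has been populated, so callers holding the map cannot mutate it. A self-contained demonstration of that wrapper's behaviour (hypothetical demo class, not project code):

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class UnmodifiableMapDemo {
    public static void main(String[] args) {
        Map<String, String> nameToColumns = new HashMap<>();
        nameToColumns.put("id", "INT");

        // Wrap only after the map is fully populated; the view rejects further writes.
        Map<String, String> readOnly = Collections.unmodifiableMap(nameToColumns);
        try {
            readOnly.put("name", "STRING");
        } catch (UnsupportedOperationException e) {
            System.out.println("read-only view rejects mutation");
        }
    }
}

Note that unmodifiableMap returns a view over the backing map; the pattern in the diff works because the field is reassigned to the wrapper right after population, so the mutable backing map is no longer exposed.
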
@@ -210,14 +212,16 @@ public class Schema implements Serializable {
     public static final class Builder {
         private List<Column> columns;
-        private List<String> primaryKeys = new ArrayList<>();
-        private Map<String, String> options = new HashMap<>();
+        private List<String> primaryKeys;
+        private final Map<String, String> options;
         private @Nullable String comment;
+        // Used to check duplicate columns
+        private final Set<String> columnNames;

         public Builder() {
+            this.primaryKeys = new ArrayList<>();
+            this.options = new HashMap<>();
             this.columns = new ArrayList<>();
+            this.columnNames = new HashSet<>();
         }
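
The method that consults columnNames is not shown in this hunk, but a Set kept alongside the column list is the usual way to reject duplicate column names while preserving insertion order. A standalone sketch of that pattern follows; ColumnListBuilder is a hypothetical class, not the real Schema.Builder, which tracks Column objects rather than plain names.

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of duplicate-name checking in a builder.
final class ColumnListBuilder {
    private final List<String> columns = new ArrayList<>();
    private final Set<String> columnNames = new HashSet<>();

    ColumnListBuilder column(String name) {
        // Set#add returns false when the name was already registered.
        if (!columnNames.add(name)) {
            throw new IllegalArgumentException("Duplicate column name: " + name);
        }
        columns.add(name);
        return this;
    }

    List<String> build() {
        return new ArrayList<>(columns);
    }
}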

@@ -18,7 +18,6 @@ package com.ververica.cdc.common.utils;
 import com.ververica.cdc.common.annotation.Internal;

-import java.io.UnsupportedEncodingException;
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
@@ -112,13 +111,9 @@
     }

     public static int defaultEncodeUTF8(String str, byte[] bytes) {
-        try {
-            byte[] buffer = str.getBytes("UTF-8");
-            System.arraycopy(buffer, 0, bytes, 0, buffer.length);
-            return buffer.length;
-        } catch (UnsupportedEncodingException e) {
-            throw new RuntimeException("encodeUTF8 error", e);
-        }
+        byte[] buffer = str.getBytes(StandardCharsets.UTF_8);
+        System.arraycopy(buffer, 0, bytes, 0, buffer.length);
+        return buffer.length;
     }

     public static String decodeUTF8(byte[] input, int offset, int byteLen) {
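
Switching from the charset-name overload getBytes("UTF-8") to getBytes(StandardCharsets.UTF_8) drops the checked UnsupportedEncodingException, since the charset constant can never be missing, which is why the try/catch disappears. A minimal, self-contained example of the Charset overload (demo class name is hypothetical):

import java.nio.charset.StandardCharsets;

public class Utf8EncodeDemo {
    public static void main(String[] args) {
        String s = "flink-cdc \u4f60\u597d";

        // Charset overload: no checked exception, so no try/catch is needed.
        byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);

        // Round-trips back to the original string.
        System.out.println(new String(utf8, StandardCharsets.UTF_8).equals(s)); // true
        System.out.println(utf8.length); // UTF-8 byte length, not char count
    }
}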

@@ -96,7 +96,8 @@ public class FlinkPipelineComposer implements PipelineComposer {
         DataSinkTranslator sinkTranslator = new DataSinkTranslator();
         sinkTranslator.translate(stream, dataSink, schemaOperatorIDGenerator.generate());

-        return new FlinkPipelineExecution(env, "CDC Job", isBlocking);
+        return new FlinkPipelineExecution(
+                env, pipelineDef.getConfig().get(PipelineOptions.PIPELINE_NAME), isBlocking);
     }

     private DataSink createDataSink(SinkDef sinkDef) {
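
With this change the job name reported by Flink comes from pipeline.name instead of the hard-coded "CDC Job". FlinkPipelineExecution itself is not part of this diff; the sketch below shows one plausible way such a wrapper could forward the name to StreamExecutionEnvironment#executeAsync(String). The NamedPipelineExecution class is illustrative only and may not match the actual implementation.

import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Illustrative only: an execution wrapper that passes the configured job name to Flink.
final class NamedPipelineExecution {
    private final StreamExecutionEnvironment env;
    private final String jobName;

    NamedPipelineExecution(StreamExecutionEnvironment env, String jobName) {
        this.env = env;
        this.jobName = jobName;
    }

    // The submitted job appears in the Flink UI under the configured pipeline.name.
    JobClient start() throws Exception {
        return env.executeAsync(jobName);
    }
}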
