[cdc-cli][cdc-composer][cdc-dist] Support submitting job to general Flink environments (#2807)

This closes #2807.
pull/2782/head
Qingsheng Ren 1 year ago committed by GitHub
parent 9ce8450cf2
commit a328f9895a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@@ -16,8 +16,6 @@
package com.ververica.cdc.cli.utils;
import com.ververica.cdc.common.configuration.ConfigOption;
import com.ververica.cdc.common.configuration.ConfigOptions;
import com.ververica.cdc.common.configuration.Configuration;
import com.ververica.cdc.composer.flink.FlinkPipelineComposer;
@@ -30,12 +28,6 @@ public class FlinkEnvironmentUtils {
private static final String FLINK_CONF_DIR = "conf";
private static final String FLINK_CONF_FILENAME = "flink-conf.yaml";
public static final ConfigOption<String> FLINK_REST_ADDRESS =
ConfigOptions.key("rest.address").stringType().noDefaultValue();
public static final ConfigOption<Integer> FLINK_REST_PORT =
ConfigOptions.key("rest.port").intType().defaultValue(8081);
public static Configuration loadFlinkConfiguration(Path flinkHome) throws Exception {
Path flinkConfPath = flinkHome.resolve(FLINK_CONF_DIR).resolve(FLINK_CONF_FILENAME);
return ConfigurationUtils.loadMapFormattedConfig(flinkConfPath);
@@ -46,8 +38,8 @@ public class FlinkEnvironmentUtils {
if (useMiniCluster) {
return FlinkPipelineComposer.ofMiniCluster();
}
String host = flinkConfig.get(FLINK_REST_ADDRESS);
Integer port = flinkConfig.get(FLINK_REST_PORT);
return FlinkPipelineComposer.ofRemoteCluster(host, port, additionalJars);
return FlinkPipelineComposer.ofRemoteCluster(
org.apache.flink.configuration.Configuration.fromMap(flinkConfig.toMap()),
additionalJars);
}
}

@@ -18,7 +18,6 @@ package com.ververica.cdc.cli;
import org.apache.flink.shaded.guava31.com.google.common.io.Resources;
import com.ververica.cdc.cli.utils.FlinkEnvironmentUtils;
import com.ververica.cdc.composer.PipelineComposer;
import com.ververica.cdc.composer.PipelineExecution;
import com.ververica.cdc.composer.definition.PipelineDef;
@@ -71,15 +70,6 @@ class CliFrontendTest {
+ "Please make sure Flink home is properly set. ");
}
@Test
void testFlinkConfigParsing() throws Exception {
    // Build a CLI executor against the bundled test Flink home and check that
    // rest.address / rest.port are exposed through the executor's Flink config
    // (presumably loaded from that home's flink-conf.yaml — see FlinkEnvironmentUtils).
    CliExecutor cliExecutor = createExecutor(pipelineDef(), "--flink-home", flinkHome());
    String restAddress = cliExecutor.getFlinkConfig().get(FlinkEnvironmentUtils.FLINK_REST_ADDRESS);
    Integer restPort = cliExecutor.getFlinkConfig().get(FlinkEnvironmentUtils.FLINK_REST_PORT);
    assertThat(restAddress).isEqualTo("localhost");
    assertThat(restPort).isEqualTo(8081);
}
@Test
void testGlobalPipelineConfigParsing() throws Exception {
CliExecutor executor =

@@ -16,6 +16,7 @@
package com.ververica.cdc.composer.flink;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -57,10 +58,26 @@ public class FlinkPipelineComposer implements PipelineComposer {
private final boolean isBlocking;
public static FlinkPipelineComposer ofRemoteCluster(
String host, int port, List<Path> additionalJars) {
String[] jarPaths = additionalJars.stream().map(Path::toString).toArray(String[]::new);
return new FlinkPipelineComposer(
StreamExecutionEnvironment.createRemoteEnvironment(host, port, jarPaths), false);
org.apache.flink.configuration.Configuration flinkConfig, List<Path> additionalJars) {
org.apache.flink.configuration.Configuration effectiveConfiguration =
new org.apache.flink.configuration.Configuration();
// Use "remote" as the default target
effectiveConfiguration.set(DeploymentOptions.TARGET, "remote");
effectiveConfiguration.addAll(flinkConfig);
StreamExecutionEnvironment env = new StreamExecutionEnvironment(effectiveConfiguration);
additionalJars.forEach(
jarPath -> {
try {
FlinkEnvironmentUtils.addJar(env, jarPath.toUri().toURL());
} catch (Exception e) {
throw new RuntimeException(
String.format(
"Unable to convert JAR path \"%s\" to URL when adding JAR to Flink environment",
jarPath),
e);
}
});
return new FlinkPipelineComposer(env, false);
}
public static FlinkPipelineComposer ofMiniCluster() {

@@ -17,7 +17,7 @@
# Setup FLINK_HOME
args=("$@")
# Loop through command-line arguments
# Check if FLINK_HOME is set in command-line arguments by "--flink-home"
for ((i=0; i < ${#args[@]}; i++)); do
case "${args[i]}" in
--flink-home)
@@ -28,8 +28,20 @@ for ((i=0; i < ${#args[@]}; i++)); do
;;
esac
done
# Bail out early: FLINK_HOME must have been provided via --flink-home or the environment.
if [[ -z $FLINK_HOME ]]; then
echo "[ERROR] Unable to find FLINK_HOME either in command-line argument \"--flink-home\" or environment variable \"FLINK_HOME\"."
exit 1
fi
# Setup Flink related configurations
# Setting _FLINK_HOME_DETERMINED in order to avoid config.sh to overwrite it
_FLINK_HOME_DETERMINED=1
# FLINK_CONF_DIR is required by config.sh
FLINK_CONF_DIR=$FLINK_HOME/conf
# Source Flink's config.sh to set up Flink-related variables (e.g. JAVA_RUN, Hadoop classpath)
. $FLINK_HOME/bin/config.sh
# Define Flink CDC directories relative to this script's own location
SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )
FLINK_CDC_HOME="$SCRIPT_DIR"/..
FLINK_CDC_CONF="$FLINK_CDC_HOME"/conf
@@ -46,23 +58,14 @@ done
# Collect every JAR under the Flink CDC lib directory onto the classpath.
for cdc_jar in "$FLINK_CDC_LIB"/*.jar; do
  CLASSPATH="$CLASSPATH:$cdc_jar"
done
# Append the Hadoop classpath, which config.sh has already computed.
CLASSPATH="$CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"
# Drop the leading ":" left over from starting with an empty CLASSPATH.
CLASSPATH=${CLASSPATH#:}
# Setup Java by operating system
UNAME=$(uname -s)
if [ "${UNAME:0:6}" == "CYGWIN" ]; then
JAVA_RUN=java
else
if [[ -d "$JAVA_HOME" ]]; then
JAVA_RUN="$JAVA_HOME"/bin/java
else
JAVA_RUN=java
fi
fi
# Setup logging
LOG=$FLINK_CDC_LOG/flink-cdc-cli-$HOSTNAME.log
LOG_SETTINGS=(-Dlog.file="$LOG" -Dlog4j.configuration=file:"$FLINK_CDC_CONF"/log4j-cli.properties -Dlog4j.configurationFile=file:"$FLINK_CDC_CONF"/log4j-cli.properties)
# JAVA_RUN should have been setup in config.sh
exec "$JAVA_RUN" -classpath "$CLASSPATH" "${LOG_SETTINGS[@]}" com.ververica.cdc.cli.CliFrontend "$@"

Loading…
Cancel
Save