[FLINK-34853] Submit CDC Job To Flink K8S Native Application Mode (#3093)

ConradJam authored 6 months ago · committed by GitHub
parent 81d916fc73
commit d6b687b61d

@@ -0,0 +1,35 @@
#/*
# * Licensed to the Apache Software Foundation (ASF) under one or more
# * contributor license agreements. See the NOTICE file distributed with
# * this work for additional information regarding copyright ownership.
# * The ASF licenses this file to You under the Apache License, Version 2.0
# * (the "License"); you may not use this file except in compliance with
# * the License. You may obtain a copy of the License at
# *
# * http://www.apache.org/licenses/LICENSE-2.0
# *
# * Unless required by applicable law or agreed to in writing, software
# * distributed under the License is distributed on an "AS IS" BASIS,
# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# * See the License for the specific language governing permissions and
# * limitations under the License.
# */
FROM flink
ARG FLINK_CDC_VERSION=3.2-SNAPSHOT
ARG PIPELINE_DEFINITION_FILE
RUN mkdir -p /opt/flink-cdc
RUN mkdir -p /opt/flink/usrlib
ENV FLINK_CDC_HOME /opt/flink-cdc
COPY flink-cdc-dist/target/flink-cdc-${FLINK_CDC_VERSION}-bin.tar.gz /tmp/
RUN tar -xzvf /tmp/flink-cdc-${FLINK_CDC_VERSION}-bin.tar.gz -C /tmp/ && \
mv /tmp/flink-cdc-${FLINK_CDC_VERSION}/* /opt/flink-cdc/ && \
mv /opt/flink-cdc/lib/flink-cdc-dist-${FLINK_CDC_VERSION}.jar /opt/flink-cdc/lib/flink-cdc-dist.jar && \
rm -rf /tmp/flink-cdc-${FLINK_CDC_VERSION} /tmp/flink-cdc-${FLINK_CDC_VERSION}-bin.tar.gz
# copy connector JARs into the Flink usrlib directory
COPY flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/target/flink-cdc-pipeline-connector-values-${FLINK_CDC_VERSION}.jar /opt/flink/usrlib/flink-cdc-pipeline-connector-values-${FLINK_CDC_VERSION}.jar
COPY flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/target/flink-cdc-pipeline-connector-mysql-${FLINK_CDC_VERSION}.jar /opt/flink/usrlib/flink-cdc-pipeline-connector-mysql-${FLINK_CDC_VERSION}.jar
# copy the Flink CDC pipeline definition file. This one is an example; users can replace it according to their needs.
COPY $PIPELINE_DEFINITION_FILE $FLINK_CDC_HOME/conf
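
A usage sketch for this Dockerfile, assuming the build context is the repository root (the YAML file name is a placeholder; the tag matches the default image expected by the K8s deployment executor further below):

docker build --build-arg PIPELINE_DEFINITION_FILE=./mysql-to-doris.yaml -t flink/flink-cdc:latest .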

@@ -28,7 +28,7 @@ limitations under the License.
<artifactId>flink-cdc-cli</artifactId>
<properties>
<commons-cli.version>1.6.0</commons-cli.version>
<commons-cli.version>1.7.0</commons-cli.version>
<snakeyaml.version>2.6</snakeyaml.version>
</properties>
@@ -55,6 +55,14 @@ limitations under the License.
<artifactId>commons-cli</artifactId>
<version>${commons-cli.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
</project>

@@ -19,14 +19,19 @@ package org.apache.flink.cdc.cli;
import org.apache.flink.cdc.cli.parser.PipelineDefinitionParser;
import org.apache.flink.cdc.cli.parser.YamlPipelineDefinitionParser;
import org.apache.flink.cdc.cli.utils.ConfigurationUtils;
import org.apache.flink.cdc.cli.utils.FlinkEnvironmentUtils;
import org.apache.flink.cdc.common.annotation.VisibleForTesting;
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.composer.PipelineComposer;
import org.apache.flink.cdc.composer.PipelineDeploymentExecutor;
import org.apache.flink.cdc.composer.PipelineExecution;
import org.apache.flink.cdc.composer.definition.PipelineDef;
import org.apache.flink.cdc.composer.flink.deployment.ComposeDeploymentFactory;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.commons.cli.CommandLine;
import java.nio.file.Path;
import java.util.List;
@@ -39,17 +44,21 @@ public class CliExecutor {
private final boolean useMiniCluster;
private final List<Path> additionalJars;
private final CommandLine commandLine;
private PipelineComposer composer = null;
private final SavepointRestoreSettings savepointSettings;
public CliExecutor(
CommandLine commandLine,
Path pipelineDefPath,
Configuration flinkConfig,
Configuration globalPipelineConfig,
boolean useMiniCluster,
List<Path> additionalJars,
SavepointRestoreSettings savepointSettings) {
this.commandLine = commandLine;
this.pipelineDefPath = pipelineDefPath;
this.flinkConfig = flinkConfig;
this.globalPipelineConfig = globalPipelineConfig;
@@ -59,22 +68,31 @@ public class CliExecutor {
}
public PipelineExecution.ExecutionInfo run() throws Exception {
// Parse pipeline definition file
PipelineDefinitionParser pipelineDefinitionParser = new YamlPipelineDefinitionParser();
PipelineDef pipelineDef =
pipelineDefinitionParser.parse(pipelineDefPath, globalPipelineConfig);
// Create composer
PipelineComposer composer = getComposer();
// Compose pipeline
PipelineExecution execution = composer.compose(pipelineDef);
// Execute the pipeline
return execution.execute();
// Either create a deployment executor to submit the Flink CDC job, or run it directly
boolean isDeploymentMode = ConfigurationUtils.isDeploymentMode(commandLine);
if (isDeploymentMode) {
ComposeDeploymentFactory composeDeploymentFactory = new ComposeDeploymentFactory();
PipelineDeploymentExecutor composeExecutor =
composeDeploymentFactory.getFlinkComposeExecutor(commandLine);
return composeExecutor.deploy(
commandLine,
org.apache.flink.configuration.Configuration.fromMap(flinkConfig.toMap()),
additionalJars);
} else {
// Run the CDC job directly: parse the pipeline definition file
PipelineDefinitionParser pipelineDefinitionParser = new YamlPipelineDefinitionParser();
PipelineDef pipelineDef =
pipelineDefinitionParser.parse(pipelineDefPath, globalPipelineConfig);
// Create composer
PipelineComposer composer = getComposer();
// Compose pipeline
PipelineExecution execution = composer.compose(pipelineDef);
// Execute or submit the pipeline
return execution.execute();
}
}
private PipelineComposer getComposer() {
private PipelineComposer getComposer() throws Exception {
if (composer == null) {
return FlinkEnvironmentUtils.createComposer(
useMiniCluster, flinkConfig, additionalJars, savepointSettings);
@@ -102,6 +120,11 @@ public class CliExecutor {
return additionalJars;
}
@VisibleForTesting
public String getDeploymentTarget() {
return commandLine.getOptionValue("target");
}
public SavepointRestoreSettings getSavepointSettings() {
return savepointSettings;
}
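
With the -t option added below, a Kubernetes submission could then look like this one-liner (bin/flink-cdc.sh is the launcher shipped in the Flink CDC distribution; the YAML path is a placeholder):

bash bin/flink-cdc.sh -t kubernetes-application mysql-to-doris.yaml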

@@ -34,8 +34,6 @@ import org.apache.commons.cli.Options;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.FileNotFoundException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
@@ -83,13 +81,9 @@ public class CliFrontend {
"Missing pipeline definition file path in arguments. ");
}
// Take the first unparsed argument as the pipeline definition file
Path pipelineDefPath = Paths.get(unparsedArgs.get(0));
if (!Files.exists(pipelineDefPath)) {
throw new FileNotFoundException(
String.format("Cannot find pipeline definition file \"%s\"", pipelineDefPath));
}
LOG.info("Pipeline definition file: {}", pipelineDefPath);
// Global pipeline configuration
Configuration globalPipelineConfig = getGlobalConfig(commandLine);
@@ -111,6 +105,7 @@ public class CliFrontend {
// Build executor
return new CliExecutor(
commandLine,
pipelineDefPath,
flinkConfig,
globalPipelineConfig,

@@ -46,6 +46,16 @@ public class CliFrontendOptions {
.desc("JARs to be submitted together with the pipeline")
.build();
public static final Option TARGET =
Option.builder("t")
.longOpt("target")
.hasArg()
.desc(
"The deployment target for the execution. This can take one of the following values "
+ "local/remote/yarn-session/yarn-application/kubernetes-session/kubernetes"
+ "-application")
.build();
public static final Option USE_MINI_CLUSTER =
Option.builder()
.longOpt("use-mini-cluster")
@@ -91,6 +101,8 @@ public class CliFrontendOptions {
.addOption(FLINK_HOME)
.addOption(GLOBAL_CONFIG)
.addOption(USE_MINI_CLUSTER)
.addOption(TARGET)
.addOption(SAVEPOINT_PATH_OPTION)
.addOption(SAVEPOINT_CLAIM_MODE)
.addOption(SAVEPOINT_ALLOW_NON_RESTORED_OPTION);

@@ -18,12 +18,18 @@
package org.apache.flink.cdc.cli.utils;
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.client.deployment.executors.LocalExecutor;
import org.apache.flink.client.deployment.executors.RemoteExecutor;
import org.apache.commons.cli.CommandLine;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.apache.flink.cdc.cli.CliFrontendOptions.TARGET;
/** Utilities for handling {@link Configuration}. */
public class ConfigurationUtils {
@@ -62,4 +68,11 @@ public class ConfigurationUtils {
return flattenedMap;
}
public static boolean isDeploymentMode(CommandLine commandLine) {
String target = commandLine.getOptionValue(TARGET);
return target != null
&& !target.equalsIgnoreCase(LocalExecutor.NAME)
&& !target.equalsIgnoreCase(RemoteExecutor.NAME);
}
}
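
For illustration, a minimal self-contained sketch of this contract, using the same commons-cli option wiring as CliFrontendOptions (LocalExecutor.NAME is "local" and RemoteExecutor.NAME is "remote"; any other target is handed to a deployment executor):

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;

public class DeploymentModeExample {
    public static void main(String[] args) throws Exception {
        Options options =
                new Options().addOption(Option.builder("t").longOpt("target").hasArg().build());
        CommandLine cmd =
                new DefaultParser().parse(options, new String[] {"-t", "kubernetes-application"});
        String target = cmd.getOptionValue("target");
        // Mirrors isDeploymentMode: "local" and "remote" run in-process, anything else deploys
        boolean deploymentMode =
                target != null
                        && !target.equalsIgnoreCase("local")
                        && !target.equalsIgnoreCase("remote");
        System.out.println(deploymentMode); // prints "true"
    }
}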

@@ -106,6 +106,19 @@ class CliFrontendTest {
assertThat(executor.getSavepointSettings().allowNonRestoredState()).isTrue();
}
@Test
void testDeploymentTargetConfiguration() throws Exception {
CliExecutor executor =
createExecutor(
pipelineDef(),
"--flink-home",
flinkHome(),
"-t",
"kubernetes-application",
"-n");
assertThat(executor.getDeploymentTarget()).isEqualTo("kubernetes-application");
}
@Test
void testAdditionalJar() throws Exception {
String aJar = "/foo/jar/a.jar";
@@ -177,6 +190,10 @@ class CliFrontendTest {
+ " was triggered.\n"
+ " -s,--from-savepoint <arg> Path to a savepoint to restore the job from\n"
+ " (for example hdfs:///flink/savepoint-1537\n"
+ " -t,--target <arg> The deployment target for the execution. This\n"
+ " can take one of the following values\n"
+ " local/remote/yarn-session/yarn-application/ku\n"
+ " bernetes-session/kubernetes-application\n"
+ " --use-mini-cluster Use Flink MiniCluster to run the pipeline\n";
private static class NoOpComposer implements PipelineComposer {

@@ -48,7 +48,7 @@ limitations under the License.
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
@@ -56,6 +56,11 @@ limitations under the License.
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-kubernetes</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
</project>

@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.composer;
import org.apache.flink.configuration.Configuration;
import org.apache.commons.cli.CommandLine;
import java.nio.file.Path;
import java.util.List;
/** Deploys a Flink CDC pipeline job to a specific deployment target. */
public interface PipelineDeploymentExecutor {
PipelineExecution.ExecutionInfo deploy(
CommandLine commandLine, Configuration flinkConfig, List<Path> additionalJars)
throws Exception;
}
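
As a hypothetical sketch of this extension point (YARN support is not part of this change), another target would implement the same interface and be wired into ComposeDeploymentFactory below:

import org.apache.flink.cdc.composer.PipelineDeploymentExecutor;
import org.apache.flink.cdc.composer.PipelineExecution;
import org.apache.flink.configuration.Configuration;

import org.apache.commons.cli.CommandLine;

import java.nio.file.Path;
import java.util.List;

/** Hypothetical skeleton; yarn-application is not implemented in this commit. */
public class YarnApplicationDeploymentExecutor implements PipelineDeploymentExecutor {
    @Override
    public PipelineExecution.ExecutionInfo deploy(
            CommandLine commandLine, Configuration flinkConfig, List<Path> additionalJars)
            throws Exception {
        // A real implementation would create a YARN cluster descriptor and deploy an
        // application cluster, mirroring K8SApplicationDeploymentExecutor below.
        throw new UnsupportedOperationException("yarn-application is not supported yet");
    }
}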

@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.composer.flink.deployment;
import org.apache.flink.cdc.composer.PipelineDeploymentExecutor;
import org.apache.commons.cli.CommandLine;
/** Creates the deployment executor corresponding to the requested target. */
public class ComposeDeploymentFactory {
public PipelineDeploymentExecutor getFlinkComposeExecutor(CommandLine commandLine) {
String target = commandLine.getOptionValue("target");
if (target.equalsIgnoreCase("kubernetes-application")) {
return new K8SApplicationDeploymentExecutor();
}
throw new IllegalArgumentException(
String.format("Deployment target %s is not supported", target));
}
}

@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.composer.flink.deployment;
import org.apache.flink.cdc.composer.PipelineDeploymentExecutor;
import org.apache.flink.cdc.composer.PipelineExecution;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.deployment.application.ApplicationConfiguration;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.ClusterClientProvider;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.kubernetes.KubernetesClusterClientFactory;
import org.apache.flink.kubernetes.KubernetesClusterDescriptor;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.configuration.KubernetesDeploymentTarget;
import org.apache.commons.cli.CommandLine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
/** Deploys a Flink CDC job in native Kubernetes application mode. */
public class K8SApplicationDeploymentExecutor implements PipelineDeploymentExecutor {
private static final Logger LOG =
LoggerFactory.getLogger(K8SApplicationDeploymentExecutor.class);
@Override
public PipelineExecution.ExecutionInfo deploy(
CommandLine commandLine, Configuration flinkConfig, List<Path> additionalJars) {
LOG.info("Submitting application in 'Flink K8S Application Mode'.");
flinkConfig.set(DeploymentOptions.TARGET, KubernetesDeploymentTarget.APPLICATION.getName());
List<String> jars = new ArrayList<>();
if (flinkConfig.get(PipelineOptions.JARS) == null) {
// the CDC dist JAR must be present; default to its path inside the Docker container
jars.add("local:///opt/flink-cdc/lib/flink-cdc-dist.jar");
flinkConfig.set(PipelineOptions.JARS, jars);
}
// set the default Flink CDC Docker image
flinkConfig.set(KubernetesConfigOptions.CONTAINER_IMAGE, "flink/flink-cdc:latest");
flinkConfig.set(ApplicationConfiguration.APPLICATION_ARGS, commandLine.getArgList());
flinkConfig.set(
ApplicationConfiguration.APPLICATION_MAIN_CLASS,
"org.apache.flink.cdc.cli.CliFrontend");
KubernetesClusterClientFactory kubernetesClusterClientFactory =
new KubernetesClusterClientFactory();
KubernetesClusterDescriptor descriptor =
kubernetesClusterClientFactory.createClusterDescriptor(flinkConfig);
ClusterSpecification specification =
kubernetesClusterClientFactory.getClusterSpecification(flinkConfig);
ApplicationConfiguration applicationConfiguration =
ApplicationConfiguration.fromConfiguration(flinkConfig);
ClusterClient<String> client = null;
try {
ClusterClientProvider<String> clusterClientProvider =
descriptor.deployApplicationCluster(specification, applicationConfiguration);
client = clusterClientProvider.getClusterClient();
String clusterId = client.getClusterId();
LOG.info("Deployment Flink CDC From Cluster ID {}", clusterId);
return new PipelineExecution.ExecutionInfo(clusterId, "submit job successful");
} catch (Exception e) {
if (client != null) {
client.shutDownCluster();
}
throw new RuntimeException("Failed to deploy Flink CDC job", e);
} finally {
descriptor.close();
if (client != null) {
client.close();
}
}
}
}
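
Note the asymmetry in the defaults above: PipelineOptions.JARS is only filled in when unset, while the container image is overwritten unconditionally. A short sketch of how user-supplied configuration fares against those defaults (both values are placeholders):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;

import java.util.Collections;

public class K8sDefaultingExample {
    public static void main(String[] args) {
        Configuration flinkConfig = new Configuration();
        // Values a user might set before calling deploy()
        flinkConfig.set(
                PipelineOptions.JARS,
                Collections.singletonList("local:///opt/flink-cdc/lib/flink-cdc-dist.jar"));
        flinkConfig.set(
                KubernetesConfigOptions.CONTAINER_IMAGE, "registry.example.com/flink-cdc:custom");
        // After the defaulting logic in deploy():
        //   JARS            -> kept, because the null check skips the default
        //   CONTAINER_IMAGE -> reset to "flink/flink-cdc:latest"
    }
}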

@@ -93,6 +93,12 @@ public class FactoryDiscoveryUtils {
try {
T factory = getFactoryByIdentifier(identifier, factoryClass);
URL url = factory.getClass().getProtectionDomain().getCodeSource().getLocation();
String urlString = url.toString();
if (urlString.contains("usrlib")) {
String flinkHome = System.getenv("FLINK_HOME");
urlString = urlString.replace("usrlib", flinkHome + "/usrlib");
}
url = new URL(urlString);
if (Files.isDirectory(Paths.get(url.toURI()))) {
LOG.warn(
"The factory class \"{}\" is contained by directory \"{}\" instead of JAR. "
@@ -104,7 +110,8 @@ public class FactoryDiscoveryUtils {
return Optional.of(url);
} catch (Exception e) {
throw new RuntimeException(
String.format("Failed to search JAR by factory identifier \"%s\"", identifier));
String.format("Failed to search JAR by factory identifier \"%s\"", identifier),
e);
}
}
}
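
To illustrate the rewrite, assuming the factory JAR's code-source URL is relative to the working directory as in K8s application mode (e.g. file:usrlib/<connector>.jar) and FLINK_HOME is /opt/flink as in the image above:

public class UsrlibRewriteExample {
    public static void main(String[] args) {
        String urlString = "file:usrlib/flink-cdc-pipeline-connector-mysql.jar"; // assumed form
        String flinkHome = "/opt/flink"; // assumed value of $FLINK_HOME
        if (urlString.contains("usrlib")) {
            urlString = urlString.replace("usrlib", flinkHome + "/usrlib");
        }
        System.out.println(urlString);
        // -> file:/opt/flink/usrlib/flink-cdc-pipeline-connector-mysql.jar
    }
}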
