[cdc-composer] Add FlinkEnvironmentUtils for adding JAR to StreamExecutionEnvironment

pull/2749/head
Qingsheng Ren 1 year ago committed by Leonard Xu
parent 32de7776d2
commit 5f0343e29e

@ -0,0 +1,52 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.composer.flink;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.ververica.cdc.common.annotation.Internal;
import java.lang.reflect.Field;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
/** Utilities for {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment}. */
@Internal
public class FlinkEnvironmentUtils {
/**
* Add the specified JAR to {@link StreamExecutionEnvironment} so that the JAR will be uploaded
* together with the job graph.
*/
public static void addJar(StreamExecutionEnvironment env, URL jarUrl) {
try {
Class<StreamExecutionEnvironment> envClass = StreamExecutionEnvironment.class;
Field field = envClass.getDeclaredField("configuration");
field.setAccessible(true);
Configuration configuration = ((Configuration) field.get(env));
List<String> jars =
configuration.getOptional(PipelineOptions.JARS).orElse(new ArrayList<>());
jars.add(jarUrl.toString());
configuration.set(PipelineOptions.JARS, jars);
} catch (Exception e) {
throw new RuntimeException("Failed to add JAR to Flink execution environment", e);
}
}
}

@ -16,18 +16,23 @@
package com.ververica.cdc.composer.utils;
import com.ververica.cdc.common.annotation.Internal;
import com.ververica.cdc.common.factories.Factory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.ServiceLoader;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
/** Discovery utilities for {@link Factory}. */
@Internal
public class FactoryDiscoveryUtils {
private static final Logger LOG = LoggerFactory.getLogger(FactoryDiscoveryUtils.class);
@ -80,9 +85,23 @@ public class FactoryDiscoveryUtils {
/**
* Return the path of the jar file that contains the {@link Factory} for the given identifier.
*/
public static <T extends Factory> URL getJarPathByIdentifier(
public static <T extends Factory> Optional<URL> getJarPathByIdentifier(
String identifier, Class<T> factoryClass) {
Factory factory = getFactoryByIdentifier(identifier, factoryClass);
return factory.getClass().getProtectionDomain().getCodeSource().getLocation();
try {
T factory = getFactoryByIdentifier(identifier, factoryClass);
URL url = factory.getClass().getProtectionDomain().getCodeSource().getLocation();
if (Files.isDirectory(Paths.get(url.toURI()))) {
LOG.warn(
"The factory class \"{}\" is contained by directory \"{}\" instead of JAR. "
+ "This might happen in integration test. Will ignore the directory.",
factory.getClass().getCanonicalName(),
url);
return Optional.empty();
}
return Optional.of(url);
} catch (Exception e) {
throw new RuntimeException(
String.format("Failed to search JAR by factory identifier \"%s\"", identifier));
}
}
}

@ -58,8 +58,7 @@ class FactoryDiscoveryUtilsTest {
void getJarPathByIdentifier() {
assertThat(
FactoryDiscoveryUtils.getJarPathByIdentifier(
"data-source-factory-1", Factory.class)
.getPath())
.endsWith("/flink-cdc" + "-composer/target/test-classes/");
"data-source-factory-1", Factory.class))
.isNotPresent();
}
}

Loading…
Cancel
Save