[composer] Connector factory and JAR discovery utilities (#2662)

pull/2695/head
gongzhongqiang 1 year ago committed by GitHub
parent 7da3eaef77
commit 98c929e6e4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -25,4 +25,12 @@ under the License.
<artifactId>flink-cdc-composer</artifactId> <artifactId>flink-cdc-composer</artifactId>
<dependencies>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-cdc-common</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project> </project>

@ -0,0 +1,88 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.composer.utils;
import com.ververica.cdc.common.factories.Factory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
/** Discovery utilities for {@link Factory}. */
public class FactoryDiscoveryUtils {
private static final Logger LOG = LoggerFactory.getLogger(FactoryDiscoveryUtils.class);
private FactoryDiscoveryUtils() {}
/** Returns the {@link Factory} for the given identifier. */
@SuppressWarnings("unchecked")
public static <T extends Factory> T getFactoryByIdentifier(
String identifier, Class<T> factoryClass) {
final ServiceLoader<Factory> loader = ServiceLoader.load(Factory.class);
final List<Factory> factoryList = new ArrayList<>();
for (Factory factory : loader) {
if (factory != null
&& factory.identifier().equals(identifier)
&& factoryClass.isAssignableFrom(factory.getClass())) {
factoryList.add(factory);
}
}
if (factoryList.isEmpty()) {
throw new RuntimeException(
String.format(
"No factory found in the classpath.\n\n"
+ "Available factory classes are:\n\n"
+ "%s",
StreamSupport.stream(loader.spliterator(), false)
.map(f -> f.getClass().getName())
.sorted()
.collect(Collectors.joining("\n"))));
}
if (factoryList.size() > 1) {
throw new RuntimeException(
String.format(
"Multiple factories found in the classpath.\n\n"
+ "Ambiguous factory classes are:\n\n"
+ "%s",
factoryList.stream()
.map(f -> f.getClass().getName())
.sorted()
.collect(Collectors.joining("\n"))));
}
return (T) factoryList.get(0);
}
/**
* Return the path of the jar file that contains the {@link Factory} for the given identifier.
*/
public static <T extends Factory> URL getJarPathByIdentifier(
String identifier, Class<T> factoryClass) {
Factory factory = getFactoryByIdentifier(identifier, factoryClass);
return factory.getClass().getProtectionDomain().getCodeSource().getLocation();
}
}

@ -0,0 +1,65 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.composer.utils;
import com.ververica.cdc.common.factories.Factory;
import com.ververica.cdc.composer.utils.factory.DataSinkFactory1;
import com.ververica.cdc.composer.utils.factory.DataSourceFactory1;
import org.junit.jupiter.api.Test;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Test for {@link FactoryDiscoveryUtils}. */
class FactoryDiscoveryUtilsTest {
@Test
void getFactoryByIdentifier() {
assertThat(
FactoryDiscoveryUtils.getFactoryByIdentifier(
"data-source-factory-1", Factory.class))
.isInstanceOf(DataSourceFactory1.class);
assertThat(
FactoryDiscoveryUtils.getFactoryByIdentifier(
"data-sink-factory-1", Factory.class))
.isInstanceOf(DataSinkFactory1.class);
assertThatThrownBy(
() ->
FactoryDiscoveryUtils.getFactoryByIdentifier(
"data-sink-factory-3", Factory.class))
.hasMessage(
"No factory found in the classpath.\n"
+ "\n"
+ "Available factory classes are:\n"
+ "\n"
+ "com.ververica.cdc.composer.utils.factory.DataSinkFactory1\n"
+ "com.ververica.cdc.composer.utils.factory.DataSinkFactory2\n"
+ "com.ververica.cdc.composer.utils.factory.DataSourceFactory1\n"
+ "com.ververica.cdc.composer.utils.factory.DataSourceFactory2");
}
@Test
void getJarPathByIdentifier() {
assertThat(
FactoryDiscoveryUtils.getJarPathByIdentifier(
"data-source-factory-1", Factory.class)
.getPath())
.endsWith("/flink-cdc" + "-composer/target/test-classes/");
}
}

@ -0,0 +1,60 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.composer.utils.factory;
import org.apache.flink.configuration.ConfigOption;
import com.ververica.cdc.common.factories.DataSinkFactory;
import com.ververica.cdc.common.sink.DataSink;
import com.ververica.cdc.common.sink.EventSinkProvider;
import com.ververica.cdc.common.sink.MetadataApplier;
import java.util.HashSet;
import java.util.Set;
/** A dummy {@link DataSinkFactory} for testing. */
public class DataSinkFactory1 implements DataSinkFactory {
@Override
public DataSink createDataSink() {
return new DataSink() {
@Override
public EventSinkProvider getEventSinkProvider() {
return null;
}
@Override
public MetadataApplier getMetadataApplier() {
return null;
}
};
}
@Override
public String identifier() {
return "data-sink-factory-1";
}
@Override
public Set<ConfigOption<?>> requiredOptions() {
return new HashSet<>();
}
@Override
public Set<ConfigOption<?>> optionalOptions() {
return new HashSet<>();
}
}

@ -0,0 +1,60 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.composer.utils.factory;
import org.apache.flink.configuration.ConfigOption;
import com.ververica.cdc.common.factories.DataSinkFactory;
import com.ververica.cdc.common.sink.DataSink;
import com.ververica.cdc.common.sink.EventSinkProvider;
import com.ververica.cdc.common.sink.MetadataApplier;
import java.util.HashSet;
import java.util.Set;
/** A dummy {@link DataSinkFactory} for testing. */
public class DataSinkFactory2 implements DataSinkFactory {
@Override
public DataSink createDataSink() {
return new DataSink() {
@Override
public EventSinkProvider getEventSinkProvider() {
return null;
}
@Override
public MetadataApplier getMetadataApplier() {
return null;
}
};
}
@Override
public String identifier() {
return "data-source-factory-2";
}
@Override
public Set<ConfigOption<?>> requiredOptions() {
return new HashSet<>();
}
@Override
public Set<ConfigOption<?>> optionalOptions() {
return new HashSet<>();
}
}

@ -0,0 +1,60 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.composer.utils.factory;
import org.apache.flink.configuration.ConfigOption;
import com.ververica.cdc.common.factories.DataSourceFactory;
import com.ververica.cdc.common.source.DataSource;
import com.ververica.cdc.common.source.EventSourceProvider;
import com.ververica.cdc.common.source.MetadataAccessor;
import java.util.HashSet;
import java.util.Set;
/** A dummy {@link DataSourceFactory} for testing. */
public class DataSourceFactory1 implements DataSourceFactory {
@Override
public DataSource createDataSource() {
return new DataSource() {
@Override
public EventSourceProvider getEventSourceProvider() {
return null;
}
@Override
public MetadataAccessor getMetadataAccessor() {
return null;
}
};
}
@Override
public String identifier() {
return "data-source-factory-1";
}
@Override
public Set<ConfigOption<?>> requiredOptions() {
return new HashSet<>();
}
@Override
public Set<ConfigOption<?>> optionalOptions() {
return new HashSet<>();
}
}

@ -0,0 +1,61 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.composer.utils.factory;
import org.apache.flink.configuration.ConfigOption;
import com.ververica.cdc.common.factories.DataSourceFactory;
import com.ververica.cdc.common.source.DataSource;
import com.ververica.cdc.common.source.EventSourceProvider;
import com.ververica.cdc.common.source.MetadataAccessor;
import java.util.HashSet;
import java.util.Set;
/** A dummy {@link DataSourceFactory} for testing. */
public class DataSourceFactory2 implements DataSourceFactory {
@Override
public DataSource createDataSource() {
return new DataSource() {
@Override
public EventSourceProvider getEventSourceProvider() {
return null;
}
@Override
public MetadataAccessor getMetadataAccessor() {
return null;
}
};
}
@Override
public String identifier() {
return "data-sink-factory-2";
}
@Override
public Set<ConfigOption<?>> requiredOptions() {
return new HashSet<>();
}
@Override
public Set<ConfigOption<?>> optionalOptions() {
return new HashSet<>();
}
}

@ -0,0 +1,18 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
com.ververica.cdc.composer.utils.factory.DataSinkFactory1
com.ververica.cdc.composer.utils.factory.DataSinkFactory2
com.ververica.cdc.composer.utils.factory.DataSourceFactory1
com.ververica.cdc.composer.utils.factory.DataSourceFactory2

@ -0,0 +1,26 @@
################################################################################
# Copyright 2023 Ververica Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
# Set root logger level to OFF to not flood build logs
# set manually to INFO for debugging purposes
rootLogger.level=INFO
rootLogger.appenderRef.test.ref = TestLogger
appender.testlogger.name = TestLogger
appender.testlogger.type = CONSOLE
appender.testlogger.target = SYSTEM_ERR
appender.testlogger.layout.type = PatternLayout
appender.testlogger.layout.pattern = %-4r [%t] %-5p %c - %m%n
Loading…
Cancel
Save