diff --git a/flink-cdc-composer/pom.xml b/flink-cdc-composer/pom.xml
index 2c35956c3..14900b7f2 100644
--- a/flink-cdc-composer/pom.xml
+++ b/flink-cdc-composer/pom.xml
@@ -25,4 +25,12 @@ under the License.
flink-cdc-composer
+
+
+ com.ververica
+ flink-cdc-common
+ ${project.version}
+
+
+
\ No newline at end of file
diff --git a/flink-cdc-composer/src/main/java/com/ververica/cdc/composer/utils/FactoryDiscoveryUtils.java b/flink-cdc-composer/src/main/java/com/ververica/cdc/composer/utils/FactoryDiscoveryUtils.java
new file mode 100644
index 000000000..29ee33fa8
--- /dev/null
+++ b/flink-cdc-composer/src/main/java/com/ververica/cdc/composer/utils/FactoryDiscoveryUtils.java
@@ -0,0 +1,88 @@
+/*
+ * Copyright 2023 Ververica Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ververica.cdc.composer.utils;
+
+import com.ververica.cdc.common.factories.Factory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.ServiceLoader;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/** Discovery utilities for {@link Factory}. */
+public class FactoryDiscoveryUtils {
+
+ private static final Logger LOG = LoggerFactory.getLogger(FactoryDiscoveryUtils.class);
+
+ private FactoryDiscoveryUtils() {}
+
+ /** Returns the {@link Factory} for the given identifier. */
+ @SuppressWarnings("unchecked")
+ public static T getFactoryByIdentifier(
+ String identifier, Class factoryClass) {
+
+ final ServiceLoader loader = ServiceLoader.load(Factory.class);
+ final List factoryList = new ArrayList<>();
+
+ for (Factory factory : loader) {
+ if (factory != null
+ && factory.identifier().equals(identifier)
+ && factoryClass.isAssignableFrom(factory.getClass())) {
+ factoryList.add(factory);
+ }
+ }
+
+ if (factoryList.isEmpty()) {
+ throw new RuntimeException(
+ String.format(
+ "No factory found in the classpath.\n\n"
+ + "Available factory classes are:\n\n"
+ + "%s",
+ StreamSupport.stream(loader.spliterator(), false)
+ .map(f -> f.getClass().getName())
+ .sorted()
+ .collect(Collectors.joining("\n"))));
+ }
+
+ if (factoryList.size() > 1) {
+ throw new RuntimeException(
+ String.format(
+ "Multiple factories found in the classpath.\n\n"
+ + "Ambiguous factory classes are:\n\n"
+ + "%s",
+ factoryList.stream()
+ .map(f -> f.getClass().getName())
+ .sorted()
+ .collect(Collectors.joining("\n"))));
+ }
+
+ return (T) factoryList.get(0);
+ }
+
+ /**
+ * Return the path of the jar file that contains the {@link Factory} for the given identifier.
+ */
+ public static URL getJarPathByIdentifier(
+ String identifier, Class factoryClass) {
+ Factory factory = getFactoryByIdentifier(identifier, factoryClass);
+ return factory.getClass().getProtectionDomain().getCodeSource().getLocation();
+ }
+}
diff --git a/flink-cdc-composer/src/test/java/com/ververica/cdc/composer/utils/FactoryDiscoveryUtilsTest.java b/flink-cdc-composer/src/test/java/com/ververica/cdc/composer/utils/FactoryDiscoveryUtilsTest.java
new file mode 100644
index 000000000..f6615ca25
--- /dev/null
+++ b/flink-cdc-composer/src/test/java/com/ververica/cdc/composer/utils/FactoryDiscoveryUtilsTest.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2023 Ververica Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ververica.cdc.composer.utils;
+
+import com.ververica.cdc.common.factories.Factory;
+import com.ververica.cdc.composer.utils.factory.DataSinkFactory1;
+import com.ververica.cdc.composer.utils.factory.DataSourceFactory1;
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for {@link FactoryDiscoveryUtils}. */
+class FactoryDiscoveryUtilsTest {
+
+ @Test
+ void getFactoryByIdentifier() {
+ assertThat(
+ FactoryDiscoveryUtils.getFactoryByIdentifier(
+ "data-source-factory-1", Factory.class))
+ .isInstanceOf(DataSourceFactory1.class);
+
+ assertThat(
+ FactoryDiscoveryUtils.getFactoryByIdentifier(
+ "data-sink-factory-1", Factory.class))
+ .isInstanceOf(DataSinkFactory1.class);
+
+ assertThatThrownBy(
+ () ->
+ FactoryDiscoveryUtils.getFactoryByIdentifier(
+ "data-sink-factory-3", Factory.class))
+ .hasMessage(
+ "No factory found in the classpath.\n"
+ + "\n"
+ + "Available factory classes are:\n"
+ + "\n"
+ + "com.ververica.cdc.composer.utils.factory.DataSinkFactory1\n"
+ + "com.ververica.cdc.composer.utils.factory.DataSinkFactory2\n"
+ + "com.ververica.cdc.composer.utils.factory.DataSourceFactory1\n"
+ + "com.ververica.cdc.composer.utils.factory.DataSourceFactory2");
+ }
+
+ @Test
+ void getJarPathByIdentifier() {
+ assertThat(
+ FactoryDiscoveryUtils.getJarPathByIdentifier(
+ "data-source-factory-1", Factory.class)
+ .getPath())
+ .endsWith("/flink-cdc" + "-composer/target/test-classes/");
+ }
+}
diff --git a/flink-cdc-composer/src/test/java/com/ververica/cdc/composer/utils/factory/DataSinkFactory1.java b/flink-cdc-composer/src/test/java/com/ververica/cdc/composer/utils/factory/DataSinkFactory1.java
new file mode 100644
index 000000000..4f7133643
--- /dev/null
+++ b/flink-cdc-composer/src/test/java/com/ververica/cdc/composer/utils/factory/DataSinkFactory1.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2023 Ververica Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ververica.cdc.composer.utils.factory;
+
+import org.apache.flink.configuration.ConfigOption;
+
+import com.ververica.cdc.common.factories.DataSinkFactory;
+import com.ververica.cdc.common.sink.DataSink;
+import com.ververica.cdc.common.sink.EventSinkProvider;
+import com.ververica.cdc.common.sink.MetadataApplier;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/** A dummy {@link DataSinkFactory} for testing. */
+public class DataSinkFactory1 implements DataSinkFactory {
+ @Override
+ public DataSink createDataSink() {
+ return new DataSink() {
+ @Override
+ public EventSinkProvider getEventSinkProvider() {
+ return null;
+ }
+
+ @Override
+ public MetadataApplier getMetadataApplier() {
+ return null;
+ }
+ };
+ }
+
+ @Override
+ public String identifier() {
+ return "data-sink-factory-1";
+ }
+
+ @Override
+ public Set> requiredOptions() {
+ return new HashSet<>();
+ }
+
+ @Override
+ public Set> optionalOptions() {
+ return new HashSet<>();
+ }
+}
diff --git a/flink-cdc-composer/src/test/java/com/ververica/cdc/composer/utils/factory/DataSinkFactory2.java b/flink-cdc-composer/src/test/java/com/ververica/cdc/composer/utils/factory/DataSinkFactory2.java
new file mode 100644
index 000000000..1150b3885
--- /dev/null
+++ b/flink-cdc-composer/src/test/java/com/ververica/cdc/composer/utils/factory/DataSinkFactory2.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2023 Ververica Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ververica.cdc.composer.utils.factory;
+
+import org.apache.flink.configuration.ConfigOption;
+
+import com.ververica.cdc.common.factories.DataSinkFactory;
+import com.ververica.cdc.common.sink.DataSink;
+import com.ververica.cdc.common.sink.EventSinkProvider;
+import com.ververica.cdc.common.sink.MetadataApplier;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/** A dummy {@link DataSinkFactory} for testing. */
+public class DataSinkFactory2 implements DataSinkFactory {
+ @Override
+ public DataSink createDataSink() {
+ return new DataSink() {
+ @Override
+ public EventSinkProvider getEventSinkProvider() {
+ return null;
+ }
+
+ @Override
+ public MetadataApplier getMetadataApplier() {
+ return null;
+ }
+ };
+ }
+
+ @Override
+ public String identifier() {
+ return "data-source-factory-2";
+ }
+
+ @Override
+ public Set> requiredOptions() {
+ return new HashSet<>();
+ }
+
+ @Override
+ public Set> optionalOptions() {
+ return new HashSet<>();
+ }
+}
diff --git a/flink-cdc-composer/src/test/java/com/ververica/cdc/composer/utils/factory/DataSourceFactory1.java b/flink-cdc-composer/src/test/java/com/ververica/cdc/composer/utils/factory/DataSourceFactory1.java
new file mode 100644
index 000000000..68497bf02
--- /dev/null
+++ b/flink-cdc-composer/src/test/java/com/ververica/cdc/composer/utils/factory/DataSourceFactory1.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2023 Ververica Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ververica.cdc.composer.utils.factory;
+
+import org.apache.flink.configuration.ConfigOption;
+
+import com.ververica.cdc.common.factories.DataSourceFactory;
+import com.ververica.cdc.common.source.DataSource;
+import com.ververica.cdc.common.source.EventSourceProvider;
+import com.ververica.cdc.common.source.MetadataAccessor;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/** A dummy {@link DataSourceFactory} for testing. */
+public class DataSourceFactory1 implements DataSourceFactory {
+ @Override
+ public DataSource createDataSource() {
+ return new DataSource() {
+ @Override
+ public EventSourceProvider getEventSourceProvider() {
+ return null;
+ }
+
+ @Override
+ public MetadataAccessor getMetadataAccessor() {
+ return null;
+ }
+ };
+ }
+
+ @Override
+ public String identifier() {
+ return "data-source-factory-1";
+ }
+
+ @Override
+ public Set> requiredOptions() {
+ return new HashSet<>();
+ }
+
+ @Override
+ public Set> optionalOptions() {
+ return new HashSet<>();
+ }
+}
diff --git a/flink-cdc-composer/src/test/java/com/ververica/cdc/composer/utils/factory/DataSourceFactory2.java b/flink-cdc-composer/src/test/java/com/ververica/cdc/composer/utils/factory/DataSourceFactory2.java
new file mode 100644
index 000000000..052b62e4b
--- /dev/null
+++ b/flink-cdc-composer/src/test/java/com/ververica/cdc/composer/utils/factory/DataSourceFactory2.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2023 Ververica Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ververica.cdc.composer.utils.factory;
+
+import org.apache.flink.configuration.ConfigOption;
+
+import com.ververica.cdc.common.factories.DataSourceFactory;
+import com.ververica.cdc.common.source.DataSource;
+import com.ververica.cdc.common.source.EventSourceProvider;
+import com.ververica.cdc.common.source.MetadataAccessor;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/** A dummy {@link DataSourceFactory} for testing. */
+public class DataSourceFactory2 implements DataSourceFactory {
+
+ @Override
+ public DataSource createDataSource() {
+ return new DataSource() {
+ @Override
+ public EventSourceProvider getEventSourceProvider() {
+ return null;
+ }
+
+ @Override
+ public MetadataAccessor getMetadataAccessor() {
+ return null;
+ }
+ };
+ }
+
+ @Override
+ public String identifier() {
+ return "data-sink-factory-2";
+ }
+
+ @Override
+ public Set> requiredOptions() {
+ return new HashSet<>();
+ }
+
+ @Override
+ public Set> optionalOptions() {
+ return new HashSet<>();
+ }
+}
diff --git a/flink-cdc-composer/src/test/resources/META-INF/services/com.ververica.cdc.common.factories.Factory b/flink-cdc-composer/src/test/resources/META-INF/services/com.ververica.cdc.common.factories.Factory
new file mode 100644
index 000000000..7938060ae
--- /dev/null
+++ b/flink-cdc-composer/src/test/resources/META-INF/services/com.ververica.cdc.common.factories.Factory
@@ -0,0 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+com.ververica.cdc.composer.utils.factory.DataSinkFactory1
+com.ververica.cdc.composer.utils.factory.DataSinkFactory2
+com.ververica.cdc.composer.utils.factory.DataSourceFactory1
+com.ververica.cdc.composer.utils.factory.DataSourceFactory2
diff --git a/flink-cdc-composer/src/test/resources/log4j2-test.properties b/flink-cdc-composer/src/test/resources/log4j2-test.properties
new file mode 100644
index 000000000..a9d045e0e
--- /dev/null
+++ b/flink-cdc-composer/src/test/resources/log4j2-test.properties
@@ -0,0 +1,26 @@
+################################################################################
+# Copyright 2023 Ververica Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+rootLogger.level=INFO
+rootLogger.appenderRef.test.ref = TestLogger
+
+appender.testlogger.name = TestLogger
+appender.testlogger.type = CONSOLE
+appender.testlogger.target = SYSTEM_ERR
+appender.testlogger.layout.type = PatternLayout
+appender.testlogger.layout.pattern = %-4r [%t] %-5p %c - %m%n