[cdc-composer] Initialize context with existing config of SourceDef and SinkDef (#2749)

This closes #2749.
pull/2673/head
Kunni 1 year ago committed by GitHub
parent f697c5fff3
commit a26607a027
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -20,7 +20,6 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.ververica.cdc.common.annotation.Internal;
import com.ververica.cdc.common.configuration.Configuration;
import com.ververica.cdc.common.event.Event;
import com.ververica.cdc.common.factories.DataSinkFactory;
import com.ververica.cdc.common.factories.DataSourceFactory;
@ -115,7 +114,7 @@ public class FlinkPipelineComposer implements PipelineComposer {
return sinkFactory.createDataSink(
new FactoryHelper.DefaultContext(
sinkDef.getConfig().toMap(),
new Configuration(),
sinkDef.getConfig(),
Thread.currentThread().getContextClassLoader()));
}
}

@ -21,7 +21,6 @@ import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.ververica.cdc.common.annotation.Internal;
import com.ververica.cdc.common.configuration.Configuration;
import com.ververica.cdc.common.event.Event;
import com.ververica.cdc.common.factories.DataSourceFactory;
import com.ververica.cdc.common.factories.FactoryHelper;
@ -52,7 +51,7 @@ public class DataSourceTranslator {
sourceFactory.createDataSource(
new FactoryHelper.DefaultContext(
sourceDef.getConfig().toMap(),
new Configuration(),
sourceDef.getConfig(),
Thread.currentThread().getContextClassLoader()));
// Add source JAR to environment

@ -0,0 +1,59 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.composer.flink;
import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap;
import com.ververica.cdc.common.configuration.Configuration;
import com.ververica.cdc.common.factories.DataSinkFactory;
import com.ververica.cdc.common.factories.FactoryHelper;
import com.ververica.cdc.common.sink.DataSink;
import com.ververica.cdc.composer.definition.SinkDef;
import com.ververica.cdc.composer.utils.FactoryDiscoveryUtils;
import com.ververica.cdc.composer.utils.factory.DataSinkFactory1;
import org.junit.Assert;
import org.junit.Test;
/** A test for the {@link FlinkPipelineComposer}. */
public class FlinkPipelineComposerTest {
@Test
public void testCreateDataSinkFromSinkDef() {
SinkDef sinkDef =
new SinkDef(
"data-sink-factory-1",
"sink-database",
Configuration.fromMap(
ImmutableMap.<String, String>builder()
.put("host", "0.0.0.0")
.build()));
DataSinkFactory sinkFactory =
FactoryDiscoveryUtils.getFactoryByIdentifier(
sinkDef.getType(), DataSinkFactory.class);
DataSink dataSink =
sinkFactory.createDataSink(
new FactoryHelper.DefaultContext(
sinkDef.getConfig().toMap(),
sinkDef.getConfig(),
Thread.currentThread().getContextClassLoader()));
Assert.assertTrue(dataSink instanceof DataSinkFactory1.TestDataSink);
Assert.assertEquals("0.0.0.0", ((DataSinkFactory1.TestDataSink) dataSink).getHost());
}
}

@ -0,0 +1,59 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.composer.flink.translator;
import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap;
import com.ververica.cdc.common.configuration.Configuration;
import com.ververica.cdc.common.factories.DataSourceFactory;
import com.ververica.cdc.common.factories.FactoryHelper;
import com.ververica.cdc.common.source.DataSource;
import com.ververica.cdc.composer.definition.SourceDef;
import com.ververica.cdc.composer.utils.FactoryDiscoveryUtils;
import com.ververica.cdc.composer.utils.factory.DataSourceFactory1;
import org.junit.Assert;
import org.junit.Test;
/** A test for the {@link DataSourceTranslator}. */
public class DataSourceTranslatorTest {
@Test
public void testCreateDataSourceFromSourceDef() {
SourceDef sourceDef =
new SourceDef(
"data-source-factory-1",
"source-database",
Configuration.fromMap(
ImmutableMap.<String, String>builder()
.put("host", "0.0.0.0")
.build()));
DataSourceFactory sourceFactory =
FactoryDiscoveryUtils.getFactoryByIdentifier(
sourceDef.getType(), DataSourceFactory.class);
DataSource dataSource =
sourceFactory.createDataSource(
new FactoryHelper.DefaultContext(
sourceDef.getConfig().toMap(),
sourceDef.getConfig(),
Thread.currentThread().getContextClassLoader()));
Assert.assertTrue(dataSource instanceof DataSourceFactory1.TestDataSource);
Assert.assertEquals("0.0.0.0", ((DataSourceFactory1.TestDataSource) dataSource).getHost());
}
}

@ -29,17 +29,7 @@ import java.util.Set;
public class DataSinkFactory1 implements DataSinkFactory {
@Override
public DataSink createDataSink(Context context) {
return new DataSink() {
@Override
public EventSinkProvider getEventSinkProvider() {
return null;
}
@Override
public MetadataApplier getMetadataApplier() {
return null;
}
};
return new TestDataSink(context.getConfiguration().get(TestOptions.HOST));
}
@Override
@ -54,6 +44,32 @@ public class DataSinkFactory1 implements DataSinkFactory {
@Override
public Set<ConfigOption<?>> optionalOptions() {
return new HashSet<>();
Set<ConfigOption<?>> options = new HashSet<>();
options.add(TestOptions.HOST);
return options;
}
/** A dummy {@link DataSink} for testing. */
public static class TestDataSink implements DataSink {
private final String host;
public TestDataSink(String host) {
this.host = host;
}
public String getHost() {
return host;
}
@Override
public EventSinkProvider getEventSinkProvider() {
return null;
}
@Override
public MetadataApplier getMetadataApplier() {
return null;
}
}
}

@ -29,17 +29,7 @@ import java.util.Set;
public class DataSourceFactory1 implements DataSourceFactory {
@Override
public DataSource createDataSource(Context context) {
return new DataSource() {
@Override
public EventSourceProvider getEventSourceProvider() {
return null;
}
@Override
public MetadataAccessor getMetadataAccessor() {
return null;
}
};
return new TestDataSource(context.getConfiguration().get(TestOptions.HOST));
}
@Override
@ -54,6 +44,32 @@ public class DataSourceFactory1 implements DataSourceFactory {
@Override
public Set<ConfigOption<?>> optionalOptions() {
return new HashSet<>();
Set<ConfigOption<?>> options = new HashSet<>();
options.add(TestOptions.HOST);
return options;
}
/** A dummy {@link DataSource} for testing. */
public static class TestDataSource implements DataSource {
private final String host;
public TestDataSource(String host) {
this.host = host;
}
public String getHost() {
return host;
}
@Override
public EventSourceProvider getEventSourceProvider() {
return null;
}
@Override
public MetadataAccessor getMetadataAccessor() {
return null;
}
}
}

@ -0,0 +1,35 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.composer.utils.factory;
import com.ververica.cdc.common.configuration.ConfigOption;
import com.ververica.cdc.common.configuration.ConfigOptions;
import com.ververica.cdc.common.factories.DataSinkFactory;
import com.ververica.cdc.common.factories.DataSourceFactory;
/**
* A class used to provide simulation configuration parameters for {@link DataSourceFactory} and
* {@link DataSinkFactory}.
*/
public class TestOptions {
public static final ConfigOption<String> HOST =
ConfigOptions.key("host")
.stringType()
.noDefaultValue()
.withDescription("IP address or host name of the data source.");
}
Loading…
Cancel
Save