diff --git a/flink-cdc-composer/pom.xml b/flink-cdc-composer/pom.xml
index fcce3edbe..f364cf3d4 100644
--- a/flink-cdc-composer/pom.xml
+++ b/flink-cdc-composer/pom.xml
@@ -36,6 +36,24 @@ under the License.
             <artifactId>flink-cdc-runtime</artifactId>
             <version>${revision}</version>
         </dependency>
+        <dependency>
+            <groupId>com.ververica</groupId>
+            <artifactId>flink-cdc-pipeline-connector-values</artifactId>
+            <version>${revision}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-clients</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-test-utils</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 </project>
\ No newline at end of file
diff --git a/flink-cdc-composer/src/main/java/com/ververica/cdc/composer/flink/FlinkPipelineComposer.java b/flink-cdc-composer/src/main/java/com/ververica/cdc/composer/flink/FlinkPipelineComposer.java
index e03c8dcbf..91bcbc557 100644
--- a/flink-cdc-composer/src/main/java/com/ververica/cdc/composer/flink/FlinkPipelineComposer.java
+++ b/flink-cdc-composer/src/main/java/com/ververica/cdc/composer/flink/FlinkPipelineComposer.java
@@ -55,7 +55,8 @@ public class FlinkPipelineComposer implements PipelineComposer {
}
public static FlinkPipelineComposer ofMiniCluster() {
- return new FlinkPipelineComposer(StreamExecutionEnvironment.createLocalEnvironment(), true);
+ return new FlinkPipelineComposer(
+ StreamExecutionEnvironment.getExecutionEnvironment(), true);
}
private FlinkPipelineComposer(StreamExecutionEnvironment env, boolean isBlocking) {
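Note on the change above: getExecutionEnvironment() picks up whatever execution context surrounds the call, so when the new ITCase registers a MiniClusterExtension the composed pipelines run on that shared MiniCluster instead of a fresh local cluster per call. A minimal usage sketch, assuming a PipelineDef named pipelineDef is built as in the test below:

    // Inside a test class that registers a MiniClusterExtension (see FlinkPipelineComposerITCase),
    // getExecutionEnvironment() resolves to the mini cluster's environment.
    FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
    PipelineExecution execution = composer.compose(pipelineDef);
    execution.execute(); // blocks until the job finishes, since ofMiniCluster() sets isBlocking = true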
diff --git a/flink-cdc-composer/src/main/java/com/ververica/cdc/composer/utils/FactoryDiscoveryUtils.java b/flink-cdc-composer/src/main/java/com/ververica/cdc/composer/utils/FactoryDiscoveryUtils.java
index b7f0f6100..06b2da946 100644
--- a/flink-cdc-composer/src/main/java/com/ververica/cdc/composer/utils/FactoryDiscoveryUtils.java
+++ b/flink-cdc-composer/src/main/java/com/ververica/cdc/composer/utils/FactoryDiscoveryUtils.java
@@ -58,9 +58,10 @@ public class FactoryDiscoveryUtils {
if (factoryList.isEmpty()) {
throw new RuntimeException(
String.format(
- "No factory found in the classpath.\n\n"
+ "Cannot find factory with identifier \"%s\" in the classpath.\n\n"
+ "Available factory classes are:\n\n"
+ "%s",
+ identifier,
StreamSupport.stream(loader.spliterator(), false)
.map(f -> f.getClass().getName())
.sorted()
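The reworked error message now names the identifier that was requested and then lists the available factory classes. A sketch of how the full call reads after this patch; the trailing collector that joins the class names is not part of the hunk, so Collectors.joining("\n") is an assumption:

    throw new RuntimeException(
            String.format(
                    "Cannot find factory with identifier \"%s\" in the classpath.\n\n"
                            + "Available factory classes are:\n\n"
                            + "%s",
                    identifier,
                    StreamSupport.stream(loader.spliterator(), false)
                            .map(f -> f.getClass().getName())
                            .sorted()
                            .collect(Collectors.joining("\n")))); // assumed joiner, not shown in the hunk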
diff --git a/flink-cdc-composer/src/test/java/com/ververica/cdc/composer/flink/FlinkPipelineComposerITCase.java b/flink-cdc-composer/src/test/java/com/ververica/cdc/composer/flink/FlinkPipelineComposerITCase.java
new file mode 100644
index 000000000..530e1804b
--- /dev/null
+++ b/flink-cdc-composer/src/test/java/com/ververica/cdc/composer/flink/FlinkPipelineComposerITCase.java
@@ -0,0 +1,251 @@
+/*
+ * Copyright 2023 Ververica Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ververica.cdc.composer.flink;
+
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+
+import com.ververica.cdc.common.configuration.Configuration;
+import com.ververica.cdc.common.pipeline.PipelineOptions;
+import com.ververica.cdc.composer.PipelineExecution;
+import com.ververica.cdc.composer.definition.PipelineDef;
+import com.ververica.cdc.composer.definition.SinkDef;
+import com.ververica.cdc.composer.definition.SourceDef;
+import com.ververica.cdc.connectors.values.ValuesDatabase;
+import com.ververica.cdc.connectors.values.factory.ValuesDataFactory;
+import com.ververica.cdc.connectors.values.sink.ValuesDataSinkOptions;
+import com.ververica.cdc.connectors.values.source.ValuesDataSourceHelper;
+import com.ververica.cdc.connectors.values.source.ValuesDataSourceOptions;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.util.Collections;
+import java.util.List;
+
+import static com.ververica.cdc.connectors.values.source.ValuesDataSourceHelper.TABLE_1;
+import static com.ververica.cdc.connectors.values.source.ValuesDataSourceHelper.TABLE_2;
+import static org.apache.flink.configuration.CoreOptions.ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Integration test for {@link FlinkPipelineComposer}. */
+class FlinkPipelineComposerITCase {
+ private static final int MAX_PARALLELISM = 4;
+
+ // Always use parent-first classloader for CDC classes.
+ // The reason is that ValuesDatabase uses a static field to hold data, so we need to make sure
+ // the class is loaded by the app classloader so that we can verify the data in the test case.
+ private static final org.apache.flink.configuration.Configuration MINI_CLUSTER_CONFIG =
+ new org.apache.flink.configuration.Configuration();
+
+ static {
+ MINI_CLUSTER_CONFIG.set(
+ ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL,
+ Collections.singletonList("com.ververica.cdc"));
+ }
+
+ /**
+ * Use {@link MiniClusterExtension} to reduce the overhead of restarting the MiniCluster for
+ * every test case.
+ */
+ @RegisterExtension
+ static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
+ new MiniClusterExtension(
+ new MiniClusterResourceConfiguration.Builder()
+ .setNumberTaskManagers(1)
+ .setNumberSlotsPerTaskManager(MAX_PARALLELISM)
+ .setConfiguration(MINI_CLUSTER_CONFIG)
+ .build());
+
+ private final PrintStream standardOut = System.out;
+ private final ByteArrayOutputStream outCaptor = new ByteArrayOutputStream();
+
+ @BeforeEach
+ void init() {
+ // Take over STDOUT as we need to check the output of the values sink
+ System.setOut(new PrintStream(outCaptor));
+ // Initialize in-memory database
+ ValuesDatabase.clear();
+ }
+
+ @AfterEach
+ void cleanup() {
+ System.setOut(standardOut);
+ }
+
+ @Test
+ void testSingleSplitSingleTable() throws Exception {
+ FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
+
+ // Setup value source
+ Configuration sourceConfig = new Configuration();
+ sourceConfig.set(
+ ValuesDataSourceOptions.EVENT_SET_ID,
+ ValuesDataSourceHelper.EventSetId.SINGLE_SPLIT_SINGLE_TABLE);
+ SourceDef sourceDef =
+ new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig);
+
+ // Setup value sink
+ Configuration sinkConfig = new Configuration();
+ sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
+ SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig);
+
+ // Setup pipeline
+ Configuration pipelineConfig = new Configuration();
+ pipelineConfig.set(PipelineOptions.GLOBAL_PARALLELISM, 1);
+ PipelineDef pipelineDef =
+ new PipelineDef(
+ sourceDef,
+ sinkDef,
+ Collections.emptyList(),
+ Collections.emptyList(),
+ pipelineConfig);
+
+ // Execute the pipeline
+ PipelineExecution execution = composer.compose(pipelineDef);
+ execution.execute();
+
+ // Check result in ValuesDatabase
+ List<String> results = ValuesDatabase.getResults(TABLE_1);
+ assertThat(results)
+ .contains(
+ "default_namespace.default_schema.table1:col1=2;newCol3=x",
+ "default_namespace.default_schema.table1:col1=3;newCol3=");
+
+ // Check the order and content of all received events
+ String[] outputEvents = outCaptor.toString().trim().split("\n");
+ assertThat(outputEvents)
+ .containsExactly(
+ "CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, options=()}",
+ "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[3, 3], op=INSERT, meta=()}",
+ "AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`col3` STRING, position=LAST, existingColumn=null}]}",
+ "RenameColumnEvent{tableId=default_namespace.default_schema.table1, nameMapping={col2=newCol2, col3=newCol3}}",
+ "DropColumnEvent{tableId=default_namespace.default_schema.table1, droppedColumns=[`newCol2` STRING]}",
+ "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, 1], after=[], op=DELETE, meta=()}",
+ "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, ], after=[2, x], op=UPDATE, meta=()}");
+ }
+
+ @Test
+ void testSingleSplitMultipleTables() throws Exception {
+ FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
+
+ // Setup value source
+ Configuration sourceConfig = new Configuration();
+ sourceConfig.set(
+ ValuesDataSourceOptions.EVENT_SET_ID,
+ ValuesDataSourceHelper.EventSetId.SINGLE_SPLIT_MULTI_TABLES);
+ SourceDef sourceDef =
+ new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig);
+
+ // Setup value sink
+ Configuration sinkConfig = new Configuration();
+ sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
+ SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig);
+
+ // Setup pipeline
+ Configuration pipelineConfig = new Configuration();
+ pipelineConfig.set(PipelineOptions.GLOBAL_PARALLELISM, 1);
+ PipelineDef pipelineDef =
+ new PipelineDef(
+ sourceDef,
+ sinkDef,
+ Collections.emptyList(),
+ Collections.emptyList(),
+ pipelineConfig);
+
+ // Execute the pipeline
+ PipelineExecution execution = composer.compose(pipelineDef);
+ execution.execute();
+
+ // Check result in ValuesDatabase
+ List<String> table1Results = ValuesDatabase.getResults(TABLE_1);
+ assertThat(table1Results)
+ .containsExactly(
+ "default_namespace.default_schema.table1:col1=2;newCol3=x",
+ "default_namespace.default_schema.table1:col1=3;newCol3=");
+ List<String> table2Results = ValuesDatabase.getResults(TABLE_2);
+ assertThat(table2Results)
+ .contains(
+ "default_namespace.default_schema.table2:col1=1;col2=1",
+ "default_namespace.default_schema.table2:col1=2;col2=2",
+ "default_namespace.default_schema.table2:col1=3;col2=3");
+
+ // Check the order and content of all received events
+ String[] outputEvents = outCaptor.toString().trim().split("\n");
+ assertThat(outputEvents)
+ .containsExactly(
+ "CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, options=()}",
+ "CreateTableEvent{tableId=default_namespace.default_schema.table2, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, options=()}",
+ "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[3, 3], op=INSERT, meta=()}",
+ "AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`col3` STRING, position=LAST, existingColumn=null}]}",
+ "DataChangeEvent{tableId=default_namespace.default_schema.table2, before=[], after=[1, 1], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=default_namespace.default_schema.table2, before=[], after=[2, 2], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=default_namespace.default_schema.table2, before=[], after=[3, 3], op=INSERT, meta=()}",
+ "RenameColumnEvent{tableId=default_namespace.default_schema.table1, nameMapping={col2=newCol2, col3=newCol3}}",
+ "DropColumnEvent{tableId=default_namespace.default_schema.table1, droppedColumns=[`newCol2` STRING]}",
+ "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, 1], after=[], op=DELETE, meta=()}",
+ "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, 2], after=[2, x], op=UPDATE, meta=()}");
+ }
+
+ @Test
+ void testMultiSplitsSingleTable() throws Exception {
+ FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
+
+ // Setup value source
+ Configuration sourceConfig = new Configuration();
+ sourceConfig.set(
+ ValuesDataSourceOptions.EVENT_SET_ID,
+ ValuesDataSourceHelper.EventSetId.MULTI_SPLITS_SINGLE_TABLE);
+ SourceDef sourceDef =
+ new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig);
+
+ // Setup value sink
+ Configuration sinkConfig = new Configuration();
+ sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
+ SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig);
+
+ // Setup pipeline
+ Configuration pipelineConfig = new Configuration();
+ pipelineConfig.set(PipelineOptions.GLOBAL_PARALLELISM, MAX_PARALLELISM);
+ PipelineDef pipelineDef =
+ new PipelineDef(
+ sourceDef,
+ sinkDef,
+ Collections.emptyList(),
+ Collections.emptyList(),
+ pipelineConfig);
+
+ // Execute the pipeline
+ PipelineExecution execution = composer.compose(pipelineDef);
+ execution.execute();
+
+ // Check result in ValuesDatabase
+ List<String> table1Results = ValuesDatabase.getResults(TABLE_1);
+ assertThat(table1Results)
+ .contains(
+ "default_namespace.default_schema.table1:col1=1;col2=1;col3=x",
+ "default_namespace.default_schema.table1:col1=3;col2=3;col3=x",
+ "default_namespace.default_schema.table1:col1=5;col2=5;col3=");
+ }
+}
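testMultiSplitsSingleTable above only verifies ValuesDatabase contents, not stdout, because with GLOBAL_PARALLELISM set to MAX_PARALLELISM several sink subtasks print concurrently and, per the ValuesDataSink change below, each line carries an "n> " subtask prefix, so line order is not deterministic. If a stdout check were ever added for that case, a hedged sketch of normalizing the captured output first (illustration only; requires java.util.Arrays, java.util.Set and java.util.stream.Collectors):

    Set<String> normalized =
            Arrays.stream(outCaptor.toString().trim().split("\n"))
                    .map(line -> line.replaceAll("^\\d+> ", "")) // drop the "n> " subtask prefix
                    .collect(Collectors.toSet()); // compare as a set; cross-subtask order is undefined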
diff --git a/flink-cdc-composer/src/test/java/com/ververica/cdc/composer/utils/FactoryDiscoveryUtilsTest.java b/flink-cdc-composer/src/test/java/com/ververica/cdc/composer/utils/FactoryDiscoveryUtilsTest.java
index cb4c45522..a6b90ec78 100644
--- a/flink-cdc-composer/src/test/java/com/ververica/cdc/composer/utils/FactoryDiscoveryUtilsTest.java
+++ b/flink-cdc-composer/src/test/java/com/ververica/cdc/composer/utils/FactoryDiscoveryUtilsTest.java
@@ -43,15 +43,8 @@ class FactoryDiscoveryUtilsTest {
() ->
FactoryDiscoveryUtils.getFactoryByIdentifier(
"data-sink-factory-3", Factory.class))
- .hasMessage(
- "No factory found in the classpath.\n"
- + "\n"
- + "Available factory classes are:\n"
- + "\n"
- + "com.ververica.cdc.composer.utils.factory.DataSinkFactory1\n"
- + "com.ververica.cdc.composer.utils.factory.DataSinkFactory2\n"
- + "com.ververica.cdc.composer.utils.factory.DataSourceFactory1\n"
- + "com.ververica.cdc.composer.utils.factory.DataSourceFactory2");
+ .hasMessageStartingWith(
+ "Cannot find factory with identifier \"data-sink-factory-3\" in the classpath");
}
@Test
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/com/ververica/cdc/connectors/values/sink/ValuesDataSink.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/com/ververica/cdc/connectors/values/sink/ValuesDataSink.java
index 2c67bd0df..353b23e18 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/com/ververica/cdc/connectors/values/sink/ValuesDataSink.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/com/ververica/cdc/connectors/values/sink/ValuesDataSink.java
@@ -78,7 +78,11 @@ public class ValuesDataSink implements DataSink, Serializable {
@Override
public SinkWriter<Event> createWriter(InitContext context) {
- return new ValuesSinkWriter(materializedInMemory, print);
+ return new ValuesSinkWriter(
+ materializedInMemory,
+ print,
+ context.getSubtaskId(),
+ context.getNumberOfParallelSubtasks());
}
}
@@ -91,6 +95,10 @@ public class ValuesDataSink implements DataSink, Serializable {
private final boolean print;
+ private final int subtaskIndex;
+
+ private final int numSubtasks;
+
/**
* keep the relationship of TableId and Schema as write method may rely on the schema
* information of DataChangeEvent.
@@ -99,10 +107,13 @@ public class ValuesDataSink implements DataSink, Serializable {
private final Map<TableId, List<RecordData.FieldGetter>> fieldGetterMaps;
- public ValuesSinkWriter(boolean materializedInMemory, boolean print) {
+ public ValuesSinkWriter(
+ boolean materializedInMemory, boolean print, int subtaskIndex, int numSubtasks) {
super();
this.materializedInMemory = materializedInMemory;
this.print = print;
+ this.subtaskIndex = subtaskIndex;
+ this.numSubtasks = numSubtasks;
schemaMaps = new HashMap<>();
fieldGetterMaps = new HashMap<>();
}
@@ -130,10 +141,13 @@ public class ValuesDataSink implements DataSink, Serializable {
ValuesDatabase.applyDataChangeEvent((DataChangeEvent) event);
}
if (print) {
+ String prefix = numSubtasks > 1 ? subtaskIndex + "> " : "";
// print the detail message to console for verification.
System.out.println(
- ValuesDataSinkHelper.convertEventToStr(
- event, fieldGetterMaps.get(((ChangeEvent) event).tableId())));
+ prefix
+ + ValuesDataSinkHelper.convertEventToStr(
+ event,
+ fieldGetterMaps.get(((ChangeEvent) event).tableId())));
}
}
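The prefix added above follows the same convention as Flink's print sink: with a single subtask nothing changes, which is why the single-parallelism ITCases can assert on raw stdout lines, while with several subtasks each line is tagged with its subtask index. Illustration only, with made-up values:

    int numSubtasks = 4;
    int subtaskIndex = 2;
    String prefix = numSubtasks > 1 ? subtaskIndex + "> " : ""; // yields "2> "
    System.out.println(prefix + "DataChangeEvent{tableId=default_namespace.default_schema.table1, ...}");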
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/com/ververica/cdc/connectors/values/source/ValuesDataSourceHelper.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/com/ververica/cdc/connectors/values/source/ValuesDataSourceHelper.java
index 54f15acaf..0a3b7549a 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/com/ververica/cdc/connectors/values/source/ValuesDataSourceHelper.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/com/ververica/cdc/connectors/values/source/ValuesDataSourceHelper.java
@@ -54,10 +54,10 @@ public class ValuesDataSourceHelper {
CUSTOM_SOURCE_EVENTS
}
- private static final TableId table1 =
+ public static final TableId TABLE_1 =
TableId.tableId("default_namespace", "default_schema", "table1");
- private static final TableId table2 =
+ public static final TableId TABLE_2 =
TableId.tableId("default_namespace", "default_schema", "table2");
/**
@@ -106,7 +106,7 @@ public class ValuesDataSourceHelper {
}
}
- private static List<List<Event>> singleSplitSingleTable() {
+ public static List<List<Event>> singleSplitSingleTable() {
List<List<Event>> eventOfSplits = new ArrayList<>();
List<Event> split1 = new ArrayList<>();
@@ -117,7 +117,7 @@ public class ValuesDataSourceHelper {
.physicalColumn("col2", DataTypes.STRING())
.primaryKey("col1")
.build();
- CreateTableEvent createTableEvent = new CreateTableEvent(table1, schema);
+ CreateTableEvent createTableEvent = new CreateTableEvent(TABLE_1, schema);
split1.add(createTableEvent);
BinaryRecordDataGenerator generator =
@@ -125,7 +125,7 @@ public class ValuesDataSourceHelper {
// insert
DataChangeEvent insertEvent1 =
DataChangeEvent.insertEvent(
- table1,
+ TABLE_1,
generator.generate(
new Object[] {
BinaryStringData.fromString("1"),
@@ -134,7 +134,7 @@ public class ValuesDataSourceHelper {
split1.add(insertEvent1);
DataChangeEvent insertEvent2 =
DataChangeEvent.insertEvent(
- table1,
+ TABLE_1,
generator.generate(
new Object[] {
BinaryStringData.fromString("2"),
@@ -143,7 +143,7 @@ public class ValuesDataSourceHelper {
split1.add(insertEvent2);
DataChangeEvent insertEvent3 =
DataChangeEvent.insertEvent(
- table1,
+ TABLE_1,
generator.generate(
new Object[] {
BinaryStringData.fromString("3"),
@@ -156,20 +156,20 @@ public class ValuesDataSourceHelper {
new AddColumnEvent.ColumnWithPosition(
Column.physicalColumn("col3", DataTypes.STRING()));
AddColumnEvent addColumnEvent =
- new AddColumnEvent(table1, Collections.singletonList(columnWithPosition));
+ new AddColumnEvent(TABLE_1, Collections.singletonList(columnWithPosition));
split1.add(addColumnEvent);
// rename column
Map<String, String> nameMapping = new HashMap<>();
nameMapping.put("col2", "newCol2");
nameMapping.put("col3", "newCol3");
- RenameColumnEvent renameColumnEvent = new RenameColumnEvent(table1, nameMapping);
+ RenameColumnEvent renameColumnEvent = new RenameColumnEvent(TABLE_1, nameMapping);
split1.add(renameColumnEvent);
// drop column
DropColumnEvent dropColumnEvent =
new DropColumnEvent(
- table1,
+ TABLE_1,
Collections.singletonList(
Column.physicalColumn("newCol2", DataTypes.STRING())));
split1.add(dropColumnEvent);
@@ -177,7 +177,7 @@ public class ValuesDataSourceHelper {
// delete
split1.add(
DataChangeEvent.deleteEvent(
- table1,
+ TABLE_1,
generator.generate(
new Object[] {
BinaryStringData.fromString("1"),
@@ -187,7 +187,7 @@ public class ValuesDataSourceHelper {
// update
split1.add(
DataChangeEvent.updateEvent(
- table1,
+ TABLE_1,
generator.generate(
new Object[] {
BinaryStringData.fromString("2"),
@@ -203,7 +203,7 @@ public class ValuesDataSourceHelper {
return eventOfSplits;
}
- private static List<List<Event>> singleSplitMultiTables() {
+ public static List<List<Event>> singleSplitMultiTables() {
List<List<Event>> eventOfSplits = new ArrayList<>();
List<Event> split1 = new ArrayList<>();
@@ -214,9 +214,9 @@ public class ValuesDataSourceHelper {
.physicalColumn("col2", DataTypes.STRING())
.primaryKey("col1")
.build();
- CreateTableEvent createTableEvent = new CreateTableEvent(table1, schema);
+ CreateTableEvent createTableEvent = new CreateTableEvent(TABLE_1, schema);
split1.add(createTableEvent);
- CreateTableEvent createTableEvent2 = new CreateTableEvent(table2, schema);
+ CreateTableEvent createTableEvent2 = new CreateTableEvent(TABLE_2, schema);
split1.add(createTableEvent2);
BinaryRecordDataGenerator generator =
@@ -224,7 +224,7 @@ public class ValuesDataSourceHelper {
// insert into table1
DataChangeEvent insertEvent1 =
DataChangeEvent.insertEvent(
- table1,
+ TABLE_1,
generator.generate(
new Object[] {
BinaryStringData.fromString("1"),
@@ -233,7 +233,7 @@ public class ValuesDataSourceHelper {
split1.add(insertEvent1);
DataChangeEvent insertEvent2 =
DataChangeEvent.insertEvent(
- table1,
+ TABLE_1,
generator.generate(
new Object[] {
BinaryStringData.fromString("2"),
@@ -242,7 +242,7 @@ public class ValuesDataSourceHelper {
split1.add(insertEvent2);
DataChangeEvent insertEvent3 =
DataChangeEvent.insertEvent(
- table1,
+ TABLE_1,
generator.generate(
new Object[] {
BinaryStringData.fromString("3"),
@@ -255,13 +255,13 @@ public class ValuesDataSourceHelper {
new AddColumnEvent.ColumnWithPosition(
Column.physicalColumn("col3", DataTypes.STRING()));
AddColumnEvent addColumnEvent =
- new AddColumnEvent(table1, Collections.singletonList(columnWithPosition));
+ new AddColumnEvent(TABLE_1, Collections.singletonList(columnWithPosition));
split1.add(addColumnEvent);
// insert into table2
insertEvent1 =
DataChangeEvent.insertEvent(
- table2,
+ TABLE_2,
generator.generate(
new Object[] {
BinaryStringData.fromString("1"),
@@ -270,7 +270,7 @@ public class ValuesDataSourceHelper {
split1.add(insertEvent1);
insertEvent2 =
DataChangeEvent.insertEvent(
- table2,
+ TABLE_2,
generator.generate(
new Object[] {
BinaryStringData.fromString("2"),
@@ -279,7 +279,7 @@ public class ValuesDataSourceHelper {
split1.add(insertEvent2);
insertEvent3 =
DataChangeEvent.insertEvent(
- table2,
+ TABLE_2,
generator.generate(
new Object[] {
BinaryStringData.fromString("3"),
@@ -291,13 +291,13 @@ public class ValuesDataSourceHelper {
Map<String, String> nameMapping = new HashMap<>();
nameMapping.put("col2", "newCol2");
nameMapping.put("col3", "newCol3");
- RenameColumnEvent renameColumnEvent = new RenameColumnEvent(table1, nameMapping);
+ RenameColumnEvent renameColumnEvent = new RenameColumnEvent(TABLE_1, nameMapping);
split1.add(renameColumnEvent);
// drop column
DropColumnEvent dropColumnEvent =
new DropColumnEvent(
- table1,
+ TABLE_1,
Collections.singletonList(
Column.physicalColumn("newCol2", DataTypes.STRING())));
split1.add(dropColumnEvent);
@@ -305,7 +305,7 @@ public class ValuesDataSourceHelper {
// delete
split1.add(
DataChangeEvent.deleteEvent(
- table1,
+ TABLE_1,
generator.generate(
new Object[] {
BinaryStringData.fromString("1"),
@@ -315,7 +315,7 @@ public class ValuesDataSourceHelper {
// update
split1.add(
DataChangeEvent.updateEvent(
- table1,
+ TABLE_1,
generator.generate(
new Object[] {
BinaryStringData.fromString("2"),
@@ -331,7 +331,7 @@ public class ValuesDataSourceHelper {
return eventOfSplits;
}
- private static List<List<Event>> multiSplitsSingleTable() {
+ public static List<List<Event>> multiSplitsSingleTable() {
List<List<Event>> eventOfSplits = new ArrayList<>();
List<Event> split1 = new ArrayList<>();
// create table
@@ -341,15 +341,15 @@ public class ValuesDataSourceHelper {
.physicalColumn("col2", DataTypes.STRING())
.primaryKey("col1")
.build();
- CreateTableEvent createTableEvent = new CreateTableEvent(table1, schema);
+ CreateTableEvent createTableEvent = new CreateTableEvent(TABLE_1, schema);
split1.add(createTableEvent);
BinaryRecordDataGenerator generator =
new BinaryRecordDataGenerator(RowType.of(DataTypes.STRING(), DataTypes.STRING()));
- // create slit1
+ // create split1
DataChangeEvent insertEvent1 =
DataChangeEvent.insertEvent(
- table1,
+ TABLE_1,
generator.generate(
new Object[] {
BinaryStringData.fromString("1"),
@@ -358,7 +358,7 @@ public class ValuesDataSourceHelper {
split1.add(insertEvent1);
DataChangeEvent insertEvent2 =
DataChangeEvent.insertEvent(
- table1,
+ TABLE_1,
generator.generate(
new Object[] {
BinaryStringData.fromString("2"),
@@ -367,12 +367,12 @@ public class ValuesDataSourceHelper {
split1.add(insertEvent2);
eventOfSplits.add(split1);
- // create slit2
+ // create split2
List<Event> split2 = new ArrayList<>();
split2.add(createTableEvent);
DataChangeEvent insertEvent3 =
DataChangeEvent.insertEvent(
- table1,
+ TABLE_1,
generator.generate(
new Object[] {
BinaryStringData.fromString("3"),
@@ -381,7 +381,7 @@ public class ValuesDataSourceHelper {
split2.add(insertEvent3);
DataChangeEvent insertEvent4 =
DataChangeEvent.insertEvent(
- table1,
+ TABLE_1,
generator.generate(
new Object[] {
BinaryStringData.fromString("4"),
@@ -390,12 +390,12 @@ public class ValuesDataSourceHelper {
split2.add(insertEvent4);
eventOfSplits.add(split2);
- // create slit3
+ // create split3
List<Event> split3 = new ArrayList<>();
split3.add(createTableEvent);
DataChangeEvent insertEvent5 =
DataChangeEvent.insertEvent(
- table1,
+ TABLE_1,
generator.generate(
new Object[] {
BinaryStringData.fromString("5"),
@@ -404,7 +404,7 @@ public class ValuesDataSourceHelper {
split3.add(insertEvent5);
DataChangeEvent insertEvent6 =
DataChangeEvent.insertEvent(
- table1,
+ TABLE_1,
generator.generate(
new Object[] {
BinaryStringData.fromString("6"),
@@ -413,12 +413,12 @@ public class ValuesDataSourceHelper {
split3.add(insertEvent6);
eventOfSplits.add(split3);
- // create slit4
+ // create split4
List<Event> split4 = new ArrayList<>();
split4.add(createTableEvent);
DataChangeEvent deleteEvent1 =
DataChangeEvent.deleteEvent(
- table1,
+ TABLE_1,
generator.generate(
new Object[] {
BinaryStringData.fromString("2"),
@@ -427,7 +427,7 @@ public class ValuesDataSourceHelper {
split4.add(deleteEvent1);
DataChangeEvent deleteEvent2 =
DataChangeEvent.deleteEvent(
- table1,
+ TABLE_1,
generator.generate(
new Object[] {
BinaryStringData.fromString("4"),
@@ -436,7 +436,7 @@ public class ValuesDataSourceHelper {
split4.add(deleteEvent2);
DataChangeEvent deleteEvent3 =
DataChangeEvent.deleteEvent(
- table1,
+ TABLE_1,
generator.generate(
new Object[] {
BinaryStringData.fromString("6"),
@@ -447,14 +447,14 @@ public class ValuesDataSourceHelper {
new AddColumnEvent.ColumnWithPosition(
Column.physicalColumn("col3", DataTypes.STRING()));
AddColumnEvent addColumnEvent =
- new AddColumnEvent(table1, Collections.singletonList(columnWithPosition));
+ new AddColumnEvent(TABLE_1, Collections.singletonList(columnWithPosition));
split4.add(addColumnEvent);
generator =
new BinaryRecordDataGenerator(
RowType.of(DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()));
DataChangeEvent updateEvent1 =
DataChangeEvent.updateEvent(
- table1,
+ TABLE_1,
generator.generate(
new Object[] {
BinaryStringData.fromString("1"),
@@ -470,7 +470,7 @@ public class ValuesDataSourceHelper {
split4.add(updateEvent1);
DataChangeEvent updateEvent2 =
DataChangeEvent.updateEvent(
- table1,
+ TABLE_1,
generator.generate(
new Object[] {
BinaryStringData.fromString("3"),