[cdc-composer][tests] Add integration tests for FlinkPipelineComposer (#2776)
This closes #2766.
parent 100fb4a59e
commit cc9d368b63
@@ -0,0 +1,251 @@
/*
 * Copyright 2023 Ververica Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.ververica.cdc.composer.flink;

import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.test.junit5.MiniClusterExtension;

import com.ververica.cdc.common.configuration.Configuration;
import com.ververica.cdc.common.pipeline.PipelineOptions;
import com.ververica.cdc.composer.PipelineExecution;
import com.ververica.cdc.composer.definition.PipelineDef;
import com.ververica.cdc.composer.definition.SinkDef;
import com.ververica.cdc.composer.definition.SourceDef;
import com.ververica.cdc.connectors.values.ValuesDatabase;
import com.ververica.cdc.connectors.values.factory.ValuesDataFactory;
import com.ververica.cdc.connectors.values.sink.ValuesDataSinkOptions;
import com.ververica.cdc.connectors.values.source.ValuesDataSourceHelper;
import com.ververica.cdc.connectors.values.source.ValuesDataSourceOptions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.util.Collections;
import java.util.List;

import static com.ververica.cdc.connectors.values.source.ValuesDataSourceHelper.TABLE_1;
import static com.ververica.cdc.connectors.values.source.ValuesDataSourceHelper.TABLE_2;
import static org.apache.flink.configuration.CoreOptions.ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL;
import static org.assertj.core.api.Assertions.assertThat;

/** Integration test for {@link FlinkPipelineComposer}. */
class FlinkPipelineComposerITCase {

    private static final int MAX_PARALLELISM = 4;

    // Always use the parent-first classloader for CDC classes. ValuesDatabase holds its
    // data in a static field, so we need to make sure the class is loaded by the app
    // classloader in order to verify the data in the test cases.
    private static final org.apache.flink.configuration.Configuration MINI_CLUSTER_CONFIG =
            new org.apache.flink.configuration.Configuration();

    static {
        MINI_CLUSTER_CONFIG.set(
                ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL,
                Collections.singletonList("com.ververica.cdc"));
    }

    /**
     * Use {@link MiniClusterExtension} to reduce the overhead of restarting the MiniCluster for
     * every test case.
     */
    @RegisterExtension
    static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
            new MiniClusterExtension(
                    new MiniClusterResourceConfiguration.Builder()
                            .setNumberTaskManagers(1)
                            .setNumberSlotsPerTaskManager(MAX_PARALLELISM)
                            .setConfiguration(MINI_CLUSTER_CONFIG)
                            .build());

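    // Keep a handle on the original STDOUT so it can be restored after each test, and
    // capture everything printed while a test runs: the values sink writes its events
    // to STDOUT, which the tests assert against.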
    private final PrintStream standardOut = System.out;
    private final ByteArrayOutputStream outCaptor = new ByteArrayOutputStream();

    @BeforeEach
    void init() {
        // Take over STDOUT as we need to check the output of the values sink
        System.setOut(new PrintStream(outCaptor));
        // Initialize the in-memory database
        ValuesDatabase.clear();
    }

    @AfterEach
    void cleanup() {
        System.setOut(standardOut);
    }

    @Test
    void testSingleSplitSingleTable() throws Exception {
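        // Build a composer backed by the local MiniCluster registered above.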
        FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();

        // Set up the values source
        Configuration sourceConfig = new Configuration();
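        // EVENT_SET_ID selects one of the predefined change-event sequences shipped
        // with the values source helper.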
        sourceConfig.set(
                ValuesDataSourceOptions.EVENT_SET_ID,
                ValuesDataSourceHelper.EventSetId.SINGLE_SPLIT_SINGLE_TABLE);
        SourceDef sourceDef =
                new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig);

        // Set up the values sink
        Configuration sinkConfig = new Configuration();
        sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
        SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig);

        // Set up the pipeline
        Configuration pipelineConfig = new Configuration();
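        // Parallelism 1 keeps the order of events arriving at the sink deterministic,
        // which allows the exact-order assertion below.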
        pipelineConfig.set(PipelineOptions.GLOBAL_PARALLELISM, 1);
        PipelineDef pipelineDef =
                new PipelineDef(
                        sourceDef,
                        sinkDef,
                        Collections.emptyList(),
                        Collections.emptyList(),
                        pipelineConfig);

        // Execute the pipeline
        PipelineExecution execution = composer.compose(pipelineDef);
        execution.execute();

        // Check the result in ValuesDatabase
        List<String> results = ValuesDatabase.getResults(TABLE_1);
        assertThat(results)
                .contains(
                        "default_namespace.default_schema.table1:col1=2;newCol3=x",
                        "default_namespace.default_schema.table1:col1=3;newCol3=");

        // Check the order and content of all received events
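        // The values sink prints each event on its own line, so splitting the captured
        // STDOUT on '\n' recovers the full event sequence.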
        String[] outputEvents = outCaptor.toString().trim().split("\n");
        assertThat(outputEvents)
                .containsExactly(
                        "CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, options=()}",
                        "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1], op=INSERT, meta=()}",
                        "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2], op=INSERT, meta=()}",
                        "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[3, 3], op=INSERT, meta=()}",
                        "AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`col3` STRING, position=LAST, existingColumn=null}]}",
                        "RenameColumnEvent{tableId=default_namespace.default_schema.table1, nameMapping={col2=newCol2, col3=newCol3}}",
                        "DropColumnEvent{tableId=default_namespace.default_schema.table1, droppedColumns=[`newCol2` STRING]}",
                        "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, 1], after=[], op=DELETE, meta=()}",
                        "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, ], after=[2, x], op=UPDATE, meta=()}");
    }

    @Test
    void testSingleSplitMultipleTables() throws Exception {
        FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();

        // Set up the values source
        Configuration sourceConfig = new Configuration();
        sourceConfig.set(
                ValuesDataSourceOptions.EVENT_SET_ID,
                ValuesDataSourceHelper.EventSetId.SINGLE_SPLIT_MULTI_TABLES);
        SourceDef sourceDef =
                new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig);

        // Set up the values sink
        Configuration sinkConfig = new Configuration();
        sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
        SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig);

        // Set up the pipeline
        Configuration pipelineConfig = new Configuration();
        pipelineConfig.set(PipelineOptions.GLOBAL_PARALLELISM, 1);
        PipelineDef pipelineDef =
                new PipelineDef(
                        sourceDef,
                        sinkDef,
                        Collections.emptyList(),
                        Collections.emptyList(),
                        pipelineConfig);

        // Execute the pipeline
        PipelineExecution execution = composer.compose(pipelineDef);
        execution.execute();

        // Check the results in ValuesDatabase
        List<String> table1Results = ValuesDatabase.getResults(TABLE_1);
        assertThat(table1Results)
                .containsExactly(
                        "default_namespace.default_schema.table1:col1=2;newCol3=x",
                        "default_namespace.default_schema.table1:col1=3;newCol3=");
        List<String> table2Results = ValuesDatabase.getResults(TABLE_2);
        assertThat(table2Results)
                .contains(
                        "default_namespace.default_schema.table2:col1=1;col2=1",
                        "default_namespace.default_schema.table2:col1=2;col2=2",
                        "default_namespace.default_schema.table2:col1=3;col2=3");

        // Check the order and content of all received events
        String[] outputEvents = outCaptor.toString().trim().split("\n");
        assertThat(outputEvents)
                .containsExactly(
                        "CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, options=()}",
                        "CreateTableEvent{tableId=default_namespace.default_schema.table2, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, options=()}",
                        "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1], op=INSERT, meta=()}",
                        "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2], op=INSERT, meta=()}",
                        "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[3, 3], op=INSERT, meta=()}",
                        "AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`col3` STRING, position=LAST, existingColumn=null}]}",
                        "DataChangeEvent{tableId=default_namespace.default_schema.table2, before=[], after=[1, 1], op=INSERT, meta=()}",
                        "DataChangeEvent{tableId=default_namespace.default_schema.table2, before=[], after=[2, 2], op=INSERT, meta=()}",
                        "DataChangeEvent{tableId=default_namespace.default_schema.table2, before=[], after=[3, 3], op=INSERT, meta=()}",
                        "RenameColumnEvent{tableId=default_namespace.default_schema.table1, nameMapping={col2=newCol2, col3=newCol3}}",
                        "DropColumnEvent{tableId=default_namespace.default_schema.table1, droppedColumns=[`newCol2` STRING]}",
                        "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, 1], after=[], op=DELETE, meta=()}",
                        "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, 2], after=[2, x], op=UPDATE, meta=()}");
    }

    @Test
    void testMultiSplitsSingleTable() throws Exception {
        FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();

        // Set up the values source
        Configuration sourceConfig = new Configuration();
        sourceConfig.set(
                ValuesDataSourceOptions.EVENT_SET_ID,
                ValuesDataSourceHelper.EventSetId.MULTI_SPLITS_SINGLE_TABLE);
        SourceDef sourceDef =
                new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig);

        // Set up the values sink
        Configuration sinkConfig = new Configuration();
        sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
        SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig);

        // Set up the pipeline
        Configuration pipelineConfig = new Configuration();
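        // Use all MiniCluster slots (MAX_PARALLELISM) so the source splits are consumed
        // in parallel.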
        pipelineConfig.set(PipelineOptions.GLOBAL_PARALLELISM, MAX_PARALLELISM);
        PipelineDef pipelineDef =
                new PipelineDef(
                        sourceDef,
                        sinkDef,
                        Collections.emptyList(),
                        Collections.emptyList(),
                        pipelineConfig);

        // Execute the pipeline
        PipelineExecution execution = composer.compose(pipelineDef);
        execution.execute();

        // Check the result in ValuesDatabase
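        // With multiple splits consumed in parallel, the order in which events reach the
        // sink is non-deterministic, so only membership in the final state is asserted.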
        List<String> table1Results = ValuesDatabase.getResults(TABLE_1);
        assertThat(table1Results)
                .contains(
                        "default_namespace.default_schema.table1:col1=1;col2=1;col3=x",
                        "default_namespace.default_schema.table1:col1=3;col2=3;col3=x",
                        "default_namespace.default_schema.table1:col1=5;col2=5;col3=");
    }
}