[cdc-pipeline-connector][values] Add event set MULTI_SPLITS_SINGLE_TABLE to mock source with multiple splits (#2737)

This closes #2737.
Kunni 1 year ago committed by GitHub
parent 071523eae6
commit 4a5f2161aa

@@ -40,8 +40,8 @@ public class ValuesDataFactory implements DataSourceFactory, DataSinkFactory {
@Override
public DataSource createDataSource(Context context) {
ValuesDataSourceHelper.SourceEventType eventType =
context.getConfiguration().get(ValuesDataSourceOptions.SOURCE_EVENT_TYPE);
ValuesDataSourceHelper.EventSetId eventType =
context.getConfiguration().get(ValuesDataSourceOptions.EVENT_SET_ID);
int failAtPos =
context.getConfiguration().get(ValuesDataSourceOptions.FAILURE_INJECTION_INDEX);
return new ValuesDataSource(eventType, failAtPos);
@@ -66,7 +66,7 @@ public class ValuesDataFactory implements DataSourceFactory, DataSinkFactory {
@Override
public Set<ConfigOption<?>> optionalOptions() {
Set<ConfigOption<?>> options = new HashSet<>();
options.add(ValuesDataSourceOptions.SOURCE_EVENT_TYPE);
options.add(ValuesDataSourceOptions.EVENT_SET_ID);
options.add(ValuesDataSourceOptions.FAILURE_INJECTION_INDEX);
options.add(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY);
return options;

@@ -26,6 +26,7 @@ import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.connector.source.lib.util.IteratorSourceEnumerator;
import org.apache.flink.api.connector.source.lib.util.IteratorSourceReader;
import org.apache.flink.api.connector.source.lib.util.IteratorSourceSplit;
import org.apache.flink.connector.base.source.hybrid.HybridSource;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
@@ -54,25 +55,29 @@ import java.util.List;
public class ValuesDataSource implements DataSource {
/** index of testCase for {@link ValuesDataSourceHelper}. */
private final ValuesDataSourceHelper.SourceEventType eventType;
private final ValuesDataSourceHelper.EventSetId eventSetId;
/** index for {@link EventIteratorReader} to fail when reading. */
private final int failAtPos;
public ValuesDataSource(ValuesDataSourceHelper.SourceEventType eventType) {
this.eventType = eventType;
public ValuesDataSource(ValuesDataSourceHelper.EventSetId eventSetId) {
this.eventSetId = eventSetId;
this.failAtPos = Integer.MAX_VALUE;
}
public ValuesDataSource(ValuesDataSourceHelper.SourceEventType eventType, int failAtPos) {
this.eventType = eventType;
public ValuesDataSource(ValuesDataSourceHelper.EventSetId eventSetId, int failAtPos) {
this.eventSetId = eventSetId;
this.failAtPos = failAtPos;
}
@Override
public EventSourceProvider getEventSourceProvider() {
ValuesDataSourceHelper.setSourceEvents(eventType);
return FlinkSourceProvider.of(new ValuesSource(failAtPos));
ValuesDataSourceHelper.setSourceEvents(eventSetId);
HybridSource<Event> hybridSource =
HybridSource.builder(new ValuesSource(failAtPos, eventSetId, true))
.addSource(new ValuesSource(failAtPos, eventSetId, false))
.build();
return FlinkSourceProvider.of(hybridSource);
}
@Override
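
For context on the new construction above: Flink's HybridSource chains bounded sources so that the next source starts only after the previous one reaches end of input, which is how the snapshot-phase ValuesSource is fully drained before the incremental-phase one begins. Below is a minimal, self-contained sketch of the same pattern using Flink's built-in NumberSequenceSource as a stand-in for the two ValuesSource phases; the class is illustrative only and not part of this change.

import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
import org.apache.flink.connector.base.source.hybrid.HybridSource;

public class HybridSourceSketch {
    public static HybridSource<Long> build() {
        // The first source plays the "snapshot" role; once it is fully read,
        // HybridSource switches to the second ("incremental") source.
        HybridSource<Long> source =
                HybridSource.builder(new NumberSequenceSource(0L, 9L))
                        .addSource(new NumberSequenceSource(10L, 19L))
                        .build();
        return source;
    }
}
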
@@ -91,8 +96,18 @@ public class ValuesDataSource implements DataSource {
private final int failAtPos;
public ValuesSource(int failAtPos) {
private final ValuesDataSourceHelper.EventSetId eventSetId;
/** True if this source is in the snapshot stage, otherwise it is in the incremental stage. */
private final boolean isInSnapshotPhase;
public ValuesSource(
int failAtPos,
ValuesDataSourceHelper.EventSetId eventSetId,
boolean isInSnapshotPhase) {
this.failAtPos = failAtPos;
this.eventSetId = eventSetId;
this.isInSnapshotPhase = isInSnapshotPhase;
}
@Override
@@ -103,10 +118,17 @@ public class ValuesDataSource implements DataSource {
@Override
public SplitEnumerator<EventIteratorSplit, Collection<EventIteratorSplit>> createEnumerator(
SplitEnumeratorContext<EventIteratorSplit> enumContext) {
ValuesDataSourceHelper.setSourceEvents(eventSetId);
Collection<EventIteratorSplit> eventIteratorSplits = new ArrayList<>();
List<List<Event>> eventWithSplits = ValuesDataSourceHelper.getSourceEvents();
for (int i = 0; i < eventWithSplits.size(); i++) {
eventIteratorSplits.add(new EventIteratorSplit(i, 0));
// Make the last EventIteratorSplit of eventWithSplits an incremental
// EventIteratorSplit.
if (isInSnapshotPhase) {
for (int i = 0; i < eventWithSplits.size() - 1; i++) {
eventIteratorSplits.add(new EventIteratorSplit(i, 0));
}
} else {
eventIteratorSplits.add(new EventIteratorSplit(eventWithSplits.size() - 1, 0));
}
return new IteratorSourceEnumerator<>(enumContext, eventIteratorSplits);
}
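
The enumerator change above is what makes the last event list act as the incremental split: the snapshot-phase source enumerates every split except the last one, while the incremental-phase source enumerates only the last one. A hypothetical helper (not in this commit) that mirrors the assignment for the four-split MULTI_SPLITS_SINGLE_TABLE set:

import java.util.ArrayList;
import java.util.List;

public class SplitAssignmentSketch {
    // Mirrors the branch in createEnumerator above: which split indices each
    // phase receives for an event set with n splits.
    static List<Integer> splitsFor(boolean isInSnapshotPhase, int n) {
        List<Integer> indices = new ArrayList<>();
        if (isInSnapshotPhase) {
            for (int i = 0; i < n - 1; i++) {
                indices.add(i); // snapshot phase: splits 0 .. n-2
            }
        } else {
            indices.add(n - 1); // incremental phase: only the last split
        }
        return indices;
    }

    public static void main(String[] args) {
        System.out.println(splitsFor(true, 4));  // [0, 1, 2]
        System.out.println(splitsFor(false, 4)); // [3]
    }
}
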
@@ -133,7 +155,7 @@ public class ValuesDataSource implements DataSource {
@Override
public SourceReader<Event, EventIteratorSplit> createReader(
SourceReaderContext readerContext) {
return new EventIteratorReader(readerContext, failAtPos);
return new EventIteratorReader(readerContext, failAtPos, eventSetId);
}
private static void serializeEventIteratorSplit(
@@ -238,11 +260,23 @@ public class ValuesDataSource implements DataSource {
// position for this Split to fail
private final int failAtPos;
private final ValuesDataSourceHelper.EventSetId eventSetId;
private int numberOfEventsEmit = 0;
public EventIteratorReader(SourceReaderContext context, int failAtPos) {
public EventIteratorReader(
SourceReaderContext context,
int failAtPos,
ValuesDataSourceHelper.EventSetId eventSetId) {
super(context);
this.failAtPos = failAtPos;
this.eventSetId = eventSetId;
}
@Override
public void start() {
ValuesDataSourceHelper.setSourceEvents(eventSetId);
super.start();
}
@Override

@@ -37,16 +37,20 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
/** A helper class for {@link ValuesDataSource} to build events of each split. */
/**
* A helper class for {@link ValuesDataSource} to build events of each split.
*
* <p>The last list returned by the getSourceEvents method is defined as the split for the incremental stage.
*/
public class ValuesDataSourceHelper {
/**
* Different situations for creating sourceEvents, {@link
* ValuesDataSourceOptions#SOURCE_EVENT_TYPE}.
* Different situations for creating sourceEvents, {@link ValuesDataSourceOptions#EVENT_SET_ID}.
*/
public enum SourceEventType {
public enum EventSetId {
SINGLE_SPLIT_SINGLE_TABLE,
SINGLE_SPLIT_MULTI_TABLES,
MULTI_SPLITS_SINGLE_TABLE,
CUSTOM_SOURCE_EVENTS
}
@@ -62,8 +66,8 @@ public class ValuesDataSourceHelper {
public static List<List<Event>> getSourceEvents() {
if (sourceEvents == null) {
throw new IllegalArgumentException(
"sourceEvents should be set by `setSourceEvents` method.");
// fall back to the default SINGLE_SPLIT_SINGLE_TABLE event set
sourceEvents = singleSplitSingleTable();
}
return sourceEvents;
}
@@ -74,19 +78,23 @@ public class ValuesDataSourceHelper {
}
/** set sourceEvents using predefined events. */
public static void setSourceEvents(SourceEventType eventType) {
public static void setSourceEvents(EventSetId eventType) {
switch (eventType) {
case SINGLE_SPLIT_SINGLE_TABLE:
{
sourceEvents = singleSplitSingleTable();
break;
}
case SINGLE_SPLIT_MULTI_TABLES:
{
sourceEvents = singleSplitMultiTables();
break;
}
case MULTI_SPLITS_SINGLE_TABLE:
{
sourceEvents = multiSplitsSingleTable();
break;
}
case CUSTOM_SOURCE_EVENTS:
{
break;
@@ -320,4 +328,162 @@ public class ValuesDataSourceHelper {
eventOfSplits.add(split1);
return eventOfSplits;
}
private static List<List<Event>> multiSplitsSingleTable() {
List<List<Event>> eventOfSplits = new ArrayList<>();
List<Event> split1 = new ArrayList<>();
// create table
Schema schema =
Schema.newBuilder()
.physicalColumn("col1", DataTypes.STRING())
.physicalColumn("col2", DataTypes.STRING())
.primaryKey("col1")
.build();
CreateTableEvent createTableEvent = new CreateTableEvent(table1, schema);
split1.add(createTableEvent);
BinaryRecordDataGenerator generator =
new BinaryRecordDataGenerator(RowType.of(DataTypes.STRING(), DataTypes.STRING()));
// create split1
DataChangeEvent insertEvent1 =
DataChangeEvent.insertEvent(
table1,
generator.generate(
new Object[] {
BinaryStringData.fromString("1"),
BinaryStringData.fromString("1")
}));
split1.add(insertEvent1);
DataChangeEvent insertEvent2 =
DataChangeEvent.insertEvent(
table1,
generator.generate(
new Object[] {
BinaryStringData.fromString("2"),
BinaryStringData.fromString("2")
}));
split1.add(insertEvent2);
eventOfSplits.add(split1);
// create split2
List<Event> split2 = new ArrayList<>();
split2.add(createTableEvent);
DataChangeEvent insertEvent3 =
DataChangeEvent.insertEvent(
table1,
generator.generate(
new Object[] {
BinaryStringData.fromString("3"),
BinaryStringData.fromString("3")
}));
split2.add(insertEvent3);
DataChangeEvent insertEvent4 =
DataChangeEvent.insertEvent(
table1,
generator.generate(
new Object[] {
BinaryStringData.fromString("4"),
BinaryStringData.fromString("4")
}));
split2.add(insertEvent4);
eventOfSplits.add(split2);
// create split3
List<Event> split3 = new ArrayList<>();
split3.add(createTableEvent);
DataChangeEvent insertEvent5 =
DataChangeEvent.insertEvent(
table1,
generator.generate(
new Object[] {
BinaryStringData.fromString("5"),
BinaryStringData.fromString("5")
}));
split3.add(insertEvent5);
DataChangeEvent insertEvent6 =
DataChangeEvent.insertEvent(
table1,
generator.generate(
new Object[] {
BinaryStringData.fromString("6"),
BinaryStringData.fromString("6")
}));
split3.add(insertEvent6);
eventOfSplits.add(split3);
// create split4
List<Event> split4 = new ArrayList<>();
split4.add(createTableEvent);
DataChangeEvent deleteEvent1 =
DataChangeEvent.deleteEvent(
table1,
generator.generate(
new Object[] {
BinaryStringData.fromString("2"),
BinaryStringData.fromString("2")
}));
split4.add(deleteEvent1);
DataChangeEvent deleteEvent2 =
DataChangeEvent.deleteEvent(
table1,
generator.generate(
new Object[] {
BinaryStringData.fromString("4"),
BinaryStringData.fromString("4")
}));
split4.add(deleteEvent2);
DataChangeEvent deleteEvent3 =
DataChangeEvent.deleteEvent(
table1,
generator.generate(
new Object[] {
BinaryStringData.fromString("6"),
BinaryStringData.fromString("6")
}));
split4.add(deleteEvent3);
AddColumnEvent.ColumnWithPosition columnWithPosition =
new AddColumnEvent.ColumnWithPosition(
Column.physicalColumn("col3", DataTypes.STRING()));
AddColumnEvent addColumnEvent =
new AddColumnEvent(table1, Collections.singletonList(columnWithPosition));
split4.add(addColumnEvent);
generator =
new BinaryRecordDataGenerator(
RowType.of(DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()));
DataChangeEvent updateEvent1 =
DataChangeEvent.updateEvent(
table1,
generator.generate(
new Object[] {
BinaryStringData.fromString("1"),
BinaryStringData.fromString("1"),
BinaryStringData.fromString("")
}),
generator.generate(
new Object[] {
BinaryStringData.fromString("1"),
BinaryStringData.fromString("1"),
BinaryStringData.fromString("x")
}));
split4.add(updateEvent1);
DataChangeEvent updateEvent2 =
DataChangeEvent.updateEvent(
table1,
generator.generate(
new Object[] {
BinaryStringData.fromString("3"),
BinaryStringData.fromString("3"),
BinaryStringData.fromString("")
}),
generator.generate(
new Object[] {
BinaryStringData.fromString("3"),
BinaryStringData.fromString("3"),
BinaryStringData.fromString("x")
}));
split4.add(updateEvent2);
eventOfSplits.add(split4);
return eventOfSplits;
}
}
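
To summarize the long method above: splits 1-3 insert rows 1 through 6 of table1, and the last (incremental) split deletes rows 2, 4 and 6, adds col3, then updates rows 1 and 3 to "x". The surviving rows are restated below in the same result-string format the existing tests use; this simply mirrors the assertion in testMultiSplitsSingleTable further down and is not part of the diff.

import java.util.Arrays;
import java.util.List;

public class MultiSplitsExpectedState {
    // Remaining rows of default.default.table1 once all four splits are applied.
    static final List<String> EXPECTED =
            Arrays.asList(
                    "default.default.table1:col1=1;col2=1;col3=x",
                    "default.default.table1:col1=3;col2=3;col3=x",
                    "default.default.table1:col1=5;col2=5;col3=");
}
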

@@ -26,13 +26,14 @@ import static com.ververica.cdc.common.configuration.description.TextElement.tex
/** Configurations for {@link ValuesDataSource}. */
public class ValuesDataSourceOptions {
public static final ConfigOption<ValuesDataSourceHelper.SourceEventType> SOURCE_EVENT_TYPE =
ConfigOptions.key("source.event.type")
.enumType(ValuesDataSourceHelper.SourceEventType.class)
.defaultValue(ValuesDataSourceHelper.SourceEventType.SINGLE_SPLIT_SINGLE_TABLE)
public static final ConfigOption<ValuesDataSourceHelper.EventSetId> EVENT_SET_ID =
ConfigOptions.key("event-set.id")
.enumType(ValuesDataSourceHelper.EventSetId.class)
.defaultValue(ValuesDataSourceHelper.EventSetId.SINGLE_SPLIT_SINGLE_TABLE)
.withDescription(
Description.builder()
.text("Type of creating source change events. ")
.text(
"Id for creating source change events from ValuesDataSourceHelper.EventSetId.")
.linebreak()
.add(
ListElement.list(
@@ -40,6 +41,8 @@ public class ValuesDataSourceOptions {
"SINGLE_SPLIT_SINGLE_TABLE: Default and predetermined case. Creating schema changes of single table and put them into one split."),
text(
"SINGLE_SPLIT_MULTI_TABLES: A predetermined case. Creating schema changes of multiple tables and put them into one split."),
text(
"MULTI_SPLITS_SINGLE_TABLE: A predetermined case. Creating schema changes of single table and put them into multiple splits."),
text(
"CUSTOM_SOURCE_EVENTS: Passed change events by the user through calling `setSourceEvents` method.")))
.build());
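
A hedged usage sketch for the renamed option follows. The package paths and the Configuration set/get calls are assumptions that mirror Flink's ConfigOption API and the CDC common configuration package referenced in the import above; they are not introduced by this diff.

import com.ververica.cdc.common.configuration.Configuration;
import com.ververica.cdc.connectors.values.source.ValuesDataSourceHelper;
import com.ververica.cdc.connectors.values.source.ValuesDataSourceOptions;

public class EventSetIdOptionSketch {
    public static void main(String[] args) {
        // Assumed Configuration API: set/get keyed by ConfigOption, as in Flink.
        Configuration conf = new Configuration();
        conf.set(ValuesDataSourceOptions.EVENT_SET_ID,
                ValuesDataSourceHelper.EventSetId.MULTI_SPLITS_SINGLE_TABLE);

        // ValuesDataFactory#createDataSource resolves the same option; when the
        // key "event-set.id" is absent it falls back to SINGLE_SPLIT_SINGLE_TABLE.
        ValuesDataSourceHelper.EventSetId eventSetId =
                conf.get(ValuesDataSourceOptions.EVENT_SET_ID);
        System.out.println(eventSetId); // MULTI_SPLITS_SINGLE_TABLE
    }
}
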

@@ -43,8 +43,11 @@ import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
/** Integration tests for different enumeration situations of {@link ValuesDataSourceHelper}. */
public class ValuesDataSourceHelperTest {
/**
* Integration tests for {@link ValuesDataSource} in different enumeration situations of {@link
* ValuesDataSourceHelper}.
*/
public class ValuesDataSourceITCase {
@Before
public void before() {
@@ -56,17 +59,13 @@ public class ValuesDataSourceHelperTest {
ValuesDatabase.clear();
}
@Test
public void testSingleSplitSingleTable() throws Exception {
/** Read events from {@link ValuesDataSource} and apply them to the ValuesDatabase. */
private void executeDataStreamJob(ValuesDataSourceHelper.EventSetId type) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(3000);
env.setRestartStrategy(RestartStrategies.noRestart());
FlinkSourceProvider sourceProvider =
(FlinkSourceProvider)
new ValuesDataSource(
ValuesDataSourceHelper.SourceEventType
.SINGLE_SPLIT_SINGLE_TABLE)
.getEventSourceProvider();
(FlinkSourceProvider) new ValuesDataSource(type).getEventSourceProvider();
CloseableIterator<Event> events =
env.fromSource(
sourceProvider.getSource(),
@@ -82,7 +81,11 @@ public class ValuesDataSourceHelperTest {
ValuesDatabase.applySchemaChangeEvent((SchemaChangeEvent) event);
}
});
}
@Test
public void testSingleSplitSingleTable() throws Exception {
executeDataStreamJob(ValuesDataSourceHelper.EventSetId.SINGLE_SPLIT_SINGLE_TABLE);
List<String> results = new ArrayList<>();
results.add("default.default.table1:col1=2;newCol3=x");
results.add("default.default.table1:col1=3;newCol3=");
@@ -92,31 +95,7 @@ public class ValuesDataSourceHelperTest {
@Test
public void testSingleSplitMultiTables() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(3000);
env.setRestartStrategy(RestartStrategies.noRestart());
FlinkSourceProvider sourceProvider =
(FlinkSourceProvider)
new ValuesDataSource(
ValuesDataSourceHelper.SourceEventType
.SINGLE_SPLIT_MULTI_TABLES)
.getEventSourceProvider();
CloseableIterator<Event> events =
env.fromSource(
sourceProvider.getSource(),
WatermarkStrategy.noWatermarks(),
ValuesDataFactory.IDENTIFIER,
new EventTypeInfo())
.executeAndCollect();
events.forEachRemaining(
(event) -> {
if (event instanceof DataChangeEvent) {
ValuesDatabase.applyDataChangeEvent((DataChangeEvent) event);
} else if (event instanceof SchemaChangeEvent) {
ValuesDatabase.applySchemaChangeEvent((SchemaChangeEvent) event);
}
});
executeDataStreamJob(ValuesDataSourceHelper.EventSetId.SINGLE_SPLIT_MULTI_TABLES);
List<String> results = new ArrayList<>();
results.add("default.default.table1:col1=2;newCol3=x");
results.add("default.default.table1:col1=3;newCol3=");
@@ -133,10 +112,6 @@ public class ValuesDataSourceHelperTest {
@Test
public void testCustomSourceEvents() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(3000);
env.setRestartStrategy(RestartStrategies.noRestart());
List<List<Event>> splits = new ArrayList<>();
List<Event> split1 = new ArrayList<>();
TableId table1 = TableId.tableId("default", "default", "table1");
@@ -146,11 +121,10 @@ public class ValuesDataSourceHelperTest {
.physicalColumn("col2", DataTypes.STRING())
.primaryKey("col1")
.build();
CreateTableEvent createTableEvent = new CreateTableEvent(table1, schema);
split1.add(createTableEvent);
BinaryRecordDataGenerator generator =
new BinaryRecordDataGenerator(RowType.of(DataTypes.STRING(), DataTypes.STRING()));
CreateTableEvent createTableEvent = new CreateTableEvent(table1, schema);
split1.add(createTableEvent);
DataChangeEvent insertEvent1 =
DataChangeEvent.insertEvent(
table1,
@@ -171,27 +145,7 @@ public class ValuesDataSourceHelperTest {
split1.add(insertEvent2);
splits.add(split1);
ValuesDataSourceHelper.setSourceEvents(splits);
FlinkSourceProvider sourceProvider =
(FlinkSourceProvider)
new ValuesDataSource(
ValuesDataSourceHelper.SourceEventType.CUSTOM_SOURCE_EVENTS)
.getEventSourceProvider();
CloseableIterator<Event> events =
env.fromSource(
sourceProvider.getSource(),
WatermarkStrategy.noWatermarks(),
ValuesDataFactory.IDENTIFIER,
new EventTypeInfo())
.executeAndCollect();
events.forEachRemaining(
(event) -> {
if (event instanceof DataChangeEvent) {
ValuesDatabase.applyDataChangeEvent((DataChangeEvent) event);
} else if (event instanceof SchemaChangeEvent) {
ValuesDatabase.applySchemaChangeEvent((SchemaChangeEvent) event);
}
});
executeDataStreamJob(ValuesDataSourceHelper.EventSetId.CUSTOM_SOURCE_EVENTS);
List<String> results = new ArrayList<>();
results.add("default.default.table1:col1=1;col2=1");
@@ -199,4 +153,15 @@ public class ValuesDataSourceHelperTest {
Assert.assertEquals(
results, ValuesDatabase.getResults(TableId.parse("default.default.table1")));
}
@Test
public void testMultiSplitsSingleTable() throws Exception {
executeDataStreamJob(ValuesDataSourceHelper.EventSetId.MULTI_SPLITS_SINGLE_TABLE);
List<String> results = new ArrayList<>();
results.add("default.default.table1:col1=1;col2=1;col3=x");
results.add("default.default.table1:col1=3;col2=3;col3=x");
results.add("default.default.table1:col1=5;col2=5;col3=");
Assert.assertEquals(
results, ValuesDatabase.getResults(TableId.parse("default.default.table1")));
}
}