From d4ed7db8dca18906bb8a1d7738cb158da0222bc3 Mon Sep 17 00:00:00 2001 From: Kunni Date: Fri, 19 Apr 2024 15:37:35 +0800 Subject: [PATCH] [FLINK-35127][cdc][values] Remove HybridSource from ValuesSource to avoid CI failure. (#3237) --- .../cdc/connectors/values/source/ValuesDataSource.java | 7 +------ .../connectors/values/source/ValuesDataSourceHelper.java | 8 +++++++- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/source/ValuesDataSource.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/source/ValuesDataSource.java index 88a723fce..61699a5d0 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/source/ValuesDataSource.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/source/ValuesDataSource.java @@ -35,7 +35,6 @@ import org.apache.flink.cdc.common.source.EventSourceProvider; import org.apache.flink.cdc.common.source.FlinkSourceProvider; import org.apache.flink.cdc.common.source.MetadataAccessor; import org.apache.flink.cdc.connectors.values.ValuesDatabase; -import org.apache.flink.connector.base.source.hybrid.HybridSource; import org.apache.flink.core.io.InputStatus; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.core.memory.DataInputViewStreamWrapper; @@ -73,11 +72,7 @@ public class ValuesDataSource implements DataSource { @Override public EventSourceProvider getEventSourceProvider() { ValuesDataSourceHelper.setSourceEvents(eventSetId); - HybridSource hybridSource = - HybridSource.builder(new ValuesSource(failAtPos, eventSetId, true)) - .addSource(new ValuesSource(failAtPos, eventSetId, false)) - .build(); - return FlinkSourceProvider.of(hybridSource); + return FlinkSourceProvider.of(new ValuesSource(failAtPos, eventSetId, false)); } @Override diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/source/ValuesDataSourceHelper.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/source/ValuesDataSourceHelper.java index 431cbaf17..24ba78165 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/source/ValuesDataSourceHelper.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/source/ValuesDataSourceHelper.java @@ -73,7 +73,13 @@ public class ValuesDataSourceHelper { // use default enum of SINGLE_SPLIT_SINGLE_TABLE sourceEvents = singleSplitSingleTable(); } - return sourceEvents; + // put all events into one list to avoid CI failure and make sure that SchemaChangeEvent are + // sent in order. + List mergeEvents = new ArrayList<>(); + for (List events : sourceEvents) { + mergeEvents.addAll(events); + } + return Collections.singletonList(mergeEvents); } /** set sourceEvents using custom events. */