[FLINK-35127][cdc][values] Remove HybridSource from ValuesSource to avoid CI failure. (#3237)

pull/3224/head
Kunni 10 months ago committed by GitHub
parent ef9491b78c
commit d4ed7db8dc
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -35,7 +35,6 @@ import org.apache.flink.cdc.common.source.EventSourceProvider;
import org.apache.flink.cdc.common.source.FlinkSourceProvider;
import org.apache.flink.cdc.common.source.MetadataAccessor;
import org.apache.flink.cdc.connectors.values.ValuesDatabase;
import org.apache.flink.connector.base.source.hybrid.HybridSource;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
@ -73,11 +72,7 @@ public class ValuesDataSource implements DataSource {
@Override
public EventSourceProvider getEventSourceProvider() {
ValuesDataSourceHelper.setSourceEvents(eventSetId);
HybridSource<Event> hybridSource =
HybridSource.builder(new ValuesSource(failAtPos, eventSetId, true))
.addSource(new ValuesSource(failAtPos, eventSetId, false))
.build();
return FlinkSourceProvider.of(hybridSource);
return FlinkSourceProvider.of(new ValuesSource(failAtPos, eventSetId, false));
}
@Override

@ -73,7 +73,13 @@ public class ValuesDataSourceHelper {
// use default enum of SINGLE_SPLIT_SINGLE_TABLE
sourceEvents = singleSplitSingleTable();
}
return sourceEvents;
// put all events into one list to avoid CI failure and make sure that SchemaChangeEvent are
// sent in order.
List<Event> mergeEvents = new ArrayList<>();
for (List<Event> events : sourceEvents) {
mergeEvents.addAll(events);
}
return Collections.singletonList(mergeEvents);
}
/** set sourceEvents using custom events. */

Loading…
Cancel
Save