[FLINK-36076][minor][cdc-runtime] Set isSchemaChangeApplying as volatile for thread safe consideration

This closes #3556.
pull/3559/head
Hongshun Wang 5 months ago committed by GitHub
parent 7f08c6caba
commit d3473de4db
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -97,7 +97,8 @@ public class FlinkPipelineComposer implements PipelineComposer {
// Build Source Operator
DataSourceTranslator sourceTranslator = new DataSourceTranslator();
DataStream<Event> stream =
sourceTranslator.translate(pipelineDef.getSource(), env, pipelineDef.getConfig());
sourceTranslator.translate(
pipelineDef.getSource(), env, pipelineDef.getConfig(), parallelism);
// Build PreTransformOperator for processing Schema Event
TransformTranslator transformTranslator = new TransformTranslator();

@ -23,7 +23,6 @@ import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.factories.DataSourceFactory;
import org.apache.flink.cdc.common.factories.FactoryHelper;
import org.apache.flink.cdc.common.pipeline.PipelineOptions;
import org.apache.flink.cdc.common.source.DataSource;
import org.apache.flink.cdc.common.source.EventSourceProvider;
import org.apache.flink.cdc.common.source.FlinkSourceFunctionProvider;
@ -41,12 +40,14 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class DataSourceTranslator {
public DataStreamSource<Event> translate(
SourceDef sourceDef, StreamExecutionEnvironment env, Configuration pipelineConfig) {
SourceDef sourceDef,
StreamExecutionEnvironment env,
Configuration pipelineConfig,
int sourceParallelism) {
// Create data source
DataSource dataSource = createDataSource(sourceDef, env, pipelineConfig);
// Get source provider
final int sourceParallelism = pipelineConfig.get(PipelineOptions.PIPELINE_PARALLELISM);
EventSourceProvider eventSourceProvider = dataSource.getEventSourceProvider();
if (eventSourceProvider instanceof FlinkSourceProvider) {
// Source

@ -94,7 +94,7 @@ public class SchemaRegistryRequestHandler implements Closeable {
private final Set<Integer> flushedSinkWriters;
/** Status of the execution of current schema change request. */
private boolean isSchemaChangeApplying;
private volatile boolean isSchemaChangeApplying;
/** Executor service to execute schema change. */
private final ExecutorService schemaChangeThreadPool;

Loading…
Cancel
Save