From ace6080c935af07dcd530811bb8f8e1b4be2bf05 Mon Sep 17 00:00:00 2001 From: Hang Ruan Date: Wed, 22 Nov 2023 14:52:10 +0800 Subject: [PATCH] [3.0][cdc-runtime] Provide SchemaOperator and SchemaRegistry to handle schema changes (#2685) --- .../cdc/common/event/FlushEvent.java | 3 +- .../operators/schema/SchemaOperator.java | 97 ++++++++- .../schema/SchemaOperatorFactory.java | 20 +- .../SchemaOperatorCoordinator.java | 89 -------- .../schema/coordinator/SchemaRegistry.java | 166 +++++++++++++++ ...vider.java => SchemaRegistryProvider.java} | 27 ++- .../SchemaRegistryRequestHandler.java | 194 ++++++++++++++++++ .../schema/event/FlushSuccessEvent.java | 27 ++- ...Event.java => ReleaseUpstreamRequest.java} | 13 +- .../ReleaseUpstreamResponse.java} | 18 +- .../schema/event/SchemaChangeRequest.java | 46 ++++- .../schema/event/SchemaChangeResponse.java | 37 +++- .../schema/event/SinkWriterRegisterEvent.java | 23 ++- .../operators/sink/SchemaEvolutionClient.java | 10 +- 14 files changed, 640 insertions(+), 130 deletions(-) delete mode 100644 flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/schema/coordinator/SchemaOperatorCoordinator.java create mode 100644 flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java rename flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/schema/coordinator/{SchemaOperatorCoordinatorProvider.java => SchemaRegistryProvider.java} (55%) create mode 100644 flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java rename flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/schema/event/{ReleaseUpstreamEvent.java => ReleaseUpstreamRequest.java} (69%) rename flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/schema/{coordinator/SchemaManager.java => event/ReleaseUpstreamResponse.java} (53%) diff --git 
a/flink-cdc-common/src/main/java/com/ververica/cdc/common/event/FlushEvent.java b/flink-cdc-common/src/main/java/com/ververica/cdc/common/event/FlushEvent.java index bcbbd8f89..11daecb0b 100644 --- a/flink-cdc-common/src/main/java/com/ververica/cdc/common/event/FlushEvent.java +++ b/flink-cdc-common/src/main/java/com/ververica/cdc/common/event/FlushEvent.java @@ -24,8 +24,7 @@ import java.util.Objects; */ public class FlushEvent implements Event { - private static final long serialVersionUID = 1L; - + /** The schema changes from which table. */ private final TableId tableId; public FlushEvent(TableId tableId) { diff --git a/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/schema/SchemaOperator.java b/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/schema/SchemaOperator.java index 6e08d54ab..6f5cc1c3a 100644 --- a/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/schema/SchemaOperator.java +++ b/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/schema/SchemaOperator.java @@ -16,20 +16,36 @@ package com.ververica.cdc.runtime.operators.schema; +import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway; +import org.apache.flink.runtime.operators.coordination.CoordinationRequest; +import org.apache.flink.runtime.operators.coordination.CoordinationResponse; +import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.StreamTask; +import org.apache.flink.util.SerializedValue; +import com.ververica.cdc.common.annotation.Internal; import com.ververica.cdc.common.event.Event; +import com.ververica.cdc.common.event.FlushEvent; import 
com.ververica.cdc.common.event.SchemaChangeEvent; -import com.ververica.cdc.runtime.operators.schema.coordinator.SchemaOperatorCoordinator; +import com.ververica.cdc.common.event.TableId; +import com.ververica.cdc.runtime.operators.schema.coordinator.SchemaRegistry; +import com.ververica.cdc.runtime.operators.schema.event.ReleaseUpstreamRequest; +import com.ververica.cdc.runtime.operators.schema.event.SchemaChangeRequest; +import com.ververica.cdc.runtime.operators.schema.event.SchemaChangeResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.concurrent.CompletableFuture; + /** - * The operator to evolve schemas to {@link SchemaOperatorCoordinator} for incoming {@link + * The operator will evolve schemas in {@link SchemaRegistry} for incoming {@link * SchemaChangeEvent}s and block the stream for tables before their schema changes finish. */ +@Internal public class SchemaOperator extends AbstractStreamOperator implements OneInputStreamOperator { @@ -37,6 +53,81 @@ public class SchemaOperator extends AbstractStreamOperator private static final Logger LOG = LoggerFactory.getLogger(SchemaOperator.class); + private transient TaskOperatorEventGateway toCoordinator; + + private final int partitionChannelNum; + + public SchemaOperator(int partitionChannelNum) { + this.partitionChannelNum = partitionChannelNum; + } + + @Override + public void setup( + StreamTask containingTask, + StreamConfig config, + Output> output) { + super.setup(containingTask, config, output); + this.toCoordinator = containingTask.getEnvironment().getOperatorCoordinatorEventGateway(); + } + + /** + * This method is guaranteed to not be called concurrently with other methods of the operator. 
+ */ @Override - public void processElement(StreamRecord streamRecord) throws Exception {} + public void processElement(StreamRecord streamRecord) throws Exception { + Event event = streamRecord.getValue(); + if (event instanceof SchemaChangeEvent) { + TableId tableId = ((SchemaChangeEvent) event).tableId(); + LOG.info( + "Table {} received SchemaChangeEvent and start to be blocked.", + tableId.toString()); + handleSchemaChangeEvent(tableId, (SchemaChangeEvent) event); + return; + } + output.collect(streamRecord); + } + + // ---------------------------------------------------------------------------------- + + private void handleSchemaChangeEvent(TableId tableId, SchemaChangeEvent schemaChangeEvent) { + // The request will need to send a FlushEvent or block until flushing finished + SchemaChangeResponse response = + (SchemaChangeResponse) requestSchemaChange(tableId, schemaChangeEvent); + if (response.isShouldSendFlushEvent()) { + LOG.info( + "Sending the FlushEvent for table {} in subtask {}.", + tableId, + getRuntimeContext().getIndexOfThisSubtask()); + broadcastEvents(new FlushEvent(tableId)); + // The request will block until flushing finished in each sink writer + requestReleaseUpstream(); + } + broadcastEvents(schemaChangeEvent); + } + + // TODO + private void broadcastEvents(Event event) { + output.collect(new StreamRecord<>(event)); + } + + private CoordinationResponse requestSchemaChange( + TableId tableId, SchemaChangeEvent schemaChangeEvent) { + return sendRequestToCoordinator(new SchemaChangeRequest(tableId, schemaChangeEvent)); + } + + private CoordinationResponse requestReleaseUpstream() { + return sendRequestToCoordinator(new ReleaseUpstreamRequest()); + } + + private CoordinationResponse sendRequestToCoordinator(CoordinationRequest request) { + try { + CompletableFuture responseFuture = + toCoordinator.sendRequestToCoordinator( + getOperatorID(), new SerializedValue<>(request)); + return responseFuture.get(); + } catch (Exception e) { + throw new 
IllegalStateException( + "Failed to send request to coordinator: " + request.toString(), e); + } + } } diff --git a/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/schema/SchemaOperatorFactory.java b/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/schema/SchemaOperatorFactory.java index 8eddaf867..993b14b6f 100644 --- a/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/schema/SchemaOperatorFactory.java +++ b/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/schema/SchemaOperatorFactory.java @@ -21,9 +21,14 @@ import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; import org.apache.flink.streaming.api.operators.CoordinatedOperatorFactory; import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory; import org.apache.flink.streaming.api.operators.SimpleOperatorFactory; -import org.apache.flink.streaming.api.operators.StreamOperator; import com.ververica.cdc.common.event.Event; +import com.ververica.cdc.common.event.TableId; +import com.ververica.cdc.common.sink.MetadataApplier; +import com.ververica.cdc.runtime.operators.schema.coordinator.SchemaRegistryProvider; + +import java.util.List; +import java.util.Map; /** Factory to create {@link SchemaOperator}. 
*/ public class SchemaOperatorFactory extends SimpleOperatorFactory @@ -31,12 +36,17 @@ public class SchemaOperatorFactory extends SimpleOperatorFactory private static final long serialVersionUID = 1L; - protected SchemaOperatorFactory(StreamOperator operator) { - super(operator); + private final Map> metadataAppliers; + + public SchemaOperatorFactory( + int partitionChannelNum, Map> metadataAppliers) { + super(new SchemaOperator(partitionChannelNum)); + this.metadataAppliers = metadataAppliers; } @Override - public OperatorCoordinator.Provider getCoordinatorProvider(String s, OperatorID operatorID) { - return null; + public OperatorCoordinator.Provider getCoordinatorProvider( + String operatorName, OperatorID operatorID) { + return new SchemaRegistryProvider(operatorID, operatorName, metadataAppliers); } } diff --git a/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/schema/coordinator/SchemaOperatorCoordinator.java b/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/schema/coordinator/SchemaOperatorCoordinator.java deleted file mode 100644 index 15d604e98..000000000 --- a/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/schema/coordinator/SchemaOperatorCoordinator.java +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Copyright 2023 Ververica Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package com.ververica.cdc.runtime.operators.schema.coordinator; - -import org.apache.flink.runtime.operators.coordination.CoordinationRequest; -import org.apache.flink.runtime.operators.coordination.CoordinationRequestHandler; -import org.apache.flink.runtime.operators.coordination.CoordinationResponse; -import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; -import org.apache.flink.runtime.operators.coordination.OperatorEvent; - -import com.ververica.cdc.runtime.operators.schema.SchemaOperator; -import com.ververica.cdc.runtime.operators.schema.event.FlushSuccessEvent; -import com.ververica.cdc.runtime.operators.schema.event.SchemaChangeRequest; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.annotation.Nullable; - -import java.util.concurrent.CompletableFuture; - -/** - * The implementation of the {@link OperatorCoordinator} for the {@link SchemaOperator}. - * - *

The SchemaOperatorCoordinator provides an event loop style thread model to - * interact with the Flink runtime. The coordinator ensures that all the state manipulations are - * made by its event loop thread. - * - *

This coordinator is responsible for: - * - *

    - *
  • Apply schema changes when receiving the {@link SchemaChangeRequest} from {@link - * SchemaOperator} - *
  • Notify {@link SchemaOperator} to continue to push data for the table after receiving {@link - * FlushSuccessEvent} from its registered sink writer - *
- */ -public class SchemaOperatorCoordinator implements OperatorCoordinator, CoordinationRequestHandler { - - private static final Logger LOG = LoggerFactory.getLogger(SchemaOperatorCoordinator.class); - - @Override - public CompletableFuture handleCoordinationRequest( - CoordinationRequest coordinationRequest) { - return null; - } - - @Override - public void start() throws Exception {} - - @Override - public void close() throws Exception {} - - @Override - public void handleEventFromOperator(int i, int i1, OperatorEvent operatorEvent) - throws Exception {} - - @Override - public void checkpointCoordinator(long l, CompletableFuture completableFuture) - throws Exception {} - - @Override - public void notifyCheckpointComplete(long l) {} - - @Override - public void resetToCheckpoint(long l, @Nullable byte[] bytes) throws Exception {} - - @Override - public void subtaskReset(int i, long l) {} - - @Override - public void executionAttemptFailed(int i, int i1, @Nullable Throwable throwable) {} - - @Override - public void executionAttemptReady(int i, int i1, SubtaskGateway subtaskGateway) {} -} diff --git a/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java b/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java new file mode 100644 index 000000000..39d266681 --- /dev/null +++ b/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java @@ -0,0 +1,166 @@ +/* + * Copyright 2023 Ververica Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.runtime.operators.schema.coordinator; + +import org.apache.flink.runtime.operators.coordination.CoordinationRequest; +import org.apache.flink.runtime.operators.coordination.CoordinationRequestHandler; +import org.apache.flink.runtime.operators.coordination.CoordinationResponse; +import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; +import org.apache.flink.runtime.operators.coordination.OperatorEvent; +import org.apache.flink.util.FlinkException; + +import com.ververica.cdc.common.event.TableId; +import com.ververica.cdc.common.sink.MetadataApplier; +import com.ververica.cdc.runtime.operators.schema.SchemaOperator; +import com.ververica.cdc.runtime.operators.schema.event.FlushSuccessEvent; +import com.ververica.cdc.runtime.operators.schema.event.ReleaseUpstreamRequest; +import com.ververica.cdc.runtime.operators.schema.event.SchemaChangeRequest; +import com.ververica.cdc.runtime.operators.schema.event.SinkWriterRegisterEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +/** + * The implementation of the {@link OperatorCoordinator} for the {@link SchemaOperator}. + * + *

The SchemaRegistry provides an event loop style thread model to interact with the + * Flink runtime. The coordinator ensures that all the state manipulations are made by its event + * loop thread. + * + *

This SchemaRegistry is responsible for: + * + *

    + *
  • Apply schema changes when receiving the {@link SchemaChangeRequest} from {@link + * SchemaOperator} + *
  • Notify {@link SchemaOperator} to continue to push data for the table after receiving {@link + * FlushSuccessEvent} from its registered sink writer + *
+ */ +public class SchemaRegistry implements OperatorCoordinator, CoordinationRequestHandler { + + private static final Logger LOG = LoggerFactory.getLogger(SchemaRegistry.class); + + /** The context of the coordinator. */ + private final OperatorCoordinator.Context context; + /** The name of the operator this SchemaOperatorCoordinator is associated with. */ + private final String operatorName; + + /** + * Tracks the subtask failed reason to throw a more meaningful exception in {@link + * #subtaskReset}. + */ + private final Map failedReasons; + + /** The request handler that handle all requests and events. */ + private final SchemaRegistryRequestHandler requestHandler; + + public SchemaRegistry( + String operatorName, + OperatorCoordinator.Context context, + Map> metadataAppliers) { + this.context = context; + this.operatorName = operatorName; + this.failedReasons = new HashMap<>(); + this.requestHandler = new SchemaRegistryRequestHandler(metadataAppliers); + } + + @Override + public void start() throws Exception { + LOG.info("Starting SchemaRegistry for {}.", operatorName); + this.failedReasons.clear(); + LOG.info("Started SchemaRegistry for {}.", operatorName); + } + + @Override + public void close() throws Exception { + LOG.info("SchemaRegistry for {} closed.", operatorName); + } + + @Override + public void handleEventFromOperator(int subtask, int attemptNumber, OperatorEvent event) + throws Exception { + if (event instanceof FlushSuccessEvent) { + FlushSuccessEvent flushSuccessEvent = (FlushSuccessEvent) event; + LOG.info( + "Sink subtask {} succeed flushing for table {}.", + flushSuccessEvent.getSubtask(), + flushSuccessEvent.getTableId().toString()); + requestHandler.flushSuccess( + flushSuccessEvent.getTableId(), flushSuccessEvent.getSubtask()); + } else if (event instanceof SinkWriterRegisterEvent) { + requestHandler.registerSinkWriter(((SinkWriterRegisterEvent) event).getSubtask()); + } else { + throw new FlinkException("Unrecognized Operator Event: " + 
event); + } + } + + @Override + public void checkpointCoordinator(long checkpointId, CompletableFuture resultFuture) + throws Exception { + // do nothing + } + + @Override + public void notifyCheckpointComplete(long checkpointId) { + // do nothing + } + + @Override + public CompletableFuture handleCoordinationRequest( + CoordinationRequest request) { + if (request instanceof SchemaChangeRequest) { + SchemaChangeRequest schemaChangeRequest = (SchemaChangeRequest) request; + return requestHandler.handleSchemaChangeRequest(schemaChangeRequest); + } else if (request instanceof ReleaseUpstreamRequest) { + return requestHandler.handleReleaseUpstreamRequest((ReleaseUpstreamRequest) request); + } else { + throw new IllegalArgumentException("Unrecognized CoordinationRequest type: " + request); + } + } + + @Override + public void resetToCheckpoint(long checkpointId, @Nullable byte[] checkpointData) + throws Exception { + // do nothing + } + + @Override + public void subtaskReset(int subtask, long checkpointId) { + Throwable rootCause = failedReasons.get(subtask); + LOG.error( + String.format("Subtask %d reset at checkpoint %d.", subtask, checkpointId), + rootCause); + } + + @Override + public void executionAttemptFailed( + int subtask, int attemptNumber, @Nullable Throwable throwable) { + failedReasons.put(subtask, throwable); + } + + @Override + public void executionAttemptReady( + int subtask, int attemptNumber, SubtaskGateway subtaskGateway) { + // do nothing + } +} diff --git a/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/schema/coordinator/SchemaOperatorCoordinatorProvider.java b/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/schema/coordinator/SchemaRegistryProvider.java similarity index 55% rename from flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/schema/coordinator/SchemaOperatorCoordinatorProvider.java rename to 
flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/schema/coordinator/SchemaRegistryProvider.java index 246c95cee..f3242038a 100644 --- a/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/schema/coordinator/SchemaOperatorCoordinatorProvider.java +++ b/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/schema/coordinator/SchemaRegistryProvider.java @@ -19,17 +19,36 @@ package com.ververica.cdc.runtime.operators.schema.coordinator; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; -/** Provider of {@link SchemaOperatorCoordinator}. */ -public class SchemaOperatorCoordinatorProvider implements OperatorCoordinator.Provider { +import com.ververica.cdc.common.event.TableId; +import com.ververica.cdc.common.sink.MetadataApplier; + +import java.util.List; +import java.util.Map; + +/** Provider of {@link SchemaRegistry}. */ +public class SchemaRegistryProvider implements OperatorCoordinator.Provider { private static final long serialVersionUID = 1L; + private final OperatorID operatorID; + private final String operatorName; + private final Map> metadataAppliers; + + public SchemaRegistryProvider( + OperatorID operatorID, + String operatorName, + Map> metadataAppliers) { + this.operatorID = operatorID; + this.operatorName = operatorName; + this.metadataAppliers = metadataAppliers; + } + @Override public OperatorID getOperatorId() { - return null; + return operatorID; } @Override public OperatorCoordinator create(OperatorCoordinator.Context context) throws Exception { - return null; + return new SchemaRegistry(operatorName, context, metadataAppliers); } } diff --git a/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java b/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java new file mode 100644 index 
000000000..6ba5cb0c5 --- /dev/null +++ b/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java @@ -0,0 +1,194 @@ +/* + * Copyright 2023 Ververica Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.runtime.operators.schema.coordinator; + +import org.apache.flink.runtime.operators.coordination.CoordinationResponse; + +import com.ververica.cdc.common.annotation.Internal; +import com.ververica.cdc.common.event.SchemaChangeEvent; +import com.ververica.cdc.common.event.TableId; +import com.ververica.cdc.common.sink.MetadataApplier; +import com.ververica.cdc.runtime.operators.schema.event.ReleaseUpstreamRequest; +import com.ververica.cdc.runtime.operators.schema.event.ReleaseUpstreamResponse; +import com.ververica.cdc.runtime.operators.schema.event.SchemaChangeRequest; +import com.ververica.cdc.runtime.operators.schema.event.SchemaChangeResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.concurrent.NotThreadSafe; + +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; + +/** A handler to deal with all requests and events for {@link SchemaRegistry}. 
*/ +@Internal +@NotThreadSafe +public class SchemaRegistryRequestHandler { + private static final Logger LOG = LoggerFactory.getLogger(SchemaRegistryRequestHandler.class); + + /** The {@link MetadataApplier}s for every table. */ + private final Map> metadataAppliers; + /** All active sink writers. */ + private final Set activeSinkWriters; + + /** + * Not applied SchemaChangeRequest's future before receiving all flush success events for its + * table from sink writers. + */ + private PendingSchemaChange waitFlushSuccess; + /** + * Not applied SchemaChangeRequest before receiving all flush success events for its table from + * sink writers. + */ + private final List pendingSchemaChanges; + /** Sink writers which have sent flush success events for the request. */ + private final Set flushedSinkWriters; + + public SchemaRegistryRequestHandler(Map> metadataAppliers) { + this.metadataAppliers = metadataAppliers; + this.activeSinkWriters = new HashSet<>(); + this.flushedSinkWriters = new HashSet<>(); + this.pendingSchemaChanges = new LinkedList<>(); + } + + /** + * Apply the schema change to the external system. + * + * @param tableId the table need to change schema + * @param changeEvent the schema change + */ + private void applySchemaChange(TableId tableId, SchemaChangeEvent changeEvent) { + List appliers = metadataAppliers.get(tableId); + if (appliers == null || appliers.isEmpty()) { + LOG.warn("There is no MetadataApplier for table {}.", tableId); + throw new UnsupportedOperationException( + "Cannot find a metadata applier for the table changes in table " + + tableId.toString()); + } + + for (MetadataApplier applier : appliers) { + LOG.debug("Apply schema change {} to table {}.", changeEvent, tableId); + applier.applySchemaChange(changeEvent); + } + } + + /** + * Handle the {@link SchemaChangeRequest} and wait for all sink subtasks flushing. 
+ * + * @param request the received SchemaChangeRequest + */ + public CompletableFuture handleSchemaChangeRequest( + SchemaChangeRequest request) { + CompletableFuture response; + if (pendingSchemaChanges.isEmpty() && waitFlushSuccess == null) { + LOG.info( + "Received schema change event request from table {}. Start to pend requests for others.", + request.getTableId().toString()); + // TODO : skip flushing and please not put it to pendingSchemaChanges + response = CompletableFuture.completedFuture(new SchemaChangeResponse(true)); + pendingSchemaChanges.add(new PendingSchemaChange(request, response)); + } else { + LOG.info("There are already processing requests. Wait for processing."); + response = new CompletableFuture<>(); + pendingSchemaChanges.add(new PendingSchemaChange(request, response)); + } + return response; + } + + /** + * Handle the {@link ReleaseUpstreamRequest} and wait for all sink subtasks flushing. + * + * @param request the received SchemaChangeRequest + */ + public CompletableFuture handleReleaseUpstreamRequest( + ReleaseUpstreamRequest request) { + this.waitFlushSuccess = pendingSchemaChanges.remove(0).startToWaitForFlushSuccess(); + return waitFlushSuccess.getResponseFuture(); + } + + /** + * Register a sink subtask. + * + * @param sinkSubtask the sink subtask to register + */ + public void registerSinkWriter(int sinkSubtask) { + LOG.info("Register sink subtask {}.", sinkSubtask); + activeSinkWriters.add(sinkSubtask); + } + + /** + * Record flushed sink subtasks after receiving FlushSuccessEvent. + * + * @param tableId the subtask in SchemaOperator and table that the FlushEvent is about + * @param sinkSubtask the sink subtask succeed flushing + */ + public void flushSuccess(TableId tableId, int sinkSubtask) { + flushedSinkWriters.add(sinkSubtask); + if (flushedSinkWriters.equals(activeSinkWriters)) { + LOG.info( + "All sink subtask have flushed for table {}. 
Start to apply schema change.", + tableId.toString()); + applySchemaChange(tableId, waitFlushSuccess.getChangeRequest().getSchemaChangeEvent()); + waitFlushSuccess.getResponseFuture().complete(new ReleaseUpstreamResponse()); + startNextSchemaChangeRequest(); + } + } + + private void startNextSchemaChangeRequest() { + flushedSinkWriters.clear(); + waitFlushSuccess = null; + if (!pendingSchemaChanges.isEmpty()) { + // TODO : if no need to flush, remove it from pendingSchemaChanges + pendingSchemaChanges + .get(0) + .getResponseFuture() + .complete(new SchemaChangeResponse(true)); + } + } + + class PendingSchemaChange { + private final SchemaChangeRequest changeRequest; + private final CompletableFuture responseFuture; + + public PendingSchemaChange( + SchemaChangeRequest changeRequest, + CompletableFuture responseFuture) { + this.changeRequest = changeRequest; + this.responseFuture = responseFuture; + } + + public SchemaChangeRequest getChangeRequest() { + return changeRequest; + } + + public CompletableFuture getResponseFuture() { + return responseFuture; + } + + public PendingSchemaChange startToWaitForFlushSuccess() { + if (!responseFuture.isDone()) { + throw new IllegalStateException( + "Cannot start to wait for flush success before the SchemaChangeRequest is done."); + } + return new PendingSchemaChange(changeRequest, new CompletableFuture<>()); + } + } +} diff --git a/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/schema/event/FlushSuccessEvent.java b/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/schema/event/FlushSuccessEvent.java index e5d09fc09..34c806b9b 100644 --- a/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/schema/event/FlushSuccessEvent.java +++ b/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/schema/event/FlushSuccessEvent.java @@ -19,17 +19,21 @@ package com.ververica.cdc.runtime.operators.schema.event; import 
org.apache.flink.runtime.operators.coordination.OperatorEvent; import com.ververica.cdc.common.event.TableId; -import com.ververica.cdc.runtime.operators.schema.coordinator.SchemaOperatorCoordinator; +import com.ververica.cdc.runtime.operators.schema.coordinator.SchemaRegistry; + +import java.util.Objects; /** - * A {@link OperatorEvent} from sink writer to notify {@link SchemaOperatorCoordinator} that it - * finished flushing. + * A {@link OperatorEvent} from sink writer to notify {@link SchemaRegistry} that it finished + * flushing. */ public class FlushSuccessEvent implements OperatorEvent { private static final long serialVersionUID = 1L; + /** The sink subtask finished flushing. */ private final int subtask; + /** The schema changes from which table is executing it. */ private final TableId tableId; public FlushSuccessEvent(int subtask, TableId tableId) { @@ -44,4 +48,21 @@ public class FlushSuccessEvent implements OperatorEvent { public TableId getTableId() { return tableId; } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof FlushSuccessEvent)) { + return false; + } + FlushSuccessEvent that = (FlushSuccessEvent) o; + return subtask == that.subtask && Objects.equals(tableId, that.tableId); + } + + @Override + public int hashCode() { + return Objects.hash(subtask, tableId); + } } diff --git a/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/schema/event/ReleaseUpstreamEvent.java b/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/schema/event/ReleaseUpstreamRequest.java similarity index 69% rename from flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/schema/event/ReleaseUpstreamEvent.java rename to flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/schema/event/ReleaseUpstreamRequest.java index e5c2aafd8..32c80d7eb 100644 --- 
a/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/schema/event/ReleaseUpstreamEvent.java +++ b/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/schema/event/ReleaseUpstreamRequest.java @@ -16,16 +16,17 @@ package com.ververica.cdc.runtime.operators.schema.event; -import org.apache.flink.runtime.operators.coordination.OperatorEvent; +import org.apache.flink.runtime.operators.coordination.CoordinationRequest; +import com.ververica.cdc.common.event.FlushEvent; import com.ververica.cdc.runtime.operators.schema.SchemaOperator; -import com.ververica.cdc.runtime.operators.schema.coordinator.SchemaOperatorCoordinator; +import com.ververica.cdc.runtime.operators.schema.coordinator.SchemaRegistry; /** - * A {@link OperatorEvent} from {@link SchemaOperatorCoordinator} to notify {@link SchemaOperator} - * that the coordinator has finished schema change and the {@link SchemaOperator} should continue to - * push data. + * The request from {@link SchemaOperator} to {@link SchemaRegistry} to request to release upstream + * after sending {@link FlushEvent}. 
*/ -public class ReleaseUpstreamEvent implements OperatorEvent { +public class ReleaseUpstreamRequest implements CoordinationRequest { + private static final long serialVersionUID = 1L; } diff --git a/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/schema/coordinator/SchemaManager.java b/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/schema/event/ReleaseUpstreamResponse.java similarity index 53% rename from flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/schema/coordinator/SchemaManager.java rename to flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/schema/event/ReleaseUpstreamResponse.java index 35aae30b5..fd2e49672 100644 --- a/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/schema/coordinator/SchemaManager.java +++ b/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/schema/event/ReleaseUpstreamResponse.java @@ -14,12 +14,18 @@ * limitations under the License. */ -package com.ververica.cdc.runtime.operators.schema.coordinator; +package com.ververica.cdc.runtime.operators.schema.event; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.flink.runtime.operators.coordination.CoordinationResponse; -/** A manager to manages schema changes. */ -public class SchemaManager { - private static final Logger LOG = LoggerFactory.getLogger(SchemaManager.class); +import com.ververica.cdc.runtime.operators.schema.SchemaOperator; +import com.ververica.cdc.runtime.operators.schema.coordinator.SchemaRegistry; + +/** + * The response for {@link ReleaseUpstreamRequest} from {@link SchemaRegistry} to {@link + * SchemaOperator}. 
+ */ +public class ReleaseUpstreamResponse implements CoordinationResponse { + + private static final long serialVersionUID = 1L; } diff --git a/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/schema/event/SchemaChangeRequest.java b/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/schema/event/SchemaChangeRequest.java index cdac966b5..f04accfd7 100644 --- a/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/schema/event/SchemaChangeRequest.java +++ b/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/schema/event/SchemaChangeRequest.java @@ -18,13 +18,53 @@ package com.ververica.cdc.runtime.operators.schema.event; import org.apache.flink.runtime.operators.coordination.CoordinationRequest; +import com.ververica.cdc.common.event.SchemaChangeEvent; +import com.ververica.cdc.common.event.TableId; import com.ververica.cdc.runtime.operators.schema.SchemaOperator; -import com.ververica.cdc.runtime.operators.schema.coordinator.SchemaOperatorCoordinator; +import com.ververica.cdc.runtime.operators.schema.coordinator.SchemaRegistry; + +import java.util.Objects; /** - * The request from {@link SchemaOperator} to {@link SchemaOperatorCoordinator} to request to change - * schemas. + * The request from {@link SchemaOperator} to {@link SchemaRegistry} to request to change schemas. */ public class SchemaChangeRequest implements CoordinationRequest { + private static final long serialVersionUID = 1L; + + /** The sender of the request. */ + private final TableId tableId; + /** The schema changes. 
*/ + private final SchemaChangeEvent schemaChangeEvent; + + public SchemaChangeRequest(TableId tableId, SchemaChangeEvent schemaChangeEvent) { + this.tableId = tableId; + this.schemaChangeEvent = schemaChangeEvent; + } + + public TableId getTableId() { + return tableId; + } + + public SchemaChangeEvent getSchemaChangeEvent() { + return schemaChangeEvent; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof SchemaChangeRequest)) { + return false; + } + SchemaChangeRequest that = (SchemaChangeRequest) o; + return Objects.equals(tableId, that.tableId) + && Objects.equals(schemaChangeEvent, that.schemaChangeEvent); + } + + @Override + public int hashCode() { + return Objects.hash(tableId, schemaChangeEvent); + } } diff --git a/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/schema/event/SchemaChangeResponse.java b/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/schema/event/SchemaChangeResponse.java index 23e8de413..8c78e097a 100644 --- a/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/schema/event/SchemaChangeResponse.java +++ b/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/schema/event/SchemaChangeResponse.java @@ -19,12 +19,45 @@ package com.ververica.cdc.runtime.operators.schema.event; import org.apache.flink.runtime.operators.coordination.CoordinationResponse; import com.ververica.cdc.runtime.operators.schema.SchemaOperator; -import com.ververica.cdc.runtime.operators.schema.coordinator.SchemaOperatorCoordinator; +import com.ververica.cdc.runtime.operators.schema.coordinator.SchemaRegistry; + +import java.util.Objects; /** - * The response for {@link SchemaChangeRequest} from {@link SchemaOperatorCoordinator} to {@link + * The response for {@link SchemaChangeRequest} from {@link SchemaRegistry} to {@link * SchemaOperator}. 
*/ public class SchemaChangeResponse implements CoordinationResponse { private static final long serialVersionUID = 1L; + + /** + * Whether the SchemaOperator needs to buffer data and the SchemaRegistry needs to wait for + * flushing. + */ + private final boolean shouldSendFlushEvent; + + public SchemaChangeResponse(boolean shouldSendFlushEvent) { + this.shouldSendFlushEvent = shouldSendFlushEvent; + } + + public boolean isShouldSendFlushEvent() { + return shouldSendFlushEvent; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof SchemaChangeResponse)) { + return false; + } + SchemaChangeResponse response = (SchemaChangeResponse) o; + return shouldSendFlushEvent == response.shouldSendFlushEvent; + } + + @Override + public int hashCode() { + return Objects.hash(shouldSendFlushEvent); + } } diff --git a/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/schema/event/SinkWriterRegisterEvent.java b/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/schema/event/SinkWriterRegisterEvent.java index 7e60081bf..7cadab175 100644 --- a/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/schema/event/SinkWriterRegisterEvent.java +++ b/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/schema/event/SinkWriterRegisterEvent.java @@ -18,9 +18,11 @@ package com.ververica.cdc.runtime.operators.schema.event; import org.apache.flink.runtime.operators.coordination.OperatorEvent; -import com.ververica.cdc.runtime.operators.schema.coordinator.SchemaOperatorCoordinator; +import com.ververica.cdc.runtime.operators.schema.coordinator.SchemaRegistry; -/** A {@link OperatorEvent} that register sink writer to {@link SchemaOperatorCoordinator}. */ +import java.util.Objects; + +/** A {@link OperatorEvent} that registers a sink writer to {@link SchemaRegistry}.
*/ public class SinkWriterRegisterEvent implements OperatorEvent { private static final long serialVersionUID = 1L; @@ -34,4 +36,21 @@ public class SinkWriterRegisterEvent implements OperatorEvent { public int getSubtask() { return subtask; } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof SinkWriterRegisterEvent)) { + return false; + } + SinkWriterRegisterEvent that = (SinkWriterRegisterEvent) o; + return subtask == that.subtask; + } + + @Override + public int hashCode() { + return Objects.hash(subtask); + } } diff --git a/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/sink/SchemaEvolutionClient.java b/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/sink/SchemaEvolutionClient.java index b4d13f669..b4682f0e6 100644 --- a/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/sink/SchemaEvolutionClient.java +++ b/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/sink/SchemaEvolutionClient.java @@ -23,15 +23,15 @@ import org.apache.flink.util.SerializedValue; import com.ververica.cdc.common.event.TableId; import com.ververica.cdc.runtime.operators.schema.SchemaOperator; -import com.ververica.cdc.runtime.operators.schema.coordinator.SchemaOperatorCoordinator; +import com.ververica.cdc.runtime.operators.schema.coordinator.SchemaRegistry; import com.ververica.cdc.runtime.operators.schema.event.FlushSuccessEvent; import com.ververica.cdc.runtime.operators.schema.event.SinkWriterRegisterEvent; import java.io.IOException; /** - * Client for {@link DataSinkWriterOperator} interact with {@link SchemaOperatorCoordinator} when - * table schema evolution happened. + * Client for {@link DataSinkWriterOperator} to interact with {@link SchemaRegistry} when a table + * schema evolution happens.
*/ public class SchemaEvolutionClient { @@ -46,13 +46,13 @@ public class SchemaEvolutionClient { this.schemaOperatorID = schemaOperatorID; } - /** send {@link SinkWriterRegisterEvent} to {@link SchemaOperatorCoordinator}. */ + /** Sends a {@link SinkWriterRegisterEvent} to {@link SchemaRegistry}. */ public void registerSubtask(int subtask) throws IOException { toCoordinator.sendOperatorEventToCoordinator( schemaOperatorID, new SerializedValue<>(new SinkWriterRegisterEvent(subtask))); } - /** send {@link FlushSuccessEvent} to {@link SchemaOperatorCoordinator}. */ + /** Sends a {@link FlushSuccessEvent} to {@link SchemaRegistry}. */ public void notifyFlushSuccess(int subtask, TableId tableId) throws IOException { toCoordinator.sendOperatorEventToCoordinator( schemaOperatorID, new SerializedValue<>(new FlushSuccessEvent(subtask, tableId)));