[3.0][cdc-runtime] Provide SchemaOperator and SchemaRegistry to handle schema changes (#2685)

pull/2732/head
Hang Ruan 1 year ago committed by GitHub
parent 4770f360d0
commit ace6080c93

@@ -24,8 +24,7 @@ import java.util.Objects;
*/
public class FlushEvent implements Event {
private static final long serialVersionUID = 1L;
/** The table whose schema is changing. */
private final TableId tableId;
public FlushEvent(TableId tableId) {

@@ -16,20 +16,36 @@
package com.ververica.cdc.runtime.operators.schema;
import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.util.SerializedValue;
import com.ververica.cdc.common.annotation.Internal;
import com.ververica.cdc.common.event.Event;
import com.ververica.cdc.common.event.FlushEvent;
import com.ververica.cdc.common.event.SchemaChangeEvent;
import com.ververica.cdc.runtime.operators.schema.coordinator.SchemaOperatorCoordinator;
import com.ververica.cdc.common.event.TableId;
import com.ververica.cdc.runtime.operators.schema.coordinator.SchemaRegistry;
import com.ververica.cdc.runtime.operators.schema.event.ReleaseUpstreamRequest;
import com.ververica.cdc.runtime.operators.schema.event.SchemaChangeRequest;
import com.ververica.cdc.runtime.operators.schema.event.SchemaChangeResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.CompletableFuture;
/**
* The operator to evolve schemas to {@link SchemaOperatorCoordinator} for incoming {@link
* The operator that evolves schemas in {@link SchemaRegistry} for incoming {@link
* SchemaChangeEvent}s and blocks the stream for affected tables until their schema changes finish.
*/
@Internal
public class SchemaOperator extends AbstractStreamOperator<Event>
implements OneInputStreamOperator<Event, Event> {
@@ -37,6 +53,81 @@ public class SchemaOperator extends AbstractStreamOperator<Event>
private static final Logger LOG = LoggerFactory.getLogger(SchemaOperator.class);
private transient TaskOperatorEventGateway toCoordinator;
private final int partitionChannelNum;
public SchemaOperator(int partitionChannelNum) {
this.partitionChannelNum = partitionChannelNum;
}
@Override
public void setup(
StreamTask<?, ?> containingTask,
StreamConfig config,
Output<StreamRecord<Event>> output) {
super.setup(containingTask, config, output);
this.toCoordinator = containingTask.getEnvironment().getOperatorCoordinatorEventGateway();
}
/**
* This method is guaranteed to not be called concurrently with other methods of the operator.
*/
@Override
public void processElement(StreamRecord<Event> streamRecord) throws Exception {}
public void processElement(StreamRecord<Event> streamRecord) throws Exception {
Event event = streamRecord.getValue();
if (event instanceof SchemaChangeEvent) {
TableId tableId = ((SchemaChangeEvent) event).tableId();
LOG.info(
"Table {} received SchemaChangeEvent and start to be blocked.",
tableId.toString());
handleSchemaChangeEvent(tableId, (SchemaChangeEvent) event);
return;
}
output.collect(streamRecord);
}
// ----------------------------------------------------------------------------------
private void handleSchemaChangeEvent(TableId tableId, SchemaChangeEvent schemaChangeEvent) {
// The response tells whether this subtask should send a FlushEvent and wait for flushing
SchemaChangeResponse response =
(SchemaChangeResponse) requestSchemaChange(tableId, schemaChangeEvent);
if (response.isShouldSendFlushEvent()) {
LOG.info(
"Sending the FlushEvent for table {} in subtask {}.",
tableId,
getRuntimeContext().getIndexOfThisSubtask());
broadcastEvents(new FlushEvent(tableId));
// This request blocks until every sink writer has finished flushing
requestReleaseUpstream();
}
broadcastEvents(schemaChangeEvent);
}
// TODO
private void broadcastEvents(Event event) {
output.collect(new StreamRecord<>(event));
}
private CoordinationResponse requestSchemaChange(
TableId tableId, SchemaChangeEvent schemaChangeEvent) {
return sendRequestToCoordinator(new SchemaChangeRequest(tableId, schemaChangeEvent));
}
private CoordinationResponse requestReleaseUpstream() {
return sendRequestToCoordinator(new ReleaseUpstreamRequest());
}
private CoordinationResponse sendRequestToCoordinator(CoordinationRequest request) {
try {
CompletableFuture<CoordinationResponse> responseFuture =
toCoordinator.sendRequestToCoordinator(
getOperatorID(), new SerializedValue<>(request));
return responseFuture.get();
} catch (Exception e) {
throw new IllegalStateException(
"Failed to send request to coordinator: " + request.toString(), e);
}
}
}

@@ -21,9 +21,14 @@ import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.streaming.api.operators.CoordinatedOperatorFactory;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperator;
import com.ververica.cdc.common.event.Event;
import com.ververica.cdc.common.event.TableId;
import com.ververica.cdc.common.sink.MetadataApplier;
import com.ververica.cdc.runtime.operators.schema.coordinator.SchemaRegistryProvider;
import java.util.List;
import java.util.Map;
/** Factory to create {@link SchemaOperator}. */
public class SchemaOperatorFactory extends SimpleOperatorFactory<Event>
@@ -31,12 +36,17 @@ public class SchemaOperatorFactory extends SimpleOperatorFactory<Event>
private static final long serialVersionUID = 1L;
protected SchemaOperatorFactory(StreamOperator<Event> operator) {
super(operator);
private final Map<TableId, List<MetadataApplier>> metadataAppliers;
public SchemaOperatorFactory(
int partitionChannelNum, Map<TableId, List<MetadataApplier>> metadataAppliers) {
super(new SchemaOperator(partitionChannelNum));
this.metadataAppliers = metadataAppliers;
}
@Override
public OperatorCoordinator.Provider getCoordinatorProvider(String s, OperatorID operatorID) {
return null;
public OperatorCoordinator.Provider getCoordinatorProvider(
String operatorName, OperatorID operatorID) {
return new SchemaRegistryProvider(operatorID, operatorName, metadataAppliers);
}
}
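For context, here is a minimal wiring sketch (not part of this commit) of how a pipeline might plug the factory between a CDC source stream and its sink. The helper class, the `events` stream, and the reuse of the stream's parallelism as `partitionChannelNum` are assumptions for illustration only:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;

import com.ververica.cdc.common.event.Event;
import com.ververica.cdc.common.event.TableId;
import com.ververica.cdc.common.sink.MetadataApplier;
import com.ververica.cdc.runtime.operators.schema.SchemaOperatorFactory;

public final class SchemaOperatorWiring {

    /** Hypothetical helper that inserts the schema operator in front of the sink. */
    public static DataStream<Event> withSchemaOperator(
            DataStream<Event> events, TableId table, MetadataApplier applier) {
        Map<TableId, List<MetadataApplier>> metadataAppliers = new HashMap<>();
        metadataAppliers.put(table, Collections.singletonList(applier));
        // The factory creates the SchemaOperator and, via getCoordinatorProvider,
        // registers the SchemaRegistry coordinator for it on the JobManager.
        return events.transform(
                "SchemaOperator",
                TypeInformation.of(Event.class),
                new SchemaOperatorFactory(events.getParallelism(), metadataAppliers));
    }
}
```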

@@ -1,89 +0,0 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.runtime.operators.schema.coordinator;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationRequestHandler;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import com.ververica.cdc.runtime.operators.schema.SchemaOperator;
import com.ververica.cdc.runtime.operators.schema.event.FlushSuccessEvent;
import com.ververica.cdc.runtime.operators.schema.event.SchemaChangeRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.util.concurrent.CompletableFuture;
/**
* The implementation of the {@link OperatorCoordinator} for the {@link SchemaOperator}.
*
* <p>The <code>SchemaOperatorCoordinator</code> provides an event loop style thread model to
* interact with the Flink runtime. The coordinator ensures that all the state manipulations are
* made by its event loop thread.
*
* <p>This coordinator is responsible for:
*
* <ul>
* <li>Apply schema changes when receiving the {@link SchemaChangeRequest} from {@link
* SchemaOperator}
* <li>Notify {@link SchemaOperator} to continue to push data for the table after receiving {@link
* FlushSuccessEvent} from its registered sink writer
* </ul>
*/
public class SchemaOperatorCoordinator implements OperatorCoordinator, CoordinationRequestHandler {
private static final Logger LOG = LoggerFactory.getLogger(SchemaOperatorCoordinator.class);
@Override
public CompletableFuture<CoordinationResponse> handleCoordinationRequest(
CoordinationRequest coordinationRequest) {
return null;
}
@Override
public void start() throws Exception {}
@Override
public void close() throws Exception {}
@Override
public void handleEventFromOperator(int i, int i1, OperatorEvent operatorEvent)
throws Exception {}
@Override
public void checkpointCoordinator(long l, CompletableFuture<byte[]> completableFuture)
throws Exception {}
@Override
public void notifyCheckpointComplete(long l) {}
@Override
public void resetToCheckpoint(long l, @Nullable byte[] bytes) throws Exception {}
@Override
public void subtaskReset(int i, long l) {}
@Override
public void executionAttemptFailed(int i, int i1, @Nullable Throwable throwable) {}
@Override
public void executionAttemptReady(int i, int i1, SubtaskGateway subtaskGateway) {}
}

@@ -0,0 +1,166 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.runtime.operators.schema.coordinator;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationRequestHandler;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.util.FlinkException;
import com.ververica.cdc.common.event.TableId;
import com.ververica.cdc.common.sink.MetadataApplier;
import com.ververica.cdc.runtime.operators.schema.SchemaOperator;
import com.ververica.cdc.runtime.operators.schema.event.FlushSuccessEvent;
import com.ververica.cdc.runtime.operators.schema.event.ReleaseUpstreamRequest;
import com.ververica.cdc.runtime.operators.schema.event.SchemaChangeRequest;
import com.ververica.cdc.runtime.operators.schema.event.SinkWriterRegisterEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
/**
* The implementation of the {@link OperatorCoordinator} for the {@link SchemaOperator}.
*
* <p>The <code>SchemaRegistry</code> provides an event loop style thread model to interact with the
* Flink runtime. The coordinator ensures that all the state manipulations are made by its event
* loop thread.
*
* <p>This <code>SchemaRegistry</code> is responsible for:
*
* <ul>
* <li>Apply schema changes when receiving the {@link SchemaChangeRequest} from {@link
* SchemaOperator}
* <li>Notify {@link SchemaOperator} to continue to push data for the table after receiving {@link
* FlushSuccessEvent} from its registered sink writers
* </ul>
*/
public class SchemaRegistry implements OperatorCoordinator, CoordinationRequestHandler {
private static final Logger LOG = LoggerFactory.getLogger(SchemaRegistry.class);
/** The context of the coordinator. */
private final OperatorCoordinator.Context context;
/** The name of the operator this SchemaRegistry is associated with. */
private final String operatorName;
/**
* Tracks the subtask failed reason to throw a more meaningful exception in {@link
* #subtaskReset}.
*/
private final Map<Integer, Throwable> failedReasons;
/** The request handler that handles all requests and events. */
private final SchemaRegistryRequestHandler requestHandler;
public SchemaRegistry(
String operatorName,
OperatorCoordinator.Context context,
Map<TableId, List<MetadataApplier>> metadataAppliers) {
this.context = context;
this.operatorName = operatorName;
this.failedReasons = new HashMap<>();
this.requestHandler = new SchemaRegistryRequestHandler(metadataAppliers);
}
@Override
public void start() throws Exception {
LOG.info("Starting SchemaRegistry for {}.", operatorName);
this.failedReasons.clear();
LOG.info("Started SchemaRegistry for {}.", operatorName);
}
@Override
public void close() throws Exception {
LOG.info("SchemaRegistry for {} closed.", operatorName);
}
@Override
public void handleEventFromOperator(int subtask, int attemptNumber, OperatorEvent event)
throws Exception {
if (event instanceof FlushSuccessEvent) {
FlushSuccessEvent flushSuccessEvent = (FlushSuccessEvent) event;
LOG.info(
"Sink subtask {} succeed flushing for table {}.",
flushSuccessEvent.getSubtask(),
flushSuccessEvent.getTableId().toString());
requestHandler.flushSuccess(
flushSuccessEvent.getTableId(), flushSuccessEvent.getSubtask());
} else if (event instanceof SinkWriterRegisterEvent) {
requestHandler.registerSinkWriter(((SinkWriterRegisterEvent) event).getSubtask());
} else {
throw new FlinkException("Unrecognized Operator Event: " + event);
}
}
@Override
public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> resultFuture)
throws Exception {
// do nothing
}
@Override
public void notifyCheckpointComplete(long checkpointId) {
// do nothing
}
@Override
public CompletableFuture<CoordinationResponse> handleCoordinationRequest(
CoordinationRequest request) {
if (request instanceof SchemaChangeRequest) {
SchemaChangeRequest schemaChangeRequest = (SchemaChangeRequest) request;
return requestHandler.handleSchemaChangeRequest(schemaChangeRequest);
} else if (request instanceof ReleaseUpstreamRequest) {
return requestHandler.handleReleaseUpstreamRequest((ReleaseUpstreamRequest) request);
} else {
throw new IllegalArgumentException("Unrecognized CoordinationRequest type: " + request);
}
}
@Override
public void resetToCheckpoint(long checkpointId, @Nullable byte[] checkpointData)
throws Exception {
// do nothing
}
@Override
public void subtaskReset(int subtask, long checkpointId) {
Throwable rootCause = failedReasons.get(subtask);
LOG.error(
String.format("Subtask %d reset at checkpoint %d.", subtask, checkpointId),
rootCause);
}
@Override
public void executionAttemptFailed(
int subtask, int attemptNumber, @Nullable Throwable throwable) {
failedReasons.put(subtask, throwable);
}
@Override
public void executionAttemptReady(
int subtask, int attemptNumber, SubtaskGateway subtaskGateway) {
// do nothing
}
}

@@ -19,17 +19,36 @@ package com.ververica.cdc.runtime.operators.schema.coordinator;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
/** Provider of {@link SchemaOperatorCoordinator}. */
public class SchemaOperatorCoordinatorProvider implements OperatorCoordinator.Provider {
import com.ververica.cdc.common.event.TableId;
import com.ververica.cdc.common.sink.MetadataApplier;
import java.util.List;
import java.util.Map;
/** Provider of {@link SchemaRegistry}. */
public class SchemaRegistryProvider implements OperatorCoordinator.Provider {
private static final long serialVersionUID = 1L;
private final OperatorID operatorID;
private final String operatorName;
private final Map<TableId, List<MetadataApplier>> metadataAppliers;
public SchemaRegistryProvider(
OperatorID operatorID,
String operatorName,
Map<TableId, List<MetadataApplier>> metadataAppliers) {
this.operatorID = operatorID;
this.operatorName = operatorName;
this.metadataAppliers = metadataAppliers;
}
@Override
public OperatorID getOperatorId() {
return null;
return operatorID;
}
@Override
public OperatorCoordinator create(OperatorCoordinator.Context context) throws Exception {
return null;
return new SchemaRegistry(operatorName, context, metadataAppliers);
}
}

@@ -0,0 +1,194 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.runtime.operators.schema.coordinator;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import com.ververica.cdc.common.annotation.Internal;
import com.ververica.cdc.common.event.SchemaChangeEvent;
import com.ververica.cdc.common.event.TableId;
import com.ververica.cdc.common.sink.MetadataApplier;
import com.ververica.cdc.runtime.operators.schema.event.ReleaseUpstreamRequest;
import com.ververica.cdc.runtime.operators.schema.event.ReleaseUpstreamResponse;
import com.ververica.cdc.runtime.operators.schema.event.SchemaChangeRequest;
import com.ververica.cdc.runtime.operators.schema.event.SchemaChangeResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.concurrent.NotThreadSafe;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
/** A handler to deal with all requests and events for {@link SchemaRegistry}. */
@Internal
@NotThreadSafe
public class SchemaRegistryRequestHandler {
private static final Logger LOG = LoggerFactory.getLogger(SchemaRegistryRequestHandler.class);
/** The {@link MetadataApplier}s for every table. */
private final Map<TableId, List<MetadataApplier>> metadataAppliers;
/** All active sink writers. */
private final Set<Integer> activeSinkWriters;
/**
* The pending schema change that is waiting for flush success events for its table from all
* sink writers before being applied.
*/
private PendingSchemaChange waitFlushSuccess;
/**
* Schema change requests that have not been applied yet; they are queued until the current
* request receives all flush success events for its table from the sink writers.
*/
private final List<PendingSchemaChange> pendingSchemaChanges;
/** Sink writers which have sent flush success events for the request. */
private final Set<Integer> flushedSinkWriters;
public SchemaRegistryRequestHandler(Map<TableId, List<MetadataApplier>> metadataAppliers) {
this.metadataAppliers = metadataAppliers;
this.activeSinkWriters = new HashSet<>();
this.flushedSinkWriters = new HashSet<>();
this.pendingSchemaChanges = new LinkedList<>();
}
/**
* Apply the schema change to the external system.
*
* @param tableId the table whose schema needs to change
* @param changeEvent the schema change
*/
private void applySchemaChange(TableId tableId, SchemaChangeEvent changeEvent) {
List<MetadataApplier> appliers = metadataAppliers.get(tableId);
if (appliers == null || appliers.isEmpty()) {
LOG.warn("There is no MetadataApplier for table {}.", tableId);
throw new UnsupportedOperationException(
"Cannot find a metadata applier for the table changes in table "
+ tableId.toString());
}
for (MetadataApplier applier : appliers) {
LOG.debug("Apply schema change {} to table {}.", changeEvent, tableId);
applier.applySchemaChange(changeEvent);
}
}
/**
* Handle the {@link SchemaChangeRequest}; the request is queued if another schema change is
* being processed.
*
* @param request the received SchemaChangeRequest
*/
public CompletableFuture<CoordinationResponse> handleSchemaChangeRequest(
SchemaChangeRequest request) {
CompletableFuture<CoordinationResponse> response;
if (pendingSchemaChanges.isEmpty() && waitFlushSuccess == null) {
LOG.info(
"Received schema change event request from table {}. Start to pend requests for others.",
request.getTableId().toString());
// TODO : if flushing can be skipped, do not put the request into pendingSchemaChanges
response = CompletableFuture.completedFuture(new SchemaChangeResponse(true));
pendingSchemaChanges.add(new PendingSchemaChange(request, response));
} else {
LOG.info("There are already processing requests. Wait for processing.");
response = new CompletableFuture<>();
pendingSchemaChanges.add(new PendingSchemaChange(request, response));
}
return response;
}
/**
* Handle the {@link ReleaseUpstreamRequest} and wait for all sink subtasks to finish flushing.
*
* @param request the received ReleaseUpstreamRequest
*/
public CompletableFuture<CoordinationResponse> handleReleaseUpstreamRequest(
ReleaseUpstreamRequest request) {
this.waitFlushSuccess = pendingSchemaChanges.remove(0).startToWaitForFlushSuccess();
return waitFlushSuccess.getResponseFuture();
}
/**
* Register a sink subtask.
*
* @param sinkSubtask the sink subtask to register
*/
public void registerSinkWriter(int sinkSubtask) {
LOG.info("Register sink subtask {}.", sinkSubtask);
activeSinkWriters.add(sinkSubtask);
}
/**
* Record flushed sink subtasks after receiving FlushSuccessEvent.
*
* @param tableId the table that the FlushEvent is about
* @param sinkSubtask the sink subtask that finished flushing
*/
public void flushSuccess(TableId tableId, int sinkSubtask) {
flushedSinkWriters.add(sinkSubtask);
if (flushedSinkWriters.equals(activeSinkWriters)) {
LOG.info(
"All sink subtask have flushed for table {}. Start to apply schema change.",
tableId.toString());
applySchemaChange(tableId, waitFlushSuccess.getChangeRequest().getSchemaChangeEvent());
waitFlushSuccess.getResponseFuture().complete(new ReleaseUpstreamResponse());
startNextSchemaChangeRequest();
}
}
private void startNextSchemaChangeRequest() {
flushedSinkWriters.clear();
waitFlushSuccess = null;
if (!pendingSchemaChanges.isEmpty()) {
// TODO : if no need to flush, remove it from pendingSchemaChanges
pendingSchemaChanges
.get(0)
.getResponseFuture()
.complete(new SchemaChangeResponse(true));
}
}
class PendingSchemaChange {
private final SchemaChangeRequest changeRequest;
private final CompletableFuture<CoordinationResponse> responseFuture;
public PendingSchemaChange(
SchemaChangeRequest changeRequest,
CompletableFuture<CoordinationResponse> responseFuture) {
this.changeRequest = changeRequest;
this.responseFuture = responseFuture;
}
public SchemaChangeRequest getChangeRequest() {
return changeRequest;
}
public CompletableFuture<CoordinationResponse> getResponseFuture() {
return responseFuture;
}
public PendingSchemaChange startToWaitForFlushSuccess() {
if (!responseFuture.isDone()) {
throw new IllegalStateException(
"Cannot start to wait for flush success before the SchemaChangeRequest is done.");
}
return new PendingSchemaChange(changeRequest, new CompletableFuture<>());
}
}
}
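To make the request lifecycle concrete, here is a hedged walk-through of the handler protocol with a single sink subtask. The `tableId` and `changeEvent` arguments are assumed to exist, and the no-op applier assumes `MetadataApplier` exposes only the `applySchemaChange` method used above:

```java
import java.util.Collections;
import java.util.concurrent.CompletableFuture;

import org.apache.flink.runtime.operators.coordination.CoordinationResponse;

import com.ververica.cdc.common.event.SchemaChangeEvent;
import com.ververica.cdc.common.event.TableId;
import com.ververica.cdc.common.sink.MetadataApplier;
import com.ververica.cdc.runtime.operators.schema.coordinator.SchemaRegistryRequestHandler;
import com.ververica.cdc.runtime.operators.schema.event.ReleaseUpstreamRequest;
import com.ververica.cdc.runtime.operators.schema.event.SchemaChangeRequest;

class HandlerProtocolSketch {

    static void walkThrough(TableId tableId, SchemaChangeEvent changeEvent) {
        // Assumes MetadataApplier is a single-method interface, as used above.
        MetadataApplier noOpApplier = schemaChange -> {};
        SchemaRegistryRequestHandler handler =
                new SchemaRegistryRequestHandler(
                        Collections.singletonMap(
                                tableId, Collections.singletonList(noOpApplier)));
        handler.registerSinkWriter(0); // the only active sink subtask

        // 1. SchemaOperator requests the schema change; with no pending changes the
        //    response completes immediately with shouldSendFlushEvent = true.
        CompletableFuture<CoordinationResponse> change =
                handler.handleSchemaChangeRequest(
                        new SchemaChangeRequest(tableId, changeEvent));

        // 2. After broadcasting the FlushEvent, SchemaOperator blocks on this future.
        CompletableFuture<CoordinationResponse> release =
                handler.handleReleaseUpstreamRequest(new ReleaseUpstreamRequest());

        // 3. Once every registered sink subtask reports flush success, the change is
        //    applied through the MetadataApplier and the upstream is released.
        handler.flushSuccess(tableId, 0);
        assert change.isDone() && release.isDone();
    }
}
```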

@@ -19,17 +19,21 @@ package com.ververica.cdc.runtime.operators.schema.event;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import com.ververica.cdc.common.event.TableId;
import com.ververica.cdc.runtime.operators.schema.coordinator.SchemaOperatorCoordinator;
import com.ververica.cdc.runtime.operators.schema.coordinator.SchemaRegistry;
import java.util.Objects;
/**
* A {@link OperatorEvent} from sink writer to notify {@link SchemaOperatorCoordinator} that it
* finished flushing.
* A {@link OperatorEvent} from sink writer to notify {@link SchemaRegistry} that it finished
* flushing.
*/
public class FlushSuccessEvent implements OperatorEvent {
private static final long serialVersionUID = 1L;
/** The sink subtask finished flushing. */
private final int subtask;
/** The table whose schema change is being executed. */
private final TableId tableId;
public FlushSuccessEvent(int subtask, TableId tableId) {
@@ -44,4 +48,21 @@ public class FlushSuccessEvent implements OperatorEvent {
public TableId getTableId() {
return tableId;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof FlushSuccessEvent)) {
return false;
}
FlushSuccessEvent that = (FlushSuccessEvent) o;
return subtask == that.subtask && Objects.equals(tableId, that.tableId);
}
@Override
public int hashCode() {
return Objects.hash(subtask, tableId);
}
}

@@ -16,16 +16,17 @@
package com.ververica.cdc.runtime.operators.schema.event;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import com.ververica.cdc.common.event.FlushEvent;
import com.ververica.cdc.runtime.operators.schema.SchemaOperator;
import com.ververica.cdc.runtime.operators.schema.coordinator.SchemaOperatorCoordinator;
import com.ververica.cdc.runtime.operators.schema.coordinator.SchemaRegistry;
/**
* A {@link OperatorEvent} from {@link SchemaOperatorCoordinator} to notify {@link SchemaOperator}
* that the coordinator has finished schema change and the {@link SchemaOperator} should continue to
* push data.
* The request from {@link SchemaOperator} to {@link SchemaRegistry} to release the upstream
* after the {@link FlushEvent} has been sent.
*/
public class ReleaseUpstreamEvent implements OperatorEvent {
public class ReleaseUpstreamRequest implements CoordinationRequest {
private static final long serialVersionUID = 1L;
}

@@ -14,12 +14,18 @@
* limitations under the License.
*/
package com.ververica.cdc.runtime.operators.schema.coordinator;
package com.ververica.cdc.runtime.operators.schema.event;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
/** A manager to manages schema changes. */
public class SchemaManager {
private static final Logger LOG = LoggerFactory.getLogger(SchemaManager.class);
import com.ververica.cdc.runtime.operators.schema.SchemaOperator;
import com.ververica.cdc.runtime.operators.schema.coordinator.SchemaRegistry;
/**
* The response for {@link ReleaseUpstreamRequest} from {@link SchemaRegistry} to {@link
* SchemaOperator}.
*/
public class ReleaseUpstreamResponse implements CoordinationResponse {
private static final long serialVersionUID = 1L;
}

@@ -18,13 +18,53 @@ package com.ververica.cdc.runtime.operators.schema.event;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import com.ververica.cdc.common.event.SchemaChangeEvent;
import com.ververica.cdc.common.event.TableId;
import com.ververica.cdc.runtime.operators.schema.SchemaOperator;
import com.ververica.cdc.runtime.operators.schema.coordinator.SchemaOperatorCoordinator;
import com.ververica.cdc.runtime.operators.schema.coordinator.SchemaRegistry;
import java.util.Objects;
/**
* The request from {@link SchemaOperator} to {@link SchemaOperatorCoordinator} to request to change
* schemas.
* The request from {@link SchemaOperator} to {@link SchemaRegistry} to change schemas.
*/
public class SchemaChangeRequest implements CoordinationRequest {
private static final long serialVersionUID = 1L;
/** The table whose schema is to be changed. */
private final TableId tableId;
/** The schema changes. */
private final SchemaChangeEvent schemaChangeEvent;
public SchemaChangeRequest(TableId tableId, SchemaChangeEvent schemaChangeEvent) {
this.tableId = tableId;
this.schemaChangeEvent = schemaChangeEvent;
}
public TableId getTableId() {
return tableId;
}
public SchemaChangeEvent getSchemaChangeEvent() {
return schemaChangeEvent;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof SchemaChangeRequest)) {
return false;
}
SchemaChangeRequest that = (SchemaChangeRequest) o;
return Objects.equals(tableId, that.tableId)
&& Objects.equals(schemaChangeEvent, that.schemaChangeEvent);
}
@Override
public int hashCode() {
return Objects.hash(tableId, schemaChangeEvent);
}
}

@@ -19,12 +19,45 @@ package com.ververica.cdc.runtime.operators.schema.event;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import com.ververica.cdc.runtime.operators.schema.SchemaOperator;
import com.ververica.cdc.runtime.operators.schema.coordinator.SchemaOperatorCoordinator;
import com.ververica.cdc.runtime.operators.schema.coordinator.SchemaRegistry;
import java.util.Objects;
/**
* The response for {@link SchemaChangeRequest} from {@link SchemaOperatorCoordinator} to {@link
* The response for {@link SchemaChangeRequest} from {@link SchemaRegistry} to {@link
* SchemaOperator}.
*/
public class SchemaChangeResponse implements CoordinationResponse {
private static final long serialVersionUID = 1L;
/**
* Whether the SchemaOperator needs to buffer data and the SchemaRegistry needs to wait for
* flushing.
*/
private final boolean shouldSendFlushEvent;
public SchemaChangeResponse(boolean shouldSendFlushEvent) {
this.shouldSendFlushEvent = shouldSendFlushEvent;
}
public boolean isShouldSendFlushEvent() {
return shouldSendFlushEvent;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof SchemaChangeResponse)) {
return false;
}
SchemaChangeResponse response = (SchemaChangeResponse) o;
return shouldSendFlushEvent == response.shouldSendFlushEvent;
}
@Override
public int hashCode() {
return Objects.hash(shouldSendFlushEvent);
}
}

@@ -18,9 +18,11 @@ package com.ververica.cdc.runtime.operators.schema.event;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import com.ververica.cdc.runtime.operators.schema.coordinator.SchemaOperatorCoordinator;
import com.ververica.cdc.runtime.operators.schema.coordinator.SchemaRegistry;
/** A {@link OperatorEvent} that register sink writer to {@link SchemaOperatorCoordinator}. */
import java.util.Objects;
/** A {@link OperatorEvent} that registers a sink writer to {@link SchemaRegistry}. */
public class SinkWriterRegisterEvent implements OperatorEvent {
private static final long serialVersionUID = 1L;
@@ -34,4 +36,21 @@ public class SinkWriterRegisterEvent implements OperatorEvent {
public int getSubtask() {
return subtask;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof SinkWriterRegisterEvent)) {
return false;
}
SinkWriterRegisterEvent that = (SinkWriterRegisterEvent) o;
return subtask == that.subtask;
}
@Override
public int hashCode() {
return Objects.hash(subtask);
}
}

@@ -23,15 +23,15 @@ import org.apache.flink.util.SerializedValue;
import com.ververica.cdc.common.event.TableId;
import com.ververica.cdc.runtime.operators.schema.SchemaOperator;
import com.ververica.cdc.runtime.operators.schema.coordinator.SchemaOperatorCoordinator;
import com.ververica.cdc.runtime.operators.schema.coordinator.SchemaRegistry;
import com.ververica.cdc.runtime.operators.schema.event.FlushSuccessEvent;
import com.ververica.cdc.runtime.operators.schema.event.SinkWriterRegisterEvent;
import java.io.IOException;
/**
* Client for {@link DataSinkWriterOperator} interact with {@link SchemaOperatorCoordinator} when
* table schema evolution happened.
* Client for {@link DataSinkWriterOperator} to interact with {@link SchemaRegistry} when table
* schema evolution happens.
*/
public class SchemaEvolutionClient {
@@ -46,13 +46,13 @@ public class SchemaEvolutionClient {
this.schemaOperatorID = schemaOperatorID;
}
/** send {@link SinkWriterRegisterEvent} to {@link SchemaOperatorCoordinator}. */
/** Send {@link SinkWriterRegisterEvent} to {@link SchemaRegistry}. */
public void registerSubtask(int subtask) throws IOException {
toCoordinator.sendOperatorEventToCoordinator(
schemaOperatorID, new SerializedValue<>(new SinkWriterRegisterEvent(subtask)));
}
/** send {@link FlushSuccessEvent} to {@link SchemaOperatorCoordinator}. */
/** Send {@link FlushSuccessEvent} to {@link SchemaRegistry}. */
public void notifyFlushSuccess(int subtask, TableId tableId) throws IOException {
toCoordinator.sendOperatorEventToCoordinator(
schemaOperatorID, new SerializedValue<>(new FlushSuccessEvent(subtask, tableId)));
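For completeness, a hedged sketch of the sink-writer side of this handshake. The operator shape, the SchemaEvolutionClient package and constructor arguments, and the FlushEvent getter name are assumptions inferred from this diff; the real wiring lives in DataSinkWriterOperator:

```java
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;

import com.ververica.cdc.common.event.Event;
import com.ververica.cdc.common.event.FlushEvent;
import com.ververica.cdc.common.event.TableId;
import com.ververica.cdc.runtime.operators.sink.SchemaEvolutionClient; // package assumed

/** Hypothetical sink-side operator showing where the client calls belong. */
public class SinkHandshakeSketch extends AbstractStreamOperator<Event>
        implements OneInputStreamOperator<Event, Event> {

    private final OperatorID schemaOperatorID; // ID of the upstream SchemaOperator
    private transient SchemaEvolutionClient schemaEvolutionClient;

    public SinkHandshakeSketch(OperatorID schemaOperatorID) {
        this.schemaOperatorID = schemaOperatorID;
    }

    @Override
    public void setup(
            StreamTask<?, ?> task, StreamConfig config, Output<StreamRecord<Event>> output) {
        super.setup(task, config, output);
        schemaEvolutionClient =
                new SchemaEvolutionClient(
                        task.getEnvironment().getOperatorCoordinatorEventGateway(),
                        schemaOperatorID);
    }

    @Override
    public void open() throws Exception {
        super.open();
        // Register this sink subtask with the SchemaRegistry once on startup.
        schemaEvolutionClient.registerSubtask(getRuntimeContext().getIndexOfThisSubtask());
    }

    @Override
    public void processElement(StreamRecord<Event> record) throws Exception {
        Event event = record.getValue();
        if (event instanceof FlushEvent) {
            TableId tableId = ((FlushEvent) event).getTableId(); // getter name assumed
            // ... flush buffered data for the table here, then report success so the
            // SchemaRegistry can apply the schema change and release the upstream.
            schemaEvolutionClient.notifyFlushSuccess(
                    getRuntimeContext().getIndexOfThisSubtask(), tableId);
            return;
        }
        output.collect(record); // hand data change events on to the real writer
    }
}
```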
