[cdc-runtime] Use CollectCoordinationResponse as a wrapper of CoordinationResponse

The reason for this change is a potential bug in Flink's RPC system: a CoordinationResponse is deserialized with the AppClassloader rather than the user-code classloader, so we can't use custom CoordinationResponse classes in RPCs (doing so leads to a ClassNotFoundException). As CollectCoordinationResponse is predefined in Flink and shipped in flink-dist, it is always visible to the AppClassloader, and we use its payload to hold the actual serialized custom CoordinationResponse.
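
In essence the fix is a wrap/unwrap round trip (a minimal sketch using the CoordinationResponseUtils helper added in this commit; SchemaChangeResponse stands in for any custom response type):

    // Coordinator side: serialize the custom response and hide it inside a
    // CollectCoordinationResponse, which the RPC layer can always deserialize.
    CoordinationResponse onWire = CoordinationResponseUtils.wrap(new SchemaChangeResponse(true));
    // Operator side: deserialize the payload with the context classloader to
    // restore the custom response type.
    SchemaChangeResponse restored = CoordinationResponseUtils.unwrap(onWire);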

This closes #2744
Qingsheng Ren committed by Leonard Xu
parent 1973f64aa3
commit 8ee3ea52d5

.../com/ververica/cdc/runtime/operators/schema/SchemaOperator.java
@@ -33,7 +33,9 @@ import com.ververica.cdc.common.event.FlushEvent;
import com.ververica.cdc.common.event.SchemaChangeEvent;
import com.ververica.cdc.common.event.TableId;
import com.ververica.cdc.runtime.operators.schema.coordinator.SchemaRegistry;
+import com.ververica.cdc.runtime.operators.schema.event.CoordinationResponseUtils;
import com.ververica.cdc.runtime.operators.schema.event.ReleaseUpstreamRequest;
import com.ververica.cdc.runtime.operators.schema.event.ReleaseUpstreamResponse;
import com.ververica.cdc.runtime.operators.schema.event.SchemaChangeRequest;
import com.ververica.cdc.runtime.operators.schema.event.SchemaChangeResponse;
import org.slf4j.Logger;
@@ -68,7 +70,7 @@ public class SchemaOperator extends AbstractStreamOperator<Event>
* This method is guaranteed to not be called concurrently with other methods of the operator.
*/
@Override
-public void processElement(StreamRecord<Event> streamRecord) throws Exception {
+public void processElement(StreamRecord<Event> streamRecord) {
Event event = streamRecord.getValue();
if (event instanceof SchemaChangeEvent) {
TableId tableId = ((SchemaChangeEvent) event).tableId();
@@ -85,8 +87,7 @@ public class SchemaOperator extends AbstractStreamOperator<Event>
private void handleSchemaChangeEvent(TableId tableId, SchemaChangeEvent schemaChangeEvent) {
// The request will need to send a FlushEvent or block until flushing finished
-SchemaChangeResponse response =
-(SchemaChangeResponse) requestSchemaChange(tableId, schemaChangeEvent);
+SchemaChangeResponse response = requestSchemaChange(tableId, schemaChangeEvent);
if (response.isShouldSendFlushEvent()) {
LOG.info(
"Sending the FlushEvent for table {} in subtask {}.",
@@ -99,21 +100,22 @@ public class SchemaOperator extends AbstractStreamOperator<Event>
}
}
-private CoordinationResponse requestSchemaChange(
+private SchemaChangeResponse requestSchemaChange(
TableId tableId, SchemaChangeEvent schemaChangeEvent) {
return sendRequestToCoordinator(new SchemaChangeRequest(tableId, schemaChangeEvent));
}
-private CoordinationResponse requestReleaseUpstream() {
+private ReleaseUpstreamResponse requestReleaseUpstream() {
return sendRequestToCoordinator(new ReleaseUpstreamRequest());
}
-private CoordinationResponse sendRequestToCoordinator(CoordinationRequest request) {
+private <REQUEST extends CoordinationRequest, RESPONSE extends CoordinationResponse>
+RESPONSE sendRequestToCoordinator(REQUEST request) {
try {
CompletableFuture<CoordinationResponse> responseFuture =
toCoordinator.sendRequestToCoordinator(
getOperatorID(), new SerializedValue<>(request));
-return responseFuture.get();
+return CoordinationResponseUtils.unwrap(responseFuture.get());
} catch (Exception e) {
throw new IllegalStateException(
"Failed to send request to coordinator: " + request.toString(), e);

.../com/ververica/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java
@@ -23,7 +23,6 @@ import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.util.FlinkException;
import com.ververica.cdc.common.annotation.Internal;
import com.ververica.cdc.common.event.TableId;
import com.ververica.cdc.common.sink.MetadataApplier;
import com.ververica.cdc.runtime.operators.schema.SchemaOperator;
@@ -46,6 +45,8 @@ import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import static com.ververica.cdc.runtime.operators.schema.event.CoordinationResponseUtils.wrap;
/**
* The implementation of the {@link OperatorCoordinator} for the {@link SchemaOperator}.
*
@@ -62,7 +63,6 @@ import java.util.concurrent.CompletableFuture;
* FlushSuccessEvent} from its registered sink writer
* </ul>
*/
@Internal
public class SchemaRegistry implements OperatorCoordinator, CoordinationRequestHandler {
private static final Logger LOG = LoggerFactory.getLogger(SchemaRegistry.class);
@@ -158,7 +158,7 @@ public class SchemaRegistry implements OperatorCoordinator, CoordinationRequestHandler
return requestHandler.handleReleaseUpstreamRequest();
} else if (request instanceof GetSchemaRequest) {
return CompletableFuture.completedFuture(
-handleGetSchemaRequest(((GetSchemaRequest) request)));
+wrap(handleGetSchemaRequest(((GetSchemaRequest) request))));
} else {
throw new IllegalArgumentException("Unrecognized CoordinationRequest type: " + request);
}

.../com/ververica/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java
@@ -38,6 +38,8 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import static com.ververica.cdc.runtime.operators.schema.event.CoordinationResponseUtils.wrap;
/** A handler to deal with all requests and events for {@link SchemaRegistry}. */
@Internal
@NotThreadSafe
@@ -97,10 +99,11 @@ public class SchemaRegistryRequestHandler {
request.getTableId().toString());
if (request.getSchemaChangeEvent() instanceof CreateTableEvent
&& schemaManager.schemaExists(request.getTableId())) {
-return CompletableFuture.completedFuture(new SchemaChangeResponse(false));
+return CompletableFuture.completedFuture(wrap(new SchemaChangeResponse(false)));
}
CompletableFuture<CoordinationResponse> response =
-CompletableFuture.completedFuture(new SchemaChangeResponse(true));
+CompletableFuture.completedFuture(wrap(new SchemaChangeResponse(true)));
+schemaManager.applySchemaChange(request.getSchemaChangeEvent());
pendingSchemaChanges.add(new PendingSchemaChange(request, response));
return response;
} else {
@@ -140,7 +143,7 @@ public class SchemaRegistryRequestHandler {
"All sink subtask have flushed for table {}. Start to apply schema change.",
tableId.toString());
applySchemaChange(tableId, waitFlushSuccess.getChangeRequest().getSchemaChangeEvent());
-waitFlushSuccess.getResponseFuture().complete(new ReleaseUpstreamResponse());
+waitFlushSuccess.getResponseFuture().complete(wrap(new ReleaseUpstreamResponse()));
startNextSchemaChangeRequest();
}
}
@@ -153,10 +156,15 @@ public class SchemaRegistryRequestHandler {
SchemaChangeRequest request = pendingSchemaChange.changeRequest;
if (request.getSchemaChangeEvent() instanceof CreateTableEvent
&& schemaManager.schemaExists(request.getTableId())) {
-pendingSchemaChange.getResponseFuture().complete(new SchemaChangeResponse(false));
+pendingSchemaChange
+.getResponseFuture()
+.complete(wrap(new SchemaChangeResponse(false)));
pendingSchemaChanges.remove(0);
} else {
-pendingSchemaChange.getResponseFuture().complete(new SchemaChangeResponse(true));
+schemaManager.applySchemaChange(request.getSchemaChangeEvent());
+pendingSchemaChange
+.getResponseFuture()
+.complete(wrap(new SchemaChangeResponse(true)));
break;
}
}

.../com/ververica/cdc/runtime/operators/schema/event/CoordinationResponseUtils.java (new file)
@@ -0,0 +1,167 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.runtime.operators.schema.event;

import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.streaming.api.operators.collect.CollectCoordinationResponse;

import com.ververica.cdc.common.annotation.Internal;
import com.ververica.cdc.common.utils.InstantiationUtil;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Collections;
import java.util.List;

/**
* Utilities for wrapping and unwrapping {@link CoordinationResponse} by {@link
* CollectCoordinationResponse}.
*/
@Internal
public class CoordinationResponseUtils {
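// The version string and offset below are only placeholders required by the
// CollectCoordinationResponse constructor; they are never interpreted on the
// receiving side, since unwrap() reads the payload list directly.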
private static final String MAGIC_VERSION = "__internal__";
private static final long MAGIC_OFFSET = 15213L;
public static <R extends CoordinationResponse> CoordinationResponse wrap(R response) {
CoordinationResponseSerializer serializer = new CoordinationResponseSerializer();
try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream out = new DataOutputStream(baos)) {
serializer.serialize(response, new DataOutputViewStreamWrapper(out));
return new CollectCoordinationResponse(
MAGIC_VERSION, MAGIC_OFFSET, Collections.singletonList(baos.toByteArray()));
} catch (Exception e) {
throw new IllegalStateException(
String.format(
"Unable to wrap CoordinationResponse \"%s\" with type \"%s\"",
response, response.getClass().getCanonicalName()),
e);
}
}
@SuppressWarnings("unchecked")
public static <R extends CoordinationResponse> R unwrap(CoordinationResponse response) {
try {
CollectCoordinationResponse rawResponse = (CollectCoordinationResponse) response;
List<CoordinationResponse> results =
rawResponse.getResults(new CoordinationResponseSerializer());
return (R) results.get(0);
} catch (Exception e) {
throw new IllegalStateException("Unable to unwrap CoordinationResponse", e);
}
}
private static class CoordinationResponseSerializer
extends TypeSerializer<CoordinationResponse> {
@Override
public void serialize(CoordinationResponse record, DataOutputView target)
throws IOException {
byte[] serialized = InstantiationUtil.serializeObject(record);
target.writeInt(serialized.length);
target.write(serialized);
}
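// Deserialization uses the context classloader, so custom response classes
// loaded by the user-code classloader can be restored even though the
// wrapper itself was deserialized by the AppClassloader.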
@Override
public CoordinationResponse deserialize(DataInputView source) throws IOException {
try {
int length = source.readInt();
byte[] serialized = new byte[length];
source.readFully(serialized);
return InstantiationUtil.deserializeObject(
serialized, Thread.currentThread().getContextClassLoader());
} catch (ClassNotFoundException e) {
throw new RuntimeException("Unable to deserialize CoordinationResponse", e);
}
}
@Override
public CoordinationResponse deserialize(CoordinationResponse reuse, DataInputView source)
throws IOException {
return deserialize(source);
}
@Override
public boolean isImmutableType() {
return true;
}
@Override
public TypeSerializer<CoordinationResponse> duplicate() {
return new CoordinationResponseSerializer();
}
@Override
public CoordinationResponse createInstance() {
return new CoordinationResponse() {};
}
@Override
public CoordinationResponse copy(CoordinationResponse from) {
throw new UnsupportedOperationException();
}
@Override
public CoordinationResponse copy(CoordinationResponse from, CoordinationResponse reuse) {
throw new UnsupportedOperationException();
}
@Override
public int getLength() {
return -1;
}
@Override
public void copy(DataInputView source, DataOutputView target) throws IOException {
CoordinationResponse deserialize = deserialize(source);
serialize(deserialize, target);
}
@Override
public boolean equals(Object obj) {
return obj instanceof CoordinationResponseSerializer;
}
@Override
public int hashCode() {
return getClass().hashCode();
}
@Override
public TypeSerializerSnapshot<CoordinationResponse> snapshotConfiguration() {
return new CoordinationResponseDeserializerSnapshot();
}
/** Serializer configuration snapshot for compatibility and format evolution. */
@SuppressWarnings("WeakerAccess")
public static final class CoordinationResponseDeserializerSnapshot
extends SimpleTypeSerializerSnapshot<CoordinationResponse> {
public CoordinationResponseDeserializerSnapshot() {
super(CoordinationResponseSerializer::new);
}
}
}
}
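
A quick way to see the contract of wrap/unwrap (a hypothetical check, not part of this commit; DummyResponse is an illustrative type):

    import org.apache.flink.runtime.operators.coordination.CoordinationResponse;

    import com.ververica.cdc.runtime.operators.schema.event.CoordinationResponseUtils;

    // Hypothetical round-trip check: wrapping and unwrapping must restore the
    // original payload. CoordinationResponse extends Serializable, so a plain
    // POJO with serializable fields is enough.
    public class CoordinationResponseRoundTrip {
        private static class DummyResponse implements CoordinationResponse {
            final String value;

            DummyResponse(String value) {
                this.value = value;
            }
        }

        public static void main(String[] args) {
            DummyResponse restored =
                    CoordinationResponseUtils.unwrap(
                            CoordinationResponseUtils.wrap(new DummyResponse("ok")));
            if (!"ok".equals(restored.value)) {
                throw new AssertionError("wrap/unwrap round trip failed");
            }
        }
    }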

.../com/ververica/cdc/runtime/operators/sink/SchemaEvolutionClient.java
@@ -18,10 +18,8 @@ package com.ververica.cdc.runtime.operators.sink;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
import org.apache.flink.streaming.runtime.operators.sink.DataSinkWriterOperator;
import org.apache.flink.util.SerializedValue;
import com.ververica.cdc.common.annotation.Internal;
import com.ververica.cdc.common.event.TableId;
import com.ververica.cdc.common.schema.Schema;
import com.ververica.cdc.runtime.operators.schema.SchemaOperator;
@@ -34,11 +32,12 @@ import com.ververica.cdc.runtime.operators.schema.event.SinkWriterRegisterEvent;
import java.io.IOException;
import java.util.Optional;
+import static com.ververica.cdc.runtime.operators.schema.event.CoordinationResponseUtils.unwrap;
/**
* Client for {@link DataSinkWriterOperator} interact with {@link SchemaRegistry} when table schema
* evolution happened.
*/
@Internal
public class SchemaEvolutionClient {
private final TaskOperatorEventGateway toCoordinator;
@@ -66,7 +65,7 @@ public class SchemaEvolutionClient {
public Optional<Schema> getLatestSchema(TableId tableId) throws Exception {
GetSchemaResponse getSchemaResponse =
-((GetSchemaResponse)
+unwrap(
toCoordinator
.sendRequestToCoordinator(
schemaOperatorID,
