[hotfix] Run schema coordinator logic asynchronously to avoid blocking the main thread

pull/3582/head
yuxiqian authored 5 months ago; committed by Leonard Xu
parent 9816848c4b
commit 1ebefd6d2e

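At a glance, this hotfix moves all SchemaRegistry coordinator work onto a dedicated single-thread executor, so that operator events, checkpoints, and coordination requests are handled off the JobManager's main thread. Before reading the diff, here is a minimal, self-contained sketch of that pattern; the class and member names are illustrative stand-ins, not the actual Flink CDC API:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Sketch only: funnel every coordinator action through one executor thread,
// so the caller returns immediately instead of blocking on the work itself.
class AsyncCoordinatorSketch {
    private final ExecutorService eventLoop = Executors.newSingleThreadExecutor();

    void runInEventLoop(Runnable action, String actionName) {
        eventLoop.execute(
                () -> {
                    try {
                        action.run();
                    } catch (Throwable t) {
                        // The real coordinator calls context.failJob(t) here.
                        System.err.println("Failed while " + actionName + ": " + t);
                    }
                });
    }
}

The diff below applies this pattern to SchemaRegistry, its provider, its request handler, and the affected tests.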
@@ -40,6 +40,7 @@ import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
/** Dummy classes for migration test. Called via reflection. */
@@ -69,6 +70,7 @@ public class SchemaRegistryMigrationMock implements MigrationMockBase {
return new SchemaRegistry(
"Dummy Name",
null,
Executors.newFixedThreadPool(1),
new MetadataApplier() {
@Override
public boolean acceptsSchemaEvolutionType(

@@ -37,7 +37,9 @@ import org.apache.flink.runtime.operators.coordination.CoordinationRequestHandle
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.function.ThrowingRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -55,6 +57,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import static org.apache.flink.cdc.runtime.operators.schema.event.CoordinationResponseUtils.wrap;
@@ -83,6 +86,9 @@ public class SchemaRegistry implements OperatorCoordinator, CoordinationRequestH
/** The name of the operator this SchemaOperatorCoordinator is associated with. */
private final String operatorName;
/** A single-thread executor to handle async execution of the coordinator. */
private final ExecutorService coordinatorExecutor;
/**
* Tracks each subtask's failure reason, in order to throw a more meaningful exception in
* {@link #subtaskReset}.
@@ -113,18 +119,27 @@ public class SchemaRegistry implements OperatorCoordinator, CoordinationRequestH
public SchemaRegistry(
String operatorName,
OperatorCoordinator.Context context,
ExecutorService executorService,
MetadataApplier metadataApplier,
List<RouteRule> routes) {
this(operatorName, context, metadataApplier, routes, SchemaChangeBehavior.EVOLVE);
this(
operatorName,
context,
executorService,
metadataApplier,
routes,
SchemaChangeBehavior.LENIENT);
}
public SchemaRegistry(
String operatorName,
OperatorCoordinator.Context context,
ExecutorService coordinatorExecutor,
MetadataApplier metadataApplier,
List<RouteRule> routes,
SchemaChangeBehavior schemaChangeBehavior) {
this.context = context;
this.coordinatorExecutor = coordinatorExecutor;
this.operatorName = operatorName;
this.failedReasons = new HashMap<>();
this.metadataApplier = metadataApplier;
@@ -133,7 +148,11 @@ public class SchemaRegistry implements OperatorCoordinator, CoordinationRequestH
this.schemaDerivation = new SchemaDerivation(schemaManager, routes, new HashMap<>());
this.requestHandler =
new SchemaRegistryRequestHandler(
metadataApplier, schemaManager, schemaDerivation, schemaChangeBehavior);
metadataApplier,
schemaManager,
schemaDerivation,
schemaChangeBehavior,
context);
this.schemaChangeBehavior = schemaChangeBehavior;
}
@@ -153,48 +172,87 @@ public class SchemaRegistry implements OperatorCoordinator, CoordinationRequestH
}
@Override
public void handleEventFromOperator(int subtask, int attemptNumber, OperatorEvent event)
throws Exception {
try {
if (event instanceof FlushSuccessEvent) {
FlushSuccessEvent flushSuccessEvent = (FlushSuccessEvent) event;
LOG.info(
"Sink subtask {} succeed flushing for table {}.",
flushSuccessEvent.getSubtask(),
flushSuccessEvent.getTableId().toString());
requestHandler.flushSuccess(
flushSuccessEvent.getTableId(),
flushSuccessEvent.getSubtask(),
currentParallelism);
} else if (event instanceof SinkWriterRegisterEvent) {
requestHandler.registerSinkWriter(((SinkWriterRegisterEvent) event).getSubtask());
} else {
throw new FlinkException("Unrecognized Operator Event: " + event);
}
} catch (Throwable t) {
context.failJob(t);
throw t;
}
public void handleEventFromOperator(int subtask, int attemptNumber, OperatorEvent event) {
runInEventLoop(
() -> {
try {
if (event instanceof FlushSuccessEvent) {
FlushSuccessEvent flushSuccessEvent = (FlushSuccessEvent) event;
LOG.info(
"Sink subtask {} succeed flushing for table {}.",
flushSuccessEvent.getSubtask(),
flushSuccessEvent.getTableId().toString());
requestHandler.flushSuccess(
flushSuccessEvent.getTableId(),
flushSuccessEvent.getSubtask(),
currentParallelism);
} else if (event instanceof SinkWriterRegisterEvent) {
requestHandler.registerSinkWriter(
((SinkWriterRegisterEvent) event).getSubtask());
} else {
throw new FlinkException("Unrecognized Operator Event: " + event);
}
} catch (Throwable t) {
context.failJob(t);
throw t;
}
},
"handling event %s from subTask %d",
event,
subtask);
}
@Override
public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> resultFuture)
throws Exception {
try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream out = new DataOutputStream(baos)) {
// Serialize SchemaManager
int schemaManagerSerializerVersion = SchemaManager.SERIALIZER.getVersion();
out.writeInt(schemaManagerSerializerVersion);
byte[] serializedSchemaManager = SchemaManager.SERIALIZER.serialize(schemaManager);
out.writeInt(serializedSchemaManager.length);
out.write(serializedSchemaManager);
// Serialize SchemaDerivation mapping
SchemaDerivation.serializeDerivationMapping(schemaDerivation, out);
resultFuture.complete(baos.toByteArray());
} catch (Throwable t) {
context.failJob(t);
throw t;
}
public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> resultFuture) {
// We generate the checkpoint in an async thread to avoid blocking the JobManager's
// main thread; the coordinator state might be large if there are many schema changes
// and many monitored tables.
runInEventLoop(
() -> {
try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream out = new DataOutputStream(baos)) {
// Serialize SchemaManager
int schemaManagerSerializerVersion = SchemaManager.SERIALIZER.getVersion();
out.writeInt(schemaManagerSerializerVersion);
byte[] serializedSchemaManager =
SchemaManager.SERIALIZER.serialize(schemaManager);
out.writeInt(serializedSchemaManager.length);
out.write(serializedSchemaManager);
// Serialize SchemaDerivation mapping
SchemaDerivation.serializeDerivationMapping(schemaDerivation, out);
resultFuture.complete(baos.toByteArray());
} catch (Throwable t) {
context.failJob(t);
throw t;
}
},
"taking checkpoint %d",
checkpointId);
}
private void runInEventLoop(
final ThrowingRunnable<Throwable> action,
final String actionName,
final Object... actionNameFormatParameters) {
coordinatorExecutor.execute(
() -> {
try {
action.run();
} catch (Throwable t) {
// If we hit a JVM-critical error, rethrow it immediately; there is a good
// chance that logging or failing the job will not succeed anymore
ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
final String actionString =
String.format(actionName, actionNameFormatParameters);
LOG.error(
"Uncaught exception in the SchemaEvolutionCoordinator for {} while {}. Triggering job failover.",
operatorName,
actionString,
t);
context.failJob(t);
}
});
}
@Override
@@ -205,26 +263,34 @@ public class SchemaRegistry implements OperatorCoordinator, CoordinationRequestH
@Override
public CompletableFuture<CoordinationResponse> handleCoordinationRequest(
CoordinationRequest request) {
try {
if (request instanceof SchemaChangeRequest) {
SchemaChangeRequest schemaChangeRequest = (SchemaChangeRequest) request;
return requestHandler.handleSchemaChangeRequest(schemaChangeRequest);
} else if (request instanceof SchemaChangeResultRequest) {
return requestHandler.getSchemaChangeResult();
} else if (request instanceof GetEvolvedSchemaRequest) {
return CompletableFuture.completedFuture(
wrap(handleGetEvolvedSchemaRequest(((GetEvolvedSchemaRequest) request))));
} else if (request instanceof GetOriginalSchemaRequest) {
return CompletableFuture.completedFuture(
wrap(handleGetOriginalSchemaRequest((GetOriginalSchemaRequest) request)));
} else {
throw new IllegalArgumentException(
"Unrecognized CoordinationRequest type: " + request);
}
} catch (Throwable t) {
context.failJob(t);
throw t;
}
CompletableFuture<CoordinationResponse> responseFuture = new CompletableFuture<>();
runInEventLoop(
() -> {
try {
if (request instanceof SchemaChangeRequest) {
SchemaChangeRequest schemaChangeRequest = (SchemaChangeRequest) request;
requestHandler.handleSchemaChangeRequest(
schemaChangeRequest, responseFuture);
} else if (request instanceof SchemaChangeResultRequest) {
requestHandler.getSchemaChangeResult(responseFuture);
} else if (request instanceof GetEvolvedSchemaRequest) {
handleGetEvolvedSchemaRequest(
((GetEvolvedSchemaRequest) request), responseFuture);
} else if (request instanceof GetOriginalSchemaRequest) {
handleGetOriginalSchemaRequest(
(GetOriginalSchemaRequest) request, responseFuture);
} else {
throw new IllegalArgumentException(
"Unrecognized CoordinationRequest type: " + request);
}
} catch (Throwable t) {
context.failJob(t);
throw t;
}
},
"handling coordination request %s",
request);
return responseFuture;
}
@Override
@@ -253,7 +319,8 @@ public class SchemaRegistry implements OperatorCoordinator, CoordinationRequestH
metadataApplier,
schemaManager,
schemaDerivation,
schemaManager.getBehavior());
schemaManager.getBehavior(),
context);
break;
}
case 1:
@@ -274,7 +341,8 @@ public class SchemaRegistry implements OperatorCoordinator, CoordinationRequestH
metadataApplier,
schemaManager,
schemaDerivation,
schemaChangeBehavior);
schemaChangeBehavior,
context);
break;
}
default:
@@ -307,46 +375,56 @@ public class SchemaRegistry implements OperatorCoordinator, CoordinationRequestH
// do nothing
}
private GetEvolvedSchemaResponse handleGetEvolvedSchemaRequest(
GetEvolvedSchemaRequest getEvolvedSchemaRequest) {
private void handleGetEvolvedSchemaRequest(
GetEvolvedSchemaRequest getEvolvedSchemaRequest,
CompletableFuture<CoordinationResponse> response) {
LOG.info("Handling evolved schema request: {}", getEvolvedSchemaRequest);
int schemaVersion = getEvolvedSchemaRequest.getSchemaVersion();
TableId tableId = getEvolvedSchemaRequest.getTableId();
if (schemaVersion == GetEvolvedSchemaRequest.LATEST_SCHEMA_VERSION) {
return new GetEvolvedSchemaResponse(
schemaManager.getLatestEvolvedSchema(tableId).orElse(null));
response.complete(
wrap(
new GetEvolvedSchemaResponse(
schemaManager.getLatestEvolvedSchema(tableId).orElse(null))));
} else {
try {
return new GetEvolvedSchemaResponse(
schemaManager.getEvolvedSchema(tableId, schemaVersion));
response.complete(
wrap(
new GetEvolvedSchemaResponse(
schemaManager.getEvolvedSchema(tableId, schemaVersion))));
} catch (IllegalArgumentException iae) {
LOG.warn(
"Some client is requesting an non-existed evolved schema for table {} with version {}",
tableId,
schemaVersion);
return new GetEvolvedSchemaResponse(null);
response.complete(wrap(new GetEvolvedSchemaResponse(null)));
}
}
}
private GetOriginalSchemaResponse handleGetOriginalSchemaRequest(
GetOriginalSchemaRequest getOriginalSchemaRequest) {
private void handleGetOriginalSchemaRequest(
GetOriginalSchemaRequest getOriginalSchemaRequest,
CompletableFuture<CoordinationResponse> response) {
LOG.info("Handling original schema request: {}", getOriginalSchemaRequest);
int schemaVersion = getOriginalSchemaRequest.getSchemaVersion();
TableId tableId = getOriginalSchemaRequest.getTableId();
if (schemaVersion == GetOriginalSchemaRequest.LATEST_SCHEMA_VERSION) {
return new GetOriginalSchemaResponse(
schemaManager.getLatestOriginalSchema(tableId).orElse(null));
response.complete(
wrap(
new GetOriginalSchemaResponse(
schemaManager.getLatestOriginalSchema(tableId).orElse(null))));
} else {
try {
return new GetOriginalSchemaResponse(
schemaManager.getOriginalSchema(tableId, schemaVersion));
response.complete(
wrap(
new GetOriginalSchemaResponse(
schemaManager.getOriginalSchema(tableId, schemaVersion))));
} catch (IllegalArgumentException iae) {
LOG.warn(
"Some client is requesting an non-existed original schema for table {} with version {}",
tableId,
schemaVersion);
return new GetOriginalSchemaResponse(null);
response.complete(wrap(new GetOriginalSchemaResponse(null)));
}
}
}

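The reworked handleCoordinationRequest above shows the future-handoff idiom the rest of this commit relies on: the response future is created on the caller's thread, returned immediately, and completed later from the coordinator's event-loop thread. A compact sketch of the idiom, again with illustrative names rather than the real API:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class FutureHandoffSketch {
    private final ExecutorService eventLoop = Executors.newSingleThreadExecutor();

    // The caller gets the future right away; the event-loop thread completes it.
    CompletableFuture<String> handleRequest(String request) {
        CompletableFuture<String> response = new CompletableFuture<>();
        eventLoop.execute(
                () -> {
                    try {
                        response.complete("handled: " + request); // stand-in for wrap(...)
                    } catch (Throwable t) {
                        response.completeExceptionally(t);
                    }
                });
        return response;
    }
}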
@@ -23,8 +23,12 @@ import org.apache.flink.cdc.common.route.RouteRule;
import org.apache.flink.cdc.common.sink.MetadataApplier;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.util.FatalExitExceptionHandler;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
/** Provider of {@link SchemaRegistry}. */
@Internal
@@ -57,7 +61,55 @@ public class SchemaRegistryProvider implements OperatorCoordinator.Provider {
@Override
public OperatorCoordinator create(OperatorCoordinator.Context context) throws Exception {
CoordinatorExecutorThreadFactory coordinatorThreadFactory =
new CoordinatorExecutorThreadFactory(
"schema-evolution-coordinator", context.getUserCodeClassloader());
ExecutorService coordinatorExecutor =
Executors.newSingleThreadExecutor(coordinatorThreadFactory);
return new SchemaRegistry(
operatorName, context, metadataApplier, routingRules, schemaChangeBehavior);
operatorName,
context,
coordinatorExecutor,
metadataApplier,
routingRules,
schemaChangeBehavior);
}
/** A thread factory class that provides some helper methods. */
public static class CoordinatorExecutorThreadFactory implements ThreadFactory {
private final String coordinatorThreadName;
private final ClassLoader cl;
private final Thread.UncaughtExceptionHandler errorHandler;
private Thread t;
CoordinatorExecutorThreadFactory(
final String coordinatorThreadName, final ClassLoader contextClassLoader) {
this(coordinatorThreadName, contextClassLoader, FatalExitExceptionHandler.INSTANCE);
}
CoordinatorExecutorThreadFactory(
final String coordinatorThreadName,
final ClassLoader contextClassLoader,
final Thread.UncaughtExceptionHandler errorHandler) {
this.coordinatorThreadName = coordinatorThreadName;
this.cl = contextClassLoader;
this.errorHandler = errorHandler;
}
@Override
public synchronized Thread newThread(Runnable r) {
if (t != null) {
throw new Error(
"This indicates that a fatal error has happened and caused the "
+ "coordinator executor thread to exit. Check the earlier logs"
+ "to see the root cause of the problem.");
}
t = new Thread(r, coordinatorThreadName);
t.setContextClassLoader(cl);
t.setUncaughtExceptionHandler(errorHandler);
return t;
}
}
}

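Why does newThread above throw if it is ever called twice? Executors.newSingleThreadExecutor asks its ThreadFactory for a replacement thread only after the original worker has died, so a second call is evidence of a fatal failure in the coordinator thread. A self-contained sketch of that contract, with illustrative names:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

// Illustrative one-shot factory in the spirit of CoordinatorExecutorThreadFactory.
class OneShotFactorySketch implements ThreadFactory {
    private Thread created;

    @Override
    public synchronized Thread newThread(Runnable r) {
        if (created != null) {
            // Reaching this line means the single coordinator thread died earlier.
            throw new Error("Coordinator thread died earlier; refusing to recreate it.");
        }
        created = new Thread(r, "sketch-coordinator");
        return created;
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor(new OneShotFactorySketch());
        pool.execute(() -> System.out.println("runs on the one coordinator thread"));
        pool.shutdown();
    }
}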
@@ -38,6 +38,7 @@ import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeRequest;
import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeResponse;
import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeResultResponse;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -94,15 +95,19 @@ public class SchemaRegistryRequestHandler implements Closeable {
private final SchemaChangeBehavior schemaChangeBehavior;
private final OperatorCoordinator.Context context;
public SchemaRegistryRequestHandler(
MetadataApplier metadataApplier,
SchemaManager schemaManager,
SchemaDerivation schemaDerivation,
SchemaChangeBehavior schemaChangeBehavior) {
SchemaChangeBehavior schemaChangeBehavior,
OperatorCoordinator.Context context) {
this.metadataApplier = metadataApplier;
this.schemaManager = schemaManager;
this.schemaDerivation = schemaDerivation;
this.schemaChangeBehavior = schemaChangeBehavior;
this.context = context;
this.activeSinkWriters = ConcurrentHashMap.newKeySet();
this.flushedSinkWriters = ConcurrentHashMap.newKeySet();
@@ -122,8 +127,8 @@ public class SchemaRegistryRequestHandler implements Closeable {
*
* @param request the received SchemaChangeRequest
*/
public CompletableFuture<CoordinationResponse> handleSchemaChangeRequest(
SchemaChangeRequest request) {
public void handleSchemaChangeRequest(
SchemaChangeRequest request, CompletableFuture<CoordinationResponse> response) {
// We use the requester's subTask ID as the pending ticket, because there will be at most
// one schema change request at a time from each subTask
@@ -156,7 +161,8 @@ public class SchemaRegistryRequestHandler implements Closeable {
if (!pendingSubTaskIds.contains(requestSubTaskId)) {
pendingSubTaskIds.add(requestSubTaskId);
}
return CompletableFuture.completedFuture(wrap(SchemaChangeResponse.busy()));
response.complete(wrap(SchemaChangeResponse.busy()));
return;
}
SchemaChangeEvent event = request.getSchemaChangeEvent();
@@ -168,8 +174,8 @@ public class SchemaRegistryRequestHandler implements Closeable {
LOG.info(
"SchemaChangeStatus switched from WAITING_FOR_FLUSH to IDLE for request {} due to duplicated request.",
request);
return CompletableFuture.completedFuture(
wrap(SchemaChangeResponse.duplicate()));
response.complete(wrap(SchemaChangeResponse.duplicate()));
return;
}
schemaManager.applyOriginalSchemaChange(event);
List<SchemaChangeEvent> derivedSchemaChangeEvents =
@@ -183,7 +189,9 @@ public class SchemaRegistryRequestHandler implements Closeable {
LOG.info(
"SchemaChangeStatus switched from WAITING_FOR_FLUSH to IDLE for request {} due to ignored request.",
request);
return CompletableFuture.completedFuture(wrap(SchemaChangeResponse.ignored()));
response.complete(wrap(SchemaChangeResponse.ignored()));
return;
}
LOG.info(
@@ -191,8 +199,8 @@ public class SchemaRegistryRequestHandler implements Closeable {
// This request has been accepted.
schemaChangeStatus = RequestStatus.WAITING_FOR_FLUSH;
currentDerivedSchemaChangeEvents = new ArrayList<>(derivedSchemaChangeEvents);
return CompletableFuture.completedFuture(
wrap(SchemaChangeResponse.accepted(derivedSchemaChangeEvents)));
response.complete(wrap(SchemaChangeResponse.accepted(derivedSchemaChangeEvents)));
} else {
LOG.info(
"Schema Registry is busy processing a schema change request, could not handle request {} for now. Added {} to pending list ({}).",
@@ -202,7 +210,7 @@ public class SchemaRegistryRequestHandler implements Closeable {
if (!pendingSubTaskIds.contains(requestSubTaskId)) {
pendingSubTaskIds.add(requestSubTaskId);
}
return CompletableFuture.completedFuture(wrap(SchemaChangeResponse.busy()));
response.complete(wrap(SchemaChangeResponse.busy()));
}
}
}
@@ -299,7 +307,7 @@ public class SchemaRegistryRequestHandler implements Closeable {
}
}
public CompletableFuture<CoordinationResponse> getSchemaChangeResult() {
public void getSchemaChangeResult(CompletableFuture<CoordinationResponse> response) {
Preconditions.checkState(
schemaChangeStatus != RequestStatus.IDLE,
"Illegal schemaChangeStatus: should not be IDLE before getting schema change request results.");
@@ -311,11 +319,12 @@ public class SchemaRegistryRequestHandler implements Closeable {
// This request has been finished, return it and prepare for the next request
List<SchemaChangeEvent> finishedEvents = clearCurrentSchemaChangeRequest();
return CompletableFuture.supplyAsync(
() -> wrap(new SchemaChangeResultResponse(finishedEvents)));
SchemaChangeResultResponse resultResponse =
new SchemaChangeResultResponse(finishedEvents);
response.complete(wrap(resultResponse));
} else {
// Still working on the schema change request; waiting for it to finish
return CompletableFuture.supplyAsync(() -> wrap(new SchemaChangeProcessingResponse()));
response.complete(wrap(new SchemaChangeProcessingResponse()));
}
}
@@ -444,7 +453,8 @@ public class SchemaRegistryRequestHandler implements Closeable {
private List<SchemaChangeEvent> clearCurrentSchemaChangeRequest() {
if (currentChangeException != null) {
throw new RuntimeException("Failed to apply schema change.", currentChangeException);
context.failJob(
new RuntimeException("Failed to apply schema change.", currentChangeException));
}
List<SchemaChangeEvent> finishedSchemaChanges =
new ArrayList<>(currentFinishedSchemaChanges);

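The handler changes above all follow one mechanical transformation: instead of manufacturing and returning its own CompletableFuture, each method now completes a future supplied by the coordinator, so every response is produced on the coordinator's event-loop thread and early exits become plain return statements. A minimal before/after sketch with hypothetical types:

import java.util.concurrent.CompletableFuture;

class HandlerStyleSketch {
    // Before: the handler creates and returns the future itself.
    CompletableFuture<String> handleOld(boolean busy) {
        return CompletableFuture.completedFuture(busy ? "busy" : "accepted");
    }

    // After: the caller owns the future; the handler only completes it.
    void handleNew(boolean busy, CompletableFuture<String> response) {
        if (busy) {
            response.complete("busy");
            return; // early exit replaces an early 'return completedFuture(...)'
        }
        response.complete("accepted");
    }
}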
@@ -30,6 +30,7 @@ import org.apache.flink.cdc.common.event.RenameColumnEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEventType;
import org.apache.flink.cdc.common.event.SchemaChangeEventTypeFamily;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.exceptions.UnsupportedSchemaChangeEventException;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
@@ -1035,11 +1036,16 @@ public class SchemaEvolveTest {
new AddColumnEvent.ColumnWithPosition(
Column.physicalColumn(
"height", DOUBLE, "Height data")))));
Assertions.assertThatThrownBy(() -> processEvent(schemaOperator, addColumnEvents))
processEvent(schemaOperator, addColumnEvents);
Assertions.assertThat(harness.isJobFailed()).isEqualTo(true);
Assertions.assertThat(harness.getJobFailureCause())
.cause()
.cause()
.isExactlyInstanceOf(RuntimeException.class)
.hasMessageContaining("Failed to apply schema change");
.isExactlyInstanceOf(UnsupportedSchemaChangeEventException.class)
.matches(
e ->
((UnsupportedSchemaChangeEventException) e)
.getExceptionMessage()
.equals("Sink doesn't support such schema change event."));
harness.close();
}

@@ -38,7 +38,6 @@ import org.apache.flink.cdc.runtime.testutils.schema.TestingSchemaRegistryGatewa
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
@@ -56,6 +55,7 @@ import java.time.Duration;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.Set;
import java.util.concurrent.Executors;
import static org.apache.flink.cdc.runtime.operators.schema.event.CoordinationResponseUtils.unwrap;
@@ -81,6 +81,7 @@ public class EventOperatorTestHarness<OP extends AbstractStreamOperator<E>, E ex
private final SchemaRegistry schemaRegistry;
private final TestingSchemaRegistryGateway schemaRegistryGateway;
private final LinkedList<StreamRecord<E>> outputRecords = new LinkedList<>();
private final MockedOperatorCoordinatorContext mockedContext;
public EventOperatorTestHarness(OP operator, int numOutputs) {
this(operator, numOutputs, null, SchemaChangeBehavior.EVOLVE);
@@ -94,11 +95,14 @@ public class EventOperatorTestHarness<OP extends AbstractStreamOperator<E>, E ex
OP operator, int numOutputs, Duration duration, SchemaChangeBehavior behavior) {
this.operator = operator;
this.numOutputs = numOutputs;
this.mockedContext =
new MockedOperatorCoordinatorContext(
SCHEMA_OPERATOR_ID, Thread.currentThread().getContextClassLoader());
schemaRegistry =
new SchemaRegistry(
"SchemaOperator",
new MockOperatorCoordinatorContext(
SCHEMA_OPERATOR_ID, Thread.currentThread().getContextClassLoader()),
mockedContext,
Executors.newFixedThreadPool(1),
new CollectingMetadataApplier(duration),
new ArrayList<>(),
behavior);
@@ -113,11 +117,14 @@ public class EventOperatorTestHarness<OP extends AbstractStreamOperator<E>, E ex
Set<SchemaChangeEventType> enabledEventTypes) {
this.operator = operator;
this.numOutputs = numOutputs;
this.mockedContext =
new MockedOperatorCoordinatorContext(
SCHEMA_OPERATOR_ID, Thread.currentThread().getContextClassLoader());
schemaRegistry =
new SchemaRegistry(
"SchemaOperator",
new MockOperatorCoordinatorContext(
SCHEMA_OPERATOR_ID, Thread.currentThread().getContextClassLoader()),
mockedContext,
Executors.newFixedThreadPool(1),
new CollectingMetadataApplier(duration, enabledEventTypes),
new ArrayList<>(),
behavior);
@@ -133,11 +140,14 @@ public class EventOperatorTestHarness<OP extends AbstractStreamOperator<E>, E ex
Set<SchemaChangeEventType> errorsOnEventTypes) {
this.operator = operator;
this.numOutputs = numOutputs;
this.mockedContext =
new MockedOperatorCoordinatorContext(
SCHEMA_OPERATOR_ID, Thread.currentThread().getContextClassLoader());
schemaRegistry =
new SchemaRegistry(
"SchemaOperator",
new MockOperatorCoordinatorContext(
SCHEMA_OPERATOR_ID, Thread.currentThread().getContextClassLoader()),
mockedContext,
Executors.newFixedThreadPool(1),
new CollectingMetadataApplier(
duration, enabledEventTypes, errorsOnEventTypes),
new ArrayList<>(),
@@ -196,6 +206,14 @@ public class EventOperatorTestHarness<OP extends AbstractStreamOperator<E>, E ex
.orElse(null);
}
public boolean isJobFailed() {
return mockedContext.isJobFailed();
}
public Throwable getJobFailureCause() {
return mockedContext.getFailureCause();
}
@Override
public void close() throws Exception {
operator.close();

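Because the coordinator now reports failures through context.failJob(...) instead of throwing to the caller, the harness exposes the recorded failure for assertions, backed by the new MockedOperatorCoordinatorContext added below. The capture idea, reduced to an illustrative sketch (not the Flink class):

// Sketch: record the failure instead of throwing, so tests can inspect it later.
class RecordingCoordinatorContext {
    private Throwable failureCause;

    void failJob(Throwable cause) {
        failureCause = cause;
    }

    boolean isJobFailed() {
        return failureCause != null;
    }

    Throwable getFailureCause() {
        return failureCause;
    }
}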
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.runtime.testutils.operators;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
/**
* A mocked version of the operator coordinator context that stores the failure cause, for
* testing purposes only.
*/
public class MockedOperatorCoordinatorContext extends MockOperatorCoordinatorContext {
public MockedOperatorCoordinatorContext(
OperatorID operatorID, ClassLoader userCodeClassLoader) {
super(operatorID, userCodeClassLoader);
}
private Throwable failureCause;
@Override
public void failJob(Throwable cause) {
super.failJob(cause);
failureCause = cause;
}
public Throwable getFailureCause() {
return failureCause;
}
}