diff --git a/flink-cdc-migration-tests/flink-cdc-release-snapshot/src/main/java/org/apache/flink/cdc/migration/tests/SchemaRegistryMigrationMock.java b/flink-cdc-migration-tests/flink-cdc-release-snapshot/src/main/java/org/apache/flink/cdc/migration/tests/SchemaRegistryMigrationMock.java
index bb19223e6..34cd1ea03 100644
--- a/flink-cdc-migration-tests/flink-cdc-release-snapshot/src/main/java/org/apache/flink/cdc/migration/tests/SchemaRegistryMigrationMock.java
+++ b/flink-cdc-migration-tests/flink-cdc-release-snapshot/src/main/java/org/apache/flink/cdc/migration/tests/SchemaRegistryMigrationMock.java
@@ -40,6 +40,7 @@ import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
 import java.util.stream.Collectors;
 
 /** Dummy classes for migration test. Called via reflection. */
@@ -69,6 +70,7 @@ public class SchemaRegistryMigrationMock implements MigrationMockBase {
         return new SchemaRegistry(
                 "Dummy Name",
                 null,
+                Executors.newFixedThreadPool(1),
                 new MetadataApplier() {
                     @Override
                     public boolean acceptsSchemaEvolutionType(
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java
index 617daed92..64d4fa1bc 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java
@@ -37,7 +37,9 @@ import org.apache.flink.runtime.operators.coordination.CoordinationRequestHandle
 import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
 import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
 import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.function.ThrowingRunnable;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -55,6 +57,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
 
 import static org.apache.flink.cdc.runtime.operators.schema.event.CoordinationResponseUtils.wrap;
 
@@ -83,6 +86,9 @@ public class SchemaRegistry implements OperatorCoordinator, CoordinationRequestH
     /** The name of the operator this SchemaOperatorCoordinator is associated with. */
     private final String operatorName;
 
+    /** A single-thread executor to handle async execution of the coordinator. */
+    private final ExecutorService coordinatorExecutor;
+
     /**
      * Tracks the subtask failed reason to throw a more meaningful exception in {@link
      * #subtaskReset}.
@@ -113,18 +119,27 @@ public class SchemaRegistry implements OperatorCoordinator, CoordinationRequestH
     public SchemaRegistry(
             String operatorName,
             OperatorCoordinator.Context context,
+            ExecutorService executorService,
             MetadataApplier metadataApplier,
             List<RouteRule> routes) {
-        this(operatorName, context, metadataApplier, routes, SchemaChangeBehavior.EVOLVE);
+        this(
+                operatorName,
+                context,
+                executorService,
+                metadataApplier,
+                routes,
+                SchemaChangeBehavior.LENIENT);
     }
 
     public SchemaRegistry(
             String operatorName,
             OperatorCoordinator.Context context,
+            ExecutorService coordinatorExecutor,
             MetadataApplier metadataApplier,
             List<RouteRule> routes,
             SchemaChangeBehavior schemaChangeBehavior) {
         this.context = context;
+        this.coordinatorExecutor = coordinatorExecutor;
         this.operatorName = operatorName;
         this.failedReasons = new HashMap<>();
         this.metadataApplier = metadataApplier;
@@ -133,7 +148,11 @@ public class SchemaRegistry implements OperatorCoordinator, CoordinationRequestH
         this.schemaDerivation = new SchemaDerivation(schemaManager, routes, new HashMap<>());
         this.requestHandler =
                 new SchemaRegistryRequestHandler(
-                        metadataApplier, schemaManager, schemaDerivation, schemaChangeBehavior);
+                        metadataApplier,
+                        schemaManager,
+                        schemaDerivation,
+                        schemaChangeBehavior,
+                        context);
         this.schemaChangeBehavior = schemaChangeBehavior;
     }
 
@@ -153,48 +172,87 @@ public class SchemaRegistry implements OperatorCoordinator, CoordinationRequestH
     }
 
     @Override
-    public void handleEventFromOperator(int subtask, int attemptNumber, OperatorEvent event)
-            throws Exception {
-        try {
-            if (event instanceof FlushSuccessEvent) {
-                FlushSuccessEvent flushSuccessEvent = (FlushSuccessEvent) event;
-                LOG.info(
-                        "Sink subtask {} succeed flushing for table {}.",
-                        flushSuccessEvent.getSubtask(),
-                        flushSuccessEvent.getTableId().toString());
-                requestHandler.flushSuccess(
-                        flushSuccessEvent.getTableId(),
-                        flushSuccessEvent.getSubtask(),
-                        currentParallelism);
-            } else if (event instanceof SinkWriterRegisterEvent) {
-                requestHandler.registerSinkWriter(((SinkWriterRegisterEvent) event).getSubtask());
-            } else {
-                throw new FlinkException("Unrecognized Operator Event: " + event);
-            }
-        } catch (Throwable t) {
-            context.failJob(t);
-            throw t;
-        }
+    public void handleEventFromOperator(int subtask, int attemptNumber, OperatorEvent event) {
+        runInEventLoop(
+                () -> {
+                    try {
+                        if (event instanceof FlushSuccessEvent) {
+                            FlushSuccessEvent flushSuccessEvent = (FlushSuccessEvent) event;
+                            LOG.info(
+                                    "Sink subtask {} succeed flushing for table {}.",
+                                    flushSuccessEvent.getSubtask(),
+                                    flushSuccessEvent.getTableId().toString());
+                            requestHandler.flushSuccess(
+                                    flushSuccessEvent.getTableId(),
+                                    flushSuccessEvent.getSubtask(),
+                                    currentParallelism);
+                        } else if (event instanceof SinkWriterRegisterEvent) {
+                            requestHandler.registerSinkWriter(
+                                    ((SinkWriterRegisterEvent) event).getSubtask());
+                        } else {
+                            throw new FlinkException("Unrecognized Operator Event: " + event);
+                        }
+                    } catch (Throwable t) {
+                        context.failJob(t);
+                        throw t;
+                    }
+                },
+                "handling event %s from subTask %d",
+                event,
+                subtask);
     }
 
     @Override
-    public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> resultFuture)
-            throws Exception {
-        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
-                DataOutputStream out = new DataOutputStream(baos)) {
-            // Serialize SchemaManager
-            int schemaManagerSerializerVersion = SchemaManager.SERIALIZER.getVersion();
-            out.writeInt(schemaManagerSerializerVersion);
-            byte[] serializedSchemaManager = SchemaManager.SERIALIZER.serialize(schemaManager);
-            out.writeInt(serializedSchemaManager.length);
-            out.write(serializedSchemaManager);
-            // Serialize SchemaDerivation mapping
-            SchemaDerivation.serializeDerivationMapping(schemaDerivation, out);
-            resultFuture.complete(baos.toByteArray());
-        } catch (Throwable t) {
-            context.failJob(t);
-            throw t;
-        }
+    public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> resultFuture) {
+        // we generate checkpoint in an async thread to not block the JobManager's main thread, the
+        // coordinator state might be large if there are many schema changes and monitor many
+        // tables.
+        runInEventLoop(
+                () -> {
+                    try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+                            DataOutputStream out = new DataOutputStream(baos)) {
+                        // Serialize SchemaManager
+                        int schemaManagerSerializerVersion = SchemaManager.SERIALIZER.getVersion();
+                        out.writeInt(schemaManagerSerializerVersion);
+                        byte[] serializedSchemaManager =
+                                SchemaManager.SERIALIZER.serialize(schemaManager);
+                        out.writeInt(serializedSchemaManager.length);
+                        out.write(serializedSchemaManager);
+                        // Serialize SchemaDerivation mapping
+                        SchemaDerivation.serializeDerivationMapping(schemaDerivation, out);
+                        resultFuture.complete(baos.toByteArray());
+                    } catch (Throwable t) {
+                        context.failJob(t);
+                        throw t;
+                    }
+                },
+                "taking checkpoint %d",
+                checkpointId);
+    }
+
+    private void runInEventLoop(
+            final ThrowingRunnable<Throwable> action,
+            final String actionName,
+            final Object... actionNameFormatParameters) {
+        coordinatorExecutor.execute(
+                () -> {
+                    try {
+                        action.run();
+                    } catch (Throwable t) {
+                        // if we have a JVM critical error, promote it immediately, there is a good
+                        // chance the logging or job failing will not succeed anymore
+                        ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
+
+                        final String actionString =
+                                String.format(actionName, actionNameFormatParameters);
+                        LOG.error(
+                                "Uncaught exception in the SchemaEvolutionCoordinator for {} while {}. Triggering job failover.",
+                                operatorName,
+                                actionString,
+                                t);
+                        context.failJob(t);
+                    }
+                });
     }
 
     @Override
@@ -205,26 +263,34 @@ public class SchemaRegistry implements OperatorCoordinator, CoordinationRequestH
     @Override
     public CompletableFuture<CoordinationResponse> handleCoordinationRequest(
             CoordinationRequest request) {
-        try {
-            if (request instanceof SchemaChangeRequest) {
-                SchemaChangeRequest schemaChangeRequest = (SchemaChangeRequest) request;
-                return requestHandler.handleSchemaChangeRequest(schemaChangeRequest);
-            } else if (request instanceof SchemaChangeResultRequest) {
-                return requestHandler.getSchemaChangeResult();
-            } else if (request instanceof GetEvolvedSchemaRequest) {
-                return CompletableFuture.completedFuture(
-                        wrap(handleGetEvolvedSchemaRequest(((GetEvolvedSchemaRequest) request))));
-            } else if (request instanceof GetOriginalSchemaRequest) {
-                return CompletableFuture.completedFuture(
-                        wrap(handleGetOriginalSchemaRequest((GetOriginalSchemaRequest) request)));
-            } else {
-                throw new IllegalArgumentException(
-                        "Unrecognized CoordinationRequest type: " + request);
-            }
-        } catch (Throwable t) {
-            context.failJob(t);
-            throw t;
-        }
+        CompletableFuture<CoordinationResponse> responseFuture = new CompletableFuture<>();
+        runInEventLoop(
+                () -> {
+                    try {
+                        if (request instanceof SchemaChangeRequest) {
+                            SchemaChangeRequest schemaChangeRequest = (SchemaChangeRequest) request;
+                            requestHandler.handleSchemaChangeRequest(
+                                    schemaChangeRequest, responseFuture);
+                        } else if (request instanceof SchemaChangeResultRequest) {
+                            requestHandler.getSchemaChangeResult(responseFuture);
+                        } else if (request instanceof GetEvolvedSchemaRequest) {
+                            handleGetEvolvedSchemaRequest(
+                                    ((GetEvolvedSchemaRequest) request), responseFuture);
+                        } else if (request instanceof GetOriginalSchemaRequest) {
+                            handleGetOriginalSchemaRequest(
+                                    (GetOriginalSchemaRequest) request, responseFuture);
+                        } else {
+                            throw new IllegalArgumentException(
+                                    "Unrecognized CoordinationRequest type: " + request);
+                        }
+                    } catch (Throwable t) {
+                        context.failJob(t);
+                        throw t;
+                    }
+                },
+                "handling coordination request %s",
+                request);
+        return responseFuture;
     }
 
     @Override
@@ -253,7 +319,8 @@ public class SchemaRegistry implements OperatorCoordinator, CoordinationRequestH
                                         metadataApplier,
                                         schemaManager,
                                         schemaDerivation,
-                                        schemaManager.getBehavior());
+                                        schemaManager.getBehavior(),
+                                        context);
                         break;
                     }
                 case 1:
@@ -274,7 +341,8 @@ public class SchemaRegistry implements OperatorCoordinator, CoordinationRequestH
                                         metadataApplier,
                                         schemaManager,
                                         schemaDerivation,
-                                        schemaChangeBehavior);
+                                        schemaChangeBehavior,
+                                        context);
                         break;
                     }
                 default:
@@ -307,46 +375,56 @@ public class SchemaRegistry implements OperatorCoordinator, CoordinationRequestH
         // do nothing
     }
 
-    private GetEvolvedSchemaResponse handleGetEvolvedSchemaRequest(
-            GetEvolvedSchemaRequest getEvolvedSchemaRequest) {
+    private void handleGetEvolvedSchemaRequest(
+            GetEvolvedSchemaRequest getEvolvedSchemaRequest,
+            CompletableFuture<CoordinationResponse> response) {
         LOG.info("Handling evolved schema request: {}", getEvolvedSchemaRequest);
         int schemaVersion = getEvolvedSchemaRequest.getSchemaVersion();
         TableId tableId = getEvolvedSchemaRequest.getTableId();
         if (schemaVersion == GetEvolvedSchemaRequest.LATEST_SCHEMA_VERSION) {
-            return new GetEvolvedSchemaResponse(
-                    schemaManager.getLatestEvolvedSchema(tableId).orElse(null));
+            response.complete(
+                    wrap(
+                            new GetEvolvedSchemaResponse(
+                                    schemaManager.getLatestEvolvedSchema(tableId).orElse(null))));
         } else {
             try {
-                return new GetEvolvedSchemaResponse(
-                        schemaManager.getEvolvedSchema(tableId, schemaVersion));
+                response.complete(
+                        wrap(
+                                new GetEvolvedSchemaResponse(
+                                        schemaManager.getEvolvedSchema(tableId, schemaVersion))));
             } catch (IllegalArgumentException iae) {
                 LOG.warn(
                         "Some client is requesting an non-existed evolved schema for table {} with version {}",
                         tableId,
                         schemaVersion);
-                return new GetEvolvedSchemaResponse(null);
+                response.complete(wrap(new GetEvolvedSchemaResponse(null)));
             }
         }
     }
 
-    private GetOriginalSchemaResponse handleGetOriginalSchemaRequest(
-            GetOriginalSchemaRequest getOriginalSchemaRequest) {
+    private void handleGetOriginalSchemaRequest(
+            GetOriginalSchemaRequest getOriginalSchemaRequest,
+            CompletableFuture<CoordinationResponse> response) {
         LOG.info("Handling original schema request: {}", getOriginalSchemaRequest);
         int schemaVersion = getOriginalSchemaRequest.getSchemaVersion();
         TableId tableId = getOriginalSchemaRequest.getTableId();
         if (schemaVersion == GetOriginalSchemaRequest.LATEST_SCHEMA_VERSION) {
-            return new GetOriginalSchemaResponse(
-                    schemaManager.getLatestOriginalSchema(tableId).orElse(null));
+            response.complete(
+                    wrap(
+                            new GetOriginalSchemaResponse(
+                                    schemaManager.getLatestOriginalSchema(tableId).orElse(null))));
         } else {
             try {
-                return new GetOriginalSchemaResponse(
-                        schemaManager.getOriginalSchema(tableId, schemaVersion));
+                response.complete(
+                        wrap(
+                                new GetOriginalSchemaResponse(
+                                        schemaManager.getOriginalSchema(tableId, schemaVersion))));
             } catch (IllegalArgumentException iae) {
                 LOG.warn(
                         "Some client is requesting an non-existed original schema for table {} with version {}",
                         tableId,
                         schemaVersion);
-                return new GetOriginalSchemaResponse(null);
+                response.complete(wrap(new GetOriginalSchemaResponse(null)));
             }
         }
     }
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryProvider.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryProvider.java
index dd7f2dc36..bc261e40f 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryProvider.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryProvider.java
@@ -23,8 +23,12 @@ import org.apache.flink.cdc.common.route.RouteRule;
 import org.apache.flink.cdc.common.sink.MetadataApplier;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.util.FatalExitExceptionHandler;
 
 import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
 
 /** Provider of {@link SchemaRegistry}. */
 @Internal
@@ -57,7 +61,55 @@ public class SchemaRegistryProvider implements OperatorCoordinator.Provider {
 
     @Override
     public OperatorCoordinator create(OperatorCoordinator.Context context) throws Exception {
+        CoordinatorExecutorThreadFactory coordinatorThreadFactory =
+                new CoordinatorExecutorThreadFactory(
+                        "schema-evolution-coordinator", context.getUserCodeClassloader());
+        ExecutorService coordinatorExecutor =
+                Executors.newSingleThreadExecutor(coordinatorThreadFactory);
         return new SchemaRegistry(
-                operatorName, context, metadataApplier, routingRules, schemaChangeBehavior);
+                operatorName,
+                context,
+                coordinatorExecutor,
+                metadataApplier,
+                routingRules,
+                schemaChangeBehavior);
+    }
+
+    /** A thread factory class that provides some helper methods. */
+    public static class CoordinatorExecutorThreadFactory implements ThreadFactory {
+
+        private final String coordinatorThreadName;
+        private final ClassLoader cl;
+        private final Thread.UncaughtExceptionHandler errorHandler;
+
+        private Thread t;
+
+        CoordinatorExecutorThreadFactory(
+                final String coordinatorThreadName, final ClassLoader contextClassLoader) {
+            this(coordinatorThreadName, contextClassLoader, FatalExitExceptionHandler.INSTANCE);
+        }
+
+        CoordinatorExecutorThreadFactory(
+                final String coordinatorThreadName,
+                final ClassLoader contextClassLoader,
+                final Thread.UncaughtExceptionHandler errorHandler) {
+            this.coordinatorThreadName = coordinatorThreadName;
+            this.cl = contextClassLoader;
+            this.errorHandler = errorHandler;
+        }
+
+        @Override
+        public synchronized Thread newThread(Runnable r) {
+            if (t != null) {
+                throw new Error(
+                        "This indicates that a fatal error has happened and caused the "
+                                + "coordinator executor thread to exit. Check the earlier logs"
+                                + "to see the root cause of the problem.");
+            }
+            t = new Thread(r, coordinatorThreadName);
+            t.setContextClassLoader(cl);
+            t.setUncaughtExceptionHandler(errorHandler);
+            return t;
+        }
     }
 }
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java
index de5bde4e6..9cb0e3e00 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java
@@ -38,6 +38,7 @@ import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeRequest;
 import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeResponse;
 import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeResultResponse;
 import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -94,15 +95,19 @@ public class SchemaRegistryRequestHandler implements Closeable {
 
     private final SchemaChangeBehavior schemaChangeBehavior;
 
+    private final OperatorCoordinator.Context context;
+
     public SchemaRegistryRequestHandler(
             MetadataApplier metadataApplier,
             SchemaManager schemaManager,
             SchemaDerivation schemaDerivation,
-            SchemaChangeBehavior schemaChangeBehavior) {
+            SchemaChangeBehavior schemaChangeBehavior,
+            OperatorCoordinator.Context context) {
         this.metadataApplier = metadataApplier;
         this.schemaManager = schemaManager;
         this.schemaDerivation = schemaDerivation;
         this.schemaChangeBehavior = schemaChangeBehavior;
+        this.context = context;
 
         this.activeSinkWriters = ConcurrentHashMap.newKeySet();
         this.flushedSinkWriters = ConcurrentHashMap.newKeySet();
@@ -122,8 +127,8 @@ public class SchemaRegistryRequestHandler implements Closeable {
      *
      * @param request the received SchemaChangeRequest
      */
-    public CompletableFuture<CoordinationResponse> handleSchemaChangeRequest(
-            SchemaChangeRequest request) {
+    public void handleSchemaChangeRequest(
+            SchemaChangeRequest request, CompletableFuture<CoordinationResponse> response) {
 
         // We use requester subTask ID as the pending ticket, because there will be at most 1 schema
         // change requests simultaneously from each subTask
@@ -156,7 +161,8 @@ public class SchemaRegistryRequestHandler implements Closeable {
                     if (!pendingSubTaskIds.contains(requestSubTaskId)) {
                         pendingSubTaskIds.add(requestSubTaskId);
                     }
-                    return CompletableFuture.completedFuture(wrap(SchemaChangeResponse.busy()));
+                    response.complete(wrap(SchemaChangeResponse.busy()));
+                    return;
                 }
 
                 SchemaChangeEvent event = request.getSchemaChangeEvent();
@@ -168,8 +174,8 @@ public class SchemaRegistryRequestHandler implements Closeable {
                     LOG.info(
                             "SchemaChangeStatus switched from WAITING_FOR_FLUSH to IDLE for request {} due to duplicated request.",
                             request);
-                    return CompletableFuture.completedFuture(
-                            wrap(SchemaChangeResponse.duplicate()));
+                    response.complete(wrap(SchemaChangeResponse.duplicate()));
+                    return;
                 }
                 schemaManager.applyOriginalSchemaChange(event);
                 List<SchemaChangeEvent> derivedSchemaChangeEvents =
@@ -183,7 +189,9 @@ public class SchemaRegistryRequestHandler implements Closeable {
                     LOG.info(
                             "SchemaChangeStatus switched from WAITING_FOR_FLUSH to IDLE for request {} due to ignored request.",
                             request);
-                    return CompletableFuture.completedFuture(wrap(SchemaChangeResponse.ignored()));
+
+                    response.complete(wrap(SchemaChangeResponse.ignored()));
+                    return;
                 }
 
                 LOG.info(
@@ -191,8 +199,8 @@ public class SchemaRegistryRequestHandler implements Closeable {
                 // This request has been accepted.
                 schemaChangeStatus = RequestStatus.WAITING_FOR_FLUSH;
                 currentDerivedSchemaChangeEvents = new ArrayList<>(derivedSchemaChangeEvents);
-                return CompletableFuture.completedFuture(
-                        wrap(SchemaChangeResponse.accepted(derivedSchemaChangeEvents)));
+
+                response.complete(wrap(SchemaChangeResponse.accepted(derivedSchemaChangeEvents)));
             } else {
                 LOG.info(
                         "Schema Registry is busy processing a schema change request, could not handle request {} for now. Added {} to pending list ({}).",
@@ -202,7 +210,7 @@ public class SchemaRegistryRequestHandler implements Closeable {
                 if (!pendingSubTaskIds.contains(requestSubTaskId)) {
                     pendingSubTaskIds.add(requestSubTaskId);
                 }
-                return CompletableFuture.completedFuture(wrap(SchemaChangeResponse.busy()));
+                response.complete(wrap(SchemaChangeResponse.busy()));
             }
         }
     }
@@ -299,7 +307,7 @@ public class SchemaRegistryRequestHandler implements Closeable {
         }
     }
 
-    public CompletableFuture<CoordinationResponse> getSchemaChangeResult() {
+    public void getSchemaChangeResult(CompletableFuture<CoordinationResponse> response) {
         Preconditions.checkState(
                 schemaChangeStatus != RequestStatus.IDLE,
                 "Illegal schemaChangeStatus: should not be IDLE before getting schema change request results.");
@@ -311,11 +319,12 @@ public class SchemaRegistryRequestHandler implements Closeable {
 
             // This request has been finished, return it and prepare for the next request
             List<SchemaChangeEvent> finishedEvents = clearCurrentSchemaChangeRequest();
-            return CompletableFuture.supplyAsync(
-                    () -> wrap(new SchemaChangeResultResponse(finishedEvents)));
+            SchemaChangeResultResponse resultResponse =
+                    new SchemaChangeResultResponse(finishedEvents);
+            response.complete(wrap(resultResponse));
         } else {
             // Still working on schema change request, waiting it
-            return CompletableFuture.supplyAsync(() -> wrap(new SchemaChangeProcessingResponse()));
+            response.complete(wrap(new SchemaChangeProcessingResponse()));
         }
     }
 
@@ -444,7 +453,8 @@ public class SchemaRegistryRequestHandler implements Closeable {
 
     private List<SchemaChangeEvent> clearCurrentSchemaChangeRequest() {
         if (currentChangeException != null) {
-            throw new RuntimeException("Failed to apply schema change.", currentChangeException);
+            context.failJob(
+                    new RuntimeException("Failed to apply schema change.", currentChangeException));
         }
         List<SchemaChangeEvent> finishedSchemaChanges =
                 new ArrayList<>(currentFinishedSchemaChanges);
diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/SchemaEvolveTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/SchemaEvolveTest.java
index d3e0de8d0..a3d5ce547 100644
--- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/SchemaEvolveTest.java
+++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/SchemaEvolveTest.java
@@ -30,6 +30,7 @@ import org.apache.flink.cdc.common.event.RenameColumnEvent;
 import org.apache.flink.cdc.common.event.SchemaChangeEventType;
 import org.apache.flink.cdc.common.event.SchemaChangeEventTypeFamily;
 import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.exceptions.UnsupportedSchemaChangeEventException;
 import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
 import org.apache.flink.cdc.common.schema.Column;
 import org.apache.flink.cdc.common.schema.Schema;
@@ -1035,11 +1036,16 @@ public class SchemaEvolveTest {
                                         new AddColumnEvent.ColumnWithPosition(
                                                 Column.physicalColumn(
                                                         "height", DOUBLE, "Height data")))));
-        Assertions.assertThatThrownBy(() -> processEvent(schemaOperator, addColumnEvents))
+        processEvent(schemaOperator, addColumnEvents);
+        Assertions.assertThat(harness.isJobFailed()).isEqualTo(true);
+        Assertions.assertThat(harness.getJobFailureCause())
                 .cause()
-                .cause()
-                .isExactlyInstanceOf(RuntimeException.class)
-                .hasMessageContaining("Failed to apply schema change");
+                .isExactlyInstanceOf(UnsupportedSchemaChangeEventException.class)
+                .matches(
+                        e ->
+                                ((UnsupportedSchemaChangeEventException) e)
+                                        .getExceptionMessage()
+                                        .equals("Sink doesn't support such schema change event."));
         harness.close();
     }
 
diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/EventOperatorTestHarness.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/EventOperatorTestHarness.java
index a469a3a91..d8b977029 100644
--- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/EventOperatorTestHarness.java
+++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/EventOperatorTestHarness.java
@@ -38,7 +38,6 @@ import org.apache.flink.cdc.runtime.testutils.schema.TestingSchemaRegistryGatewa
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
-import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
 import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.Output;
@@ -56,6 +55,7 @@ import java.time.Duration;
 import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.Set;
+import java.util.concurrent.Executors;
 
 import static org.apache.flink.cdc.runtime.operators.schema.event.CoordinationResponseUtils.unwrap;
 
@@ -81,6 +81,7 @@ public class EventOperatorTestHarness<OP extends AbstractStreamOperator<E>, E ex
     private final SchemaRegistry schemaRegistry;
     private final TestingSchemaRegistryGateway schemaRegistryGateway;
     private final LinkedList<StreamRecord<E>> outputRecords = new LinkedList<>();
+    private final MockedOperatorCoordinatorContext mockedContext;
 
     public EventOperatorTestHarness(OP operator, int numOutputs) {
         this(operator, numOutputs, null, SchemaChangeBehavior.EVOLVE);
@@ -94,11 +95,14 @@ public class EventOperatorTestHarness<OP extends AbstractStreamOperator<E>, E ex
             OP operator, int numOutputs, Duration duration, SchemaChangeBehavior behavior) {
         this.operator = operator;
         this.numOutputs = numOutputs;
+        this.mockedContext =
+                new MockedOperatorCoordinatorContext(
+                        SCHEMA_OPERATOR_ID, Thread.currentThread().getContextClassLoader());
         schemaRegistry =
                 new SchemaRegistry(
                         "SchemaOperator",
-                        new MockOperatorCoordinatorContext(
-                                SCHEMA_OPERATOR_ID, Thread.currentThread().getContextClassLoader()),
+                        mockedContext,
+                        Executors.newFixedThreadPool(1),
                         new CollectingMetadataApplier(duration),
                         new ArrayList<>(),
                         behavior);
@@ -113,11 +117,14 @@ public class EventOperatorTestHarness<OP extends AbstractStreamOperator<E>, E ex
             Set<SchemaChangeEventType> enabledEventTypes) {
         this.operator = operator;
         this.numOutputs = numOutputs;
+        this.mockedContext =
+                new MockedOperatorCoordinatorContext(
+                        SCHEMA_OPERATOR_ID, Thread.currentThread().getContextClassLoader());
         schemaRegistry =
                 new SchemaRegistry(
                         "SchemaOperator",
-                        new MockOperatorCoordinatorContext(
-                                SCHEMA_OPERATOR_ID, Thread.currentThread().getContextClassLoader()),
+                        mockedContext,
+                        Executors.newFixedThreadPool(1),
                         new CollectingMetadataApplier(duration, enabledEventTypes),
                         new ArrayList<>(),
                         behavior);
@@ -133,11 +140,14 @@ public class EventOperatorTestHarness<OP extends AbstractStreamOperator<E>, E ex
             Set<SchemaChangeEventType> errorsOnEventTypes) {
         this.operator = operator;
         this.numOutputs = numOutputs;
+        this.mockedContext =
+                new MockedOperatorCoordinatorContext(
+                        SCHEMA_OPERATOR_ID, Thread.currentThread().getContextClassLoader());
         schemaRegistry =
                 new SchemaRegistry(
                         "SchemaOperator",
-                        new MockOperatorCoordinatorContext(
-                                SCHEMA_OPERATOR_ID, Thread.currentThread().getContextClassLoader()),
+                        mockedContext,
+                        Executors.newFixedThreadPool(1),
                         new CollectingMetadataApplier(
                                 duration, enabledEventTypes, errorsOnEventTypes),
                         new ArrayList<>(),
@@ -196,6 +206,14 @@ public class EventOperatorTestHarness<OP extends AbstractStreamOperator<E>, E ex
                 .orElse(null);
     }
 
+    public boolean isJobFailed() {
+        return mockedContext.isJobFailed();
+    }
+
+    public Throwable getJobFailureCause() {
+        return mockedContext.getFailureCause();
+    }
+
     @Override
     public void close() throws Exception {
         operator.close();
diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/MockedOperatorCoordinatorContext.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/MockedOperatorCoordinatorContext.java
new file mode 100644
index 000000000..19ab961ee
--- /dev/null
+++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/MockedOperatorCoordinatorContext.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.runtime.testutils.operators;
+
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
+
+/**
+ * This is a mocked version of Operator coordinator context that stores failure cause for testing
+ * purposes only.
+ */
+public class MockedOperatorCoordinatorContext extends MockOperatorCoordinatorContext {
+    public MockedOperatorCoordinatorContext(
+            OperatorID operatorID, ClassLoader userCodeClassLoader) {
+        super(operatorID, userCodeClassLoader);
+    }
+
+    private Throwable failureCause;
+
+    @Override
+    public void failJob(Throwable cause) {
+        super.failJob(cause);
+        failureCause = cause;
+    }
+
+    public Throwable getFailureCause() {
+        return failureCause;
+    }
+}