[FLINK-36094][cdc-runtime] Improve the Exception that SchemaRegistryRequestHandler thrown

This closes #3558.
pull/3538/head
Hongshun Wang 5 months ago committed by GitHub
parent 77c63385d9
commit 6205a5a0f1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -24,9 +24,9 @@ import javax.annotation.Nullable;
/** An exception occurred during schema evolution. */
public class SchemaEvolveException extends FlinkRuntimeException {
private final SchemaChangeEvent applyingEvent;
private final String exceptionMessage;
private final @Nullable Throwable cause;
protected final SchemaChangeEvent applyingEvent;
protected final String exceptionMessage;
protected final @Nullable Throwable cause;
public SchemaEvolveException(SchemaChangeEvent applyingEvent, String exceptionMessage) {
this(applyingEvent, exceptionMessage, null);

@ -19,10 +19,36 @@ package org.apache.flink.cdc.common.exceptions;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import javax.annotation.Nullable;
/** A special kind of {@link SchemaEvolveException} that sink doesn't support such event type. */
public class UnsupportedSchemaChangeEventException extends SchemaEvolveException {
public UnsupportedSchemaChangeEventException(SchemaChangeEvent applyingEvent) {
super(applyingEvent, "Sink doesn't support such schema change event.", null);
this(applyingEvent, "Sink doesn't support such schema change event.");
}
public UnsupportedSchemaChangeEventException(
SchemaChangeEvent applyingEvent, String exceptionMessage) {
this(applyingEvent, exceptionMessage, null);
}
public UnsupportedSchemaChangeEventException(
SchemaChangeEvent applyingEvent, String exceptionMessage, @Nullable Throwable cause) {
super(applyingEvent, exceptionMessage, cause);
}
@Override
public String toString() {
return "UnsupportedSchemaChangeEventException{"
+ "applyingEvent="
+ applyingEvent
+ ", exceptionMessage='"
+ exceptionMessage
+ '\''
+ ", cause='"
+ cause
+ '\''
+ '}';
}
}

@ -221,7 +221,7 @@ public class DorisMetadataApplier implements MetadataApplier {
tableId.getSchemaName(), tableId.getTableName(), addFieldSchema);
}
} catch (Exception e) {
throw new SchemaEvolveException(event, e.getMessage(), e);
throw new SchemaEvolveException(event, "fail to apply add column event", e);
}
}
@ -234,7 +234,7 @@ public class DorisMetadataApplier implements MetadataApplier {
tableId.getSchemaName(), tableId.getTableName(), col);
}
} catch (Exception e) {
throw new SchemaEvolveException(event, e.getMessage(), e);
throw new SchemaEvolveException(event, "fail to apply drop column event", e);
}
}
@ -250,7 +250,7 @@ public class DorisMetadataApplier implements MetadataApplier {
entry.getValue());
}
} catch (Exception e) {
throw new SchemaEvolveException(event, e.getMessage(), e);
throw new SchemaEvolveException(event, "fail to apply rename column event", e);
}
}
@ -272,7 +272,7 @@ public class DorisMetadataApplier implements MetadataApplier {
// will be fixed after FLINK-35243 got merged.
}
} catch (Exception e) {
throw new SchemaEvolveException(event, e.getMessage(), e);
throw new SchemaEvolveException(event, "fail to apply alter column type event", e);
}
}
}

@ -31,6 +31,7 @@ import org.apache.flink.cdc.common.event.SchemaChangeEventType;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.event.TruncateTableEvent;
import org.apache.flink.cdc.common.exceptions.SchemaEvolveException;
import org.apache.flink.cdc.common.exceptions.UnsupportedSchemaChangeEventException;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.sink.MetadataApplier;
@ -152,7 +153,7 @@ public class ValuesDatabase {
tableId, ((CreateTableEvent) schemaChangeEvent).getSchema()));
}
} else {
throw new SchemaEvolveException(
throw new UnsupportedSchemaChangeEventException(
schemaChangeEvent,
"Rejected schema change event since error.on.schema.change is enabled.",
null);

@ -341,7 +341,7 @@ public class MysqlE2eITCase extends PipelineTestEnvironment {
long endTimeout = System.currentTimeMillis() + MysqlE2eITCase.EVENT_WAITING_TIMEOUT;
while (System.currentTimeMillis() < endTimeout) {
String stdout = taskManagerConsumer.toUtf8String();
if (stdout.contains(event)) {
if (stdout.contains(event + "\n")) {
result = true;
break;
}

@ -800,7 +800,7 @@ public class RouteE2eITCase extends PipelineTestEnvironment {
"DataChangeEvent{tableId=NEW_%s.NEW_TABLEALPHA, before=[], after=[10001, 12, Derrida], op=INSERT, meta=()}",
"RenameColumnEvent{tableId=NEW_%s.NEW_TABLEBETA, nameMapping={VERSION=VERSION_EX}}",
"DataChangeEvent{tableId=NEW_%s.NEW_TABLEBETA, before=[], after=[10002, 15], op=INSERT, meta=()}",
"AlterColumnTypeEvent{tableId=NEW_%s.NEW_TABLEGAMMA, typeMapping={VERSION=VARCHAR(19)}, oldTypeMapping={VERSION=VARCHAR(17)}",
"AlterColumnTypeEvent{tableId=NEW_%s.NEW_TABLEGAMMA, typeMapping={VERSION=VARCHAR(19)}, oldTypeMapping={VERSION=VARCHAR(17)}}",
"RenameColumnEvent{tableId=NEW_%s.NEW_TABLEGAMMA, nameMapping={VERSION=VERSION_EX}}",
"DataChangeEvent{tableId=NEW_%s.NEW_TABLEGAMMA, before=[], after=[10003, Fluorite], op=INSERT, meta=()}",
"DropColumnEvent{tableId=NEW_%s.NEW_TABLEDELTA, droppedColumnNames=[VERSION]}",
@ -818,7 +818,7 @@ public class RouteE2eITCase extends PipelineTestEnvironment {
long endTimeout = System.currentTimeMillis() + EVENT_DEFAULT_TIMEOUT;
while (System.currentTimeMillis() < endTimeout) {
String stdout = taskManagerConsumer.toUtf8String();
if (stdout.contains(event)) {
if (stdout.contains(event + "\n")) {
result = true;
break;
}

@ -113,9 +113,9 @@ public class SchemaEvolveE2eITCase extends PipelineTestEnvironment {
true,
false,
false,
Collections.singletonList(
"java.lang.IllegalStateException: Incompatible types: \"INT\" and \"DOUBLE\""),
Collections.singletonList(
Collections.emptyList(),
Arrays.asList(
"java.lang.IllegalStateException: Incompatible types: \"INT\" and \"DOUBLE\"",
"org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy"));
}
@ -126,11 +126,10 @@ public class SchemaEvolveE2eITCase extends PipelineTestEnvironment {
false,
true,
false,
Collections.singletonList(
"AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}"),
Collections.emptyList(),
Arrays.asList(
"Failed to apply schema change event AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}.",
"SchemaEvolveException{applyingEvent=AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}, exceptionMessage='Rejected schema change event since error.on.schema.change is enabled.', cause='null'}",
"Failed to apply schema change AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]} to table %s.members. Caused by: UnsupportedSchemaChangeEventException{applyingEvent=AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}, exceptionMessage='Rejected schema change event since error.on.schema.change is enabled.', cause='null'}",
"UnsupportedSchemaChangeEventException{applyingEvent=AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}, exceptionMessage='Rejected schema change event since error.on.schema.change is enabled.', cause='null'}",
"org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy"));
}
@ -146,8 +145,8 @@ public class SchemaEvolveE2eITCase extends PipelineTestEnvironment {
"DataChangeEvent{tableId=%s.members, before=[], after=[1012, Eve, 17], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.members, before=[], after=[1013, Fiona, null], op=INSERT, meta=()}"),
Arrays.asList(
"Failed to apply schema change AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]} to table %s.members.",
"SchemaEvolveException{applyingEvent=AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}, exceptionMessage='Rejected schema change event since error.on.schema.change is enabled.', cause='null'}"));
"Failed to apply schema change AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]} to table %s.members. Caused by: UnsupportedSchemaChangeEventException{applyingEvent=AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}, exceptionMessage='Rejected schema change event since error.on.schema.change is enabled.', cause='null'}",
"UnsupportedSchemaChangeEventException{applyingEvent=AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}, exceptionMessage='Rejected schema change event since error.on.schema.change is enabled.', cause='null'}"));
}
@Test
@ -185,7 +184,7 @@ public class SchemaEvolveE2eITCase extends PipelineTestEnvironment {
false,
false,
Arrays.asList(
"AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}",
"AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=LAST, existedColumnName=null}]}",
"DataChangeEvent{tableId=%s.members, before=[], after=[1012, Eve, 17, 0], op=INSERT, meta=()}",
"AlterColumnTypeEvent{tableId=%s.members, typeMapping={age=DOUBLE}, oldTypeMapping={age=INT}}",
"AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`precise_age` DOUBLE, position=LAST, existedColumnName=null}]}",
@ -369,7 +368,7 @@ public class SchemaEvolveE2eITCase extends PipelineTestEnvironment {
List<String> expectedJmEvents =
expectedJobManagerEvents.stream()
.map(s -> String.format(s, dbName, dbName))
.map(s -> String.format(s, dbName, dbName, dbName))
.collect(Collectors.toList());
validateResult(expectedJmEvents, jobManagerConsumer);
@ -422,7 +421,7 @@ public class SchemaEvolveE2eITCase extends PipelineTestEnvironment {
long endTimeout = System.currentTimeMillis() + SchemaEvolveE2eITCase.EVENT_WAITING_TIMEOUT;
while (System.currentTimeMillis() < endTimeout) {
String stdout = consumer.toUtf8String();
if (stdout.contains(event)) {
if (stdout.contains(event + "\n")) {
result = true;
break;
}

@ -736,7 +736,7 @@ public class TransformE2eITCase extends PipelineTestEnvironment {
long endTimeout = System.currentTimeMillis() + timeout;
while (System.currentTimeMillis() < endTimeout) {
String stdout = taskManagerConsumer.toUtf8String();
if (stdout.contains(event)) {
if (stdout.contains(event + "\n")) {
result = true;
break;
}

@ -386,7 +386,7 @@ public class UdfE2eITCase extends PipelineTestEnvironment {
long endTimeout = System.currentTimeMillis() + timeout;
while (System.currentTimeMillis() < endTimeout) {
String stdout = taskManagerConsumer.toUtf8String();
if (stdout.contains(event)) {
if (stdout.contains(event + "\n")) {
result = true;
break;
}

@ -17,7 +17,6 @@
package org.apache.flink.cdc.runtime.operators.schema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.annotation.VisibleForTesting;
@ -29,7 +28,6 @@ import org.apache.flink.cdc.common.event.FlushEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEventType;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.exceptions.SchemaEvolveException;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.route.RouteRule;
import org.apache.flink.cdc.common.schema.Column;
@ -40,8 +38,6 @@ import org.apache.flink.cdc.common.types.DataTypeFamily;
import org.apache.flink.cdc.common.types.DataTypeRoot;
import org.apache.flink.cdc.common.utils.ChangeEventUtils;
import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry;
import org.apache.flink.cdc.runtime.operators.schema.event.ApplyEvolvedSchemaChangeRequest;
import org.apache.flink.cdc.runtime.operators.schema.event.ApplyOriginalSchemaChangeRequest;
import org.apache.flink.cdc.runtime.operators.schema.event.CoordinationResponseUtils;
import org.apache.flink.cdc.runtime.operators.schema.event.RefreshPendingListsRequest;
import org.apache.flink.cdc.runtime.operators.schema.event.ReleaseUpstreamRequest;
@ -431,56 +427,9 @@ public class SchemaOperator extends AbstractStreamOperator<Event>
ReleaseUpstreamResponse schemaEvolveResponse = requestReleaseUpstream();
List<SchemaChangeEvent> finishedSchemaChangeEvents =
schemaEvolveResponse.getFinishedSchemaChangeEvents();
List<Tuple2<SchemaChangeEvent, Throwable>> failedSchemaChangeEvents =
schemaEvolveResponse.getFailedSchemaChangeEvents();
List<SchemaChangeEvent> ignoredSchemaChangeEvents =
schemaEvolveResponse.getIgnoredSchemaChangeEvents();
if (schemaChangeBehavior == SchemaChangeBehavior.EVOLVE
|| schemaChangeBehavior == SchemaChangeBehavior.EXCEPTION) {
if (schemaEvolveResponse.hasException()) {
throw new RuntimeException(
String.format(
"Failed to apply schema change event %s.\nExceptions: %s",
schemaChangeEvent,
schemaEvolveResponse.getPrintableFailedSchemaChangeEvents()));
}
} else if (schemaChangeBehavior == SchemaChangeBehavior.TRY_EVOLVE
|| schemaChangeBehavior == SchemaChangeBehavior.LENIENT
|| schemaChangeBehavior == SchemaChangeBehavior.IGNORE) {
if (schemaEvolveResponse.hasException()) {
schemaEvolveResponse
.getFailedSchemaChangeEvents()
.forEach(
e ->
LOG.warn(
"Failed to apply event {}, but keeps running in tolerant mode. Caused by: {}",
e.f0,
e.f1));
}
} else {
throw new SchemaEvolveException(
schemaChangeEvent,
"Unexpected schema change behavior: " + schemaChangeBehavior);
}
// Update evolved schema changes based on apply results
requestApplyEvolvedSchemaChanges(tableId, finishedSchemaChangeEvents);
finishedSchemaChangeEvents.forEach(e -> output.collect(new StreamRecord<>(e)));
LOG.info(
"Applied schema change event {} to downstream. Among {} total evolved events, {} succeeded, {} failed, and {} ignored.",
schemaChangeEvent,
expectedSchemaChangeEvents.size(),
finishedSchemaChangeEvents.size(),
failedSchemaChangeEvents.size(),
ignoredSchemaChangeEvents.size());
schemaOperatorMetrics.increaseFinishedSchemaChangeEvents(
finishedSchemaChangeEvents.size());
schemaOperatorMetrics.increaseFailedSchemaChangeEvents(failedSchemaChangeEvents.size());
schemaOperatorMetrics.increaseIgnoredSchemaChangeEvents(
ignoredSchemaChangeEvents.size());
}
}
@ -489,16 +438,6 @@ public class SchemaOperator extends AbstractStreamOperator<Event>
return sendRequestToCoordinator(new SchemaChangeRequest(tableId, schemaChangeEvent));
}
private void requestApplyOriginalSchemaChanges(
TableId tableId, SchemaChangeEvent schemaChangeEvent) {
sendRequestToCoordinator(new ApplyOriginalSchemaChangeRequest(tableId, schemaChangeEvent));
}
private void requestApplyEvolvedSchemaChanges(
TableId tableId, List<SchemaChangeEvent> schemaChangeEvents) {
sendRequestToCoordinator(new ApplyEvolvedSchemaChangeRequest(tableId, schemaChangeEvents));
}
private ReleaseUpstreamResponse requestReleaseUpstream()
throws InterruptedException, TimeoutException {
CoordinationResponse coordinationResponse =
@ -538,7 +477,7 @@ public class SchemaOperator extends AbstractStreamOperator<Event>
return optionalSchema.get();
} catch (Exception e) {
throw new IllegalStateException(
String.format("Unable to get latest schema for table \"%s\"", tableId));
String.format("Unable to get latest schema for table \"%s\"", tableId), e);
}
}
@ -553,7 +492,7 @@ public class SchemaOperator extends AbstractStreamOperator<Event>
return optionalSchema.get();
} catch (Exception e) {
throw new IllegalStateException(
String.format("Unable to get latest schema for table \"%s\"", tableId));
String.format("Unable to get latest schema for table \"%s\"", tableId), e);
}
}

@ -17,15 +17,13 @@
package org.apache.flink.cdc.runtime.operators.schema.coordinator;
import org.apache.flink.cdc.common.annotation.VisibleForTesting;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.route.RouteRule;
import org.apache.flink.cdc.common.sink.MetadataApplier;
import org.apache.flink.cdc.runtime.operators.schema.SchemaOperator;
import org.apache.flink.cdc.runtime.operators.schema.event.ApplyEvolvedSchemaChangeRequest;
import org.apache.flink.cdc.runtime.operators.schema.event.ApplyEvolvedSchemaChangeResponse;
import org.apache.flink.cdc.runtime.operators.schema.event.ApplyOriginalSchemaChangeRequest;
import org.apache.flink.cdc.runtime.operators.schema.event.ApplyOriginalSchemaChangeResponse;
import org.apache.flink.cdc.runtime.operators.schema.event.FlushSuccessEvent;
import org.apache.flink.cdc.runtime.operators.schema.event.GetEvolvedSchemaRequest;
import org.apache.flink.cdc.runtime.operators.schema.event.GetEvolvedSchemaResponse;
@ -151,18 +149,23 @@ public class SchemaRegistry implements OperatorCoordinator, CoordinationRequestH
@Override
public void handleEventFromOperator(int subtask, int attemptNumber, OperatorEvent event)
throws Exception {
if (event instanceof FlushSuccessEvent) {
FlushSuccessEvent flushSuccessEvent = (FlushSuccessEvent) event;
LOG.info(
"Sink subtask {} succeed flushing for table {}.",
flushSuccessEvent.getSubtask(),
flushSuccessEvent.getTableId().toString());
requestHandler.flushSuccess(
flushSuccessEvent.getTableId(), flushSuccessEvent.getSubtask());
} else if (event instanceof SinkWriterRegisterEvent) {
requestHandler.registerSinkWriter(((SinkWriterRegisterEvent) event).getSubtask());
} else {
throw new FlinkException("Unrecognized Operator Event: " + event);
try {
if (event instanceof FlushSuccessEvent) {
FlushSuccessEvent flushSuccessEvent = (FlushSuccessEvent) event;
LOG.info(
"Sink subtask {} succeed flushing for table {}.",
flushSuccessEvent.getSubtask(),
flushSuccessEvent.getTableId().toString());
requestHandler.flushSuccess(
flushSuccessEvent.getTableId(), flushSuccessEvent.getSubtask());
} else if (event instanceof SinkWriterRegisterEvent) {
requestHandler.registerSinkWriter(((SinkWriterRegisterEvent) event).getSubtask());
} else {
throw new FlinkException("Unrecognized Operator Event: " + event);
}
} catch (Throwable t) {
context.failJob(t);
throw t;
}
}
@ -180,6 +183,9 @@ public class SchemaRegistry implements OperatorCoordinator, CoordinationRequestH
// Serialize SchemaDerivation mapping
SchemaDerivation.serializeDerivationMapping(schemaDerivation, out);
resultFuture.complete(baos.toByteArray());
} catch (Throwable t) {
context.failJob(t);
throw t;
}
}
@ -191,33 +197,29 @@ public class SchemaRegistry implements OperatorCoordinator, CoordinationRequestH
@Override
public CompletableFuture<CoordinationResponse> handleCoordinationRequest(
CoordinationRequest request) {
if (request instanceof SchemaChangeRequest) {
SchemaChangeRequest schemaChangeRequest = (SchemaChangeRequest) request;
return requestHandler.handleSchemaChangeRequest(schemaChangeRequest);
} else if (request instanceof ReleaseUpstreamRequest) {
return requestHandler.handleReleaseUpstreamRequest();
} else if (request instanceof GetEvolvedSchemaRequest) {
return CompletableFuture.completedFuture(
wrap(handleGetEvolvedSchemaRequest(((GetEvolvedSchemaRequest) request))));
} else if (request instanceof GetOriginalSchemaRequest) {
return CompletableFuture.completedFuture(
wrap(handleGetOriginalSchemaRequest((GetOriginalSchemaRequest) request)));
} else if (request instanceof ApplyOriginalSchemaChangeRequest) {
return CompletableFuture.completedFuture(
wrap(
handleApplyOriginalSchemaChangeRequest(
(ApplyOriginalSchemaChangeRequest) request)));
} else if (request instanceof ApplyEvolvedSchemaChangeRequest) {
return CompletableFuture.completedFuture(
wrap(
handleApplyEvolvedSchemaChangeRequest(
(ApplyEvolvedSchemaChangeRequest) request)));
} else if (request instanceof SchemaChangeResultRequest) {
return requestHandler.getSchemaChangeResult();
} else if (request instanceof RefreshPendingListsRequest) {
return requestHandler.refreshPendingLists();
} else {
throw new IllegalArgumentException("Unrecognized CoordinationRequest type: " + request);
try {
if (request instanceof SchemaChangeRequest) {
SchemaChangeRequest schemaChangeRequest = (SchemaChangeRequest) request;
return requestHandler.handleSchemaChangeRequest(schemaChangeRequest);
} else if (request instanceof ReleaseUpstreamRequest) {
return requestHandler.handleReleaseUpstreamRequest();
} else if (request instanceof GetEvolvedSchemaRequest) {
return CompletableFuture.completedFuture(
wrap(handleGetEvolvedSchemaRequest(((GetEvolvedSchemaRequest) request))));
} else if (request instanceof GetOriginalSchemaRequest) {
return CompletableFuture.completedFuture(
wrap(handleGetOriginalSchemaRequest((GetOriginalSchemaRequest) request)));
} else if (request instanceof SchemaChangeResultRequest) {
return requestHandler.getSchemaChangeResult();
} else if (request instanceof RefreshPendingListsRequest) {
return requestHandler.refreshPendingLists();
} else {
throw new IllegalArgumentException(
"Unrecognized CoordinationRequest type: " + request);
}
} catch (Throwable t) {
context.failJob(t);
throw t;
}
}
@ -275,6 +277,9 @@ public class SchemaRegistry implements OperatorCoordinator, CoordinationRequestH
throw new IOException(
"Unrecognized serialization version " + schemaManagerSerializerVersion);
}
} catch (Throwable t) {
context.failJob(t);
throw t;
}
}
@ -342,18 +347,15 @@ public class SchemaRegistry implements OperatorCoordinator, CoordinationRequestH
}
}
private ApplyOriginalSchemaChangeResponse handleApplyOriginalSchemaChangeRequest(
ApplyOriginalSchemaChangeRequest applyOriginalSchemaChangeRequest) {
schemaManager.applyOriginalSchemaChange(
applyOriginalSchemaChangeRequest.getSchemaChangeEvent());
return new ApplyOriginalSchemaChangeResponse();
// --------------------Only visible for test -----------------
@VisibleForTesting
public void handleApplyOriginalSchemaChangeEvent(SchemaChangeEvent schemaChangeEvent) {
schemaManager.applyOriginalSchemaChange(schemaChangeEvent);
}
private ApplyEvolvedSchemaChangeResponse handleApplyEvolvedSchemaChangeRequest(
ApplyEvolvedSchemaChangeRequest applyEvolvedSchemaChangeRequest) {
applyEvolvedSchemaChangeRequest
.getSchemaChangeEvent()
.forEach(schemaManager::applyEvolvedSchemaChange);
return new ApplyEvolvedSchemaChangeResponse();
@VisibleForTesting
public void handleApplyEvolvedSchemaChangeRequest(SchemaChangeEvent schemaChangeEvent) {
schemaManager.applyEvolvedSchemaChange(schemaChangeEvent);
}
}

@ -17,7 +17,6 @@
package org.apache.flink.cdc.runtime.operators.schema.coordinator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.event.AddColumnEvent;
import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
@ -28,7 +27,7 @@ import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEventType;
import org.apache.flink.cdc.common.event.SchemaChangeEventWithPreSchema;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.exceptions.SchemaEvolveException;
import org.apache.flink.cdc.common.exceptions.UnsupportedSchemaChangeEventException;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
@ -88,7 +87,6 @@ public class SchemaRegistryRequestHandler implements Closeable {
private final List<PendingSchemaChange> pendingSchemaChanges;
private final List<SchemaChangeEvent> finishedSchemaChanges;
private final List<Tuple2<SchemaChangeEvent, Throwable>> failedSchemaChanges;
private final List<SchemaChangeEvent> ignoredSchemaChanges;
/** Sink writers which have sent flush success events for the request. */
@ -96,6 +94,8 @@ public class SchemaRegistryRequestHandler implements Closeable {
/** Status of the execution of current schema change request. */
private volatile boolean isSchemaChangeApplying;
/** Actual exception if failed to apply schema change. */
private volatile Throwable schemaChangeException;
/** Executor service to execute schema change. */
private final ExecutorService schemaChangeThreadPool;
@ -111,7 +111,6 @@ public class SchemaRegistryRequestHandler implements Closeable {
this.flushedSinkWriters = new HashSet<>();
this.pendingSchemaChanges = new LinkedList<>();
this.finishedSchemaChanges = new LinkedList<>();
this.failedSchemaChanges = new LinkedList<>();
this.ignoredSchemaChanges = new LinkedList<>();
this.schemaManager = schemaManager;
this.schemaDerivation = schemaDerivation;
@ -129,8 +128,8 @@ public class SchemaRegistryRequestHandler implements Closeable {
private void applySchemaChange(
TableId tableId, List<SchemaChangeEvent> derivedSchemaChangeEvents) {
isSchemaChangeApplying = true;
schemaChangeException = null;
finishedSchemaChanges.clear();
failedSchemaChanges.clear();
ignoredSchemaChanges.clear();
for (SchemaChangeEvent changeEvent : derivedSchemaChangeEvents) {
@ -147,17 +146,27 @@ public class SchemaRegistryRequestHandler implements Closeable {
try {
metadataApplier.applySchemaChange(changeEvent);
LOG.debug("Applied schema change {} to table {}.", changeEvent, tableId);
schemaManager.applyEvolvedSchemaChange(changeEvent);
finishedSchemaChanges.add(changeEvent);
} catch (SchemaEvolveException e) {
} catch (Throwable t) {
LOG.error(
"Failed to apply schema change {} to table {}. Caused by: {}",
changeEvent,
tableId,
e);
failedSchemaChanges.add(Tuple2.of(changeEvent, e));
t);
if (!shouldIgnoreException(t)) {
schemaChangeException = t;
break;
} else {
LOG.warn(
"Failed to apply event {}, but keeps running in tolerant mode. Caused by: {}",
changeEvent,
t);
}
}
}
}
PendingSchemaChange waitFlushSuccess = pendingSchemaChanges.get(0);
if (RECEIVED_RELEASE_REQUEST.equals(waitFlushSuccess.getStatus())) {
startNextSchemaChangeRequest();
@ -254,6 +263,10 @@ public class SchemaRegistryRequestHandler implements Closeable {
() -> applySchemaChange(tableId, waitFlushSuccess.derivedSchemaChangeEvents));
Thread.sleep(1000);
if (schemaChangeException != null) {
throw new RuntimeException("Failed to apply schema change.", schemaChangeException);
}
if (isSchemaChangeApplying) {
waitFlushSuccess
.getResponseFuture()
@ -261,12 +274,7 @@ public class SchemaRegistryRequestHandler implements Closeable {
} else {
waitFlushSuccess
.getResponseFuture()
.complete(
wrap(
new ReleaseUpstreamResponse(
finishedSchemaChanges,
failedSchemaChanges,
ignoredSchemaChanges)));
.complete(wrap(new ReleaseUpstreamResponse(finishedSchemaChanges)));
}
}
}
@ -305,16 +313,15 @@ public class SchemaRegistryRequestHandler implements Closeable {
}
public CompletableFuture<CoordinationResponse> getSchemaChangeResult() {
if (schemaChangeException != null) {
throw new RuntimeException("Failed to apply schema change.", schemaChangeException);
}
if (isSchemaChangeApplying) {
return CompletableFuture.supplyAsync(() -> wrap(new SchemaChangeProcessingResponse()));
} else {
return CompletableFuture.supplyAsync(
() ->
wrap(
new ReleaseUpstreamResponse(
finishedSchemaChanges,
failedSchemaChanges,
ignoredSchemaChanges)));
() -> wrap(new ReleaseUpstreamResponse(finishedSchemaChanges)));
}
}
@ -437,6 +444,15 @@ public class SchemaRegistryRequestHandler implements Closeable {
}
}
private boolean shouldIgnoreException(Throwable throwable) {
// In IGNORE mode, will never try to apply schema change events
// In EVOLVE and and LENIENT mode, such failure will not be tolerated
// In EXCEPTION mode, an exception will be thrown once captured
return (throwable instanceof UnsupportedSchemaChangeEventException)
&& (schemaChangeBehavior == SchemaChangeBehavior.TRY_EVOLVE);
}
private static class PendingSchemaChange {
private final SchemaChangeRequest changeRequest;
private List<SchemaChangeEvent> derivedSchemaChangeEvents;

@ -1,73 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.runtime.operators.schema.event;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.runtime.operators.schema.SchemaOperator;
import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import java.util.List;
import java.util.Objects;
/**
* The request from {@link SchemaOperator} to {@link SchemaRegistry} to request apply evolved schema
* changes.
*/
public class ApplyEvolvedSchemaChangeRequest implements CoordinationRequest {
private static final long serialVersionUID = 1L;
/** The sender of the request. */
private final TableId tableId;
/** The schema changes. */
private final List<SchemaChangeEvent> schemaChangeEvent;
public ApplyEvolvedSchemaChangeRequest(
TableId tableId, List<SchemaChangeEvent> schemaChangeEvent) {
this.tableId = tableId;
this.schemaChangeEvent = schemaChangeEvent;
}
public TableId getTableId() {
return tableId;
}
public List<SchemaChangeEvent> getSchemaChangeEvent() {
return schemaChangeEvent;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof ApplyEvolvedSchemaChangeRequest)) {
return false;
}
ApplyEvolvedSchemaChangeRequest that = (ApplyEvolvedSchemaChangeRequest) o;
return Objects.equals(tableId, that.tableId)
&& Objects.equals(schemaChangeEvent, that.schemaChangeEvent);
}
@Override
public int hashCode() {
return Objects.hash(tableId, schemaChangeEvent);
}
}

@ -1,32 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.runtime.operators.schema.event;
import org.apache.flink.cdc.runtime.operators.schema.SchemaOperator;
import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
/**
* The response from {@link SchemaRegistry} to {@link SchemaOperator} to request apply original
* schema changes, the evolved schema changes come from original schema changes with different
* schema evolution strategy.
*/
public class ApplyEvolvedSchemaChangeResponse implements CoordinationResponse {
private static final long serialVersionUID = 1L;
}

@ -1,71 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.runtime.operators.schema.event;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.runtime.operators.schema.SchemaOperator;
import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import java.util.Objects;
/**
* The request from {@link SchemaOperator} to {@link SchemaRegistry} to request apply original
* schema changes.
*/
public class ApplyOriginalSchemaChangeRequest implements CoordinationRequest {
private static final long serialVersionUID = 1L;
/** The sender of the request. */
private final TableId tableId;
/** The schema changes. */
private final SchemaChangeEvent schemaChangeEvent;
public ApplyOriginalSchemaChangeRequest(TableId tableId, SchemaChangeEvent schemaChangeEvent) {
this.tableId = tableId;
this.schemaChangeEvent = schemaChangeEvent;
}
public TableId getTableId() {
return tableId;
}
public SchemaChangeEvent getSchemaChangeEvent() {
return schemaChangeEvent;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof ApplyOriginalSchemaChangeRequest)) {
return false;
}
ApplyOriginalSchemaChangeRequest that = (ApplyOriginalSchemaChangeRequest) o;
return Objects.equals(tableId, that.tableId)
&& Objects.equals(schemaChangeEvent, that.schemaChangeEvent);
}
@Override
public int hashCode() {
return Objects.hash(tableId, schemaChangeEvent);
}
}

@ -1,31 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.runtime.operators.schema.event;
import org.apache.flink.cdc.runtime.operators.schema.SchemaOperator;
import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
/**
* The response from {@link SchemaRegistry} to {@link SchemaOperator} to request apply original
* schema changes.
*/
public class ApplyOriginalSchemaChangeResponse implements CoordinationResponse {
private static final long serialVersionUID = 1L;
}

@ -17,7 +17,6 @@
package org.apache.flink.cdc.runtime.operators.schema.event;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.runtime.operators.schema.SchemaOperator;
import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry;
@ -25,7 +24,6 @@ import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
/**
* The response for {@link ReleaseUpstreamRequest} from {@link SchemaRegistry} to {@link
@ -41,50 +39,19 @@ public class ReleaseUpstreamResponse implements CoordinationResponse {
*/
private final List<SchemaChangeEvent> finishedSchemaChangeEvents;
private final List<Tuple2<SchemaChangeEvent, Throwable>> failedSchemaChangeEvents;
private final List<SchemaChangeEvent> ignoredSchemaChangeEvents;
public ReleaseUpstreamResponse(
List<SchemaChangeEvent> finishedSchemaChangeEvents,
List<Tuple2<SchemaChangeEvent, Throwable>> failedSchemaChangeEvents,
List<SchemaChangeEvent> ignoredSchemaChangeEvents) {
public ReleaseUpstreamResponse(List<SchemaChangeEvent> finishedSchemaChangeEvents) {
this.finishedSchemaChangeEvents = finishedSchemaChangeEvents;
this.failedSchemaChangeEvents = failedSchemaChangeEvents;
this.ignoredSchemaChangeEvents = ignoredSchemaChangeEvents;
}
public List<SchemaChangeEvent> getFinishedSchemaChangeEvents() {
return finishedSchemaChangeEvents;
}
public List<Tuple2<SchemaChangeEvent, Throwable>> getFailedSchemaChangeEvents() {
return failedSchemaChangeEvents;
}
public List<SchemaChangeEvent> getIgnoredSchemaChangeEvents() {
return ignoredSchemaChangeEvents;
}
public String getPrintableFailedSchemaChangeEvents() {
return failedSchemaChangeEvents.stream()
.map(e -> "Failed to apply " + e.f0 + ". Caused by: " + e.f1)
.collect(Collectors.joining("\n"));
}
public boolean hasException() {
return !failedSchemaChangeEvents.isEmpty();
}
@Override
public String toString() {
return "ReleaseUpstreamResponse{"
+ "finishedSchemaChangeEvents="
+ finishedSchemaChangeEvents
+ ", failedSchemaChangeEvents="
+ failedSchemaChangeEvents
+ ", ignoredSchemaChangeEvents="
+ ignoredSchemaChangeEvents
+ '}';
}
@ -97,14 +64,11 @@ public class ReleaseUpstreamResponse implements CoordinationResponse {
return false;
}
ReleaseUpstreamResponse that = (ReleaseUpstreamResponse) object;
return Objects.equals(finishedSchemaChangeEvents, that.finishedSchemaChangeEvents)
&& Objects.equals(failedSchemaChangeEvents, that.failedSchemaChangeEvents)
&& Objects.equals(ignoredSchemaChangeEvents, that.ignoredSchemaChangeEvents);
return Objects.equals(finishedSchemaChangeEvents, that.finishedSchemaChangeEvents);
}
@Override
public int hashCode() {
return Objects.hash(
finishedSchemaChangeEvents, failedSchemaChangeEvents, ignoredSchemaChangeEvents);
return Objects.hash(finishedSchemaChangeEvents);
}
}

@ -1040,6 +1040,8 @@ public class SchemaEvolveTest {
Column.physicalColumn(
"height", DOUBLE, "Height data")))));
Assertions.assertThatThrownBy(() -> processEvent(schemaOperator, addColumnEvents))
.cause()
.cause()
.isExactlyInstanceOf(RuntimeException.class)
.hasMessageContaining("Failed to apply schema change");
harness.close();

@ -25,8 +25,6 @@ import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry;
import org.apache.flink.cdc.runtime.operators.schema.event.ApplyEvolvedSchemaChangeRequest;
import org.apache.flink.cdc.runtime.operators.schema.event.ApplyOriginalSchemaChangeRequest;
import org.apache.flink.cdc.runtime.operators.schema.event.FlushSuccessEvent;
import org.apache.flink.cdc.runtime.operators.schema.event.GetEvolvedSchemaRequest;
import org.apache.flink.cdc.runtime.operators.schema.event.GetEvolvedSchemaResponse;
@ -56,7 +54,6 @@ import org.apache.flink.util.SerializedValue;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.Set;
@ -166,14 +163,10 @@ public class EventOperatorTestHarness<OP extends AbstractStreamOperator<E>, E ex
}
public void registerTableSchema(TableId tableId, Schema schema) {
schemaRegistry.handleCoordinationRequest(
new ApplyOriginalSchemaChangeRequest(
tableId, new CreateTableEvent(tableId, schema)));
schemaRegistry.handleApplyOriginalSchemaChangeEvent(new CreateTableEvent(tableId, schema));
schemaRegistry.handleCoordinationRequest(
new SchemaChangeRequest(tableId, new CreateTableEvent(tableId, schema)));
schemaRegistry.handleCoordinationRequest(
new ApplyEvolvedSchemaChangeRequest(
tableId, Collections.singletonList(new CreateTableEvent(tableId, schema))));
schemaRegistry.handleApplyEvolvedSchemaChangeRequest(new CreateTableEvent(tableId, schema));
}
public Schema getLatestOriginalSchema(TableId tableId) throws Exception {

@ -21,6 +21,7 @@ import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEventType;
import org.apache.flink.cdc.common.event.SchemaChangeEventTypeFamily;
import org.apache.flink.cdc.common.exceptions.SchemaEvolveException;
import org.apache.flink.cdc.common.exceptions.UnsupportedSchemaChangeEventException;
import org.apache.flink.cdc.common.sink.MetadataApplier;
import java.time.Duration;
@ -83,8 +84,7 @@ public class CollectingMetadataApplier implements MetadataApplier {
try {
Thread.sleep(duration.toMillis());
if (errorsOnEventTypes.contains(schemaChangeEvent.getType())) {
throw new SchemaEvolveException(
schemaChangeEvent, "Dummy metadata apply exception for test.", null);
throw new UnsupportedSchemaChangeEventException(schemaChangeEvent);
}
} catch (InterruptedException ignore) {
// Ignores sleep interruption

Loading…
Cancel
Save