[hotfix][cdc-runtime] Keep upstream pending requests in order to avoid checkpoint hanging

pull/3582/head
yuxiqian 5 months ago committed by Leonard Xu
parent debd43cdd7
commit 9816848c4b

@ -338,6 +338,7 @@ public class SchemaEvolveE2eITCase extends PipelineTestEnvironment {
// triggers DropColumnEvent
stmt.execute("ALTER TABLE members DROP COLUMN biological_sex");
stmt.execute("INSERT INTO members VALUES (1013, 'Fiona', 16);");
stmt.execute("INSERT INTO members VALUES (1014, 'Gem', 17);");
}
List<String> expectedTmEvents =

@ -349,6 +349,7 @@ public class SchemaEvolvingTransformE2eITCase extends PipelineTestEnvironment {
// triggers DropColumnEvent
stmt.execute("ALTER TABLE members DROP COLUMN biological_sex");
stmt.execute("INSERT INTO members VALUES (1013, 'Fiona', 16);");
stmt.execute("INSERT INTO members VALUES (1014, 'Gem', 17);");
}
List<String> expectedTmEvents =

@ -50,6 +50,7 @@ import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
@ -440,7 +441,8 @@ public class SchemaOperator extends AbstractStreamOperator<Event>
long schemaEvolveTimeOutMillis = System.currentTimeMillis() + rpcTimeOutInMillis;
while (true) {
SchemaChangeResponse response =
sendRequestToCoordinator(new SchemaChangeRequest(tableId, schemaChangeEvent));
sendRequestToCoordinator(
new SchemaChangeRequest(tableId, schemaChangeEvent, subTaskId));
if (response.isRegistryBusy()) {
if (System.currentTimeMillis() < schemaEvolveTimeOutMillis) {
LOG.info(
@ -609,4 +611,10 @@ public class SchemaOperator extends AbstractStreamOperator<Event>
}
}
}
@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
// Needless to do anything, since AbstractStreamOperator#snapshotState and #processElement
// is guaranteed not to be mixed together.
}
}

@ -54,7 +54,6 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@ -77,7 +76,10 @@ public class SchemaRegistryRequestHandler implements Closeable {
/**
* Atomic flag indicating if current RequestHandler could accept more schema changes for now.
*/
private final AtomicReference<RequestStatus> schemaChangeStatus;
private volatile RequestStatus schemaChangeStatus;
private final List<Integer> pendingSubTaskIds;
private final Object schemaChangeRequestLock;
private volatile Throwable currentChangeException;
private volatile List<SchemaChangeEvent> currentDerivedSchemaChangeEvents;
@ -109,7 +111,10 @@ public class SchemaRegistryRequestHandler implements Closeable {
this.currentDerivedSchemaChangeEvents = new ArrayList<>();
this.currentFinishedSchemaChanges = new ArrayList<>();
this.currentIgnoredSchemaChanges = new ArrayList<>();
this.schemaChangeStatus = new AtomicReference<>(RequestStatus.IDLE);
this.schemaChangeStatus = RequestStatus.IDLE;
this.pendingSubTaskIds = new ArrayList<>();
this.schemaChangeRequestLock = new Object();
}
/**
@ -119,54 +124,86 @@ public class SchemaRegistryRequestHandler implements Closeable {
*/
public CompletableFuture<CoordinationResponse> handleSchemaChangeRequest(
SchemaChangeRequest request) {
if (schemaChangeStatus.compareAndSet(RequestStatus.IDLE, RequestStatus.WAITING_FOR_FLUSH)) {
LOG.info(
"Received schema change event request {} from table {}. SchemaChangeStatus switched from IDLE to WAITING_FOR_FLUSH, other requests will be blocked.",
request.getSchemaChangeEvent(),
request.getTableId().toString());
SchemaChangeEvent event = request.getSchemaChangeEvent();
// If this schema change event has been requested by another subTask, ignore it.
if (schemaManager.isOriginalSchemaChangeEventRedundant(event)) {
LOG.info("Event {} has been addressed before, ignoring it.", event);
clearCurrentSchemaChangeRequest();
Preconditions.checkState(
schemaChangeStatus.compareAndSet(
RequestStatus.WAITING_FOR_FLUSH, RequestStatus.IDLE),
"Illegal schemaChangeStatus state: should still in WAITING_FOR_FLUSH state if event was duplicated, not "
+ schemaChangeStatus.get());
// We use requester subTask ID as the pending ticket, because there will be at most 1 schema
// change requests simultaneously from each subTask
int requestSubTaskId = request.getSubTaskId();
synchronized (schemaChangeRequestLock) {
// Make sure we handle the first request in the pending list to avoid out-of-order
// waiting and blocks checkpointing mechanism.
if (schemaChangeStatus == RequestStatus.IDLE) {
if (pendingSubTaskIds.isEmpty()) {
LOG.info(
"Received schema change event request {} from table {} from subTask {}. Pending list is empty, handling this.",
request.getSchemaChangeEvent(),
request.getTableId().toString(),
requestSubTaskId);
} else if (pendingSubTaskIds.get(0) == requestSubTaskId) {
LOG.info(
"Received schema change event request {} from table {} from subTask {}. It is on the first of the pending list, handling this.",
request.getSchemaChangeEvent(),
request.getTableId().toString(),
requestSubTaskId);
pendingSubTaskIds.remove(0);
} else {
LOG.info(
"Received schema change event request {} from table {} from subTask {}. It is not the first of the pending list ({}).",
request.getSchemaChangeEvent(),
request.getTableId().toString(),
requestSubTaskId,
pendingSubTaskIds);
if (!pendingSubTaskIds.contains(requestSubTaskId)) {
pendingSubTaskIds.add(requestSubTaskId);
}
return CompletableFuture.completedFuture(wrap(SchemaChangeResponse.busy()));
}
SchemaChangeEvent event = request.getSchemaChangeEvent();
// If this schema change event has been requested by another subTask, ignore it.
if (schemaManager.isOriginalSchemaChangeEventRedundant(event)) {
LOG.info("Event {} has been addressed before, ignoring it.", event);
clearCurrentSchemaChangeRequest();
LOG.info(
"SchemaChangeStatus switched from WAITING_FOR_FLUSH to IDLE for request {} due to duplicated request.",
request);
return CompletableFuture.completedFuture(
wrap(SchemaChangeResponse.duplicate()));
}
schemaManager.applyOriginalSchemaChange(event);
List<SchemaChangeEvent> derivedSchemaChangeEvents =
calculateDerivedSchemaChangeEvents(request.getSchemaChangeEvent());
// If this schema change event is filtered out by LENIENT mode or merging table
// route strategies, ignore it.
if (derivedSchemaChangeEvents.isEmpty()) {
LOG.info("Event {} is omitted from sending to downstream, ignoring it.", event);
clearCurrentSchemaChangeRequest();
LOG.info(
"SchemaChangeStatus switched from WAITING_FOR_FLUSH to IDLE for request {} due to ignored request.",
request);
return CompletableFuture.completedFuture(wrap(SchemaChangeResponse.ignored()));
}
LOG.info(
"SchemaChangeStatus switched from WAITING_FOR_FLUSH to IDLE for request {} due to duplicated request.",
request);
return CompletableFuture.completedFuture(wrap(SchemaChangeResponse.duplicate()));
}
schemaManager.applyOriginalSchemaChange(event);
List<SchemaChangeEvent> derivedSchemaChangeEvents =
calculateDerivedSchemaChangeEvents(request.getSchemaChangeEvent());
// If this schema change event is filtered out by LENIENT mode or merging table route
// strategies, ignore it.
if (derivedSchemaChangeEvents.isEmpty()) {
LOG.info("Event {} is omitted from sending to downstream, ignoring it.", event);
clearCurrentSchemaChangeRequest();
Preconditions.checkState(
schemaChangeStatus.compareAndSet(
RequestStatus.WAITING_FOR_FLUSH, RequestStatus.IDLE),
"Illegal schemaChangeStatus state: should still in WAITING_FOR_FLUSH state if event was ignored, not "
+ schemaChangeStatus.get());
"SchemaChangeStatus switched from IDLE to WAITING_FOR_FLUSH, other requests will be blocked.");
// This request has been accepted.
schemaChangeStatus = RequestStatus.WAITING_FOR_FLUSH;
currentDerivedSchemaChangeEvents = new ArrayList<>(derivedSchemaChangeEvents);
return CompletableFuture.completedFuture(
wrap(SchemaChangeResponse.accepted(derivedSchemaChangeEvents)));
} else {
LOG.info(
"SchemaChangeStatus switched from WAITING_FOR_FLUSH to IDLE for request {} due to ignored request.",
request);
return CompletableFuture.completedFuture(wrap(SchemaChangeResponse.ignored()));
"Schema Registry is busy processing a schema change request, could not handle request {} for now. Added {} to pending list ({}).",
request,
requestSubTaskId,
pendingSubTaskIds);
if (!pendingSubTaskIds.contains(requestSubTaskId)) {
pendingSubTaskIds.add(requestSubTaskId);
}
return CompletableFuture.completedFuture(wrap(SchemaChangeResponse.busy()));
}
currentDerivedSchemaChangeEvents = new ArrayList<>(derivedSchemaChangeEvents);
return CompletableFuture.completedFuture(
wrap(SchemaChangeResponse.accepted(derivedSchemaChangeEvents)));
} else {
LOG.info(
"Schema Registry is busy processing a schema change request, could not handle request {} for now.",
request);
return CompletableFuture.completedFuture(wrap(SchemaChangeResponse.busy()));
}
}
@ -213,9 +250,10 @@ public class SchemaRegistryRequestHandler implements Closeable {
}
}
Preconditions.checkState(
schemaChangeStatus.compareAndSet(RequestStatus.APPLYING, RequestStatus.FINISHED),
schemaChangeStatus == RequestStatus.APPLYING,
"Illegal schemaChangeStatus state: should be APPLYING before applySchemaChange finishes, not "
+ schemaChangeStatus.get());
+ schemaChangeStatus);
schemaChangeStatus = RequestStatus.FINISHED;
LOG.info(
"SchemaChangeStatus switched from APPLYING to FINISHED for request {}.",
currentDerivedSchemaChangeEvents);
@ -248,10 +286,11 @@ public class SchemaRegistryRequestHandler implements Closeable {
}
if (flushedSinkWriters.equals(activeSinkWriters)) {
Preconditions.checkState(
schemaChangeStatus.compareAndSet(
RequestStatus.WAITING_FOR_FLUSH, RequestStatus.APPLYING),
schemaChangeStatus == RequestStatus.WAITING_FOR_FLUSH,
"Illegal schemaChangeStatus state: should be WAITING_FOR_FLUSH before collecting enough FlushEvents, not "
+ schemaChangeStatus);
schemaChangeStatus = RequestStatus.APPLYING;
LOG.info(
"All sink subtask have flushed for table {}. Start to apply schema change.",
tableId.toString());
@ -262,9 +301,10 @@ public class SchemaRegistryRequestHandler implements Closeable {
public CompletableFuture<CoordinationResponse> getSchemaChangeResult() {
Preconditions.checkState(
!schemaChangeStatus.get().equals(RequestStatus.IDLE),
schemaChangeStatus != RequestStatus.IDLE,
"Illegal schemaChangeStatus: should not be IDLE before getting schema change request results.");
if (schemaChangeStatus.compareAndSet(RequestStatus.FINISHED, RequestStatus.IDLE)) {
if (schemaChangeStatus == RequestStatus.FINISHED) {
schemaChangeStatus = RequestStatus.IDLE;
LOG.info(
"SchemaChangeStatus switched from FINISHED to IDLE for request {}",
currentDerivedSchemaChangeEvents);

@ -36,10 +36,14 @@ public class SchemaChangeRequest implements CoordinationRequest {
private final TableId tableId;
/** The schema changes. */
private final SchemaChangeEvent schemaChangeEvent;
/** The ID of subTask that initiated the request. */
private final int subTaskId;
public SchemaChangeRequest(TableId tableId, SchemaChangeEvent schemaChangeEvent) {
public SchemaChangeRequest(
TableId tableId, SchemaChangeEvent schemaChangeEvent, int subTaskId) {
this.tableId = tableId;
this.schemaChangeEvent = schemaChangeEvent;
this.subTaskId = subTaskId;
}
public TableId getTableId() {
@ -50,6 +54,10 @@ public class SchemaChangeRequest implements CoordinationRequest {
return schemaChangeEvent;
}
public int getSubTaskId() {
return subTaskId;
}
@Override
public boolean equals(Object o) {
if (this == o) {
@ -60,11 +68,12 @@ public class SchemaChangeRequest implements CoordinationRequest {
}
SchemaChangeRequest that = (SchemaChangeRequest) o;
return Objects.equals(tableId, that.tableId)
&& Objects.equals(schemaChangeEvent, that.schemaChangeEvent);
&& Objects.equals(schemaChangeEvent, that.schemaChangeEvent)
&& subTaskId == that.subTaskId;
}
@Override
public int hashCode() {
return Objects.hash(tableId, schemaChangeEvent);
return Objects.hash(tableId, schemaChangeEvent, subTaskId);
}
}

@ -31,6 +31,7 @@ import org.apache.flink.cdc.runtime.operators.sink.SchemaEvolutionClient;
import org.apache.flink.cdc.runtime.serializer.event.EventSerializer;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
@ -147,4 +148,10 @@ public class PrePartitionOperator extends AbstractStreamOperator<PartitioningEve
}
});
}
@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
// Needless to do anything, since AbstractStreamOperator#snapshotState and #processElement
// is guaranteed not to be mixed together.
}
}

@ -164,7 +164,7 @@ public class EventOperatorTestHarness<OP extends AbstractStreamOperator<E>, E ex
public void registerTableSchema(TableId tableId, Schema schema) {
schemaRegistry.handleCoordinationRequest(
new SchemaChangeRequest(tableId, new CreateTableEvent(tableId, schema)));
new SchemaChangeRequest(tableId, new CreateTableEvent(tableId, schema), 0));
schemaRegistry.handleApplyEvolvedSchemaChangeRequest(new CreateTableEvent(tableId, schema));
}

Loading…
Cancel
Save