[FLINK-35255][cdc][runtime] DataSinkWriterOperator adds overrides for the snapshotState and processWatermark methods (#3279)

pull/3281/head
yanghuaiGit 9 months ago committed by GitHub
parent a83d0e5d9e
commit 759b294496
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -29,6 +29,7 @@ import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
@ -36,9 +37,11 @@ import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
@ -123,6 +126,26 @@ public class DataSinkWriterOperator<CommT> extends AbstractStreamOperator<Commit
.initializeState(context);
}
@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
this.<AbstractStreamOperator<CommittableMessage<CommT>>>getFlinkWriterOperator()
.snapshotState(context);
}
@Override
public void processWatermark(Watermark mark) throws Exception {
super.processWatermark(mark);
this.<AbstractStreamOperator<CommittableMessage<CommT>>>getFlinkWriterOperator()
.processWatermark(mark);
}
@Override
public void processWatermarkStatus(WatermarkStatus watermarkStatus) throws Exception {
super.processWatermarkStatus(watermarkStatus);
this.<AbstractStreamOperator<CommittableMessage<CommT>>>getFlinkWriterOperator()
.processWatermarkStatus(watermarkStatus);
}
@Override
public void processElement(StreamRecord<Event> element) throws Exception {
Event event = element.getValue();

Loading…
Cancel
Save