diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkWriterOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkWriterOperator.java index c8056cc7a..bdb384cbb 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkWriterOperator.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkWriterOperator.java @@ -29,6 +29,7 @@ import org.apache.flink.cdc.common.event.TableId; import org.apache.flink.cdc.common.schema.Schema; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; import org.apache.flink.streaming.api.connector.sink2.CommittableMessage; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; @@ -36,9 +37,11 @@ import org.apache.flink.streaming.api.operators.BoundedOneInput; import org.apache.flink.streaming.api.operators.ChainingStrategy; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; import org.apache.flink.streaming.runtime.tasks.StreamTask; +import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus; import java.lang.reflect.Constructor; import java.lang.reflect.Field; @@ -123,6 +126,26 @@ public class DataSinkWriterOperator extends AbstractStreamOperator>>getFlinkWriterOperator() + .snapshotState(context); + } + + @Override + public void processWatermark(Watermark mark) throws Exception { + super.processWatermark(mark); + this.>>getFlinkWriterOperator() + .processWatermark(mark); + } + + @Override + public void processWatermarkStatus(WatermarkStatus watermarkStatus) throws Exception { + super.processWatermarkStatus(watermarkStatus); + this.>>getFlinkWriterOperator() + .processWatermarkStatus(watermarkStatus); + } + @Override public void processElement(StreamRecord element) throws Exception { Event event = element.getValue();