|
|
|
@ -27,7 +27,6 @@ import org.apache.flink.streaming.api.connector.sink2.WithPreCommitTopology;
|
|
|
|
|
import org.apache.flink.streaming.api.connector.sink2.WithPreWriteTopology;
|
|
|
|
|
import org.apache.flink.streaming.api.datastream.DataStream;
|
|
|
|
|
import org.apache.flink.streaming.runtime.operators.sink.CommitterOperatorFactory;
|
|
|
|
|
import org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperatorFactory;
|
|
|
|
|
|
|
|
|
|
import com.ververica.cdc.common.annotation.Internal;
|
|
|
|
|
import com.ververica.cdc.common.event.Event;
|
|
|
|
@ -62,7 +61,7 @@ public class DataSinkTranslator {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (sink instanceof TwoPhaseCommittingSink) {
|
|
|
|
|
addCommittingTopology(sink, stream);
|
|
|
|
|
addCommittingTopology(sink, stream, schemaOperatorID);
|
|
|
|
|
} else {
|
|
|
|
|
input.transform(
|
|
|
|
|
WRITER_OPERATOR_NAME,
|
|
|
|
@ -71,7 +70,8 @@ public class DataSinkTranslator {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private <CommT> void addCommittingTopology(Sink<Event> sink, DataStream<Event> inputStream) {
|
|
|
|
|
private <CommT> void addCommittingTopology(
|
|
|
|
|
Sink<Event> sink, DataStream<Event> inputStream, OperatorID schemaOperatorID) {
|
|
|
|
|
TwoPhaseCommittingSink<Event, CommT> committingSink =
|
|
|
|
|
(TwoPhaseCommittingSink<Event, CommT>) sink;
|
|
|
|
|
TypeInformation<CommittableMessage<CommT>> typeInformation =
|
|
|
|
@ -80,7 +80,7 @@ public class DataSinkTranslator {
|
|
|
|
|
inputStream.transform(
|
|
|
|
|
WRITER_OPERATOR_NAME,
|
|
|
|
|
typeInformation,
|
|
|
|
|
new SinkWriterOperatorFactory<>(sink));
|
|
|
|
|
new DataSinkWriterOperatorFactory<>(sink, schemaOperatorID));
|
|
|
|
|
|
|
|
|
|
DataStream<CommittableMessage<CommT>> preCommitted = written;
|
|
|
|
|
if (sink instanceof WithPreCommitTopology) {
|
|
|
|
|