From 799b5d36afa6514f62f9c946e538522513efe042 Mon Sep 17 00:00:00 2001 From: Jark Wu Date: Wed, 24 Feb 2021 19:37:54 +0800 Subject: [PATCH] [debezium] Rename DebeziumState to DebeziumOffset --- .../internal/DebeziumChangeConsumer.java | 16 ++++++++-------- .../{DebeziumState.java => DebeziumOffset.java} | 4 ++-- ...alizer.java => DebeziumOffsetSerializer.java} | 12 ++++++------ .../internal/FlinkOffsetBackingStore.java | 12 ++++++------ 4 files changed, 22 insertions(+), 22 deletions(-) rename flink-connector-debezium/src/main/java/com/alibaba/ververica/cdc/debezium/internal/{DebeziumState.java => DebeziumOffset.java} (97%) rename flink-connector-debezium/src/main/java/com/alibaba/ververica/cdc/debezium/internal/{DebeziumStateSerializer.java => DebeziumOffsetSerializer.java} (77%) diff --git a/flink-connector-debezium/src/main/java/com/alibaba/ververica/cdc/debezium/internal/DebeziumChangeConsumer.java b/flink-connector-debezium/src/main/java/com/alibaba/ververica/cdc/debezium/internal/DebeziumChangeConsumer.java index 466621fcc..d300477b0 100644 --- a/flink-connector-debezium/src/main/java/com/alibaba/ververica/cdc/debezium/internal/DebeziumChangeConsumer.java +++ b/flink-connector-debezium/src/main/java/com/alibaba/ververica/cdc/debezium/internal/DebeziumChangeConsumer.java @@ -62,9 +62,9 @@ public class DebeziumChangeConsumer implements DebeziumEngine.ChangeConsumer< private final ErrorReporter errorReporter; - private final DebeziumState debeziumState; + private final DebeziumOffset debeziumOffset; - private final DebeziumStateSerializer stateSerializer; + private final DebeziumOffsetSerializer stateSerializer; private boolean isInDbSnapshotPhase; @@ -83,8 +83,8 @@ public class DebeziumChangeConsumer implements DebeziumEngine.ChangeConsumer< this.isInDbSnapshotPhase = isInDbSnapshotPhase; this.debeziumCollector = new DebeziumCollector(); this.errorReporter = errorReporter; - this.debeziumState = new DebeziumState(); - this.stateSerializer = new DebeziumStateSerializer(); + this.debeziumOffset = new DebeziumOffset(); + this.stateSerializer = new DebeziumOffsetSerializer(); } @Override @@ -159,8 +159,8 @@ public class DebeziumChangeConsumer implements DebeziumEngine.ChangeConsumer< sourceContext.collect(record); } // update offset to state - debeziumState.setSourcePartition(sourcePartition); - debeziumState.setSourceOffset(sourceOffset); + debeziumOffset.setSourcePartition(sourcePartition); + debeziumOffset.setSourceOffset(sourceOffset); } /** @@ -171,11 +171,11 @@ public class DebeziumChangeConsumer implements DebeziumEngine.ChangeConsumer< public byte[] snapshotCurrentState() throws Exception { // this method assumes that the checkpoint lock is held assert Thread.holdsLock(checkpointLock); - if (debeziumState.sourceOffset == null || debeziumState.sourcePartition == null) { + if (debeziumOffset.sourceOffset == null || debeziumOffset.sourcePartition == null) { return null; } - return stateSerializer.serialize(debeziumState); + return stateSerializer.serialize(debeziumOffset); } private class DebeziumCollector implements Collector { diff --git a/flink-connector-debezium/src/main/java/com/alibaba/ververica/cdc/debezium/internal/DebeziumState.java b/flink-connector-debezium/src/main/java/com/alibaba/ververica/cdc/debezium/internal/DebeziumOffset.java similarity index 97% rename from flink-connector-debezium/src/main/java/com/alibaba/ververica/cdc/debezium/internal/DebeziumState.java rename to flink-connector-debezium/src/main/java/com/alibaba/ververica/cdc/debezium/internal/DebeziumOffset.java index 45eeb6813..cc6d8779b 100644 --- a/flink-connector-debezium/src/main/java/com/alibaba/ververica/cdc/debezium/internal/DebeziumState.java +++ b/flink-connector-debezium/src/main/java/com/alibaba/ververica/cdc/debezium/internal/DebeziumOffset.java @@ -40,7 +40,7 @@ import java.util.Map; * "table_name"} and the sourceOffset as a Long containing the timestamp of the row. */ @Internal -public class DebeziumState { +public class DebeziumOffset { public Map sourcePartition; public Map sourceOffset; @@ -55,7 +55,7 @@ public class DebeziumState { @Override public String toString() { - return "DebeziumConsumerState{" + + return "DebeziumOffset{" + "sourcePartition=" + sourcePartition + ", sourceOffset=" + sourceOffset + '}'; diff --git a/flink-connector-debezium/src/main/java/com/alibaba/ververica/cdc/debezium/internal/DebeziumStateSerializer.java b/flink-connector-debezium/src/main/java/com/alibaba/ververica/cdc/debezium/internal/DebeziumOffsetSerializer.java similarity index 77% rename from flink-connector-debezium/src/main/java/com/alibaba/ververica/cdc/debezium/internal/DebeziumStateSerializer.java rename to flink-connector-debezium/src/main/java/com/alibaba/ververica/cdc/debezium/internal/DebeziumOffsetSerializer.java index d4a9cf512..4c5f9225e 100644 --- a/flink-connector-debezium/src/main/java/com/alibaba/ververica/cdc/debezium/internal/DebeziumStateSerializer.java +++ b/flink-connector-debezium/src/main/java/com/alibaba/ververica/cdc/debezium/internal/DebeziumOffsetSerializer.java @@ -25,19 +25,19 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMap import java.io.IOException; /** - * Serializer implementation for a {@link DebeziumState}. + * Serializer implementation for a {@link DebeziumOffset}. */ @Internal -public class DebeziumStateSerializer { - public byte[] serialize(DebeziumState debeziumState) throws IOException { +public class DebeziumOffsetSerializer { + public byte[] serialize(DebeziumOffset debeziumOffset) throws IOException { // we currently use JSON serialization for simplification, as the state is very small. // we can improve this in the future if needed ObjectMapper objectMapper = new ObjectMapper(); - return objectMapper.writeValueAsBytes(debeziumState); + return objectMapper.writeValueAsBytes(debeziumOffset); } - public DebeziumState deserialize(byte[] bytes) throws IOException { + public DebeziumOffset deserialize(byte[] bytes) throws IOException { ObjectMapper objectMapper = new ObjectMapper(); - return objectMapper.readValue(bytes, DebeziumState.class); + return objectMapper.readValue(bytes, DebeziumOffset.class); } } diff --git a/flink-connector-debezium/src/main/java/com/alibaba/ververica/cdc/debezium/internal/FlinkOffsetBackingStore.java b/flink-connector-debezium/src/main/java/com/alibaba/ververica/cdc/debezium/internal/FlinkOffsetBackingStore.java index 85a942a8a..b7fb61c68 100644 --- a/flink-connector-debezium/src/main/java/com/alibaba/ververica/cdc/debezium/internal/FlinkOffsetBackingStore.java +++ b/flink-connector-debezium/src/main/java/com/alibaba/ververica/cdc/debezium/internal/FlinkOffsetBackingStore.java @@ -77,10 +77,10 @@ public class FlinkOffsetBackingStore implements OffsetBackingStore { } String stateJson = (String) conf.get(OFFSET_STATE_VALUE); - DebeziumStateSerializer serializer = new DebeziumStateSerializer(); - DebeziumState debeziumState; + DebeziumOffsetSerializer serializer = new DebeziumOffsetSerializer(); + DebeziumOffset debeziumOffset; try { - debeziumState = serializer.deserialize(stateJson.getBytes(StandardCharsets.UTF_8)); + debeziumOffset = serializer.deserialize(stateJson.getBytes(StandardCharsets.UTF_8)); } catch (IOException e) { LOG.error("Can't deserialize debezium offset state from JSON: " + stateJson, e); throw new RuntimeException(e); @@ -100,7 +100,7 @@ public class FlinkOffsetBackingStore implements OffsetBackingStore { keyConverter, valueConverter); - offsetWriter.offset(debeziumState.sourcePartition, debeziumState.sourceOffset); + offsetWriter.offset(debeziumOffset.sourcePartition, debeziumOffset.sourceOffset); // flush immediately if (!offsetWriter.beginFlush()) { @@ -122,8 +122,8 @@ public class FlinkOffsetBackingStore implements OffsetBackingStore { try { flushFuture.get(FLUSH_TIMEOUT_SECONDS, TimeUnit.SECONDS); LOG.info("Flush offsets successfully, partition: {}, offsets: {}", - debeziumState.sourcePartition, - debeziumState.sourceOffset); + debeziumOffset.sourcePartition, + debeziumOffset.sourceOffset); } catch (InterruptedException e) { LOG.warn("Flush offsets interrupted, cancelling.", e); offsetWriter.cancelFlush();