[debezium] Rename DebeziumState to DebeziumOffset

release-1.2
Jark Wu 4 years ago
parent 2dfb59f3c7
commit 799b5d36af
No known key found for this signature in database
GPG Key ID: 85BACB5AEFAE3202

@ -62,9 +62,9 @@ public class DebeziumChangeConsumer<T> implements DebeziumEngine.ChangeConsumer<
private final ErrorReporter errorReporter; private final ErrorReporter errorReporter;
private final DebeziumState debeziumState; private final DebeziumOffset debeziumOffset;
private final DebeziumStateSerializer stateSerializer; private final DebeziumOffsetSerializer stateSerializer;
private boolean isInDbSnapshotPhase; private boolean isInDbSnapshotPhase;
@ -83,8 +83,8 @@ public class DebeziumChangeConsumer<T> implements DebeziumEngine.ChangeConsumer<
this.isInDbSnapshotPhase = isInDbSnapshotPhase; this.isInDbSnapshotPhase = isInDbSnapshotPhase;
this.debeziumCollector = new DebeziumCollector(); this.debeziumCollector = new DebeziumCollector();
this.errorReporter = errorReporter; this.errorReporter = errorReporter;
this.debeziumState = new DebeziumState(); this.debeziumOffset = new DebeziumOffset();
this.stateSerializer = new DebeziumStateSerializer(); this.stateSerializer = new DebeziumOffsetSerializer();
} }
@Override @Override
@ -159,8 +159,8 @@ public class DebeziumChangeConsumer<T> implements DebeziumEngine.ChangeConsumer<
sourceContext.collect(record); sourceContext.collect(record);
} }
// update offset to state // update offset to state
debeziumState.setSourcePartition(sourcePartition); debeziumOffset.setSourcePartition(sourcePartition);
debeziumState.setSourceOffset(sourceOffset); debeziumOffset.setSourceOffset(sourceOffset);
} }
/** /**
@ -171,11 +171,11 @@ public class DebeziumChangeConsumer<T> implements DebeziumEngine.ChangeConsumer<
public byte[] snapshotCurrentState() throws Exception { public byte[] snapshotCurrentState() throws Exception {
// this method assumes that the checkpoint lock is held // this method assumes that the checkpoint lock is held
assert Thread.holdsLock(checkpointLock); assert Thread.holdsLock(checkpointLock);
if (debeziumState.sourceOffset == null || debeziumState.sourcePartition == null) { if (debeziumOffset.sourceOffset == null || debeziumOffset.sourcePartition == null) {
return null; return null;
} }
return stateSerializer.serialize(debeziumState); return stateSerializer.serialize(debeziumOffset);
} }
private class DebeziumCollector implements Collector<T> { private class DebeziumCollector implements Collector<T> {

@ -40,7 +40,7 @@ import java.util.Map;
* "table_name"} and the sourceOffset as a Long containing the timestamp of the row. * "table_name"} and the sourceOffset as a Long containing the timestamp of the row.
*/ */
@Internal @Internal
public class DebeziumState { public class DebeziumOffset {
public Map<String, ?> sourcePartition; public Map<String, ?> sourcePartition;
public Map<String, ?> sourceOffset; public Map<String, ?> sourceOffset;
@ -55,7 +55,7 @@ public class DebeziumState {
@Override @Override
public String toString() { public String toString() {
return "DebeziumConsumerState{" + return "DebeziumOffset{" +
"sourcePartition=" + sourcePartition + "sourcePartition=" + sourcePartition +
", sourceOffset=" + sourceOffset + ", sourceOffset=" + sourceOffset +
'}'; '}';

@ -25,19 +25,19 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMap
import java.io.IOException; import java.io.IOException;
/** /**
* Serializer implementation for a {@link DebeziumState}. * Serializer implementation for a {@link DebeziumOffset}.
*/ */
@Internal @Internal
public class DebeziumStateSerializer { public class DebeziumOffsetSerializer {
public byte[] serialize(DebeziumState debeziumState) throws IOException { public byte[] serialize(DebeziumOffset debeziumOffset) throws IOException {
// we currently use JSON serialization for simplification, as the state is very small. // we currently use JSON serialization for simplification, as the state is very small.
// we can improve this in the future if needed // we can improve this in the future if needed
ObjectMapper objectMapper = new ObjectMapper(); ObjectMapper objectMapper = new ObjectMapper();
return objectMapper.writeValueAsBytes(debeziumState); return objectMapper.writeValueAsBytes(debeziumOffset);
} }
public DebeziumState deserialize(byte[] bytes) throws IOException { public DebeziumOffset deserialize(byte[] bytes) throws IOException {
ObjectMapper objectMapper = new ObjectMapper(); ObjectMapper objectMapper = new ObjectMapper();
return objectMapper.readValue(bytes, DebeziumState.class); return objectMapper.readValue(bytes, DebeziumOffset.class);
} }
} }

@ -77,10 +77,10 @@ public class FlinkOffsetBackingStore implements OffsetBackingStore {
} }
String stateJson = (String) conf.get(OFFSET_STATE_VALUE); String stateJson = (String) conf.get(OFFSET_STATE_VALUE);
DebeziumStateSerializer serializer = new DebeziumStateSerializer(); DebeziumOffsetSerializer serializer = new DebeziumOffsetSerializer();
DebeziumState debeziumState; DebeziumOffset debeziumOffset;
try { try {
debeziumState = serializer.deserialize(stateJson.getBytes(StandardCharsets.UTF_8)); debeziumOffset = serializer.deserialize(stateJson.getBytes(StandardCharsets.UTF_8));
} catch (IOException e) { } catch (IOException e) {
LOG.error("Can't deserialize debezium offset state from JSON: " + stateJson, e); LOG.error("Can't deserialize debezium offset state from JSON: " + stateJson, e);
throw new RuntimeException(e); throw new RuntimeException(e);
@ -100,7 +100,7 @@ public class FlinkOffsetBackingStore implements OffsetBackingStore {
keyConverter, keyConverter,
valueConverter); valueConverter);
offsetWriter.offset(debeziumState.sourcePartition, debeziumState.sourceOffset); offsetWriter.offset(debeziumOffset.sourcePartition, debeziumOffset.sourceOffset);
// flush immediately // flush immediately
if (!offsetWriter.beginFlush()) { if (!offsetWriter.beginFlush()) {
@ -122,8 +122,8 @@ public class FlinkOffsetBackingStore implements OffsetBackingStore {
try { try {
flushFuture.get(FLUSH_TIMEOUT_SECONDS, TimeUnit.SECONDS); flushFuture.get(FLUSH_TIMEOUT_SECONDS, TimeUnit.SECONDS);
LOG.info("Flush offsets successfully, partition: {}, offsets: {}", LOG.info("Flush offsets successfully, partition: {}, offsets: {}",
debeziumState.sourcePartition, debeziumOffset.sourcePartition,
debeziumState.sourceOffset); debeziumOffset.sourceOffset);
} catch (InterruptedException e) { } catch (InterruptedException e) {
LOG.warn("Flush offsets interrupted, cancelling.", e); LOG.warn("Flush offsets interrupted, cancelling.", e);
offsetWriter.cancelFlush(); offsetWriter.cancelFlush();

Loading…
Cancel
Save