[mysql] Rename BinlogPosition to BinlogOffset

pull/266/head
Jark Wu authored 4 years ago, committed by Leonard Xu
parent 5786506394
commit 6e5e14dc34

@ -18,7 +18,7 @@
package com.alibaba.ververica.cdc.connectors.mysql.debezium.dispatcher;
import com.alibaba.ververica.cdc.connectors.mysql.debezium.offset.BinlogPosition;
import com.alibaba.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
import com.alibaba.ververica.cdc.connectors.mysql.source.split.MySQLSplit;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.mysql.MySqlOffsetContext;
@ -83,7 +83,7 @@ public class SignalEventDispatcher {
}
public void dispatchWatermarkEvent(
MySQLSplit mySQLSplit, BinlogPosition watermark, WatermarkKind watermarkKind)
MySQLSplit mySQLSplit, BinlogOffset watermark, WatermarkKind watermarkKind)
throws InterruptedException {
SourceRecord sourceRecord =
new SourceRecord(
@ -113,15 +113,15 @@ public class SignalEventDispatcher {
String databaseName,
String tableName,
String splitId,
BinlogPosition binlogPosition,
BinlogOffset binlogOffset,
WatermarkKind watermarkKind) {
Struct result = new Struct(signalEventValueSchema);
result.put(DATABASE_NAME, databaseName);
result.put(TABLE_NAME, tableName);
result.put(SPLIT_ID_KEY, splitId);
result.put(WATERMARK_KIND, watermarkKind.toString());
result.put(BINLOG_FILENAME_OFFSET_KEY, binlogPosition.getFilename());
result.put(BINLOG_POSITION_OFFSET_KEY, binlogPosition.getPosition());
result.put(BINLOG_FILENAME_OFFSET_KEY, binlogOffset.getFilename());
result.put(BINLOG_POSITION_OFFSET_KEY, binlogOffset.getPosition());
return result;
}

@ -1,24 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.ververica.cdc.connectors.mysql.debezium.offset;
import java.io.Serializable;
/** The offset of MySQL binlog, can be MySQL binlog position or MySQL GTID set. */
public interface MySQLOffset extends Serializable {}

@ -23,9 +23,9 @@ import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.alibaba.ververica.cdc.connectors.mysql.debezium.offset.BinlogPosition;
import com.alibaba.ververica.cdc.connectors.mysql.debezium.task.MySQLBinlogSplitReadTask;
import com.alibaba.ververica.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext;
import com.alibaba.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
import com.alibaba.ververica.cdc.connectors.mysql.source.split.MySQLSplit;
import com.alibaba.ververica.cdc.connectors.mysql.source.utils.RecordUtils;
import io.debezium.connector.base.ChangeEventQueue;
@ -69,9 +69,9 @@ public class BinlogSplitReader implements DebeziumReader<SourceRecord, MySQLSpli
private MySQLBinlogSplitReadTask binlogSplitReadTask;
private MySQLSplit currentTableSplit;
// tableId -> List[splitKeyStart, splitKeyEnd, splitHighWatermark]
private Map<TableId, List<Tuple3<Object[], Object[], BinlogPosition>>> finishedSplitsInfo;
private Map<TableId, List<Tuple3<Object[], Object[], BinlogOffset>>> finishedSplitsInfo;
// tableId -> the max splitHighWatermark
private Map<TableId, BinlogPosition> maxSplitHighWatermarkMap;
private Map<TableId, BinlogOffset> maxSplitHighWatermarkMap;
public BinlogSplitReader(StatefulTaskContext statefulTaskContext, int subTaskId) {
this.statefulTaskContext = statefulTaskContext;
@ -183,7 +183,7 @@ public class BinlogSplitReader implements DebeziumReader<SourceRecord, MySQLSpli
private boolean shouldEmit(SourceRecord sourceRecord) {
if (isDataChangeRecord(sourceRecord)) {
TableId tableId = getTableId(sourceRecord);
BinlogPosition position = getBinlogPosition(sourceRecord);
BinlogOffset position = getBinlogPosition(sourceRecord);
// aligned, all snapshot splits of the table have reached the max highWatermark
if (position.isAtOrBefore(maxSplitHighWatermarkMap.get(tableId))) {
return true;
@ -193,7 +193,7 @@ public class BinlogSplitReader implements DebeziumReader<SourceRecord, MySQLSpli
currentTableSplit.getSplitBoundaryType(),
sourceRecord,
statefulTaskContext.getSchemaNameAdjuster());
for (Tuple3<Object[], Object[], BinlogPosition> splitInfo :
for (Tuple3<Object[], Object[], BinlogOffset> splitInfo :
finishedSplitsInfo.get(tableId)) {
if (RecordUtils.splitKeyRangeContains(key, splitInfo.f0, splitInfo.f1)
&& position.isAtOrBefore(splitInfo.f2)) {
@ -209,22 +209,22 @@ public class BinlogSplitReader implements DebeziumReader<SourceRecord, MySQLSpli
}
private void configureFilter() {
List<Tuple5<TableId, String, Object[], Object[], BinlogPosition>> finishedSplitsInfo =
List<Tuple5<TableId, String, Object[], Object[], BinlogOffset>> finishedSplitsInfo =
currentTableSplit.getFinishedSplitsInfo();
Map<TableId, List<Tuple3<Object[], Object[], BinlogPosition>>> splitsInfoMap =
Map<TableId, List<Tuple3<Object[], Object[], BinlogOffset>>> splitsInfoMap =
new HashMap<>();
Map<TableId, BinlogPosition> tableIdBinlogPositionMap = new HashMap<>();
Map<TableId, BinlogOffset> tableIdBinlogPositionMap = new HashMap<>();
for (Tuple5<TableId, String, Object[], Object[], BinlogPosition> finishedSplitInfo :
for (Tuple5<TableId, String, Object[], Object[], BinlogOffset> finishedSplitInfo :
finishedSplitsInfo) {
TableId tableId = finishedSplitInfo.f0;
List<Tuple3<Object[], Object[], BinlogPosition>> list =
List<Tuple3<Object[], Object[], BinlogOffset>> list =
splitsInfoMap.getOrDefault(tableId, new ArrayList<>());
list.add(Tuple3.of(finishedSplitInfo.f2, finishedSplitInfo.f3, finishedSplitInfo.f4));
splitsInfoMap.put(tableId, list);
BinlogPosition highWatermark = finishedSplitInfo.f4;
BinlogPosition maxHighWatermark = tableIdBinlogPositionMap.get(tableId);
BinlogOffset highWatermark = finishedSplitInfo.f4;
BinlogOffset maxHighWatermark = tableIdBinlogPositionMap.get(tableId);
if (maxHighWatermark == null || highWatermark.isAtOrBefore(maxHighWatermark)) {
tableIdBinlogPositionMap.put(tableId, highWatermark);
}

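The shouldEmit/configureFilter logic above can be read as follows: a change record is forwarded once its offset has passed the table-wide max high watermark, and otherwise only if its key falls into a finished snapshot split whose high watermark the record's offset has already passed. Below is a minimal standalone sketch of that rule; Offset, FinishedSplit, hasPassed and shouldEmit are hypothetical stand-ins (Java 16+ records), not the connector's own classes.

import java.util.List;

public class EmitFilterSketch {

    /** Illustrative stand-in for BinlogOffset: ordered by binlog file name, then position. */
    record Offset(String file, long pos) implements Comparable<Offset> {
        @Override
        public int compareTo(Offset o) {
            int byFile = file.compareTo(o.file);
            return byFile != 0 ? byFile : Long.compare(pos, o.pos);
        }

        /** Mirrors BinlogOffset#isAtOrBefore, whose body is compareTo(that) >= 0. */
        boolean hasPassed(Offset o) {
            return compareTo(o) >= 0;
        }
    }

    /** A finished snapshot split: its key range plus the high watermark captured for it. */
    record FinishedSplit(long keyStart, long keyEnd, Offset highWatermark) {}

    static boolean shouldEmit(
            Offset recordOffset, long recordKey, Offset maxHighWatermark, List<FinishedSplit> splits) {
        // All snapshot splits of the table have been passed: emit everything from here on.
        if (recordOffset.hasPassed(maxHighWatermark)) {
            return true;
        }
        // Otherwise emit only if the key belongs to a finished split whose high watermark
        // the record's offset has already passed (the snapshot read already covered the rest).
        for (FinishedSplit split : splits) {
            if (recordKey >= split.keyStart()
                    && recordKey < split.keyEnd()
                    && recordOffset.hasPassed(split.highWatermark())) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Offset hw1 = new Offset("mysql-bin.000001", 100);
        Offset hw2 = new Offset("mysql-bin.000001", 300);
        List<FinishedSplit> splits =
                List.of(new FinishedSplit(0, 500, hw1), new FinishedSplit(500, 1000, hw2));

        // Key 42 is in the first split and offset 200 has passed its high watermark (100) -> true.
        System.out.println(shouldEmit(new Offset("mysql-bin.000001", 200), 42, hw2, splits));
        // Key 700 is in the second split whose high watermark (300) is not reached yet -> false.
        System.out.println(shouldEmit(new Offset("mysql-bin.000001", 200), 700, hw2, splits));
    }
}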
@ -20,10 +20,10 @@ package com.alibaba.ververica.cdc.connectors.mysql.debezium.reader;
import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.alibaba.ververica.cdc.connectors.mysql.debezium.offset.BinlogPosition;
import com.alibaba.ververica.cdc.connectors.mysql.debezium.task.MySQLBinlogSplitReadTask;
import com.alibaba.ververica.cdc.connectors.mysql.debezium.task.MySQLSnapshotSplitReadTask;
import com.alibaba.ververica.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext;
import com.alibaba.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
import com.alibaba.ververica.cdc.connectors.mysql.source.split.MySQLSplit;
import com.alibaba.ververica.cdc.connectors.mysql.source.split.MySQLSplitState;
import com.alibaba.ververica.cdc.connectors.mysql.source.utils.RecordUtils;
@ -209,22 +209,22 @@ public class SnapshotSplitReader implements DebeziumReader<SourceRecord, MySQLSp
public class SnapshotSplitChangeEventSourceContextImpl
implements ChangeEventSource.ChangeEventSourceContext {
private BinlogPosition lowWatermark;
private BinlogPosition highWatermark;
private BinlogOffset lowWatermark;
private BinlogOffset highWatermark;
public BinlogPosition getLowWatermark() {
public BinlogOffset getLowWatermark() {
return lowWatermark;
}
public void setLowWatermark(BinlogPosition lowWatermark) {
public void setLowWatermark(BinlogOffset lowWatermark) {
this.lowWatermark = lowWatermark;
}
public BinlogPosition getHighWatermark() {
public BinlogOffset getHighWatermark() {
return highWatermark;
}
public void setHighWatermark(BinlogPosition highWatermark) {
public void setHighWatermark(BinlogOffset highWatermark) {
this.highWatermark = highWatermark;
}

@ -21,6 +21,7 @@ package com.alibaba.ververica.cdc.connectors.mysql.debezium.task;
import com.alibaba.ververica.cdc.connectors.mysql.debezium.dispatcher.EventDispatcherImpl;
import com.alibaba.ververica.cdc.connectors.mysql.debezium.dispatcher.SignalEventDispatcher;
import com.alibaba.ververica.cdc.connectors.mysql.debezium.reader.SnapshotSplitReader.SnapshotBinlogSplitChangeEventSourceContextImpl;
import com.alibaba.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
import com.alibaba.ververica.cdc.connectors.mysql.source.split.MySQLSplit;
import com.alibaba.ververica.cdc.connectors.mysql.source.split.MySQLSplitKind;
import com.github.shyiko.mysql.binlog.event.Event;
@ -93,26 +94,21 @@ public class MySQLBinlogSplitReadTask extends MySqlStreamingChangeEventSource {
super.handleEvent(event);
// check whether we need to stop reading the binlog for this snapshot split.
if (isBoundedRead()) {
final com.alibaba.ververica.cdc.connectors.mysql.debezium.offset.BinlogPosition
currentBinlogPosition =
new com.alibaba.ververica.cdc.connectors.mysql.debezium.offset
.BinlogPosition(
final BinlogOffset currentBinlogOffset =
new BinlogOffset(
offsetContext.getOffset().get(BINLOG_FILENAME_OFFSET_KEY).toString(),
Long.parseLong(
offsetContext
.getOffset()
.get(BINLOG_FILENAME_OFFSET_KEY)
.toString(),
Long.parseLong(
offsetContext
.getOffset()
.get(BINLOG_POSITION_OFFSET_KEY)
.toString()));
.get(BINLOG_POSITION_OFFSET_KEY)
.toString()));
// reached the high watermark, the binlog reader should finish
if (currentBinlogPosition.isAtOrBefore(mySQLSplit.getHighWatermark())) {
if (currentBinlogOffset.isAtOrBefore(mySQLSplit.getHighWatermark())) {
// send binlog end event
try {
signalEventDispatcher.dispatchWatermarkEvent(
mySQLSplit,
currentBinlogPosition,
currentBinlogOffset,
SignalEventDispatcher.WatermarkKind.BINLOG_END);
} catch (InterruptedException e) {
logger.error("Send signal event error.", e);

@ -20,8 +20,8 @@ package com.alibaba.ververica.cdc.connectors.mysql.debezium.task;
import com.alibaba.ververica.cdc.connectors.mysql.debezium.dispatcher.EventDispatcherImpl;
import com.alibaba.ververica.cdc.connectors.mysql.debezium.dispatcher.SignalEventDispatcher;
import com.alibaba.ververica.cdc.connectors.mysql.debezium.offset.BinlogPosition;
import com.alibaba.ververica.cdc.connectors.mysql.debezium.reader.SnapshotSplitReader;
import com.alibaba.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
import com.alibaba.ververica.cdc.connectors.mysql.source.split.MySQLSplit;
import com.alibaba.ververica.cdc.connectors.mysql.source.utils.StatementUtils;
import io.debezium.DebeziumException;
@ -129,7 +129,7 @@ public class MySQLSnapshotSplitReadTask extends AbstractSnapshotChangeEventSourc
final RelationalSnapshotChangeEventSource.RelationalSnapshotContext ctx =
(RelationalSnapshotChangeEventSource.RelationalSnapshotContext) snapshotContext;
final BinlogPosition lowWatermark = getCurrentBinlogPosition();
final BinlogOffset lowWatermark = getCurrentBinlogPosition();
LOGGER.info(
"Snapshot step 1 - Determining low watermark {} for split {}",
lowWatermark,
@ -149,7 +149,7 @@ public class MySQLSnapshotSplitReadTask extends AbstractSnapshotChangeEventSourc
LOGGER.info("Snapshot step 2 - Snapshotting data");
createDataEvents(ctx, mySQLSplit.getTableId());
final BinlogPosition highWatermark = getCurrentBinlogPosition();
final BinlogOffset highWatermark = getCurrentBinlogPosition();
LOGGER.info(
"Snapshot step 3 - Determining high watermark {} for split {}",
highWatermark,
@ -276,9 +276,9 @@ public class MySQLSnapshotSplitReadTask extends AbstractSnapshotChangeEventSourc
return Threads.timer(clock, LOG_INTERVAL);
}
private BinlogPosition getCurrentBinlogPosition() {
AtomicReference<BinlogPosition> currentBinlogPosition =
new AtomicReference<>(BinlogPosition.INITIAL_OFFSET);
private BinlogOffset getCurrentBinlogPosition() {
AtomicReference<BinlogOffset> currentBinlogPosition =
new AtomicReference<>(BinlogOffset.INITIAL_OFFSET);
try {
jdbcConnection.setAutoCommit(false);
String showMasterStmt = "SHOW MASTER STATUS";
@ -289,7 +289,7 @@ public class MySQLSnapshotSplitReadTask extends AbstractSnapshotChangeEventSourc
String binlogFilename = rs.getString(1);
long binlogPosition = rs.getLong(2);
currentBinlogPosition.set(
new BinlogPosition(binlogFilename, binlogPosition));
new BinlogOffset(binlogFilename, binlogPosition));
LOGGER.info(
"Read binlog '{}' at position '{}'",
binlogFilename,

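getCurrentBinlogPosition() above derives the current offset from SHOW MASTER STATUS, whose first two columns are the binlog file name and the byte position within it. A minimal sketch of the same lookup over plain JDBC, assuming a reachable MySQL server, the MySQL JDBC driver, and the connector's BinlogOffset class on the classpath; the URL and credentials in main are placeholders.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

import com.alibaba.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;

public class CurrentBinlogOffsetSketch {

    public static BinlogOffset currentOffset(String jdbcUrl, String user, String password)
            throws Exception {
        try (Connection connection = DriverManager.getConnection(jdbcUrl, user, password);
                Statement statement = connection.createStatement();
                ResultSet rs = statement.executeQuery("SHOW MASTER STATUS")) {
            if (rs.next()) {
                // Column 1: binlog file name, column 2: position within that file.
                return new BinlogOffset(rs.getString(1), rs.getLong(2));
            }
            // Binlog not enabled on the server: fall back to the sentinel offset.
            return BinlogOffset.INITIAL_OFFSET;
        }
    }

    public static void main(String[] args) throws Exception {
        BinlogOffset offset = currentOffset("jdbc:mysql://localhost:3306", "root", "password");
        System.out.println(offset.getFilename() + ":" + offset.getPosition());
    }
}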
@ -33,11 +33,11 @@ import org.apache.flink.connector.base.source.reader.synchronization.FutureCompl
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.table.types.logical.RowType;
import com.alibaba.ververica.cdc.connectors.mysql.debezium.offset.BinlogPosition;
import com.alibaba.ververica.cdc.connectors.mysql.source.assigner.MySQLSnapshotSplitAssigner;
import com.alibaba.ververica.cdc.connectors.mysql.source.enumerator.MySQLSourceEnumState;
import com.alibaba.ververica.cdc.connectors.mysql.source.enumerator.MySQLSourceEnumStateSerializer;
import com.alibaba.ververica.cdc.connectors.mysql.source.enumerator.MySQLSourceEnumerator;
import com.alibaba.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
import com.alibaba.ververica.cdc.connectors.mysql.source.reader.MySQLRecordEmitter;
import com.alibaba.ververica.cdc.connectors.mysql.source.reader.MySQLSourceReader;
import com.alibaba.ververica.cdc.connectors.mysql.source.split.MySQLSplit;
@ -87,8 +87,8 @@ public class MySQLParallelSource<T>
@Override
public SourceReader<T, MySQLSplit> createReader(SourceReaderContext readerContext)
throws Exception {
FutureCompletingBlockingQueue<RecordsWithSplitIds<Tuple2<T, BinlogPosition>>>
elementsQueue = new FutureCompletingBlockingQueue<>();
FutureCompletingBlockingQueue<RecordsWithSplitIds<Tuple2<T, BinlogOffset>>> elementsQueue =
new FutureCompletingBlockingQueue<>();
// set the server id for reader
Configuration readerConfiguration = config.clone();

@ -56,8 +56,8 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import static com.alibaba.ververica.cdc.connectors.mysql.debezium.offset.BinlogPosition.INITIAL_OFFSET;
import static com.alibaba.ververica.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext.toDebeziumConfig;
import static com.alibaba.ververica.cdc.connectors.mysql.source.offset.BinlogOffset.INITIAL_OFFSET;
import static com.alibaba.ververica.cdc.connectors.mysql.source.utils.RecordUtils.isOptimizedKeyType;
import static com.alibaba.ververica.cdc.connectors.mysql.source.utils.RecordUtils.rowToArray;
import static com.alibaba.ververica.cdc.connectors.mysql.source.utils.StatementUtils.buildMaxSplitKeyQuery;

@ -20,7 +20,7 @@ package com.alibaba.ververica.cdc.connectors.mysql.source.enumerator;
import org.apache.flink.api.java.tuple.Tuple2;
import com.alibaba.ververica.cdc.connectors.mysql.debezium.offset.BinlogPosition;
import com.alibaba.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
import com.alibaba.ververica.cdc.connectors.mysql.source.split.MySQLSplit;
import com.alibaba.ververica.cdc.connectors.mysql.source.split.MySQLSplitReader;
import io.debezium.relational.TableId;
@ -52,7 +52,7 @@ public class MySQLSourceEnumState {
* The finished (snapshot) splits that the {@link MySQLSourceEnumerator} has received from
* {@link MySQLSplitReader}s.
*/
private final Map<Integer, List<Tuple2<String, BinlogPosition>>> finishedSnapshotSplits;
private final Map<Integer, List<Tuple2<String, BinlogOffset>>> finishedSnapshotSplits;
/**
* The splits are frequently serialized into checkpoints. Caching the byte representation makes
@ -64,7 +64,7 @@ public class MySQLSourceEnumState {
Collection<MySQLSplit> remainingSplits,
Collection<TableId> alreadyProcessedTables,
Map<Integer, List<MySQLSplit>> assignedSplits,
Map<Integer, List<Tuple2<String, BinlogPosition>>> finishedSnapshotSplits) {
Map<Integer, List<Tuple2<String, BinlogOffset>>> finishedSnapshotSplits) {
this.remainingSplits = remainingSplits;
this.alreadyProcessedTables = alreadyProcessedTables;
this.assignedSplits = assignedSplits;
@ -83,7 +83,7 @@ public class MySQLSourceEnumState {
return assignedSplits;
}
public Map<Integer, List<Tuple2<String, BinlogPosition>>> getFinishedSnapshotSplits() {
public Map<Integer, List<Tuple2<String, BinlogOffset>>> getFinishedSnapshotSplits() {
return finishedSnapshotSplits;
}
}

@ -23,7 +23,7 @@ import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;
import com.alibaba.ververica.cdc.connectors.mysql.debezium.offset.BinlogPosition;
import com.alibaba.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
import com.alibaba.ververica.cdc.connectors.mysql.source.split.MySQLSplit;
import io.debezium.relational.TableId;
@ -97,59 +97,59 @@ public class MySQLSourceEnumStateSerializer
final Collection<MySQLSplit> splits = readMySQLSplits(in);
final Collection<TableId> tableIds = readTableIds(in);
final Map<Integer, List<MySQLSplit>> assignedSplits = readAssignedSplits(in);
final Map<Integer, List<Tuple2<String, BinlogPosition>>> finishedSnapshotSplits =
final Map<Integer, List<Tuple2<String, BinlogOffset>>> finishedSnapshotSplits =
readFinishedSnapshotSplits(in);
in.releaseArrays();
return new MySQLSourceEnumState(splits, tableIds, assignedSplits, finishedSnapshotSplits);
}
private void writeFinishedSnapshotSplits(
Map<Integer, List<Tuple2<String, BinlogPosition>>> finishedSnapshotSplits,
Map<Integer, List<Tuple2<String, BinlogOffset>>> finishedSnapshotSplits,
DataOutputSerializer out)
throws IOException {
final int size = finishedSnapshotSplits.size();
out.writeInt(size);
for (Map.Entry<Integer, List<Tuple2<String, BinlogPosition>>> entry :
for (Map.Entry<Integer, List<Tuple2<String, BinlogOffset>>> entry :
finishedSnapshotSplits.entrySet()) {
int subtaskId = entry.getKey();
out.writeInt(subtaskId);
List<Tuple2<String, BinlogPosition>> splitsInfo = entry.getValue();
List<Tuple2<String, BinlogOffset>> splitsInfo = entry.getValue();
writeSplitsInfo(splitsInfo, out);
}
}
private Map<Integer, List<Tuple2<String, BinlogPosition>>> readFinishedSnapshotSplits(
private Map<Integer, List<Tuple2<String, BinlogOffset>>> readFinishedSnapshotSplits(
DataInputDeserializer in) throws IOException {
Map<Integer, List<Tuple2<String, BinlogPosition>>> finishedSnapshotSplits = new HashMap<>();
Map<Integer, List<Tuple2<String, BinlogOffset>>> finishedSnapshotSplits = new HashMap<>();
final int size = in.readInt();
for (int i = 0; i < size; i++) {
int subtaskId = in.readInt();
List<Tuple2<String, BinlogPosition>> splitsInfo = readSplitsInfo(in);
List<Tuple2<String, BinlogOffset>> splitsInfo = readSplitsInfo(in);
finishedSnapshotSplits.put(subtaskId, splitsInfo);
}
return finishedSnapshotSplits;
}
private void writeSplitsInfo(
List<Tuple2<String, BinlogPosition>> splitsInfo, DataOutputSerializer out)
List<Tuple2<String, BinlogOffset>> splitsInfo, DataOutputSerializer out)
throws IOException {
final int size = splitsInfo.size();
out.writeInt(size);
for (int i = 0; i < size; i++) {
Tuple2<String, BinlogPosition> splitInfo = splitsInfo.get(i);
Tuple2<String, BinlogOffset> splitInfo = splitsInfo.get(i);
out.writeUTF(splitInfo.f0);
writeBinlogPosition(splitInfo.f1, out);
}
}
private List<Tuple2<String, BinlogPosition>> readSplitsInfo(DataInputDeserializer in)
private List<Tuple2<String, BinlogOffset>> readSplitsInfo(DataInputDeserializer in)
throws IOException {
List<Tuple2<String, BinlogPosition>> splitsInfo = new ArrayList<>();
List<Tuple2<String, BinlogOffset>> splitsInfo = new ArrayList<>();
final int size = in.readInt();
for (int i = 0; i < size; i++) {
String splitId = in.readUTF();
BinlogPosition binlogPosition = readBinlogPosition(in);
splitsInfo.add(Tuple2.of(splitId, binlogPosition));
BinlogOffset binlogOffset = readBinlogPosition(in);
splitsInfo.add(Tuple2.of(splitId, binlogOffset));
}
return splitsInfo;
}

@ -24,11 +24,11 @@ import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple5;
import com.alibaba.ververica.cdc.connectors.mysql.debezium.offset.BinlogPosition;
import com.alibaba.ververica.cdc.connectors.mysql.source.assigner.MySQLSnapshotSplitAssigner;
import com.alibaba.ververica.cdc.connectors.mysql.source.events.EnumeratorAckEvent;
import com.alibaba.ververica.cdc.connectors.mysql.source.events.EnumeratorRequestReportEvent;
import com.alibaba.ververica.cdc.connectors.mysql.source.events.SourceReaderReportEvent;
import com.alibaba.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
import com.alibaba.ververica.cdc.connectors.mysql.source.split.MySQLSplit;
import com.alibaba.ververica.cdc.connectors.mysql.source.split.MySQLSplitKind;
import com.alibaba.ververica.cdc.debezium.internal.SchemaRecord;
@ -59,13 +59,13 @@ public class MySQLSourceEnumerator implements SplitEnumerator<MySQLSplit, MySQLS
private final MySQLSnapshotSplitAssigner snapshotSplitAssigner;
private final Map<Integer, List<MySQLSplit>> assignedSplits;
private final Map<Integer, List<Tuple2<String, BinlogPosition>>> receiveFinishedSnapshotSplits;
private final Map<Integer, List<Tuple2<String, BinlogOffset>>> receiveFinishedSnapshotSplits;
public MySQLSourceEnumerator(
SplitEnumeratorContext<MySQLSplit> context,
MySQLSnapshotSplitAssigner snapshotSplitAssigner,
Map<Integer, List<MySQLSplit>> assignedSplits,
Map<Integer, List<Tuple2<String, BinlogPosition>>> receiveFinishedSnapshotSplits) {
Map<Integer, List<Tuple2<String, BinlogOffset>>> receiveFinishedSnapshotSplits) {
this.context = context;
this.snapshotSplitAssigner = snapshotSplitAssigner;
this.assignedSplits = assignedSplits;
@ -115,7 +115,7 @@ public class MySQLSourceEnumerator implements SplitEnumerator<MySQLSplit, MySQLS
for (int subtaskId : subtaskIds) {
final List<MySQLSplit> assignedSplit =
assignedSplits.getOrDefault(subtaskId, new ArrayList<>());
final List<Tuple2<String, BinlogPosition>> ackSpitsForReader =
final List<Tuple2<String, BinlogOffset>> ackSpitsForReader =
receiveFinishedSnapshotSplits.getOrDefault(subtaskId, new ArrayList<>());
int assignedSnapshotSplitSize =
assignedSplit.stream()
@ -146,7 +146,7 @@ public class MySQLSourceEnumerator implements SplitEnumerator<MySQLSplit, MySQLS
sourceEvent,
subtaskId);
SourceReaderReportEvent reportEvent = (SourceReaderReportEvent) sourceEvent;
final List<Tuple2<String, BinlogPosition>> ackSpitsForReader =
final List<Tuple2<String, BinlogOffset>> ackSpitsForReader =
receiveFinishedSnapshotSplits.getOrDefault(subtaskId, new ArrayList<>());
ackSpitsForReader.addAll(reportEvent.getFinishedSplits());
@ -225,24 +225,24 @@ public class MySQLSourceEnumerator implements SplitEnumerator<MySQLSplit, MySQLS
.flatMap(Collection::stream)
.sorted(Comparator.comparing(MySQLSplit::splitId))
.collect(Collectors.toList());
final List<Tuple2<String, BinlogPosition>> receiveSnapshotSplits =
final List<Tuple2<String, BinlogOffset>> receiveSnapshotSplits =
receiveFinishedSnapshotSplits.values().stream()
.flatMap(Collection::stream)
.sorted(Comparator.comparing(o -> o.f0))
.collect(Collectors.toList());
final List<Tuple5<TableId, String, Object[], Object[], BinlogPosition>> snapshotSplits =
final List<Tuple5<TableId, String, Object[], Object[], BinlogOffset>> snapshotSplits =
new ArrayList<>();
final Map<TableId, SchemaRecord> databaseHistory = new HashMap<>();
BinlogPosition minBinlogOffset = receiveSnapshotSplits.get(0).f1;
BinlogOffset minBinlogOffset = receiveSnapshotSplits.get(0).f1;
for (int i = 0; i < assignedSnapshotSplit.size(); i++) {
MySQLSplit split = assignedSnapshotSplit.get(i);
// find the min binlog offset
if (receiveSnapshotSplits.get(i).f1.compareTo(minBinlogOffset) < 0) {
minBinlogOffset = receiveSnapshotSplits.get(i).f1;
}
Tuple2<String, BinlogPosition> splitPosition = receiveSnapshotSplits.get(i);
Tuple2<String, BinlogOffset> splitPosition = receiveSnapshotSplits.get(i);
snapshotSplits.add(
Tuple5.of(
split.getTableId(),

@ -21,8 +21,8 @@ package com.alibaba.ververica.cdc.connectors.mysql.source.events;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.java.tuple.Tuple2;
import com.alibaba.ververica.cdc.connectors.mysql.debezium.offset.BinlogPosition;
import com.alibaba.ververica.cdc.connectors.mysql.source.enumerator.MySQLSourceEnumerator;
import com.alibaba.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
import com.alibaba.ververica.cdc.connectors.mysql.source.reader.MySQLSourceReader;
import java.util.ArrayList;
@ -35,13 +35,13 @@ public class SourceReaderReportEvent implements SourceEvent {
private static final long serialVersionUID = 1L;
private final ArrayList<Tuple2<String, BinlogPosition>> finishedSplits;
private final ArrayList<Tuple2<String, BinlogOffset>> finishedSplits;
public SourceReaderReportEvent(ArrayList<Tuple2<String, BinlogPosition>> finishedSplits) {
public SourceReaderReportEvent(ArrayList<Tuple2<String, BinlogOffset>> finishedSplits) {
this.finishedSplits = finishedSplits;
}
public ArrayList<Tuple2<String, BinlogPosition>> getFinishedSplits() {
public ArrayList<Tuple2<String, BinlogOffset>> getFinishedSplits() {
return finishedSplits;
}
}

@ -16,29 +16,25 @@
* limitations under the License.
*/
package com.alibaba.ververica.cdc.connectors.mysql.debezium.offset;
package com.alibaba.ververica.cdc.connectors.mysql.source.offset;
import org.apache.flink.util.Preconditions;
/** A {@link MySQLOffset} implementation that uses binlog position. */
public class BinlogPosition implements MySQLOffset, Comparable<BinlogPosition> {
import java.util.Objects;
public static final BinlogPosition INITIAL_OFFSET = new BinlogPosition("", 0);
/** A structure that describes an offset in the binlog of a MySQL server. */
public class BinlogOffset implements Comparable<BinlogOffset> {
public static final BinlogOffset INITIAL_OFFSET = new BinlogOffset("", 0);
private final String filename;
private final long position;
public BinlogPosition(String filename, long position) {
public BinlogOffset(String filename, long position) {
Preconditions.checkNotNull(filename);
this.filename = filename;
this.position = position;
}
public BinlogPosition(
String filename, long position, boolean isLowWatermark, boolean isHighWatermark) {
this.filename = filename;
this.position = position;
}
public String getFilename() {
return filename;
}
@ -48,7 +44,7 @@ public class BinlogPosition implements MySQLOffset, Comparable<BinlogPosition> {
}
@Override
public int compareTo(BinlogPosition o) {
public int compareTo(BinlogOffset o) {
if (this.filename.equals(o.filename)) {
return Long.compare(this.position, o.position);
} else {
@ -57,7 +53,7 @@ public class BinlogPosition implements MySQLOffset, Comparable<BinlogPosition> {
}
}
public boolean isAtOrBefore(BinlogPosition that) {
public boolean isAtOrBefore(BinlogOffset that) {
return this.compareTo(that) >= 0;
}
@ -67,32 +63,19 @@ public class BinlogPosition implements MySQLOffset, Comparable<BinlogPosition> {
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + filename.hashCode();
result = prime * result + (int) (position ^ (position >>> 32));
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (obj == null) {
if (o == null || getClass() != o.getClass()) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
BinlogPosition other = (BinlogPosition) obj;
if (!filename.equals(other.filename)) {
return false;
}
if (position != other.position) {
return false;
}
return true;
BinlogOffset that = (BinlogOffset) o;
return position == that.position && Objects.equals(filename, that.filename);
}
@Override
public int hashCode() {
return Objects.hash(filename, position);
}
}
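A small usage sketch of the renamed class, assuming only the connector module on the classpath; it exercises the behavior visible in this hunk (same-file ordering, isAtOrBefore as compareTo(that) >= 0, and the new value-based equals/hashCode).

import com.alibaba.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;

public class BinlogOffsetSemanticsSketch {

    public static void main(String[] args) {
        BinlogOffset earlier = new BinlogOffset("mysql-bin.000001", 4L);
        BinlogOffset later = new BinlogOffset("mysql-bin.000001", 200L);

        // Within the same binlog file, compareTo orders by position.
        System.out.println(later.compareTo(earlier) > 0); // true

        // isAtOrBefore(x) returns compareTo(x) >= 0, i.e. it is true when this offset
        // is at or past x; the readers above use it to check high watermarks.
        System.out.println(later.isAtOrBefore(earlier)); // true

        // equals/hashCode are value-based on (filename, position) after this change.
        System.out.println(earlier.equals(new BinlogOffset("mysql-bin.000001", 4L))); // true

        // INITIAL_OFFSET is the ("", 0) sentinel used by the serializers as a fallback.
        System.out.println(BinlogOffset.INITIAL_OFFSET.getFilename().isEmpty()); // true
    }
}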

@ -22,7 +22,7 @@ import org.apache.flink.api.connector.source.SourceOutput;
import org.apache.flink.connector.base.source.reader.RecordEmitter;
import org.apache.flink.util.Collector;
import com.alibaba.ververica.cdc.connectors.mysql.debezium.offset.BinlogPosition;
import com.alibaba.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
import com.alibaba.ververica.cdc.connectors.mysql.source.split.MySQLSplitState;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.alibaba.ververica.cdc.debezium.internal.SchemaRecord;
@ -65,7 +65,7 @@ public final class MySQLRecordEmitter<T>
public void emitRecord(SourceRecord element, SourceOutput<T> output, MySQLSplitState splitState)
throws Exception {
if (isWatermarkEvent(element)) {
BinlogPosition watermark = getWatermark(element);
BinlogOffset watermark = getWatermark(element);
if (isHighWatermarkEvent(element)) {
splitState.setHighWatermarkState(watermark);
splitState.setSnapshotReadFinishedState(true);
@ -83,7 +83,7 @@ public final class MySQLRecordEmitter<T>
new SchemaRecord(TABLE_CHANGE_SERIALIZER.toDocument(tableChange)));
}
} else if (isDataChangeRecord(element)) {
BinlogPosition position = getBinlogPosition(element);
BinlogOffset position = getBinlogPosition(element);
splitState.setOffsetState(position);
debeziumDeserializationSchema.deserialize(
element,

@ -28,10 +28,10 @@ import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSource
import org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import com.alibaba.ververica.cdc.connectors.mysql.debezium.offset.BinlogPosition;
import com.alibaba.ververica.cdc.connectors.mysql.source.events.EnumeratorAckEvent;
import com.alibaba.ververica.cdc.connectors.mysql.source.events.EnumeratorRequestReportEvent;
import com.alibaba.ververica.cdc.connectors.mysql.source.events.SourceReaderReportEvent;
import com.alibaba.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
import com.alibaba.ververica.cdc.connectors.mysql.source.split.MySQLSplit;
import com.alibaba.ververica.cdc.connectors.mysql.source.split.MySQLSplitKind;
import com.alibaba.ververica.cdc.connectors.mysql.source.split.MySQLSplitReader;
@ -155,7 +155,7 @@ public class MySQLSourceReader<T>
private void reportFinishedSnapshotSplits(Collection<MySQLSplit> splits) {
if (!splits.isEmpty()) {
final ArrayList<Tuple2<String, BinlogPosition>> finishedNoAckSplits = new ArrayList<>();
final ArrayList<Tuple2<String, BinlogOffset>> finishedNoAckSplits = new ArrayList<>();
for (MySQLSplit split : splits) {
finishedNoAckSplits.add(Tuple2.of(split.getSplitId(), split.getHighWatermark()));
}

@ -22,7 +22,7 @@ import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.table.types.logical.RowType;
import com.alibaba.ververica.cdc.connectors.mysql.debezium.offset.BinlogPosition;
import com.alibaba.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
import com.alibaba.ververica.cdc.debezium.internal.SchemaRecord;
import io.debezium.relational.TableId;
@ -43,14 +43,14 @@ public class MySQLSplit implements SourceSplit {
// the fields for snapshot split
@Nullable private final Object[] splitBoundaryStart;
@Nullable private final Object[] splitBoundaryEnd;
@Nullable private final BinlogPosition lowWatermark;
@Nullable private final BinlogPosition highWatermark;
@Nullable private final BinlogOffset lowWatermark;
@Nullable private final BinlogOffset highWatermark;
private final boolean snapshotReadFinished;
// the fields for binlog split
private final BinlogPosition offset;
private final BinlogOffset offset;
// (tableId, splitId, splitStart, splitEnd, highWatermark)
private final List<Tuple5<TableId, String, Object[], Object[], BinlogPosition>>
private final List<Tuple5<TableId, String, Object[], Object[], BinlogOffset>>
finishedSplitsInfo;
// TODO: The databaseHistory can be shared in splitReader for all snapshot splits
@ -70,11 +70,11 @@ public class MySQLSplit implements SourceSplit {
RowType splitBoundaryType,
@Nullable Object[] splitBoundaryStart,
@Nullable Object[] splitBoundaryEnd,
@Nullable BinlogPosition lowWatermark,
@Nullable BinlogPosition highWatermark,
@Nullable BinlogOffset lowWatermark,
@Nullable BinlogOffset highWatermark,
boolean snapshotReadFinished,
@Nullable BinlogPosition offset,
List<Tuple5<TableId, String, Object[], Object[], BinlogPosition>> finishedSplitsInfo,
@Nullable BinlogOffset offset,
List<Tuple5<TableId, String, Object[], Object[], BinlogOffset>> finishedSplitsInfo,
Map<TableId, SchemaRecord> databaseHistory) {
this.splitKind = splitKind;
this.tableId = tableId;
@ -122,12 +122,12 @@ public class MySQLSplit implements SourceSplit {
}
@Nullable
public BinlogPosition getLowWatermark() {
public BinlogOffset getLowWatermark() {
return lowWatermark;
}
@Nullable
public BinlogPosition getHighWatermark() {
public BinlogOffset getHighWatermark() {
return highWatermark;
}
@ -135,12 +135,11 @@ public class MySQLSplit implements SourceSplit {
return snapshotReadFinished;
}
public BinlogPosition getOffset() {
public BinlogOffset getOffset() {
return offset;
}
public List<Tuple5<TableId, String, Object[], Object[], BinlogPosition>>
getFinishedSplitsInfo() {
public List<Tuple5<TableId, String, Object[], Object[], BinlogOffset>> getFinishedSplitsInfo() {
return finishedSplitsInfo;
}

@ -25,7 +25,7 @@ import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.utils.LogicalTypeParser;
import com.alibaba.ververica.cdc.connectors.mysql.debezium.offset.BinlogPosition;
import com.alibaba.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
import com.alibaba.ververica.cdc.debezium.internal.SchemaRecord;
import io.debezium.document.Document;
import io.debezium.document.DocumentReader;
@ -38,7 +38,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static com.alibaba.ververica.cdc.connectors.mysql.debezium.offset.BinlogPosition.INITIAL_OFFSET;
import static com.alibaba.ververica.cdc.connectors.mysql.source.offset.BinlogOffset.INITIAL_OFFSET;
import static com.alibaba.ververica.cdc.connectors.mysql.source.utils.SerializerUtils.readBinlogPosition;
import static com.alibaba.ververica.cdc.connectors.mysql.source.utils.SerializerUtils.rowToSerializedString;
import static com.alibaba.ververica.cdc.connectors.mysql.source.utils.SerializerUtils.serializedStringToRow;
@ -135,8 +135,8 @@ public final class MySQLSplitSerializer implements SimpleVersionedSerializer<MyS
if (isSnapshotSplit) {
Object[] splitBoundaryStart = serializedStringToRow(in.readUTF());
Object[] splitBoundaryEnd = serializedStringToRow(in.readUTF());
BinlogPosition lowWatermark = readBinlogPosition(in);
BinlogPosition highWatermark = readBinlogPosition(in);
BinlogOffset lowWatermark = readBinlogPosition(in);
BinlogOffset highWatermark = readBinlogPosition(in);
boolean isSnapshotReadFinished = in.readBoolean();
Map<TableId, SchemaRecord> databaseHistory = readDatabaseHistory(in);
@ -155,8 +155,8 @@ public final class MySQLSplitSerializer implements SimpleVersionedSerializer<MyS
new ArrayList<>(),
databaseHistory);
} else {
BinlogPosition offset = readBinlogPosition(in);
List<Tuple5<TableId, String, Object[], Object[], BinlogPosition>> finishedSplitsInfo =
BinlogOffset offset = readBinlogPosition(in);
List<Tuple5<TableId, String, Object[], Object[], BinlogOffset>> finishedSplitsInfo =
readFinishedSplitsInfo(in);
Map<TableId, SchemaRecord> databaseHistory = readDatabaseHistory(in);
in.releaseArrays();
@ -201,13 +201,13 @@ public final class MySQLSplitSerializer implements SimpleVersionedSerializer<MyS
}
private static void writeFinishedSplitsInfo(
List<Tuple5<TableId, String, Object[], Object[], BinlogPosition>> finishedSplitsInfo,
List<Tuple5<TableId, String, Object[], Object[], BinlogOffset>> finishedSplitsInfo,
DataOutputSerializer out)
throws IOException {
final int size = finishedSplitsInfo.size();
out.writeInt(size);
for (int i = 0; i < size; i++) {
Tuple5<TableId, String, Object[], Object[], BinlogPosition> splitInfo =
Tuple5<TableId, String, Object[], Object[], BinlogOffset> splitInfo =
finishedSplitsInfo.get(i);
out.writeUTF(splitInfo.f0.toString());
out.writeUTF(splitInfo.f1);
@ -217,9 +217,9 @@ public final class MySQLSplitSerializer implements SimpleVersionedSerializer<MyS
}
}
private static List<Tuple5<TableId, String, Object[], Object[], BinlogPosition>>
private static List<Tuple5<TableId, String, Object[], Object[], BinlogOffset>>
readFinishedSplitsInfo(DataInputDeserializer in) throws IOException {
List<Tuple5<TableId, String, Object[], Object[], BinlogPosition>> finishedSplitsInfo =
List<Tuple5<TableId, String, Object[], Object[], BinlogOffset>> finishedSplitsInfo =
new ArrayList<>();
final int size = in.readInt();
for (int i = 0; i < size; i++) {
@ -227,7 +227,7 @@ public final class MySQLSplitSerializer implements SimpleVersionedSerializer<MyS
String splitId = in.readUTF();
Object[] splitStart = serializedStringToRow(in.readUTF());
Object[] splitEnd = serializedStringToRow(in.readUTF());
BinlogPosition highWatermark = readBinlogPosition(in);
BinlogOffset highWatermark = readBinlogPosition(in);
finishedSplitsInfo.add(
Tuple5.of(tableId, splitId, splitStart, splitEnd, highWatermark));
}

@ -18,7 +18,7 @@
package com.alibaba.ververica.cdc.connectors.mysql.source.split;
import com.alibaba.ververica.cdc.connectors.mysql.debezium.offset.BinlogPosition;
import com.alibaba.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
import com.alibaba.ververica.cdc.debezium.internal.SchemaRecord;
import io.debezium.relational.TableId;
@ -33,9 +33,9 @@ import java.util.Map;
*/
public final class MySQLSplitState extends MySQLSplit {
private BinlogPosition lowWatermarkState;
private BinlogPosition highWatermarkState;
private BinlogPosition offsetState;
private BinlogOffset lowWatermarkState;
private BinlogOffset highWatermarkState;
private BinlogOffset offsetState;
private boolean snapshotReadFinishedState;
@Nullable private Map<TableId, SchemaRecord> databaseHistoryState;
@ -62,15 +62,15 @@ public final class MySQLSplitState extends MySQLSplit {
split.getDatabaseHistory() == null ? new HashMap<>() : split.getDatabaseHistory();
}
public void setLowWatermarkState(BinlogPosition lowWatermarkState) {
public void setLowWatermarkState(BinlogOffset lowWatermarkState) {
this.lowWatermarkState = lowWatermarkState;
}
public void setHighWatermarkState(BinlogPosition highWatermarkState) {
public void setHighWatermarkState(BinlogOffset highWatermarkState) {
this.highWatermarkState = highWatermarkState;
}
public void setOffsetState(BinlogPosition offsetState) {
public void setOffsetState(BinlogOffset offsetState) {
this.offsetState = offsetState;
}
@ -87,16 +87,16 @@ public final class MySQLSplitState extends MySQLSplit {
this.databaseHistoryState.put(tableId, latestSchemaChange);
}
public BinlogPosition getLowWatermarkState() {
public BinlogOffset getLowWatermarkState() {
return lowWatermarkState;
}
public BinlogPosition getHighWatermarkState() {
public BinlogOffset getHighWatermarkState() {
return highWatermarkState;
}
@Nullable
public BinlogPosition getOffsetState() {
public BinlogOffset getOffsetState() {
return offsetState;
}

@ -23,7 +23,7 @@ import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.RowType;
import com.alibaba.ververica.cdc.connectors.mysql.debezium.dispatcher.SignalEventDispatcher.WatermarkKind;
import com.alibaba.ververica.cdc.connectors.mysql.debezium.offset.BinlogPosition;
import com.alibaba.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
import com.alibaba.ververica.cdc.connectors.mysql.source.split.MySQLSplit;
import io.debezium.data.Envelope;
import io.debezium.document.DocumentReader;
@ -217,11 +217,11 @@ public class RecordUtils {
return false;
}
public static BinlogPosition getWatermark(SourceRecord watermarkEvent) {
public static BinlogOffset getWatermark(SourceRecord watermarkEvent) {
Struct value = (Struct) watermarkEvent.value();
String file = value.getString(BINLOG_FILENAME_OFFSET_KEY);
Long position = value.getInt64(BINLOG_POSITION_OFFSET_KEY);
return new BinlogPosition(file, position);
return new BinlogOffset(file, position);
}
public static boolean isSchemaChangeEvent(SourceRecord sourceRecord) {
@ -238,7 +238,7 @@ public class RecordUtils {
* @return [splitId, splitStart, splitEnd, highWatermark]; this information will be used to
* filter binlog events when reading the binlog of the table.
*/
public static Tuple5<TableId, String, Object[], Object[], BinlogPosition> getSnapshotSplitInfo(
public static Tuple5<TableId, String, Object[], Object[], BinlogOffset> getSnapshotSplitInfo(
MySQLSplit split, SourceRecord highWatermark) {
Struct value = (Struct) highWatermark.value();
String splitId = value.getString(SPLIT_ID_KEY);
@ -249,18 +249,18 @@ public class RecordUtils {
splitId,
split.getSplitBoundaryStart(),
split.getSplitBoundaryEnd(),
new BinlogPosition(file, position));
new BinlogOffset(file, position));
}
/** Returns the start offset of the binlog split. */
public static BinlogPosition getStartOffsetOfBinlogSplit(
List<Tuple5<TableId, String, Object[], Object[], BinlogPosition>>
public static BinlogOffset getStartOffsetOfBinlogSplit(
List<Tuple5<TableId, String, Object[], Object[], BinlogOffset>>
finishedSnapshotSplits) {
BinlogPosition startOffset =
BinlogOffset startOffset =
finishedSnapshotSplits.isEmpty()
? new BinlogPosition("", 0)
? new BinlogOffset("", 0)
: finishedSnapshotSplits.get(0).f4;
for (Tuple5<TableId, String, Object[], Object[], BinlogPosition> finishedSnapshotSplit :
for (Tuple5<TableId, String, Object[], Object[], BinlogOffset> finishedSnapshotSplit :
finishedSnapshotSplits) {
if (!finishedSnapshotSplit.f4.isAtOrBefore(startOffset)) {
startOffset = finishedSnapshotSplit.f4;
@ -292,12 +292,12 @@ public class RecordUtils {
return new Object[] {key.get(splitFieldName)};
}
public static BinlogPosition getBinlogPosition(SourceRecord dataRecord) {
public static BinlogOffset getBinlogPosition(SourceRecord dataRecord) {
Struct value = (Struct) dataRecord.value();
Struct source = value.getStruct(Envelope.FieldName.SOURCE);
String fileName = (String) source.get(BINLOG_FILENAME_OFFSET_KEY);
Long position = (Long) (source.get(BINLOG_POSITION_OFFSET_KEY));
return new BinlogPosition(fileName, position);
return new BinlogOffset(fileName, position);
}
/** Returns the specific key contains in the split key range or not. */

@ -21,7 +21,7 @@ package com.alibaba.ververica.cdc.connectors.mysql.source.utils;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;
import com.alibaba.ververica.cdc.connectors.mysql.debezium.offset.BinlogPosition;
import com.alibaba.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
import io.debezium.DebeziumException;
import io.debezium.util.HexConverter;
@ -31,14 +31,14 @@ import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import static com.alibaba.ververica.cdc.connectors.mysql.debezium.offset.BinlogPosition.INITIAL_OFFSET;
import static com.alibaba.ververica.cdc.connectors.mysql.source.offset.BinlogOffset.INITIAL_OFFSET;
/** Utils for serialization/deserialization. */
public class SerializerUtils {
private SerializerUtils() {}
public static void writeBinlogPosition(BinlogPosition offset, DataOutputSerializer out)
public static void writeBinlogPosition(BinlogOffset offset, DataOutputSerializer out)
throws IOException {
out.writeBoolean(offset != null);
if (offset != null) {
@ -47,8 +47,8 @@ public class SerializerUtils {
}
}
public static BinlogPosition readBinlogPosition(DataInputDeserializer in) throws IOException {
return in.readBoolean() ? new BinlogPosition(in.readUTF(), in.readLong()) : INITIAL_OFFSET;
public static BinlogOffset readBinlogPosition(DataInputDeserializer in) throws IOException {
return in.readBoolean() ? new BinlogOffset(in.readUTF(), in.readLong()) : INITIAL_OFFSET;
}
public static String rowToSerializedString(Object[] splitBoundary) {

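writeBinlogPosition/readBinlogPosition above encode an offset as a presence flag followed by the file name and position, with INITIAL_OFFSET as the fallback for an absent offset. A minimal round-trip sketch using Flink's DataOutputSerializer/DataInputDeserializer, assuming the connector module on the classpath:

import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;

import com.alibaba.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;

import static com.alibaba.ververica.cdc.connectors.mysql.source.utils.SerializerUtils.readBinlogPosition;
import static com.alibaba.ververica.cdc.connectors.mysql.source.utils.SerializerUtils.writeBinlogPosition;

public class BinlogOffsetSerdeSketch {

    public static void main(String[] args) throws Exception {
        DataOutputSerializer out = new DataOutputSerializer(64);
        // A present offset: a 'true' flag, then the file name and position.
        writeBinlogPosition(new BinlogOffset("mysql-bin.000001", 4L), out);
        // An absent offset: only a 'false' flag is written.
        writeBinlogPosition(null, out);

        DataInputDeserializer in = new DataInputDeserializer(out.getCopyOfBuffer());
        BinlogOffset restored = readBinlogPosition(in);
        BinlogOffset fallback = readBinlogPosition(in);

        System.out.println(restored.getFilename() + ":" + restored.getPosition()); // mysql-bin.000001:4
        System.out.println(fallback.equals(BinlogOffset.INITIAL_OFFSET)); // true
    }
}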
@ -33,10 +33,10 @@ import org.apache.flink.util.Collector;
import com.alibaba.ververica.cdc.connectors.mysql.MySQLTestBase;
import com.alibaba.ververica.cdc.connectors.mysql.debezium.EmbeddedFlinkDatabaseHistory;
import com.alibaba.ververica.cdc.connectors.mysql.debezium.dispatcher.SignalEventDispatcher;
import com.alibaba.ververica.cdc.connectors.mysql.debezium.offset.BinlogPosition;
import com.alibaba.ververica.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext;
import com.alibaba.ververica.cdc.connectors.mysql.source.MySQLSourceOptions;
import com.alibaba.ververica.cdc.connectors.mysql.source.assigner.MySQLSnapshotSplitAssigner;
import com.alibaba.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
import com.alibaba.ververica.cdc.connectors.mysql.source.split.MySQLSplit;
import com.alibaba.ververica.cdc.connectors.mysql.source.split.MySQLSplitKind;
import com.alibaba.ververica.cdc.connectors.mysql.source.utils.UniqueDatabase;
@ -311,9 +311,9 @@ public class BinlogSplitReaderTest extends MySQLTestBase {
}
// step-2: create the binlog split according to the finished snapshot splits
List<Tuple5<TableId, String, Object[], Object[], BinlogPosition>> finishedSplitsInfo =
List<Tuple5<TableId, String, Object[], Object[], BinlogOffset>> finishedSplitsInfo =
getFinishedSplitsInfo(sqlSplits, fetchedRecords);
BinlogPosition startOffset = getStartOffsetOfBinlogSplit(finishedSplitsInfo);
BinlogOffset startOffset = getStartOffsetOfBinlogSplit(finishedSplitsInfo);
Map<TableId, SchemaRecord> databaseHistory = new HashMap<>();
TableId tableId = null;
for (MySQLSplit mySQLSplit : sqlSplits) {
@ -458,12 +458,12 @@ public class BinlogSplitReaderTest extends MySQLTestBase {
}
}
private List<Tuple5<TableId, String, Object[], Object[], BinlogPosition>> getFinishedSplitsInfo(
private List<Tuple5<TableId, String, Object[], Object[], BinlogOffset>> getFinishedSplitsInfo(
List<MySQLSplit> mySQLSplits, List<SourceRecord> records) {
Map<String, MySQLSplit> splitMap = new HashMap<>();
mySQLSplits.forEach(r -> splitMap.put(r.getSplitId(), r));
List<Tuple5<TableId, String, Object[], Object[], BinlogPosition>> finishedSplitsInfo =
List<Tuple5<TableId, String, Object[], Object[], BinlogOffset>> finishedSplitsInfo =
new ArrayList<>();
records.stream()
.filter(event -> isHighWatermarkEvent(event))

@ -24,7 +24,7 @@ import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
import com.alibaba.ververica.cdc.connectors.mysql.debezium.offset.BinlogPosition;
import com.alibaba.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
import com.alibaba.ververica.cdc.connectors.mysql.source.split.MySQLSplit;
import com.alibaba.ververica.cdc.connectors.mysql.source.split.MySQLSplitKind;
import com.alibaba.ververica.cdc.connectors.mysql.source.split.MySQLSplitSerializer;
@ -39,7 +39,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static com.alibaba.ververica.cdc.connectors.mysql.debezium.offset.BinlogPosition.INITIAL_OFFSET;
import static com.alibaba.ververica.cdc.connectors.mysql.source.offset.BinlogOffset.INITIAL_OFFSET;
import static com.alibaba.ververica.cdc.connectors.mysql.source.split.MySQLSplitSerializerTest.assertSplitsEqual;
import static com.alibaba.ververica.cdc.connectors.mysql.source.split.MySQLSplitSerializerTest.getTestHistoryRecord;
import static org.junit.Assert.assertArrayEquals;
@ -103,13 +103,13 @@ public class MySQLParallelSourceEnumStateSerializerTest {
assignedSplits.put(0, assignedSplitsForTask0);
assignedSplits.put(1, assignedSplitsForTask1);
final Map<Integer, List<Tuple2<String, BinlogPosition>>> finishedSnapshotSplits =
final Map<Integer, List<Tuple2<String, BinlogOffset>>> finishedSnapshotSplits =
new HashMap<>();
List<Tuple2<String, BinlogPosition>> finishedSplitsForTask0 = new ArrayList<>();
List<Tuple2<String, BinlogOffset>> finishedSplitsForTask0 = new ArrayList<>();
finishedSplitsForTask0.add(getTestSplitInfo(tableId0, 0));
finishedSplitsForTask0.add(getTestSplitInfo(tableId0, 1));
finishedSplitsForTask0.add(getTestSplitInfo(tableId1, 0));
List<Tuple2<String, BinlogPosition>> finishedSplitsForTask1 = new ArrayList<>();
List<Tuple2<String, BinlogOffset>> finishedSplitsForTask1 = new ArrayList<>();
finishedSplitsForTask0.add(getTestSplitInfo(tableId1, 1));
finishedSplitsForTask0.add(getTestSplitInfo(tableId0, 2));
finishedSnapshotSplits.put(0, finishedSplitsForTask0);
@ -127,8 +127,8 @@ public class MySQLParallelSourceEnumStateSerializerTest {
new RowType(Arrays.asList(new RowType.RowField("id", new BigIntType()))),
new Object[] {100L + splitNo * 1000},
new Object[] {999L + splitNo * 1000},
new BinlogPosition("mysql-bin.000001", 3L + splitNo * 200),
new BinlogPosition("mysql-bin.000001", 78L + splitNo * 200),
new BinlogOffset("mysql-bin.000001", 3L + splitNo * 200),
new BinlogOffset("mysql-bin.000001", 78L + splitNo * 200),
true,
INITIAL_OFFSET,
new ArrayList<>(),
@ -136,7 +136,7 @@ public class MySQLParallelSourceEnumStateSerializerTest {
}
private MySQLSplit getTestBinlogSplit(TableId tableId) throws Exception {
final List<Tuple5<TableId, String, Object[], Object[], BinlogPosition>> finishedSplitsInfo =
final List<Tuple5<TableId, String, Object[], Object[], BinlogOffset>> finishedSplitsInfo =
new ArrayList<>();
finishedSplitsInfo.add(
Tuple5.of(
@ -144,21 +144,21 @@ public class MySQLParallelSourceEnumStateSerializerTest {
tableId + "-0",
null,
new Object[] {100},
new BinlogPosition("mysql-bin.000001", 0L)));
new BinlogOffset("mysql-bin.000001", 0L)));
finishedSplitsInfo.add(
Tuple5.of(
tableId,
tableId + "-1",
new Object[] {100},
new Object[] {200},
new BinlogPosition("mysql-bin.000001", 200L)));
new BinlogOffset("mysql-bin.000001", 200L)));
finishedSplitsInfo.add(
Tuple5.of(
tableId,
tableId + "-2",
new Object[] {200},
null,
new BinlogPosition("mysql-bin.000001", 400L)));
new BinlogOffset("mysql-bin.000001", 400L)));
final Map<TableId, SchemaRecord> databaseHistory = new HashMap<>();
databaseHistory.put(tableId, getTestHistoryRecord());
@ -172,15 +172,14 @@ public class MySQLParallelSourceEnumStateSerializerTest {
null,
null,
true,
new BinlogPosition("mysql-bin.000001", 0L),
new BinlogOffset("mysql-bin.000001", 0L),
finishedSplitsInfo,
databaseHistory);
}
private Tuple2<String, BinlogPosition> getTestSplitInfo(TableId tableId, int splitNo) {
private Tuple2<String, BinlogOffset> getTestSplitInfo(TableId tableId, int splitNo) {
final String splitId = tableId.toString() + "-" + splitNo;
final BinlogPosition highWatermark =
new BinlogPosition("mysql-bin.000001", 0L + splitNo * 200);
final BinlogOffset highWatermark = new BinlogOffset("mysql-bin.000001", 0L + splitNo * 200);
return Tuple2.of(splitId, highWatermark);
}
@ -202,7 +201,7 @@ public class MySQLParallelSourceEnumStateSerializerTest {
assertEquals(
expected.getFinishedSnapshotSplits().size(),
actual.getFinishedSnapshotSplits().size());
for (Map.Entry<Integer, List<Tuple2<String, BinlogPosition>>> entry :
for (Map.Entry<Integer, List<Tuple2<String, BinlogOffset>>> entry :
expected.getFinishedSnapshotSplits().entrySet()) {
assertEquals(
entry.getValue().toString(),

@ -23,7 +23,7 @@ import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
import com.alibaba.ververica.cdc.connectors.mysql.debezium.offset.BinlogPosition;
import com.alibaba.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
import com.alibaba.ververica.cdc.debezium.internal.SchemaRecord;
import io.debezium.document.Document;
import io.debezium.document.DocumentReader;
@ -36,7 +36,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static com.alibaba.ververica.cdc.connectors.mysql.debezium.offset.BinlogPosition.INITIAL_OFFSET;
import static com.alibaba.ververica.cdc.connectors.mysql.source.offset.BinlogOffset.INITIAL_OFFSET;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;
@ -54,8 +54,8 @@ public class MySQLSplitSerializerTest {
new RowType(Arrays.asList(new RowType.RowField("id", new BigIntType()))),
new Object[] {100L},
new Object[] {999L},
new BinlogPosition("mysql-bin.000001", 3L),
new BinlogPosition("mysql-bin.000002", 78L),
new BinlogOffset("mysql-bin.000001", 3L),
new BinlogOffset("mysql-bin.000002", 78L),
true,
INITIAL_OFFSET,
new ArrayList<>(),
@ -66,7 +66,7 @@ public class MySQLSplitSerializerTest {
@Test
public void testBinlogSplit() throws Exception {
final TableId tableId = TableId.parse("test_db.test_table");
final List<Tuple5<TableId, String, Object[], Object[], BinlogPosition>> finishedSplitsInfo =
final List<Tuple5<TableId, String, Object[], Object[], BinlogOffset>> finishedSplitsInfo =
new ArrayList<>();
finishedSplitsInfo.add(
Tuple5.of(
@ -74,28 +74,28 @@ public class MySQLSplitSerializerTest {
tableId + "-0",
null,
new Object[] {100},
new BinlogPosition("mysql-bin.000001", 4L)));
new BinlogOffset("mysql-bin.000001", 4L)));
finishedSplitsInfo.add(
Tuple5.of(
tableId,
tableId + "-1",
new Object[] {100},
new Object[] {200},
new BinlogPosition("mysql-bin.000001", 200L)));
new BinlogOffset("mysql-bin.000001", 200L)));
finishedSplitsInfo.add(
Tuple5.of(
tableId,
tableId + "-2",
new Object[] {200},
new Object[] {300},
new BinlogPosition("mysql-bin.000001", 600L)));
new BinlogOffset("mysql-bin.000001", 600L)));
finishedSplitsInfo.add(
Tuple5.of(
tableId,
tableId + "-3",
new Object[] {300},
null,
new BinlogPosition("mysql-bin.000001", 800L)));
new BinlogOffset("mysql-bin.000001", 800L)));
final Map<TableId, SchemaRecord> databaseHistory = new HashMap<>();
databaseHistory.put(tableId, getTestHistoryRecord());
@ -112,7 +112,7 @@ public class MySQLSplitSerializerTest {
null,
null,
true,
new BinlogPosition("mysql-bin.000001", 4L),
new BinlogOffset("mysql-bin.000001", 4L),
finishedSplitsInfo,
databaseHistory);
assertSplitsEqual(split, serializeAndDeserializeSplit(split));
@ -128,8 +128,8 @@ public class MySQLSplitSerializerTest {
new RowType(Arrays.asList(new RowType.RowField("id", new BigIntType()))),
null,
new Object[] {99L},
new BinlogPosition("mysql-bin.000001", 3L),
new BinlogPosition("mysql-bin.000002", 78L),
new BinlogOffset("mysql-bin.000001", 3L),
new BinlogOffset("mysql-bin.000002", 78L),
true,
INITIAL_OFFSET,
new ArrayList<>(),

@ -23,7 +23,7 @@ import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
import com.alibaba.ververica.cdc.connectors.mysql.debezium.offset.BinlogPosition;
import com.alibaba.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
import com.alibaba.ververica.cdc.debezium.internal.SchemaRecord;
import io.debezium.relational.TableId;
import org.junit.Test;
@ -34,7 +34,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static com.alibaba.ververica.cdc.connectors.mysql.debezium.offset.BinlogPosition.INITIAL_OFFSET;
import static com.alibaba.ververica.cdc.connectors.mysql.source.offset.BinlogOffset.INITIAL_OFFSET;
import static com.alibaba.ververica.cdc.connectors.mysql.source.split.MySQLSplitSerializerTest.assertSplitsEqual;
import static com.alibaba.ververica.cdc.connectors.mysql.source.split.MySQLSplitSerializerTest.getTestHistoryRecord;
@ -51,8 +51,8 @@ public class MySQLSplitStateTest {
new RowType(Arrays.asList(new RowType.RowField("id", new BigIntType()))),
new Object[] {100L},
new Object[] {999L},
new BinlogPosition("mysql-bin.000001", 3L),
new BinlogPosition("mysql-bin.000002", 78L),
new BinlogOffset("mysql-bin.000001", 3L),
new BinlogOffset("mysql-bin.000002", 78L),
true,
INITIAL_OFFSET,
new ArrayList<>(),
@ -78,8 +78,8 @@ public class MySQLSplitStateTest {
new ArrayList<>(),
new HashMap<>());
final MySQLSplitState mySQLSplitState = new MySQLSplitState(split);
mySQLSplitState.setLowWatermarkState(new BinlogPosition("mysql-bin.000001", 3L));
mySQLSplitState.setHighWatermarkState(new BinlogPosition("mysql-bin.000002", 78L));
mySQLSplitState.setLowWatermarkState(new BinlogOffset("mysql-bin.000001", 3L));
mySQLSplitState.setHighWatermarkState(new BinlogOffset("mysql-bin.000002", 78L));
mySQLSplitState.setSnapshotReadFinishedState(true);
final MySQLSplit expected =
@ -90,8 +90,8 @@ public class MySQLSplitStateTest {
new RowType(Arrays.asList(new RowType.RowField("id", new BigIntType()))),
new Object[] {100L},
new Object[] {999L},
new BinlogPosition("mysql-bin.000001", 3L),
new BinlogPosition("mysql-bin.000002", 78L),
new BinlogOffset("mysql-bin.000001", 3L),
new BinlogOffset("mysql-bin.000002", 78L),
true,
INITIAL_OFFSET,
new ArrayList<>(),
@ -103,24 +103,24 @@ public class MySQLSplitStateTest {
public void testRecordBinlogSplitState() throws Exception {
final MySQLSplit split =
getTestBinlogSplitWithOffset(new BinlogPosition("mysql-bin.000001", 4L));
getTestBinlogSplitWithOffset(new BinlogOffset("mysql-bin.000001", 4L));
final MySQLSplitState mySQLSplitState = new MySQLSplitState(split);
mySQLSplitState.setOffsetState(new BinlogPosition("mysql-bin.000001", 100L));
mySQLSplitState.setOffsetState(new BinlogOffset("mysql-bin.000001", 100L));
assertSplitsEqual(
getTestBinlogSplitWithOffset(new BinlogPosition("mysql-bin.000001", 100L)),
getTestBinlogSplitWithOffset(new BinlogOffset("mysql-bin.000001", 100L)),
mySQLSplitState.toMySQLSplit());
mySQLSplitState.setOffsetState(new BinlogPosition("mysql-bin.000001", 400L));
mySQLSplitState.setOffsetState(new BinlogOffset("mysql-bin.000001", 400L));
assertSplitsEqual(
getTestBinlogSplitWithOffset(new BinlogPosition("mysql-bin.000001", 400L)),
getTestBinlogSplitWithOffset(new BinlogOffset("mysql-bin.000001", 400L)),
mySQLSplitState.toMySQLSplit());
}
private MySQLSplit getTestBinlogSplitWithOffset(BinlogPosition offset) throws Exception {
private MySQLSplit getTestBinlogSplitWithOffset(BinlogOffset offset) throws Exception {
final TableId tableId = TableId.parse("test_db.test_table");
final List<Tuple5<TableId, String, Object[], Object[], BinlogPosition>> finishedSplitsInfo =
final List<Tuple5<TableId, String, Object[], Object[], BinlogOffset>> finishedSplitsInfo =
new ArrayList<>();
finishedSplitsInfo.add(
Tuple5.of(
@ -128,28 +128,28 @@ public class MySQLSplitStateTest {
tableId + "-0",
null,
new Object[] {100},
new BinlogPosition("mysql-bin.000001", 4L)));
new BinlogOffset("mysql-bin.000001", 4L)));
finishedSplitsInfo.add(
Tuple5.of(
tableId,
tableId + "-1",
new Object[] {100},
new Object[] {200},
new BinlogPosition("mysql-bin.000001", 200L)));
new BinlogOffset("mysql-bin.000001", 200L)));
finishedSplitsInfo.add(
Tuple5.of(
tableId,
tableId + "-2",
new Object[] {200},
new Object[] {300},
new BinlogPosition("mysql-bin.000001", 600L)));
new BinlogOffset("mysql-bin.000001", 600L)));
finishedSplitsInfo.add(
Tuple5.of(
tableId,
tableId + "-3",
new Object[] {300},
null,
new BinlogPosition("mysql-bin.000001", 800L)));
new BinlogOffset("mysql-bin.000001", 800L)));
final Map<TableId, SchemaRecord> databaseHistory = new HashMap<>();
databaseHistory.put(tableId, getTestHistoryRecord());
