[mysql] Fix backward compatibility on deserializing binlog offset from old versions (#1758)

This closes #1757
pull/1936/head
Qingsheng Ren 2 years ago committed by Leonard Xu
parent b9bc1c1ee7
commit c1a049ed1b

@ -18,11 +18,14 @@ package com.ververica.cdc.connectors.mysql.source.offset;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.VisibleForTesting;
import io.debezium.connector.mysql.GtidSet;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.connect.errors.ConnectException;
import javax.annotation.Nullable;
import java.io.Serializable;
import java.util.Map;
import java.util.Objects;
@ -94,7 +97,8 @@ public class BinlogOffset implements Comparable<BinlogOffset>, Serializable {
return builder().setOffsetKind(NON_STOPPING).build();
}
BinlogOffset(Map<String, String> offset) {
@VisibleForTesting
public BinlogOffset(Map<String, String> offset) {
this.offset = offset;
}
@ -132,7 +136,11 @@ public class BinlogOffset implements Comparable<BinlogOffset>, Serializable {
return longOffsetValue(offset, SERVER_ID_KEY);
}
@Nullable
public BinlogOffsetKind getOffsetKind() {
if (offset.get(OFFSET_KIND_KEY) == null) {
return null;
}
return BinlogOffsetKind.valueOf(offset.get(OFFSET_KIND_KEY));
}

@ -20,6 +20,7 @@ import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;
import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffsetKind;
import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffsetSerializer;
import io.debezium.DebeziumException;
import io.debezium.util.HexConverter;
@ -78,6 +79,11 @@ public class SerializerUtils {
if (StringUtils.isEmpty(offset.getFilename()) && offset.getPosition() == 0L) {
return BinlogOffset.ofEarliest();
}
// For other cases we treat it as a specific offset
return BinlogOffset.builder()
.setOffsetKind(BinlogOffsetKind.SPECIFIC)
.setOffsetMap(offset.getOffset())
.build();
}
return offset;
} else {

@ -0,0 +1,121 @@
/*
* Copyright 2022 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.mysql.source.utils;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;
import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffsetKind;
import org.junit.Test;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.junit.Assert.assertEquals;
/** Unit test for {@link SerializerUtils}. */
public class SerializerUtilsTest {
@Test
public void testBinlogOffsetSerde() throws Exception {
for (BinlogOffset offset : createBinlogOffsets()) {
byte[] serialized = serializeBinlogOffset(offset);
BinlogOffset deserialized = deserializeBinlogOffset(serialized);
assertEquals(offset, deserialized);
}
}
/**
* Test deserializing from old binlog offsets without {@link BinlogOffsetKind}, which validates
* the backward compatibility.
*/
@Test
public void testDeserializeFromBinlogOffsetWithoutKind() throws Exception {
// Create the INITIAL offset in earlier versions
Map<String, String> initialOffsetMap = new HashMap<>();
initialOffsetMap.put(BinlogOffset.BINLOG_FILENAME_OFFSET_KEY, "");
initialOffsetMap.put(BinlogOffset.BINLOG_POSITION_OFFSET_KEY, "0");
BinlogOffset initialOffset = new BinlogOffset(initialOffsetMap);
BinlogOffset deserialized = deserializeBinlogOffset(serializeBinlogOffset(initialOffset));
assertEquals(BinlogOffset.ofEarliest(), deserialized);
// Create the NON_STOPPING offset in earlier versions
Map<String, String> nonStoppingOffsetMap = new HashMap<>();
nonStoppingOffsetMap.put(BinlogOffset.BINLOG_FILENAME_OFFSET_KEY, "");
nonStoppingOffsetMap.put(
BinlogOffset.BINLOG_POSITION_OFFSET_KEY, Long.toString(Long.MIN_VALUE));
BinlogOffset nonStoppingOffset = new BinlogOffset(nonStoppingOffsetMap);
deserialized = deserializeBinlogOffset(serializeBinlogOffset(nonStoppingOffset));
assertEquals(BinlogOffset.ofNonStopping(), deserialized);
// Create a specific offset in earlier versions
Map<String, String> specificOffsetMap = new HashMap<>();
specificOffsetMap.put(BinlogOffset.BINLOG_FILENAME_OFFSET_KEY, "mysql-bin.000001");
specificOffsetMap.put(BinlogOffset.BINLOG_POSITION_OFFSET_KEY, "4");
specificOffsetMap.put(
BinlogOffset.GTID_SET_KEY, "24DA167-0C0C-11E8-8442-00059A3C7B00:1-19");
specificOffsetMap.put(BinlogOffset.TIMESTAMP_KEY, "1668690384");
specificOffsetMap.put(BinlogOffset.EVENTS_TO_SKIP_OFFSET_KEY, "15213");
specificOffsetMap.put(BinlogOffset.ROWS_TO_SKIP_OFFSET_KEY, "18613");
BinlogOffset specificOffset = new BinlogOffset(specificOffsetMap);
deserialized = deserializeBinlogOffset(serializeBinlogOffset(specificOffset));
assertEquals(
BinlogOffset.builder()
.setBinlogFilePosition("mysql-bin.000001", 4L)
.setGtidSet("24DA167-0C0C-11E8-8442-00059A3C7B00:1-19")
.setTimestampSec(1668690384L)
.setSkipEvents(15213L)
.setSkipRows(18613L)
.build(),
deserialized);
}
private List<BinlogOffset> createBinlogOffsets() {
return Arrays.asList(
// Specific offsets
BinlogOffset.ofBinlogFilePosition("foo-filename", 15213L),
BinlogOffset.ofGtidSet("foo-gtid"),
BinlogOffset.ofTimestampSec(15513L),
// Special offsets
BinlogOffset.ofNonStopping(),
BinlogOffset.ofEarliest(),
BinlogOffset.ofLatest(),
// Offsets with additional parameters
BinlogOffset.builder()
.setGtidSet("foo-gtid")
.setSkipEvents(18213L)
.setSkipRows(18613L)
.build());
}
private byte[] serializeBinlogOffset(BinlogOffset binlogOffset) throws IOException {
DataOutputSerializer dos = new DataOutputSerializer(64);
SerializerUtils.writeBinlogPosition(binlogOffset, dos);
return dos.getCopyOfBuffer();
}
private BinlogOffset deserializeBinlogOffset(byte[] serialized) throws IOException {
DataInputDeserializer did = new DataInputDeserializer(serialized);
return SerializerUtils.readBinlogPosition(4, did);
}
}
Loading…
Cancel
Save