[mysql] Fix the restoration failure when a checkpoint happens within one MySQL transaction

pull/437/head
Leonard Xu 3 years ago committed by Leonard Xu
parent 22f2ac8a75
commit a785efab5a

@ -96,6 +96,12 @@ under the License.
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-test-utils</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>

@ -85,7 +85,10 @@ public class DebeziumUtils {
if (rs.next()) {
final String binlogFilename = rs.getString(1);
final long binlogPosition = rs.getLong(2);
return new BinlogOffset(binlogFilename, binlogPosition);
final String gtidSet =
rs.getMetaData().getColumnCount() > 4 ? rs.getString(5) : null;
return new BinlogOffset(
binlogFilename, binlogPosition, 0L, 0, 0, gtidSet, null);
} else {
throw new FlinkRuntimeException(
"Cannot read the binlog filename and position via '"

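For context on the change above: SHOW MASTER STATUS returns the columns File, Position, Binlog_Do_DB, Binlog_Ignore_DB and, on GTID-capable servers, Executed_Gtid_Set as the fifth column, which is why the GTID set is only read when more than four columns are present. Below is a minimal standalone JDBC sketch of the same query (the connection URL and credentials are hypothetical and not part of this patch):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class ShowMasterStatusExample {
    public static void main(String[] args) throws Exception {
        // hypothetical connection settings for a local MySQL server
        try (Connection conn =
                        DriverManager.getConnection(
                                "jdbc:mysql://localhost:3306", "flinkuser", "flinkpw");
                Statement stmt = conn.createStatement();
                ResultSet rs = stmt.executeQuery("SHOW MASTER STATUS")) {
            if (rs.next()) {
                String binlogFilename = rs.getString(1); // e.g. mysql-bin.000003
                long binlogPosition = rs.getLong(2); // byte position within that file
                String gtidSet =
                        rs.getMetaData().getColumnCount() > 4 ? rs.getString(5) : null;
                System.out.println(binlogFilename + ":" + binlogPosition + " gtids=" + gtidSet);
            }
        }
    }
}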
@ -21,7 +21,6 @@ package com.ververica.cdc.connectors.mysql.debezium.dispatcher;
import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
import com.ververica.cdc.connectors.mysql.source.split.MySqlSplit;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.mysql.MySqlOffsetContext;
import io.debezium.pipeline.DataChangeEvent;
import io.debezium.util.SchemaNameAdjuster;
import org.apache.kafka.connect.data.Schema;
@ -29,6 +28,8 @@ import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import java.util.Map;
/**
* A dispatcher to dispatch watermark signal events.
*
@ -53,15 +54,13 @@ public class SignalEventDispatcher {
private final Schema signalEventKeySchema;
private final Schema signalEventValueSchema;
private final MySqlOffsetContext offsetContext;
private final Map<String, ?> sourcePartition;
private final String topic;
private final ChangeEventQueue<DataChangeEvent> queue;
public SignalEventDispatcher(
MySqlOffsetContext offsetContext,
String topic,
ChangeEventQueue<DataChangeEvent> queue) {
this.offsetContext = offsetContext;
Map<String, ?> sourcePartition, String topic, ChangeEventQueue<DataChangeEvent> queue) {
this.sourcePartition = sourcePartition;
this.topic = topic;
this.queue = queue;
this.signalEventKeySchema =
@ -75,8 +74,6 @@ public class SignalEventDispatcher {
.name(SCHEMA_NAME_ADJUSTER.adjust(SIGNAL_EVENT_VALUE_SCHEMA_NAME))
.field(SPLIT_ID_KEY, Schema.STRING_SCHEMA)
.field(WATERMARK_KIND, Schema.STRING_SCHEMA)
.field(BINLOG_FILENAME_OFFSET_KEY, Schema.STRING_SCHEMA)
.field(BINLOG_POSITION_OFFSET_KEY, Schema.INT64_SCHEMA)
.build();
}
@ -86,13 +83,13 @@ public class SignalEventDispatcher {
SourceRecord sourceRecord =
new SourceRecord(
offsetContext.getPartition(),
offsetContext.getOffset(),
sourcePartition,
watermark.getOffset(),
topic,
signalEventKeySchema,
signalRecordKey(mySqlSplit.splitId()),
signalEventValueSchema,
signalRecordValue(mySqlSplit.splitId(), watermark, watermarkKind));
signalRecordValue(mySqlSplit.splitId(), watermarkKind));
queue.enqueue(new DataChangeEvent(sourceRecord));
}
@ -103,13 +100,10 @@ public class SignalEventDispatcher {
return result;
}
private Struct signalRecordValue(
String splitId, BinlogOffset binlogOffset, WatermarkKind watermarkKind) {
private Struct signalRecordValue(String splitId, WatermarkKind watermarkKind) {
Struct result = new Struct(signalEventValueSchema);
result.put(SPLIT_ID_KEY, splitId);
result.put(WATERMARK_KIND, watermarkKind.toString());
result.put(BINLOG_FILENAME_OFFSET_KEY, binlogOffset.getFilename());
result.put(BINLOG_POSITION_OFFSET_KEY, binlogOffset.getPosition());
return result;
}

@ -35,6 +35,7 @@ import io.debezium.connector.mysql.MySqlStreamingChangeEventSourceMetrics;
import io.debezium.pipeline.DataChangeEvent;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.relational.TableId;
import io.debezium.relational.Tables;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -74,6 +75,7 @@ public class BinlogSplitReader implements DebeziumReader<SourceRecord, MySqlSpli
private Map<TableId, List<FinishedSnapshotSplitInfo>> finishedSplitsInfo;
// tableId -> the max splitHighWatermark
private Map<TableId, BinlogOffset> maxSplitHighWatermarkMap;
private Tables.TableFilter capturedTableFilter;
public BinlogSplitReader(StatefulTaskContext statefulTaskContext, int subTaskId) {
this.statefulTaskContext = statefulTaskContext;
@ -87,11 +89,10 @@ public class BinlogSplitReader implements DebeziumReader<SourceRecord, MySqlSpli
this.currentBinlogSplit = mySqlSplit.asBinlogSplit();
configureFilter();
statefulTaskContext.configure(currentBinlogSplit);
this.capturedTableFilter =
statefulTaskContext.getConnectorConfig().getTableFilters().dataCollectionFilter();
this.queue = statefulTaskContext.getQueue();
final MySqlOffsetContext mySqlOffsetContext = statefulTaskContext.getOffsetContext();
mySqlOffsetContext.setBinlogStartPoint(
currentBinlogSplit.getStartingOffset().getFilename(),
currentBinlogSplit.getStartingOffset().getPosition());
this.binlogSplitReadTask =
new MySqlBinlogSplitReadTask(
statefulTaskContext.getConnectorConfig(),
@ -195,10 +196,11 @@ public class BinlogSplitReader implements DebeziumReader<SourceRecord, MySqlSpli
if (isDataChangeRecord(sourceRecord)) {
TableId tableId = getTableId(sourceRecord);
BinlogOffset position = getBinlogPosition(sourceRecord);
// aligned, all snapshot splits of the table have reached the max highWatermark
if (position.isAtOrBefore(maxSplitHighWatermarkMap.get(tableId))) {
if (hasEnterPureBinlogPhase(tableId, position)) {
return true;
}
// only tables that have captured snapshot splits need to be filtered
if (finishedSplitsInfo.containsKey(tableId)) {
Object[] key =
getSplitKey(
currentBinlogSplit.getSplitKeyType(),
@ -211,6 +213,7 @@ public class BinlogSplitReader implements DebeziumReader<SourceRecord, MySqlSpli
return true;
}
}
}
// not in the monitored splits scope, do not emit
return false;
}
@ -219,16 +222,34 @@ public class BinlogSplitReader implements DebeziumReader<SourceRecord, MySqlSpli
return true;
}
private boolean hasEnterPureBinlogPhase(TableId tableId, BinlogOffset position) {
// existing tables that have finished snapshot reading
if (maxSplitHighWatermarkMap.containsKey(tableId)
&& position.isAtOrBefore(maxSplitHighWatermarkMap.get(tableId))) {
return true;
}
// capture dynamically added tables
// TODO: there is still a very small chance that we cannot capture a newly added table,
// i.e. a table added after the enumerator discovered the captured tables but before
// the lowest binlog offset of all table splits. This interval should be very short,
// so we don't support it for now.
return !maxSplitHighWatermarkMap.containsKey(tableId)
&& capturedTableFilter.isIncluded(tableId);
}
private void configureFilter() {
List<FinishedSnapshotSplitInfo> finishedSplitInfos =
currentBinlogSplit.getFinishedSnapshotSplitInfos();
Map<TableId, List<FinishedSnapshotSplitInfo>> splitsInfoMap = new HashMap<>();
Map<TableId, BinlogOffset> tableIdBinlogPositionMap = new HashMap<>();
// latest-offset mode
if (finishedSplitInfos.isEmpty()) {
for (TableId tableId : currentBinlogSplit.getTableSchemas().keySet()) {
tableIdBinlogPositionMap.put(tableId, currentBinlogSplit.getStartingOffset());
}
} else {
}
// initial mode
else {
for (FinishedSnapshotSplitInfo finishedSplitInfo : finishedSplitInfos) {
TableId tableId = finishedSplitInfo.getTableId();
List<FinishedSnapshotSplitInfo> list =

@ -37,9 +37,8 @@ import io.debezium.util.Clock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static com.ververica.cdc.connectors.mysql.debezium.dispatcher.SignalEventDispatcher.BINLOG_FILENAME_OFFSET_KEY;
import static com.ververica.cdc.connectors.mysql.debezium.dispatcher.SignalEventDispatcher.BINLOG_POSITION_OFFSET_KEY;
import static com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset.NO_STOPPING_OFFSET;
import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getBinlogPosition;
/**
* Task to read all binlog for table and also supports read bounded (from lowWatermark to
@ -47,7 +46,7 @@ import static com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset.NO_S
*/
public class MySqlBinlogSplitReadTask extends MySqlStreamingChangeEventSource {
private static final Logger logger = LoggerFactory.getLogger(MySqlBinlogSplitReadTask.class);
private static final Logger LOG = LoggerFactory.getLogger(MySqlBinlogSplitReadTask.class);
private final MySqlBinlogSplit binlogSplit;
private final MySqlOffsetContext offsetContext;
private final EventDispatcherImpl<TableId> eventDispatcher;
@ -80,7 +79,8 @@ public class MySqlBinlogSplitReadTask extends MySqlStreamingChangeEventSource {
this.offsetContext = offsetContext;
this.errorHandler = errorHandler;
this.signalEventDispatcher =
new SignalEventDispatcher(offsetContext, topic, eventDispatcher.getQueue());
new SignalEventDispatcher(
offsetContext.getPartition(), topic, eventDispatcher.getQueue());
}
@Override
@ -94,14 +94,7 @@ public class MySqlBinlogSplitReadTask extends MySqlStreamingChangeEventSource {
super.handleEvent(event);
// check whether we need to stop reading the binlog for the snapshot split.
if (isBoundedRead()) {
final BinlogOffset currentBinlogOffset =
new BinlogOffset(
offsetContext.getOffset().get(BINLOG_FILENAME_OFFSET_KEY).toString(),
Long.parseLong(
offsetContext
.getOffset()
.get(BINLOG_POSITION_OFFSET_KEY)
.toString()));
final BinlogOffset currentBinlogOffset = getBinlogPosition(offsetContext.getOffset());
// reached the high watermark, the binlog reader should finish
if (currentBinlogOffset.isAtOrBefore(binlogSplit.getEndingOffset())) {
// send binlog end event
@ -111,7 +104,7 @@ public class MySqlBinlogSplitReadTask extends MySqlStreamingChangeEventSource {
currentBinlogOffset,
SignalEventDispatcher.WatermarkKind.BINLOG_END);
} catch (InterruptedException e) {
logger.error("Send signal event error.", e);
LOG.error("Send signal event error.", e);
errorHandler.setProducerThrowable(
new DebeziumException("Error processing binlog signal event", e));
}

@ -59,7 +59,7 @@ import java.sql.Types;
import java.time.Duration;
import java.util.Calendar;
import static com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset.getCurrentBinlogPosition;
import static com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils.currentBinlogOffset;
/** Task to read snapshot split of table. */
public class MySqlSnapshotSplitReadTask extends AbstractSnapshotChangeEventSource {
@ -129,28 +129,27 @@ public class MySqlSnapshotSplitReadTask extends AbstractSnapshotChangeEventSourc
throws Exception {
final RelationalSnapshotChangeEventSource.RelationalSnapshotContext ctx =
(RelationalSnapshotChangeEventSource.RelationalSnapshotContext) snapshotContext;
ctx.offset = offsetContext;
final SignalEventDispatcher signalEventDispatcher =
new SignalEventDispatcher(
offsetContext.getPartition(),
topicSelector.topicNameFor(snapshotSplit.getTableId()),
dispatcher.getQueue());
final BinlogOffset lowWatermark = getCurrentBinlogPosition(jdbcConnection);
final BinlogOffset lowWatermark = currentBinlogOffset(jdbcConnection);
LOG.info(
"Snapshot step 1 - Determining low watermark {} for split {}",
lowWatermark,
snapshotSplit);
offsetContext.setBinlogStartPoint(lowWatermark.getFilename(), lowWatermark.getPosition());
ctx.offset = offsetContext;
((SnapshotSplitReader.SnapshotSplitChangeEventSourceContextImpl) (context))
.setLowWatermark(lowWatermark);
final SignalEventDispatcher signalEventDispatcher =
new SignalEventDispatcher(
offsetContext,
topicSelector.topicNameFor(snapshotSplit.getTableId()),
dispatcher.getQueue());
signalEventDispatcher.dispatchWatermarkEvent(
snapshotSplit, lowWatermark, SignalEventDispatcher.WatermarkKind.LOW);
LOG.info("Snapshot step 2 - Snapshotting data");
createDataEvents(ctx, snapshotSplit.getTableId());
final BinlogOffset highWatermark = getCurrentBinlogPosition(jdbcConnection);
final BinlogOffset highWatermark = currentBinlogOffset(jdbcConnection);
LOG.info(
"Snapshot step 3 - Determining high watermark {} for split {}",
highWatermark,

@ -58,9 +58,10 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Instant;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset.BINLOG_FILENAME_OFFSET_KEY;
import static io.debezium.config.CommonConnectorConfig.TOMBSTONES_ON_DELETE;
/**
@ -119,9 +120,7 @@ public class StatefulTaskContext {
schemaNameAdjuster,
tableIdCaseInsensitive);
this.offsetContext =
(MySqlOffsetContext)
loadStartingOffsetState(
new MySqlOffsetContext.Loader(connectorConfig), mySqlSplit);
loadStartingOffsetState(new MySqlOffsetContext.Loader(connectorConfig), mySqlSplit);
validateAndLoadDatabaseHistory(offsetContext, databaseSchema);
this.taskContext =
@ -174,22 +173,49 @@ public class StatefulTaskContext {
}
/** Loads the connector's persistent offset (if present) via the given loader. */
private OffsetContext loadStartingOffsetState(
private MySqlOffsetContext loadStartingOffsetState(
OffsetContext.Loader loader, MySqlSplit mySqlSplit) {
Map<String, Object> previousOffset = new HashMap<>();
BinlogOffset offset =
mySqlSplit.isSnapshotSplit()
? BinlogOffset.INITIAL_OFFSET
: mySqlSplit.asBinlogSplit().getStartingOffset();
previousOffset.put("file", offset.getFilename());
previousOffset.put("pos", offset.getPosition());
if (previousOffset != null) {
OffsetContext offsetContext = loader.load(previousOffset);
return offsetContext;
MySqlOffsetContext mySqlOffsetContext =
(MySqlOffsetContext) loader.load(offset.getOffset());
if (!isBinlogAvailable(mySqlOffsetContext)) {
throw new IllegalStateException(
"The connector is trying to read binlog starting at "
+ mySqlOffsetContext.getSourceInfo()
+ ", but this is no longer "
+ "available on the server. Reconfigure the connector to use a snapshot when needed.");
}
return mySqlOffsetContext;
}
private boolean isBinlogAvailable(MySqlOffsetContext offset) {
String binlogFilename = offset.getSourceInfo().getString(BINLOG_FILENAME_OFFSET_KEY);
if (binlogFilename == null) {
return true; // start at current position
}
if (binlogFilename.equals("")) {
return true; // start at beginning
}
// Accumulate the available binlog filenames ...
List<String> logNames = connection.availableBinlogFiles();
// And compare with the one we're supposed to use ...
boolean found = logNames.stream().anyMatch(binlogFilename::equals);
if (!found) {
LOG.info(
"Connector requires binlog file '{}', but MySQL only has {}",
binlogFilename,
String.join(", ", logNames));
} else {
return null;
LOG.info("MySQL has the binlog file '{}' required by the connector", binlogFilename);
}
return found;
}
private static MySqlValueConverters getValueConverters(MySqlConnectorConfig configuration) {

@ -43,7 +43,7 @@ import static com.ververica.cdc.connectors.mysql.source.utils.SerializerUtils.wr
*/
public class PendingSplitsStateSerializer implements SimpleVersionedSerializer<PendingSplitsState> {
private static final int VERSION = 1;
private static final int VERSION = 2;
private static final ThreadLocal<DataOutputSerializer> SERIALIZER_CACHE =
ThreadLocal.withInitial(() -> new DataOutputSerializer(64));
@ -96,22 +96,45 @@ public class PendingSplitsStateSerializer implements SimpleVersionedSerializer<P
@Override
public PendingSplitsState deserialize(int version, byte[] serialized) throws IOException {
if (version == VERSION) {
switch (version) {
case 1:
case 2:
return deserializeV2(serialized);
default:
throw new IOException("Unknown version: " + version);
}
}
public PendingSplitsState deserializeV1(byte[] serialized) throws IOException {
final DataInputDeserializer in = new DataInputDeserializer(serialized);
final int splitVersion = in.readInt();
final int stateFlag = in.readInt();
if (stateFlag == SNAPSHOT_PENDING_SPLITS_STATE_FLAG) {
return deserializeSnapshotPendingSplitsState(splitVersion, in);
return deserializeSnapshotPendingSplitsState(1, splitVersion, in);
} else if (stateFlag == BINLOG_PENDING_SPLITS_STATE_FLAG) {
return deserializeBinlogPendingSplitsState(in);
} else if (stateFlag == HYBRID_PENDING_SPLITS_STATE_FLAG) {
return deserializeHybridPendingSplitsState(splitVersion, in);
return deserializeHybridPendingSplitsState(1, splitVersion, in);
} else {
throw new IOException(
"Unsupported to deserialize PendingSplitsState flag: " + stateFlag);
}
}
throw new IOException("Unknown version: " + version);
public PendingSplitsState deserializeV2(byte[] serialized) throws IOException {
final DataInputDeserializer in = new DataInputDeserializer(serialized);
final int splitVersion = in.readInt();
final int stateFlag = in.readInt();
if (stateFlag == SNAPSHOT_PENDING_SPLITS_STATE_FLAG) {
return deserializeSnapshotPendingSplitsState(2, splitVersion, in);
} else if (stateFlag == BINLOG_PENDING_SPLITS_STATE_FLAG) {
return deserializeBinlogPendingSplitsState(in);
} else if (stateFlag == HYBRID_PENDING_SPLITS_STATE_FLAG) {
return deserializeHybridPendingSplitsState(2, splitVersion, in);
} else {
throw new IOException(
"Unsupported to deserialize PendingSplitsState flag: " + stateFlag);
}
}
// ------------------------------------------------------------------------------------------
@ -143,12 +166,12 @@ public class PendingSplitsStateSerializer implements SimpleVersionedSerializer<P
// ------------------------------------------------------------------------------------------
private SnapshotPendingSplitsState deserializeSnapshotPendingSplitsState(
int splitVersion, DataInputDeserializer in) throws IOException {
int offsetVersion, int splitVersion, DataInputDeserializer in) throws IOException {
List<TableId> alreadyProcessedTables = readTableIds(in);
List<MySqlSnapshotSplit> remainingSplits = readMySqlSnapshotSplits(splitVersion, in);
Map<String, MySqlSnapshotSplit> assignedSnapshotSplits =
readAssignedSnapshotSplits(splitVersion, in);
Map<String, BinlogOffset> finishedOffsets = readFinishedOffsets(in);
Map<String, BinlogOffset> finishedOffsets = readFinishedOffsets(offsetVersion, in);
boolean isAssignerFinished = in.readBoolean();
return new SnapshotPendingSplitsState(
alreadyProcessedTables,
@ -159,9 +182,9 @@ public class PendingSplitsStateSerializer implements SimpleVersionedSerializer<P
}
private HybridPendingSplitsState deserializeHybridPendingSplitsState(
int splitVersion, DataInputDeserializer in) throws IOException {
int offsetVersion, int splitVersion, DataInputDeserializer in) throws IOException {
SnapshotPendingSplitsState snapshotPendingSplitsState =
deserializeSnapshotPendingSplitsState(splitVersion, in);
deserializeSnapshotPendingSplitsState(offsetVersion, splitVersion, in);
boolean isBinlogSplitAssigned = in.readBoolean();
return new HybridPendingSplitsState(snapshotPendingSplitsState, isBinlogSplitAssigned);
}
@ -185,13 +208,13 @@ public class PendingSplitsStateSerializer implements SimpleVersionedSerializer<P
}
}
private Map<String, BinlogOffset> readFinishedOffsets(DataInputDeserializer in)
throws IOException {
private Map<String, BinlogOffset> readFinishedOffsets(
int offsetVersion, DataInputDeserializer in) throws IOException {
Map<String, BinlogOffset> splitsInfo = new HashMap<>();
final int size = in.readInt();
for (int i = 0; i < size; i++) {
String splitId = in.readUTF();
BinlogOffset binlogOffset = readBinlogPosition(in);
BinlogOffset binlogOffset = readBinlogPosition(offsetVersion, in);
splitsInfo.put(splitId, binlogOffset);
}
return splitsInfo;

@ -18,48 +18,212 @@
package com.ververica.cdc.connectors.mysql.source.offset;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import io.debezium.connector.mysql.GtidSet;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.connect.errors.ConnectException;
import io.debezium.jdbc.JdbcConnection;
import javax.annotation.Nullable;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
/** A structure describes an offset in a binlog of MySQL server. */
/**
* A structure that describes a fine-grained offset in a binlog event, including the binlog
* position, GTID set, etc.
*
* <p>This structure can also be used to deal with binlog events inside a transaction: a
* transaction may contain multiple change events, and each change event may contain multiple
* rows. When restarting from a specific {@link BinlogOffset}, we need to skip the already
* processed change events and rows.
*/
public class BinlogOffset implements Comparable<BinlogOffset>, Serializable {
private static final long serialVersionUID = 1L;
public static final String BINLOG_FILENAME_OFFSET_KEY = "file";
public static final String BINLOG_POSITION_OFFSET_KEY = "pos";
public static final String EVENTS_TO_SKIP_OFFSET_KEY = "event";
public static final String ROWS_TO_SKIP_OFFSET_KEY = "row";
public static final String GTID_SET_KEY = "gtids";
public static final String TIMESTAMP_KEY = "ts_sec";
public static final String SERVER_ID_KEY = "server_id";
public static final BinlogOffset INITIAL_OFFSET = new BinlogOffset("", 0);
public static final BinlogOffset NO_STOPPING_OFFSET = new BinlogOffset("", Long.MIN_VALUE);
private final String filename;
private final long position;
private final Map<String, String> offset;
public BinlogOffset(Map<String, String> offset) {
this.offset = offset;
}
public BinlogOffset(String filename, long position) {
Preconditions.checkNotNull(filename);
this.filename = filename;
this.position = position;
this(filename, position, 0L, 0L, 0L, null, null);
}
public BinlogOffset(
String filename,
long position,
long restartSkipEvents,
long restartSkipRows,
long binlogEpochSecs,
@Nullable String restartGtidSet,
@Nullable Integer serverId) {
Map<String, String> offsetMap = new HashMap<>();
offsetMap.put(BINLOG_FILENAME_OFFSET_KEY, filename);
offsetMap.put(BINLOG_POSITION_OFFSET_KEY, String.valueOf(position));
offsetMap.put(EVENTS_TO_SKIP_OFFSET_KEY, String.valueOf(restartSkipEvents));
offsetMap.put(ROWS_TO_SKIP_OFFSET_KEY, String.valueOf(restartSkipRows));
offsetMap.put(TIMESTAMP_KEY, String.valueOf(binlogEpochSecs));
if (restartGtidSet != null) {
offsetMap.put(GTID_SET_KEY, restartGtidSet);
}
if (serverId != null) {
offsetMap.put(SERVER_ID_KEY, String.valueOf(serverId));
}
this.offset = offsetMap;
}
public Map<String, String> getOffset() {
return offset;
}
public String getFilename() {
return filename;
return offset.get(BINLOG_FILENAME_OFFSET_KEY);
}
public long getPosition() {
return position;
return longOffsetValue(offset, BINLOG_POSITION_OFFSET_KEY);
}
public long getRestartSkipEvents() {
return longOffsetValue(offset, EVENTS_TO_SKIP_OFFSET_KEY);
}
public long getRestartSkipRows() {
return longOffsetValue(offset, ROWS_TO_SKIP_OFFSET_KEY);
}
public String getGtidSet() {
return offset.get(GTID_SET_KEY);
}
public long getTimestamp() {
return longOffsetValue(offset, TIMESTAMP_KEY);
}
public Long getServerId() {
return longOffsetValue(offset, SERVER_ID_KEY);
}
private long longOffsetValue(Map<String, ?> values, String key) {
Object obj = values.get(key);
if (obj == null) {
return 0L;
}
if (obj instanceof Number) {
return ((Number) obj).longValue();
}
try {
return Long.parseLong(obj.toString());
} catch (NumberFormatException e) {
throw new ConnectException(
"Source offset '"
+ key
+ "' parameter value "
+ obj
+ " could not be converted to a long");
}
}
/**
* This method is inspired by {@link io.debezium.relational.history.HistoryRecordComparator}.
*/
@Override
public int compareTo(BinlogOffset o) {
if (this.filename.equals(o.filename)) {
return Long.compare(this.position, o.position);
} else {
// The binlog filenames are ordered
return this.getFilename().compareTo(o.getFilename());
public int compareTo(BinlogOffset that) {
// the NO_STOPPING_OFFSET is the max offset
if (NO_STOPPING_OFFSET.equals(that) && NO_STOPPING_OFFSET.equals(this)) {
return 0;
}
if (NO_STOPPING_OFFSET.equals(this)) {
return 1;
}
if (NO_STOPPING_OFFSET.equals(that)) {
return -1;
}
String gtidSetStr = this.getGtidSet();
String targetGtidSetStr = that.getGtidSet();
if (StringUtils.isNotEmpty(targetGtidSetStr)) {
// The target offset uses GTIDs, so we ideally compare using GTIDs ...
if (StringUtils.isNotEmpty(gtidSetStr)) {
// Both have GTIDs, so base the comparison entirely on the GTID sets.
GtidSet gtidSet = new GtidSet(gtidSetStr);
GtidSet targetGtidSet = new GtidSet(targetGtidSetStr);
if (gtidSet.equals(targetGtidSet)) {
long restartSkipEvents = this.getRestartSkipEvents();
long targetRestartSkipEvents = that.getRestartSkipEvents();
return Long.compare(restartSkipEvents, targetRestartSkipEvents);
}
// The GTIDs are not an exact match, so figure out if this is a subset of the
// target offset ...
return gtidSet.isContainedWithin(targetGtidSet) ? -1 : 1;
}
// The target offset did use GTIDs while this did not. So, we assume that this offset
// is older, since GTIDs are often enabled but rarely disabled. And if they are
// disabled, it is likely that this offset would not include GTIDs, as we would be
// trying to read the binlog of a server that no longer has GTIDs. And if they are
// enabled, disabled, and re-enabled, per
// https://dev.mysql.com/doc/refman/5.7/en/replication-gtids-failover.html all
// properly configured slaves that use GTIDs should always have the complete set of
// GTIDs copied from the master, in which case again we know that this offset not
// having GTIDs is before the target offset ...
return -1;
} else if (StringUtils.isNotEmpty(gtidSetStr)) {
// This offset has a GTID but the target offset does not, so per the previous
// paragraph we assume that this offset is not at or before the target ...
return 1;
}
// Both offsets are missing GTIDs. Look at the servers ...
long serverId = this.getServerId();
long targetServerId = that.getServerId();
if (serverId != targetServerId) {
// These are from different servers, and their binlog coordinates are not related. So
// the only thing we can do is compare timestamps, and we have to assume that the
// server timestamps can be compared ...
long timestamp = this.getTimestamp();
long targetTimestamp = that.getTimestamp();
return Long.compare(timestamp, targetTimestamp);
}
// First compare the MySQL binlog filenames
if (this.getFilename().compareToIgnoreCase(that.getFilename()) != 0) {
return this.getFilename().compareToIgnoreCase(that.getFilename());
}
// The filenames are the same, so compare the positions
if (this.getPosition() != that.getPosition()) {
return Long.compare(this.getPosition(), that.getPosition());
}
// The positions are the same, so compare the completed events in the transaction ...
if (this.getRestartSkipEvents() != that.getRestartSkipEvents()) {
return Long.compare(this.getRestartSkipEvents(), that.getRestartSkipEvents());
}
// The completed events are the same, so compare the row number ...
return Long.compare(this.getRestartSkipRows(), that.getRestartSkipRows());
}
public boolean isAtOrBefore(BinlogOffset that) {
@ -72,7 +236,7 @@ public class BinlogOffset implements Comparable<BinlogOffset>, Serializable {
@Override
public String toString() {
return filename + ":" + position;
return "BinlogOffset{" + "offset=" + offset + '}';
}
@Override
@ -80,43 +244,15 @@ public class BinlogOffset implements Comparable<BinlogOffset>, Serializable {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
if (!(o instanceof BinlogOffset)) {
return false;
}
BinlogOffset that = (BinlogOffset) o;
return position == that.position && Objects.equals(filename, that.filename);
return offset.equals(that.offset);
}
@Override
public int hashCode() {
return Objects.hash(filename, position);
}
public static BinlogOffset getCurrentBinlogPosition(JdbcConnection jdbcConnection) {
AtomicReference<BinlogOffset> currentBinlogPosition =
new AtomicReference<>(BinlogOffset.INITIAL_OFFSET);
try {
jdbcConnection.setAutoCommit(false);
String showMasterStmt = "SHOW MASTER STATUS";
jdbcConnection.query(
showMasterStmt,
rs -> {
if (rs.next()) {
String binlogFilename = rs.getString(1);
long binlogPosition = rs.getLong(2);
currentBinlogPosition.set(
new BinlogOffset(binlogFilename, binlogPosition));
} else {
throw new IllegalStateException(
"Cannot read the binlog filename and position via '"
+ showMasterStmt
+ "'. Make sure your server is correctly configured");
}
});
jdbcConnection.commit();
} catch (Exception e) {
throw new FlinkRuntimeException("Read current binlog position error.", e);
}
return currentBinlogPosition.get();
return Objects.hashCode(offset);
}
}

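The reworked comparison is the heart of the fix: when a checkpoint lands in the middle of a MySQL transaction, two offsets can share the same binlog file and position and differ only in how many change events and rows of that transaction have already been processed. A minimal sketch (file name and numbers are hypothetical) showing that compareTo now distinguishes such offsets:

import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;

public class BinlogOffsetComparisonExample {
    public static void main(String[] args) {
        // same binlog file and position, but the second offset has already processed
        // two more change events of the ongoing transaction
        BinlogOffset beforeEvents =
                new BinlogOffset("mysql-bin.000003", 1024L, 3L, 0L, 0L, null, null);
        BinlogOffset afterEvents =
                new BinlogOffset("mysql-bin.000003", 1024L, 5L, 0L, 0L, null, null);

        // file and position tie, so the comparison falls back to the events-to-skip counter
        System.out.println(beforeEvents.compareTo(afterEvents) < 0); // expected: true
        System.out.println(afterEvents.compareTo(beforeEvents) > 0); // expected: true
    }
}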
@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.mysql.source.offset;
import org.apache.flink.annotation.Internal;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.util.Map;
/** Serializer implementation for a {@link BinlogOffset}. */
@Internal
public class BinlogOffsetSerializer {
public static final BinlogOffsetSerializer INSTANCE = new BinlogOffsetSerializer();
public byte[] serialize(BinlogOffset binlogOffset) throws IOException {
// use JSON serialization
ObjectMapper objectMapper = new ObjectMapper();
return objectMapper.writeValueAsBytes(binlogOffset.getOffset());
}
public BinlogOffset deserialize(byte[] bytes) throws IOException {
ObjectMapper objectMapper = new ObjectMapper();
Map<String, String> offset = objectMapper.readValue(bytes, Map.class);
return new BinlogOffset(offset);
}
}

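A quick round-trip sketch (offset values are hypothetical) of the new serializer: the offset map is written as JSON and read back, and since equality is now based on the whole offset map, the restored instance equals the original:

import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffsetSerializer;

public class BinlogOffsetSerializerExample {
    public static void main(String[] args) throws Exception {
        BinlogOffset original =
                new BinlogOffset("mysql-bin.000003", 1024L, 5L, 2L, 0L, null, null);

        // serialize the offset map to JSON bytes and read it back
        byte[] bytes = BinlogOffsetSerializer.INSTANCE.serialize(original);
        BinlogOffset restored = BinlogOffsetSerializer.INSTANCE.deserialize(bytes);

        System.out.println(original.equals(restored)); // expected: true
    }
}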
@ -48,7 +48,7 @@ public final class MySqlSplitSerializer implements SimpleVersionedSerializer<MyS
public static final MySqlSplitSerializer INSTANCE = new MySqlSplitSerializer();
private static final int VERSION = 1;
private static final int VERSION = 2;
private static final ThreadLocal<DataOutputSerializer> SERIALIZER_CACHE =
ThreadLocal.withInitial(() -> new DataOutputSerializer(64));
@ -98,7 +98,6 @@ public final class MySqlSplitSerializer implements SimpleVersionedSerializer<MyS
out.writeInt(BINLOG_SPLIT_FLAG);
out.writeUTF(binlogSplit.splitId());
out.writeUTF(binlogSplit.getSplitKeyType().asSerializableString());
writeBinlogPosition(binlogSplit.getStartingOffset(), out);
writeBinlogPosition(binlogSplit.getEndingOffset(), out);
writeFinishedSplitsInfo(binlogSplit.getFinishedSnapshotSplitInfos(), out);
@ -114,13 +113,16 @@ public final class MySqlSplitSerializer implements SimpleVersionedSerializer<MyS
@Override
public MySqlSplit deserialize(int version, byte[] serialized) throws IOException {
if (version == VERSION) {
return deserializeV1(serialized);
}
switch (version) {
case 1:
case 2:
return deserializeSplit(version, serialized);
default:
throw new IOException("Unknown version: " + version);
}
}
public MySqlSplit deserializeV1(byte[] serialized) throws IOException {
public MySqlSplit deserializeSplit(int version, byte[] serialized) throws IOException {
final DataInputDeserializer in = new DataInputDeserializer(serialized);
int splitKind = in.readInt();
@ -130,7 +132,7 @@ public final class MySqlSplitSerializer implements SimpleVersionedSerializer<MyS
RowType splitKeyType = (RowType) LogicalTypeParser.parse(in.readUTF());
Object[] splitBoundaryStart = serializedStringToRow(in.readUTF());
Object[] splitBoundaryEnd = serializedStringToRow(in.readUTF());
BinlogOffset highWatermark = readBinlogPosition(in);
BinlogOffset highWatermark = readBinlogPosition(version, in);
Map<TableId, TableChange> tableSchemas = readTableSchemas(in);
return new MySqlSnapshotSplit(
@ -144,9 +146,10 @@ public final class MySqlSplitSerializer implements SimpleVersionedSerializer<MyS
} else if (splitKind == BINLOG_SPLIT_FLAG) {
String splitId = in.readUTF();
RowType splitKeyType = (RowType) LogicalTypeParser.parse(in.readUTF());
BinlogOffset startingOffset = readBinlogPosition(in);
BinlogOffset endingOffset = readBinlogPosition(in);
List<FinishedSnapshotSplitInfo> finishedSplitsInfo = readFinishedSplitsInfo(in);
BinlogOffset startingOffset = readBinlogPosition(version, in);
BinlogOffset endingOffset = readBinlogPosition(version, in);
List<FinishedSnapshotSplitInfo> finishedSplitsInfo =
readFinishedSplitsInfo(version, in);
Map<TableId, TableChange> tableChangeMap = readTableSchemas(in);
in.releaseArrays();
return new MySqlBinlogSplit(
@ -201,8 +204,8 @@ public final class MySqlSplitSerializer implements SimpleVersionedSerializer<MyS
}
}
private static List<FinishedSnapshotSplitInfo> readFinishedSplitsInfo(DataInputDeserializer in)
throws IOException {
private static List<FinishedSnapshotSplitInfo> readFinishedSplitsInfo(
int version, DataInputDeserializer in) throws IOException {
List<FinishedSnapshotSplitInfo> finishedSplitsInfo = new ArrayList<>();
final int size = in.readInt();
for (int i = 0; i < size; i++) {
@ -210,7 +213,7 @@ public final class MySqlSplitSerializer implements SimpleVersionedSerializer<MyS
String splitId = in.readUTF();
Object[] splitStart = serializedStringToRow(in.readUTF());
Object[] splitEnd = serializedStringToRow(in.readUTF());
BinlogOffset highWatermark = readBinlogPosition(in);
BinlogOffset highWatermark = readBinlogPosition(version, in);
finishedSplitsInfo.add(
new FinishedSnapshotSplitInfo(
tableId, splitId, splitStart, splitEnd, highWatermark));

@ -47,8 +47,6 @@ import java.util.Map;
import java.util.Optional;
import static com.ververica.cdc.connectors.mysql.debezium.dispatcher.EventDispatcherImpl.HISTORY_RECORD_FIELD;
import static com.ververica.cdc.connectors.mysql.debezium.dispatcher.SignalEventDispatcher.BINLOG_FILENAME_OFFSET_KEY;
import static com.ververica.cdc.connectors.mysql.debezium.dispatcher.SignalEventDispatcher.BINLOG_POSITION_OFFSET_KEY;
import static com.ververica.cdc.connectors.mysql.debezium.dispatcher.SignalEventDispatcher.SIGNAL_EVENT_VALUE_SCHEMA_NAME;
import static com.ververica.cdc.connectors.mysql.debezium.dispatcher.SignalEventDispatcher.SPLIT_ID_KEY;
import static com.ververica.cdc.connectors.mysql.debezium.dispatcher.SignalEventDispatcher.WATERMARK_KIND;
@ -230,10 +228,7 @@ public class RecordUtils {
}
public static BinlogOffset getWatermark(SourceRecord watermarkEvent) {
Struct value = (Struct) watermarkEvent.value();
String file = value.getString(BINLOG_FILENAME_OFFSET_KEY);
Long position = value.getInt64(BINLOG_POSITION_OFFSET_KEY);
return new BinlogOffset(file, position);
return getBinlogPosition(watermarkEvent.sourceOffset());
}
/**
@ -290,14 +285,12 @@ public class RecordUtils {
MySqlSnapshotSplit split, SourceRecord highWatermark) {
Struct value = (Struct) highWatermark.value();
String splitId = value.getString(SPLIT_ID_KEY);
String file = value.getString(BINLOG_FILENAME_OFFSET_KEY);
Long position = value.getInt64(BINLOG_POSITION_OFFSET_KEY);
return new FinishedSnapshotSplitInfo(
split.getTableId(),
splitId,
split.getSplitStart(),
split.getSplitEnd(),
new BinlogOffset(file, position));
getBinlogPosition(highWatermark.sourceOffset()));
}
/** Returns the start offset of the binlog split. */
@ -339,11 +332,16 @@ public class RecordUtils {
}
public static BinlogOffset getBinlogPosition(SourceRecord dataRecord) {
Struct value = (Struct) dataRecord.value();
Struct source = value.getStruct(Envelope.FieldName.SOURCE);
String fileName = (String) source.get(BINLOG_FILENAME_OFFSET_KEY);
Long position = (Long) (source.get(BINLOG_POSITION_OFFSET_KEY));
return new BinlogOffset(fileName, position);
return getBinlogPosition(dataRecord.sourceOffset());
}
public static BinlogOffset getBinlogPosition(Map<String, ?> offset) {
Map<String, String> offsetStrMap = new HashMap<>();
for (Map.Entry<String, ?> entry : offset.entrySet()) {
offsetStrMap.put(
entry.getKey(), entry.getValue() == null ? null : entry.getValue().toString());
}
return new BinlogOffset(offsetStrMap);
}
/** Returns whether the specific key is contained in the split key range. */

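A small sketch (values are hypothetical) of the new map-based conversion above; the keys match the BinlogOffset constants, which is also the shape of the source offset Debezium attaches to a SourceRecord:

import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
import com.ververica.cdc.connectors.mysql.source.utils.RecordUtils;

import java.util.HashMap;
import java.util.Map;

public class GetBinlogPositionExample {
    public static void main(String[] args) {
        // a source offset shaped like the one Debezium attaches to a SourceRecord
        Map<String, Object> sourceOffset = new HashMap<>();
        sourceOffset.put("file", "mysql-bin.000003");
        sourceOffset.put("pos", 1024L);
        sourceOffset.put("event", 5L);
        sourceOffset.put("row", 2L);

        BinlogOffset offset = RecordUtils.getBinlogPosition(sourceOffset);
        System.out.println(offset.getPosition()); // 1024
        System.out.println(offset.getRestartSkipEvents()); // 5
    }
}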
@ -22,6 +22,7 @@ import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;
import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffsetSerializer;
import io.debezium.DebeziumException;
import io.debezium.util.HexConverter;
@ -40,13 +41,30 @@ public class SerializerUtils {
throws IOException {
out.writeBoolean(offset != null);
if (offset != null) {
out.writeUTF(offset.getFilename());
out.writeLong(offset.getPosition());
byte[] binlogOffsetBytes = BinlogOffsetSerializer.INSTANCE.serialize(offset);
out.writeInt(binlogOffsetBytes.length);
out.write(binlogOffsetBytes);
}
}
public static BinlogOffset readBinlogPosition(DataInputDeserializer in) throws IOException {
public static BinlogOffset readBinlogPosition(int offsetVersion, DataInputDeserializer in)
throws IOException {
switch (offsetVersion) {
case 1:
return in.readBoolean() ? new BinlogOffset(in.readUTF(), in.readLong()) : null;
case 2:
boolean offsetNonNull = in.readBoolean();
if (offsetNonNull) {
int binlogOffsetBytesLength = in.readInt();
byte[] binlogOffsetBytes = new byte[binlogOffsetBytesLength];
in.readFully(binlogOffsetBytes);
return BinlogOffsetSerializer.INSTANCE.deserialize(binlogOffsetBytes);
} else {
return null;
}
default:
throw new IOException("Unknown version: " + offsetVersion);
}
}
public static String rowToSerializedString(Object[] splitBoundary) {

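A round-trip sketch (offset values are hypothetical) of the versioned serialization above, assuming writeBinlogPosition is the public static helper the split serializer already calls: new state is always written in the V2 layout (a length-prefixed JSON offset map), and readBinlogPosition selects the layout from the state version being restored:

import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;

import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
import com.ververica.cdc.connectors.mysql.source.utils.SerializerUtils;

public class BinlogPositionRoundTripExample {
    public static void main(String[] args) throws Exception {
        BinlogOffset offset =
                new BinlogOffset("mysql-bin.000003", 1024L, 5L, 2L, 0L, null, null);

        // write in the new (V2) layout
        DataOutputSerializer out = new DataOutputSerializer(64);
        SerializerUtils.writeBinlogPosition(offset, out);

        // read it back, telling the reader the bytes use the V2 layout
        DataInputDeserializer in = new DataInputDeserializer(out.getCopyOfBuffer());
        BinlogOffset restored = SerializerUtils.readBinlogPosition(2, in);

        System.out.println(offset.equals(restored)); // expected: true
    }
}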
@ -28,7 +28,7 @@ import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.connectors.mysql.source.utils.UniqueDatabase;
import com.ververica.cdc.connectors.mysql.testutils.UniqueDatabase;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.junit.Ignore;

@ -25,8 +25,8 @@ import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
import com.fasterxml.jackson.core.JsonParseException;
import com.jayway.jsonpath.JsonPath;
import com.ververica.cdc.connectors.mysql.MySqlTestUtils.TestingListState;
import com.ververica.cdc.connectors.mysql.source.utils.UniqueDatabase;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.connectors.mysql.testutils.UniqueDatabase;
import com.ververica.cdc.connectors.utils.TestSourceContext;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import io.debezium.document.Document;

@ -20,16 +20,13 @@ package com.ververica.cdc.connectors.mysql;
import org.apache.flink.test.util.AbstractTestBase;
import com.ververica.cdc.connectors.mysql.source.utils.MySqlContainer;
import com.ververica.cdc.connectors.mysql.testutils.MySqlContainer;
import org.junit.BeforeClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.stream.Stream;
/**
@ -43,7 +40,7 @@ public abstract class MySqlTestBase extends AbstractTestBase {
protected static final MySqlContainer MYSQL_CONTAINER =
(MySqlContainer)
new MySqlContainer()
.withConfigurationOverride("docker/my.cnf")
.withConfigurationOverride("docker/server/my.cnf")
.withSetupSQL("docker/setup.sql")
.withDatabaseName("flink-test")
.withUsername("flinkuser")
@ -56,11 +53,4 @@ public abstract class MySqlTestBase extends AbstractTestBase {
Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join();
LOG.info("Containers are started.");
}
protected Connection getJdbcConnection() throws SQLException {
return DriverManager.getConnection(
MYSQL_CONTAINER.getJdbcUrl(),
MYSQL_CONTAINER.getUsername(),
MYSQL_CONTAINER.getPassword());
}
}

@ -32,7 +32,7 @@ import org.apache.flink.streaming.util.MockStreamingRuntimeContext;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;
import com.ververica.cdc.connectors.mysql.source.utils.UniqueDatabase;
import com.ververica.cdc.connectors.mysql.testutils.UniqueDatabase;
import com.ververica.cdc.connectors.utils.TestSourceContext;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.ververica.cdc.debezium.DebeziumSourceFunction;

@ -23,8 +23,8 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.ValidationException;
import com.ververica.cdc.connectors.mysql.source.MySqlParallelSource;
import com.ververica.cdc.connectors.mysql.source.utils.MySqlContainer;
import com.ververica.cdc.connectors.mysql.source.utils.UniqueDatabase;
import com.ververica.cdc.connectors.mysql.testutils.MySqlContainer;
import com.ververica.cdc.connectors.mysql.testutils.UniqueDatabase;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.AfterClass;
@ -98,7 +98,7 @@ public class MySqlValidatorTest {
String.format(
"Currently Flink MySql CDC connector only supports MySql whose version is larger or equal to 5.7, but actual is %s.",
version);
doValidate(version, "docker/my.cnf", message);
doValidate(version, "docker/server/my.cnf", message);
}
@Test

@ -18,22 +18,16 @@
package com.ververica.cdc.connectors.mysql.debezium.reader;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.conversion.RowRowConverter;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.ververica.cdc.connectors.mysql.MySqlTestBase;
import com.ververica.cdc.connectors.mysql.debezium.EmbeddedFlinkDatabaseHistory;
import com.ververica.cdc.connectors.mysql.debezium.dispatcher.SignalEventDispatcher;
import com.ververica.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext;
import com.ververica.cdc.connectors.mysql.source.MySqlParallelSourceTestBase;
import com.ververica.cdc.connectors.mysql.source.assigners.MySqlBinlogSplitAssigner;
import com.ververica.cdc.connectors.mysql.source.assigners.MySqlSnapshotSplitAssigner;
import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
@ -41,9 +35,8 @@ import com.ververica.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo
import com.ververica.cdc.connectors.mysql.source.split.MySqlBinlogSplit;
import com.ververica.cdc.connectors.mysql.source.split.MySqlSnapshotSplit;
import com.ververica.cdc.connectors.mysql.source.split.MySqlSplit;
import com.ververica.cdc.connectors.mysql.source.utils.UniqueDatabase;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema;
import com.ververica.cdc.connectors.mysql.testutils.RecordsFormatter;
import com.ververica.cdc.connectors.mysql.testutils.UniqueDatabase;
import io.debezium.connector.mysql.MySqlConnection;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.TableId;
@ -68,14 +61,13 @@ import static com.ververica.cdc.connectors.mysql.source.MySqlSourceOptions.SCAN_
import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getSnapshotSplitInfo;
import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getStartingOffsetOfBinlogSplit;
import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.isHighWatermarkEvent;
import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.isSchemaChangeEvent;
import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.isWatermarkEvent;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
/** Tests for {@link BinlogSplitReader}. */
public class BinlogSplitReaderTest extends MySqlTestBase {
public class BinlogSplitReaderTest extends MySqlParallelSourceTestBase {
private static final int currentParallelism = 4;
private final UniqueDatabase customerDatabase =
new UniqueDatabase(MYSQL_CONTAINER, "customer", "mysqluser", "mysqlpw");
@ -305,9 +297,6 @@ public class BinlogSplitReaderTest extends MySqlTestBase {
DataTypes.FIELD("name", DataTypes.STRING()),
DataTypes.FIELD("address", DataTypes.STRING()),
DataTypes.FIELD("phone_number", DataTypes.STRING()));
final RowType pkType =
(RowType) DataTypes.ROW(DataTypes.FIELD("id", DataTypes.BIGINT())).getLogicalType();
String[] expected =
new String[] {
"-U[103, user_3, Shanghai, 123567891234]",
@ -324,7 +313,7 @@ public class BinlogSplitReaderTest extends MySqlTestBase {
};
List<String> actual =
readBinlogSplitsFromLatestOffset(dataType, configuration, expected.length);
assertEquals(Arrays.stream(expected).sorted().collect(Collectors.toList()), actual);
assertThat(actual, containsInAnyOrder(expected));
}
private List<String> readBinlogSplitsFromLatestOffset(
@ -359,7 +348,8 @@ public class BinlogSplitReaderTest extends MySqlTestBase {
while (recordIterator.hasNext()) {
fetchedRecords.add(recordIterator.next());
}
actual = formatResult(fetchedRecords, dataType);
actual.addAll(formatResult(fetchedRecords, dataType));
fetchedRecords.clear();
if (actual.size() >= expectedSize) {
break;
}
@ -436,7 +426,8 @@ public class BinlogSplitReaderTest extends MySqlTestBase {
while (recordIterator.hasNext()) {
fetchedRecords.add(recordIterator.next());
}
actual = formatResult(fetchedRecords, dataType);
actual.addAll(formatResult(fetchedRecords, dataType));
fetchedRecords.clear();
if (actual.size() >= expectedSize) {
break;
}
@ -447,6 +438,7 @@ public class BinlogSplitReaderTest extends MySqlTestBase {
private void makeCustomersBinlogEvents(
JdbcConnection connection, String tableId, boolean firstSplitOnly) throws SQLException {
// make binlog events for the first split
connection.setAutoCommit(false);
connection.execute(
"UPDATE " + tableId + " SET address = 'Hangzhou' where id = 103",
"DELETE FROM " + tableId + " where id = 102",
@ -558,36 +550,14 @@ public class BinlogSplitReaderTest extends MySqlTestBase {
return finishedSplitsInfo;
}
private List<String> formatResult(List<SourceRecord> records, DataType dataType)
throws Exception {
final RowType rowType = (RowType) dataType.getLogicalType();
final TypeInformation<RowData> typeInfo =
(TypeInformation<RowData>) TypeConversions.fromDataTypeToLegacyInfo(dataType);
final DebeziumDeserializationSchema<RowData> deserializationSchema =
new RowDataDebeziumDeserializeSchema(
rowType, typeInfo, ((rowData, rowKind) -> {}), ZoneId.of("UTC"));
SimpleCollector collector = new SimpleCollector();
RowRowConverter rowRowConverter = RowRowConverter.create(dataType);
rowRowConverter.open(Thread.currentThread().getContextClassLoader());
// filter signal event
// filter schema change event
for (SourceRecord r : records) {
if (!isWatermarkEvent(r)) {
if (!isSchemaChangeEvent(r)) {
deserializationSchema.deserialize(r, collector);
}
}
}
return collector.list.stream()
.map(rowRowConverter::toExternal)
.map(Row::toString)
.sorted()
.collect(Collectors.toList());
private List<String> formatResult(List<SourceRecord> records, DataType dataType) {
final RecordsFormatter formatter = new RecordsFormatter(dataType);
return formatter.format(records);
}
private List<MySqlSnapshotSplit> getMySqlSplits(Configuration configuration) {
final MySqlSnapshotSplitAssigner assigner =
new MySqlSnapshotSplitAssigner(configuration, currentParallelism);
new MySqlSnapshotSplitAssigner(configuration, DEFAULT_PARALLELISM);
assigner.open();
List<MySqlSnapshotSplit> mySqlSplits = new ArrayList<>();
while (true) {
@ -625,19 +595,4 @@ public class BinlogSplitReaderTest extends MySqlTestBase {
return Configuration.fromMap(properties);
}
private static class SimpleCollector implements Collector<RowData> {
private List<RowData> list = new ArrayList<>();
@Override
public void collect(RowData record) {
list.add(record);
}
@Override
public void close() {
// do nothing
}
}
}

@ -18,27 +18,18 @@
package com.ververica.cdc.connectors.mysql.debezium.reader;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.conversion.RowRowConverter;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.ververica.cdc.connectors.mysql.MySqlTestBase;
import com.ververica.cdc.connectors.mysql.debezium.EmbeddedFlinkDatabaseHistory;
import com.ververica.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext;
import com.ververica.cdc.connectors.mysql.source.MySqlParallelSourceTestBase;
import com.ververica.cdc.connectors.mysql.source.assigners.MySqlSnapshotSplitAssigner;
import com.ververica.cdc.connectors.mysql.source.split.MySqlSplit;
import com.ververica.cdc.connectors.mysql.source.utils.RecordUtils;
import com.ververica.cdc.connectors.mysql.source.utils.UniqueDatabase;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema;
import com.ververica.cdc.connectors.mysql.testutils.RecordsFormatter;
import com.ververica.cdc.connectors.mysql.testutils.UniqueDatabase;
import io.debezium.connector.mysql.MySqlConnection;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.BeforeClass;
@ -58,9 +49,8 @@ import java.util.stream.Collectors;
import static org.junit.Assert.assertEquals;
/** Tests for {@link SnapshotSplitReader}. */
public class SnapshotSplitReaderTest extends MySqlTestBase {
public class SnapshotSplitReaderTest extends MySqlParallelSourceTestBase {
private static final int currentParallelism = 4;
private static final UniqueDatabase customerDatabase =
new UniqueDatabase(MYSQL_CONTAINER, "customer", "mysqluser", "mysqlpw");
@ -234,36 +224,13 @@ public class SnapshotSplitReaderTest extends MySqlTestBase {
}
private List<String> formatResult(List<SourceRecord> records, DataType dataType) {
final RowType rowType = (RowType) dataType.getLogicalType();
final TypeInformation<RowData> typeInfo =
(TypeInformation<RowData>) TypeConversions.fromDataTypeToLegacyInfo(dataType);
final DebeziumDeserializationSchema<RowData> deserializationSchema =
new RowDataDebeziumDeserializeSchema(
rowType, typeInfo, ((rowData, rowKind) -> {}), ZoneId.of("UTC"));
SimpleCollector collector = new SimpleCollector();
RowRowConverter rowRowConverter = RowRowConverter.create(dataType);
rowRowConverter.open(Thread.currentThread().getContextClassLoader());
records.stream()
// filter signal event
.filter(r -> !RecordUtils.isWatermarkEvent(r))
.forEach(
r -> {
try {
deserializationSchema.deserialize(r, collector);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
return collector.list.stream()
.map(rowRowConverter::toExternal)
.map(Row::toString)
.sorted()
.collect(Collectors.toList());
final RecordsFormatter formatter = new RecordsFormatter(dataType);
return formatter.format(records);
}
private List<MySqlSplit> getMySqlSplits(Configuration configuration) {
final MySqlSnapshotSplitAssigner assigner =
new MySqlSnapshotSplitAssigner(configuration, currentParallelism);
new MySqlSnapshotSplitAssigner(configuration, DEFAULT_PARALLELISM);
assigner.open();
List<MySqlSplit> mySqlSplitList = new ArrayList<>();
while (true) {
@ -302,19 +269,4 @@ public class SnapshotSplitReaderTest extends MySqlTestBase {
properties.put("scan.snapshot.fetch.size", "2");
return Configuration.fromMap(properties);
}
static class SimpleCollector implements Collector<RowData> {
private List<RowData> list = new ArrayList<>();
@Override
public void collect(RowData record) {
list.add(record);
}
@Override
public void close() {
// do nothing
}
}
}

@ -18,11 +18,42 @@
package com.ververica.cdc.connectors.mysql.source;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import com.ververica.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext;
import com.ververica.cdc.connectors.mysql.testutils.UniqueDatabase;
import io.debezium.connector.mysql.MySqlConnection;
import io.debezium.jdbc.JdbcConnection;
import org.junit.Test;
import java.sql.SQLException;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.Assert.assertThat;
/** IT tests for {@link MySqlParallelSource}. */
public class MySqlParallelSourceITCase extends MySqlParallelSourceTestBase {
private final UniqueDatabase customDatabase =
new UniqueDatabase(MYSQL_CONTAINER, "customer", "mysqluser", "mysqlpw");
@Test
public void testReadSingleTableWithSingleParallelism() throws Exception {
testMySqlParallelSource(
@ -89,4 +120,264 @@ public class MySqlParallelSourceITCase extends MySqlParallelSourceTestBase {
testMySqlParallelSource(
1, FailoverType.JM, FailoverPhase.SNAPSHOT, new String[] {"customers"});
}
private void testMySqlParallelSource(
FailoverType failoverType, FailoverPhase failoverPhase, String[] captureCustomerTables)
throws Exception {
testMySqlParallelSource(
DEFAULT_PARALLELISM, failoverType, failoverPhase, captureCustomerTables);
}
private void testMySqlParallelSource(
int parallelism,
FailoverType failoverType,
FailoverPhase failoverPhase,
String[] captureCustomerTables)
throws Exception {
customDatabase.createAndInitialize();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
env.setParallelism(parallelism);
env.enableCheckpointing(200L);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
String sourceDDL =
String.format(
"CREATE TABLE customers ("
+ " id BIGINT NOT NULL,"
+ " name STRING,"
+ " address STRING,"
+ " phone_number STRING,"
+ " primary key (id) not enforced"
+ ") WITH ("
+ " 'connector' = 'mysql-cdc',"
+ " 'scan.incremental.snapshot.enabled' = 'true',"
+ " 'hostname' = '%s',"
+ " 'port' = '%s',"
+ " 'username' = '%s',"
+ " 'password' = '%s',"
+ " 'database-name' = '%s',"
+ " 'table-name' = '%s',"
+ " 'scan.incremental.snapshot.chunk.size' = '100',"
+ " 'server-id' = '%s'"
+ ")",
MYSQL_CONTAINER.getHost(),
MYSQL_CONTAINER.getDatabasePort(),
customDatabase.getUsername(),
customDatabase.getPassword(),
customDatabase.getDatabaseName(),
getTableName(captureCustomerTables),
getServerId());
// first step: check the snapshot data
String[] snapshotForSingleTable =
new String[] {
"+I[101, user_1, Shanghai, 123567891234]",
"+I[102, user_2, Shanghai, 123567891234]",
"+I[103, user_3, Shanghai, 123567891234]",
"+I[109, user_4, Shanghai, 123567891234]",
"+I[110, user_5, Shanghai, 123567891234]",
"+I[111, user_6, Shanghai, 123567891234]",
"+I[118, user_7, Shanghai, 123567891234]",
"+I[121, user_8, Shanghai, 123567891234]",
"+I[123, user_9, Shanghai, 123567891234]",
"+I[1009, user_10, Shanghai, 123567891234]",
"+I[1010, user_11, Shanghai, 123567891234]",
"+I[1011, user_12, Shanghai, 123567891234]",
"+I[1012, user_13, Shanghai, 123567891234]",
"+I[1013, user_14, Shanghai, 123567891234]",
"+I[1014, user_15, Shanghai, 123567891234]",
"+I[1015, user_16, Shanghai, 123567891234]",
"+I[1016, user_17, Shanghai, 123567891234]",
"+I[1017, user_18, Shanghai, 123567891234]",
"+I[1018, user_19, Shanghai, 123567891234]",
"+I[1019, user_20, Shanghai, 123567891234]",
"+I[2000, user_21, Shanghai, 123567891234]"
};
tEnv.executeSql(sourceDDL);
TableResult tableResult = tEnv.executeSql("select * from customers");
CloseableIterator<Row> iterator = tableResult.collect();
JobID jobId = tableResult.getJobClient().get().getJobID();
List<String> expectedSnapshotData = new ArrayList<>();
for (int i = 0; i < captureCustomerTables.length; i++) {
expectedSnapshotData.addAll(Arrays.asList(snapshotForSingleTable));
}
// trigger failover during the snapshot phase, once the first records have been read
if (failoverPhase == FailoverPhase.SNAPSHOT && iterator.hasNext()) {
triggerFailover(
failoverType, jobId, miniClusterResource.getMiniCluster(), () -> sleepMs(100));
}
String[] expectedSnapshot = expectedSnapshotData.toArray(new String[0]);
assertThat(
fetchRows(iterator, expectedSnapshot.length), containsInAnyOrder(expectedSnapshot));
// second step: check the binlog data
for (String tableId : captureCustomerTables) {
makeFirstPartBinlogEvents(
getConnection(), customDatabase.getDatabaseName() + '.' + tableId);
}
if (failoverPhase == FailoverPhase.BINLOG) {
triggerFailover(
failoverType, jobId, miniClusterResource.getMiniCluster(), () -> sleepMs(200));
}
for (String tableId : captureCustomerTables) {
makeSecondPartBinlogEvents(
getConnection(), customDatabase.getDatabaseName() + '.' + tableId);
}
String[] binlogForSingleTable =
new String[] {
"-U[103, user_3, Shanghai, 123567891234]",
"+U[103, user_3, Hangzhou, 123567891234]",
"-D[102, user_2, Shanghai, 123567891234]",
"+I[102, user_2, Shanghai, 123567891234]",
"-U[103, user_3, Hangzhou, 123567891234]",
"+U[103, user_3, Shanghai, 123567891234]",
"-U[1010, user_11, Shanghai, 123567891234]",
"+U[1010, user_11, Hangzhou, 123567891234]",
"+I[2001, user_22, Shanghai, 123567891234]",
"+I[2002, user_23, Shanghai, 123567891234]",
"+I[2003, user_24, Shanghai, 123567891234]",
};
List<String> expectedBinlogData = new ArrayList<>();
for (int i = 0; i < captureCustomerTables.length; i++) {
expectedBinlogData.addAll(Arrays.asList(binlogForSingleTable));
}
String[] expectedBinlog = expectedBinlogData.toArray(new String[0]);
assertThat(fetchRows(iterator, expectedBinlog.length), containsInAnyOrder(expectedBinlog));
tableResult.getJobClient().get().cancel().get();
}
private static List<String> fetchRows(Iterator<Row> iter, int size) {
List<String> rows = new ArrayList<>(size);
while (size > 0 && iter.hasNext()) {
Row row = iter.next();
rows.add(row.toString());
size--;
}
return rows;
}
private String getTableName(String[] captureCustomerTables) {
if (captureCustomerTables.length == 1) {
return "customers";
} else {
// pattern that matches the test tables: customers and customers_1
return "customers.*";
}
}
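// Builds a random server-id range wide enough for each parallel reader to claim its own id.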
private String getServerId() {
final Random random = new Random();
int serverId = random.nextInt(100) + 5400;
return serverId + "-" + (serverId + DEFAULT_PARALLELISM);
}
private void sleepMs(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException ignored) {
}
}
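// Updates, deletes and re-inserts rows in one transaction to produce binlog events for the first split.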
private void makeFirstPartBinlogEvents(JdbcConnection connection, String tableId)
throws SQLException {
try {
connection.setAutoCommit(false);
// make binlog events for the first split
connection.execute(
"UPDATE " + tableId + " SET address = 'Hangzhou' where id = 103",
"DELETE FROM " + tableId + " where id = 102",
"INSERT INTO " + tableId + " VALUES(102, 'user_2','Shanghai','123567891234')",
"UPDATE " + tableId + " SET address = 'Shanghai' where id = 103");
connection.commit();
} finally {
connection.close();
}
}
private void makeSecondPartBinlogEvents(JdbcConnection connection, String tableId)
throws SQLException {
try {
connection.setAutoCommit(false);
// make binlog events for split-1
connection.execute("UPDATE " + tableId + " SET address = 'Hangzhou' where id = 1010");
connection.commit();
// make binlog events for the last split
connection.execute(
"INSERT INTO "
+ tableId
+ " VALUES(2001, 'user_22','Shanghai','123567891234'),"
+ " (2002, 'user_23','Shanghai','123567891234'),"
+ "(2003, 'user_24','Shanghai','123567891234')");
connection.commit();
} finally {
connection.close();
}
}
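// Opens a MySqlConnection against the test container for issuing the binlog-producing statements.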
private MySqlConnection getConnection() {
Map<String, String> properties = new HashMap<>();
properties.put("database.hostname", MYSQL_CONTAINER.getHost());
properties.put("database.port", String.valueOf(MYSQL_CONTAINER.getDatabasePort()));
properties.put("database.user", customDatabase.getUsername());
properties.put("database.password", customDatabase.getPassword());
properties.put("database.serverTimezone", ZoneId.of("UTC").toString());
Configuration configuration = Configuration.fromMap(properties);
return StatefulTaskContext.getConnection(configuration);
}
// ------------------------------------------------------------------------
// test utilities
// ------------------------------------------------------------------------
/** The type of failover. */
private enum FailoverType {
TM,
JM,
NONE
}
/** The phase of failover. */
private enum FailoverPhase {
SNAPSHOT,
BINLOG,
NEVER
}
private static void triggerFailover(
FailoverType type, JobID jobId, MiniCluster miniCluster, Runnable afterFailAction)
throws Exception {
switch (type) {
case TM:
restartTaskManager(miniCluster, afterFailAction);
break;
case JM:
triggerJobManagerFailover(jobId, miniCluster, afterFailAction);
break;
case NONE:
break;
default:
throw new IllegalStateException("Unexpected value: " + type);
}
}
private static void triggerJobManagerFailover(
JobID jobId, MiniCluster miniCluster, Runnable afterFailAction) throws Exception {
final HaLeadershipControl haLeadershipControl = miniCluster.getHaLeadershipControl().get();
haLeadershipControl.revokeJobMasterLeadership(jobId).get();
afterFailAction.run();
haLeadershipControl.grantJobMasterLeadership(jobId).get();
}
private static void restartTaskManager(MiniCluster miniCluster, Runnable afterFailAction)
throws Exception {
miniCluster.terminateTaskManager(0).get();
afterFailAction.run();
miniCluster.startTaskManager();
}
}

@ -18,26 +18,12 @@
package com.ververica.cdc.connectors.mysql.source;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.RpcServiceSharing;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.TestLogger;
import com.ververica.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext;
import com.ververica.cdc.connectors.mysql.source.utils.MySqlContainer;
import com.ververica.cdc.connectors.mysql.source.utils.UniqueDatabase;
import io.debezium.connector.mysql.MySqlConnection;
import io.debezium.jdbc.JdbcConnection;
import com.ververica.cdc.connectors.mysql.testutils.MySqlContainer;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.slf4j.Logger;
@ -45,29 +31,18 @@ import org.slf4j.LoggerFactory;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;
import java.sql.SQLException;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.stream.Stream;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.Assert.assertThat;
/** Basic class for testing {@link MySqlParallelSource}. */
public abstract class MySqlParallelSourceTestBase extends TestLogger {
private static final Logger LOG = LoggerFactory.getLogger(MySqlParallelSourceTestBase.class);
private static final int PARALLELISM = 4;
private static final MySqlContainer MYSQL_CONTAINER =
protected static final int DEFAULT_PARALLELISM = 4;
protected static final MySqlContainer MYSQL_CONTAINER =
(MySqlContainer)
new MySqlContainer()
.withConfigurationOverride("docker/my.cnf")
.withConfigurationOverride("docker/server-gtids/my.cnf")
.withSetupSQL("docker/setup.sql")
.withDatabaseName("flink-test")
.withUsername("flinkuser")
@ -79,277 +54,15 @@ public abstract class MySqlParallelSourceTestBase extends TestLogger {
new MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder()
.setNumberTaskManagers(1)
.setNumberSlotsPerTaskManager(PARALLELISM)
.setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM)
.setRpcServiceSharing(RpcServiceSharing.DEDICATED)
.withHaLeadershipControl()
.build());
private final UniqueDatabase customDatabase =
new UniqueDatabase(MYSQL_CONTAINER, "customer", "mysqluser", "mysqlpw");
@BeforeClass
public static void startContainers() {
LOG.info("Starting containers...");
Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join();
LOG.info("Containers are started.");
}
protected void testMySqlParallelSource(
FailoverType failoverType, FailoverPhase failoverPhase, String[] captureCustomerTables)
throws Exception {
testMySqlParallelSource(PARALLELISM, failoverType, failoverPhase, captureCustomerTables);
}
protected void testMySqlParallelSource(
int parallelism,
FailoverType failoverType,
FailoverPhase failoverPhase,
String[] captureCustomerTables)
throws Exception {
customDatabase.createAndInitialize();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
env.setParallelism(parallelism);
env.enableCheckpointing(200L);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
String sourceDDL =
String.format(
"CREATE TABLE customers ("
+ " id BIGINT NOT NULL,"
+ " name STRING,"
+ " address STRING,"
+ " phone_number STRING,"
+ " primary key (id) not enforced"
+ ") WITH ("
+ " 'connector' = 'mysql-cdc',"
+ " 'scan.incremental.snapshot.enabled' = 'true',"
+ " 'hostname' = '%s',"
+ " 'port' = '%s',"
+ " 'username' = '%s',"
+ " 'password' = '%s',"
+ " 'database-name' = '%s',"
+ " 'table-name' = '%s',"
+ " 'scan.incremental.snapshot.chunk.size' = '100',"
+ " 'server-id' = '%s'"
+ ")",
MYSQL_CONTAINER.getHost(),
MYSQL_CONTAINER.getDatabasePort(),
customDatabase.getUsername(),
customDatabase.getPassword(),
customDatabase.getDatabaseName(),
getTableName(captureCustomerTables),
getServerId());
// first step: check the snapshot data
String[] snapshotForSingleTable =
new String[] {
"+I[101, user_1, Shanghai, 123567891234]",
"+I[102, user_2, Shanghai, 123567891234]",
"+I[103, user_3, Shanghai, 123567891234]",
"+I[109, user_4, Shanghai, 123567891234]",
"+I[110, user_5, Shanghai, 123567891234]",
"+I[111, user_6, Shanghai, 123567891234]",
"+I[118, user_7, Shanghai, 123567891234]",
"+I[121, user_8, Shanghai, 123567891234]",
"+I[123, user_9, Shanghai, 123567891234]",
"+I[1009, user_10, Shanghai, 123567891234]",
"+I[1010, user_11, Shanghai, 123567891234]",
"+I[1011, user_12, Shanghai, 123567891234]",
"+I[1012, user_13, Shanghai, 123567891234]",
"+I[1013, user_14, Shanghai, 123567891234]",
"+I[1014, user_15, Shanghai, 123567891234]",
"+I[1015, user_16, Shanghai, 123567891234]",
"+I[1016, user_17, Shanghai, 123567891234]",
"+I[1017, user_18, Shanghai, 123567891234]",
"+I[1018, user_19, Shanghai, 123567891234]",
"+I[1019, user_20, Shanghai, 123567891234]",
"+I[2000, user_21, Shanghai, 123567891234]"
};
tEnv.executeSql(sourceDDL);
TableResult tableResult = tEnv.executeSql("select * from customers");
CloseableIterator<Row> iterator = tableResult.collect();
JobID jobId = tableResult.getJobClient().get().getJobID();
List<String> expectedSnapshotData = new ArrayList<>();
for (int i = 0; i < captureCustomerTables.length; i++) {
expectedSnapshotData.addAll(Arrays.asList(snapshotForSingleTable));
}
// trigger checkpoint after some snapshot splits read finished
if (failoverPhase == FailoverPhase.SNAPSHOT && iterator.hasNext()) {
triggerFailover(
failoverType, jobId, miniClusterResource.getMiniCluster(), () -> sleepMs(100));
}
String[] expectedSnapshot = expectedSnapshotData.toArray(new String[0]);
assertThat(
fetchRows(iterator, expectedSnapshot.length), containsInAnyOrder(expectedSnapshot));
// second step: check the binlog data
for (String tableId : captureCustomerTables) {
makeFirstPartBinlogEvents(
getConnection(), customDatabase.getDatabaseName() + '.' + tableId);
}
if (failoverPhase == FailoverPhase.BINLOG) {
triggerFailover(
failoverType, jobId, miniClusterResource.getMiniCluster(), () -> sleepMs(200));
}
for (String tableId : captureCustomerTables) {
makeSecondPartBinlogEvents(
getConnection(), customDatabase.getDatabaseName() + '.' + tableId);
}
String[] binlogForSingleTable =
new String[] {
"-U[103, user_3, Shanghai, 123567891234]",
"+U[103, user_3, Hangzhou, 123567891234]",
"-D[102, user_2, Shanghai, 123567891234]",
"+I[102, user_2, Shanghai, 123567891234]",
"-U[103, user_3, Hangzhou, 123567891234]",
"+U[103, user_3, Shanghai, 123567891234]",
"-U[1010, user_11, Shanghai, 123567891234]",
"+U[1010, user_11, Hangzhou, 123567891234]",
"+I[2001, user_22, Shanghai, 123567891234]",
"+I[2002, user_23, Shanghai, 123567891234]",
"+I[2003, user_24, Shanghai, 123567891234]",
};
List<String> expectedBinlogData = new ArrayList<>();
for (int i = 0; i < captureCustomerTables.length; i++) {
expectedBinlogData.addAll(Arrays.asList(binlogForSingleTable));
}
String[] expectedBinlog = expectedBinlogData.toArray(new String[0]);
assertThat(fetchRows(iterator, expectedBinlog.length), containsInAnyOrder(expectedBinlog));
tableResult.getJobClient().get().cancel().get();
}
private static List<String> fetchRows(Iterator<Row> iter, int size) {
List<String> rows = new ArrayList<>(size);
while (size > 0 && iter.hasNext()) {
Row row = iter.next();
rows.add(row.toString());
size--;
}
return rows;
}
private String getTableName(String[] captureCustomerTables) {
if (captureCustomerTables.length == 1) {
return "customers";
} else {
// pattern that matches test table: customers and customers_1
return "customers.*";
}
}
private String getServerId() {
final Random random = new Random();
int serverId = random.nextInt(100) + 5400;
return serverId + "-" + (serverId + PARALLELISM);
}
private void sleepMs(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException ignored) {
}
}
private void makeFirstPartBinlogEvents(JdbcConnection connection, String tableId)
throws SQLException {
try {
connection.setAutoCommit(false);
// make binlog events for the first split
connection.execute(
"UPDATE " + tableId + " SET address = 'Hangzhou' where id = 103",
"DELETE FROM " + tableId + " where id = 102",
"INSERT INTO " + tableId + " VALUES(102, 'user_2','Shanghai','123567891234')",
"UPDATE " + tableId + " SET address = 'Shanghai' where id = 103");
connection.commit();
} finally {
connection.close();
}
}
private void makeSecondPartBinlogEvents(JdbcConnection connection, String tableId)
throws SQLException {
try {
connection.setAutoCommit(false);
// make binlog events for split-1
connection.execute("UPDATE " + tableId + " SET address = 'Hangzhou' where id = 1010");
connection.commit();
// make binlog events for the last split
connection.execute(
"INSERT INTO "
+ tableId
+ " VALUES(2001, 'user_22','Shanghai','123567891234'),"
+ " (2002, 'user_23','Shanghai','123567891234'),"
+ "(2003, 'user_24','Shanghai','123567891234')");
connection.commit();
} finally {
connection.close();
}
}
private MySqlConnection getConnection() {
Map<String, String> properties = new HashMap<>();
properties.put("database.hostname", MYSQL_CONTAINER.getHost());
properties.put("database.port", String.valueOf(MYSQL_CONTAINER.getDatabasePort()));
properties.put("database.user", customDatabase.getUsername());
properties.put("database.password", customDatabase.getPassword());
properties.put("database.serverTimezone", ZoneId.of("UTC").toString());
Configuration configuration = Configuration.fromMap(properties);
return StatefulTaskContext.getConnection(configuration);
}
// ------------------------------------------------------------------------
// test utilities
// ------------------------------------------------------------------------
/** The type of failover. */
protected enum FailoverType {
TM,
JM,
NONE
}
/** The phase of failover. */
protected enum FailoverPhase {
SNAPSHOT,
BINLOG,
NEVER
}
private static void triggerFailover(
FailoverType type, JobID jobId, MiniCluster miniCluster, Runnable afterFailAction)
throws Exception {
switch (type) {
case TM:
restartTaskManager(miniCluster, afterFailAction);
break;
case JM:
triggerJobManagerFailover(jobId, miniCluster, afterFailAction);
break;
case NONE:
break;
default:
throw new IllegalStateException("Unexpected value: " + type);
}
}
private static void triggerJobManagerFailover(
JobID jobId, MiniCluster miniCluster, Runnable afterFailAction) throws Exception {
final HaLeadershipControl haLeadershipControl = miniCluster.getHaLeadershipControl().get();
haLeadershipControl.revokeJobMasterLeadership(jobId).get();
afterFailAction.run();
haLeadershipControl.grantJobMasterLeadership(jobId).get();
}
private static void restartTaskManager(MiniCluster miniCluster, Runnable afterFailAction)
throws Exception {
miniCluster.terminateTaskManager(0).get();
afterFailAction.run();
miniCluster.startTaskManager();
}
}

@ -24,8 +24,8 @@ import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
import com.ververica.cdc.connectors.mysql.MySqlTestBase;
import com.ververica.cdc.connectors.mysql.debezium.EmbeddedFlinkDatabaseHistory;
import com.ververica.cdc.connectors.mysql.source.MySqlParallelSourceTestBase;
import com.ververica.cdc.connectors.mysql.source.assigners.state.HybridPendingSplitsState;
import com.ververica.cdc.connectors.mysql.source.assigners.state.SnapshotPendingSplitsState;
import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
@ -33,7 +33,7 @@ import com.ververica.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo
import com.ververica.cdc.connectors.mysql.source.split.MySqlBinlogSplit;
import com.ververica.cdc.connectors.mysql.source.split.MySqlSnapshotSplit;
import com.ververica.cdc.connectors.mysql.source.split.MySqlSplit;
import com.ververica.cdc.connectors.mysql.source.utils.UniqueDatabase;
import com.ververica.cdc.connectors.mysql.testutils.UniqueDatabase;
import io.debezium.relational.TableId;
import io.debezium.relational.history.TableChanges.TableChange;
import org.junit.BeforeClass;
@ -47,15 +47,13 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
import static org.junit.Assert.assertEquals;
/** Tests for {@link MySqlHybridSplitAssigner}. */
public class MySqlHybridSplitAssignerTest extends MySqlTestBase {
public class MySqlHybridSplitAssignerTest extends MySqlParallelSourceTestBase {
private static final int currentParallelism = 4;
private static final UniqueDatabase customerDatabase =
new UniqueDatabase(MYSQL_CONTAINER, "customer", "mysqluser", "mysqlpw");
@ -115,7 +113,7 @@ public class MySqlHybridSplitAssignerTest extends MySqlTestBase {
HybridPendingSplitsState checkpoint =
new HybridPendingSplitsState(snapshotPendingSplitsState, false);
final MySqlHybridSplitAssigner assigner =
new MySqlHybridSplitAssigner(configuration, currentParallelism, checkpoint);
new MySqlHybridSplitAssigner(configuration, DEFAULT_PARALLELISM, checkpoint);
// step 2. Get the MySqlBinlogSplit after all snapshot splits finished
Optional<MySqlSplit> binlogSplit = assigner.getNext();
@ -159,7 +157,6 @@ public class MySqlHybridSplitAssignerTest extends MySqlTestBase {
properties.put("database.serverTimezone", ZoneId.of("UTC").toString());
properties.put("snapshot.mode", "initial");
properties.put("database.history", EmbeddedFlinkDatabaseHistory.class.getCanonicalName());
properties.put("database.history.instance.name", UUID.randomUUID().toString());
return Configuration.fromMap(properties);
}
}

@ -21,10 +21,10 @@ package com.ververica.cdc.connectors.mysql.source.assigners;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.ExceptionUtils;
import com.ververica.cdc.connectors.mysql.MySqlTestBase;
import com.ververica.cdc.connectors.mysql.debezium.EmbeddedFlinkDatabaseHistory;
import com.ververica.cdc.connectors.mysql.source.MySqlParallelSourceTestBase;
import com.ververica.cdc.connectors.mysql.source.split.MySqlSplit;
import com.ververica.cdc.connectors.mysql.source.utils.UniqueDatabase;
import com.ververica.cdc.connectors.mysql.testutils.UniqueDatabase;
import org.junit.BeforeClass;
import org.junit.Test;
@ -36,7 +36,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
import static org.apache.flink.core.testutils.FlinkMatchers.containsMessage;
@ -46,9 +45,8 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/** Tests for {@link MySqlSnapshotSplitAssigner}. */
public class MySqlSnapshotSplitAssignerTest extends MySqlTestBase {
public class MySqlSnapshotSplitAssignerTest extends MySqlParallelSourceTestBase {
private static final int currentParallelism = 4;
private static final UniqueDatabase customerDatabase =
new UniqueDatabase(MYSQL_CONTAINER, "customer", "mysqluser", "mysqlpw");
@ -141,7 +139,7 @@ public class MySqlSnapshotSplitAssignerTest extends MySqlTestBase {
configuration.setString("table.whitelist", String.join(",", captureTableIds));
final MySqlSnapshotSplitAssigner assigner =
new MySqlSnapshotSplitAssigner(configuration, currentParallelism);
new MySqlSnapshotSplitAssigner(configuration, DEFAULT_PARALLELISM);
assigner.open();
List<MySqlSplit> sqlSplits = new ArrayList<>();
@ -290,7 +288,6 @@ public class MySqlSnapshotSplitAssignerTest extends MySqlTestBase {
properties.put("database.serverTimezone", ZoneId.of("UTC").toString());
properties.put("snapshot.mode", "initial");
properties.put("database.history", EmbeddedFlinkDatabaseHistory.class.getCanonicalName());
properties.put("database.history.instance.name", UUID.randomUUID().toString());
return Configuration.fromMap(properties);
}
}

@ -76,7 +76,7 @@ public class PendingSplitsStateSerializerTest {
final PendingSplitsStateSerializer serializer =
new PendingSplitsStateSerializer(MySqlSplitSerializer.INSTANCE);
byte[] serialized = serializer.serialize(state);
return serializer.deserialize(1, serialized);
return serializer.deserialize(2, serialized);
}
private static SnapshotPendingSplitsState getTestSnapshotPendingSplitsState() {
@ -128,6 +128,7 @@ public class PendingSplitsStateSerializerTest {
}
private static MySqlSnapshotSplit getTestSnapshotSplit(TableId tableId, int splitNo) {
long restartSkipEvent = splitNo;
return new MySqlSnapshotSplit(
tableId,
tableId.toString() + "-" + splitNo,
@ -135,7 +136,8 @@ public class PendingSplitsStateSerializerTest {
Collections.singletonList(new RowType.RowField("id", new BigIntType()))),
new Object[] {100L + splitNo * 1000},
new Object[] {999L + splitNo * 1000},
new BinlogOffset("mysql-bin.000001", 78L + splitNo * 200),
new BinlogOffset(
"mysql-bin.000001", 78L + splitNo * 200, restartSkipEvent, 0L, 0L, null, 0),
new HashMap<>());
}

@ -0,0 +1,251 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.mysql.source.reader;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceOutput;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.connector.testutils.source.reader.TestingReaderContext;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;
import org.apache.flink.util.Collector;
import com.ververica.cdc.connectors.mysql.debezium.EmbeddedFlinkDatabaseHistory;
import com.ververica.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext;
import com.ververica.cdc.connectors.mysql.source.MySqlParallelSourceTestBase;
import com.ververica.cdc.connectors.mysql.source.assigners.MySqlBinlogSplitAssigner;
import com.ververica.cdc.connectors.mysql.source.metrics.MySqlSourceReaderMetrics;
import com.ververica.cdc.connectors.mysql.source.split.MySqlSplit;
import com.ververica.cdc.connectors.mysql.testutils.RecordsFormatter;
import com.ververica.cdc.connectors.mysql.testutils.UniqueDatabase;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.TableId;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.Test;
import java.sql.SQLException;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
import static org.junit.Assert.assertEquals;
/** Tests for {@link MySqlSourceReader}. */
public class MySqlSourceReaderTest extends MySqlParallelSourceTestBase {
private final UniqueDatabase customerDatabase =
new UniqueDatabase(MYSQL_CONTAINER, "customer", "mysqluser", "mysqlpw");
@Test
public void testBinlogReadFailoverCrossTransaction() throws Exception {
customerDatabase.createAndInitialize();
final Configuration configuration = getConfig(new String[] {"customers"});
final DataType dataType =
DataTypes.ROW(
DataTypes.FIELD("id", DataTypes.BIGINT()),
DataTypes.FIELD("name", DataTypes.STRING()),
DataTypes.FIELD("address", DataTypes.STRING()),
DataTypes.FIELD("phone_number", DataTypes.STRING()));
MySqlSplit binlogSplit = createBinlogSplit(configuration);
MySqlSourceReader<SourceRecord> reader = createReader(configuration);
reader.start();
reader.addSplits(Arrays.asList(binlogSplit));
// step-1: make 6 change events in one MySQL transaction
TableId tableId = binlogSplit.getTableSchemas().keySet().iterator().next();
makeBinlogEventsInOneTransaction(configuration, tableId.toString());
// step-2: fetch the first 2 records belonging to the MySQL transaction
String[] expectedRecords =
new String[] {
"-U[103, user_3, Shanghai, 123567891234]",
"+U[103, user_3, Hangzhou, 123567891234]"
};
// the 2 records are produced by 1 operation
List<String> actualRecords = consumeRecords(reader, dataType, 1);
assertEquals(
Arrays.stream(expectedRecords).sorted().collect(Collectors.toList()),
actualRecords);
List<MySqlSplit> splitsState = reader.snapshotState(1L);
// check the binlog split state
assertEquals(1, splitsState.size());
reader.close();
// step-3: mock failover from a restored state
MySqlSourceReader<SourceRecord> restartReader = createReader(configuration);
restartReader.start();
restartReader.addSplits(splitsState);
// step-4: fetch the remaining 4 records belonging to the MySQL transaction
String[] expectedRestRecords =
new String[] {
"-D[102, user_2, Shanghai, 123567891234]",
"+I[102, user_2, Shanghai, 123567891234]",
"-U[103, user_3, Hangzhou, 123567891234]",
"+U[103, user_3, Shanghai, 123567891234]"
};
// the 4 records are produced by 3 operations
List<String> restRecords = consumeRecords(restartReader, dataType, 3);
assertEquals(
Arrays.stream(expectedRestRecords).sorted().collect(Collectors.toList()),
restRecords);
restartReader.close();
}
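// Builds a MySqlSourceReader that emits raw SourceRecords, backed by a blocking element queue
// and a testing reader context.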
private MySqlSourceReader<SourceRecord> createReader(Configuration configuration) {
final FutureCompletingBlockingQueue<RecordsWithSplitIds<SourceRecord>> elementsQueue =
new FutureCompletingBlockingQueue<>();
final SourceReaderContext readerContext = new TestingReaderContext();
final MySqlRecordEmitter<SourceRecord> recordEmitter =
new MySqlRecordEmitter<>(
new ForwardDeserializeSchema(),
new MySqlSourceReaderMetrics(readerContext.metricGroup()));
return new MySqlSourceReader<>(
elementsQueue,
() -> createSplitReader(configuration),
recordEmitter,
configuration,
readerContext);
}
private MySqlSplitReader createSplitReader(Configuration configuration) {
return new MySqlSplitReader(configuration, 0);
}
private void makeBinlogEventsInOneTransaction(Configuration configuration, String tableId)
throws SQLException {
JdbcConnection connection = StatefulTaskContext.getConnection(configuration);
// make 6 binlog events by 4 operations
connection.setAutoCommit(false);
connection.execute(
"UPDATE " + tableId + " SET address = 'Hangzhou' where id = 103",
"DELETE FROM " + tableId + " where id = 102",
"INSERT INTO " + tableId + " VALUES(102, 'user_2','Shanghai','123567891234')",
"UPDATE " + tableId + " SET address = 'Shanghai' where id = 103");
connection.commit();
connection.close();
}
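// Asks the binlog split assigner for the single binlog split (starting from the latest offset,
// per the 'scan.startup.mode' used in this test's configuration).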
private MySqlSplit createBinlogSplit(Configuration configuration) {
MySqlBinlogSplitAssigner binlogSplitAssigner = new MySqlBinlogSplitAssigner(configuration);
binlogSplitAssigner.open();
return binlogSplitAssigner.getNext().get();
}
private Configuration getConfig(String[] captureTables) {
Map<String, String> properties = new HashMap<>();
properties.put("database.server.name", "embedded-test");
properties.put("database.hostname", MYSQL_CONTAINER.getHost());
properties.put("database.port", String.valueOf(MYSQL_CONTAINER.getDatabasePort()));
properties.put("database.user", customerDatabase.getUsername());
properties.put("database.password", customerDatabase.getPassword());
properties.put("database.whitelist", customerDatabase.getDatabaseName());
properties.put("database.history.skip.unparseable.ddl", "true");
properties.put("database.serverTimezone", ZoneId.of("UTC").toString());
properties.put("snapshot.mode", "initial");
properties.put("database.history", EmbeddedFlinkDatabaseHistory.class.getCanonicalName());
properties.put("database.history.instance.name", UUID.randomUUID().toString());
List<String> captureTableIds =
Arrays.stream(captureTables)
.map(tableName -> customerDatabase.getDatabaseName() + "." + tableName)
.collect(Collectors.toList());
properties.put("table.whitelist", String.join(",", captureTableIds));
properties.put("scan.incremental.snapshot.chunk.size", "10");
properties.put("scan.snapshot.fetch.size", "2");
properties.put("scan.startup.mode", "latest-offset");
return Configuration.fromMap(properties);
}
private List<String> consumeRecords(
MySqlSourceReader<SourceRecord> sourceReader, DataType recordType, int changeEventNum)
throws Exception {
// Poll the single split until at least `changeEventNum` records have been collected.
final SimpleReaderOutput output = new SimpleReaderOutput();
while (output.getResults().size() < changeEventNum) {
sourceReader.pollNext(output);
}
final RecordsFormatter formatter = new RecordsFormatter(recordType);
return formatter.format(output.getResults());
}
// ------------------------------------------------------------------------
// test utilities
// ------------------------------------------------------------------------
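/** A {@link ReaderOutput} that simply collects the emitted records for later assertions. */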
private static class SimpleReaderOutput implements ReaderOutput<SourceRecord> {
private final List<SourceRecord> results = new ArrayList<>();
@Override
public void collect(SourceRecord record) {
results.add(record);
}
public List<SourceRecord> getResults() {
return results;
}
@Override
public void collect(SourceRecord record, long timestamp) {
collect(record);
}
@Override
public void emitWatermark(Watermark watermark) {}
@Override
public void markIdle() {}
@Override
public SourceOutput<SourceRecord> createOutputForSplit(java.lang.String splitId) {
return this;
}
@Override
public void releaseOutputForSplit(java.lang.String splitId) {}
}
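/** A deserialization schema that forwards the raw Debezium {@link SourceRecord} unchanged. */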
private static class ForwardDeserializeSchema
implements DebeziumDeserializationSchema<SourceRecord> {
private static final long serialVersionUID = 1L;
@Override
public void deserialize(SourceRecord record, Collector<SourceRecord> out) throws Exception {
out.collect(record);
}
@Override
public TypeInformation<SourceRecord> getProducedType() {
return TypeInformation.of(SourceRecord.class);
}
}
}

@ -127,7 +127,7 @@ public class MySqlSplitSerializerTest {
private MySqlSplit serializeAndDeserializeSplit(MySqlSplit split) throws Exception {
final MySqlSplitSerializer sqlSplitSerializer = new MySqlSplitSerializer();
byte[] serialized = sqlSplitSerializer.serialize(split);
return sqlSplitSerializer.deserializeV1(serialized);
return sqlSplitSerializer.deserialize(sqlSplitSerializer.getVersion(), serialized);
}
public static TableChange getTestTableSchema() throws Exception {

@ -27,8 +27,8 @@ import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import com.ververica.cdc.connectors.mysql.MySqlTestBase;
import com.ververica.cdc.connectors.mysql.source.utils.UniqueDatabase;
import com.ververica.cdc.connectors.mysql.source.MySqlParallelSourceTestBase;
import com.ververica.cdc.connectors.mysql.testutils.UniqueDatabase;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
@ -49,7 +49,7 @@ import static org.junit.Assert.assertThat;
/** Integration tests for MySQL binlog SQL source. */
@RunWith(Parameterized.class)
public class MySqlConnectorITCase extends MySqlTestBase {
public class MySqlConnectorITCase extends MySqlParallelSourceTestBase {
private static final String TEST_USER = "mysqluser";
private static final String TEST_PASSWORD = "mysqlpw";
@ -95,7 +95,7 @@ public class MySqlConnectorITCase extends MySqlTestBase {
public void before() {
TestValuesTableFactory.clearAllData();
if (incrementalSnapshot) {
env.setParallelism(4);
env.setParallelism(DEFAULT_PARALLELISM);
env.enableCheckpointing(200);
} else {
env.setParallelism(1);

@ -26,8 +26,8 @@ import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import com.ververica.cdc.connectors.mysql.MySqlValidatorTest;
import com.ververica.cdc.connectors.mysql.source.utils.MySqlContainer;
import com.ververica.cdc.connectors.mysql.source.utils.UniqueDatabase;
import com.ververica.cdc.connectors.mysql.testutils.MySqlContainer;
import com.ververica.cdc.connectors.mysql.testutils.UniqueDatabase;
import org.junit.Before;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

@ -16,7 +16,7 @@
* limitations under the License.
*/
package com.ververica.cdc.connectors.mysql.source.utils;
package com.ververica.cdc.connectors.mysql.testutils;
import org.testcontainers.containers.ContainerLaunchException;
import org.testcontainers.containers.JdbcDatabaseContainer;

@ -0,0 +1,108 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.mysql.testutils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.conversion.RowRowConverter;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema;
import org.apache.kafka.connect.source.SourceRecord;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.isSchemaChangeEvent;
import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.isWatermarkEvent;
/** Formatter that formats the {@link org.apache.kafka.connect.source.SourceRecord} to String. */
public class RecordsFormatter {
private final DataType dataType;
private final ZoneId zoneId;
private TypeInformation<RowData> typeInfo;
private DebeziumDeserializationSchema<RowData> deserializationSchema;
private SimpleCollector collector;
private RowRowConverter rowRowConverter;
public RecordsFormatter(DataType dataType) {
this(dataType, ZoneId.of("UTC"));
}
public RecordsFormatter(DataType dataType, ZoneId zoneId) {
this.dataType = dataType;
this.zoneId = zoneId;
this.typeInfo =
(TypeInformation<RowData>) TypeConversions.fromDataTypeToLegacyInfo(dataType);
this.deserializationSchema =
new RowDataDebeziumDeserializeSchema(
(RowType) dataType.getLogicalType(),
typeInfo,
((rowData, rowKind) -> {}),
ZoneId.of("UTC"));
this.collector = new SimpleCollector();
this.rowRowConverter = RowRowConverter.create(dataType);
rowRowConverter.open(Thread.currentThread().getContextClassLoader());
}
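// Deserializes the change records and renders them as sorted Row strings,
// dropping watermark and schema change events.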
public List<String> format(List<SourceRecord> records) {
records.stream()
// filter out watermark (signal) events
.filter(r -> !isWatermarkEvent(r))
// filter out schema change events
.filter(r -> !isSchemaChangeEvent(r))
.forEach(
r -> {
try {
deserializationSchema.deserialize(r, collector);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
return collector.list.stream()
.map(rowRowConverter::toExternal)
.map(Row::toString)
.sorted()
.collect(Collectors.toList());
}
private static class SimpleCollector implements Collector<RowData> {
private List<RowData> list = new ArrayList<>();
@Override
public void collect(RowData record) {
list.add(record);
}
@Override
public void close() {
// do nothing
}
}
}

@ -16,7 +16,7 @@
* limitations under the License.
*/
package com.ververica.cdc.connectors.mysql.source.utils;
package com.ververica.cdc.connectors.mysql.testutils;
import java.net.URL;
import java.nio.file.Files;

@ -0,0 +1,63 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# For advice on how to change settings please see
# http://dev.mysql.com/doc/refman/5.7/en/server-configuration-defaults.html
[mysqld]
#
# Remove leading # and set to the amount of RAM for the most important data
# cache in MySQL. Start at 70% of total RAM for dedicated server, else 10%.
# innodb_buffer_pool_size = 128M
#
# Remove leading # to turn on a very important data integrity option: logging
# changes to the binary log between backups.
# log_bin
#
# Remove leading # to set options mainly useful for reporting servers.
# The server defaults are faster for transactions and fast SELECTs.
# Adjust sizes as needed, experiment to find the optimal values.
# join_buffer_size = 128M
# sort_buffer_size = 2M
# read_rnd_buffer_size = 2M
skip-host-cache
skip-name-resolve
#datadir=/var/lib/mysql
#socket=/var/lib/mysql/mysql.sock
#secure-file-priv=/var/lib/mysql-files
user=mysql
# Disabling symbolic-links is recommended to prevent assorted security risks
symbolic-links=0
#log-error=/var/log/mysqld.log
#pid-file=/var/run/mysqld/mysqld.pid
# ----------------------------------------------
# Enable the binlog for replication & CDC
# ----------------------------------------------
# Enable binary replication log and set the prefix, expiration, and log format.
# The prefix is arbitrary, expiration can be short for integration tests but would
# be longer on a production system. Row-level info is required for ingest to work.
# Server ID is required, but this will vary on production systems
server-id = 223344
log_bin = mysql-bin
expire_logs_days = 1
binlog_format = row
# enable gtid mode
gtid_mode = on
enforce_gtid_consistency = on

@ -57,5 +57,3 @@ server-id = 223344
log_bin = mysql-bin
expire_logs_days = 1
binlog_format = row