diff --git a/flink-connector-mysql-cdc/pom.xml b/flink-connector-mysql-cdc/pom.xml index 776b4eb9a..a1277fdbe 100644 --- a/flink-connector-mysql-cdc/pom.xml +++ b/flink-connector-mysql-cdc/pom.xml @@ -96,6 +96,12 @@ under the License. ${flink.version} test + + org.apache.flink + flink-connector-test-utils + ${flink.version} + test + org.apache.flink diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/DebeziumUtils.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/DebeziumUtils.java index f08737c67..d746d7564 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/DebeziumUtils.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/DebeziumUtils.java @@ -85,7 +85,10 @@ public class DebeziumUtils { if (rs.next()) { final String binlogFilename = rs.getString(1); final long binlogPosition = rs.getLong(2); - return new BinlogOffset(binlogFilename, binlogPosition); + final String gtidSet = + rs.getMetaData().getColumnCount() > 4 ? rs.getString(5) : null; + return new BinlogOffset( + binlogFilename, binlogPosition, 0L, 0, 0, gtidSet, null); } else { throw new FlinkRuntimeException( "Cannot read the binlog filename and position via '" diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/dispatcher/SignalEventDispatcher.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/dispatcher/SignalEventDispatcher.java index b8e94ef24..1691207b9 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/dispatcher/SignalEventDispatcher.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/dispatcher/SignalEventDispatcher.java @@ -21,7 +21,6 @@ package com.ververica.cdc.connectors.mysql.debezium.dispatcher; import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset; import com.ververica.cdc.connectors.mysql.source.split.MySqlSplit; import io.debezium.connector.base.ChangeEventQueue; -import io.debezium.connector.mysql.MySqlOffsetContext; import io.debezium.pipeline.DataChangeEvent; import io.debezium.util.SchemaNameAdjuster; import org.apache.kafka.connect.data.Schema; @@ -29,6 +28,8 @@ import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.source.SourceRecord; +import java.util.Map; + /** * A dispatcher to dispatch watermark signal events. 
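* It enqueues the split watermark signals (low watermark, high watermark, binlog end) as source records into the Debezium change event queue.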
* @@ -53,15 +54,13 @@ public class SignalEventDispatcher { private final Schema signalEventKeySchema; private final Schema signalEventValueSchema; - private final MySqlOffsetContext offsetContext; + private final Map sourcePartition; private final String topic; private final ChangeEventQueue queue; public SignalEventDispatcher( - MySqlOffsetContext offsetContext, - String topic, - ChangeEventQueue queue) { - this.offsetContext = offsetContext; + Map sourcePartition, String topic, ChangeEventQueue queue) { + this.sourcePartition = sourcePartition; this.topic = topic; this.queue = queue; this.signalEventKeySchema = @@ -75,8 +74,6 @@ public class SignalEventDispatcher { .name(SCHEMA_NAME_ADJUSTER.adjust(SIGNAL_EVENT_VALUE_SCHEMA_NAME)) .field(SPLIT_ID_KEY, Schema.STRING_SCHEMA) .field(WATERMARK_KIND, Schema.STRING_SCHEMA) - .field(BINLOG_FILENAME_OFFSET_KEY, Schema.STRING_SCHEMA) - .field(BINLOG_POSITION_OFFSET_KEY, Schema.INT64_SCHEMA) .build(); } @@ -86,13 +83,13 @@ public class SignalEventDispatcher { SourceRecord sourceRecord = new SourceRecord( - offsetContext.getPartition(), - offsetContext.getOffset(), + sourcePartition, + watermark.getOffset(), topic, signalEventKeySchema, signalRecordKey(mySqlSplit.splitId()), signalEventValueSchema, - signalRecordValue(mySqlSplit.splitId(), watermark, watermarkKind)); + signalRecordValue(mySqlSplit.splitId(), watermarkKind)); queue.enqueue(new DataChangeEvent(sourceRecord)); } @@ -103,13 +100,10 @@ public class SignalEventDispatcher { return result; } - private Struct signalRecordValue( - String splitId, BinlogOffset binlogOffset, WatermarkKind watermarkKind) { + private Struct signalRecordValue(String splitId, WatermarkKind watermarkKind) { Struct result = new Struct(signalEventValueSchema); result.put(SPLIT_ID_KEY, splitId); result.put(WATERMARK_KIND, watermarkKind.toString()); - result.put(BINLOG_FILENAME_OFFSET_KEY, binlogOffset.getFilename()); - result.put(BINLOG_POSITION_OFFSET_KEY, binlogOffset.getPosition()); return result; } diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java index 0651aaa20..930dd6e0f 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java @@ -35,6 +35,7 @@ import io.debezium.connector.mysql.MySqlStreamingChangeEventSourceMetrics; import io.debezium.pipeline.DataChangeEvent; import io.debezium.pipeline.source.spi.ChangeEventSource; import io.debezium.relational.TableId; +import io.debezium.relational.Tables; import org.apache.kafka.connect.source.SourceRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -74,6 +75,7 @@ public class BinlogSplitReader implements DebeziumReader> finishedSplitsInfo; // tableId -> the max splitHighWatermark private Map maxSplitHighWatermarkMap; + private Tables.TableFilter capturedTableFilter; public BinlogSplitReader(StatefulTaskContext statefulTaskContext, int subTaskId) { this.statefulTaskContext = statefulTaskContext; @@ -87,11 +89,10 @@ public class BinlogSplitReader implements DebeziumReader finishedSplitInfos = currentBinlogSplit.getFinishedSnapshotSplitInfos(); Map> splitsInfoMap = new HashMap<>(); Map tableIdBinlogPositionMap = new HashMap<>(); + // latest-offset mode if 
(finishedSplitInfos.isEmpty()) { for (TableId tableId : currentBinlogSplit.getTableSchemas().keySet()) { tableIdBinlogPositionMap.put(tableId, currentBinlogSplit.getStartingOffset()); } - } else { + } + // initial mode + else { for (FinishedSnapshotSplitInfo finishedSplitInfo : finishedSplitInfos) { TableId tableId = finishedSplitInfo.getTableId(); List list = diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/task/MySqlBinlogSplitReadTask.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/task/MySqlBinlogSplitReadTask.java index c81cbddcf..e5b0c1ebb 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/task/MySqlBinlogSplitReadTask.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/task/MySqlBinlogSplitReadTask.java @@ -37,9 +37,8 @@ import io.debezium.util.Clock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static com.ververica.cdc.connectors.mysql.debezium.dispatcher.SignalEventDispatcher.BINLOG_FILENAME_OFFSET_KEY; -import static com.ververica.cdc.connectors.mysql.debezium.dispatcher.SignalEventDispatcher.BINLOG_POSITION_OFFSET_KEY; import static com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset.NO_STOPPING_OFFSET; +import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getBinlogPosition; /** * Task to read all binlog for table and also supports read bounded (from lowWatermark to @@ -47,7 +46,7 @@ import static com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset.NO_S */ public class MySqlBinlogSplitReadTask extends MySqlStreamingChangeEventSource { - private static final Logger logger = LoggerFactory.getLogger(MySqlBinlogSplitReadTask.class); + private static final Logger LOG = LoggerFactory.getLogger(MySqlBinlogSplitReadTask.class); private final MySqlBinlogSplit binlogSplit; private final MySqlOffsetContext offsetContext; private final EventDispatcherImpl eventDispatcher; @@ -80,7 +79,8 @@ public class MySqlBinlogSplitReadTask extends MySqlStreamingChangeEventSource { this.offsetContext = offsetContext; this.errorHandler = errorHandler; this.signalEventDispatcher = - new SignalEventDispatcher(offsetContext, topic, eventDispatcher.getQueue()); + new SignalEventDispatcher( + offsetContext.getPartition(), topic, eventDispatcher.getQueue()); } @Override @@ -94,14 +94,7 @@ public class MySqlBinlogSplitReadTask extends MySqlStreamingChangeEventSource { super.handleEvent(event); // check do we need to stop for read binlog for snapshot split. 
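// (a bounded read means the split carries a concrete ending offset, i.e. the high watermark captured for the snapshot split)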
if (isBoundedRead()) { - final BinlogOffset currentBinlogOffset = - new BinlogOffset( - offsetContext.getOffset().get(BINLOG_FILENAME_OFFSET_KEY).toString(), - Long.parseLong( - offsetContext - .getOffset() - .get(BINLOG_POSITION_OFFSET_KEY) - .toString())); + final BinlogOffset currentBinlogOffset = getBinlogPosition(offsetContext.getOffset()); // reach the high watermark, the binlog reader should finished if (currentBinlogOffset.isAtOrBefore(binlogSplit.getEndingOffset())) { // send binlog end event @@ -111,7 +104,7 @@ public class MySqlBinlogSplitReadTask extends MySqlStreamingChangeEventSource { currentBinlogOffset, SignalEventDispatcher.WatermarkKind.BINLOG_END); } catch (InterruptedException e) { - logger.error("Send signal event error.", e); + LOG.error("Send signal event error.", e); errorHandler.setProducerThrowable( new DebeziumException("Error processing binlog signal event", e)); } diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/task/MySqlSnapshotSplitReadTask.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/task/MySqlSnapshotSplitReadTask.java index 72ba2b8b1..bc09359b6 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/task/MySqlSnapshotSplitReadTask.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/task/MySqlSnapshotSplitReadTask.java @@ -59,7 +59,7 @@ import java.sql.Types; import java.time.Duration; import java.util.Calendar; -import static com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset.getCurrentBinlogPosition; +import static com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils.currentBinlogOffset; /** Task to read snapshot split of table. 
*/ public class MySqlSnapshotSplitReadTask extends AbstractSnapshotChangeEventSource { @@ -129,28 +129,27 @@ public class MySqlSnapshotSplitReadTask extends AbstractSnapshotChangeEventSourc throws Exception { final RelationalSnapshotChangeEventSource.RelationalSnapshotContext ctx = (RelationalSnapshotChangeEventSource.RelationalSnapshotContext) snapshotContext; + ctx.offset = offsetContext; + final SignalEventDispatcher signalEventDispatcher = + new SignalEventDispatcher( + offsetContext.getPartition(), + topicSelector.topicNameFor(snapshotSplit.getTableId()), + dispatcher.getQueue()); - final BinlogOffset lowWatermark = getCurrentBinlogPosition(jdbcConnection); + final BinlogOffset lowWatermark = currentBinlogOffset(jdbcConnection); LOG.info( "Snapshot step 1 - Determining low watermark {} for split {}", lowWatermark, snapshotSplit); - offsetContext.setBinlogStartPoint(lowWatermark.getFilename(), lowWatermark.getPosition()); - ctx.offset = offsetContext; ((SnapshotSplitReader.SnapshotSplitChangeEventSourceContextImpl) (context)) .setLowWatermark(lowWatermark); - - final SignalEventDispatcher signalEventDispatcher = - new SignalEventDispatcher( - offsetContext, - topicSelector.topicNameFor(snapshotSplit.getTableId()), - dispatcher.getQueue()); signalEventDispatcher.dispatchWatermarkEvent( snapshotSplit, lowWatermark, SignalEventDispatcher.WatermarkKind.LOW); LOG.info("Snapshot step 2 - Snapshotting data"); createDataEvents(ctx, snapshotSplit.getTableId()); - final BinlogOffset highWatermark = getCurrentBinlogPosition(jdbcConnection); + + final BinlogOffset highWatermark = currentBinlogOffset(jdbcConnection); LOG.info( "Snapshot step 3 - Determining high watermark {} for split {}", highWatermark, diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/task/context/StatefulTaskContext.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/task/context/StatefulTaskContext.java index e3af11ce7..cb0cc1eed 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/task/context/StatefulTaskContext.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/task/context/StatefulTaskContext.java @@ -58,9 +58,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.time.Instant; -import java.util.HashMap; +import java.util.List; import java.util.Map; +import static com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset.BINLOG_FILENAME_OFFSET_KEY; import static io.debezium.config.CommonConnectorConfig.TOMBSTONES_ON_DELETE; /** @@ -119,9 +120,7 @@ public class StatefulTaskContext { schemaNameAdjuster, tableIdCaseInsensitive); this.offsetContext = - (MySqlOffsetContext) - loadStartingOffsetState( - new MySqlOffsetContext.Loader(connectorConfig), mySqlSplit); + loadStartingOffsetState(new MySqlOffsetContext.Loader(connectorConfig), mySqlSplit); validateAndLoadDatabaseHistory(offsetContext, databaseSchema); this.taskContext = @@ -174,22 +173,49 @@ public class StatefulTaskContext { } /** Loads the connector's persistent offset (if present) via the given loader. */ - private OffsetContext loadStartingOffsetState( + private MySqlOffsetContext loadStartingOffsetState( OffsetContext.Loader loader, MySqlSplit mySqlSplit) { - Map previousOffset = new HashMap<>(); BinlogOffset offset = mySqlSplit.isSnapshotSplit() ? 
BinlogOffset.INITIAL_OFFSET : mySqlSplit.asBinlogSplit().getStartingOffset(); - previousOffset.put("file", offset.getFilename()); - previousOffset.put("pos", offset.getPosition()); - if (previousOffset != null) { - OffsetContext offsetContext = loader.load(previousOffset); - return offsetContext; + MySqlOffsetContext mySqlOffsetContext = + (MySqlOffsetContext) loader.load(offset.getOffset()); + + if (!isBinlogAvailable(mySqlOffsetContext)) { + throw new IllegalStateException( + "The connector is trying to read binlog starting at " + + mySqlOffsetContext.getSourceInfo() + + ", but this is no longer " + + "available on the server. Reconfigure the connector to use a snapshot when needed."); + } + return mySqlOffsetContext; + } + + private boolean isBinlogAvailable(MySqlOffsetContext offset) { + String binlogFilename = offset.getSourceInfo().getString(BINLOG_FILENAME_OFFSET_KEY); + if (binlogFilename == null) { + return true; // start at current position + } + if (binlogFilename.equals("")) { + return true; // start at beginning + } + + // Accumulate the available binlog filenames ... + List logNames = connection.availableBinlogFiles(); + + // And compare with the one we're supposed to use ... + boolean found = logNames.stream().anyMatch(binlogFilename::equals); + if (!found) { + LOG.info( + "Connector requires binlog file '{}', but MySQL only has {}", + binlogFilename, + String.join(", ", logNames)); } else { - return null; + LOG.info("MySQL has the binlog file '{}' required by the connector", binlogFilename); } + return found; } private static MySqlValueConverters getValueConverters(MySqlConnectorConfig configuration) { diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/state/PendingSplitsStateSerializer.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/state/PendingSplitsStateSerializer.java index 9d1b66dfb..35511850b 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/state/PendingSplitsStateSerializer.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/state/PendingSplitsStateSerializer.java @@ -43,7 +43,7 @@ import static com.ververica.cdc.connectors.mysql.source.utils.SerializerUtils.wr */ public class PendingSplitsStateSerializer implements SimpleVersionedSerializer { - private static final int VERSION = 1; + private static final int VERSION = 2; private static final ThreadLocal SERIALIZER_CACHE = ThreadLocal.withInitial(() -> new DataOutputSerializer(64)); @@ -96,22 +96,45 @@ public class PendingSplitsStateSerializer implements SimpleVersionedSerializer

alreadyProcessedTables = readTableIds(in); List remainingSplits = readMySqlSnapshotSplits(splitVersion, in); Map assignedSnapshotSplits = readAssignedSnapshotSplits(splitVersion, in); - Map finishedOffsets = readFinishedOffsets(in); + Map finishedOffsets = readFinishedOffsets(offsetVersion, in); boolean isAssignerFinished = in.readBoolean(); return new SnapshotPendingSplitsState( alreadyProcessedTables, @@ -159,9 +182,9 @@ public class PendingSplitsStateSerializer implements SimpleVersionedSerializer

readFinishedOffsets(DataInputDeserializer in) - throws IOException { + private Map readFinishedOffsets( + int offsetVersion, DataInputDeserializer in) throws IOException { Map splitsInfo = new HashMap<>(); final int size = in.readInt(); for (int i = 0; i < size; i++) { String splitId = in.readUTF(); - BinlogOffset binlogOffset = readBinlogPosition(in); + BinlogOffset binlogOffset = readBinlogPosition(offsetVersion, in); splitsInfo.put(splitId, binlogOffset); } return splitsInfo; } diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/offset/BinlogOffset.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/offset/BinlogOffset.java index ce5b59b3a..c9cc92ca5 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/offset/BinlogOffset.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/offset/BinlogOffset.java @@ -18,48 +18,212 @@ package com.ververica.cdc.connectors.mysql.source.offset; -import org.apache.flink.util.FlinkRuntimeException; -import org.apache.flink.util.Preconditions; +import io.debezium.connector.mysql.GtidSet; +import org.apache.commons.lang3.StringUtils; +import org.apache.kafka.connect.errors.ConnectException; -import io.debezium.jdbc.JdbcConnection; +import javax.annotation.Nullable; import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; import java.util.Objects; -import java.util.concurrent.atomic.AtomicReference; -/** A structure describes an offset in a binlog of MySQL server. */ +/** + * A structure that describes a fine-grained offset in a binlog event, including the binlog + * position, GTID set, etc. + * + *
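The offset is backed by a map of string keys that mirrors Debezium's MySQL source offset entries, so it can be handed directly to the Debezium offset loader when a reader restores its state. + *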

This structure can also be used to handle binlog events within a transaction: a transaction may + * contain multiple change events, and each change event may contain multiple rows. When restarting + * from a specific {@link BinlogOffset}, we need to skip the already processed change events and + * rows. + */ public class BinlogOffset implements Comparable, Serializable { private static final long serialVersionUID = 1L; + public static final String BINLOG_FILENAME_OFFSET_KEY = "file"; + public static final String BINLOG_POSITION_OFFSET_KEY = "pos"; + public static final String EVENTS_TO_SKIP_OFFSET_KEY = "event"; + public static final String ROWS_TO_SKIP_OFFSET_KEY = "row"; + public static final String GTID_SET_KEY = "gtids"; + public static final String TIMESTAMP_KEY = "ts_sec"; + public static final String SERVER_ID_KEY = "server_id"; + public static final BinlogOffset INITIAL_OFFSET = new BinlogOffset("", 0); public static final BinlogOffset NO_STOPPING_OFFSET = new BinlogOffset("", Long.MIN_VALUE); - private final String filename; - private final long position; + private final Map offset; + + public BinlogOffset(Map offset) { + this.offset = offset; + } public BinlogOffset(String filename, long position) { - Preconditions.checkNotNull(filename); - this.filename = filename; - this.position = position; + this(filename, position, 0L, 0L, 0L, null, null); + } + + public BinlogOffset( + String filename, + long position, + long restartSkipEvents, + long restartSkipRows, + long binlogEpochSecs, + @Nullable String restartGtidSet, + @Nullable Integer serverId) { + Map offsetMap = new HashMap<>(); + offsetMap.put(BINLOG_FILENAME_OFFSET_KEY, filename); + offsetMap.put(BINLOG_POSITION_OFFSET_KEY, String.valueOf(position)); + offsetMap.put(EVENTS_TO_SKIP_OFFSET_KEY, String.valueOf(restartSkipEvents)); + offsetMap.put(ROWS_TO_SKIP_OFFSET_KEY, String.valueOf(restartSkipRows)); + offsetMap.put(TIMESTAMP_KEY, String.valueOf(binlogEpochSecs)); + if (restartGtidSet != null) { + offsetMap.put(GTID_SET_KEY, restartGtidSet); + } + if (serverId != null) { + offsetMap.put(SERVER_ID_KEY, String.valueOf(serverId)); + } + this.offset = offsetMap; + } + + public Map getOffset() { + return offset; } public String getFilename() { - return filename; + return offset.get(BINLOG_FILENAME_OFFSET_KEY); } public long getPosition() { - return position; + return longOffsetValue(offset, BINLOG_POSITION_OFFSET_KEY); } + public long getRestartSkipEvents() { + return longOffsetValue(offset, EVENTS_TO_SKIP_OFFSET_KEY); + } + + public long getRestartSkipRows() { + return longOffsetValue(offset, ROWS_TO_SKIP_OFFSET_KEY); + } + + public String getGtidSet() { + return offset.get(GTID_SET_KEY); + } + + public long getTimestamp() { + return longOffsetValue(offset, TIMESTAMP_KEY); + } + + public Long getServerId() { + return longOffsetValue(offset, SERVER_ID_KEY); + } + + private long longOffsetValue(Map values, String key) { + Object obj = values.get(key); + if (obj == null) { + return 0L; + } + if (obj instanceof Number) { + return ((Number) obj).longValue(); + } + try { + return Long.parseLong(obj.toString()); + } catch (NumberFormatException e) { + throw new ConnectException( + "Source offset '" + + key + + "' parameter value " + + obj + + " could not be converted to a long"); + } + } + + /** + * This method is inspired by {@link io.debezium.relational.history.HistoryRecordComparator}.
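+ * When both offsets carry a GTID set, the comparison is based on the GTID sets alone; without GTIDs, offsets from different servers are compared by timestamp, and offsets from the same server by binlog filename, then position, then the events and rows to skip. For example, mysql-bin.000003:154 is considered before mysql-bin.000010:4 on the same server because the filenames are compared first.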
+ */ @Override - public int compareTo(BinlogOffset o) { - if (this.filename.equals(o.filename)) { - return Long.compare(this.position, o.position); - } else { - // The bing log filenames are ordered - return this.getFilename().compareTo(o.getFilename()); + public int compareTo(BinlogOffset that) { + // the NO_STOPPING_OFFSET is the max offset + if (NO_STOPPING_OFFSET.equals(that) && NO_STOPPING_OFFSET.equals(this)) { + return 0; + } + if (NO_STOPPING_OFFSET.equals(this)) { + return 1; + } + if (NO_STOPPING_OFFSET.equals(that)) { + return -1; + } + + String gtidSetStr = this.getGtidSet(); + String targetGtidSetStr = that.getGtidSet(); + if (StringUtils.isNotEmpty(targetGtidSetStr)) { + // The target offset uses GTIDs, so we ideally compare using GTIDs ... + if (StringUtils.isNotEmpty(gtidSetStr)) { + // Both have GTIDs, so base the comparison entirely on the GTID sets. + GtidSet gtidSet = new GtidSet(gtidSetStr); + GtidSet targetGtidSet = new GtidSet(targetGtidSetStr); + if (gtidSet.equals(targetGtidSet)) { + long restartSkipEvents = this.getRestartSkipEvents(); + long targetRestartSkipEvents = that.getRestartSkipEvents(); + return Long.compare(restartSkipEvents, targetRestartSkipEvents); + } + // The GTIDs are not an exact match, so figure out if this is a subset of the target + // offset + // ... + return gtidSet.isContainedWithin(targetGtidSet) ? -1 : 1; + } + // The target offset did use GTIDs while this did not use GTIDs. So, we assume + // that this offset is older since GTIDs are often enabled but rarely disabled. + // And if they are disabled, + // it is likely that this offset would not include GTIDs as we would be trying + // to read the binlog of a + // server that no longer has GTIDs. And if they are enabled, disabled, and re-enabled, + // per + // https://dev.mysql.com/doc/refman/5.7/en/replication-gtids-failover.html all properly + // configured slaves that + // use GTIDs should always have the complete set of GTIDs copied from the master, in + // which case + // again we know that this offset not having GTIDs is before the target offset ... + return -1; + } else if (StringUtils.isNotEmpty(gtidSetStr)) { + // This offset has a GTID but the target offset does not, so per the previous paragraph + // we + // assume that previous + // is not at or before ... + return 1; + } + + // Both offsets are missing GTIDs. Look at the servers ... + long serverId = this.getServerId(); + long targetServerId = that.getServerId(); + + if (serverId != targetServerId) { + // These are from different servers, and their binlog coordinates are not related. So + // the only thing we can do + // is compare timestamps, and we have to assume that the server timestamps can be + // compared ... + long timestamp = this.getTimestamp(); + long targetTimestamp = that.getTimestamp(); + return Long.compare(timestamp, targetTimestamp); } + + // First compare the MySQL binlog filenames + if (this.getFilename().compareToIgnoreCase(that.getFilename()) != 0) { + return this.getFilename().compareToIgnoreCase(that.getFilename()); + } + + // The filenames are the same, so compare the positions + if (this.getPosition() != that.getPosition()) { + return Long.compare(this.getPosition(), that.getPosition()); + } + + // The positions are the same, so compare the completed events in the transaction ... + if (this.getRestartSkipEvents() != that.getRestartSkipEvents()) { + return Long.compare(this.getRestartSkipEvents(), that.getRestartSkipEvents()); + } + + // The completed events are the same, so compare the row number ... 
+ return Long.compare(this.getRestartSkipRows(), that.getRestartSkipRows()); } public boolean isAtOrBefore(BinlogOffset that) { @@ -72,7 +236,7 @@ public class BinlogOffset implements Comparable, Serializable { @Override public String toString() { - return filename + ":" + position; + return "BinlogOffset{" + "offset=" + offset + '}'; } @Override @@ -80,43 +244,15 @@ public class BinlogOffset implements Comparable, Serializable { if (this == o) { return true; } - if (o == null || getClass() != o.getClass()) { + if (!(o instanceof BinlogOffset)) { return false; } BinlogOffset that = (BinlogOffset) o; - return position == that.position && Objects.equals(filename, that.filename); + return offset.equals(that.offset); } @Override public int hashCode() { - return Objects.hash(filename, position); - } - - public static BinlogOffset getCurrentBinlogPosition(JdbcConnection jdbcConnection) { - AtomicReference currentBinlogPosition = - new AtomicReference<>(BinlogOffset.INITIAL_OFFSET); - try { - jdbcConnection.setAutoCommit(false); - String showMasterStmt = "SHOW MASTER STATUS"; - jdbcConnection.query( - showMasterStmt, - rs -> { - if (rs.next()) { - String binlogFilename = rs.getString(1); - long binlogPosition = rs.getLong(2); - currentBinlogPosition.set( - new BinlogOffset(binlogFilename, binlogPosition)); - } else { - throw new IllegalStateException( - "Cannot read the binlog filename and position via '" - + showMasterStmt - + "'. Make sure your server is correctly configured"); - } - }); - jdbcConnection.commit(); - } catch (Exception e) { - throw new FlinkRuntimeException("Read current binlog position error.", e); - } - return currentBinlogPosition.get(); + return Objects.hashCode(offset); } } diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/offset/BinlogOffsetSerializer.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/offset/BinlogOffsetSerializer.java new file mode 100644 index 000000000..1c0191144 --- /dev/null +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/offset/BinlogOffsetSerializer.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.connectors.mysql.source.offset; + +import org.apache.flink.annotation.Internal; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; + +import java.io.IOException; +import java.util.Map; + +/** Serializer implementation for a {@link BinlogOffset}. 
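+ * For example, serializing {@code new BinlogOffset("mysql-bin.000003", 4L)} produces a JSON object mapping the keys file, pos, event, row and ts_sec to string values, and deserialization rebuilds the offset map from that JSON.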
*/ +@Internal +public class BinlogOffsetSerializer { + + public static final BinlogOffsetSerializer INSTANCE = new BinlogOffsetSerializer(); + + public byte[] serialize(BinlogOffset binlogOffset) throws IOException { + // use JSON serialization + ObjectMapper objectMapper = new ObjectMapper(); + return objectMapper.writeValueAsBytes(binlogOffset.getOffset()); + } + + public BinlogOffset deserialize(byte[] bytes) throws IOException { + ObjectMapper objectMapper = new ObjectMapper(); + Map offset = objectMapper.readValue(bytes, Map.class); + return new BinlogOffset(offset); + } +} diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/split/MySqlSplitSerializer.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/split/MySqlSplitSerializer.java index f676f4493..692013eb9 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/split/MySqlSplitSerializer.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/split/MySqlSplitSerializer.java @@ -48,7 +48,7 @@ public final class MySqlSplitSerializer implements SimpleVersionedSerializer SERIALIZER_CACHE = ThreadLocal.withInitial(() -> new DataOutputSerializer(64)); @@ -98,7 +98,6 @@ public final class MySqlSplitSerializer implements SimpleVersionedSerializer tableSchemas = readTableSchemas(in); return new MySqlSnapshotSplit( @@ -144,9 +146,10 @@ public final class MySqlSplitSerializer implements SimpleVersionedSerializer finishedSplitsInfo = readFinishedSplitsInfo(in); + BinlogOffset startingOffset = readBinlogPosition(version, in); + BinlogOffset endingOffset = readBinlogPosition(version, in); + List finishedSplitsInfo = + readFinishedSplitsInfo(version, in); Map tableChangeMap = readTableSchemas(in); in.releaseArrays(); return new MySqlBinlogSplit( @@ -201,8 +204,8 @@ public final class MySqlSplitSerializer implements SimpleVersionedSerializer readFinishedSplitsInfo(DataInputDeserializer in) - throws IOException { + private static List readFinishedSplitsInfo( + int version, DataInputDeserializer in) throws IOException { List finishedSplitsInfo = new ArrayList<>(); final int size = in.readInt(); for (int i = 0; i < size; i++) { @@ -210,7 +213,7 @@ public final class MySqlSplitSerializer implements SimpleVersionedSerializer offset) { + Map offsetStrMap = new HashMap<>(); + for (Map.Entry entry : offset.entrySet()) { + offsetStrMap.put( + entry.getKey(), entry.getValue() == null ? null : entry.getValue().toString()); + } + return new BinlogOffset(offsetStrMap); } /** Returns the specific key contains in the split key range or not. 
*/ diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/utils/SerializerUtils.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/utils/SerializerUtils.java index fcbf6d2e9..59023fb76 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/utils/SerializerUtils.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/utils/SerializerUtils.java @@ -22,6 +22,7 @@ import org.apache.flink.core.memory.DataInputDeserializer; import org.apache.flink.core.memory.DataOutputSerializer; import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset; +import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffsetSerializer; import io.debezium.DebeziumException; import io.debezium.util.HexConverter; @@ -40,13 +41,30 @@ public class SerializerUtils { throws IOException { out.writeBoolean(offset != null); if (offset != null) { - out.writeUTF(offset.getFilename()); - out.writeLong(offset.getPosition()); + byte[] binlogOffsetBytes = BinlogOffsetSerializer.INSTANCE.serialize(offset); + out.writeInt(binlogOffsetBytes.length); + out.write(binlogOffsetBytes); } } - public static BinlogOffset readBinlogPosition(DataInputDeserializer in) throws IOException { - return in.readBoolean() ? new BinlogOffset(in.readUTF(), in.readLong()) : null; + public static BinlogOffset readBinlogPosition(int offsetVersion, DataInputDeserializer in) + throws IOException { + switch (offsetVersion) { + case 1: + return in.readBoolean() ? new BinlogOffset(in.readUTF(), in.readLong()) : null; + case 2: + boolean offsetNonNull = in.readBoolean(); + if (offsetNonNull) { + int binlogOffsetBytesLength = in.readInt(); + byte[] binlogOffsetBytes = new byte[binlogOffsetBytesLength]; + in.readFully(binlogOffsetBytes); + return BinlogOffsetSerializer.INSTANCE.deserialize(binlogOffsetBytes); + } else { + return null; + } + default: + throw new IOException("Unknown version: " + offsetVersion); + } } public static String rowToSerializedString(Object[] splitBoundary) { diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/MySqlSourceITCase.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/MySqlSourceITCase.java index c3c37886b..f8f025e03 100644 --- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/MySqlSourceITCase.java +++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/MySqlSourceITCase.java @@ -28,7 +28,7 @@ import org.apache.flink.types.Row; import org.apache.flink.util.CloseableIterator; import com.alibaba.fastjson.JSONObject; -import com.ververica.cdc.connectors.mysql.source.utils.UniqueDatabase; +import com.ververica.cdc.connectors.mysql.testutils.UniqueDatabase; import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema; import com.ververica.cdc.debezium.StringDebeziumDeserializationSchema; import org.junit.Ignore; diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/MySqlSourceTest.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/MySqlSourceTest.java index bdf91b1e1..17b869d59 100644 --- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/MySqlSourceTest.java +++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/MySqlSourceTest.java @@ -25,8 +25,8 @@ import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl; import 
com.fasterxml.jackson.core.JsonParseException; import com.jayway.jsonpath.JsonPath; import com.ververica.cdc.connectors.mysql.MySqlTestUtils.TestingListState; -import com.ververica.cdc.connectors.mysql.source.utils.UniqueDatabase; import com.ververica.cdc.connectors.mysql.table.StartupOptions; +import com.ververica.cdc.connectors.mysql.testutils.UniqueDatabase; import com.ververica.cdc.connectors.utils.TestSourceContext; import com.ververica.cdc.debezium.DebeziumSourceFunction; import io.debezium.document.Document; diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/MySqlTestBase.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/MySqlTestBase.java index 0cdd1b8d1..e1d4f8231 100644 --- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/MySqlTestBase.java +++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/MySqlTestBase.java @@ -20,16 +20,13 @@ package com.ververica.cdc.connectors.mysql; import org.apache.flink.test.util.AbstractTestBase; -import com.ververica.cdc.connectors.mysql.source.utils.MySqlContainer; +import com.ververica.cdc.connectors.mysql.testutils.MySqlContainer; import org.junit.BeforeClass; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testcontainers.containers.output.Slf4jLogConsumer; import org.testcontainers.lifecycle.Startables; -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.SQLException; import java.util.stream.Stream; /** @@ -43,7 +40,7 @@ public abstract class MySqlTestBase extends AbstractTestBase { protected static final MySqlContainer MYSQL_CONTAINER = (MySqlContainer) new MySqlContainer() - .withConfigurationOverride("docker/my.cnf") + .withConfigurationOverride("docker/server/my.cnf") .withSetupSQL("docker/setup.sql") .withDatabaseName("flink-test") .withUsername("flinkuser") @@ -56,11 +53,4 @@ public abstract class MySqlTestBase extends AbstractTestBase { Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join(); LOG.info("Containers are started."); } - - protected Connection getJdbcConnection() throws SQLException { - return DriverManager.getConnection( - MYSQL_CONTAINER.getJdbcUrl(), - MYSQL_CONTAINER.getUsername(), - MYSQL_CONTAINER.getPassword()); - } } diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/MySqlTestUtils.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/MySqlTestUtils.java index 0c56cc819..b97855eb2 100644 --- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/MySqlTestUtils.java +++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/MySqlTestUtils.java @@ -32,7 +32,7 @@ import org.apache.flink.streaming.util.MockStreamingRuntimeContext; import org.apache.flink.util.Collector; import org.apache.flink.util.Preconditions; -import com.ververica.cdc.connectors.mysql.source.utils.UniqueDatabase; +import com.ververica.cdc.connectors.mysql.testutils.UniqueDatabase; import com.ververica.cdc.connectors.utils.TestSourceContext; import com.ververica.cdc.debezium.DebeziumDeserializationSchema; import com.ververica.cdc.debezium.DebeziumSourceFunction; diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/MySqlValidatorTest.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/MySqlValidatorTest.java index e3c83b6b0..0583d0eea 100644 --- 
a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/MySqlValidatorTest.java +++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/MySqlValidatorTest.java @@ -23,8 +23,8 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.table.api.ValidationException; import com.ververica.cdc.connectors.mysql.source.MySqlParallelSource; -import com.ververica.cdc.connectors.mysql.source.utils.MySqlContainer; -import com.ververica.cdc.connectors.mysql.source.utils.UniqueDatabase; +import com.ververica.cdc.connectors.mysql.testutils.MySqlContainer; +import com.ververica.cdc.connectors.mysql.testutils.UniqueDatabase; import com.ververica.cdc.debezium.DebeziumSourceFunction; import org.apache.kafka.connect.source.SourceRecord; import org.junit.AfterClass; @@ -98,7 +98,7 @@ public class MySqlValidatorTest { String.format( "Currently Flink MySql CDC connector only supports MySql whose version is larger or equal to 5.7, but actual is %s.", version); - doValidate(version, "docker/my.cnf", message); + doValidate(version, "docker/server/my.cnf", message); } @Test diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java index 0655cf175..145924453 100644 --- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java +++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java @@ -18,22 +18,16 @@ package com.ververica.cdc.connectors.mysql.debezium.reader; -import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.configuration.Configuration; import org.apache.flink.table.api.DataTypes; -import org.apache.flink.table.data.RowData; -import org.apache.flink.table.data.conversion.RowRowConverter; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.RowType; -import org.apache.flink.table.types.utils.TypeConversions; -import org.apache.flink.types.Row; -import org.apache.flink.util.Collector; import com.github.shyiko.mysql.binlog.BinaryLogClient; -import com.ververica.cdc.connectors.mysql.MySqlTestBase; import com.ververica.cdc.connectors.mysql.debezium.EmbeddedFlinkDatabaseHistory; import com.ververica.cdc.connectors.mysql.debezium.dispatcher.SignalEventDispatcher; import com.ververica.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext; +import com.ververica.cdc.connectors.mysql.source.MySqlParallelSourceTestBase; import com.ververica.cdc.connectors.mysql.source.assigners.MySqlBinlogSplitAssigner; import com.ververica.cdc.connectors.mysql.source.assigners.MySqlSnapshotSplitAssigner; import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset; @@ -41,9 +35,8 @@ import com.ververica.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo import com.ververica.cdc.connectors.mysql.source.split.MySqlBinlogSplit; import com.ververica.cdc.connectors.mysql.source.split.MySqlSnapshotSplit; import com.ververica.cdc.connectors.mysql.source.split.MySqlSplit; -import com.ververica.cdc.connectors.mysql.source.utils.UniqueDatabase; -import com.ververica.cdc.debezium.DebeziumDeserializationSchema; -import com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema; +import com.ververica.cdc.connectors.mysql.testutils.RecordsFormatter; +import 
com.ververica.cdc.connectors.mysql.testutils.UniqueDatabase; import io.debezium.connector.mysql.MySqlConnection; import io.debezium.jdbc.JdbcConnection; import io.debezium.relational.TableId; @@ -68,14 +61,13 @@ import static com.ververica.cdc.connectors.mysql.source.MySqlSourceOptions.SCAN_ import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getSnapshotSplitInfo; import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getStartingOffsetOfBinlogSplit; import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.isHighWatermarkEvent; -import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.isSchemaChangeEvent; -import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.isWatermarkEvent; +import static org.hamcrest.Matchers.containsInAnyOrder; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; /** Tests for {@link BinlogSplitReader}. */ -public class BinlogSplitReaderTest extends MySqlTestBase { +public class BinlogSplitReaderTest extends MySqlParallelSourceTestBase { - private static final int currentParallelism = 4; private final UniqueDatabase customerDatabase = new UniqueDatabase(MYSQL_CONTAINER, "customer", "mysqluser", "mysqlpw"); @@ -305,9 +297,6 @@ public class BinlogSplitReaderTest extends MySqlTestBase { DataTypes.FIELD("name", DataTypes.STRING()), DataTypes.FIELD("address", DataTypes.STRING()), DataTypes.FIELD("phone_number", DataTypes.STRING())); - final RowType pkType = - (RowType) DataTypes.ROW(DataTypes.FIELD("id", DataTypes.BIGINT())).getLogicalType(); - String[] expected = new String[] { "-U[103, user_3, Shanghai, 123567891234]", @@ -324,7 +313,7 @@ public class BinlogSplitReaderTest extends MySqlTestBase { }; List actual = readBinlogSplitsFromLatestOffset(dataType, configuration, expected.length); - assertEquals(Arrays.stream(expected).sorted().collect(Collectors.toList()), actual); + assertThat(actual, containsInAnyOrder(expected)); } private List readBinlogSplitsFromLatestOffset( @@ -359,7 +348,8 @@ public class BinlogSplitReaderTest extends MySqlTestBase { while (recordIterator.hasNext()) { fetchedRecords.add(recordIterator.next()); } - actual = formatResult(fetchedRecords, dataType); + actual.addAll(formatResult(fetchedRecords, dataType)); + fetchedRecords.clear(); if (actual.size() >= expectedSize) { break; } @@ -436,7 +426,8 @@ public class BinlogSplitReaderTest extends MySqlTestBase { while (recordIterator.hasNext()) { fetchedRecords.add(recordIterator.next()); } - actual = formatResult(fetchedRecords, dataType); + actual.addAll(formatResult(fetchedRecords, dataType)); + fetchedRecords.clear(); if (actual.size() >= expectedSize) { break; } @@ -447,6 +438,7 @@ public class BinlogSplitReaderTest extends MySqlTestBase { private void makeCustomersBinlogEvents( JdbcConnection connection, String tableId, boolean firstSplitOnly) throws SQLException { // make binlog events for the first split + connection.setAutoCommit(false); connection.execute( "UPDATE " + tableId + " SET address = 'Hangzhou' where id = 103", "DELETE FROM " + tableId + " where id = 102", @@ -558,36 +550,14 @@ public class BinlogSplitReaderTest extends MySqlTestBase { return finishedSplitsInfo; } - private List formatResult(List records, DataType dataType) - throws Exception { - final RowType rowType = (RowType) dataType.getLogicalType(); - final TypeInformation typeInfo = - (TypeInformation) TypeConversions.fromDataTypeToLegacyInfo(dataType); - final DebeziumDeserializationSchema 
deserializationSchema = - new RowDataDebeziumDeserializeSchema( - rowType, typeInfo, ((rowData, rowKind) -> {}), ZoneId.of("UTC")); - SimpleCollector collector = new SimpleCollector(); - RowRowConverter rowRowConverter = RowRowConverter.create(dataType); - rowRowConverter.open(Thread.currentThread().getContextClassLoader()); - // filter signal event - // filter schema change event - for (SourceRecord r : records) { - if (!isWatermarkEvent(r)) { - if (!isSchemaChangeEvent(r)) { - deserializationSchema.deserialize(r, collector); - } - } - } - return collector.list.stream() - .map(rowRowConverter::toExternal) - .map(Row::toString) - .sorted() - .collect(Collectors.toList()); + private List formatResult(List records, DataType dataType) { + final RecordsFormatter formatter = new RecordsFormatter(dataType); + return formatter.format(records); } private List getMySqlSplits(Configuration configuration) { final MySqlSnapshotSplitAssigner assigner = - new MySqlSnapshotSplitAssigner(configuration, currentParallelism); + new MySqlSnapshotSplitAssigner(configuration, DEFAULT_PARALLELISM); assigner.open(); List mySqlSplits = new ArrayList<>(); while (true) { @@ -625,19 +595,4 @@ public class BinlogSplitReaderTest extends MySqlTestBase { return Configuration.fromMap(properties); } - - private static class SimpleCollector implements Collector { - - private List list = new ArrayList<>(); - - @Override - public void collect(RowData record) { - list.add(record); - } - - @Override - public void close() { - // do nothing - } - } } diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReaderTest.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReaderTest.java index a747119c9..108594ae8 100644 --- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReaderTest.java +++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReaderTest.java @@ -18,27 +18,18 @@ package com.ververica.cdc.connectors.mysql.debezium.reader; -import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.configuration.Configuration; import org.apache.flink.table.api.DataTypes; -import org.apache.flink.table.data.RowData; -import org.apache.flink.table.data.conversion.RowRowConverter; import org.apache.flink.table.types.DataType; -import org.apache.flink.table.types.logical.RowType; -import org.apache.flink.table.types.utils.TypeConversions; -import org.apache.flink.types.Row; -import org.apache.flink.util.Collector; import com.github.shyiko.mysql.binlog.BinaryLogClient; -import com.ververica.cdc.connectors.mysql.MySqlTestBase; import com.ververica.cdc.connectors.mysql.debezium.EmbeddedFlinkDatabaseHistory; import com.ververica.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext; +import com.ververica.cdc.connectors.mysql.source.MySqlParallelSourceTestBase; import com.ververica.cdc.connectors.mysql.source.assigners.MySqlSnapshotSplitAssigner; import com.ververica.cdc.connectors.mysql.source.split.MySqlSplit; -import com.ververica.cdc.connectors.mysql.source.utils.RecordUtils; -import com.ververica.cdc.connectors.mysql.source.utils.UniqueDatabase; -import com.ververica.cdc.debezium.DebeziumDeserializationSchema; -import com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema; +import com.ververica.cdc.connectors.mysql.testutils.RecordsFormatter; +import 
com.ververica.cdc.connectors.mysql.testutils.UniqueDatabase; import io.debezium.connector.mysql.MySqlConnection; import org.apache.kafka.connect.source.SourceRecord; import org.junit.BeforeClass; @@ -58,9 +49,8 @@ import java.util.stream.Collectors; import static org.junit.Assert.assertEquals; /** Tests for {@link SnapshotSplitReader}. */ -public class SnapshotSplitReaderTest extends MySqlTestBase { +public class SnapshotSplitReaderTest extends MySqlParallelSourceTestBase { - private static final int currentParallelism = 4; private static final UniqueDatabase customerDatabase = new UniqueDatabase(MYSQL_CONTAINER, "customer", "mysqluser", "mysqlpw"); @@ -234,36 +224,13 @@ public class SnapshotSplitReaderTest extends MySqlTestBase { } private List formatResult(List records, DataType dataType) { - final RowType rowType = (RowType) dataType.getLogicalType(); - final TypeInformation typeInfo = - (TypeInformation) TypeConversions.fromDataTypeToLegacyInfo(dataType); - final DebeziumDeserializationSchema deserializationSchema = - new RowDataDebeziumDeserializeSchema( - rowType, typeInfo, ((rowData, rowKind) -> {}), ZoneId.of("UTC")); - SimpleCollector collector = new SimpleCollector(); - RowRowConverter rowRowConverter = RowRowConverter.create(dataType); - rowRowConverter.open(Thread.currentThread().getContextClassLoader()); - records.stream() - // filter signal event - .filter(r -> !RecordUtils.isWatermarkEvent(r)) - .forEach( - r -> { - try { - deserializationSchema.deserialize(r, collector); - } catch (Exception e) { - throw new RuntimeException(e); - } - }); - return collector.list.stream() - .map(rowRowConverter::toExternal) - .map(Row::toString) - .sorted() - .collect(Collectors.toList()); + final RecordsFormatter formatter = new RecordsFormatter(dataType); + return formatter.format(records); } private List getMySqlSplits(Configuration configuration) { final MySqlSnapshotSplitAssigner assigner = - new MySqlSnapshotSplitAssigner(configuration, currentParallelism); + new MySqlSnapshotSplitAssigner(configuration, DEFAULT_PARALLELISM); assigner.open(); List mySqlSplitList = new ArrayList<>(); while (true) { @@ -302,19 +269,4 @@ public class SnapshotSplitReaderTest extends MySqlTestBase { properties.put("scan.snapshot.fetch.size", "2"); return Configuration.fromMap(properties); } - - static class SimpleCollector implements Collector { - - private List list = new ArrayList<>(); - - @Override - public void collect(RowData record) { - list.add(record); - } - - @Override - public void close() { - // do nothing - } - } } diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlParallelSourceITCase.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlParallelSourceITCase.java index e2ba8fc6c..c935898b1 100644 --- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlParallelSourceITCase.java +++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlParallelSourceITCase.java @@ -18,11 +18,42 @@ package com.ververica.cdc.connectors.mysql.source; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl; +import org.apache.flink.runtime.minicluster.MiniCluster; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import 
org.apache.flink.table.api.TableResult; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.types.Row; +import org.apache.flink.util.CloseableIterator; + +import com.ververica.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext; +import com.ververica.cdc.connectors.mysql.testutils.UniqueDatabase; +import io.debezium.connector.mysql.MySqlConnection; +import io.debezium.jdbc.JdbcConnection; import org.junit.Test; +import java.sql.SQLException; +import java.time.ZoneId; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Random; + +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.Assert.assertThat; + /** IT tests for {@link MySqlParallelSource}. */ public class MySqlParallelSourceITCase extends MySqlParallelSourceTestBase { + private final UniqueDatabase customDatabase = + new UniqueDatabase(MYSQL_CONTAINER, "customer", "mysqluser", "mysqlpw"); + @Test public void testReadSingleTableWithSingleParallelism() throws Exception { testMySqlParallelSource( @@ -89,4 +120,264 @@ public class MySqlParallelSourceITCase extends MySqlParallelSourceTestBase { testMySqlParallelSource( 1, FailoverType.JM, FailoverPhase.SNAPSHOT, new String[] {"customers"}); } + + private void testMySqlParallelSource( + FailoverType failoverType, FailoverPhase failoverPhase, String[] captureCustomerTables) + throws Exception { + testMySqlParallelSource( + DEFAULT_PARALLELISM, failoverType, failoverPhase, captureCustomerTables); + } + + private void testMySqlParallelSource( + int parallelism, + FailoverType failoverType, + FailoverPhase failoverPhase, + String[] captureCustomerTables) + throws Exception { + customDatabase.createAndInitialize(); + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); + + env.setParallelism(parallelism); + env.enableCheckpointing(200L); + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0)); + String sourceDDL = + String.format( + "CREATE TABLE customers (" + + " id BIGINT NOT NULL," + + " name STRING," + + " address STRING," + + " phone_number STRING," + + " primary key (id) not enforced" + + ") WITH (" + + " 'connector' = 'mysql-cdc'," + + " 'scan.incremental.snapshot.enabled' = 'true'," + + " 'hostname' = '%s'," + + " 'port' = '%s'," + + " 'username' = '%s'," + + " 'password' = '%s'," + + " 'database-name' = '%s'," + + " 'table-name' = '%s'," + + " 'scan.incremental.snapshot.chunk.size' = '100'," + + " 'server-id' = '%s'" + + ")", + MYSQL_CONTAINER.getHost(), + MYSQL_CONTAINER.getDatabasePort(), + customDatabase.getUsername(), + customDatabase.getPassword(), + customDatabase.getDatabaseName(), + getTableName(captureCustomerTables), + getServerId()); + // first step: check the snapshot data + String[] snapshotForSingleTable = + new String[] { + "+I[101, user_1, Shanghai, 123567891234]", + "+I[102, user_2, Shanghai, 123567891234]", + "+I[103, user_3, Shanghai, 123567891234]", + "+I[109, user_4, Shanghai, 123567891234]", + "+I[110, user_5, Shanghai, 123567891234]", + "+I[111, user_6, Shanghai, 123567891234]", + "+I[118, user_7, Shanghai, 123567891234]", + "+I[121, user_8, Shanghai, 123567891234]", + "+I[123, user_9, Shanghai, 123567891234]", + "+I[1009, user_10, Shanghai, 123567891234]", + "+I[1010, user_11, Shanghai, 123567891234]", + "+I[1011, user_12, Shanghai, 
123567891234]", + "+I[1012, user_13, Shanghai, 123567891234]", + "+I[1013, user_14, Shanghai, 123567891234]", + "+I[1014, user_15, Shanghai, 123567891234]", + "+I[1015, user_16, Shanghai, 123567891234]", + "+I[1016, user_17, Shanghai, 123567891234]", + "+I[1017, user_18, Shanghai, 123567891234]", + "+I[1018, user_19, Shanghai, 123567891234]", + "+I[1019, user_20, Shanghai, 123567891234]", + "+I[2000, user_21, Shanghai, 123567891234]" + }; + tEnv.executeSql(sourceDDL); + TableResult tableResult = tEnv.executeSql("select * from customers"); + CloseableIterator iterator = tableResult.collect(); + JobID jobId = tableResult.getJobClient().get().getJobID(); + List expectedSnapshotData = new ArrayList<>(); + for (int i = 0; i < captureCustomerTables.length; i++) { + expectedSnapshotData.addAll(Arrays.asList(snapshotForSingleTable)); + } + + // trigger checkpoint after some snapshot splits read finished + if (failoverPhase == FailoverPhase.SNAPSHOT && iterator.hasNext()) { + triggerFailover( + failoverType, jobId, miniClusterResource.getMiniCluster(), () -> sleepMs(100)); + } + String[] expectedSnapshot = expectedSnapshotData.toArray(new String[0]); + + assertThat( + fetchRows(iterator, expectedSnapshot.length), containsInAnyOrder(expectedSnapshot)); + + // second step: check the binlog data + for (String tableId : captureCustomerTables) { + makeFirstPartBinlogEvents( + getConnection(), customDatabase.getDatabaseName() + '.' + tableId); + } + if (failoverPhase == FailoverPhase.BINLOG) { + triggerFailover( + failoverType, jobId, miniClusterResource.getMiniCluster(), () -> sleepMs(200)); + } + for (String tableId : captureCustomerTables) { + makeSecondPartBinlogEvents( + getConnection(), customDatabase.getDatabaseName() + '.' + tableId); + } + + String[] binlogForSingleTable = + new String[] { + "-U[103, user_3, Shanghai, 123567891234]", + "+U[103, user_3, Hangzhou, 123567891234]", + "-D[102, user_2, Shanghai, 123567891234]", + "+I[102, user_2, Shanghai, 123567891234]", + "-U[103, user_3, Hangzhou, 123567891234]", + "+U[103, user_3, Shanghai, 123567891234]", + "-U[1010, user_11, Shanghai, 123567891234]", + "+U[1010, user_11, Hangzhou, 123567891234]", + "+I[2001, user_22, Shanghai, 123567891234]", + "+I[2002, user_23, Shanghai, 123567891234]", + "+I[2003, user_24, Shanghai, 123567891234]", + }; + List expectedBinlogData = new ArrayList<>(); + for (int i = 0; i < captureCustomerTables.length; i++) { + expectedBinlogData.addAll(Arrays.asList(binlogForSingleTable)); + } + String[] expectedBinlog = expectedBinlogData.toArray(new String[0]); + assertThat(fetchRows(iterator, expectedBinlog.length), containsInAnyOrder(expectedBinlog)); + tableResult.getJobClient().get().cancel().get(); + } + + private static List fetchRows(Iterator iter, int size) { + List rows = new ArrayList<>(size); + while (size > 0 && iter.hasNext()) { + Row row = iter.next(); + rows.add(row.toString()); + size--; + } + return rows; + } + + private String getTableName(String[] captureCustomerTables) { + if (captureCustomerTables.length == 1) { + return "customers"; + } else { + // pattern that matches test table: customers and customers_1 + return "customers.*"; + } + } + + private String getServerId() { + final Random random = new Random(); + int serverId = random.nextInt(100) + 5400; + return serverId + "-" + (serverId + DEFAULT_PARALLELISM); + } + + private void sleepMs(long millis) { + try { + Thread.sleep(millis); + } catch (InterruptedException ignored) { + } + } + + private void makeFirstPartBinlogEvents(JdbcConnection 
connection, String tableId) + throws SQLException { + try { + connection.setAutoCommit(false); + + // make binlog events for the first split + connection.execute( + "UPDATE " + tableId + " SET address = 'Hangzhou' where id = 103", + "DELETE FROM " + tableId + " where id = 102", + "INSERT INTO " + tableId + " VALUES(102, 'user_2','Shanghai','123567891234')", + "UPDATE " + tableId + " SET address = 'Shanghai' where id = 103"); + connection.commit(); + } finally { + connection.close(); + } + } + + private void makeSecondPartBinlogEvents(JdbcConnection connection, String tableId) + throws SQLException { + try { + connection.setAutoCommit(false); + + // make binlog events for split-1 + connection.execute("UPDATE " + tableId + " SET address = 'Hangzhou' where id = 1010"); + connection.commit(); + + // make binlog events for the last split + connection.execute( + "INSERT INTO " + + tableId + + " VALUES(2001, 'user_22','Shanghai','123567891234')," + + " (2002, 'user_23','Shanghai','123567891234')," + + "(2003, 'user_24','Shanghai','123567891234')"); + connection.commit(); + } finally { + connection.close(); + } + } + + private MySqlConnection getConnection() { + Map properties = new HashMap<>(); + properties.put("database.hostname", MYSQL_CONTAINER.getHost()); + properties.put("database.port", String.valueOf(MYSQL_CONTAINER.getDatabasePort())); + properties.put("database.user", customDatabase.getUsername()); + properties.put("database.password", customDatabase.getPassword()); + properties.put("database.serverTimezone", ZoneId.of("UTC").toString()); + Configuration configuration = Configuration.fromMap(properties); + return StatefulTaskContext.getConnection(configuration); + } + + // ------------------------------------------------------------------------ + // test utilities + // ------------------------------------------------------------------------ + + /** The type of failover. */ + private enum FailoverType { + TM, + JM, + NONE + } + + /** The phase of failover. 
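+     * SNAPSHOT and BINLOG inject the failure while the corresponding phase is being read;
+     * NEVER runs the job to completion without an induced failure.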
*/ + private enum FailoverPhase { + SNAPSHOT, + BINLOG, + NEVER + } + + private static void triggerFailover( + FailoverType type, JobID jobId, MiniCluster miniCluster, Runnable afterFailAction) + throws Exception { + switch (type) { + case TM: + restartTaskManager(miniCluster, afterFailAction); + break; + case JM: + triggerJobManagerFailover(jobId, miniCluster, afterFailAction); + break; + case NONE: + break; + default: + throw new IllegalStateException("Unexpected value: " + type); + } + } + + private static void triggerJobManagerFailover( + JobID jobId, MiniCluster miniCluster, Runnable afterFailAction) throws Exception { + final HaLeadershipControl haLeadershipControl = miniCluster.getHaLeadershipControl().get(); + haLeadershipControl.revokeJobMasterLeadership(jobId).get(); + afterFailAction.run(); + haLeadershipControl.grantJobMasterLeadership(jobId).get(); + } + + private static void restartTaskManager(MiniCluster miniCluster, Runnable afterFailAction) + throws Exception { + miniCluster.terminateTaskManager(0).get(); + afterFailAction.run(); + miniCluster.startTaskManager(); + } } diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlParallelSourceTestBase.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlParallelSourceTestBase.java index 6f295d373..9a7073586 100644 --- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlParallelSourceTestBase.java +++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlParallelSourceTestBase.java @@ -18,26 +18,12 @@ package com.ververica.cdc.connectors.mysql.source; -import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.restartstrategy.RestartStrategies; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl; -import org.apache.flink.runtime.minicluster.MiniCluster; import org.apache.flink.runtime.minicluster.RpcServiceSharing; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.table.api.TableResult; -import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.test.util.MiniClusterWithClientResource; -import org.apache.flink.types.Row; -import org.apache.flink.util.CloseableIterator; import org.apache.flink.util.TestLogger; -import com.ververica.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext; -import com.ververica.cdc.connectors.mysql.source.utils.MySqlContainer; -import com.ververica.cdc.connectors.mysql.source.utils.UniqueDatabase; -import io.debezium.connector.mysql.MySqlConnection; -import io.debezium.jdbc.JdbcConnection; +import com.ververica.cdc.connectors.mysql.testutils.MySqlContainer; import org.junit.BeforeClass; import org.junit.Rule; import org.slf4j.Logger; @@ -45,29 +31,18 @@ import org.slf4j.LoggerFactory; import org.testcontainers.containers.output.Slf4jLogConsumer; import org.testcontainers.lifecycle.Startables; -import java.sql.SQLException; -import java.time.ZoneId; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Random; import java.util.stream.Stream; -import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.junit.Assert.assertThat; - /** Basic 
class for testing {@link MySqlParallelSource}. */ public abstract class MySqlParallelSourceTestBase extends TestLogger { private static final Logger LOG = LoggerFactory.getLogger(MySqlParallelSourceTestBase.class); - private static final int PARALLELISM = 4; - private static final MySqlContainer MYSQL_CONTAINER = + + protected static final int DEFAULT_PARALLELISM = 4; + protected static final MySqlContainer MYSQL_CONTAINER = (MySqlContainer) new MySqlContainer() - .withConfigurationOverride("docker/my.cnf") + .withConfigurationOverride("docker/server-gtids/my.cnf") .withSetupSQL("docker/setup.sql") .withDatabaseName("flink-test") .withUsername("flinkuser") @@ -79,277 +54,15 @@ public abstract class MySqlParallelSourceTestBase extends TestLogger { new MiniClusterWithClientResource( new MiniClusterResourceConfiguration.Builder() .setNumberTaskManagers(1) - .setNumberSlotsPerTaskManager(PARALLELISM) + .setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM) .setRpcServiceSharing(RpcServiceSharing.DEDICATED) .withHaLeadershipControl() .build()); - private final UniqueDatabase customDatabase = - new UniqueDatabase(MYSQL_CONTAINER, "customer", "mysqluser", "mysqlpw"); - @BeforeClass public static void startContainers() { LOG.info("Starting containers..."); Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join(); LOG.info("Containers are started."); } - - protected void testMySqlParallelSource( - FailoverType failoverType, FailoverPhase failoverPhase, String[] captureCustomerTables) - throws Exception { - testMySqlParallelSource(PARALLELISM, failoverType, failoverPhase, captureCustomerTables); - } - - protected void testMySqlParallelSource( - int parallelism, - FailoverType failoverType, - FailoverPhase failoverPhase, - String[] captureCustomerTables) - throws Exception { - customDatabase.createAndInitialize(); - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); - - env.setParallelism(parallelism); - env.enableCheckpointing(200L); - env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0)); - String sourceDDL = - String.format( - "CREATE TABLE customers (" - + " id BIGINT NOT NULL," - + " name STRING," - + " address STRING," - + " phone_number STRING," - + " primary key (id) not enforced" - + ") WITH (" - + " 'connector' = 'mysql-cdc'," - + " 'scan.incremental.snapshot.enabled' = 'true'," - + " 'hostname' = '%s'," - + " 'port' = '%s'," - + " 'username' = '%s'," - + " 'password' = '%s'," - + " 'database-name' = '%s'," - + " 'table-name' = '%s'," - + " 'scan.incremental.snapshot.chunk.size' = '100'," - + " 'server-id' = '%s'" - + ")", - MYSQL_CONTAINER.getHost(), - MYSQL_CONTAINER.getDatabasePort(), - customDatabase.getUsername(), - customDatabase.getPassword(), - customDatabase.getDatabaseName(), - getTableName(captureCustomerTables), - getServerId()); - // first step: check the snapshot data - String[] snapshotForSingleTable = - new String[] { - "+I[101, user_1, Shanghai, 123567891234]", - "+I[102, user_2, Shanghai, 123567891234]", - "+I[103, user_3, Shanghai, 123567891234]", - "+I[109, user_4, Shanghai, 123567891234]", - "+I[110, user_5, Shanghai, 123567891234]", - "+I[111, user_6, Shanghai, 123567891234]", - "+I[118, user_7, Shanghai, 123567891234]", - "+I[121, user_8, Shanghai, 123567891234]", - "+I[123, user_9, Shanghai, 123567891234]", - "+I[1009, user_10, Shanghai, 123567891234]", - "+I[1010, user_11, Shanghai, 123567891234]", - "+I[1011, user_12, Shanghai, 123567891234]", - "+I[1012, 
user_13, Shanghai, 123567891234]", - "+I[1013, user_14, Shanghai, 123567891234]", - "+I[1014, user_15, Shanghai, 123567891234]", - "+I[1015, user_16, Shanghai, 123567891234]", - "+I[1016, user_17, Shanghai, 123567891234]", - "+I[1017, user_18, Shanghai, 123567891234]", - "+I[1018, user_19, Shanghai, 123567891234]", - "+I[1019, user_20, Shanghai, 123567891234]", - "+I[2000, user_21, Shanghai, 123567891234]" - }; - tEnv.executeSql(sourceDDL); - TableResult tableResult = tEnv.executeSql("select * from customers"); - CloseableIterator iterator = tableResult.collect(); - JobID jobId = tableResult.getJobClient().get().getJobID(); - List expectedSnapshotData = new ArrayList<>(); - for (int i = 0; i < captureCustomerTables.length; i++) { - expectedSnapshotData.addAll(Arrays.asList(snapshotForSingleTable)); - } - - // trigger checkpoint after some snapshot splits read finished - if (failoverPhase == FailoverPhase.SNAPSHOT && iterator.hasNext()) { - triggerFailover( - failoverType, jobId, miniClusterResource.getMiniCluster(), () -> sleepMs(100)); - } - String[] expectedSnapshot = expectedSnapshotData.toArray(new String[0]); - - assertThat( - fetchRows(iterator, expectedSnapshot.length), containsInAnyOrder(expectedSnapshot)); - - // second step: check the binlog data - for (String tableId : captureCustomerTables) { - makeFirstPartBinlogEvents( - getConnection(), customDatabase.getDatabaseName() + '.' + tableId); - } - if (failoverPhase == FailoverPhase.BINLOG) { - triggerFailover( - failoverType, jobId, miniClusterResource.getMiniCluster(), () -> sleepMs(200)); - } - for (String tableId : captureCustomerTables) { - makeSecondPartBinlogEvents( - getConnection(), customDatabase.getDatabaseName() + '.' + tableId); - } - - String[] binlogForSingleTable = - new String[] { - "-U[103, user_3, Shanghai, 123567891234]", - "+U[103, user_3, Hangzhou, 123567891234]", - "-D[102, user_2, Shanghai, 123567891234]", - "+I[102, user_2, Shanghai, 123567891234]", - "-U[103, user_3, Hangzhou, 123567891234]", - "+U[103, user_3, Shanghai, 123567891234]", - "-U[1010, user_11, Shanghai, 123567891234]", - "+U[1010, user_11, Hangzhou, 123567891234]", - "+I[2001, user_22, Shanghai, 123567891234]", - "+I[2002, user_23, Shanghai, 123567891234]", - "+I[2003, user_24, Shanghai, 123567891234]", - }; - List expectedBinlogData = new ArrayList<>(); - for (int i = 0; i < captureCustomerTables.length; i++) { - expectedBinlogData.addAll(Arrays.asList(binlogForSingleTable)); - } - String[] expectedBinlog = expectedBinlogData.toArray(new String[0]); - assertThat(fetchRows(iterator, expectedBinlog.length), containsInAnyOrder(expectedBinlog)); - tableResult.getJobClient().get().cancel().get(); - } - - private static List fetchRows(Iterator iter, int size) { - List rows = new ArrayList<>(size); - while (size > 0 && iter.hasNext()) { - Row row = iter.next(); - rows.add(row.toString()); - size--; - } - return rows; - } - - private String getTableName(String[] captureCustomerTables) { - if (captureCustomerTables.length == 1) { - return "customers"; - } else { - // pattern that matches test table: customers and customers_1 - return "customers.*"; - } - } - - private String getServerId() { - final Random random = new Random(); - int serverId = random.nextInt(100) + 5400; - return serverId + "-" + (serverId + PARALLELISM); - } - - private void sleepMs(long millis) { - try { - Thread.sleep(millis); - } catch (InterruptedException ignored) { - } - } - - private void makeFirstPartBinlogEvents(JdbcConnection connection, String tableId) - throws 
SQLException { - try { - connection.setAutoCommit(false); - - // make binlog events for the first split - connection.execute( - "UPDATE " + tableId + " SET address = 'Hangzhou' where id = 103", - "DELETE FROM " + tableId + " where id = 102", - "INSERT INTO " + tableId + " VALUES(102, 'user_2','Shanghai','123567891234')", - "UPDATE " + tableId + " SET address = 'Shanghai' where id = 103"); - connection.commit(); - } finally { - connection.close(); - } - } - - private void makeSecondPartBinlogEvents(JdbcConnection connection, String tableId) - throws SQLException { - try { - connection.setAutoCommit(false); - - // make binlog events for split-1 - connection.execute("UPDATE " + tableId + " SET address = 'Hangzhou' where id = 1010"); - connection.commit(); - - // make binlog events for the last split - connection.execute( - "INSERT INTO " - + tableId - + " VALUES(2001, 'user_22','Shanghai','123567891234')," - + " (2002, 'user_23','Shanghai','123567891234')," - + "(2003, 'user_24','Shanghai','123567891234')"); - connection.commit(); - } finally { - connection.close(); - } - } - - private MySqlConnection getConnection() { - Map properties = new HashMap<>(); - properties.put("database.hostname", MYSQL_CONTAINER.getHost()); - properties.put("database.port", String.valueOf(MYSQL_CONTAINER.getDatabasePort())); - properties.put("database.user", customDatabase.getUsername()); - properties.put("database.password", customDatabase.getPassword()); - properties.put("database.serverTimezone", ZoneId.of("UTC").toString()); - Configuration configuration = Configuration.fromMap(properties); - return StatefulTaskContext.getConnection(configuration); - } - - // ------------------------------------------------------------------------ - // test utilities - // ------------------------------------------------------------------------ - - /** The type of failover. */ - protected enum FailoverType { - TM, - JM, - NONE - } - - /** The phase of failover. 
*/ - protected enum FailoverPhase { - SNAPSHOT, - BINLOG, - NEVER - } - - private static void triggerFailover( - FailoverType type, JobID jobId, MiniCluster miniCluster, Runnable afterFailAction) - throws Exception { - switch (type) { - case TM: - restartTaskManager(miniCluster, afterFailAction); - break; - case JM: - triggerJobManagerFailover(jobId, miniCluster, afterFailAction); - break; - case NONE: - break; - default: - throw new IllegalStateException("Unexpected value: " + type); - } - } - - private static void triggerJobManagerFailover( - JobID jobId, MiniCluster miniCluster, Runnable afterFailAction) throws Exception { - final HaLeadershipControl haLeadershipControl = miniCluster.getHaLeadershipControl().get(); - haLeadershipControl.revokeJobMasterLeadership(jobId).get(); - afterFailAction.run(); - haLeadershipControl.grantJobMasterLeadership(jobId).get(); - } - - private static void restartTaskManager(MiniCluster miniCluster, Runnable afterFailAction) - throws Exception { - miniCluster.terminateTaskManager(0).get(); - afterFailAction.run(); - miniCluster.startTaskManager(); - } } diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlHybridSplitAssignerTest.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlHybridSplitAssignerTest.java index 381cb4dc3..22fd785f6 100644 --- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlHybridSplitAssignerTest.java +++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlHybridSplitAssignerTest.java @@ -24,8 +24,8 @@ import org.apache.flink.table.types.logical.RowType; import org.apache.flink.shaded.guava18.com.google.common.collect.Lists; -import com.ververica.cdc.connectors.mysql.MySqlTestBase; import com.ververica.cdc.connectors.mysql.debezium.EmbeddedFlinkDatabaseHistory; +import com.ververica.cdc.connectors.mysql.source.MySqlParallelSourceTestBase; import com.ververica.cdc.connectors.mysql.source.assigners.state.HybridPendingSplitsState; import com.ververica.cdc.connectors.mysql.source.assigners.state.SnapshotPendingSplitsState; import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset; @@ -33,7 +33,7 @@ import com.ververica.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo import com.ververica.cdc.connectors.mysql.source.split.MySqlBinlogSplit; import com.ververica.cdc.connectors.mysql.source.split.MySqlSnapshotSplit; import com.ververica.cdc.connectors.mysql.source.split.MySqlSplit; -import com.ververica.cdc.connectors.mysql.source.utils.UniqueDatabase; +import com.ververica.cdc.connectors.mysql.testutils.UniqueDatabase; import io.debezium.relational.TableId; import io.debezium.relational.history.TableChanges.TableChange; import org.junit.BeforeClass; @@ -47,15 +47,13 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.UUID; import java.util.stream.Collectors; import static org.junit.Assert.assertEquals; /** Tests for {@link MySqlHybridSplitAssigner}. 
 */
-public class MySqlHybridSplitAssignerTest extends MySqlTestBase {
+public class MySqlHybridSplitAssignerTest extends MySqlParallelSourceTestBase {
 
-    private static final int currentParallelism = 4;
     private static final UniqueDatabase customerDatabase =
             new UniqueDatabase(MYSQL_CONTAINER, "customer", "mysqluser", "mysqlpw");
 
@@ -115,7 +113,7 @@ public class MySqlHybridSplitAssignerTest extends MySqlTestBase {
         HybridPendingSplitsState checkpoint =
                 new HybridPendingSplitsState(snapshotPendingSplitsState, false);
         final MySqlHybridSplitAssigner assigner =
-                new MySqlHybridSplitAssigner(configuration, currentParallelism, checkpoint);
+                new MySqlHybridSplitAssigner(configuration, DEFAULT_PARALLELISM, checkpoint);
 
         // step 2. Get the MySqlBinlogSplit after all snapshot splits finished
         Optional<MySqlSplit> binlogSplit = assigner.getNext();
@@ -159,7 +157,6 @@ public class MySqlHybridSplitAssignerTest extends MySqlTestBase {
         properties.put("database.serverTimezone", ZoneId.of("UTC").toString());
         properties.put("snapshot.mode", "initial");
         properties.put("database.history", EmbeddedFlinkDatabaseHistory.class.getCanonicalName());
-        properties.put("database.history.instance.name", UUID.randomUUID().toString());
         return Configuration.fromMap(properties);
     }
 }
diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java
index 6ace49d20..b99e1af11 100644
--- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java
+++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java
@@ -21,10 +21,10 @@ package com.ververica.cdc.connectors.mysql.source.assigners;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.util.ExceptionUtils;
 
-import com.ververica.cdc.connectors.mysql.MySqlTestBase;
 import com.ververica.cdc.connectors.mysql.debezium.EmbeddedFlinkDatabaseHistory;
+import com.ververica.cdc.connectors.mysql.source.MySqlParallelSourceTestBase;
 import com.ververica.cdc.connectors.mysql.source.split.MySqlSplit;
-import com.ververica.cdc.connectors.mysql.source.utils.UniqueDatabase;
+import com.ververica.cdc.connectors.mysql.testutils.UniqueDatabase;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -36,7 +36,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.UUID;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.core.testutils.FlinkMatchers.containsMessage;
@@ -46,9 +45,8 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 /** Tests for {@link MySqlSnapshotSplitAssigner}. */
-public class MySqlSnapshotSplitAssignerTest extends MySqlTestBase {
+public class MySqlSnapshotSplitAssignerTest extends MySqlParallelSourceTestBase {
 
-    private static final int currentParallelism = 4;
     private static final UniqueDatabase customerDatabase =
             new UniqueDatabase(MYSQL_CONTAINER, "customer", "mysqluser", "mysqlpw");
 
@@ -141,7 +139,7 @@ public class MySqlSnapshotSplitAssignerTest extends MySqlTestBase {
         configuration.setString("table.whitelist", String.join(",", captureTableIds));
 
         final MySqlSnapshotSplitAssigner assigner =
-                new MySqlSnapshotSplitAssigner(configuration, currentParallelism);
+                new MySqlSnapshotSplitAssigner(configuration, DEFAULT_PARALLELISM);
         assigner.open();
         List<MySqlSplit> sqlSplits = new ArrayList<>();
@@ -290,7 +288,6 @@ public class MySqlSnapshotSplitAssignerTest extends MySqlTestBase {
         properties.put("database.serverTimezone", ZoneId.of("UTC").toString());
         properties.put("snapshot.mode", "initial");
         properties.put("database.history", EmbeddedFlinkDatabaseHistory.class.getCanonicalName());
-        properties.put("database.history.instance.name", UUID.randomUUID().toString());
         return Configuration.fromMap(properties);
     }
 }
diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/assigners/state/PendingSplitsStateSerializerTest.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/assigners/state/PendingSplitsStateSerializerTest.java
index cb825dcad..758db771d 100644
--- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/assigners/state/PendingSplitsStateSerializerTest.java
+++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/assigners/state/PendingSplitsStateSerializerTest.java
@@ -76,7 +76,7 @@ public class PendingSplitsStateSerializerTest {
         final PendingSplitsStateSerializer serializer =
                 new PendingSplitsStateSerializer(MySqlSplitSerializer.INSTANCE);
         byte[] serialized = serializer.serialize(state);
-        return serializer.deserialize(1, serialized);
+        return serializer.deserialize(2, serialized);
     }
 
     private static SnapshotPendingSplitsState getTestSnapshotPendingSplitsState() {
@@ -128,6 +128,7 @@ public class PendingSplitsStateSerializerTest {
     }
 
     private static MySqlSnapshotSplit getTestSnapshotSplit(TableId tableId, int splitNo) {
+        long restartSkipEvent = splitNo;
         return new MySqlSnapshotSplit(
                 tableId,
                 tableId.toString() + "-" + splitNo,
@@ -135,7 +136,8 @@ public class PendingSplitsStateSerializerTest {
                         Collections.singletonList(new RowType.RowField("id", new BigIntType()))),
                 new Object[] {100L + splitNo * 1000},
                 new Object[] {999L + splitNo * 1000},
-                new BinlogOffset("mysql-bin.000001", 78L + splitNo * 200),
+                new BinlogOffset(
+                        "mysql-bin.000001", 78L + splitNo * 200, restartSkipEvent, 0L, 0L, null, 0),
                 new HashMap<>());
     }
 
diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java
new file mode 100644
index 000000000..94ca6b4e0
--- /dev/null
+++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.connectors.mysql.source.reader; + +import org.apache.flink.api.common.eventtime.Watermark; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.ReaderOutput; +import org.apache.flink.api.connector.source.SourceOutput; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; +import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; +import org.apache.flink.connector.testutils.source.reader.TestingReaderContext; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.types.DataType; +import org.apache.flink.util.Collector; + +import com.ververica.cdc.connectors.mysql.debezium.EmbeddedFlinkDatabaseHistory; +import com.ververica.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext; +import com.ververica.cdc.connectors.mysql.source.MySqlParallelSourceTestBase; +import com.ververica.cdc.connectors.mysql.source.assigners.MySqlBinlogSplitAssigner; +import com.ververica.cdc.connectors.mysql.source.metrics.MySqlSourceReaderMetrics; +import com.ververica.cdc.connectors.mysql.source.split.MySqlSplit; +import com.ververica.cdc.connectors.mysql.testutils.RecordsFormatter; +import com.ververica.cdc.connectors.mysql.testutils.UniqueDatabase; +import com.ververica.cdc.debezium.DebeziumDeserializationSchema; +import io.debezium.jdbc.JdbcConnection; +import io.debezium.relational.TableId; +import org.apache.kafka.connect.source.SourceRecord; +import org.junit.Test; + +import java.sql.SQLException; +import java.time.ZoneId; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.stream.Collectors; + +import static org.junit.Assert.assertEquals; + +/** Tests for {@link MySqlSourceReader}. 
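+ * The failover test below replays a transaction across a reader restart to check that the
+ * binlog split state captured in a snapshot resumes mid-transaction without losing or
+ * duplicating change events.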
+ */
+public class MySqlSourceReaderTest extends MySqlParallelSourceTestBase {
+
+    private final UniqueDatabase customerDatabase =
+            new UniqueDatabase(MYSQL_CONTAINER, "customer", "mysqluser", "mysqlpw");
+
+    @Test
+    public void testBinlogReadFailoverCrossTransaction() throws Exception {
+        customerDatabase.createAndInitialize();
+        final Configuration configuration = getConfig(new String[] {"customers"});
+        final DataType dataType =
+                DataTypes.ROW(
+                        DataTypes.FIELD("id", DataTypes.BIGINT()),
+                        DataTypes.FIELD("name", DataTypes.STRING()),
+                        DataTypes.FIELD("address", DataTypes.STRING()),
+                        DataTypes.FIELD("phone_number", DataTypes.STRING()));
+        MySqlSplit binlogSplit = createBinlogSplit(configuration);
+
+        MySqlSourceReader<SourceRecord> reader = createReader(configuration);
+        reader.start();
+        reader.addSplits(Arrays.asList(binlogSplit));
+
+        // step-1: make 6 change events in one MySQL transaction
+        TableId tableId = binlogSplit.getTableSchemas().keySet().iterator().next();
+        makeBinlogEventsInOneTransaction(configuration, tableId.toString());
+
+        // step-2: fetch the first 2 records that belong to the MySQL transaction
+        String[] expectedRecords =
+                new String[] {
+                    "-U[103, user_3, Shanghai, 123567891234]",
+                    "+U[103, user_3, Hangzhou, 123567891234]"
+                };
+        // the 2 records are produced by 1 operation
+        List<String> actualRecords = consumeRecords(reader, dataType, 1);
+        assertEquals(
+                Arrays.stream(expectedRecords).sorted().collect(Collectors.toList()),
+                actualRecords);
+        List<MySqlSplit> splitsState = reader.snapshotState(1L);
+        // check the binlog split state
+        assertEquals(1, splitsState.size());
+        reader.close();
+
+        // step-3: mock a failover by creating a new reader restored from the snapshot state
+        MySqlSourceReader<SourceRecord> restartReader = createReader(configuration);
+        restartReader.start();
+        restartReader.addSplits(splitsState);
+
+        // step-4: fetch the remaining 4 records that belong to the MySQL transaction
+        String[] expectedRestRecords =
+                new String[] {
+                    "-D[102, user_2, Shanghai, 123567891234]",
+                    "+I[102, user_2, Shanghai, 123567891234]",
+                    "-U[103, user_3, Hangzhou, 123567891234]",
+                    "+U[103, user_3, Shanghai, 123567891234]"
+                };
+        // the 4 records are produced by 3 operations
+        List<String> restRecords = consumeRecords(restartReader, dataType, 3);
+        assertEquals(
+                Arrays.stream(expectedRestRecords).sorted().collect(Collectors.toList()),
+                restRecords);
+        restartReader.close();
+    }
+
+    private MySqlSourceReader<SourceRecord> createReader(Configuration configuration) {
+        final FutureCompletingBlockingQueue<RecordsWithSplitIds<SourceRecord>> elementsQueue =
+                new FutureCompletingBlockingQueue<>();
+        final SourceReaderContext readerContext = new TestingReaderContext();
+        final MySqlRecordEmitter<SourceRecord> recordEmitter =
+                new MySqlRecordEmitter<>(
+                        new ForwardDeserializeSchema(),
+                        new MySqlSourceReaderMetrics(readerContext.metricGroup()));
+        return new MySqlSourceReader<>(
+                elementsQueue,
+                () -> createSplitReader(configuration),
+                recordEmitter,
+                configuration,
+                readerContext);
+    }
+
+    private MySqlSplitReader createSplitReader(Configuration configuration) {
+        return new MySqlSplitReader(configuration, 0);
+    }
+
+    private void makeBinlogEventsInOneTransaction(Configuration configuration, String tableId)
+            throws SQLException {
+        JdbcConnection connection = StatefulTaskContext.getConnection(configuration);
+        // make 6 binlog events by 4 operations
+        connection.setAutoCommit(false);
+        connection.execute(
+                "UPDATE " + tableId + " SET address = 'Hangzhou' where id = 103",
+                "DELETE FROM " + tableId + " where id = 102",
+                "INSERT INTO " + tableId + " VALUES(102, 'user_2','Shanghai','123567891234')",
+                "UPDATE " + tableId + " SET address = 'Shanghai' where id = 103");
+        connection.commit();
+        connection.close();
+    }
+
+    private MySqlSplit createBinlogSplit(Configuration configuration) {
+        MySqlBinlogSplitAssigner binlogSplitAssigner = new MySqlBinlogSplitAssigner(configuration);
+        binlogSplitAssigner.open();
+        return binlogSplitAssigner.getNext().get();
+    }
+
+    private Configuration getConfig(String[] captureTables) {
+        Map<String, String> properties = new HashMap<>();
+        properties.put("database.server.name", "embedded-test");
+        properties.put("database.hostname", MYSQL_CONTAINER.getHost());
+        properties.put("database.port", String.valueOf(MYSQL_CONTAINER.getDatabasePort()));
+        properties.put("database.user", customerDatabase.getUsername());
+        properties.put("database.password", customerDatabase.getPassword());
+        properties.put("database.whitelist", customerDatabase.getDatabaseName());
+        properties.put("database.history.skip.unparseable.ddl", "true");
+        properties.put("database.serverTimezone", ZoneId.of("UTC").toString());
+        properties.put("snapshot.mode", "initial");
+        properties.put("database.history", EmbeddedFlinkDatabaseHistory.class.getCanonicalName());
+        properties.put("database.history.instance.name", UUID.randomUUID().toString());
+        List<String> captureTableIds =
+                Arrays.stream(captureTables)
+                        .map(tableName -> customerDatabase.getDatabaseName() + "." + tableName)
+                        .collect(Collectors.toList());
+        properties.put("table.whitelist", String.join(",", captureTableIds));
+        properties.put("scan.incremental.snapshot.chunk.size", "10");
+        properties.put("scan.snapshot.fetch.size", "2");
+        properties.put("scan.startup.mode", "latest-offset");
+
+        return Configuration.fromMap(properties);
+    }
+
+    private List<String> consumeRecords(
+            MySqlSourceReader<SourceRecord> sourceReader, DataType recordType, int changeEventNum)
+            throws Exception {
+        // Poll the raw change records of the single split until the expected number is reached.
+        final SimpleReaderOutput output = new SimpleReaderOutput();
+        while (output.getResults().size() < changeEventNum) {
+            sourceReader.pollNext(output);
+        }
+        final RecordsFormatter formatter = new RecordsFormatter(recordType);
+        return formatter.format(output.getResults());
+    }
+
+    // ------------------------------------------------------------------------
+    //  test utilities
+    // ------------------------------------------------------------------------
+    private static class SimpleReaderOutput implements ReaderOutput<SourceRecord> {
+
+        private final List<SourceRecord> results = new ArrayList<>();
+
+        @Override
+        public void collect(SourceRecord record) {
+            results.add(record);
+        }
+
+        public List<SourceRecord> getResults() {
+            return results;
+        }
+
+        @Override
+        public void collect(SourceRecord record, long timestamp) {
+            collect(record);
+        }
+
+        @Override
+        public void emitWatermark(Watermark watermark) {}
+
+        @Override
+        public void markIdle() {}
+
+        @Override
+        public SourceOutput<SourceRecord> createOutputForSplit(java.lang.String splitId) {
+            return this;
+        }
+
+        @Override
+        public void releaseOutputForSplit(java.lang.String splitId) {}
+    }
+
+    private static class ForwardDeserializeSchema
+            implements DebeziumDeserializationSchema<SourceRecord> {
+
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public void deserialize(SourceRecord record, Collector<SourceRecord> out) throws Exception {
+            out.collect(record);
+        }
+
+        @Override
+        public TypeInformation<SourceRecord> getProducedType() {
+            return TypeInformation.of(SourceRecord.class);
+        }
+    }
+}
diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/split/MySqlSplitSerializerTest.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/split/MySqlSplitSerializerTest.java
index 8a24eee4b..b94dc70e8 100644
--- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/split/MySqlSplitSerializerTest.java
+++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/split/MySqlSplitSerializerTest.java
@@ -127,7 +127,7 @@
     private MySqlSplit serializeAndDeserializeSplit(MySqlSplit split) throws Exception {
         final MySqlSplitSerializer sqlSplitSerializer = new MySqlSplitSerializer();
         byte[] serialized = sqlSplitSerializer.serialize(split);
-        return sqlSplitSerializer.deserializeV1(serialized);
+        return sqlSplitSerializer.deserialize(sqlSplitSerializer.getVersion(), serialized);
     }
 
     public static TableChange getTestTableSchema() throws Exception {
diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlConnectorITCase.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlConnectorITCase.java
index 99cdf8394..8923ba1b2 100644
--- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlConnectorITCase.java
+++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlConnectorITCase.java
@@ -27,8 +27,8 @@ import org.apache.flink.table.planner.factories.TestValuesTableFactory;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.CloseableIterator;
 
-import com.ververica.cdc.connectors.mysql.MySqlTestBase;
-import com.ververica.cdc.connectors.mysql.source.utils.UniqueDatabase;
+import com.ververica.cdc.connectors.mysql.source.MySqlParallelSourceTestBase;
+import com.ververica.cdc.connectors.mysql.testutils.UniqueDatabase;
 import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
@@ -49,7 +49,7 @@ import
static org.junit.Assert.assertThat; /** Integration tests for MySQL binlog SQL source. */ @RunWith(Parameterized.class) -public class MySqlConnectorITCase extends MySqlTestBase { +public class MySqlConnectorITCase extends MySqlParallelSourceTestBase { private static final String TEST_USER = "mysqluser"; private static final String TEST_PASSWORD = "mysqlpw"; @@ -95,7 +95,7 @@ public class MySqlConnectorITCase extends MySqlTestBase { public void before() { TestValuesTableFactory.clearAllData(); if (incrementalSnapshot) { - env.setParallelism(4); + env.setParallelism(DEFAULT_PARALLELISM); env.enableCheckpointing(200); } else { env.setParallelism(1); diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MysqlTimezoneITCase.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MysqlTimezoneITCase.java index 6ed60703a..d042bef56 100644 --- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MysqlTimezoneITCase.java +++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MysqlTimezoneITCase.java @@ -26,8 +26,8 @@ import org.apache.flink.types.Row; import org.apache.flink.util.CloseableIterator; import com.ververica.cdc.connectors.mysql.MySqlValidatorTest; -import com.ververica.cdc.connectors.mysql.source.utils.MySqlContainer; -import com.ververica.cdc.connectors.mysql.source.utils.UniqueDatabase; +import com.ververica.cdc.connectors.mysql.testutils.MySqlContainer; +import com.ververica.cdc.connectors.mysql.testutils.UniqueDatabase; import org.junit.Before; import org.junit.Test; import org.junit.rules.TemporaryFolder; diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/utils/MySqlContainer.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/testutils/MySqlContainer.java similarity index 98% rename from flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/utils/MySqlContainer.java rename to flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/testutils/MySqlContainer.java index 4d31dfaa4..549533662 100644 --- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/utils/MySqlContainer.java +++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/testutils/MySqlContainer.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package com.ververica.cdc.connectors.mysql.source.utils; +package com.ververica.cdc.connectors.mysql.testutils; import org.testcontainers.containers.ContainerLaunchException; import org.testcontainers.containers.JdbcDatabaseContainer; diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/testutils/RecordsFormatter.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/testutils/RecordsFormatter.java new file mode 100644 index 000000000..58094ece5 --- /dev/null +++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/testutils/RecordsFormatter.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ververica.cdc.connectors.mysql.testutils;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.conversion.RowRowConverter;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.utils.TypeConversions;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Collector;
+
+import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
+import com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema;
+import org.apache.kafka.connect.source.SourceRecord;
+
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.isSchemaChangeEvent;
+import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.isWatermarkEvent;
+
+/** Formatter that formats {@link org.apache.kafka.connect.source.SourceRecord}s to Strings. */
+public class RecordsFormatter {
+
+    private final DataType dataType;
+    private final ZoneId zoneId;
+
+    private TypeInformation<RowData> typeInfo;
+    private DebeziumDeserializationSchema<RowData> deserializationSchema;
+    private SimpleCollector collector;
+    private RowRowConverter rowRowConverter;
+
+    public RecordsFormatter(DataType dataType) {
+        this(dataType, ZoneId.of("UTC"));
+    }
+
+    public RecordsFormatter(DataType dataType, ZoneId zoneId) {
+        this.dataType = dataType;
+        this.zoneId = zoneId;
+        this.typeInfo =
+                (TypeInformation<RowData>) TypeConversions.fromDataTypeToLegacyInfo(dataType);
+        this.deserializationSchema =
+                new RowDataDebeziumDeserializeSchema(
+                        (RowType) dataType.getLogicalType(),
+                        typeInfo,
+                        ((rowData, rowKind) -> {}),
+                        zoneId);
+        this.collector = new SimpleCollector();
+        this.rowRowConverter = RowRowConverter.create(dataType);
+        rowRowConverter.open(Thread.currentThread().getContextClassLoader());
+    }
+
+    public List<String> format(List<SourceRecord> records) {
+        records.stream()
+                // filter signal event
+                .filter(r -> !isWatermarkEvent(r))
+                // filter schema change event
+                .filter(r -> !isSchemaChangeEvent(r))
+                .forEach(
+                        r -> {
+                            try {
+                                deserializationSchema.deserialize(r, collector);
+                            } catch (Exception e) {
+                                throw new RuntimeException(e);
+                            }
+                        });
+        return collector.list.stream()
+                .map(rowRowConverter::toExternal)
+                .map(Row::toString)
+                .sorted()
+                .collect(Collectors.toList());
+    }
+
+    private static class SimpleCollector implements Collector<RowData> {
+
+        private final List<RowData> list = new ArrayList<>();
+
+        @Override
+        public void collect(RowData record) {
+            list.add(record);
+        }
+
+        @Override
+        public void close() {
+            // do nothing
+        }
+    }
+}
diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/utils/UniqueDatabase.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/testutils/UniqueDatabase.java
similarity index 99%
rename from flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/utils/UniqueDatabase.java
rename to
flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/testutils/UniqueDatabase.java index 836d56d83..1666d1847 100644 --- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/utils/UniqueDatabase.java +++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/testutils/UniqueDatabase.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package com.ververica.cdc.connectors.mysql.source.utils; +package com.ververica.cdc.connectors.mysql.testutils; import java.net.URL; import java.nio.file.Files; diff --git a/flink-connector-mysql-cdc/src/test/resources/docker/server-gtids/my.cnf b/flink-connector-mysql-cdc/src/test/resources/docker/server-gtids/my.cnf new file mode 100644 index 000000000..87a492c49 --- /dev/null +++ b/flink-connector-mysql-cdc/src/test/resources/docker/server-gtids/my.cnf @@ -0,0 +1,63 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# For advice on how to change settings please see +# http://dev.mysql.com/doc/refman/5.7/en/server-configuration-defaults.html + +[mysqld] +# +# Remove leading # and set to the amount of RAM for the most important data +# cache in MySQL. Start at 70% of total RAM for dedicated server, else 10%. +# innodb_buffer_pool_size = 128M +# +# Remove leading # to turn on a very important data integrity option: logging +# changes to the binary log between backups. +# log_bin +# +# Remove leading # to set options mainly useful for reporting servers. +# The server defaults are faster for transactions and fast SELECTs. +# Adjust sizes as needed, experiment to find the optimal values. +# join_buffer_size = 128M +# sort_buffer_size = 2M +# read_rnd_buffer_size = 2M +skip-host-cache +skip-name-resolve +#datadir=/var/lib/mysql +#socket=/var/lib/mysql/mysql.sock +#secure-file-priv=/var/lib/mysql-files +user=mysql + +# Disabling symbolic-links is recommended to prevent assorted security risks +symbolic-links=0 + +#log-error=/var/log/mysqld.log +#pid-file=/var/run/mysqld/mysqld.pid + +# ---------------------------------------------- +# Enable the binlog for replication & CDC +# ---------------------------------------------- + +# Enable binary replication log and set the prefix, expiration, and log format. +# The prefix is arbitrary, expiration can be short for integration tests but would +# be longer on a production system. Row-level info is required for ingest to work. 
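+# GTID mode is enabled at the end of this file so that connector tests can resume
+# reads from a GTID set rather than from a binlog file/position pair alone.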
+# Server ID is required, but this will vary on production systems +server-id = 223344 +log_bin = mysql-bin +expire_logs_days = 1 +binlog_format = row + +# enable gtid mode +gtid_mode = on +enforce_gtid_consistency = on \ No newline at end of file diff --git a/flink-connector-mysql-cdc/src/test/resources/docker/my.cnf b/flink-connector-mysql-cdc/src/test/resources/docker/server/my.cnf similarity index 99% rename from flink-connector-mysql-cdc/src/test/resources/docker/my.cnf rename to flink-connector-mysql-cdc/src/test/resources/docker/server/my.cnf index bc0c99179..f3b8d1fe8 100644 --- a/flink-connector-mysql-cdc/src/test/resources/docker/my.cnf +++ b/flink-connector-mysql-cdc/src/test/resources/docker/server/my.cnf @@ -57,5 +57,3 @@ server-id = 223344 log_bin = mysql-bin expire_logs_days = 1 binlog_format = row - -