[mysql] Original implementation of mysql heartbeat feature

pull/921/head
camelusluo 3 years ago committed by Leonard Xu
parent dfca2417a8
commit 3cb6a815dc

@ -375,6 +375,14 @@ After the server you monitored fails in MySQL cluster, you only need to change t
It's recommended to configure a DNS(Domain Name Service) or VIP(Virtual IP Address) for your MySQL cluster, using the DNS or VIP address for ```mysql-cdc``` connector, the DNS or VIP would automatically route the network request to the active MySQL server. In this way, you don't need to modify the address and restart your pipeline anymore. It's recommended to configure a DNS(Domain Name Service) or VIP(Virtual IP Address) for your MySQL cluster, using the DNS or VIP address for ```mysql-cdc``` connector, the DNS or VIP would automatically route the network request to the active MySQL server. In this way, you don't need to modify the address and restart your pipeline anymore.
#### MySQL Heartbeat Event Support
If the table updates infrequently, the binlog file or GTID set may have been cleaned in its last committed binlog position.
The CDC job restart fails in this case. So the heartbeat event will update binlog position. It's recommended to configure `debezium.heartbeat.interval.ms`.
```
debezium.heartbeat.interval.ms=3000
```
#### How Incremental Snapshot Reading works #### How Incremental Snapshot Reading works
When the MySQL CDC source is started, it reads snapshot of table parallelly and then reads binlog of table with single parallelism. When the MySQL CDC source is started, it reads snapshot of table parallelly and then reads binlog of table with single parallelism.

@ -159,7 +159,8 @@ public class MySqlSource<T>
new MySqlRecordEmitter<>( new MySqlRecordEmitter<>(
deserializationSchema, deserializationSchema,
sourceReaderMetrics, sourceReaderMetrics,
sourceConfig.isIncludeSchemaChanges()), sourceConfig.isIncludeSchemaChanges(),
sourceConfig.isHeartbeatEvent()),
readerContext.getConfiguration(), readerContext.getConfiguration(),
mySqlSourceReaderContext, mySqlSourceReaderContext,
sourceConfig); sourceConfig);

@ -29,6 +29,7 @@ import javax.annotation.Nullable;
import java.io.Serializable; import java.io.Serializable;
import java.time.Duration; import java.time.Duration;
import java.util.List; import java.util.List;
import java.util.Optional;
import java.util.Properties; import java.util.Properties;
import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkNotNull;
@ -201,4 +202,8 @@ public class MySqlSourceConfig implements Serializable {
public RelationalTableFilters getTableFilters() { public RelationalTableFilters getTableFilters() {
return dbzMySqlConfig.getTableFilters(); return dbzMySqlConfig.getTableFilters();
} }
public boolean isHeartbeatEvent() {
return Optional.ofNullable(dbzConfiguration.getString("heartbeat.interval.ms")).isPresent();
}
} }

@ -40,6 +40,7 @@ import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getHis
import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getMessageTimestamp; import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getMessageTimestamp;
import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getWatermark; import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getWatermark;
import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.isDataChangeRecord; import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.isDataChangeRecord;
import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.isHeartbeatEvent;
import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.isHighWatermarkEvent; import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.isHighWatermarkEvent;
import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.isSchemaChangeEvent; import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.isSchemaChangeEvent;
import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.isWatermarkEvent; import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.isWatermarkEvent;
@ -61,14 +62,17 @@ public final class MySqlRecordEmitter<T>
private final MySqlSourceReaderMetrics sourceReaderMetrics; private final MySqlSourceReaderMetrics sourceReaderMetrics;
private final boolean includeSchemaChanges; private final boolean includeSchemaChanges;
private final OutputCollector<T> outputCollector; private final OutputCollector<T> outputCollector;
private final boolean heartbeatEvent;
public MySqlRecordEmitter( public MySqlRecordEmitter(
DebeziumDeserializationSchema<T> debeziumDeserializationSchema, DebeziumDeserializationSchema<T> debeziumDeserializationSchema,
MySqlSourceReaderMetrics sourceReaderMetrics, MySqlSourceReaderMetrics sourceReaderMetrics,
boolean includeSchemaChanges) { boolean includeSchemaChanges,
boolean heartbeatEvent) {
this.debeziumDeserializationSchema = debeziumDeserializationSchema; this.debeziumDeserializationSchema = debeziumDeserializationSchema;
this.sourceReaderMetrics = sourceReaderMetrics; this.sourceReaderMetrics = sourceReaderMetrics;
this.includeSchemaChanges = includeSchemaChanges; this.includeSchemaChanges = includeSchemaChanges;
this.heartbeatEvent = heartbeatEvent;
this.outputCollector = new OutputCollector<>(); this.outputCollector = new OutputCollector<>();
} }
@ -100,6 +104,11 @@ public final class MySqlRecordEmitter<T>
} }
reportMetrics(element); reportMetrics(element);
emitElement(element, output); emitElement(element, output);
} else if (heartbeatEvent && isHeartbeatEvent(element)) {
if (splitState.isBinlogSplitState()) {
BinlogOffset position = getBinlogPosition(element);
splitState.asBinlogSplitState().setStartingOffset(position);
}
} else { } else {
// unknown element // unknown element
LOG.info("Meet unknown element {}, just skip.", element); LOG.info("Meet unknown element {}, just skip.", element);

@ -62,6 +62,8 @@ public class RecordUtils {
public static final String SCHEMA_CHANGE_EVENT_KEY_NAME = public static final String SCHEMA_CHANGE_EVENT_KEY_NAME =
"io.debezium.connector.mysql.SchemaChangeKey"; "io.debezium.connector.mysql.SchemaChangeKey";
public static final String SCHEMA_HEARTBEAT_EVENT_KEY_NAME =
"io.debezium.connector.common.Heartbeat";
private static final DocumentReader DOCUMENT_READER = DocumentReader.defaultReader(); private static final DocumentReader DOCUMENT_READER = DocumentReader.defaultReader();
/** Converts a {@link ResultSet} row to an array of Objects. */ /** Converts a {@link ResultSet} row to an array of Objects. */
@ -298,6 +300,15 @@ public class RecordUtils {
return false; return false;
} }
public static boolean isHeartbeatEvent(SourceRecord record) {
Schema keySchema = record.keySchema();
if (keySchema != null
&& SCHEMA_HEARTBEAT_EVENT_KEY_NAME.equalsIgnoreCase(keySchema.name())) {
return true;
}
return false;
}
/** /**
* Return the finished snapshot split information. * Return the finished snapshot split information.
* *

@ -258,7 +258,8 @@ public class MySqlSourceReaderTest extends MySqlSourceTestBase {
new MySqlRecordEmitter<>( new MySqlRecordEmitter<>(
new ForwardDeserializeSchema(), new ForwardDeserializeSchema(),
new MySqlSourceReaderMetrics(metricGroup), new MySqlSourceReaderMetrics(metricGroup),
configuration.isIncludeSchemaChanges()); configuration.isIncludeSchemaChanges(),
configuration.isHeartbeatEvent());
final MySqlSourceReaderContext mySqlSourceReaderContext = final MySqlSourceReaderContext mySqlSourceReaderContext =
new MySqlSourceReaderContext(readerContext); new MySqlSourceReaderContext(readerContext);
return new MySqlSourceReader<>( return new MySqlSourceReader<>(

Loading…
Cancel
Save