[mysql] Support read from specific MySQL Binlog position

release-1.2
shizhengchao 4 years ago committed by Jark Wu
parent 799b5d36af
commit befb4adbd9
No known key found for this signature in database
GPG Key ID: 85BACB5AEFAE3202

@ -64,6 +64,7 @@ public class FlinkDatabaseHistory extends AbstractDatabaseHistory {
private ConcurrentLinkedQueue<HistoryRecord> records;
private String instanceName;
private boolean databaseexists;
/**
* Registers the given HistoryRecords into global variable under the given instance name,
@ -104,6 +105,7 @@ public class FlinkDatabaseHistory extends AbstractDatabaseHistory {
throw new IllegalStateException(
String.format("Couldn't find engine instance %s in the global records.", instanceName));
}
this.databaseexists = config.getBoolean("database.history.exists", true);
}
@Override
@ -129,7 +131,7 @@ public class FlinkDatabaseHistory extends AbstractDatabaseHistory {
@Override
public boolean exists() {
return true;
return databaseexists;
}
@Override

@ -18,10 +18,17 @@
package com.alibaba.ververica.cdc.connectors.mysql;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.alibaba.ververica.cdc.debezium.internal.DebeziumState;
import com.alibaba.ververica.cdc.debezium.internal.FlinkOffsetBackingStore;
import io.debezium.connector.mysql.MySqlConnector;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import static org.apache.flink.util.Preconditions.checkNotNull;
@ -49,6 +56,8 @@ public class MySQLSource {
private String serverTimeZone;
private String[] tableList;
private Properties dbzProperties;
private String sourceOffsetFile;
private Integer sourceOffsetPosition;
private DebeziumDeserializationSchema<T> deserializer;
public Builder<T> hostname(String hostname) {
@ -138,6 +147,22 @@ public class MySQLSource {
return this;
}
/**
* Sets the MySql source offset file name.
*/
public Builder<T> sourceOffsetFile(String sourceOffsetFile) {
this.sourceOffsetFile = sourceOffsetFile;
return this;
}
/**
* Sets the MySql source offset position.
*/
public Builder<T> sourceOffsetPosition(Integer sourceOffsetPosition) {
this.sourceOffsetPosition = sourceOffsetPosition;
return this;
}
public DebeziumSourceFunction<T> build() {
Properties props = new Properties();
props.setProperty("connector.class", MySqlConnector.class.getCanonicalName());
@ -165,6 +190,30 @@ public class MySQLSource {
if (serverTimeZone != null) {
props.setProperty("database.serverTimezone", serverTimeZone);
}
if (sourceOffsetFile != null && sourceOffsetPosition != null) {
// if binlog offset is specified, 'snapshot.mode=schema_only_recovery' must be configured
props.setProperty("snapshot.mode", "schema_only_recovery");
DebeziumState debeziumState = new DebeziumState();
Map<String, String> sourcePartition = new HashMap<>();
sourcePartition.put("server", props.getProperty("database.server.name"));
debeziumState.setSourcePartition(sourcePartition);
Map<String, Object> sourceOffset = new HashMap<>();
sourceOffset.put("file", sourceOffsetFile);
sourceOffset.put("pos", sourceOffsetPosition);
debeziumState.setSourceOffset(sourceOffset);
try {
ObjectMapper objectMapper = new ObjectMapper();
String offsetJson = objectMapper.writeValueAsString(debeziumState);
// if the task is restored from savepoint, it will be overwritten by restoredOffsetState
props.setProperty(FlinkOffsetBackingStore.OFFSET_STATE_VALUE, offsetJson);
props.setProperty("database.history.exists", "false");
} catch (IOException e) {
throw new RuntimeException("Can't serialize debezium offset state from Object: " + debeziumState, e);
}
}
if (dbzProperties != null) {
dbzProperties.forEach(props::put);

@ -0,0 +1,114 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.ververica.cdc.connectors.mysql.options;
import javax.annotation.Nullable;
import java.util.Objects;
/**
* Offset option for MySql.
*/
public class MySQLOffsetOptions {
@Nullable
private final String sourceOffsetFile;
@Nullable
private final Integer sourceOffsetPosition;
private MySQLOffsetOptions(@Nullable String sourceOffsetFile, @Nullable Integer sourceOffsetPosition) {
this.sourceOffsetFile = sourceOffsetFile;
this.sourceOffsetPosition = sourceOffsetPosition;
}
@Nullable
public String getSourceOffsetFile() {
return sourceOffsetFile;
}
@Nullable
public Integer getSourceOffsetPosition() {
return sourceOffsetPosition;
}
@Override
public String toString() {
return "MySQLOffsetOptions{" +
"sourceOffsetFile='" + sourceOffsetFile + '\'' +
", sourceOffsetPosition='" + sourceOffsetPosition + '\'' +
'}';
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
MySQLOffsetOptions that = (MySQLOffsetOptions) o;
return Objects.equals(sourceOffsetFile, that.sourceOffsetFile) &&
Objects.equals(sourceOffsetPosition, that.sourceOffsetPosition);
}
@Override
public int hashCode() {
return Objects.hash(sourceOffsetFile, sourceOffsetPosition);
}
/**
* Creates a builder of {@link MySQLOffsetOptions}.
*/
public static Builder builder() {
return new Builder();
}
/**
* Builder for {@link MySQLOffsetOptions}.
*/
public static class Builder {
private String sourceOffsetFile;
private Integer sourceOffsetPosition;
/**
* Sets the MySql source offset file name.
*/
public Builder sourceOffsetFile(String sourceOffsetFile) {
this.sourceOffsetFile = sourceOffsetFile;
return this;
}
/**
* Sets the MySql source offset position.
*/
public Builder sourceOffsetPosition(Integer sourceOffsetPosition) {
this.sourceOffsetPosition = sourceOffsetPosition;
return this;
}
/**
* Creates an instance of {@link MySQLOffsetOptions}.
*/
public MySQLOffsetOptions build() {
return new MySQLOffsetOptions(sourceOffsetFile, sourceOffsetPosition);
}
}
}

@ -29,6 +29,7 @@ import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.options.MySQLOffsetOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.alibaba.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema;
@ -58,6 +59,7 @@ public class MySQLTableSource implements ScanTableSource {
private final String tableName;
private final ZoneId serverTimeZone;
private final Properties dbzProperties;
private final MySQLOffsetOptions offsetOptions;
public MySQLTableSource(
TableSchema physicalSchema,
@ -69,7 +71,8 @@ public class MySQLTableSource implements ScanTableSource {
String password,
ZoneId serverTimeZone,
Properties dbzProperties,
@Nullable Integer serverId) {
@Nullable Integer serverId,
MySQLOffsetOptions offsetOptions) {
this.physicalSchema = physicalSchema;
this.port = port;
this.hostname = checkNotNull(hostname);
@ -80,6 +83,7 @@ public class MySQLTableSource implements ScanTableSource {
this.serverId = serverId;
this.serverTimeZone = serverTimeZone;
this.dbzProperties = dbzProperties;
this.offsetOptions = offsetOptions;
}
@Override
@ -111,7 +115,9 @@ public class MySQLTableSource implements ScanTableSource {
.password(password)
.serverTimeZone(serverTimeZone.toString())
.debeziumProperties(dbzProperties)
.deserializer(deserializer);
.deserializer(deserializer)
.sourceOffsetFile(offsetOptions.getSourceOffsetFile())
.sourceOffsetPosition(offsetOptions.getSourceOffsetPosition());
Optional.ofNullable(serverId).ifPresent(builder::serverId);
DebeziumSourceFunction<RowData> sourceFunction = builder.build();
@ -121,16 +127,17 @@ public class MySQLTableSource implements ScanTableSource {
@Override
public DynamicTableSource copy() {
return new MySQLTableSource(
physicalSchema,
port,
hostname,
database,
tableName,
username,
password,
serverTimeZone,
dbzProperties,
serverId
physicalSchema,
port,
hostname,
database,
tableName,
username,
password,
serverTimeZone,
dbzProperties,
serverId,
offsetOptions
);
}
@ -152,12 +159,13 @@ public class MySQLTableSource implements ScanTableSource {
Objects.equals(serverId, that.serverId) &&
Objects.equals(tableName, that.tableName) &&
Objects.equals(serverTimeZone, that.serverTimeZone) &&
Objects.equals(dbzProperties, that.dbzProperties);
Objects.equals(dbzProperties, that.dbzProperties) &&
Objects.equals(offsetOptions, that.offsetOptions);
}
@Override
public int hashCode() {
return Objects.hash(physicalSchema, port, hostname, database, username, password, serverId, tableName, serverTimeZone, dbzProperties);
return Objects.hash(physicalSchema, port, hostname, database, username, password, serverId, tableName, serverTimeZone, dbzProperties, offsetOptions);
}
@Override

@ -27,6 +27,7 @@ import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.utils.TableSchemaUtils;
import com.alibaba.ververica.cdc.connectors.mysql.options.MySQLOffsetOptions;
import com.alibaba.ververica.cdc.debezium.table.DebeziumOptions;
import java.time.ZoneId;
@ -85,6 +86,16 @@ public class MySQLTableSourceFactory implements DynamicTableSourceFactory {
"MySQL database cluster as another server (with this unique ID) so it can read the binlog. " +
"By default, a random number is generated between 5400 and 6400, though we recommend setting an explicit value.");
private static final ConfigOption<String> SOURCE_OFFSET_FILE = ConfigOptions.key("source-offset-file")
.stringType()
.noDefaultValue()
.withDescription("File Name of the MySQL binlog.");
private static final ConfigOption<Integer> SOURCE_OFFSET_POSITION = ConfigOptions.key("source-offset-pos")
.intType()
.noDefaultValue()
.withDescription("Position of the MySQL binlog.");
@Override
public DynamicTableSource createDynamicTableSource(Context context) {
final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
@ -100,6 +111,9 @@ public class MySQLTableSourceFactory implements DynamicTableSourceFactory {
Integer serverId = config.getOptional(SERVER_ID).orElse(null);
ZoneId serverTimeZone = ZoneId.of(config.get(SERVER_TIME_ZONE));
TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
MySQLOffsetOptions.Builder builder = MySQLOffsetOptions.builder();
builder.sourceOffsetFile(config.get(SOURCE_OFFSET_FILE))
.sourceOffsetPosition(config.getOptional(SOURCE_OFFSET_POSITION).orElse(null));
return new MySQLTableSource(
physicalSchema,
@ -111,7 +125,8 @@ public class MySQLTableSourceFactory implements DynamicTableSourceFactory {
password,
serverTimeZone,
getDebeziumProperties(context.getCatalogTable().getOptions()),
serverId
serverId,
builder.build()
);
}
@ -137,6 +152,8 @@ public class MySQLTableSourceFactory implements DynamicTableSourceFactory {
options.add(PORT);
options.add(SERVER_TIME_ZONE);
options.add(SERVER_ID);
options.add(SOURCE_OFFSET_FILE);
options.add(SOURCE_OFFSET_POSITION);
return options;
}
}

@ -30,6 +30,7 @@ import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.utils.TableSchemaUtils;
import org.apache.flink.util.ExceptionUtils;
import com.alibaba.ververica.cdc.connectors.mysql.options.MySQLOffsetOptions;
import org.junit.Test;
import java.time.ZoneId;
@ -77,7 +78,8 @@ public class MySQLTableSourceFactoryTest {
MY_PASSWORD,
ZoneId.of("UTC"),
PROPERTIES,
null
null,
MySQLOffsetOptions.builder().build()
);
assertEquals(expectedSource, actualSource);
}
@ -103,7 +105,8 @@ public class MySQLTableSourceFactoryTest {
MY_PASSWORD,
ZoneId.of("Asia/Shanghai"),
dbzProperties,
4321
4321,
MySQLOffsetOptions.builder().build()
);
assertEquals(expectedSource, actualSource);
}

Loading…
Cancel
Save