diff --git a/flink-connector-debezium/src/main/java/com/alibaba/ververica/cdc/debezium/internal/FlinkDatabaseHistory.java b/flink-connector-debezium/src/main/java/com/alibaba/ververica/cdc/debezium/internal/FlinkDatabaseHistory.java index 0b47a638e..1b9fe6cfa 100644 --- a/flink-connector-debezium/src/main/java/com/alibaba/ververica/cdc/debezium/internal/FlinkDatabaseHistory.java +++ b/flink-connector-debezium/src/main/java/com/alibaba/ververica/cdc/debezium/internal/FlinkDatabaseHistory.java @@ -64,6 +64,7 @@ public class FlinkDatabaseHistory extends AbstractDatabaseHistory { private ConcurrentLinkedQueue records; private String instanceName; + private boolean databaseexists; /** * Registers the given HistoryRecords into global variable under the given instance name, @@ -104,6 +105,7 @@ public class FlinkDatabaseHistory extends AbstractDatabaseHistory { throw new IllegalStateException( String.format("Couldn't find engine instance %s in the global records.", instanceName)); } + this.databaseexists = config.getBoolean("database.history.exists", true); } @Override @@ -129,7 +131,7 @@ public class FlinkDatabaseHistory extends AbstractDatabaseHistory { @Override public boolean exists() { - return true; + return databaseexists; } @Override diff --git a/flink-connector-mysql-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/mysql/MySQLSource.java b/flink-connector-mysql-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/mysql/MySQLSource.java index 2405492b0..30f0ce37c 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/mysql/MySQLSource.java +++ b/flink-connector-mysql-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/mysql/MySQLSource.java @@ -18,10 +18,17 @@ package com.alibaba.ververica.cdc.connectors.mysql; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; + import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema; import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction; +import com.alibaba.ververica.cdc.debezium.internal.DebeziumState; +import com.alibaba.ververica.cdc.debezium.internal.FlinkOffsetBackingStore; import io.debezium.connector.mysql.MySqlConnector; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; import java.util.Properties; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -49,6 +56,8 @@ public class MySQLSource { private String serverTimeZone; private String[] tableList; private Properties dbzProperties; + private String sourceOffsetFile; + private Integer sourceOffsetPosition; private DebeziumDeserializationSchema deserializer; public Builder hostname(String hostname) { @@ -138,6 +147,22 @@ public class MySQLSource { return this; } + /** + * Sets the MySql source offset file name. + */ + public Builder sourceOffsetFile(String sourceOffsetFile) { + this.sourceOffsetFile = sourceOffsetFile; + return this; + } + + /** + * Sets the MySql source offset position. + */ + public Builder sourceOffsetPosition(Integer sourceOffsetPosition) { + this.sourceOffsetPosition = sourceOffsetPosition; + return this; + } + public DebeziumSourceFunction build() { Properties props = new Properties(); props.setProperty("connector.class", MySqlConnector.class.getCanonicalName()); @@ -165,6 +190,30 @@ public class MySQLSource { if (serverTimeZone != null) { props.setProperty("database.serverTimezone", serverTimeZone); } + if (sourceOffsetFile != null && sourceOffsetPosition != null) { + // if binlog offset is specified, 'snapshot.mode=schema_only_recovery' must be configured + props.setProperty("snapshot.mode", "schema_only_recovery"); + + DebeziumState debeziumState = new DebeziumState(); + Map sourcePartition = new HashMap<>(); + sourcePartition.put("server", props.getProperty("database.server.name")); + debeziumState.setSourcePartition(sourcePartition); + + Map sourceOffset = new HashMap<>(); + sourceOffset.put("file", sourceOffsetFile); + sourceOffset.put("pos", sourceOffsetPosition); + debeziumState.setSourceOffset(sourceOffset); + + try { + ObjectMapper objectMapper = new ObjectMapper(); + String offsetJson = objectMapper.writeValueAsString(debeziumState); + // if the task is restored from savepoint, it will be overwritten by restoredOffsetState + props.setProperty(FlinkOffsetBackingStore.OFFSET_STATE_VALUE, offsetJson); + props.setProperty("database.history.exists", "false"); + } catch (IOException e) { + throw new RuntimeException("Can't serialize debezium offset state from Object: " + debeziumState, e); + } + } if (dbzProperties != null) { dbzProperties.forEach(props::put); diff --git a/flink-connector-mysql-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/mysql/options/MySQLOffsetOptions.java b/flink-connector-mysql-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/mysql/options/MySQLOffsetOptions.java new file mode 100644 index 000000000..b8c6aa4bb --- /dev/null +++ b/flink-connector-mysql-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/mysql/options/MySQLOffsetOptions.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.ververica.cdc.connectors.mysql.options; + +import javax.annotation.Nullable; + +import java.util.Objects; + +/** + * Offset option for MySql. + */ +public class MySQLOffsetOptions { + + @Nullable + private final String sourceOffsetFile; + @Nullable + private final Integer sourceOffsetPosition; + + private MySQLOffsetOptions(@Nullable String sourceOffsetFile, @Nullable Integer sourceOffsetPosition) { + this.sourceOffsetFile = sourceOffsetFile; + this.sourceOffsetPosition = sourceOffsetPosition; + } + + @Nullable + public String getSourceOffsetFile() { + return sourceOffsetFile; + } + + @Nullable + public Integer getSourceOffsetPosition() { + return sourceOffsetPosition; + } + + @Override + public String toString() { + return "MySQLOffsetOptions{" + + "sourceOffsetFile='" + sourceOffsetFile + '\'' + + ", sourceOffsetPosition='" + sourceOffsetPosition + '\'' + + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + MySQLOffsetOptions that = (MySQLOffsetOptions) o; + return Objects.equals(sourceOffsetFile, that.sourceOffsetFile) && + Objects.equals(sourceOffsetPosition, that.sourceOffsetPosition); + } + + @Override + public int hashCode() { + return Objects.hash(sourceOffsetFile, sourceOffsetPosition); + } + + /** + * Creates a builder of {@link MySQLOffsetOptions}. + */ + public static Builder builder() { + return new Builder(); + } + + /** + * Builder for {@link MySQLOffsetOptions}. + */ + public static class Builder { + + private String sourceOffsetFile; + private Integer sourceOffsetPosition; + + /** + * Sets the MySql source offset file name. + */ + public Builder sourceOffsetFile(String sourceOffsetFile) { + this.sourceOffsetFile = sourceOffsetFile; + return this; + } + + /** + * Sets the MySql source offset position. + */ + public Builder sourceOffsetPosition(Integer sourceOffsetPosition) { + this.sourceOffsetPosition = sourceOffsetPosition; + return this; + } + + /** + * Creates an instance of {@link MySQLOffsetOptions}. + */ + public MySQLOffsetOptions build() { + return new MySQLOffsetOptions(sourceOffsetFile, sourceOffsetPosition); + } + } +} diff --git a/flink-connector-mysql-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/mysql/table/MySQLTableSource.java b/flink-connector-mysql-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/mysql/table/MySQLTableSource.java index c2468fa38..a8abf5442 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/mysql/table/MySQLTableSource.java +++ b/flink-connector-mysql-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/mysql/table/MySQLTableSource.java @@ -29,6 +29,7 @@ import org.apache.flink.table.types.logical.RowType; import org.apache.flink.types.RowKind; import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource; +import com.alibaba.ververica.cdc.connectors.mysql.options.MySQLOffsetOptions; import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema; import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction; import com.alibaba.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema; @@ -58,6 +59,7 @@ public class MySQLTableSource implements ScanTableSource { private final String tableName; private final ZoneId serverTimeZone; private final Properties dbzProperties; + private final MySQLOffsetOptions offsetOptions; public MySQLTableSource( TableSchema physicalSchema, @@ -69,7 +71,8 @@ public class MySQLTableSource implements ScanTableSource { String password, ZoneId serverTimeZone, Properties dbzProperties, - @Nullable Integer serverId) { + @Nullable Integer serverId, + MySQLOffsetOptions offsetOptions) { this.physicalSchema = physicalSchema; this.port = port; this.hostname = checkNotNull(hostname); @@ -80,6 +83,7 @@ public class MySQLTableSource implements ScanTableSource { this.serverId = serverId; this.serverTimeZone = serverTimeZone; this.dbzProperties = dbzProperties; + this.offsetOptions = offsetOptions; } @Override @@ -111,7 +115,9 @@ public class MySQLTableSource implements ScanTableSource { .password(password) .serverTimeZone(serverTimeZone.toString()) .debeziumProperties(dbzProperties) - .deserializer(deserializer); + .deserializer(deserializer) + .sourceOffsetFile(offsetOptions.getSourceOffsetFile()) + .sourceOffsetPosition(offsetOptions.getSourceOffsetPosition()); Optional.ofNullable(serverId).ifPresent(builder::serverId); DebeziumSourceFunction sourceFunction = builder.build(); @@ -121,16 +127,17 @@ public class MySQLTableSource implements ScanTableSource { @Override public DynamicTableSource copy() { return new MySQLTableSource( - physicalSchema, - port, - hostname, - database, - tableName, - username, - password, - serverTimeZone, - dbzProperties, - serverId + physicalSchema, + port, + hostname, + database, + tableName, + username, + password, + serverTimeZone, + dbzProperties, + serverId, + offsetOptions ); } @@ -152,12 +159,13 @@ public class MySQLTableSource implements ScanTableSource { Objects.equals(serverId, that.serverId) && Objects.equals(tableName, that.tableName) && Objects.equals(serverTimeZone, that.serverTimeZone) && - Objects.equals(dbzProperties, that.dbzProperties); + Objects.equals(dbzProperties, that.dbzProperties) && + Objects.equals(offsetOptions, that.offsetOptions); } @Override public int hashCode() { - return Objects.hash(physicalSchema, port, hostname, database, username, password, serverId, tableName, serverTimeZone, dbzProperties); + return Objects.hash(physicalSchema, port, hostname, database, username, password, serverId, tableName, serverTimeZone, dbzProperties, offsetOptions); } @Override diff --git a/flink-connector-mysql-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/mysql/table/MySQLTableSourceFactory.java b/flink-connector-mysql-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/mysql/table/MySQLTableSourceFactory.java index 5b1654b59..07629862f 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/mysql/table/MySQLTableSourceFactory.java +++ b/flink-connector-mysql-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/mysql/table/MySQLTableSourceFactory.java @@ -27,6 +27,7 @@ import org.apache.flink.table.factories.DynamicTableSourceFactory; import org.apache.flink.table.factories.FactoryUtil; import org.apache.flink.table.utils.TableSchemaUtils; +import com.alibaba.ververica.cdc.connectors.mysql.options.MySQLOffsetOptions; import com.alibaba.ververica.cdc.debezium.table.DebeziumOptions; import java.time.ZoneId; @@ -85,6 +86,16 @@ public class MySQLTableSourceFactory implements DynamicTableSourceFactory { "MySQL database cluster as another server (with this unique ID) so it can read the binlog. " + "By default, a random number is generated between 5400 and 6400, though we recommend setting an explicit value."); + private static final ConfigOption SOURCE_OFFSET_FILE = ConfigOptions.key("source-offset-file") + .stringType() + .noDefaultValue() + .withDescription("File Name of the MySQL binlog."); + + private static final ConfigOption SOURCE_OFFSET_POSITION = ConfigOptions.key("source-offset-pos") + .intType() + .noDefaultValue() + .withDescription("Position of the MySQL binlog."); + @Override public DynamicTableSource createDynamicTableSource(Context context) { final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context); @@ -100,6 +111,9 @@ public class MySQLTableSourceFactory implements DynamicTableSourceFactory { Integer serverId = config.getOptional(SERVER_ID).orElse(null); ZoneId serverTimeZone = ZoneId.of(config.get(SERVER_TIME_ZONE)); TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema()); + MySQLOffsetOptions.Builder builder = MySQLOffsetOptions.builder(); + builder.sourceOffsetFile(config.get(SOURCE_OFFSET_FILE)) + .sourceOffsetPosition(config.getOptional(SOURCE_OFFSET_POSITION).orElse(null)); return new MySQLTableSource( physicalSchema, @@ -111,7 +125,8 @@ public class MySQLTableSourceFactory implements DynamicTableSourceFactory { password, serverTimeZone, getDebeziumProperties(context.getCatalogTable().getOptions()), - serverId + serverId, + builder.build() ); } @@ -137,6 +152,8 @@ public class MySQLTableSourceFactory implements DynamicTableSourceFactory { options.add(PORT); options.add(SERVER_TIME_ZONE); options.add(SERVER_ID); + options.add(SOURCE_OFFSET_FILE); + options.add(SOURCE_OFFSET_POSITION); return options; } } diff --git a/flink-connector-mysql-cdc/src/test/java/com/alibaba/ververica/cdc/connectors/mysql/table/MySQLTableSourceFactoryTest.java b/flink-connector-mysql-cdc/src/test/java/com/alibaba/ververica/cdc/connectors/mysql/table/MySQLTableSourceFactoryTest.java index cd4ebbaf6..fbf7d16d7 100644 --- a/flink-connector-mysql-cdc/src/test/java/com/alibaba/ververica/cdc/connectors/mysql/table/MySQLTableSourceFactoryTest.java +++ b/flink-connector-mysql-cdc/src/test/java/com/alibaba/ververica/cdc/connectors/mysql/table/MySQLTableSourceFactoryTest.java @@ -30,6 +30,7 @@ import org.apache.flink.table.factories.FactoryUtil; import org.apache.flink.table.utils.TableSchemaUtils; import org.apache.flink.util.ExceptionUtils; +import com.alibaba.ververica.cdc.connectors.mysql.options.MySQLOffsetOptions; import org.junit.Test; import java.time.ZoneId; @@ -77,7 +78,8 @@ public class MySQLTableSourceFactoryTest { MY_PASSWORD, ZoneId.of("UTC"), PROPERTIES, - null + null, + MySQLOffsetOptions.builder().build() ); assertEquals(expectedSource, actualSource); } @@ -103,7 +105,8 @@ public class MySQLTableSourceFactoryTest { MY_PASSWORD, ZoneId.of("Asia/Shanghai"), dbzProperties, - 4321 + 4321, + MySQLOffsetOptions.builder().build() ); assertEquals(expectedSource, actualSource); }