diff --git a/README.md b/README.md
index 4d7365a99..1737c5b48 100644
--- a/README.md
+++ b/README.md
@@ -59,7 +59,7 @@ Include following Maven dependency (available through Maven Central):
com.alibaba.ververica
flink-connector-mysql-cdc
- 1.0.0
+ 1.2.0
```
diff --git a/flink-connector-debezium/src/main/java/com/alibaba/ververica/cdc/debezium/internal/FlinkDatabaseHistory.java b/flink-connector-debezium/src/main/java/com/alibaba/ververica/cdc/debezium/internal/FlinkDatabaseHistory.java
index 102359d09..5738fb0c9 100644
--- a/flink-connector-debezium/src/main/java/com/alibaba/ververica/cdc/debezium/internal/FlinkDatabaseHistory.java
+++ b/flink-connector-debezium/src/main/java/com/alibaba/ververica/cdc/debezium/internal/FlinkDatabaseHistory.java
@@ -18,8 +18,6 @@
package com.alibaba.ververica.cdc.debezium.internal;
-import org.apache.flink.runtime.state.FunctionSnapshotContext;
-
import io.debezium.config.Configuration;
import io.debezium.relational.history.AbstractDatabaseHistory;
import io.debezium.relational.history.DatabaseHistory;
@@ -58,7 +56,7 @@ public class FlinkDatabaseHistory extends AbstractDatabaseHistory {
* because we only monitor the schema changes for one single table.
*
* @see
- * com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction#snapshotState(FunctionSnapshotContext)
+ * com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction#snapshotState(org.apache.flink.runtime.state.FunctionSnapshotContext)
*/
public static final Map> ALL_RECORDS =
new HashMap<>();
diff --git a/flink-connector-mysql-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/mysql/table/MySQLTableSourceFactory.java b/flink-connector-mysql-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/mysql/table/MySQLTableSourceFactory.java
index 86fbfe2a7..a330a7fce 100644
--- a/flink-connector-mysql-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/mysql/table/MySQLTableSourceFactory.java
+++ b/flink-connector-mysql-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/mysql/table/MySQLTableSourceFactory.java
@@ -100,7 +100,7 @@ public class MySQLTableSourceFactory implements DynamicTableSourceFactory {
.stringType()
.defaultValue("initial")
.withDescription(
- "Optional startup mode for Kafka consumer, valid enumerations are "
+ "Optional startup mode for MySQL CDC consumer, valid enumerations are "
+ "\"initial\", \"earliest-offset\", \"latest-offset\", \"timestamp\"\n"
+ "or \"specific-offset\"");
@@ -125,18 +125,6 @@ public class MySQLTableSourceFactory implements DynamicTableSourceFactory {
.withDescription(
"Optional timestamp used in case of \"timestamp\" startup mode");
- private static final ConfigOption SOURCE_OFFSET_FILE =
- ConfigOptions.key("source-offset-file")
- .stringType()
- .noDefaultValue()
- .withDescription("File Name of the MySQL binlog.");
-
- private static final ConfigOption SOURCE_OFFSET_POSITION =
- ConfigOptions.key("source-offset-pos")
- .intType()
- .noDefaultValue()
- .withDescription("Position of the MySQL binlog.");
-
@Override
public DynamicTableSource createDynamicTableSource(Context context) {
final FactoryUtil.TableFactoryHelper helper =
diff --git a/flink-connector-postgres-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/postgres/HeartbeatEventFilter.java b/flink-connector-postgres-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/postgres/HeartbeatEventFilter.java
deleted file mode 100644
index f8ad44963..000000000
--- a/flink-connector-postgres-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/postgres/HeartbeatEventFilter.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.ververica.cdc.connectors.postgres;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.util.Collector;
-
-import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
-import io.debezium.heartbeat.Heartbeat;
-import org.apache.kafka.connect.source.SourceRecord;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * A {@link DebeziumDeserializationSchema} which wraps a real {@link DebeziumDeserializationSchema}
- * to drop heartbeat events.
- *
- * @see Heartbeat
- */
-public class HeartbeatEventFilter implements DebeziumDeserializationSchema {
- private static final long serialVersionUID = -4450118969976653497L;
-
- private final String heartbeatTopicPrefix;
- private final DebeziumDeserializationSchema serializer;
-
- public HeartbeatEventFilter(
- String heartbeatTopicPrefix, DebeziumDeserializationSchema serializer) {
- this.heartbeatTopicPrefix = checkNotNull(heartbeatTopicPrefix);
- this.serializer = checkNotNull(serializer);
- }
-
- @Override
- public void deserialize(SourceRecord record, Collector out) throws Exception {
- String topic = record.topic();
- if (topic != null && topic.startsWith(heartbeatTopicPrefix)) {
- // drop heartbeat events
- return;
- }
- serializer.deserialize(record, out);
- }
-
- @Override
- public TypeInformation getProducedType() {
- return serializer.getProducedType();
- }
-}