From 116cea690db4fc78814792c2383814f7e237e114 Mon Sep 17 00:00:00 2001 From: Jark Wu Date: Sat, 27 Feb 2021 22:53:14 +0800 Subject: [PATCH] [mysql] Minor code improvement --- README.md | 2 +- .../internal/FlinkDatabaseHistory.java | 4 +- .../mysql/table/MySQLTableSourceFactory.java | 14 +---- .../postgres/HeartbeatEventFilter.java | 62 ------------------- 4 files changed, 3 insertions(+), 79 deletions(-) delete mode 100644 flink-connector-postgres-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/postgres/HeartbeatEventFilter.java diff --git a/README.md b/README.md index 4d7365a99..1737c5b48 100644 --- a/README.md +++ b/README.md @@ -59,7 +59,7 @@ Include following Maven dependency (available through Maven Central): com.alibaba.ververica flink-connector-mysql-cdc - 1.0.0 + 1.2.0 ``` diff --git a/flink-connector-debezium/src/main/java/com/alibaba/ververica/cdc/debezium/internal/FlinkDatabaseHistory.java b/flink-connector-debezium/src/main/java/com/alibaba/ververica/cdc/debezium/internal/FlinkDatabaseHistory.java index 102359d09..5738fb0c9 100644 --- a/flink-connector-debezium/src/main/java/com/alibaba/ververica/cdc/debezium/internal/FlinkDatabaseHistory.java +++ b/flink-connector-debezium/src/main/java/com/alibaba/ververica/cdc/debezium/internal/FlinkDatabaseHistory.java @@ -18,8 +18,6 @@ package com.alibaba.ververica.cdc.debezium.internal; -import org.apache.flink.runtime.state.FunctionSnapshotContext; - import io.debezium.config.Configuration; import io.debezium.relational.history.AbstractDatabaseHistory; import io.debezium.relational.history.DatabaseHistory; @@ -58,7 +56,7 @@ public class FlinkDatabaseHistory extends AbstractDatabaseHistory { * because we only monitor the schema changes for one single table. * * @see - * com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction#snapshotState(FunctionSnapshotContext) + * com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction#snapshotState(org.apache.flink.runtime.state.FunctionSnapshotContext) */ public static final Map> ALL_RECORDS = new HashMap<>(); diff --git a/flink-connector-mysql-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/mysql/table/MySQLTableSourceFactory.java b/flink-connector-mysql-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/mysql/table/MySQLTableSourceFactory.java index 86fbfe2a7..a330a7fce 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/mysql/table/MySQLTableSourceFactory.java +++ b/flink-connector-mysql-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/mysql/table/MySQLTableSourceFactory.java @@ -100,7 +100,7 @@ public class MySQLTableSourceFactory implements DynamicTableSourceFactory { .stringType() .defaultValue("initial") .withDescription( - "Optional startup mode for Kafka consumer, valid enumerations are " + "Optional startup mode for MySQL CDC consumer, valid enumerations are " + "\"initial\", \"earliest-offset\", \"latest-offset\", \"timestamp\"\n" + "or \"specific-offset\""); @@ -125,18 +125,6 @@ public class MySQLTableSourceFactory implements DynamicTableSourceFactory { .withDescription( "Optional timestamp used in case of \"timestamp\" startup mode"); - private static final ConfigOption SOURCE_OFFSET_FILE = - ConfigOptions.key("source-offset-file") - .stringType() - .noDefaultValue() - .withDescription("File Name of the MySQL binlog."); - - private static final ConfigOption SOURCE_OFFSET_POSITION = - ConfigOptions.key("source-offset-pos") - .intType() - .noDefaultValue() - .withDescription("Position of the MySQL binlog."); - @Override public DynamicTableSource createDynamicTableSource(Context context) { final FactoryUtil.TableFactoryHelper helper = diff --git a/flink-connector-postgres-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/postgres/HeartbeatEventFilter.java b/flink-connector-postgres-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/postgres/HeartbeatEventFilter.java deleted file mode 100644 index f8ad44963..000000000 --- a/flink-connector-postgres-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/postgres/HeartbeatEventFilter.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.ververica.cdc.connectors.postgres; - -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.util.Collector; - -import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema; -import io.debezium.heartbeat.Heartbeat; -import org.apache.kafka.connect.source.SourceRecord; - -import static org.apache.flink.util.Preconditions.checkNotNull; - -/** - * A {@link DebeziumDeserializationSchema} which wraps a real {@link DebeziumDeserializationSchema} - * to drop heartbeat events. - * - * @see Heartbeat - */ -public class HeartbeatEventFilter implements DebeziumDeserializationSchema { - private static final long serialVersionUID = -4450118969976653497L; - - private final String heartbeatTopicPrefix; - private final DebeziumDeserializationSchema serializer; - - public HeartbeatEventFilter( - String heartbeatTopicPrefix, DebeziumDeserializationSchema serializer) { - this.heartbeatTopicPrefix = checkNotNull(heartbeatTopicPrefix); - this.serializer = checkNotNull(serializer); - } - - @Override - public void deserialize(SourceRecord record, Collector out) throws Exception { - String topic = record.topic(); - if (topic != null && topic.startsWith(heartbeatTopicPrefix)) { - // drop heartbeat events - return; - } - serializer.deserialize(record, out); - } - - @Override - public TypeInformation getProducedType() { - return serializer.getProducedType(); - } -}