[mysql] Minor code improvement

pull/334/head
Jark Wu 4 years ago
parent 8925d943bf
commit 116cea690d
No known key found for this signature in database
GPG Key ID: 85BACB5AEFAE3202

@ -59,7 +59,7 @@ Include following Maven dependency (available through Maven Central):
<groupId>com.alibaba.ververica</groupId> <groupId>com.alibaba.ververica</groupId>
<!-- add the dependency matching your database --> <!-- add the dependency matching your database -->
<artifactId>flink-connector-mysql-cdc</artifactId> <artifactId>flink-connector-mysql-cdc</artifactId>
<version>1.0.0</version> <version>1.2.0</version>
</dependency> </dependency>
``` ```

@ -18,8 +18,6 @@
package com.alibaba.ververica.cdc.debezium.internal; package com.alibaba.ververica.cdc.debezium.internal;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import io.debezium.config.Configuration; import io.debezium.config.Configuration;
import io.debezium.relational.history.AbstractDatabaseHistory; import io.debezium.relational.history.AbstractDatabaseHistory;
import io.debezium.relational.history.DatabaseHistory; import io.debezium.relational.history.DatabaseHistory;
@ -58,7 +56,7 @@ public class FlinkDatabaseHistory extends AbstractDatabaseHistory {
* because we only monitor the schema changes for one single table. * because we only monitor the schema changes for one single table.
* *
* @see * @see
* com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction#snapshotState(FunctionSnapshotContext) * com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction#snapshotState(org.apache.flink.runtime.state.FunctionSnapshotContext)
*/ */
public static final Map<String, ConcurrentLinkedQueue<HistoryRecord>> ALL_RECORDS = public static final Map<String, ConcurrentLinkedQueue<HistoryRecord>> ALL_RECORDS =
new HashMap<>(); new HashMap<>();

@ -100,7 +100,7 @@ public class MySQLTableSourceFactory implements DynamicTableSourceFactory {
.stringType() .stringType()
.defaultValue("initial") .defaultValue("initial")
.withDescription( .withDescription(
"Optional startup mode for Kafka consumer, valid enumerations are " "Optional startup mode for MySQL CDC consumer, valid enumerations are "
+ "\"initial\", \"earliest-offset\", \"latest-offset\", \"timestamp\"\n" + "\"initial\", \"earliest-offset\", \"latest-offset\", \"timestamp\"\n"
+ "or \"specific-offset\""); + "or \"specific-offset\"");
@ -125,18 +125,6 @@ public class MySQLTableSourceFactory implements DynamicTableSourceFactory {
.withDescription( .withDescription(
"Optional timestamp used in case of \"timestamp\" startup mode"); "Optional timestamp used in case of \"timestamp\" startup mode");
private static final ConfigOption<String> SOURCE_OFFSET_FILE =
ConfigOptions.key("source-offset-file")
.stringType()
.noDefaultValue()
.withDescription("File Name of the MySQL binlog.");
private static final ConfigOption<Integer> SOURCE_OFFSET_POSITION =
ConfigOptions.key("source-offset-pos")
.intType()
.noDefaultValue()
.withDescription("Position of the MySQL binlog.");
@Override @Override
public DynamicTableSource createDynamicTableSource(Context context) { public DynamicTableSource createDynamicTableSource(Context context) {
final FactoryUtil.TableFactoryHelper helper = final FactoryUtil.TableFactoryHelper helper =

@ -1,62 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.ververica.cdc.connectors.postgres;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.heartbeat.Heartbeat;
import org.apache.kafka.connect.source.SourceRecord;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* A {@link DebeziumDeserializationSchema} which wraps a real {@link DebeziumDeserializationSchema}
* to drop heartbeat events.
*
* @see Heartbeat
*/
public class HeartbeatEventFilter<T> implements DebeziumDeserializationSchema<T> {
private static final long serialVersionUID = -4450118969976653497L;
private final String heartbeatTopicPrefix;
private final DebeziumDeserializationSchema<T> serializer;
public HeartbeatEventFilter(
String heartbeatTopicPrefix, DebeziumDeserializationSchema<T> serializer) {
this.heartbeatTopicPrefix = checkNotNull(heartbeatTopicPrefix);
this.serializer = checkNotNull(serializer);
}
@Override
public void deserialize(SourceRecord record, Collector<T> out) throws Exception {
String topic = record.topic();
if (topic != null && topic.startsWith(heartbeatTopicPrefix)) {
// drop heartbeat events
return;
}
serializer.deserialize(record, out);
}
@Override
public TypeInformation<T> getProducedType() {
return serializer.getProducedType();
}
}
Loading…
Cancel
Save