[mysql] Support read binlog from specific timestamp

release-1.2
Jark Wu 4 years ago
parent 5fcfe8ca05
commit 5264e80a7b
No known key found for this signature in database
GPG Key ID: 85BACB5AEFAE3202

@ -141,14 +141,6 @@ under the License.
<!-- tests will have log4j as the default logging framework available -->
<!-- Logging API -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>

@ -212,6 +212,14 @@ public class MySQLSource {
specificOffset.setSourceOffset(sourceOffset);
break;
case TIMESTAMP:
checkNotNull(deserializer);
props.setProperty("snapshot.mode", "never");
deserializer = new SeekBinlogToTimestampFilter<>(
startupOptions.startupTimestampMillis,
deserializer);
break;
default:
throw new UnsupportedOperationException();
}

@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.ververica.cdc.connectors.mysql;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A {@link DebeziumDeserializationSchema} which wraps a real {@link DebeziumDeserializationSchema}
* to seek binlog to the specific timestamp.
*/
public class SeekBinlogToTimestampFilter<T> implements DebeziumDeserializationSchema<T> {
private static final long serialVersionUID = -4450118969976653497L;
protected static final Logger LOG = LoggerFactory.getLogger(SeekBinlogToTimestampFilter.class);
private final long startupTimestampMillis;
private final DebeziumDeserializationSchema<T> serializer;
private transient boolean find = false;
private transient long filtered = 0L;
public SeekBinlogToTimestampFilter(long startupTimestampMillis, DebeziumDeserializationSchema<T> serializer) {
this.startupTimestampMillis = startupTimestampMillis;
this.serializer = serializer;
}
@Override
public void deserialize(SourceRecord record, Collector<T> out) throws Exception {
if (find) {
serializer.deserialize(record, out);
return;
}
if (filtered == 0) {
LOG.info("Begin to seek binlog to the specific timestamp {}.", startupTimestampMillis);
}
Struct value = (Struct) record.value();
Struct source = value.getStruct(Envelope.FieldName.SOURCE);
Long ts = source.getInt64(Envelope.FieldName.TIMESTAMP);
if (ts != null && ts >= startupTimestampMillis) {
serializer.deserialize(record, out);
find = true;
LOG.info("Successfully seek to the specific timestamp {} with filtered {} change events.",
startupTimestampMillis,
filtered);
} else {
filtered++;
if (filtered % 10000 == 0) {
LOG.info("Seeking binlog to specific timestamp with filtered {} change events.", filtered);
}
}
}
@Override
public TypeInformation<T> getProducedType() {
return serializer.getProducedType();
}
}

@ -474,6 +474,66 @@ public class MySQLConnectorITCase extends MySQLTestBase {
result.getJobClient().get().cancel().get();
}
@Test
public void testStartupFromTimestamp() throws Exception {
inventoryDatabase.createAndInitialize();
String sourceDDL = String.format(
"CREATE TABLE debezium_source (" +
" id INT NOT NULL," +
" name STRING," +
" description STRING," +
" weight DECIMAL(10,3)" +
") WITH (" +
" 'connector' = 'mysql-cdc'," +
" 'hostname' = '%s'," +
" 'port' = '%s'," +
" 'username' = '%s'," +
" 'password' = '%s'," +
" 'database-name' = '%s'," +
" 'table-name' = '%s'," +
" 'scan.startup.mode' = 'timestamp'," +
" 'scan.startup.timestamp-millis' = '%s'" +
")",
MYSQL_CONTAINER.getHost(),
MYSQL_CONTAINER.getDatabasePort(),
inventoryDatabase.getUsername(),
inventoryDatabase.getPassword(),
inventoryDatabase.getDatabaseName(),
"products",
System.currentTimeMillis());
String sinkDDL = "CREATE TABLE sink " +
" WITH (" +
" 'connector' = 'values'," +
" 'sink-insert-only' = 'false'" +
") LIKE debezium_source (EXCLUDING OPTIONS)";
tEnv.executeSql(sourceDDL);
tEnv.executeSql(sinkDDL);
// async submit job
TableResult result = tEnv.executeSql("INSERT INTO sink SELECT * FROM debezium_source");
// wait for the source startup, we don't have a better way to wait it, use sleep for now
Thread.sleep(5000L);
try (Connection connection = inventoryDatabase.getJdbcConnection();
Statement statement = connection.createStatement()) {
statement.execute("INSERT INTO products VALUES (default,'jacket','water resistent white wind breaker',0.2);"); // 110
statement.execute("INSERT INTO products VALUES (default,'scooter','Big 2-wheel scooter ',5.18);");
statement.execute("UPDATE products SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;");
statement.execute("UPDATE products SET weight='5.17' WHERE id=111;");
statement.execute("DELETE FROM products WHERE id=111;");
}
waitForSinkSize("sink", 7);
String[] expected = new String[]{"110,jacket,new water resistent white wind breaker,0.500"};
List<String> actual = TestValuesTableFactory.getResults("sink");
assertThat(actual, containsInAnyOrder(expected));
result.getJobClient().get().cancel().get();
}
// ------------------------------------------------------------------------------------
private static void waitForSnapshotStarted(String sinkName) throws InterruptedException {

Loading…
Cancel
Save