[mysql] Support MySQL source function which can consume snapshot and continue binlog
parent
c99f00d30b
commit
1286a9920c
@ -0,0 +1,68 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.ververica.cdc.debezium.utils;
|
||||
|
||||
import org.apache.flink.streaming.api.functions.source.SourceFunction;
|
||||
import org.apache.flink.streaming.api.watermark.Watermark;
|
||||
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
|
||||
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
|
||||
/**
|
||||
* A testable {@link SourceFunction.SourceContext}.
|
||||
*/
|
||||
public class TestSourceContext<T> implements SourceFunction.SourceContext<T> {
|
||||
|
||||
private final Object checkpointLock = new Object();
|
||||
|
||||
private LinkedBlockingQueue<StreamRecord<T>> collectedOutputs = new LinkedBlockingQueue<>();
|
||||
|
||||
@Override
|
||||
public void collect(T element) {
|
||||
this.collectedOutputs.add(new StreamRecord<>(element));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void collectWithTimestamp(T element, long timestamp) {
|
||||
this.collectedOutputs.offer(new StreamRecord<>(element, timestamp));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void emitWatermark(Watermark mark) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void markAsTemporarilyIdle() {}
|
||||
|
||||
@Override
|
||||
public Object getCheckpointLock() {
|
||||
return checkpointLock;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {}
|
||||
|
||||
public StreamRecord<T> removeLatestOutput() {
|
||||
return collectedOutputs.poll();
|
||||
}
|
||||
|
||||
public LinkedBlockingQueue<StreamRecord<T>> getCollectedOutputs() {
|
||||
return collectedOutputs;
|
||||
}
|
||||
}
|
@ -0,0 +1,137 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.ververica.cdc.connectors.mysql;
|
||||
|
||||
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
|
||||
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
|
||||
import io.debezium.connector.mysql.MySqlConnector;
|
||||
|
||||
import java.util.Properties;
|
||||
|
||||
import static org.apache.flink.util.Preconditions.checkNotNull;
|
||||
|
||||
/**
|
||||
* A builder to build a SourceFunction which can read snapshot and continue to consume binlog.
|
||||
*/
|
||||
public class MySqlBinlogSource {
|
||||
|
||||
public static <T> Builder<T> builder() {
|
||||
return new Builder<>();
|
||||
}
|
||||
|
||||
static class Builder<T> {
|
||||
|
||||
private int port = 3306; // default 3306 port
|
||||
private String hostname;
|
||||
private String database;
|
||||
private String username;
|
||||
private String password;
|
||||
private Integer serverId;
|
||||
private String tableName;
|
||||
private DebeziumDeserializationSchema<T> deserializer;
|
||||
|
||||
public Builder<T> hostname(String hostname) {
|
||||
this.hostname = hostname;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Integer port number of the MySQL database server.
|
||||
*/
|
||||
public Builder<T> port(int port) {
|
||||
this.port = port;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* The database name to consume.
|
||||
*/
|
||||
public Builder<T> database(String database) {
|
||||
this.database = database;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* The table name to consume.
|
||||
*/
|
||||
public Builder<T> tableName(String tableName) {
|
||||
this.tableName = tableName;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Name of the MySQL database to use when connecting to the MySQL database server.
|
||||
*/
|
||||
public Builder<T> username(String username) {
|
||||
this.username = username;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Password to use when connecting to the MySQL database server.
|
||||
*/
|
||||
public Builder<T> password(String password) {
|
||||
this.password = password;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* A numeric ID of this database client, which must be unique across all currently-running
|
||||
* database processes in the MySQL cluster. This connector joins the MySQL database cluster
|
||||
* as another server (with this unique ID) so it can read the binlog. By default, a random
|
||||
* number is generated between 5400 and 6400, though we recommend setting an explicit value.
|
||||
*/
|
||||
public Builder<T> serverId(int serverId) {
|
||||
this.serverId = serverId;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* The deserializer used to convert from consumed {@link org.apache.kafka.connect.source.SourceRecord}.
|
||||
*/
|
||||
public Builder<T> deserializer(DebeziumDeserializationSchema<T> deserializer) {
|
||||
this.deserializer = deserializer;
|
||||
return this;
|
||||
}
|
||||
|
||||
public DebeziumSourceFunction<T> build() {
|
||||
Properties props = new Properties();
|
||||
props.setProperty("connector.class", MySqlConnector.class.getCanonicalName());
|
||||
// hard code server name, because we don't need to distinguish it, docs:
|
||||
// Logical name that identifies and provides a namespace for the particular MySQL database
|
||||
// server/cluster being monitored. The logical name should be unique across all other connectors,
|
||||
// since it is used as a prefix for all Kafka topic names emanating from this connector.
|
||||
// Only alphanumeric characters and underscores should be used.
|
||||
props.setProperty("database.server.name", "mysql-binlog-source");
|
||||
props.setProperty("database.hostname", checkNotNull(hostname));
|
||||
props.setProperty("database.port", String.valueOf(port));
|
||||
props.setProperty("database.whitelist", checkNotNull(database));
|
||||
props.setProperty("database.user", checkNotNull(username));
|
||||
props.setProperty("database.password", checkNotNull(password));
|
||||
props.setProperty("database.server.id", checkNotNull(serverId).toString());
|
||||
// An optional comma-separated list of regular expressions that match fully-qualified
|
||||
// table identifiers for tables to be monitored; any table not included in the whitelist
|
||||
// will be excluded from monitoring. Each identifier is of the form databaseName.tableName.
|
||||
props.setProperty("table.whitelist", checkNotNull(database) + "." + checkNotNull(tableName));
|
||||
return new DebeziumSourceFunction<>(
|
||||
deserializer,
|
||||
props);
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,587 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.ververica.cdc.connectors.mysql;
|
||||
|
||||
import org.apache.flink.api.common.state.BroadcastState;
|
||||
import org.apache.flink.api.common.state.KeyedStateStore;
|
||||
import org.apache.flink.api.common.state.ListState;
|
||||
import org.apache.flink.api.common.state.ListStateDescriptor;
|
||||
import org.apache.flink.api.common.state.MapStateDescriptor;
|
||||
import org.apache.flink.api.common.state.OperatorStateStore;
|
||||
import org.apache.flink.api.common.typeinfo.TypeInformation;
|
||||
import org.apache.flink.configuration.Configuration;
|
||||
import org.apache.flink.core.testutils.CheckedThread;
|
||||
import org.apache.flink.runtime.state.FunctionInitializationContext;
|
||||
import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
|
||||
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
|
||||
import org.apache.flink.streaming.util.MockStreamingRuntimeContext;
|
||||
import org.apache.flink.util.Collector;
|
||||
import org.apache.flink.util.Preconditions;
|
||||
|
||||
import com.alibaba.ververica.cdc.connectors.mysql.utils.UniqueDatabase;
|
||||
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
|
||||
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
|
||||
import com.alibaba.ververica.cdc.debezium.utils.TestSourceContext;
|
||||
import com.jayway.jsonpath.JsonPath;
|
||||
import org.apache.kafka.connect.source.SourceRecord;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.sql.Connection;
|
||||
import java.sql.Statement;
|
||||
import java.time.Duration;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.Semaphore;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static com.alibaba.ververica.cdc.connectors.mysql.utils.AssertUtils.assertDelete;
|
||||
import static com.alibaba.ververica.cdc.connectors.mysql.utils.AssertUtils.assertInsert;
|
||||
import static com.alibaba.ververica.cdc.connectors.mysql.utils.AssertUtils.assertUpdate;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
/**
|
||||
* Tests for {@link MySqlBinlogSource} which also heavily tests {@link DebeziumSourceFunction}.
|
||||
*/
|
||||
public class MySqlBinlogSourceTest extends MySqlBinlogTestBase {
|
||||
|
||||
private final UniqueDatabase DATABASE = new UniqueDatabase(
|
||||
mysqlContainer,
|
||||
"inventory",
|
||||
"mysqluser",
|
||||
"mysqlpw");
|
||||
|
||||
@Before
|
||||
public void before() {
|
||||
DATABASE.createAndInitialize();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConsumingAllEvents() throws Exception {
|
||||
DebeziumSourceFunction<SourceRecord> source = createMySqlBinlogSource();
|
||||
TestSourceContext<SourceRecord> sourceContext = new TestSourceContext<>();
|
||||
|
||||
setupSource(source);
|
||||
|
||||
try (Connection connection = DATABASE.getJdbcConnection();
|
||||
Statement statement = connection.createStatement()) {
|
||||
|
||||
// start the source
|
||||
final CheckedThread runThread = new CheckedThread() {
|
||||
@Override
|
||||
public void go() throws Exception {
|
||||
source.run(sourceContext);
|
||||
}
|
||||
};
|
||||
runThread.start();
|
||||
|
||||
List<SourceRecord> records = drain(sourceContext, 9);
|
||||
assertEquals(9, records.size());
|
||||
for (int i = 0; i < records.size(); i++) {
|
||||
assertInsert(records.get(i), "id", 101 + i);
|
||||
}
|
||||
|
||||
statement.execute("INSERT INTO products VALUES (default,'robot','Toy robot',1.304)"); // 110
|
||||
records = drain(sourceContext, 1);
|
||||
assertInsert(records.get(0), "id", 110);
|
||||
|
||||
statement.execute("INSERT INTO products VALUES (1001,'roy','old robot',1234.56)"); // 1001
|
||||
records = drain(sourceContext, 1);
|
||||
assertInsert(records.get(0), "id", 1001);
|
||||
|
||||
// ---------------------------------------------------------------------------------------------------------------
|
||||
// Changing the primary key of a row should result in 2 events: INSERT, DELETE (TOMBSTONE is dropped)
|
||||
// ---------------------------------------------------------------------------------------------------------------
|
||||
statement.execute("UPDATE products SET id=2001, description='really old robot' WHERE id=1001");
|
||||
records = drain(sourceContext, 2);
|
||||
assertDelete(records.get(0), "id", 1001);
|
||||
assertInsert(records.get(1), "id", 2001);
|
||||
|
||||
// ---------------------------------------------------------------------------------------------------------------
|
||||
// Simple UPDATE (with no schema changes)
|
||||
// ---------------------------------------------------------------------------------------------------------------
|
||||
statement.execute("UPDATE products SET weight=1345.67 WHERE id=2001");
|
||||
records = drain(sourceContext, 1);
|
||||
assertUpdate(records.get(0), "id", 2001);
|
||||
|
||||
// ---------------------------------------------------------------------------------------------------------------
|
||||
// Change our schema with a fully-qualified name; we should still see this event
|
||||
// ---------------------------------------------------------------------------------------------------------------
|
||||
// Add a column with default to the 'products' table and explicitly update one record ...
|
||||
statement.execute(String.format(
|
||||
"ALTER TABLE %s.products ADD COLUMN volume FLOAT, ADD COLUMN alias VARCHAR(30) NULL AFTER description",
|
||||
DATABASE.getDatabaseName()));
|
||||
statement.execute("UPDATE products SET volume=13.5 WHERE id=2001");
|
||||
records = drain(sourceContext, 1);
|
||||
assertUpdate(records.get(0), "id", 2001);
|
||||
|
||||
// cleanup
|
||||
source.cancel();
|
||||
source.close();
|
||||
runThread.sync();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCheckpointAndRestore() throws Exception {
|
||||
final TestingListState<byte[]> offsetState = new TestingListState<>();
|
||||
final TestingListState<String> historyState = new TestingListState<>();
|
||||
int prevPos = 0;
|
||||
{
|
||||
// ---------------------------------------------------------------------------
|
||||
// Step-1: start the source from empty state
|
||||
// ---------------------------------------------------------------------------
|
||||
final DebeziumSourceFunction<SourceRecord> source = createMySqlBinlogSource();
|
||||
// we use blocking context to block the source to emit before last snapshot record
|
||||
final BlockingSourceContext<SourceRecord> sourceContext = new BlockingSourceContext<>(8);
|
||||
// setup source with empty state
|
||||
setupSource(source, false, offsetState, historyState, true, 0, 1);
|
||||
|
||||
final CheckedThread runThread = new CheckedThread() {
|
||||
@Override
|
||||
public void go() throws Exception {
|
||||
source.run(sourceContext);
|
||||
}
|
||||
};
|
||||
runThread.start();
|
||||
|
||||
// wait until consumer is started
|
||||
int received = drain(sourceContext, 1).size();
|
||||
assertEquals(1, received);
|
||||
|
||||
// we can't perform checkpoint during DB snapshot
|
||||
assertFalse(waitForCheckpointLock(sourceContext.getCheckpointLock(), Duration.ofSeconds(3)));
|
||||
|
||||
// unblock the source context to continue the processing
|
||||
sourceContext.blocker.release();
|
||||
// wait until the source finishes the database snapshot
|
||||
List<SourceRecord> records = drain(sourceContext, 9 - received);
|
||||
assertEquals(9, records.size() + received);
|
||||
|
||||
// state is still empty
|
||||
assertEquals(0, offsetState.list.size());
|
||||
assertEquals(0, historyState.list.size());
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Step-2: trigger checkpoint-1 after snapshot finished
|
||||
// ---------------------------------------------------------------------------
|
||||
synchronized (sourceContext.getCheckpointLock()) {
|
||||
// trigger checkpoint-1
|
||||
source.snapshotState(new StateSnapshotContextSynchronousImpl(101, 101));
|
||||
}
|
||||
|
||||
assertHistoryState(historyState);
|
||||
assertEquals(1, offsetState.list.size());
|
||||
String state = new String(offsetState.list.get(0), StandardCharsets.UTF_8);
|
||||
assertEquals("mysql-binlog-source", JsonPath.read(state, "$.sourcePartition.server"));
|
||||
assertEquals("mysql-bin.000003", JsonPath.read(state, "$.sourceOffset.file"));
|
||||
assertFalse(state.contains("row"));
|
||||
assertFalse(state.contains("server_id"));
|
||||
assertFalse(state.contains("event"));
|
||||
int pos = JsonPath.read(state, "$.sourceOffset.pos");
|
||||
assertTrue(pos > prevPos);
|
||||
prevPos = pos;
|
||||
|
||||
source.cancel();
|
||||
source.close();
|
||||
runThread.sync();
|
||||
}
|
||||
|
||||
{
|
||||
// ---------------------------------------------------------------------------
|
||||
// Step-3: restore the source from state
|
||||
// ---------------------------------------------------------------------------
|
||||
final DebeziumSourceFunction<SourceRecord> source2 = createMySqlBinlogSource();
|
||||
final TestSourceContext<SourceRecord> sourceContext2 = new TestSourceContext<>();
|
||||
setupSource(source2, true, offsetState, historyState, true, 0, 1);
|
||||
final CheckedThread runThread2 = new CheckedThread() {
|
||||
@Override
|
||||
public void go() throws Exception {
|
||||
source2.run(sourceContext2);
|
||||
}
|
||||
};
|
||||
runThread2.start();
|
||||
|
||||
// make sure there is no more events
|
||||
assertFalse(waitForAvailableRecords(Duration.ofSeconds(5), sourceContext2));
|
||||
|
||||
try (Connection connection = DATABASE.getJdbcConnection();
|
||||
Statement statement = connection.createStatement()) {
|
||||
|
||||
statement.execute("INSERT INTO products VALUES (default,'robot','Toy robot',1.304)"); // 110
|
||||
List<SourceRecord> records = drain(sourceContext2, 1);
|
||||
assertEquals(1, records.size());
|
||||
assertInsert(records.get(0), "id", 110);
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Step-4: trigger checkpoint-2 during DML operations
|
||||
// ---------------------------------------------------------------------------
|
||||
synchronized (sourceContext2.getCheckpointLock()) {
|
||||
// trigger checkpoint-1
|
||||
source2.snapshotState(new StateSnapshotContextSynchronousImpl(138, 138));
|
||||
}
|
||||
|
||||
assertHistoryState(historyState); // assert the DDL is stored in the history state
|
||||
assertEquals(1, offsetState.list.size());
|
||||
String state = new String(offsetState.list.get(0), StandardCharsets.UTF_8);
|
||||
assertEquals("mysql-binlog-source", JsonPath.read(state, "$.sourcePartition.server"));
|
||||
assertEquals("mysql-bin.000003", JsonPath.read(state, "$.sourceOffset.file"));
|
||||
assertEquals("1", JsonPath.read(state, "$.sourceOffset.row").toString());
|
||||
assertEquals("223344", JsonPath.read(state, "$.sourceOffset.server_id").toString());
|
||||
assertEquals("2", JsonPath.read(state, "$.sourceOffset.event").toString());
|
||||
int pos = JsonPath.read(state, "$.sourceOffset.pos");
|
||||
assertTrue(pos > prevPos);
|
||||
prevPos = pos;
|
||||
|
||||
// execute 2 more DMLs to have more binlog
|
||||
statement.execute("INSERT INTO products VALUES (1001,'roy','old robot',1234.56)"); // 1001
|
||||
statement.execute("UPDATE products SET weight=1345.67 WHERE id=1001");
|
||||
}
|
||||
|
||||
// cancel the source
|
||||
source2.cancel();
|
||||
source2.close();
|
||||
runThread2.sync();
|
||||
}
|
||||
|
||||
{
|
||||
// ---------------------------------------------------------------------------
|
||||
// Step-5: restore the source from checkpoint-2
|
||||
// ---------------------------------------------------------------------------
|
||||
final DebeziumSourceFunction<SourceRecord> source3 = createMySqlBinlogSource();
|
||||
final TestSourceContext<SourceRecord> sourceContext3 = new TestSourceContext<>();
|
||||
setupSource(source3, true, offsetState, historyState, true, 0, 1);
|
||||
|
||||
// restart the source
|
||||
final CheckedThread runThread3 = new CheckedThread() {
|
||||
@Override
|
||||
public void go() throws Exception {
|
||||
source3.run(sourceContext3);
|
||||
}
|
||||
};
|
||||
runThread3.start();
|
||||
|
||||
// consume the unconsumed binlog
|
||||
List<SourceRecord> records = drain(sourceContext3, 2);
|
||||
assertInsert(records.get(0), "id", 1001);
|
||||
assertUpdate(records.get(1), "id", 1001);
|
||||
|
||||
// make sure there is no more events
|
||||
assertFalse(waitForAvailableRecords(Duration.ofSeconds(3), sourceContext3));
|
||||
|
||||
// can continue to receive new events
|
||||
try (Connection connection = DATABASE.getJdbcConnection();
|
||||
Statement statement = connection.createStatement()) {
|
||||
statement.execute("DELETE FROM products WHERE id=1001");
|
||||
}
|
||||
records = drain(sourceContext3, 1);
|
||||
assertDelete(records.get(0), "id", 1001);
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Step-6: trigger checkpoint-2 to make sure we can continue to to further checkpoints
|
||||
// ---------------------------------------------------------------------------
|
||||
synchronized (sourceContext3.getCheckpointLock()) {
|
||||
// checkpoint 3
|
||||
source3.snapshotState(new StateSnapshotContextSynchronousImpl(233, 233));
|
||||
}
|
||||
assertHistoryState(historyState); // assert the DDL is stored in the history state
|
||||
assertEquals(1, offsetState.list.size());
|
||||
String state = new String(offsetState.list.get(0), StandardCharsets.UTF_8);
|
||||
assertEquals("mysql-binlog-source", JsonPath.read(state, "$.sourcePartition.server"));
|
||||
assertEquals("mysql-bin.000003", JsonPath.read(state, "$.sourceOffset.file"));
|
||||
assertEquals("1", JsonPath.read(state, "$.sourceOffset.row").toString());
|
||||
assertEquals("223344", JsonPath.read(state, "$.sourceOffset.server_id").toString());
|
||||
assertEquals("2", JsonPath.read(state, "$.sourceOffset.event").toString());
|
||||
int pos = JsonPath.read(state, "$.sourceOffset.pos");
|
||||
assertTrue(pos > prevPos);
|
||||
|
||||
source3.cancel();
|
||||
source3.close();
|
||||
runThread3.sync();
|
||||
}
|
||||
}
|
||||
|
||||
private void assertHistoryState(TestingListState<String> historyState) {
|
||||
// assert the DDL is stored in the history state
|
||||
assertEquals(4, historyState.list.size());
|
||||
String lastHistory = historyState.list.get(3);
|
||||
assertEquals("mysql-binlog-source", JsonPath.read(lastHistory, "$.source.server"));
|
||||
assertEquals("true", JsonPath.read(lastHistory, "$.position.snapshot").toString());
|
||||
assertTrue(JsonPath.read(lastHistory, "$.databaseName").toString().startsWith("inventory"));
|
||||
assertTrue(JsonPath.read(lastHistory, "$.ddl").toString().startsWith("CREATE TABLE `products`"));
|
||||
}
|
||||
|
||||
// ------------------------------------------------------------------------------------------
|
||||
// Utilities
|
||||
// ------------------------------------------------------------------------------------------
|
||||
|
||||
private DebeziumSourceFunction<SourceRecord> createMySqlBinlogSource() {
|
||||
return MySqlBinlogSource.<SourceRecord>builder()
|
||||
.hostname(mysqlContainer.getHost())
|
||||
.port(mysqlContainer.getDatabasePort())
|
||||
.database(DATABASE.getDatabaseName())
|
||||
.tableName("products") // monitor table "products"
|
||||
.username(mysqlContainer.getUsername())
|
||||
.password(mysqlContainer.getPassword())
|
||||
.serverId(1234)
|
||||
.deserializer(new ForwardDeserializeSchema())
|
||||
.build();
|
||||
}
|
||||
|
||||
|
||||
private <T> List<T> drain(TestSourceContext<T> sourceContext, int expectedRecordCount) throws Exception {
|
||||
List<T> allRecords = new ArrayList<>();
|
||||
LinkedBlockingQueue<StreamRecord<T>> queue = sourceContext.getCollectedOutputs();
|
||||
while (allRecords.size() < expectedRecordCount) {
|
||||
StreamRecord<T> record = queue.poll(100, TimeUnit.SECONDS);
|
||||
if (record != null) {
|
||||
allRecords.add(record.getValue());
|
||||
} else {
|
||||
throw new RuntimeException("Can't receive " + expectedRecordCount + " elements before timeout.");
|
||||
}
|
||||
}
|
||||
|
||||
return allRecords;
|
||||
}
|
||||
|
||||
private boolean waitForCheckpointLock(Object checkpointLock, Duration timeout) throws Exception {
|
||||
final Semaphore semaphore = new Semaphore(0);
|
||||
ExecutorService executor = Executors.newSingleThreadExecutor();
|
||||
executor.execute(() -> {
|
||||
synchronized (checkpointLock) {
|
||||
semaphore.release();
|
||||
}
|
||||
});
|
||||
boolean result = semaphore.tryAcquire(timeout.toMillis(), TimeUnit.MILLISECONDS);
|
||||
executor.shutdownNow();
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait for a maximum amount of time until the first record is available.
|
||||
*
|
||||
* @param timeout the maximum amount of time to wait; must not be negative
|
||||
* @return {@code true} if records are available,
|
||||
* or {@code false} if the timeout occurred and no records are available
|
||||
*/
|
||||
private boolean waitForAvailableRecords(Duration timeout, TestSourceContext<?> sourceContext) throws InterruptedException {
|
||||
long now = System.currentTimeMillis();
|
||||
long stop = now + timeout.toMillis();
|
||||
while (System.currentTimeMillis() < stop) {
|
||||
if (!sourceContext.getCollectedOutputs().isEmpty()) {
|
||||
break;
|
||||
}
|
||||
Thread.sleep(10); // save CPU
|
||||
}
|
||||
return !sourceContext.getCollectedOutputs().isEmpty();
|
||||
}
|
||||
|
||||
private static <T> void setupSource(DebeziumSourceFunction<T> source) throws Exception {
|
||||
setupSource(
|
||||
source,
|
||||
false,
|
||||
null,
|
||||
null,
|
||||
true, // enable checkpointing; auto commit should be ignored
|
||||
0,
|
||||
1);
|
||||
}
|
||||
|
||||
private static <T, S1, S2> void setupSource(
|
||||
DebeziumSourceFunction<T> source,
|
||||
boolean isRestored,
|
||||
ListState<S1> restoredOffsetState,
|
||||
ListState<S2> restoredHistoryState,
|
||||
boolean isCheckpointingEnabled,
|
||||
int subtaskIndex,
|
||||
int totalNumSubtasks) throws Exception {
|
||||
|
||||
// run setup procedure in operator life cycle
|
||||
source.setRuntimeContext(new MockStreamingRuntimeContext(isCheckpointingEnabled, totalNumSubtasks, subtaskIndex));
|
||||
source.initializeState(new MockFunctionInitializationContext(
|
||||
isRestored,
|
||||
new MockOperatorStateStore(restoredOffsetState, restoredHistoryState)));
|
||||
source.open(new Configuration());
|
||||
}
|
||||
|
||||
private static class ForwardDeserializeSchema implements DebeziumDeserializationSchema<SourceRecord> {
|
||||
|
||||
private static final long serialVersionUID = 2975058057832211228L;
|
||||
|
||||
@Override
|
||||
public void deserialize(SourceRecord record, Collector<SourceRecord> out) throws Exception {
|
||||
out.collect(record);
|
||||
}
|
||||
|
||||
@Override
|
||||
public TypeInformation<SourceRecord> getProducedType() {
|
||||
return TypeInformation.of(SourceRecord.class);
|
||||
}
|
||||
}
|
||||
private static class MockOperatorStateStore implements OperatorStateStore {
|
||||
|
||||
private final ListState<?> restoredOffsetListState;
|
||||
private final ListState<?> restoredHistoryListState;
|
||||
|
||||
private MockOperatorStateStore(
|
||||
ListState<?> restoredOffsetListState,
|
||||
ListState<?> restoredHistoryListState) {
|
||||
this.restoredOffsetListState = restoredOffsetListState;
|
||||
this.restoredHistoryListState = restoredHistoryListState;
|
||||
}
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
public <S> ListState<S> getUnionListState(ListStateDescriptor<S> stateDescriptor) throws Exception {
|
||||
if (stateDescriptor.getName().equals(DebeziumSourceFunction.OFFSETS_STATE_NAME)) {
|
||||
return (ListState<S>) restoredOffsetListState;
|
||||
} else if (stateDescriptor.getName().equals(DebeziumSourceFunction.HISTORY_RECORDS_STATE_NAME)) {
|
||||
return (ListState<S>) restoredHistoryListState;
|
||||
} else {
|
||||
throw new IllegalStateException("Unknown state.");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public <K, V> BroadcastState<K, V> getBroadcastState(MapStateDescriptor<K, V> stateDescriptor) throws Exception {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public <S> ListState<S> getListState(ListStateDescriptor<S> stateDescriptor) throws Exception {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<String> getRegisteredStateNames() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<String> getRegisteredBroadcastStateNames() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
}
|
||||
|
||||
private static class MockFunctionInitializationContext implements FunctionInitializationContext {
|
||||
|
||||
private final boolean isRestored;
|
||||
private final OperatorStateStore operatorStateStore;
|
||||
|
||||
private MockFunctionInitializationContext(boolean isRestored, OperatorStateStore operatorStateStore) {
|
||||
this.isRestored = isRestored;
|
||||
this.operatorStateStore = operatorStateStore;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isRestored() {
|
||||
return isRestored;
|
||||
}
|
||||
|
||||
@Override
|
||||
public OperatorStateStore getOperatorStateStore() {
|
||||
return operatorStateStore;
|
||||
}
|
||||
|
||||
@Override
|
||||
public KeyedStateStore getKeyedStateStore() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
}
|
||||
|
||||
private static class BlockingSourceContext<T> extends TestSourceContext<T> {
|
||||
|
||||
private final Semaphore blocker = new Semaphore(0);
|
||||
private final int expectedCount;
|
||||
private int currentCount = 0;
|
||||
|
||||
private BlockingSourceContext(int expectedCount) {
|
||||
this.expectedCount = expectedCount;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void collect(T t) {
|
||||
super.collect(t);
|
||||
currentCount++;
|
||||
if (currentCount == expectedCount) {
|
||||
try {
|
||||
// block the source to emit records
|
||||
blocker.acquire();
|
||||
} catch (InterruptedException e) {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static final class TestingListState<T> implements ListState<T> {
|
||||
|
||||
private final List<T> list = new ArrayList<>();
|
||||
private boolean clearCalled = false;
|
||||
|
||||
@Override
|
||||
public void clear() {
|
||||
list.clear();
|
||||
clearCalled = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Iterable<T> get() throws Exception {
|
||||
return list;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void add(T value) throws Exception {
|
||||
Preconditions.checkNotNull(value, "You cannot add null to a ListState.");
|
||||
list.add(value);
|
||||
}
|
||||
|
||||
public List<T> getList() {
|
||||
return list;
|
||||
}
|
||||
|
||||
boolean isClearCalled() {
|
||||
return clearCalled;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void update(List<T> values) throws Exception {
|
||||
clear();
|
||||
|
||||
addAll(values);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addAll(List<T> values) throws Exception {
|
||||
if (values != null) {
|
||||
values.forEach(v -> Preconditions.checkNotNull(v, "You cannot add null to a ListState."));
|
||||
|
||||
list.addAll(values);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,63 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.ververica.cdc.connectors.mysql;
|
||||
|
||||
import org.apache.flink.test.util.AbstractTestBase;
|
||||
|
||||
import com.alibaba.ververica.cdc.connectors.mysql.utils.MySqlBinlogContainer;
|
||||
import org.junit.BeforeClass;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.testcontainers.containers.output.Slf4jLogConsumer;
|
||||
import org.testcontainers.lifecycle.Startables;
|
||||
|
||||
import java.sql.Connection;
|
||||
import java.sql.DriverManager;
|
||||
import java.sql.SQLException;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
/**
|
||||
* Basic class for testing MySQL binlog source, this contains a MySQL container which enables binlog.
|
||||
*/
|
||||
public abstract class MySqlBinlogTestBase extends AbstractTestBase {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(MySqlBinlogTestBase.class);
|
||||
|
||||
protected static final MySqlBinlogContainer mysqlContainer = (MySqlBinlogContainer) new MySqlBinlogContainer()
|
||||
.withConfigurationOverride("docker/my.cnf")
|
||||
.withSetupSQL("docker/setup.sql")
|
||||
.withDatabaseName("flink-test")
|
||||
.withUsername("flinkuser")
|
||||
.withPassword("flinkpw")
|
||||
.withLogConsumer(new Slf4jLogConsumer(LOG));
|
||||
|
||||
@BeforeClass
|
||||
public static void startContainers() {
|
||||
LOG.info("Starting containers...");
|
||||
Startables.deepStart(Stream.of(mysqlContainer)).join();
|
||||
LOG.info("Containers are started.");
|
||||
}
|
||||
|
||||
protected Connection getJdbcConnection() throws SQLException {
|
||||
return DriverManager.getConnection(
|
||||
mysqlContainer.getJdbcUrl(),
|
||||
mysqlContainer.getUsername(),
|
||||
mysqlContainer.getPassword());
|
||||
}
|
||||
}
|
@ -0,0 +1,216 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.ververica.cdc.connectors.mysql.utils;
|
||||
|
||||
import io.debezium.data.Envelope;
|
||||
import org.apache.kafka.connect.data.Struct;
|
||||
import org.apache.kafka.connect.source.SourceRecord;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertNull;
|
||||
|
||||
public class AssertUtils {
|
||||
/**
|
||||
* Verify that the given {@link SourceRecord} is a {@link Envelope.Operation#CREATE INSERT/CREATE} record.
|
||||
*
|
||||
* @param record the source record; may not be null
|
||||
*/
|
||||
public static void assertInsert(SourceRecord record, boolean keyExpected) {
|
||||
if (keyExpected) {
|
||||
assertNotNull(record.key());
|
||||
assertNotNull(record.keySchema());
|
||||
}
|
||||
else {
|
||||
assertNull(record.key());
|
||||
assertNull(record.keySchema());
|
||||
}
|
||||
|
||||
assertNotNull(record.valueSchema());
|
||||
Struct value = (Struct) record.value();
|
||||
assertNotNull(value);
|
||||
assertEquals(Envelope.Operation.CREATE.code(), value.getString(Envelope.FieldName.OPERATION));
|
||||
assertNotNull(value.get(Envelope.FieldName.AFTER));
|
||||
assertNull(value.get(Envelope.FieldName.BEFORE));
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify that the given {@link SourceRecord} is a {@link Envelope.Operation#READ READ} record.
|
||||
*
|
||||
* @param record the source record; may not be null
|
||||
*/
|
||||
public static void assertRead(SourceRecord record) {
|
||||
assertNotNull(record.key());
|
||||
assertNotNull(record.keySchema());
|
||||
assertNotNull(record.valueSchema());
|
||||
Struct value = (Struct) record.value();
|
||||
assertNotNull(value);
|
||||
assertEquals(Envelope.Operation.READ.code(), value.getString(Envelope.FieldName.OPERATION));
|
||||
assertNotNull(value.get(Envelope.FieldName.AFTER));
|
||||
assertNull(value.get(Envelope.FieldName.BEFORE));
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify that the given {@link SourceRecord} is a {@link Envelope.Operation#UPDATE UPDATE} record.
|
||||
*
|
||||
* @param record the source record; may not be null
|
||||
*/
|
||||
public static void assertUpdate(SourceRecord record, boolean keyExpected) {
|
||||
if (keyExpected) {
|
||||
assertNotNull(record.key());
|
||||
assertNotNull(record.keySchema());
|
||||
}
|
||||
else {
|
||||
assertNull(record.key());
|
||||
assertNull(record.keySchema());
|
||||
}
|
||||
assertNotNull(record.valueSchema());
|
||||
Struct value = (Struct) record.value();
|
||||
assertNotNull(value);
|
||||
assertEquals(Envelope.Operation.UPDATE.code(), value.getString(Envelope.FieldName.OPERATION));
|
||||
assertNotNull(value.get(Envelope.FieldName.AFTER));
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify that the given {@link SourceRecord} is a {@link Envelope.Operation#DELETE DELETE} record.
|
||||
*
|
||||
* @param record the source record; may not be null
|
||||
*/
|
||||
public static void assertDelete(SourceRecord record, boolean keyExpected) {
|
||||
if (keyExpected) {
|
||||
assertNotNull(record.key());
|
||||
assertNotNull(record.keySchema());
|
||||
}
|
||||
else {
|
||||
assertNull(record.key());
|
||||
assertNull(record.keySchema());
|
||||
}
|
||||
assertNotNull(record.valueSchema());
|
||||
Struct value = (Struct) record.value();
|
||||
assertNotNull(value);
|
||||
assertEquals(Envelope.Operation.DELETE.code(), value.getString(Envelope.FieldName.OPERATION));
|
||||
assertNotNull(value.get(Envelope.FieldName.BEFORE));
|
||||
assertNull(value.get(Envelope.FieldName.AFTER));
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify that the given {@link SourceRecord} is a valid tombstone, meaning it has a non-null key and key schema but null
|
||||
* value and value schema.
|
||||
*
|
||||
* @param record the source record; may not be null
|
||||
*/
|
||||
public static void assertTombstone(SourceRecord record) {
|
||||
assertNotNull(record.key());
|
||||
assertNotNull(record.keySchema());
|
||||
assertNull(record.value());
|
||||
assertNull(record.valueSchema());
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify that the given {@link SourceRecord} has a valid non-null integer key that matches the expected integer value.
|
||||
*
|
||||
* @param record the source record; may not be null
|
||||
* @param pkField the single field defining the primary key of the struct; may not be null
|
||||
* @param pk the expected integer value of the primary key in the struct
|
||||
*/
|
||||
public static void hasValidKey(SourceRecord record, String pkField, int pk) {
|
||||
Struct key = (Struct) record.key();
|
||||
assertEquals(pk, key.get(pkField));
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify that the given {@link SourceRecord} is a {@link Envelope.Operation#CREATE INSERT/CREATE} record without primary key.
|
||||
*
|
||||
* @param record the source record; may not be null
|
||||
*/
|
||||
public static void assertInsert(SourceRecord record) {
|
||||
assertInsert(record, false);
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify that the given {@link SourceRecord} is a {@link Envelope.Operation#CREATE INSERT/CREATE} record, and that the integer key
|
||||
* matches the expected value.
|
||||
*
|
||||
* @param record the source record; may not be null
|
||||
* @param pkField the single field defining the primary key of the struct; may not be null
|
||||
* @param pk the expected integer value of the primary key in the struct
|
||||
*/
|
||||
public static void assertInsert(SourceRecord record, String pkField, int pk) {
|
||||
hasValidKey(record, pkField, pk);
|
||||
assertInsert(record, true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify that the given {@link SourceRecord} is a {@link Envelope.Operation#CREATE READ} record, and that the integer key
|
||||
* matches the expected value.
|
||||
*
|
||||
* @param record the source record; may not be null
|
||||
* @param pkField the single field defining the primary key of the struct; may not be null
|
||||
* @param pk the expected integer value of the primary key in the struct
|
||||
*/
|
||||
public static void assertRead(SourceRecord record, String pkField, int pk) {
|
||||
hasValidKey(record, pkField, pk);
|
||||
assertRead(record);
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify that the given {@link SourceRecord} is a {@link Envelope.Operation#UPDATE UPDATE} record without PK.
|
||||
*
|
||||
* @param record the source record; may not be null
|
||||
*/
|
||||
public static void assertUpdate(SourceRecord record) {
|
||||
assertUpdate(record, false);
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify that the given {@link SourceRecord} is a {@link Envelope.Operation#UPDATE UPDATE} record, and that the integer key
|
||||
* matches the expected value.
|
||||
*
|
||||
* @param record the source record; may not be null
|
||||
* @param pkField the single field defining the primary key of the struct; may not be null
|
||||
* @param pk the expected integer value of the primary key in the struct
|
||||
*/
|
||||
public static void assertUpdate(SourceRecord record, String pkField, int pk) {
|
||||
hasValidKey(record, pkField, pk);
|
||||
assertUpdate(record, true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify that the given {@link SourceRecord} is a {@link Envelope.Operation#DELETE DELETE} record without PK.
|
||||
* matches the expected value.
|
||||
*
|
||||
* @param record the source record; may not be null
|
||||
*/
|
||||
public static void assertDelete(SourceRecord record) {
|
||||
assertDelete(record, false);
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify that the given {@link SourceRecord} is a {@link Envelope.Operation#DELETE DELETE} record, and that the integer key
|
||||
* matches the expected value.
|
||||
*
|
||||
* @param record the source record; may not be null
|
||||
* @param pkField the single field defining the primary key of the struct; may not be null
|
||||
* @param pk the expected integer value of the primary key in the struct
|
||||
*/
|
||||
public static void assertDelete(SourceRecord record, String pkField, int pk) {
|
||||
hasValidKey(record, pkField, pk);
|
||||
assertDelete(record, true);
|
||||
}
|
||||
}
|
@ -0,0 +1,176 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.ververica.cdc.connectors.mysql.utils;
|
||||
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import org.testcontainers.containers.ContainerLaunchException;
|
||||
import org.testcontainers.containers.JdbcDatabaseContainer;
|
||||
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* Docker container for MySQL. The difference between this class and
|
||||
* {@link org.testcontainers.containers.MySQLContainer} is that TC MySQLContainer has problems
|
||||
* when overriding mysql conf file, i.e. my.cnf.
|
||||
*/
|
||||
@SuppressWarnings("rawtypes")
|
||||
public class MySqlBinlogContainer extends JdbcDatabaseContainer {
|
||||
|
||||
public static final String IMAGE = "mysql";
|
||||
public static final String DEFAULT_TAG = "5.7";
|
||||
public static final Integer MYSQL_PORT = 3306;
|
||||
|
||||
private static final String MY_CNF_CONFIG_OVERRIDE_PARAM_NAME = "MY_CNF";
|
||||
private static final String SETUP_SQL_PARAM_NAME = "SETUP_SQL";
|
||||
private static final String MYSQL_ROOT_USER = "root";
|
||||
|
||||
private String databaseName = "test";
|
||||
private String username = "test";
|
||||
private String password = "test";
|
||||
|
||||
public MySqlBinlogContainer() {
|
||||
super(IMAGE + ":" + DEFAULT_TAG);
|
||||
addExposedPort(MYSQL_PORT);
|
||||
}
|
||||
|
||||
@NotNull
|
||||
@Override
|
||||
protected Set<Integer> getLivenessCheckPorts() {
|
||||
return new HashSet<>(getMappedPort(MYSQL_PORT));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void configure() {
|
||||
optionallyMapResourceParameterAsVolume(
|
||||
MY_CNF_CONFIG_OVERRIDE_PARAM_NAME,
|
||||
"/etc/mysql/",
|
||||
"mysql-default-conf");
|
||||
|
||||
if (parameters.containsKey(SETUP_SQL_PARAM_NAME)) {
|
||||
optionallyMapResourceParameterAsVolume(
|
||||
SETUP_SQL_PARAM_NAME,
|
||||
"/docker-entrypoint-initdb.d/",
|
||||
"N/A");
|
||||
}
|
||||
|
||||
addEnv("MYSQL_DATABASE", databaseName);
|
||||
addEnv("MYSQL_USER", username);
|
||||
if (password != null && !password.isEmpty()) {
|
||||
addEnv("MYSQL_PASSWORD", password);
|
||||
addEnv("MYSQL_ROOT_PASSWORD", password);
|
||||
} else if (MYSQL_ROOT_USER.equalsIgnoreCase(username)) {
|
||||
addEnv("MYSQL_ALLOW_EMPTY_PASSWORD", "yes");
|
||||
} else {
|
||||
throw new ContainerLaunchException("Empty password can be used only with the root user");
|
||||
}
|
||||
setStartupAttempts(3);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getDriverClassName() {
|
||||
try {
|
||||
Class.forName("com.mysql.cj.jdbc.Driver");
|
||||
return "com.mysql.cj.jdbc.Driver";
|
||||
} catch (ClassNotFoundException e) {
|
||||
return "com.mysql.jdbc.Driver";
|
||||
}
|
||||
}
|
||||
|
||||
public String getJdbcUrl(String databaseName) {
|
||||
String additionalUrlParams = constructUrlParameters("?", "&");
|
||||
return "jdbc:mysql://" + getHost() + ":" + getDatabasePort() +
|
||||
"/" + databaseName + additionalUrlParams;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getJdbcUrl() {
|
||||
return getJdbcUrl(databaseName);
|
||||
}
|
||||
|
||||
public int getDatabasePort() {
|
||||
return getMappedPort(MYSQL_PORT);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String constructUrlForConnection(String queryString) {
|
||||
String url = super.constructUrlForConnection(queryString);
|
||||
|
||||
if (! url.contains("useSSL=")) {
|
||||
String separator = url.contains("?") ? "&" : "?";
|
||||
url = url + separator + "useSSL=false";
|
||||
}
|
||||
|
||||
if (! url.contains("allowPublicKeyRetrieval=")) {
|
||||
url = url + "&allowPublicKeyRetrieval=true";
|
||||
}
|
||||
|
||||
return url;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getDatabaseName() {
|
||||
return databaseName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getUsername() {
|
||||
return username;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getPassword() {
|
||||
return password;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getTestQueryString() {
|
||||
return "SELECT 1";
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public MySqlBinlogContainer withConfigurationOverride(String s) {
|
||||
parameters.put(MY_CNF_CONFIG_OVERRIDE_PARAM_NAME, s);
|
||||
return this;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public MySqlBinlogContainer withSetupSQL(String sqlPath) {
|
||||
parameters.put(SETUP_SQL_PARAM_NAME, sqlPath);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public MySqlBinlogContainer withDatabaseName(final String databaseName) {
|
||||
this.databaseName = databaseName;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public MySqlBinlogContainer withUsername(final String username) {
|
||||
this.username = username;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public MySqlBinlogContainer withPassword(final String password) {
|
||||
this.password = password;
|
||||
return this;
|
||||
}
|
||||
}
|
@ -0,0 +1,126 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.ververica.cdc.connectors.mysql.utils;
|
||||
|
||||
import java.net.URL;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Paths;
|
||||
import java.sql.Connection;
|
||||
import java.sql.DriverManager;
|
||||
import java.sql.SQLException;
|
||||
import java.sql.Statement;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Random;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
|
||||
/**
|
||||
* Create and populate a unique instance of a MySQL database for each run of JUnit test. A user of class
|
||||
* needs to provide a logical name for Debezium and database name. It is expected that there is a init file
|
||||
* in <code>src/test/resources/ddl/<database_name>.sql</code>.
|
||||
* The database name is enriched with a unique suffix that guarantees complete isolation between runs
|
||||
* <code><database_name>_<suffix></code>
|
||||
*
|
||||
* <p>This class is inspired from Debezium project.
|
||||
*
|
||||
*/
|
||||
public class UniqueDatabase {
|
||||
|
||||
private static final String[] CREATE_DATABASE_DDL = new String[]{
|
||||
"CREATE DATABASE $DBNAME$;",
|
||||
"USE $DBNAME$;"
|
||||
};
|
||||
private static final Pattern COMMENT_PATTERN = Pattern.compile("^(.*)--.*$");
|
||||
|
||||
private final MySqlBinlogContainer container;
|
||||
private final String databaseName;
|
||||
private final String templateName;
|
||||
private final String username;
|
||||
private final String password;
|
||||
|
||||
public UniqueDatabase(MySqlBinlogContainer container, String databaseName, String username, String password) {
|
||||
this(container, databaseName, Integer.toUnsignedString(new Random().nextInt(), 36), username, password);
|
||||
}
|
||||
|
||||
private UniqueDatabase(MySqlBinlogContainer container, String databaseName, final String identifier, String username, String password) {
|
||||
this.container = container;
|
||||
this.databaseName = databaseName + "_" + identifier;
|
||||
this.templateName = databaseName;
|
||||
this.username = username;
|
||||
this.password = password;
|
||||
}
|
||||
|
||||
public String getDatabaseName() {
|
||||
return databaseName;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Fully qualified table name <code><databaseName>.<tableName></code>
|
||||
*/
|
||||
public String qualifiedTableName(final String tableName) {
|
||||
return String.format("%s.%s", databaseName, tableName);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates the database and populates it with initialization SQL script.
|
||||
*/
|
||||
public void createAndInitialize() {
|
||||
final String ddlFile = String.format("ddl/%s.sql", templateName);
|
||||
final URL ddlTestFile = UniqueDatabase.class.getClassLoader().getResource(ddlFile);
|
||||
assertNotNull("Cannot locate " + ddlFile, ddlTestFile);
|
||||
try {
|
||||
try (Connection connection = DriverManager.getConnection(container.getJdbcUrl(), username, password);
|
||||
Statement statement = connection.createStatement()) {
|
||||
final List<String> statements = Arrays.stream(
|
||||
Stream.concat(
|
||||
Arrays.stream(CREATE_DATABASE_DDL),
|
||||
Files.readAllLines(Paths.get(ddlTestFile.toURI())).stream())
|
||||
.map(String::trim)
|
||||
.filter(x -> !x.startsWith("--") && !x.isEmpty())
|
||||
.map(x -> {
|
||||
final Matcher m = COMMENT_PATTERN.matcher(x);
|
||||
return m.matches() ? m.group(1) : x;
|
||||
})
|
||||
.map(this::convertSQL)
|
||||
.collect(Collectors.joining("\n")).split(";"))
|
||||
.map(x -> x.replace("$$", ";"))
|
||||
.collect(Collectors.toList());
|
||||
for (String stmt : statements) {
|
||||
statement.execute(stmt);
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (final Exception e) {
|
||||
throw new IllegalStateException(e);
|
||||
}
|
||||
}
|
||||
|
||||
public Connection getJdbcConnection() throws SQLException {
|
||||
return DriverManager.getConnection(container.getJdbcUrl(databaseName), username, password);
|
||||
}
|
||||
|
||||
private String convertSQL(final String sql) {
|
||||
return sql.replace("$DBNAME$", databaseName);
|
||||
}
|
||||
}
|
@ -0,0 +1,73 @@
|
||||
-- ----------------------------------------------------------------------------------------------------------------
|
||||
-- DATABASE: inventory
|
||||
-- ----------------------------------------------------------------------------------------------------------------
|
||||
|
||||
-- Create and populate our products using a single insert with many rows
|
||||
CREATE TABLE products (
|
||||
id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
|
||||
name VARCHAR(255) NOT NULL,
|
||||
description VARCHAR(512),
|
||||
weight FLOAT
|
||||
);
|
||||
ALTER TABLE products AUTO_INCREMENT = 101;
|
||||
|
||||
INSERT INTO products
|
||||
VALUES (default,"scooter","Small 2-wheel scooter",3.14),
|
||||
(default,"car battery","12V car battery",8.1),
|
||||
(default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3",0.8),
|
||||
(default,"hammer","12oz carpenter's hammer",0.75),
|
||||
(default,"hammer","14oz carpenter's hammer",0.875),
|
||||
(default,"hammer","16oz carpenter's hammer",1.0),
|
||||
(default,"rocks","box of assorted rocks",5.3),
|
||||
(default,"jacket","water resistent black wind breaker",0.1),
|
||||
(default,"spare tire","24 inch spare tire",22.2);
|
||||
|
||||
-- Create and populate the products on hand using multiple inserts
|
||||
CREATE TABLE products_on_hand (
|
||||
product_id INTEGER NOT NULL PRIMARY KEY,
|
||||
quantity INTEGER NOT NULL,
|
||||
FOREIGN KEY (product_id) REFERENCES products(id)
|
||||
);
|
||||
|
||||
INSERT INTO products_on_hand VALUES (101,3);
|
||||
INSERT INTO products_on_hand VALUES (102,8);
|
||||
INSERT INTO products_on_hand VALUES (103,18);
|
||||
INSERT INTO products_on_hand VALUES (104,4);
|
||||
INSERT INTO products_on_hand VALUES (105,5);
|
||||
INSERT INTO products_on_hand VALUES (106,0);
|
||||
INSERT INTO products_on_hand VALUES (107,44);
|
||||
INSERT INTO products_on_hand VALUES (108,2);
|
||||
INSERT INTO products_on_hand VALUES (109,5);
|
||||
|
||||
-- Create some customers ...
|
||||
CREATE TABLE customers (
|
||||
id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
|
||||
first_name VARCHAR(255) NOT NULL,
|
||||
last_name VARCHAR(255) NOT NULL,
|
||||
email VARCHAR(255) NOT NULL UNIQUE KEY
|
||||
) AUTO_INCREMENT=1001;
|
||||
|
||||
|
||||
INSERT INTO customers
|
||||
VALUES (default,"Sally","Thomas","sally.thomas@acme.com"),
|
||||
(default,"George","Bailey","gbailey@foobar.com"),
|
||||
(default,"Edward","Walker","ed@walker.com"),
|
||||
(default,"Anne","Kretchmar","annek@noanswer.org");
|
||||
|
||||
-- Create some very simple orders
|
||||
CREATE TABLE orders (
|
||||
order_number INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
|
||||
order_date DATE NOT NULL,
|
||||
purchaser INTEGER NOT NULL,
|
||||
quantity INTEGER NOT NULL,
|
||||
product_id INTEGER NOT NULL,
|
||||
FOREIGN KEY order_customer (purchaser) REFERENCES customers(id),
|
||||
FOREIGN KEY ordered_product (product_id) REFERENCES products(id)
|
||||
) AUTO_INCREMENT = 10001;
|
||||
|
||||
INSERT INTO orders
|
||||
VALUES (default, '2016-01-16', 1001, 1, 102),
|
||||
(default, '2016-01-17', 1002, 2, 105),
|
||||
(default, '2016-02-18', 1004, 3, 109),
|
||||
(default, '2016-02-19', 1002, 2, 106),
|
||||
(default, '16-02-21', 1003, 1, 107);
|
@ -0,0 +1,46 @@
|
||||
# For advice on how to change settings please see
|
||||
# http://dev.mysql.com/doc/refman/5.7/en/server-configuration-defaults.html
|
||||
|
||||
[mysqld]
|
||||
#
|
||||
# Remove leading # and set to the amount of RAM for the most important data
|
||||
# cache in MySQL. Start at 70% of total RAM for dedicated server, else 10%.
|
||||
# innodb_buffer_pool_size = 128M
|
||||
#
|
||||
# Remove leading # to turn on a very important data integrity option: logging
|
||||
# changes to the binary log between backups.
|
||||
# log_bin
|
||||
#
|
||||
# Remove leading # to set options mainly useful for reporting servers.
|
||||
# The server defaults are faster for transactions and fast SELECTs.
|
||||
# Adjust sizes as needed, experiment to find the optimal values.
|
||||
# join_buffer_size = 128M
|
||||
# sort_buffer_size = 2M
|
||||
# read_rnd_buffer_size = 2M
|
||||
skip-host-cache
|
||||
skip-name-resolve
|
||||
#datadir=/var/lib/mysql
|
||||
#socket=/var/lib/mysql/mysql.sock
|
||||
#secure-file-priv=/var/lib/mysql-files
|
||||
user=mysql
|
||||
|
||||
# Disabling symbolic-links is recommended to prevent assorted security risks
|
||||
symbolic-links=0
|
||||
|
||||
#log-error=/var/log/mysqld.log
|
||||
#pid-file=/var/run/mysqld/mysqld.pid
|
||||
|
||||
# ----------------------------------------------
|
||||
# Enable the binlog for replication & CDC
|
||||
# ----------------------------------------------
|
||||
|
||||
# Enable binary replication log and set the prefix, expiration, and log format.
|
||||
# The prefix is arbitrary, expiration can be short for integration tests but would
|
||||
# be longer on a production system. Row-level info is required for ingest to work.
|
||||
# Server ID is required, but this will vary on production systems
|
||||
server-id = 223344
|
||||
log_bin = mysql-bin
|
||||
expire_logs_days = 1
|
||||
binlog_format = row
|
||||
|
||||
|
@ -0,0 +1,15 @@
|
||||
-- In production you would almost certainly limit the replication user must be on the follower (slave) machine,
|
||||
-- to prevent other clients accessing the log from other machines. For example, 'replicator'@'follower.acme.com'.
|
||||
-- However, in this database we'll grant 2 users different privileges:
|
||||
--
|
||||
-- 1) 'flinkuser' - all privileges required by the snapshot reader AND binlog reader (used for testing)
|
||||
-- 2) 'mysqluser' - all privileges
|
||||
--
|
||||
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT, LOCK TABLES ON *.* TO 'flinkuser'@'%';
|
||||
CREATE USER 'mysqluser' IDENTIFIED BY 'mysqlpw';
|
||||
GRANT ALL PRIVILEGES ON *.* TO 'mysqluser'@'%';
|
||||
|
||||
-- ----------------------------------------------------------------------------------------------------------------
|
||||
-- DATABASE: emptydb
|
||||
-- ----------------------------------------------------------------------------------------------------------------
|
||||
CREATE DATABASE emptydb;
|
Loading…
Reference in New Issue