[mysql] Support MySQL 5.6 (#396)

Co-authored-by: Leonard Xu <xbjtdcq@gmail.com>
pull/822/head
taox 3 years ago committed by GitHub
parent 8368bfb4de
commit c584ecf04c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -104,13 +104,13 @@ public class MySqlValidator implements Validator {
} else if (versionNumbers[0] < 5) {
isSatisfied = false;
} else {
isSatisfied = versionNumbers[1] >= 7;
isSatisfied = versionNumbers[1] >= 6;
}
if (!isSatisfied) {
throw new ValidationException(
String.format(
"Currently Flink MySql CDC connector only supports MySql "
+ "whose version is larger or equal to 5.7, but actual is %s.%s.",
+ "whose version is larger or equal to 5.6, but actual is %s.%s.",
versionNumbers[0], versionNumbers[1]));
}
}

@ -23,11 +23,13 @@ import org.apache.flink.table.api.ValidationException;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.testutils.MySqlContainer;
import com.ververica.cdc.connectors.mysql.testutils.MySqlVersion;
import com.ververica.cdc.connectors.mysql.testutils.UniqueDatabase;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
@ -51,6 +53,8 @@ import java.util.stream.Stream;
import static com.ververica.cdc.connectors.mysql.MySqlTestUtils.basicSourceBuilder;
import static com.ververica.cdc.connectors.mysql.MySqlTestUtils.setupSource;
import static com.ververica.cdc.connectors.mysql.testutils.MySqlVersion.V5_5;
import static com.ververica.cdc.connectors.mysql.testutils.MySqlVersion.V5_7;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@ -90,12 +94,13 @@ public class MySqlValidatorTest {
tempFolder.delete();
}
@Ignore("The jdbc driver used in this module cannot connect to MySQL 5.5")
@Test
public void testValidateVersion() {
String version = "5.6";
MySqlVersion version = V5_5;
String message =
String.format(
"Currently Flink MySql CDC connector only supports MySql whose version is larger or equal to 5.7, but actual is %s.",
"Currently Flink MySql CDC connector only supports MySql whose version is larger or equal to 5.6, but actual is %s.",
version);
doValidate(version, "docker/server/my.cnf", message);
}
@ -109,7 +114,7 @@ public class MySqlValidatorTest {
+ "connector to work properly. Change the MySQL configuration to use a binlog_format=ROW "
+ "and restart the connector.",
mode);
doValidate("5.7", buildMySqlConfigFile("[mysqld]\nbinlog_format = " + mode), message);
doValidate(V5_7, buildMySqlConfigFile("[mysqld]\nbinlog_format = " + mode), message);
}
@Test
@ -122,13 +127,14 @@ public class MySqlValidatorTest {
+ "binlog_row_image=FULL and restart the connector.",
mode);
doValidate(
"5.7",
V5_7,
buildMySqlConfigFile("[mysqld]\nbinlog_format = ROW\nbinlog_row_image = " + mode),
message);
}
private void doValidate(String tag, String configPath, String exceptionMessage) {
MySqlContainer container = new MySqlContainer(tag).withConfigurationOverride(configPath);
private void doValidate(MySqlVersion version, String configPath, String exceptionMessage) {
MySqlContainer container =
new MySqlContainer(version).withConfigurationOverride(configPath);
LOG.info("Starting containers...");
Startables.deepStart(Stream.of(container)).join();

@ -0,0 +1,279 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.mysql.table;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import com.ververica.cdc.connectors.mysql.MySqlValidatorTest;
import com.ververica.cdc.connectors.mysql.testutils.MySqlContainer;
import com.ververica.cdc.connectors.mysql.testutils.MySqlVersion;
import com.ververica.cdc.connectors.mysql.testutils.UniqueDatabase;
import org.junit.Before;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;
import java.io.File;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.sql.Connection;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Random;
import java.util.UUID;
import java.util.stream.Stream;
import static com.ververica.cdc.connectors.mysql.source.MySqlSourceTestBase.assertEqualsInAnyOrder;
import static com.ververica.cdc.connectors.mysql.source.MySqlSourceTestBase.assertEqualsInOrder;
/** Integration tests to check mysql-cdc works well with different MySQL server version. */
public class MySqlCompatibilityITCase {
private static final Logger LOG = LoggerFactory.getLogger(MySqlCompatibilityITCase.class);
private static TemporaryFolder tempFolder;
private static File resourceFolder;
private final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
private final StreamTableEnvironment tEnv =
StreamTableEnvironment.create(
env,
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());
@Before
public void setup() throws Exception {
resourceFolder =
Paths.get(
Objects.requireNonNull(
MySqlValidatorTest.class
.getClassLoader()
.getResource("."))
.toURI())
.toFile();
tempFolder = new TemporaryFolder(resourceFolder);
tempFolder.create();
env.setParallelism(4);
env.enableCheckpointing(200);
}
@Test
public void testMySqlV56() throws Exception {
testDifferentMySqlVersion(MySqlVersion.V5_6, false);
}
@Test
public void testMySqlV56WithGtidModeOn() throws Exception {
testDifferentMySqlVersion(MySqlVersion.V5_6, true);
}
@Test
public void testMySqlV57() throws Exception {
testDifferentMySqlVersion(MySqlVersion.V5_7, false);
}
@Test
public void testMySqlV57WithGtidModeOn() throws Exception {
testDifferentMySqlVersion(MySqlVersion.V5_7, true);
}
@Test
public void testMySqlV8() throws Exception {
testDifferentMySqlVersion(MySqlVersion.V8_0, false);
}
@Test
public void testMySqlV8WithGtidModeOn() throws Exception {
testDifferentMySqlVersion(MySqlVersion.V8_0, true);
}
private void testDifferentMySqlVersion(MySqlVersion version, boolean enableGtid)
throws Exception {
final MySqlContainer mySqlContainer =
(MySqlContainer)
new MySqlContainer(version)
.withConfigurationOverride(
buildMySqlConfigWithTimezone(version, enableGtid))
.withSetupSQL("docker/setup.sql")
.withDatabaseName("flink-test")
.withUsername("flinkuser")
.withPassword("flinkpw")
.withLogConsumer(new Slf4jLogConsumer(LOG));
LOG.info("Starting containers...");
Startables.deepStart(Stream.of(mySqlContainer)).join();
LOG.info("Containers are started.");
UniqueDatabase testDatabase =
new UniqueDatabase(mySqlContainer, "inventory", "mysqluser", "mysqlpw");
testDatabase.createAndInitialize();
String sourceDDL =
String.format(
"CREATE TABLE products ("
+ " `id` INT NOT NULL,"
+ " name STRING,"
+ " description STRING,"
+ " weight DECIMAL(10,3),"
+ " primary key (`id`) not enforced"
+ ") WITH ("
+ " 'connector' = 'mysql-cdc',"
+ " 'hostname' = '%s',"
+ " 'port' = '%s',"
+ " 'username' = '%s',"
+ " 'password' = '%s',"
+ " 'database-name' = '%s',"
+ " 'table-name' = '%s',"
+ " 'server-id' = '%s'"
+ ")",
mySqlContainer.getHost(),
mySqlContainer.getDatabasePort(),
testDatabase.getUsername(),
testDatabase.getPassword(),
testDatabase.getDatabaseName(),
"products",
getServerId());
tEnv.executeSql(sourceDDL);
// async submit job
TableResult result =
tEnv.executeSql("SELECT `id`, name, description, weight FROM products");
CloseableIterator<Row> iterator = result.collect();
String[] expectedSnapshot =
new String[] {
"+I[101, scooter, Small 2-wheel scooter, 3.140]",
"+I[102, car battery, 12V car battery, 8.100]",
"+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.800]",
"+I[104, hammer, 12oz carpenter's hammer, 0.750]",
"+I[105, hammer, 14oz carpenter's hammer, 0.875]",
"+I[106, hammer, 16oz carpenter's hammer, 1.000]",
"+I[107, rocks, box of assorted rocks, 5.300]",
"+I[108, jacket, water resistent black wind breaker, 0.100]",
"+I[109, spare tire, 24 inch spare tire, 22.200]"
};
assertEqualsInAnyOrder(
Arrays.asList(expectedSnapshot), fetchRows(iterator, expectedSnapshot.length));
try (Connection connection = testDatabase.getJdbcConnection();
Statement statement = connection.createStatement()) {
statement.execute(
"UPDATE products SET description='18oz carpenter hammer' WHERE id=106;");
statement.execute("UPDATE products SET weight='5.1' WHERE id=107;");
statement.execute(
"INSERT INTO products VALUES (default,'jacket','water resistent white wind breaker',0.2);"); // 110
statement.execute(
"INSERT INTO products VALUES (default,'scooter','Big 2-wheel scooter ',5.18);");
statement.execute(
"UPDATE products SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;");
statement.execute("UPDATE products SET weight='5.17' WHERE id=111;");
statement.execute("DELETE FROM products WHERE id=111;");
}
String[] expectedBinlog =
new String[] {
"-U[106, hammer, 16oz carpenter's hammer, 1.000]",
"+U[106, hammer, 18oz carpenter hammer, 1.000]",
"-U[107, rocks, box of assorted rocks, 5.300]",
"+U[107, rocks, box of assorted rocks, 5.100]",
"+I[110, jacket, water resistent white wind breaker, 0.200]",
"+I[111, scooter, Big 2-wheel scooter , 5.180]",
"-U[110, jacket, water resistent white wind breaker, 0.200]",
"+U[110, jacket, new water resistent white wind breaker, 0.500]",
"-U[111, scooter, Big 2-wheel scooter , 5.180]",
"+U[111, scooter, Big 2-wheel scooter , 5.170]",
"-D[111, scooter, Big 2-wheel scooter , 5.170]"
};
assertEqualsInOrder(
Arrays.asList(expectedBinlog), fetchRows(iterator, expectedBinlog.length));
result.getJobClient().get().cancel().get();
}
private String getServerId() {
final Random random = new Random();
int serverId = random.nextInt(100) + 5400;
return serverId + "-" + (serverId + env.getParallelism());
}
private static List<String> fetchRows(Iterator<Row> iter, int size) {
List<String> rows = new ArrayList<>(size);
while (size > 0 && iter.hasNext()) {
Row row = iter.next();
rows.add(row.toString());
size--;
}
return rows;
}
private String buildMySqlConfigWithTimezone(MySqlVersion version, boolean enableGtid) {
try {
File folder = tempFolder.newFolder(String.valueOf(UUID.randomUUID()));
Path cnf = Files.createFile(Paths.get(folder.getPath(), "my.cnf"));
StringBuilder mysqlConfBuilder = new StringBuilder();
mysqlConfBuilder.append(
"[mysqld]\n"
+ "binlog_format = row\n"
+ "log_bin = mysql-bin\n"
+ "server-id = 223344\n"
+ "binlog_row_image = FULL\n");
if (!enableGtid) {
mysqlConfBuilder.append("gtid-mode = OFF\n");
} else {
mysqlConfBuilder.append("gtid-mode = ON\n");
mysqlConfBuilder.append("enforce-gtid-consistency = 1\n");
// see
// https://dev.mysql.com/doc/refman/5.7/en/replication-options-gtids.html#sysvar_gtid_mode
if (version == MySqlVersion.V5_6 || version == MySqlVersion.V5_7) {
mysqlConfBuilder.append("log-slave-updates = ON\n");
}
}
if (version == MySqlVersion.V8_0) {
mysqlConfBuilder.append("secure_file_priv=/var/lib/mysql\n");
}
Files.write(
cnf,
Collections.singleton(mysqlConfBuilder.toString()),
StandardCharsets.UTF_8,
StandardOpenOption.APPEND);
return Paths.get(resourceFolder.getAbsolutePath()).relativize(cnf).toString();
} catch (Exception e) {
throw new RuntimeException("Failed to create my.cnf file.", e);
}
}
}

@ -20,6 +20,7 @@ package com.ververica.cdc.connectors.mysql.testutils;
import org.testcontainers.containers.ContainerLaunchException;
import org.testcontainers.containers.JdbcDatabaseContainer;
import org.testcontainers.utility.DockerImageName;
import java.util.HashSet;
import java.util.Set;
@ -33,7 +34,6 @@ import java.util.Set;
public class MySqlContainer extends JdbcDatabaseContainer {
public static final String IMAGE = "mysql";
public static final String DEFAULT_TAG = "5.7";
public static final Integer MYSQL_PORT = 3306;
private static final String MY_CNF_CONFIG_OVERRIDE_PARAM_NAME = "MY_CNF";
@ -45,11 +45,11 @@ public class MySqlContainer extends JdbcDatabaseContainer {
private String password = "test";
public MySqlContainer() {
this(DEFAULT_TAG);
this(MySqlVersion.V5_7);
}
public MySqlContainer(String tag) {
super(IMAGE + ":" + tag);
public MySqlContainer(MySqlVersion version) {
super(DockerImageName.parse(IMAGE + ":" + version.getVersion()));
addExposedPort(MYSQL_PORT);
}

@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.mysql.testutils;
/** MySql version enum. */
public enum MySqlVersion {
V5_5("5.5"),
V5_6("5.6"),
V5_7("5.7"),
V8_0("8.0");
private String version;
MySqlVersion(String version) {
this.version = version;
}
public String getVersion() {
return version;
}
@Override
public String toString() {
return "MySqlVersion{" + "version='" + version + '\'' + '}';
}
}
Loading…
Cancel
Save