[mysql][ploardbx] Support all charsets in column level (#1166)

This closes #1166.
pull/1468/head
Hang Ruan 3 years ago committed by Leonard Xu
parent 3d255fca83
commit 7c9b30276d

@ -305,14 +305,6 @@ public class MySqlSnapshotSplitReadTask
// We thus need to use getObject() to identify if the value was provided and if yes then
// read it again to get correct scale
return rs.getObject(fieldNo) == null ? null : rs.getInt(fieldNo);
}
// DBZ-2673
// It is necessary to check the type names as types like ENUM and SET are
// also reported as JDBC type char
else if ("CHAR".equals(actualColumn.typeName())
|| "VARCHAR".equals(actualColumn.typeName())
|| "TEXT".equals(actualColumn.typeName())) {
return rs.getBytes(fieldNo);
} else {
return rs.getObject(fieldNo);
}

@ -85,6 +85,9 @@ public class MySqlConnectorITCase extends MySqlSourceTestBase {
private final UniqueDatabase userDatabase2 =
new UniqueDatabase(MYSQL_CONTAINER, "user_2", TEST_USER, TEST_PASSWORD);
private final UniqueDatabase charsetTestDatabase =
new UniqueDatabase(MYSQL_CONTAINER, "charset_test", TEST_USER, TEST_PASSWORD);
private final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
private final StreamTableEnvironment tEnv =

@ -0,0 +1,416 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.mysql.table;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import com.ververica.cdc.connectors.mysql.source.MySqlSourceTestBase;
import com.ververica.cdc.connectors.mysql.testutils.UniqueDatabase;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.sql.Connection;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
/** Test supporting different column charsets for MySQL Table source. */
@RunWith(Parameterized.class)
public class MysqlConnectorCharsetITCase extends MySqlSourceTestBase {
private static final String TEST_USER = "mysqluser";
private static final String TEST_PASSWORD = "mysqlpw";
private static final UniqueDatabase charsetTestDatabase =
new UniqueDatabase(MYSQL_CONTAINER, "charset_test", TEST_USER, TEST_PASSWORD);
private final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
private final StreamTableEnvironment tEnv =
StreamTableEnvironment.create(
env,
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());
private final String testName;
private final String[] snapshotExpected;
private final String[] binlogExpected;
public MysqlConnectorCharsetITCase(
String testName, String[] snapshotExpected, String[] binlogExpected) {
this.testName = testName;
this.snapshotExpected = snapshotExpected;
this.binlogExpected = binlogExpected;
}
@Parameterized.Parameters(name = "Test column charset: {0}")
public static Object[] parameters() {
return new Object[][] {
new Object[] {
"ucs2_test",
new String[] {"+I[1, 测试数据]", "+I[2, Craig Marshall]", "+I[3, 另一个测试数据]"},
new String[] {
"-D[1, 测试数据]",
"-D[2, Craig Marshall]",
"-D[3, 另一个测试数据]",
"+I[11, 测试数据]",
"+I[12, Craig Marshall]",
"+I[13, 另一个测试数据]"
}
},
new Object[] {
"utf8_test",
new String[] {"+I[1, 测试数据]", "+I[2, Craig Marshall]", "+I[3, 另一个测试数据]"},
new String[] {
"-D[1, 测试数据]",
"-D[2, Craig Marshall]",
"-D[3, 另一个测试数据]",
"+I[11, 测试数据]",
"+I[12, Craig Marshall]",
"+I[13, 另一个测试数据]"
}
},
new Object[] {
"ascii_test",
new String[] {"+I[1, ascii test!?]", "+I[2, Craig Marshall]", "+I[3, {test}]"},
new String[] {
"-D[1, ascii test!?]",
"-D[2, Craig Marshall]",
"-D[3, {test}]",
"+I[11, ascii test!?]",
"+I[12, Craig Marshall]",
"+I[13, {test}]"
}
},
new Object[] {
"sjis_test",
new String[] {"+I[1, ひびぴ]", "+I[2, Craig Marshall]", "+I[3, フブプ]"},
new String[] {
"-D[1, ひびぴ]",
"-D[2, Craig Marshall]",
"-D[3, フブプ]",
"+I[11, ひびぴ]",
"+I[12, Craig Marshall]",
"+I[13, フブプ]"
}
},
new Object[] {
"gbk_test",
new String[] {"+I[1, 测试数据]", "+I[2, Craig Marshall]", "+I[3, 另一个测试数据]"},
new String[] {
"-D[1, 测试数据]",
"-D[2, Craig Marshall]",
"-D[3, 另一个测试数据]",
"+I[11, 测试数据]",
"+I[12, Craig Marshall]",
"+I[13, 另一个测试数据]"
}
},
new Object[] {
"cp932_test",
new String[] {"+I[1, ひびぴ]", "+I[2, Craig Marshall]", "+I[3, フブプ]"},
new String[] {
"-D[1, ひびぴ]",
"-D[2, Craig Marshall]",
"-D[3, フブプ]",
"+I[11, ひびぴ]",
"+I[12, Craig Marshall]",
"+I[13, フブプ]"
}
},
new Object[] {
"gb2312_test",
new String[] {"+I[1, 测试数据]", "+I[2, Craig Marshall]", "+I[3, 另一个测试数据]"},
new String[] {
"-D[1, 测试数据]",
"-D[2, Craig Marshall]",
"-D[3, 另一个测试数据]",
"+I[11, 测试数据]",
"+I[12, Craig Marshall]",
"+I[13, 另一个测试数据]"
}
},
new Object[] {
"ujis_test",
new String[] {"+I[1, ひびぴ]", "+I[2, Craig Marshall]", "+I[3, フブプ]"},
new String[] {
"-D[1, ひびぴ]",
"-D[2, Craig Marshall]",
"-D[3, フブプ]",
"+I[11, ひびぴ]",
"+I[12, Craig Marshall]",
"+I[13, フブプ]"
}
},
new Object[] {
"euckr_test",
new String[] {"+I[1, 죠주쥬]", "+I[2, Craig Marshall]", "+I[3, 한국어]"},
new String[] {
"-D[1, 죠주쥬]",
"-D[2, Craig Marshall]",
"-D[3, 한국어]",
"+I[11, 죠주쥬]",
"+I[12, Craig Marshall]",
"+I[13, 한국어]"
}
},
new Object[] {
"latin1_test",
new String[] {"+I[1, ÀÆÉ]", "+I[2, Craig Marshall]", "+I[3, Üæû]"},
new String[] {
"-D[1, ÀÆÉ]",
"-D[2, Craig Marshall]",
"-D[3, Üæû]",
"+I[11, ÀÆÉ]",
"+I[12, Craig Marshall]",
"+I[13, Üæû]"
}
},
new Object[] {
"latin2_test",
new String[] {"+I[1, ÓÔŐÖ]", "+I[2, Craig Marshall]", "+I[3, ŠŞŤŹ]"},
new String[] {
"-D[1, ÓÔŐÖ]",
"-D[2, Craig Marshall]",
"-D[3, ŠŞŤŹ]",
"+I[11, ÓÔŐÖ]",
"+I[12, Craig Marshall]",
"+I[13, ŠŞŤŹ]"
}
},
new Object[] {
"greek_test",
new String[] {"+I[1, αβγδε]", "+I[2, Craig Marshall]", "+I[3, θικλ]"},
new String[] {
"-D[1, αβγδε]",
"-D[2, Craig Marshall]",
"-D[3, θικλ]",
"+I[11, αβγδε]",
"+I[12, Craig Marshall]",
"+I[13, θικλ]"
}
},
new Object[] {
"hebrew_test",
new String[] {"+I[1, בבקשה]", "+I[2, Craig Marshall]", "+I[3, שרפה]"},
new String[] {
"-D[1, בבקשה]",
"-D[2, Craig Marshall]",
"-D[3, שרפה]",
"+I[11, בבקשה]",
"+I[12, Craig Marshall]",
"+I[13, שרפה]"
}
},
new Object[] {
"cp866_test",
new String[] {"+I[1, твой]", "+I[2, Craig Marshall]", "+I[3, любой]"},
new String[] {
"-D[1, твой]",
"-D[2, Craig Marshall]",
"-D[3, любой]",
"+I[11, твой]",
"+I[12, Craig Marshall]",
"+I[13, любой]"
}
},
new Object[] {
"tis620_test",
new String[] {"+I[1, ภาษาไทย]", "+I[2, Craig Marshall]", "+I[3, ฆงจฉ]"},
new String[] {
"-D[1, ภาษาไทย]",
"-D[2, Craig Marshall]",
"-D[3, ฆงจฉ]",
"+I[11, ภาษาไทย]",
"+I[12, Craig Marshall]",
"+I[13, ฆงจฉ]"
}
},
new Object[] {
"cp1250_test",
new String[] {"+I[1, ÓÔŐÖ]", "+I[2, Craig Marshall]", "+I[3, ŠŞŤŹ]"},
new String[] {
"-D[1, ÓÔŐÖ]",
"-D[2, Craig Marshall]",
"-D[3, ŠŞŤŹ]",
"+I[11, ÓÔŐÖ]",
"+I[12, Craig Marshall]",
"+I[13, ŠŞŤŹ]"
}
},
new Object[] {
"cp1251_test",
new String[] {"+I[1, твой]", "+I[2, Craig Marshall]", "+I[3, любой]"},
new String[] {
"-D[1, твой]",
"-D[2, Craig Marshall]",
"-D[3, любой]",
"+I[11, твой]",
"+I[12, Craig Marshall]",
"+I[13, любой]"
}
},
new Object[] {
"cp1257_test",
new String[] {
"+I[1, piedzimst brīvi]", "+I[2, Craig Marshall]", "+I[3, apveltīti ar saprātu]"
},
new String[] {
"-D[1, piedzimst brīvi]", "-D[2, Craig Marshall]",
"-D[3, apveltīti ar saprātu]",
"+I[11, piedzimst brīvi]", "+I[12, Craig Marshall]",
"+I[13, apveltīti ar saprātu]"
}
},
new Object[] {
"macroman_test",
new String[] {"+I[1, ÀÆÉ]", "+I[2, Craig Marshall]", "+I[3, Üæû]"},
new String[] {
"-D[1, ÀÆÉ]",
"-D[2, Craig Marshall]",
"-D[3, Üæû]",
"+I[11, ÀÆÉ]",
"+I[12, Craig Marshall]",
"+I[13, Üæû]"
}
},
new Object[] {
"macce_test",
new String[] {"+I[1, ÓÔŐÖ]", "+I[2, Craig Marshall]", "+I[3, ŮÚŰÜ]"},
new String[] {
"-D[1, ÓÔŐÖ]",
"-D[2, Craig Marshall]",
"-D[3, ŮÚŰÜ]",
"+I[11, ÓÔŐÖ]",
"+I[12, Craig Marshall]",
"+I[13, ŮÚŰÜ]"
}
},
new Object[] {
"big5_test",
new String[] {"+I[1, 大五]", "+I[2, Craig Marshall]", "+I[3, 丹店]"},
new String[] {
"-D[1, 大五]",
"-D[2, Craig Marshall]",
"-D[3, 丹店]",
"+I[11, 大五]",
"+I[12, Craig Marshall]",
"+I[13, 丹店]"
}
}
};
}
@BeforeClass
public static void beforeClass() {
charsetTestDatabase.createAndInitialize();
}
@Before
public void before() {
TestValuesTableFactory.clearAllData();
env.setParallelism(DEFAULT_PARALLELISM);
env.enableCheckpointing(200);
}
@Test
public void testCharset() throws Exception {
String sourceDDL =
String.format(
"CREATE TABLE %s (\n"
+ " table_id BIGINT,\n"
+ " table_name STRING,\n"
+ " primary key(table_id) not enforced"
+ ") WITH ("
+ " 'connector' = 'mysql-cdc',"
+ " 'hostname' = '%s',"
+ " 'port' = '%s',"
+ " 'username' = '%s',"
+ " 'password' = '%s',"
+ " 'database-name' = '%s',"
+ " 'table-name' = '%s',"
+ " 'scan.incremental.snapshot.enabled' = '%s',"
+ " 'server-id' = '%s',"
+ " 'scan.incremental.snapshot.chunk.size' = '%s'"
+ ")",
testName,
MYSQL_CONTAINER.getHost(),
MYSQL_CONTAINER.getDatabasePort(),
charsetTestDatabase.getUsername(),
charsetTestDatabase.getPassword(),
charsetTestDatabase.getDatabaseName(),
testName,
true,
getServerId(),
4);
tEnv.executeSql(sourceDDL);
// async submit job
TableResult result =
tEnv.executeSql(String.format("SELECT table_id,table_name FROM %s", testName));
// test snapshot phase
CloseableIterator<Row> iterator = result.collect();
waitForSnapshotStarted(iterator);
assertEqualsInAnyOrder(
Arrays.asList(snapshotExpected), fetchRows(iterator, snapshotExpected.length));
// test binlog phase
try (Connection connection = charsetTestDatabase.getJdbcConnection();
Statement statement = connection.createStatement()) {
statement.execute(String.format("UPDATE %s SET table_id = table_id + 10;", testName));
}
assertEqualsInAnyOrder(
Arrays.asList(binlogExpected), fetchRows(iterator, binlogExpected.length));
result.getJobClient().ifPresent(client -> client.cancel());
}
private String getServerId() {
final Random random = new Random();
int serverId = random.nextInt(100) + 5400;
return serverId + "-" + (serverId + env.getParallelism());
}
private static List<String> fetchRows(Iterator<Row> iter, int size) {
List<String> rows = new ArrayList<>(size);
while (size > 0 && iter.hasNext()) {
Row row = iter.next();
rows.add(row.toString());
size--;
}
return rows;
}
private static void waitForSnapshotStarted(CloseableIterator<Row> iterator) throws Exception {
while (!iterator.hasNext()) {
Thread.sleep(100);
}
}
}

@ -0,0 +1,207 @@
/*
* Copyright 2022 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.polardbx;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.StringUtils;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.sql.Connection;
import java.sql.Statement;
import java.util.Arrays;
/** Test supporting different column charsets for Polardbx. */
@RunWith(Parameterized.class)
public class PolardbxCharsetITCase extends PolardbxSourceTestBase {
private static final String DATABASE = "charset_test";
private final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
private final StreamTableEnvironment tEnv =
StreamTableEnvironment.create(
env,
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());
private final String testName;
private final String[] snapshotExpected;
private final String[] binlogExpected;
public PolardbxCharsetITCase(
String testName, String[] snapshotExpected, String[] binlogExpected) {
this.testName = testName;
this.snapshotExpected = snapshotExpected;
this.binlogExpected = binlogExpected;
}
@Parameterized.Parameters(name = "Test column charset: {0}")
public static Object[] parameters() {
return new Object[][] {
new Object[] {
"utf8_test",
new String[] {"+I[1, 测试数据]", "+I[2, Craig Marshall]", "+I[3, 另一个测试数据]"},
new String[] {
"-D[1, 测试数据]",
"-D[2, Craig Marshall]",
"-D[3, 另一个测试数据]",
"+I[11, 测试数据]",
"+I[12, Craig Marshall]",
"+I[13, 另一个测试数据]"
}
},
new Object[] {
"ascii_test",
new String[] {"+I[1, ascii test!?]", "+I[2, Craig Marshall]", "+I[3, {test}]"},
new String[] {
"-D[1, ascii test!?]",
"-D[2, Craig Marshall]",
"-D[3, {test}]",
"+I[11, ascii test!?]",
"+I[12, Craig Marshall]",
"+I[13, {test}]"
}
},
new Object[] {
"gbk_test",
new String[] {"+I[1, 测试数据]", "+I[2, Craig Marshall]", "+I[3, 另一个测试数据]"},
new String[] {
"-D[1, 测试数据]",
"-D[2, Craig Marshall]",
"-D[3, 另一个测试数据]",
"+I[11, 测试数据]",
"+I[12, Craig Marshall]",
"+I[13, 另一个测试数据]"
}
},
new Object[] {
"latin1_test",
new String[] {"+I[1, ÀÆÉ]", "+I[2, Craig Marshall]", "+I[3, Üæû]"},
new String[] {
"-D[1, ÀÆÉ]",
"-D[2, Craig Marshall]",
"-D[3, Üæû]",
"+I[11, ÀÆÉ]",
"+I[12, Craig Marshall]",
"+I[13, Üæû]"
}
},
new Object[] {
"big5_test",
new String[] {"+I[1, 大五]", "+I[2, Craig Marshall]", "+I[3, 丹店]"},
new String[] {
"-D[1, 大五]",
"-D[2, Craig Marshall]",
"-D[3, 丹店]",
"+I[11, 大五]",
"+I[12, Craig Marshall]",
"+I[13, 丹店]"
}
}
};
}
@BeforeClass
public static void beforeClass() throws InterruptedException {
initializePolardbxTables(
DATABASE,
s ->
!StringUtils.isNullOrWhitespaceOnly(s)
&& (s.contains("utf8_test")
|| s.contains("latin1_test")
|| s.contains("gbk_test")
|| s.contains("big5_test")
|| s.contains("ascii_test")));
}
@Before
public void before() {
TestValuesTableFactory.clearAllData();
env.setParallelism(4);
env.enableCheckpointing(200);
}
@Test
public void testCharset() throws Exception {
String sourceDDL =
String.format(
"CREATE TABLE %s (\n"
+ " table_id BIGINT,\n"
+ " table_name STRING,\n"
+ " primary key(table_id) not enforced"
+ ") WITH ("
+ " 'connector' = 'mysql-cdc',"
+ " 'hostname' = '%s',"
+ " 'port' = '%s',"
+ " 'username' = '%s',"
+ " 'password' = '%s',"
+ " 'database-name' = '%s',"
+ " 'table-name' = '%s',"
+ " 'scan.incremental.snapshot.enabled' = '%s',"
+ " 'server-id' = '%s',"
+ " 'scan.incremental.snapshot.chunk.size' = '%s'"
+ ")",
testName,
HOST_NAME,
PORT,
USER_NAME,
PASSWORD,
DATABASE,
testName,
true,
getServerId(),
4);
tEnv.executeSql(sourceDDL);
// async submit job
TableResult result =
tEnv.executeSql(String.format("SELECT table_id,table_name FROM %s", testName));
// test snapshot phase
CloseableIterator<Row> iterator = result.collect();
waitForSnapshotStarted(iterator);
assertEqualsInAnyOrder(
Arrays.asList(snapshotExpected), fetchRows(iterator, snapshotExpected.length));
// test binlog phase
try (Connection connection = getJdbcConnection();
Statement statement = connection.createStatement()) {
statement.execute(
String.format(
"/*TDDL:FORBID_EXECUTE_DML_ALL=FALSE*/UPDATE %s.%s SET table_id = table_id + 10;",
DATABASE, testName));
}
assertEqualsInAnyOrder(
Arrays.asList(binlogExpected), fetchRows(iterator, binlogExpected.length));
result.getJobClient().ifPresent(client -> client.cancel());
}
private static void waitForSnapshotStarted(CloseableIterator<Row> iterator) throws Exception {
while (!iterator.hasNext()) {
Thread.sleep(100);
}
}
}

@ -21,145 +21,32 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import com.github.dockerjava.api.model.ExposedPort;
import com.github.dockerjava.api.model.PortBinding;
import com.github.dockerjava.api.model.Ports;
import com.ververica.cdc.connectors.mysql.schema.MySqlSchema;
import org.apache.commons.lang3.StringUtils;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;
import org.testcontainers.utility.DockerImageName;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static java.lang.String.format;
import static org.apache.flink.util.Preconditions.checkState;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
/**
* Database Polardbx supported the mysql protocol, but there are some different features in ddl. So
* we added fallback in {@link MySqlSchema} when parsing ddl failed and provided these cases to
* test.
*/
public class PolardbxSourceITCase extends AbstractTestBase {
private static final Logger LOG = LoggerFactory.getLogger(PolardbxSourceITCase.class);
private static final Pattern COMMENT_PATTERN = Pattern.compile("^(.*)--.*$");
private static final Integer PORT = 8527;
private static final String HOST_NAME = "127.0.0.1";
private static final String USER_NAME = "polardbx_root";
private static final String PASSWORD = "123456";
public class PolardbxSourceITCase extends PolardbxSourceTestBase {
private static final String DATABASE = "polardbx_ddl_test";
private static final String IMAGE_VERSION = "2.1.0";
private static final DockerImageName POLARDBX_IMAGE =
DockerImageName.parse("polardbx/polardb-x:" + IMAGE_VERSION);
public static final GenericContainer POLARDBX_CONTAINER =
new GenericContainer<>(POLARDBX_IMAGE)
.withExposedPorts(PORT)
.withLogConsumer(new Slf4jLogConsumer(LOG))
.withStartupTimeout(Duration.ofMinutes(3))
.withCreateContainerCmdModifier(
c ->
c.withPortBindings(
new PortBinding(
Ports.Binding.bindPort(PORT),
new ExposedPort(PORT))));
@BeforeClass
public static void startContainers() throws InterruptedException {
// no need to start container when the port 8527 is listening
if (!checkConnection()) {
LOG.info("Polardbx connection is not valid, so try to start containers...");
Startables.deepStart(Stream.of(POLARDBX_CONTAINER)).join();
LOG.info("Containers are started.");
// here should wait 10s that make sure the polardbx is ready
Thread.sleep(10 * 1000);
}
initializePolardbxTables(DATABASE);
}
private static String getJdbcUrl() {
return String.format("jdbc:mysql://%s:%s", HOST_NAME, PORT);
}
protected static Connection getJdbcConnection() throws SQLException {
String jdbcUrl = getJdbcUrl();
LOG.info("jdbcUrl is :" + jdbcUrl);
return DriverManager.getConnection(jdbcUrl, USER_NAME, PASSWORD);
}
private static Boolean checkConnection() {
LOG.info("check polardbx connection validation...");
try {
Connection connection = getJdbcConnection();
return connection.isValid(3);
} catch (SQLException e) {
LOG.warn("polardbx connection is not valid... caused by:" + e.getMessage());
return false;
}
}
/** initialize database and tables with ${databaseName}.sql for testing. */
protected static void initializePolardbxTables(String databaseName)
throws InterruptedException {
final String ddlFile = String.format("ddl/%s.sql", databaseName);
final URL ddlTestFile = PolardbxSourceITCase.class.getClassLoader().getResource(ddlFile);
assertNotNull("Cannot locate " + ddlFile, ddlTestFile);
// need to sleep 1s, make sure the jdbc connection can be created
Thread.sleep(1000);
try (Connection connection = getJdbcConnection();
Statement statement = connection.createStatement()) {
statement.execute("drop database if exists " + databaseName);
statement.execute("create database if not exists " + databaseName);
statement.execute("use " + databaseName + ";");
final List<String> statements =
Arrays.stream(
Files.readAllLines(Paths.get(ddlTestFile.toURI())).stream()
.map(String::trim)
.filter(x -> !x.startsWith("--") && !x.isEmpty())
.map(
x -> {
final Matcher m =
COMMENT_PATTERN.matcher(x);
return m.matches() ? m.group(1) : x;
})
.collect(Collectors.joining("\n"))
.split(";"))
.collect(Collectors.toList());
for (String stmt : statements) {
statement.execute(stmt);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
public static void beforeClass() throws InterruptedException {
initializePolardbxTables(DATABASE, null);
}
@Test
@ -483,65 +370,4 @@ public class PolardbxSourceITCase extends AbstractTestBase {
List<String> realBinlog = fetchRows(iterator, expectedBinlog.length);
assertEqualsInAnyOrder(Arrays.asList(expectedBinlog), realBinlog);
}
private static List<String> fetchRows(Iterator<Row> iter, int size) {
List<String> rows = new ArrayList<>(size);
while (size > 0 && iter.hasNext()) {
Row row = iter.next();
rows.add(row.toString());
size--;
}
return rows;
}
private String getTableNameRegex(String[] captureCustomerTables) {
checkState(captureCustomerTables.length > 0);
if (captureCustomerTables.length == 1) {
return captureCustomerTables[0];
} else {
// pattern that matches multiple tables
return format("(%s)", StringUtils.join(captureCustomerTables, "|"));
}
}
private String getServerId() {
final Random random = new Random();
int serverId = random.nextInt(100) + 5400;
return serverId + "-" + (serverId + 4);
}
// ------------------------------------------------------------------------
// test utilities
// ------------------------------------------------------------------------
private static void waitForSinkSize(String sinkName, int expectedSize)
throws InterruptedException {
while (sinkSize(sinkName) < expectedSize) {
Thread.sleep(100);
}
}
private static int sinkSize(String sinkName) {
synchronized (TestValuesTableFactory.class) {
try {
return TestValuesTableFactory.getRawResults(sinkName).size();
} catch (IllegalArgumentException e) {
// job is not started yet
return 0;
}
}
}
private static void assertEqualsInAnyOrder(List<String> expected, List<String> actual) {
assertTrue(expected != null && actual != null);
assertEqualsInOrder(
expected.stream().sorted().collect(Collectors.toList()),
actual.stream().sorted().collect(Collectors.toList()));
}
private static void assertEqualsInOrder(List<String> expected, List<String> actual) {
assertTrue(expected != null && actual != null);
assertEquals(expected.size(), actual.size());
assertArrayEquals(expected.toArray(new String[0]), actual.toArray(new String[0]));
}
}

@ -0,0 +1,214 @@
/*
* Copyright 2022 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.polardbx;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.types.Row;
import com.github.dockerjava.api.model.ExposedPort;
import com.github.dockerjava.api.model.PortBinding;
import com.github.dockerjava.api.model.Ports;
import org.apache.commons.lang3.StringUtils;
import org.junit.BeforeClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;
import org.testcontainers.utility.DockerImageName;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static java.lang.String.format;
import static org.apache.flink.util.Preconditions.checkState;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
/** Basic class for testing Database Polardbx which supported the mysql protocol. */
public abstract class PolardbxSourceTestBase extends AbstractTestBase {
private static final Logger LOG = LoggerFactory.getLogger(PolardbxSourceTestBase.class);
private static final Pattern COMMENT_PATTERN = Pattern.compile("^(.*)--.*$");
protected static final Integer PORT = 8527;
protected static final String HOST_NAME = "127.0.0.1";
protected static final String USER_NAME = "polardbx_root";
protected static final String PASSWORD = "123456";
private static final String IMAGE_VERSION = "2.1.0";
private static final DockerImageName POLARDBX_IMAGE =
DockerImageName.parse("polardbx/polardb-x:" + IMAGE_VERSION);
protected static final GenericContainer POLARDBX_CONTAINER =
new GenericContainer<>(POLARDBX_IMAGE)
.withExposedPorts(PORT)
.withLogConsumer(new Slf4jLogConsumer(LOG))
.withStartupTimeout(Duration.ofMinutes(3))
.withCreateContainerCmdModifier(
c ->
c.withPortBindings(
new PortBinding(
Ports.Binding.bindPort(PORT),
new ExposedPort(PORT))));
@BeforeClass
public static void startContainers() throws InterruptedException {
// no need to start container when the port 8527 is listening
if (!checkConnection()) {
LOG.info("Polardbx connection is not valid, so try to start containers...");
Startables.deepStart(Stream.of(POLARDBX_CONTAINER)).join();
LOG.info("Containers are started.");
// here should wait 10s that make sure the polardbx is ready
Thread.sleep(10 * 1000);
}
}
protected static String getJdbcUrl() {
return String.format("jdbc:mysql://%s:%s", HOST_NAME, PORT);
}
protected static Connection getJdbcConnection() throws SQLException {
String jdbcUrl = getJdbcUrl();
LOG.info("jdbcUrl is :" + jdbcUrl);
return DriverManager.getConnection(jdbcUrl, USER_NAME, PASSWORD);
}
protected static Boolean checkConnection() {
LOG.info("check polardbx connection validation...");
try {
Connection connection = getJdbcConnection();
return connection.isValid(3);
} catch (SQLException e) {
LOG.warn("polardbx connection is not valid... caused by:" + e.getMessage());
return false;
}
}
/** initialize database and tables with ${databaseName}.sql for testing. */
protected static void initializePolardbxTables(
String databaseName, Function<String, Boolean> filter) throws InterruptedException {
final String ddlFile = String.format("ddl/%s.sql", databaseName);
final URL ddlTestFile = PolardbxSourceTestBase.class.getClassLoader().getResource(ddlFile);
assertNotNull("Cannot locate " + ddlFile, ddlTestFile);
// need to sleep 1s, make sure the jdbc connection can be created
Thread.sleep(1000);
try (Connection connection = getJdbcConnection();
Statement statement = connection.createStatement()) {
statement.execute("drop database if exists " + databaseName);
statement.execute("create database if not exists " + databaseName);
statement.execute("use " + databaseName + ";");
final List<String> statements =
Arrays.stream(
Files.readAllLines(Paths.get(ddlTestFile.toURI())).stream()
.map(String::trim)
.filter(x -> !x.startsWith("--") && !x.isEmpty())
.map(
x -> {
final Matcher m =
COMMENT_PATTERN.matcher(x);
return m.matches() ? m.group(1) : x;
})
.collect(Collectors.joining("\n"))
.split(";"))
.filter(sql -> filter == null || filter.apply(sql))
.collect(Collectors.toList());
for (String stmt : statements) {
statement.execute(stmt);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
protected static List<String> fetchRows(Iterator<Row> iter, int size) {
List<String> rows = new ArrayList<>(size);
while (size > 0 && iter.hasNext()) {
Row row = iter.next();
rows.add(row.toString());
size--;
}
return rows;
}
protected String getTableNameRegex(String[] captureCustomerTables) {
checkState(captureCustomerTables.length > 0);
if (captureCustomerTables.length == 1) {
return captureCustomerTables[0];
} else {
// pattern that matches multiple tables
return format("(%s)", StringUtils.join(captureCustomerTables, "|"));
}
}
protected String getServerId() {
final Random random = new Random();
int serverId = random.nextInt(100) + 5400;
return serverId + "-" + (serverId + 4);
}
// ------------------------------------------------------------------------
// test utilities
// ------------------------------------------------------------------------
protected static void waitForSinkSize(String sinkName, int expectedSize)
throws InterruptedException {
while (sinkSize(sinkName) < expectedSize) {
Thread.sleep(100);
}
}
protected static int sinkSize(String sinkName) {
synchronized (TestValuesTableFactory.class) {
try {
return TestValuesTableFactory.getRawResults(sinkName).size();
} catch (IllegalArgumentException e) {
// job is not started yet
return 0;
}
}
}
protected static void assertEqualsInAnyOrder(List<String> expected, List<String> actual) {
assertTrue(expected != null && actual != null);
assertEqualsInOrder(
expected.stream().sorted().collect(Collectors.toList()),
actual.stream().sorted().collect(Collectors.toList()));
}
protected static void assertEqualsInOrder(List<String> expected, List<String> actual) {
assertTrue(expected != null && actual != null);
assertEquals(expected.size(), actual.size());
assertArrayEquals(expected.toArray(new String[0]), actual.toArray(new String[0]));
}
}

@ -0,0 +1,170 @@
-- Copyright 2022 Ververica Inc.
--
-- Licensed under the Apache License, Version 2.0 (the "License");
-- you may not use this file except in compliance with the License.
-- You may obtain a copy of the License at
-- http://www.apache.org/licenses/LICENSE-2.0
-- Unless required by applicable law or agreed to in writing,
-- software distributed under the License is distributed on an
-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-- KIND, either express or implied. See the License for the
-- specific language governing permissions and limitations
-- under the License.
-- ----------------------------------------------------------------------------------------------------------------
-- DATABASE: charset_test
-- ----------------------------------------------------------------------------------------------------------------
CREATE TABLE `ascii_test` (
`table_id` bigint NOT NULL AUTO_INCREMENT COMMENT '编号',
`table_name` varchar(200) CHARACTER SET ascii DEFAULT '' COMMENT '表名称',
PRIMARY KEY (`table_id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=24 DEFAULT CHARSET=ascii;
CREATE TABLE `big5_test` (
`table_id` bigint NOT NULL AUTO_INCREMENT COMMENT '编号',
`table_name` varchar(200) CHARACTER SET big5 DEFAULT '' COMMENT '表名称',
PRIMARY KEY (`table_id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=24 DEFAULT CHARSET=big5;
CREATE TABLE `gbk_test` (
`table_id` bigint NOT NULL AUTO_INCREMENT COMMENT '编号',
`table_name` varchar(200) CHARACTER SET gbk DEFAULT '' COMMENT '表名称',
PRIMARY KEY (`table_id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=24 DEFAULT CHARSET=gbk;
CREATE TABLE `sjis_test` (
`table_id` bigint NOT NULL AUTO_INCREMENT COMMENT '编号',
`table_name` varchar(200) CHARACTER SET sjis DEFAULT '' COMMENT '表名称',
PRIMARY KEY (`table_id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=24 DEFAULT CHARSET=sjis;
CREATE TABLE `cp932_test` (
`table_id` bigint NOT NULL AUTO_INCREMENT COMMENT '编号',
`table_name` varchar(200) CHARACTER SET cp932 DEFAULT '' COMMENT '表名称',
PRIMARY KEY (`table_id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=24 DEFAULT CHARSET=cp932;
CREATE TABLE `gb2312_test` (
`table_id` bigint NOT NULL AUTO_INCREMENT COMMENT '编号',
`table_name` varchar(200) CHARACTER SET gb2312 DEFAULT '' COMMENT '表名称',
PRIMARY KEY (`table_id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=24 DEFAULT CHARSET=gb2312;
CREATE TABLE `ujis_test` (
`table_id` bigint NOT NULL AUTO_INCREMENT COMMENT '编号',
`table_name` varchar(200) CHARACTER SET ujis DEFAULT '' COMMENT '表名称',
PRIMARY KEY (`table_id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=24 DEFAULT CHARSET=ujis;
CREATE TABLE `euckr_test` (
`table_id` bigint NOT NULL AUTO_INCREMENT COMMENT '编号',
`table_name` varchar(200) CHARACTER SET euckr DEFAULT '' COMMENT '表名称',
PRIMARY KEY (`table_id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=24 DEFAULT CHARSET=euckr;
CREATE TABLE `latin1_test` (
`table_id` bigint NOT NULL AUTO_INCREMENT COMMENT '编号',
`table_name` varchar(200) CHARACTER SET latin1 DEFAULT '' COMMENT '表名称',
PRIMARY KEY (`table_id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=24 DEFAULT CHARSET=latin1;
CREATE TABLE `latin2_test` (
`table_id` bigint NOT NULL AUTO_INCREMENT COMMENT '编号',
`table_name` varchar(200) CHARACTER SET latin2 DEFAULT '' COMMENT '表名称',
PRIMARY KEY (`table_id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=24 DEFAULT CHARSET=latin2;
CREATE TABLE `greek_test` (
`table_id` bigint NOT NULL AUTO_INCREMENT COMMENT '编号',
`table_name` varchar(200) CHARACTER SET greek DEFAULT '' COMMENT '表名称',
PRIMARY KEY (`table_id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=24 DEFAULT CHARSET=greek;
CREATE TABLE `hebrew_test` (
`table_id` bigint NOT NULL AUTO_INCREMENT COMMENT '编号',
`table_name` varchar(200) CHARACTER SET hebrew DEFAULT '' COMMENT '表名称',
PRIMARY KEY (`table_id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=24 DEFAULT CHARSET=hebrew;
CREATE TABLE `cp866_test` (
`table_id` bigint NOT NULL AUTO_INCREMENT COMMENT '编号',
`table_name` varchar(200) CHARACTER SET cp866 DEFAULT '' COMMENT '表名称',
PRIMARY KEY (`table_id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=24 DEFAULT CHARSET=cp866;
CREATE TABLE `tis620_test` (
`table_id` bigint NOT NULL AUTO_INCREMENT COMMENT '编号',
`table_name` varchar(200) CHARACTER SET tis620 DEFAULT '' COMMENT '表名称',
PRIMARY KEY (`table_id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=24 DEFAULT CHARSET=tis620;
CREATE TABLE `cp1250_test` (
`table_id` bigint NOT NULL AUTO_INCREMENT COMMENT '编号',
`table_name` varchar(200) CHARACTER SET cp1250 DEFAULT '' COMMENT '表名称',
PRIMARY KEY (`table_id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=24 DEFAULT CHARSET=cp1250;
CREATE TABLE `cp1251_test` (
`table_id` bigint NOT NULL AUTO_INCREMENT COMMENT '编号',
`table_name` varchar(200) CHARACTER SET cp1251 DEFAULT '' COMMENT '表名称',
PRIMARY KEY (`table_id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=24 DEFAULT CHARSET=cp1251;
CREATE TABLE `cp1257_test` (
`table_id` bigint NOT NULL AUTO_INCREMENT COMMENT '编号',
`table_name` varchar(200) CHARACTER SET cp1257 DEFAULT '' COMMENT '表名称',
PRIMARY KEY (`table_id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=24 DEFAULT CHARSET=cp1257;
CREATE TABLE `macroman_test` (
`table_id` bigint NOT NULL AUTO_INCREMENT COMMENT '编号',
`table_name` varchar(200) CHARACTER SET macroman DEFAULT '' COMMENT '表名称',
PRIMARY KEY (`table_id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=24 DEFAULT CHARSET=macroman;
CREATE TABLE `macce_test` (
`table_id` bigint NOT NULL AUTO_INCREMENT COMMENT '编号',
`table_name` varchar(200) CHARACTER SET macce DEFAULT '' COMMENT '表名称',
PRIMARY KEY (`table_id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=24 DEFAULT CHARSET=macce;
CREATE TABLE `utf8_test` (
`table_id` bigint NOT NULL AUTO_INCREMENT COMMENT '编号',
`table_name` varchar(200) CHARACTER SET utf8 DEFAULT '' COMMENT '表名称',
PRIMARY KEY (`table_id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=24 DEFAULT CHARSET=utf8;
CREATE TABLE `ucs2_test` (
`table_id` bigint NOT NULL AUTO_INCREMENT COMMENT '编号',
`table_name` varchar(200) CHARACTER SET ucs2 DEFAULT '' COMMENT '表名称',
PRIMARY KEY (`table_id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=24 DEFAULT CHARSET=ucs2;
INSERT into `ascii_test` values (1, 'ascii test!?'), (2, 'Craig Marshall'), (3, '{test}');
INSERT into `big5_test` values (1, '大五'), (2, 'Craig Marshall'), (3, '丹店');
INSERT into `gbk_test` values (1, '测试数据'), (2, 'Craig Marshall'), (3, '另一个测试数据');
INSERT into `sjis_test` values (1, 'ひびぴ'), (2, 'Craig Marshall'), (3, 'フブプ');
INSERT into `cp932_test` values (1, 'ひびぴ'), (2, 'Craig Marshall'), (3, 'フブプ');
INSERT into `gb2312_test` values (1, '测试数据'), (2, 'Craig Marshall'), (3, '另一个测试数据');
INSERT into `ujis_test` values (1, 'ひびぴ'), (2, 'Craig Marshall'), (3, 'フブプ');
INSERT into `euckr_test` values (1, '죠주쥬'), (2, 'Craig Marshall'), (3, '한국어');
INSERT into `latin1_test` values (1, 'ÀÆÉ'), (2, 'Craig Marshall'), (3, 'Üæû');
INSERT into `latin2_test` values (1, 'ÓÔŐÖ'), (2, 'Craig Marshall'), (3, 'ŠŞŤŹ');
INSERT into `greek_test` values (1, 'αβγδε'), (2, 'Craig Marshall'), (3, 'θικλ');
INSERT into `hebrew_test` values (1, 'בבקשה'), (2, 'Craig Marshall'), (3, 'שרפה');
INSERT into `cp866_test` values (1, 'твой'), (2, 'Craig Marshall'), (3, 'любой');
INSERT into `tis620_test` values (1, 'ภาษาไทย'), (2, 'Craig Marshall'), (3, 'ฆงจฉ');
INSERT into `cp1250_test` values (1, 'ÓÔŐÖ'), (2, 'Craig Marshall'), (3, 'ŠŞŤŹ');
INSERT into `cp1251_test` values (1, 'твой'), (2, 'Craig Marshall'), (3, 'любой');
INSERT into `cp1257_test` values (1, 'piedzimst brīvi'), (2, 'Craig Marshall'), (3, 'apveltīti ar saprātu');
INSERT into `macroman_test` values (1, 'ÀÆÉ'), (2, 'Craig Marshall'), (3, 'Üæû');
INSERT into `macce_test` values (1, 'ÓÔŐÖ'), (2, 'Craig Marshall'), (3, 'ŮÚŰÜ');
INSERT into `utf8_test` values (1, '测试数据'), (2, 'Craig Marshall'), (3, '另一个测试数据');
INSERT into `ucs2_test` values (1, '测试数据'), (2, 'Craig Marshall'), (3, '另一个测试数据');
Loading…
Cancel
Save