[FLINK-36610] MySQL CDC supports parsing gh-ost / pt-osc generated schema changes (#3668)

Co-authored-by: MOBIN-F <18814118038@163.com>

@@ -377,6 +377,18 @@ Flink SQL> SELECT * FROM orders;
<td>Boolean</td>
<td>Whether to close idle readers at the end of the snapshot phase. This feature requires Flink version 1.14 or later, and 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' must be set to true.</td>
</tr>
<tr>
<td>scan.parse.online.schema.changes.enabled</td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>
Whether to attempt to parse schema change events generated by the <a href="https://github.com/github/gh-ost">gh-ost</a> or <a href="https://docs.percona.com/percona-toolkit/pt-online-schema-change.html">pt-osc</a> tools.
These tools apply schema changes to a "shadow" table and later swap it with the original table to complete the schema change.
<br>
This is an experimental feature.
</td>
</tr>
</tbody>
</table>
</div>

@@ -286,6 +286,18 @@ pipeline:
scan.binlog.newly-added-table.enabled: only do binlog-reading for newly added tables during the binlog reading phase.
</td>
</tr>
<tr>
<td>scan.parse.online.schema.changes.enabled</td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>
Whether to attempt to parse schema change events generated by the <a href="https://github.com/github/gh-ost">gh-ost</a> or <a href="https://docs.percona.com/percona-toolkit/pt-online-schema-change.html">pt-osc</a> tools.
These tools apply schema changes to a "shadow" table and later swap it with the original table to complete the schema change.
<br>
This is an experimental feature.
</td>
</tr>
</tbody>
</table>
</div>

@@ -391,6 +391,18 @@ During a snapshot operation, the connector will query each included table to pro
hex: The binary data type is converted to a hexadecimal string and transmitted.
The default value is none. Depending on your requirements and data types, you can choose the appropriate processing mode. If your database contains a large number of binary data types, it is recommended to use base64 or hex mode to make them easier to handle during transmission.</td>
</tr>
<tr>
<td>scan.parse.online.schema.changes.enabled</td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>
Whether to parse "online" schema changes generated by <a href="https://github.com/github/gh-ost">gh-ost</a> or <a href="https://docs.percona.com/percona-toolkit/pt-online-schema-change.html">pt-osc</a>.
These tools apply schema changes to a "shadow" table, which is later swapped with the original table.
<br>
This is an experimental feature and is subject to change in the future.
</td>
</tr>
</tbody>
</table>
</div>
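For DataStream API users, the same switch is available on the source builder; a minimal sketch, assuming placeholder connection settings:

import org.apache.flink.cdc.connectors.mysql.source.MySqlSource;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;

MySqlSource<String> source =
        MySqlSource.<String>builder()
                .hostname("localhost")   // placeholder connection settings
                .port(3306)
                .databaseList("mydb")
                .tableList("mydb.customers")
                .username("flinkuser")
                .password("flinkpw")
                .serverTimeZone("UTC")
                .deserializer(new JsonDebeziumDeserializationSchema())
                .includeSchemaChanges(true)
                // equivalent of 'scan.parse.online.schema.changes.enabled' = 'true'
                .parseOnLineSchemaChanges(true)
                .build();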

@@ -293,6 +293,18 @@ pipeline:
scan.binlog.newly-added-table.enabled: only do binlog-reading for newly added table during binlog reading phase.
</td>
</tr>
<tr>
<td>scan.parse.online.schema.changes.enabled</td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>
Whether to parse "online" schema changes generated by <a href="https://github.com/github/gh-ost">gh-ost</a> or <a href="https://docs.percona.com/percona-toolkit/pt-online-schema-change.html">pt-osc</a>.
These tools apply schema changes to a "shadow" table, which is later swapped with the original table.
<br>
This is an experimental feature and is subject to change in the future.
</td>
</tr>
</tbody>
</table>
</div>
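Programmatically, the pipeline connector threads this option through MySqlSourceConfigFactory, as exercised by the IT case below; a sketch with placeholder connection values:

import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory;
import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;

MySqlSourceConfigFactory configFactory =
        new MySqlSourceConfigFactory()
                .hostname("localhost")   // placeholder connection settings
                .port(3306)
                .username("flinkuser")
                .password("flinkpw")
                .databaseList("mydb")
                .tableList("mydb\\.customers")
                .startupOptions(StartupOptions.initial())
                .serverTimeZone("UTC")
                // equivalent of `scan.parse.online.schema.changes.enabled: true` in pipeline YAML
                .parseOnLineSchemaChanges(true);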

@@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.common.utils;
import org.apache.flink.util.function.SupplierWithException;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.function.Predicate;
import java.util.function.Supplier;
/** Utility methods for repeatedly checking conditions in test cases. */
public class TestCaseUtils {
public static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(30);
public static final Duration DEFAULT_INTERVAL = Duration.ofSeconds(1);
/** Repeatedly check with the default timeout and interval. */
public static void repeatedCheck(Supplier<Boolean> fetcher) {
repeatedCheck(fetcher, DEFAULT_TIMEOUT);
}
/** Repeatedly check with the given {@code timeout} and the default interval. */
public static void repeatedCheck(Supplier<Boolean> fetcher, Duration timeout) {
repeatedCheck(fetcher, timeout, DEFAULT_INTERVAL);
}
/** Repeatedly check with a ({@code timeout}, {@code interval}) duration. */
public static void repeatedCheck(
Supplier<Boolean> fetcher, Duration timeout, Duration interval) {
repeatedCheck(fetcher::get, timeout, interval, Collections.emptyList());
}
/** Fetch and validate with a ({@code timeout}, {@code interval}) duration. */
public static <T> void repeatedCheck(
Supplier<T> fetcher, Predicate<T> validator, Duration timeout, Duration interval) {
repeatedCheckAndValidate(
fetcher::get, validator, timeout, interval, Collections.emptyList());
}
/** Repeatedly check a throwing fetcher, tolerating the listed exception types. */
public static void repeatedCheck(
SupplierWithException<Boolean, Throwable> fetcher,
Duration timeout,
Duration interval,
List<Class<? extends Throwable>> allowedThrowsList) {
repeatedCheckAndValidate(fetcher, b -> b, timeout, interval, allowedThrowsList);
}
/** Fetch and validate with a ({@code timeout}, {@code interval}) duration. */
public static <T> void repeatedCheckAndValidate(
SupplierWithException<T, Throwable> fetcher,
Predicate<T> validator,
Duration timeout,
Duration interval,
List<Class<? extends Throwable>> allowedThrowsList) {
long start = System.currentTimeMillis();
while (System.currentTimeMillis() - start < timeout.toMillis()) {
try {
if (validator.test(fetcher.get())) {
return;
}
} catch (Throwable t) {
if (allowedThrowsList.stream()
.noneMatch(clazz -> clazz.isAssignableFrom(t.getClass()))) {
throw new RuntimeException("Fetcher has thrown an unexpected exception: ", t);
}
}
try {
Thread.sleep(interval.toMillis());
} catch (InterruptedException ignored) {
// ignored
}
}
throw new RuntimeException("Timeout when waiting for state to be ready.");
}
}
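For illustration, typical uses of these helpers look like the following (a sketch; `outputQueue` and `jobClient` are hypothetical variables standing in for whatever the test observes):

// Poll with the defaults: 30 s timeout, 1 s interval.
TestCaseUtils.repeatedCheck(() -> outputQueue.size() >= expectedCount);

// Tolerate transient exceptions while waiting for the job to reach RUNNING,
// mirroring the IT cases below.
TestCaseUtils.repeatedCheck(
        () -> jobClient.getJobStatus().get().equals(JobStatus.RUNNING),
        TestCaseUtils.DEFAULT_TIMEOUT,
        TestCaseUtils.DEFAULT_INTERVAL,
        Arrays.asList(InterruptedException.class, NoSuchElementException.class));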

@@ -67,6 +67,7 @@ import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOption
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.HEARTBEAT_INTERVAL;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.HOSTNAME;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.METADATA_LIST;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PARSE_ONLINE_SCHEMA_CHANGES;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PASSWORD;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PORT;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED;
@@ -139,6 +140,7 @@ public class MySqlDataSourceFactory implements DataSourceFactory {
boolean scanNewlyAddedTableEnabled = config.get(SCAN_NEWLY_ADDED_TABLE_ENABLED);
boolean scanBinlogNewlyAddedTableEnabled =
config.get(SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED);
boolean isParsingOnLineSchemaChanges = config.get(PARSE_ONLINE_SCHEMA_CHANGES);
validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1);
validateIntegerOption(CHUNK_META_GROUP_SIZE, splitMetaGroupSize, 1);
@@ -175,7 +177,8 @@ public class MySqlDataSourceFactory implements DataSourceFactory {
.includeSchemaChanges(includeSchemaChanges)
.debeziumProperties(getDebeziumProperties(configMap))
.jdbcProperties(getJdbcProperties(configMap))
.scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled)
.parseOnLineSchemaChanges(isParsingOnLineSchemaChanges);
List<TableId> tableIds = MySqlSchemaUtils.listTables(configFactory.createConfig(0), null);

@@ -281,4 +281,12 @@ public class MySqlDataSourceOptions {
.withDescription(
"List of readable metadata from SourceRecord to be passed to downstream, split by `,`. "
+ "Available readable metadata are: op_ts.");
@Experimental
public static final ConfigOption<Boolean> PARSE_ONLINE_SCHEMA_CHANGES =
ConfigOptions.key("scan.parse.online.schema.changes.enabled")
.booleanType()
.defaultValue(false)
.withDescription(
"Whether to parse schema change events generated by gh-ost/pt-osc utilities. Defaults to false.");
}

@@ -0,0 +1,591 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.connectors.mysql.source;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
import org.apache.flink.cdc.common.data.binary.BinaryStringData;
import org.apache.flink.cdc.common.event.AddColumnEvent;
import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.DropColumnEvent;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.PhysicalColumn;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.source.FlinkSourceProvider;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.connectors.mysql.factory.MySqlDataSourceFactory;
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory;
import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion;
import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
import org.apache.flink.cdc.runtime.typeutils.EventTypeInfo;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.util.CloseableIterator;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.testcontainers.DockerClientFactory;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;
import java.io.IOException;
import java.sql.Connection;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCHEMA_CHANGE_ENABLED;
import static org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.TEST_PASSWORD;
import static org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.TEST_USER;
import static org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.fetchResults;
import static org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.getServerId;
import static org.junit.Assert.assertEquals;
/**
* IT case for evolving MySQL schema with the gh-ost/pt-osc utilities. See <a
* href="https://github.com/github/gh-ost">gh-ost</a> and <a
* href="https://docs.percona.com/percona-toolkit/pt-online-schema-change.html">pt-osc</a> for
* more details.
*/
public class MySqlOnLineSchemaMigrationITCase extends MySqlSourceTestBase {
private static final MySqlContainer MYSQL8_CONTAINER =
createMySqlContainer(MySqlVersion.V8_0, "docker/server-gtids/expire-seconds/my.cnf");
private static final String PERCONA_TOOLKIT = "perconalab/percona-toolkit:3.5.7";
protected static final GenericContainer<?> PERCONA_TOOLKIT_CONTAINER =
createPerconaToolkitContainer();
private final UniqueDatabase customerDatabase =
new UniqueDatabase(MYSQL8_CONTAINER, "customer", TEST_USER, TEST_PASSWORD);
private final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
private static final String GH_OST_DOWNLOAD_LINK =
DockerClientFactory.instance().client().versionCmd().exec().getArch().equals("amd64")
? "https://github.com/github/gh-ost/releases/download/v1.1.6/gh-ost-binary-linux-amd64-20231207144046.tar.gz"
: "https://github.com/github/gh-ost/releases/download/v1.1.6/gh-ost-binary-linux-arm64-20231207144046.tar.gz";
@BeforeClass
public static void beforeClass() {
LOG.info("Starting MySql8 containers...");
Startables.deepStart(Stream.of(MYSQL8_CONTAINER)).join();
Startables.deepStart(Stream.of(PERCONA_TOOLKIT_CONTAINER)).join();
LOG.info("Container MySql8 is started.");
}
@AfterClass
public static void afterClass() {
LOG.info("Stopping MySql8 containers...");
MYSQL8_CONTAINER.stop();
PERCONA_TOOLKIT_CONTAINER.stop();
LOG.info("Container MySql8 is stopped.");
}
@Before
public void before() {
customerDatabase.createAndInitialize();
TestValuesTableFactory.clearAllData();
env.setParallelism(4);
env.enableCheckpointing(200);
env.setRestartStrategy(RestartStrategies.noRestart());
}
@After
public void after() {
customerDatabase.dropDatabase();
}
private static void installGhOstCli(Container<?> container) {
try {
execInContainer(
container,
"download gh-ost tarball",
"curl",
"-L",
"-o",
"/tmp/gh-ost.tar.gz",
GH_OST_DOWNLOAD_LINK);
execInContainer(
container, "unzip binary", "tar", "-xzvf", "/tmp/gh-ost.tar.gz", "-C", "/bin");
} catch (IOException | InterruptedException e) {
throw new RuntimeException(e);
}
}
private static GenericContainer<?> createPerconaToolkitContainer() {
GenericContainer<?> perconaToolkit =
new GenericContainer<>(PERCONA_TOOLKIT)
// keep container alive
.withCommand("tail", "-f", "/dev/null")
.withNetwork(NETWORK)
.withLogConsumer(new Slf4jLogConsumer(LOG));
return perconaToolkit;
}
@Test
public void testGhOstSchemaMigrationFromScratch() throws Exception {
LOG.info("Step 1: Install gh-ost command line utility");
installGhOstCli(MYSQL8_CONTAINER);
LOG.info("Step 2: Start pipeline job");
env.setParallelism(1);
TableId tableId = TableId.tableId(customerDatabase.getDatabaseName(), "customers");
MySqlSourceConfigFactory configFactory =
new MySqlSourceConfigFactory()
.hostname(MYSQL8_CONTAINER.getHost())
.port(MYSQL8_CONTAINER.getDatabasePort())
.username(TEST_USER)
.password(TEST_PASSWORD)
.databaseList(customerDatabase.getDatabaseName())
.tableList(customerDatabase.getDatabaseName() + "\\.customers")
.startupOptions(StartupOptions.initial())
.serverId(getServerId(env.getParallelism()))
.serverTimeZone("UTC")
.includeSchemaChanges(SCHEMA_CHANGE_ENABLED.defaultValue())
.parseOnLineSchemaChanges(true);
FlinkSourceProvider sourceProvider =
(FlinkSourceProvider) new MySqlDataSource(configFactory).getEventSourceProvider();
CloseableIterator<Event> events =
env.fromSource(
sourceProvider.getSource(),
WatermarkStrategy.noWatermarks(),
MySqlDataSourceFactory.IDENTIFIER,
new EventTypeInfo())
.executeAndCollect();
Thread.sleep(5_000);
List<Event> expected = new ArrayList<>();
Schema schemaV1 =
Schema.newBuilder()
.physicalColumn("id", DataTypes.INT().notNull())
.physicalColumn("name", DataTypes.VARCHAR(255).notNull(), null, "flink")
.physicalColumn("address", DataTypes.VARCHAR(1024))
.physicalColumn("phone_number", DataTypes.VARCHAR(512))
.primaryKey(Collections.singletonList("id"))
.build();
expected.add(new CreateTableEvent(tableId, schemaV1));
expected.addAll(getSnapshotExpected(tableId, schemaV1));
List<Event> actual = fetchResults(events, expected.size());
assertEqualsInAnyOrder(
expected.stream().map(Object::toString).collect(Collectors.toList()),
actual.stream().map(Object::toString).collect(Collectors.toList()));
// Wait for a little while until we're in Binlog streaming mode.
Thread.sleep(5_000);
LOG.info("Step 3: Evolve schema with gh-ost - ADD COLUMN");
execInContainer(
MYSQL8_CONTAINER,
"evolve schema",
"gh-ost",
"--user=" + TEST_USER,
"--password=" + TEST_PASSWORD,
"--database=" + customerDatabase.getDatabaseName(),
"--table=customers",
"--alter=add column ext int",
"--allow-on-master", // because we don't have a replica
"--initially-drop-old-table", // drop previously generated temporary tables
"--execute");
try (Connection connection = customerDatabase.getJdbcConnection();
Statement statement = connection.createStatement()) {
// The new column `ext` has been inserted now
statement.execute(
String.format(
"INSERT INTO `%s`.`customers` VALUES (10000, 'Alice', 'Beijing', '123567891234', 17);",
customerDatabase.getDatabaseName()));
}
Schema schemaV2 =
Schema.newBuilder()
.physicalColumn("id", DataTypes.INT().notNull())
.physicalColumn("name", DataTypes.VARCHAR(255).notNull(), null, "flink")
.physicalColumn("address", DataTypes.VARCHAR(1024))
.physicalColumn("phone_number", DataTypes.VARCHAR(512))
.physicalColumn("ext", DataTypes.INT())
.primaryKey(Collections.singletonList("id"))
.build();
assertEquals(
Arrays.asList(
new AddColumnEvent(
tableId,
Collections.singletonList(
new AddColumnEvent.ColumnWithPosition(
new PhysicalColumn("ext", DataTypes.INT(), null)))),
DataChangeEvent.insertEvent(
tableId,
generate(schemaV2, 10000, "Alice", "Beijing", "123567891234", 17))),
fetchResults(events, 2));
LOG.info("Step 4: Evolve schema with gh-ost - MODIFY COLUMN");
execInContainer(
MYSQL8_CONTAINER,
"evolve schema",
"gh-ost",
"--user=" + TEST_USER,
"--password=" + TEST_PASSWORD,
"--database=" + customerDatabase.getDatabaseName(),
"--table=customers",
"--alter=modify column ext double",
"--allow-on-master",
"--initially-drop-old-table",
"--execute");
try (Connection connection = customerDatabase.getJdbcConnection();
Statement statement = connection.createStatement()) {
statement.execute(
String.format(
"INSERT INTO `%s`.`customers` VALUES (10001, 'Bob', 'Chongqing', '123567891234', 2.718281828);",
customerDatabase.getDatabaseName()));
}
Schema schemaV3 =
Schema.newBuilder()
.physicalColumn("id", DataTypes.INT().notNull())
.physicalColumn("name", DataTypes.VARCHAR(255).notNull(), null, "flink")
.physicalColumn("address", DataTypes.VARCHAR(1024))
.physicalColumn("phone_number", DataTypes.VARCHAR(512))
.physicalColumn("ext", DataTypes.DOUBLE())
.primaryKey(Collections.singletonList("id"))
.build();
assertEquals(
Arrays.asList(
new AlterColumnTypeEvent(
tableId, Collections.singletonMap("ext", DataTypes.DOUBLE())),
DataChangeEvent.insertEvent(
tableId,
generate(
schemaV3,
10001,
"Bob",
"Chongqing",
"123567891234",
2.718281828))),
fetchResults(events, 2));
LOG.info("Step 5: Evolve schema with gh-ost - DROP COLUMN");
execInContainer(
MYSQL8_CONTAINER,
"evolve schema",
"gh-ost",
"--user=" + TEST_USER,
"--password=" + TEST_PASSWORD,
"--database=" + customerDatabase.getDatabaseName(),
"--table=customers",
"--alter=drop column ext",
"--allow-on-master",
"--initially-drop-old-table",
"--execute");
try (Connection connection = customerDatabase.getJdbcConnection();
Statement statement = connection.createStatement()) {
statement.execute(
String.format(
"INSERT INTO `%s`.`customers` VALUES (10002, 'Cicada', 'Urumqi', '123567891234');",
customerDatabase.getDatabaseName()));
}
Schema schemaV4 =
Schema.newBuilder()
.physicalColumn("id", DataTypes.INT().notNull())
.physicalColumn("name", DataTypes.VARCHAR(255).notNull(), null, "flink")
.physicalColumn("address", DataTypes.VARCHAR(1024))
.physicalColumn("phone_number", DataTypes.VARCHAR(512))
.primaryKey(Collections.singletonList("id"))
.build();
assertEquals(
Arrays.asList(
new DropColumnEvent(tableId, Collections.singletonList("ext")),
DataChangeEvent.insertEvent(
tableId,
generate(schemaV4, 10002, "Cicada", "Urumqi", "123567891234"))),
fetchResults(events, 2));
}
@Test
public void testPtOscSchemaMigrationFromScratch() throws Exception {
LOG.info("Step 1: Start pipeline job");
env.setParallelism(1);
TableId tableId = TableId.tableId(customerDatabase.getDatabaseName(), "customers");
MySqlSourceConfigFactory configFactory =
new MySqlSourceConfigFactory()
.hostname(MYSQL8_CONTAINER.getHost())
.port(MYSQL8_CONTAINER.getDatabasePort())
.username(TEST_USER)
.password(TEST_PASSWORD)
.databaseList(customerDatabase.getDatabaseName())
.tableList(customerDatabase.getDatabaseName() + "\\.customers")
.startupOptions(StartupOptions.initial())
.serverId(getServerId(env.getParallelism()))
.serverTimeZone("UTC")
.includeSchemaChanges(SCHEMA_CHANGE_ENABLED.defaultValue())
.parseOnLineSchemaChanges(true);
FlinkSourceProvider sourceProvider =
(FlinkSourceProvider) new MySqlDataSource(configFactory).getEventSourceProvider();
CloseableIterator<Event> events =
env.fromSource(
sourceProvider.getSource(),
WatermarkStrategy.noWatermarks(),
MySqlDataSourceFactory.IDENTIFIER,
new EventTypeInfo())
.executeAndCollect();
Thread.sleep(5_000);
List<Event> expected = new ArrayList<>();
Schema schemaV1 =
Schema.newBuilder()
.physicalColumn("id", DataTypes.INT().notNull())
.physicalColumn("name", DataTypes.VARCHAR(255).notNull(), null, "flink")
.physicalColumn("address", DataTypes.VARCHAR(1024))
.physicalColumn("phone_number", DataTypes.VARCHAR(512))
.primaryKey(Collections.singletonList("id"))
.build();
expected.add(new CreateTableEvent(tableId, schemaV1));
expected.addAll(getSnapshotExpected(tableId, schemaV1));
List<Event> actual = fetchResults(events, expected.size());
assertEqualsInAnyOrder(
expected.stream().map(Object::toString).collect(Collectors.toList()),
actual.stream().map(Object::toString).collect(Collectors.toList()));
// Wait for a little while until we're in Binlog streaming mode.
Thread.sleep(5_000);
LOG.info("Step 2: Evolve schema with pt-osc - ADD COLUMN");
execInContainer(
PERCONA_TOOLKIT_CONTAINER,
"evolve schema",
"pt-online-schema-change",
"--user=" + TEST_USER,
"--host=" + INTER_CONTAINER_MYSQL_ALIAS,
"--password=" + TEST_PASSWORD,
"P=3306,t=customers,D=" + customerDatabase.getDatabaseName(),
"--alter",
"add column ext int",
"--charset=utf8",
"--recursion-method=NONE", // Do not look for slave nodes
"--print",
"--execute");
try (Connection connection = customerDatabase.getJdbcConnection();
Statement statement = connection.createStatement()) {
// The new column `ext` has been inserted now
statement.execute(
String.format(
"INSERT INTO `%s`.`customers` VALUES (10000, 'Alice', 'Beijing', '123567891234', 17);",
customerDatabase.getDatabaseName()));
}
Schema schemaV2 =
Schema.newBuilder()
.physicalColumn("id", DataTypes.INT().notNull())
.physicalColumn("name", DataTypes.VARCHAR(255).notNull(), null, "flink")
.physicalColumn("address", DataTypes.VARCHAR(1024))
.physicalColumn("phone_number", DataTypes.VARCHAR(512))
.physicalColumn("ext", DataTypes.INT())
.primaryKey(Collections.singletonList("id"))
.build();
assertEquals(
Arrays.asList(
new AddColumnEvent(
tableId,
Collections.singletonList(
new AddColumnEvent.ColumnWithPosition(
new PhysicalColumn("ext", DataTypes.INT(), null)))),
DataChangeEvent.insertEvent(
tableId,
generate(schemaV2, 10000, "Alice", "Beijing", "123567891234", 17))),
fetchResults(events, 2));
LOG.info("Step 3: Evolve schema with pt-osc - MODIFY COLUMN");
execInContainer(
PERCONA_TOOLKIT_CONTAINER,
"evolve schema",
"pt-online-schema-change",
"--user=" + TEST_USER,
"--host=" + INTER_CONTAINER_MYSQL_ALIAS,
"--password=" + TEST_PASSWORD,
"P=3306,t=customers,D=" + customerDatabase.getDatabaseName(),
"--alter",
"modify column ext double",
"--charset=utf8",
"--recursion-method=NONE",
"--print",
"--execute");
try (Connection connection = customerDatabase.getJdbcConnection();
Statement statement = connection.createStatement()) {
statement.execute(
String.format(
"INSERT INTO `%s`.`customers` VALUES (10001, 'Bob', 'Chongqing', '123567891234', 2.718281828);",
customerDatabase.getDatabaseName()));
}
Schema schemaV3 =
Schema.newBuilder()
.physicalColumn("id", DataTypes.INT().notNull())
.physicalColumn("name", DataTypes.VARCHAR(255).notNull(), null, "flink")
.physicalColumn("address", DataTypes.VARCHAR(1024))
.physicalColumn("phone_number", DataTypes.VARCHAR(512))
.physicalColumn("ext", DataTypes.DOUBLE())
.primaryKey(Collections.singletonList("id"))
.build();
assertEquals(
Arrays.asList(
new AlterColumnTypeEvent(
tableId, Collections.singletonMap("ext", DataTypes.DOUBLE())),
DataChangeEvent.insertEvent(
tableId,
generate(
schemaV3,
10001,
"Bob",
"Chongqing",
"123567891234",
2.718281828))),
fetchResults(events, 2));
LOG.info("Step 4: Evolve schema with pt-osc - DROP COLUMN");
execInContainer(
PERCONA_TOOLKIT_CONTAINER,
"evolve schema",
"pt-online-schema-change",
"--user=" + TEST_USER,
"--host=" + INTER_CONTAINER_MYSQL_ALIAS,
"--password=" + TEST_PASSWORD,
"P=3306,t=customers,D=" + customerDatabase.getDatabaseName(),
"--alter",
"drop column ext",
"--charset=utf8",
"--recursion-method=NONE",
"--print",
"--execute");
try (Connection connection = customerDatabase.getJdbcConnection();
Statement statement = connection.createStatement()) {
statement.execute(
String.format(
"INSERT INTO `%s`.`customers` VALUES (10002, 'Cicada', 'Urumqi', '123567891234');",
customerDatabase.getDatabaseName()));
}
Schema schemaV4 =
Schema.newBuilder()
.physicalColumn("id", DataTypes.INT().notNull())
.physicalColumn("name", DataTypes.VARCHAR(255).notNull(), null, "flink")
.physicalColumn("address", DataTypes.VARCHAR(1024))
.physicalColumn("phone_number", DataTypes.VARCHAR(512))
.primaryKey(Collections.singletonList("id"))
.build();
assertEquals(
Arrays.asList(
new DropColumnEvent(tableId, Collections.singletonList("ext")),
DataChangeEvent.insertEvent(
tableId,
generate(schemaV4, 10002, "Cicada", "Urumqi", "123567891234"))),
fetchResults(events, 2));
}
private static void execInContainer(Container<?> container, String prompt, String... commands)
throws IOException, InterruptedException {
{
LOG.info(
"Starting to {} with the following command: `{}`",
prompt,
String.join(" ", commands));
Container.ExecResult execResult = container.execInContainer(commands);
if (execResult.getExitCode() == 0) {
LOG.info("Successfully {}. Stdout: {}", prompt, execResult.getStdout());
} else {
LOG.error(
"Failed to {}. Exit code: {}, Stdout: {}, Stderr: {}",
prompt,
execResult.getExitCode(),
execResult.getStdout(),
execResult.getStderr());
throw new IOException("Failed to execute commands: " + String.join(" ", commands));
}
}
}
private List<Event> getSnapshotExpected(TableId tableId, Schema schema) {
return Stream.of(
generate(schema, 101, "user_1", "Shanghai", "123567891234"),
generate(schema, 102, "user_2", "Shanghai", "123567891234"),
generate(schema, 103, "user_3", "Shanghai", "123567891234"),
generate(schema, 109, "user_4", "Shanghai", "123567891234"),
generate(schema, 110, "user_5", "Shanghai", "123567891234"),
generate(schema, 111, "user_6", "Shanghai", "123567891234"),
generate(schema, 118, "user_7", "Shanghai", "123567891234"),
generate(schema, 121, "user_8", "Shanghai", "123567891234"),
generate(schema, 123, "user_9", "Shanghai", "123567891234"),
generate(schema, 1009, "user_10", "Shanghai", "123567891234"),
generate(schema, 1010, "user_11", "Shanghai", "123567891234"),
generate(schema, 1011, "user_12", "Shanghai", "123567891234"),
generate(schema, 1012, "user_13", "Shanghai", "123567891234"),
generate(schema, 1013, "user_14", "Shanghai", "123567891234"),
generate(schema, 1014, "user_15", "Shanghai", "123567891234"),
generate(schema, 1015, "user_16", "Shanghai", "123567891234"),
generate(schema, 1016, "user_17", "Shanghai", "123567891234"),
generate(schema, 1017, "user_18", "Shanghai", "123567891234"),
generate(schema, 1018, "user_19", "Shanghai", "123567891234"),
generate(schema, 1019, "user_20", "Shanghai", "123567891234"),
generate(schema, 2000, "user_21", "Shanghai", "123567891234"))
.map(record -> DataChangeEvent.insertEvent(tableId, record))
.collect(Collectors.toList());
}
private BinaryRecordData generate(Schema schema, Object... fields) {
return (new BinaryRecordDataGenerator(schema.getColumnDataTypes().toArray(new DataType[0])))
.generate(
Arrays.stream(fields)
.map(
e ->
(e instanceof String)
? BinaryStringData.fromString((String) e)
: e)
.toArray());
}
}

@@ -53,6 +53,7 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -83,6 +84,7 @@ public class BinlogSplitReader implements DebeziumReader<SourceRecords, MySqlSpl
private Predicate capturedTableFilter;
private final StoppableChangeEventSourceContext changeEventSourceContext =
new StoppableChangeEventSourceContext();
private final boolean isParsingOnLineSchemaChanges;
private static final long READER_CLOSE_TIMEOUT = 30L;
@@ -93,6 +95,8 @@ public class BinlogSplitReader implements DebeziumReader<SourceRecords, MySqlSpl
this.executorService = Executors.newSingleThreadExecutor(threadFactory);
this.currentTaskRunning = true;
this.pureBinlogPhaseTables = new HashSet<>();
this.isParsingOnLineSchemaChanges =
statefulTaskContext.getSourceConfig().isParseOnLineSchemaChanges();
}
public void submitSplit(MySqlSplit mySqlSplit) {
@@ -148,6 +152,14 @@ public class BinlogSplitReader implements DebeziumReader<SourceRecords, MySqlSpl
if (currentTaskRunning) {
List<DataChangeEvent> batch = queue.poll();
for (DataChangeEvent event : batch) {
if (isParsingOnLineSchemaChanges) {
Optional<SourceRecord> oscRecord =
parseOnLineSchemaChangeEvent(event.getRecord());
if (oscRecord.isPresent()) {
sourceRecords.add(oscRecord.get());
continue;
}
}
if (shouldEmit(event.getRecord())) {
sourceRecords.add(event.getRecord());
}
@@ -195,6 +207,20 @@ public class BinlogSplitReader implements DebeziumReader<SourceRecords, MySqlSpl
}
}
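/**
 * If the given record is a gh-ost/pt-osc schema change event on a shadow table
 * (e.g. `_customers_gho` or `_customers_new`), rewrites it against the original
 * table and returns it; otherwise returns an empty Optional.
 */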
private Optional<SourceRecord> parseOnLineSchemaChangeEvent(SourceRecord sourceRecord) {
if (RecordUtils.isOnLineSchemaChangeEvent(sourceRecord)) {
// This is a gh-ost initialized schema change event and should be emitted if the
// peeled tableId matches the predicate.
TableId originalTableId = RecordUtils.getTableId(sourceRecord);
TableId peeledTableId = RecordUtils.peelTableId(originalTableId);
if (capturedTableFilter.test(peeledTableId)) {
return Optional.of(
RecordUtils.setTableId(sourceRecord, originalTableId, peeledTableId));
}
}
return Optional.empty();
}
/**
* Returns the record should emit or not.
*

@@ -268,6 +268,12 @@ public class MySqlSourceBuilder<T> {
return this;
}
/** Whether to parse gh-ost utility generated schema change events. Defaults to false. */
public MySqlSourceBuilder<T> parseOnLineSchemaChanges(boolean parseOnLineSchemaChanges) {
this.configFactory.parseOnLineSchemaChanges(parseOnLineSchemaChanges);
return this;
}
/**
* Build the {@link MySqlSource}.
*

@@ -66,6 +66,7 @@ public class MySqlSourceConfig implements Serializable {
private final Properties jdbcProperties;
private final Map<ObjectPath, String> chunkKeyColumns;
private final boolean skipSnapshotBackfill;
private final boolean parseOnLineSchemaChanges;
// --------------------------------------------------------------------------------------------
// Debezium Configurations
@@ -99,7 +100,8 @@
Properties dbzProperties,
Properties jdbcProperties,
Map<ObjectPath, String> chunkKeyColumns,
boolean skipSnapshotBackfill,
boolean parseOnLineSchemaChanges) {
this.hostname = checkNotNull(hostname);
this.port = port;
this.username = checkNotNull(username);
@@ -127,6 +129,7 @@
this.jdbcProperties = jdbcProperties;
this.chunkKeyColumns = chunkKeyColumns;
this.skipSnapshotBackfill = skipSnapshotBackfill;
this.parseOnLineSchemaChanges = parseOnLineSchemaChanges;
}
public String getHostname() {
@@ -210,6 +213,10 @@
return closeIdleReaders;
}
public boolean isParseOnLineSchemaChanges() {
return parseOnLineSchemaChanges;
}
public Properties getDbzProperties() {
return dbzProperties;
}

@@ -70,6 +70,7 @@ public class MySqlSourceConfigFactory implements Serializable {
private Properties dbzProperties;
private Map<ObjectPath, String> chunkKeyColumns = new HashMap<>();
private boolean skipSnapshotBackfill = false;
private boolean parseOnLineSchemaChanges = false;
public MySqlSourceConfigFactory hostname(String hostname) {
this.hostname = hostname;
@@ -291,6 +292,12 @@
return this;
}
/** Whether to parse gh-ost/pt-osc utility generated schema change events. Defaults to false. */
public MySqlSourceConfigFactory parseOnLineSchemaChanges(boolean parseOnLineSchemaChanges) {
this.parseOnLineSchemaChanges = parseOnLineSchemaChanges;
return this;
}
/** Creates a new {@link MySqlSourceConfig} for the given subtask {@code subtaskId}. */
public MySqlSourceConfig createConfig(int subtaskId) {
// hard code server name, because we don't need to distinguish it, docs:
@@ -384,6 +391,7 @@
props,
jdbcProperties,
chunkKeyColumns,
skipSnapshotBackfill,
parseOnLineSchemaChanges);
}
}

@@ -262,4 +262,12 @@ public class MySqlSourceOptions {
.defaultValue(false)
.withDescription(
"Whether to skip backfill in snapshot reading phase. If backfill is skipped, changes on captured tables during snapshot phase will be consumed later in binlog reading phase instead of being merged into the snapshot. WARNING: Skipping backfill might lead to data inconsistency because some binlog events happened within the snapshot phase might be replayed (only at-least-once semantic is promised). For example updating an already updated value in snapshot, or deleting an already deleted entry in snapshot. These replayed binlog events should be handled specially.");
@Experimental
public static final ConfigOption<Boolean> PARSE_ONLINE_SCHEMA_CHANGES =
ConfigOptions.key("scan.parse.online.schema.changes.enabled")
.booleanType()
.defaultValue(false)
.withDescription(
"Whether to parse schema change events generated by gh-ost/pt-osc utilities. Defaults to false.");
}
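On the SQL side, the new option goes into the WITH clause like any other connector option; a sketch with placeholder schema and connection values, run through a Java TableEnvironment:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ParseOnlineSchemaChangesExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
        // Placeholder table schema and connection values; only the last option is new.
        tEnv.executeSql(
                "CREATE TABLE customers (\n"
                        + "  id INT,\n"
                        + "  name STRING,\n"
                        + "  PRIMARY KEY (id) NOT ENFORCED\n"
                        + ") WITH (\n"
                        + "  'connector' = 'mysql-cdc',\n"
                        + "  'hostname' = 'localhost',\n"
                        + "  'port' = '3306',\n"
                        + "  'username' = 'flinkuser',\n"
                        + "  'password' = 'flinkpw',\n"
                        + "  'database-name' = 'mydb',\n"
                        + "  'table-name' = 'customers',\n"
                        + "  'scan.parse.online.schema.changes.enabled' = 'true'\n"
                        + ")");
    }
}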

@@ -25,7 +25,11 @@ import org.apache.flink.cdc.connectors.mysql.source.split.FinishedSnapshotSplitI
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSnapshotSplit;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import io.debezium.data.Envelope;
import io.debezium.document.Document;
import io.debezium.document.DocumentReader;
import io.debezium.relational.TableId;
import io.debezium.relational.history.HistoryRecord;
@@ -50,6 +54,8 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import static io.debezium.connector.AbstractSourceInfo.DATABASE_NAME_KEY;
@@ -384,6 +390,40 @@ public class RecordUtils {
return new TableId(dbName, null, tableName);
}
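/**
 * Returns a copy of {@code dataRecord} whose source struct and embedded schema history
 * DDL reference {@code tableId} instead of {@code originalTableId}, re-targeting a DDL
 * captured on a shadow table onto the original table.
 */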
public static SourceRecord setTableId(
SourceRecord dataRecord, TableId originalTableId, TableId tableId) {
Struct value = (Struct) dataRecord.value();
Document historyRecordDocument;
try {
historyRecordDocument = getHistoryRecord(dataRecord).document();
} catch (IOException e) {
throw new RuntimeException(e);
}
HistoryRecord newHistoryRecord =
new HistoryRecord(
historyRecordDocument.set(
HistoryRecord.Fields.DDL_STATEMENTS,
historyRecordDocument
.get(HistoryRecord.Fields.DDL_STATEMENTS)
.asString()
.replace(originalTableId.table(), tableId.table())));
Struct newSource =
value.getStruct(Envelope.FieldName.SOURCE)
.put(DATABASE_NAME_KEY, tableId.catalog())
.put(TABLE_NAME_KEY, tableId.table());
return dataRecord.newRecord(
dataRecord.topic(),
dataRecord.kafkaPartition(),
dataRecord.keySchema(),
dataRecord.key(),
dataRecord.valueSchema(),
value.put(Envelope.FieldName.SOURCE, newSource)
.put(HISTORY_RECORD_FIELD, newHistoryRecord.toString()),
dataRecord.timestamp(),
dataRecord.headers());
}
public static boolean isTableChangeRecord(SourceRecord dataRecord) {
Struct value = (Struct) dataRecord.value();
Struct source = value.getStruct(Envelope.FieldName.SOURCE);
@@ -489,4 +529,75 @@ public class RecordUtils {
}
return Optional.empty();
}
/**
* This utility method checks if given source record is a gh-ost/pt-osc initiated schema change
* event by checking the "alter" ddl.
*/
public static boolean isOnLineSchemaChangeEvent(SourceRecord record) {
if (!isSchemaChangeEvent(record)) {
return false;
}
Struct value = (Struct) record.value();
ObjectMapper mapper = new ObjectMapper();
try {
// There will be these schema change events generated in total during one transaction.
//
// gh-ost:
// DROP TABLE IF EXISTS `db`.`_tb1_gho`
// DROP TABLE IF EXISTS `db`.`_tb1_del`
// DROP TABLE IF EXISTS `db`.`_tb1_ghc`
// create /* gh-ost */ table `db`.`_tb1_ghc` ...
// create /* gh-ost */ table `db`.`_tb1_gho` like `db`.`tb1`
// alter /* gh-ost */ table `db`.`_tb1_gho` add column c varchar(255)
// create /* gh-ost */ table `db`.`_tb1_del` ...
// DROP TABLE IF EXISTS `db`.`_tb1_del`
// rename /* gh-ost */ table `db`.`tb1` to `db`.`_tb1_del`
// rename /* gh-ost */ table `db`.`_tb1_gho` to `db`.`tb1`
// DROP TABLE IF EXISTS `db`.`_tb1_ghc`
// DROP TABLE IF EXISTS `db`.`_tb1_del`
//
// pt-osc:
// CREATE TABLE `db`.`_test_tb1_new`
// ALTER TABLE `db`.`_test_tb1_new` add column c varchar(50)
// CREATE TRIGGER `pt_osc_db_test_tb1_del`...
// CREATE TRIGGER `pt_osc_db_test_tb1_upd`...
// CREATE TRIGGER `pt_osc_db_test_tb1_ins`...
// ANALYZE TABLE `db`.`_test_tb1_new` /* pt-online-schema-change */
// RENAME TABLE `db`.`test_tb1` TO `db`.`_test_tb1_old`, `db`.`_test_tb1_new` TO
// `db`.`test_tb1`
// DROP TABLE IF EXISTS `_test_tb1_old` /* generated by server */
// DROP TRIGGER IF EXISTS `db`.`pt_osc_db_test_tb1_del`
// DROP TRIGGER IF EXISTS `db`.`pt_osc_db_test_tb1_upd`
// DROP TRIGGER IF EXISTS `db`.`pt_osc_db_test_tb1_ins`
//
// Among all these, we only need the "ALTER" one that happens on the `_gho`/`_new`
// table.
String ddl =
mapper.readTree(value.getString(HISTORY_RECORD_FIELD))
.get(HistoryRecord.Fields.DDL_STATEMENTS)
.asText()
.toLowerCase();
if (ddl.startsWith("alter")) {
String tableName =
value.getStruct(Envelope.FieldName.SOURCE).getString(TABLE_NAME_KEY);
return OSC_TABLE_ID_PATTERN.matcher(tableName).matches();
}
return false;
} catch (JsonProcessingException e) {
return false;
}
}
private static final Pattern OSC_TABLE_ID_PATTERN = Pattern.compile("^_(.*)_(gho|new)$");
/** This utility method peels out gh-ost/pt-osc mangled tableId to the original one. */
public static TableId peelTableId(TableId tableId) {
Matcher matchingResult = OSC_TABLE_ID_PATTERN.matcher(tableId.table());
if (matchingResult.matches()) {
return new TableId(tableId.catalog(), tableId.schema(), matchingResult.group(1));
}
return tableId;
}
}
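For instance, under OSC_TABLE_ID_PATTERN above, shadow-table names peel back to the original table while anything else passes through unchanged (illustrative names):

RecordUtils.peelTableId(new TableId("db", null, "_customers_gho")); // -> db.customers (gh-ost)
RecordUtils.peelTableId(new TableId("db", null, "_customers_new")); // -> db.customers (pt-osc)
RecordUtils.peelTableId(new TableId("db", null, "customers"));      // -> db.customers (unchanged)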

@@ -98,6 +98,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
private final Duration heartbeatInterval;
private final String chunkKeyColumn;
final boolean skipSnapshotBackFill;
final boolean parseOnlineSchemaChanges;
// --------------------------------------------------------------------------------------------
// Mutable attributes
@@ -135,7 +136,8 @@
Properties jdbcProperties,
Duration heartbeatInterval,
@Nullable String chunkKeyColumn,
boolean skipSnapshotBackFill,
boolean parseOnlineSchemaChanges) {
this.physicalSchema = physicalSchema;
this.port = port;
this.hostname = checkNotNull(hostname);
@@ -159,6 +161,7 @@
this.scanNewlyAddedTableEnabled = scanNewlyAddedTableEnabled;
this.closeIdleReaders = closeIdleReaders;
this.jdbcProperties = jdbcProperties;
this.parseOnlineSchemaChanges = parseOnlineSchemaChanges;
// Mutable attributes
this.producedDataType = physicalSchema.toPhysicalRowDataType();
this.metadataKeys = Collections.emptyList();
@@ -220,6 +223,7 @@
.heartbeatInterval(heartbeatInterval)
.chunkKeyColumn(new ObjectPath(database, tableName), chunkKeyColumn)
.skipSnapshotBackfill(skipSnapshotBackFill)
.parseOnLineSchemaChanges(parseOnlineSchemaChanges)
.build();
return SourceProvider.of(parallelSource);
} else {
@@ -305,7 +309,8 @@
jdbcProperties,
heartbeatInterval,
chunkKeyColumn,
skipSnapshotBackFill,
parseOnlineSchemaChanges);
source.metadataKeys = metadataKeys;
source.producedDataType = producedDataType;
return source;

@@ -102,6 +102,8 @@ public class MySqlTableSourceFactory implements DynamicTableSourceFactory {
config.get(MySqlSourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED);
boolean skipSnapshotBackFill =
config.get(MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP);
boolean parseOnLineSchemaChanges =
config.get(MySqlSourceOptions.PARSE_ONLINE_SCHEMA_CHANGES);
if (enableParallelRead) {
validatePrimaryKeyIfEnableParallel(physicalSchema, chunkKeyColumn);
@@ -145,7 +147,8 @@
JdbcUrlUtils.getJdbcProperties(context.getCatalogTable().getOptions()),
heartbeatInterval,
chunkKeyColumn,
skipSnapshotBackFill,
parseOnLineSchemaChanges);
}
@Override
@@ -191,6 +194,7 @@
options.add(MySqlSourceOptions.HEARTBEAT_INTERVAL);
options.add(MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN);
options.add(MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP);
options.add(MySqlSourceOptions.PARSE_ONLINE_SCHEMA_CHANGES);
return options;
}

@@ -0,0 +1,584 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.connectors.mysql.source;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
import org.apache.flink.cdc.common.data.binary.BinaryStringData;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.utils.TestCaseUtils;
import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion;
import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.types.Row;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.testcontainers.DockerClientFactory;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.sql.Connection;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Random;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.flink.api.common.JobStatus.RUNNING;
import static org.apache.flink.cdc.common.utils.TestCaseUtils.DEFAULT_INTERVAL;
import static org.apache.flink.cdc.common.utils.TestCaseUtils.DEFAULT_TIMEOUT;
/**
* IT case for evolving MySQL schema with the gh-ost/pt-osc utilities. See <a
* href="https://github.com/github/gh-ost">gh-ost</a> and <a
* href="https://docs.percona.com/percona-toolkit/pt-online-schema-change.html">pt-osc</a> for
* more details.
*/
public class MySqlOnLineSchemaMigrationSourceITCase extends MySqlSourceTestBase {
private static final MySqlContainer MYSQL8_CONTAINER =
createMySqlContainer(MySqlVersion.V8_0, "docker/server-gtids/expire-seconds/my.cnf");
private static final String TEST_USER = "mysqluser";
private static final String TEST_PASSWORD = "mysqlpw";
private static final String PERCONA_TOOLKIT = "perconalab/percona-toolkit:3.5.7";
protected static final GenericContainer<?> PERCONA_TOOLKIT_CONTAINER =
createPerconaToolkitContainer();
private final UniqueDatabase customerDatabase =
new UniqueDatabase(MYSQL8_CONTAINER, "customer", TEST_USER, TEST_PASSWORD);
private final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
private final StreamTableEnvironment tEnv =
StreamTableEnvironment.create(
env, EnvironmentSettings.newInstance().inStreamingMode().build());
private static final String GH_OST_DOWNLOAD_LINK =
DockerClientFactory.instance().client().versionCmd().exec().getArch().equals("amd64")
? "https://github.com/github/gh-ost/releases/download/v1.1.6/gh-ost-binary-linux-amd64-20231207144046.tar.gz"
: "https://github.com/github/gh-ost/releases/download/v1.1.6/gh-ost-binary-linux-arm64-20231207144046.tar.gz";
@BeforeClass
public static void beforeClass() {
LOG.info("Starting containers...");
Startables.deepStart(Stream.of(MYSQL8_CONTAINER)).join();
Startables.deepStart(Stream.of(PERCONA_TOOLKIT_CONTAINER)).join();
LOG.info("Containers are started.");
}
@AfterClass
public static void afterClass() {
LOG.info("Stopping containers...");
MYSQL8_CONTAINER.stop();
PERCONA_TOOLKIT_CONTAINER.close();
LOG.info("Containers are stopped.");
}
@Before
public void before() {
TestValuesTableFactory.clearAllData();
env.setParallelism(DEFAULT_PARALLELISM);
env.enableCheckpointing(200);
customerDatabase.createAndInitialize();
System.setOut(new PrintStream(outCaptor));
}
@After
public void after() {
customerDatabase.dropDatabase();
System.setOut(sysOut);
}
private static void installGhOstCli(Container<?> container) {
try {
execInContainer(
container,
"download gh-ost tarball",
"curl",
"-L",
"-o",
"/tmp/gh-ost.tar.gz",
GH_OST_DOWNLOAD_LINK);
execInContainer(
container, "unzip binary", "tar", "-xzvf", "/tmp/gh-ost.tar.gz", "-C", "/bin");
} catch (IOException | InterruptedException e) {
throw new RuntimeException(e);
}
}
private static GenericContainer<?> createPerconaToolkitContainer() {
GenericContainer<?> perconaToolkit =
new GenericContainer<>(PERCONA_TOOLKIT)
// keep container alive
.withCommand("tail", "-f", "/dev/null")
.withNetwork(NETWORK)
.withLogConsumer(new Slf4jLogConsumer(LOG));
return perconaToolkit;
}
private final PrintStream sysOut = System.out;
private final ByteArrayOutputStream outCaptor = new ByteArrayOutputStream();
@Test
public void testGhOstSchemaMigrationFromScratch() throws Exception {
LOG.info("Step 1: Install gh-ost command line utility");
installGhOstCli(MYSQL8_CONTAINER);
LOG.info("Step 2: Start pipeline job");
MySqlSource<String> mySqlSource =
MySqlSource.<String>builder()
.hostname(MYSQL8_CONTAINER.getHost())
.port(MYSQL8_CONTAINER.getDatabasePort())
.databaseList(customerDatabase.getDatabaseName())
.tableList(customerDatabase.getDatabaseName() + ".customers")
.username(customerDatabase.getUsername())
.password(customerDatabase.getPassword())
.serverId("5401-5404")
.deserializer(new JsonDebeziumDeserializationSchema())
.serverTimeZone("UTC")
.includeSchemaChanges(true) // output the schema changes as well
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(3000);
env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySqlParallelSource")
.setParallelism(4)
.print()
.setParallelism(1);
JobClient resultClient = env.executeAsync();
TestCaseUtils.repeatedCheck(
() -> resultClient.getJobStatus().get().equals(RUNNING),
DEFAULT_TIMEOUT,
DEFAULT_INTERVAL,
Arrays.asList(InterruptedException.class, NoSuchElementException.class));
{
String[] expected =
new String[] {
"{\"id\":101,\"name\":\"user_1\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
"{\"id\":102,\"name\":\"user_2\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
"{\"id\":103,\"name\":\"user_3\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
"{\"id\":109,\"name\":\"user_4\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
"{\"id\":110,\"name\":\"user_5\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
"{\"id\":111,\"name\":\"user_6\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
"{\"id\":118,\"name\":\"user_7\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
"{\"id\":121,\"name\":\"user_8\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
"{\"id\":123,\"name\":\"user_9\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
"{\"id\":1009,\"name\":\"user_10\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
"{\"id\":1010,\"name\":\"user_11\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
"{\"id\":1011,\"name\":\"user_12\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
"{\"id\":1012,\"name\":\"user_13\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
"{\"id\":1013,\"name\":\"user_14\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
"{\"id\":1014,\"name\":\"user_15\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
"{\"id\":1015,\"name\":\"user_16\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
"{\"id\":1016,\"name\":\"user_17\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
"{\"id\":1017,\"name\":\"user_18\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
"{\"id\":1018,\"name\":\"user_19\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
"{\"id\":1019,\"name\":\"user_20\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
"{\"id\":2000,\"name\":\"user_21\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
};
TestCaseUtils.repeatedCheck(
() -> Arrays.stream(expected).allMatch(outCaptor.toString()::contains));
}
// Wait for a little while until we're in Binlog streaming mode.
Thread.sleep(5_000);
{
LOG.info("Step 3: Evolve schema with gh-ost - ADD COLUMN");
execInContainer(
MYSQL8_CONTAINER,
"evolve schema",
"gh-ost",
"--user=" + TEST_USER,
"--password=" + TEST_PASSWORD,
"--database=" + customerDatabase.getDatabaseName(),
"--table=customers",
"--alter=add column ext int first",
"--allow-on-master", // because we don't have a replica
"--initially-drop-old-table", // drop previously generated temporary tables
"--execute");
try (Connection connection = customerDatabase.getJdbcConnection();
Statement statement = connection.createStatement()) {
// The new column `ext` has been inserted now
statement.execute(
String.format(
"INSERT INTO `%s`.`customers` VALUES (17, 10000, 'Alice', 'Beijing', '123567891234');",
customerDatabase.getDatabaseName()));
}
TestCaseUtils.repeatedCheck(
() ->
outCaptor
.toString()
.contains(
"{\"ext\":17,\"id\":10000,\"name\":\"Alice\",\"address\":\"Beijing\",\"phone_number\":\"123567891234\"}"));
}
{
LOG.info("Step 4: Evolve schema with gh-ost - MODIFY COLUMN");
execInContainer(
MYSQL8_CONTAINER,
"evolve schema",
"gh-ost",
"--user=" + TEST_USER,
"--password=" + TEST_PASSWORD,
"--database=" + customerDatabase.getDatabaseName(),
"--table=customers",
"--alter=modify column ext double",
"--allow-on-master",
"--initially-drop-old-table",
"--execute");
try (Connection connection = customerDatabase.getJdbcConnection();
Statement statement = connection.createStatement()) {
statement.execute(
String.format(
"INSERT INTO `%s`.`customers` VALUES (2.718281828, 10001, 'Bob', 'Chongqing', '123567891234');",
customerDatabase.getDatabaseName()));
}
TestCaseUtils.repeatedCheck(
() ->
outCaptor
.toString()
.contains(
"{\"ext\":2.718281828,\"id\":10001,\"name\":\"Bob\",\"address\":\"Chongqing\",\"phone_number\":\"123567891234\"}"));
}
{
LOG.info("Step 5: Evolve schema with gh-ost - DROP COLUMN");
execInContainer(
MYSQL8_CONTAINER,
"evolve schema",
"gh-ost",
"--user=" + TEST_USER,
"--password=" + TEST_PASSWORD,
"--database=" + customerDatabase.getDatabaseName(),
"--table=customers",
"--alter=drop column ext",
"--allow-on-master",
"--initially-drop-old-table",
"--execute");
try (Connection connection = customerDatabase.getJdbcConnection();
Statement statement = connection.createStatement()) {
statement.execute(
String.format(
"INSERT INTO `%s`.`customers` VALUES (10002, 'Cicada', 'Urumqi', '123567891234');",
customerDatabase.getDatabaseName()));
}
TestCaseUtils.repeatedCheck(
() ->
outCaptor
.toString()
.contains(
"{\"id\":10002,\"name\":\"Cicada\",\"address\":\"Urumqi\",\"phone_number\":\"123567891234\"}"));
}
resultClient.cancel();
}
@Test
public void testPtOscSchemaMigrationFromScratch() throws Exception {
LOG.info("Step 1: Start pipeline job");
MySqlSource<String> mySqlSource =
MySqlSource.<String>builder()
.hostname(MYSQL8_CONTAINER.getHost())
.port(MYSQL8_CONTAINER.getDatabasePort())
.databaseList(customerDatabase.getDatabaseName())
.tableList(customerDatabase.getDatabaseName() + ".customers")
.username(customerDatabase.getUsername())
.password(customerDatabase.getPassword())
.serverId("5401-5404")
.deserializer(new JsonDebeziumDeserializationSchema())
.serverTimeZone("UTC")
.includeSchemaChanges(true) // output the schema changes as well
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(3000);
env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySqlParallelSource")
.setParallelism(4)
.print()
.setParallelism(1);
JobClient resultClient = env.executeAsync();
TestCaseUtils.repeatedCheck(
() -> resultClient.getJobStatus().get().equals(RUNNING),
DEFAULT_TIMEOUT,
DEFAULT_INTERVAL,
Arrays.asList(InterruptedException.class, NoSuchElementException.class));
{
String[] expected =
new String[] {
"{\"id\":101,\"name\":\"user_1\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
"{\"id\":102,\"name\":\"user_2\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
"{\"id\":103,\"name\":\"user_3\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
"{\"id\":109,\"name\":\"user_4\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
"{\"id\":110,\"name\":\"user_5\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
"{\"id\":111,\"name\":\"user_6\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
"{\"id\":118,\"name\":\"user_7\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
"{\"id\":121,\"name\":\"user_8\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
"{\"id\":123,\"name\":\"user_9\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
"{\"id\":1009,\"name\":\"user_10\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
"{\"id\":1010,\"name\":\"user_11\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
"{\"id\":1011,\"name\":\"user_12\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
"{\"id\":1012,\"name\":\"user_13\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
"{\"id\":1013,\"name\":\"user_14\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
"{\"id\":1014,\"name\":\"user_15\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
"{\"id\":1015,\"name\":\"user_16\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
"{\"id\":1016,\"name\":\"user_17\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
"{\"id\":1017,\"name\":\"user_18\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
"{\"id\":1018,\"name\":\"user_19\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
"{\"id\":1019,\"name\":\"user_20\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
"{\"id\":2000,\"name\":\"user_21\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
};
TestCaseUtils.repeatedCheck(
() -> Arrays.stream(expected).allMatch(outCaptor.toString()::contains));
}
// Wait for a little while until we're in Binlog streaming mode.
Thread.sleep(5000L);
LOG.info("Step 2: Evolve schema with pt-osc - ADD COLUMN");
execInContainer(
PERCONA_TOOLKIT_CONTAINER,
"evolve schema",
"pt-online-schema-change",
"--user=" + TEST_USER,
"--host=" + INTER_CONTAINER_MYSQL_ALIAS,
"--password=" + TEST_PASSWORD,
"P=3306,t=customers,D=" + customerDatabase.getDatabaseName(),
"--alter",
"add column ext int FIRST",
"--charset=utf8",
"--recursion-method=NONE", // Do not look for slave nodes
"--print",
"--execute");
try (Connection connection = customerDatabase.getJdbcConnection();
Statement statement = connection.createStatement()) {
// The new column `ext` has been inserted now
statement.execute(
String.format(
"INSERT INTO `%s`.`customers` VALUES (17, 10000, 'Alice', 'Beijing', '123567891234');",
customerDatabase.getDatabaseName()));
TestCaseUtils.repeatedCheck(
() ->
outCaptor
.toString()
.contains(
"{\"ext\":17,\"id\":10000,\"name\":\"Alice\",\"address\":\"Beijing\",\"phone_number\":\"123567891234\"}"));
}
LOG.info("Step 3: Evolve schema with pt-osc - MODIFY COLUMN");
execInContainer(
PERCONA_TOOLKIT_CONTAINER,
"evolve schema",
"pt-online-schema-change",
"--user=" + TEST_USER,
"--host=" + INTER_CONTAINER_MYSQL_ALIAS,
"--password=" + TEST_PASSWORD,
"P=3306,t=customers,D=" + customerDatabase.getDatabaseName(),
"--alter",
"modify column ext double",
"--charset=utf8",
"--recursion-method=NONE",
"--print",
"--execute");
try (Connection connection = customerDatabase.getJdbcConnection();
Statement statement = connection.createStatement()) {
statement.execute(
String.format(
"INSERT INTO `%s`.`customers` VALUES (2.718281828, 10001, 'Bob', 'Chongqing', '123567891234');",
customerDatabase.getDatabaseName()));
TestCaseUtils.repeatedCheck(
() ->
outCaptor
.toString()
.contains(
"{\"ext\":2.718281828,\"id\":10001,\"name\":\"Bob\",\"address\":\"Chongqing\",\"phone_number\":\"123567891234\"}"));
}
LOG.info("Step 4: Evolve schema with pt-osc - DROP COLUMN");
execInContainer(
PERCONA_TOOLKIT_CONTAINER,
"evolve schema",
"pt-online-schema-change",
"--user=" + TEST_USER,
"--host=" + INTER_CONTAINER_MYSQL_ALIAS,
"--password=" + TEST_PASSWORD,
"P=3306,t=customers,D=" + customerDatabase.getDatabaseName(),
"--alter",
"drop column ext",
"--charset=utf8",
"--recursion-method=NONE",
"--print",
"--execute");
try (Connection connection = customerDatabase.getJdbcConnection();
Statement statement = connection.createStatement()) {
statement.execute(
String.format(
"INSERT INTO `%s`.`customers` VALUES (10002, 'Cicada', 'Urumqi', '123567891234');",
customerDatabase.getDatabaseName()));
TestCaseUtils.repeatedCheck(
() ->
outCaptor
.toString()
.contains(
"{\"id\":10002,\"name\":\"Cicada\",\"address\":\"Urumqi\",\"phone_number\":\"123567891234\"}"));
}
}
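// Executes a command inside the given container, logging its output and failing fast on a
// non-zero exit code.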
private static void execInContainer(Container<?> container, String prompt, String... commands)
throws IOException, InterruptedException {
LOG.info(
"Starting to {} with the following command: `{}`",
prompt,
String.join(" ", commands));
Container.ExecResult execResult = container.execInContainer(commands);
if (execResult.getExitCode() == 0) {
LOG.info("Successfully {}. Stdout: {}", prompt, execResult.getStdout());
} else {
LOG.error(
"Failed to {}. Exit code: {}, Stdout: {}, Stderr: {}",
prompt,
execResult.getExitCode(),
execResult.getStdout(),
execResult.getStderr());
throw new IOException("Failed to execute commands: " + String.join(" ", commands));
}
}
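// Expected insert events produced by the initial snapshot of the `customers` table.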
private List<Event> getSnapshotExpected(TableId tableId, Schema schema) {
return Stream.of(
generate(schema, 101, "user_1", "Shanghai", "123567891234"),
generate(schema, 102, "user_2", "Shanghai", "123567891234"),
generate(schema, 103, "user_3", "Shanghai", "123567891234"),
generate(schema, 109, "user_4", "Shanghai", "123567891234"),
generate(schema, 110, "user_5", "Shanghai", "123567891234"),
generate(schema, 111, "user_6", "Shanghai", "123567891234"),
generate(schema, 118, "user_7", "Shanghai", "123567891234"),
generate(schema, 121, "user_8", "Shanghai", "123567891234"),
generate(schema, 123, "user_9", "Shanghai", "123567891234"),
generate(schema, 1009, "user_10", "Shanghai", "123567891234"),
generate(schema, 1010, "user_11", "Shanghai", "123567891234"),
generate(schema, 1011, "user_12", "Shanghai", "123567891234"),
generate(schema, 1012, "user_13", "Shanghai", "123567891234"),
generate(schema, 1013, "user_14", "Shanghai", "123567891234"),
generate(schema, 1014, "user_15", "Shanghai", "123567891234"),
generate(schema, 1015, "user_16", "Shanghai", "123567891234"),
generate(schema, 1016, "user_17", "Shanghai", "123567891234"),
generate(schema, 1017, "user_18", "Shanghai", "123567891234"),
generate(schema, 1018, "user_19", "Shanghai", "123567891234"),
generate(schema, 1019, "user_20", "Shanghai", "123567891234"),
generate(schema, 2000, "user_21", "Shanghai", "123567891234"))
.map(record -> DataChangeEvent.insertEvent(tableId, record))
.collect(Collectors.toList());
}
private BinaryRecordData generate(Schema schema, Object... fields) {
return (new BinaryRecordDataGenerator(schema.getColumnDataTypes().toArray(new DataType[0])))
.generate(
Arrays.stream(fields)
.map(
e ->
(e instanceof String)
? BinaryStringData.fromString((String) e)
: e)
.toArray());
}
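// Picks a random server-id range wide enough to cover every parallel source reader.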
private String getServerId() {
final Random random = new Random();
int serverId = random.nextInt(100) + 5400;
return serverId + "-" + (serverId + env.getParallelism());
}
protected String getServerId(int base) {
return base + "-" + (base + DEFAULT_PARALLELISM);
}
private static void waitForSnapshotStarted(String sinkName) throws InterruptedException {
while (sinkSize(sinkName) == 0) {
Thread.sleep(100);
}
}
private static void waitForSinkSize(String sinkName, int expectedSize) {
TestCaseUtils.repeatedCheck(() -> sinkSize(sinkName) >= expectedSize);
}
private static int sinkSize(String sinkName) {
synchronized (TestValuesTableFactory.class) {
try {
return TestValuesTableFactory.getRawResults(sinkName).size();
} catch (IllegalArgumentException e) {
// job is not started yet
return 0;
}
}
}
private static List<String> fetchRows(Iterator<Row> iter, int size) {
List<String> rows = new ArrayList<>(size);
while (size > 0 && iter.hasNext()) {
Row row = iter.next();
rows.add(row.toString());
size--;
}
return rows;
}
}
@ -32,9 +32,11 @@ import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;
@ -55,6 +57,8 @@ public abstract class MySqlSourceTestBase extends TestLogger {
protected static final int DEFAULT_PARALLELISM = 4;
protected static final MySqlContainer MYSQL_CONTAINER = createMySqlContainer(MySqlVersion.V5_7);
protected InMemoryReporter metricReporter = InMemoryReporter.createWithRetainedMetrics();
public static final String INTER_CONTAINER_MYSQL_ALIAS = "mysql";
@ClassRule public static final Network NETWORK = Network.newNetwork();
@Rule
public final MiniClusterWithClientResource miniClusterResource =
@ -96,6 +100,8 @@ public abstract class MySqlSourceTestBase extends TestLogger {
.withDatabaseName("flink-test")
.withUsername("flinkuser")
.withPassword("flinkpw")
.withNetwork(NETWORK)
.withNetworkAliases(INTER_CONTAINER_MYSQL_ALIAS)
.withLogConsumer(new Slf4jLogConsumer(LOG));
}
@ -0,0 +1,598 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.connectors.mysql.table;
import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
import org.apache.flink.cdc.common.data.binary.BinaryStringData;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.utils.TestCaseUtils;
import org.apache.flink.cdc.connectors.mysql.source.MySqlSourceTestBase;
import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion;
import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.testcontainers.DockerClientFactory;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;
import java.io.IOException;
import java.sql.Connection;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Random;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.flink.api.common.JobStatus.RUNNING;
import static org.apache.flink.cdc.common.utils.TestCaseUtils.DEFAULT_INTERVAL;
import static org.apache.flink.cdc.common.utils.TestCaseUtils.DEFAULT_TIMEOUT;
/**
* IT case for evolving MySQL schemas with the gh-ost / pt-osc utilities. See <a
* href="https://github.com/github/gh-ost">gh-ost</a> and <a
* href="https://docs.percona.com/percona-toolkit/pt-online-schema-change.html">pt-osc</a> for
* more details.
*/
public class MySqlOnLineSchemaMigrationTableITCase extends MySqlSourceTestBase {
private static final MySqlContainer MYSQL8_CONTAINER =
createMySqlContainer(MySqlVersion.V8_0, "docker/server-gtids/expire-seconds/my.cnf");
private static final String TEST_USER = "mysqluser";
private static final String TEST_PASSWORD = "mysqlpw";
private static final String PERCONA_TOOLKIT = "perconalab/percona-toolkit:3.5.7";
protected static final GenericContainer<?> PERCONA_TOOLKIT_CONTAINER =
createPerconaToolkitContainer();
private final UniqueDatabase customerDatabase =
new UniqueDatabase(MYSQL8_CONTAINER, "customer", TEST_USER, TEST_PASSWORD);
private final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
private final StreamTableEnvironment tEnv =
StreamTableEnvironment.create(
env, EnvironmentSettings.newInstance().inStreamingMode().build());
private static final String GH_OST_DOWNLOAD_LINK =
DockerClientFactory.instance().client().versionCmd().exec().getArch().equals("amd64")
? "https://github.com/github/gh-ost/releases/download/v1.1.6/gh-ost-binary-linux-amd64-20231207144046.tar.gz"
: "https://github.com/github/gh-ost/releases/download/v1.1.6/gh-ost-binary-linux-arm64-20231207144046.tar.gz";
@BeforeClass
public static void beforeClass() {
LOG.info("Starting containers...");
Startables.deepStart(Stream.of(MYSQL8_CONTAINER)).join();
Startables.deepStart(Stream.of(PERCONA_TOOLKIT_CONTAINER)).join();
LOG.info("Containers are started.");
}
@AfterClass
public static void afterClass() {
LOG.info("Stopping containers...");
MYSQL8_CONTAINER.stop();
PERCONA_TOOLKIT_CONTAINER.close();
LOG.info("Containers are stopped.");
}
@Before
public void before() {
TestValuesTableFactory.clearAllData();
env.setParallelism(DEFAULT_PARALLELISM);
env.enableCheckpointing(200);
customerDatabase.createAndInitialize();
}
@After
public void after() {
customerDatabase.dropDatabase();
}
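// Downloads the gh-ost release tarball matching the local Docker architecture and unpacks
// the binary into /bin so it can be invoked directly inside the container.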
private static void installGhOstCli(Container<?> container) {
try {
execInContainer(
container,
"download gh-ost tarball",
"curl",
"-L",
"-o",
"/tmp/gh-ost.tar.gz",
GH_OST_DOWNLOAD_LINK);
execInContainer(
container, "unzip binary", "tar", "-xzvf", "/tmp/gh-ost.tar.gz", "-C", "/bin");
} catch (IOException | InterruptedException e) {
throw new RuntimeException(e);
}
}
private static GenericContainer<?> createPerconaToolkitContainer() {
GenericContainer<?> perconaToolkit =
new GenericContainer<>(PERCONA_TOOLKIT)
// keep container alive
.withCommand("tail", "-f", "/dev/null")
.withNetwork(NETWORK)
.withLogConsumer(new Slf4jLogConsumer(LOG));
return perconaToolkit;
}
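// Runs gh-ost ADD / MODIFY / DROP COLUMN migrations against the customers table and
// verifies that each change is reflected in the streamed changelog.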
@Test
public void testGhOstSchemaMigrationFromScratch() throws Exception {
LOG.info("Step 1: Install gh-ost command line utility");
installGhOstCli(MYSQL8_CONTAINER);
LOG.info("Step 2: Start pipeline job");
String sourceDDL =
String.format(
"CREATE TABLE debezium_source ("
+ " `id` INT NOT NULL,"
+ " name STRING,"
+ " address STRING,"
+ " phone_number STRING,"
+ " primary key (`id`) not enforced"
+ ") WITH ("
+ " 'connector' = 'mysql-cdc',"
+ " 'hostname' = '%s',"
+ " 'port' = '%s',"
+ " 'username' = '%s',"
+ " 'password' = '%s',"
+ " 'database-name' = '%s',"
+ " 'table-name' = '%s',"
+ " 'scan.incremental.snapshot.enabled' = '%s',"
+ " 'server-time-zone' = 'UTC',"
+ " 'server-id' = '%s'"
+ ")",
MYSQL8_CONTAINER.getHost(),
MYSQL8_CONTAINER.getDatabasePort(),
TEST_USER,
TEST_PASSWORD,
customerDatabase.getDatabaseName(),
"customers",
true,
getServerId());
tEnv.executeSql(sourceDDL);
// async submit job
TableResult result = tEnv.executeSql("SELECT * FROM debezium_source");
// wait until the source job has reached RUNNING state
TestCaseUtils.repeatedCheck(
() -> result.getJobClient().get().getJobStatus().get().equals(RUNNING),
DEFAULT_TIMEOUT,
DEFAULT_INTERVAL,
Arrays.asList(InterruptedException.class, NoSuchElementException.class));
CloseableIterator<Row> iterator = result.collect();
{
String[] expected =
new String[] {
"+I[101, user_1, Shanghai, 123567891234]",
"+I[102, user_2, Shanghai, 123567891234]",
"+I[103, user_3, Shanghai, 123567891234]",
"+I[109, user_4, Shanghai, 123567891234]",
"+I[110, user_5, Shanghai, 123567891234]",
"+I[111, user_6, Shanghai, 123567891234]",
"+I[118, user_7, Shanghai, 123567891234]",
"+I[121, user_8, Shanghai, 123567891234]",
"+I[123, user_9, Shanghai, 123567891234]",
"+I[1009, user_10, Shanghai, 123567891234]",
"+I[1010, user_11, Shanghai, 123567891234]",
"+I[1011, user_12, Shanghai, 123567891234]",
"+I[1012, user_13, Shanghai, 123567891234]",
"+I[1013, user_14, Shanghai, 123567891234]",
"+I[1014, user_15, Shanghai, 123567891234]",
"+I[1015, user_16, Shanghai, 123567891234]",
"+I[1016, user_17, Shanghai, 123567891234]",
"+I[1017, user_18, Shanghai, 123567891234]",
"+I[1018, user_19, Shanghai, 123567891234]",
"+I[1019, user_20, Shanghai, 123567891234]",
"+I[2000, user_21, Shanghai, 123567891234]"
};
assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator, expected.length));
}
// Wait for a little while until we're in Binlog streaming mode.
Thread.sleep(5_000);
{
LOG.info("Step 3: Evolve schema with gh-ost - ADD COLUMN");
execInContainer(
MYSQL8_CONTAINER,
"evolve schema",
"gh-ost",
"--user=" + TEST_USER,
"--password=" + TEST_PASSWORD,
"--database=" + customerDatabase.getDatabaseName(),
"--table=customers",
"--alter=add column ext int first",
"--allow-on-master", // because we don't have a replica
"--initially-drop-old-table", // drop previously generated temporary tables
"--execute");
try (Connection connection = customerDatabase.getJdbcConnection();
Statement statement = connection.createStatement()) {
// The new column `ext` has been inserted now
statement.execute(
String.format(
"INSERT INTO `%s`.`customers` VALUES (17, 10000, 'Alice', 'Beijing', '123567891234');",
customerDatabase.getDatabaseName()));
}
String[] expected =
new String[] {
"+I[10000, Alice, Beijing, 123567891234]",
};
assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator, expected.length));
}
{
LOG.info("Step 4: Evolve schema with gh-ost - MODIFY COLUMN");
execInContainer(
MYSQL8_CONTAINER,
"evolve schema",
"gh-ost",
"--user=" + TEST_USER,
"--password=" + TEST_PASSWORD,
"--database=" + customerDatabase.getDatabaseName(),
"--table=customers",
"--alter=modify column ext double",
"--allow-on-master",
"--initially-drop-old-table",
"--execute");
try (Connection connection = customerDatabase.getJdbcConnection();
Statement statement = connection.createStatement()) {
statement.execute(
String.format(
"INSERT INTO `%s`.`customers` VALUES (2.718281828, 10001, 'Bob', 'Chongqing', '123567891234');",
customerDatabase.getDatabaseName()));
}
String[] expected =
new String[] {
"+I[10001, Bob, Chongqing, 123567891234]",
};
assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator, expected.length));
}
{
LOG.info("Step 5: Evolve schema with gh-ost - DROP COLUMN");
execInContainer(
MYSQL8_CONTAINER,
"evolve schema",
"gh-ost",
"--user=" + TEST_USER,
"--password=" + TEST_PASSWORD,
"--database=" + customerDatabase.getDatabaseName(),
"--table=customers",
"--alter=drop column ext",
"--allow-on-master",
"--initially-drop-old-table",
"--execute");
try (Connection connection = customerDatabase.getJdbcConnection();
Statement statement = connection.createStatement()) {
statement.execute(
String.format(
"INSERT INTO `%s`.`customers` VALUES (10002, 'Cicada', 'Urumqi', '123567891234');",
customerDatabase.getDatabaseName()));
}
String[] expected =
new String[] {
"+I[10002, Cicada, Urumqi, 123567891234]",
};
assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator, expected.length));
}
}
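// Same migration scenario as above, but driven by pt-online-schema-change running in a
// sidecar Percona Toolkit container.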
@Test
public void testPtOscSchemaMigrationFromScratch() throws Exception {
LOG.info("Step 1: Start pipeline job");
String sourceDDL =
String.format(
"CREATE TABLE debezium_source ("
+ " `id` INT NOT NULL,"
+ " name STRING,"
+ " address STRING,"
+ " phone_number STRING,"
+ " primary key (`id`) not enforced"
+ ") WITH ("
+ " 'connector' = 'mysql-cdc',"
+ " 'hostname' = '%s',"
+ " 'port' = '%s',"
+ " 'username' = '%s',"
+ " 'password' = '%s',"
+ " 'database-name' = '%s',"
+ " 'table-name' = '%s',"
+ " 'scan.incremental.snapshot.enabled' = '%s',"
+ " 'server-time-zone' = 'UTC',"
+ " 'server-id' = '%s'"
+ ")",
MYSQL8_CONTAINER.getHost(),
MYSQL8_CONTAINER.getDatabasePort(),
TEST_USER,
TEST_PASSWORD,
customerDatabase.getDatabaseName(),
"customers",
true,
getServerId());
tEnv.executeSql(sourceDDL);
// async submit job
TableResult result = tEnv.executeSql("SELECT * FROM debezium_source");
// wait until the source job has reached RUNNING state
TestCaseUtils.repeatedCheck(
() -> result.getJobClient().get().getJobStatus().get().equals(RUNNING),
DEFAULT_TIMEOUT,
DEFAULT_INTERVAL,
Arrays.asList(InterruptedException.class, NoSuchElementException.class));
CloseableIterator<Row> iterator = result.collect();
{
String[] expected =
new String[] {
"+I[101, user_1, Shanghai, 123567891234]",
"+I[102, user_2, Shanghai, 123567891234]",
"+I[103, user_3, Shanghai, 123567891234]",
"+I[109, user_4, Shanghai, 123567891234]",
"+I[110, user_5, Shanghai, 123567891234]",
"+I[111, user_6, Shanghai, 123567891234]",
"+I[118, user_7, Shanghai, 123567891234]",
"+I[121, user_8, Shanghai, 123567891234]",
"+I[123, user_9, Shanghai, 123567891234]",
"+I[1009, user_10, Shanghai, 123567891234]",
"+I[1010, user_11, Shanghai, 123567891234]",
"+I[1011, user_12, Shanghai, 123567891234]",
"+I[1012, user_13, Shanghai, 123567891234]",
"+I[1013, user_14, Shanghai, 123567891234]",
"+I[1014, user_15, Shanghai, 123567891234]",
"+I[1015, user_16, Shanghai, 123567891234]",
"+I[1016, user_17, Shanghai, 123567891234]",
"+I[1017, user_18, Shanghai, 123567891234]",
"+I[1018, user_19, Shanghai, 123567891234]",
"+I[1019, user_20, Shanghai, 123567891234]",
"+I[2000, user_21, Shanghai, 123567891234]"
};
assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator, expected.length));
}
// Wait for a little while until we're in Binlog streaming mode.
Thread.sleep(5_000);
LOG.info("Step 2: Evolve schema with pt-osc - ADD COLUMN");
execInContainer(
PERCONA_TOOLKIT_CONTAINER,
"evolve schema",
"pt-online-schema-change",
"--user=" + TEST_USER,
"--host=" + INTER_CONTAINER_MYSQL_ALIAS,
"--password=" + TEST_PASSWORD,
"P=3306,t=customers,D=" + customerDatabase.getDatabaseName(),
"--alter",
"add column ext int FIRST",
"--charset=utf8",
"--recursion-method=NONE", // Do not look for slave nodes
"--print",
"--execute");
try (Connection connection = customerDatabase.getJdbcConnection();
Statement statement = connection.createStatement()) {
// The new column `ext` has been inserted now
statement.execute(
String.format(
"INSERT INTO `%s`.`customers` VALUES (17, 10000, 'Alice', 'Beijing', '123567891234');",
customerDatabase.getDatabaseName()));
String[] expected =
new String[] {
"+I[10000, Alice, Beijing, 123567891234]",
};
assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator, expected.length));
}
LOG.info("Step 3: Evolve schema with pt-osc - MODIFY COLUMN");
execInContainer(
PERCONA_TOOLKIT_CONTAINER,
"evolve schema",
"pt-online-schema-change",
"--user=" + TEST_USER,
"--host=" + INTER_CONTAINER_MYSQL_ALIAS,
"--password=" + TEST_PASSWORD,
"P=3306,t=customers,D=" + customerDatabase.getDatabaseName(),
"--alter",
"modify column ext double",
"--charset=utf8",
"--recursion-method=NONE",
"--print",
"--execute");
try (Connection connection = customerDatabase.getJdbcConnection();
Statement statement = connection.createStatement()) {
statement.execute(
String.format(
"INSERT INTO `%s`.`customers` VALUES (2.718281828, 10001, 'Bob', 'Chongqing', '123567891234');",
customerDatabase.getDatabaseName()));
String[] expected =
new String[] {
"+I[10001, Bob, Chongqing, 123567891234]",
};
assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator, expected.length));
}
LOG.info("Step 4: Evolve schema with pt-osc - DROP COLUMN");
execInContainer(
PERCONA_TOOLKIT_CONTAINER,
"evolve schema",
"pt-online-schema-change",
"--user=" + TEST_USER,
"--host=" + INTER_CONTAINER_MYSQL_ALIAS,
"--password=" + TEST_PASSWORD,
"P=3306,t=customers,D=" + customerDatabase.getDatabaseName(),
"--alter",
"drop column ext",
"--charset=utf8",
"--recursion-method=NONE",
"--print",
"--execute");
try (Connection connection = customerDatabase.getJdbcConnection();
Statement statement = connection.createStatement()) {
statement.execute(
String.format(
"INSERT INTO `%s`.`customers` VALUES (10002, 'Cicada', 'Urumqi', '123567891234');",
customerDatabase.getDatabaseName()));
String[] expected =
new String[] {
"+I[10002, Cicada, Urumqi, 123567891234]",
};
assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator, expected.length));
}
}
private static void execInContainer(Container<?> container, String prompt, String... commands)
throws IOException, InterruptedException {
LOG.info(
"Starting to {} with the following command: `{}`",
prompt,
String.join(" ", commands));
Container.ExecResult execResult = container.execInContainer(commands);
if (execResult.getExitCode() == 0) {
LOG.info("Successfully {}. Stdout: {}", prompt, execResult.getStdout());
} else {
LOG.error(
"Failed to {}. Exit code: {}, Stdout: {}, Stderr: {}",
prompt,
execResult.getExitCode(),
execResult.getStdout(),
execResult.getStderr());
throw new IOException("Failed to execute commands: " + String.join(" ", commands));
}
}
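// Builds the snapshot baseline: one insert event per pre-populated customers row.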
private List<Event> getSnapshotExpected(TableId tableId, Schema schema) {
return Stream.of(
generate(schema, 101, "user_1", "Shanghai", "123567891234"),
generate(schema, 102, "user_2", "Shanghai", "123567891234"),
generate(schema, 103, "user_3", "Shanghai", "123567891234"),
generate(schema, 109, "user_4", "Shanghai", "123567891234"),
generate(schema, 110, "user_5", "Shanghai", "123567891234"),
generate(schema, 111, "user_6", "Shanghai", "123567891234"),
generate(schema, 118, "user_7", "Shanghai", "123567891234"),
generate(schema, 121, "user_8", "Shanghai", "123567891234"),
generate(schema, 123, "user_9", "Shanghai", "123567891234"),
generate(schema, 1009, "user_10", "Shanghai", "123567891234"),
generate(schema, 1010, "user_11", "Shanghai", "123567891234"),
generate(schema, 1011, "user_12", "Shanghai", "123567891234"),
generate(schema, 1012, "user_13", "Shanghai", "123567891234"),
generate(schema, 1013, "user_14", "Shanghai", "123567891234"),
generate(schema, 1014, "user_15", "Shanghai", "123567891234"),
generate(schema, 1015, "user_16", "Shanghai", "123567891234"),
generate(schema, 1016, "user_17", "Shanghai", "123567891234"),
generate(schema, 1017, "user_18", "Shanghai", "123567891234"),
generate(schema, 1018, "user_19", "Shanghai", "123567891234"),
generate(schema, 1019, "user_20", "Shanghai", "123567891234"),
generate(schema, 2000, "user_21", "Shanghai", "123567891234"))
.map(record -> DataChangeEvent.insertEvent(tableId, record))
.collect(Collectors.toList());
}
private BinaryRecordData generate(Schema schema, Object... fields) {
return (new BinaryRecordDataGenerator(schema.getColumnDataTypes().toArray(new DataType[0])))
.generate(
Arrays.stream(fields)
.map(
e ->
(e instanceof String)
? BinaryStringData.fromString((String) e)
: e)
.toArray());
}
private String getServerId() {
final Random random = new Random();
int serverId = random.nextInt(100) + 5400;
return serverId + "-" + (serverId + env.getParallelism());
}
protected String getServerId(int base) {
return base + "-" + (base + DEFAULT_PARALLELISM);
}
private static void waitForSnapshotStarted(String sinkName) throws InterruptedException {
while (sinkSize(sinkName) == 0) {
Thread.sleep(100);
}
}
private static void waitForSinkSize(String sinkName, int expectedSize)
throws InterruptedException {
while (sinkSize(sinkName) < expectedSize) {
Thread.sleep(100);
}
}
private static int sinkSize(String sinkName) {
synchronized (TestValuesTableFactory.class) {
try {
return TestValuesTableFactory.getRawResults(sinkName).size();
} catch (IllegalArgumentException e) {
// job is not started yet
return 0;
}
}
}
private static List<String> fetchRows(Iterator<Row> iter, int size) {
List<String> rows = new ArrayList<>(size);
while (size > 0 && iter.hasNext()) {
Row row = iter.next();
rows.add(row.toString());
size--;
}
return rows;
}
}
@ -51,6 +51,7 @@ import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOpt
import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.CONNECT_MAX_RETRIES;
import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.CONNECT_TIMEOUT;
import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.HEARTBEAT_INTERVAL;
import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.PARSE_ONLINE_SCHEMA_CHANGES;
import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP;
import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED;
@ -127,7 +128,8 @@ public class MySqlTableSourceFactoryTest {
new Properties(),
HEARTBEAT_INTERVAL.defaultValue(),
null,
SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue());
SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
PARSE_ONLINE_SCHEMA_CHANGES.defaultValue());
assertEquals(expectedSource, actualSource);
}
@ -173,7 +175,8 @@ public class MySqlTableSourceFactoryTest {
new Properties(),
HEARTBEAT_INTERVAL.defaultValue(),
"testCol",
SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue());
SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
PARSE_ONLINE_SCHEMA_CHANGES.defaultValue());
assertEquals(expectedSource, actualSource);
}
@ -215,7 +218,8 @@ public class MySqlTableSourceFactoryTest {
new Properties(),
HEARTBEAT_INTERVAL.defaultValue(),
null,
SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue());
SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
PARSE_ONLINE_SCHEMA_CHANGES.defaultValue());
assertEquals(expectedSource, actualSource);
}
@ -255,7 +259,8 @@ public class MySqlTableSourceFactoryTest {
new Properties(),
HEARTBEAT_INTERVAL.defaultValue(),
null,
SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue());
SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
PARSE_ONLINE_SCHEMA_CHANGES.defaultValue());
assertEquals(expectedSource, actualSource);
}
@ -311,7 +316,8 @@ public class MySqlTableSourceFactoryTest {
jdbcProperties,
Duration.ofMillis(15213),
"testCol",
true);
true,
PARSE_ONLINE_SCHEMA_CHANGES.defaultValue());
assertEquals(expectedSource, actualSource);
assertTrue(actualSource instanceof MySqlTableSource);
MySqlTableSource actualMySqlTableSource = (MySqlTableSource) actualSource;
@ -365,7 +371,8 @@ public class MySqlTableSourceFactoryTest {
new Properties(),
HEARTBEAT_INTERVAL.defaultValue(),
null,
SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue());
SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
PARSE_ONLINE_SCHEMA_CHANGES.defaultValue());
assertEquals(expectedSource, actualSource);
}
@ -403,7 +410,8 @@ public class MySqlTableSourceFactoryTest {
new Properties(),
HEARTBEAT_INTERVAL.defaultValue(),
null,
SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue());
SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
PARSE_ONLINE_SCHEMA_CHANGES.defaultValue());
assertEquals(expectedSource, actualSource);
}
@ -442,7 +450,8 @@ public class MySqlTableSourceFactoryTest {
new Properties(),
HEARTBEAT_INTERVAL.defaultValue(),
null,
SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue());
SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
PARSE_ONLINE_SCHEMA_CHANGES.defaultValue());
assertEquals(expectedSource, actualSource);
}
@ -482,7 +491,8 @@ public class MySqlTableSourceFactoryTest {
new Properties(),
HEARTBEAT_INTERVAL.defaultValue(),
null,
SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue());
SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
PARSE_ONLINE_SCHEMA_CHANGES.defaultValue());
assertEquals(expectedSource, actualSource);
}
@ -520,7 +530,8 @@ public class MySqlTableSourceFactoryTest {
new Properties(),
HEARTBEAT_INTERVAL.defaultValue(),
null,
SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue());
SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
PARSE_ONLINE_SCHEMA_CHANGES.defaultValue());
assertEquals(expectedSource, actualSource);
}
@ -563,7 +574,8 @@ public class MySqlTableSourceFactoryTest {
new Properties(),
HEARTBEAT_INTERVAL.defaultValue(),
null,
SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue());
SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
PARSE_ONLINE_SCHEMA_CHANGES.defaultValue());
expectedSource.producedDataType = SCHEMA_WITH_METADATA.toSourceRowDataType();
expectedSource.metadataKeys = Arrays.asList("op_ts", "database_name");