[FLINK-35143][pipeline-connector][mysql] Expose newly added tables capture in mysql pipeline connector. (#3411)

Co-authored-by: Muhammet Orazov <916295+morazow@users.noreply.github.com>
Co-authored-by: north.lin <north.lin@yunlsp.com>

@@ -268,6 +268,13 @@ pipeline:
<td>Whether to close idle readers at the end of the snapshot phase. This feature requires Flink 1.14 or later, and 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' must be set to true.<br>
For Flink 1.15 and later, the default value of 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' has been changed to true, so 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' = true does not need to be configured explicitly.</td>
</tr>
<tr>
<td>scan.newly-added-table.enabled</td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to enable capturing newly added tables dynamically. Disabled by default. This option only takes effect when the job is started from a savepoint/checkpoint.</td>
</tr>
</tbody>
</table>
</div>

@@ -275,6 +275,13 @@ pipeline:
so 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' does not need to be explicitly set to 'true'
</td>
</tr>
<tr>
<td>scan.newly-added-table.enabled</td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to enable capturing newly added tables. Disabled by default. This option only takes effect when the job is started from a savepoint/checkpoint.</td>
</tr>
</tbody>
</table>
</div>
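
For illustration, here is a minimal sketch (not part of the documentation) of enabling this option when the MySQL data source is built programmatically through `MySqlDataSourceFactory`, mirroring the IT case added later in this commit. The hostname, port, credentials, and table name are placeholder assumptions; as the description notes, the option only takes effect once the job is restored from a savepoint/checkpoint.

```java
// Minimal sketch: enabling scan.newly-added-table.enabled when building the source
// programmatically. Hostname, port, credentials, and table name are placeholders.
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.common.factories.Factory;
import org.apache.flink.cdc.common.factories.FactoryHelper;
import org.apache.flink.cdc.common.source.FlinkSourceProvider;
import org.apache.flink.cdc.connectors.mysql.factory.MySqlDataSourceFactory;
import org.apache.flink.cdc.connectors.mysql.source.MySqlDataSource;

public class NewlyAddedTableSourceSketch {

    public static FlinkSourceProvider buildSourceProvider() {
        Map<String, String> options = new HashMap<>();
        options.put("hostname", "localhost");  // placeholder host
        options.put("port", "3306");           // placeholder port
        options.put("username", "mysqluser");  // placeholder credentials
        options.put("password", "mysqlpw");
        options.put("tables", "app_db.orders"); // placeholder table selector
        // Enable capturing of newly added tables; only effective when the job is
        // restarted from a savepoint/checkpoint.
        options.put("scan.newly-added-table.enabled", "true");

        Factory.Context context =
                new FactoryHelper.DefaultContext(
                        Configuration.fromMap(options),
                        null,
                        NewlyAddedTableSourceSketch.class.getClassLoader());

        MySqlDataSource dataSource =
                (MySqlDataSource) new MySqlDataSourceFactory().createDataSource(context);
        return (FlinkSourceProvider) dataSource.getEventSourceProvider();
    }
}
```

In a YAML pipeline definition, the same `scan.newly-added-table.enabled: true` entry typically goes under the MySQL source options.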

@@ -61,6 +61,7 @@ import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PORT;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_STARTUP_MODE;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_FILE;
@@ -123,6 +124,7 @@ public class MySqlDataSourceFactory implements DataSourceFactory {
Duration connectTimeout = config.get(CONNECT_TIMEOUT);
int connectMaxRetries = config.get(CONNECT_MAX_RETRIES);
int connectionPoolSize = config.get(CONNECTION_POOL_SIZE);
boolean scanNewlyAddedTableEnabled = config.get(SCAN_NEWLY_ADDED_TABLE_ENABLED);
validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1);
validateIntegerOption(CHUNK_META_GROUP_SIZE, splitMetaGroupSize, 1);
@@ -158,7 +160,8 @@ public class MySqlDataSourceFactory implements DataSourceFactory {
.closeIdleReaders(closeIdleReaders)
.includeSchemaChanges(includeSchemaChanges)
.debeziumProperties(getDebeziumProperties(configMap))
.jdbcProperties(getJdbcProperties(configMap));
.jdbcProperties(getJdbcProperties(configMap))
.scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled);
Selectors selectors = new Selectors.SelectorsBuilder().includeTables(tables).build();
List<String> capturedTables = getTableList(configFactory.createConfig(0), selectors);
@@ -216,7 +219,7 @@ public class MySqlDataSourceFactory implements DataSourceFactory {
options.add(CONNECTION_POOL_SIZE);
options.add(HEARTBEAT_INTERVAL);
options.add(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED);
options.add(SCAN_NEWLY_ADDED_TABLE_ENABLED);
options.add(CHUNK_META_GROUP_SIZE);
options.add(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND);
options.add(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND);

@@ -224,6 +224,14 @@ public class MySqlDataSourceOptions {
+ "FLIP-147: Support Checkpoints After Tasks Finished. The flink version is required to be "
+ "greater than or equal to 1.14 when enabling this feature.");
@Experimental
public static final ConfigOption<Boolean> SCAN_NEWLY_ADDED_TABLE_ENABLED =
ConfigOptions.key("scan.newly-added-table.enabled")
.booleanType()
.defaultValue(false)
.withDescription(
"Whether to scan the newly added tables or not, by default is false. This option is only useful when we start the job from a savepoint/checkpoint.");
@Experimental
public static final ConfigOption<Boolean> SCHEMA_CHANGE_ENABLED =
ConfigOptions.key("schema-change.enabled")

@@ -0,0 +1,582 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.connectors.mysql.source;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.cdc.common.data.binary.BinaryStringData;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.factories.Factory;
import org.apache.flink.cdc.common.factories.FactoryHelper;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.source.FlinkSourceProvider;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.common.types.RowType;
import org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils;
import org.apache.flink.cdc.connectors.mysql.factory.MySqlDataSourceFactory;
import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
import org.apache.flink.cdc.runtime.typeutils.EventTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.collect.AbstractCollectResultBuffer;
import org.apache.flink.streaming.api.operators.collect.CheckpointedCollectResultBuffer;
import org.apache.flink.streaming.api.operators.collect.CollectResultIterator;
import org.apache.flink.streaming.api.operators.collect.CollectSinkOperator;
import org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory;
import org.apache.flink.streaming.api.operators.collect.CollectStreamSink;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.util.ExceptionUtils;
import io.debezium.connector.mysql.MySqlConnection;
import io.debezium.jdbc.JdbcConnection;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.nio.file.Files;
import java.nio.file.Path;
import java.sql.SQLException;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static java.lang.String.format;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.HOSTNAME;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PASSWORD;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PORT;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SERVER_ID;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SERVER_TIME_ZONE;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.USERNAME;
import static org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.TEST_PASSWORD;
import static org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.TEST_USER;
import static org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.fetchResults;
import static org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.getServerId;
import static org.assertj.core.api.Assertions.assertThat;
/** IT cases covering capture of newly added tables during the capture process in pipeline mode. */
public class MysqlPipelineNewlyAddedTableITCase extends MySqlSourceTestBase {
private final UniqueDatabase customDatabase =
new UniqueDatabase(MYSQL_CONTAINER, "customer", "mysqluser", "mysqlpw");
private final ScheduledExecutorService mockBinlogExecutor = Executors.newScheduledThreadPool(1);
@Before
public void before() throws SQLException {
TestValuesTableFactory.clearAllData();
customDatabase.createAndInitialize();
try (MySqlConnection connection = getConnection()) {
connection.setAutoCommit(false);
// prepare initial data for given table
String tableId = customDatabase.getDatabaseName() + ".produce_binlog_table";
connection.execute(
format("CREATE TABLE %s ( id BIGINT PRIMARY KEY, cnt BIGINT);", tableId));
connection.execute(
format("INSERT INTO %s VALUES (0, 100), (1, 101), (2, 102);", tableId));
connection.commit();
// mock continuous binlog during the newly added table capturing process
mockBinlogExecutor.schedule(
() -> {
try {
connection.execute(
format("UPDATE %s SET cnt = cnt +1 WHERE id < 2;", tableId));
connection.commit();
} catch (SQLException e) {
e.printStackTrace();
}
},
500,
TimeUnit.MICROSECONDS);
}
}
@After
public void after() {
mockBinlogExecutor.shutdown();
}
private MySqlConnection getConnection() {
Map<String, String> properties = new HashMap<>();
properties.put("database.hostname", MYSQL_CONTAINER.getHost());
properties.put("database.port", String.valueOf(MYSQL_CONTAINER.getDatabasePort()));
properties.put("database.user", customDatabase.getUsername());
properties.put("database.password", customDatabase.getPassword());
properties.put("database.serverTimezone", ZoneId.of("UTC").toString());
io.debezium.config.Configuration configuration =
io.debezium.config.Configuration.from(properties);
return DebeziumUtils.createMySqlConnection(configuration, new Properties());
}
@Test
public void testAddNewTableOneByOneSingleParallelism() throws Exception {
TestParam testParam =
TestParam.newBuilder(
Collections.singletonList("address_hangzhou"),
4,
Arrays.asList("address_hangzhou", "address_beijing"),
4)
.setFirstRoundInitTables(
Arrays.asList("address_hangzhou", "address_beijing"))
.build();
testAddNewTable(testParam, 1);
}
@Test
public void testAddNewTableOneByOne() throws Exception {
TestParam testParam =
TestParam.newBuilder(
Collections.singletonList("address_hangzhou"),
4,
Arrays.asList("address_hangzhou", "address_beijing"),
4)
.setFirstRoundInitTables(
Arrays.asList("address_hangzhou", "address_beijing"))
.build();
testAddNewTable(testParam, DEFAULT_PARALLELISM);
}
@Test
public void testAddNewTableByPatternSingleParallelism() throws Exception {
TestParam testParam =
TestParam.newBuilder(
Collections.singletonList("address_\\.*"),
8,
Collections.singletonList("address_\\.*"),
8)
.setFirstRoundInitTables(
Arrays.asList("address_hangzhou", "address_beijing"))
.setSecondRoundInitTables(
Arrays.asList("address_shanghai", "address_suzhou"))
.build();
testAddNewTable(testParam, 1);
}
@Test
public void testAddNewTableByPattern() throws Exception {
TestParam testParam =
TestParam.newBuilder(
Collections.singletonList("address_\\.*"),
8,
Collections.singletonList("address_\\.*"),
12)
.setFirstRoundInitTables(
Arrays.asList("address_hangzhou", "address_beijing"))
.setSecondRoundInitTables(
Arrays.asList(
"address_shanghai", "address_suzhou", "address_shenzhen"))
.build();
testAddNewTable(testParam, DEFAULT_PARALLELISM);
}
private void testAddNewTable(TestParam testParam, int parallelism) throws Exception {
// step 1: create mysql tables
if (CollectionUtils.isNotEmpty(testParam.getFirstRoundInitTables())) {
initialAddressTables(getConnection(), testParam.getFirstRoundInitTables());
}
Path savepointDir = Files.createTempDirectory("add-new-table-test");
final String savepointDirectory = savepointDir.toAbsolutePath().toString();
String finishedSavePointPath = null;
StreamExecutionEnvironment env =
getStreamExecutionEnvironment(finishedSavePointPath, parallelism);
// step 2: listen tables first time
List<String> listenTablesFirstRound = testParam.getFirstRoundListenTables();
FlinkSourceProvider sourceProvider =
getFlinkSourceProvider(listenTablesFirstRound, parallelism);
DataStreamSource<Event> source =
env.fromSource(
sourceProvider.getSource(),
WatermarkStrategy.noWatermarks(),
MySqlDataSourceFactory.IDENTIFIER,
new EventTypeInfo());
TypeSerializer<Event> serializer =
source.getTransformation().getOutputType().createSerializer(env.getConfig());
CheckpointedCollectResultBuffer<Event> resultBuffer =
new CheckpointedCollectResultBuffer<>(serializer);
String accumulatorName = "dataStreamCollect_" + UUID.randomUUID();
CollectResultIterator<Event> iterator =
addCollector(env, source, resultBuffer, serializer, accumulatorName);
JobClient jobClient = env.executeAsync("beforeAddNewTable");
iterator.setJobClient(jobClient);
List<Event> actual = fetchResults(iterator, testParam.getFirstRoundFetchSize());
Optional<String> listenByPattern =
listenTablesFirstRound.stream()
.filter(table -> StringUtils.contains(table, "\\.*"))
.findAny();
multiAssert(
actual,
listenByPattern.isPresent()
? testParam.getFirstRoundInitTables()
: listenTablesFirstRound);
// step 3: create new tables if needed
if (CollectionUtils.isNotEmpty(testParam.getSecondRoundInitTables())) {
initialAddressTables(getConnection(), testParam.getSecondRoundInitTables());
}
// step 4: trigger a savepoint and cancel the job
finishedSavePointPath = triggerSavepointWithRetry(jobClient, savepointDirectory);
jobClient.cancel().get();
iterator.close();
// step 5: restore from savepoint
StreamExecutionEnvironment restoredEnv =
getStreamExecutionEnvironment(finishedSavePointPath, parallelism);
List<String> listenTablesSecondRound = testParam.getSecondRoundListenTables();
FlinkSourceProvider restoredSourceProvider =
getFlinkSourceProvider(listenTablesSecondRound, parallelism);
DataStreamSource<Event> restoreSource =
restoredEnv.fromSource(
restoredSourceProvider.getSource(),
WatermarkStrategy.noWatermarks(),
MySqlDataSourceFactory.IDENTIFIER,
new EventTypeInfo());
CollectResultIterator<Event> restoredIterator =
addCollector(restoredEnv, restoreSource, resultBuffer, serializer, accumulatorName);
JobClient restoreClient = restoredEnv.executeAsync("AfterAddNewTable");
List<String> newlyAddTables =
listenTablesSecondRound.stream()
.filter(table -> !listenTablesFirstRound.contains(table))
.collect(Collectors.toList());
// an empty newlyAddTables list means the tables are captured by pattern
if (CollectionUtils.isEmpty(newlyAddTables)) {
newlyAddTables = testParam.getSecondRoundInitTables();
}
List<Event> newlyTableEvent =
fetchResults(restoredIterator, testParam.getSecondRoundFetchSize());
multiAssert(newlyTableEvent, newlyAddTables);
restoreClient.cancel().get();
restoredIterator.close();
}
private void multiAssert(List<Event> actualEvents, List<String> listenTables) {
List<Event> expectedCreateTableEvents = new ArrayList<>();
List<Event> expectedDataChangeEvents = new ArrayList<>();
for (String table : listenTables) {
expectedCreateTableEvents.add(
getCreateTableEvent(TableId.tableId(customDatabase.getDatabaseName(), table)));
expectedDataChangeEvents.addAll(
getSnapshotExpected(TableId.tableId(customDatabase.getDatabaseName(), table)));
}
// compare create table events
List<Event> actualCreateTableEvents =
actualEvents.stream()
.filter(event -> event instanceof CreateTableEvent)
.collect(Collectors.toList());
assertThat(actualCreateTableEvents)
.containsExactlyInAnyOrder(expectedCreateTableEvents.toArray(new Event[0]));
// compare data change events
List<Event> actualDataChangeEvents =
actualEvents.stream()
.filter(event -> event instanceof DataChangeEvent)
.collect(Collectors.toList());
assertThat(actualDataChangeEvents)
.containsExactlyInAnyOrder(expectedDataChangeEvents.toArray(new Event[0]));
}
private CreateTableEvent getCreateTableEvent(TableId tableId) {
Schema schema =
Schema.newBuilder()
.physicalColumn("id", DataTypes.BIGINT().notNull())
.physicalColumn("country", DataTypes.VARCHAR(255).notNull())
.physicalColumn("city", DataTypes.VARCHAR(255).notNull())
.physicalColumn("detail_address", DataTypes.VARCHAR(1024))
.primaryKey(Collections.singletonList("id"))
.build();
return new CreateTableEvent(tableId, schema);
}
private List<Event> getSnapshotExpected(TableId tableId) {
RowType rowType =
RowType.of(
new DataType[] {
DataTypes.BIGINT().notNull(),
DataTypes.VARCHAR(255).notNull(),
DataTypes.VARCHAR(255).notNull(),
DataTypes.VARCHAR(1024)
},
new String[] {"id", "country", "city", "detail_address"});
BinaryRecordDataGenerator generator = new BinaryRecordDataGenerator(rowType);
String cityName = tableId.getTableName().split("_")[1];
return Arrays.asList(
DataChangeEvent.insertEvent(
tableId,
generator.generate(
new Object[] {
416874195632735147L,
BinaryStringData.fromString("China"),
BinaryStringData.fromString(cityName),
BinaryStringData.fromString(cityName + " West Town address 1")
})),
DataChangeEvent.insertEvent(
tableId,
generator.generate(
new Object[] {
416927583791428523L,
BinaryStringData.fromString("China"),
BinaryStringData.fromString(cityName),
BinaryStringData.fromString(cityName + " West Town address 2")
})),
DataChangeEvent.insertEvent(
tableId,
generator.generate(
new Object[] {
417022095255614379L,
BinaryStringData.fromString("China"),
BinaryStringData.fromString(cityName),
BinaryStringData.fromString(cityName + " West Town address 3")
})));
}
private String triggerSavepointWithRetry(JobClient jobClient, String savepointDirectory)
throws ExecutionException, InterruptedException {
int retryTimes = 0;
// retry 600 times, it takes 100 milliseconds per time, at most retry 1 minute
while (retryTimes < 600) {
try {
return jobClient.triggerSavepoint(savepointDirectory).get();
} catch (Exception e) {
Optional<CheckpointException> exception =
ExceptionUtils.findThrowable(e, CheckpointException.class);
if (exception.isPresent()
&& exception.get().getMessage().contains("Checkpoint triggering task")) {
Thread.sleep(100);
retryTimes++;
} else {
throw e;
}
}
}
return null;
}
private void initialAddressTables(JdbcConnection connection, List<String> addressTables)
throws SQLException {
try {
connection.setAutoCommit(false);
for (String tableName : addressTables) {
// make initial data for given table
String tableId = customDatabase.getDatabaseName() + "." + tableName;
String cityName = tableName.split("_")[1];
connection.execute(
"CREATE TABLE IF NOT EXISTS "
+ tableId
+ "("
+ " id BIGINT NOT NULL PRIMARY KEY,"
+ " country VARCHAR(255) NOT NULL,"
+ " city VARCHAR(255) NOT NULL,"
+ " detail_address VARCHAR(1024)"
+ ");");
connection.execute(
format(
"INSERT INTO %s "
+ "VALUES (416874195632735147, 'China', '%s', '%s West Town address 1'),"
+ " (416927583791428523, 'China', '%s', '%s West Town address 2'),"
+ " (417022095255614379, 'China', '%s', '%s West Town address 3');",
tableId, cityName, cityName, cityName, cityName, cityName,
cityName));
}
connection.commit();
} finally {
connection.close();
}
}
private FlinkSourceProvider getFlinkSourceProvider(List<String> tables, int parallelism) {
List<String> fullTableNames =
tables.stream()
.map(table -> customDatabase.getDatabaseName() + "." + table)
.collect(Collectors.toList());
Map<String, String> options = new HashMap<>();
options.put(HOSTNAME.key(), MYSQL_CONTAINER.getHost());
options.put(PORT.key(), String.valueOf(MYSQL_CONTAINER.getDatabasePort()));
options.put(USERNAME.key(), TEST_USER);
options.put(PASSWORD.key(), TEST_PASSWORD);
options.put(SERVER_TIME_ZONE.key(), "UTC");
options.put(TABLES.key(), StringUtils.join(fullTableNames, ","));
options.put(SERVER_ID.key(), getServerId(parallelism));
options.put(SCAN_NEWLY_ADDED_TABLE_ENABLED.key(), "true");
Factory.Context context =
new FactoryHelper.DefaultContext(
org.apache.flink.cdc.common.configuration.Configuration.fromMap(options),
null,
this.getClass().getClassLoader());
MySqlDataSourceFactory factory = new MySqlDataSourceFactory();
MySqlDataSource dataSource = (MySqlDataSource) factory.createDataSource(context);
return (FlinkSourceProvider) dataSource.getEventSourceProvider();
}
private <T> CollectResultIterator<T> addCollector(
StreamExecutionEnvironment env,
DataStreamSource<T> source,
AbstractCollectResultBuffer<T> buffer,
TypeSerializer<T> serializer,
String accumulatorName) {
CollectSinkOperatorFactory<T> sinkFactory =
new CollectSinkOperatorFactory<>(serializer, accumulatorName);
CollectSinkOperator<T> operator = (CollectSinkOperator<T>) sinkFactory.getOperator();
CollectResultIterator<T> iterator =
new CollectResultIterator<>(
buffer, operator.getOperatorIdFuture(), accumulatorName, 0);
CollectStreamSink<T> sink = new CollectStreamSink<>(source, sinkFactory);
sink.name("Data stream collect sink");
env.addOperator(sink.getTransformation());
env.registerCollectIterator(iterator);
return iterator;
}
private StreamExecutionEnvironment getStreamExecutionEnvironment(
String finishedSavePointPath, int parallelism) {
Configuration configuration = new Configuration();
if (finishedSavePointPath != null) {
configuration.setString(SavepointConfigOptions.SAVEPOINT_PATH, finishedSavePointPath);
}
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment(configuration);
env.setParallelism(parallelism);
env.enableCheckpointing(500L);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 1000L));
return env;
}
private static class TestParam {
private final List<String> firstRoundInitTables;
private final List<String> firstRoundListenTables;
private final Integer firstRoundFetchSize;
private final List<String> secondRoundInitTables;
private final List<String> secondRoundListenTables;
private final Integer secondRoundFetchSize;
private TestParam(Builder builder) {
this.firstRoundInitTables = builder.firstRoundInitTables;
this.firstRoundListenTables = builder.firstRoundListenTables;
this.firstRoundFetchSize = builder.firstRoundFetchSize;
this.secondRoundInitTables = builder.secondRoundInitTables;
this.secondRoundListenTables = builder.secondRoundListenTables;
this.secondRoundFetchSize = builder.secondRoundFetchSize;
}
public static Builder newBuilder(
List<String> firstRoundListenTables,
Integer firstRoundFetchSize,
List<String> secondRoundListenTables,
Integer secondRoundFetchSize) {
return new Builder(
firstRoundListenTables,
firstRoundFetchSize,
secondRoundListenTables,
secondRoundFetchSize);
}
public static class Builder {
private List<String> firstRoundInitTables;
private final List<String> firstRoundListenTables;
private final Integer firstRoundFetchSize;
private List<String> secondRoundInitTables;
private final List<String> secondRoundListenTables;
private final Integer secondRoundFetchSize;
public Builder(
List<String> firstRoundListenTables,
Integer firstRoundFetchSize,
List<String> secondRoundListenTables,
Integer secondRoundFetchSize) {
this.firstRoundListenTables = firstRoundListenTables;
this.firstRoundFetchSize = firstRoundFetchSize;
this.secondRoundListenTables = secondRoundListenTables;
this.secondRoundFetchSize = secondRoundFetchSize;
}
public TestParam build() {
return new TestParam(this);
}
public Builder setFirstRoundInitTables(List<String> firstRoundInitTables) {
this.firstRoundInitTables = firstRoundInitTables;
return this;
}
public Builder setSecondRoundInitTables(List<String> secondRoundInitTables) {
this.secondRoundInitTables = secondRoundInitTables;
return this;
}
}
public List<String> getFirstRoundInitTables() {
return firstRoundInitTables;
}
public List<String> getFirstRoundListenTables() {
return firstRoundListenTables;
}
public Integer getFirstRoundFetchSize() {
return firstRoundFetchSize;
}
public List<String> getSecondRoundInitTables() {
return secondRoundInitTables;
}
public List<String> getSecondRoundListenTables() {
return secondRoundListenTables;
}
public Integer getSecondRoundFetchSize() {
return secondRoundFetchSize;
}
}
}

@@ -0,0 +1,328 @@
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
-- ----------------------------------------------------------------------------------------------------------------
-- DATABASE: customer
-- ----------------------------------------------------------------------------------------------------------------
-- Create and populate our users using a single insert with many rows
CREATE TABLE customers (
id INTEGER NOT NULL PRIMARY KEY,
name VARCHAR(255) NOT NULL DEFAULT 'flink',
address VARCHAR(1024),
phone_number VARCHAR(512)
);
INSERT INTO customers
VALUES (101,"user_1","Shanghai","123567891234"),
(102,"user_2","Shanghai","123567891234"),
(103,"user_3","Shanghai","123567891234"),
(109,"user_4","Shanghai","123567891234"),
(110,"user_5","Shanghai","123567891234"),
(111,"user_6","Shanghai","123567891234"),
(118,"user_7","Shanghai","123567891234"),
(121,"user_8","Shanghai","123567891234"),
(123,"user_9","Shanghai","123567891234"),
(1009,"user_10","Shanghai","123567891234"),
(1010,"user_11","Shanghai","123567891234"),
(1011,"user_12","Shanghai","123567891234"),
(1012,"user_13","Shanghai","123567891234"),
(1013,"user_14","Shanghai","123567891234"),
(1014,"user_15","Shanghai","123567891234"),
(1015,"user_16","Shanghai","123567891234"),
(1016,"user_17","Shanghai","123567891234"),
(1017,"user_18","Shanghai","123567891234"),
(1018,"user_19","Shanghai","123567891234"),
(1019,"user_20","Shanghai","123567891234"),
(2000,"user_21","Shanghai","123567891234");
-- Create a table that will not be read
CREATE TABLE prefix_customers (
id INTEGER NOT NULL PRIMARY KEY,
name VARCHAR(255) NOT NULL DEFAULT 'flink',
address VARCHAR(1024),
phone_number VARCHAR(512)
);
INSERT INTO prefix_customers
VALUES (101,"user_1","Shanghai","123567891234"),
(102,"user_2","Shanghai","123567891234"),
(103,"user_3","Shanghai","123567891234"),
(109,"user_4","Shanghai","123567891234");
-- table has the same name prefix as 'customers.*'
CREATE TABLE customers_1 (
id INTEGER NOT NULL PRIMARY KEY,
name VARCHAR(255) NOT NULL DEFAULT 'flink',
address VARCHAR(1024),
phone_number VARCHAR(512)
);
INSERT INTO customers_1
VALUES (101,"user_1","Shanghai","123567891234"),
(102,"user_2","Shanghai","123567891234"),
(103,"user_3","Shanghai","123567891234"),
(109,"user_4","Shanghai","123567891234"),
(110,"user_5","Shanghai","123567891234"),
(111,"user_6","Shanghai","123567891234"),
(118,"user_7","Shanghai","123567891234"),
(121,"user_8","Shanghai","123567891234"),
(123,"user_9","Shanghai","123567891234"),
(1009,"user_10","Shanghai","123567891234"),
(1010,"user_11","Shanghai","123567891234"),
(1011,"user_12","Shanghai","123567891234"),
(1012,"user_13","Shanghai","123567891234"),
(1013,"user_14","Shanghai","123567891234"),
(1014,"user_15","Shanghai","123567891234"),
(1015,"user_16","Shanghai","123567891234"),
(1016,"user_17","Shanghai","123567891234"),
(1017,"user_18","Shanghai","123567891234"),
(1018,"user_19","Shanghai","123567891234"),
(1019,"user_20","Shanghai","123567891234"),
(2000,"user_21","Shanghai","123567891234");
-- create table whose split key is evenly distributed
CREATE TABLE customers_even_dist (
id INTEGER NOT NULL PRIMARY KEY,
name VARCHAR(255) NOT NULL ,
address VARCHAR(1024),
phone_number VARCHAR(512)
);
INSERT INTO customers_even_dist
VALUES (101,'user_1','Shanghai','123567891234'),
(102,'user_2','Shanghai','123567891234'),
(103,'user_3','Shanghai','123567891234'),
(104,'user_4','Shanghai','123567891234'),
(105,'user_5','Shanghai','123567891234'),
(106,'user_6','Shanghai','123567891234'),
(107,'user_7','Shanghai','123567891234'),
(108,'user_8','Shanghai','123567891234'),
(109,'user_9','Shanghai','123567891234'),
(110,'user_10','Shanghai','123567891234');
-- create table whose split key is evenly distributed and sparse
CREATE TABLE customers_sparse_dist (
id INTEGER NOT NULL PRIMARY KEY,
name VARCHAR(255) NOT NULL ,
address VARCHAR(1024),
phone_number VARCHAR(512)
);
INSERT INTO customers_sparse_dist
VALUES (2,'user_1','Shanghai','123567891234'),
(4,'user_2','Shanghai','123567891234'),
(6,'user_3','Shanghai','123567891234'),
(8,'user_4','Shanghai','123567891234'),
(10,'user_5','Shanghai','123567891234'),
(16,'user_6','Shanghai','123567891234'),
(17,'user_7','Shanghai','123567891234'),
(18,'user_8','Shanghai','123567891234'),
(20,'user_9','Shanghai','123567891234'),
(22,'user_10','Shanghai','123567891234');
-- create table whose split key is evenly distributed and dense
CREATE TABLE customers_dense_dist (
id1 INTEGER NOT NULL,
id2 VARCHAR(255) NOT NULL ,
address VARCHAR(1024),
phone_number VARCHAR(512),
PRIMARY KEY(id1, id2)
);
INSERT INTO customers_dense_dist
VALUES (1,'user_1','Shanghai','123567891234'),
(1,'user_2','Shanghai','123567891234'),
(1,'user_3','Shanghai','123567891234'),
(1,'user_4','Shanghai','123567891234'),
(2,'user_5','Shanghai','123567891234'),
(2,'user_6','Shanghai','123567891234'),
(2,'user_7','Shanghai','123567891234'),
(3,'user_8','Shanghai','123567891234'),
(3,'user_9','Shanghai','123567891234'),
(3,'user_10','Shanghai','123567891234');
CREATE TABLE customers_no_pk (
id INTEGER,
name VARCHAR(255) DEFAULT 'flink',
address VARCHAR(1024),
phone_number VARCHAR(512)
);
INSERT INTO customers_no_pk
VALUES (101,"user_1","Shanghai","123567891234"),
(102,"user_2","Shanghai","123567891234"),
(103,"user_3","Shanghai","123567891234"),
(109,"user_4","Shanghai","123567891234"),
(110,"user_5","Shanghai","123567891234"),
(111,"user_6","Shanghai","123567891234"),
(118,"user_7","Shanghai","123567891234"),
(121,"user_8","Shanghai","123567891234"),
(123,"user_9","Shanghai","123567891234"),
(1009,"user_10","Shanghai","123567891234"),
(1010,"user_11","Shanghai","123567891234"),
(1011,"user_12","Shanghai","123567891234"),
(1012,"user_13","Shanghai","123567891234"),
(1013,"user_14","Shanghai","123567891234"),
(1014,"user_15","Shanghai","123567891234"),
(1015,"user_16","Shanghai","123567891234"),
(1016,"user_17","Shanghai","123567891234"),
(1017,"user_18","Shanghai","123567891234"),
(1018,"user_19","Shanghai","123567891234"),
(1019,"user_20","Shanghai","123567891234"),
(2000,"user_21","Shanghai","123567891234");
-- table has combined primary key
CREATE TABLE customer_card (
card_no BIGINT NOT NULL,
level VARCHAR(10) NOT NULL,
name VARCHAR(255) NOT NULL DEFAULT 'flink',
note VARCHAR(1024),
PRIMARY KEY(card_no, level)
);
insert into customer_card
VALUES (20001, 'LEVEL_4', 'user_1', 'user with level 4'),
(20002, 'LEVEL_4', 'user_2', 'user with level 4'),
(20003, 'LEVEL_4', 'user_3', 'user with level 4'),
(20004, 'LEVEL_4', 'user_4', 'user with level 4'),
(20004, 'LEVEL_1', 'user_4', 'user with level 4'),
(20004, 'LEVEL_2', 'user_4', 'user with level 4'),
(20004, 'LEVEL_3', 'user_4', 'user with level 4'),
(30006, 'LEVEL_3', 'user_5', 'user with level 3'),
(30007, 'LEVEL_3', 'user_6', 'user with level 3'),
(30008, 'LEVEL_3', 'user_7', 'user with level 3'),
(30009, 'LEVEL_3', 'user_8', 'user with level 3'),
(30009, 'LEVEL_2', 'user_8', 'user with level 3'),
(30009, 'LEVEL_1', 'user_8', 'user with level 3'),
(40001, 'LEVEL_2', 'user_9', 'user with level 2'),
(40002, 'LEVEL_2', 'user_10', 'user with level 2'),
(40003, 'LEVEL_2', 'user_11', 'user with level 2'),
(50001, 'LEVEL_1', 'user_12', 'user with level 1'),
(50002, 'LEVEL_1', 'user_13', 'user with level 1'),
(50003, 'LEVEL_1', 'user_14', 'user with level 1');
-- table has a single row
CREATE TABLE customer_card_single_line (
card_no BIGINT NOT NULL,
level VARCHAR(10) NOT NULL,
name VARCHAR(255) NOT NULL DEFAULT 'flink',
note VARCHAR(1024),
PRIMARY KEY(card_no, level)
);
insert into customer_card_single_line
VALUES (20001, 'LEVEL_1', 'user_1', 'user with level 1');
-- table has combined primary key
CREATE TABLE shopping_cart (
product_no INT NOT NULL,
product_kind VARCHAR(255),
user_id VARCHAR(255) NOT NULL,
description VARCHAR(255) NOT NULL,
PRIMARY KEY(user_id, product_no, product_kind)
);
insert into shopping_cart
VALUES (101, 'KIND_001', 'user_1', 'my shopping cart'),
(101, 'KIND_002', 'user_1', 'my shopping cart'),
(102, 'KIND_007', 'user_1', 'my shopping cart'),
(102, 'KIND_008', 'user_1', 'my shopping cart'),
(501, 'KIND_100', 'user_2', 'my shopping list'),
(701, 'KIND_999', 'user_3', 'my shopping list'),
(801, 'KIND_010', 'user_4', 'my shopping list'),
(600, 'KIND_009', 'user_4', 'my shopping list'),
(401, 'KIND_002', 'user_5', 'leo list'),
(401, 'KIND_007', 'user_5', 'leo list'),
(404, 'KIND_008', 'user_5', 'leo list'),
(600, 'KIND_009', 'user_6', 'my shopping cart');
-- table has a combined primary key and one of the primary key columns is evenly distributed
CREATE TABLE evenly_shopping_cart (
product_no INT NOT NULL,
product_kind VARCHAR(255),
user_id VARCHAR(255) NOT NULL,
description VARCHAR(255) NOT NULL,
PRIMARY KEY(product_kind, product_no, user_id)
);
insert into evenly_shopping_cart
VALUES (101, 'KIND_001', 'user_1', 'my shopping cart'),
(102, 'KIND_002', 'user_1', 'my shopping cart'),
(103, 'KIND_007', 'user_1', 'my shopping cart'),
(104, 'KIND_008', 'user_1', 'my shopping cart'),
(105, 'KIND_100', 'user_2', 'my shopping list'),
(105, 'KIND_999', 'user_3', 'my shopping list'),
(107, 'KIND_010', 'user_4', 'my shopping list'),
(108, 'KIND_009', 'user_4', 'my shopping list'),
(109, 'KIND_002', 'user_5', 'leo list'),
(111, 'KIND_007', 'user_5', 'leo list'),
(111, 'KIND_008', 'user_5', 'leo list'),
(112, 'KIND_009', 'user_6', 'my shopping cart');
-- table has bigint unsigned auto increment primary key
CREATE TABLE shopping_cart_big (
product_no BIGINT UNSIGNED AUTO_INCREMENT NOT NULL,
product_kind VARCHAR(255),
user_id VARCHAR(255) NOT NULL,
description VARCHAR(255) NOT NULL,
PRIMARY KEY(product_no)
);
insert into shopping_cart_big
VALUES (default, 'KIND_001', 'user_1', 'my shopping cart'),
(default, 'KIND_002', 'user_1', 'my shopping cart'),
(default, 'KIND_003', 'user_1', 'my shopping cart');
-- table has decimal primary key
CREATE TABLE shopping_cart_dec (
product_no DECIMAL(10, 4) NOT NULL,
product_kind VARCHAR(255),
user_id VARCHAR(255) NOT NULL,
description VARCHAR(255) DEFAULT 'flink',
PRIMARY KEY(product_no)
);
insert into shopping_cart_dec
VALUES (123456.123, 'KIND_001', 'user_1', 'my shopping cart'),
(123457.456, 'KIND_002', 'user_2', 'my shopping cart'),
(123458.6789, 'KIND_003', 'user_3', 'my shopping cart'),
(123459.1234, 'KIND_004', 'user_4', null);
-- create table whose primary key values are produced by the snowflake algorithm
CREATE TABLE address (
id BIGINT UNSIGNED NOT NULL PRIMARY KEY,
country VARCHAR(255) NOT NULL,
city VARCHAR(255) NOT NULL,
detail_address VARCHAR(1024)
);
INSERT INTO address
VALUES (416874195632735147, 'China', 'Beijing', 'West Town address 1'),
(416927583791428523, 'China', 'Beijing', 'West Town address 2'),
(417022095255614379, 'China', 'Beijing', 'West Town address 3'),
(417111867899200427, 'America', 'New York', 'East Town address 1'),
(417271541558096811, 'America', 'New York', 'East Town address 2'),
(417272886855938987, 'America', 'New York', 'East Town address 3'),
(417420106184475563, 'Germany', 'Berlin', 'West Town address 1'),
(418161258277847979, 'Germany', 'Berlin', 'West Town address 2');
CREATE TABLE default_value_test (
id INTEGER NOT NULL PRIMARY KEY,
name VARCHAR(255) NOT NULL DEFAULT 'flink',
address VARCHAR(1024),
phone_number INTEGER DEFAULT ' 123 '
);
INSERT INTO default_value_test
VALUES (1,'user1','Shanghai',123567),
(2,'user2','Shanghai',123567);