[FLINK-36964][pipeline-connector/paimon] Fix potential exception when SchemaChange in parallel with Paimon Sink

This closes #3818

Co-authored-by: yuxiqian.yxq <yuxiqian.yxq@alibaba-inc.com>
Committed by Kunni via GitHub
parent 2fd03e683e
commit 75b8a0cdf3
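In short: when the Paimon sink runs with parallelism greater than one, a schema change could interleave inconsistently with FlushEvents and data events across writer subtasks and raise exceptions. With this change, BucketAssignOperator broadcasts every FlushEvent and SchemaChangeEvent to all downstream subtasks (each FlushEvent copy tagged with the assigner's own subtask id), a new FlushEventAlignmentOperator in front of the writer forwards a FlushEvent only after copies from every assigner subtask have arrived, the record serializer skips schema changes already reflected in its cached schema, and PreCommitOperator now performs the Paimon commit itself through a StoreMultiCommitter during snapshotState. The factory's eager catalog check at configure time is dropped, and a new MySQL-to-Paimon end-to-end test that evolves the schema repeatedly under parallelism 4 covers the scenario.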

@@ -285,6 +285,19 @@ limitations under the License.
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<executions>
<execution>
<id>test-jar</id>
<goals>
<goal>test-jar</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

@@ -24,12 +24,9 @@ import org.apache.flink.cdc.common.factories.DataSinkFactory;
import org.apache.flink.cdc.common.factories.FactoryHelper;
import org.apache.flink.cdc.common.pipeline.PipelineOptions;
import org.apache.flink.cdc.common.sink.DataSink;
import org.apache.flink.cdc.common.utils.Preconditions;
import org.apache.flink.cdc.connectors.paimon.sink.v2.PaimonRecordEventSerializer;
import org.apache.flink.cdc.connectors.paimon.sink.v2.PaimonRecordSerializer;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.flink.FlinkCatalogFactory;
import org.apache.paimon.options.Options;
import java.time.ZoneId;
@@ -71,12 +68,6 @@ public class PaimonDataSinkFactory implements DataSinkFactory {
Options options = Options.fromMap(catalogOptions);
// Avoid using previous table schema.
options.setString("cache-enabled", "false");
try (Catalog catalog = FlinkCatalogFactory.createPaimonCatalog(options)) {
Preconditions.checkNotNull(
catalog.listDatabases(), "catalog option of Paimon is invalid.");
} catch (Exception e) {
throw new RuntimeException("failed to create or use paimon catalog", e);
}
ZoneId zoneId = ZoneId.systemDefault();
if (!Objects.equals(
context.getPipelineConfiguration().get(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE),

@@ -21,6 +21,7 @@ import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.connectors.paimon.sink.v2.bucket.BucketAssignOperator;
import org.apache.flink.cdc.connectors.paimon.sink.v2.bucket.BucketWrapper;
import org.apache.flink.cdc.connectors.paimon.sink.v2.bucket.BucketWrapperEventTypeInfo;
import org.apache.flink.cdc.connectors.paimon.sink.v2.bucket.FlushEventAlignmentOperator;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.connector.sink2.WithPreWriteTopology;
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -63,7 +64,12 @@ public class PaimonEventSink extends PaimonSink<Event> implements WithPreWriteTo
// All Events after BucketAssignOperator are decorated with BucketWrapper.
.partitionCustom(
(bucket, numPartitions) -> bucket % numPartitions,
(event) -> ((BucketWrapper) event).getBucket());
(event) -> ((BucketWrapper) event).getBucket())
// Avoid disorder of FlushEvent and DataChangeEvent.
.transform(
"FlushEventAlignment",
new BucketWrapperEventTypeInfo(),
new FlushEventAlignmentOperator());
}
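The FlushEventAlignment transform added above is defined later in this change; the partitionCustom step in front of it routes every wrapped event by its bucket id. A minimal sketch of that routing rule as a named Partitioner (illustrative only; the connector uses the inline lambda shown above):

import org.apache.flink.api.common.functions.Partitioner;

/** Illustrative stand-in for the inline lambda: route each wrapped event by its bucket id. */
public class BucketModuloPartitioner implements Partitioner<Integer> {
    @Override
    public int partition(Integer bucket, int numPartitions) {
        // Since BucketAssignOperator (changed below) emits one FlushEvent copy per
        // index 0..parallelism-1, this modulo delivers exactly one copy to each writer subtask.
        return bucket % numPartitions;
    }
}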
@Override

@@ -23,6 +23,7 @@ import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.utils.SchemaUtils;
import org.apache.flink.cdc.connectors.paimon.sink.v2.bucket.BucketWrapperChangeEvent;
@@ -66,13 +67,14 @@ public class PaimonRecordEventSerializer implements PaimonRecordSerializer<Event
new TableSchemaInfo(createTableEvent.getSchema(), zoneId));
} else {
SchemaChangeEvent schemaChangeEvent = (SchemaChangeEvent) event;
schemaMaps.put(
schemaChangeEvent.tableId(),
new TableSchemaInfo(
SchemaUtils.applySchemaChangeEvent(
schemaMaps.get(schemaChangeEvent.tableId()).getSchema(),
schemaChangeEvent),
zoneId));
Schema schema = schemaMaps.get(schemaChangeEvent.tableId()).getSchema();
if (!SchemaUtils.isSchemaChangeEventRedundant(schema, schemaChangeEvent)) {
schemaMaps.put(
schemaChangeEvent.tableId(),
new TableSchemaInfo(
SchemaUtils.applySchemaChangeEvent(schema, schemaChangeEvent),
zoneId));
}
}
return new PaimonEvent(tableId, null, true);
} else if (event instanceof DataChangeEvent) {

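Because schema change events are now broadcast by BucketAssignOperator (see below), a serializer subtask can receive a change that its cached schema already reflects, for example after a restart or once the change has already been applied elsewhere; the new isSchemaChangeEventRedundant guard makes applying it idempotent. A purely illustrative sketch of that idea, with invented types and names rather than the real flink-cdc SchemaUtils API:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrative only: applying the same "add column" change twice is a no-op. */
class IdempotentSchemaCache {
    private final Map<String, List<String>> columnsByTable = new HashMap<>();

    void onAddColumn(String tableId, String columnName) {
        List<String> columns =
                columnsByTable.computeIfAbsent(tableId, t -> new ArrayList<>());
        if (!columns.contains(columnName)) { // analogue of isSchemaChangeEventRedundant
            columns.add(columnName);         // analogue of applySchemaChangeEvent
        }
    }
}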
@@ -94,7 +94,10 @@ public class PaimonSink<InputT> implements WithPreCommitTopology<InputT, MultiTa
// add correct checkpointId to MultiTableCommittable and recreate CommittableSummary.
return partitioned
.transform("preCommit", typeInformation, new PreCommitOperator())
.transform(
"preCommit",
typeInformation,
new PreCommitOperator(catalogOptions, commitUser))
.setParallelism(committables.getParallelism());
}
}

@@ -17,16 +17,25 @@
package org.apache.flink.cdc.connectors.paimon.sink.v2;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.flink.FlinkCatalogFactory;
import org.apache.paimon.flink.sink.Committer;
import org.apache.paimon.flink.sink.MultiTableCommittable;
import org.apache.paimon.flink.sink.StoreMultiCommitter;
import org.apache.paimon.manifest.WrappedManifestCommittable;
import org.apache.paimon.options.Options;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
/** An Operator to add checkpointId to MultiTableCommittable and generate CommittableSummary. */
@@ -35,12 +44,23 @@ public class PreCommitOperator
implements OneInputStreamOperator<
CommittableMessage<MultiTableCommittable>,
CommittableMessage<MultiTableCommittable>> {
protected static final Logger LOGGER = LoggerFactory.getLogger(PreCommitOperator.class);
private final String commitUser;
private final Options catalogOptions;
private Catalog catalog;
private StoreMultiCommitter storeMultiCommitter;
/** store a list of MultiTableCommittable in one checkpoint. */
private final List<MultiTableCommittable> results;
private final List<MultiTableCommittable> multiTableCommittables;
public PreCommitOperator() {
results = new ArrayList<>();
public PreCommitOperator(Options catalogOptions, String commitUser) {
multiTableCommittables = new ArrayList<>();
this.catalogOptions = catalogOptions;
this.commitUser = commitUser;
}
@Override
@@ -50,8 +70,16 @@ public class PreCommitOperator
@Override
public void processElement(StreamRecord<CommittableMessage<MultiTableCommittable>> element) {
if (catalog == null) {
this.catalog = FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
this.storeMultiCommitter =
new StoreMultiCommitter(
() -> FlinkCatalogFactory.createPaimonCatalog(catalogOptions),
Committer.createContext(
commitUser, getMetricGroup(), true, false, null));
}
if (element.getValue() instanceof CommittableWithLineage) {
results.add(
multiTableCommittables.add(
((CommittableWithLineage<MultiTableCommittable>) element.getValue())
.getCommittable());
}
@@ -64,34 +92,44 @@ public class PreCommitOperator
@Override
public void prepareSnapshotPreBarrier(long checkpointId) {
// CommittableSummary should be sent before all CommittableWithLineage.
CommittableMessage<MultiTableCommittable> summary =
new CommittableSummary<>(
getRuntimeContext().getIndexOfThisSubtask(),
getRuntimeContext().getNumberOfParallelSubtasks(),
checkpointId,
results.size(),
results.size(),
0);
output.collect(new StreamRecord<>(summary));
results.forEach(
committable -> {
// update the right checkpointId for MultiTableCommittable
MultiTableCommittable committableWithCheckPointId =
new MultiTableCommittable(
committable.getDatabase(),
committable.getTable(),
checkpointId,
committable.kind(),
committable.wrappedCommittable());
CommittableMessage<MultiTableCommittable> message =
new CommittableWithLineage<>(
committableWithCheckPointId,
checkpointId,
getRuntimeContext().getIndexOfThisSubtask());
output.collect(new StreamRecord<>(message));
});
results.clear();
for (int i = 0; i < multiTableCommittables.size(); i++) {
MultiTableCommittable multiTableCommittable = multiTableCommittables.get(i);
multiTableCommittables.set(
i,
new MultiTableCommittable(
multiTableCommittable.getDatabase(),
multiTableCommittable.getTable(),
checkpointId,
multiTableCommittable.kind(),
multiTableCommittable.wrappedCommittable()));
}
}
@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
super.snapshotState(context);
long checkpointId = context.getCheckpointId();
if (!multiTableCommittables.isEmpty()) {
multiTableCommittables.forEach(
(multiTableCommittable) ->
LOGGER.debug(
"Try to commit for {}.{} : {} in checkpoint {}",
multiTableCommittable.getDatabase(),
multiTableCommittable.getTable(),
multiTableCommittable,
checkpointId));
WrappedManifestCommittable wrappedManifestCommittable =
storeMultiCommitter.combine(checkpointId, checkpointId, multiTableCommittables);
storeMultiCommitter.commit(Collections.singletonList(wrappedManifestCommittable));
multiTableCommittables.clear();
}
}
@Override
public void close() throws Exception {
super.close();
if (storeMultiCommitter != null) {
storeMultiCommitter.close();
}
}
}
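As far as this diff shows, committables no longer leave the operator at all: prepareSnapshotPreBarrier only stamps the checkpoint id into the buffered MultiTableCommittables, and snapshotState then folds them into a single WrappedManifestCommittable and commits it through the lazily created StoreMultiCommitter, which is closed in close(). The Paimon commit therefore happens inside this operator as part of the checkpoint, instead of being re-emitted as CommittableSummary/CommittableWithLineage messages for a downstream committer.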

@@ -19,7 +19,6 @@ package org.apache.flink.cdc.connectors.paimon.sink.v2.bucket;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.cdc.common.event.ChangeEvent;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.FlushEvent;
@@ -53,6 +52,8 @@ import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.RowKeyExtractor;
import org.apache.paimon.table.sink.RowPartitionKeyExtractor;
import org.apache.paimon.utils.MathUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.ZoneId;
import java.util.HashMap;
@@ -63,6 +64,8 @@ import java.util.Optional;
public class BucketAssignOperator extends AbstractStreamOperator<Event>
implements OneInputStreamOperator<Event, Event> {
protected static final Logger LOGGER = LoggerFactory.getLogger(BucketAssignOperator.class);
public final String commitUser;
private final Options catalogOptions;
@@ -99,8 +102,8 @@ public class BucketAssignOperator extends AbstractStreamOperator<Event>
super.open();
this.catalog = FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
this.bucketAssignerMap = new HashMap<>();
this.totalTasksNumber = getRuntimeContext().getNumberOfParallelSubtasks();
this.currentTaskNumber = getRuntimeContext().getIndexOfThisSubtask();
this.totalTasksNumber = getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks();
this.currentTaskNumber = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
this.schemaMaps = new HashMap<>();
}
@@ -121,13 +124,16 @@ public class BucketAssignOperator extends AbstractStreamOperator<Event>
public void processElement(StreamRecord<Event> streamRecord) throws Exception {
Event event = streamRecord.getValue();
if (event instanceof FlushEvent) {
output.collect(
new StreamRecord<>(
new BucketWrapperFlushEvent(
currentTaskNumber,
((FlushEvent) event).getSourceSubTaskId(),
((FlushEvent) event).getTableIds(),
((FlushEvent) event).getSchemaChangeEventType())));
for (int i = 0; i < totalTasksNumber; i++) {
output.collect(
new StreamRecord<>(
new BucketWrapperFlushEvent(
i,
((FlushEvent) event).getSourceSubTaskId(),
currentTaskNumber,
((FlushEvent) event).getTableIds(),
((FlushEvent) event).getSchemaChangeEventType())));
}
return;
}
@@ -181,24 +187,21 @@ public class BucketAssignOperator extends AbstractStreamOperator<Event>
}
output.collect(
new StreamRecord<>(new BucketWrapperChangeEvent(bucket, (ChangeEvent) event)));
} else if (event instanceof CreateTableEvent) {
CreateTableEvent createTableEvent = (CreateTableEvent) event;
schemaMaps.put(
createTableEvent.tableId(),
new TableSchemaInfo(createTableEvent.getSchema(), zoneId));
output.collect(
new StreamRecord<>(
new BucketWrapperChangeEvent(currentTaskNumber, (ChangeEvent) event)));
} else if (event instanceof SchemaChangeEvent) {
SchemaChangeEvent schemaChangeEvent = (SchemaChangeEvent) event;
Schema schema =
SchemaUtils.applySchemaChangeEvent(
schemaMaps.get(schemaChangeEvent.tableId()).getSchema(),
Optional.ofNullable(schemaMaps.get(schemaChangeEvent.tableId()))
.map(TableSchemaInfo::getSchema)
.orElse(null),
schemaChangeEvent);
schemaMaps.put(schemaChangeEvent.tableId(), new TableSchemaInfo(schema, zoneId));
output.collect(
new StreamRecord<>(
new BucketWrapperChangeEvent(currentTaskNumber, (ChangeEvent) event)));
// Broadcast SchemaChangeEvent.
for (int index = 0; index < totalTasksNumber; index++) {
output.collect(
new StreamRecord<>(
new BucketWrapperChangeEvent(index, (ChangeEvent) event)));
}
}
}
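Both branches now fan events out to every downstream subtask instead of routing them only to the assigner's own index: FlushEvent copies carry the assigner's subtask id so the alignment operator can count them, and schema change events reach every writer so none keeps serializing against a stale schema. The schema lookup also becomes null-tolerant via Optional, presumably because CreateTableEvent (itself a SchemaChangeEvent) now flows through this same branch and has no previously registered schema to look up.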

@@ -87,6 +87,7 @@ public class BucketWrapperEventSerializer extends TypeSerializerSingleton<Event>
BucketWrapperFlushEvent bucketWrapperFlushEvent = (BucketWrapperFlushEvent) event;
dataOutputView.writeInt(bucketWrapperFlushEvent.getBucket());
dataOutputView.writeInt(bucketWrapperFlushEvent.getSourceSubTaskId());
dataOutputView.writeInt(bucketWrapperFlushEvent.getBucketAssignTaskId());
tableIdListSerializer.serialize(bucketWrapperFlushEvent.getTableIds(), dataOutputView);
schemaChangeEventTypeEnumSerializer.serialize(
bucketWrapperFlushEvent.getSchemaChangeEventType(), dataOutputView);
@@ -98,6 +99,7 @@ public class BucketWrapperEventSerializer extends TypeSerializerSingleton<Event>
EventClass eventClass = enumSerializer.deserialize(source);
if (eventClass.equals(EventClass.BUCKET_WRAPPER_FLUSH_EVENT)) {
return new BucketWrapperFlushEvent(
source.readInt(),
source.readInt(),
source.readInt(),
tableIdListSerializer.deserialize(source),

@@ -28,14 +28,21 @@ import java.util.Objects;
public class BucketWrapperFlushEvent extends FlushEvent implements BucketWrapper {
private final int bucket;
private final int bucketAssignTaskId;
public BucketWrapperFlushEvent(
int bucket,
int subTaskId,
int sourceSubTaskId,
int bucketAssignTaskId,
List<TableId> tableIds,
SchemaChangeEventType schemaChangeEventType) {
super(subTaskId, tableIds, schemaChangeEventType);
super(sourceSubTaskId, tableIds, schemaChangeEventType);
this.bucket = bucket;
this.bucketAssignTaskId = bucketAssignTaskId;
}
public int getBucketAssignTaskId() {
return bucketAssignTaskId;
}
@Override
@@ -55,18 +62,22 @@ public class BucketWrapperFlushEvent extends FlushEvent implements BucketWrapper
return false;
}
BucketWrapperFlushEvent that = (BucketWrapperFlushEvent) o;
return bucket == that.bucket;
return bucket == that.bucket
&& bucketAssignTaskId == that.bucketAssignTaskId
&& getSourceSubTaskId() == that.getSourceSubTaskId();
}
@Override
public int hashCode() {
return Objects.hash(super.hashCode(), bucket);
return Objects.hash(super.hashCode(), bucket, bucketAssignTaskId);
}
@Override
public String toString() {
return "BucketWrapperFlushEvent{subTaskId="
+ getSourceSubTaskId()
+ ", bucketAssignTaskId="
+ bucketAssignTaskId
+ ", bucket="
+ bucket
+ '}';
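The wrapped flush event now distinguishes three ids: bucket (the routing target used by the custom partitioner), sourceSubTaskId (the SchemaOperator subtask that requested the flush, inherited from FlushEvent), and the new bucketAssignTaskId (the BucketAssignOperator subtask that forwarded this copy). equals, hashCode and toString are extended accordingly, and BucketWrapperEventSerializer above writes and reads the extra int.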

@@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.connectors.paimon.sink.v2.bucket;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.FlushEvent;
import org.apache.flink.cdc.runtime.operators.schema.regular.SchemaOperator;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
/** Align {@link FlushEvent}s broadcasted by {@link BucketAssignOperator}. */
public class FlushEventAlignmentOperator extends AbstractStreamOperator<Event>
implements OneInputStreamOperator<Event, Event> {
private transient int totalTasksNumber;
/**
* Key: subtask id of {@link SchemaOperator}, Value: subtask ids of {@link
* BucketAssignOperator}.
*/
private transient Map<Integer, Set<Integer>> sourceTaskIdToAssignBucketSubTaskIds;
private transient int currentSubTaskId;
public FlushEventAlignmentOperator() {
// It's necessary to avoid unpredictable outcomes of Event shuffling.
this.chainingStrategy = ChainingStrategy.ALWAYS;
}
@Override
public void open() throws Exception {
super.open();
this.totalTasksNumber = getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks();
this.currentSubTaskId = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
sourceTaskIdToAssignBucketSubTaskIds = new HashMap<>();
}
@Override
public void processElement(StreamRecord<Event> streamRecord) {
Event event = streamRecord.getValue();
if (event instanceof BucketWrapperFlushEvent) {
BucketWrapperFlushEvent bucketWrapperFlushEvent = (BucketWrapperFlushEvent) event;
int sourceSubTaskId = bucketWrapperFlushEvent.getSourceSubTaskId();
Set<Integer> subTaskIds =
sourceTaskIdToAssignBucketSubTaskIds.getOrDefault(
sourceSubTaskId, new HashSet<>());
int subtaskId = bucketWrapperFlushEvent.getBucketAssignTaskId();
subTaskIds.add(subtaskId);
if (subTaskIds.size() == totalTasksNumber) {
LOG.info("{} send FlushEvent of {}", currentSubTaskId, sourceSubTaskId);
output.collect(
new StreamRecord<>(
new FlushEvent(
sourceSubTaskId,
bucketWrapperFlushEvent.getTableIds(),
bucketWrapperFlushEvent.getSchemaChangeEventType())));
sourceTaskIdToAssignBucketSubTaskIds.remove(sourceSubTaskId);
} else {
LOG.info(
"{} collect FlushEvent of {} with subtask {}",
currentSubTaskId,
sourceSubTaskId,
subtaskId);
sourceTaskIdToAssignBucketSubTaskIds.put(sourceSubTaskId, subTaskIds);
}
} else {
output.collect(streamRecord);
}
}
}
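A concrete walk-through, assuming parallelism 4 throughout the sink: when SchemaOperator subtask 0 triggers a flush, each of the four BucketAssignOperator subtasks broadcasts one BucketWrapperFlushEvent to each writer, so every instance of this operator eventually records four assigner ids under sourceSubTaskId 0. Because Flink preserves order within each channel, once all four copies have arrived, every data event those assigners emitted before the flush has already been delivered here, so forwarding a single FlushEvent to the writer is safe. The constructor pins ChainingStrategy.ALWAYS so that, per the comment above, no additional exchange can re-shuffle events between this operator and the writer.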

@@ -30,8 +30,11 @@ limitations under the License.
<properties>
<flink-1.19>1.19.1</flink-1.19>
<flink-1.20>1.20.0</flink-1.20>
<flink-major-1.19>1.19</flink-major-1.19>
<flink-major-1.20>1.20</flink-major-1.20>
<mysql.driver.version>8.0.27</mysql.driver.version>
<starrocks.connector.version>1.2.10_flink-${flink.major.version}</starrocks.connector.version>
<paimon.version>0.9.0</paimon.version>
</properties>
<dependencies>
@@ -98,6 +101,19 @@ limitations under the License.
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cdc-pipeline-connector-paimon</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.paimon</groupId>
<artifactId>paimon-flink-${flink.major.version}</artifactId>
<version>${paimon.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-oceanbase-cdc</artifactId>
@@ -284,6 +300,52 @@ limitations under the License.
</outputDirectory>
</artifactItem>
<artifactItem>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cdc-pipeline-connector-oceanbase</artifactId>
<version>${project.version}</version>
<destFileName>oceanbase-cdc-pipeline-connector.jar</destFileName>
<type>jar</type>
<outputDirectory>${project.build.directory}/dependencies
</outputDirectory>
</artifactItem>
<artifactItem>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cdc-pipeline-connector-paimon</artifactId>
<version>${project.version}</version>
<destFileName>paimon-cdc-pipeline-connector.jar</destFileName>
<type>jar</type>
<outputDirectory>${project.build.directory}/dependencies
</outputDirectory>
</artifactItem>
<artifactItem>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-hadoop-2-uber</artifactId>
<version>2.8.3-10.0</version>
<destFileName>flink-shade-hadoop.jar</destFileName>
<type>jar</type>
<outputDirectory>${project.build.directory}/dependencies
</outputDirectory>
</artifactItem>
<artifactItem>
<groupId>org.apache.paimon</groupId>
<artifactId>paimon-flink-${flink-major-1.19}</artifactId>
<version>${paimon.version}</version>
<destFileName>paimon-sql-connector-${flink-1.19}.jar</destFileName>
<type>jar</type>
<outputDirectory>${project.build.directory}/dependencies
</outputDirectory>
</artifactItem>
<artifactItem>
<groupId>org.apache.paimon</groupId>
<artifactId>paimon-flink-${flink-major-1.20}</artifactId>
<version>${paimon.version}</version>
<destFileName>paimon-sql-connector-${flink-1.20}.jar</destFileName>
<type>jar</type>
<outputDirectory>${project.build.directory}/dependencies
</outputDirectory>
</artifactItem>
<artifactItem>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cdc-pipeline-udf-examples</artifactId>

@@ -0,0 +1,349 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.pipeline.tests;
import org.apache.flink.cdc.common.test.utils.TestUtils;
import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion;
import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
import org.apache.flink.cdc.pipeline.tests.utils.PipelineTestEnvironment;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.images.builder.Transferable;
import org.testcontainers.lifecycle.Startables;
import org.testcontainers.utility.MountableFile;
import java.nio.file.Path;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/** End-to-end tests for the MySQL CDC to Paimon pipeline job. */
public class MySqlToPaimonE2eITCase extends PipelineTestEnvironment {
private static final Logger LOG = LoggerFactory.getLogger(MySqlToPaimonE2eITCase.class);
public static final Duration TESTCASE_TIMEOUT = Duration.ofMinutes(3);
// ------------------------------------------------------------------------------------------
// MySQL Variables (we always use MySQL as the data source for easier verification)
// ------------------------------------------------------------------------------------------
protected static final String MYSQL_TEST_USER = "mysqluser";
protected static final String MYSQL_TEST_PASSWORD = "mysqlpw";
@ClassRule
public static final MySqlContainer MYSQL =
(MySqlContainer)
new MySqlContainer(
MySqlVersion.V8_0) // v8 support both ARM and AMD architectures
.withConfigurationOverride("docker/mysql/my.cnf")
.withSetupSQL("docker/mysql/setup.sql")
.withDatabaseName("flink-test")
.withUsername("flinkuser")
.withPassword("flinkpw")
.withNetwork(NETWORK)
.withNetworkAliases("mysql")
.withLogConsumer(new Slf4jLogConsumer(LOG));
protected final UniqueDatabase inventoryDatabase =
new UniqueDatabase(MYSQL, "paimon_inventory", MYSQL_TEST_USER, MYSQL_TEST_PASSWORD);
@BeforeClass
public static void initializeContainers() {
LOG.info("Starting containers...");
Startables.deepStart(Stream.of(MYSQL)).join();
LOG.info("Containers are started.");
}
@Before
public void before() throws Exception {
super.before();
inventoryDatabase.createAndInitialize();
jobManager.copyFileToContainer(
MountableFile.forHostPath(
TestUtils.getResource(getPaimonSQLConnectorResourceName())),
sharedVolume.toString() + "/" + getPaimonSQLConnectorResourceName());
jobManager.copyFileToContainer(
MountableFile.forHostPath(TestUtils.getResource("flink-shade-hadoop.jar")),
sharedVolume.toString() + "/flink-shade-hadoop.jar");
}
@After
public void after() {
super.after();
inventoryDatabase.dropDatabase();
}
@Test
public void testSyncWholeDatabase() throws Exception {
String warehouse = sharedVolume.toString() + "/" + "paimon_" + UUID.randomUUID();
String database = inventoryDatabase.getDatabaseName();
String pipelineJob =
String.format(
"source:\n"
+ " type: mysql\n"
+ " hostname: mysql\n"
+ " port: 3306\n"
+ " username: %s\n"
+ " password: %s\n"
+ " tables: %s.\\.*\n"
+ " server-id: 5400-5404\n"
+ " server-time-zone: UTC\n"
+ "\n"
+ "sink:\n"
+ " type: paimon\n"
+ " catalog.properties.warehouse: %s\n"
+ " catalog.properties.metastore: filesystem\n"
+ " catalog.properties.cache-enabled: false\n"
+ "\n"
+ "pipeline:\n"
+ " schema.change.behavior: evolve\n"
+ " parallelism: 4",
MYSQL_TEST_USER, MYSQL_TEST_PASSWORD, database, warehouse);
Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
Path paimonCdcConnector = TestUtils.getResource("paimon-cdc-pipeline-connector.jar");
Path hadoopJar = TestUtils.getResource("flink-shade-hadoop.jar");
Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
submitPipelineJob(pipelineJob, mysqlCdcJar, paimonCdcConnector, mysqlDriverJar, hadoopJar);
waitUntilJobRunning(Duration.ofSeconds(30));
LOG.info("Pipeline job is running");
validateSinkResult(
warehouse,
database,
"products",
Arrays.asList(
"101, One, Alice, 3.202, red, {\"key1\": \"value1\"}, null",
"102, Two, Bob, 1.703, white, {\"key2\": \"value2\"}, null",
"103, Three, Cecily, 4.105, red, {\"key3\": \"value3\"}, null",
"104, Four, Derrida, 1.857, white, {\"key4\": \"value4\"}, null",
"105, Five, Evelyn, 5.211, red, {\"K\": \"V\", \"k\": \"v\"}, null",
"106, Six, Ferris, 9.813, null, null, null",
"107, Seven, Grace, 2.117, null, null, null",
"108, Eight, Hesse, 6.819, null, null, null",
"109, Nine, IINA, 5.223, null, null, null"));
validateSinkResult(
warehouse,
database,
"customers",
Arrays.asList(
"101, user_1, Shanghai, 123567891234",
"102, user_2, Shanghai, 123567891234",
"103, user_3, Shanghai, 123567891234",
"104, user_4, Shanghai, 123567891234"));
LOG.info("Begin incremental reading stage.");
// generate binlogs
String mysqlJdbcUrl =
String.format(
"jdbc:mysql://%s:%s/%s",
MYSQL.getHost(), MYSQL.getDatabasePort(), database);
List<String> recordsInIncrementalPhase;
try (Connection conn =
DriverManager.getConnection(
mysqlJdbcUrl, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD);
Statement stat = conn.createStatement()) {
stat.execute(
"INSERT INTO products VALUES (default,'Ten','Jukebox',0.2, null, null, null);"); // 110
stat.execute("UPDATE products SET description='Fay' WHERE id=106;");
stat.execute("UPDATE products SET weight='5.125' WHERE id=107;");
// modify table schema
stat.execute("ALTER TABLE products DROP COLUMN point_c;");
stat.execute("DELETE FROM products WHERE id=101;");
stat.execute(
"INSERT INTO products VALUES (default,'Eleven','Kryo',5.18, null, null);"); // 111
stat.execute(
"INSERT INTO products VALUES (default,'Twelve', 'Lily', 2.14, null, null);"); // 112
recordsInIncrementalPhase = createChangesAndValidate(stat);
} catch (SQLException e) {
LOG.error("Update table for CDC failed.", e);
throw e;
}
List<String> recordsInSnapshotPhase =
new ArrayList<>(
Arrays.asList(
"102, Two, Bob, 1.703, white, {\"key2\": \"value2\"}, null, null, null, null, null, null, null, null, null, null",
"103, Three, Cecily, 4.105, red, {\"key3\": \"value3\"}, null, null, null, null, null, null, null, null, null, null",
"104, Four, Derrida, 1.857, white, {\"key4\": \"value4\"}, null, null, null, null, null, null, null, null, null, null",
"105, Five, Evelyn, 5.211, red, {\"K\": \"V\", \"k\": \"v\"}, null, null, null, null, null, null, null, null, null, null",
"106, Six, Fay, 9.813, null, null, null, null, null, null, null, null, null, null, null, null",
"107, Seven, Grace, 5.125, null, null, null, null, null, null, null, null, null, null, null, null",
"108, Eight, Hesse, 6.819, null, null, null, null, null, null, null, null, null, null, null, null",
"109, Nine, IINA, 5.223, null, null, null, null, null, null, null, null, null, null, null, null",
"110, Ten, Jukebox, 0.2, null, null, null, null, null, null, null, null, null, null, null, null",
"111, Eleven, Kryo, 5.18, null, null, null, null, null, null, null, null, null, null, null, null",
"112, Twelve, Lily, 2.14, null, null, null, null, null, null, null, null, null, null, null, null"));
recordsInSnapshotPhase.addAll(recordsInIncrementalPhase);
validateSinkResult(warehouse, database, "products", recordsInSnapshotPhase);
}
/**
* Basic Schema: id INTEGER NOT NULL, name VARCHAR(255) NOT NULL, description VARCHAR(512),
* weight FLOAT, enum_c enum('red', 'white'), json_c JSON.
*/
private List<String> createChangesAndValidate(Statement stat) throws SQLException {
List<String> result = new ArrayList<>();
StringBuilder sqlFields = new StringBuilder();
// Add Column.
for (int addColumnRepeat = 0; addColumnRepeat < 10; addColumnRepeat++) {
stat.execute(
String.format(
"ALTER TABLE products ADD COLUMN point_c_%s VARCHAR(10);",
addColumnRepeat));
sqlFields.append(", '1'");
StringBuilder resultFields = new StringBuilder();
for (int j = 0; j < 10; j++) {
if (j <= addColumnRepeat) {
resultFields.append(", 1");
} else {
resultFields.append(", null");
}
}
for (int j = 0; j < 1000; j++) {
stat.addBatch(
String.format(
"INSERT INTO products VALUES (default,'finally', null, 2.14, null, null %s);",
sqlFields));
int id = addColumnRepeat * 1000 + j + 113;
result.add(
String.format("%s, finally, null, 2.14, null, null%s", id, resultFields));
}
stat.executeBatch();
}
// Modify Column type.
for (int modifyColumnRepeat = 0; modifyColumnRepeat < 10; modifyColumnRepeat++) {
for (int j = 0; j < 1000; j++) {
stat.addBatch(
String.format(
"INSERT INTO products VALUES (default,'finally', null, 2.14, null, null %s);",
sqlFields));
int id = modifyColumnRepeat * 1000 + j + 10113;
result.add(
String.format(
"%s, finally, null, 2.14, null, null%s",
id, ", 1, 1, 1, 1, 1, 1, 1, 1, 1, 1"));
}
stat.executeBatch();
stat.execute(
String.format(
"ALTER TABLE products MODIFY point_c_0 VARCHAR(%s);",
10 + modifyColumnRepeat));
}
return result;
}
private List<String> fetchPaimonTableRows(String warehouse, String database, String table)
throws Exception {
String template =
readLines("docker/peek-paimon.sql").stream()
.filter(line -> !line.startsWith("--"))
.collect(Collectors.joining("\n"));
String sql = String.format(template, warehouse, database, table);
String containerSqlPath = sharedVolume.toString() + "/peek.sql";
jobManager.copyFileToContainer(Transferable.of(sql), containerSqlPath);
Container.ExecResult result =
jobManager.execInContainer(
"/opt/flink/bin/sql-client.sh",
"--jar",
sharedVolume.toString() + "/" + getPaimonSQLConnectorResourceName(),
"--jar",
sharedVolume.toString() + "/flink-shade-hadoop.jar",
"-f",
containerSqlPath);
if (result.getExitCode() != 0) {
throw new RuntimeException(
"Failed to execute peek script. Stdout: "
+ result.getStdout()
+ "; Stderr: "
+ result.getStderr());
}
return Arrays.stream(result.getStdout().split("\n"))
.filter(line -> line.startsWith("|"))
.skip(1)
.map(MySqlToPaimonE2eITCase::extractRow)
.map(row -> String.format("%s", String.join(", ", row)))
.collect(Collectors.toList());
}
private static String[] extractRow(String row) {
return Arrays.stream(row.split("\\|"))
.map(String::trim)
.filter(col -> !col.isEmpty())
.map(col -> col.equals("<NULL>") ? "null" : col)
.toArray(String[]::new);
}
private void validateSinkResult(
String warehouse, String database, String table, List<String> expected)
throws InterruptedException {
LOG.info("Verifying Paimon {}::{}::{} results...", warehouse, database, table);
long deadline = System.currentTimeMillis() + TESTCASE_TIMEOUT.toMillis();
List<String> results = Collections.emptyList();
while (System.currentTimeMillis() < deadline) {
try {
results = fetchPaimonTableRows(warehouse, database, table);
Assertions.assertThat(results).containsExactlyInAnyOrderElementsOf(expected);
LOG.info(
"Successfully verified {} records in {} seconds.",
expected.size(),
(System.currentTimeMillis() - deadline + TESTCASE_TIMEOUT.toMillis())
/ 1000);
return;
} catch (Exception e) {
LOG.warn("Validate failed, waiting for the next loop...", e);
} catch (AssertionError ignored) {
// AssertionError contains far too many records and might flood the log output.
LOG.warn(
"Results mismatch, expected {} records, but got {} actually. Waiting for the next loop...",
expected.size(),
results.size());
}
Thread.sleep(1000L);
}
Assertions.assertThat(results).containsExactlyInAnyOrderElementsOf(expected);
}
protected String getPaimonSQLConnectorResourceName() {
return String.format("paimon-sql-connector-%s.jar", flinkVersion);
}
}
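This test drives exactly the scenario in the commit title: with pipeline parallelism 4, createChangesAndValidate interleaves ten ADD COLUMN and ten MODIFY COLUMN statements with batches of 1,000 inserts each (roughly 20,000 generated rows), and validateSinkResult then polls the Paimon table through the Flink SQL client (using the peek-paimon.sql template below) until the snapshot matches or the three-minute deadline expires.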

@@ -29,6 +29,9 @@ import org.apache.flink.table.api.ValidationException;
import org.apache.flink.util.TestLogger;
import com.fasterxml.jackson.core.Version;
import com.github.dockerjava.api.DockerClient;
import com.github.dockerjava.api.command.ExecCreateCmdResponse;
import com.github.dockerjava.api.model.Volume;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
@@ -38,16 +41,22 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.DockerClientFactory;
import org.testcontainers.containers.BindMode;
import org.testcontainers.containers.Container.ExecResult;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.output.FrameConsumerResultCallback;
import org.testcontainers.containers.output.OutputFrame;
import org.testcontainers.containers.output.ToStringConsumer;
import org.testcontainers.lifecycle.Startables;
import org.testcontainers.utility.MountableFile;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
@@ -87,6 +96,23 @@ public abstract class PipelineTestEnvironment extends TestLogger {
public static final int JOB_MANAGER_REST_PORT = 8081;
public static final String INTER_CONTAINER_JM_ALIAS = "jobmanager";
public static final String INTER_CONTAINER_TM_ALIAS = "taskmanager";
public static final List<String> EXTERNAL_PROPS =
Arrays.asList(
String.format("jobmanager.rpc.address: %s", INTER_CONTAINER_JM_ALIAS),
"jobmanager.bind-host: 0.0.0.0",
"taskmanager.bind-host: 0.0.0.0",
"rest.bind-address: 0.0.0.0",
"rest.address: 0.0.0.0",
"jobmanager.memory.process.size: 1GB",
"query.server.port: 6125",
"blob.server.port: 6124",
"taskmanager.numberOfTaskSlots: 10",
"parallelism.default: 4",
"execution.checkpointing.interval: 300",
"state.backend.type: hashmap",
"env.java.opts.all: -Doracle.jdbc.timezoneAsRegion=false",
"restart-strategy.type: off");
public static final String FLINK_PROPERTIES = String.join("\n", EXTERNAL_PROPS);
@ClassRule public static final Network NETWORK = Network.newNetwork();
@@ -95,6 +121,7 @@ public abstract class PipelineTestEnvironment extends TestLogger {
@Nullable protected RestClusterClient<StandaloneClusterId> restClusterClient;
protected GenericContainer<?> jobManager;
protected GenericContainer<?> taskManager;
protected Volume sharedVolume = new Volume("/tmp/shared");
protected ToStringConsumer jobManagerConsumer;
@@ -114,30 +141,32 @@ public abstract class PipelineTestEnvironment extends TestLogger {
public void before() throws Exception {
LOG.info("Starting containers...");
jobManagerConsumer = new ToStringConsumer();
String flinkProperties = getFlinkProperties();
jobManager =
new GenericContainer<>(getFlinkDockerImageTag())
.withCommand("jobmanager")
.withNetwork(NETWORK)
.withNetworkAliases(INTER_CONTAINER_JM_ALIAS)
.withExposedPorts(JOB_MANAGER_REST_PORT)
.withEnv("FLINK_PROPERTIES", flinkProperties)
.withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES)
.withCreateContainerCmdModifier(cmd -> cmd.withVolumes(sharedVolume))
.withLogConsumer(jobManagerConsumer);
Startables.deepStart(Stream.of(jobManager)).join();
runInContainerAsRoot(jobManager, "chmod", "0777", "-R", sharedVolume.toString());
LOG.info("JobManager is started.");
taskManagerConsumer = new ToStringConsumer();
taskManager =
new GenericContainer<>(getFlinkDockerImageTag())
.withCommand("taskmanager")
.withNetwork(NETWORK)
.withNetworkAliases(INTER_CONTAINER_TM_ALIAS)
.withEnv("FLINK_PROPERTIES", flinkProperties)
.withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES)
.dependsOn(jobManager)
.withVolumesFrom(jobManager, BindMode.READ_WRITE)
.withLogConsumer(taskManagerConsumer);
Startables.deepStart(Stream.of(jobManager)).join();
Startables.deepStart(Stream.of(taskManager)).join();
LOG.info("Containers are started.");
runInContainerAsRoot(taskManager, "chmod", "0777", "-R", sharedVolume.toString());
LOG.info("TaskManager is started.");
}
@After
@@ -279,4 +308,28 @@ public abstract class PipelineTestEnvironment extends TestLogger {
"execution.checkpointing.interval: 300",
"env.java.opts.all: -Doracle.jdbc.timezoneAsRegion=false"));
}
private void runInContainerAsRoot(GenericContainer<?> container, String... command)
throws InterruptedException {
ToStringConsumer stdoutConsumer = new ToStringConsumer();
ToStringConsumer stderrConsumer = new ToStringConsumer();
DockerClient dockerClient = DockerClientFactory.instance().client();
ExecCreateCmdResponse execCreateCmdResponse =
dockerClient
.execCreateCmd(container.getContainerId())
.withUser("root")
.withCmd(command)
.exec();
FrameConsumerResultCallback callback = new FrameConsumerResultCallback();
callback.addConsumer(OutputFrame.OutputType.STDOUT, stdoutConsumer);
callback.addConsumer(OutputFrame.OutputType.STDERR, stderrConsumer);
dockerClient.execStartCmd(execCreateCmdResponse.getId()).exec(callback).awaitCompletion();
}
protected List<String> readLines(String resource) throws IOException {
final URL url = PipelineTestEnvironment.class.getClassLoader().getResource(resource);
assert url != null;
Path path = new File(url.getFile()).toPath();
return Files.readAllLines(path);
}
}
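Test-environment support for the above: the JobManager container now declares a /tmp/shared Docker volume, the TaskManager mounts it via withVolumesFrom, and both are chmod-ed to 0777 through runInContainerAsRoot, so the two containers and the SQL client running inside the JobManager see the same Paimon warehouse, connector jars, and generated peek.sql script. The Flink configuration is also lifted into a shared FLINK_PROPERTIES constant (restart strategy off, default parallelism 4, ten task slots), and readLines is added for loading SQL resources from the classpath.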

@@ -0,0 +1,55 @@
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
-- ----------------------------------------------------------------------------------------------------------------
-- DATABASE: paimon_inventory
-- ----------------------------------------------------------------------------------------------------------------
-- Create and populate our products using a single insert with many rows
CREATE TABLE products (
id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(255) NOT NULL DEFAULT 'flink',
description VARCHAR(512),
weight FLOAT,
enum_c enum('red', 'white') default 'red', -- test some complex types as well,
json_c JSON, -- because we use additional dependencies to deserialize complex types.
point_c POINT
);
ALTER TABLE products AUTO_INCREMENT = 101;
INSERT INTO products
VALUES (default,"One", "Alice", 3.202, 'red', '{"key1": "value1"}', null),
(default,"Two", "Bob", 1.703, 'white', '{"key2": "value2"}', null),
(default,"Three", "Cecily", 4.105, 'red', '{"key3": "value3"}', null),
(default,"Four", "Derrida", 1.857, 'white', '{"key4": "value4"}', null),
(default,"Five", "Evelyn", 5.211, 'red', '{"K": "V", "k": "v"}', null),
(default,"Six", "Ferris", 9.813, null, null, null),
(default,"Seven", "Grace", 2.117, null, null, null),
(default,"Eight", "Hesse", 6.819, null, null, null),
(default,"Nine", "IINA", 5.223, null, null, null);
-- Create and populate our customers using a single insert with many rows
CREATE TABLE customers (
id INTEGER NOT NULL PRIMARY KEY,
name VARCHAR(255) NOT NULL DEFAULT 'flink',
address VARCHAR(1024),
phone_number VARCHAR(512)
);
INSERT INTO customers
VALUES (101,"user_1","Shanghai","123567891234"),
(102,"user_2","Shanghai","123567891234"),
(103,"user_3","Shanghai","123567891234"),
(104,"user_4","Shanghai","123567891234");

@@ -0,0 +1,28 @@
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
-- Format this file with the following arguments:
-- Warehouse Path, Database Name, and Table Name.
SET 'sql-client.execution.result-mode' = 'tableau';
SET 'table.display.max-column-width' = '100000';
SET 'execution.runtime-mode' = 'batch';
CREATE CATALOG paimon_catalog WITH (
'type' = 'paimon',
'warehouse' = '%s'
);
SELECT * FROM paimon_catalog.%s.%s;