diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/pom.xml b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/pom.xml
index 940bbc0ac..50d27ebcc 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/pom.xml
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/pom.xml
@@ -285,6 +285,19 @@ limitations under the License.
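+            <!-- Package this connector's test classes into a test-jar so the pipeline e2e tests can reuse them. -->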
+
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>test-jar</id>
+                        <goals>
+                            <goal>test-jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSinkFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSinkFactory.java
index 388658fe6..a165946d4 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSinkFactory.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSinkFactory.java
@@ -24,12 +24,9 @@ import org.apache.flink.cdc.common.factories.DataSinkFactory;
import org.apache.flink.cdc.common.factories.FactoryHelper;
import org.apache.flink.cdc.common.pipeline.PipelineOptions;
import org.apache.flink.cdc.common.sink.DataSink;
-import org.apache.flink.cdc.common.utils.Preconditions;
import org.apache.flink.cdc.connectors.paimon.sink.v2.PaimonRecordEventSerializer;
import org.apache.flink.cdc.connectors.paimon.sink.v2.PaimonRecordSerializer;
-import org.apache.paimon.catalog.Catalog;
-import org.apache.paimon.flink.FlinkCatalogFactory;
import org.apache.paimon.options.Options;
import java.time.ZoneId;
@@ -71,12 +68,6 @@ public class PaimonDataSinkFactory implements DataSinkFactory {
Options options = Options.fromMap(catalogOptions);
// Avoid using previous table schema.
options.setString("cache-enabled", "false");
- try (Catalog catalog = FlinkCatalogFactory.createPaimonCatalog(options)) {
- Preconditions.checkNotNull(
- catalog.listDatabases(), "catalog option of Paimon is invalid.");
- } catch (Exception e) {
- throw new RuntimeException("failed to create or use paimon catalog", e);
- }
ZoneId zoneId = ZoneId.systemDefault();
if (!Objects.equals(
context.getPipelineConfiguration().get(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE),
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonEventSink.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonEventSink.java
index d6837915f..ee789233f 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonEventSink.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonEventSink.java
@@ -21,6 +21,7 @@ import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.connectors.paimon.sink.v2.bucket.BucketAssignOperator;
import org.apache.flink.cdc.connectors.paimon.sink.v2.bucket.BucketWrapper;
import org.apache.flink.cdc.connectors.paimon.sink.v2.bucket.BucketWrapperEventTypeInfo;
+import org.apache.flink.cdc.connectors.paimon.sink.v2.bucket.FlushEventAlignmentOperator;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.connector.sink2.WithPreWriteTopology;
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -63,7 +64,12 @@ public class PaimonEventSink extends PaimonSink<Event> implements WithPreWriteTopology<Event> {
// All Events after BucketAssignOperator are decorated with BucketWrapper.
.partitionCustom(
(bucket, numPartitions) -> bucket % numPartitions,
- (event) -> ((BucketWrapper) event).getBucket());
+ (event) -> ((BucketWrapper) event).getBucket())
+ // Avoid FlushEvent and DataChangeEvent being delivered out of order.
+ .transform(
+ "FlushEventAlignment",
+ new BucketWrapperEventTypeInfo(),
+ new FlushEventAlignmentOperator());
}
@Override
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonRecordEventSerializer.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonRecordEventSerializer.java
index c3ceb31ac..983a680bd 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonRecordEventSerializer.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonRecordEventSerializer.java
@@ -23,6 +23,7 @@ import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.utils.SchemaUtils;
import org.apache.flink.cdc.connectors.paimon.sink.v2.bucket.BucketWrapperChangeEvent;
@@ -66,13 +67,14 @@ public class PaimonRecordEventSerializer implements PaimonRecordSerializer<Event>
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PreCommitOperator.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PreCommitOperator.java
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PreCommitOperator.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PreCommitOperator.java
@@ public class PreCommitOperator
        implements WithPreCommitTopology<CommittableMessage<MultiTableCommittable>,
                CommittableMessage<MultiTableCommittable>> {
+ protected static final Logger LOGGER = LoggerFactory.getLogger(PreCommitOperator.class);
+
+ private final String commitUser;
+
+ private final Options catalogOptions;
+
+ private Catalog catalog;
+
+ private StoreMultiCommitter storeMultiCommitter;
/** store a list of MultiTableCommittable in one checkpoint. */
- private final List<MultiTableCommittable> results;
+ private final List<MultiTableCommittable> multiTableCommittables;
- public PreCommitOperator() {
- results = new ArrayList<>();
+ public PreCommitOperator(Options catalogOptions, String commitUser) {
+ multiTableCommittables = new ArrayList<>();
+ this.catalogOptions = catalogOptions;
+ this.commitUser = commitUser;
}
@Override
@@ -50,8 +70,16 @@ public class PreCommitOperator
@Override
public void processElement(StreamRecord<CommittableMessage<MultiTableCommittable>> element) {
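+ // Lazily create the Paimon catalog and the StoreMultiCommitter when the first element arrives.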
+ if (catalog == null) {
+ this.catalog = FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
+ this.storeMultiCommitter =
+ new StoreMultiCommitter(
+ () -> FlinkCatalogFactory.createPaimonCatalog(catalogOptions),
+ Committer.createContext(
+ commitUser, getMetricGroup(), true, false, null));
+ }
if (element.getValue() instanceof CommittableWithLineage) {
- results.add(
+ multiTableCommittables.add(
((CommittableWithLineage<MultiTableCommittable>) element.getValue())
.getCommittable());
}
@@ -64,34 +92,44 @@ public class PreCommitOperator
@Override
public void prepareSnapshotPreBarrier(long checkpointId) {
- // CommittableSummary should be sent before all CommittableWithLineage.
- CommittableMessage<MultiTableCommittable> summary =
- new CommittableSummary<>(
- getRuntimeContext().getIndexOfThisSubtask(),
- getRuntimeContext().getNumberOfParallelSubtasks(),
- checkpointId,
- results.size(),
- results.size(),
- 0);
- output.collect(new StreamRecord<>(summary));
-
- results.forEach(
- committable -> {
- // update the right checkpointId for MultiTableCommittable
- MultiTableCommittable committableWithCheckPointId =
- new MultiTableCommittable(
- committable.getDatabase(),
- committable.getTable(),
- checkpointId,
- committable.kind(),
- committable.wrappedCommittable());
- CommittableMessage<MultiTableCommittable> message =
- new CommittableWithLineage<>(
- committableWithCheckPointId,
- checkpointId,
- getRuntimeContext().getIndexOfThisSubtask());
- output.collect(new StreamRecord<>(message));
- });
- results.clear();
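+ // Stamp each pending MultiTableCommittable with the current checkpointId; the actual commit happens in snapshotState().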
+ for (int i = 0; i < multiTableCommittables.size(); i++) {
+ MultiTableCommittable multiTableCommittable = multiTableCommittables.get(i);
+ multiTableCommittables.set(
+ i,
+ new MultiTableCommittable(
+ multiTableCommittable.getDatabase(),
+ multiTableCommittable.getTable(),
+ checkpointId,
+ multiTableCommittable.kind(),
+ multiTableCommittable.wrappedCommittable()));
+ }
+ }
+
+ @Override
+ public void snapshotState(StateSnapshotContext context) throws Exception {
+ super.snapshotState(context);
+ long checkpointId = context.getCheckpointId();
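+ // Combine all pending committables into one manifest committable and commit it as part of this checkpoint.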
+ if (!multiTableCommittables.isEmpty()) {
+ multiTableCommittables.forEach(
+ (multiTableCommittable) ->
+ LOGGER.debug(
+ "Try to commit for {}.{} : {} in checkpoint {}",
+ multiTableCommittable.getDatabase(),
+ multiTableCommittable.getTable(),
+ multiTableCommittable,
+ checkpointId));
+ WrappedManifestCommittable wrappedManifestCommittable =
+ storeMultiCommitter.combine(checkpointId, checkpointId, multiTableCommittables);
+ storeMultiCommitter.commit(Collections.singletonList(wrappedManifestCommittable));
+ multiTableCommittables.clear();
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ super.close();
+ if (storeMultiCommitter != null) {
+ storeMultiCommitter.close();
+ }
}
}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketAssignOperator.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketAssignOperator.java
index 21bc14e0e..b2cc7c05f 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketAssignOperator.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketAssignOperator.java
@@ -19,7 +19,6 @@ package org.apache.flink.cdc.connectors.paimon.sink.v2.bucket;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.cdc.common.event.ChangeEvent;
-import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.FlushEvent;
@@ -53,6 +52,8 @@ import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.RowKeyExtractor;
import org.apache.paimon.table.sink.RowPartitionKeyExtractor;
import org.apache.paimon.utils.MathUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.time.ZoneId;
import java.util.HashMap;
@@ -63,6 +64,8 @@ import java.util.Optional;
public class BucketAssignOperator extends AbstractStreamOperator<Event>
        implements OneInputStreamOperator<Event, Event> {
+ protected static final Logger LOGGER = LoggerFactory.getLogger(BucketAssignOperator.class);
+
public final String commitUser;
private final Options catalogOptions;
@@ -99,8 +102,8 @@ public class BucketAssignOperator extends AbstractStreamOperator<Event>
super.open();
this.catalog = FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
this.bucketAssignerMap = new HashMap<>();
- this.totalTasksNumber = getRuntimeContext().getNumberOfParallelSubtasks();
- this.currentTaskNumber = getRuntimeContext().getIndexOfThisSubtask();
+ this.totalTasksNumber = getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks();
+ this.currentTaskNumber = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
this.schemaMaps = new HashMap<>();
}
@@ -121,13 +124,16 @@ public class BucketAssignOperator extends AbstractStreamOperator<Event>
public void processElement(StreamRecord streamRecord) throws Exception {
Event event = streamRecord.getValue();
if (event instanceof FlushEvent) {
- output.collect(
- new StreamRecord<>(
- new BucketWrapperFlushEvent(
- currentTaskNumber,
- ((FlushEvent) event).getSourceSubTaskId(),
- ((FlushEvent) event).getTableIds(),
- ((FlushEvent) event).getSchemaChangeEventType())));
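+ // Broadcast the FlushEvent to every downstream subtask, tagging it with the id of this bucket-assign subtask.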
+ for (int i = 0; i < totalTasksNumber; i++) {
+ output.collect(
+ new StreamRecord<>(
+ new BucketWrapperFlushEvent(
+ i,
+ ((FlushEvent) event).getSourceSubTaskId(),
+ currentTaskNumber,
+ ((FlushEvent) event).getTableIds(),
+ ((FlushEvent) event).getSchemaChangeEventType())));
+ }
return;
}
@@ -181,24 +187,21 @@ public class BucketAssignOperator extends AbstractStreamOperator<Event>
}
output.collect(
new StreamRecord<>(new BucketWrapperChangeEvent(bucket, (ChangeEvent) event)));
- } else if (event instanceof CreateTableEvent) {
- CreateTableEvent createTableEvent = (CreateTableEvent) event;
- schemaMaps.put(
- createTableEvent.tableId(),
- new TableSchemaInfo(createTableEvent.getSchema(), zoneId));
- output.collect(
- new StreamRecord<>(
- new BucketWrapperChangeEvent(currentTaskNumber, (ChangeEvent) event)));
} else if (event instanceof SchemaChangeEvent) {
SchemaChangeEvent schemaChangeEvent = (SchemaChangeEvent) event;
Schema schema =
SchemaUtils.applySchemaChangeEvent(
- schemaMaps.get(schemaChangeEvent.tableId()).getSchema(),
+ Optional.ofNullable(schemaMaps.get(schemaChangeEvent.tableId()))
+ .map(TableSchemaInfo::getSchema)
+ .orElse(null),
schemaChangeEvent);
schemaMaps.put(schemaChangeEvent.tableId(), new TableSchemaInfo(schema, zoneId));
- output.collect(
- new StreamRecord<>(
- new BucketWrapperChangeEvent(currentTaskNumber, (ChangeEvent) event)));
+ // Broadcast the SchemaChangeEvent to all downstream subtasks.
+ for (int index = 0; index < totalTasksNumber; index++) {
+ output.collect(
+ new StreamRecord<>(
+ new BucketWrapperChangeEvent(index, (ChangeEvent) event)));
+ }
}
}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapperEventSerializer.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapperEventSerializer.java
index 6ff1573e1..827c23d88 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapperEventSerializer.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapperEventSerializer.java
@@ -87,6 +87,7 @@ public class BucketWrapperEventSerializer extends TypeSerializerSingleton<Event> {
BucketWrapperFlushEvent bucketWrapperFlushEvent = (BucketWrapperFlushEvent) event;
dataOutputView.writeInt(bucketWrapperFlushEvent.getBucket());
dataOutputView.writeInt(bucketWrapperFlushEvent.getSourceSubTaskId());
+ dataOutputView.writeInt(bucketWrapperFlushEvent.getBucketAssignTaskId());
tableIdListSerializer.serialize(bucketWrapperFlushEvent.getTableIds(), dataOutputView);
schemaChangeEventTypeEnumSerializer.serialize(
bucketWrapperFlushEvent.getSchemaChangeEventType(), dataOutputView);
@@ -98,6 +99,7 @@ public class BucketWrapperEventSerializer extends TypeSerializerSingleton<Event> {
EventClass eventClass = enumSerializer.deserialize(source);
if (eventClass.equals(EventClass.BUCKET_WRAPPER_FLUSH_EVENT)) {
return new BucketWrapperFlushEvent(
+ source.readInt(),
source.readInt(),
source.readInt(),
tableIdListSerializer.deserialize(source),
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapperFlushEvent.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapperFlushEvent.java
index 25e9f9152..c53cf8688 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapperFlushEvent.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapperFlushEvent.java
@@ -28,14 +28,21 @@ import java.util.Objects;
public class BucketWrapperFlushEvent extends FlushEvent implements BucketWrapper {
private final int bucket;
+ private final int bucketAssignTaskId;
public BucketWrapperFlushEvent(
int bucket,
- int subTaskId,
+ int sourceSubTaskId,
+ int bucketAssignTaskId,
List<TableId> tableIds,
SchemaChangeEventType schemaChangeEventType) {
- super(subTaskId, tableIds, schemaChangeEventType);
+ super(sourceSubTaskId, tableIds, schemaChangeEventType);
this.bucket = bucket;
+ this.bucketAssignTaskId = bucketAssignTaskId;
+ }
+
+ public int getBucketAssignTaskId() {
+ return bucketAssignTaskId;
}
@Override
@@ -55,18 +62,22 @@ public class BucketWrapperFlushEvent extends FlushEvent implements BucketWrapper
return false;
}
BucketWrapperFlushEvent that = (BucketWrapperFlushEvent) o;
- return bucket == that.bucket;
+ return bucket == that.bucket
+ && bucketAssignTaskId == that.bucketAssignTaskId
+ && getSourceSubTaskId() == that.getSourceSubTaskId();
}
@Override
public int hashCode() {
- return Objects.hash(super.hashCode(), bucket);
+ return Objects.hash(super.hashCode(), bucket, bucketAssignTaskId);
}
@Override
public String toString() {
return "BucketWrapperFlushEvent{subTaskId="
+ getSourceSubTaskId()
+ + ", bucketAssignTaskId="
+ + bucketAssignTaskId
+ ", bucket="
+ bucket
+ '}';
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/FlushEventAlignmentOperator.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/FlushEventAlignmentOperator.java
new file mode 100644
index 000000000..70f651e43
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/FlushEventAlignmentOperator.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.paimon.sink.v2.bucket;
+
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.event.FlushEvent;
+import org.apache.flink.cdc.runtime.operators.schema.regular.SchemaOperator;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/** Align {@link FlushEvent}s broadcasted by {@link BucketAssignOperator}. */
+public class FlushEventAlignmentOperator extends AbstractStreamOperator<Event>
+        implements OneInputStreamOperator<Event, Event> {
+
+ private transient int totalTasksNumber;
+
+ /**
+ * Key: subtask id of {@link SchemaOperator}, Value: subtask ids of {@link
+ * BucketAssignOperator}.
+ */
+ private transient Map<Integer, Set<Integer>> sourceTaskIdToAssignBucketSubTaskIds;
+
+ private transient int currentSubTaskId;
+
+ public FlushEventAlignmentOperator() {
+ // Always chain this operator with the upstream partitioner; otherwise Events could be shuffled unpredictably.
+ this.chainingStrategy = ChainingStrategy.ALWAYS;
+ }
+
+ @Override
+ public void open() throws Exception {
+ super.open();
+ this.totalTasksNumber = getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks();
+ this.currentSubTaskId = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
+ sourceTaskIdToAssignBucketSubTaskIds = new HashMap<>();
+ }
+
+ @Override
+ public void processElement(StreamRecord streamRecord) {
+ Event event = streamRecord.getValue();
+ if (event instanceof BucketWrapperFlushEvent) {
+ BucketWrapperFlushEvent bucketWrapperFlushEvent = (BucketWrapperFlushEvent) event;
+ int sourceSubTaskId = bucketWrapperFlushEvent.getSourceSubTaskId();
+ Set<Integer> subTaskIds =
+ sourceTaskIdToAssignBucketSubTaskIds.getOrDefault(
+ sourceSubTaskId, new HashSet<>());
+ int subtaskId = bucketWrapperFlushEvent.getBucketAssignTaskId();
+ subTaskIds.add(subtaskId);
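+ // Forward a single FlushEvent downstream only after copies from all BucketAssignOperator subtasks have arrived.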
+ if (subTaskIds.size() == totalTasksNumber) {
+ LOG.info("{} send FlushEvent of {}", currentSubTaskId, sourceSubTaskId);
+ output.collect(
+ new StreamRecord<>(
+ new FlushEvent(
+ sourceSubTaskId,
+ bucketWrapperFlushEvent.getTableIds(),
+ bucketWrapperFlushEvent.getSchemaChangeEventType())));
+ sourceTaskIdToAssignBucketSubTaskIds.remove(sourceSubTaskId);
+ } else {
+ LOG.info(
+ "{} collect FlushEvent of {} with subtask {}",
+ currentSubTaskId,
+ sourceSubTaskId,
+ subtaskId);
+ sourceTaskIdToAssignBucketSubTaskIds.put(sourceSubTaskId, subTaskIds);
+ }
+ } else {
+ output.collect(streamRecord);
+ }
+ }
+}
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml
index 382a787dd..c808f45d7 100644
--- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml
@@ -30,8 +30,11 @@ limitations under the License.
        <flink-1.19>1.19.1</flink-1.19>
        <flink-1.20>1.20.0</flink-1.20>
+        <flink-major-1.19>1.19</flink-major-1.19>
+        <flink-major-1.20>1.20</flink-major-1.20>
8.0.27
1.2.10_flink-${flink.major.version}
+        <paimon.version>0.9.0</paimon.version>
@@ -98,6 +101,19 @@ limitations under the License.
            <type>test-jar</type>
            <scope>test</scope>
        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-cdc-pipeline-connector-paimon</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-flink-${flink.major.version}</artifactId>
+            <version>${paimon.version}</version>
+            <scope>test</scope>
+        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-oceanbase-cdc</artifactId>
@@ -284,6 +300,52 @@ limitations under the License.
+                                <artifactItem>
+                                    <groupId>org.apache.flink</groupId>
+                                    <artifactId>flink-cdc-pipeline-connector-oceanbase</artifactId>
+                                    <version>${project.version}</version>
+                                    <destFileName>oceanbase-cdc-pipeline-connector.jar</destFileName>
+                                    <type>jar</type>
+                                    <outputDirectory>${project.build.directory}/dependencies</outputDirectory>
+                                </artifactItem>
+
+                                <artifactItem>
+                                    <groupId>org.apache.flink</groupId>
+                                    <artifactId>flink-cdc-pipeline-connector-paimon</artifactId>
+                                    <version>${project.version}</version>
+                                    <destFileName>paimon-cdc-pipeline-connector.jar</destFileName>
+                                    <type>jar</type>
+                                    <outputDirectory>${project.build.directory}/dependencies</outputDirectory>
+                                </artifactItem>
+
+                                <artifactItem>
+                                    <groupId>org.apache.flink</groupId>
+                                    <artifactId>flink-shaded-hadoop-2-uber</artifactId>
+                                    <version>2.8.3-10.0</version>
+                                    <destFileName>flink-shade-hadoop.jar</destFileName>
+                                    <type>jar</type>
+                                    <outputDirectory>${project.build.directory}/dependencies</outputDirectory>
+                                </artifactItem>
+
+                                <artifactItem>
+                                    <groupId>org.apache.paimon</groupId>
+                                    <artifactId>paimon-flink-${flink-major-1.19}</artifactId>
+                                    <version>${paimon.version}</version>
+                                    <destFileName>paimon-sql-connector-${flink-1.19}.jar</destFileName>
+                                    <type>jar</type>
+                                    <outputDirectory>${project.build.directory}/dependencies</outputDirectory>
+                                </artifactItem>
+
+                                <artifactItem>
+                                    <groupId>org.apache.paimon</groupId>
+                                    <artifactId>paimon-flink-${flink-major-1.20}</artifactId>
+                                    <version>${paimon.version}</version>
+                                    <destFileName>paimon-sql-connector-${flink-1.20}.jar</destFileName>
+                                    <type>jar</type>
+                                    <outputDirectory>${project.build.directory}/dependencies</outputDirectory>
+                                </artifactItem>
+
                                <artifactItem>
                                    <groupId>org.apache.flink</groupId>
                                    <artifactId>flink-cdc-pipeline-udf-examples</artifactId>
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToPaimonE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToPaimonE2eITCase.java
new file mode 100644
index 000000000..b569aed36
--- /dev/null
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToPaimonE2eITCase.java
@@ -0,0 +1,349 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.pipeline.tests;
+
+import org.apache.flink.cdc.common.test.utils.TestUtils;
+import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
+import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion;
+import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
+import org.apache.flink.cdc.pipeline.tests.utils.PipelineTestEnvironment;
+
+import org.assertj.core.api.Assertions;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.images.builder.Transferable;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.MountableFile;
+
+import java.nio.file.Path;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/** End-to-end tests for the MySQL CDC to Paimon pipeline job. */
+public class MySqlToPaimonE2eITCase extends PipelineTestEnvironment {
+ private static final Logger LOG = LoggerFactory.getLogger(MySqlToPaimonE2eITCase.class);
+
+ public static final Duration TESTCASE_TIMEOUT = Duration.ofMinutes(3);
+
+ // ------------------------------------------------------------------------------------------
+ // MySQL variables (we always use MySQL as the data source for easier verification)
+ // ------------------------------------------------------------------------------------------
+ protected static final String MYSQL_TEST_USER = "mysqluser";
+ protected static final String MYSQL_TEST_PASSWORD = "mysqlpw";
+
+ @ClassRule
+ public static final MySqlContainer MYSQL =
+ (MySqlContainer)
+ new MySqlContainer(
+ MySqlVersion.V8_0) // v8 supports both ARM and AMD architectures
+ .withConfigurationOverride("docker/mysql/my.cnf")
+ .withSetupSQL("docker/mysql/setup.sql")
+ .withDatabaseName("flink-test")
+ .withUsername("flinkuser")
+ .withPassword("flinkpw")
+ .withNetwork(NETWORK)
+ .withNetworkAliases("mysql")
+ .withLogConsumer(new Slf4jLogConsumer(LOG));
+
+ protected final UniqueDatabase inventoryDatabase =
+ new UniqueDatabase(MYSQL, "paimon_inventory", MYSQL_TEST_USER, MYSQL_TEST_PASSWORD);
+
+ @BeforeClass
+ public static void initializeContainers() {
+ LOG.info("Starting containers...");
+ Startables.deepStart(Stream.of(MYSQL)).join();
+ LOG.info("Containers are started.");
+ }
+
+ @Before
+ public void before() throws Exception {
+ super.before();
+ inventoryDatabase.createAndInitialize();
+ jobManager.copyFileToContainer(
+ MountableFile.forHostPath(
+ TestUtils.getResource(getPaimonSQLConnectorResourceName())),
+ sharedVolume.toString() + "/" + getPaimonSQLConnectorResourceName());
+ jobManager.copyFileToContainer(
+ MountableFile.forHostPath(TestUtils.getResource("flink-shade-hadoop.jar")),
+ sharedVolume.toString() + "/flink-shade-hadoop.jar");
+ }
+
+ @After
+ public void after() {
+ super.after();
+ inventoryDatabase.dropDatabase();
+ }
+
+ @Test
+ public void testSyncWholeDatabase() throws Exception {
+ String warehouse = sharedVolume.toString() + "/" + "paimon_" + UUID.randomUUID();
+ String database = inventoryDatabase.getDatabaseName();
+ String pipelineJob =
+ String.format(
+ "source:\n"
+ + " type: mysql\n"
+ + " hostname: mysql\n"
+ + " port: 3306\n"
+ + " username: %s\n"
+ + " password: %s\n"
+ + " tables: %s.\\.*\n"
+ + " server-id: 5400-5404\n"
+ + " server-time-zone: UTC\n"
+ + "\n"
+ + "sink:\n"
+ + " type: paimon\n"
+ + " catalog.properties.warehouse: %s\n"
+ + " catalog.properties.metastore: filesystem\n"
+ + " catalog.properties.cache-enabled: false\n"
+ + "\n"
+ + "pipeline:\n"
+ + " schema.change.behavior: evolve\n"
+ + " parallelism: 4",
+ MYSQL_TEST_USER, MYSQL_TEST_PASSWORD, database, warehouse);
+ Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
+ Path paimonCdcConnector = TestUtils.getResource("paimon-cdc-pipeline-connector.jar");
+ Path hadoopJar = TestUtils.getResource("flink-shade-hadoop.jar");
+ Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
+ submitPipelineJob(pipelineJob, mysqlCdcJar, paimonCdcConnector, mysqlDriverJar, hadoopJar);
+ waitUntilJobRunning(Duration.ofSeconds(30));
+ LOG.info("Pipeline job is running");
+ validateSinkResult(
+ warehouse,
+ database,
+ "products",
+ Arrays.asList(
+ "101, One, Alice, 3.202, red, {\"key1\": \"value1\"}, null",
+ "102, Two, Bob, 1.703, white, {\"key2\": \"value2\"}, null",
+ "103, Three, Cecily, 4.105, red, {\"key3\": \"value3\"}, null",
+ "104, Four, Derrida, 1.857, white, {\"key4\": \"value4\"}, null",
+ "105, Five, Evelyn, 5.211, red, {\"K\": \"V\", \"k\": \"v\"}, null",
+ "106, Six, Ferris, 9.813, null, null, null",
+ "107, Seven, Grace, 2.117, null, null, null",
+ "108, Eight, Hesse, 6.819, null, null, null",
+ "109, Nine, IINA, 5.223, null, null, null"));
+
+ validateSinkResult(
+ warehouse,
+ database,
+ "customers",
+ Arrays.asList(
+ "101, user_1, Shanghai, 123567891234",
+ "102, user_2, Shanghai, 123567891234",
+ "103, user_3, Shanghai, 123567891234",
+ "104, user_4, Shanghai, 123567891234"));
+
+ LOG.info("Begin incremental reading stage.");
+ // generate binlogs
+ String mysqlJdbcUrl =
+ String.format(
+ "jdbc:mysql://%s:%s/%s",
+ MYSQL.getHost(), MYSQL.getDatabasePort(), database);
+ List<String> recordsInIncrementalPhase;
+ try (Connection conn =
+ DriverManager.getConnection(
+ mysqlJdbcUrl, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD);
+ Statement stat = conn.createStatement()) {
+
+ stat.execute(
+ "INSERT INTO products VALUES (default,'Ten','Jukebox',0.2, null, null, null);"); // 110
+ stat.execute("UPDATE products SET description='Fay' WHERE id=106;");
+ stat.execute("UPDATE products SET weight='5.125' WHERE id=107;");
+
+ // modify table schema
+ stat.execute("ALTER TABLE products DROP COLUMN point_c;");
+ stat.execute("DELETE FROM products WHERE id=101;");
+
+ stat.execute(
+ "INSERT INTO products VALUES (default,'Eleven','Kryo',5.18, null, null);"); // 111
+ stat.execute(
+ "INSERT INTO products VALUES (default,'Twelve', 'Lily', 2.14, null, null);"); // 112
+ recordsInIncrementalPhase = createChangesAndValidate(stat);
+ } catch (SQLException e) {
+ LOG.error("Update table for CDC failed.", e);
+ throw e;
+ }
+ List<String> recordsInSnapshotPhase =
+ new ArrayList<>(
+ Arrays.asList(
+ "102, Two, Bob, 1.703, white, {\"key2\": \"value2\"}, null, null, null, null, null, null, null, null, null, null",
+ "103, Three, Cecily, 4.105, red, {\"key3\": \"value3\"}, null, null, null, null, null, null, null, null, null, null",
+ "104, Four, Derrida, 1.857, white, {\"key4\": \"value4\"}, null, null, null, null, null, null, null, null, null, null",
+ "105, Five, Evelyn, 5.211, red, {\"K\": \"V\", \"k\": \"v\"}, null, null, null, null, null, null, null, null, null, null",
+ "106, Six, Fay, 9.813, null, null, null, null, null, null, null, null, null, null, null, null",
+ "107, Seven, Grace, 5.125, null, null, null, null, null, null, null, null, null, null, null, null",
+ "108, Eight, Hesse, 6.819, null, null, null, null, null, null, null, null, null, null, null, null",
+ "109, Nine, IINA, 5.223, null, null, null, null, null, null, null, null, null, null, null, null",
+ "110, Ten, Jukebox, 0.2, null, null, null, null, null, null, null, null, null, null, null, null",
+ "111, Eleven, Kryo, 5.18, null, null, null, null, null, null, null, null, null, null, null, null",
+ "112, Twelve, Lily, 2.14, null, null, null, null, null, null, null, null, null, null, null, null"));
+ recordsInSnapshotPhase.addAll(recordsInIncrementalPhase);
+ validateSinkResult(warehouse, database, "products", recordsInSnapshotPhase);
+ }
+
+ /**
+ * Basic Schema: id INTEGER NOT NULL, name VARCHAR(255) NOT NULL, description VARCHAR(512),
+ * weight FLOAT, enum_c enum('red', 'white'), json_c JSON.
+ */
+ private List<String> createChangesAndValidate(Statement stat) throws SQLException {
+ List<String> result = new ArrayList<>();
+ StringBuilder sqlFields = new StringBuilder();
+
+ // Add Column.
+ for (int addColumnRepeat = 0; addColumnRepeat < 10; addColumnRepeat++) {
+ stat.execute(
+ String.format(
+ "ALTER TABLE products ADD COLUMN point_c_%s VARCHAR(10);",
+ addColumnRepeat));
+ sqlFields.append(", '1'");
+ StringBuilder resultFields = new StringBuilder();
+ for (int j = 0; j < 10; j++) {
+ if (j <= addColumnRepeat) {
+ resultFields.append(", 1");
+ } else {
+ resultFields.append(", null");
+ }
+ }
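+ // Rows inserted in this round should carry "1" for every column added so far and null for the rest.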
+ for (int j = 0; j < 1000; j++) {
+ stat.addBatch(
+ String.format(
+ "INSERT INTO products VALUES (default,'finally', null, 2.14, null, null %s);",
+ sqlFields));
+ int id = addColumnRepeat * 1000 + j + 113;
+ result.add(
+ String.format("%s, finally, null, 2.14, null, null%s", id, resultFields));
+ }
+ stat.executeBatch();
+ }
+
+ // Modify Column type.
+ for (int modifyColumnRepeat = 0; modifyColumnRepeat < 10; modifyColumnRepeat++) {
+ for (int j = 0; j < 1000; j++) {
+ stat.addBatch(
+ String.format(
+ "INSERT INTO products VALUES (default,'finally', null, 2.14, null, null %s);",
+ sqlFields));
+ int id = modifyColumnRepeat * 1000 + j + 10113;
+ result.add(
+ String.format(
+ "%s, finally, null, 2.14, null, null%s",
+ id, ", 1, 1, 1, 1, 1, 1, 1, 1, 1, 1"));
+ }
+ stat.executeBatch();
+ stat.execute(
+ String.format(
+ "ALTER TABLE products MODIFY point_c_0 VARCHAR(%s);",
+ 10 + modifyColumnRepeat));
+ }
+
+ return result;
+ }
+
+ private List<String> fetchPaimonTableRows(String warehouse, String database, String table)
+ throws Exception {
+ String template =
+ readLines("docker/peek-paimon.sql").stream()
+ .filter(line -> !line.startsWith("--"))
+ .collect(Collectors.joining("\n"));
+ String sql = String.format(template, warehouse, database, table);
+ String containerSqlPath = sharedVolume.toString() + "/peek.sql";
+ jobManager.copyFileToContainer(Transferable.of(sql), containerSqlPath);
+
+ Container.ExecResult result =
+ jobManager.execInContainer(
+ "/opt/flink/bin/sql-client.sh",
+ "--jar",
+ sharedVolume.toString() + "/" + getPaimonSQLConnectorResourceName(),
+ "--jar",
+ sharedVolume.toString() + "/flink-shade-hadoop.jar",
+ "-f",
+ containerSqlPath);
+ if (result.getExitCode() != 0) {
+ throw new RuntimeException(
+ "Failed to execute peek script. Stdout: "
+ + result.getStdout()
+ + "; Stderr: "
+ + result.getStderr());
+ }
+
+ return Arrays.stream(result.getStdout().split("\n"))
+ .filter(line -> line.startsWith("|"))
+ .skip(1)
+ .map(MySqlToPaimonE2eITCase::extractRow)
+ .map(row -> String.format("%s", String.join(", ", row)))
+ .collect(Collectors.toList());
+ }
+
+ private static String[] extractRow(String row) {
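+ // The SQL client prints null columns as "<NULL>" in tableau mode; map them to the literal string "null".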
+ return Arrays.stream(row.split("\\|"))
+ .map(String::trim)
+ .filter(col -> !col.isEmpty())
+ .map(col -> col.equals("<NULL>") ? "null" : col)
+ .toArray(String[]::new);
+ }
+
+ private void validateSinkResult(
+ String warehouse, String database, String table, List<String> expected)
+ throws InterruptedException {
+ LOG.info("Verifying Paimon {}::{}::{} results...", warehouse, database, table);
+ long deadline = System.currentTimeMillis() + TESTCASE_TIMEOUT.toMillis();
+ List<String> results = Collections.emptyList();
+ while (System.currentTimeMillis() < deadline) {
+ try {
+ results = fetchPaimonTableRows(warehouse, database, table);
+ Assertions.assertThat(results).containsExactlyInAnyOrderElementsOf(expected);
+ LOG.info(
+ "Successfully verified {} records in {} seconds.",
+ expected.size(),
+ (System.currentTimeMillis() - deadline + TESTCASE_TIMEOUT.toMillis())
+ / 1000);
+ return;
+ } catch (Exception e) {
+ LOG.warn("Validate failed, waiting for the next loop...", e);
+ } catch (AssertionError ignored) {
+ // The AssertionError contains far too many records and might flood the log output.
+ LOG.warn(
+ "Results mismatch, expected {} records, but got {} actually. Waiting for the next loop...",
+ expected.size(),
+ results.size());
+ }
+ Thread.sleep(1000L);
+ }
+ Assertions.assertThat(results).containsExactlyInAnyOrderElementsOf(expected);
+ }
+
+ protected String getPaimonSQLConnectorResourceName() {
+ return String.format("paimon-sql-connector-%s.jar", flinkVersion);
+ }
+}
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java
index eb7798e0c..9f07845b4 100644
--- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java
@@ -29,6 +29,9 @@ import org.apache.flink.table.api.ValidationException;
import org.apache.flink.util.TestLogger;
import com.fasterxml.jackson.core.Version;
+import com.github.dockerjava.api.DockerClient;
+import com.github.dockerjava.api.command.ExecCreateCmdResponse;
+import com.github.dockerjava.api.model.Volume;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
@@ -38,16 +41,22 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.testcontainers.DockerClientFactory;
+import org.testcontainers.containers.BindMode;
import org.testcontainers.containers.Container.ExecResult;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
+import org.testcontainers.containers.output.FrameConsumerResultCallback;
+import org.testcontainers.containers.output.OutputFrame;
import org.testcontainers.containers.output.ToStringConsumer;
import org.testcontainers.lifecycle.Startables;
import org.testcontainers.utility.MountableFile;
import javax.annotation.Nullable;
+import java.io.File;
import java.io.IOException;
+import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
@@ -87,6 +96,23 @@ public abstract class PipelineTestEnvironment extends TestLogger {
public static final int JOB_MANAGER_REST_PORT = 8081;
public static final String INTER_CONTAINER_JM_ALIAS = "jobmanager";
public static final String INTER_CONTAINER_TM_ALIAS = "taskmanager";
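+ /** Flink configuration shared by the JobManager and TaskManager containers via the FLINK_PROPERTIES env variable. */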
+ public static final List<String> EXTERNAL_PROPS =
+ Arrays.asList(
+ String.format("jobmanager.rpc.address: %s", INTER_CONTAINER_JM_ALIAS),
+ "jobmanager.bind-host: 0.0.0.0",
+ "taskmanager.bind-host: 0.0.0.0",
+ "rest.bind-address: 0.0.0.0",
+ "rest.address: 0.0.0.0",
+ "jobmanager.memory.process.size: 1GB",
+ "query.server.port: 6125",
+ "blob.server.port: 6124",
+ "taskmanager.numberOfTaskSlots: 10",
+ "parallelism.default: 4",
+ "execution.checkpointing.interval: 300",
+ "state.backend.type: hashmap",
+ "env.java.opts.all: -Doracle.jdbc.timezoneAsRegion=false",
+ "restart-strategy.type: off");
+ public static final String FLINK_PROPERTIES = String.join("\n", EXTERNAL_PROPS);
@ClassRule public static final Network NETWORK = Network.newNetwork();
@@ -95,6 +121,7 @@ public abstract class PipelineTestEnvironment extends TestLogger {
@Nullable protected RestClusterClient<StandaloneClusterId> restClusterClient;
protected GenericContainer<?> jobManager;
protected GenericContainer<?> taskManager;
+ protected Volume sharedVolume = new Volume("/tmp/shared");
protected ToStringConsumer jobManagerConsumer;
@@ -114,30 +141,32 @@ public abstract class PipelineTestEnvironment extends TestLogger {
public void before() throws Exception {
LOG.info("Starting containers...");
jobManagerConsumer = new ToStringConsumer();
-
- String flinkProperties = getFlinkProperties();
-
jobManager =
new GenericContainer<>(getFlinkDockerImageTag())
.withCommand("jobmanager")
.withNetwork(NETWORK)
.withNetworkAliases(INTER_CONTAINER_JM_ALIAS)
.withExposedPorts(JOB_MANAGER_REST_PORT)
- .withEnv("FLINK_PROPERTIES", flinkProperties)
+ .withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES)
+ .withCreateContainerCmdModifier(cmd -> cmd.withVolumes(sharedVolume))
.withLogConsumer(jobManagerConsumer);
+ Startables.deepStart(Stream.of(jobManager)).join();
+ runInContainerAsRoot(jobManager, "chmod", "0777", "-R", sharedVolume.toString());
+ LOG.info("JobManager is started.");
+
taskManagerConsumer = new ToStringConsumer();
taskManager =
new GenericContainer<>(getFlinkDockerImageTag())
.withCommand("taskmanager")
.withNetwork(NETWORK)
.withNetworkAliases(INTER_CONTAINER_TM_ALIAS)
- .withEnv("FLINK_PROPERTIES", flinkProperties)
+ .withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES)
.dependsOn(jobManager)
+ .withVolumesFrom(jobManager, BindMode.READ_WRITE)
.withLogConsumer(taskManagerConsumer);
-
- Startables.deepStart(Stream.of(jobManager)).join();
Startables.deepStart(Stream.of(taskManager)).join();
- LOG.info("Containers are started.");
+ runInContainerAsRoot(taskManager, "chmod", "0777", "-R", sharedVolume.toString());
+ LOG.info("TaskManager is started.");
}
@After
@@ -279,4 +308,28 @@ public abstract class PipelineTestEnvironment extends TestLogger {
"execution.checkpointing.interval: 300",
"env.java.opts.all: -Doracle.jdbc.timezoneAsRegion=false"));
}
+
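+ /** Run a command inside the given container as the root user, e.g. to make the shared volume writable. */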
+ private void runInContainerAsRoot(GenericContainer<?> container, String... command)
+ throws InterruptedException {
+ ToStringConsumer stdoutConsumer = new ToStringConsumer();
+ ToStringConsumer stderrConsumer = new ToStringConsumer();
+ DockerClient dockerClient = DockerClientFactory.instance().client();
+ ExecCreateCmdResponse execCreateCmdResponse =
+ dockerClient
+ .execCreateCmd(container.getContainerId())
+ .withUser("root")
+ .withCmd(command)
+ .exec();
+ FrameConsumerResultCallback callback = new FrameConsumerResultCallback();
+ callback.addConsumer(OutputFrame.OutputType.STDOUT, stdoutConsumer);
+ callback.addConsumer(OutputFrame.OutputType.STDERR, stderrConsumer);
+ dockerClient.execStartCmd(execCreateCmdResponse.getId()).exec(callback).awaitCompletion();
+ }
+
+ protected List<String> readLines(String resource) throws IOException {
+ final URL url = PipelineTestEnvironment.class.getClassLoader().getResource(resource);
+ assert url != null;
+ Path path = new File(url.getFile()).toPath();
+ return Files.readAllLines(path);
+ }
}
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/ddl/paimon_inventory.sql b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/ddl/paimon_inventory.sql
new file mode 100644
index 000000000..82dc6fd09
--- /dev/null
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/ddl/paimon_inventory.sql
@@ -0,0 +1,55 @@
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements. See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+-- ----------------------------------------------------------------------------------------------------------------
+-- DATABASE: paimon_inventory
+-- ----------------------------------------------------------------------------------------------------------------
+
+-- Create and populate our products using a single insert with many rows
+CREATE TABLE products (
+ id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
+ name VARCHAR(255) NOT NULL DEFAULT 'flink',
+ description VARCHAR(512),
+ weight FLOAT,
+ enum_c enum('red', 'white') default 'red', -- test some complex types as well,
+ json_c JSON, -- because we use additional dependencies to deserialize complex types.
+ point_c POINT
+);
+ALTER TABLE products AUTO_INCREMENT = 101;
+
+INSERT INTO products
+VALUES (default,"One", "Alice", 3.202, 'red', '{"key1": "value1"}', null),
+ (default,"Two", "Bob", 1.703, 'white', '{"key2": "value2"}', null),
+ (default,"Three", "Cecily", 4.105, 'red', '{"key3": "value3"}', null),
+ (default,"Four", "Derrida", 1.857, 'white', '{"key4": "value4"}', null),
+ (default,"Five", "Evelyn", 5.211, 'red', '{"K": "V", "k": "v"}', null),
+ (default,"Six", "Ferris", 9.813, null, null, null),
+ (default,"Seven", "Grace", 2.117, null, null, null),
+ (default,"Eight", "Hesse", 6.819, null, null, null),
+ (default,"Nine", "IINA", 5.223, null, null, null);
+
+-- Create and populate our customers using a single insert with many rows
+CREATE TABLE customers (
+ id INTEGER NOT NULL PRIMARY KEY,
+ name VARCHAR(255) NOT NULL DEFAULT 'flink',
+ address VARCHAR(1024),
+ phone_number VARCHAR(512)
+);
+
+INSERT INTO customers
+VALUES (101,"user_1","Shanghai","123567891234"),
+ (102,"user_2","Shanghai","123567891234"),
+ (103,"user_3","Shanghai","123567891234"),
+ (104,"user_4","Shanghai","123567891234");
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/docker/peek-paimon.sql b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/docker/peek-paimon.sql
new file mode 100644
index 000000000..aa41d6731
--- /dev/null
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/docker/peek-paimon.sql
@@ -0,0 +1,28 @@
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements. See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+-- Format this file with the following arguments:
+-- Warehouse Path, Database Name, and Table Name.
+
+SET 'sql-client.execution.result-mode' = 'tableau';
+SET 'table.display.max-column-width' = '100000';
+SET 'execution.runtime-mode' = 'batch';
+
+CREATE CATALOG paimon_catalog WITH (
+ 'type' = 'paimon',
+ 'warehouse' = '%s'
+);
+
+SELECT * FROM paimon_catalog.%s.%s;
\ No newline at end of file