From 32934393cefc6227adb172192807a0aa0b3afe19 Mon Sep 17 00:00:00 2001 From: hiliuxg <39675622+hiliuxg@users.noreply.github.com> Date: Tue, 14 Jan 2025 11:50:49 +0800 Subject: [PATCH] [FLINK-36985][pipeline-connector/paimon] Tolerante ColumnAlreadyExistException when apply AddColumnEvent in paimon This closes #3828. --- .../cdc/connectors/paimon/sink/PaimonMetadataApplier.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java index 928e7b6df..ec8a06d6d 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java @@ -201,7 +201,11 @@ public class PaimonMetadataApplier implements MetadataApplier { } catch (Catalog.TableNotExistException | Catalog.ColumnAlreadyExistException | Catalog.ColumnNotExistException e) { - throw new SchemaEvolveException(event, e.getMessage(), e); + if (e instanceof Catalog.ColumnAlreadyExistException) { + LOG.warn("{}, skip it.", e.getMessage()); + } else { + throw new SchemaEvolveException(event, e.getMessage(), e); + } } }