From 5691b7bf977abd608dcf189359be602d93f2eadf Mon Sep 17 00:00:00 2001 From: Sergei Morozov Date: Thu, 16 Jan 2025 01:55:04 -0800 Subject: [PATCH] [FLINK-36406][cdc-runtime] Close MetadataApplier when the job stops This closes #3623 --- .../org/apache/flink/cdc/common/sink/MetadataApplier.java | 6 +++++- .../cdc/connectors/paimon/sink/PaimonMetadataApplier.java | 7 +++++++ 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/sink/MetadataApplier.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/sink/MetadataApplier.java index d112fd192..22424db99 100644 --- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/sink/MetadataApplier.java +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/sink/MetadataApplier.java @@ -30,7 +30,7 @@ import java.util.stream.Collectors; /** {@code MetadataApplier} is used to apply metadata changes to external systems. */ @PublicEvolving -public interface MetadataApplier extends Serializable { +public interface MetadataApplier extends Serializable, AutoCloseable { /** Apply the given {@link SchemaChangeEvent} to external systems. */ void applySchemaChange(SchemaChangeEvent schemaChangeEvent) throws SchemaEvolveException; @@ -50,4 +50,8 @@ public interface MetadataApplier extends Serializable { default Set getSupportedSchemaEvolutionTypes() { return Arrays.stream(SchemaChangeEventTypeFamily.ALL).collect(Collectors.toSet()); } + + /** Closes the metadata applier and its underlying resources. */ + @Override + default void close() throws Exception {} } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java index 1c79ae8f9..81aa5c8c5 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java @@ -153,6 +153,13 @@ public class PaimonMetadataApplier implements MetadataApplier { }); } + @Override + public void close() throws Exception { + if (catalog != null) { + catalog.close(); + } + } + private void applyCreateTable(CreateTableEvent event) throws SchemaEvolveException { try { if (!catalog.databaseExists(event.tableId().getSchemaName())) {