From 42370460cfaa8007e4f78b2071352ec4b5e6e3fb Mon Sep 17 00:00:00 2001 From: hql0312 <1079199821@qq.com> Date: Mon, 23 Dec 2024 17:59:03 +0800 Subject: [PATCH] [hotfix][postgres] Flink CDC Postgres Connector,subsribe regular expression will be slow --- .../source/utils/CustomPostgresSchema.java | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/CustomPostgresSchema.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/CustomPostgresSchema.java index c89407017..6aab99d66 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/CustomPostgresSchema.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/CustomPostgresSchema.java @@ -17,6 +17,9 @@ package org.apache.flink.cdc.connectors.postgres.source.utils; +import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfig; +import org.apache.flink.util.FlinkRuntimeException; + import io.debezium.connector.postgresql.PostgresConnectorConfig; import io.debezium.connector.postgresql.PostgresOffsetContext; import io.debezium.connector.postgresql.PostgresPartition; @@ -28,8 +31,6 @@ import io.debezium.relational.history.TableChanges; import io.debezium.relational.history.TableChanges.TableChange; import io.debezium.schema.SchemaChangeEvent; import io.debezium.util.Clock; -import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfig; -import org.apache.flink.util.FlinkRuntimeException; import java.sql.SQLException; import java.time.Instant; @@ -40,9 +41,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; -/** - * A CustomPostgresSchema similar to PostgresSchema with customization. - */ +/** A CustomPostgresSchema similar to PostgresSchema with customization. */ public class CustomPostgresSchema { // cache the schema for each table @@ -114,7 +113,12 @@ public class CustomPostgresSchema { dbzConfig.databaseName(), null, // only check context tableIds - (tb) -> tableIds.stream().anyMatch(t -> t.schema().equals(tb.schema()) && t.table().equals(tb.table())), + (tb) -> + tableIds.stream() + .anyMatch( + t -> + t.schema().equals(tb.schema()) + && t.table().equals(tb.table())), null, false); } catch (SQLException e) {