diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/CustomPostgresSchema.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/CustomPostgresSchema.java index c89407017..6aab99d66 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/CustomPostgresSchema.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/CustomPostgresSchema.java @@ -17,6 +17,9 @@ package org.apache.flink.cdc.connectors.postgres.source.utils; +import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfig; +import org.apache.flink.util.FlinkRuntimeException; + import io.debezium.connector.postgresql.PostgresConnectorConfig; import io.debezium.connector.postgresql.PostgresOffsetContext; import io.debezium.connector.postgresql.PostgresPartition; @@ -28,8 +31,6 @@ import io.debezium.relational.history.TableChanges; import io.debezium.relational.history.TableChanges.TableChange; import io.debezium.schema.SchemaChangeEvent; import io.debezium.util.Clock; -import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfig; -import org.apache.flink.util.FlinkRuntimeException; import java.sql.SQLException; import java.time.Instant; @@ -40,9 +41,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; -/** - * A CustomPostgresSchema similar to PostgresSchema with customization. - */ +/** A CustomPostgresSchema similar to PostgresSchema with customization. */ public class CustomPostgresSchema { // cache the schema for each table @@ -114,7 +113,12 @@ public class CustomPostgresSchema { dbzConfig.databaseName(), null, // only check context tableIds - (tb) -> tableIds.stream().anyMatch(t -> t.schema().equals(tb.schema()) && t.table().equals(tb.table())), + (tb) -> + tableIds.stream() + .anyMatch( + t -> + t.schema().equals(tb.schema()) + && t.table().equals(tb.table())), null, false); } catch (SQLException e) {