[hotfix][postgres] Flink CDC Postgres Connector,subsribe regular expression will be slow

pull/3808/head
hql0312 1 month ago
parent 852cdc2738
commit 42370460cf

@ -17,6 +17,9 @@
package org.apache.flink.cdc.connectors.postgres.source.utils; package org.apache.flink.cdc.connectors.postgres.source.utils;
import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfig;
import org.apache.flink.util.FlinkRuntimeException;
import io.debezium.connector.postgresql.PostgresConnectorConfig; import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.connector.postgresql.PostgresOffsetContext; import io.debezium.connector.postgresql.PostgresOffsetContext;
import io.debezium.connector.postgresql.PostgresPartition; import io.debezium.connector.postgresql.PostgresPartition;
@ -28,8 +31,6 @@ import io.debezium.relational.history.TableChanges;
import io.debezium.relational.history.TableChanges.TableChange; import io.debezium.relational.history.TableChanges.TableChange;
import io.debezium.schema.SchemaChangeEvent; import io.debezium.schema.SchemaChangeEvent;
import io.debezium.util.Clock; import io.debezium.util.Clock;
import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfig;
import org.apache.flink.util.FlinkRuntimeException;
import java.sql.SQLException; import java.sql.SQLException;
import java.time.Instant; import java.time.Instant;
@ -40,9 +41,7 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
/** /** A CustomPostgresSchema similar to PostgresSchema with customization. */
* A CustomPostgresSchema similar to PostgresSchema with customization.
*/
public class CustomPostgresSchema { public class CustomPostgresSchema {
// cache the schema for each table // cache the schema for each table
@ -114,7 +113,12 @@ public class CustomPostgresSchema {
dbzConfig.databaseName(), dbzConfig.databaseName(),
null, null,
// only check context tableIds // only check context tableIds
(tb) -> tableIds.stream().anyMatch(t -> t.schema().equals(tb.schema()) && t.table().equals(tb.table())), (tb) ->
tableIds.stream()
.anyMatch(
t ->
t.schema().equals(tb.schema())
&& t.table().equals(tb.table())),
null, null,
false); false);
} catch (SQLException e) { } catch (SQLException e) {

Loading…
Cancel
Save