From a99e3da34b614637560928e21bde43e76d815766 Mon Sep 17 00:00:00 2001
From: Hongshun Wang <125648852+loserwang1024@users.noreply.github.com>
Date: Tue, 10 Oct 2023 17:50:21 +0800
Subject: [PATCH] [postgres] Reuse pool connection based on database in postgres cdc

ConnectionPoolId gains an optional database component, and the pool id is
now built by JdbcConnectionPoolFactory#getPoolId so that each connector can
choose its own reuse strategy. The default keeps one pool per host, port and
user; PostgresConnectionPoolFactory overrides it to also key on the database,
because its JDBC URL names a single database.

---
 .../connection/ConnectionPoolId.java          |  24 +++-
 .../connection/JdbcConnectionFactory.java     |   7 +-
 .../connection/JdbcConnectionPoolFactory.java |  17 +++
 .../base/JdbcConnectionPoolTest.java          |   6 +-
 .../source/PostgresConnectionPoolFactory.java |  17 +++
 .../postgres/source/PostgresDialectTest.java  | 129 ++++++++++++++++++
 6 files changed, 191 insertions(+), 9 deletions(-)
 create mode 100644 flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/source/PostgresDialectTest.java

diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/relational/connection/ConnectionPoolId.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/relational/connection/ConnectionPoolId.java
index b8e0f944e..f96a8e934 100644
--- a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/relational/connection/ConnectionPoolId.java
+++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/relational/connection/ConnectionPoolId.java
@@ -16,6 +16,8 @@
 
 package com.ververica.cdc.connectors.base.relational.connection;
 
+import javax.annotation.Nullable;
+
 import java.io.Serializable;
 import java.util.Objects;
 
@@ -27,14 +29,21 @@ public class ConnectionPoolId implements Serializable {
     private final int port;
     private final String username;
 
+    @Nullable private final String database;
+
     /** the identifier of a data source pool factory is its class name. */
     private final String dataSourcePoolFactoryIdentifier;
 
     public ConnectionPoolId(
-            String host, int port, String username, String dataSourcePoolFactoryIdentifier) {
+            String host,
+            int port,
+            String username,
+            @Nullable String database,
+            String dataSourcePoolFactoryIdentifier) {
         this.host = host;
         this.port = port;
         this.username = username;
+        this.database = database;
         this.dataSourcePoolFactoryIdentifier = dataSourcePoolFactoryIdentifier;
     }
 
@@ -50,18 +59,27 @@ public class ConnectionPoolId implements Serializable {
         return Objects.equals(host, that.host)
                 && Objects.equals(port, that.port)
                 && Objects.equals(username, that.username)
+                && Objects.equals(database, that.database)
                 && Objects.equals(
                         dataSourcePoolFactoryIdentifier, that.dataSourcePoolFactoryIdentifier);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(host, port, username, dataSourcePoolFactoryIdentifier);
+        return Objects.hash(host, port, username, database, dataSourcePoolFactoryIdentifier);
     }
 
     @Override
     public String toString() {
-        return username + '@' + host + ':' + port;
+        return username
+                + '@'
+                + host
+                + ':'
+                + port
+                + ", database="
+                + database
+                + ", dataSourcePoolFactoryIdentifier="
+                + dataSourcePoolFactoryIdentifier;
     }
 
     public String getDataSourcePoolFactoryIdentifier() {
diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/relational/connection/JdbcConnectionFactory.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/relational/connection/JdbcConnectionFactory.java
index 368340ef5..2f20152e3 100644
--- a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/relational/connection/JdbcConnectionFactory.java
+++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/relational/connection/JdbcConnectionFactory.java
@@ -47,11 +47,8 @@ public class JdbcConnectionFactory implements JdbcConnection.ConnectionFactory {
         final int connectRetryTimes = sourceConfig.getConnectMaxRetries();
 
         final ConnectionPoolId connectionPoolId =
-                new ConnectionPoolId(
-                        sourceConfig.getHostname(),
-                        sourceConfig.getPort(),
-                        sourceConfig.getUsername(),
-                        jdbcConnectionPoolFactory.getClass().getName());
+                jdbcConnectionPoolFactory.getPoolId(
+                        config, jdbcConnectionPoolFactory.getClass().getName());
 
         HikariDataSource dataSource =
                 JdbcConnectionPools.getInstance(jdbcConnectionPoolFactory)
diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/relational/connection/JdbcConnectionPoolFactory.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/relational/connection/JdbcConnectionPoolFactory.java
index c59850da1..99749f5d0 100644
--- a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/relational/connection/JdbcConnectionPoolFactory.java
+++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/relational/connection/JdbcConnectionPoolFactory.java
@@ -19,6 +19,7 @@ package com.ververica.cdc.connectors.base.relational.connection;
 import com.ververica.cdc.connectors.base.config.JdbcSourceConfig;
 import com.zaxxer.hikari.HikariConfig;
 import com.zaxxer.hikari.HikariDataSource;
+import io.debezium.jdbc.JdbcConfiguration;
 
 /** A connection pool factory to create pooled DataSource {@link HikariDataSource}. */
 public abstract class JdbcConnectionPoolFactory {
@@ -64,4 +65,20 @@ public abstract class JdbcConnectionPoolFactory {
      * @return a database url.
      */
     public abstract String getJdbcUrl(JdbcSourceConfig sourceConfig);
+
+    /**
+     * The reuse strategy of connection pools. In most situations, connections to different
+     * databases in the same instance (i.e. the same host and port) can reuse the same connection
+     * pool. However, when different databases in the same instance cannot share one connection
+     * pool, as in PostgreSQL, this method should be overridden.
+     */
+    public ConnectionPoolId getPoolId(
+            JdbcConfiguration config, String dataSourcePoolFactoryIdentifier) {
+        return new ConnectionPoolId(
+                config.getHostname(),
+                config.getPort(),
+                config.getUser(),
+                null,
+                dataSourcePoolFactoryIdentifier);
+    }
 }
diff --git a/flink-cdc-base/src/test/java/com/ververica/cdc/connectors/base/JdbcConnectionPoolTest.java b/flink-cdc-base/src/test/java/com/ververica/cdc/connectors/base/JdbcConnectionPoolTest.java
index 8c07ef79a..68bb52598 100644
--- a/flink-cdc-base/src/test/java/com/ververica/cdc/connectors/base/JdbcConnectionPoolTest.java
+++ b/flink-cdc-base/src/test/java/com/ververica/cdc/connectors/base/JdbcConnectionPoolTest.java
@@ -80,7 +80,11 @@ public class JdbcConnectionPoolTest {
                 getMockMySqlSourceConfig(HOSTNAME, PORT, USER_NAME, PASSWORD, DATABASE, TABLE);
         ConnectionPoolId poolId =
                 new ConnectionPoolId(
-                        HOSTNAME, PORT, USER_NAME, MockConnectionPoolFactory.class.getName());
+                        HOSTNAME,
+                        PORT,
+                        USER_NAME,
+                        DATABASE,
+                        MockConnectionPoolFactory.class.getName());
         Assert.assertThrows(
                 String.format(
                         "DataSourcePoolFactoryIdentifier named %s doesn't exists",
diff --git a/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/PostgresConnectionPoolFactory.java b/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/PostgresConnectionPoolFactory.java
index b326e655f..3a4815960 100644
--- a/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/PostgresConnectionPoolFactory.java
+++ b/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/PostgresConnectionPoolFactory.java
@@ -17,8 +17,10 @@
 package com.ververica.cdc.connectors.postgres.source;
 
 import com.ververica.cdc.connectors.base.config.JdbcSourceConfig;
+import com.ververica.cdc.connectors.base.relational.connection.ConnectionPoolId;
 import com.ververica.cdc.connectors.base.relational.connection.JdbcConnectionPoolFactory;
 import com.zaxxer.hikari.HikariDataSource;
+import io.debezium.jdbc.JdbcConfiguration;
 
 /** A connection pool factory to create pooled Postgres {@link HikariDataSource}. */
 public class PostgresConnectionPoolFactory extends JdbcConnectionPoolFactory {
@@ -32,4 +34,19 @@ public class PostgresConnectionPoolFactory extends JdbcConnectionPoolFactory {
         String database = sourceConfig.getDatabaseList().get(0);
         return String.format(JDBC_URL_PATTERN, hostName, port, database);
     }
+
+    /**
+     * Connection pools are reused per database in PostgreSQL: the pool's JDBC URL names a single
+     * database, so different databases in the same instance cannot share one connection pool.
+     */
+    @Override
+    public ConnectionPoolId getPoolId(
+            JdbcConfiguration config, String dataSourcePoolFactoryIdentifier) {
+        return new ConnectionPoolId(
+                config.getHostname(),
+                config.getPort(),
+                config.getUser(),
+                config.getDatabase(),
+                dataSourcePoolFactoryIdentifier);
+    }
 }
diff --git a/flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/source/PostgresDialectTest.java b/flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/source/PostgresDialectTest.java
new file mode 100644
index 000000000..fbc0e51bc
--- /dev/null
+++ b/flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/source/PostgresDialectTest.java
@@ -0,0 +1,129 @@
+/*
+ * Copyright 2023 Ververica Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ververica.cdc.connectors.postgres.source;
+
+import com.ververica.cdc.connectors.postgres.PostgresTestBase;
+import com.ververica.cdc.connectors.postgres.source.config.PostgresSourceConfigFactory;
+import com.ververica.cdc.connectors.postgres.testutils.UniqueDatabase;
+import io.debezium.relational.TableId;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.List;
+
+/** Tests for {@link PostgresDialect}. */
+public class PostgresDialectTest extends PostgresTestBase {
+
+    private final UniqueDatabase customDatabase =
+            new UniqueDatabase(
+                    POSTGRES_CONTAINER,
+                    "postgres1",
+                    "customer",
+                    POSTGRES_CONTAINER.getUsername(),
+                    POSTGRES_CONTAINER.getPassword());
+
+    private final UniqueDatabase inventoryDatabase =
+            new UniqueDatabase(
+                    POSTGRES_CONTAINER,
+                    "postgres2",
+                    "inventory",
+                    POSTGRES_CONTAINER.getUsername(),
+                    POSTGRES_CONTAINER.getPassword());
+
+    @Test
+    public void testDiscoverDataCollectionsInMultiDatabases() {
+
+        // initialize two databases in the same postgres instance
+        customDatabase.createAndInitialize();
+        inventoryDatabase.createAndInitialize();
+
+        // discover 'customer.customers' via customDatabase, the database that actually
+        // contains it
+        PostgresSourceConfigFactory configFactoryOfCustomDatabase =
+                getMockPostgresSourceConfigFactory(
+                        customDatabase.getHost(),
+                        customDatabase.getDatabasePort(),
+                        customDatabase.getUsername(),
+                        customDatabase.getPassword(),
+                        customDatabase.getDatabaseName(),
+                        "customer",
+                        "customers");
+        PostgresDialect dialectOfcustomDatabase =
+                new PostgresDialect(configFactoryOfCustomDatabase);
+        List<TableId> tableIdsOfcustomDatabase =
+                dialectOfcustomDatabase.discoverDataCollections(
+                        configFactoryOfCustomDatabase.create(0));
+        Assert.assertEquals("customer.customers", tableIdsOfcustomDatabase.get(0).toString());
+
+        // discover 'inventory.products' via inventoryDatabase, the database that actually
+        // contains it; presumably this lookup finds nothing if the pool created above for
+        // customDatabase is reused instead of a pool bound to inventoryDatabase
+        PostgresSourceConfigFactory configFactoryOfInventoryDatabase =
+                getMockPostgresSourceConfigFactory(
+                        inventoryDatabase.getHost(),
+                        inventoryDatabase.getDatabasePort(),
+                        inventoryDatabase.getUsername(),
+                        inventoryDatabase.getPassword(),
+                        inventoryDatabase.getDatabaseName(),
+                        "inventory",
+                        "products");
+        PostgresDialect dialectOfInventoryDatabase =
+                new PostgresDialect(configFactoryOfInventoryDatabase);
+        List<TableId> tableIdsOfInventoryDatabase =
+                dialectOfInventoryDatabase.discoverDataCollections(
+                        configFactoryOfInventoryDatabase.create(0));
+        Assert.assertEquals("inventory.products", tableIdsOfInventoryDatabase.get(0).toString());
+
+        // discover 'customer.customers' via inventoryDatabase, which does not contain it; the
+        // result must be empty, whereas a pool reused from customDatabase would presumably
+        // find the table by mistake
+        PostgresSourceConfigFactory configFactoryOfInventoryDatabase2 =
+                getMockPostgresSourceConfigFactory(
+                        inventoryDatabase.getHost(),
+                        inventoryDatabase.getDatabasePort(),
+                        inventoryDatabase.getUsername(),
+                        inventoryDatabase.getPassword(),
+                        inventoryDatabase.getDatabaseName(),
+                        "customer",
+                        "customers");
+        PostgresDialect dialectOfInventoryDatabase2 =
+                new PostgresDialect(configFactoryOfInventoryDatabase2);
+        List<TableId> tableIdsOfInventoryDatabase2 =
+                dialectOfInventoryDatabase2.discoverDataCollections(
+                        configFactoryOfInventoryDatabase2.create(0));
+        Assert.assertTrue(tableIdsOfInventoryDatabase2.isEmpty());
+    }
+
+    private static PostgresSourceConfigFactory getMockPostgresSourceConfigFactory(
+            String hostname,
+            int port,
+            String username,
+            String password,
+            String database,
+            String schemaName,
+            String tableName) {
+        PostgresSourceConfigFactory postgresSourceConfigFactory = new PostgresSourceConfigFactory();
+        postgresSourceConfigFactory.hostname(hostname);
+        postgresSourceConfigFactory.port(port);
+        postgresSourceConfigFactory.username(username);
+        postgresSourceConfigFactory.password(password);
+        postgresSourceConfigFactory.database(database);
+        postgresSourceConfigFactory.schemaList(new String[] {schemaName});
+        postgresSourceConfigFactory.tableList(schemaName + "." + tableName);
+        return postgresSourceConfigFactory;
+    }
+}