[postgres] Reuse pool connection based on database in postgres cdc

pull/2551/head
Hongshun Wang 1 year ago committed by GitHub
parent 95af1a5305
commit a99e3da34b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -16,6 +16,8 @@
package com.ververica.cdc.connectors.base.relational.connection; package com.ververica.cdc.connectors.base.relational.connection;
import javax.annotation.Nullable;
import java.io.Serializable; import java.io.Serializable;
import java.util.Objects; import java.util.Objects;
@ -27,14 +29,21 @@ public class ConnectionPoolId implements Serializable {
private final int port; private final int port;
private final String username; private final String username;
@Nullable private final String database;
/** the identifier of a data source pool factory is its class name. */ /** the identifier of a data source pool factory is its class name. */
private final String dataSourcePoolFactoryIdentifier; private final String dataSourcePoolFactoryIdentifier;
public ConnectionPoolId( public ConnectionPoolId(
String host, int port, String username, String dataSourcePoolFactoryIdentifier) { String host,
int port,
String username,
@Nullable String database,
String dataSourcePoolFactoryIdentifier) {
this.host = host; this.host = host;
this.port = port; this.port = port;
this.username = username; this.username = username;
this.database = database;
this.dataSourcePoolFactoryIdentifier = dataSourcePoolFactoryIdentifier; this.dataSourcePoolFactoryIdentifier = dataSourcePoolFactoryIdentifier;
} }
@ -50,18 +59,27 @@ public class ConnectionPoolId implements Serializable {
return Objects.equals(host, that.host) return Objects.equals(host, that.host)
&& Objects.equals(port, that.port) && Objects.equals(port, that.port)
&& Objects.equals(username, that.username) && Objects.equals(username, that.username)
&& Objects.equals(database, that.database)
&& Objects.equals( && Objects.equals(
dataSourcePoolFactoryIdentifier, that.dataSourcePoolFactoryIdentifier); dataSourcePoolFactoryIdentifier, that.dataSourcePoolFactoryIdentifier);
} }
@Override @Override
public int hashCode() { public int hashCode() {
return Objects.hash(host, port, username, dataSourcePoolFactoryIdentifier); return Objects.hash(host, port, username, database, dataSourcePoolFactoryIdentifier);
} }
@Override @Override
public String toString() { public String toString() {
return username + '@' + host + ':' + port; return username
+ '@'
+ host
+ ':'
+ port
+ ", database="
+ database
+ ", dataSourcePoolFactoryIdentifier="
+ dataSourcePoolFactoryIdentifier;
} }
public String getDataSourcePoolFactoryIdentifier() { public String getDataSourcePoolFactoryIdentifier() {

@ -47,11 +47,8 @@ public class JdbcConnectionFactory implements JdbcConnection.ConnectionFactory {
final int connectRetryTimes = sourceConfig.getConnectMaxRetries(); final int connectRetryTimes = sourceConfig.getConnectMaxRetries();
final ConnectionPoolId connectionPoolId = final ConnectionPoolId connectionPoolId =
new ConnectionPoolId( jdbcConnectionPoolFactory.getPoolId(
sourceConfig.getHostname(), config, jdbcConnectionPoolFactory.getClass().getName());
sourceConfig.getPort(),
sourceConfig.getUsername(),
jdbcConnectionPoolFactory.getClass().getName());
HikariDataSource dataSource = HikariDataSource dataSource =
JdbcConnectionPools.getInstance(jdbcConnectionPoolFactory) JdbcConnectionPools.getInstance(jdbcConnectionPoolFactory)

@ -19,6 +19,7 @@ package com.ververica.cdc.connectors.base.relational.connection;
import com.ververica.cdc.connectors.base.config.JdbcSourceConfig; import com.ververica.cdc.connectors.base.config.JdbcSourceConfig;
import com.zaxxer.hikari.HikariConfig; import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource; import com.zaxxer.hikari.HikariDataSource;
import io.debezium.jdbc.JdbcConfiguration;
/** A connection pool factory to create pooled DataSource {@link HikariDataSource}. */ /** A connection pool factory to create pooled DataSource {@link HikariDataSource}. */
public abstract class JdbcConnectionPoolFactory { public abstract class JdbcConnectionPoolFactory {
@ -64,4 +65,20 @@ public abstract class JdbcConnectionPoolFactory {
* @return a database url. * @return a database url.
*/ */
public abstract String getJdbcUrl(JdbcSourceConfig sourceConfig); public abstract String getJdbcUrl(JdbcSourceConfig sourceConfig);
/**
* The reuse strategy of connection pools. In most situations, connections to different
* databases in same instance (which means same host and port) can reuse same connection pool.
* However, in some situations when different databases in same instance cannot reuse same
* connection pool to connect, such as postgresql, this method should be overridden.
*/
public ConnectionPoolId getPoolId(
JdbcConfiguration config, String dataSourcePoolFactoryIdentifier) {
return new ConnectionPoolId(
config.getHostname(),
config.getPort(),
config.getUser(),
null,
dataSourcePoolFactoryIdentifier);
}
} }

@ -80,7 +80,11 @@ public class JdbcConnectionPoolTest {
getMockMySqlSourceConfig(HOSTNAME, PORT, USER_NAME, PASSWORD, DATABASE, TABLE); getMockMySqlSourceConfig(HOSTNAME, PORT, USER_NAME, PASSWORD, DATABASE, TABLE);
ConnectionPoolId poolId = ConnectionPoolId poolId =
new ConnectionPoolId( new ConnectionPoolId(
HOSTNAME, PORT, USER_NAME, MockConnectionPoolFactory.class.getName()); HOSTNAME,
PORT,
USER_NAME,
DATABASE,
MockConnectionPoolFactory.class.getName());
Assert.assertThrows( Assert.assertThrows(
String.format( String.format(
"DataSourcePoolFactoryIdentifier named %s doesn't exists", "DataSourcePoolFactoryIdentifier named %s doesn't exists",

@ -17,8 +17,10 @@
package com.ververica.cdc.connectors.postgres.source; package com.ververica.cdc.connectors.postgres.source;
import com.ververica.cdc.connectors.base.config.JdbcSourceConfig; import com.ververica.cdc.connectors.base.config.JdbcSourceConfig;
import com.ververica.cdc.connectors.base.relational.connection.ConnectionPoolId;
import com.ververica.cdc.connectors.base.relational.connection.JdbcConnectionPoolFactory; import com.ververica.cdc.connectors.base.relational.connection.JdbcConnectionPoolFactory;
import com.zaxxer.hikari.HikariDataSource; import com.zaxxer.hikari.HikariDataSource;
import io.debezium.jdbc.JdbcConfiguration;
/** A connection pool factory to create pooled Postgres {@link HikariDataSource}. */ /** A connection pool factory to create pooled Postgres {@link HikariDataSource}. */
public class PostgresConnectionPoolFactory extends JdbcConnectionPoolFactory { public class PostgresConnectionPoolFactory extends JdbcConnectionPoolFactory {
@ -32,4 +34,19 @@ public class PostgresConnectionPoolFactory extends JdbcConnectionPoolFactory {
String database = sourceConfig.getDatabaseList().get(0); String database = sourceConfig.getDatabaseList().get(0);
return String.format(JDBC_URL_PATTERN, hostName, port, database); return String.format(JDBC_URL_PATTERN, hostName, port, database);
} }
/**
* The reuses of connection pools are based on databases in postgresql. Different databases in
* same instance cannot reuse same connection pool to connect.
*/
@Override
public ConnectionPoolId getPoolId(
JdbcConfiguration config, String dataSourcePoolFactoryIdentifier) {
return new ConnectionPoolId(
config.getHostname(),
config.getPort(),
config.getHostname(),
config.getDatabase(),
dataSourcePoolFactoryIdentifier);
}
} }

@ -0,0 +1,129 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.postgres.source;
import com.ververica.cdc.connectors.postgres.PostgresTestBase;
import com.ververica.cdc.connectors.postgres.source.config.PostgresSourceConfigFactory;
import com.ververica.cdc.connectors.postgres.testutils.UniqueDatabase;
import io.debezium.relational.TableId;
import org.junit.Assert;
import org.junit.Test;
import java.util.List;
/** Tests for {@link PostgresDialect}. */
public class PostgresDialectTest extends PostgresTestBase {
private final UniqueDatabase customDatabase =
new UniqueDatabase(
POSTGRES_CONTAINER,
"postgres1",
"customer",
POSTGRES_CONTAINER.getUsername(),
POSTGRES_CONTAINER.getPassword());
private final UniqueDatabase inventoryDatabase =
new UniqueDatabase(
POSTGRES_CONTAINER,
"postgres2",
"inventory",
POSTGRES_CONTAINER.getUsername(),
POSTGRES_CONTAINER.getPassword());
@Test
public void testDiscoverDataCollectionsInMultiDatabases() {
// initial two databases in same postgres instance
customDatabase.createAndInitialize();
inventoryDatabase.createAndInitialize();
// get table named 'customer.customers' from customDatabase which is actual in
// inventoryDatabase
PostgresSourceConfigFactory configFactoryOfCustomDatabase =
getMockPostgresSourceConfigFactory(
customDatabase.getHost(),
customDatabase.getDatabasePort(),
customDatabase.getUsername(),
customDatabase.getPassword(),
customDatabase.getDatabaseName(),
"customer",
"customers");
PostgresDialect dialectOfcustomDatabase =
new PostgresDialect(configFactoryOfCustomDatabase);
List<TableId> tableIdsOfcustomDatabase =
dialectOfcustomDatabase.discoverDataCollections(
configFactoryOfCustomDatabase.create(0));
Assert.assertEquals(tableIdsOfcustomDatabase.get(0).toString(), "customer.customers");
// get table named 'inventory.products' from customDatabase which is actual in
// inventoryDatabase
// however, nothing is found
PostgresSourceConfigFactory configFactoryOfInventoryDatabase =
getMockPostgresSourceConfigFactory(
inventoryDatabase.getHost(),
inventoryDatabase.getDatabasePort(),
inventoryDatabase.getUsername(),
inventoryDatabase.getPassword(),
inventoryDatabase.getDatabaseName(),
"inventory",
"products");
PostgresDialect dialectOfInventoryDatabase =
new PostgresDialect(configFactoryOfInventoryDatabase);
List<TableId> tableIdsOfInventoryDatabase =
dialectOfInventoryDatabase.discoverDataCollections(
configFactoryOfInventoryDatabase.create(0));
Assert.assertEquals(tableIdsOfInventoryDatabase.get(0).toString(), "inventory.products");
// get table named 'customer.customers' from customDatabase which is actual not in
// customDatabase
// however, something is found
PostgresSourceConfigFactory configFactoryOfInventoryDatabase2 =
getMockPostgresSourceConfigFactory(
inventoryDatabase.getHost(),
inventoryDatabase.getDatabasePort(),
inventoryDatabase.getUsername(),
inventoryDatabase.getPassword(),
inventoryDatabase.getDatabaseName(),
"customer",
"customers");
PostgresDialect dialectOfInventoryDatabase2 =
new PostgresDialect(configFactoryOfInventoryDatabase2);
List<TableId> tableIdsOfInventoryDatabase2 =
dialectOfInventoryDatabase2.discoverDataCollections(
configFactoryOfInventoryDatabase2.create(0));
Assert.assertTrue(tableIdsOfInventoryDatabase2.isEmpty());
}
private static PostgresSourceConfigFactory getMockPostgresSourceConfigFactory(
String hostname,
int port,
String username,
String password,
String database,
String schemaName,
String tableName) {
PostgresSourceConfigFactory postgresSourceConfigFactory = new PostgresSourceConfigFactory();
postgresSourceConfigFactory.hostname(hostname);
postgresSourceConfigFactory.port(port);
postgresSourceConfigFactory.username(username);
postgresSourceConfigFactory.password(password);
postgresSourceConfigFactory.database(database);
postgresSourceConfigFactory.schemaList(new String[] {schemaName});
postgresSourceConfigFactory.tableList(schemaName + "." + tableName);
return postgresSourceConfigFactory;
}
}
Loading…
Cancel
Save