[cdc-connector][postgres] Apply DBZ-5398 postgres connector to handle functional unique index (#2842)

This closes #2710.
pull/2539/head
Hongshun Wang 1 year ago committed by GitHub
parent 9abf2cf900
commit e3d6c7e0aa
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -49,6 +49,7 @@ import java.time.Duration;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;
/**
* {@link JdbcConnection} connection extension used for connecting to Postgres instances.
@ -61,6 +62,9 @@ import java.util.concurrent.atomic.AtomicLong;
* ConnectionFactory
* <li>override connection() to return an unwrapped PgConnection (otherwise, it will complain
* about HikariProxyConnection cannot be cast to class org.postgresql.core.BaseConnection)
* <li>override isTableUniqueIndexIncluded: Copied DBZ-5398 from Debezium 2.0.0.Final to fix
* https://github.com/ververica/flink-cdc-connectors/issues/2710. Remove this comment
* after bumping debezium version to 2.0.0.Final.
* </ul>
*/
public class PostgresConnection extends JdbcConnection {
@ -72,6 +76,10 @@ public class PostgresConnection extends JdbcConnection {
public static final String CONNECTION_HEARTBEAT = "Debezium Heartbeat";
public static final String CONNECTION_GENERAL = "Debezium General";
// Recognises text shaped like a function call: an optional leading "(", an identifier made of
// letters, digits, "_" or ".", and then a parenthesised (possibly empty) argument list.
// Used by isTableUniqueIndexIncluded to spot index "columns" that are really function calls,
// e.g. COALESCE(char_c, ''::text).
private static final Pattern FUNCTION_DEFAULT_PATTERN =
Pattern.compile("^[(]?[A-Za-z0-9_.]+\\((?:.+(?:, ?.+)*)?\\)");
// Recognises text shaped like a parenthesised expression: one or more "(", operands joined by
// one of the characters listed in the character class (note that the class also contains the
// space character), and a closing ")".
// Used by isTableUniqueIndexIncluded to spot index "columns" that are really expressions.
private static final Pattern EXPRESSION_DEFAULT_PATTERN =
Pattern.compile("\\(+(?:.+(?:[+ - * / < > = ~ ! @ # % ^ & | ` ?] ?.+)+)+\\)");
private static Logger LOGGER = LoggerFactory.getLogger(PostgresConnection.class);
private static final String URL_PATTERN =
@ -830,6 +838,15 @@ public class PostgresConnection extends JdbcConnection {
return "TABLE".equals(tableType) || "PARTITIONED TABLE".equals(tableType);
}
/**
 * Decides whether a column reported for a unique index should be kept as part of that index.
 *
 * <p>A column is rejected when its name is {@code null}, or when the whole name matches
 * {@link #FUNCTION_DEFAULT_PATTERN} or {@link #EXPRESSION_DEFAULT_PATTERN}, i.e. when it looks
 * like a function call or an expression rather than a plain column.
 *
 * @param indexName name of the unique index; not consulted by this implementation
 * @param columnName column name reported for the index, may be {@code null}
 * @return {@code true} only for a non-null name that matches neither pattern
 */
@Override
protected boolean isTableUniqueIndexIncluded(String indexName, String columnName) {
    // Guard clause: without a column name there is nothing to include.
    if (columnName == null) {
        return false;
    }
    // "Neither function nor expression"; the || keeps the original short-circuit order.
    return !(FUNCTION_DEFAULT_PATTERN.matcher(columnName).matches()
            || EXPRESSION_DEFAULT_PATTERN.matcher(columnName).matches());
}
@FunctionalInterface
public interface PostgresValueConverterBuilder {
PostgresValueConverter build(TypeRegistry registry);

@ -18,6 +18,7 @@ package com.ververica.cdc.connectors.postgres;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.types.Row;
import com.ververica.cdc.connectors.postgres.source.PostgresConnectionPoolFactory;
import com.ververica.cdc.connectors.postgres.source.config.PostgresSourceConfigFactory;
@ -41,7 +42,9 @@ import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
@ -219,6 +222,16 @@ public abstract class PostgresTestBase extends AbstractTestBase {
return postgresSourceConfigFactory;
}
/**
 * Pulls at most {@code size} rows from the iterator and returns their string forms.
 *
 * <p>Stops early when the iterator has no more elements, so the result may hold fewer than
 * {@code size} entries.
 *
 * @param iter source of rows; only advanced while more rows are still wanted
 * @param size maximum number of rows to take, also used as the list's initial capacity
 * @return the {@code toString()} of each fetched row, in iteration order
 */
public static List<String> fetchRows(Iterator<Row> iter, int size) {
    final List<String> collected = new ArrayList<>(size);
    for (int remaining = size; remaining > 0 && iter.hasNext(); remaining--) {
        collected.add(iter.next().toString());
    }
    return collected;
}
public static void assertEqualsInAnyOrder(List<String> expected, List<String> actual) {
assertTrue(expected != null && actual != null);
assertEqualsInOrder(

@ -895,16 +895,6 @@ public class PostgresSourceITCase extends PostgresTestBase {
return rows.stream().map(stringifier).collect(Collectors.toList());
}
/**
 * Reads up to {@code size} rows from {@code iter} and renders each one with
 * {@code toString()}.
 *
 * @param iter iterator to drain; left untouched once {@code size} rows have been read
 * @param size upper bound on the number of rows to read
 * @return the rendered rows in the order they were produced
 */
private static List<String> fetchRows(Iterator<Row> iter, int size) {
    List<String> rendered = new ArrayList<>(size);
    // Bound the loop by how many rows were gathered instead of mutating the parameter.
    while (rendered.size() < size && iter.hasNext()) {
        Row next = iter.next();
        rendered.add(next.toString());
    }
    return rendered;
}
/**
* Make some changes on the specified customer table. Changelog in string could be accessed by
* {@link #firstPartStreamEvents}.

@ -23,6 +23,9 @@ import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.utils.LegacyRowResource;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowUtils;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.ExceptionUtils;
import com.ververica.cdc.connectors.postgres.PostgresTestBase;
@ -35,6 +38,7 @@ import org.junit.runners.Parameterized;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@ -707,4 +711,69 @@ public class PostgreSQLConnectorITCase extends PostgresTestBase {
result.getJobClient().get().cancel().get();
}
/**
 * Verifies that a table whose only unique index contains a function expression can be read
 * and that an UPDATE on it is emitted as a {@code -U}/{@code +U} pair.
 *
 * <p>The test temporarily switches {@link RowUtils#USE_LEGACY_TO_STRING} off so that rows are
 * printed with their change kind. The flag is global, so it is restored in a {@code finally}
 * block; otherwise a failure here would leak the changed setting into later tests.
 */
@Test
public void testUniqueIndexIncludingFunction() throws Exception {
    // Clear the influence of usesLegacyRows which set USE_LEGACY_TO_STRING = true.
    // In this test, print +I, -U, +U to see more clearly.
    RowUtils.USE_LEGACY_TO_STRING = false;
    try {
        initializePostgresTable(POSTGRES_CONTAINER, "index_type_test");
        String sourceDDL =
                String.format(
                        "CREATE TABLE functional_unique_index ("
                                + " id INTEGER NOT NULL,"
                                + " char_c STRING"
                                + ") WITH ("
                                + " 'connector' = 'postgres-cdc',"
                                + " 'hostname' = '%s',"
                                + " 'port' = '%s',"
                                + " 'username' = '%s',"
                                + " 'password' = '%s',"
                                + " 'database-name' = '%s',"
                                + " 'schema-name' = '%s',"
                                + " 'table-name' = '%s',"
                                + " 'scan.incremental.snapshot.enabled' = '%s',"
                                // In the snapshot phase of incremental snapshot mode, a table
                                // without a primary key is not allowed for now. Thus, when
                                // scan.incremental.snapshot.enabled = true, use the
                                // 'latest-offset' startup mode.
                                + (parallelismSnapshot
                                        ? " 'scan.startup.mode' = 'latest-offset',"
                                        : "")
                                + " 'slot.name' = '%s'"
                                + ")",
                        POSTGRES_CONTAINER.getHost(),
                        POSTGRES_CONTAINER.getMappedPort(POSTGRESQL_PORT),
                        POSTGRES_CONTAINER.getUsername(),
                        POSTGRES_CONTAINER.getPassword(),
                        POSTGRES_CONTAINER.getDatabaseName(),
                        "indexes",
                        "functional_unique_index",
                        parallelismSnapshot,
                        getSlotName());
        tEnv.executeSql(sourceDDL);

        // async submit job
        TableResult tableResult = tEnv.executeSql("SELECT * FROM functional_unique_index");

        List<String> expected = new ArrayList<>();
        // The snapshot row is only seen when the job does not start from 'latest-offset'.
        if (!parallelismSnapshot) {
            expected.add("+I[1, a]");
        }

        // wait a bit to make sure the replication slot is ready
        Thread.sleep(5000L);

        // generate WAL
        try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER);
                Statement statement = connection.createStatement()) {
            statement.execute(
                    "UPDATE indexes.functional_unique_index SET char_c=NULL WHERE id=1;");
        }
        expected.addAll(Arrays.asList("-U[1, a]", "+U[1, null]"));

        CloseableIterator<Row> iterator = tableResult.collect();
        assertEqualsInAnyOrder(expected, fetchRows(iterator, expected.size()));
        tableResult.getJobClient().get().cancel().get();
    } finally {
        // Always put the global flag back, even when an assertion above fails.
        RowUtils.USE_LEGACY_TO_STRING = true;
    }
}
}

@ -53,4 +53,4 @@ INSERT INTO full_types
VALUES (1, '2', 32767, 65535, 2147483647, 5.5, 6.6, 123.12345, 404.4443, true,
'Hello World', 'a', 'abc', 'abcd..xyz', '2020-07-17 18:00:22.123', '2020-07-17 18:00:22.123456',
'2020-07-17', '18:00:22', 500, 'SRID=3187;POINT(174.9479 -36.7208)'::geometry,
'MULTILINESTRING((169.1321 -44.7032, 167.8974 -44.6414))'::geography);
'MULTILINESTRING((169.1321 -44.7032, 167.8974 -44.6414))'::geography);

@ -0,0 +1,35 @@
-- Copyright 2023 Ververica Inc.
--
-- Licensed under the Apache License, Version 2.0 (the "License");
-- you may not use this file except in compliance with the License.
-- You may obtain a copy of the License at
-- http://www.apache.org/licenses/LICENSE-2.0
-- Unless required by applicable law or agreed to in writing,
-- software distributed under the License is distributed on an
-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-- KIND, either express or implied. See the License for the
-- specific language governing permissions and limitations
-- under the License.
-- ----------------------------------------------------------------------------------------------------------------
-- DATABASE: index_type_test
-- ----------------------------------------------------------------------------------------------------------------
-- Generate a number of tables to cover as many PG index situations (primary key, unique index) as possible
-- Start from a clean "indexes" schema and make it the default for the statements below.
DROP SCHEMA IF EXISTS indexes CASCADE;
CREATE SCHEMA indexes;
SET search_path TO indexes;

-- A table with no primary key whose only unique index combines a plain column
-- with a function expression (COALESCE).
CREATE TABLE functional_unique_index (
    id     INTEGER NOT NULL,
    char_c CHAR
);

CREATE UNIQUE INDEX test_tbl_idx
    ON functional_unique_index (id, COALESCE(char_c, ''::text));

-- Full replica identity: the connector test expects the complete old row ("-U[1, a]") on UPDATE.
ALTER TABLE functional_unique_index REPLICA IDENTITY FULL;

-- Seed row that the connector test later updates.
INSERT INTO functional_unique_index (id, char_c)
VALUES (1, 'a');
Loading…
Cancel
Save