From 7997f51c559690fe70aee0d14e40590a43d910ee Mon Sep 17 00:00:00 2001 From: Olivier Date: Thu, 16 Jan 2025 13:59:48 +0100 Subject: [PATCH] [FLINK-35067][cdc-connector][postgres] Adding metadata 'row_kind' for Postgres CDC Connector. This closes #3716. Co-authored-by: Leonard Xu --- .../connectors/flink-sources/postgres-cdc.md | 7 ++ .../table/PostgreSQLReadableMetadata.java | 24 +++++ .../table/PostgreSQLConnectorITCase.java | 101 ++++++++++-------- .../table/PostgreSQLTableFactoryTest.java | 21 ++-- 4 files changed, 98 insertions(+), 55 deletions(-) diff --git a/docs/content/docs/connectors/flink-sources/postgres-cdc.md b/docs/content/docs/connectors/flink-sources/postgres-cdc.md index 4aca6f884..be3d826d0 100644 --- a/docs/content/docs/connectors/flink-sources/postgres-cdc.md +++ b/docs/content/docs/connectors/flink-sources/postgres-cdc.md @@ -387,6 +387,13 @@ The following format metadata can be exposed as read-only (VIRTUAL) columns in a TIMESTAMP_LTZ(3) NOT NULL It indicates the time that the change was made in the database.
If the record is read from snapshot of the table instead of the change stream, the value is always 0. + + row_kind + STRING NOT NULL + It indicates the row kind of the changelog. Note: The downstream SQL operator may fail to compare due to this newly added column when processing the row retraction if +the source operator chooses to output the 'row_kind' column for each record. It is recommended to use this metadata column only in simple synchronization jobs. +
'+I' means INSERT message, '-D' means DELETE message, '-U' means UPDATE_BEFORE message and '+U' means UPDATE_AFTER message. + diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLReadableMetadata.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLReadableMetadata.java index 1d4c2b1b7..743411c52 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLReadableMetadata.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLReadableMetadata.java @@ -18,7 +18,9 @@ package org.apache.flink.cdc.connectors.postgres.table; import org.apache.flink.cdc.debezium.table.MetadataConverter; +import org.apache.flink.cdc.debezium.table.RowDataMetadataConverter; import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.StringData; import org.apache.flink.table.data.TimestampData; import org.apache.flink.table.types.DataType; @@ -95,6 +97,28 @@ public enum PostgreSQLReadableMetadata { return TimestampData.fromEpochMillis( (Long) sourceStruct.get(AbstractSourceInfo.TIMESTAMP_KEY)); } + }), + + /** + * It indicates the row kind of the changelog. 
'+I' means INSERT message, '-D' means DELETE + * message, '-U' means UPDATE_BEFORE message and '+U' means UPDATE_AFTER message + */ + ROW_KIND( + "row_kind", + DataTypes.STRING().notNull(), + new RowDataMetadataConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object read(RowData rowData) { + return StringData.fromString(rowData.getRowKind().shortString()); + } + + @Override + public Object read(SourceRecord record) { + throw new UnsupportedOperationException( + "Please call read(RowData rowData) method instead."); + } }); private final String key; diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java index c28646aa0..4b355cac0 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java @@ -165,20 +165,24 @@ public class PostgreSQLConnectorITCase extends PostgresTestBase { * The final database table looks like this: * * > SELECT * FROM products; - * +-----+--------------------+---------------------------------------------------------+--------+ - * | id | name | description | weight | - * +-----+--------------------+---------------------------------------------------------+--------+ - * | 101 | scooter | Small 2-wheel scooter | 3.14 | - * | 102 | car battery | 12V car battery | 8.1 | - * | 103 | 12-pack drill bits | 12-pack of drill bits with sizes ranging from #40 to #3 | 0.8 | - * | 104 | hammer | 12oz carpenter's hammer | 0.75 | - * | 105 | hammer | 
14oz carpenter's hammer | 0.875 | - * | 106 | hammer | 18oz carpenter hammer | 1 | - * | 107 | rocks | box of assorted rocks | 5.1 | - * | 108 | jacket | water resistent black wind breaker | 0.1 | - * | 109 | spare tire | 24 inch spare tire | 22.2 | - * | 110 | jacket | new water resistent white wind breaker | 0.5 | - * +-----+--------------------+---------------------------------------------------------+--------+ + * +-----+--------------------+------------------------------------------------- + * --------+--------+ + * | id | name | description | weight | + * +-----+--------------------+------------------------------------------------- + * --------+--------+ + * | 101 | scooter | Small 2-wheel scooter | 3.14 | + * | 102 | car battery | 12V car battery | 8.1 | + * | 103 | 12-pack drill bits | 12-pack of drill bits with sizes ranging from + * #40 to #3 | 0.8 | + * | 104 | hammer | 12oz carpenter's hammer | 0.75 | + * | 105 | hammer | 14oz carpenter's hammer | 0.875 | + * | 106 | hammer | 18oz carpenter hammer | 1 | + * | 107 | rocks | box of assorted rocks | 5.1 | + * | 108 | jacket | water resistent black wind breaker | 0.1 | + * | 109 | spare tire | 24 inch spare tire | 22.2 | + * | 110 | jacket | new water resistent white wind breaker | 0.5 | + * +-----+--------------------+------------------------------------------------- + * --------+--------+ * */ @@ -246,7 +250,8 @@ public class PostgreSQLConnectorITCase extends PostgresTestBase { // async submit job TableResult result = tEnv.executeSql("INSERT INTO sink SELECT * FROM debezium_source"); - // wait for the source startup, we don't have a better way to wait it, use sleep for now + // wait for the source startup, we don't have a better way to wait it, use sleep + // for now Thread.sleep(10000L); try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER); @@ -469,6 +474,7 @@ public class PostgreSQLConnectorITCase extends PostgresTestBase { + " db_name STRING METADATA FROM 'database_name' VIRTUAL," + " 
schema_name STRING METADATA VIRTUAL," + " table_name STRING METADATA VIRTUAL," + + " row_kind STRING METADATA FROM 'row_kind' VIRTUAL," + " id INT NOT NULL," + " name STRING," + " description STRING," @@ -501,6 +507,7 @@ public class PostgreSQLConnectorITCase extends PostgresTestBase { + " database_name STRING," + " schema_name STRING," + " table_name STRING," + + " row_kind STRING," + " id INT," + " name STRING," + " description STRING," @@ -546,52 +553,52 @@ public class PostgreSQLConnectorITCase extends PostgresTestBase { Arrays.asList( "+I(" + databaseName - + ",inventory,products,101,scooter,Small 2-wheel scooter,3.140)", + + ",inventory,products,+I,101,scooter,Small 2-wheel scooter,3.140)", "+I(" + databaseName - + ",inventory,products,102,car battery,12V car battery,8.100)", + + ",inventory,products,+I,102,car battery,12V car battery,8.100)", "+I(" + databaseName - + ",inventory,products,103,12-pack drill bits,12-pack of drill bits with sizes ranging from #40 to #3,0.800)", + + ",inventory,products,+I,103,12-pack drill bits,12-pack of drill bits with sizes ranging from #40 to #3,0.800)", "+I(" + databaseName - + ",inventory,products,104,hammer,12oz carpenter's hammer,0.750)", + + ",inventory,products,+I,104,hammer,12oz carpenter's hammer,0.750)", "+I(" + databaseName - + ",inventory,products,105,hammer,14oz carpenter's hammer,0.875)", + + ",inventory,products,+I,105,hammer,14oz carpenter's hammer,0.875)", "+I(" + databaseName - + ",inventory,products,106,hammer,16oz carpenter's hammer,1.000)", + + ",inventory,products,+I,106,hammer,16oz carpenter's hammer,1.000)", "+I(" + databaseName - + ",inventory,products,107,rocks,box of assorted rocks,5.300)", + + ",inventory,products,+I,107,rocks,box of assorted rocks,5.300)", "+I(" + databaseName - + ",inventory,products,108,jacket,water resistent black wind breaker,0.100)", + + ",inventory,products,+I,108,jacket,water resistent black wind breaker,0.100)", "+I(" + databaseName - + ",inventory,products,109,spare 
tire,24 inch spare tire,22.200)", + + ",inventory,products,+I,109,spare tire,24 inch spare tire,22.200)", "+I(" + databaseName - + ",inventory,products,110,jacket,water resistent white wind breaker,0.200)", + + ",inventory,products,+I,110,jacket,water resistent white wind breaker,0.200)", "+I(" + databaseName - + ",inventory,products,111,scooter,Big 2-wheel scooter ,5.180)", + + ",inventory,products,+I,111,scooter,Big 2-wheel scooter ,5.180)", "+U(" + databaseName - + ",inventory,products,106,hammer,18oz carpenter hammer,1.000)", + + ",inventory,products,+U,106,hammer,18oz carpenter hammer,1.000)", "+U(" + databaseName - + ",inventory,products,107,rocks,box of assorted rocks,5.100)", + + ",inventory,products,+U,107,rocks,box of assorted rocks,5.100)", "+U(" + databaseName - + ",inventory,products,110,jacket,new water resistent white wind breaker,0.500)", + + ",inventory,products,+U,110,jacket,new water resistent white wind breaker,0.500)", "+U(" + databaseName - + ",inventory,products,111,scooter,Big 2-wheel scooter ,5.170)", + + ",inventory,products,+U,111,scooter,Big 2-wheel scooter ,5.170)", "-D(" + databaseName - + ",inventory,products,111,scooter,Big 2-wheel scooter ,5.170)"); + + ",inventory,products,-D,111,scooter,Big 2-wheel scooter ,5.170)"); List actual = TestValuesTableFactory.getRawResultsAsStrings("sink"); Collections.sort(actual); Collections.sort(expected); @@ -679,20 +686,24 @@ public class PostgreSQLConnectorITCase extends PostgresTestBase { * The final database table looks like this: * * > SELECT * FROM products; - * +-----+--------------------+---------------------------------------------------------+--------+ - * | id | name | description | weight | - * +-----+--------------------+---------------------------------------------------------+--------+ - * | 101 | scooter | Small 2-wheel scooter | 3.14 | - * | 102 | car battery | 12V car battery | 8.1 | - * | 103 | 12-pack drill bits | 12-pack of drill bits with sizes ranging from #40 to #3 | 0.8 | - 
* | 104 | hammer | 12oz carpenter's hammer | 0.75 | - * | 105 | hammer | 14oz carpenter's hammer | 0.875 | - * | 106 | hammer | 18oz carpenter hammer | 1 | - * | 107 | rocks | box of assorted rocks | 5.1 | - * | 108 | jacket | water resistent black wind breaker | 0.1 | - * | 109 | spare tire | 24 inch spare tire | 22.2 | - * | 110 | jacket | new water resistent white wind breaker | 0.5 | - * +-----+--------------------+---------------------------------------------------------+--------+ + * +-----+--------------------+------------------------------------------------- + * --------+--------+ + * | id | name | description | weight | + * +-----+--------------------+------------------------------------------------- + * --------+--------+ + * | 101 | scooter | Small 2-wheel scooter | 3.14 | + * | 102 | car battery | 12V car battery | 8.1 | + * | 103 | 12-pack drill bits | 12-pack of drill bits with sizes ranging from + * #40 to #3 | 0.8 | + * | 104 | hammer | 12oz carpenter's hammer | 0.75 | + * | 105 | hammer | 14oz carpenter's hammer | 0.875 | + * | 106 | hammer | 18oz carpenter hammer | 1 | + * | 107 | rocks | box of assorted rocks | 5.1 | + * | 108 | jacket | water resistent black wind breaker | 0.1 | + * | 109 | spare tire | 24 inch spare tire | 22.2 | + * | 110 | jacket | new water resistent white wind breaker | 0.5 | + * +-----+--------------------+------------------------------------------------- + * --------+--------+ * */ diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableFactoryTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableFactoryTest.java index 50cd9b7a4..7b8f88675 100644 --- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableFactoryTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableFactoryTest.java @@ -50,19 +50,19 @@ import java.util.HashMap; import java.util.Map; import java.util.Properties; +import static org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions.CONNECTION_POOL_SIZE; +import static org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions.CONNECT_MAX_RETRIES; +import static org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions.CONNECT_TIMEOUT; +import static org.apache.flink.cdc.connectors.base.options.SourceOptions.CHUNK_META_GROUP_SIZE; import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED; import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP; +import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE; import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED; -import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.CHUNK_META_GROUP_SIZE; -import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.CONNECTION_POOL_SIZE; -import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.CONNECT_MAX_RETRIES; -import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.CONNECT_TIMEOUT; +import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_SNAPSHOT_FETCH_SIZE; +import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND; +import static 
org.apache.flink.cdc.connectors.base.options.SourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND; import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.HEARTBEAT_INTERVAL; -import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE; import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SCAN_LSN_COMMIT_CHECKPOINTS_DELAY; -import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE; -import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND; -import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND; import static org.apache.flink.cdc.connectors.utils.AssertUtils.assertProducedTypeOfSourceFunction; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -100,6 +100,7 @@ public class PostgreSQLTableFactoryTest { Column.physical("name", DataTypes.STRING()), Column.physical("count", DataTypes.DECIMAL(38, 18)), Column.metadata("time", DataTypes.TIMESTAMP_LTZ(3), "op_ts", true), + Column.metadata("row_kind", DataTypes.STRING(), "row_kind", true), Column.metadata( "database_name", DataTypes.STRING(), "database_name", true), Column.metadata("schema_name", DataTypes.STRING(), "schema_name", true), @@ -211,7 +212,7 @@ public class PostgreSQLTableFactoryTest { DynamicTableSource actualSource = createTableSource(SCHEMA_WITH_METADATA, properties); PostgreSQLTableSource postgreSQLTableSource = (PostgreSQLTableSource) actualSource; postgreSQLTableSource.applyReadableMetadata( - Arrays.asList("op_ts", "database_name", "schema_name", "table_name"), + Arrays.asList("row_kind", "op_ts", "database_name", "schema_name", "table_name"), SCHEMA_WITH_METADATA.toSourceRowDataType()); actualSource 
= postgreSQLTableSource.copy(); PostgreSQLTableSource expectedSource = @@ -246,7 +247,7 @@ public class PostgreSQLTableFactoryTest { SCAN_LSN_COMMIT_CHECKPOINTS_DELAY.defaultValue()); expectedSource.producedDataType = SCHEMA_WITH_METADATA.toSourceRowDataType(); expectedSource.metadataKeys = - Arrays.asList("op_ts", "database_name", "schema_name", "table_name"); + Arrays.asList("row_kind", "op_ts", "database_name", "schema_name", "table_name"); assertEquals(expectedSource, actualSource);