[FLINK-35067][cdc-connector][postgres] Adding metadata 'row_kind' for Postgres CDC Connector.

This closes #3716.

Co-authored-by: Leonard Xu <xbjtdcq@gmail.com>
pull/3658/head
Olivier 2 weeks ago committed by GitHub
parent a16abd5d24
commit 7997f51c55
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -387,6 +387,13 @@ The following format metadata can be exposed as read-only (VIRTUAL) columns in a
<td>TIMESTAMP_LTZ(3) NOT NULL</td> <td>TIMESTAMP_LTZ(3) NOT NULL</td>
<td>It indicates the time that the change was made in the database. <br>If the record is read from snapshot of the table instead of the change stream, the value is always 0.</td> <td>It indicates the time that the change was made in the database. <br>If the record is read from snapshot of the table instead of the change stream, the value is always 0.</td>
</tr> </tr>
<tr>
<td>row_kind</td>
<td>STRING NOT NULL</td>
<td>It indicates the row kind of the changelog. Note: The downstream SQL operator may fail to compare due to this newly added column when processing the row retraction if
the source operator chooses to output the 'row_kind' column for each record. It is recommended to use this metadata column only in simple synchronization jobs.
<br>'+I' means INSERT message, '-D' means DELETE message, '-U' means UPDATE_BEFORE message and '+U' means UPDATE_AFTER message.</td>
</tr>
</tbody> </tbody>
</table> </table>

@ -18,7 +18,9 @@
package org.apache.flink.cdc.connectors.postgres.table; package org.apache.flink.cdc.connectors.postgres.table;
import org.apache.flink.cdc.debezium.table.MetadataConverter; import org.apache.flink.cdc.debezium.table.MetadataConverter;
import org.apache.flink.cdc.debezium.table.RowDataMetadataConverter;
import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData; import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData; import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.DataType;
@ -95,6 +97,28 @@ public enum PostgreSQLReadableMetadata {
return TimestampData.fromEpochMillis( return TimestampData.fromEpochMillis(
(Long) sourceStruct.get(AbstractSourceInfo.TIMESTAMP_KEY)); (Long) sourceStruct.get(AbstractSourceInfo.TIMESTAMP_KEY));
} }
}),
/**
 * It indicates the row kind of the changelog. '+I' means INSERT message, '-D' means DELETE
 * message, '-U' means UPDATE_BEFORE message and '+U' means UPDATE_AFTER message.
*/
ROW_KIND(
"row_kind",
DataTypes.STRING().notNull(),
new RowDataMetadataConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object read(RowData rowData) {
return StringData.fromString(rowData.getRowKind().shortString());
}
@Override
public Object read(SourceRecord record) {
throw new UnsupportedOperationException(
"Please call read(RowData rowData) method instead.");
}
}); });
private final String key; private final String key;

@ -165,12 +165,15 @@ public class PostgreSQLConnectorITCase extends PostgresTestBase {
* The final database table looks like this: * The final database table looks like this:
* *
* > SELECT * FROM products; * > SELECT * FROM products;
* +-----+--------------------+---------------------------------------------------------+--------+ * +-----+--------------------+-------------------------------------------------
* --------+--------+
* | id | name | description | weight | * | id | name | description | weight |
* +-----+--------------------+---------------------------------------------------------+--------+ * +-----+--------------------+-------------------------------------------------
* --------+--------+
* | 101 | scooter | Small 2-wheel scooter | 3.14 | * | 101 | scooter | Small 2-wheel scooter | 3.14 |
* | 102 | car battery | 12V car battery | 8.1 | * | 102 | car battery | 12V car battery | 8.1 |
* | 103 | 12-pack drill bits | 12-pack of drill bits with sizes ranging from #40 to #3 | 0.8 | * | 103 | 12-pack drill bits | 12-pack of drill bits with sizes ranging from
* #40 to #3 | 0.8 |
* | 104 | hammer | 12oz carpenter's hammer | 0.75 | * | 104 | hammer | 12oz carpenter's hammer | 0.75 |
* | 105 | hammer | 14oz carpenter's hammer | 0.875 | * | 105 | hammer | 14oz carpenter's hammer | 0.875 |
* | 106 | hammer | 18oz carpenter hammer | 1 | * | 106 | hammer | 18oz carpenter hammer | 1 |
@ -178,7 +181,8 @@ public class PostgreSQLConnectorITCase extends PostgresTestBase {
* | 108 | jacket | water resistent black wind breaker | 0.1 | * | 108 | jacket | water resistent black wind breaker | 0.1 |
* | 109 | spare tire | 24 inch spare tire | 22.2 | * | 109 | spare tire | 24 inch spare tire | 22.2 |
* | 110 | jacket | new water resistent white wind breaker | 0.5 | * | 110 | jacket | new water resistent white wind breaker | 0.5 |
* +-----+--------------------+---------------------------------------------------------+--------+ * +-----+--------------------+-------------------------------------------------
* --------+--------+
* </pre> * </pre>
*/ */
@ -246,7 +250,8 @@ public class PostgreSQLConnectorITCase extends PostgresTestBase {
// async submit job // async submit job
TableResult result = tEnv.executeSql("INSERT INTO sink SELECT * FROM debezium_source"); TableResult result = tEnv.executeSql("INSERT INTO sink SELECT * FROM debezium_source");
// wait for the source startup, we don't have a better way to wait it, use sleep for now // wait for the source startup, we don't have a better way to wait it, use sleep
// for now
Thread.sleep(10000L); Thread.sleep(10000L);
try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER); try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER);
@ -469,6 +474,7 @@ public class PostgreSQLConnectorITCase extends PostgresTestBase {
+ " db_name STRING METADATA FROM 'database_name' VIRTUAL," + " db_name STRING METADATA FROM 'database_name' VIRTUAL,"
+ " schema_name STRING METADATA VIRTUAL," + " schema_name STRING METADATA VIRTUAL,"
+ " table_name STRING METADATA VIRTUAL," + " table_name STRING METADATA VIRTUAL,"
+ " row_kind STRING METADATA FROM 'row_kind' VIRTUAL,"
+ " id INT NOT NULL," + " id INT NOT NULL,"
+ " name STRING," + " name STRING,"
+ " description STRING," + " description STRING,"
@ -501,6 +507,7 @@ public class PostgreSQLConnectorITCase extends PostgresTestBase {
+ " database_name STRING," + " database_name STRING,"
+ " schema_name STRING," + " schema_name STRING,"
+ " table_name STRING," + " table_name STRING,"
+ " row_kind STRING,"
+ " id INT," + " id INT,"
+ " name STRING," + " name STRING,"
+ " description STRING," + " description STRING,"
@ -546,52 +553,52 @@ public class PostgreSQLConnectorITCase extends PostgresTestBase {
Arrays.asList( Arrays.asList(
"+I(" "+I("
+ databaseName + databaseName
+ ",inventory,products,101,scooter,Small 2-wheel scooter,3.140)", + ",inventory,products,+I,101,scooter,Small 2-wheel scooter,3.140)",
"+I(" "+I("
+ databaseName + databaseName
+ ",inventory,products,102,car battery,12V car battery,8.100)", + ",inventory,products,+I,102,car battery,12V car battery,8.100)",
"+I(" "+I("
+ databaseName + databaseName
+ ",inventory,products,103,12-pack drill bits,12-pack of drill bits with sizes ranging from #40 to #3,0.800)", + ",inventory,products,+I,103,12-pack drill bits,12-pack of drill bits with sizes ranging from #40 to #3,0.800)",
"+I(" "+I("
+ databaseName + databaseName
+ ",inventory,products,104,hammer,12oz carpenter's hammer,0.750)", + ",inventory,products,+I,104,hammer,12oz carpenter's hammer,0.750)",
"+I(" "+I("
+ databaseName + databaseName
+ ",inventory,products,105,hammer,14oz carpenter's hammer,0.875)", + ",inventory,products,+I,105,hammer,14oz carpenter's hammer,0.875)",
"+I(" "+I("
+ databaseName + databaseName
+ ",inventory,products,106,hammer,16oz carpenter's hammer,1.000)", + ",inventory,products,+I,106,hammer,16oz carpenter's hammer,1.000)",
"+I(" "+I("
+ databaseName + databaseName
+ ",inventory,products,107,rocks,box of assorted rocks,5.300)", + ",inventory,products,+I,107,rocks,box of assorted rocks,5.300)",
"+I(" "+I("
+ databaseName + databaseName
+ ",inventory,products,108,jacket,water resistent black wind breaker,0.100)", + ",inventory,products,+I,108,jacket,water resistent black wind breaker,0.100)",
"+I(" "+I("
+ databaseName + databaseName
+ ",inventory,products,109,spare tire,24 inch spare tire,22.200)", + ",inventory,products,+I,109,spare tire,24 inch spare tire,22.200)",
"+I(" "+I("
+ databaseName + databaseName
+ ",inventory,products,110,jacket,water resistent white wind breaker,0.200)", + ",inventory,products,+I,110,jacket,water resistent white wind breaker,0.200)",
"+I(" "+I("
+ databaseName + databaseName
+ ",inventory,products,111,scooter,Big 2-wheel scooter ,5.180)", + ",inventory,products,+I,111,scooter,Big 2-wheel scooter ,5.180)",
"+U(" "+U("
+ databaseName + databaseName
+ ",inventory,products,106,hammer,18oz carpenter hammer,1.000)", + ",inventory,products,+U,106,hammer,18oz carpenter hammer,1.000)",
"+U(" "+U("
+ databaseName + databaseName
+ ",inventory,products,107,rocks,box of assorted rocks,5.100)", + ",inventory,products,+U,107,rocks,box of assorted rocks,5.100)",
"+U(" "+U("
+ databaseName + databaseName
+ ",inventory,products,110,jacket,new water resistent white wind breaker,0.500)", + ",inventory,products,+U,110,jacket,new water resistent white wind breaker,0.500)",
"+U(" "+U("
+ databaseName + databaseName
+ ",inventory,products,111,scooter,Big 2-wheel scooter ,5.170)", + ",inventory,products,+U,111,scooter,Big 2-wheel scooter ,5.170)",
"-D(" "-D("
+ databaseName + databaseName
+ ",inventory,products,111,scooter,Big 2-wheel scooter ,5.170)"); + ",inventory,products,-D,111,scooter,Big 2-wheel scooter ,5.170)");
List<String> actual = TestValuesTableFactory.getRawResultsAsStrings("sink"); List<String> actual = TestValuesTableFactory.getRawResultsAsStrings("sink");
Collections.sort(actual); Collections.sort(actual);
Collections.sort(expected); Collections.sort(expected);
@ -679,12 +686,15 @@ public class PostgreSQLConnectorITCase extends PostgresTestBase {
* The final database table looks like this: * The final database table looks like this:
* *
* > SELECT * FROM products; * > SELECT * FROM products;
* +-----+--------------------+---------------------------------------------------------+--------+ * +-----+--------------------+-------------------------------------------------
* --------+--------+
* | id | name | description | weight | * | id | name | description | weight |
* +-----+--------------------+---------------------------------------------------------+--------+ * +-----+--------------------+-------------------------------------------------
* --------+--------+
* | 101 | scooter | Small 2-wheel scooter | 3.14 | * | 101 | scooter | Small 2-wheel scooter | 3.14 |
* | 102 | car battery | 12V car battery | 8.1 | * | 102 | car battery | 12V car battery | 8.1 |
* | 103 | 12-pack drill bits | 12-pack of drill bits with sizes ranging from #40 to #3 | 0.8 | * | 103 | 12-pack drill bits | 12-pack of drill bits with sizes ranging from
* #40 to #3 | 0.8 |
* | 104 | hammer | 12oz carpenter's hammer | 0.75 | * | 104 | hammer | 12oz carpenter's hammer | 0.75 |
* | 105 | hammer | 14oz carpenter's hammer | 0.875 | * | 105 | hammer | 14oz carpenter's hammer | 0.875 |
* | 106 | hammer | 18oz carpenter hammer | 1 | * | 106 | hammer | 18oz carpenter hammer | 1 |
@ -692,7 +702,8 @@ public class PostgreSQLConnectorITCase extends PostgresTestBase {
* | 108 | jacket | water resistent black wind breaker | 0.1 | * | 108 | jacket | water resistent black wind breaker | 0.1 |
* | 109 | spare tire | 24 inch spare tire | 22.2 | * | 109 | spare tire | 24 inch spare tire | 22.2 |
* | 110 | jacket | new water resistent white wind breaker | 0.5 | * | 110 | jacket | new water resistent white wind breaker | 0.5 |
* +-----+--------------------+---------------------------------------------------------+--------+ * +-----+--------------------+-------------------------------------------------
* --------+--------+
* </pre> * </pre>
*/ */

@ -50,19 +50,19 @@ import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Properties; import java.util.Properties;
import static org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions.CONNECTION_POOL_SIZE;
import static org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions.CONNECT_MAX_RETRIES;
import static org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions.CONNECT_TIMEOUT;
import static org.apache.flink.cdc.connectors.base.options.SourceOptions.CHUNK_META_GROUP_SIZE;
import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED; import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED;
import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP; import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP;
import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED; import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED;
import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.CHUNK_META_GROUP_SIZE; import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.CONNECTION_POOL_SIZE; import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND;
import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.CONNECT_MAX_RETRIES; import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND;
import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.CONNECT_TIMEOUT;
import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.HEARTBEAT_INTERVAL; import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.HEARTBEAT_INTERVAL;
import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SCAN_LSN_COMMIT_CHECKPOINTS_DELAY; import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SCAN_LSN_COMMIT_CHECKPOINTS_DELAY;
import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND;
import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND;
import static org.apache.flink.cdc.connectors.utils.AssertUtils.assertProducedTypeOfSourceFunction; import static org.apache.flink.cdc.connectors.utils.AssertUtils.assertProducedTypeOfSourceFunction;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
@ -100,6 +100,7 @@ public class PostgreSQLTableFactoryTest {
Column.physical("name", DataTypes.STRING()), Column.physical("name", DataTypes.STRING()),
Column.physical("count", DataTypes.DECIMAL(38, 18)), Column.physical("count", DataTypes.DECIMAL(38, 18)),
Column.metadata("time", DataTypes.TIMESTAMP_LTZ(3), "op_ts", true), Column.metadata("time", DataTypes.TIMESTAMP_LTZ(3), "op_ts", true),
Column.metadata("row_kind", DataTypes.STRING(), "row_kind", true),
Column.metadata( Column.metadata(
"database_name", DataTypes.STRING(), "database_name", true), "database_name", DataTypes.STRING(), "database_name", true),
Column.metadata("schema_name", DataTypes.STRING(), "schema_name", true), Column.metadata("schema_name", DataTypes.STRING(), "schema_name", true),
@ -211,7 +212,7 @@ public class PostgreSQLTableFactoryTest {
DynamicTableSource actualSource = createTableSource(SCHEMA_WITH_METADATA, properties); DynamicTableSource actualSource = createTableSource(SCHEMA_WITH_METADATA, properties);
PostgreSQLTableSource postgreSQLTableSource = (PostgreSQLTableSource) actualSource; PostgreSQLTableSource postgreSQLTableSource = (PostgreSQLTableSource) actualSource;
postgreSQLTableSource.applyReadableMetadata( postgreSQLTableSource.applyReadableMetadata(
Arrays.asList("op_ts", "database_name", "schema_name", "table_name"), Arrays.asList("row_kind", "op_ts", "database_name", "schema_name", "table_name"),
SCHEMA_WITH_METADATA.toSourceRowDataType()); SCHEMA_WITH_METADATA.toSourceRowDataType());
actualSource = postgreSQLTableSource.copy(); actualSource = postgreSQLTableSource.copy();
PostgreSQLTableSource expectedSource = PostgreSQLTableSource expectedSource =
@ -246,7 +247,7 @@ public class PostgreSQLTableFactoryTest {
SCAN_LSN_COMMIT_CHECKPOINTS_DELAY.defaultValue()); SCAN_LSN_COMMIT_CHECKPOINTS_DELAY.defaultValue());
expectedSource.producedDataType = SCHEMA_WITH_METADATA.toSourceRowDataType(); expectedSource.producedDataType = SCHEMA_WITH_METADATA.toSourceRowDataType();
expectedSource.metadataKeys = expectedSource.metadataKeys =
Arrays.asList("op_ts", "database_name", "schema_name", "table_name"); Arrays.asList("row_kind", "op_ts", "database_name", "schema_name", "table_name");
assertEquals(expectedSource, actualSource); assertEquals(expectedSource, actualSource);

Loading…
Cancel
Save