[postgres] Support metadata columns for postgres-cdc connector (#500)

Co-authored-by: Leonard Xu <xbjtdcq@163.com>
pull/564/head
gongzhongqiang 3 years ago committed by GitHub
parent 02fd0c9c5b
commit 0bd39e5310
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -251,6 +251,41 @@ During a snapshot operation, the connector will query each included table to pro
</table>
</div>
Available Metadata
----------------
The following connector metadata can be exposed as read-only (VIRTUAL) columns in a table definition.
<div class="highlight">
<table class="colwidths-auto docutils">
<thead>
<tr>
<th class="text-left" style="width: 15%">Key</th>
<th class="text-left" style="width: 30%">DataType</th>
<th class="text-left" style="width: 55%">Description</th>
</tr>
</thead>
<tbody>
<tr>
<td>table_name</td>
<td>STRING NOT NULL</td>
<td>Name of the table that contains the row.</td>
</tr>
<tr>
<td>database_name</td>
<td>STRING NOT NULL</td>
<td>Name of the database that contains the row.</td>
</tr>
<tr>
<td>op_ts</td>
<td>TIMESTAMP_LTZ(3) NOT NULL</td>
<td>It indicates the time that the change was made in the database. <br>If the record is read from snapshot of the table instead of the change stream, the value is always 0.</td>
</tr>
</tbody>
</table>
</div>
Features
--------

@ -155,6 +155,41 @@ Connector Options
Note: it is recommended to set a different `slot.name` for each table to avoid the potential `PSQLException: ERROR: replication slot "flink" is active for PID 974` error. See more [here](https://debezium.io/documentation/reference/1.5/connectors/postgresql.html#postgresql-property-slot-name).
Available Metadata
----------------
The following connector metadata can be exposed as read-only (VIRTUAL) columns in a table definition.
<div class="highlight">
<table class="colwidths-auto docutils">
<thead>
<tr>
<th class="text-left" style="width: 15%">Key</th>
<th class="text-left" style="width: 30%">DataType</th>
<th class="text-left" style="width: 55%">Description</th>
</tr>
</thead>
<tbody>
<tr>
<td>table_name</td>
<td>STRING NOT NULL</td>
<td>Name of the table that contains the row.</td>
</tr>
<tr>
<td>database_name</td>
<td>STRING NOT NULL</td>
<td>Name of the database that contains the row.</td>
</tr>
<tr>
<td>op_ts</td>
<td>TIMESTAMP_LTZ(3) NOT NULL</td>
<td>It indicates the time that the change was made in the database. <br>If the record is read from snapshot of the table instead of the change stream, the value is always 0.</td>
</tr>
</tbody>
</table>
</div>
Features
--------

@ -31,7 +31,7 @@ import org.apache.kafka.connect.source.SourceRecord;
/** Defines the supported metadata columns for {@link MySqlTableSource}. */
public enum MySqlReadableMetadata {
/** Name of the table that contain the row. . */
/** Name of the table that contains the row. */
TABLE_NAME(
"table_name",
DataTypes.STRING().notNull(),

@ -52,7 +52,7 @@ import static com.ververica.cdc.connectors.mysql.LegacyMySqlSourceTest.currentMy
import static org.apache.flink.api.common.JobStatus.RUNNING;
import static org.junit.Assert.assertEquals;
/** Integration tests for MySQL binlog SQL source. */
/** Integration tests for MySQL Table source. */
@RunWith(Parameterized.class)
public class MySqlConnectorITCase extends MySqlSourceTestBase {

@ -80,7 +80,7 @@ public class MySqlTableSourceFactoryTest {
Column.physical("count", DataTypes.DECIMAL(38, 18)),
Column.metadata("time", DataTypes.TIMESTAMP(3), "op_ts", true),
Column.metadata(
"_database_name", DataTypes.STRING(), "database_name", true)),
"database_name", DataTypes.STRING(), "database_name", true)),
Collections.emptyList(),
UniqueConstraint.primaryKey("pk", Collections.singletonList("id")));

@ -0,0 +1,108 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.postgres.table;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.DataType;
import com.ververica.cdc.debezium.table.MetadataConverter;
import io.debezium.connector.AbstractSourceInfo;
import io.debezium.data.Envelope;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
/**
 * Metadata columns that a {@link PostgreSQLTableSource} can expose. Each constant pairs a metadata
 * key with its Flink data type and the converter that extracts the value from a change record.
 */
public enum PostgreSQLReadableMetadata {

    /** Name of the table that contains the row. */
    TABLE_NAME(
            "table_name",
            DataTypes.STRING().notNull(),
            new MetadataConverter() {
                private static final long serialVersionUID = 1L;

                @Override
                public Object read(SourceRecord record) {
                    final String table =
                            sourceOf(record).getString(AbstractSourceInfo.TABLE_NAME_KEY);
                    return StringData.fromString(table);
                }
            }),

    /** Name of the database that contains the row. */
    DATABASE_NAME(
            "database_name",
            DataTypes.STRING().notNull(),
            new MetadataConverter() {
                private static final long serialVersionUID = 1L;

                @Override
                public Object read(SourceRecord record) {
                    final String database =
                            sourceOf(record).getString(AbstractSourceInfo.DATABASE_NAME_KEY);
                    return StringData.fromString(database);
                }
            }),

    /**
     * Time at which the change was made in the database, taken from the timestamp field of the
     * record's source block and interpreted as epoch milliseconds. The connector documentation
     * states that records read from the table snapshot (instead of the change stream) always
     * carry 0 here; this converter passes the value through unchanged.
     */
    OP_TS(
            "op_ts",
            DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).notNull(),
            new MetadataConverter() {
                private static final long serialVersionUID = 1L;

                @Override
                public Object read(SourceRecord record) {
                    final Long epochMillis =
                            (Long) sourceOf(record).get(AbstractSourceInfo.TIMESTAMP_KEY);
                    return TimestampData.fromEpochMillis(epochMillis);
                }
            });

    /** Key under which the metadata is referenced in a table definition. */
    private final String key;

    /** Flink data type of the metadata value. */
    private final DataType dataType;

    /** Extracts the metadata value from a source record. */
    private final MetadataConverter converter;

    PostgreSQLReadableMetadata(String key, DataType dataType, MetadataConverter converter) {
        this.key = key;
        this.dataType = dataType;
        this.converter = converter;
    }

    /** Returns the metadata key. */
    public String getKey() {
        return key;
    }

    /** Returns the data type declared for this metadata column. */
    public DataType getDataType() {
        return dataType;
    }

    /** Returns the converter that reads this metadata from a source record. */
    public MetadataConverter getConverter() {
        return converter;
    }

    /** Returns the source block nested in the value of the given record. */
    private static Struct sourceOf(SourceRecord record) {
        final Struct envelope = (Struct) record.value();
        return envelope.getStruct(Envelope.FieldName.SOURCE);
    }
}

@ -24,17 +24,25 @@ import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceFunctionProvider;
import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;

import com.ververica.cdc.connectors.postgres.PostgreSQLSource;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.table.MetadataConverter;
import com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema;

import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.flink.util.Preconditions.checkNotNull;
@ -42,7 +50,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
* A {@link DynamicTableSource} that describes how to create a PostgreSQL source from a logical
* description.
*/
public class PostgreSQLTableSource implements ScanTableSource {
public class PostgreSQLTableSource implements ScanTableSource, SupportsReadingMetadata {
private final TableSchema physicalSchema;
private final int port;
@ -56,6 +64,16 @@ public class PostgreSQLTableSource implements ScanTableSource {
private final String slotName;
private final Properties dbzProperties;
// --------------------------------------------------------------------------------------------
// Mutable attributes
// --------------------------------------------------------------------------------------------
/** Data type that describes the final output of the source. */
protected DataType producedDataType;
/** Metadata that is appended at the end of a physical source row. */
protected List<String> metadataKeys;
public PostgreSQLTableSource(
TableSchema physicalSchema,
int port,
@ -79,6 +97,8 @@ public class PostgreSQLTableSource implements ScanTableSource {
this.pluginName = checkNotNull(pluginName);
this.slotName = slotName;
this.dbzProperties = dbzProperties;
this.producedDataType = physicalSchema.toPhysicalRowDataType();
this.metadataKeys = Collections.emptyList();
}
@Override
@ -93,13 +113,16 @@ public class PostgreSQLTableSource implements ScanTableSource {
@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
RowType rowType = (RowType) physicalSchema.toRowDataType().getLogicalType();
RowType physicalDataType =
(RowType) physicalSchema.toPhysicalRowDataType().getLogicalType();
MetadataConverter[] metadataConverters = getMetadataConverters();
TypeInformation<RowData> typeInfo =
scanContext.createTypeInformation(physicalSchema.toRowDataType());
DebeziumDeserializationSchema<RowData> deserializer =
RowDataDebeziumDeserializeSchema.newBuilder()
.setPhysicalRowType(rowType)
.setPhysicalRowType(physicalDataType)
.setMetadataConverters(metadataConverters)
.setResultTypeInfo(typeInfo)
.setValueValidator(new PostgresValueValidator(schemaName, tableName))
.build();
@ -120,20 +143,40 @@ public class PostgreSQLTableSource implements ScanTableSource {
return SourceFunctionProvider.of(sourceFunction, false);
}
/**
 * Resolves the currently applied {@code metadataKeys} to their converters, keeping the order of
 * the keys.
 *
 * @return one converter per applied metadata key; an empty array when no key is applied
 * @throws IllegalStateException if a key does not match any {@link PostgreSQLReadableMetadata};
 *     the message names the offending key
 */
private MetadataConverter[] getMetadataConverters() {
    if (metadataKeys.isEmpty()) {
        return new MetadataConverter[0];
    }

    return metadataKeys.stream()
            .map(
                    key ->
                            Stream.of(PostgreSQLReadableMetadata.values())
                                    .filter(m -> m.getKey().equals(key))
                                    .findFirst()
                                    // Name the key so a mismatch can be diagnosed from the
                                    // exception alone.
                                    .orElseThrow(
                                            () ->
                                                    new IllegalStateException(
                                                            "Unsupported metadata key: "
                                                                    + key)))
            .map(PostgreSQLReadableMetadata::getConverter)
            .toArray(MetadataConverter[]::new);
}
@Override
public DynamicTableSource copy() {
return new PostgreSQLTableSource(
physicalSchema,
port,
hostname,
database,
schemaName,
tableName,
username,
password,
pluginName,
slotName,
dbzProperties);
PostgreSQLTableSource source =
new PostgreSQLTableSource(
physicalSchema,
port,
hostname,
database,
schemaName,
tableName,
username,
password,
pluginName,
slotName,
dbzProperties);
source.metadataKeys = metadataKeys;
source.producedDataType = producedDataType;
return source;
}
@Override
@ -155,7 +198,9 @@ public class PostgreSQLTableSource implements ScanTableSource {
&& Objects.equals(password, that.password)
&& Objects.equals(pluginName, that.pluginName)
&& Objects.equals(slotName, that.slotName)
&& Objects.equals(dbzProperties, that.dbzProperties);
&& Objects.equals(dbzProperties, that.dbzProperties)
&& Objects.equals(producedDataType, that.producedDataType)
&& Objects.equals(metadataKeys, that.metadataKeys);
}
@Override
@ -171,11 +216,28 @@ public class PostgreSQLTableSource implements ScanTableSource {
password,
pluginName,
slotName,
dbzProperties);
dbzProperties,
producedDataType,
metadataKeys);
}
/** Returns the fixed label {@code "PostgreSQL-CDC"} as the summary of this source. */
@Override
public String asSummaryString() {
return "PostgreSQL-CDC";
}
/**
 * Lists every metadata key defined by {@link PostgreSQLReadableMetadata} together with its data
 * type.
 *
 * <p>The result is a {@link LinkedHashMap} filled in enum declaration order, so the iteration
 * order is deterministic. Flink derives the order of the keys passed to {@link
 * #applyReadableMetadata} from the iteration order of this map.
 *
 * @return metadata key to data type, in declaration order of the enum constants
 */
@Override
public Map<String, DataType> listReadableMetadata() {
    Map<String, DataType> readableMetadata = new LinkedHashMap<>();
    for (PostgreSQLReadableMetadata metadata : PostgreSQLReadableMetadata.values()) {
        readableMetadata.put(metadata.getKey(), metadata.getDataType());
    }
    return readableMetadata;
}
/**
 * Records the metadata selection for this source.
 *
 * @param metadataKeys metadata keys to append to the physical row, stored as given
 * @param producedDataType data type describing the final output of the source, stored as given
 */
@Override
public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {
    this.producedDataType = producedDataType;
    this.metadataKeys = metadataKeys;
}
}

@ -35,6 +35,7 @@ import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
@ -44,7 +45,7 @@ import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.testcontainers.containers.PostgreSQLContainer.POSTGRESQL_PORT;
/** Integration tests for MySQL binlog SQL source. */
/** Integration tests for PostgreSQL Table source. */
public class PostgreSQLConnectorITCase extends PostgresTestBase {
private final StreamExecutionEnvironment env =
@ -337,6 +338,103 @@ public class PostgreSQLConnectorITCase extends PostgresTestBase {
result.getJobClient().get().cancel().get();
}
/**
 * Runs a postgres-cdc source that declares two metadata columns into a values sink and checks
 * that every snapshot and change record carries the database and table name.
 */
@Test
public void testMetadataColumns() throws Throwable {
initializePostgresTable("inventory");
// db_name is mapped explicitly to the 'database_name' metadata key; table_name declares no
// FROM clause, so its column name doubles as the metadata key.
String sourceDDL =
String.format(
"CREATE TABLE debezium_source ("
+ " db_name STRING METADATA FROM 'database_name' VIRTUAL,"
+ " table_name STRING METADATA VIRTUAL,"
+ " id INT NOT NULL,"
+ " name STRING,"
+ " description STRING,"
+ " weight DECIMAL(10,3)"
+ ") WITH ("
+ " 'connector' = 'postgres-cdc',"
+ " 'hostname' = '%s',"
+ " 'port' = '%s',"
+ " 'username' = '%s',"
+ " 'password' = '%s',"
+ " 'database-name' = '%s',"
+ " 'schema-name' = '%s',"
+ " 'table-name' = '%s',"
+ " 'debezium.slot.name' = '%s'"
+ ")",
POSTGERS_CONTAINER.getHost(),
POSTGERS_CONTAINER.getMappedPort(POSTGRESQL_PORT),
POSTGERS_CONTAINER.getUsername(),
POSTGERS_CONTAINER.getPassword(),
POSTGERS_CONTAINER.getDatabaseName(),
"inventory",
"products",
"meta_data_slot");
// NOTE(review): 'sink-expected-messages-num' is 20 while the test waits for 16 records below
// — confirm the mismatch is intentional.
String sinkDDL =
"CREATE TABLE sink ("
+ " database_name STRING,"
+ " table_name STRING,"
+ " id INT,"
+ " name STRING,"
+ " description STRING,"
+ " weight DECIMAL(10,3),"
+ " PRIMARY KEY (id) NOT ENFORCED"
+ ") WITH ("
+ " 'connector' = 'values',"
+ " 'sink-insert-only' = 'false',"
+ " 'sink-expected-messages-num' = '20'"
+ ")";
tEnv.executeSql(sourceDDL);
tEnv.executeSql(sinkDDL);
// sync submit job
TableResult result = tEnv.executeSql("INSERT INTO sink SELECT * FROM debezium_source");
// Block until the sink holds at least one record before issuing the changes below.
waitForSnapshotStarted("sink");
// Seven statements against inventory.products: 4 updates, 2 inserts, 1 delete.
try (Connection connection = getJdbcConnection();
Statement statement = connection.createStatement()) {
statement.execute(
"UPDATE inventory.products SET description='18oz carpenter hammer' WHERE id=106;");
statement.execute("UPDATE inventory.products SET weight='5.1' WHERE id=107;");
statement.execute(
"INSERT INTO inventory.products VALUES (default,'jacket','water resistent white wind breaker',0.2);"); // 110
statement.execute(
"INSERT INTO inventory.products VALUES (default,'scooter','Big 2-wheel scooter ',5.18);");
statement.execute(
"UPDATE inventory.products SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;");
statement.execute("UPDATE inventory.products SET weight='5.17' WHERE id=111;");
statement.execute("DELETE FROM inventory.products WHERE id=111;");
}
// waiting for change events finished.
// 16 matches the expected list below: rows 101-109 plus the seven changes issued above.
waitForSinkSize("sink", 16);
// Expected rows are listed in sorted order because the actual results are sorted before the
// comparison.
List<String> expected =
Arrays.asList(
"+I(postgres,products,101,scooter,Small 2-wheel scooter,3.140)",
"+I(postgres,products,102,car battery,12V car battery,8.100)",
"+I(postgres,products,103,12-pack drill bits,12-pack of drill bits with sizes ranging from #40 to #3,0.800)",
"+I(postgres,products,104,hammer,12oz carpenter's hammer,0.750)",
"+I(postgres,products,105,hammer,14oz carpenter's hammer,0.875)",
"+I(postgres,products,106,hammer,16oz carpenter's hammer,1.000)",
"+I(postgres,products,107,rocks,box of assorted rocks,5.300)",
"+I(postgres,products,108,jacket,water resistent black wind breaker,0.100)",
"+I(postgres,products,109,spare tire,24 inch spare tire,22.200)",
"+I(postgres,products,110,jacket,water resistent white wind breaker,0.200)",
"+I(postgres,products,111,scooter,Big 2-wheel scooter ,5.180)",
"+U(postgres,products,106,hammer,18oz carpenter hammer,1.000)",
"+U(postgres,products,107,rocks,box of assorted rocks,5.100)",
"+U(postgres,products,110,jacket,new water resistent white wind breaker,0.500)",
"+U(postgres,products,111,scooter,Big 2-wheel scooter ,5.170)",
"-D(postgres,products,111,scooter,Big 2-wheel scooter ,5.170)");
List<String> actual = TestValuesTableFactory.getRawResults("sink");
Collections.sort(actual);
assertEquals(expected, actual);
// Cancel the streaming job, which would otherwise keep running.
result.getJobClient().get().cancel().get();
}
private static void waitForSnapshotStarted(String sinkName) throws InterruptedException {
while (sinkSize(sinkName) == 0) {
Thread.sleep(100);

@ -37,6 +37,7 @@ import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
@ -60,6 +61,18 @@ public class PostgreSQLTableFactoryTest {
new ArrayList<>(),
UniqueConstraint.primaryKey("pk", Arrays.asList("bbb", "aaa")));
private static final ResolvedSchema SCHEMA_WITH_METADATA =
new ResolvedSchema(
Arrays.asList(
Column.physical("id", DataTypes.BIGINT().notNull()),
Column.physical("name", DataTypes.STRING()),
Column.physical("count", DataTypes.DECIMAL(38, 18)),
Column.metadata("time", DataTypes.TIMESTAMP(3), "op_ts", true),
Column.metadata(
"database_name", DataTypes.STRING(), "database_name", true)),
Collections.emptyList(),
UniqueConstraint.primaryKey("pk", Collections.singletonList("id")));
private static final String MY_LOCALHOST = "localhost";
private static final String MY_USERNAME = "flinkuser";
private static final String MY_PASSWORD = "flinkpw";
@ -73,7 +86,7 @@ public class PostgreSQLTableFactoryTest {
Map<String, String> properties = getAllOptions();
// validation for source
DynamicTableSource actualSource = createTableSource(properties);
DynamicTableSource actualSource = createTableSource(SCHEMA, properties);
PostgreSQLTableSource expectedSource =
new PostgreSQLTableSource(
TableSchemaUtils.getPhysicalSchema(fromResolvedSchema(SCHEMA)),
@ -117,6 +130,37 @@ public class PostgreSQLTableFactoryTest {
assertEquals(expectedSource, actualSource);
}
/**
 * Checks that metadata keys and the produced data type applied to a factory-created source
 * survive {@code copy()} and take part in equality.
 */
@Test
public void testMetadataColumns() {
    // Create the source through the factory, then apply the metadata selection to it.
    PostgreSQLTableSource configuredSource =
            (PostgreSQLTableSource) createTableSource(SCHEMA_WITH_METADATA, getAllOptions());
    configuredSource.applyReadableMetadata(
            Arrays.asList("op_ts", "database_name"),
            SCHEMA_WITH_METADATA.toSourceRowDataType());
    DynamicTableSource actualSource = configuredSource.copy();

    // Build the reference source by hand with the same options and metadata state.
    PostgreSQLTableSource expectedSource =
            new PostgreSQLTableSource(
                    TableSchemaUtils.getPhysicalSchema(fromResolvedSchema(SCHEMA_WITH_METADATA)),
                    5432,
                    MY_LOCALHOST,
                    MY_DATABASE,
                    MY_SCHEMA,
                    MY_TABLE,
                    MY_USERNAME,
                    MY_PASSWORD,
                    "decoderbufs",
                    "flink",
                    new Properties());
    expectedSource.metadataKeys = Arrays.asList("op_ts", "database_name");
    expectedSource.producedDataType = SCHEMA_WITH_METADATA.toSourceRowDataType();

    assertEquals(expectedSource, actualSource);
}
@Test
public void testValidation() {
// validate illegal port
@ -124,7 +168,7 @@ public class PostgreSQLTableFactoryTest {
Map<String, String> properties = getAllOptions();
properties.put("port", "123b");
createTableSource(properties);
createTableSource(SCHEMA, properties);
fail("exception expected");
} catch (Throwable t) {
assertTrue(
@ -140,7 +184,7 @@ public class PostgreSQLTableFactoryTest {
properties.remove(requiredOption.key());
try {
createTableSource(properties);
createTableSource(SCHEMA, properties);
fail("exception expected");
} catch (Throwable t) {
assertTrue(
@ -156,7 +200,7 @@ public class PostgreSQLTableFactoryTest {
Map<String, String> properties = getAllOptions();
properties.put("unknown", "abc");
createTableSource(properties);
createTableSource(SCHEMA, properties);
fail("exception expected");
} catch (Throwable t) {
assertTrue(
@ -177,19 +221,24 @@ public class PostgreSQLTableFactoryTest {
return options;
}
private static DynamicTableSource createTableSource(Map<String, String> options) {
private static DynamicTableSource createTableSource(
ResolvedSchema schema, Map<String, String> options) {
return FactoryUtil.createTableSource(
null,
ObjectIdentifier.of("default", "default", "t1"),
new ResolvedCatalogTable(
CatalogTable.of(
fromResolvedSchema(SCHEMA).toSchema(),
fromResolvedSchema(schema).toSchema(),
"mock source",
new ArrayList<>(),
options),
SCHEMA),
schema),
new Configuration(),
PostgreSQLTableFactoryTest.class.getClassLoader(),
false);
}
/** Creates a table source for the given options using the default {@code SCHEMA}. */
private static DynamicTableSource createTableSource(Map<String, String> options) {
return createTableSource(SCHEMA, options);
}
}

Loading…
Cancel
Save