diff --git a/docs/content/connectors/mysql-cdc.md b/docs/content/connectors/mysql-cdc.md
index 9de9e8a27..368e2240a 100644
--- a/docs/content/connectors/mysql-cdc.md
+++ b/docs/content/connectors/mysql-cdc.md
@@ -251,6 +251,41 @@ During a snapshot operation, the connector will query each included table to pro
+Available Metadata
+----------------
+
+The following metadata can be exposed as read-only (VIRTUAL) columns in a table definition.
+
+<table class="colwidths-auto docutils">
+  <thead>
+    <tr>
+      <th class="text-left">Key</th>
+      <th class="text-left">DataType</th>
+      <th class="text-left">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td>table_name</td>
+      <td>STRING NOT NULL</td>
+      <td>Name of the table that contains the row.</td>
+    </tr>
+    <tr>
+      <td>database_name</td>
+      <td>STRING NOT NULL</td>
+      <td>Name of the database that contains the row.</td>
+    </tr>
+    <tr>
+      <td>op_ts</td>
+      <td>TIMESTAMP_LTZ(3) NOT NULL</td>
+      <td>The time at which the change was made in the database. If the record is read from a snapshot of the table instead of the binlog, the value is always 0.</td>
+    </tr>
+  </tbody>
+</table>
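+
+The extended `CREATE TABLE` example below is a minimal sketch of how these metadata columns can be declared; the connection options are placeholders that must be adapted to your environment:
+
+```sql
+CREATE TABLE products (
+    db_name STRING METADATA FROM 'database_name' VIRTUAL,
+    table_name STRING METADATA FROM 'table_name' VIRTUAL,
+    operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
+    id INT NOT NULL,
+    name STRING,
+    description STRING,
+    weight DECIMAL(10,3),
+    PRIMARY KEY (id) NOT ENFORCED
+) WITH (
+    'connector' = 'mysql-cdc',
+    'hostname' = 'localhost',
+    'port' = '3306',
+    'username' = 'root',
+    'password' = '123456',
+    'database-name' = 'mydb',
+    'table-name' = 'products'
+);
+```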
 
 Features
 --------
 
diff --git a/docs/content/connectors/postgres-cdc.md b/docs/content/connectors/postgres-cdc.md
index 3f56fea55..1cb1088f9 100644
--- a/docs/content/connectors/postgres-cdc.md
+++ b/docs/content/connectors/postgres-cdc.md
@@ -155,6 +155,41 @@ Connector Options
 Note: `slot.name` is recommended to set for different tables to avoid the potential `PSQLException: ERROR: replication slot "flink" is active for PID 974` error. See more [here](https://debezium.io/documentation/reference/1.5/connectors/postgresql.html#postgresql-property-slot-name).
+Available Metadata
+----------------
+
+The following metadata can be exposed as read-only (VIRTUAL) columns in a table definition.
+
+<table class="colwidths-auto docutils">
+  <thead>
+    <tr>
+      <th class="text-left">Key</th>
+      <th class="text-left">DataType</th>
+      <th class="text-left">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td>table_name</td>
+      <td>STRING NOT NULL</td>
+      <td>Name of the table that contains the row.</td>
+    </tr>
+    <tr>
+      <td>database_name</td>
+      <td>STRING NOT NULL</td>
+      <td>Name of the database that contains the row.</td>
+    </tr>
+    <tr>
+      <td>op_ts</td>
+      <td>TIMESTAMP_LTZ(3) NOT NULL</td>
+      <td>The time at which the change was made in the database. If the record is read from a snapshot of the table instead of the change stream, the value is always 0.</td>
+    </tr>
+  </tbody>
+</table>
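+
+A minimal sketch of a table definition exposing these metadata columns; the connection options are placeholders, and the replication slot is configured through the pass-through `debezium.slot.name` option mentioned in the note above:
+
+```sql
+CREATE TABLE products (
+    db_name STRING METADATA FROM 'database_name' VIRTUAL,
+    table_name STRING METADATA FROM 'table_name' VIRTUAL,
+    operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
+    id INT NOT NULL,
+    name STRING,
+    description STRING,
+    weight DECIMAL(10,3),
+    PRIMARY KEY (id) NOT ENFORCED
+) WITH (
+    'connector' = 'postgres-cdc',
+    'hostname' = 'localhost',
+    'port' = '5432',
+    'username' = 'postgres',
+    'password' = 'postgres',
+    'database-name' = 'postgres',
+    'schema-name' = 'inventory',
+    'table-name' = 'products',
+    'debezium.slot.name' = 'products_slot'
+);
+```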
 
 Features
 --------
 
diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlReadableMetadata.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlReadableMetadata.java
index a5b44c864..c4d9b8532 100644
--- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlReadableMetadata.java
+++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlReadableMetadata.java
@@ -31,7 +31,7 @@ import org.apache.kafka.connect.source.SourceRecord;
 
 /** Defines the supported metadata columns for {@link MySqlTableSource}. */
 public enum MySqlReadableMetadata {
-    /** Name of the table that contain the row. . */
+    /** Name of the table that contains the row. */
     TABLE_NAME(
             "table_name",
             DataTypes.STRING().notNull(),
diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlConnectorITCase.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlConnectorITCase.java
index e57c38cb5..05b8e03e0 100644
--- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlConnectorITCase.java
+++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlConnectorITCase.java
@@ -52,7 +52,7 @@ import static com.ververica.cdc.connectors.mysql.LegacyMySqlSourceTest.currentMy
 import static org.apache.flink.api.common.JobStatus.RUNNING;
 import static org.junit.Assert.assertEquals;
 
-/** Integration tests for MySQL binlog SQL source. */
+/** Integration tests for the MySQL table source. */
 @RunWith(Parameterized.class)
 public class MySqlConnectorITCase extends MySqlSourceTestBase {
 
diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java
index 94617f444..1ac82d3f9 100644
--- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java
+++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java
@@ -80,7 +80,7 @@ public class MySqlTableSourceFactoryTest {
                         Column.physical("count", DataTypes.DECIMAL(38, 18)),
                         Column.metadata("time", DataTypes.TIMESTAMP(3), "op_ts", true),
                         Column.metadata(
-                                "_database_name", DataTypes.STRING(), "database_name", true)),
+                                "database_name", DataTypes.STRING(), "database_name", true)),
                 Collections.emptyList(),
                 UniqueConstraint.primaryKey("pk", Collections.singletonList("id")));
 
diff --git a/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/table/PostgreSQLReadableMetadata.java b/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/table/PostgreSQLReadableMetadata.java
new file mode 100644
index 000000000..4c8315778
--- /dev/null
+++ b/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/table/PostgreSQLReadableMetadata.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ververica.cdc.connectors.postgres.table;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.DataType;
+
+import com.ververica.cdc.debezium.table.MetadataConverter;
+import io.debezium.connector.AbstractSourceInfo;
+import io.debezium.data.Envelope;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
+
+/** Defines the supported metadata columns for {@link PostgreSQLTableSource}. */
+public enum PostgreSQLReadableMetadata {
+    /** Name of the table that contains the row. */
+    TABLE_NAME(
+            "table_name",
+            DataTypes.STRING().notNull(),
+            new MetadataConverter() {
+                private static final long serialVersionUID = 1L;
+
+                @Override
+                public Object read(SourceRecord record) {
+                    Struct messageStruct = (Struct) record.value();
+                    Struct sourceStruct = messageStruct.getStruct(Envelope.FieldName.SOURCE);
+                    return StringData.fromString(
+                            sourceStruct.getString(AbstractSourceInfo.TABLE_NAME_KEY));
+                }
+            }),
+
+    /** Name of the database that contains the row. */
+    DATABASE_NAME(
+            "database_name",
+            DataTypes.STRING().notNull(),
+            new MetadataConverter() {
+                private static final long serialVersionUID = 1L;
+
+                @Override
+                public Object read(SourceRecord record) {
+                    Struct messageStruct = (Struct) record.value();
+                    Struct sourceStruct = messageStruct.getStruct(Envelope.FieldName.SOURCE);
+                    return StringData.fromString(
+                            sourceStruct.getString(AbstractSourceInfo.DATABASE_NAME_KEY));
+                }
+            }),
+
+    /**
+     * The time at which the change was made in the database. If the record is read from a
+     * snapshot of the table instead of the change stream, the value is always 0.
+     */
+    OP_TS(
+            "op_ts",
+            DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).notNull(),
+            new MetadataConverter() {
+                private static final long serialVersionUID = 1L;
+
+                @Override
+                public Object read(SourceRecord record) {
+                    Struct messageStruct = (Struct) record.value();
+                    Struct sourceStruct = messageStruct.getStruct(Envelope.FieldName.SOURCE);
+                    return TimestampData.fromEpochMillis(
+                            (Long) sourceStruct.get(AbstractSourceInfo.TIMESTAMP_KEY));
+                }
+            });
+
+    private final String key;
+
+    private final DataType dataType;
+
+    private final MetadataConverter converter;
+
+    PostgreSQLReadableMetadata(String key, DataType dataType, MetadataConverter converter) {
+        this.key = key;
+        this.dataType = dataType;
+        this.converter = converter;
+    }
+
+    public String getKey() {
+        return key;
+    }
+
+    public DataType getDataType() {
+        return dataType;
+    }
+
+    public MetadataConverter getConverter() {
+        return converter;
+    }
+}
diff --git a/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/table/PostgreSQLTableSource.java b/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/table/PostgreSQLTableSource.java
index dd1eb06b8..1992663d4 100644
--- a/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/table/PostgreSQLTableSource.java
+++ b/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/table/PostgreSQLTableSource.java
@@ -24,17 +24,25 @@ import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.connector.source.ScanTableSource;
 import org.apache.flink.table.connector.source.SourceFunctionProvider;
+import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.types.RowKind;
 
 import com.ververica.cdc.connectors.postgres.PostgreSQLSource;
 import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
 import com.ververica.cdc.debezium.DebeziumSourceFunction;
+import com.ververica.cdc.debezium.table.MetadataConverter;
 import com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema;
 
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Properties;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -42,7 +50,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * A {@link DynamicTableSource} that describes how to create a PostgreSQL source from a logical
  * description.
  */
-public class PostgreSQLTableSource implements ScanTableSource {
+public class PostgreSQLTableSource implements ScanTableSource, SupportsReadingMetadata {
 
     private final TableSchema physicalSchema;
     private final int port;
@@ -56,6 +64,16 @@ public class PostgreSQLTableSource implements ScanTableSource {
     private final String slotName;
     private final Properties dbzProperties;
 
+    // --------------------------------------------------------------------------------------------
+    // Mutable attributes
+    // --------------------------------------------------------------------------------------------
+
+    /** Data type that describes the final output of the source. */
+    protected DataType producedDataType;
+
+    /** Metadata that is appended at the end of a physical source row. */
+    protected List<String> metadataKeys;
+
     public PostgreSQLTableSource(
             TableSchema physicalSchema,
             int port,
@@ -79,6 +97,8 @@ public class PostgreSQLTableSource implements ScanTableSource {
         this.pluginName = checkNotNull(pluginName);
         this.slotName = slotName;
         this.dbzProperties = dbzProperties;
+        this.producedDataType = physicalSchema.toPhysicalRowDataType();
+        this.metadataKeys = Collections.emptyList();
     }
 
     @Override
@@ -93,13 +113,16 @@
 
     @Override
     public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
-        RowType rowType = (RowType) physicalSchema.toRowDataType().getLogicalType();
+        RowType physicalDataType =
+                (RowType) physicalSchema.toPhysicalRowDataType().getLogicalType();
+        MetadataConverter[] metadataConverters = getMetadataConverters();
         TypeInformation<RowData> typeInfo =
                 scanContext.createTypeInformation(physicalSchema.toRowDataType());
 
         DebeziumDeserializationSchema<RowData> deserializer =
                 RowDataDebeziumDeserializeSchema.newBuilder()
-                        .setPhysicalRowType(rowType)
+                        .setPhysicalRowType(physicalDataType)
+                        .setMetadataConverters(metadataConverters)
                         .setResultTypeInfo(typeInfo)
                        .setValueValidator(new PostgresValueValidator(schemaName, tableName))
                         .build();
@@ -120,20 +143,40 @@
         return SourceFunctionProvider.of(sourceFunction, false);
     }
 
+    private MetadataConverter[] getMetadataConverters() {
+        if (metadataKeys.isEmpty()) {
+            return new MetadataConverter[0];
+        }
+
+        return metadataKeys.stream()
+                .map(
+                        key ->
+                                Stream.of(PostgreSQLReadableMetadata.values())
+                                        .filter(m -> m.getKey().equals(key))
+                                        .findFirst()
+                                        .orElseThrow(IllegalStateException::new))
+                .map(PostgreSQLReadableMetadata::getConverter)
+                .toArray(MetadataConverter[]::new);
+    }
+
     @Override
     public DynamicTableSource copy() {
-        return new PostgreSQLTableSource(
-                physicalSchema,
-                port,
-                hostname,
-                database,
-                schemaName,
-                tableName,
-                username,
-                password,
-                pluginName,
-                slotName,
-                dbzProperties);
+        PostgreSQLTableSource source =
+                new PostgreSQLTableSource(
+                        physicalSchema,
+                        port,
+                        hostname,
+                        database,
+                        schemaName,
+                        tableName,
+                        username,
+                        password,
+                        pluginName,
+                        slotName,
+                        dbzProperties);
+        source.metadataKeys = metadataKeys;
+        source.producedDataType = producedDataType;
+        return source;
     }
 
     @Override
@@ -155,7 +198,9 @@
                 && Objects.equals(password, that.password)
                 && Objects.equals(pluginName, that.pluginName)
                 && Objects.equals(slotName, that.slotName)
-                && Objects.equals(dbzProperties, that.dbzProperties);
+                && Objects.equals(dbzProperties, that.dbzProperties)
+                && Objects.equals(producedDataType, that.producedDataType)
+                && Objects.equals(metadataKeys, that.metadataKeys);
     }
 
     @Override
@@ -171,11 +216,28 @@
                 password,
                 pluginName,
                 slotName,
-                dbzProperties);
+                dbzProperties,
+                producedDataType,
+                metadataKeys);
     }
 
     @Override
     public String asSummaryString() {
         return "PostgreSQL-CDC";
     }
+
+    @Override
+    public Map<String, DataType> listReadableMetadata() {
+        return Stream.of(PostgreSQLReadableMetadata.values())
+                .collect(
+                        Collectors.toMap(
+                                PostgreSQLReadableMetadata::getKey,
+                                PostgreSQLReadableMetadata::getDataType));
+    }
+
+    @Override
+    public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {
+        this.metadataKeys = metadataKeys;
+        this.producedDataType = producedDataType;
+    }
 }
diff --git a/flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java b/flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java
index fc8b702bf..8fed6accf 100644
--- a/flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java
+++ b/flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java
@@ -35,6 +35,7 @@ import java.sql.Connection;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
 
@@ -44,7 +45,7 @@ import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.testcontainers.containers.PostgreSQLContainer.POSTGRESQL_PORT;
 
-/** Integration tests for MySQL binlog SQL source. */
+/** Integration tests for the PostgreSQL table source. */
 public class PostgreSQLConnectorITCase extends PostgresTestBase {
 
     private final StreamExecutionEnvironment env =
@@ -337,6 +338,103 @@ public class PostgreSQLConnectorITCase extends PostgresTestBase {
         result.getJobClient().get().cancel().get();
     }
 
+    @Test
+    public void testMetadataColumns() throws Throwable {
+        initializePostgresTable("inventory");
+        String sourceDDL =
+                String.format(
+                        "CREATE TABLE debezium_source ("
+                                + " db_name STRING METADATA FROM 'database_name' VIRTUAL,"
+                                + " table_name STRING METADATA VIRTUAL,"
+                                + " id INT NOT NULL,"
+                                + " name STRING,"
+                                + " description STRING,"
+                                + " weight DECIMAL(10,3)"
+                                + ") WITH ("
+                                + " 'connector' = 'postgres-cdc',"
+                                + " 'hostname' = '%s',"
+                                + " 'port' = '%s',"
+                                + " 'username' = '%s',"
+                                + " 'password' = '%s',"
+                                + " 'database-name' = '%s',"
+                                + " 'schema-name' = '%s',"
+                                + " 'table-name' = '%s',"
+                                + " 'debezium.slot.name' = '%s'"
+                                + ")",
+                        POSTGERS_CONTAINER.getHost(),
+                        POSTGERS_CONTAINER.getMappedPort(POSTGRESQL_PORT),
+                        POSTGERS_CONTAINER.getUsername(),
+                        POSTGERS_CONTAINER.getPassword(),
+                        POSTGERS_CONTAINER.getDatabaseName(),
+                        "inventory",
+                        "products",
+                        "meta_data_slot");
+
+        String sinkDDL =
+                "CREATE TABLE sink ("
+                        + " database_name STRING,"
+                        + " table_name STRING,"
+                        + " id INT,"
+                        + " name STRING,"
+                        + " description STRING,"
+                        + " weight DECIMAL(10,3),"
+                        + " PRIMARY KEY (id) NOT ENFORCED"
+                        + ") WITH ("
+                        + " 'connector' = 'values',"
+                        + " 'sink-insert-only' = 'false',"
+                        + " 'sink-expected-messages-num' = '20'"
+                        + ")";
+        tEnv.executeSql(sourceDDL);
+        tEnv.executeSql(sinkDDL);
+
+        // submit the job synchronously
+        TableResult result = tEnv.executeSql("INSERT INTO sink SELECT * FROM debezium_source");
+
+        waitForSnapshotStarted("sink");
+
+        try (Connection connection = getJdbcConnection();
+                Statement statement = connection.createStatement()) {
+
+            statement.execute(
+                    "UPDATE inventory.products SET description='18oz carpenter hammer' WHERE id=106;");
+            statement.execute("UPDATE inventory.products SET weight='5.1' WHERE id=107;");
+            statement.execute(
+                    "INSERT INTO inventory.products VALUES (default,'jacket','water resistent white wind breaker',0.2);"); // 110
+            statement.execute(
+                    "INSERT INTO inventory.products VALUES (default,'scooter','Big 2-wheel scooter ',5.18);");
+            statement.execute(
+                    "UPDATE inventory.products SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;");
+            statement.execute("UPDATE inventory.products SET weight='5.17' WHERE id=111;");
+            statement.execute("DELETE FROM inventory.products WHERE id=111;");
+        }
+
+        // wait until all change events have been processed
+        waitForSinkSize("sink", 16);
+
+        List<String> expected =
+                Arrays.asList(
+                        "+I(postgres,products,101,scooter,Small 2-wheel scooter,3.140)",
+                        "+I(postgres,products,102,car battery,12V car battery,8.100)",
+                        "+I(postgres,products,103,12-pack drill bits,12-pack of drill bits with sizes ranging from #40 to #3,0.800)",
+                        "+I(postgres,products,104,hammer,12oz carpenter's hammer,0.750)",
+                        "+I(postgres,products,105,hammer,14oz carpenter's hammer,0.875)",
+                        "+I(postgres,products,106,hammer,16oz carpenter's hammer,1.000)",
+                        "+I(postgres,products,107,rocks,box of assorted rocks,5.300)",
+                        "+I(postgres,products,108,jacket,water resistent black wind breaker,0.100)",
+                        "+I(postgres,products,109,spare tire,24 inch spare tire,22.200)",
+                        "+I(postgres,products,110,jacket,water resistent white wind breaker,0.200)",
+                        "+I(postgres,products,111,scooter,Big 2-wheel scooter ,5.180)",
+                        "+U(postgres,products,106,hammer,18oz carpenter hammer,1.000)",
+                        "+U(postgres,products,107,rocks,box of assorted rocks,5.100)",
+                        "+U(postgres,products,110,jacket,new water resistent white wind breaker,0.500)",
+                        "+U(postgres,products,111,scooter,Big 2-wheel scooter ,5.170)",
+                        "-D(postgres,products,111,scooter,Big 2-wheel scooter ,5.170)");
+        List<String> actual = TestValuesTableFactory.getRawResults("sink");
+        Collections.sort(actual);
+        assertEquals(expected, actual);
+        result.getJobClient().get().cancel().get();
+    }
+
     private static void waitForSnapshotStarted(String sinkName) throws InterruptedException {
         while (sinkSize(sinkName) == 0) {
             Thread.sleep(100);
diff --git a/flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/table/PostgreSQLTableFactoryTest.java b/flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/table/PostgreSQLTableFactoryTest.java
index df89b1ba7..f06e2597a 100644
--- a/flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/table/PostgreSQLTableFactoryTest.java
+++ b/flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/table/PostgreSQLTableFactoryTest.java
@@ -37,6 +37,7 @@ import org.junit.Test;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
@@ -60,6 +61,18 @@ public class PostgreSQLTableFactoryTest {
                     new ArrayList<>(),
                     UniqueConstraint.primaryKey("pk", Arrays.asList("bbb", "aaa")));
 
+    private static final ResolvedSchema SCHEMA_WITH_METADATA =
+            new ResolvedSchema(
+                    Arrays.asList(
+                            Column.physical("id", DataTypes.BIGINT().notNull()),
+                            Column.physical("name", DataTypes.STRING()),
+                            Column.physical("count", DataTypes.DECIMAL(38, 18)),
+                            Column.metadata("time", DataTypes.TIMESTAMP(3), "op_ts", true),
+                            Column.metadata(
+                                    "database_name", DataTypes.STRING(), "database_name", true)),
+                    Collections.emptyList(),
+                    UniqueConstraint.primaryKey("pk", Collections.singletonList("id")));
+
     private static final String MY_LOCALHOST = "localhost";
     private static final String MY_USERNAME = "flinkuser";
     private static final String MY_PASSWORD = "flinkpw";
@@ -73,7 +86,7 @@ public class PostgreSQLTableFactoryTest {
         Map<String, String> properties = getAllOptions();
 
         // validation for source
-        DynamicTableSource actualSource = createTableSource(properties);
+        DynamicTableSource actualSource = createTableSource(SCHEMA, properties);
         PostgreSQLTableSource expectedSource =
                 new PostgreSQLTableSource(
                         TableSchemaUtils.getPhysicalSchema(fromResolvedSchema(SCHEMA)),
@@ -117,6 +130,37 @@ public class PostgreSQLTableFactoryTest {
         assertEquals(expectedSource, actualSource);
     }
 
+    @Test
+    public void testMetadataColumns() {
+        Map<String, String> properties = getAllOptions();
+
+        // validation for source
+        DynamicTableSource actualSource = createTableSource(SCHEMA_WITH_METADATA, properties);
+        PostgreSQLTableSource postgreSQLTableSource = (PostgreSQLTableSource) actualSource;
+        postgreSQLTableSource.applyReadableMetadata(
+                Arrays.asList("op_ts", "database_name"),
+                SCHEMA_WITH_METADATA.toSourceRowDataType());
+        actualSource = postgreSQLTableSource.copy();
+        PostgreSQLTableSource expectedSource =
+                new PostgreSQLTableSource(
+                        TableSchemaUtils.getPhysicalSchema(
+                                fromResolvedSchema(SCHEMA_WITH_METADATA)),
+                        5432,
+                        MY_LOCALHOST,
+                        MY_DATABASE,
+                        MY_SCHEMA,
+                        MY_TABLE,
+                        MY_USERNAME,
+                        MY_PASSWORD,
+                        "decoderbufs",
+                        "flink",
+                        new Properties());
+        expectedSource.producedDataType = SCHEMA_WITH_METADATA.toSourceRowDataType();
+        expectedSource.metadataKeys = Arrays.asList("op_ts", "database_name");
+
+        assertEquals(expectedSource, actualSource);
+    }
+
     @Test
     public void testValidation() {
         // validate illegal port
@@ -124,7 +168,7 @@ public class PostgreSQLTableFactoryTest {
             Map<String, String> properties = getAllOptions();
             properties.put("port", "123b");
 
-            createTableSource(properties);
+            createTableSource(SCHEMA, properties);
             fail("exception expected");
         } catch (Throwable t) {
             assertTrue(
@@ -140,7 +184,7 @@ public class PostgreSQLTableFactoryTest {
             properties.remove(requiredOption.key());
 
             try {
-                createTableSource(properties);
+                createTableSource(SCHEMA, properties);
                 fail("exception expected");
             } catch (Throwable t) {
                 assertTrue(
@@ -156,7 +200,7 @@ public class PostgreSQLTableFactoryTest {
             Map<String, String> properties = getAllOptions();
             properties.put("unknown", "abc");
 
-            createTableSource(properties);
+            createTableSource(SCHEMA, properties);
             fail("exception expected");
         } catch (Throwable t) {
             assertTrue(
@@ -177,19 +221,24 @@ public class PostgreSQLTableFactoryTest {
         return options;
     }
 
-    private static DynamicTableSource createTableSource(Map<String, String> options) {
+    private static DynamicTableSource createTableSource(
+            ResolvedSchema schema, Map<String, String> options) {
         return FactoryUtil.createTableSource(
                 null,
                 ObjectIdentifier.of("default", "default", "t1"),
                 new ResolvedCatalogTable(
                         CatalogTable.of(
-                                fromResolvedSchema(SCHEMA).toSchema(),
+                                fromResolvedSchema(schema).toSchema(),
                                 "mock source",
                                 new ArrayList<>(),
                                 options),
-                        SCHEMA),
+                        schema),
                 new Configuration(),
                 PostgreSQLTableFactoryTest.class.getClassLoader(),
                 false);
     }
+
+    private static DynamicTableSource createTableSource(Map<String, String> options) {
+        return createTableSource(SCHEMA, options);
+    }
 }