diff --git a/flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/utils/ResolvedSchemaUtils.java b/flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/utils/ResolvedSchemaUtils.java new file mode 100644 index 000000000..148df9c69 --- /dev/null +++ b/flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/utils/ResolvedSchemaUtils.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.debezium.utils; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.ResolvedSchema; + +import java.util.stream.Collectors; + +/** Utilities to {@link ResolvedSchema}. */ +@Internal +public class ResolvedSchemaUtils { + private ResolvedSchemaUtils() {} + + /** Return {@link ResolvedSchema} which consists of all physical columns. */ + public static ResolvedSchema getPhysicalSchema(ResolvedSchema resolvedSchema) { + return new ResolvedSchema( + resolvedSchema.getColumns().stream() + .filter(Column::isPhysical) + .collect(Collectors.toList()), + resolvedSchema.getWatermarkSpecs(), + resolvedSchema.getPrimaryKey().orElse(null)); + } +} diff --git a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableSourceFactory.java b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableSourceFactory.java index fc6963eba..534cae816 100644 --- a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableSourceFactory.java +++ b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableSourceFactory.java @@ -35,6 +35,7 @@ import java.util.Set; import static com.ververica.cdc.connectors.mongodb.MongoDBSource.ERROR_TOLERANCE_NONE; import static com.ververica.cdc.connectors.mongodb.MongoDBSource.POLL_AWAIT_TIME_MILLIS_DEFAULT; import static com.ververica.cdc.connectors.mongodb.MongoDBSource.POLL_MAX_BATCH_SIZE_DEFAULT; +import static com.ververica.cdc.debezium.utils.ResolvedSchemaUtils.getPhysicalSchema; import static org.apache.flink.util.Preconditions.checkArgument; /** Factory for creating configured instance of {@link MongoDBTableSource}. */ @@ -214,7 +215,8 @@ public class MongoDBTableSourceFactory implements DynamicTableSourceFactory { ? ZoneId.systemDefault() : ZoneId.of(zoneId); - ResolvedSchema physicalSchema = context.getCatalogTable().getResolvedSchema(); + ResolvedSchema physicalSchema = + getPhysicalSchema(context.getCatalogTable().getResolvedSchema()); checkArgument(physicalSchema.getPrimaryKey().isPresent(), "Primary key must be present"); checkPrimaryKey(physicalSchema.getPrimaryKey().get(), "Primary key must be _id field"); diff --git a/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableFactoryTest.java b/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableFactoryTest.java index e497b9e3d..71e0aa727 100644 --- a/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableFactoryTest.java +++ b/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableFactoryTest.java @@ -36,6 +36,7 @@ import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContex import org.apache.flink.util.ExceptionUtils; import com.ververica.cdc.debezium.DebeziumSourceFunction; +import com.ververica.cdc.debezium.utils.ResolvedSchemaUtils; import org.junit.Test; import java.time.ZoneId; @@ -169,7 +170,7 @@ public class MongoDBTableFactoryTest { MongoDBTableSource expectedSource = new MongoDBTableSource( - SCHEMA_WITH_METADATA, + ResolvedSchemaUtils.getPhysicalSchema(SCHEMA_WITH_METADATA), MY_HOSTS, USER, PASSWORD, diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactory.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactory.java index 25b42fa4d..242b1d52f 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactory.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactory.java @@ -62,6 +62,7 @@ import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOption import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.USERNAME; import static com.ververica.cdc.connectors.mysql.source.utils.ObjectUtils.doubleCompare; import static com.ververica.cdc.debezium.table.DebeziumOptions.getDebeziumProperties; +import static com.ververica.cdc.debezium.utils.ResolvedSchemaUtils.getPhysicalSchema; import static org.apache.flink.util.Preconditions.checkState; /** Factory for creating configured instance of {@link MySqlTableSource}. */ @@ -90,7 +91,8 @@ public class MySqlTableSourceFactory implements DynamicTableSourceFactory { int fetchSize = config.get(SCAN_SNAPSHOT_FETCH_SIZE); ZoneId serverTimeZone = ZoneId.of(config.get(SERVER_TIME_ZONE)); - ResolvedSchema physicalSchema = context.getCatalogTable().getResolvedSchema(); + ResolvedSchema physicalSchema = + getPhysicalSchema(context.getCatalogTable().getResolvedSchema()); String serverId = validateAndGetServerId(config); StartupOptions startupOptions = getStartupOptions(config); Duration connectTimeout = config.get(CONNECT_TIMEOUT); diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java index d7a082029..43a84a65e 100644 --- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java +++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java @@ -33,6 +33,7 @@ import org.apache.flink.table.factories.Factory; import org.apache.flink.table.factories.FactoryUtil; import org.apache.flink.util.ExceptionUtils; +import com.ververica.cdc.debezium.utils.ResolvedSchemaUtils; import org.junit.Test; import java.time.Duration; @@ -430,7 +431,7 @@ public class MySqlTableSourceFactoryTest { MySqlTableSource expectedSource = new MySqlTableSource( - SCHEMA_WITH_METADATA, + ResolvedSchemaUtils.getPhysicalSchema(SCHEMA_WITH_METADATA), 3306, MY_LOCALHOST, MY_DATABASE, diff --git a/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/table/PostgreSQLTableFactory.java b/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/table/PostgreSQLTableFactory.java index 2d618f871..6e3b09a6e 100644 --- a/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/table/PostgreSQLTableFactory.java +++ b/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/table/PostgreSQLTableFactory.java @@ -32,6 +32,7 @@ import java.util.Set; import static com.ververica.cdc.debezium.table.DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX; import static com.ververica.cdc.debezium.table.DebeziumOptions.getDebeziumProperties; +import static com.ververica.cdc.debezium.utils.ResolvedSchemaUtils.getPhysicalSchema; /** Factory for creating configured instance of {@link PostgreSQLTableSource}. */ public class PostgreSQLTableFactory implements DynamicTableSourceFactory { @@ -116,7 +117,8 @@ public class PostgreSQLTableFactory implements DynamicTableSourceFactory { int port = config.get(PORT); String pluginName = config.get(DECODING_PLUGIN_NAME); String slotName = config.get(SLOT_NAME); - ResolvedSchema physicalSchema = context.getCatalogTable().getResolvedSchema(); + ResolvedSchema physicalSchema = + getPhysicalSchema(context.getCatalogTable().getResolvedSchema()); return new PostgreSQLTableSource( physicalSchema, diff --git a/flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/table/PostgreSQLTableFactoryTest.java b/flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/table/PostgreSQLTableFactoryTest.java index 3b492e23b..f9f3db3a1 100644 --- a/flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/table/PostgreSQLTableFactoryTest.java +++ b/flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/table/PostgreSQLTableFactoryTest.java @@ -38,6 +38,7 @@ import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContex import org.apache.flink.util.ExceptionUtils; import com.ververica.cdc.debezium.DebeziumSourceFunction; +import com.ververica.cdc.debezium.utils.ResolvedSchemaUtils; import org.junit.Test; import java.util.ArrayList; @@ -150,7 +151,7 @@ public class PostgreSQLTableFactoryTest { actualSource = postgreSQLTableSource.copy(); PostgreSQLTableSource expectedSource = new PostgreSQLTableSource( - SCHEMA_WITH_METADATA, + ResolvedSchemaUtils.getPhysicalSchema(SCHEMA_WITH_METADATA), 5432, MY_LOCALHOST, MY_DATABASE, diff --git a/flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/table/SqlServerTableFactory.java b/flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/table/SqlServerTableFactory.java index 9ca6be693..fee965fd3 100644 --- a/flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/table/SqlServerTableFactory.java +++ b/flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/table/SqlServerTableFactory.java @@ -33,6 +33,7 @@ import java.util.Set; import static com.ververica.cdc.debezium.table.DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX; import static com.ververica.cdc.debezium.table.DebeziumOptions.getDebeziumProperties; +import static com.ververica.cdc.debezium.utils.ResolvedSchemaUtils.getPhysicalSchema; /** Factory for creating configured instance of {@link SqlServerTableSource}. */ public class SqlServerTableFactory implements DynamicTableSourceFactory { @@ -113,7 +114,8 @@ public class SqlServerTableFactory implements DynamicTableSourceFactory { ZoneId serverTimeZone = ZoneId.of(config.get(SERVER_TIME_ZONE)); int port = config.get(PORT); StartupOptions startupOptions = getStartupOptions(config); - ResolvedSchema physicalSchema = context.getCatalogTable().getResolvedSchema(); + ResolvedSchema physicalSchema = + getPhysicalSchema(context.getCatalogTable().getResolvedSchema()); return new SqlServerTableSource( physicalSchema, diff --git a/flink-connector-sqlserver-cdc/src/test/java/com/ververica/cdc/connectors/sqlserver/table/SqlServerTableFactoryTest.java b/flink-connector-sqlserver-cdc/src/test/java/com/ververica/cdc/connectors/sqlserver/table/SqlServerTableFactoryTest.java index 4e2721e14..c4b4aa334 100644 --- a/flink-connector-sqlserver-cdc/src/test/java/com/ververica/cdc/connectors/sqlserver/table/SqlServerTableFactoryTest.java +++ b/flink-connector-sqlserver-cdc/src/test/java/com/ververica/cdc/connectors/sqlserver/table/SqlServerTableFactoryTest.java @@ -35,6 +35,7 @@ import org.apache.flink.table.factories.FactoryUtil; import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext; import com.ververica.cdc.debezium.DebeziumSourceFunction; +import com.ververica.cdc.debezium.utils.ResolvedSchemaUtils; import org.junit.Test; import java.time.ZoneId; @@ -144,7 +145,7 @@ public class SqlServerTableFactoryTest { actualSource = sqlServerTableSource.copy(); SqlServerTableSource expectedSource = new SqlServerTableSource( - SCHEMA_WITH_METADATA, + ResolvedSchemaUtils.getPhysicalSchema(SCHEMA_WITH_METADATA), 1433, MY_LOCALHOST, MY_DATABASE, diff --git a/flink-connector-tidb-cdc/src/main/java/com/ververica/cdc/connectors/tidb/table/TiDBTableSourceFactory.java b/flink-connector-tidb-cdc/src/main/java/com/ververica/cdc/connectors/tidb/table/TiDBTableSourceFactory.java index 40d3dd7cf..49684154a 100644 --- a/flink-connector-tidb-cdc/src/main/java/com/ververica/cdc/connectors/tidb/table/TiDBTableSourceFactory.java +++ b/flink-connector-tidb-cdc/src/main/java/com/ververica/cdc/connectors/tidb/table/TiDBTableSourceFactory.java @@ -39,6 +39,7 @@ import static com.ververica.cdc.connectors.tidb.TDBSourceOptions.TIKV_BATCH_GET_ import static com.ververica.cdc.connectors.tidb.TDBSourceOptions.TIKV_BATCH_SCAN_CONCURRENCY; import static com.ververica.cdc.connectors.tidb.TDBSourceOptions.TIKV_GRPC_SCAN_TIMEOUT; import static com.ververica.cdc.connectors.tidb.TDBSourceOptions.TIKV_GRPC_TIMEOUT; +import static com.ververica.cdc.debezium.utils.ResolvedSchemaUtils.getPhysicalSchema; /** Factory for creating configured instance of {@link TiDBTableSource}. */ public class TiDBTableSourceFactory implements DynamicTableSourceFactory { @@ -55,7 +56,8 @@ public class TiDBTableSourceFactory implements DynamicTableSourceFactory { String tableName = config.get(TABLE_NAME); String pdAddresses = config.get(PD_ADDRESSES); StartupOptions startupOptions = getStartupOptions(config); - ResolvedSchema physicalSchema = context.getCatalogTable().getResolvedSchema(); + ResolvedSchema physicalSchema = + getPhysicalSchema(context.getCatalogTable().getResolvedSchema()); return new TiDBTableSource( physicalSchema,