[common] Apply filter on ResolvedSchema to keep physical columns only (#1097)

Co-authored-by: Qingsheng Ren <renqschn@gmail.com>
Authored by Leonard Xu 3 years ago, committed by GitHub
parent 27979bf908
commit df4f6f4660

@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.debezium.utils;

import org.apache.flink.annotation.Internal;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedSchema;

import java.util.stream.Collectors;

/** Utilities for {@link ResolvedSchema}. */
@Internal
public class ResolvedSchemaUtils {

    private ResolvedSchemaUtils() {}

    /** Returns a {@link ResolvedSchema} that consists of the physical columns only. */
    public static ResolvedSchema getPhysicalSchema(ResolvedSchema resolvedSchema) {
        return new ResolvedSchema(
                resolvedSchema.getColumns().stream()
                        .filter(Column::isPhysical)
                        .collect(Collectors.toList()),
                resolvedSchema.getWatermarkSpecs(),
                resolvedSchema.getPrimaryKey().orElse(null));
    }
}
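
A minimal usage sketch of the new utility (the example class and column names below are illustrative and not part of this patch; it only relies on the Flink table API factories Column.physical, Column.metadata, and ResolvedSchema.of): a schema that also carries a virtual metadata column is reduced to its physical columns.

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedSchema;

import com.ververica.cdc.debezium.utils.ResolvedSchemaUtils;

public class PhysicalSchemaExample {
    public static void main(String[] args) {
        // A schema as a table factory might receive it: two physical columns plus a
        // virtual metadata column (hypothetical column names, for illustration only).
        ResolvedSchema schemaWithMetadata =
                ResolvedSchema.of(
                        Column.physical("id", DataTypes.BIGINT().notNull()),
                        Column.physical("name", DataTypes.STRING()),
                        Column.metadata("op_ts", DataTypes.TIMESTAMP_LTZ(3), "op_ts", true));

        // Metadata (and computed) columns are dropped; only physical columns remain.
        ResolvedSchema physicalSchema =
                ResolvedSchemaUtils.getPhysicalSchema(schemaWithMetadata);

        System.out.println(physicalSchema.getColumnNames()); // [id, name]
    }
}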

@@ -35,6 +35,7 @@ import java.util.Set;
 import static com.ververica.cdc.connectors.mongodb.MongoDBSource.ERROR_TOLERANCE_NONE;
 import static com.ververica.cdc.connectors.mongodb.MongoDBSource.POLL_AWAIT_TIME_MILLIS_DEFAULT;
 import static com.ververica.cdc.connectors.mongodb.MongoDBSource.POLL_MAX_BATCH_SIZE_DEFAULT;
+import static com.ververica.cdc.debezium.utils.ResolvedSchemaUtils.getPhysicalSchema;
 import static org.apache.flink.util.Preconditions.checkArgument;

 /** Factory for creating configured instance of {@link MongoDBTableSource}. */
@@ -214,7 +215,8 @@ public class MongoDBTableSourceFactory implements DynamicTableSourceFactory {
                         ? ZoneId.systemDefault()
                         : ZoneId.of(zoneId);
-        ResolvedSchema physicalSchema = context.getCatalogTable().getResolvedSchema();
+        ResolvedSchema physicalSchema =
+                getPhysicalSchema(context.getCatalogTable().getResolvedSchema());
         checkArgument(physicalSchema.getPrimaryKey().isPresent(), "Primary key must be present");
         checkPrimaryKey(physicalSchema.getPrimaryKey().get(), "Primary key must be _id field");
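
For the connector factories, the practical effect is that validation now runs against the physical projection, so declaring virtual metadata columns in the DDL no longer interferes with checks such as the MongoDB primary-key constraint above. A hedged sketch under that assumption (the schema, constraint name, and metadata key below are made up for illustration):

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.UniqueConstraint;

import java.util.Arrays;
import java.util.Collections;

import static com.ververica.cdc.debezium.utils.ResolvedSchemaUtils.getPhysicalSchema;

public class PrimaryKeyOnPhysicalSchemaSketch {
    public static void main(String[] args) {
        // A resolved schema as a user DDL might produce it: the physical _id key column
        // plus one virtual metadata column (names are illustrative only).
        ResolvedSchema declaredSchema =
                new ResolvedSchema(
                        Arrays.asList(
                                Column.physical("_id", DataTypes.STRING().notNull()),
                                Column.metadata(
                                        "collection_name", DataTypes.STRING(), "collection_name", true)),
                        Collections.emptyList(),
                        UniqueConstraint.primaryKey("pk", Collections.singletonList("_id")));

        // Validation happens on the physical projection, where the primary key is still present.
        ResolvedSchema physicalSchema = getPhysicalSchema(declaredSchema);
        System.out.println(physicalSchema.getPrimaryKey().isPresent()); // true
        System.out.println(physicalSchema.getColumnNames());            // [_id]
    }
}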

@@ -36,6 +36,7 @@ import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContex
 import org.apache.flink.util.ExceptionUtils;

 import com.ververica.cdc.debezium.DebeziumSourceFunction;
+import com.ververica.cdc.debezium.utils.ResolvedSchemaUtils;
 import org.junit.Test;

 import java.time.ZoneId;
@@ -169,7 +170,7 @@ public class MongoDBTableFactoryTest {
         MongoDBTableSource expectedSource =
                 new MongoDBTableSource(
-                        SCHEMA_WITH_METADATA,
+                        ResolvedSchemaUtils.getPhysicalSchema(SCHEMA_WITH_METADATA),
                         MY_HOSTS,
                         USER,
                         PASSWORD,

@@ -62,6 +62,7 @@ import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOption
 import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.USERNAME;
 import static com.ververica.cdc.connectors.mysql.source.utils.ObjectUtils.doubleCompare;
 import static com.ververica.cdc.debezium.table.DebeziumOptions.getDebeziumProperties;
+import static com.ververica.cdc.debezium.utils.ResolvedSchemaUtils.getPhysicalSchema;
 import static org.apache.flink.util.Preconditions.checkState;

 /** Factory for creating configured instance of {@link MySqlTableSource}. */
@@ -90,7 +91,8 @@ public class MySqlTableSourceFactory implements DynamicTableSourceFactory {
         int fetchSize = config.get(SCAN_SNAPSHOT_FETCH_SIZE);
         ZoneId serverTimeZone = ZoneId.of(config.get(SERVER_TIME_ZONE));
-        ResolvedSchema physicalSchema = context.getCatalogTable().getResolvedSchema();
+        ResolvedSchema physicalSchema =
+                getPhysicalSchema(context.getCatalogTable().getResolvedSchema());
         String serverId = validateAndGetServerId(config);
         StartupOptions startupOptions = getStartupOptions(config);
         Duration connectTimeout = config.get(CONNECT_TIMEOUT);

@@ -33,6 +33,7 @@ import org.apache.flink.table.factories.Factory;
 import org.apache.flink.table.factories.FactoryUtil;
 import org.apache.flink.util.ExceptionUtils;

+import com.ververica.cdc.debezium.utils.ResolvedSchemaUtils;
 import org.junit.Test;

 import java.time.Duration;
@@ -430,7 +431,7 @@ public class MySqlTableSourceFactoryTest {
         MySqlTableSource expectedSource =
                 new MySqlTableSource(
-                        SCHEMA_WITH_METADATA,
+                        ResolvedSchemaUtils.getPhysicalSchema(SCHEMA_WITH_METADATA),
                         3306,
                         MY_LOCALHOST,
                         MY_DATABASE,

@@ -32,6 +32,7 @@ import java.util.Set;
 import static com.ververica.cdc.debezium.table.DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX;
 import static com.ververica.cdc.debezium.table.DebeziumOptions.getDebeziumProperties;
+import static com.ververica.cdc.debezium.utils.ResolvedSchemaUtils.getPhysicalSchema;

 /** Factory for creating configured instance of {@link PostgreSQLTableSource}. */
 public class PostgreSQLTableFactory implements DynamicTableSourceFactory {
@@ -116,7 +117,8 @@ public class PostgreSQLTableFactory implements DynamicTableSourceFactory {
         int port = config.get(PORT);
         String pluginName = config.get(DECODING_PLUGIN_NAME);
         String slotName = config.get(SLOT_NAME);
-        ResolvedSchema physicalSchema = context.getCatalogTable().getResolvedSchema();
+        ResolvedSchema physicalSchema =
+                getPhysicalSchema(context.getCatalogTable().getResolvedSchema());
         return new PostgreSQLTableSource(
                 physicalSchema,

@@ -38,6 +38,7 @@ import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContex
 import org.apache.flink.util.ExceptionUtils;

 import com.ververica.cdc.debezium.DebeziumSourceFunction;
+import com.ververica.cdc.debezium.utils.ResolvedSchemaUtils;
 import org.junit.Test;

 import java.util.ArrayList;
@@ -150,7 +151,7 @@ public class PostgreSQLTableFactoryTest {
         actualSource = postgreSQLTableSource.copy();
         PostgreSQLTableSource expectedSource =
                 new PostgreSQLTableSource(
-                        SCHEMA_WITH_METADATA,
+                        ResolvedSchemaUtils.getPhysicalSchema(SCHEMA_WITH_METADATA),
                         5432,
                         MY_LOCALHOST,
                         MY_DATABASE,

@@ -33,6 +33,7 @@ import java.util.Set;
 import static com.ververica.cdc.debezium.table.DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX;
 import static com.ververica.cdc.debezium.table.DebeziumOptions.getDebeziumProperties;
+import static com.ververica.cdc.debezium.utils.ResolvedSchemaUtils.getPhysicalSchema;

 /** Factory for creating configured instance of {@link SqlServerTableSource}. */
 public class SqlServerTableFactory implements DynamicTableSourceFactory {
@@ -113,7 +114,8 @@ public class SqlServerTableFactory implements DynamicTableSourceFactory {
         ZoneId serverTimeZone = ZoneId.of(config.get(SERVER_TIME_ZONE));
         int port = config.get(PORT);
         StartupOptions startupOptions = getStartupOptions(config);
-        ResolvedSchema physicalSchema = context.getCatalogTable().getResolvedSchema();
+        ResolvedSchema physicalSchema =
+                getPhysicalSchema(context.getCatalogTable().getResolvedSchema());
         return new SqlServerTableSource(
                 physicalSchema,

@@ -35,6 +35,7 @@ import org.apache.flink.table.factories.FactoryUtil;
 import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;

 import com.ververica.cdc.debezium.DebeziumSourceFunction;
+import com.ververica.cdc.debezium.utils.ResolvedSchemaUtils;
 import org.junit.Test;

 import java.time.ZoneId;
@@ -144,7 +145,7 @@ public class SqlServerTableFactoryTest {
         actualSource = sqlServerTableSource.copy();
         SqlServerTableSource expectedSource =
                 new SqlServerTableSource(
-                        SCHEMA_WITH_METADATA,
+                        ResolvedSchemaUtils.getPhysicalSchema(SCHEMA_WITH_METADATA),
                         1433,
                         MY_LOCALHOST,
                         MY_DATABASE,

@@ -39,6 +39,7 @@ import static com.ververica.cdc.connectors.tidb.TDBSourceOptions.TIKV_BATCH_GET_
 import static com.ververica.cdc.connectors.tidb.TDBSourceOptions.TIKV_BATCH_SCAN_CONCURRENCY;
 import static com.ververica.cdc.connectors.tidb.TDBSourceOptions.TIKV_GRPC_SCAN_TIMEOUT;
 import static com.ververica.cdc.connectors.tidb.TDBSourceOptions.TIKV_GRPC_TIMEOUT;
+import static com.ververica.cdc.debezium.utils.ResolvedSchemaUtils.getPhysicalSchema;

 /** Factory for creating configured instance of {@link TiDBTableSource}. */
 public class TiDBTableSourceFactory implements DynamicTableSourceFactory {
@@ -55,7 +56,8 @@ public class TiDBTableSourceFactory implements DynamicTableSourceFactory {
         String tableName = config.get(TABLE_NAME);
         String pdAddresses = config.get(PD_ADDRESSES);
         StartupOptions startupOptions = getStartupOptions(config);
-        ResolvedSchema physicalSchema = context.getCatalogTable().getResolvedSchema();
+        ResolvedSchema physicalSchema =
+                getPhysicalSchema(context.getCatalogTable().getResolvedSchema());
         return new TiDBTableSource(
                 physicalSchema,
