[common] Use ResolvedSchema instead of deprecated TableSchema API (#782)

pull/687/head
gongzhongqiang authored 3 years ago, committed by GitHub
parent 40034abe01
commit 1b34887c08
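
Summary of the change: every hunk below follows the same migration away from the deprecated TableSchema API. A minimal before/after sketch of the factory-side pattern (a fragment, not a line from this commit; it mirrors the factory hunks below):

    // Before: TableSchema is deprecated in newer Flink versions in favor of
    // Schema / ResolvedSchema
    TableSchema physicalSchema =
            TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());

    // After: the planner already resolves the catalog table, so the factory can
    // take the ResolvedSchema (columns, constraints, watermarks) directly
    ResolvedSchema physicalSchema = context.getCatalogTable().getResolvedSchema();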

MongoDBTableSource.java
@@ -19,7 +19,7 @@
 package com.ververica.cdc.connectors.mongodb.table;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.connector.source.ScanTableSource;
@@ -54,7 +54,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetadata {
-    private final TableSchema physicalSchema;
+    private final ResolvedSchema physicalSchema;
     private final String hosts;
     private final String connectionOptions;
     private final String username;
@@ -83,7 +83,7 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetadata {
     protected List<String> metadataKeys;
     public MongoDBTableSource(
-            TableSchema physicalSchema,
+            ResolvedSchema physicalSchema,
             String hosts,
             @Nullable String username,
             @Nullable String password,
@@ -136,7 +136,7 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetadata {
                 (RowType) physicalSchema.toPhysicalRowDataType().getLogicalType();
         MetadataConverter[] metadataConverters = getMetadataConverters();
         TypeInformation<RowData> typeInfo =
-                scanContext.createTypeInformation(physicalSchema.toRowDataType());
+                scanContext.createTypeInformation(physicalSchema.toPhysicalRowDataType());
         DebeziumDeserializationSchema<RowData> deserializer =
                 new MongoDBConnectorDeserializationSchema(

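A note on the last hunk above: ResolvedSchema distinguishes the physical row type (what the connector actually reads) from the source row type (physical plus computed and metadata columns), so toPhysicalRowDataType() is the right input for the deserializer's TypeInformation. A small illustration, assuming one physical and one virtual metadata column; the names are illustrative only:

    ResolvedSchema schema =
            ResolvedSchema.of(
                    Column.physical("_id", DataTypes.STRING().notNull()),
                    Column.metadata("op_ts", DataTypes.TIMESTAMP(3), "op_ts", true));
    schema.toPhysicalRowDataType(); // ROW<_id STRING NOT NULL>, metadata column excluded
    schema.toSourceRowDataType();   // ROW<_id STRING NOT NULL, op_ts TIMESTAMP(3)>
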
MongoDBTableSourceFactory.java
@@ -21,13 +21,12 @@ package com.ververica.cdc.connectors.mongodb.table;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.config.TableConfigOptions;
-import org.apache.flink.table.api.constraints.UniqueConstraint;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.UniqueConstraint;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.factories.DynamicTableSourceFactory;
 import org.apache.flink.table.factories.FactoryUtil;
-import org.apache.flink.table.utils.TableSchemaUtils;
 import java.time.ZoneId;
 import java.util.HashSet;
@@ -208,8 +207,7 @@ public class MongoDBTableSourceFactory implements DynamicTableSourceFactory {
                         ? ZoneId.systemDefault()
                         : ZoneId.of(zoneId);
-        TableSchema physicalSchema =
-                TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
+        ResolvedSchema physicalSchema = context.getCatalogTable().getResolvedSchema();
         checkArgument(physicalSchema.getPrimaryKey().isPresent(), "Primary key must be present");
         checkPrimaryKey(physicalSchema.getPrimaryKey().get(), "Primary key must be _id field");

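The import swap above also moves the primary-key constraint type from org.apache.flink.table.api.constraints.UniqueConstraint to org.apache.flink.table.catalog.UniqueConstraint; the shape of the check stays the same. A sketch of the _id validation, with a hypothetical helper name:

    static void checkPrimaryKeyIsId(ResolvedSchema schema) {
        // getPrimaryKey() returns Optional<org.apache.flink.table.catalog.UniqueConstraint>
        UniqueConstraint pk =
                schema.getPrimaryKey()
                        .orElseThrow(() -> new IllegalArgumentException("Primary key must be present"));
        if (pk.getColumns().size() != 1 || !pk.getColumns().contains("_id")) {
            throw new IllegalArgumentException("Primary key must be _id field");
        }
    }
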
MongoDBTableFactoryTest.java
@@ -20,6 +20,7 @@ package com.ververica.cdc.connectors.mongodb.table;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.ObjectIdentifier;
@@ -28,7 +29,6 @@ import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.catalog.UniqueConstraint;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.factories.FactoryUtil;
-import org.apache.flink.table.utils.TableSchemaUtils;
 import org.apache.flink.util.ExceptionUtils;
 import org.junit.Test;
@@ -43,7 +43,6 @@ import java.util.Map;
 import static com.ververica.cdc.connectors.mongodb.MongoDBSource.ERROR_TOLERANCE_ALL;
 import static com.ververica.cdc.connectors.mongodb.MongoDBSource.POLL_AWAIT_TIME_MILLIS_DEFAULT;
 import static com.ververica.cdc.connectors.mongodb.MongoDBSource.POLL_MAX_BATCH_SIZE_DEFAULT;
-import static org.apache.flink.table.api.TableSchema.fromResolvedSchema;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -93,7 +92,7 @@ public class MongoDBTableFactoryTest {
         DynamicTableSource actualSource = createTableSource(SCHEMA, properties);
         MongoDBTableSource expectedSource =
                 new MongoDBTableSource(
-                        TableSchemaUtils.getPhysicalSchema(fromResolvedSchema(SCHEMA)),
+                        SCHEMA,
                         MY_HOSTS,
                         USER,
                         PASSWORD,
@@ -130,7 +129,7 @@ public class MongoDBTableFactoryTest {
         MongoDBTableSource expectedSource =
                 new MongoDBTableSource(
-                        TableSchemaUtils.getPhysicalSchema(fromResolvedSchema(SCHEMA)),
+                        SCHEMA,
                         MY_HOSTS,
                         USER,
                         PASSWORD,
@@ -164,7 +163,7 @@ public class MongoDBTableFactoryTest {
         MongoDBTableSource expectedSource =
                 new MongoDBTableSource(
-                        TableSchemaUtils.getPhysicalSchema(fromResolvedSchema(SCHEMA)),
+                        SCHEMA_WITH_METADATA,
                         MY_HOSTS,
                         USER,
                         PASSWORD,
@@ -222,7 +221,7 @@ public class MongoDBTableFactoryTest {
                 ObjectIdentifier.of("default", "default", "t1"),
                 new ResolvedCatalogTable(
                         CatalogTable.of(
-                                fromResolvedSchema(schema).toSchema(),
+                                Schema.newBuilder().fromResolvedSchema(schema).build(),
                                 "mock source",
                                 new ArrayList<>(),
                                 options),

MySqlSource.java
@@ -235,7 +235,7 @@ public class MySqlSource {
         }
         if (dbzProperties != null) {
-            dbzProperties.forEach(props::put);
+            props.putAll(dbzProperties);
             // Add default configurations for compatibility when set the legacy mysql connector
             // implementation
             if (LEGACY_IMPLEMENTATION_VALUE.equals(

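The dbzProperties change above (repeated in MySqlSourceConfigFactory, OracleSource, and PostgreSQLSource below) is behavior-preserving: copying entries one by one with forEach(props::put) and calling props.putAll(dbzProperties) do the same thing, since Properties extends Hashtable. Minimal illustration:

    Properties props = new Properties();
    Properties dbzProperties = new Properties();
    dbzProperties.setProperty("snapshot.mode", "never");
    props.putAll(dbzProperties);        // equivalent to dbzProperties.forEach(props::put)
    props.getProperty("snapshot.mode"); // "never"
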
MySqlSourceConfigFactory.java
@@ -288,7 +288,7 @@ public class MySqlSourceConfigFactory implements Serializable {
         // override the user-defined debezium properties
         if (dbzProperties != null) {
-            dbzProperties.forEach(props::put);
+            props.putAll(dbzProperties);
         }
         return new MySqlSourceConfig(

MySqlTableSource.java
@@ -19,7 +19,7 @@
 package com.ververica.cdc.connectors.mysql.table;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.connector.source.ScanTableSource;
@@ -58,7 +58,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadata {
-    private final TableSchema physicalSchema;
+    private final ResolvedSchema physicalSchema;
     private final int port;
     private final String hostname;
     private final String database;
@@ -91,7 +91,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadata {
     protected List<String> metadataKeys;
     public MySqlTableSource(
-            TableSchema physicalSchema,
+            ResolvedSchema physicalSchema,
             int port,
             String hostname,
             String database,
@@ -136,7 +136,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadata {
     }
     public MySqlTableSource(
-            TableSchema physicalSchema,
+            ResolvedSchema physicalSchema,
             int port,
             String hostname,
             String database,

MySqlTableSourceFactory.java
@@ -20,12 +20,11 @@ package com.ververica.cdc.connectors.mysql.table;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.factories.DynamicTableSourceFactory;
 import org.apache.flink.table.factories.FactoryUtil;
-import org.apache.flink.table.utils.TableSchemaUtils;
 import org.apache.flink.util.Preconditions;
 import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions;
@@ -89,8 +88,7 @@ public class MySqlTableSourceFactory implements DynamicTableSourceFactory {
         int fetchSize = config.get(SCAN_SNAPSHOT_FETCH_SIZE);
         ZoneId serverTimeZone = ZoneId.of(config.get(SERVER_TIME_ZONE));
-        TableSchema physicalSchema =
-                TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
+        ResolvedSchema physicalSchema = context.getCatalogTable().getResolvedSchema();
         String serverId = validateAndGetServerId(config);
         StartupOptions startupOptions = getStartupOptions(config);
         Duration connectTimeout = config.get(CONNECT_TIMEOUT);
@@ -214,7 +212,7 @@ public class MySqlTableSourceFactory implements DynamicTableSourceFactory {
         }
     }
-    private void validatePrimaryKeyIfEnableParallel(TableSchema physicalSchema) {
+    private void validatePrimaryKeyIfEnableParallel(ResolvedSchema physicalSchema) {
         if (!physicalSchema.getPrimaryKey().isPresent()) {
             throw new ValidationException(
                     String.format(

MySqlTableSourceFactoryTest.java
@@ -21,6 +21,7 @@ package com.ververica.cdc.connectors.mysql.table;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.ObjectIdentifier;
@@ -30,7 +31,6 @@ import org.apache.flink.table.catalog.UniqueConstraint;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.factories.Factory;
 import org.apache.flink.table.factories.FactoryUtil;
-import org.apache.flink.table.utils.TableSchemaUtils;
 import org.apache.flink.util.ExceptionUtils;
 import org.junit.Test;
@@ -54,7 +54,6 @@ import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOption
 import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND;
 import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND;
 import static org.apache.flink.core.testutils.FlinkMatchers.containsMessage;
-import static org.apache.flink.table.api.TableSchema.fromResolvedSchema;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
@@ -101,7 +100,7 @@ public class MySqlTableSourceFactoryTest {
         DynamicTableSource actualSource = createTableSource(properties);
         MySqlTableSource expectedSource =
                 new MySqlTableSource(
-                        TableSchemaUtils.getPhysicalSchema(fromResolvedSchema(SCHEMA)),
+                        SCHEMA,
                         3306,
                         MY_LOCALHOST,
                         MY_DATABASE,
@@ -140,7 +139,7 @@ public class MySqlTableSourceFactoryTest {
         DynamicTableSource actualSource = createTableSource(properties);
         MySqlTableSource expectedSource =
                 new MySqlTableSource(
-                        TableSchemaUtils.getPhysicalSchema(fromResolvedSchema(SCHEMA)),
+                        SCHEMA,
                         3306,
                         MY_LOCALHOST,
                         MY_DATABASE,
@@ -176,7 +175,7 @@ public class MySqlTableSourceFactoryTest {
         DynamicTableSource actualSource = createTableSource(properties);
         MySqlTableSource expectedSource =
                 new MySqlTableSource(
-                        TableSchemaUtils.getPhysicalSchema(fromResolvedSchema(SCHEMA)),
+                        SCHEMA,
                         3306,
                         MY_LOCALHOST,
                         MY_DATABASE,
@@ -210,7 +209,7 @@ public class MySqlTableSourceFactoryTest {
         DynamicTableSource actualSource = createTableSource(properties);
         MySqlTableSource expectedSource =
                 new MySqlTableSource(
-                        TableSchemaUtils.getPhysicalSchema(fromResolvedSchema(SCHEMA)),
+                        SCHEMA,
                         3306,
                         MY_LOCALHOST,
                         MY_DATABASE,
@@ -247,7 +246,7 @@ public class MySqlTableSourceFactoryTest {
         dbzProperties.put("snapshot.mode", "never");
         MySqlTableSource expectedSource =
                 new MySqlTableSource(
-                        TableSchemaUtils.getPhysicalSchema(fromResolvedSchema(SCHEMA)),
+                        SCHEMA,
                         3307,
                         MY_LOCALHOST,
                         MY_DATABASE,
@@ -304,7 +303,7 @@ public class MySqlTableSourceFactoryTest {
         DynamicTableSource actualSource = createTableSource(properties);
         MySqlTableSource expectedSource =
                 new MySqlTableSource(
-                        TableSchemaUtils.getPhysicalSchema(fromResolvedSchema(SCHEMA)),
+                        SCHEMA,
                         3306,
                         MY_LOCALHOST,
                         MY_DATABASE,
@@ -369,7 +368,7 @@ public class MySqlTableSourceFactoryTest {
         DynamicTableSource actualSource = createTableSource(properties);
         MySqlTableSource expectedSource =
                 new MySqlTableSource(
-                        TableSchemaUtils.getPhysicalSchema(fromResolvedSchema(SCHEMA)),
+                        SCHEMA,
                         3306,
                         MY_LOCALHOST,
                         MY_DATABASE,
@@ -406,8 +405,7 @@ public class MySqlTableSourceFactoryTest {
         MySqlTableSource expectedSource =
                 new MySqlTableSource(
-                        TableSchemaUtils.getPhysicalSchema(
-                                fromResolvedSchema(SCHEMA_WITH_METADATA)),
+                        SCHEMA_WITH_METADATA,
                         3306,
                         MY_LOCALHOST,
                         MY_DATABASE,
@@ -646,7 +644,7 @@ public class MySqlTableSourceFactoryTest {
                 ObjectIdentifier.of("default", "default", "t1"),
                 new ResolvedCatalogTable(
                         CatalogTable.of(
-                                fromResolvedSchema(schema).toSchema(),
+                                Schema.newBuilder().fromResolvedSchema(schema).build(),
                                 "mock source",
                                 new ArrayList<>(),
                                 options),

OracleSource.java
@@ -167,7 +167,7 @@ public class OracleSource {
         }
         if (dbzProperties != null) {
-            dbzProperties.forEach(props::put);
+            props.putAll(dbzProperties);
         }
         return new DebeziumSourceFunction<>(

OracleTableSource.java
@@ -19,7 +19,7 @@
 package com.ververica.cdc.connectors.oracle.table;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.connector.source.ScanTableSource;
@@ -52,7 +52,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class OracleTableSource implements ScanTableSource, SupportsReadingMetadata {
-    private final TableSchema physicalSchema;
+    private final ResolvedSchema physicalSchema;
     private final int port;
     private final String hostname;
     private final String database;
@@ -74,7 +74,7 @@ public class OracleTableSource implements ScanTableSource, SupportsReadingMetadata {
     protected List<String> metadataKeys;
     public OracleTableSource(
-            TableSchema physicalSchema,
+            ResolvedSchema physicalSchema,
             int port,
             String hostname,
             String database,
@@ -113,8 +113,7 @@ public class OracleTableSource implements ScanTableSource, SupportsReadingMetadata {
         RowType physicalDataType =
                 (RowType) physicalSchema.toPhysicalRowDataType().getLogicalType();
         MetadataConverter[] metadataConverters = getMetadataConverters();
-        TypeInformation<RowData> typeInfo =
-                scanContext.createTypeInformation(physicalSchema.toRowDataType());
+        TypeInformation<RowData> typeInfo = scanContext.createTypeInformation(producedDataType);
         DebeziumDeserializationSchema<RowData> deserializer =
                 RowDataDebeziumDeserializeSchema.newBuilder()

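Unlike the MongoDB source above, which builds its TypeInformation from the physical row type, the Oracle source (and the Postgres source below) now passes producedDataType to createTypeInformation. Assuming producedDataType is the field these sources keep for SupportsReadingMetadata (the physical type, widened when metadata keys are applied), the produced type is what the emitted rows actually contain:

    // producedDataType reflects physical columns plus any applied metadata columns
    TypeInformation<RowData> typeInfo = scanContext.createTypeInformation(producedDataType);
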
OracleTableSourceFactory.java
@@ -21,12 +21,11 @@ package com.ververica.cdc.connectors.oracle.table;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.factories.DynamicTableSourceFactory;
 import org.apache.flink.table.factories.FactoryUtil;
-import org.apache.flink.table.utils.TableSchemaUtils;
 import com.ververica.cdc.debezium.table.DebeziumOptions;
@@ -110,8 +109,7 @@ public class OracleTableSourceFactory implements DynamicTableSourceFactory {
         String schemaName = config.get(SCHEMA_NAME);
         int port = config.get(PORT);
         StartupOptions startupOptions = getStartupOptions(config);
-        TableSchema physicalSchema =
-                TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
+        ResolvedSchema physicalSchema = context.getCatalogTable().getResolvedSchema();
         return new OracleTableSource(
                 physicalSchema,

OracleTableSourceFactoryTest.java
@@ -21,6 +21,7 @@ package com.ververica.cdc.connectors.oracle.table;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.ObjectIdentifier;
@@ -30,7 +31,6 @@ import org.apache.flink.table.catalog.UniqueConstraint;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.factories.Factory;
 import org.apache.flink.table.factories.FactoryUtil;
-import org.apache.flink.table.utils.TableSchemaUtils;
 import org.apache.flink.util.ExceptionUtils;
 import org.junit.Test;
@@ -42,7 +42,6 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
-import static org.apache.flink.table.api.TableSchema.fromResolvedSchema;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -94,7 +93,7 @@ public class OracleTableSourceFactoryTest {
         DynamicTableSource actualSource = createTableSource(properties);
         OracleTableSource expectedSource =
                 new OracleTableSource(
-                        TableSchemaUtils.getPhysicalSchema(fromResolvedSchema(SCHEMA)),
+                        SCHEMA,
                         1521,
                         MY_LOCALHOST,
                         MY_DATABASE,
@@ -118,7 +117,7 @@ public class OracleTableSourceFactoryTest {
         dbzProperties.put("snapshot.mode", "initial");
         OracleTableSource expectedSource =
                 new OracleTableSource(
-                        TableSchemaUtils.getPhysicalSchema(fromResolvedSchema(SCHEMA)),
+                        SCHEMA,
                         1521,
                         MY_LOCALHOST,
                         MY_DATABASE,
@@ -140,7 +139,7 @@ public class OracleTableSourceFactoryTest {
         DynamicTableSource actualSource = createTableSource(properties);
         OracleTableSource expectedSource =
                 new OracleTableSource(
-                        TableSchemaUtils.getPhysicalSchema(fromResolvedSchema(SCHEMA)),
+                        SCHEMA,
                         1521,
                         MY_LOCALHOST,
                         MY_DATABASE,
@@ -162,7 +161,7 @@ public class OracleTableSourceFactoryTest {
         DynamicTableSource actualSource = createTableSource(properties);
         OracleTableSource expectedSource =
                 new OracleTableSource(
-                        TableSchemaUtils.getPhysicalSchema(fromResolvedSchema(SCHEMA)),
+                        SCHEMA,
                         1521,
                         MY_LOCALHOST,
                         MY_DATABASE,
@@ -188,8 +187,7 @@ public class OracleTableSourceFactoryTest {
         actualSource = oracleTableSource.copy();
         OracleTableSource expectedSource =
                 new OracleTableSource(
-                        TableSchemaUtils.getPhysicalSchema(
-                                fromResolvedSchema(SCHEMA_WITH_METADATA)),
+                        SCHEMA_WITH_METADATA,
                         1521,
                         MY_LOCALHOST,
                         MY_DATABASE,
@@ -293,7 +291,7 @@ public class OracleTableSourceFactoryTest {
                 ObjectIdentifier.of("default", "default", "t1"),
                 new ResolvedCatalogTable(
                         CatalogTable.of(
-                                fromResolvedSchema(schema).toSchema(),
+                                Schema.newBuilder().fromResolvedSchema(schema).build(),
                                 "mock source",
                                 new ArrayList<>(),
                                 options),

PostgreSQLSource.java
@@ -175,7 +175,7 @@ public class PostgreSQLSource {
         }
         if (dbzProperties != null) {
-            dbzProperties.forEach(props::put);
+            props.putAll(dbzProperties);
         }
         return new DebeziumSourceFunction<>(

PostgreSQLTableFactory.java
@@ -21,12 +21,11 @@ package com.ververica.cdc.connectors.postgres.table;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.factories.DynamicTableFactory;
 import org.apache.flink.table.factories.DynamicTableSourceFactory;
 import org.apache.flink.table.factories.FactoryUtil;
-import org.apache.flink.table.utils.TableSchemaUtils;
 import java.util.HashSet;
 import java.util.Set;
@@ -117,8 +116,7 @@ public class PostgreSQLTableFactory implements DynamicTableSourceFactory {
         int port = config.get(PORT);
         String pluginName = config.get(DECODING_PLUGIN_NAME);
         String slotName = config.get(SLOT_NAME);
-        TableSchema physicalSchema =
-                TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
+        ResolvedSchema physicalSchema = context.getCatalogTable().getResolvedSchema();
         return new PostgreSQLTableSource(
                 physicalSchema,

PostgreSQLTableSource.java
@@ -19,7 +19,7 @@
 package com.ververica.cdc.connectors.postgres.table;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.connector.source.ScanTableSource;
@@ -52,7 +52,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class PostgreSQLTableSource implements ScanTableSource, SupportsReadingMetadata {
-    private final TableSchema physicalSchema;
+    private final ResolvedSchema physicalSchema;
     private final int port;
     private final String hostname;
     private final String database;
@@ -75,7 +75,7 @@ public class PostgreSQLTableSource implements ScanTableSource, SupportsReadingMetadata {
     protected List<String> metadataKeys;
     public PostgreSQLTableSource(
-            TableSchema physicalSchema,
+            ResolvedSchema physicalSchema,
             int port,
             String hostname,
             String database,
@@ -116,8 +116,7 @@ public class PostgreSQLTableSource implements ScanTableSource, SupportsReadingMetadata {
         RowType physicalDataType =
                 (RowType) physicalSchema.toPhysicalRowDataType().getLogicalType();
         MetadataConverter[] metadataConverters = getMetadataConverters();
-        TypeInformation<RowData> typeInfo =
-                scanContext.createTypeInformation(physicalSchema.toRowDataType());
+        TypeInformation<RowData> typeInfo = scanContext.createTypeInformation(producedDataType);
         DebeziumDeserializationSchema<RowData> deserializer =
                 RowDataDebeziumDeserializeSchema.newBuilder()

PostgreSQLTableFactoryTest.java
@@ -21,6 +21,7 @@ package com.ververica.cdc.connectors.postgres.table;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.ObjectIdentifier;
@@ -30,7 +31,6 @@ import org.apache.flink.table.catalog.UniqueConstraint;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.factories.Factory;
 import org.apache.flink.table.factories.FactoryUtil;
-import org.apache.flink.table.utils.TableSchemaUtils;
 import org.apache.flink.util.ExceptionUtils;
 import org.junit.Test;
@@ -42,7 +42,6 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
-import static org.apache.flink.table.api.TableSchema.fromResolvedSchema;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -91,7 +90,7 @@ public class PostgreSQLTableFactoryTest {
         DynamicTableSource actualSource = createTableSource(SCHEMA, properties);
         PostgreSQLTableSource expectedSource =
                 new PostgreSQLTableSource(
-                        TableSchemaUtils.getPhysicalSchema(fromResolvedSchema(SCHEMA)),
+                        SCHEMA,
                         5432,
                         MY_LOCALHOST,
                         MY_DATABASE,
@@ -118,7 +117,7 @@ public class PostgreSQLTableFactoryTest {
         dbzProperties.put("snapshot.mode", "never");
         PostgreSQLTableSource expectedSource =
                 new PostgreSQLTableSource(
-                        TableSchemaUtils.getPhysicalSchema(fromResolvedSchema(SCHEMA)),
+                        SCHEMA,
                         5444,
                         MY_LOCALHOST,
                         MY_DATABASE,
@@ -145,8 +144,7 @@ public class PostgreSQLTableFactoryTest {
         actualSource = postgreSQLTableSource.copy();
         PostgreSQLTableSource expectedSource =
                 new PostgreSQLTableSource(
-                        TableSchemaUtils.getPhysicalSchema(
-                                fromResolvedSchema(SCHEMA_WITH_METADATA)),
+                        SCHEMA_WITH_METADATA,
                         5432,
                         MY_LOCALHOST,
                         MY_DATABASE,
@@ -235,7 +233,7 @@ public class PostgreSQLTableFactoryTest {
                 ObjectIdentifier.of("default", "default", "t1"),
                 new ResolvedCatalogTable(
                         CatalogTable.of(
-                                fromResolvedSchema(schema).toSchema(),
+                                Schema.newBuilder().fromResolvedSchema(schema).build(),
                                 "mock source",
                                 new ArrayList<>(),
                                 options),

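All the test-helper hunks use the same replacement: instead of round-tripping through the deprecated TableSchema.fromResolvedSchema(schema).toSchema(), the unresolved Schema is built straight from the ResolvedSchema. Sketch, assuming a ResolvedSchema named schema and an options map as in the tests:

    CatalogTable table =
            CatalogTable.of(
                    Schema.newBuilder().fromResolvedSchema(schema).build(),
                    "mock source",
                    new ArrayList<>(),
                    options);
    ResolvedCatalogTable resolvedTable = new ResolvedCatalogTable(table, schema);
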
ChangelogJsonFormatFactoryTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.formats.common.TimestampFormat;
 import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.ObjectIdentifier;
@@ -50,7 +51,6 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.function.Consumer;
-import static org.apache.flink.table.api.TableSchema.fromResolvedSchema;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -67,8 +67,7 @@ public class ChangelogJsonFormatFactoryTest extends TestLogger {
                     new ArrayList<>(),
                     null);
-    private static final RowType ROW_TYPE =
-            (RowType) fromResolvedSchema(SCHEMA).toRowDataType().getLogicalType();
+    private static final RowType ROW_TYPE = (RowType) SCHEMA.toSourceRowDataType().getLogicalType();
     @Test
     public void testSeDeSchema() {
@@ -87,8 +86,7 @@ public class ChangelogJsonFormatFactoryTest extends TestLogger {
         DeserializationSchema<RowData> actualDeser =
                 scanSourceMock.valueFormat.createRuntimeDecoder(
-                        ScanRuntimeProviderContext.INSTANCE,
-                        fromResolvedSchema(SCHEMA).toRowDataType());
+                        ScanRuntimeProviderContext.INSTANCE, SCHEMA.toSourceRowDataType());
         assertEquals(expectedDeser, actualDeser);
@@ -99,8 +97,7 @@ public class ChangelogJsonFormatFactoryTest extends TestLogger {
         SerializationSchema<RowData> actualSer =
                 sinkMock.valueFormat.createRuntimeEncoder(
-                        new SinkRuntimeProviderContext(false),
-                        fromResolvedSchema(SCHEMA).toRowDataType());
+                        new SinkRuntimeProviderContext(false), SCHEMA.toSourceRowDataType());
         assertEquals(expectedSer, actualSer);
     }
@@ -154,7 +151,7 @@ public class ChangelogJsonFormatFactoryTest extends TestLogger {
                 ObjectIdentifier.of("default", "default", "t1"),
                 new ResolvedCatalogTable(
                         CatalogTable.of(
-                                fromResolvedSchema(SCHEMA).toSchema(),
+                                Schema.newBuilder().fromResolvedSchema(SCHEMA).build(),
                                 "mock source",
                                 new ArrayList<>(),
                                 options),
@@ -170,7 +167,7 @@ public class ChangelogJsonFormatFactoryTest extends TestLogger {
                 ObjectIdentifier.of("default", "default", "t1"),
                 new ResolvedCatalogTable(
                         CatalogTable.of(
-                                fromResolvedSchema(SCHEMA).toSchema(),
+                                Schema.newBuilder().fromResolvedSchema(SCHEMA).build(),
                                 "mock source",
                                 new ArrayList<>(),
                                 options),

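Closing note on the format test above: SCHEMA is now a ResolvedSchema, and toSourceRowDataType() yields the full produced type (physical plus computed and metadata columns), which is what the changelog-json encoder and decoder operate on. With the physical-only schemas these tests appear to use, it coincides with toPhysicalRowDataType():

    RowType rowType = (RowType) SCHEMA.toSourceRowDataType().getLogicalType();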