[debezium] Bump Flink version to 1.13.0 (#176)

pull/199/head
Leonard Xu 4 years ago committed by GitHub
parent d6814f7a27
commit 1436410adf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -24,10 +24,12 @@ import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.utils.LegacyRowResource;
import com.alibaba.ververica.cdc.connectors.mysql.MySQLTestBase;
import com.alibaba.ververica.cdc.connectors.mysql.utils.UniqueDatabase;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import java.sql.Connection;
@ -58,6 +60,8 @@ public class MySQLConnectorITCase extends MySQLTestBase {
env,
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());
@ClassRule public static LegacyRowResource usesLegacyRows = LegacyRowResource.INSTANCE;
@Before
public void before() {
TestValuesTableFactory.clearAllData();

@ -21,9 +21,12 @@ package com.alibaba.ververica.cdc.connectors.mysql.table;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.UniqueConstraint;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.Factory;
import org.apache.flink.table.factories.FactoryUtil;
@ -33,25 +36,30 @@ import org.apache.flink.util.ExceptionUtils;
import org.junit.Test;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import static org.apache.flink.table.api.TableSchema.fromResolvedSchema;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/** Test for {@link MySQLTableSource} created by {@link MySQLTableSourceFactory}. */
public class MySQLTableSourceFactoryTest {
private static final TableSchema SCHEMA =
TableSchema.builder()
.field("aaa", DataTypes.INT().notNull())
.field("bbb", DataTypes.STRING().notNull())
.field("ccc", DataTypes.DOUBLE())
.field("ddd", DataTypes.DECIMAL(31, 18))
.field("eee", DataTypes.TIMESTAMP(3))
.primaryKey("bbb", "aaa")
.build();
private static final ResolvedSchema SCHEMA =
new ResolvedSchema(
Arrays.asList(
Column.physical("aaa", DataTypes.INT().notNull()),
Column.physical("bbb", DataTypes.STRING().notNull()),
Column.physical("ccc", DataTypes.DOUBLE()),
Column.physical("ddd", DataTypes.DECIMAL(31, 18)),
Column.physical("eee", DataTypes.TIMESTAMP(3))),
new ArrayList<>(),
UniqueConstraint.primaryKey("pk", Arrays.asList("bbb", "aaa")));
private static final String MY_LOCALHOST = "localhost";
private static final String MY_USERNAME = "flinkuser";
@ -68,7 +76,7 @@ public class MySQLTableSourceFactoryTest {
DynamicTableSource actualSource = createTableSource(properties);
MySQLTableSource expectedSource =
new MySQLTableSource(
TableSchemaUtils.getPhysicalSchema(SCHEMA),
TableSchemaUtils.getPhysicalSchema(fromResolvedSchema(SCHEMA)),
3306,
MY_LOCALHOST,
MY_DATABASE,
@ -95,7 +103,7 @@ public class MySQLTableSourceFactoryTest {
dbzProperties.put("snapshot.mode", "never");
MySQLTableSource expectedSource =
new MySQLTableSource(
TableSchemaUtils.getPhysicalSchema(SCHEMA),
TableSchemaUtils.getPhysicalSchema(fromResolvedSchema(SCHEMA)),
3307,
MY_LOCALHOST,
MY_DATABASE,
@ -124,7 +132,7 @@ public class MySQLTableSourceFactoryTest {
DynamicTableSource actualSource = createTableSource(options);
MySQLTableSource expectedSource =
new MySQLTableSource(
TableSchemaUtils.getPhysicalSchema(SCHEMA),
TableSchemaUtils.getPhysicalSchema(fromResolvedSchema(SCHEMA)),
3307,
MY_LOCALHOST,
MY_DATABASE,
@ -147,7 +155,7 @@ public class MySQLTableSourceFactoryTest {
DynamicTableSource actualSource = createTableSource(properties);
MySQLTableSource expectedSource =
new MySQLTableSource(
TableSchemaUtils.getPhysicalSchema(SCHEMA),
TableSchemaUtils.getPhysicalSchema(fromResolvedSchema(SCHEMA)),
3306,
MY_LOCALHOST,
MY_DATABASE,
@ -170,7 +178,7 @@ public class MySQLTableSourceFactoryTest {
DynamicTableSource actualSource = createTableSource(properties);
MySQLTableSource expectedSource =
new MySQLTableSource(
TableSchemaUtils.getPhysicalSchema(SCHEMA),
TableSchemaUtils.getPhysicalSchema(fromResolvedSchema(SCHEMA)),
3306,
MY_LOCALHOST,
MY_DATABASE,
@ -193,7 +201,7 @@ public class MySQLTableSourceFactoryTest {
DynamicTableSource actualSource = createTableSource(properties);
MySQLTableSource expectedSource =
new MySQLTableSource(
TableSchemaUtils.getPhysicalSchema(SCHEMA),
TableSchemaUtils.getPhysicalSchema(fromResolvedSchema(SCHEMA)),
3306,
MY_LOCALHOST,
MY_DATABASE,
@ -299,7 +307,13 @@ public class MySQLTableSourceFactoryTest {
return FactoryUtil.createTableSource(
null,
ObjectIdentifier.of("default", "default", "t1"),
new CatalogTableImpl(SCHEMA, options, "mock source"),
new ResolvedCatalogTable(
CatalogTable.of(
fromResolvedSchema(SCHEMA).toSchema(),
"mock source",
new ArrayList<>(),
options),
SCHEMA),
new Configuration(),
MySQLTableSourceFactoryTest.class.getClassLoader(),
false);

@ -23,10 +23,12 @@ import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.utils.LegacyRowResource;
import org.apache.flink.util.ExceptionUtils;
import com.alibaba.ververica.cdc.connectors.postgres.PostgresTestBase;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import java.sql.Connection;
@ -52,6 +54,8 @@ public class PostgreSQLConnectorITCase extends PostgresTestBase {
env,
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());
@ClassRule public static LegacyRowResource usesLegacyRows = LegacyRowResource.INSTANCE;
@Before
public void before() {
TestValuesTableFactory.clearAllData();

@ -21,9 +21,12 @@ package com.alibaba.ververica.cdc.connectors.postgres.table;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.UniqueConstraint;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.Factory;
import org.apache.flink.table.factories.FactoryUtil;
@ -32,25 +35,30 @@ import org.apache.flink.util.ExceptionUtils;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import static org.apache.flink.table.api.TableSchema.fromResolvedSchema;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/** Test for {@link PostgreSQLTableSource} created by {@link PostgreSQLTableFactory}. */
public class PostgreSQLTableFactoryTest {
private static final TableSchema SCHEMA =
TableSchema.builder()
.field("aaa", DataTypes.INT().notNull())
.field("bbb", DataTypes.STRING().notNull())
.field("ccc", DataTypes.DOUBLE())
.field("ddd", DataTypes.DECIMAL(31, 18))
.field("eee", DataTypes.TIMESTAMP(3))
.primaryKey("bbb", "aaa")
.build();
private static final ResolvedSchema SCHEMA =
new ResolvedSchema(
Arrays.asList(
Column.physical("aaa", DataTypes.INT().notNull()),
Column.physical("bbb", DataTypes.STRING().notNull()),
Column.physical("ccc", DataTypes.DOUBLE()),
Column.physical("ddd", DataTypes.DECIMAL(31, 18)),
Column.physical("eee", DataTypes.TIMESTAMP(3))),
new ArrayList<>(),
UniqueConstraint.primaryKey("pk", Arrays.asList("bbb", "aaa")));
private static final String MY_LOCALHOST = "localhost";
private static final String MY_USERNAME = "flinkuser";
@ -68,7 +76,7 @@ public class PostgreSQLTableFactoryTest {
DynamicTableSource actualSource = createTableSource(properties);
PostgreSQLTableSource expectedSource =
new PostgreSQLTableSource(
TableSchemaUtils.getPhysicalSchema(SCHEMA),
TableSchemaUtils.getPhysicalSchema(fromResolvedSchema(SCHEMA)),
5432,
MY_LOCALHOST,
MY_DATABASE,
@ -94,7 +102,7 @@ public class PostgreSQLTableFactoryTest {
dbzProperties.put("snapshot.mode", "never");
PostgreSQLTableSource expectedSource =
new PostgreSQLTableSource(
TableSchemaUtils.getPhysicalSchema(SCHEMA),
TableSchemaUtils.getPhysicalSchema(fromResolvedSchema(SCHEMA)),
5444,
MY_LOCALHOST,
MY_DATABASE,
@ -172,7 +180,13 @@ public class PostgreSQLTableFactoryTest {
return FactoryUtil.createTableSource(
null,
ObjectIdentifier.of("default", "default", "t1"),
new CatalogTableImpl(SCHEMA, options, "mock source"),
new ResolvedCatalogTable(
CatalogTable.of(
fromResolvedSchema(SCHEMA).toSchema(),
"mock source",
new ArrayList<>(),
options),
SCHEMA),
new Configuration(),
PostgreSQLTableFactoryTest.class.getClassLoader(),
false);

@ -20,8 +20,8 @@ package com.alibaba.ververica.cdc.formats.json;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.formats.common.TimestampFormat;
import org.apache.flink.formats.json.JsonRowDataDeserializationSchema;
import org.apache.flink.formats.json.TimestampFormat;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;

@ -23,8 +23,8 @@ import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.formats.common.TimestampFormat;
import org.apache.flink.formats.json.JsonOptions;
import org.apache.flink.formats.json.TimestampFormat;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.format.EncodingFormat;

@ -19,9 +19,9 @@
package com.alibaba.ververica.cdc.formats.json;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.formats.common.TimestampFormat;
import org.apache.flink.formats.json.JsonOptions;
import org.apache.flink.formats.json.JsonRowDataSerializationSchema;
import org.apache.flink.formats.json.TimestampFormat;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
@ -59,7 +59,8 @@ public class ChangelogJsonSerializationSchema implements SerializationSchema<Row
createJsonRowType(fromLogicalToDataType(rowType)),
timestampFormat,
JsonOptions.MapNullKeyMode.FAIL,
JsonOptions.MAP_NULL_KEY_LITERAL.defaultValue());
JsonOptions.MAP_NULL_KEY_LITERAL.defaultValue(),
JsonOptions.ENCODE_DECIMAL_AS_PLAIN_NUMBER.defaultValue());
this.timestampFormat = timestampFormat;
}

@ -21,11 +21,13 @@ package com.alibaba.ververica.cdc.formats.json;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.formats.json.TimestampFormat;
import org.apache.flink.formats.common.TimestampFormat;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.data.RowData;
@ -42,10 +44,13 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;
import static org.apache.flink.table.api.TableSchema.fromResolvedSchema;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@ -53,14 +58,17 @@ import static org.junit.Assert.assertTrue;
public class ChangelogJsonFormatFactoryTest extends TestLogger {
@Rule public ExpectedException thrown = ExpectedException.none();
private static final TableSchema SCHEMA =
TableSchema.builder()
.field("a", DataTypes.STRING())
.field("b", DataTypes.INT())
.field("c", DataTypes.BOOLEAN())
.build();
private static final ResolvedSchema SCHEMA =
new ResolvedSchema(
Arrays.asList(
Column.physical("a", DataTypes.STRING()),
Column.physical("b", DataTypes.STRING()),
Column.physical("c", DataTypes.BOOLEAN())),
new ArrayList<>(),
null);
private static final RowType ROW_TYPE = (RowType) SCHEMA.toRowDataType().getLogicalType();
private static final RowType ROW_TYPE =
(RowType) fromResolvedSchema(SCHEMA).toRowDataType().getLogicalType();
@Test
public void testSeDeSchema() {
@ -79,7 +87,8 @@ public class ChangelogJsonFormatFactoryTest extends TestLogger {
DeserializationSchema<RowData> actualDeser =
scanSourceMock.valueFormat.createRuntimeDecoder(
ScanRuntimeProviderContext.INSTANCE, SCHEMA.toRowDataType());
ScanRuntimeProviderContext.INSTANCE,
fromResolvedSchema(SCHEMA).toRowDataType());
assertEquals(expectedDeser, actualDeser);
@ -90,7 +99,8 @@ public class ChangelogJsonFormatFactoryTest extends TestLogger {
SerializationSchema<RowData> actualSer =
sinkMock.valueFormat.createRuntimeEncoder(
new SinkRuntimeProviderContext(false), SCHEMA.toRowDataType());
new SinkRuntimeProviderContext(false),
fromResolvedSchema(SCHEMA).toRowDataType());
assertEquals(expectedSer, actualSer);
}
@ -142,7 +152,13 @@ public class ChangelogJsonFormatFactoryTest extends TestLogger {
return FactoryUtil.createTableSource(
null,
ObjectIdentifier.of("default", "default", "t1"),
new CatalogTableImpl(SCHEMA, options, "mock source"),
new ResolvedCatalogTable(
CatalogTable.of(
fromResolvedSchema(SCHEMA).toSchema(),
"mock source",
new ArrayList<>(),
options),
SCHEMA),
new Configuration(),
ChangelogJsonFormatFactoryTest.class.getClassLoader(),
false);
@ -152,7 +168,13 @@ public class ChangelogJsonFormatFactoryTest extends TestLogger {
return FactoryUtil.createTableSink(
null,
ObjectIdentifier.of("default", "default", "t1"),
new CatalogTableImpl(SCHEMA, options, "mock sink"),
new ResolvedCatalogTable(
CatalogTable.of(
fromResolvedSchema(SCHEMA).toSchema(),
"mock source",
new ArrayList<>(),
options),
SCHEMA),
new Configuration(),
ChangelogJsonFormatFactoryTest.class.getClassLoader(),
false);

@ -18,7 +18,7 @@
package com.alibaba.ververica.cdc.formats.json;
import org.apache.flink.formats.json.TimestampFormat;
import org.apache.flink.formats.common.TimestampFormat;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;

@ -62,7 +62,7 @@ under the License.
</distributionManagement>
<properties>
<flink.version>1.12.1</flink.version>
<flink.version>1.13.0</flink.version>
<debezium.version>1.4.1.Final</debezium.version>
<testcontainers.version>1.15.1</testcontainers.version>
<java.version>1.8</java.version>

Loading…
Cancel
Save