[build] Bump up Flink version to 1.12.1 (#95)

pull/107/head
jiabao.sun 4 years ago committed by Jark Wu
parent 05a28a4428
commit 2dfb59f3c7
No known key found for this signature in database
GPG Key ID: 85BACB5AEFAE3202

@ -22,7 +22,7 @@ This README is meant as a brief walkthrough on the core features with Flink CDC
We need several steps to setup a Flink cluster with the provided connector. We need several steps to setup a Flink cluster with the provided connector.
1. Setup a Flink cluster with version 1.11+ and Java 8+ installed. 1. Setup a Flink cluster with version 1.12+ and Java 8+ installed.
2. Download the connector SQL jars from the [Download](https://github.com/ververica/flink-cdc-connectors/wiki/Downloads) page (or [build yourself](#building-from-source). 2. Download the connector SQL jars from the [Download](https://github.com/ververica/flink-cdc-connectors/wiki/Downloads) page (or [build yourself](#building-from-source).
3. Put the downloaded jars under `FLINK_HOME/lib/`. 3. Put the downloaded jars under `FLINK_HOME/lib/`.
4. Restart the Flink cluster. 4. Restart the Flink cluster.

@ -96,7 +96,7 @@ public class MySQLTableSource implements ScanTableSource {
@Override @Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
RowType rowType = (RowType) physicalSchema.toRowDataType().getLogicalType(); RowType rowType = (RowType) physicalSchema.toRowDataType().getLogicalType();
TypeInformation<RowData> typeInfo = (TypeInformation<RowData>) scanContext.createTypeInformation(physicalSchema.toRowDataType()); TypeInformation<RowData> typeInfo = scanContext.createTypeInformation(physicalSchema.toRowDataType());
DebeziumDeserializationSchema<RowData> deserializer = new RowDataDebeziumDeserializeSchema( DebeziumDeserializationSchema<RowData> deserializer = new RowDataDebeziumDeserializeSchema(
rowType, rowType,
typeInfo, typeInfo,

@ -179,6 +179,7 @@ public class MySQLTableSourceFactoryTest {
ObjectIdentifier.of("default", "default", "t1"), ObjectIdentifier.of("default", "default", "t1"),
new CatalogTableImpl(SCHEMA, options, "mock source"), new CatalogTableImpl(SCHEMA, options, "mock source"),
new Configuration(), new Configuration(),
MySQLTableSourceFactoryTest.class.getClassLoader()); MySQLTableSourceFactoryTest.class.getClassLoader(),
false);
} }
} }

@ -93,7 +93,7 @@ public class PostgreSQLTableSource implements ScanTableSource {
@Override @Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
RowType rowType = (RowType) physicalSchema.toRowDataType().getLogicalType(); RowType rowType = (RowType) physicalSchema.toRowDataType().getLogicalType();
TypeInformation<RowData> typeInfo = (TypeInformation<RowData>) scanContext.createTypeInformation(physicalSchema.toRowDataType()); TypeInformation<RowData> typeInfo = scanContext.createTypeInformation(physicalSchema.toRowDataType());
DebeziumDeserializationSchema<RowData> deserializer = new RowDataDebeziumDeserializeSchema( DebeziumDeserializationSchema<RowData> deserializer = new RowDataDebeziumDeserializeSchema(
rowType, rowType,
typeInfo, typeInfo,

@ -161,7 +161,8 @@ public class PostgreSQLConnectorITCase extends PostgresTestBase {
" 'password' = '%s'," + " 'password' = '%s'," +
" 'database-name' = '%s'," + " 'database-name' = '%s'," +
" 'schema-name' = '%s'," + " 'schema-name' = '%s'," +
" 'table-name' = '%s'" + " 'table-name' = '%s'," +
" 'debezium.slot.name' = '%s'" +
")", ")",
POSTGERS_CONTAINER.getHost(), POSTGERS_CONTAINER.getHost(),
POSTGERS_CONTAINER.getMappedPort(POSTGRESQL_PORT), POSTGERS_CONTAINER.getMappedPort(POSTGRESQL_PORT),
@ -169,7 +170,8 @@ public class PostgreSQLConnectorITCase extends PostgresTestBase {
POSTGERS_CONTAINER.getPassword(), POSTGERS_CONTAINER.getPassword(),
POSTGERS_CONTAINER.getDatabaseName(), POSTGERS_CONTAINER.getDatabaseName(),
"inventory", "inventory",
"products"); "products",
"replica_identity_slot");
String sinkDDL = "CREATE TABLE sink (" + String sinkDDL = "CREATE TABLE sink (" +
" name STRING," + " name STRING," +
" weightSum DECIMAL(10,3)," + " weightSum DECIMAL(10,3)," +
@ -199,7 +201,7 @@ public class PostgreSQLConnectorITCase extends PostgresTestBase {
} }
try { try {
result.getJobClient().get().getJobExecutionResult(Thread.currentThread().getContextClassLoader()).get(); result.await();
} catch (Exception e) { } catch (Exception e) {
assertTrue(ExceptionUtils.findThrowableWithMessage(e, assertTrue(ExceptionUtils.findThrowableWithMessage(e,
"The \"before\" field of UPDATE/DELETE message is null, " + "The \"before\" field of UPDATE/DELETE message is null, " +

@ -165,6 +165,7 @@ public class PostgreSQLTableFactoryTest {
ObjectIdentifier.of("default", "default", "t1"), ObjectIdentifier.of("default", "default", "t1"),
new CatalogTableImpl(SCHEMA, options, "mock source"), new CatalogTableImpl(SCHEMA, options, "mock source"),
new Configuration(), new Configuration(),
PostgreSQLTableFactoryTest.class.getClassLoader()); PostgreSQLTableFactoryTest.class.getClassLoader(),
false);
} }
} }

@ -62,13 +62,11 @@ public class ChangelogJsonFormatFactory implements DeserializationFormatFactory,
TimestampFormat timestampFormat = JsonOptions.getTimestampFormat(formatOptions); TimestampFormat timestampFormat = JsonOptions.getTimestampFormat(formatOptions);
return new DecodingFormat<DeserializationSchema<RowData>>() { return new DecodingFormat<DeserializationSchema<RowData>>() {
@SuppressWarnings("unchecked")
@Override @Override
public DeserializationSchema<RowData> createRuntimeDecoder( public DeserializationSchema<RowData> createRuntimeDecoder(
DynamicTableSource.Context context, DataType producedDataType) { DynamicTableSource.Context context, DataType producedDataType) {
final RowType rowType = (RowType) producedDataType.getLogicalType(); final RowType rowType = (RowType) producedDataType.getLogicalType();
final TypeInformation<RowData> rowDataTypeInfo = final TypeInformation<RowData> rowDataTypeInfo = context.createTypeInformation(producedDataType);
(TypeInformation<RowData>) context.createTypeInformation(producedDataType);
return new ChangelogJsonDeserializationSchema( return new ChangelogJsonDeserializationSchema(
rowType, rowType,
rowDataTypeInfo, rowDataTypeInfo,
@ -111,8 +109,7 @@ public class ChangelogJsonFormatFactory implements DeserializationFormatFactory,
final RowType rowType = (RowType) consumedDataType.getLogicalType(); final RowType rowType = (RowType) consumedDataType.getLogicalType();
return new ChangelogJsonSerializationSchema( return new ChangelogJsonSerializationSchema(
rowType, rowType,
timestampFormat timestampFormat);
);
} }
}; };
} }

@ -19,6 +19,7 @@
package com.alibaba.ververica.cdc.formats.json; package com.alibaba.ververica.cdc.formats.json;
import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.formats.json.JsonOptions;
import org.apache.flink.formats.json.JsonRowDataSerializationSchema; import org.apache.flink.formats.json.JsonRowDataSerializationSchema;
import org.apache.flink.formats.json.TimestampFormat; import org.apache.flink.formats.json.TimestampFormat;
import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.DataTypes;
@ -51,10 +52,14 @@ public class ChangelogJsonSerializationSchema implements SerializationSchema<Row
private transient GenericRowData reuse; private transient GenericRowData reuse;
public ChangelogJsonSerializationSchema(RowType rowType, TimestampFormat timestampFormat) { public ChangelogJsonSerializationSchema(
RowType rowType,
TimestampFormat timestampFormat) {
this.jsonSerializer = new JsonRowDataSerializationSchema( this.jsonSerializer = new JsonRowDataSerializationSchema(
createJsonRowType(fromLogicalToDataType(rowType)), createJsonRowType(fromLogicalToDataType(rowType)),
timestampFormat); timestampFormat,
JsonOptions.MapNullKeyMode.FAIL,
JsonOptions.MAP_NULL_KEY_LITERAL.defaultValue());
this.timestampFormat = timestampFormat; this.timestampFormat = timestampFormat;
} }

@ -33,7 +33,7 @@ import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.factories.TestDynamicTableFactory; import org.apache.flink.table.factories.TestDynamicTableFactory;
import org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext; import org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext;
import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext; import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo; import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType; import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger; import org.apache.flink.util.TestLogger;
@ -68,7 +68,7 @@ public class ChangelogJsonFormatFactoryTest extends TestLogger {
public void testSeDeSchema() { public void testSeDeSchema() {
final ChangelogJsonDeserializationSchema expectedDeser = new ChangelogJsonDeserializationSchema( final ChangelogJsonDeserializationSchema expectedDeser = new ChangelogJsonDeserializationSchema(
ROW_TYPE, ROW_TYPE,
RowDataTypeInfo.of(ROW_TYPE), InternalTypeInfo.of(ROW_TYPE),
true, true,
TimestampFormat.ISO_8601); TimestampFormat.ISO_8601);
final ChangelogJsonSerializationSchema expectedSer = new ChangelogJsonSerializationSchema( final ChangelogJsonSerializationSchema expectedSer = new ChangelogJsonSerializationSchema(
@ -148,7 +148,8 @@ public class ChangelogJsonFormatFactoryTest extends TestLogger {
ObjectIdentifier.of("default", "default", "t1"), ObjectIdentifier.of("default", "default", "t1"),
new CatalogTableImpl(SCHEMA, options, "mock source"), new CatalogTableImpl(SCHEMA, options, "mock source"),
new Configuration(), new Configuration(),
ChangelogJsonFormatFactoryTest.class.getClassLoader()); ChangelogJsonFormatFactoryTest.class.getClassLoader(),
false);
} }
private static DynamicTableSink createTableSink(Map<String, String> options) { private static DynamicTableSink createTableSink(Map<String, String> options) {
@ -157,6 +158,7 @@ public class ChangelogJsonFormatFactoryTest extends TestLogger {
ObjectIdentifier.of("default", "default", "t1"), ObjectIdentifier.of("default", "default", "t1"),
new CatalogTableImpl(SCHEMA, options, "mock sink"), new CatalogTableImpl(SCHEMA, options, "mock sink"),
new Configuration(), new Configuration(),
ChangelogJsonFormatFactoryTest.class.getClassLoader()); ChangelogJsonFormatFactoryTest.class.getClassLoader(),
false);
} }
} }

@ -20,7 +20,7 @@ package com.alibaba.ververica.cdc.formats.json;
import org.apache.flink.formats.json.TimestampFormat; import org.apache.flink.formats.json.TimestampFormat;
import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo; import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType; import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Collector; import org.apache.flink.util.Collector;
@ -61,7 +61,7 @@ public class ChangelogJsonSerDeTest {
List<String> lines = readLines("changelog-json-data.txt"); List<String> lines = readLines("changelog-json-data.txt");
ChangelogJsonDeserializationSchema deserializationSchema = new ChangelogJsonDeserializationSchema( ChangelogJsonDeserializationSchema deserializationSchema = new ChangelogJsonDeserializationSchema(
SCHEMA, SCHEMA,
RowDataTypeInfo.of(SCHEMA), InternalTypeInfo.of(SCHEMA),
false, false,
TimestampFormat.SQL); TimestampFormat.SQL);

@ -62,7 +62,7 @@ under the License.
</distributionManagement> </distributionManagement>
<properties> <properties>
<flink.version>1.11.1</flink.version> <flink.version>1.12.1</flink.version>
<debezium.version>1.2.1.Final</debezium.version> <debezium.version>1.2.1.Final</debezium.version>
<testcontainers.version>1.15.1</testcontainers.version> <testcontainers.version>1.15.1</testcontainers.version>
<java.version>1.8</java.version> <java.version>1.8</java.version>

Loading…
Cancel
Save