From 5b3dd9884e4ac012ed646715156134904963d148 Mon Sep 17 00:00:00 2001 From: Tigran Manasyan <52424622+tigrulya-exe@users.noreply.github.com> Date: Thu, 27 Oct 2022 22:24:25 +0700 Subject: [PATCH] [common] Bump Flink version to 1.15.2 (#1504) This closes #1363. Co-authored-by: Hang Ruan --- flink-cdc-base/pom.xml | 6 +-- flink-cdc-e2e-tests/pom.xml | 11 ++++ .../utils/FlinkContainerTestEnvironment.java | 5 +- flink-connector-debezium/pom.xml | 2 +- flink-connector-mongodb-cdc/pom.xml | 4 +- flink-connector-mysql-cdc/pom.xml | 6 +-- .../MySqlDeserializationConverterFactory.java | 8 +-- .../cdc/connectors/mysql/MySqlTestUtils.java | 52 +++++++++++++++++++ .../mysql/table/MySqlConnectorITCase.java | 5 +- flink-connector-oceanbase-cdc/pom.xml | 18 +++++-- flink-connector-oracle-cdc/pom.xml | 4 +- flink-connector-postgres-cdc/pom.xml | 4 +- flink-connector-sqlserver-cdc/pom.xml | 4 +- flink-connector-test-util/pom.xml | 2 +- flink-connector-tidb-cdc/pom.xml | 6 +-- pom.xml | 14 +++-- 16 files changed, 118 insertions(+), 33 deletions(-) diff --git a/flink-cdc-base/pom.xml b/flink-cdc-base/pom.xml index a08cd2696..950b1103c 100644 --- a/flink-cdc-base/pom.xml +++ b/flink-cdc-base/pom.xml @@ -54,14 +54,14 @@ under the License. org.apache.flink - flink-table-runtime_${scala.binary.version} + flink-table-runtime ${flink.version} test org.apache.flink - flink-test-utils_${scala.binary.version} + flink-test-utils ${flink.version} test @@ -82,7 +82,7 @@ under the License. org.apache.flink - flink-streaming-java_${scala.binary.version} + flink-streaming-java ${flink.version} test-jar test diff --git a/flink-cdc-e2e-tests/pom.xml b/flink-cdc-e2e-tests/pom.xml index fbd96c750..563581361 100644 --- a/flink-cdc-e2e-tests/pom.xml +++ b/flink-cdc-e2e-tests/pom.xml @@ -30,6 +30,7 @@ under the License. 1.13.6 1.14.4 + 1.15.2 8.0.27 @@ -209,6 +210,16 @@ under the License. + + org.apache.flink + flink-connector-jdbc + ${flink-1.15} + jdbc-connector_${flink-1.15}.jar + jar + ${project.build.directory}/dependencies + + + com.ververica flink-sql-connector-mysql-cdc diff --git a/flink-cdc-e2e-tests/src/test/java/com/ververica/cdc/connectors/tests/utils/FlinkContainerTestEnvironment.java b/flink-cdc-e2e-tests/src/test/java/com/ververica/cdc/connectors/tests/utils/FlinkContainerTestEnvironment.java index edb41fba0..6f043f49c 100644 --- a/flink-cdc-e2e-tests/src/test/java/com/ververica/cdc/connectors/tests/utils/FlinkContainerTestEnvironment.java +++ b/flink-cdc-e2e-tests/src/test/java/com/ververica/cdc/connectors/tests/utils/FlinkContainerTestEnvironment.java @@ -121,7 +121,7 @@ public abstract class FlinkContainerTestEnvironment extends TestLogger { @Parameterized.Parameters(name = "flinkVersion: {0}") public static List getFlinkVersion() { - return Arrays.asList("1.13.6", "1.14.4"); + return Arrays.asList("1.13.6", "1.14.4", "1.15.2"); } @Before @@ -258,6 +258,9 @@ public abstract class FlinkContainerTestEnvironment extends TestLogger { } private String getFlinkDockerImageTag() { + if ("1.15.2".equals(flinkVersion)) { + return String.format("flink:%s-scala_2.12", flinkVersion); + } return String.format("flink:%s-scala_2.11", flinkVersion); } diff --git a/flink-connector-debezium/pom.xml b/flink-connector-debezium/pom.xml index cc30c0d70..2e894a7f2 100644 --- a/flink-connector-debezium/pom.xml +++ b/flink-connector-debezium/pom.xml @@ -52,7 +52,7 @@ under the License. org.apache.flink - flink-test-utils_${scala.binary.version} + flink-test-utils ${flink.version} test diff --git a/flink-connector-mongodb-cdc/pom.xml b/flink-connector-mongodb-cdc/pom.xml index 9c9f60b1f..96ce7c58e 100644 --- a/flink-connector-mongodb-cdc/pom.xml +++ b/flink-connector-mongodb-cdc/pom.xml @@ -81,7 +81,7 @@ under the License. org.apache.flink - flink-test-utils_${scala.binary.version} + flink-test-utils ${flink.version} test @@ -96,7 +96,7 @@ under the License. org.apache.flink - flink-streaming-java_${scala.binary.version} + flink-streaming-java ${flink.version} test-jar test diff --git a/flink-connector-mysql-cdc/pom.xml b/flink-connector-mysql-cdc/pom.xml index 5c5e87de8..c65e0a864 100644 --- a/flink-connector-mysql-cdc/pom.xml +++ b/flink-connector-mysql-cdc/pom.xml @@ -108,14 +108,14 @@ under the License. org.apache.flink - flink-table-runtime_${scala.binary.version} + flink-table-runtime ${flink.version} test org.apache.flink - flink-test-utils_${scala.binary.version} + flink-test-utils ${flink.version} test @@ -136,7 +136,7 @@ under the License. org.apache.flink - flink-streaming-java_${scala.binary.version} + flink-streaming-java ${flink.version} test-jar test diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlDeserializationConverterFactory.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlDeserializationConverterFactory.java index fe9c15326..50edfb89f 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlDeserializationConverterFactory.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlDeserializationConverterFactory.java @@ -21,7 +21,6 @@ import org.apache.flink.table.data.StringData; import org.apache.flink.table.types.logical.ArrayType; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.LogicalTypeFamily; -import org.apache.flink.table.types.logical.utils.LogicalTypeChecks; import com.esri.core.geometry.ogc.OGCGeometry; import com.fasterxml.jackson.databind.JsonNode; @@ -115,8 +114,7 @@ public class MySqlDeserializationConverterFactory { private static Optional createArrayConverter( ArrayType arrayType) { - if (LogicalTypeChecks.hasFamily( - arrayType.getElementType(), LogicalTypeFamily.CHARACTER_STRING)) { + if (hasFamily(arrayType.getElementType(), LogicalTypeFamily.CHARACTER_STRING)) { // only map MySQL SET type to Flink ARRAY type return Optional.of( new DeserializationRuntimeConverter() { @@ -148,4 +146,8 @@ public class MySqlDeserializationConverterFactory { return Optional.empty(); } } + + private static boolean hasFamily(LogicalType logicalType, LogicalTypeFamily family) { + return logicalType.getTypeRoot().getFamilies().contains(family); + } } diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/MySqlTestUtils.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/MySqlTestUtils.java index 71261c9e5..ad21e8ee4 100644 --- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/MySqlTestUtils.java +++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/MySqlTestUtils.java @@ -16,19 +16,23 @@ package com.ververica.cdc.connectors.mysql; +import org.apache.flink.api.common.JobStatus; import org.apache.flink.api.common.state.BroadcastState; import org.apache.flink.api.common.state.KeyedStateStore; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.common.state.OperatorStateStore; +import org.apache.flink.api.common.time.Deadline; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.execution.JobClient; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.util.MockStreamingRuntimeContext; import org.apache.flink.util.Collector; import org.apache.flink.util.Preconditions; +import org.apache.flink.util.function.SupplierWithException; import com.ververica.cdc.connectors.mysql.testutils.UniqueDatabase; import com.ververica.cdc.connectors.utils.TestSourceContext; @@ -43,6 +47,7 @@ import java.util.Properties; import java.util.Set; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; /** Utils to help test. */ public class MySqlTestUtils { @@ -108,6 +113,53 @@ public class MySqlTestUtils { return allRecords; } + public static void waitUntilCondition( + SupplierWithException condition, + Deadline timeout, + long retryIntervalMillis, + String errorMsg) + throws Exception { + while (timeout.hasTimeLeft() && !(Boolean) condition.get()) { + long timeLeft = Math.max(0L, timeout.timeLeft().toMillis()); + Thread.sleep(Math.min(retryIntervalMillis, timeLeft)); + } + + if (!timeout.hasTimeLeft()) { + throw new TimeoutException(errorMsg); + } + } + + public static void waitForJobStatus( + JobClient client, List expectedStatus, Deadline deadline) throws Exception { + waitUntilCondition( + () -> { + JobStatus currentStatus = (JobStatus) client.getJobStatus().get(); + if (expectedStatus.contains(currentStatus)) { + return true; + } else if (currentStatus.isTerminalState()) { + try { + client.getJobExecutionResult().get(); + } catch (Exception var4) { + throw new IllegalStateException( + String.format( + "Job has entered %s state, but expecting %s", + currentStatus, expectedStatus), + var4); + } + + throw new IllegalStateException( + String.format( + "Job has entered a terminal state %s, but expecting %s", + currentStatus, expectedStatus)); + } else { + return false; + } + }, + deadline, + 100L, + "Condition was not met in given timeout."); + } + private static Properties createDebeziumProperties(boolean useLegacyImplementation) { Properties debeziumProps = new Properties(); if (useLegacyImplementation) { diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlConnectorITCase.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlConnectorITCase.java index fbe8a8f48..79b5f8ef2 100644 --- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlConnectorITCase.java +++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlConnectorITCase.java @@ -57,8 +57,8 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import static com.ververica.cdc.connectors.mysql.LegacyMySqlSourceTest.currentMySqlLatestOffset; +import static com.ververica.cdc.connectors.mysql.MySqlTestUtils.waitForJobStatus; import static org.apache.flink.api.common.JobStatus.RUNNING; -import static org.apache.flink.runtime.testutils.CommonTestUtils.waitForJobStatus; import static org.junit.Assert.assertEquals; /** Integration tests for MySQL Table source. */ @@ -70,8 +70,7 @@ public class MySqlConnectorITCase extends MySqlSourceTestBase { private static final String TEST_USER = "mysqluser"; private static final String TEST_PASSWORD = "mysqlpw"; - private static final MySqlContainer MYSQL8_CONTAINER = - (MySqlContainer) createMySqlContainer(MySqlVersion.V8_0).withExposedPorts(3307); + private static final MySqlContainer MYSQL8_CONTAINER = createMySqlContainer(MySqlVersion.V8_0); private final UniqueDatabase inventoryDatabase = new UniqueDatabase(MYSQL_CONTAINER, "inventory", TEST_USER, TEST_PASSWORD); diff --git a/flink-connector-oceanbase-cdc/pom.xml b/flink-connector-oceanbase-cdc/pom.xml index f848185cb..239033084 100644 --- a/flink-connector-oceanbase-cdc/pom.xml +++ b/flink-connector-oceanbase-cdc/pom.xml @@ -71,22 +71,34 @@ under the License. org.apache.flink - flink-table-runtime_${scala.binary.version} + flink-table-runtime ${flink.version} test org.apache.flink - flink-test-utils_${scala.binary.version} + flink-test-utils ${flink.version} test + + + org.testcontainers + testcontainers + + org.apache.flink flink-connector-test-utils ${flink.version} test + + + org.testcontainers + testcontainers + + @@ -99,7 +111,7 @@ under the License. org.apache.flink - flink-streaming-java_${scala.binary.version} + flink-streaming-java ${flink.version} test-jar test diff --git a/flink-connector-oracle-cdc/pom.xml b/flink-connector-oracle-cdc/pom.xml index 7f7b510b6..00f5d5e60 100644 --- a/flink-connector-oracle-cdc/pom.xml +++ b/flink-connector-oracle-cdc/pom.xml @@ -87,7 +87,7 @@ under the License. org.apache.flink - flink-test-utils_${scala.binary.version} + flink-test-utils ${flink.version} test @@ -102,7 +102,7 @@ under the License. org.apache.flink - flink-streaming-java_${scala.binary.version} + flink-streaming-java ${flink.version} test-jar test diff --git a/flink-connector-postgres-cdc/pom.xml b/flink-connector-postgres-cdc/pom.xml index 82d7471cf..f253eeefe 100644 --- a/flink-connector-postgres-cdc/pom.xml +++ b/flink-connector-postgres-cdc/pom.xml @@ -89,7 +89,7 @@ under the License. org.apache.flink - flink-test-utils_${scala.binary.version} + flink-test-utils ${flink.version} test @@ -104,7 +104,7 @@ under the License. org.apache.flink - flink-streaming-java_${scala.binary.version} + flink-streaming-java ${flink.version} test-jar test diff --git a/flink-connector-sqlserver-cdc/pom.xml b/flink-connector-sqlserver-cdc/pom.xml index fed3f7110..a5971590e 100644 --- a/flink-connector-sqlserver-cdc/pom.xml +++ b/flink-connector-sqlserver-cdc/pom.xml @@ -77,7 +77,7 @@ under the License. org.apache.flink - flink-test-utils_${scala.binary.version} + flink-test-utils ${flink.version} test @@ -92,7 +92,7 @@ under the License. org.apache.flink - flink-streaming-java_${scala.binary.version} + flink-streaming-java ${flink.version} test-jar test diff --git a/flink-connector-test-util/pom.xml b/flink-connector-test-util/pom.xml index 013798204..af01718e7 100644 --- a/flink-connector-test-util/pom.xml +++ b/flink-connector-test-util/pom.xml @@ -42,7 +42,7 @@ under the License. org.apache.flink - flink-test-utils_${scala.binary.version} + flink-test-utils ${flink.version} diff --git a/flink-connector-tidb-cdc/pom.xml b/flink-connector-tidb-cdc/pom.xml index 6fcdd5a25..f7e7bcf4f 100644 --- a/flink-connector-tidb-cdc/pom.xml +++ b/flink-connector-tidb-cdc/pom.xml @@ -88,14 +88,14 @@ under the License. org.apache.flink - flink-table-runtime_${scala.binary.version} + flink-table-runtime ${flink.version} test org.apache.flink - flink-test-utils_${scala.binary.version} + flink-test-utils ${flink.version} test @@ -116,7 +116,7 @@ under the License. org.apache.flink - flink-streaming-java_${scala.binary.version} + flink-streaming-java ${flink.version} test-jar test diff --git a/pom.xml b/pom.xml index 629b9c2f1..77d46bf48 100644 --- a/pom.xml +++ b/pom.xml @@ -71,7 +71,7 @@ under the License. - 1.14.4 + 1.15.2 1.6.4.Final 3.2.0 2.2.0 @@ -81,7 +81,7 @@ under the License. See more https://github.com/testcontainers/testcontainers-java/issues/4297. --> 1.15.3 1.8 - 2.11 + 2.12 ${java.version} ${java.version} 1.3 @@ -107,7 +107,13 @@ under the License. org.apache.flink - flink-table-api-java-bridge_${scala.binary.version} + flink-connector-base + ${flink.version} + provided + + + org.apache.flink + flink-table-api-java-bridge ${flink.version} provided @@ -119,7 +125,7 @@ under the License. org.apache.flink - flink-streaming-java_${scala.binary.version} + flink-streaming-java ${flink.version} provided