From 947dffca3182722340048c913c286ea5d3d5956b Mon Sep 17 00:00:00 2001
From: moses <72908278+ChaomingZhangCN@users.noreply.github.com>
Date: Thu, 16 Jan 2025 21:05:03 +0800
Subject: [PATCH] [FLINK-37122][build] Try to be compatible with old flink
 version 1.18.x

This closes #3859
---
 .../docs/get-started/quickstart/mysql-to-doris.md         | 8 ++++----
 .../docs/get-started/quickstart/mysql-to-starrocks.md     | 6 +++---
 .../content/docs/get-started/quickstart/mysql-to-doris.md | 8 ++++----
 .../docs/get-started/quickstart/mysql-to-starrocks.md     | 8 ++++----
 .../testsource/source/DistributedSourceFunction.java      | 4 ++--
 .../runtime/operators/schema/common/SchemaRegistry.java   | 4 +++-
 .../operators/schema/distributed/SchemaOperator.java      | 2 +-
 .../runtime/operators/schema/regular/SchemaOperator.java  | 2 +-
 .../partitioning/DistributedPrePartitionOperator.java     | 2 +-
 9 files changed, 23 insertions(+), 21 deletions(-)

diff --git a/docs/content.zh/docs/get-started/quickstart/mysql-to-doris.md b/docs/content.zh/docs/get-started/quickstart/mysql-to-doris.md
index c89390fef..6270c2e8a 100644
--- a/docs/content.zh/docs/get-started/quickstart/mysql-to-doris.md
+++ b/docs/content.zh/docs/get-started/quickstart/mysql-to-doris.md
@@ -33,11 +33,11 @@ under the License.
 准备一台已经安装了 Docker 的 Linux 或者 MacOS 电脑。
 
 ### 准备 Flink Standalone 集群
-1. 下载 [Flink 1.18.0](https://archive.apache.org/dist/flink/flink-1.18.0/flink-1.18.0-bin-scala_2.12.tgz),解压后得到 flink-1.18.0 目录。
-   使用下面的命令跳转至 Flink 目录下,并且设置 FLINK_HOME 为 flink-1.18.0 所在目录。
+1. 下载 [Flink 1.19.1](https://archive.apache.org/dist/flink/flink-1.19.1/flink-1.19.1-bin-scala_2.12.tgz),解压后得到 flink-1.19.1 目录。
+   使用下面的命令跳转至 Flink 目录下,并且设置 FLINK_HOME 为 flink-1.19.1 所在目录。
 
    ```shell
-   cd flink-1.18.0
+   cd flink-1.19.1
    ```
 
 2. 通过在 conf/flink-conf.yaml 配置文件追加下列参数开启 checkpoint,每隔 3 秒做一次 checkpoint。
@@ -338,7 +338,7 @@ Flink CDC 提供了将源表的表结构/数据路由到其他表名的配置,
 ```shell
 docker-compose down
 ```
-在 Flink 所在目录 `flink-1.18.0` 下执行如下命令停止 Flink 集群:
+在 Flink 所在目录 `flink-1.19.1` 下执行如下命令停止 Flink 集群:
 
 ```shell
 ./bin/stop-cluster.sh
diff --git a/docs/content.zh/docs/get-started/quickstart/mysql-to-starrocks.md b/docs/content.zh/docs/get-started/quickstart/mysql-to-starrocks.md
index 46ea45d5e..ca7c26fe1 100644
--- a/docs/content.zh/docs/get-started/quickstart/mysql-to-starrocks.md
+++ b/docs/content.zh/docs/get-started/quickstart/mysql-to-starrocks.md
@@ -33,11 +33,11 @@ under the License.
 准备一台已经安装了 Docker 的 Linux 或者 MacOS 电脑。
 
 ### 准备 Flink Standalone 集群
-1. 下载 [Flink 1.18.0](https://archive.apache.org/dist/flink/flink-1.18.0/flink-1.18.0-bin-scala_2.12.tgz) ,解压后得到 flink-1.18.0 目录。
+1. 下载 [Flink 1.19.1](https://archive.apache.org/dist/flink/flink-1.19.1/flink-1.19.1-bin-scala_2.12.tgz) ,解压后得到 flink-1.19.1 目录。
    使用下面的命令跳转至 Flink 目录下,并且设置 FLINK_HOME 为 flink-1.18.0 所在目录。
 
    ```shell
-   cd flink-1.18.0
+   cd flink-1.19.1
    ```
 
 2. 通过在 conf/flink-conf.yaml 配置文件追加下列参数开启 checkpoint,每隔 3 秒做一次 checkpoint。
@@ -306,7 +306,7 @@ Flink CDC 提供了将源表的表结构/数据路由到其他表名的配置,
 docker-compose down
 ```
 
-在 Flink 所在目录 `flink-1.18.0` 下执行如下命令停止 Flink 集群:
+在 Flink 所在目录 `flink-1.19.1` 下执行如下命令停止 Flink 集群:
 
 ```shell
 ./bin/stop-cluster.sh
diff --git a/docs/content/docs/get-started/quickstart/mysql-to-doris.md b/docs/content/docs/get-started/quickstart/mysql-to-doris.md
index 1aee5fb05..431508750 100644
--- a/docs/content/docs/get-started/quickstart/mysql-to-doris.md
+++ b/docs/content/docs/get-started/quickstart/mysql-to-doris.md
@@ -35,11 +35,11 @@ without a single line of Java/Scala code or IDE installation.
 Prepare a Linux or MacOS computer with Docker installed.
 
 ### Prepare Flink Standalone cluster
-1. Download [Flink 1.18.0](https://archive.apache.org/dist/flink/flink-1.18.0/flink-1.18.0-bin-scala_2.12.tgz) ,unzip and get flink-1.18.0 directory.
-   Use the following command to navigate to the Flink directory and set FLINK_HOME to the directory where flink-1.18.0 is located.
+1. Download [Flink 1.19.1](https://archive.apache.org/dist/flink/flink-1.19.1/flink-1.19.1-bin-scala_2.12.tgz) ,unzip and get flink-1.19.1 directory.
+   Use the following command to navigate to the Flink directory and set FLINK_HOME to the directory where flink-1.19.1 is located.
 
    ```shell
-   cd flink-1.18.0
+   cd flink-1.19.1
    ```
 
 2. Enable checkpointing by appending the following parameters to the conf/flink-conf.yaml configuration file to perform a checkpoint every 3 seconds.
@@ -343,7 +343,7 @@ After finishing the tutorial, run the following command to stop all containers i
 ```shell
 docker-compose down
 ```
-Run the following command to stop the Flink cluster in the directory of Flink `flink-1.18.0`:
+Run the following command to stop the Flink cluster in the directory of Flink `flink-1.19.1`:
 
 ```shell
 ./bin/stop-cluster.sh
diff --git a/docs/content/docs/get-started/quickstart/mysql-to-starrocks.md b/docs/content/docs/get-started/quickstart/mysql-to-starrocks.md
index 55eaeedf2..690da0031 100644
--- a/docs/content/docs/get-started/quickstart/mysql-to-starrocks.md
+++ b/docs/content/docs/get-started/quickstart/mysql-to-starrocks.md
@@ -35,11 +35,11 @@ without a single line of Java/Scala code or IDE installation.
 Prepare a Linux or MacOS computer with Docker installed.
 
 ### Prepare Flink Standalone cluster
-1. Download [Flink 1.18.0](https://archive.apache.org/dist/flink/flink-1.18.0/flink-1.18.0-bin-scala_2.12.tgz) ,unzip and get flink-1.18.0 directory.
-   Use the following command to navigate to the Flink directory and set FLINK_HOME to the directory where flink-1.18.0 is located.
+1. Download [Flink 1.19.1](https://archive.apache.org/dist/flink/flink-1.19.1/flink-1.19.1-bin-scala_2.12.tgz) ,unzip and get flink-1.19.1 directory.
+   Use the following command to navigate to the Flink directory and set FLINK_HOME to the directory where flink-1.19.1 is located.
 
    ```shell
-   cd flink-1.18.0
+   cd flink-1.19.1
    ```
 
 2. Enable checkpointing by appending the following parameters to the conf/flink-conf.yaml configuration file to perform a checkpoint every 3 seconds.
@@ -310,7 +310,7 @@ After finishing the tutorial, run the following command to stop all containers i
 docker-compose down
 ```
 
-Run the following command to stop the Flink cluster in the directory of Flink `flink-1.18.0`:
+Run the following command to stop the Flink cluster in the directory of Flink `flink-1.19.1`:
 
 ```shell
 ./bin/stop-cluster.sh
diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/testsource/source/DistributedSourceFunction.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/testsource/source/DistributedSourceFunction.java
index b7b7787f1..4192a4f64 100644
--- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/testsource/source/DistributedSourceFunction.java
+++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/testsource/source/DistributedSourceFunction.java
@@ -78,8 +78,8 @@ public class DistributedSourceFunction extends RichParallelSourceFunction<Event>
     public void open(Configuration parameters) throws Exception {
         super.open(parameters);
         iotaCounter = 0;
-        subTaskId = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
-        parallelism = getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks();
+        subTaskId = getRuntimeContext().getIndexOfThisSubtask();
+        parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
 
         if (distributedTables) {
             tables = IntStream.range(0, numOfTables)
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaRegistry.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaRegistry.java
index 149049177..1e681a79f 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaRegistry.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaRegistry.java
@@ -287,7 +287,9 @@ public abstract class SchemaRegistry implements OperatorCoordinator, Coordinatio
     @Override
     public final void executionAttemptFailed(
             int subTaskId, int attemptNumber, @Nullable Throwable reason) {
-        failedReasons.put(subTaskId, reason);
+        if (reason != null) {
+            failedReasons.put(subTaskId, reason);
+        }
     }
 
     @Override
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaOperator.java
index ab856ba5f..80adea3f0 100755
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaOperator.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaOperator.java
@@ -97,7 +97,7 @@ public class SchemaOperator extends AbstractStreamOperator<Event>
     @Override
     public void open() throws Exception {
         super.open();
-        subTaskId = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
+        subTaskId = getRuntimeContext().getIndexOfThisSubtask();
         upstreamSchemaTable = HashBasedTable.create();
         evolvedSchemaMap = new HashMap<>();
         tableIdRouter = new TableIdRouter(routingRules);
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperator.java
index e5039543e..7d62d4c07 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperator.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperator.java
@@ -133,7 +133,7 @@ public class SchemaOperator extends AbstractStreamOperator<Event>
         this.schemaOperatorMetrics =
                 new SchemaOperatorMetrics(
                         getRuntimeContext().getMetricGroup(), schemaChangeBehavior);
-        this.subTaskId = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
+        this.subTaskId = getRuntimeContext().getIndexOfThisSubtask();
         this.originalSchemaMap = new HashMap<>();
         this.evolvedSchemaMap = new HashMap<>();
         this.router = new TableIdRouter(routingRules);
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/DistributedPrePartitionOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/DistributedPrePartitionOperator.java
index 3cce8923b..bedbe8e8e 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/DistributedPrePartitionOperator.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/DistributedPrePartitionOperator.java
@@ -63,7 +63,7 @@ public class DistributedPrePartitionOperator extends AbstractStreamOperator<Par
     @Override
     public void open() throws Exception {
         super.open();
-        subTaskId = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
+        subTaskId = getRuntimeContext().getIndexOfThisSubtask();
         schemaMap = new HashMap<>();
         hashFunctionMap = new HashMap<>();
     }
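
The Java hunks above all follow one pattern: they replace the `getRuntimeContext().getTaskInfo().getXxx()` call chain with the older direct getters on `RuntimeContext`, because the `getTaskInfo()` accessor is not available on the `RuntimeContext` shipped with Flink 1.18.x, which this commit aims to keep supporting (the `SchemaRegistry` hunk additionally guards against a null failure reason before storing it). A minimal sketch of that 1.18-compatible pattern is shown below; the class `SubtaskAwareMapper` and its fields are hypothetical illustrations, not taken from the Flink CDC code base.

```java
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

// Minimal sketch (assumption: Flink's DataStream API is on the classpath).
// Shows the 1.18-compatible way to read the subtask index and parallelism.
public class SubtaskAwareMapper extends RichMapFunction<String, String> {

    private int subTaskId;
    private int parallelism;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // These getters exist on Flink 1.18.x; the newer
        // getRuntimeContext().getTaskInfo() accessor used before this patch does not.
        subTaskId = getRuntimeContext().getIndexOfThisSubtask();
        parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
    }

    @Override
    public String map(String value) {
        return String.format("subtask %d/%d saw: %s", subTaskId, parallelism, value);
    }
}
```

Both getters are still present (though deprecated) on newer Flink releases, so code written this way should keep compiling and running across 1.18.x and later versions.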