[FLINK-37122][build] Try to be compatible with old flink version 1.18.x

This closes #3859
pull/3658/head
moses 2 weeks ago committed by GitHub
parent 7997f51c55
commit 947dffca31
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -33,11 +33,11 @@ under the License.
准备一台已经安装了 Docker 的 Linux 或者 MacOS 电脑。 准备一台已经安装了 Docker 的 Linux 或者 MacOS 电脑。
### 准备 Flink Standalone 集群 ### 准备 Flink Standalone 集群
1. 下载 [Flink 1.18.0](https://archive.apache.org/dist/flink/flink-1.18.0/flink-1.18.0-bin-scala_2.12.tgz),解压后得到 flink-1.18.0 目录。 1. 下载 [Flink 1.19.1](https://archive.apache.org/dist/flink/flink-1.19.1/flink-1.19.1-bin-scala_2.12.tgz),解压后得到 flink-1.19.1 目录。
使用下面的命令跳转至 Flink 目录下,并且设置 FLINK_HOME 为 flink-1.18.0 所在目录。 使用下面的命令跳转至 Flink 目录下,并且设置 FLINK_HOME 为 flink-1.19.1 所在目录。
```shell ```shell
cd flink-1.18.0 cd flink-1.19.1
``` ```
2. 通过在 conf/flink-conf.yaml 配置文件追加下列参数开启 checkpoint每隔 3 秒做一次 checkpoint。 2. 通过在 conf/flink-conf.yaml 配置文件追加下列参数开启 checkpoint每隔 3 秒做一次 checkpoint。
@ -338,7 +338,7 @@ Flink CDC 提供了将源表的表结构/数据路由到其他表名的配置,
```shell ```shell
docker-compose down docker-compose down
``` ```
在 Flink 所在目录 `flink-1.18.0` 下执行如下命令停止 Flink 集群: 在 Flink 所在目录 `flink-1.19.1` 下执行如下命令停止 Flink 集群:
```shell ```shell
./bin/stop-cluster.sh ./bin/stop-cluster.sh

@ -33,11 +33,11 @@ under the License.
准备一台已经安装了 Docker 的 Linux 或者 MacOS 电脑。 准备一台已经安装了 Docker 的 Linux 或者 MacOS 电脑。
### 准备 Flink Standalone 集群 ### 准备 Flink Standalone 集群
1. 下载 [Flink 1.18.0](https://archive.apache.org/dist/flink/flink-1.18.0/flink-1.18.0-bin-scala_2.12.tgz) ,解压后得到 flink-1.18.0 目录。 1. 下载 [Flink 1.19.1](https://archive.apache.org/dist/flink/flink-1.19.1/flink-1.19.1-bin-scala_2.12.tgz) ,解压后得到 flink-1.19.1 目录。
使用下面的命令跳转至 Flink 目录下,并且设置 FLINK_HOME 为 flink-1.18.0 所在目录。 使用下面的命令跳转至 Flink 目录下,并且设置 FLINK_HOME 为 flink-1.18.0 所在目录。
```shell ```shell
cd flink-1.18.0 cd flink-1.19.1
``` ```
2. 通过在 conf/flink-conf.yaml 配置文件追加下列参数开启 checkpoint每隔 3 秒做一次 checkpoint。 2. 通过在 conf/flink-conf.yaml 配置文件追加下列参数开启 checkpoint每隔 3 秒做一次 checkpoint。
@ -306,7 +306,7 @@ Flink CDC 提供了将源表的表结构/数据路由到其他表名的配置,
docker-compose down docker-compose down
``` ```
在 Flink 所在目录 `flink-1.18.0` 下执行如下命令停止 Flink 集群: 在 Flink 所在目录 `flink-1.19.1` 下执行如下命令停止 Flink 集群:
```shell ```shell
./bin/stop-cluster.sh ./bin/stop-cluster.sh

@ -35,11 +35,11 @@ without a single line of Java/Scala code or IDE installation.
Prepare a Linux or MacOS computer with Docker installed. Prepare a Linux or MacOS computer with Docker installed.
### Prepare Flink Standalone cluster ### Prepare Flink Standalone cluster
1. Download [Flink 1.18.0](https://archive.apache.org/dist/flink/flink-1.18.0/flink-1.18.0-bin-scala_2.12.tgz) unzip and get flink-1.18.0 directory. 1. Download [Flink 1.19.1](https://archive.apache.org/dist/flink/flink-1.19.1/flink-1.19.1-bin-scala_2.12.tgz) unzip and get flink-1.19.1 directory.
Use the following command to navigate to the Flink directory and set FLINK_HOME to the directory where flink-1.18.0 is located. Use the following command to navigate to the Flink directory and set FLINK_HOME to the directory where flink-1.19.1 is located.
```shell ```shell
cd flink-1.18.0 cd flink-1.19.1
``` ```
2. Enable checkpointing by appending the following parameters to the conf/flink-conf.yaml configuration file to perform a checkpoint every 3 seconds. 2. Enable checkpointing by appending the following parameters to the conf/flink-conf.yaml configuration file to perform a checkpoint every 3 seconds.
@ -343,7 +343,7 @@ After finishing the tutorial, run the following command to stop all containers i
```shell ```shell
docker-compose down docker-compose down
``` ```
Run the following command to stop the Flink cluster in the directory of Flink `flink-1.18.0`: Run the following command to stop the Flink cluster in the directory of Flink `flink-1.19.1`:
```shell ```shell
./bin/stop-cluster.sh ./bin/stop-cluster.sh

@ -35,11 +35,11 @@ without a single line of Java/Scala code or IDE installation.
Prepare a Linux or MacOS computer with Docker installed. Prepare a Linux or MacOS computer with Docker installed.
### Prepare Flink Standalone cluster ### Prepare Flink Standalone cluster
1. Download [Flink 1.18.0](https://archive.apache.org/dist/flink/flink-1.18.0/flink-1.18.0-bin-scala_2.12.tgz) unzip and get flink-1.18.0 directory. 1. Download [Flink 1.19.1](https://archive.apache.org/dist/flink/flink-1.19.1/flink-1.19.1-bin-scala_2.12.tgz) unzip and get flink-1.19.1 directory.
Use the following command to navigate to the Flink directory and set FLINK_HOME to the directory where flink-1.18.0 is located. Use the following command to navigate to the Flink directory and set FLINK_HOME to the directory where flink-1.19.1 is located.
```shell ```shell
cd flink-1.18.0 cd flink-1.19.1
``` ```
2. Enable checkpointing by appending the following parameters to the conf/flink-conf.yaml configuration file to perform a checkpoint every 3 seconds. 2. Enable checkpointing by appending the following parameters to the conf/flink-conf.yaml configuration file to perform a checkpoint every 3 seconds.
@ -310,7 +310,7 @@ After finishing the tutorial, run the following command to stop all containers i
docker-compose down docker-compose down
``` ```
Run the following command to stop the Flink cluster in the directory of Flink `flink-1.18.0`: Run the following command to stop the Flink cluster in the directory of Flink `flink-1.19.1`:
```shell ```shell
./bin/stop-cluster.sh ./bin/stop-cluster.sh

@ -78,8 +78,8 @@ public class DistributedSourceFunction extends RichParallelSourceFunction<Event>
public void open(Configuration parameters) throws Exception { public void open(Configuration parameters) throws Exception {
super.open(parameters); super.open(parameters);
iotaCounter = 0; iotaCounter = 0;
subTaskId = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(); subTaskId = getRuntimeContext().getIndexOfThisSubtask();
parallelism = getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks(); parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
if (distributedTables) { if (distributedTables) {
tables = tables =
IntStream.range(0, numOfTables) IntStream.range(0, numOfTables)

@ -287,8 +287,10 @@ public abstract class SchemaRegistry implements OperatorCoordinator, Coordinatio
@Override @Override
public final void executionAttemptFailed( public final void executionAttemptFailed(
int subTaskId, int attemptNumber, @Nullable Throwable reason) { int subTaskId, int attemptNumber, @Nullable Throwable reason) {
if (reason != null) {
failedReasons.put(subTaskId, reason); failedReasons.put(subTaskId, reason);
} }
}
@Override @Override
public final void executionAttemptReady( public final void executionAttemptReady(

@ -97,7 +97,7 @@ public class SchemaOperator extends AbstractStreamOperator<Event>
@Override @Override
public void open() throws Exception { public void open() throws Exception {
super.open(); super.open();
subTaskId = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(); subTaskId = getRuntimeContext().getIndexOfThisSubtask();
upstreamSchemaTable = HashBasedTable.create(); upstreamSchemaTable = HashBasedTable.create();
evolvedSchemaMap = new HashMap<>(); evolvedSchemaMap = new HashMap<>();
tableIdRouter = new TableIdRouter(routingRules); tableIdRouter = new TableIdRouter(routingRules);

@ -133,7 +133,7 @@ public class SchemaOperator extends AbstractStreamOperator<Event>
this.schemaOperatorMetrics = this.schemaOperatorMetrics =
new SchemaOperatorMetrics( new SchemaOperatorMetrics(
getRuntimeContext().getMetricGroup(), schemaChangeBehavior); getRuntimeContext().getMetricGroup(), schemaChangeBehavior);
this.subTaskId = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(); this.subTaskId = getRuntimeContext().getIndexOfThisSubtask();
this.originalSchemaMap = new HashMap<>(); this.originalSchemaMap = new HashMap<>();
this.evolvedSchemaMap = new HashMap<>(); this.evolvedSchemaMap = new HashMap<>();
this.router = new TableIdRouter(routingRules); this.router = new TableIdRouter(routingRules);

@ -63,7 +63,7 @@ public class DistributedPrePartitionOperator extends AbstractStreamOperator<Part
@Override @Override
public void open() throws Exception { public void open() throws Exception {
super.open(); super.open();
subTaskId = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(); subTaskId = getRuntimeContext().getIndexOfThisSubtask();
schemaMap = new HashMap<>(); schemaMap = new HashMap<>();
hashFunctionMap = new HashMap<>(); hashFunctionMap = new HashMap<>();
} }

Loading…
Cancel
Save