From 5917f787040cf39dcf51ecfc7605597aa89feb65 Mon Sep 17 00:00:00 2001
From: yuxiqian <34335406+yuxiqian@users.noreply.github.com>
Date: Tue, 30 Jul 2024 09:41:23 +0800
Subject: [PATCH] [FLINK-35736][test] Add migration test scripts & CI workflows
This closes #3447
---
.github/workflows/flink_cdc.yml | 110 ++++++++++--
tools/mig-test/.gitignore | 8 +
tools/mig-test/README.md | 38 ++++
tools/mig-test/conf/docker-compose.yml | 24 +++
tools/mig-test/conf/pipeline-route.yaml | 41 +++++
tools/mig-test/conf/pipeline.yaml | 33 ++++
tools/mig-test/datastream/README.md | 11 ++
tools/mig-test/datastream/compile_jobs.rb | 26 +++
.../datastream/datastream-2.4.2/.gitignore | 38 ++++
.../datastream/datastream-2.4.2/pom.xml | 151 ++++++++++++++++
.../src/main/java/DataStreamJob.java | 54 ++++++
.../datastream/datastream-3.0.0/.gitignore | 38 ++++
.../datastream/datastream-3.0.0/pom.xml | 151 ++++++++++++++++
.../src/main/java/DataStreamJob.java | 54 ++++++
.../datastream/datastream-3.0.1/.gitignore | 38 ++++
.../datastream/datastream-3.0.1/pom.xml | 151 ++++++++++++++++
.../src/main/java/DataStreamJob.java | 54 ++++++
.../datastream/datastream-3.1.0/.gitignore | 38 ++++
.../datastream/datastream-3.1.0/pom.xml | 151 ++++++++++++++++
.../src/main/java/DataStreamJob.java | 54 ++++++
.../datastream/datastream-3.1.1/.gitignore | 38 ++++
.../datastream/datastream-3.1.1/pom.xml | 151 ++++++++++++++++
.../src/main/java/DataStreamJob.java | 54 ++++++
.../datastream-3.2-SNAPSHOT/.gitignore | 38 ++++
.../datastream-3.2-SNAPSHOT/pom.xml | 151 ++++++++++++++++
.../src/main/java/DataStreamJob.java | 54 ++++++
.../mig-test/datastream/run_migration_test.rb | 138 +++++++++++++++
tools/mig-test/misc/patch_flink_conf.rb | 32 ++++
tools/mig-test/prepare_libs.rb | 123 +++++++++++++
tools/mig-test/run_migration_test.rb | 162 ++++++++++++++++++
30 files changed, 2192 insertions(+), 12 deletions(-)
create mode 100644 tools/mig-test/.gitignore
create mode 100644 tools/mig-test/README.md
create mode 100644 tools/mig-test/conf/docker-compose.yml
create mode 100644 tools/mig-test/conf/pipeline-route.yaml
create mode 100644 tools/mig-test/conf/pipeline.yaml
create mode 100644 tools/mig-test/datastream/README.md
create mode 100644 tools/mig-test/datastream/compile_jobs.rb
create mode 100644 tools/mig-test/datastream/datastream-2.4.2/.gitignore
create mode 100644 tools/mig-test/datastream/datastream-2.4.2/pom.xml
create mode 100644 tools/mig-test/datastream/datastream-2.4.2/src/main/java/DataStreamJob.java
create mode 100644 tools/mig-test/datastream/datastream-3.0.0/.gitignore
create mode 100644 tools/mig-test/datastream/datastream-3.0.0/pom.xml
create mode 100644 tools/mig-test/datastream/datastream-3.0.0/src/main/java/DataStreamJob.java
create mode 100644 tools/mig-test/datastream/datastream-3.0.1/.gitignore
create mode 100644 tools/mig-test/datastream/datastream-3.0.1/pom.xml
create mode 100644 tools/mig-test/datastream/datastream-3.0.1/src/main/java/DataStreamJob.java
create mode 100644 tools/mig-test/datastream/datastream-3.1.0/.gitignore
create mode 100644 tools/mig-test/datastream/datastream-3.1.0/pom.xml
create mode 100644 tools/mig-test/datastream/datastream-3.1.0/src/main/java/DataStreamJob.java
create mode 100644 tools/mig-test/datastream/datastream-3.1.1/.gitignore
create mode 100644 tools/mig-test/datastream/datastream-3.1.1/pom.xml
create mode 100644 tools/mig-test/datastream/datastream-3.1.1/src/main/java/DataStreamJob.java
create mode 100644 tools/mig-test/datastream/datastream-3.2-SNAPSHOT/.gitignore
create mode 100644 tools/mig-test/datastream/datastream-3.2-SNAPSHOT/pom.xml
create mode 100644 tools/mig-test/datastream/datastream-3.2-SNAPSHOT/src/main/java/DataStreamJob.java
create mode 100644 tools/mig-test/datastream/run_migration_test.rb
create mode 100644 tools/mig-test/misc/patch_flink_conf.rb
create mode 100644 tools/mig-test/prepare_libs.rb
create mode 100644 tools/mig-test/run_migration_test.rb
diff --git a/.github/workflows/flink_cdc.yml b/.github/workflows/flink_cdc.yml
index 22b019335..1cd5d2903 100644
--- a/.github/workflows/flink_cdc.yml
+++ b/.github/workflows/flink_cdc.yml
@@ -115,19 +115,8 @@ jobs:
- name: Run license check
run: gem install rubyzip -v 2.3.0 && ./tools/ci/license_check.rb
- migration_test:
- runs-on: ubuntu-latest
- steps:
- - name: Check out repository code
- uses: actions/checkout@v4
- with:
- submodules: true
- - name: Compile snapshot CDC version
- run: mvn --no-snapshot-updates -B install -DskipTests
- - name: Run migration tests
- run: cd flink-cdc-migration-tests && mvn clean verify
-
compile_and_test:
+ needs: license_check
# Only run the CI pipeline for the flink-cdc-connectors repository
# if: github.repository == 'apache/flink-cdc-connectors'
runs-on: ubuntu-latest
@@ -263,3 +252,100 @@ jobs:
done
fi
exit 0
+
+
+ migration_test_ut:
+ needs: license_check
+ runs-on: ubuntu-latest
+ steps:
+ - name: Check out repository code
+ uses: actions/checkout@v4
+ with:
+ submodules: true
+ - name: Compile snapshot CDC version
+ run: mvn --no-snapshot-updates -B install -DskipTests
+ - name: Run migration tests
+ run: cd flink-cdc-migration-tests && mvn clean verify
+
+ pipeline_migration_test:
+ needs: migration_test_ut
+ runs-on: ubuntu-latest
+ strategy:
+ matrix:
+ java-version: [ '8', '11' ]
+
+ steps:
+ - uses: actions/checkout@v4
+ - name: Set up Ruby
+ uses: ruby/setup-ruby@v1
+ with:
+ ruby-version: 3.0
+ bundler-cache: true # runs 'bundle install' and caches installed gems automatically
+ - uses: actions/setup-java@v4
+ with:
+ java-version: ${{ matrix.java-version }}
+ distribution: temurin
+ cache: maven
+ - name: Install dependencies
+ run: gem install terminal-table
+ - name: Prepare CDC versions
+ run: CDC_SOURCE_HOME=$PWD ruby tools/mig-test/prepare_libs.rb
+ - name: Prepare Flink distro
+ run: wget https://dlcdn.apache.org/flink/flink-1.18.1/flink-1.18.1-bin-scala_2.12.tgz && tar -xzvf flink-1.18.1-bin-scala_2.12.tgz
+ working-directory: ./tools/mig-test
+ - name: Patch Flink configs
+ run: FLINK_HOME=./flink-1.18.1/ ruby misc/patch_flink_conf.rb
+ working-directory: ./tools/mig-test
+ - name: Start containers
+ run: cd conf && docker-compose up -d
+ working-directory: ./tools/mig-test
+ - name: Run migration tests
+ run: FLINK_HOME=./flink-1.18.1/ ruby run_migration_test.rb
+ working-directory: ./tools/mig-test
+ - name: Stop containers
+ if: always()
+ run: cd conf && docker-compose down
+ working-directory: ./tools/mig-test
+
+ data_stream_migration_test:
+ needs: migration_test_ut
+ runs-on: ubuntu-latest
+ strategy:
+ matrix:
+ java-version: [ '8', '11' ]
+
+ steps:
+ - uses: actions/checkout@v4
+ - name: Set up Ruby
+ uses: ruby/setup-ruby@v1
+ with:
+ ruby-version: 3.0
+ bundler-cache: true # runs 'bundle install' and caches installed gems automatically
+ - uses: actions/setup-java@v4
+ with:
+ java-version: ${{ matrix.java-version }}
+ distribution: temurin
+ cache: maven
+ - name: Install dependencies
+ run: gem install terminal-table
+ - name: Prepare CDC versions
+ run: CDC_SOURCE_HOME=$PWD ruby tools/mig-test/prepare_libs.rb
+ - name: Prepare Flink distro
+ run: wget https://dlcdn.apache.org/flink/flink-1.18.1/flink-1.18.1-bin-scala_2.12.tgz && tar -xzvf flink-1.18.1-bin-scala_2.12.tgz
+ working-directory: ./tools/mig-test
+ - name: Patch Flink configs
+ run: FLINK_HOME=./flink-1.18.1/ ruby misc/patch_flink_conf.rb
+ working-directory: ./tools/mig-test
+ - name: Compile Dummy DataStream Jobs
+ run: cd datastream && ruby compile_jobs.rb
+ working-directory: ./tools/mig-test
+ - name: Start containers
+ run: cd conf && docker-compose up -d
+ working-directory: ./tools/mig-test
+ - name: Run migration tests
+ run: cd datastream && FLINK_HOME=../flink-1.18.1/ ruby run_migration_test.rb
+ working-directory: ./tools/mig-test
+ - name: Stop containers
+ if: always()
+ run: cd conf && docker-compose down
+ working-directory: ./tools/mig-test
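
The `migration_test_ut` job above boils down to two Maven invocations; as a quick reference, the same check can be run locally from the repository root:

```bash
# Same steps as the migration_test_ut CI job
mvn --no-snapshot-updates -B install -DskipTests
cd flink-cdc-migration-tests && mvn clean verify
```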
diff --git a/tools/mig-test/.gitignore b/tools/mig-test/.gitignore
new file mode 100644
index 000000000..d0ca45c15
--- /dev/null
+++ b/tools/mig-test/.gitignore
@@ -0,0 +1,8 @@
+*.sql
+savepoints/**
+cdc-versions/**
+cache/**
+.idea/**
+Gemfile.lock
+/logs/
+conf/temp.yaml
\ No newline at end of file
diff --git a/tools/mig-test/README.md b/tools/mig-test/README.md
new file mode 100644
index 000000000..39925cb95
--- /dev/null
+++ b/tools/mig-test/README.md
@@ -0,0 +1,38 @@
+# Flink CDC Migration Test Utilities
+
+## Pipeline Jobs
+### Preparation
+
+1. Install Ruby (it comes preinstalled on macOS)
+2. (Optional) Run `gem install terminal-table` for a nicer result table
+
+### Prepare CDC versions
+3. Set `CDC_SOURCE_HOME` to the root directory of the Flink CDC git repository
+4. Go to `tools/mig-test` and run `ruby prepare_libs.rb` to download released CDC versions and compile the current snapshot version
+
+### Run migration tests
+5. Enter `conf/` and run `docker compose up -d` to start up test containers
+6. Set `FLINK_HOME` to the home directory of Flink
+7. Go back to `tools/mig-test` and run `ruby run_migration_test.rb` to start testing
+
+### Result
+The migration result will be displayed in the console like this:
+
+```
++--------------------------------------------------------------------+
+| Migration Test Result |
++--------------+-------+-------+-------+--------------+--------------+
+| | 3.0.0 | 3.0.1 | 3.1.0 | 3.1-SNAPSHOT | 3.2-SNAPSHOT |
+| 3.0.0 | ❓ | ❓ | ❌ | ✅ | ✅ |
+| 3.0.1 | | ❓ | ❌ | ✅ | ✅ |
+| 3.1.0 | | | ✅ | ❌ | ❌ |
+| 3.1-SNAPSHOT | | | | ✅ | ✅ |
+| 3.2-SNAPSHOT | | | | | ✅ |
++--------------+-------+-------+-------+--------------+--------------+
+```
+
+> ✅ - Compatible, ❌ - Not compatible, ❓ - Target version doesn't support `--from-savepoint`
+
+## DataStream Jobs
+
+See `datastream/README.md`.
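
For reference, the steps above condensed into one shell session; the Flink version and paths mirror the CI workflow in `.github/workflows/flink_cdc.yml`, so adjust them to your environment:

```bash
# 1. Download released CDC versions and build the snapshot version
export CDC_SOURCE_HOME=$PWD                # root of the Flink CDC repository
ruby tools/mig-test/prepare_libs.rb

# 2. Prepare a Flink distribution and the MySQL test container
cd tools/mig-test
wget https://dlcdn.apache.org/flink/flink-1.18.1/flink-1.18.1-bin-scala_2.12.tgz
tar -xzf flink-1.18.1-bin-scala_2.12.tgz
FLINK_HOME=./flink-1.18.1/ ruby misc/patch_flink_conf.rb
(cd conf && docker compose up -d)

# 3. Run the pipeline migration matrix
FLINK_HOME=./flink-1.18.1/ ruby run_migration_test.rb
```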
diff --git a/tools/mig-test/conf/docker-compose.yml b/tools/mig-test/conf/docker-compose.yml
new file mode 100644
index 000000000..b195a59f6
--- /dev/null
+++ b/tools/mig-test/conf/docker-compose.yml
@@ -0,0 +1,24 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+version: '3'
+services:
+ mysql:
+ image: mysql:8.0
+ environment:
+ MYSQL_ALLOW_EMPTY_PASSWORD: "true"
+ MYSQL_DATABASE: fallen
+ ports:
+ - 3306:3306
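
The Ruby test drivers assume this container is reachable on `localhost:3306` with an empty root password and a `fallen` database. A hedged readiness check, using the same `mysql` invocation as the scripts' `exec_sql_source` helper:

```bash
# Block until the MySQL container accepts connections
until mysql -h 127.0.0.1 -P 3306 -uroot --skip-password -e "SELECT '1';" >/dev/null 2>&1; do
  sleep 1
done
mysql -h 127.0.0.1 -P 3306 -uroot --skip-password -e "SHOW DATABASES LIKE 'fallen';"
```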
diff --git a/tools/mig-test/conf/pipeline-route.yaml b/tools/mig-test/conf/pipeline-route.yaml
new file mode 100644
index 000000000..0054d4d18
--- /dev/null
+++ b/tools/mig-test/conf/pipeline-route.yaml
@@ -0,0 +1,41 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+source:
+ type: mysql
+ name: MySQL Source
+ hostname: localhost
+ port: 3306
+ username: root
+ password: ""
+ tables: fallen.\.*
+ server-id: 5400-5500
+ server-time-zone: UTC
+
+sink:
+ type: values
+ name: Values Sink
+
+pipeline:
+ name: ${PIPELINE_NAME}
+ parallelism: 1
+
+transform:
+ - source-table: fallen.\.*
+ projection: \*, 'extras' AS EXTRAS
+
+route:
+ - source-table: fallen.\.*
+ sink-table: fallen.terminus
diff --git a/tools/mig-test/conf/pipeline.yaml b/tools/mig-test/conf/pipeline.yaml
new file mode 100644
index 000000000..290b899b5
--- /dev/null
+++ b/tools/mig-test/conf/pipeline.yaml
@@ -0,0 +1,33 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+source:
+ type: mysql
+ name: MySQL Source
+ hostname: localhost
+ port: 3306
+ username: root
+ password: ""
+ tables: fallen.\.*
+ server-id: 5400-5500
+ server-time-zone: UTC
+
+sink:
+ type: values
+ name: Values Sink
+
+pipeline:
+ name: ${PIPELINE_NAME}
+ parallelism: 1
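
Both job templates leave `${PIPELINE_NAME}` as a placeholder; `run_migration_test.rb` fills it in and submits the rendered file through the `flink-cdc.sh` launcher of the CDC version under test. A minimal manual equivalent (version `3.1.1` is only an example):

```bash
# Render the template and submit it with a prepared CDC distribution
cd tools/mig-test
sed 's/${PIPELINE_NAME}/Manual Pipeline Job/' conf/pipeline.yaml > conf/temp.yaml
bash ./cdc-versions/3.1.1/bin/flink-cdc.sh --flink-home "$FLINK_HOME" conf/temp.yaml
```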
diff --git a/tools/mig-test/datastream/README.md b/tools/mig-test/datastream/README.md
new file mode 100644
index 000000000..e4d639b01
--- /dev/null
+++ b/tools/mig-test/datastream/README.md
@@ -0,0 +1,11 @@
+# Flink CDC Migration Test Utilities
+
+## DataStream Jobs
+### Preparation
+
+1. Install Ruby (it comes preinstalled on macOS)
+2. (Optional) Run `gem install terminal-table` for better display
+
+### Compile DataStream Jobs
+3. Go to `tools/mig-test/datastream` and run `ruby compile_jobs.rb` to compile the dummy DataStream jobs against each CDC version
+4. Set `FLINK_HOME` to the home directory of Flink, then run `ruby run_migration_test.rb` to start testing
\ No newline at end of file
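
The same flow as a shell session, mirroring the `data_stream_migration_test` CI job (the Flink path is an example):

```bash
cd tools/mig-test/datastream
ruby compile_jobs.rb                          # builds datastream-job-<version>-jar-with-dependencies.jar for each version
FLINK_HOME=../flink-1.18.1/ ruby run_migration_test.rb
```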
diff --git a/tools/mig-test/datastream/compile_jobs.rb b/tools/mig-test/datastream/compile_jobs.rb
new file mode 100644
index 000000000..579c0ba60
--- /dev/null
+++ b/tools/mig-test/datastream/compile_jobs.rb
@@ -0,0 +1,26 @@
+#!/usr/bin/env ruby
+# frozen_string_literal: true
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+JOB_VERSIONS = %w[2.4.2 3.0.0 3.0.1 3.1.0 3.1.1 3.2-SNAPSHOT]
+
+JOB_VERSIONS.each do |version|
+ puts "Compiling DataStream job for CDC #{version}"
+ `cd datastream-#{version} && mvn clean package -DskipTests`
+end
+
+puts 'Done'
\ No newline at end of file
diff --git a/tools/mig-test/datastream/datastream-2.4.2/.gitignore b/tools/mig-test/datastream/datastream-2.4.2/.gitignore
new file mode 100644
index 000000000..5ff6309b7
--- /dev/null
+++ b/tools/mig-test/datastream/datastream-2.4.2/.gitignore
@@ -0,0 +1,38 @@
+target/
+!.mvn/wrapper/maven-wrapper.jar
+!**/src/main/**/target/
+!**/src/test/**/target/
+
+### IntelliJ IDEA ###
+.idea/modules.xml
+.idea/jarRepositories.xml
+.idea/compiler.xml
+.idea/libraries/
+*.iws
+*.iml
+*.ipr
+
+### Eclipse ###
+.apt_generated
+.classpath
+.factorypath
+.project
+.settings
+.springBeans
+.sts4-cache
+
+### NetBeans ###
+/nbproject/private/
+/nbbuild/
+/dist/
+/nbdist/
+/.nb-gradle/
+build/
+!**/src/main/**/build/
+!**/src/test/**/build/
+
+### VS Code ###
+.vscode/
+
+### Mac OS ###
+.DS_Store
\ No newline at end of file
diff --git a/tools/mig-test/datastream/datastream-2.4.2/pom.xml b/tools/mig-test/datastream/datastream-2.4.2/pom.xml
new file mode 100644
index 000000000..fd3fbe0ca
--- /dev/null
+++ b/tools/mig-test/datastream/datastream-2.4.2/pom.xml
@@ -0,0 +1,151 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>org.apache.flink</groupId>
+    <artifactId>datastream-job</artifactId>
+    <version>2.4.2</version>
+    <packaging>jar</packaging>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <flink.version>1.17.1</flink.version>
+        <flink.cdc.version>2.4.2</flink.cdc.version>
+        <debezium.version>1.9.7.Final</debezium.version>
+        <scala.binary.version>2.12</scala.binary.version>
+        <slf4j.version>2.0.13</slf4j.version>
+        <maven.compiler.source>8</maven.compiler.source>
+        <maven.compiler.target>8</maven.compiler.target>
+        <maven.compiler.encoding>UTF-8</maven.compiler.encoding>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-runtime</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-java</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-base</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-clients</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-runtime</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-core</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-common</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-shaded-guava</artifactId>
+            <version>30.1.1-jre-16.1</version>
+        </dependency>
+        <dependency>
+            <groupId>com.ververica</groupId>
+            <artifactId>flink-connector-debezium</artifactId>
+            <version>${flink.cdc.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.ververica</groupId>
+            <artifactId>flink-cdc-base</artifactId>
+            <version>${flink.cdc.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.ververica</groupId>
+            <artifactId>flink-connector-mysql-cdc</artifactId>
+            <version>${flink.cdc.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>io.debezium</groupId>
+            <artifactId>debezium-connector-mysql</artifactId>
+            <version>${debezium.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <version>${slf4j.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-simple</artifactId>
+            <version>${slf4j.version}</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+                <configuration>
+                    <descriptorRefs>
+                        <descriptorRef>jar-with-dependencies</descriptorRef>
+                    </descriptorRefs>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file
diff --git a/tools/mig-test/datastream/datastream-2.4.2/src/main/java/DataStreamJob.java b/tools/mig-test/datastream/datastream-2.4.2/src/main/java/DataStreamJob.java
new file mode 100644
index 000000000..4e2a7a901
--- /dev/null
+++ b/tools/mig-test/datastream/datastream-2.4.2/src/main/java/DataStreamJob.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import com.ververica.cdc.connectors.mysql.source.MySqlSource;
+import com.ververica.cdc.connectors.mysql.table.StartupOptions;
+import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+public class DataStreamJob {
+
+ public static void main(String[] args) {
+ MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
+ .hostname("localhost")
+ .port(3306)
+ .databaseList("fallen")
+ .tableList("fallen.angel", "fallen.gabriel", "fallen.girl")
+ .startupOptions(StartupOptions.initial())
+ .username("root")
+ .password("")
+ .deserializer(new JsonDebeziumDeserializationSchema())
+ .serverTimeZone("UTC")
+ .build();
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.enableCheckpointing(3000);
+
+ env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL CDC Source")
+ .uid("sql-source-uid")
+ .setParallelism(1)
+ .print()
+ .setParallelism(1);
+
+ try {
+ env.execute();
+ } catch (Exception e) {
+ // ... unfortunately
+ }
+ }
+}
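
The explicit `.uid("sql-source-uid")` on the source is what allows a savepoint taken from one of these jars to be restored by the jar built against a newer CDC version. The migration script drives that with plain `flink` CLI calls; a hedged manual sketch (job ID and savepoint directory are placeholders):

```bash
# Start the old job, stop it with a savepoint, then resume the newer jar from that savepoint
"$FLINK_HOME"/bin/flink run -p 1 -c DataStreamJob --detached \
    datastream-2.4.2/target/datastream-job-2.4.2-jar-with-dependencies.jar
"$FLINK_HOME"/bin/flink stop --savepointPath "$PWD"/savepoints <old-job-id>
"$FLINK_HOME"/bin/flink run --fromSavepoint "$PWD"/savepoints/<savepoint-dir> -p 1 -c DataStreamJob --detached \
    datastream-3.2-SNAPSHOT/target/datastream-job-3.2-SNAPSHOT-jar-with-dependencies.jar
```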
diff --git a/tools/mig-test/datastream/datastream-3.0.0/.gitignore b/tools/mig-test/datastream/datastream-3.0.0/.gitignore
new file mode 100644
index 000000000..5ff6309b7
--- /dev/null
+++ b/tools/mig-test/datastream/datastream-3.0.0/.gitignore
@@ -0,0 +1,38 @@
+target/
+!.mvn/wrapper/maven-wrapper.jar
+!**/src/main/**/target/
+!**/src/test/**/target/
+
+### IntelliJ IDEA ###
+.idea/modules.xml
+.idea/jarRepositories.xml
+.idea/compiler.xml
+.idea/libraries/
+*.iws
+*.iml
+*.ipr
+
+### Eclipse ###
+.apt_generated
+.classpath
+.factorypath
+.project
+.settings
+.springBeans
+.sts4-cache
+
+### NetBeans ###
+/nbproject/private/
+/nbbuild/
+/dist/
+/nbdist/
+/.nb-gradle/
+build/
+!**/src/main/**/build/
+!**/src/test/**/build/
+
+### VS Code ###
+.vscode/
+
+### Mac OS ###
+.DS_Store
\ No newline at end of file
diff --git a/tools/mig-test/datastream/datastream-3.0.0/pom.xml b/tools/mig-test/datastream/datastream-3.0.0/pom.xml
new file mode 100644
index 000000000..5d8711a81
--- /dev/null
+++ b/tools/mig-test/datastream/datastream-3.0.0/pom.xml
@@ -0,0 +1,151 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>org.apache.flink</groupId>
+    <artifactId>datastream-job</artifactId>
+    <version>3.0.0</version>
+    <packaging>jar</packaging>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <flink.version>1.18.1</flink.version>
+        <flink.cdc.version>3.0.0</flink.cdc.version>
+        <debezium.version>1.9.7.Final</debezium.version>
+        <scala.binary.version>2.12</scala.binary.version>
+        <slf4j.version>2.0.13</slf4j.version>
+        <maven.compiler.source>8</maven.compiler.source>
+        <maven.compiler.target>8</maven.compiler.target>
+        <maven.compiler.encoding>UTF-8</maven.compiler.encoding>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-runtime</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-java</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-base</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-clients</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-runtime</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-core</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-common</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-shaded-guava</artifactId>
+            <version>31.1-jre-17.0</version>
+        </dependency>
+        <dependency>
+            <groupId>com.ververica</groupId>
+            <artifactId>flink-connector-debezium</artifactId>
+            <version>${flink.cdc.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.ververica</groupId>
+            <artifactId>flink-cdc-base</artifactId>
+            <version>${flink.cdc.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.ververica</groupId>
+            <artifactId>flink-connector-mysql-cdc</artifactId>
+            <version>${flink.cdc.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>io.debezium</groupId>
+            <artifactId>debezium-connector-mysql</artifactId>
+            <version>${debezium.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <version>${slf4j.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-simple</artifactId>
+            <version>${slf4j.version}</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+                <configuration>
+                    <descriptorRefs>
+                        <descriptorRef>jar-with-dependencies</descriptorRef>
+                    </descriptorRefs>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file
diff --git a/tools/mig-test/datastream/datastream-3.0.0/src/main/java/DataStreamJob.java b/tools/mig-test/datastream/datastream-3.0.0/src/main/java/DataStreamJob.java
new file mode 100644
index 000000000..4e2a7a901
--- /dev/null
+++ b/tools/mig-test/datastream/datastream-3.0.0/src/main/java/DataStreamJob.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import com.ververica.cdc.connectors.mysql.source.MySqlSource;
+import com.ververica.cdc.connectors.mysql.table.StartupOptions;
+import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+public class DataStreamJob {
+
+ public static void main(String[] args) {
+ MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
+ .hostname("localhost")
+ .port(3306)
+ .databaseList("fallen")
+ .tableList("fallen.angel", "fallen.gabriel", "fallen.girl")
+ .startupOptions(StartupOptions.initial())
+ .username("root")
+ .password("")
+ .deserializer(new JsonDebeziumDeserializationSchema())
+ .serverTimeZone("UTC")
+ .build();
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.enableCheckpointing(3000);
+
+ env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL CDC Source")
+ .uid("sql-source-uid")
+ .setParallelism(1)
+ .print()
+ .setParallelism(1);
+
+ try {
+ env.execute();
+ } catch (Exception e) {
+ // ... unfortunately
+ }
+ }
+}
diff --git a/tools/mig-test/datastream/datastream-3.0.1/.gitignore b/tools/mig-test/datastream/datastream-3.0.1/.gitignore
new file mode 100644
index 000000000..5ff6309b7
--- /dev/null
+++ b/tools/mig-test/datastream/datastream-3.0.1/.gitignore
@@ -0,0 +1,38 @@
+target/
+!.mvn/wrapper/maven-wrapper.jar
+!**/src/main/**/target/
+!**/src/test/**/target/
+
+### IntelliJ IDEA ###
+.idea/modules.xml
+.idea/jarRepositories.xml
+.idea/compiler.xml
+.idea/libraries/
+*.iws
+*.iml
+*.ipr
+
+### Eclipse ###
+.apt_generated
+.classpath
+.factorypath
+.project
+.settings
+.springBeans
+.sts4-cache
+
+### NetBeans ###
+/nbproject/private/
+/nbbuild/
+/dist/
+/nbdist/
+/.nb-gradle/
+build/
+!**/src/main/**/build/
+!**/src/test/**/build/
+
+### VS Code ###
+.vscode/
+
+### Mac OS ###
+.DS_Store
\ No newline at end of file
diff --git a/tools/mig-test/datastream/datastream-3.0.1/pom.xml b/tools/mig-test/datastream/datastream-3.0.1/pom.xml
new file mode 100644
index 000000000..4044fd661
--- /dev/null
+++ b/tools/mig-test/datastream/datastream-3.0.1/pom.xml
@@ -0,0 +1,151 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>org.apache.flink</groupId>
+    <artifactId>datastream-job</artifactId>
+    <version>3.0.1</version>
+    <packaging>jar</packaging>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <flink.version>1.18.1</flink.version>
+        <flink.cdc.version>3.0.1</flink.cdc.version>
+        <debezium.version>1.9.7.Final</debezium.version>
+        <scala.binary.version>2.12</scala.binary.version>
+        <slf4j.version>2.0.13</slf4j.version>
+        <maven.compiler.source>8</maven.compiler.source>
+        <maven.compiler.target>8</maven.compiler.target>
+        <maven.compiler.encoding>UTF-8</maven.compiler.encoding>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-runtime</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-java</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-base</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-clients</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-runtime</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-core</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-common</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-shaded-guava</artifactId>
+            <version>31.1-jre-17.0</version>
+        </dependency>
+        <dependency>
+            <groupId>com.ververica</groupId>
+            <artifactId>flink-connector-debezium</artifactId>
+            <version>${flink.cdc.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.ververica</groupId>
+            <artifactId>flink-cdc-base</artifactId>
+            <version>${flink.cdc.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.ververica</groupId>
+            <artifactId>flink-connector-mysql-cdc</artifactId>
+            <version>${flink.cdc.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>io.debezium</groupId>
+            <artifactId>debezium-connector-mysql</artifactId>
+            <version>${debezium.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <version>${slf4j.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-simple</artifactId>
+            <version>${slf4j.version}</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+                <configuration>
+                    <descriptorRefs>
+                        <descriptorRef>jar-with-dependencies</descriptorRef>
+                    </descriptorRefs>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file
diff --git a/tools/mig-test/datastream/datastream-3.0.1/src/main/java/DataStreamJob.java b/tools/mig-test/datastream/datastream-3.0.1/src/main/java/DataStreamJob.java
new file mode 100644
index 000000000..4e2a7a901
--- /dev/null
+++ b/tools/mig-test/datastream/datastream-3.0.1/src/main/java/DataStreamJob.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import com.ververica.cdc.connectors.mysql.source.MySqlSource;
+import com.ververica.cdc.connectors.mysql.table.StartupOptions;
+import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+public class DataStreamJob {
+
+ public static void main(String[] args) {
+ MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
+ .hostname("localhost")
+ .port(3306)
+ .databaseList("fallen")
+ .tableList("fallen.angel", "fallen.gabriel", "fallen.girl")
+ .startupOptions(StartupOptions.initial())
+ .username("root")
+ .password("")
+ .deserializer(new JsonDebeziumDeserializationSchema())
+ .serverTimeZone("UTC")
+ .build();
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.enableCheckpointing(3000);
+
+ env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL CDC Source")
+ .uid("sql-source-uid")
+ .setParallelism(1)
+ .print()
+ .setParallelism(1);
+
+ try {
+ env.execute();
+ } catch (Exception e) {
+ // ... unfortunately
+ }
+ }
+}
diff --git a/tools/mig-test/datastream/datastream-3.1.0/.gitignore b/tools/mig-test/datastream/datastream-3.1.0/.gitignore
new file mode 100644
index 000000000..5ff6309b7
--- /dev/null
+++ b/tools/mig-test/datastream/datastream-3.1.0/.gitignore
@@ -0,0 +1,38 @@
+target/
+!.mvn/wrapper/maven-wrapper.jar
+!**/src/main/**/target/
+!**/src/test/**/target/
+
+### IntelliJ IDEA ###
+.idea/modules.xml
+.idea/jarRepositories.xml
+.idea/compiler.xml
+.idea/libraries/
+*.iws
+*.iml
+*.ipr
+
+### Eclipse ###
+.apt_generated
+.classpath
+.factorypath
+.project
+.settings
+.springBeans
+.sts4-cache
+
+### NetBeans ###
+/nbproject/private/
+/nbbuild/
+/dist/
+/nbdist/
+/.nb-gradle/
+build/
+!**/src/main/**/build/
+!**/src/test/**/build/
+
+### VS Code ###
+.vscode/
+
+### Mac OS ###
+.DS_Store
\ No newline at end of file
diff --git a/tools/mig-test/datastream/datastream-3.1.0/pom.xml b/tools/mig-test/datastream/datastream-3.1.0/pom.xml
new file mode 100644
index 000000000..6c927b999
--- /dev/null
+++ b/tools/mig-test/datastream/datastream-3.1.0/pom.xml
@@ -0,0 +1,151 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>org.apache.flink</groupId>
+    <artifactId>datastream-job</artifactId>
+    <version>3.1.0</version>
+    <packaging>jar</packaging>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <flink.version>1.18.1</flink.version>
+        <flink.cdc.version>3.1.0</flink.cdc.version>
+        <debezium.version>1.9.7.Final</debezium.version>
+        <scala.binary.version>2.12</scala.binary.version>
+        <slf4j.version>2.0.13</slf4j.version>
+        <maven.compiler.source>8</maven.compiler.source>
+        <maven.compiler.target>8</maven.compiler.target>
+        <maven.compiler.encoding>UTF-8</maven.compiler.encoding>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-runtime</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-java</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-base</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-clients</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-runtime</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-core</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-common</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-shaded-guava</artifactId>
+            <version>31.1-jre-17.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-debezium</artifactId>
+            <version>${flink.cdc.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-cdc-base</artifactId>
+            <version>${flink.cdc.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-mysql-cdc</artifactId>
+            <version>${flink.cdc.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>io.debezium</groupId>
+            <artifactId>debezium-connector-mysql</artifactId>
+            <version>${debezium.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <version>${slf4j.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-simple</artifactId>
+            <version>${slf4j.version}</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+                <configuration>
+                    <descriptorRefs>
+                        <descriptorRef>jar-with-dependencies</descriptorRef>
+                    </descriptorRefs>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file
diff --git a/tools/mig-test/datastream/datastream-3.1.0/src/main/java/DataStreamJob.java b/tools/mig-test/datastream/datastream-3.1.0/src/main/java/DataStreamJob.java
new file mode 100644
index 000000000..f821ac0a2
--- /dev/null
+++ b/tools/mig-test/datastream/datastream-3.1.0/src/main/java/DataStreamJob.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.flink.cdc.connectors.mysql.source.MySqlSource;
+import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
+import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+public class DataStreamJob {
+
+ public static void main(String[] args) {
+ MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
+ .hostname("localhost")
+ .port(3306)
+ .databaseList("fallen")
+ .tableList("fallen.angel", "fallen.gabriel", "fallen.girl")
+ .startupOptions(StartupOptions.initial())
+ .username("root")
+ .password("")
+ .deserializer(new JsonDebeziumDeserializationSchema())
+ .serverTimeZone("UTC")
+ .build();
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.enableCheckpointing(3000);
+
+ env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL CDC Source")
+ .uid("sql-source-uid")
+ .setParallelism(1)
+ .print()
+ .setParallelism(1);
+
+ try {
+ env.execute();
+ } catch (Exception e) {
+ // ... unfortunately
+ }
+ }
+}
diff --git a/tools/mig-test/datastream/datastream-3.1.1/.gitignore b/tools/mig-test/datastream/datastream-3.1.1/.gitignore
new file mode 100644
index 000000000..5ff6309b7
--- /dev/null
+++ b/tools/mig-test/datastream/datastream-3.1.1/.gitignore
@@ -0,0 +1,38 @@
+target/
+!.mvn/wrapper/maven-wrapper.jar
+!**/src/main/**/target/
+!**/src/test/**/target/
+
+### IntelliJ IDEA ###
+.idea/modules.xml
+.idea/jarRepositories.xml
+.idea/compiler.xml
+.idea/libraries/
+*.iws
+*.iml
+*.ipr
+
+### Eclipse ###
+.apt_generated
+.classpath
+.factorypath
+.project
+.settings
+.springBeans
+.sts4-cache
+
+### NetBeans ###
+/nbproject/private/
+/nbbuild/
+/dist/
+/nbdist/
+/.nb-gradle/
+build/
+!**/src/main/**/build/
+!**/src/test/**/build/
+
+### VS Code ###
+.vscode/
+
+### Mac OS ###
+.DS_Store
\ No newline at end of file
diff --git a/tools/mig-test/datastream/datastream-3.1.1/pom.xml b/tools/mig-test/datastream/datastream-3.1.1/pom.xml
new file mode 100644
index 000000000..d8f6f88d9
--- /dev/null
+++ b/tools/mig-test/datastream/datastream-3.1.1/pom.xml
@@ -0,0 +1,151 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>org.apache.flink</groupId>
+    <artifactId>datastream-job</artifactId>
+    <version>3.1.1</version>
+    <packaging>jar</packaging>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <flink.version>1.18.1</flink.version>
+        <flink.cdc.version>3.1.1</flink.cdc.version>
+        <debezium.version>1.9.7.Final</debezium.version>
+        <scala.binary.version>2.12</scala.binary.version>
+        <slf4j.version>2.0.13</slf4j.version>
+        <maven.compiler.source>8</maven.compiler.source>
+        <maven.compiler.target>8</maven.compiler.target>
+        <maven.compiler.encoding>UTF-8</maven.compiler.encoding>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-runtime</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-java</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-base</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-clients</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-runtime</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-core</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-common</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-shaded-guava</artifactId>
+            <version>31.1-jre-17.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-debezium</artifactId>
+            <version>${flink.cdc.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-cdc-base</artifactId>
+            <version>${flink.cdc.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-mysql-cdc</artifactId>
+            <version>${flink.cdc.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>io.debezium</groupId>
+            <artifactId>debezium-connector-mysql</artifactId>
+            <version>${debezium.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <version>${slf4j.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-simple</artifactId>
+            <version>${slf4j.version}</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+                <configuration>
+                    <descriptorRefs>
+                        <descriptorRef>jar-with-dependencies</descriptorRef>
+                    </descriptorRefs>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file
diff --git a/tools/mig-test/datastream/datastream-3.1.1/src/main/java/DataStreamJob.java b/tools/mig-test/datastream/datastream-3.1.1/src/main/java/DataStreamJob.java
new file mode 100644
index 000000000..f821ac0a2
--- /dev/null
+++ b/tools/mig-test/datastream/datastream-3.1.1/src/main/java/DataStreamJob.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.flink.cdc.connectors.mysql.source.MySqlSource;
+import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
+import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+public class DataStreamJob {
+
+ public static void main(String[] args) {
+ MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
+ .hostname("localhost")
+ .port(3306)
+ .databaseList("fallen")
+ .tableList("fallen.angel", "fallen.gabriel", "fallen.girl")
+ .startupOptions(StartupOptions.initial())
+ .username("root")
+ .password("")
+ .deserializer(new JsonDebeziumDeserializationSchema())
+ .serverTimeZone("UTC")
+ .build();
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.enableCheckpointing(3000);
+
+ env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL CDC Source")
+ .uid("sql-source-uid")
+ .setParallelism(1)
+ .print()
+ .setParallelism(1);
+
+ try {
+ env.execute();
+ } catch (Exception e) {
+ // ... unfortunately
+ }
+ }
+}
diff --git a/tools/mig-test/datastream/datastream-3.2-SNAPSHOT/.gitignore b/tools/mig-test/datastream/datastream-3.2-SNAPSHOT/.gitignore
new file mode 100644
index 000000000..5ff6309b7
--- /dev/null
+++ b/tools/mig-test/datastream/datastream-3.2-SNAPSHOT/.gitignore
@@ -0,0 +1,38 @@
+target/
+!.mvn/wrapper/maven-wrapper.jar
+!**/src/main/**/target/
+!**/src/test/**/target/
+
+### IntelliJ IDEA ###
+.idea/modules.xml
+.idea/jarRepositories.xml
+.idea/compiler.xml
+.idea/libraries/
+*.iws
+*.iml
+*.ipr
+
+### Eclipse ###
+.apt_generated
+.classpath
+.factorypath
+.project
+.settings
+.springBeans
+.sts4-cache
+
+### NetBeans ###
+/nbproject/private/
+/nbbuild/
+/dist/
+/nbdist/
+/.nb-gradle/
+build/
+!**/src/main/**/build/
+!**/src/test/**/build/
+
+### VS Code ###
+.vscode/
+
+### Mac OS ###
+.DS_Store
\ No newline at end of file
diff --git a/tools/mig-test/datastream/datastream-3.2-SNAPSHOT/pom.xml b/tools/mig-test/datastream/datastream-3.2-SNAPSHOT/pom.xml
new file mode 100644
index 000000000..e9d9ceb8a
--- /dev/null
+++ b/tools/mig-test/datastream/datastream-3.2-SNAPSHOT/pom.xml
@@ -0,0 +1,151 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>org.apache.flink</groupId>
+    <artifactId>datastream-job</artifactId>
+    <version>3.2-SNAPSHOT</version>
+    <packaging>jar</packaging>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <flink.version>1.18.1</flink.version>
+        <flink.cdc.version>3.2-SNAPSHOT</flink.cdc.version>
+        <debezium.version>1.9.7.Final</debezium.version>
+        <scala.binary.version>2.12</scala.binary.version>
+        <slf4j.version>2.0.13</slf4j.version>
+        <maven.compiler.source>8</maven.compiler.source>
+        <maven.compiler.target>8</maven.compiler.target>
+        <maven.compiler.encoding>UTF-8</maven.compiler.encoding>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-runtime</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-java</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-base</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-clients</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-runtime</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-core</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-common</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-shaded-guava</artifactId>
+            <version>31.1-jre-17.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-debezium</artifactId>
+            <version>${flink.cdc.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-cdc-base</artifactId>
+            <version>${flink.cdc.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-mysql-cdc</artifactId>
+            <version>${flink.cdc.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>io.debezium</groupId>
+            <artifactId>debezium-connector-mysql</artifactId>
+            <version>${debezium.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <version>${slf4j.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-simple</artifactId>
+            <version>${slf4j.version}</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+                <configuration>
+                    <descriptorRefs>
+                        <descriptorRef>jar-with-dependencies</descriptorRef>
+                    </descriptorRefs>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file
diff --git a/tools/mig-test/datastream/datastream-3.2-SNAPSHOT/src/main/java/DataStreamJob.java b/tools/mig-test/datastream/datastream-3.2-SNAPSHOT/src/main/java/DataStreamJob.java
new file mode 100644
index 000000000..f821ac0a2
--- /dev/null
+++ b/tools/mig-test/datastream/datastream-3.2-SNAPSHOT/src/main/java/DataStreamJob.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.flink.cdc.connectors.mysql.source.MySqlSource;
+import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
+import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+public class DataStreamJob {
+
+ public static void main(String[] args) {
+ MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
+ .hostname("localhost")
+ .port(3306)
+ .databaseList("fallen")
+ .tableList("fallen.angel", "fallen.gabriel", "fallen.girl")
+ .startupOptions(StartupOptions.initial())
+ .username("root")
+ .password("")
+ .deserializer(new JsonDebeziumDeserializationSchema())
+ .serverTimeZone("UTC")
+ .build();
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.enableCheckpointing(3000);
+
+ env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL CDC Source")
+ .uid("sql-source-uid")
+ .setParallelism(1)
+ .print()
+ .setParallelism(1);
+
+ try {
+ env.execute();
+ } catch (Exception e) {
+ // ... unfortunately
+ }
+ }
+}
diff --git a/tools/mig-test/datastream/run_migration_test.rb b/tools/mig-test/datastream/run_migration_test.rb
new file mode 100644
index 000000000..a02b79b50
--- /dev/null
+++ b/tools/mig-test/datastream/run_migration_test.rb
@@ -0,0 +1,138 @@
+#!/usr/bin/env ruby
+# frozen_string_literal: true
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+require 'pathname'
+require 'securerandom'
+
+WAITING_SECONDS = 20
+FLINK_HOME = ENV['FLINK_HOME']
+throw 'Unspecified `FLINK_HOME` environment variable.' if FLINK_HOME.nil?
+FLINK_HOME = Pathname.new(FLINK_HOME).realpath
+
+SOURCE_PORT = 3306
+DATABASE_NAME = 'fallen'
+TABLES = ['girl'].freeze
+
+def exec_sql_source(sql)
+ `mysql -h 127.0.0.1 -P#{SOURCE_PORT} -uroot --skip-password -e "USE #{DATABASE_NAME}; #{sql}"`
+end
+
+def put_mystery_data(mystery)
+ exec_sql_source("REPLACE INTO girl(id, name) VALUES (17, '#{mystery}');")
+end
+
+def ensure_mystery_data(mystery)
+ throw StandardError, 'Failed to get specific mystery string' unless `cat #{FLINK_HOME}/log/*.out`.include? mystery
+end
+
+puts ' Waiting for source to start up...'
+next until exec_sql_source("SELECT '1';") == "1\n1\n"
+
+def test_migration_chore(from_version, to_version)
+ TABLES.each do |table_name|
+ exec_sql_source("DROP TABLE IF EXISTS #{table_name};")
+ exec_sql_source("CREATE TABLE #{table_name} (ID INT NOT NULL, NAME VARCHAR(17), PRIMARY KEY (ID));")
+ end
+
+ # Clear previous savepoints and logs
+ `rm -rf savepoints`
+
+ old_job_id = `#{FLINK_HOME}/bin/flink run -p 1 -c DataStreamJob --detached datastream-#{from_version}/target/datastream-job-#{from_version}-jar-with-dependencies.jar`.split.last
+ raise StandardError, 'Failed to submit Flink job' unless old_job_id.length == 32
+
+ puts "Submitted job at #{from_version} as #{old_job_id}"
+
+ random_string_1 = SecureRandom.hex(8)
+ put_mystery_data random_string_1
+ sleep WAITING_SECONDS
+ ensure_mystery_data random_string_1
+
+ puts `#{FLINK_HOME}/bin/flink stop --savepointPath #{Dir.pwd}/savepoints #{old_job_id}`
+ savepoint_file = `ls savepoints`.split("\n").last
+ new_job_id = `#{FLINK_HOME}/bin/flink run --fromSavepoint #{Dir.pwd}/savepoints/#{savepoint_file} -p 1 -c DataStreamJob --detached datastream-#{to_version}/target/datastream-job-#{to_version}-jar-with-dependencies.jar`.split.last
+ raise StandardError, 'Failed to submit Flink job' unless new_job_id.length == 32
+
+ puts "Submitted job at #{to_version} as #{new_job_id}"
+ random_string_2 = SecureRandom.hex(8)
+ put_mystery_data random_string_2
+ sleep WAITING_SECONDS
+ ensure_mystery_data random_string_2
+ puts `#{FLINK_HOME}/bin/flink cancel #{new_job_id}`
+ true
+end
+
+def test_migration(from_version, to_version)
+ puts "➡️ [MIGRATION] Testing migration from #{from_version} to #{to_version}..."
+ puts " with Flink #{FLINK_HOME}..."
+ begin
+ result = test_migration_chore from_version, to_version
+ if result
+ puts "✅ [MIGRATION] Successfully migrated from #{from_version} to #{to_version}!"
+ else
+ puts "❌ [MIGRATION] Failed to migrate from #{from_version} to #{to_version}..."
+ end
+ result
+ rescue => e
+ puts "❌ [MIGRATION] Failed to migrate from #{from_version} to #{to_version}...", e
+ false
+ end
+end
+
+version_list = %w[2.4.2 3.0.0 3.0.1 3.1.0 3.1.1 3.2-SNAPSHOT]
+version_result = Hash.new('❓')
+@failures = []
+
+version_list.each_with_index do |old_version, old_index|
+ puts 'Restarting cluster...'
+ `#{FLINK_HOME}/bin/stop-cluster.sh`
+ `rm -rf #{FLINK_HOME}/log/flink-*.out`
+ `#{FLINK_HOME}/bin/start-cluster.sh`
+ version_list.each_with_index do |new_version, new_index|
+ next if old_index > new_index
+
+ result = test_migration old_version, new_version
+ version_result[old_version + new_version] = result ? '✅' : '❌'
+ @failures << [old_version, new_version] unless result
+ end
+end
+
+printable_result = []
+printable_result << [''] + version_list
+version_list.each_with_index do |old_version, old_index|
+ table_line = [old_version]
+ version_list.each_with_index do |new_version, new_index|
+ table_line << if old_index > new_index
+ ''
+ else
+ version_result[old_version + new_version]
+ end
+ end
+ printable_result << table_line
+end
+
+begin
+ require 'terminal-table'
+ puts Terminal::Table.new rows: printable_result, title: 'Migration Test Result'
+rescue LoadError
+ puts 'Test summary: ', printable_result
+end
+puts "✅ - Compatible, ❌ - Not compatible, ❓ - Target version doesn't support `--from-savepoint`"
+
+if @failures.filter { |_, new_version| new_version == version_list.last }.any?
+ abort 'Some migration to snapshot version tests failed.'
+end
diff --git a/tools/mig-test/misc/patch_flink_conf.rb b/tools/mig-test/misc/patch_flink_conf.rb
new file mode 100644
index 000000000..fe6030188
--- /dev/null
+++ b/tools/mig-test/misc/patch_flink_conf.rb
@@ -0,0 +1,32 @@
+#!/usr/bin/env ruby
+# frozen_string_literal: true
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+FLINK_HOME = ENV['FLINK_HOME']
+throw 'Unspecified `FLINK_HOME` environment variable.' if FLINK_HOME.nil?
+
+EXTRA_CONF = <<~EXTRACONF
+
+taskmanager.numberOfTaskSlots: 10
+parallelism.default: 4
+execution.checkpointing.interval: 300
+EXTRACONF
+
+File.write("#{FLINK_HOME}/conf/flink-conf.yaml", EXTRA_CONF, mode: 'a+')
+
+# The MySQL JDBC driver is not bundled with Flink, so download it into lib/ manually
+`wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.27/mysql-connector-java-8.0.27.jar -O #{FLINK_HOME}/lib/mysql-connector-java-8.0.27.jar`
\ No newline at end of file
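
Usage sketch: the script appends the settings above to `conf/flink-conf.yaml` of whatever distribution `FLINK_HOME` points at (the path below matches the CI setup):

```bash
cd tools/mig-test
FLINK_HOME=./flink-1.18.1/ ruby misc/patch_flink_conf.rb
tail -n 4 ./flink-1.18.1/conf/flink-conf.yaml   # shows the appended settings
```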
diff --git a/tools/mig-test/prepare_libs.rb b/tools/mig-test/prepare_libs.rb
new file mode 100644
index 000000000..aefb8338c
--- /dev/null
+++ b/tools/mig-test/prepare_libs.rb
@@ -0,0 +1,123 @@
+#!/usr/bin/env ruby
+# frozen_string_literal: true
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+CDC_SOURCE_HOME = ENV['CDC_SOURCE_HOME']
+throw 'Unspecified `CDC_SOURCE_HOME` environment variable.' if CDC_SOURCE_HOME.nil?
+
+Dir.chdir(__dir__)
+
+RELEASED_VERSIONS = {
+ '3.0.0': {
+ tar: 'https://github.com/ververica/flink-cdc-connectors/releases/download/release-3.0.0/flink-cdc-3.0.0-bin.tar.gz',
+ connectors: %w[
+ https://repo1.maven.org/maven2/com/ververica/flink-cdc-pipeline-connector-doris/3.0.0/flink-cdc-pipeline-connector-doris-3.0.0.jar
+ https://repo1.maven.org/maven2/com/ververica/flink-cdc-pipeline-connector-mysql/3.0.0/flink-cdc-pipeline-connector-mysql-3.0.0.jar
+ https://repo1.maven.org/maven2/com/ververica/flink-cdc-pipeline-connector-starrocks/3.0.0/flink-cdc-pipeline-connector-starrocks-3.0.0.jar
+ https://repo1.maven.org/maven2/com/ververica/flink-cdc-pipeline-connector-values/3.0.0/flink-cdc-pipeline-connector-values-3.0.0.jar
+ ]
+ },
+ '3.0.1': {
+ tar: 'https://github.com/ververica/flink-cdc-connectors/releases/download/release-3.0.1/flink-cdc-3.0.1-bin.tar.gz',
+ connectors: %w[
+ https://repo1.maven.org/maven2/com/ververica/flink-cdc-pipeline-connector-doris/3.0.1/flink-cdc-pipeline-connector-doris-3.0.1.jar
+ https://repo1.maven.org/maven2/com/ververica/flink-cdc-pipeline-connector-mysql/3.0.1/flink-cdc-pipeline-connector-mysql-3.0.1.jar
+ https://repo1.maven.org/maven2/com/ververica/flink-cdc-pipeline-connector-starrocks/3.0.1/flink-cdc-pipeline-connector-starrocks-3.0.1.jar
+ https://repo1.maven.org/maven2/com/ververica/flink-cdc-pipeline-connector-values/3.0.1/flink-cdc-pipeline-connector-values-3.0.1.jar
+ ]
+ },
+ '3.1.0': {
+ tar: 'https://dlcdn.apache.org/flink/flink-cdc-3.1.0/flink-cdc-3.1.0-bin.tar.gz',
+ connectors: %w[
+ https://repo1.maven.org/maven2/org/apache/flink/flink-cdc-pipeline-connector-mysql/3.1.0/flink-cdc-pipeline-connector-mysql-3.1.0.jar
+ https://repo1.maven.org/maven2/org/apache/flink/flink-cdc-pipeline-connector-doris/3.1.0/flink-cdc-pipeline-connector-doris-3.1.0.jar
+ https://repo1.maven.org/maven2/org/apache/flink/flink-cdc-pipeline-connector-starrocks/3.1.0/flink-cdc-pipeline-connector-starrocks-3.1.0.jar
+ https://repo1.maven.org/maven2/org/apache/flink/flink-cdc-pipeline-connector-kafka/3.1.0/flink-cdc-pipeline-connector-kafka-3.1.0.jar
+ https://repo1.maven.org/maven2/org/apache/flink/flink-cdc-pipeline-connector-paimon/3.1.0/flink-cdc-pipeline-connector-paimon-3.1.0.jar
+ https://repo1.maven.org/maven2/org/apache/flink/flink-cdc-pipeline-connector-values/3.1.0/flink-cdc-pipeline-connector-values-3.1.0.jar
+ ]
+ },
+ '3.1.1': {
+ tar: 'https://dlcdn.apache.org/flink/flink-cdc-3.1.1/flink-cdc-3.1.1-bin.tar.gz',
+ connectors: %w[
+ https://repo1.maven.org/maven2/org/apache/flink/flink-cdc-pipeline-connector-mysql/3.1.1/flink-cdc-pipeline-connector-mysql-3.1.1.jar
+ https://repo1.maven.org/maven2/org/apache/flink/flink-cdc-pipeline-connector-doris/3.1.1/flink-cdc-pipeline-connector-doris-3.1.1.jar
+ https://repo1.maven.org/maven2/org/apache/flink/flink-cdc-pipeline-connector-starrocks/3.1.1/flink-cdc-pipeline-connector-starrocks-3.1.1.jar
+ https://repo1.maven.org/maven2/org/apache/flink/flink-cdc-pipeline-connector-kafka/3.1.1/flink-cdc-pipeline-connector-kafka-3.1.1.jar
+ https://repo1.maven.org/maven2/org/apache/flink/flink-cdc-pipeline-connector-paimon/3.1.1/flink-cdc-pipeline-connector-paimon-3.1.1.jar
+ https://repo1.maven.org/maven2/org/apache/flink/flink-cdc-pipeline-connector-values/3.1.1/flink-cdc-pipeline-connector-values-3.1.1.jar
+ ]
+ }
+}.freeze
+
+HEAD_VERSION = '3.2-SNAPSHOT'
+
+def download_or_get(link)
+ `mkdir -p cache`
+ file_name = "cache/#{File.basename(link)}"
+ if File.exist? file_name
+ puts "#{file_name} exists, skip download"
+ return file_name
+ end
+ `wget #{link} -O #{file_name}`
+ file_name
+end
+
+M2_REPO = '~/.m2/repository/org/apache/flink'
+FILES = %w[
+ dist
+ pipeline-connector-kafka
+ pipeline-connector-mysql
+ pipeline-connector-doris
+ pipeline-connector-paimon
+ pipeline-connector-starrocks
+ pipeline-connector-values
+].freeze
+def download_released
+ `rm -rf cdc-versions`
+ RELEASED_VERSIONS.each do |version, links|
+ `mkdir -p cdc-versions/#{version}`
+ file_name = download_or_get(links[:tar])
+ `tar --strip-components=1 -xzvf #{file_name} -C cdc-versions/#{version}`
+ links[:connectors].each do |link|
+ jar_file_name = download_or_get(link)
+ `cp #{jar_file_name} cdc-versions/#{version}/lib/`
+ end
+ end
+end
+
+def compile_snapshot(version)
+ puts "Trying to create #{version}"
+ `mkdir -p cdc-versions/#{version}/lib`
+ `cp -r #{CDC_SOURCE_HOME}/flink-cdc-dist/src/main/flink-cdc-bin/* cdc-versions/#{version}/`
+
+ puts 'Compiling snapshot version...'
+ puts `cd #{CDC_SOURCE_HOME} && mvn clean install -DskipTests`
+
+ FILES.each do |lib|
+ if lib == 'dist'
+ `cp #{CDC_SOURCE_HOME}/flink-cdc-#{lib}/target/flink-cdc-#{lib}-#{version}.jar cdc-versions/#{version}/lib/`
+ else
+ `cp #{CDC_SOURCE_HOME}/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-#{lib}/target/flink-cdc-#{lib}-#{version}.jar cdc-versions/#{version}/lib/`
+ end
+ end
+end
+
+download_released
+compile_snapshot HEAD_VERSION
+
+puts 'Done'
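
After a successful run, the downloaded and locally built distributions should land under `tools/mig-test/cdc-versions/`, one directory per version, each with the pipeline connector jars copied into `lib/`. A hedged sketch of the expected layout:

```bash
CDC_SOURCE_HOME=$PWD ruby tools/mig-test/prepare_libs.rb
ls tools/mig-test/cdc-versions
# 3.0.0  3.0.1  3.1.0  3.1.1  3.2-SNAPSHOT
ls tools/mig-test/cdc-versions/3.2-SNAPSHOT/lib
# flink-cdc-dist and flink-cdc-pipeline-connector-* jars copied from the local build
```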
diff --git a/tools/mig-test/run_migration_test.rb b/tools/mig-test/run_migration_test.rb
new file mode 100644
index 000000000..5bb31cf30
--- /dev/null
+++ b/tools/mig-test/run_migration_test.rb
@@ -0,0 +1,162 @@
+#!/usr/bin/env ruby
+# frozen_string_literal: true
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+require 'pathname'
+require 'securerandom'
+
+WAITING_SECONDS = 20
+FLINK_HOME = ENV['FLINK_HOME']
+throw 'Unspecified `FLINK_HOME` environment variable.' if FLINK_HOME.nil?
+FLINK_HOME = Pathname.new(FLINK_HOME).realpath
+
+SOURCE_PORT = 3306
+DATABASE_NAME = 'fallen'
+TABLES = ['girl'].freeze
+
+def exec_sql_source(sql)
+ `mysql -h 127.0.0.1 -P#{SOURCE_PORT} -uroot --skip-password -e "USE #{DATABASE_NAME}; #{sql}"`
+end
+
+def put_mystery_data(mystery)
+ exec_sql_source("REPLACE INTO girl(id, name) VALUES (17, '#{mystery}');")
+end
+
+def ensure_mystery_data(mystery)
+ throw StandardError, 'Failed to get specific mystery string' unless `cat #{FLINK_HOME}/log/*.out`.include? mystery
+end
+
+puts ' Waiting for source to start up...'
+next until exec_sql_source("SELECT '1';") == "1\n1\n"
+
+def test_migration_chore(from_version, to_version)
+ TABLES.each do |table_name|
+ exec_sql_source("DROP TABLE IF EXISTS #{table_name};")
+ exec_sql_source("CREATE TABLE #{table_name} (ID INT NOT NULL, NAME VARCHAR(17), PRIMARY KEY (ID));")
+ end
+
+ # Clear previous savepoints and logs
+ `rm -rf savepoints`
+
+ # Prepare for current YAML file
+ test_route = !%w[3.0.0 3.0.1].include?(from_version)
+ yaml_job_template_file = test_route ? 'conf/pipeline-route.yaml' : 'conf/pipeline.yaml'
+ yaml_job_file = 'conf/temp.yaml'
+ yaml_content = File.open(yaml_job_template_file).read.gsub('${PIPELINE_NAME}',
+ "Pipeline Migration Job (#{from_version} -> #{to_version})")
+ File.write(yaml_job_file, yaml_content)
+
+ # Submit current pipeline job file
+ submit_job_output = `bash ./cdc-versions/#{from_version}/bin/flink-cdc.sh --flink-home #{FLINK_HOME} #{yaml_job_file}`
+ puts " #{submit_job_output}"
+ current_job_id = submit_job_output.split("\n")[1].split.last
+ raise StandardError, 'Failed to submit Flink job' unless current_job_id.length == 32
+
+ puts " Current Job ID: #{current_job_id}"
+
+ # Verify if data sync works
+ random_string_1 = SecureRandom.hex(8)
+ put_mystery_data random_string_1
+ sleep WAITING_SECONDS
+ ensure_mystery_data random_string_1
+
+ # Stop current job and create a savepoint
+ puts `#{FLINK_HOME}/bin/flink stop --savepointPath #{Dir.pwd}/savepoints #{current_job_id}`
+ savepoint_file = `ls savepoints`.split("\n").last
+
+ # Migrate to a newer CDC version
+ puts " Submitting CDC jobs at #{to_version}..."
+ submit_job_output = `bash ./cdc-versions/#{to_version}/bin/flink-cdc.sh --from-savepoint #{Dir.pwd}/savepoints/#{savepoint_file} --allow-nonRestored-state --flink-home #{FLINK_HOME} #{yaml_job_file}`
+ puts " #{submit_job_output}"
+ new_job_id = submit_job_output.split("\n")[1].split.last
+ raise StandardError, 'Failed to submit Flink job' unless new_job_id.length == 32
+
+ puts " Upgraded Job ID: #{new_job_id}"
+
+ # Verify if data sync works
+ puts "Submitted job at #{to_version} as #{new_job_id}"
+ random_string_2 = SecureRandom.hex(8)
+ put_mystery_data random_string_2
+ sleep WAITING_SECONDS
+ ensure_mystery_data random_string_2
+ puts `#{FLINK_HOME}/bin/flink cancel #{new_job_id}`
+ true
+end
+
+def test_migration(from_version, to_version)
+ puts "➡️ [MIGRATION] Testing migration from #{from_version} to #{to_version}..."
+ puts " with Flink #{FLINK_HOME}..."
+ begin
+ result = test_migration_chore from_version, to_version
+ if result
+ puts "✅ [MIGRATION] Successfully migrated from #{from_version} to #{to_version}!"
+ else
+ puts "❌ [MIGRATION] Failed to migrate from #{from_version} to #{to_version}..."
+ end
+ result
+ rescue => e
+ puts "❌ [MIGRATION] Failed to migrate from #{from_version} to #{to_version}...", e
+ false
+ end
+end
+
+version_list = %w[3.0.0 3.0.1 3.1.0 3.1.1 3.2-SNAPSHOT]
+no_savepoint_versions = %w[3.0.0 3.0.1]
+version_result = Hash.new('❓')
+@failures = []
+
+version_list.each_with_index do |old_version, old_index|
+ puts 'Restarting cluster...'
+ `#{FLINK_HOME}/bin/stop-cluster.sh`
+ puts 'Stopped cluster.'
+ `#{FLINK_HOME}/bin/start-cluster.sh`
+ puts 'Started cluster.'
+ version_list.each_with_index do |new_version, new_index|
+ next if old_index > new_index
+ next if no_savepoint_versions.include? new_version
+
+ result = test_migration old_version, new_version
+ version_result[old_version + new_version] = result ? '✅' : '❌'
+ @failures << [old_version, new_version] unless result
+ end
+end
+
+printable_result = []
+printable_result << [''] + version_list
+version_list.each_with_index do |old_version, old_index|
+ table_line = [old_version]
+ version_list.each_with_index do |new_version, new_index|
+ table_line << if old_index > new_index
+ ''
+ else
+ version_result[old_version + new_version]
+ end
+ end
+ printable_result << table_line
+end
+
+begin
+ require 'terminal-table'
+ puts Terminal::Table.new rows: printable_result, title: 'Migration Test Result'
+rescue LoadError
+ puts 'Test summary: ', printable_result
+end
+puts "✅ - Compatible, ❌ - Not compatible, ❓ - Target version doesn't support `--from-savepoint`"
+
+if @failures.filter { |old_version, new_version| new_version == version_list.last && old_version != '3.1.0' }.any?
+ abort 'Some migration to snapshot version tests failed.'
+end