[FLINK-35736][test] Add migration test scripts & CI workflows

This closes #3447

@@ -115,19 +115,8 @@ jobs:
- name: Run license check
run: gem install rubyzip -v 2.3.0 && ./tools/ci/license_check.rb
migration_test:
runs-on: ubuntu-latest
steps:
- name: Check out repository code
uses: actions/checkout@v4
with:
submodules: true
- name: Compile snapshot CDC version
run: mvn --no-snapshot-updates -B install -DskipTests
- name: Run migration tests
run: cd flink-cdc-migration-tests && mvn clean verify
compile_and_test:
needs: license_check
# Only run the CI pipeline for the flink-cdc-connectors repository
# if: github.repository == 'apache/flink-cdc-connectors'
runs-on: ubuntu-latest
@@ -263,3 +252,100 @@ jobs:
done
fi
exit 0
migration_test_ut:
needs: license_check
runs-on: ubuntu-latest
steps:
- name: Check out repository code
uses: actions/checkout@v4
with:
submodules: true
- name: Compile snapshot CDC version
run: mvn --no-snapshot-updates -B install -DskipTests
- name: Run migration tests
run: cd flink-cdc-migration-tests && mvn clean verify
pipeline_migration_test:
needs: migration_test_ut
runs-on: ubuntu-latest
strategy:
matrix:
java-version: [ '8', '11' ]
steps:
- uses: actions/checkout@v4
- name: Set up Ruby
uses: ruby/setup-ruby@v1
with:
ruby-version: 3.0
bundler-cache: true # runs 'bundle install' and caches installed gems automatically
- uses: actions/setup-java@v4
with:
java-version: ${{ matrix.java-version }}
distribution: temurin
cache: maven
- name: Install dependencies
run: gem install terminal-table
- name: Prepare CDC versions
run: CDC_SOURCE_HOME=$PWD ruby tools/mig-test/prepare_libs.rb
- name: Prepare Flink distro
run: wget https://dlcdn.apache.org/flink/flink-1.18.1/flink-1.18.1-bin-scala_2.12.tgz && tar -xzvf flink-1.18.1-bin-scala_2.12.tgz
working-directory: ./tools/mig-test
- name: Patch Flink configs
run: FLINK_HOME=./flink-1.18.1/ ruby misc/patch_flink_conf.rb
working-directory: ./tools/mig-test
- name: Start containers
run: cd conf && docker-compose up -d
working-directory: ./tools/mig-test
- name: Run migration tests
run: FLINK_HOME=./flink-1.18.1/ ruby run_migration_test.rb
working-directory: ./tools/mig-test
- name: Stop containers
if: always()
run: cd conf && docker-compose down
working-directory: ./tools/mig-test
data_stream_migration_test:
needs: migration_test_ut
runs-on: ubuntu-latest
strategy:
matrix:
java-version: [ '8', '11' ]
steps:
- uses: actions/checkout@v4
- name: Set up Ruby
uses: ruby/setup-ruby@v1
with:
ruby-version: 3.0
bundler-cache: true # runs 'bundle install' and caches installed gems automatically
- uses: actions/setup-java@v4
with:
java-version: ${{ matrix.java-version }}
distribution: temurin
cache: maven
- name: Install dependencies
run: gem install terminal-table
- name: Prepare CDC versions
run: CDC_SOURCE_HOME=$PWD ruby tools/mig-test/prepare_libs.rb
- name: Prepare Flink distro
run: wget https://dlcdn.apache.org/flink/flink-1.18.1/flink-1.18.1-bin-scala_2.12.tgz && tar -xzvf flink-1.18.1-bin-scala_2.12.tgz
working-directory: ./tools/mig-test
- name: Patch Flink configs
run: FLINK_HOME=./flink-1.18.1/ ruby misc/patch_flink_conf.rb
working-directory: ./tools/mig-test
- name: Compile Dummy DataStream Jobs
run: cd datastream && ruby compile_jobs.rb
working-directory: ./tools/mig-test
- name: Start containers
run: cd conf && docker-compose up -d
working-directory: ./tools/mig-test
- name: Run migration tests
run: cd datastream && FLINK_HOME=../flink-1.18.1/ ruby run_migration_test.rb
working-directory: ./tools/mig-test
- name: Stop containers
if: always()
run: cd conf && docker-compose down
working-directory: ./tools/mig-test

@@ -0,0 +1,8 @@
*.sql
savepoints/**
cdc-versions/**
cache/**
.idea/**
Gemfile.lock
/logs/
conf/temp.yaml

@@ -0,0 +1,38 @@
# Flink CDC Migration Test Utilities
## Pipeline Jobs
### Preparation
1. Install Ruby (macOS ships with it by default)
2. (Optional) Run `gem install terminal-table` for nicer result tables
### Compile snapshot CDC versions
3. Set `CDC_SOURCE_HOME` to the root directory of the Flink CDC git repository
4. Go to `tools/mig-test` and run `ruby prepare_libs.rb` to download the released CDC versions and compile the snapshot version
### Run migration tests
5. Enter `conf/` and run `docker compose up -d` to start the test containers
6. Set `FLINK_HOME` to the home directory of your Flink distribution
7. Go back to `tools/mig-test` and run `ruby run_migration_test.rb` to start testing (see the command sketch below)
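Putting the steps together, a local run might look like the sketch below. The paths are placeholders (not part of this repository), and the Flink 1.18.1 distribution matches what the CI workflow downloads:
```bash
# Placeholder paths — point these at your own checkout and Flink distribution.
export CDC_SOURCE_HOME=/path/to/flink-cdc
export FLINK_HOME=/path/to/flink-1.18.1

cd "$CDC_SOURCE_HOME/tools/mig-test"
ruby prepare_libs.rb                # fetch released CDC versions, build the snapshot one
(cd conf && docker compose up -d)   # start the MySQL test container
ruby run_migration_test.rb          # run the migration matrix against $FLINK_HOME
```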
### Result
The migration result will be displayed in the console like this:
```
+--------------------------------------------------------------------+
| Migration Test Result |
+--------------+-------+-------+-------+--------------+--------------+
| | 3.0.0 | 3.0.1 | 3.1.0 | 3.1-SNAPSHOT | 3.2-SNAPSHOT |
| 3.0.0 | ❓ | ❓ | ❌ | ✅ | ✅ |
| 3.0.1 | | ❓ | ❌ | ✅ | ✅ |
| 3.1.0 | | | ✅ | ❌ | ❌ |
| 3.1-SNAPSHOT | | | | ✅ | ✅ |
| 3.2-SNAPSHOT | | | | | ✅ |
+--------------+-------+-------+-------+--------------+--------------+
```
> ✅ - Compatible, ❌ - Not compatible, ❓ - Target version doesn't support `--from-savepoint`
## DataStream Jobs
See `datastream/README.md`.

@@ -0,0 +1,24 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
version: '3'
services:
mysql:
image: mysql:8.0
environment:
MYSQL_ALLOW_EMPTY_PASSWORD: "true"
MYSQL_DATABASE: fallen
ports:
- 3306:3306

@@ -0,0 +1,41 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
source:
type: mysql
name: MySQL Source
hostname: localhost
port: 3306
username: root
password: ""
tables: fallen.\.*
server-id: 5400-5500
server-time-zone: UTC
sink:
type: values
name: Values Sink
pipeline:
name: ${PIPELINE_NAME}
parallelism: 1
transform:
- source-table: fallen.\.*
projection: \*, 'extras' AS EXTRAS
route:
- source-table: fallen.\.*
sink-table: fallen.terminus

@@ -0,0 +1,33 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
source:
type: mysql
name: MySQL Source
hostname: localhost
port: 3306
username: root
password: ""
tables: fallen.\.*
server-id: 5400-5500
server-time-zone: UTC
sink:
type: values
name: Values Sink
pipeline:
name: ${PIPELINE_NAME}
parallelism: 1

@@ -0,0 +1,11 @@
# Flink CDC Migration Test Utilities
## DataStream Jobs
### Preparation
1. Install Ruby (macOS ships with it by default)
2. (Optional) Run `gem install terminal-table` for better display
### Compile DataStream Jobs
3. Go to `tools/mig-test/datastream` and run `ruby compile_jobs.rb` to compile the dummy DataStream jobs against each tested CDC version
4. Then run `ruby run_migration_test.rb` to start testing (see the sketch below)
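As in the CI workflow, the DataStream tests assume the MySQL container from `../conf` is running and `FLINK_HOME` points at a Flink distribution (1.18.1 in CI). A minimal local run might look like this sketch, with placeholder paths:
```bash
cd tools/mig-test
(cd conf && docker compose up -d)   # start the MySQL test container
cd datastream
ruby compile_jobs.rb                # build the dummy job for every CDC version under test
FLINK_HOME=/path/to/flink-1.18.1 ruby run_migration_test.rb
```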

@@ -0,0 +1,26 @@
#!/usr/bin/env ruby
# frozen_string_literal: true
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
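# CDC versions that have a dummy DataStream job module (datastream-<version>/) to build.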
JOB_VERSIONS = %w[2.4.2 3.0.0 3.0.1 3.1.0 3.1.1 3.2-SNAPSHOT]
JOB_VERSIONS.each do |version|
puts "Compiling DataStream job for CDC #{version}"
`cd datastream-#{version} && mvn clean package -DskipTests`
end
puts 'Done'

@@ -0,0 +1,38 @@
target/
!.mvn/wrapper/maven-wrapper.jar
!**/src/main/**/target/
!**/src/test/**/target/
### IntelliJ IDEA ###
.idea/modules.xml
.idea/jarRepositories.xml
.idea/compiler.xml
.idea/libraries/
*.iws
*.iml
*.ipr
### Eclipse ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
build/
!**/src/main/**/build/
!**/src/test/**/build/
### VS Code ###
.vscode/
### Mac OS ###
.DS_Store

@@ -0,0 +1,151 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.apache.flink</groupId>
<artifactId>datastream-job</artifactId>
<version>2.4.2</version>
<packaging>jar</packaging>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.17.1</flink.version>
<flink.cdc.version>2.4.2</flink.cdc.version>
<debezium.version>1.9.7.Final</debezium.version>
<scala.binary.version>2.12</scala.binary.version>
<slf4j.version>2.0.13</slf4j.version>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Checked the dependencies of the Flink project and below is a feasible reference. -->
<!-- Use flink shaded guava 18.0-13.0 for flink 1.13 -->
<!-- Use flink shaded guava 30.1.1-jre-14.0 for flink-1.14 -->
<!-- Use flink shaded guava 30.1.1-jre-15.0 for flink-1.15 -->
<!-- Use flink shaded guava 30.1.1-jre-15.0 for flink-1.16 -->
<!-- Use flink shaded guava 30.1.1-jre-16.1 for flink-1.17 -->
<!-- Use flink shaded guava 31.1-jre-17.0 for flink-1.18 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-guava</artifactId>
<version>30.1.1-jre-16.1</version>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-debezium</artifactId>
<version>${flink.cdc.version}</version>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-cdc-base</artifactId>
<version>${flink.cdc.version}</version>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>${flink.cdc.version}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-mysql</artifactId>
<version>${debezium.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>${slf4j.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- any other plugins -->
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
</plugins>
</build>
</project>

@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class DataStreamJob {
public static void main(String[] args) {
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
.hostname("localhost")
.port(3306)
.databaseList("fallen")
.tableList("fallen.angel", "fallen.gabriel", "fallen.girl")
.startupOptions(StartupOptions.initial())
.username("root")
.password("")
.deserializer(new JsonDebeziumDeserializationSchema())
.serverTimeZone("UTC")
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(3000);
env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL CDC Source")
.uid("sql-source-uid")
.setParallelism(1)
.print()
.setParallelism(1);
try {
env.execute();
} catch (Exception e) {
// ... unfortunately
}
}
}

@@ -0,0 +1,38 @@
target/
!.mvn/wrapper/maven-wrapper.jar
!**/src/main/**/target/
!**/src/test/**/target/
### IntelliJ IDEA ###
.idea/modules.xml
.idea/jarRepositories.xml
.idea/compiler.xml
.idea/libraries/
*.iws
*.iml
*.ipr
### Eclipse ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
build/
!**/src/main/**/build/
!**/src/test/**/build/
### VS Code ###
.vscode/
### Mac OS ###
.DS_Store

@@ -0,0 +1,151 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.apache.flink</groupId>
<artifactId>datastream-job</artifactId>
<version>3.0.0</version>
<packaging>jar</packaging>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.18.1</flink.version>
<flink.cdc.version>3.0.0</flink.cdc.version>
<debezium.version>1.9.7.Final</debezium.version>
<scala.binary.version>2.12</scala.binary.version>
<slf4j.version>2.0.13</slf4j.version>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Checked the dependencies of the Flink project and below is a feasible reference. -->
<!-- Use flink shaded guava 18.0-13.0 for flink 1.13 -->
<!-- Use flink shaded guava 30.1.1-jre-14.0 for flink-1.14 -->
<!-- Use flink shaded guava 30.1.1-jre-15.0 for flink-1.15 -->
<!-- Use flink shaded guava 30.1.1-jre-15.0 for flink-1.16 -->
<!-- Use flink shaded guava 30.1.1-jre-16.1 for flink-1.17 -->
<!-- Use flink shaded guava 31.1-jre-17.0 for flink-1.18 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-guava</artifactId>
<version>31.1-jre-17.0</version>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-debezium</artifactId>
<version>${flink.cdc.version}</version>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-cdc-base</artifactId>
<version>${flink.cdc.version}</version>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>${flink.cdc.version}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-mysql</artifactId>
<version>${debezium.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>${slf4j.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- any other plugins -->
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
</plugins>
</build>
</project>

@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class DataStreamJob {
public static void main(String[] args) {
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
.hostname("localhost")
.port(3306)
.databaseList("fallen")
.tableList("fallen.angel", "fallen.gabriel", "fallen.girl")
.startupOptions(StartupOptions.initial())
.username("root")
.password("")
.deserializer(new JsonDebeziumDeserializationSchema())
.serverTimeZone("UTC")
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(3000);
env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL CDC Source")
.uid("sql-source-uid")
.setParallelism(1)
.print()
.setParallelism(1);
try {
env.execute();
} catch (Exception e) {
// ... unfortunately
}
}
}

@@ -0,0 +1,38 @@
target/
!.mvn/wrapper/maven-wrapper.jar
!**/src/main/**/target/
!**/src/test/**/target/
### IntelliJ IDEA ###
.idea/modules.xml
.idea/jarRepositories.xml
.idea/compiler.xml
.idea/libraries/
*.iws
*.iml
*.ipr
### Eclipse ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
build/
!**/src/main/**/build/
!**/src/test/**/build/
### VS Code ###
.vscode/
### Mac OS ###
.DS_Store

@@ -0,0 +1,151 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.apache.flink</groupId>
<artifactId>datastream-job</artifactId>
<version>3.0.1</version>
<packaging>jar</packaging>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.18.1</flink.version>
<flink.cdc.version>3.0.1</flink.cdc.version>
<debezium.version>1.9.7.Final</debezium.version>
<scala.binary.version>2.12</scala.binary.version>
<slf4j.version>2.0.13</slf4j.version>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Checked the dependencies of the Flink project and below is a feasible reference. -->
<!-- Use flink shaded guava 18.0-13.0 for flink 1.13 -->
<!-- Use flink shaded guava 30.1.1-jre-14.0 for flink-1.14 -->
<!-- Use flink shaded guava 30.1.1-jre-15.0 for flink-1.15 -->
<!-- Use flink shaded guava 30.1.1-jre-15.0 for flink-1.16 -->
<!-- Use flink shaded guava 30.1.1-jre-16.1 for flink-1.17 -->
<!-- Use flink shaded guava 31.1-jre-17.0 for flink-1.18 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-guava</artifactId>
<version>31.1-jre-17.0</version>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-debezium</artifactId>
<version>${flink.cdc.version}</version>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-cdc-base</artifactId>
<version>${flink.cdc.version}</version>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>${flink.cdc.version}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-mysql</artifactId>
<version>${debezium.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>${slf4j.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- any other plugins -->
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
</plugins>
</build>
</project>

@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class DataStreamJob {
public static void main(String[] args) {
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
.hostname("localhost")
.port(3306)
.databaseList("fallen")
.tableList("fallen.angel", "fallen.gabriel", "fallen.girl")
.startupOptions(StartupOptions.initial())
.username("root")
.password("")
.deserializer(new JsonDebeziumDeserializationSchema())
.serverTimeZone("UTC")
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(3000);
env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL CDC Source")
.uid("sql-source-uid")
.setParallelism(1)
.print()
.setParallelism(1);
try {
env.execute();
} catch (Exception e) {
// ... unfortunately
}
}
}

@@ -0,0 +1,38 @@
target/
!.mvn/wrapper/maven-wrapper.jar
!**/src/main/**/target/
!**/src/test/**/target/
### IntelliJ IDEA ###
.idea/modules.xml
.idea/jarRepositories.xml
.idea/compiler.xml
.idea/libraries/
*.iws
*.iml
*.ipr
### Eclipse ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
build/
!**/src/main/**/build/
!**/src/test/**/build/
### VS Code ###
.vscode/
### Mac OS ###
.DS_Store

@@ -0,0 +1,151 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.apache.flink</groupId>
<artifactId>datastream-job</artifactId>
<version>3.1.0</version>
<packaging>jar</packaging>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.18.1</flink.version>
<flink.cdc.version>3.1.0</flink.cdc.version>
<debezium.version>1.9.7.Final</debezium.version>
<scala.binary.version>2.12</scala.binary.version>
<slf4j.version>2.0.13</slf4j.version>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Checked the dependencies of the Flink project and below is a feasible reference. -->
<!-- Use flink shaded guava 18.0-13.0 for flink 1.13 -->
<!-- Use flink shaded guava 30.1.1-jre-14.0 for flink-1.14 -->
<!-- Use flink shaded guava 30.1.1-jre-15.0 for flink-1.15 -->
<!-- Use flink shaded guava 30.1.1-jre-15.0 for flink-1.16 -->
<!-- Use flink shaded guava 30.1.1-jre-16.1 for flink-1.17 -->
<!-- Use flink shaded guava 31.1-jre-17.0 for flink-1.18 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-guava</artifactId>
<version>31.1-jre-17.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-debezium</artifactId>
<version>${flink.cdc.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cdc-base</artifactId>
<version>${flink.cdc.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>${flink.cdc.version}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-mysql</artifactId>
<version>${debezium.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>${slf4j.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- any other plugins -->
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
</plugins>
</build>
</project>

@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.flink.cdc.connectors.mysql.source.MySqlSource;
import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class DataStreamJob {
public static void main(String[] args) {
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
.hostname("localhost")
.port(3306)
.databaseList("fallen")
.tableList("fallen.angel", "fallen.gabriel", "fallen.girl")
.startupOptions(StartupOptions.initial())
.username("root")
.password("")
.deserializer(new JsonDebeziumDeserializationSchema())
.serverTimeZone("UTC")
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(3000);
env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL CDC Source")
.uid("sql-source-uid")
.setParallelism(1)
.print()
.setParallelism(1);
try {
env.execute();
} catch (Exception e) {
// ... unfortunately
}
}
}

@@ -0,0 +1,38 @@
target/
!.mvn/wrapper/maven-wrapper.jar
!**/src/main/**/target/
!**/src/test/**/target/
### IntelliJ IDEA ###
.idea/modules.xml
.idea/jarRepositories.xml
.idea/compiler.xml
.idea/libraries/
*.iws
*.iml
*.ipr
### Eclipse ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
build/
!**/src/main/**/build/
!**/src/test/**/build/
### VS Code ###
.vscode/
### Mac OS ###
.DS_Store

@@ -0,0 +1,151 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.apache.flink</groupId>
<artifactId>datastream-job</artifactId>
<version>3.1.1</version>
<packaging>jar</packaging>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.18.1</flink.version>
<flink.cdc.version>3.1.1</flink.cdc.version>
<debezium.version>1.9.7.Final</debezium.version>
<scala.binary.version>2.12</scala.binary.version>
<slf4j.version>2.0.13</slf4j.version>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Checked the dependencies of the Flink project and below is a feasible reference. -->
<!-- Use flink shaded guava 18.0-13.0 for flink 1.13 -->
<!-- Use flink shaded guava 30.1.1-jre-14.0 for flink-1.14 -->
<!-- Use flink shaded guava 30.1.1-jre-15.0 for flink-1.15 -->
<!-- Use flink shaded guava 30.1.1-jre-15.0 for flink-1.16 -->
<!-- Use flink shaded guava 30.1.1-jre-16.1 for flink-1.17 -->
<!-- Use flink shaded guava 31.1-jre-17.0 for flink-1.18 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-guava</artifactId>
<version>31.1-jre-17.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-debezium</artifactId>
<version>${flink.cdc.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cdc-base</artifactId>
<version>${flink.cdc.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>${flink.cdc.version}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-mysql</artifactId>
<version>${debezium.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>${slf4j.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- any other plugins -->
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
</plugins>
</build>
</project>

@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.flink.cdc.connectors.mysql.source.MySqlSource;
import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class DataStreamJob {
public static void main(String[] args) {
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
.hostname("localhost")
.port(3306)
.databaseList("fallen")
.tableList("fallen.angel", "fallen.gabriel", "fallen.girl")
.startupOptions(StartupOptions.initial())
.username("root")
.password("")
.deserializer(new JsonDebeziumDeserializationSchema())
.serverTimeZone("UTC")
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(3000);
env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL CDC Source")
.uid("sql-source-uid")
.setParallelism(1)
.print()
.setParallelism(1);
try {
env.execute();
} catch (Exception e) {
// ... unfortunately
}
}
}

@@ -0,0 +1,38 @@
target/
!.mvn/wrapper/maven-wrapper.jar
!**/src/main/**/target/
!**/src/test/**/target/
### IntelliJ IDEA ###
.idea/modules.xml
.idea/jarRepositories.xml
.idea/compiler.xml
.idea/libraries/
*.iws
*.iml
*.ipr
### Eclipse ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
build/
!**/src/main/**/build/
!**/src/test/**/build/
### VS Code ###
.vscode/
### Mac OS ###
.DS_Store

@@ -0,0 +1,151 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.apache.flink</groupId>
<artifactId>datastream-job</artifactId>
<version>3.2-SNAPSHOT</version>
<packaging>jar</packaging>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.18.1</flink.version>
<flink.cdc.version>3.2-SNAPSHOT</flink.cdc.version>
<debezium.version>1.9.7.Final</debezium.version>
<scala.binary.version>2.12</scala.binary.version>
<slf4j.version>2.0.13</slf4j.version>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Checked the dependencies of the Flink project and below is a feasible reference. -->
<!-- Use flink shaded guava 18.0-13.0 for flink 1.13 -->
<!-- Use flink shaded guava 30.1.1-jre-14.0 for flink-1.14 -->
<!-- Use flink shaded guava 30.1.1-jre-15.0 for flink-1.15 -->
<!-- Use flink shaded guava 30.1.1-jre-15.0 for flink-1.16 -->
<!-- Use flink shaded guava 30.1.1-jre-16.1 for flink-1.17 -->
<!-- Use flink shaded guava 31.1-jre-17.0 for flink-1.18 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-guava</artifactId>
<version>31.1-jre-17.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-debezium</artifactId>
<version>${flink.cdc.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cdc-base</artifactId>
<version>${flink.cdc.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>${flink.cdc.version}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-mysql</artifactId>
<version>${debezium.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>${slf4j.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- any other plugins -->
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
</plugins>
</build>
</project>

@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.flink.cdc.connectors.mysql.source.MySqlSource;
import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class DataStreamJob {
public static void main(String[] args) {
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
.hostname("localhost")
.port(3306)
.databaseList("fallen")
.tableList("fallen.angel", "fallen.gabriel", "fallen.girl")
.startupOptions(StartupOptions.initial())
.username("root")
.password("")
.deserializer(new JsonDebeziumDeserializationSchema())
.serverTimeZone("UTC")
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(3000);
env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL CDC Source")
.uid("sql-source-uid")
.setParallelism(1)
.print()
.setParallelism(1);
try {
env.execute();
} catch (Exception e) {
// ... unfortunately
}
}
}

@@ -0,0 +1,138 @@
#!/usr/bin/env ruby
# frozen_string_literal: true
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
require 'pathname'
require 'securerandom'
WAITING_SECONDS = 20
FLINK_HOME = ENV['FLINK_HOME']
throw 'Unspecified `FLINK_HOME` environment variable.' if FLINK_HOME.nil?
FLINK_HOME = Pathname.new(FLINK_HOME).realpath
SOURCE_PORT = 3306
DATABASE_NAME = 'fallen'
TABLES = ['girl'].freeze
def exec_sql_source(sql)
`mysql -h 127.0.0.1 -P#{SOURCE_PORT} -uroot --skip-password -e "USE #{DATABASE_NAME}; #{sql}"`
end
def put_mystery_data(mystery)
exec_sql_source("REPLACE INTO girl(id, name) VALUES (17, '#{mystery}');")
end
def ensure_mystery_data(mystery)
throw StandardError, 'Failed to get specific mystery string' unless `cat #{FLINK_HOME}/log/*.out`.include? mystery
end
puts ' Waiting for source to start up...'
next until exec_sql_source("SELECT '1';") == "1\n1\n"
def test_migration_chore(from_version, to_version)
TABLES.each do |table_name|
exec_sql_source("DROP TABLE IF EXISTS #{table_name};")
exec_sql_source("CREATE TABLE #{table_name} (ID INT NOT NULL, NAME VARCHAR(17), PRIMARY KEY (ID));")
end
# Clear previous savepoints and logs
`rm -rf savepoints`
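# Submit the job built against the older CDC version and grab its job ID from the CLI output.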
old_job_id = `#{FLINK_HOME}/bin/flink run -p 1 -c DataStreamJob --detached datastream-#{from_version}/target/datastream-job-#{from_version}-jar-with-dependencies.jar`.split.last
raise StandardError, 'Failed to submit Flink job' unless old_job_id.length == 32
puts "Submitted job at #{from_version} as #{old_job_id}"
random_string_1 = SecureRandom.hex(8)
put_mystery_data random_string_1
sleep WAITING_SECONDS
ensure_mystery_data random_string_1
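# Stop the old job with a savepoint, then resume the job built against the newer CDC version from it.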
puts `#{FLINK_HOME}/bin/flink stop --savepointPath #{Dir.pwd}/savepoints #{old_job_id}`
savepoint_file = `ls savepoints`.split("\n").last
new_job_id = `#{FLINK_HOME}/bin/flink run --fromSavepoint #{Dir.pwd}/savepoints/#{savepoint_file} -p 1 -c DataStreamJob --detached datastream-#{to_version}/target/datastream-job-#{to_version}-jar-with-dependencies.jar`.split.last
raise StandardError, 'Failed to submit Flink job' unless new_job_id.length == 32
puts "Submitted job at #{to_version} as #{new_job_id}"
random_string_2 = SecureRandom.hex(8)
put_mystery_data random_string_2
sleep WAITING_SECONDS
ensure_mystery_data random_string_2
puts `#{FLINK_HOME}/bin/flink cancel #{new_job_id}`
true
end
def test_migration(from_version, to_version)
puts "➡️ [MIGRATION] Testing migration from #{from_version} to #{to_version}..."
puts " with Flink #{FLINK_HOME}..."
begin
result = test_migration_chore from_version, to_version
if result
puts "✅ [MIGRATION] Successfully migrated from #{from_version} to #{to_version}!"
else
puts "❌ [MIGRATION] Failed to migrate from #{from_version} to #{to_version}..."
end
result
rescue => e
puts "❌ [MIGRATION] Failed to migrate from #{from_version} to #{to_version}...", e
false
end
end
version_list = %w[2.4.2 3.0.0 3.0.1 3.1.0 3.1.1 3.2-SNAPSHOT]
version_result = Hash.new('❓')
@failures = []
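# Test every ordered version pair (old -> new) in which the target version is not older than the source.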
version_list.each_with_index do |old_version, old_index|
puts 'Restarting cluster...'
`#{FLINK_HOME}/bin/stop-cluster.sh`
`rm -rf #{FLINK_HOME}/log/flink-*.out`
`#{FLINK_HOME}/bin/start-cluster.sh`
version_list.each_with_index do |new_version, new_index|
next if old_index > new_index
result = test_migration old_version, new_version
version_result[old_version + new_version] = result ? '✅' : '❌'
@failures << [old_version, new_version] unless result
end
end
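# Assemble an upper-triangular compatibility matrix; cells below the diagonal
# stay blank because downgrades are not tested.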
printable_result = []
printable_result << [''] + version_list
version_list.each_with_index do |old_version, old_index|
table_line = [old_version]
version_list.each_with_index do |new_version, new_index|
table_line << if old_index > new_index
''
else
version_result[old_version + new_version]
end
end
printable_result << table_line
end
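# Pretty-print the matrix with terminal-table when available; otherwise fall back
# to a raw dump.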
begin
require 'terminal-table'
puts Terminal::Table.new rows: printable_result, title: 'Migration Test Result'
rescue LoadError
puts 'Test summary: ', printable_result
end
puts "✅ - Compatible, ❌ - Not compatible, ❓ - Target version doesn't support `--from-savepoint`"
if @failures.filter { |_, new_version| new_version == version_list.last }.any?
abort 'Some migration to snapshot version tests failed.'
end

@@ -0,0 +1,32 @@
#!/usr/bin/env ruby
# frozen_string_literal: true
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
FLINK_HOME = ENV['FLINK_HOME']
raise 'Unspecified `FLINK_HOME` environment variable.' if FLINK_HOME.nil?
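# Settings appended to flink-conf.yaml: more task slots, default parallelism 4,
# and a short checkpoint interval (Flink reads a bare number as milliseconds) so
# that state is checkpointed frequently during the tests.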
EXTRA_CONF = <<~EXTRACONF
taskmanager.numberOfTaskSlots: 10
parallelism.default: 4
execution.checkpointing.interval: 300
EXTRACONF
File.write("#{FLINK_HOME}/conf/flink-conf.yaml", EXTRA_CONF, mode: 'a+')
# The MySQL JDBC driver is not bundled with the Flink distribution, so download it into lib/
`wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.27/mysql-connector-java-8.0.27.jar -O #{FLINK_HOME}/lib/mysql-connector-java-8.0.27.jar`

@@ -0,0 +1,123 @@
#!/usr/bin/env ruby
# frozen_string_literal: true
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
CDC_SOURCE_HOME = ENV['CDC_SOURCE_HOME']
raise 'Unspecified `CDC_SOURCE_HOME` environment variable.' if CDC_SOURCE_HOME.nil?
Dir.chdir(__dir__)
RELEASED_VERSIONS = {
'3.0.0': {
tar: 'https://github.com/ververica/flink-cdc-connectors/releases/download/release-3.0.0/flink-cdc-3.0.0-bin.tar.gz',
connectors: %w[
https://repo1.maven.org/maven2/com/ververica/flink-cdc-pipeline-connector-doris/3.0.0/flink-cdc-pipeline-connector-doris-3.0.0.jar
https://repo1.maven.org/maven2/com/ververica/flink-cdc-pipeline-connector-mysql/3.0.0/flink-cdc-pipeline-connector-mysql-3.0.0.jar
https://repo1.maven.org/maven2/com/ververica/flink-cdc-pipeline-connector-starrocks/3.0.0/flink-cdc-pipeline-connector-starrocks-3.0.0.jar
https://repo1.maven.org/maven2/com/ververica/flink-cdc-pipeline-connector-values/3.0.0/flink-cdc-pipeline-connector-values-3.0.0.jar
]
},
'3.0.1': {
tar: 'https://github.com/ververica/flink-cdc-connectors/releases/download/release-3.0.1/flink-cdc-3.0.1-bin.tar.gz',
connectors: %w[
https://repo1.maven.org/maven2/com/ververica/flink-cdc-pipeline-connector-doris/3.0.1/flink-cdc-pipeline-connector-doris-3.0.1.jar
https://repo1.maven.org/maven2/com/ververica/flink-cdc-pipeline-connector-mysql/3.0.1/flink-cdc-pipeline-connector-mysql-3.0.1.jar
https://repo1.maven.org/maven2/com/ververica/flink-cdc-pipeline-connector-starrocks/3.0.1/flink-cdc-pipeline-connector-starrocks-3.0.1.jar
https://repo1.maven.org/maven2/com/ververica/flink-cdc-pipeline-connector-values/3.0.1/flink-cdc-pipeline-connector-values-3.0.1.jar
]
},
'3.1.0': {
tar: 'https://dlcdn.apache.org/flink/flink-cdc-3.1.0/flink-cdc-3.1.0-bin.tar.gz',
connectors: %w[
https://repo1.maven.org/maven2/org/apache/flink/flink-cdc-pipeline-connector-mysql/3.1.0/flink-cdc-pipeline-connector-mysql-3.1.0.jar
https://repo1.maven.org/maven2/org/apache/flink/flink-cdc-pipeline-connector-doris/3.1.0/flink-cdc-pipeline-connector-doris-3.1.0.jar
https://repo1.maven.org/maven2/org/apache/flink/flink-cdc-pipeline-connector-starrocks/3.1.0/flink-cdc-pipeline-connector-starrocks-3.1.0.jar
https://repo1.maven.org/maven2/org/apache/flink/flink-cdc-pipeline-connector-kafka/3.1.0/flink-cdc-pipeline-connector-kafka-3.1.0.jar
https://repo1.maven.org/maven2/org/apache/flink/flink-cdc-pipeline-connector-paimon/3.1.0/flink-cdc-pipeline-connector-paimon-3.1.0.jar
https://repo1.maven.org/maven2/org/apache/flink/flink-cdc-pipeline-connector-values/3.1.0/flink-cdc-pipeline-connector-values-3.1.0.jar
]
},
'3.1.1': {
tar: 'https://dlcdn.apache.org/flink/flink-cdc-3.1.1/flink-cdc-3.1.1-bin.tar.gz',
connectors: %w[
https://repo1.maven.org/maven2/org/apache/flink/flink-cdc-pipeline-connector-mysql/3.1.1/flink-cdc-pipeline-connector-mysql-3.1.1.jar
https://repo1.maven.org/maven2/org/apache/flink/flink-cdc-pipeline-connector-doris/3.1.1/flink-cdc-pipeline-connector-doris-3.1.1.jar
https://repo1.maven.org/maven2/org/apache/flink/flink-cdc-pipeline-connector-starrocks/3.1.1/flink-cdc-pipeline-connector-starrocks-3.1.1.jar
https://repo1.maven.org/maven2/org/apache/flink/flink-cdc-pipeline-connector-kafka/3.1.1/flink-cdc-pipeline-connector-kafka-3.1.1.jar
https://repo1.maven.org/maven2/org/apache/flink/flink-cdc-pipeline-connector-paimon/3.1.1/flink-cdc-pipeline-connector-paimon-3.1.1.jar
https://repo1.maven.org/maven2/org/apache/flink/flink-cdc-pipeline-connector-values/3.1.1/flink-cdc-pipeline-connector-values-3.1.1.jar
]
}
}.freeze
HEAD_VERSION = '3.2-SNAPSHOT'
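# Download helper with a local cache/ directory so repeated runs do not re-fetch
# the same tarballs and jars.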
def download_or_get(link)
`mkdir -p cache`
file_name = "cache/#{File.basename(link)}"
if File.exist? file_name
puts "#{file_name} exists, skip download"
return file_name
end
`wget #{link} -O #{file_name}`
file_name
end
M2_REPO = '~/.m2/repository/org/apache/flink'
FILES = %w[
dist
pipeline-connector-kafka
pipeline-connector-mysql
pipeline-connector-doris
pipeline-connector-paimon
pipeline-connector-starrocks
pipeline-connector-values
].freeze
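# Fetch each released CDC distribution, unpack it into cdc-versions/<version>/,
# and copy the matching pipeline connector jars into its lib/ directory.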
def download_released
`rm -rf cdc-versions`
RELEASED_VERSIONS.each do |version, links|
`mkdir -p cdc-versions/#{version}`
file_name = download_or_get(links[:tar])
`tar --strip-components=1 -xzvf #{file_name} -C cdc-versions/#{version}`
links[:connectors].each do |link|
jar_file_name = download_or_get(link)
`cp #{jar_file_name} cdc-versions/#{version}/lib/`
end
end
end
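# Build the current source tree and assemble a snapshot distribution that mirrors
# the released layout, so the test harness can treat all versions uniformly.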
def compile_snapshot(version)
puts "Trying to create #{version}"
`mkdir -p cdc-versions/#{version}/lib`
`cp -r #{CDC_SOURCE_HOME}/flink-cdc-dist/src/main/flink-cdc-bin/* cdc-versions/#{version}/`
puts 'Compiling snapshot version...'
puts `cd #{CDC_SOURCE_HOME} && mvn clean install -DskipTests`
FILES.each do |lib|
if lib == 'dist'
`cp #{CDC_SOURCE_HOME}/flink-cdc-#{lib}/target/flink-cdc-#{lib}-#{version}.jar cdc-versions/#{version}/lib/`
else
`cp #{CDC_SOURCE_HOME}/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-#{lib}/target/flink-cdc-#{lib}-#{version}.jar cdc-versions/#{version}/lib/`
end
end
end
download_released
compile_snapshot HEAD_VERSION
puts 'Done'

@@ -0,0 +1,162 @@
#!/usr/bin/env ruby
# frozen_string_literal: true
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
require 'pathname'
require 'securerandom'
WAITING_SECONDS = 20
FLINK_HOME = ENV['FLINK_HOME']
raise 'Unspecified `FLINK_HOME` environment variable.' if FLINK_HOME.nil?
FLINK_HOME = Pathname.new(FLINK_HOME).realpath
SOURCE_PORT = 3306
DATABASE_NAME = 'fallen'
TABLES = ['girl'].freeze
def exec_sql_source(sql)
`mysql -h 127.0.0.1 -P#{SOURCE_PORT} -uroot --skip-password -e "USE #{DATABASE_NAME}; #{sql}"`
end
def put_mystery_data(mystery)
exec_sql_source("REPLACE INTO girl(id, name) VALUES (17, '#{mystery}');")
end
def ensure_mystery_data(mystery)
raise StandardError, 'Failed to get specific mystery string' unless `cat #{FLINK_HOME}/log/*.out`.include? mystery
end
puts ' Waiting for source to start up...'
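# Busy-wait until the MySQL container answers a trivial query before running any test.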
next until exec_sql_source("SELECT '1';") == "1\n1\n"
def test_migration_chore(from_version, to_version)
TABLES.each do |table_name|
exec_sql_source("DROP TABLE IF EXISTS #{table_name};")
exec_sql_source("CREATE TABLE #{table_name} (ID INT NOT NULL, NAME VARCHAR(17), PRIMARY KEY (ID));")
end
# Clear previous savepoints and logs
`rm -rf savepoints`
# Prepare for current YAML file
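# 3.0.x jobs use the plain pipeline template; later versions also exercise route rules.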
test_route = !%w[3.0.0 3.0.1].include?(from_version)
yaml_job_template_file = test_route ? 'conf/pipeline-route.yaml' : 'conf/pipeline.yaml'
yaml_job_file = 'conf/temp.yaml'
yaml_content = File.open(yaml_job_template_file).read.gsub('${PIPELINE_NAME}',
"Pipeline Migration Job (#{from_version} -> #{to_version})")
File.write(yaml_job_file, yaml_content)
# Submit current pipeline job file
submit_job_output = `bash ./cdc-versions/#{from_version}/bin/flink-cdc.sh --flink-home #{FLINK_HOME} #{yaml_job_file}`
puts " #{submit_job_output}"
current_job_id = submit_job_output.split("\n")[1].split.last
raise StandardError, 'Failed to submit Flink job' unless current_job_id.length == 32
puts " Current Job ID: #{current_job_id}"
# Verify if data sync works
random_string_1 = SecureRandom.hex(8)
put_mystery_data random_string_1
sleep WAITING_SECONDS
ensure_mystery_data random_string_1
# Stop current job and create a savepoint
puts `#{FLINK_HOME}/bin/flink stop --savepointPath #{Dir.pwd}/savepoints #{current_job_id}`
savepoint_file = `ls savepoints`.split("\n").last
# Migrate to a newer CDC version
puts " Submitting CDC jobs at #{to_version}..."
submit_job_output = `bash ./cdc-versions/#{to_version}/bin/flink-cdc.sh --from-savepoint #{Dir.pwd}/savepoints/#{savepoint_file} --allow-nonRestored-state --flink-home #{FLINK_HOME} #{yaml_job_file}`
puts " #{submit_job_output}"
new_job_id = submit_job_output.split("\n")[1].split.last
raise StandardError, 'Failed to submit Flink job' unless new_job_id.length == 32
puts " Upgraded Job ID: #{new_job_id}"
# Verify if data sync works
puts "Submitted job at #{to_version} as #{new_job_id}"
random_string_2 = SecureRandom.hex(8)
put_mystery_data random_string_2
sleep WAITING_SECONDS
ensure_mystery_data random_string_2
puts `#{FLINK_HOME}/bin/flink cancel #{new_job_id}`
true
end
def test_migration(from_version, to_version)
puts "➡️ [MIGRATION] Testing migration from #{from_version} to #{to_version}..."
puts " with Flink #{FLINK_HOME}..."
begin
result = test_migration_chore from_version, to_version
if result
puts "✅ [MIGRATION] Successfully migrated from #{from_version} to #{to_version}!"
else
puts "❌ [MIGRATION] Failed to migrate from #{from_version} to #{to_version}..."
end
result
rescue => e
puts "❌ [MIGRATION] Failed to migrate from #{from_version} to #{to_version}...", e
false
end
end
version_list = %w[3.0.0 3.0.1 3.1.0 3.1.1 3.2-SNAPSHOT]
no_savepoint_versions = %w[3.0.0 3.0.1]
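# flink-cdc.sh in these releases cannot restore from a savepoint, so they are
# skipped as migration targets (rendered as ❓ in the summary table).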
version_result = Hash.new('❓')
@failures = []
version_list.each_with_index do |old_version, old_index|
puts 'Restarting cluster...'
`#{FLINK_HOME}/bin/stop-cluster.sh`
puts 'Stopped cluster.'
`#{FLINK_HOME}/bin/start-cluster.sh`
puts 'Started cluster.'
version_list.each_with_index do |new_version, new_index|
next if old_index > new_index
next if no_savepoint_versions.include? new_version
result = test_migration old_version, new_version
version_result[old_version + new_version] = result ? '✅' : '❌'
@failures << [old_version, new_version] unless result
end
end
printable_result = []
printable_result << [''] + version_list
version_list.each_with_index do |old_version, old_index|
table_line = [old_version]
version_list.each_with_index do |new_version, new_index|
table_line << if old_index > new_index
''
else
version_result[old_version + new_version]
end
end
printable_result << table_line
end
begin
require 'terminal-table'
puts Terminal::Table.new rows: printable_result, title: 'Migration Test Result'
rescue LoadError
puts 'Test summary: ', printable_result
end
puts "✅ - Compatible, ❌ - Not compatible, ❓ - Target version doesn't support `--from-savepoint`"
if @failures.filter { |old_version, new_version| new_version == version_list.last && old_version != '3.1.0' }.any?
abort 'Some migration to snapshot version tests failed.'
end