From 54e16ebf99da745f5ec34a4a8419921adbaea857 Mon Sep 17 00:00:00 2001
From: yux <34335406+yuxiqian@users.noreply.github.com>
Date: Fri, 7 Jun 2024 16:52:30 +0800
Subject: [PATCH] [FLINK-35464][cdc] Fixes operator state backwards
 compatibility from CDC 3.0.x and adds migration tests

This closes #3370.
---
.github/labeler.yml | 2 +
.github/workflows/flink_cdc.yml | 12 ++
.../flink-cdc-migration-testcases/pom.xml | 62 ++++++++++
.../migration/tests/MigrationTestBase.java | 117 ++++++++++++++++++
.../tests/SchemaManagerMigrationTest.java | 39 ++++++
.../tests/SchemaRegistryMigrationTest.java | 38 ++++++
.../tests/TableChangeInfoMigrationTest.java | 41 ++++++
.../src/test/resources/log4j2-test.properties | 25 ++++
.../flink-cdc-release-3.0.0/pom.xml | 86 +++++++++++++
.../migration/tests/MigrationMockBase.java | 27 ++++
.../tests/SchemaManagerMigrationMock.java | 65 ++++++++++
.../tests/SchemaRegistryMigrationMock.java | 95 ++++++++++++++
.../flink-cdc-release-3.0.1/pom.xml | 86 +++++++++++++
.../migration/tests/MigrationMockBase.java | 27 ++++
.../tests/SchemaManagerMigrationMock.java | 65 ++++++++++
.../tests/SchemaRegistryMigrationMock.java | 95 ++++++++++++++
.../flink-cdc-release-3.1.0/pom.xml | 87 +++++++++++++
.../migration/tests/MigrationMockBase.java | 27 ++++
.../tests/SchemaManagerMigrationMock.java | 68 ++++++++++
.../tests/SchemaRegistryMigrationMock.java | 116 +++++++++++++++++
.../tests/TableChangeInfoMigrationMock.java | 60 +++++++++
.../flink-cdc-release-snapshot/pom.xml | 90 ++++++++++++++
.../migration/tests/MigrationMockBase.java | 27 ++++
.../tests/SchemaManagerMigrationMock.java | 65 ++++++++++
.../tests/SchemaRegistryMigrationMock.java | 116 +++++++++++++++++
.../tests/TableChangeInfoMigrationMock.java | 60 +++++++++
flink-cdc-migration-tests/pom.xml | 57 +++++++++
.../schema/coordinator/SchemaManager.java | 58 +++++----
.../schema/coordinator/SchemaRegistry.java | 51 ++++++--
.../operators/transform/TableChangeInfo.java | 6 +-
.../serializer/schema/SchemaSerializer.java | 32 +++--
.../schema/coordinator/SchemaManagerTest.java | 4 +-
32 files changed, 1759 insertions(+), 47 deletions(-)
create mode 100644 flink-cdc-migration-tests/flink-cdc-migration-testcases/pom.xml
create mode 100644 flink-cdc-migration-tests/flink-cdc-migration-testcases/src/test/java/org/apache/flink/cdc/migration/tests/MigrationTestBase.java
create mode 100644 flink-cdc-migration-tests/flink-cdc-migration-testcases/src/test/java/org/apache/flink/cdc/migration/tests/SchemaManagerMigrationTest.java
create mode 100644 flink-cdc-migration-tests/flink-cdc-migration-testcases/src/test/java/org/apache/flink/cdc/migration/tests/SchemaRegistryMigrationTest.java
create mode 100644 flink-cdc-migration-tests/flink-cdc-migration-testcases/src/test/java/org/apache/flink/cdc/migration/tests/TableChangeInfoMigrationTest.java
create mode 100644 flink-cdc-migration-tests/flink-cdc-migration-testcases/src/test/resources/log4j2-test.properties
create mode 100644 flink-cdc-migration-tests/flink-cdc-release-3.0.0/pom.xml
create mode 100644 flink-cdc-migration-tests/flink-cdc-release-3.0.0/src/main/java/com/ververica/cdc/migration/tests/MigrationMockBase.java
create mode 100644 flink-cdc-migration-tests/flink-cdc-release-3.0.0/src/main/java/com/ververica/cdc/migration/tests/SchemaManagerMigrationMock.java
create mode 100644 flink-cdc-migration-tests/flink-cdc-release-3.0.0/src/main/java/com/ververica/cdc/migration/tests/SchemaRegistryMigrationMock.java
create mode 100644 flink-cdc-migration-tests/flink-cdc-release-3.0.1/pom.xml
create mode 100644 flink-cdc-migration-tests/flink-cdc-release-3.0.1/src/main/java/com/ververica/cdc/migration/tests/MigrationMockBase.java
create mode 100644 flink-cdc-migration-tests/flink-cdc-release-3.0.1/src/main/java/com/ververica/cdc/migration/tests/SchemaManagerMigrationMock.java
create mode 100644 flink-cdc-migration-tests/flink-cdc-release-3.0.1/src/main/java/com/ververica/cdc/migration/tests/SchemaRegistryMigrationMock.java
create mode 100644 flink-cdc-migration-tests/flink-cdc-release-3.1.0/pom.xml
create mode 100644 flink-cdc-migration-tests/flink-cdc-release-3.1.0/src/main/java/org/apache/flink/cdc/migration/tests/MigrationMockBase.java
create mode 100644 flink-cdc-migration-tests/flink-cdc-release-3.1.0/src/main/java/org/apache/flink/cdc/migration/tests/SchemaManagerMigrationMock.java
create mode 100644 flink-cdc-migration-tests/flink-cdc-release-3.1.0/src/main/java/org/apache/flink/cdc/migration/tests/SchemaRegistryMigrationMock.java
create mode 100644 flink-cdc-migration-tests/flink-cdc-release-3.1.0/src/main/java/org/apache/flink/cdc/migration/tests/TableChangeInfoMigrationMock.java
create mode 100644 flink-cdc-migration-tests/flink-cdc-release-snapshot/pom.xml
create mode 100644 flink-cdc-migration-tests/flink-cdc-release-snapshot/src/main/java/org/apache/flink/cdc/migration/tests/MigrationMockBase.java
create mode 100644 flink-cdc-migration-tests/flink-cdc-release-snapshot/src/main/java/org/apache/flink/cdc/migration/tests/SchemaManagerMigrationMock.java
create mode 100644 flink-cdc-migration-tests/flink-cdc-release-snapshot/src/main/java/org/apache/flink/cdc/migration/tests/SchemaRegistryMigrationMock.java
create mode 100644 flink-cdc-migration-tests/flink-cdc-release-snapshot/src/main/java/org/apache/flink/cdc/migration/tests/TableChangeInfoMigrationMock.java
create mode 100644 flink-cdc-migration-tests/pom.xml
diff --git a/.github/labeler.yml b/.github/labeler.yml
index 7b55dea33..f36f372c7 100644
--- a/.github/labeler.yml
+++ b/.github/labeler.yml
@@ -37,6 +37,8 @@ runtime:
- flink-cdc-runtime/**/*
e2e-tests:
- flink-cdc-e2e-tests/**/*
+migration-tests:
+ - flink-cdc-migration-tests/**/*
base:
- flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/**/*
debezium:
diff --git a/.github/workflows/flink_cdc.yml b/.github/workflows/flink_cdc.yml
index 61e05301c..f8cec22ec 100644
--- a/.github/workflows/flink_cdc.yml
+++ b/.github/workflows/flink_cdc.yml
@@ -122,6 +122,18 @@ jobs:
mvn ${{ env.MVN_COMMON_OPTIONS }} exec:java@check-license -N \
-Dexec.args="${{ env.MVN_BUILD_OUTPUT_FILE }} $(pwd) ${{ env.MVN_VALIDATION_DIR }}"
+ migration_test:
+ runs-on: ubuntu-latest
+ steps:
+ - name: Check out repository code
+ uses: actions/checkout@v4
+ with:
+ submodules: true
+ - name: Compile snapshot CDC version
+ run: mvn --no-snapshot-updates -B install -DskipTests
+ - name: Run migration tests
+ run: cd flink-cdc-migration-tests && mvn clean verify
+
compile_and_test:
runs-on: ubuntu-latest
strategy:
diff --git a/flink-cdc-migration-tests/flink-cdc-migration-testcases/pom.xml b/flink-cdc-migration-tests/flink-cdc-migration-testcases/pom.xml
new file mode 100644
index 000000000..4be8f9c12
--- /dev/null
+++ b/flink-cdc-migration-tests/flink-cdc-migration-testcases/pom.xml
@@ -0,0 +1,62 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.flink</groupId>
+        <artifactId>flink-cdc-migration-tests</artifactId>
+        <version>${revision}</version>
+    </parent>
+
+    <artifactId>flink-cdc-migration-testcases</artifactId>
+    <name>flink-cdc-migration-testcases</name>
+
+    <properties>
+        <maven.compiler.source>8</maven.compiler.source>
+        <maven.compiler.target>8</maven.compiler.target>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-cdc-release-3.0.0</artifactId>
+            <version>${revision}</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-cdc-release-3.0.1</artifactId>
+            <version>${revision}</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-cdc-release-3.1.0</artifactId>
+            <version>${revision}</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-cdc-release-snapshot</artifactId>
+            <version>${revision}</version>
+            <scope>compile</scope>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file
diff --git a/flink-cdc-migration-tests/flink-cdc-migration-testcases/src/test/java/org/apache/flink/cdc/migration/tests/MigrationTestBase.java b/flink-cdc-migration-tests/flink-cdc-migration-testcases/src/test/java/org/apache/flink/cdc/migration/tests/MigrationTestBase.java
new file mode 100644
index 000000000..2172f011e
--- /dev/null
+++ b/flink-cdc-migration-tests/flink-cdc-migration-testcases/src/test/java/org/apache/flink/cdc/migration/tests/MigrationTestBase.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.migration.tests;
+
+import org.junit.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** Utilities for migration tests. */
+public class MigrationTestBase {
+
+ private static final Logger LOG = LoggerFactory.getLogger(MigrationTestBase.class);
+
+ /** Flink CDC versions since 3.0. */
+ public enum FlinkCdcVersion {
+ v3_0_0,
+ v3_0_1,
+ v3_1_0,
+ SNAPSHOT;
+
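+ // Note: 3.0.x releases were published under the com.ververica namespace before the
+ // project was donated to Apache Flink, hence the different shaded class prefixes.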
+ public String getShadedClassPrefix() {
+ switch (this) {
+ case v3_0_0:
+ return "com.ververica.cdc.v3_0_0";
+ case v3_0_1:
+ return "com.ververica.cdc.v3_0_1";
+ case v3_1_0:
+ return "org.apache.flink.cdc.v3_1_0";
+ case SNAPSHOT:
+ return "org.apache.flink.cdc.snapshot";
+ default:
+ throw new RuntimeException("Unknown Flink CDC version: " + this);
+ }
+ }
+ }
+
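+ // Versions are kept in ascending release order; getSnapshotVersion() assumes the
+ // snapshot version is the last entry of this list.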
+ private static final List<FlinkCdcVersion> versions =
+ Arrays.asList(
+ FlinkCdcVersion.v3_0_0,
+ FlinkCdcVersion.v3_0_1,
+ FlinkCdcVersion.v3_1_0,
+ FlinkCdcVersion.SNAPSHOT);
+
+ public static List<FlinkCdcVersion> getAllVersions() {
+ return versions.subList(0, versions.size());
+ }
+
+ public static List<FlinkCdcVersion> getVersionSince(FlinkCdcVersion sinceVersion) {
+ return versions.subList(versions.indexOf(sinceVersion), versions.size());
+ }
+
+ public static List<FlinkCdcVersion> getAllVersionExcept(FlinkCdcVersion... excludedVersions) {
+ List<FlinkCdcVersion> excluded = Arrays.asList(excludedVersions);
+ return versions.stream().filter(e -> !excluded.contains(e)).collect(Collectors.toList());
+ }
+
+ public static FlinkCdcVersion getSnapshotVersion() {
+ return versions.get(versions.size() - 1);
+ }
+
+ private static Class<?> getMockClass(FlinkCdcVersion version, String caseName)
+ throws Exception {
+ return Class.forName(version.getShadedClassPrefix() + ".migration.tests." + caseName);
+ }
+
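+ /**
+ * Serializes a dummy state object with the {@code fromVersion} mock class, then verifies
+ * that the {@code toVersion} mock class can deserialize it into an equivalent object.
+ */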
+ protected void testMigrationFromTo(
+ FlinkCdcVersion fromVersion, FlinkCdcVersion toVersion, String caseName)
+ throws Exception {
+
+ LOG.info("Testing {} compatibility case from {} -> {}", caseName, fromVersion, toVersion);
+
+ // Serialize dummy object to bytes in early versions
+ Class<?> fromVersionMockClass = getMockClass(fromVersion, caseName);
+ Object fromVersionMockObject = fromVersionMockClass.newInstance();
+
+ int serializerVersion =
+ (int)
+ fromVersionMockClass
+ .getDeclaredMethod("getSerializerVersion")
+ .invoke(fromVersionMockObject);
+ byte[] serializedObject =
+ (byte[])
+ fromVersionMockClass
+ .getDeclaredMethod("serializeObject")
+ .invoke(fromVersionMockObject);
+
+ // Deserialize object in latest versions
+ Class<?> toVersionMockClass = getMockClass(toVersion, caseName);
+ Object toVersionMockObject = toVersionMockClass.newInstance();
+
+ Assert.assertTrue(
+ (boolean)
+ toVersionMockClass
+ .getDeclaredMethod(
+ "deserializeAndCheckObject", int.class, byte[].class)
+ .invoke(toVersionMockObject, serializerVersion, serializedObject));
+ }
+}
diff --git a/flink-cdc-migration-tests/flink-cdc-migration-testcases/src/test/java/org/apache/flink/cdc/migration/tests/SchemaManagerMigrationTest.java b/flink-cdc-migration-tests/flink-cdc-migration-testcases/src/test/java/org/apache/flink/cdc/migration/tests/SchemaManagerMigrationTest.java
new file mode 100644
index 000000000..11adf6f35
--- /dev/null
+++ b/flink-cdc-migration-tests/flink-cdc-migration-testcases/src/test/java/org/apache/flink/cdc/migration/tests/SchemaManagerMigrationTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.migration.tests;
+
+import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaManager;
+
+import org.junit.Test;
+
+import static org.apache.flink.cdc.migration.tests.MigrationTestBase.FlinkCdcVersion.v3_1_0;
+
+/** Migration test cases for {@link SchemaManager}. */
+public class SchemaManagerMigrationTest extends MigrationTestBase {
+
+ public static String mockCaseName = "SchemaManagerMigrationMock";
+
+ @Test
+ public void testMigration() throws Exception {
+ // It is known that CDC 3.1.0 breaks backwards compatibility.
+ // No state compatibility is guaranteed.
+ for (FlinkCdcVersion version : getAllVersionExcept(v3_1_0)) {
+ testMigrationFromTo(version, getSnapshotVersion(), mockCaseName);
+ }
+ }
+}
diff --git a/flink-cdc-migration-tests/flink-cdc-migration-testcases/src/test/java/org/apache/flink/cdc/migration/tests/SchemaRegistryMigrationTest.java b/flink-cdc-migration-tests/flink-cdc-migration-testcases/src/test/java/org/apache/flink/cdc/migration/tests/SchemaRegistryMigrationTest.java
new file mode 100644
index 000000000..1e1a4c68e
--- /dev/null
+++ b/flink-cdc-migration-tests/flink-cdc-migration-testcases/src/test/java/org/apache/flink/cdc/migration/tests/SchemaRegistryMigrationTest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.migration.tests;
+
+import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry;
+
+import org.junit.Test;
+
+import static org.apache.flink.cdc.migration.tests.MigrationTestBase.FlinkCdcVersion.v3_1_0;
+
+/** Migration test cases for {@link SchemaRegistry}. */
+public class SchemaRegistryMigrationTest extends MigrationTestBase {
+ public static String mockCaseName = "SchemaRegistryMigrationMock";
+
+ @Test
+ public void testMigration() throws Exception {
+ // It is known that CDC 3.1.0 breaks backwards compatibility.
+ // No state compatibility is guaranteed.
+ for (FlinkCdcVersion version : getAllVersionExcept(v3_1_0)) {
+ testMigrationFromTo(version, getSnapshotVersion(), mockCaseName);
+ }
+ }
+}
diff --git a/flink-cdc-migration-tests/flink-cdc-migration-testcases/src/test/java/org/apache/flink/cdc/migration/tests/TableChangeInfoMigrationTest.java b/flink-cdc-migration-tests/flink-cdc-migration-testcases/src/test/java/org/apache/flink/cdc/migration/tests/TableChangeInfoMigrationTest.java
new file mode 100644
index 000000000..3af59245c
--- /dev/null
+++ b/flink-cdc-migration-tests/flink-cdc-migration-testcases/src/test/java/org/apache/flink/cdc/migration/tests/TableChangeInfoMigrationTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.migration.tests;
+
+import org.apache.flink.cdc.runtime.operators.transform.TableChangeInfo;
+
+import org.junit.Test;
+
+import static org.apache.flink.cdc.migration.tests.MigrationTestBase.FlinkCdcVersion.v3_0_0;
+import static org.apache.flink.cdc.migration.tests.MigrationTestBase.FlinkCdcVersion.v3_0_1;
+import static org.apache.flink.cdc.migration.tests.MigrationTestBase.FlinkCdcVersion.v3_1_0;
+
+/** Migration test cases for {@link TableChangeInfo}. */
+public class TableChangeInfoMigrationTest extends MigrationTestBase {
+
+ public static String mockCaseName = "TableChangeInfoMigrationMock";
+
+ @Test
+ public void testMigration() throws Exception {
+ // The transform feature was not present until 3.1.0, and
+ // CDC 3.1.0 breaks backwards compatibility.
+ for (FlinkCdcVersion version : getAllVersionExcept(v3_0_0, v3_0_1, v3_1_0)) {
+ testMigrationFromTo(version, getSnapshotVersion(), mockCaseName);
+ }
+ }
+}
diff --git a/flink-cdc-migration-tests/flink-cdc-migration-testcases/src/test/resources/log4j2-test.properties b/flink-cdc-migration-tests/flink-cdc-migration-testcases/src/test/resources/log4j2-test.properties
new file mode 100644
index 000000000..f0d32fb59
--- /dev/null
+++ b/flink-cdc-migration-tests/flink-cdc-migration-testcases/src/test/resources/log4j2-test.properties
@@ -0,0 +1,25 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+rootLogger.level=INFO
+rootLogger.appenderRef.test.ref = TestLogger
+
+appender.testlogger.name = TestLogger
+appender.testlogger.type = CONSOLE
+appender.testlogger.target = SYSTEM_ERR
+appender.testlogger.layout.type = PatternLayout
+appender.testlogger.layout.pattern = %-4r [%t] %-5p %c - %m%n
diff --git a/flink-cdc-migration-tests/flink-cdc-release-3.0.0/pom.xml b/flink-cdc-migration-tests/flink-cdc-release-3.0.0/pom.xml
new file mode 100644
index 000000000..7207e5996
--- /dev/null
+++ b/flink-cdc-migration-tests/flink-cdc-release-3.0.0/pom.xml
@@ -0,0 +1,86 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.flink</groupId>
+        <artifactId>flink-cdc-migration-tests</artifactId>
+        <version>${revision}</version>
+    </parent>
+
+    <artifactId>flink-cdc-release-3.0.0</artifactId>
+    <name>flink-cdc-release-3.0.0</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.ververica</groupId>
+            <artifactId>flink-cdc-base</artifactId>
+            <version>3.0.0</version>
+        </dependency>
+        <dependency>
+            <groupId>com.ververica</groupId>
+            <artifactId>flink-cdc-common</artifactId>
+            <version>3.0.0</version>
+        </dependency>
+        <dependency>
+            <groupId>com.ververica</groupId>
+            <artifactId>flink-cdc-runtime</artifactId>
+            <version>3.0.0</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <version>3.2.4</version>
+                <executions>
+                    <execution>
+                        <id>shade-flink-cdc</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <filters>
+                                <filter>
+                                    <artifact>*:*</artifact>
+                                    <excludes>
+                                        <exclude>META-INF/*.SF</exclude>
+                                        <exclude>META-INF/*.DSA</exclude>
+                                        <exclude>META-INF/*.RSA</exclude>
+                                    </excludes>
+                                </filter>
+                            </filters>
+                            <relocations>
+                                <relocation>
+                                    <pattern>com.ververica.cdc</pattern>
+                                    <shadedPattern>com.ververica.cdc.v3_0_0</shadedPattern>
+                                </relocation>
+                            </relocations>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file
diff --git a/flink-cdc-migration-tests/flink-cdc-release-3.0.0/src/main/java/com/ververica/cdc/migration/tests/MigrationMockBase.java b/flink-cdc-migration-tests/flink-cdc-release-3.0.0/src/main/java/com/ververica/cdc/migration/tests/MigrationMockBase.java
new file mode 100644
index 000000000..2dd92446e
--- /dev/null
+++ b/flink-cdc-migration-tests/flink-cdc-release-3.0.0/src/main/java/com/ververica/cdc/migration/tests/MigrationMockBase.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ververica.cdc.migration.tests;
+
+/** Base classes for migration test cases. */
+public interface MigrationMockBase {
+ int getSerializerVersion();
+
+ byte[] serializeObject() throws Exception;
+
+ boolean deserializeAndCheckObject(int v, byte[] b) throws Exception;
+}
diff --git a/flink-cdc-migration-tests/flink-cdc-release-3.0.0/src/main/java/com/ververica/cdc/migration/tests/SchemaManagerMigrationMock.java b/flink-cdc-migration-tests/flink-cdc-release-3.0.0/src/main/java/com/ververica/cdc/migration/tests/SchemaManagerMigrationMock.java
new file mode 100644
index 000000000..44477b587
--- /dev/null
+++ b/flink-cdc-migration-tests/flink-cdc-release-3.0.0/src/main/java/com/ververica/cdc/migration/tests/SchemaManagerMigrationMock.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ververica.cdc.migration.tests;
+
+import com.ververica.cdc.common.event.TableId;
+import com.ververica.cdc.common.schema.Schema;
+import com.ververica.cdc.common.types.DataTypes;
+import com.ververica.cdc.runtime.operators.schema.coordinator.SchemaManager;
+
+import java.util.Collections;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+/** Dummy classes for migration test. Called via reflection. */
+public class SchemaManagerMigrationMock implements MigrationMockBase {
+ private static final TableId DUMMY_TABLE_ID =
+ TableId.tableId("dummyNamespace", "dummySchema", "dummyTable");
+ private static final Schema DUMMY_SCHEMA =
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.INT())
+ .physicalColumn("name", DataTypes.STRING())
+ .physicalColumn("age", DataTypes.DOUBLE())
+ .primaryKey("id", "name")
+ .build();
+
+ public SchemaManager generateDummyObject() {
+ SortedMap<Integer, Schema> schemaVersions = new TreeMap<>();
+ schemaVersions.put(1, DUMMY_SCHEMA);
+ schemaVersions.put(2, DUMMY_SCHEMA);
+ schemaVersions.put(3, DUMMY_SCHEMA);
+ return new SchemaManager(Collections.singletonMap(DUMMY_TABLE_ID, schemaVersions));
+ }
+
+ @Override
+ public int getSerializerVersion() {
+ return SchemaManager.SERIALIZER.getVersion();
+ }
+
+ @Override
+ public byte[] serializeObject() throws Exception {
+ return SchemaManager.SERIALIZER.serialize(generateDummyObject());
+ }
+
+ @Override
+ public boolean deserializeAndCheckObject(int version, byte[] serialized) throws Exception {
+ Object expected = generateDummyObject();
+ Object actual = SchemaManager.SERIALIZER.deserialize(version, serialized);
+ return expected.equals(actual);
+ }
+}
diff --git a/flink-cdc-migration-tests/flink-cdc-release-3.0.0/src/main/java/com/ververica/cdc/migration/tests/SchemaRegistryMigrationMock.java b/flink-cdc-migration-tests/flink-cdc-release-3.0.0/src/main/java/com/ververica/cdc/migration/tests/SchemaRegistryMigrationMock.java
new file mode 100644
index 000000000..3322fb0be
--- /dev/null
+++ b/flink-cdc-migration-tests/flink-cdc-release-3.0.0/src/main/java/com/ververica/cdc/migration/tests/SchemaRegistryMigrationMock.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ververica.cdc.migration.tests;
+
+import com.ververica.cdc.common.event.TableId;
+import com.ververica.cdc.common.schema.Schema;
+import com.ververica.cdc.common.types.DataTypes;
+import com.ververica.cdc.runtime.operators.schema.coordinator.SchemaManager;
+import com.ververica.cdc.runtime.operators.schema.coordinator.SchemaRegistry;
+
+import java.lang.reflect.Field;
+import java.util.Collections;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
+
+/** Dummy classes for migration test. Called via reflection. */
+public class SchemaRegistryMigrationMock implements MigrationMockBase {
+ private static final TableId DUMMY_TABLE_ID =
+ TableId.tableId("dummyNamespace", "dummySchema", "dummyTable");
+ private static final Schema DUMMY_SCHEMA =
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.INT())
+ .physicalColumn("name", DataTypes.STRING())
+ .physicalColumn("age", DataTypes.DOUBLE())
+ .primaryKey("id", "name")
+ .build();
+
+ public SchemaManager generateDummySchemaManager() {
+ SortedMap<Integer, Schema> schemaVersions = new TreeMap<>();
+ schemaVersions.put(1, DUMMY_SCHEMA);
+ schemaVersions.put(2, DUMMY_SCHEMA);
+ schemaVersions.put(3, DUMMY_SCHEMA);
+ return new SchemaManager(Collections.singletonMap(DUMMY_TABLE_ID, schemaVersions));
+ }
+
+ public SchemaRegistry generateSchemaRegistry() {
+ return new SchemaRegistry("Dummy Name", null, e -> {});
+ }
+
+ private SchemaManager getSchemaManager(SchemaRegistry schemaRegistry) throws Exception {
+ Field field = SchemaRegistry.class.getDeclaredField("schemaManager");
+ field.setAccessible(true);
+ return (SchemaManager) field.get(schemaRegistry);
+ }
+
+ private void setSchemaManager(SchemaRegistry schemaRegistry, SchemaManager schemaManager)
+ throws Exception {
+ Field field = SchemaRegistry.class.getDeclaredField("schemaManager");
+ field.setAccessible(true);
+ field.set(schemaRegistry, schemaManager);
+ }
+
+ @Override
+ public int getSerializerVersion() {
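+ // SchemaRegistry state is restored via resetToCheckpoint(), which takes no serializer
+ // version, so -1 is returned as a placeholder.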
+ return -1;
+ }
+
+ @Override
+ public byte[] serializeObject() throws Exception {
+ CompletableFuture<byte[]> future = new CompletableFuture<>();
+ SchemaRegistry registry = generateSchemaRegistry();
+
+ setSchemaManager(registry, generateDummySchemaManager());
+ registry.checkpointCoordinator(0, future);
+
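+ // Poll until the coordinator has completed the checkpoint and the serialized
+ // registry state is available.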
+ while (!future.isDone()) {
+ Thread.sleep(1000);
+ }
+ return future.get();
+ }
+
+ @Override
+ public boolean deserializeAndCheckObject(int v, byte[] b) throws Exception {
+ SchemaRegistry expected = generateSchemaRegistry();
+ SchemaRegistry actual = generateSchemaRegistry();
+ actual.resetToCheckpoint(0, b);
+ return getSchemaManager(expected).equals(getSchemaManager(actual));
+ }
+}
diff --git a/flink-cdc-migration-tests/flink-cdc-release-3.0.1/pom.xml b/flink-cdc-migration-tests/flink-cdc-release-3.0.1/pom.xml
new file mode 100644
index 000000000..bdb5ecfe9
--- /dev/null
+++ b/flink-cdc-migration-tests/flink-cdc-release-3.0.1/pom.xml
@@ -0,0 +1,86 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.flink</groupId>
+        <artifactId>flink-cdc-migration-tests</artifactId>
+        <version>${revision}</version>
+    </parent>
+
+    <artifactId>flink-cdc-release-3.0.1</artifactId>
+    <name>flink-cdc-release-3.0.1</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.ververica</groupId>
+            <artifactId>flink-cdc-base</artifactId>
+            <version>3.0.1</version>
+        </dependency>
+        <dependency>
+            <groupId>com.ververica</groupId>
+            <artifactId>flink-cdc-common</artifactId>
+            <version>3.0.1</version>
+        </dependency>
+        <dependency>
+            <groupId>com.ververica</groupId>
+            <artifactId>flink-cdc-runtime</artifactId>
+            <version>3.0.1</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <version>3.2.4</version>
+                <executions>
+                    <execution>
+                        <id>shade-flink-cdc</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <filters>
+                                <filter>
+                                    <artifact>*:*</artifact>
+                                    <excludes>
+                                        <exclude>META-INF/*.SF</exclude>
+                                        <exclude>META-INF/*.DSA</exclude>
+                                        <exclude>META-INF/*.RSA</exclude>
+                                    </excludes>
+                                </filter>
+                            </filters>
+                            <relocations>
+                                <relocation>
+                                    <pattern>com.ververica.cdc</pattern>
+                                    <shadedPattern>com.ververica.cdc.v3_0_1</shadedPattern>
+                                </relocation>
+                            </relocations>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file
diff --git a/flink-cdc-migration-tests/flink-cdc-release-3.0.1/src/main/java/com/ververica/cdc/migration/tests/MigrationMockBase.java b/flink-cdc-migration-tests/flink-cdc-release-3.0.1/src/main/java/com/ververica/cdc/migration/tests/MigrationMockBase.java
new file mode 100644
index 000000000..2dd92446e
--- /dev/null
+++ b/flink-cdc-migration-tests/flink-cdc-release-3.0.1/src/main/java/com/ververica/cdc/migration/tests/MigrationMockBase.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ververica.cdc.migration.tests;
+
+/** Base classes for migration test cases. */
+public interface MigrationMockBase {
+ int getSerializerVersion();
+
+ byte[] serializeObject() throws Exception;
+
+ boolean deserializeAndCheckObject(int v, byte[] b) throws Exception;
+}
diff --git a/flink-cdc-migration-tests/flink-cdc-release-3.0.1/src/main/java/com/ververica/cdc/migration/tests/SchemaManagerMigrationMock.java b/flink-cdc-migration-tests/flink-cdc-release-3.0.1/src/main/java/com/ververica/cdc/migration/tests/SchemaManagerMigrationMock.java
new file mode 100644
index 000000000..44477b587
--- /dev/null
+++ b/flink-cdc-migration-tests/flink-cdc-release-3.0.1/src/main/java/com/ververica/cdc/migration/tests/SchemaManagerMigrationMock.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ververica.cdc.migration.tests;
+
+import com.ververica.cdc.common.event.TableId;
+import com.ververica.cdc.common.schema.Schema;
+import com.ververica.cdc.common.types.DataTypes;
+import com.ververica.cdc.runtime.operators.schema.coordinator.SchemaManager;
+
+import java.util.Collections;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+/** Dummy classes for migration test. Called via reflection. */
+public class SchemaManagerMigrationMock implements MigrationMockBase {
+ private static final TableId DUMMY_TABLE_ID =
+ TableId.tableId("dummyNamespace", "dummySchema", "dummyTable");
+ private static final Schema DUMMY_SCHEMA =
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.INT())
+ .physicalColumn("name", DataTypes.STRING())
+ .physicalColumn("age", DataTypes.DOUBLE())
+ .primaryKey("id", "name")
+ .build();
+
+ public SchemaManager generateDummyObject() {
+ SortedMap<Integer, Schema> schemaVersions = new TreeMap<>();
+ schemaVersions.put(1, DUMMY_SCHEMA);
+ schemaVersions.put(2, DUMMY_SCHEMA);
+ schemaVersions.put(3, DUMMY_SCHEMA);
+ return new SchemaManager(Collections.singletonMap(DUMMY_TABLE_ID, schemaVersions));
+ }
+
+ @Override
+ public int getSerializerVersion() {
+ return SchemaManager.SERIALIZER.getVersion();
+ }
+
+ @Override
+ public byte[] serializeObject() throws Exception {
+ return SchemaManager.SERIALIZER.serialize(generateDummyObject());
+ }
+
+ @Override
+ public boolean deserializeAndCheckObject(int version, byte[] serialized) throws Exception {
+ Object expected = generateDummyObject();
+ Object actual = SchemaManager.SERIALIZER.deserialize(version, serialized);
+ return expected.equals(actual);
+ }
+}
diff --git a/flink-cdc-migration-tests/flink-cdc-release-3.0.1/src/main/java/com/ververica/cdc/migration/tests/SchemaRegistryMigrationMock.java b/flink-cdc-migration-tests/flink-cdc-release-3.0.1/src/main/java/com/ververica/cdc/migration/tests/SchemaRegistryMigrationMock.java
new file mode 100644
index 000000000..3322fb0be
--- /dev/null
+++ b/flink-cdc-migration-tests/flink-cdc-release-3.0.1/src/main/java/com/ververica/cdc/migration/tests/SchemaRegistryMigrationMock.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ververica.cdc.migration.tests;
+
+import com.ververica.cdc.common.event.TableId;
+import com.ververica.cdc.common.schema.Schema;
+import com.ververica.cdc.common.types.DataTypes;
+import com.ververica.cdc.runtime.operators.schema.coordinator.SchemaManager;
+import com.ververica.cdc.runtime.operators.schema.coordinator.SchemaRegistry;
+
+import java.lang.reflect.Field;
+import java.util.Collections;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
+
+/** Dummy classes for migration test. Called via reflection. */
+public class SchemaRegistryMigrationMock implements MigrationMockBase {
+ private static final TableId DUMMY_TABLE_ID =
+ TableId.tableId("dummyNamespace", "dummySchema", "dummyTable");
+ private static final Schema DUMMY_SCHEMA =
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.INT())
+ .physicalColumn("name", DataTypes.STRING())
+ .physicalColumn("age", DataTypes.DOUBLE())
+ .primaryKey("id", "name")
+ .build();
+
+ public SchemaManager generateDummySchemaManager() {
+ SortedMap<Integer, Schema> schemaVersions = new TreeMap<>();
+ schemaVersions.put(1, DUMMY_SCHEMA);
+ schemaVersions.put(2, DUMMY_SCHEMA);
+ schemaVersions.put(3, DUMMY_SCHEMA);
+ return new SchemaManager(Collections.singletonMap(DUMMY_TABLE_ID, schemaVersions));
+ }
+
+ public SchemaRegistry generateSchemaRegistry() {
+ return new SchemaRegistry("Dummy Name", null, e -> {});
+ }
+
+ private SchemaManager getSchemaManager(SchemaRegistry schemaRegistry) throws Exception {
+ Field field = SchemaRegistry.class.getDeclaredField("schemaManager");
+ field.setAccessible(true);
+ return (SchemaManager) field.get(schemaRegistry);
+ }
+
+ private void setSchemaManager(SchemaRegistry schemaRegistry, SchemaManager schemaManager)
+ throws Exception {
+ Field field = SchemaRegistry.class.getDeclaredField("schemaManager");
+ field.setAccessible(true);
+ field.set(schemaRegistry, schemaManager);
+ }
+
+ @Override
+ public int getSerializerVersion() {
+ return -1;
+ }
+
+ @Override
+ public byte[] serializeObject() throws Exception {
+ CompletableFuture<byte[]> future = new CompletableFuture<>();
+ SchemaRegistry registry = generateSchemaRegistry();
+
+ setSchemaManager(registry, generateDummySchemaManager());
+ registry.checkpointCoordinator(0, future);
+
+ while (!future.isDone()) {
+ Thread.sleep(1000);
+ }
+ return future.get();
+ }
+
+ @Override
+ public boolean deserializeAndCheckObject(int v, byte[] b) throws Exception {
+ SchemaRegistry expected = generateSchemaRegistry();
+ SchemaRegistry actual = generateSchemaRegistry();
+ actual.resetToCheckpoint(0, b);
+ return getSchemaManager(expected).equals(getSchemaManager(actual));
+ }
+}
diff --git a/flink-cdc-migration-tests/flink-cdc-release-3.1.0/pom.xml b/flink-cdc-migration-tests/flink-cdc-release-3.1.0/pom.xml
new file mode 100644
index 000000000..3d6cd4394
--- /dev/null
+++ b/flink-cdc-migration-tests/flink-cdc-release-3.1.0/pom.xml
@@ -0,0 +1,87 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.flink</groupId>
+        <artifactId>flink-cdc-migration-tests</artifactId>
+        <version>${revision}</version>
+    </parent>
+
+    <artifactId>flink-cdc-release-3.1.0</artifactId>
+    <name>flink-cdc-release-3.1.0</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-cdc-base</artifactId>
+            <version>3.1.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-cdc-common</artifactId>
+            <version>3.1.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-cdc-runtime</artifactId>
+            <version>3.1.0</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <version>3.2.4</version>
+                <executions>
+                    <execution>
+                        <id>shade-flink-cdc</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <filters>
+                                <filter>
+                                    <artifact>*:*</artifact>
+                                    <excludes>
+                                        <exclude>META-INF/*.SF</exclude>
+                                        <exclude>META-INF/*.DSA</exclude>
+                                        <exclude>META-INF/*.RSA</exclude>
+                                    </excludes>
+                                </filter>
+                            </filters>
+                            <relocations>
+                                <relocation>
+                                    <pattern>org.apache.flink.cdc</pattern>
+                                    <shadedPattern>org.apache.flink.cdc.v3_1_0</shadedPattern>
+                                    <excludes>META-INF/*.SF,META-INF/*.DSA,META-INF/*.RSA</excludes>
+                                </relocation>
+                            </relocations>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file
diff --git a/flink-cdc-migration-tests/flink-cdc-release-3.1.0/src/main/java/org/apache/flink/cdc/migration/tests/MigrationMockBase.java b/flink-cdc-migration-tests/flink-cdc-release-3.1.0/src/main/java/org/apache/flink/cdc/migration/tests/MigrationMockBase.java
new file mode 100644
index 000000000..3f52615db
--- /dev/null
+++ b/flink-cdc-migration-tests/flink-cdc-release-3.1.0/src/main/java/org/apache/flink/cdc/migration/tests/MigrationMockBase.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.migration.tests;
+
+/** Base classes for migration test cases. */
+public interface MigrationMockBase {
+ int getSerializerVersion();
+
+ byte[] serializeObject() throws Exception;
+
+ boolean deserializeAndCheckObject(int v, byte[] b) throws Exception;
+}
diff --git a/flink-cdc-migration-tests/flink-cdc-release-3.1.0/src/main/java/org/apache/flink/cdc/migration/tests/SchemaManagerMigrationMock.java b/flink-cdc-migration-tests/flink-cdc-release-3.1.0/src/main/java/org/apache/flink/cdc/migration/tests/SchemaManagerMigrationMock.java
new file mode 100644
index 000000000..c4f0788dd
--- /dev/null
+++ b/flink-cdc-migration-tests/flink-cdc-release-3.1.0/src/main/java/org/apache/flink/cdc/migration/tests/SchemaManagerMigrationMock.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.migration.tests;
+
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaManager;
+
+import java.util.Collections;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+/** Dummy classes for migration test. Called via reflection. */
+public class SchemaManagerMigrationMock implements MigrationMockBase {
+ private static final TableId DUMMY_TABLE_ID =
+ TableId.tableId("dummyNamespace", "dummySchema", "dummyTable");
+ private static final Schema DUMMY_SCHEMA =
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.INT())
+ .physicalColumn("name", DataTypes.STRING())
+ .physicalColumn("age", DataTypes.DOUBLE())
+ .primaryKey("id", "name")
+ .build();
+
+ private static final String SCHEMA_MANAGER =
+ "runtime.operators.schema.coordinator.SchemaManager";
+
+ public SchemaManager generateDummyObject() {
+ SortedMap<Integer, Schema> schemaVersions = new TreeMap<>();
+ schemaVersions.put(1, DUMMY_SCHEMA);
+ schemaVersions.put(2, DUMMY_SCHEMA);
+ schemaVersions.put(3, DUMMY_SCHEMA);
+ return new SchemaManager(Collections.singletonMap(DUMMY_TABLE_ID, schemaVersions));
+ }
+
+ @Override
+ public int getSerializerVersion() {
+ return SchemaManager.SERIALIZER.getVersion();
+ }
+
+ @Override
+ public byte[] serializeObject() throws Exception {
+ return SchemaManager.SERIALIZER.serialize(generateDummyObject());
+ }
+
+ @Override
+ public boolean deserializeAndCheckObject(int version, byte[] serialized) throws Exception {
+ Object expected = generateDummyObject();
+ Object actual = SchemaManager.SERIALIZER.deserialize(version, serialized);
+ return expected.equals(actual);
+ }
+}
diff --git a/flink-cdc-migration-tests/flink-cdc-release-3.1.0/src/main/java/org/apache/flink/cdc/migration/tests/SchemaRegistryMigrationMock.java b/flink-cdc-migration-tests/flink-cdc-release-3.1.0/src/main/java/org/apache/flink/cdc/migration/tests/SchemaRegistryMigrationMock.java
new file mode 100644
index 000000000..93269abec
--- /dev/null
+++ b/flink-cdc-migration-tests/flink-cdc-release-3.1.0/src/main/java/org/apache/flink/cdc/migration/tests/SchemaRegistryMigrationMock.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.migration.tests;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.schema.Selectors;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaDerivation;
+import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaManager;
+import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
+
+/** Dummy classes for migration test. Called via reflection. */
+public class SchemaRegistryMigrationMock implements MigrationMockBase {
+ private static final TableId DUMMY_TABLE_ID =
+ TableId.tableId("dummyNamespace", "dummySchema", "dummyTable");
+ private static final Schema DUMMY_SCHEMA =
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.INT())
+ .physicalColumn("name", DataTypes.STRING())
+ .physicalColumn("age", DataTypes.DOUBLE())
+ .primaryKey("id", "name")
+ .build();
+
+ public SchemaManager generateDummySchemaManager() {
+ SortedMap<Integer, Schema> schemaVersions = new TreeMap<>();
+ schemaVersions.put(1, DUMMY_SCHEMA);
+ schemaVersions.put(2, DUMMY_SCHEMA);
+ schemaVersions.put(3, DUMMY_SCHEMA);
+ return new SchemaManager(Collections.singletonMap(DUMMY_TABLE_ID, schemaVersions));
+ }
+
+ public SchemaRegistry generateSchemaRegistry() {
+ return new SchemaRegistry("Dummy Name", null, e -> {}, new ArrayList<>());
+ }
+
+ private SchemaManager getSchemaManager(SchemaRegistry schemaRegistry) throws Exception {
+ Field field = SchemaRegistry.class.getDeclaredField("schemaManager");
+ field.setAccessible(true);
+ return (SchemaManager) field.get(schemaRegistry);
+ }
+
+ private void setSchemaManager(SchemaRegistry schemaRegistry, SchemaManager schemaManager)
+ throws Exception {
+ Field field = SchemaRegistry.class.getDeclaredField("schemaManager");
+ field.setAccessible(true);
+ field.set(schemaRegistry, schemaManager);
+ }
+
+ private SchemaDerivation getSchemaDerivation(SchemaRegistry schemaRegistry) throws Exception {
+ Field field = SchemaRegistry.class.getDeclaredField("schemaDerivation");
+ field.setAccessible(true);
+ return (SchemaDerivation) field.get(schemaRegistry);
+ }
+
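+ // Since 3.1.0 the registry also tracks SchemaDerivation routes, so the check below
+ // compares them in addition to the SchemaManager state.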
+ private List<Tuple2<Selectors, TableId>> getSchemaRoutes(SchemaRegistry schemaRegistry)
+ throws Exception {
+ SchemaDerivation schemaDerivation = getSchemaDerivation(schemaRegistry);
+ Field field = SchemaDerivation.class.getDeclaredField("routes");
+ field.setAccessible(true);
+ return (List<Tuple2<Selectors, TableId>>) field.get(schemaDerivation);
+ }
+
+ @Override
+ public int getSerializerVersion() {
+ return -1;
+ }
+
+ @Override
+ public byte[] serializeObject() throws Exception {
+ CompletableFuture<byte[]> future = new CompletableFuture<>();
+ SchemaRegistry registry = generateSchemaRegistry();
+ setSchemaManager(registry, generateDummySchemaManager());
+
+ registry.checkpointCoordinator(0, future);
+
+ while (!future.isDone()) {
+ Thread.sleep(1000);
+ }
+ return future.get();
+ }
+
+ @Override
+ public boolean deserializeAndCheckObject(int v, byte[] b) throws Exception {
+ SchemaRegistry expected = generateSchemaRegistry();
+ setSchemaManager(expected, generateDummySchemaManager());
+ SchemaRegistry actual = generateSchemaRegistry();
+ actual.resetToCheckpoint(0, b);
+ return getSchemaManager(expected).equals(getSchemaManager(actual))
+ && getSchemaRoutes(expected).equals(getSchemaRoutes(actual));
+ }
+}
diff --git a/flink-cdc-migration-tests/flink-cdc-release-3.1.0/src/main/java/org/apache/flink/cdc/migration/tests/TableChangeInfoMigrationMock.java b/flink-cdc-migration-tests/flink-cdc-release-3.1.0/src/main/java/org/apache/flink/cdc/migration/tests/TableChangeInfoMigrationMock.java
new file mode 100644
index 000000000..6a14a2be2
--- /dev/null
+++ b/flink-cdc-migration-tests/flink-cdc-release-3.1.0/src/main/java/org/apache/flink/cdc/migration/tests/TableChangeInfoMigrationMock.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.migration.tests;
+
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.runtime.operators.transform.TableChangeInfo;
+
+/** Dummy classes for migration test. Called via reflection. */
+public class TableChangeInfoMigrationMock implements MigrationMockBase {
+ private static final TableId DUMMY_TABLE_ID =
+ TableId.tableId("dummyNamespace", "dummySchema", "dummyTable");
+ private static final Schema DUMMY_SCHEMA =
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.INT())
+ .physicalColumn("name", DataTypes.STRING())
+ .physicalColumn("age", DataTypes.DOUBLE())
+ .primaryKey("id", "name")
+ .build();
+
+ public TableChangeInfo generateDummyObject() {
+ return TableChangeInfo.of(DUMMY_TABLE_ID, DUMMY_SCHEMA, DUMMY_SCHEMA);
+ }
+
+ @Override
+ public int getSerializerVersion() {
+ return TableChangeInfo.SERIALIZER.getVersion();
+ }
+
+ @Override
+ public byte[] serializeObject() throws Exception {
+ return TableChangeInfo.SERIALIZER.serialize(generateDummyObject());
+ }
+
+ @Override
+ public boolean deserializeAndCheckObject(int version, byte[] bytes) throws Exception {
+ TableChangeInfo expected = generateDummyObject();
+ TableChangeInfo actual = TableChangeInfo.SERIALIZER.deserialize(version, bytes);
+
+ return expected.getTableId().equals(actual.getTableId())
+ && expected.getOriginalSchema().equals(actual.getOriginalSchema())
+ && expected.getTransformedSchema().equals(actual.getTransformedSchema());
+ }
+}
diff --git a/flink-cdc-migration-tests/flink-cdc-release-snapshot/pom.xml b/flink-cdc-migration-tests/flink-cdc-release-snapshot/pom.xml
new file mode 100644
index 000000000..232396ba9
--- /dev/null
+++ b/flink-cdc-migration-tests/flink-cdc-release-snapshot/pom.xml
@@ -0,0 +1,90 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.flink</groupId>
+        <artifactId>flink-cdc-migration-tests</artifactId>
+        <version>${revision}</version>
+    </parent>
+
+    <artifactId>flink-cdc-release-snapshot</artifactId>
+    <name>flink-cdc-release-snapshot</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-cdc-base</artifactId>
+            <version>${revision}</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-cdc-common</artifactId>
+            <version>${revision}</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-cdc-runtime</artifactId>
+            <version>${revision}</version>
+            <scope>compile</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <version>3.2.4</version>
+                <executions>
+                    <execution>
+                        <id>shade-flink-cdc</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <filters>
+                                <filter>
+                                    <artifact>*:*</artifact>
+                                    <excludes>
+                                        <exclude>META-INF/*.SF</exclude>
+                                        <exclude>META-INF/*.DSA</exclude>
+                                        <exclude>META-INF/*.RSA</exclude>
+                                    </excludes>
+                                </filter>
+                            </filters>
+                            <relocations>
+                                <relocation>
+                                    <pattern>org.apache.flink.cdc</pattern>
+                                    <shadedPattern>org.apache.flink.cdc.snapshot</shadedPattern>
+                                    <excludes>META-INF/*.SF,META-INF/*.DSA,META-INF/*.RSA</excludes>
+                                </relocation>
+                            </relocations>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file
diff --git a/flink-cdc-migration-tests/flink-cdc-release-snapshot/src/main/java/org/apache/flink/cdc/migration/tests/MigrationMockBase.java b/flink-cdc-migration-tests/flink-cdc-release-snapshot/src/main/java/org/apache/flink/cdc/migration/tests/MigrationMockBase.java
new file mode 100644
index 000000000..3f52615db
--- /dev/null
+++ b/flink-cdc-migration-tests/flink-cdc-release-snapshot/src/main/java/org/apache/flink/cdc/migration/tests/MigrationMockBase.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.migration.tests;
+
+/** Base classes for migration test cases. */
+public interface MigrationMockBase {
+ int getSerializerVersion();
+
+ byte[] serializeObject() throws Exception;
+
+ boolean deserializeAndCheckObject(int v, byte[] b) throws Exception;
+}
diff --git a/flink-cdc-migration-tests/flink-cdc-release-snapshot/src/main/java/org/apache/flink/cdc/migration/tests/SchemaManagerMigrationMock.java b/flink-cdc-migration-tests/flink-cdc-release-snapshot/src/main/java/org/apache/flink/cdc/migration/tests/SchemaManagerMigrationMock.java
new file mode 100644
index 000000000..c946b9f5f
--- /dev/null
+++ b/flink-cdc-migration-tests/flink-cdc-release-snapshot/src/main/java/org/apache/flink/cdc/migration/tests/SchemaManagerMigrationMock.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.migration.tests;
+
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaManager;
+
+import java.util.Collections;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+/** Dummy classes for migration test. Called via reflection. */
+public class SchemaManagerMigrationMock implements MigrationMockBase {
+ private static final TableId DUMMY_TABLE_ID =
+ TableId.tableId("dummyNamespace", "dummySchema", "dummyTable");
+ private static final Schema DUMMY_SCHEMA =
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.INT())
+ .physicalColumn("name", DataTypes.STRING())
+ .physicalColumn("age", DataTypes.DOUBLE())
+ .primaryKey("id", "name")
+ .build();
+
+ public SchemaManager generateDummyObject() {
+ SortedMap<Integer, Schema> schemaVersions = new TreeMap<>();
+ schemaVersions.put(1, DUMMY_SCHEMA);
+ schemaVersions.put(2, DUMMY_SCHEMA);
+ schemaVersions.put(3, DUMMY_SCHEMA);
+ return new SchemaManager(Collections.singletonMap(DUMMY_TABLE_ID, schemaVersions));
+ }
+
+ @Override
+ public int getSerializerVersion() {
+ return SchemaManager.SERIALIZER.getVersion();
+ }
+
+ @Override
+ public byte[] serializeObject() throws Exception {
+ return SchemaManager.SERIALIZER.serialize(generateDummyObject());
+ }
+
+ @Override
+ public boolean deserializeAndCheckObject(int version, byte[] serialized) throws Exception {
+ Object expected = generateDummyObject();
+ Object actual = SchemaManager.SERIALIZER.deserialize(version, serialized);
+ return expected.equals(actual);
+ }
+}
diff --git a/flink-cdc-migration-tests/flink-cdc-release-snapshot/src/main/java/org/apache/flink/cdc/migration/tests/SchemaRegistryMigrationMock.java b/flink-cdc-migration-tests/flink-cdc-release-snapshot/src/main/java/org/apache/flink/cdc/migration/tests/SchemaRegistryMigrationMock.java
new file mode 100644
index 000000000..93269abec
--- /dev/null
+++ b/flink-cdc-migration-tests/flink-cdc-release-snapshot/src/main/java/org/apache/flink/cdc/migration/tests/SchemaRegistryMigrationMock.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.migration.tests;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.schema.Selectors;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaDerivation;
+import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaManager;
+import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
+
+/** Dummy classes for migration test. Called via reflection. */
+public class SchemaRegistryMigrationMock implements MigrationMockBase {
+ private static final TableId DUMMY_TABLE_ID =
+ TableId.tableId("dummyNamespace", "dummySchema", "dummyTable");
+ private static final Schema DUMMY_SCHEMA =
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.INT())
+ .physicalColumn("name", DataTypes.STRING())
+ .physicalColumn("age", DataTypes.DOUBLE())
+ .primaryKey("id", "name")
+ .build();
+
+ public SchemaManager generateDummySchemaManager() {
+ SortedMap<Integer, Schema> schemaVersions = new TreeMap<>();
+ schemaVersions.put(1, DUMMY_SCHEMA);
+ schemaVersions.put(2, DUMMY_SCHEMA);
+ schemaVersions.put(3, DUMMY_SCHEMA);
+ return new SchemaManager(Collections.singletonMap(DUMMY_TABLE_ID, schemaVersions));
+ }
+
+ public SchemaRegistry generateSchemaRegistry() {
+ return new SchemaRegistry("Dummy Name", null, e -> {}, new ArrayList<>());
+ }
+
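+ // SchemaRegistry keeps its state in private fields, so the helpers below read and
+ // write that state reflectively for comparison in the migration checks.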
+ private SchemaManager getSchemaManager(SchemaRegistry schemaRegistry) throws Exception {
+ Field field = SchemaRegistry.class.getDeclaredField("schemaManager");
+ field.setAccessible(true);
+ return (SchemaManager) field.get(schemaRegistry);
+ }
+
+ private void setSchemaManager(SchemaRegistry schemaRegistry, SchemaManager schemaManager)
+ throws Exception {
+ Field field = SchemaRegistry.class.getDeclaredField("schemaManager");
+ field.setAccessible(true);
+ field.set(schemaRegistry, schemaManager);
+ }
+
+ private SchemaDerivation getSchemaDerivation(SchemaRegistry schemaRegistry) throws Exception {
+ Field field = SchemaRegistry.class.getDeclaredField("schemaDerivation");
+ field.setAccessible(true);
+ return (SchemaDerivation) field.get(schemaRegistry);
+ }
+
+ private List<Tuple2<Selectors, TableId>> getSchemaRoutes(SchemaRegistry schemaRegistry)
+ throws Exception {
+ SchemaDerivation schemaDerivation = getSchemaDerivation(schemaRegistry);
+ Field field = SchemaDerivation.class.getDeclaredField("routes");
+ field.setAccessible(true);
+ return (List<Tuple2<Selectors, TableId>>) field.get(schemaDerivation);
+ }
+
+ @Override
+ public int getSerializerVersion() {
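+ // No single serializer guards the registry state; the checkpoint bytes embed the
+ // SchemaManager serializer version themselves, so return a placeholder here.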
+ return -1;
+ }
+
+ @Override
+ public byte[] serializeObject() throws Exception {
+ CompletableFuture<byte[]> future = new CompletableFuture<>();
+ SchemaRegistry registry = generateSchemaRegistry();
+ setSchemaManager(registry, generateDummySchemaManager());
+
+ registry.checkpointCoordinator(0, future);
+
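+ // Block until the coordinator has finished writing its checkpoint state.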
+ while (!future.isDone()) {
+ Thread.sleep(1000);
+ }
+ return future.get();
+ }
+
+ @Override
+ public boolean deserializeAndCheckObject(int v, byte[] b) throws Exception {
+ SchemaRegistry expected = generateSchemaRegistry();
+ setSchemaManager(expected, generateDummySchemaManager());
+ SchemaRegistry actual = generateSchemaRegistry();
+ actual.resetToCheckpoint(0, b);
+ return getSchemaManager(expected).equals(getSchemaManager(actual))
+ && getSchemaRoutes(expected).equals(getSchemaRoutes(actual));
+ }
+}
diff --git a/flink-cdc-migration-tests/flink-cdc-release-snapshot/src/main/java/org/apache/flink/cdc/migration/tests/TableChangeInfoMigrationMock.java b/flink-cdc-migration-tests/flink-cdc-release-snapshot/src/main/java/org/apache/flink/cdc/migration/tests/TableChangeInfoMigrationMock.java
new file mode 100644
index 000000000..6a14a2be2
--- /dev/null
+++ b/flink-cdc-migration-tests/flink-cdc-release-snapshot/src/main/java/org/apache/flink/cdc/migration/tests/TableChangeInfoMigrationMock.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.migration.tests;
+
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.runtime.operators.transform.TableChangeInfo;
+
+/** Dummy classes for migration test. Called via reflection. */
+public class TableChangeInfoMigrationMock implements MigrationMockBase {
+ private static final TableId DUMMY_TABLE_ID =
+ TableId.tableId("dummyNamespace", "dummySchema", "dummyTable");
+ private static final Schema DUMMY_SCHEMA =
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.INT())
+ .physicalColumn("name", DataTypes.STRING())
+ .physicalColumn("age", DataTypes.DOUBLE())
+ .primaryKey("id", "name")
+ .build();
+
+ public TableChangeInfo generateDummyObject() {
+ return TableChangeInfo.of(DUMMY_TABLE_ID, DUMMY_SCHEMA, DUMMY_SCHEMA);
+ }
+
+ @Override
+ public int getSerializerVersion() {
+ return TableChangeInfo.SERIALIZER.getVersion();
+ }
+
+ @Override
+ public byte[] serializeObject() throws Exception {
+ return TableChangeInfo.SERIALIZER.serialize(generateDummyObject());
+ }
+
+ @Override
+ public boolean deserializeAndCheckObject(int version, byte[] bytes) throws Exception {
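+ // Compare the relevant fields individually rather than relying on object equality.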
+ TableChangeInfo expected = generateDummyObject();
+ TableChangeInfo actual = TableChangeInfo.SERIALIZER.deserialize(version, bytes);
+
+ return expected.getTableId().equals(actual.getTableId())
+ && expected.getOriginalSchema().equals(actual.getOriginalSchema())
+ && expected.getTransformedSchema().equals(actual.getTransformedSchema());
+ }
+}
diff --git a/flink-cdc-migration-tests/pom.xml b/flink-cdc-migration-tests/pom.xml
new file mode 100644
index 000000000..ef92282f8
--- /dev/null
+++ b/flink-cdc-migration-tests/pom.xml
@@ -0,0 +1,57 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.flink</groupId>
+        <artifactId>flink-cdc-parent</artifactId>
+        <version>${revision}</version>
+    </parent>
+
+    <artifactId>flink-cdc-migration-tests</artifactId>
+    <name>flink-cdc-migration-tests</name>
+    <packaging>pom</packaging>
+
+    <modules>
+        <module>flink-cdc-release-3.0.0</module>
+        <module>flink-cdc-release-3.0.1</module>
+        <module>flink-cdc-release-3.1.0</module>
+        <module>flink-cdc-release-snapshot</module>
+        <module>flink-cdc-migration-testcases</module>
+    </modules>
+
+    <properties>
+        <maven.compiler.source>8</maven.compiler.source>
+        <maven.compiler.target>8</maven.compiler.target>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    </properties>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-deploy-plugin</artifactId>
+                <configuration>
+                    <skip>true</skip>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaManager.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaManager.java
index 5b71e1c4e..99335180d 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaManager.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaManager.java
@@ -172,7 +172,7 @@ public class SchemaManager {
/** Serializer for {@link SchemaManager}. */
public static class Serializer implements SimpleVersionedSerializer<SchemaManager> {
- public static final int CURRENT_VERSION = 0;
+ public static final int CURRENT_VERSION = 1;
@Override
public int getVersion() {
@@ -211,30 +211,40 @@ public class SchemaManager {
@Override
public SchemaManager deserialize(int version, byte[] serialized) throws IOException {
- TableIdSerializer tableIdSerializer = TableIdSerializer.INSTANCE;
- SchemaSerializer schemaSerializer = SchemaSerializer.INSTANCE;
- try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
- DataInputStream in = new DataInputStream(bais)) {
- // Total schema length
- int numTables = in.readInt();
- Map<TableId, SortedMap<Integer, Schema>> tableSchemas = new HashMap<>(numTables);
- for (int i = 0; i < numTables; i++) {
- // Table ID
- TableId tableId =
- tableIdSerializer.deserialize(new DataInputViewStreamWrapper(in));
- // Schema with versions
- int numVersions = in.readInt();
- SortedMap<Integer, Schema> versionedSchemas = new TreeMap<>(Integer::compareTo);
- for (int j = 0; j < numVersions; j++) {
- // Version
- int schemaVersion = in.readInt();
- Schema schema =
- schemaSerializer.deserialize(new DataInputViewStreamWrapper(in));
- versionedSchemas.put(schemaVersion, schema);
+ switch (version) {
+ case 0:
+ case 1:
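+ // Versions 0 and 1 share the same layout for the SchemaManager state itself; the
+ // version number only changes how the nested Schema entries are decoded below.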
+ TableIdSerializer tableIdSerializer = TableIdSerializer.INSTANCE;
+ SchemaSerializer schemaSerializer = SchemaSerializer.INSTANCE;
+ try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
+ DataInputStream in = new DataInputStream(bais)) {
+ // Total schema length
+ int numTables = in.readInt();
+ Map<TableId, SortedMap<Integer, Schema>> tableSchemas =
+ new HashMap<>(numTables);
+ for (int i = 0; i < numTables; i++) {
+ // Table ID
+ TableId tableId =
+ tableIdSerializer.deserialize(
+ new DataInputViewStreamWrapper(in));
+ // Schema with versions
+ int numVersions = in.readInt();
+ SortedMap<Integer, Schema> versionedSchemas =
+ new TreeMap<>(Integer::compareTo);
+ for (int j = 0; j < numVersions; j++) {
+ // Version
+ int schemaVersion = in.readInt();
+ Schema schema =
+ schemaSerializer.deserialize(
+ version, new DataInputViewStreamWrapper(in));
+ versionedSchemas.put(schemaVersion, schema);
+ }
+ tableSchemas.put(tableId, versionedSchemas);
+ }
+ return new SchemaManager(tableSchemas);
}
- tableSchemas.put(tableId, versionedSchemas);
- }
- return new SchemaManager(tableSchemas);
+ default:
+ throw new IOException("Unrecognized serialization version " + version);
}
}
}
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java
index 3361c9454..2c718dcec 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java
@@ -46,6 +46,8 @@ import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -196,18 +198,43 @@ public class SchemaRegistry implements OperatorCoordinator, CoordinationRequestH
try (ByteArrayInputStream bais = new ByteArrayInputStream(checkpointData);
DataInputStream in = new DataInputStream(bais)) {
int schemaManagerSerializerVersion = in.readInt();
- int length = in.readInt();
- byte[] serializedSchemaManager = new byte[length];
- in.readFully(serializedSchemaManager);
- schemaManager =
- SchemaManager.SERIALIZER.deserialize(
- schemaManagerSerializerVersion, serializedSchemaManager);
- Map<TableId, Set<TableId>> derivationMapping =
- SchemaDerivation.deserializerDerivationMapping(in);
- schemaDerivation = new SchemaDerivation(schemaManager, routes, derivationMapping);
- requestHandler =
- new SchemaRegistryRequestHandler(
- metadataApplier, schemaManager, schemaDerivation);
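+ // Dispatch on the serializer version recorded in the checkpoint so that state
+ // written by older releases can still be restored.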
+ switch (schemaManagerSerializerVersion) {
+ case 0:
+ {
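+ // Legacy checkpoint layout: no schema derivation mapping was persisted,
+ // so start from an empty mapping.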
+ int length = in.readInt();
+ byte[] serializedSchemaManager = new byte[length];
+ in.readFully(serializedSchemaManager);
+ schemaManager =
+ SchemaManager.SERIALIZER.deserialize(
+ schemaManagerSerializerVersion, serializedSchemaManager);
+ schemaDerivation =
+ new SchemaDerivation(schemaManager, routes, Collections.emptyMap());
+ requestHandler =
+ new SchemaRegistryRequestHandler(
+ metadataApplier, schemaManager, schemaDerivation);
+ break;
+ }
+ case 1:
+ {
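+ // Current layout: the serialized derivation mapping follows the
+ // SchemaManager bytes.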
+ int length = in.readInt();
+ byte[] serializedSchemaManager = new byte[length];
+ in.readFully(serializedSchemaManager);
+ schemaManager =
+ SchemaManager.SERIALIZER.deserialize(
+ schemaManagerSerializerVersion, serializedSchemaManager);
+ Map<TableId, Set<TableId>> derivationMapping =
+ SchemaDerivation.deserializerDerivationMapping(in);
+ schemaDerivation =
+ new SchemaDerivation(schemaManager, routes, derivationMapping);
+ requestHandler =
+ new SchemaRegistryRequestHandler(
+ metadataApplier, schemaManager, schemaDerivation);
+ break;
+ }
+ default:
+ throw new IOException(
+ "Unrecognized serialization version " + schemaManagerSerializerVersion);
+ }
}
}
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TableChangeInfo.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TableChangeInfo.java
index db92f40e7..7fab9c65a 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TableChangeInfo.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TableChangeInfo.java
@@ -109,7 +109,7 @@ public class TableChangeInfo {
/** Serializer for {@link TableChangeInfo}. */
public static class Serializer implements SimpleVersionedSerializer<TableChangeInfo> {
- public static final int CURRENT_VERSION = 0;
+ public static final int CURRENT_VERSION = 1;
@Override
public int getVersion() {
@@ -140,9 +140,9 @@ public class TableChangeInfo {
DataInputStream in = new DataInputStream(bais)) {
TableId tableId = tableIdSerializer.deserialize(new DataInputViewStreamWrapper(in));
Schema originalSchema =
- schemaSerializer.deserialize(new DataInputViewStreamWrapper(in));
+ schemaSerializer.deserialize(version, new DataInputViewStreamWrapper(in));
Schema transformedSchema =
- schemaSerializer.deserialize(new DataInputViewStreamWrapper(in));
+ schemaSerializer.deserialize(version, new DataInputViewStreamWrapper(in));
return TableChangeInfo.of(tableId, originalSchema, transformedSchema);
}
}
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/schema/SchemaSerializer.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/schema/SchemaSerializer.java
index 5a4837938..8e2a4fa02 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/schema/SchemaSerializer.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/schema/SchemaSerializer.java
@@ -89,15 +89,33 @@ public class SchemaSerializer extends TypeSerializerSingleton<Schema> {
stringSerializer.serialize(record.comment(), target);
}
+ private static final int CURRENT_VERSION = 1;
+
@Override
public Schema deserialize(DataInputView source) throws IOException {
- return Schema.newBuilder()
- .setColumns(columnsSerializer.deserialize(source))
- .primaryKey(primaryKeysSerializer.deserialize(source))
- .partitionKey(partitionKeysSerializer.deserialize(source))
- .options(optionsSerializer.deserialize(source))
- .comment(stringSerializer.deserialize(source))
- .build();
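+ // Unversioned reads default to the current layout.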
+ return deserialize(CURRENT_VERSION, source);
+ }
+
+ public Schema deserialize(int version, DataInputView source) throws IOException {
+ switch (version) {
+ case 0:
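+ // Legacy layout: schemas were written without partition keys.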
+ return Schema.newBuilder()
+ .setColumns(columnsSerializer.deserialize(source))
+ .primaryKey(primaryKeysSerializer.deserialize(source))
+ .options(optionsSerializer.deserialize(source))
+ .comment(stringSerializer.deserialize(source))
+ .build();
+ case 1:
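+ // Current layout: partition keys are included between primary keys and options.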
+ return Schema.newBuilder()
+ .setColumns(columnsSerializer.deserialize(source))
+ .primaryKey(primaryKeysSerializer.deserialize(source))
+ .partitionKey(partitionKeysSerializer.deserialize(source))
+ .options(optionsSerializer.deserialize(source))
+ .comment(stringSerializer.deserialize(source))
+ .build();
+ default:
+ throw new IOException("Unrecognized serialization version " + version);
+ }
}
@Override
diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaManagerTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaManagerTest.java
index 2b1df7102..941714dcc 100644
--- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaManagerTest.java
+++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaManagerTest.java
@@ -195,7 +195,9 @@ class SchemaManagerTest {
schemaManager.applySchemaChange(new CreateTableEvent(CUSTOMERS, CUSTOMERS_SCHEMA));
schemaManager.applySchemaChange(new CreateTableEvent(PRODUCTS, PRODUCTS_SCHEMA));
byte[] serialized = SchemaManager.SERIALIZER.serialize(schemaManager);
- SchemaManager deserialized = SchemaManager.SERIALIZER.deserialize(0, serialized);
+ SchemaManager deserialized =
+ SchemaManager.SERIALIZER.deserialize(
+ SchemaManager.Serializer.CURRENT_VERSION, serialized);
assertThat(deserialized).isEqualTo(schemaManager);
}
}