[FLINK-35464][cdc] Fixes operator state backwards compatibility from CDC 3.0.x and add migration tests

This closes #3402.
pull/3402/head
yux committed by GitHub
parent e59674d95c
commit 54e16ebf99
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -37,6 +37,8 @@ runtime:
- flink-cdc-runtime/**/*
e2e-tests:
- flink-cdc-e2e-tests/**/*
migration-tests:
- flink-cdc-migration-tests/**/*
base:
- flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/**/*
debezium:

@ -122,6 +122,18 @@ jobs:
mvn ${{ env.MVN_COMMON_OPTIONS }} exec:java@check-license -N \
-Dexec.args="${{ env.MVN_BUILD_OUTPUT_FILE }} $(pwd) ${{ env.MVN_VALIDATION_DIR }}"
migration_test:
runs-on: ubuntu-latest
steps:
- name: Check out repository code
uses: actions/checkout@v4
with:
submodules: true
- name: Compile snapshot CDC version
run: mvn --no-snapshot-updates -B install -DskipTests
- name: Run migration tests
run: cd flink-cdc-migration-tests && mvn clean verify
compile_and_test:
runs-on: ubuntu-latest
strategy:

@ -0,0 +1,62 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cdc-migration-tests</artifactId>
<version>${revision}</version>
</parent>
<artifactId>flink-cdc-migration-testcases</artifactId>
<name>flink-cdc-migration-testcases</name>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cdc-release-3.0.0</artifactId>
<version>${revision}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cdc-release-3.0.1</artifactId>
<version>${revision}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cdc-release-3.1.0</artifactId>
<version>${revision}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cdc-release-snapshot</artifactId>
<version>${revision}</version>
<scope>compile</scope>
</dependency>
</dependencies>
</project>

@ -0,0 +1,117 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.migration.tests;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
/** Utilities for migration tests. */
/**
 * Base class for migration tests. Provides the list of supported Flink CDC versions and a
 * reflective serialize-in-old-version / deserialize-in-new-version round-trip helper.
 */
public class MigrationTestBase {
    private static final Logger LOG = LoggerFactory.getLogger(MigrationTestBase.class);

    /** Flink CDC versions since 3.0, ordered oldest to newest (SNAPSHOT last). */
    public enum FlinkCdcVersion {
        v3_0_0,
        v3_0_1,
        v3_1_0,
        SNAPSHOT;

        /**
         * Returns the shaded package prefix under which this release's classes were relocated.
         * Releases before 3.1.0 lived under the {@code com.ververica} group.
         */
        public String getShadedClassPrefix() {
            switch (this) {
                case v3_0_0:
                    return "com.ververica.cdc.v3_0_0";
                case v3_0_1:
                    return "com.ververica.cdc.v3_0_1";
                case v3_1_0:
                    return "org.apache.flink.cdc.v3_1_0";
                case SNAPSHOT:
                    return "org.apache.flink.cdc.snapshot";
                default:
                    throw new RuntimeException("Unknown Flink CDC version: " + this);
            }
        }
    }

    // Ordered list of all testable versions; SNAPSHOT must stay last (see getSnapshotVersion).
    private static final List<FlinkCdcVersion> versions =
            Arrays.asList(
                    FlinkCdcVersion.v3_0_0,
                    FlinkCdcVersion.v3_0_1,
                    FlinkCdcVersion.v3_1_0,
                    FlinkCdcVersion.SNAPSHOT);

    /** Returns all known CDC versions. */
    public static List<FlinkCdcVersion> getAllVersions() {
        return versions.subList(0, versions.size());
    }

    /** Returns all versions from {@code sinceVersion} (inclusive) up to SNAPSHOT. */
    public static List<FlinkCdcVersion> getVersionSince(FlinkCdcVersion sinceVersion) {
        return versions.subList(versions.indexOf(sinceVersion), versions.size());
    }

    /** Returns all versions except the given ones. */
    public static List<FlinkCdcVersion> getAllVersionExcept(FlinkCdcVersion... excludedVersions) {
        List<FlinkCdcVersion> excluded = Arrays.asList(excludedVersions);
        return versions.stream().filter(e -> !excluded.contains(e)).collect(Collectors.toList());
    }

    /** Returns the SNAPSHOT version (kept as the last entry of {@link #versions}). */
    public static FlinkCdcVersion getSnapshotVersion() {
        return versions.get(versions.size() - 1);
    }

    // Resolves the shaded mock class for the given version, e.g.
    // "com.ververica.cdc.v3_0_0.migration.tests.SchemaManagerMigrationMock".
    private static Class<?> getMockClass(FlinkCdcVersion version, String caseName)
            throws Exception {
        return Class.forName(version.getShadedClassPrefix() + ".migration.tests." + caseName);
    }

    /**
     * Serializes a dummy object with {@code fromVersion}'s mock and asserts that {@code
     * toVersion}'s mock can deserialize it back into an equal object.
     *
     * @param fromVersion version whose serializer produces the state bytes
     * @param toVersion version whose deserializer consumes the state bytes
     * @param caseName simple name of the mock class to load via reflection
     * @throws Exception if reflection or (de)serialization fails
     */
    protected void testMigrationFromTo(
            FlinkCdcVersion fromVersion, FlinkCdcVersion toVersion, String caseName)
            throws Exception {
        LOG.info("Testing {} compatibility case from {} -> {}", caseName, fromVersion, toVersion);

        // Serialize dummy object to bytes in the early version.
        Class<?> fromVersionMockClass = getMockClass(fromVersion, caseName);
        // getDeclaredConstructor().newInstance() replaces the deprecated Class#newInstance(),
        // which silently propagates any checked exception thrown by the constructor.
        Object fromVersionMockObject = fromVersionMockClass.getDeclaredConstructor().newInstance();
        int serializerVersion =
                (int)
                        fromVersionMockClass
                                .getDeclaredMethod("getSerializerVersion")
                                .invoke(fromVersionMockObject);
        byte[] serializedObject =
                (byte[])
                        fromVersionMockClass
                                .getDeclaredMethod("serializeObject")
                                .invoke(fromVersionMockObject);

        // Deserialize the object in the latest version and verify equality.
        Class<?> toVersionMockClass = getMockClass(toVersion, caseName);
        Object toVersionMockObject = toVersionMockClass.getDeclaredConstructor().newInstance();
        Assert.assertTrue(
                (boolean)
                        toVersionMockClass
                                .getDeclaredMethod(
                                        "deserializeAndCheckObject", int.class, byte[].class)
                                .invoke(toVersionMockObject, serializerVersion, serializedObject));
    }
}

@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.migration.tests;
import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaManager;
import org.junit.Test;
import static org.apache.flink.cdc.migration.tests.MigrationTestBase.FlinkCdcVersion.v3_1_0;
/** Migration test cases for {@link SchemaManager}. */
/** Migration test cases for {@link SchemaManager}. */
public class SchemaManagerMigrationTest extends MigrationTestBase {
    // Simple name of the per-version mock class loaded reflectively by the base class.
    public static String mockCaseName = "SchemaManagerMigrationMock";

    @Test
    public void testMigration() throws Exception {
        // It is known that CDC 3.1.0 breaks backwards compatibility:
        // no state compatibility is guaranteed for that release, so it is skipped here.
        for (FlinkCdcVersion version : getAllVersionExcept(v3_1_0)) {
            testMigrationFromTo(version, getSnapshotVersion(), mockCaseName);
        }
    }
}

@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.migration.tests;
import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry;
import org.junit.Test;
import static org.apache.flink.cdc.migration.tests.MigrationTestBase.FlinkCdcVersion.v3_1_0;
/** Migration test cases for {@link SchemaRegistry}. */
/** Migration test cases for {@link SchemaRegistry}. */
public class SchemaRegistryMigrationTest extends MigrationTestBase {
    // Simple name of the per-version mock class loaded reflectively by the base class.
    public static String mockCaseName = "SchemaRegistryMigrationMock";

    @Test
    public void testMigration() throws Exception {
        // It is known that CDC 3.1.0 breaks backwards compatibility:
        // no state compatibility is guaranteed for that release, so it is skipped here.
        for (FlinkCdcVersion version : getAllVersionExcept(v3_1_0)) {
            testMigrationFromTo(version, getSnapshotVersion(), mockCaseName);
        }
    }
}

@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.migration.tests;
import org.apache.flink.cdc.runtime.operators.transform.TableChangeInfo;
import org.junit.Test;
import static org.apache.flink.cdc.migration.tests.MigrationTestBase.FlinkCdcVersion.v3_0_0;
import static org.apache.flink.cdc.migration.tests.MigrationTestBase.FlinkCdcVersion.v3_0_1;
import static org.apache.flink.cdc.migration.tests.MigrationTestBase.FlinkCdcVersion.v3_1_0;
/** Migration test cases for {@link TableChangeInfo}. */
/** Migration test cases for {@link TableChangeInfo}. */
public class TableChangeInfoMigrationTest extends MigrationTestBase {
    // Simple name of the per-version mock class loaded reflectively by the base class.
    public static String mockCaseName = "TableChangeInfoMigrationMock";

    @Test
    public void testMigration() throws Exception {
        // The transform feature is not present until 3.1.0, and
        // CDC 3.1.0 itself breaks backwards compatibility — so only
        // versions after 3.1.0 are exercised here.
        for (FlinkCdcVersion version : getAllVersionExcept(v3_0_0, v3_0_1, v3_1_0)) {
            testMigrationFromTo(version, getSnapshotVersion(), mockCaseName);
        }
    }
}

@ -0,0 +1,25 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
# Root logger level is set to INFO here for debugging purposes;
# set it to OFF to avoid flooding build logs
rootLogger.level=INFO
rootLogger.appenderRef.test.ref = TestLogger
appender.testlogger.name = TestLogger
appender.testlogger.type = CONSOLE
appender.testlogger.target = SYSTEM_ERR
appender.testlogger.layout.type = PatternLayout
appender.testlogger.layout.pattern = %-4r [%t] %-5p %c - %m%n

@ -0,0 +1,86 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cdc-migration-tests</artifactId>
<version>${revision}</version>
</parent>
<artifactId>flink-cdc-release-3.0.0</artifactId>
<name>flink-cdc-release-3.0.0</name>
<dependencies>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-cdc-base</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-cdc-common</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-cdc-runtime</artifactId>
<version>3.0.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.4</version>
<executions>
<execution>
<id>shade-flink-cdc</id>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<relocations>
<relocation>
<pattern>com.ververica.cdc</pattern>
<shadedPattern>com.ververica.cdc.v3_0_0</shadedPattern>
</relocation>
</relocations>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.migration.tests;
/** Base classes for migration test cases. */
/** Base classes for migration test cases. */
public interface MigrationMockBase {
    /** Returns the version of the serializer that produced the state bytes. */
    int getSerializerVersion();

    /** Serializes a dummy object into state bytes. */
    byte[] serializeObject() throws Exception;

    /**
     * Deserializes {@code b} (written by serializer version {@code v}) and checks the result
     * equals the dummy object.
     */
    boolean deserializeAndCheckObject(int v, byte[] b) throws Exception;
}

@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.migration.tests;
import com.ververica.cdc.common.event.TableId;
import com.ververica.cdc.common.schema.Schema;
import com.ververica.cdc.common.types.DataTypes;
import com.ververica.cdc.runtime.operators.schema.coordinator.SchemaManager;
import java.util.Collections;
import java.util.SortedMap;
import java.util.TreeMap;
/** Dummy classes for migration test. Called via reflection. */
/** Dummy classes for migration test. Called via reflection. */
public class SchemaManagerMigrationMock implements MigrationMockBase {

    // Fixed identifier for the single dummy table tracked by the manager.
    private static final TableId DUMMY_TABLE_ID =
            TableId.tableId("dummyNamespace", "dummySchema", "dummyTable");

    // A small three-column schema with a composite primary key.
    private static final Schema DUMMY_SCHEMA =
            Schema.newBuilder()
                    .physicalColumn("id", DataTypes.INT())
                    .physicalColumn("name", DataTypes.STRING())
                    .physicalColumn("age", DataTypes.DOUBLE())
                    .primaryKey("id", "name")
                    .build();

    /** Builds a {@link SchemaManager} holding schema versions 1 through 3 of the dummy table. */
    public SchemaManager generateDummyObject() {
        SortedMap<Integer, Schema> versionedSchemas = new TreeMap<>();
        for (int schemaVersion = 1; schemaVersion <= 3; schemaVersion++) {
            versionedSchemas.put(schemaVersion, DUMMY_SCHEMA);
        }
        return new SchemaManager(Collections.singletonMap(DUMMY_TABLE_ID, versionedSchemas));
    }

    @Override
    public int getSerializerVersion() {
        return SchemaManager.SERIALIZER.getVersion();
    }

    @Override
    public byte[] serializeObject() throws Exception {
        return SchemaManager.SERIALIZER.serialize(generateDummyObject());
    }

    @Override
    public boolean deserializeAndCheckObject(int version, byte[] serialized) throws Exception {
        Object restored = SchemaManager.SERIALIZER.deserialize(version, serialized);
        return generateDummyObject().equals(restored);
    }
}

@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.migration.tests;
import com.ververica.cdc.common.event.TableId;
import com.ververica.cdc.common.schema.Schema;
import com.ververica.cdc.common.types.DataTypes;
import com.ververica.cdc.runtime.operators.schema.coordinator.SchemaManager;
import com.ververica.cdc.runtime.operators.schema.coordinator.SchemaRegistry;
import java.lang.reflect.Field;
import java.util.Collections;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
/** Dummy classes for migration test. Called via reflection. */
/** Dummy classes for migration test. Called via reflection. */
public class SchemaRegistryMigrationMock implements MigrationMockBase {

    // Fixed identifier for the single dummy table tracked by the manager.
    private static final TableId DUMMY_TABLE_ID =
            TableId.tableId("dummyNamespace", "dummySchema", "dummyTable");

    // A small three-column schema with a composite primary key.
    private static final Schema DUMMY_SCHEMA =
            Schema.newBuilder()
                    .physicalColumn("id", DataTypes.INT())
                    .physicalColumn("name", DataTypes.STRING())
                    .physicalColumn("age", DataTypes.DOUBLE())
                    .primaryKey("id", "name")
                    .build();

    /** Builds a {@link SchemaManager} holding schema versions 1 through 3 of the dummy table. */
    public SchemaManager generateDummySchemaManager() {
        SortedMap<Integer, Schema> schemaVersions = new TreeMap<>();
        schemaVersions.put(1, DUMMY_SCHEMA);
        schemaVersions.put(2, DUMMY_SCHEMA);
        schemaVersions.put(3, DUMMY_SCHEMA);
        return new SchemaManager(Collections.singletonMap(DUMMY_TABLE_ID, schemaVersions));
    }

    /** Creates a registry with a placeholder name, no context, and a no-op event consumer. */
    public SchemaRegistry generateSchemaRegistry() {
        return new SchemaRegistry("Dummy Name", null, e -> {});
    }

    // SchemaRegistry#schemaManager is private; both accessors below share this lookup.
    private static Field schemaManagerField() throws NoSuchFieldException {
        Field field = SchemaRegistry.class.getDeclaredField("schemaManager");
        field.setAccessible(true);
        return field;
    }

    private SchemaManager getSchemaManager(SchemaRegistry schemaRegistry) throws Exception {
        return (SchemaManager) schemaManagerField().get(schemaRegistry);
    }

    private void setSchemaManager(SchemaRegistry schemaRegistry, SchemaManager schemaManager)
            throws Exception {
        schemaManagerField().set(schemaRegistry, schemaManager);
    }

    @Override
    public int getSerializerVersion() {
        // Registry state is checkpointed as a whole; there is no standalone serializer version.
        return -1;
    }

    @Override
    public byte[] serializeObject() throws Exception {
        CompletableFuture<byte[]> future = new CompletableFuture<>();
        SchemaRegistry registry = generateSchemaRegistry();
        setSchemaManager(registry, generateDummySchemaManager());
        registry.checkpointCoordinator(0, future);
        // CompletableFuture#get already blocks until completion, so the previous
        // busy-wait loop (poll isDone + sleep) was redundant and has been removed.
        return future.get();
    }

    @Override
    public boolean deserializeAndCheckObject(int v, byte[] b) throws Exception {
        SchemaRegistry expected = generateSchemaRegistry();
        SchemaRegistry actual = generateSchemaRegistry();
        actual.resetToCheckpoint(0, b);
        // Compare only the restored schema manager state, not the whole registry.
        return getSchemaManager(expected).equals(getSchemaManager(actual));
    }
}

@ -0,0 +1,86 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cdc-migration-tests</artifactId>
<version>${revision}</version>
</parent>
<artifactId>flink-cdc-release-3.0.1</artifactId>
<name>flink-cdc-release-3.0.1</name>
<dependencies>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-cdc-base</artifactId>
<version>3.0.1</version>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-cdc-common</artifactId>
<version>3.0.1</version>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-cdc-runtime</artifactId>
<version>3.0.1</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.4</version>
<executions>
<execution>
<id>shade-flink-cdc</id>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<relocations>
<relocation>
<pattern>com.ververica.cdc</pattern>
<shadedPattern>com.ververica.cdc.v3_0_1</shadedPattern>
</relocation>
</relocations>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.migration.tests;
/** Base classes for migration test cases. */
/** Base classes for migration test cases. */
public interface MigrationMockBase {
    /** Returns the version of the serializer that produced the state bytes. */
    int getSerializerVersion();

    /** Serializes a dummy object into state bytes. */
    byte[] serializeObject() throws Exception;

    /**
     * Deserializes {@code b} (written by serializer version {@code v}) and checks the result
     * equals the dummy object.
     */
    boolean deserializeAndCheckObject(int v, byte[] b) throws Exception;
}

@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.migration.tests;
import com.ververica.cdc.common.event.TableId;
import com.ververica.cdc.common.schema.Schema;
import com.ververica.cdc.common.types.DataTypes;
import com.ververica.cdc.runtime.operators.schema.coordinator.SchemaManager;
import java.util.Collections;
import java.util.SortedMap;
import java.util.TreeMap;
/** Dummy classes for migration test. Called via reflection. */
/** Dummy classes for migration test. Called via reflection. */
public class SchemaManagerMigrationMock implements MigrationMockBase {

    // Fixed identifier for the single dummy table tracked by the manager.
    private static final TableId DUMMY_TABLE_ID =
            TableId.tableId("dummyNamespace", "dummySchema", "dummyTable");

    // A small three-column schema with a composite primary key.
    private static final Schema DUMMY_SCHEMA =
            Schema.newBuilder()
                    .physicalColumn("id", DataTypes.INT())
                    .physicalColumn("name", DataTypes.STRING())
                    .physicalColumn("age", DataTypes.DOUBLE())
                    .primaryKey("id", "name")
                    .build();

    /** Builds a {@link SchemaManager} holding schema versions 1 through 3 of the dummy table. */
    public SchemaManager generateDummyObject() {
        SortedMap<Integer, Schema> versionedSchemas = new TreeMap<>();
        for (int schemaVersion = 1; schemaVersion <= 3; schemaVersion++) {
            versionedSchemas.put(schemaVersion, DUMMY_SCHEMA);
        }
        return new SchemaManager(Collections.singletonMap(DUMMY_TABLE_ID, versionedSchemas));
    }

    @Override
    public int getSerializerVersion() {
        return SchemaManager.SERIALIZER.getVersion();
    }

    @Override
    public byte[] serializeObject() throws Exception {
        return SchemaManager.SERIALIZER.serialize(generateDummyObject());
    }

    @Override
    public boolean deserializeAndCheckObject(int version, byte[] serialized) throws Exception {
        Object restored = SchemaManager.SERIALIZER.deserialize(version, serialized);
        return generateDummyObject().equals(restored);
    }
}

@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.migration.tests;
import com.ververica.cdc.common.event.TableId;
import com.ververica.cdc.common.schema.Schema;
import com.ververica.cdc.common.types.DataTypes;
import com.ververica.cdc.runtime.operators.schema.coordinator.SchemaManager;
import com.ververica.cdc.runtime.operators.schema.coordinator.SchemaRegistry;
import java.lang.reflect.Field;
import java.util.Collections;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
/** Dummy classes for migration test. Called via reflection. */
/** Dummy classes for migration test. Called via reflection. */
public class SchemaRegistryMigrationMock implements MigrationMockBase {

    // Fixed identifier for the single dummy table tracked by the manager.
    private static final TableId DUMMY_TABLE_ID =
            TableId.tableId("dummyNamespace", "dummySchema", "dummyTable");

    // A small three-column schema with a composite primary key.
    private static final Schema DUMMY_SCHEMA =
            Schema.newBuilder()
                    .physicalColumn("id", DataTypes.INT())
                    .physicalColumn("name", DataTypes.STRING())
                    .physicalColumn("age", DataTypes.DOUBLE())
                    .primaryKey("id", "name")
                    .build();

    /** Builds a {@link SchemaManager} holding schema versions 1 through 3 of the dummy table. */
    public SchemaManager generateDummySchemaManager() {
        SortedMap<Integer, Schema> schemaVersions = new TreeMap<>();
        schemaVersions.put(1, DUMMY_SCHEMA);
        schemaVersions.put(2, DUMMY_SCHEMA);
        schemaVersions.put(3, DUMMY_SCHEMA);
        return new SchemaManager(Collections.singletonMap(DUMMY_TABLE_ID, schemaVersions));
    }

    /** Creates a registry with a placeholder name, no context, and a no-op event consumer. */
    public SchemaRegistry generateSchemaRegistry() {
        return new SchemaRegistry("Dummy Name", null, e -> {});
    }

    // SchemaRegistry#schemaManager is private; both accessors below share this lookup.
    private static Field schemaManagerField() throws NoSuchFieldException {
        Field field = SchemaRegistry.class.getDeclaredField("schemaManager");
        field.setAccessible(true);
        return field;
    }

    private SchemaManager getSchemaManager(SchemaRegistry schemaRegistry) throws Exception {
        return (SchemaManager) schemaManagerField().get(schemaRegistry);
    }

    private void setSchemaManager(SchemaRegistry schemaRegistry, SchemaManager schemaManager)
            throws Exception {
        schemaManagerField().set(schemaRegistry, schemaManager);
    }

    @Override
    public int getSerializerVersion() {
        // Registry state is checkpointed as a whole; there is no standalone serializer version.
        return -1;
    }

    @Override
    public byte[] serializeObject() throws Exception {
        CompletableFuture<byte[]> future = new CompletableFuture<>();
        SchemaRegistry registry = generateSchemaRegistry();
        setSchemaManager(registry, generateDummySchemaManager());
        registry.checkpointCoordinator(0, future);
        // CompletableFuture#get already blocks until completion, so the previous
        // busy-wait loop (poll isDone + sleep) was redundant and has been removed.
        return future.get();
    }

    @Override
    public boolean deserializeAndCheckObject(int v, byte[] b) throws Exception {
        SchemaRegistry expected = generateSchemaRegistry();
        SchemaRegistry actual = generateSchemaRegistry();
        actual.resetToCheckpoint(0, b);
        // Compare only the restored schema manager state, not the whole registry.
        return getSchemaManager(expected).equals(getSchemaManager(actual));
    }
}

@ -0,0 +1,87 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-cdc-migration-tests</artifactId>
        <version>${revision}</version>
    </parent>
    <!-- Bundles the released CDC 3.1.0 artifacts (relocated below) so migration tests can
         load them side by side with the current snapshot classes. -->
    <artifactId>flink-cdc-release-3.1.0</artifactId>
    <name>flink-cdc-release-3.1.0</name>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cdc-base</artifactId>
            <version>3.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cdc-common</artifactId>
            <version>3.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cdc-runtime</artifactId>
            <version>3.1.0</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.4</version>
                <executions>
                    <execution>
                        <id>shade-flink-cdc</id>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <!-- Strip signature files so the shaded jar is not rejected as tampered. -->
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <relocations>
                                <relocation>
                                    <pattern>org.apache.flink.cdc</pattern>
                                    <shadedPattern>org.apache.flink.cdc.v3_1_0</shadedPattern>
                                    <!-- NOTE(review): a <relocation>'s <excludes> expects nested <exclude>
                                         class patterns; this comma-joined META-INF list is likely ignored
                                         (signature files are already removed by the filter above) —
                                         confirm against the maven-shade-plugin documentation. -->
                                    <excludes>META-INF/*.SF,META-INF/*.DSA,META-INF/*.RSA</excludes>
                                </relocation>
                            </relocations>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.migration.tests;
/** Base classes for migration test cases. */
public interface MigrationMockBase {
    /** Returns the version of the serializer under test, or -1 when versioning is not applicable. */
    int getSerializerVersion();

    /** Serializes a freshly generated dummy state object with the serializer under test. */
    byte[] serializeObject() throws Exception;

    /**
     * Deserializes {@code b} using serializer version {@code v} and reports whether the result
     * matches the expected dummy state.
     */
    boolean deserializeAndCheckObject(int v, byte[] b) throws Exception;
}

@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.migration.tests;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaManager;
import java.util.Collections;
import java.util.SortedMap;
import java.util.TreeMap;
/** Dummy classes for migration test. Called via reflection. */
public class SchemaManagerMigrationMock implements MigrationMockBase {
    private static final TableId DUMMY_TABLE_ID =
            TableId.tableId("dummyNamespace", "dummySchema", "dummyTable");
    private static final Schema DUMMY_SCHEMA =
            Schema.newBuilder()
                    .physicalColumn("id", DataTypes.INT())
                    .physicalColumn("name", DataTypes.STRING())
                    .physicalColumn("age", DataTypes.DOUBLE())
                    .primaryKey("id", "name")
                    .build();

    // Removed the unused private SCHEMA_MANAGER constant (dead code).

    /** Builds a SchemaManager holding three identical versions of the dummy schema. */
    public SchemaManager generateDummyObject() {
        SortedMap<Integer, Schema> schemaVersions = new TreeMap<>();
        schemaVersions.put(1, DUMMY_SCHEMA);
        schemaVersions.put(2, DUMMY_SCHEMA);
        schemaVersions.put(3, DUMMY_SCHEMA);
        return new SchemaManager(Collections.singletonMap(DUMMY_TABLE_ID, schemaVersions));
    }

    @Override
    public int getSerializerVersion() {
        return SchemaManager.SERIALIZER.getVersion();
    }

    @Override
    public byte[] serializeObject() throws Exception {
        return SchemaManager.SERIALIZER.serialize(generateDummyObject());
    }

    @Override
    public boolean deserializeAndCheckObject(int version, byte[] serialized) throws Exception {
        // Round-trip check: a freshly generated dummy must equal its deserialized form.
        Object expected = generateDummyObject();
        Object actual = SchemaManager.SERIALIZER.deserialize(version, serialized);
        return expected.equals(actual);
    }
}

@ -0,0 +1,116 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.migration.tests;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.schema.Selectors;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaDerivation;
import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaManager;
import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
/** Dummy classes for migration test. Called via reflection. */
public class SchemaRegistryMigrationMock implements MigrationMockBase {
    private static final TableId DUMMY_TABLE_ID =
            TableId.tableId("dummyNamespace", "dummySchema", "dummyTable");
    private static final Schema DUMMY_SCHEMA =
            Schema.newBuilder()
                    .physicalColumn("id", DataTypes.INT())
                    .physicalColumn("name", DataTypes.STRING())
                    .physicalColumn("age", DataTypes.DOUBLE())
                    .primaryKey("id", "name")
                    .build();

    /** Builds a schema manager pre-populated with three versions of the dummy schema. */
    public SchemaManager generateDummySchemaManager() {
        SortedMap<Integer, Schema> schemaVersions = new TreeMap<>();
        schemaVersions.put(1, DUMMY_SCHEMA);
        schemaVersions.put(2, DUMMY_SCHEMA);
        schemaVersions.put(3, DUMMY_SCHEMA);
        return new SchemaManager(Collections.singletonMap(DUMMY_TABLE_ID, schemaVersions));
    }

    /** Creates a registry with a no-op metadata applier and an empty route list. */
    public SchemaRegistry generateSchemaRegistry() {
        return new SchemaRegistry("Dummy Name", null, e -> {}, new ArrayList<>());
    }

    private SchemaManager getSchemaManager(SchemaRegistry schemaRegistry) throws Exception {
        // SchemaRegistry does not expose its internal state; read it reflectively.
        Field field = SchemaRegistry.class.getDeclaredField("schemaManager");
        field.setAccessible(true);
        return (SchemaManager) field.get(schemaRegistry);
    }

    private void setSchemaManager(SchemaRegistry schemaRegistry, SchemaManager schemaManager)
            throws Exception {
        Field field = SchemaRegistry.class.getDeclaredField("schemaManager");
        field.setAccessible(true);
        field.set(schemaRegistry, schemaManager);
    }

    private SchemaDerivation getSchemaDerivation(SchemaRegistry schemaRegistry) throws Exception {
        Field field = SchemaRegistry.class.getDeclaredField("schemaDerivation");
        field.setAccessible(true);
        return (SchemaDerivation) field.get(schemaRegistry);
    }

    @SuppressWarnings("unchecked")
    private List<Tuple2<Selectors, TableId>> getSchemaRoutes(SchemaRegistry schemaRegistry)
            throws Exception {
        SchemaDerivation schemaDerivation = getSchemaDerivation(schemaRegistry);
        Field field = SchemaDerivation.class.getDeclaredField("routes");
        field.setAccessible(true);
        return (List<Tuple2<Selectors, TableId>>) field.get(schemaDerivation);
    }

    @Override
    public int getSerializerVersion() {
        // Registry state is checkpointed as an opaque blob; no standalone serializer version.
        return -1;
    }

    @Override
    public byte[] serializeObject() throws Exception {
        CompletableFuture<byte[]> future = new CompletableFuture<>();
        SchemaRegistry registry = generateSchemaRegistry();
        setSchemaManager(registry, generateDummySchemaManager());
        registry.checkpointCoordinator(0, future);
        // get() blocks until the coordinator completes the checkpoint; no need to poll.
        return future.get();
    }

    @Override
    public boolean deserializeAndCheckObject(int v, byte[] b) throws Exception {
        SchemaRegistry expected = generateSchemaRegistry();
        setSchemaManager(expected, generateDummySchemaManager());
        SchemaRegistry actual = generateSchemaRegistry();
        actual.resetToCheckpoint(0, b);
        return getSchemaManager(expected).equals(getSchemaManager(actual))
                && getSchemaRoutes(expected).equals(getSchemaRoutes(actual));
    }
}

@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.migration.tests;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.runtime.operators.transform.TableChangeInfo;
/** Dummy classes for migration test. Called via reflection. */
public class TableChangeInfoMigrationMock implements MigrationMockBase {
    private static final TableId DUMMY_TABLE_ID =
            TableId.tableId("dummyNamespace", "dummySchema", "dummyTable");
    private static final Schema DUMMY_SCHEMA =
            Schema.newBuilder()
                    .physicalColumn("id", DataTypes.INT())
                    .physicalColumn("name", DataTypes.STRING())
                    .physicalColumn("age", DataTypes.DOUBLE())
                    .primaryKey("id", "name")
                    .build();

    /** Produces the reference {@link TableChangeInfo} used for round-trip checks. */
    public TableChangeInfo generateDummyObject() {
        return TableChangeInfo.of(DUMMY_TABLE_ID, DUMMY_SCHEMA, DUMMY_SCHEMA);
    }

    @Override
    public int getSerializerVersion() {
        return TableChangeInfo.SERIALIZER.getVersion();
    }

    @Override
    public byte[] serializeObject() throws Exception {
        return TableChangeInfo.SERIALIZER.serialize(generateDummyObject());
    }

    @Override
    public boolean deserializeAndCheckObject(int version, byte[] bytes) throws Exception {
        // Field-by-field comparison of the reference object against the restored copy.
        TableChangeInfo reference = generateDummyObject();
        TableChangeInfo restored = TableChangeInfo.SERIALIZER.deserialize(version, bytes);
        return reference.getTableId().equals(restored.getTableId())
                && reference.getOriginalSchema().equals(restored.getOriginalSchema())
                && reference.getTransformedSchema().equals(restored.getTransformedSchema());
    }
}

@ -0,0 +1,90 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-cdc-migration-tests</artifactId>
        <version>${revision}</version>
    </parent>
    <!-- Bundles the current in-tree snapshot artifacts (relocated below) so migration tests
         can load them side by side with shaded historical releases. -->
    <artifactId>flink-cdc-release-snapshot</artifactId>
    <name>flink-cdc-release-snapshot</name>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cdc-base</artifactId>
            <version>${revision}</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cdc-common</artifactId>
            <version>${revision}</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cdc-runtime</artifactId>
            <version>${revision}</version>
            <scope>compile</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.4</version>
                <executions>
                    <execution>
                        <id>shade-flink-cdc</id>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <!-- Strip signature files so the shaded jar is not rejected as tampered. -->
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <relocations>
                                <relocation>
                                    <pattern>org.apache.flink.cdc</pattern>
                                    <shadedPattern>org.apache.flink.cdc.snapshot</shadedPattern>
                                    <!-- NOTE(review): a <relocation>'s <excludes> expects nested <exclude>
                                         class patterns; this comma-joined META-INF list is likely ignored
                                         (signature files are already removed by the filter above) —
                                         confirm against the maven-shade-plugin documentation. -->
                                    <excludes>META-INF/*.SF,META-INF/*.DSA,META-INF/*.RSA</excludes>
                                </relocation>
                            </relocations>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.migration.tests;
/** Base classes for migration test cases. */
public interface MigrationMockBase {
    /** Returns the version of the serializer under test, or -1 when versioning is not applicable. */
    int getSerializerVersion();

    /** Serializes a freshly generated dummy state object with the serializer under test. */
    byte[] serializeObject() throws Exception;

    /**
     * Deserializes {@code b} using serializer version {@code v} and reports whether the result
     * matches the expected dummy state.
     */
    boolean deserializeAndCheckObject(int v, byte[] b) throws Exception;
}

@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.migration.tests;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaManager;
import java.util.Collections;
import java.util.SortedMap;
import java.util.TreeMap;
/** Dummy classes for migration test. Called via reflection. */
public class SchemaManagerMigrationMock implements MigrationMockBase {
    private static final TableId DUMMY_TABLE_ID =
            TableId.tableId("dummyNamespace", "dummySchema", "dummyTable");
    private static final Schema DUMMY_SCHEMA =
            Schema.newBuilder()
                    .physicalColumn("id", DataTypes.INT())
                    .physicalColumn("name", DataTypes.STRING())
                    .physicalColumn("age", DataTypes.DOUBLE())
                    .primaryKey("id", "name")
                    .build();

    /** Builds a SchemaManager that tracks three identical versions of the dummy schema. */
    public SchemaManager generateDummyObject() {
        SortedMap<Integer, Schema> versionedSchemas = new TreeMap<>();
        for (int schemaVersion = 1; schemaVersion <= 3; schemaVersion++) {
            versionedSchemas.put(schemaVersion, DUMMY_SCHEMA);
        }
        return new SchemaManager(Collections.singletonMap(DUMMY_TABLE_ID, versionedSchemas));
    }

    @Override
    public int getSerializerVersion() {
        return SchemaManager.SERIALIZER.getVersion();
    }

    @Override
    public byte[] serializeObject() throws Exception {
        return SchemaManager.SERIALIZER.serialize(generateDummyObject());
    }

    @Override
    public boolean deserializeAndCheckObject(int version, byte[] serialized) throws Exception {
        // The deserialized manager must equal a freshly generated reference object.
        Object restored = SchemaManager.SERIALIZER.deserialize(version, serialized);
        return generateDummyObject().equals(restored);
    }
}

@ -0,0 +1,116 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.migration.tests;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.schema.Selectors;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaDerivation;
import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaManager;
import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
/** Dummy classes for migration test. Called via reflection. */
public class SchemaRegistryMigrationMock implements MigrationMockBase {
    private static final TableId DUMMY_TABLE_ID =
            TableId.tableId("dummyNamespace", "dummySchema", "dummyTable");
    private static final Schema DUMMY_SCHEMA =
            Schema.newBuilder()
                    .physicalColumn("id", DataTypes.INT())
                    .physicalColumn("name", DataTypes.STRING())
                    .physicalColumn("age", DataTypes.DOUBLE())
                    .primaryKey("id", "name")
                    .build();

    /** Builds a schema manager pre-populated with three versions of the dummy schema. */
    public SchemaManager generateDummySchemaManager() {
        SortedMap<Integer, Schema> schemaVersions = new TreeMap<>();
        schemaVersions.put(1, DUMMY_SCHEMA);
        schemaVersions.put(2, DUMMY_SCHEMA);
        schemaVersions.put(3, DUMMY_SCHEMA);
        return new SchemaManager(Collections.singletonMap(DUMMY_TABLE_ID, schemaVersions));
    }

    /** Creates a registry with a no-op metadata applier and an empty route list. */
    public SchemaRegistry generateSchemaRegistry() {
        return new SchemaRegistry("Dummy Name", null, e -> {}, new ArrayList<>());
    }

    private SchemaManager getSchemaManager(SchemaRegistry schemaRegistry) throws Exception {
        // SchemaRegistry does not expose its internal state; read it reflectively.
        Field field = SchemaRegistry.class.getDeclaredField("schemaManager");
        field.setAccessible(true);
        return (SchemaManager) field.get(schemaRegistry);
    }

    private void setSchemaManager(SchemaRegistry schemaRegistry, SchemaManager schemaManager)
            throws Exception {
        Field field = SchemaRegistry.class.getDeclaredField("schemaManager");
        field.setAccessible(true);
        field.set(schemaRegistry, schemaManager);
    }

    private SchemaDerivation getSchemaDerivation(SchemaRegistry schemaRegistry) throws Exception {
        Field field = SchemaRegistry.class.getDeclaredField("schemaDerivation");
        field.setAccessible(true);
        return (SchemaDerivation) field.get(schemaRegistry);
    }

    @SuppressWarnings("unchecked")
    private List<Tuple2<Selectors, TableId>> getSchemaRoutes(SchemaRegistry schemaRegistry)
            throws Exception {
        SchemaDerivation schemaDerivation = getSchemaDerivation(schemaRegistry);
        Field field = SchemaDerivation.class.getDeclaredField("routes");
        field.setAccessible(true);
        return (List<Tuple2<Selectors, TableId>>) field.get(schemaDerivation);
    }

    @Override
    public int getSerializerVersion() {
        // Registry state is checkpointed as an opaque blob; no standalone serializer version.
        return -1;
    }

    @Override
    public byte[] serializeObject() throws Exception {
        CompletableFuture<byte[]> future = new CompletableFuture<>();
        SchemaRegistry registry = generateSchemaRegistry();
        setSchemaManager(registry, generateDummySchemaManager());
        registry.checkpointCoordinator(0, future);
        // get() blocks until the coordinator completes the checkpoint; no need to poll.
        return future.get();
    }

    @Override
    public boolean deserializeAndCheckObject(int v, byte[] b) throws Exception {
        SchemaRegistry expected = generateSchemaRegistry();
        setSchemaManager(expected, generateDummySchemaManager());
        SchemaRegistry actual = generateSchemaRegistry();
        actual.resetToCheckpoint(0, b);
        return getSchemaManager(expected).equals(getSchemaManager(actual))
                && getSchemaRoutes(expected).equals(getSchemaRoutes(actual));
    }
}

@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.migration.tests;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.runtime.operators.transform.TableChangeInfo;
/** Dummy classes for migration test. Called via reflection. */
public class TableChangeInfoMigrationMock implements MigrationMockBase {
    private static final TableId DUMMY_TABLE_ID =
            TableId.tableId("dummyNamespace", "dummySchema", "dummyTable");
    private static final Schema DUMMY_SCHEMA =
            Schema.newBuilder()
                    .physicalColumn("id", DataTypes.INT())
                    .physicalColumn("name", DataTypes.STRING())
                    .physicalColumn("age", DataTypes.DOUBLE())
                    .primaryKey("id", "name")
                    .build();

    /** Produces the reference {@link TableChangeInfo} used for round-trip checks. */
    public TableChangeInfo generateDummyObject() {
        return TableChangeInfo.of(DUMMY_TABLE_ID, DUMMY_SCHEMA, DUMMY_SCHEMA);
    }

    @Override
    public int getSerializerVersion() {
        return TableChangeInfo.SERIALIZER.getVersion();
    }

    @Override
    public byte[] serializeObject() throws Exception {
        return TableChangeInfo.SERIALIZER.serialize(generateDummyObject());
    }

    @Override
    public boolean deserializeAndCheckObject(int version, byte[] bytes) throws Exception {
        // Field-by-field comparison of the reference object against the restored copy.
        TableChangeInfo reference = generateDummyObject();
        TableChangeInfo restored = TableChangeInfo.SERIALIZER.deserialize(version, bytes);
        return reference.getTableId().equals(restored.getTableId())
                && reference.getOriginalSchema().equals(restored.getOriginalSchema())
                && reference.getTransformedSchema().equals(restored.getTransformedSchema());
    }
}

@ -0,0 +1,57 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-cdc-parent</artifactId>
        <version>${revision}</version>
    </parent>
    <artifactId>flink-cdc-migration-tests</artifactId>
    <name>flink-cdc-migration-tests</name>
    <packaging>pom</packaging>
    <!-- One module per historical CDC release whose serialized state must stay readable,
         plus the current snapshot build and the test cases that exercise them all. -->
    <modules>
        <module>flink-cdc-release-3.0.0</module>
        <module>flink-cdc-release-3.0.1</module>
        <module>flink-cdc-release-3.1.0</module>
        <module>flink-cdc-release-snapshot</module>
        <module>flink-cdc-migration-testcases</module>
    </modules>
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-deploy-plugin</artifactId>
                <!-- Test-only aggregator; its artifacts are never deployed to a repository. -->
                <configuration>
                    <skip>true</skip>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

@ -172,7 +172,7 @@ public class SchemaManager {
/** Serializer for {@link SchemaManager}. */
public static class Serializer implements SimpleVersionedSerializer<SchemaManager> {
public static final int CURRENT_VERSION = 0;
public static final int CURRENT_VERSION = 1;
@Override
public int getVersion() {
@ -211,30 +211,40 @@ public class SchemaManager {
@Override
public SchemaManager deserialize(int version, byte[] serialized) throws IOException {
TableIdSerializer tableIdSerializer = TableIdSerializer.INSTANCE;
SchemaSerializer schemaSerializer = SchemaSerializer.INSTANCE;
try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
DataInputStream in = new DataInputStream(bais)) {
// Total schema length
int numTables = in.readInt();
Map<TableId, SortedMap<Integer, Schema>> tableSchemas = new HashMap<>(numTables);
for (int i = 0; i < numTables; i++) {
// Table ID
TableId tableId =
tableIdSerializer.deserialize(new DataInputViewStreamWrapper(in));
// Schema with versions
int numVersions = in.readInt();
SortedMap<Integer, Schema> versionedSchemas = new TreeMap<>(Integer::compareTo);
for (int j = 0; j < numVersions; j++) {
// Version
int schemaVersion = in.readInt();
Schema schema =
schemaSerializer.deserialize(new DataInputViewStreamWrapper(in));
versionedSchemas.put(schemaVersion, schema);
switch (version) {
case 0:
case 1:
TableIdSerializer tableIdSerializer = TableIdSerializer.INSTANCE;
SchemaSerializer schemaSerializer = SchemaSerializer.INSTANCE;
try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
DataInputStream in = new DataInputStream(bais)) {
// Total schema length
int numTables = in.readInt();
Map<TableId, SortedMap<Integer, Schema>> tableSchemas =
new HashMap<>(numTables);
for (int i = 0; i < numTables; i++) {
// Table ID
TableId tableId =
tableIdSerializer.deserialize(
new DataInputViewStreamWrapper(in));
// Schema with versions
int numVersions = in.readInt();
SortedMap<Integer, Schema> versionedSchemas =
new TreeMap<>(Integer::compareTo);
for (int j = 0; j < numVersions; j++) {
// Version
int schemaVersion = in.readInt();
Schema schema =
schemaSerializer.deserialize(
version, new DataInputViewStreamWrapper(in));
versionedSchemas.put(schemaVersion, schema);
}
tableSchemas.put(tableId, versionedSchemas);
}
return new SchemaManager(tableSchemas);
}
tableSchemas.put(tableId, versionedSchemas);
}
return new SchemaManager(tableSchemas);
default:
throw new IOException("Unrecognized serialization version " + version);
}
}
}

@ -46,6 +46,8 @@ import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -196,18 +198,43 @@ public class SchemaRegistry implements OperatorCoordinator, CoordinationRequestH
try (ByteArrayInputStream bais = new ByteArrayInputStream(checkpointData);
DataInputStream in = new DataInputStream(bais)) {
int schemaManagerSerializerVersion = in.readInt();
int length = in.readInt();
byte[] serializedSchemaManager = new byte[length];
in.readFully(serializedSchemaManager);
schemaManager =
SchemaManager.SERIALIZER.deserialize(
schemaManagerSerializerVersion, serializedSchemaManager);
Map<TableId, Set<TableId>> derivationMapping =
SchemaDerivation.deserializerDerivationMapping(in);
schemaDerivation = new SchemaDerivation(schemaManager, routes, derivationMapping);
requestHandler =
new SchemaRegistryRequestHandler(
metadataApplier, schemaManager, schemaDerivation);
switch (schemaManagerSerializerVersion) {
case 0:
{
int length = in.readInt();
byte[] serializedSchemaManager = new byte[length];
in.readFully(serializedSchemaManager);
schemaManager =
SchemaManager.SERIALIZER.deserialize(
schemaManagerSerializerVersion, serializedSchemaManager);
schemaDerivation =
new SchemaDerivation(schemaManager, routes, Collections.emptyMap());
requestHandler =
new SchemaRegistryRequestHandler(
metadataApplier, schemaManager, schemaDerivation);
break;
}
case 1:
{
int length = in.readInt();
byte[] serializedSchemaManager = new byte[length];
in.readFully(serializedSchemaManager);
schemaManager =
SchemaManager.SERIALIZER.deserialize(
schemaManagerSerializerVersion, serializedSchemaManager);
Map<TableId, Set<TableId>> derivationMapping =
SchemaDerivation.deserializerDerivationMapping(in);
schemaDerivation =
new SchemaDerivation(schemaManager, routes, derivationMapping);
requestHandler =
new SchemaRegistryRequestHandler(
metadataApplier, schemaManager, schemaDerivation);
break;
}
default:
throw new IOException(
"Unrecognized serialization version " + schemaManagerSerializerVersion);
}
}
}

@ -109,7 +109,7 @@ public class TableChangeInfo {
/** Serializer for {@link TableChangeInfo}. */
public static class Serializer implements SimpleVersionedSerializer<TableChangeInfo> {
// Bumped from 0 to 1 when SchemaSerializer gained versioned payloads
// (partition keys added to the schema wire format).
public static final int CURRENT_VERSION = 1;
@Override
public int getVersion() {
@ -140,9 +140,9 @@ public class TableChangeInfo {
DataInputStream in = new DataInputStream(bais)) {
TableId tableId = tableIdSerializer.deserialize(new DataInputViewStreamWrapper(in));
Schema originalSchema =
schemaSerializer.deserialize(new DataInputViewStreamWrapper(in));
schemaSerializer.deserialize(version, new DataInputViewStreamWrapper(in));
Schema transformedSchema =
schemaSerializer.deserialize(new DataInputViewStreamWrapper(in));
schemaSerializer.deserialize(version, new DataInputViewStreamWrapper(in));
return TableChangeInfo.of(tableId, originalSchema, transformedSchema);
}
}

@ -89,15 +89,33 @@ public class SchemaSerializer extends TypeSerializerSingleton<Schema> {
stringSerializer.serialize(record.comment(), target);
}
// Wire version this serializer writes; readers must also accept version 0.
private static final int CURRENT_VERSION = 1;

/**
 * Deserializes a {@link Schema} encoded with the current serializer version.
 *
 * @param source input view positioned at the start of an encoded schema
 * @return the deserialized schema
 * @throws IOException if the stream cannot be read
 */
@Override
public Schema deserialize(DataInputView source) throws IOException {
    // Delegate to the versioned variant so current and legacy payloads
    // share a single decoding path.
    return deserialize(CURRENT_VERSION, source);
}
/**
 * Deserializes a {@link Schema} encoded with the given serializer version.
 *
 * <p>Version 0 payloads (CDC 3.0.x) carry no partition keys; version 1
 * payloads include them between the primary keys and the options.
 *
 * @param version serializer version the payload was written with (0 or 1)
 * @param source input view positioned at the start of an encoded schema
 * @return the deserialized schema
 * @throws IOException if the version is unknown or the stream cannot be read
 */
public Schema deserialize(int version, DataInputView source) throws IOException {
    // Reject unknown versions before consuming any bytes from the stream.
    if (version != 0 && version != 1) {
        throw new IOException("Unrecognized serialization version " + version);
    }
    Schema.Builder builder =
            Schema.newBuilder()
                    .setColumns(columnsSerializer.deserialize(source))
                    .primaryKey(primaryKeysSerializer.deserialize(source));
    if (version >= 1) {
        // Partition keys were introduced with wire version 1.
        builder = builder.partitionKey(partitionKeysSerializer.deserialize(source));
    }
    return builder
            .options(optionsSerializer.deserialize(source))
            .comment(stringSerializer.deserialize(source))
            .build();
}
@Override

@ -195,7 +195,9 @@ class SchemaManagerTest {
schemaManager.applySchemaChange(new CreateTableEvent(CUSTOMERS, CUSTOMERS_SCHEMA));
schemaManager.applySchemaChange(new CreateTableEvent(PRODUCTS, PRODUCTS_SCHEMA));
byte[] serialized = SchemaManager.SERIALIZER.serialize(schemaManager);
// Round-trip at the serializer's declared current version (not a hard-coded
// constant) so this test keeps passing when the version is bumped.
SchemaManager deserialized =
        SchemaManager.SERIALIZER.deserialize(
                SchemaManager.Serializer.CURRENT_VERSION, serialized);
assertThat(deserialized).isEqualTo(schemaManager);
}
}

Loading…
Cancel
Save