[FLINK-35893][cdc-runtime] Write CURRENT_VERSION of TableChangeInfo to state

This closes #2944.
pull/3516/head
Leonard Xu 6 months ago committed by Leonard Xu
parent 4561a8a32b
commit 03a2ae3ca7

@ -109,8 +109,14 @@ public class TableChangeInfo {
/** Serializer for {@link TableChangeInfo}. */
public static class Serializer implements SimpleVersionedSerializer<TableChangeInfo> {
/** The latest version before change of state compatibility. */
public static final int VERSION_BEFORE_STATE_COMPATIBILITY = 1;
public static final int CURRENT_VERSION = 2;
/** Used to distinguish with the state which CURRENT_VERSION was not written. */
public static final TableId MAGIC_TABLE_ID = TableId.tableId("__magic_table__");
@Override
public int getVersion() {
return CURRENT_VERSION;
@ -122,6 +128,8 @@ public class TableChangeInfo {
SchemaSerializer schemaSerializer = SchemaSerializer.INSTANCE;
try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream out = new DataOutputStream(baos)) {
tableIdSerializer.serialize(MAGIC_TABLE_ID, new DataOutputViewStreamWrapper(out));
out.writeInt(CURRENT_VERSION);
tableIdSerializer.serialize(
tableChangeInfo.getTableId(), new DataOutputViewStreamWrapper(out));
schemaSerializer.serialize(
@ -139,6 +147,12 @@ public class TableChangeInfo {
try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
DataInputStream in = new DataInputStream(bais)) {
TableId tableId = tableIdSerializer.deserialize(new DataInputViewStreamWrapper(in));
if (tableId.equals(MAGIC_TABLE_ID)) {
version = in.readInt();
tableId = tableIdSerializer.deserialize(new DataInputViewStreamWrapper(in));
} else {
version = VERSION_BEFORE_STATE_COMPATIBILITY;
}
Schema originalSchema =
schemaSerializer.deserialize(version, new DataInputViewStreamWrapper(in));
Schema transformedSchema =

Loading…
Cancel
Save