diff --git a/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/state/TableSchemaState.java b/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/state/TableSchemaState.java new file mode 100644 index 000000000..00be31e3e --- /dev/null +++ b/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/state/TableSchemaState.java @@ -0,0 +1,88 @@ +/* + * Copyright 2023 Ververica Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.runtime.state; + +import org.apache.flink.api.connector.sink2.StatefulSink; + +import com.ververica.cdc.common.event.TableId; +import com.ververica.cdc.common.schema.Schema; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.Objects; + +/** + * Use to persistent the relationship of TableId and Schema by {@link + * StatefulSink.StatefulSinkWriter}. + */ +public class TableSchemaState { + + @Nonnull private final TableId tableId; + + @Nonnull private final Schema schema; + + /** + * The splits are frequently serialized into checkpoints. Caching the byte representation makes + * repeated serialization cheap. This field is used by {@link TableSchemaStateSerializer}. + */ + @Nullable transient byte[] serializedFormCache; + + public TableSchemaState(TableId tableId, Schema schema) { + this.tableId = tableId; + this.schema = schema; + } + + public TableId getTableId() { + return tableId; + } + + public Schema getSchema() { + return schema; + } + + @Nullable + public byte[] getSerializedFormCache() { + return serializedFormCache; + } + + public void setSerializedFormCache(@Nullable byte[] serializedFormCache) { + this.serializedFormCache = serializedFormCache; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + TableSchemaState that = (TableSchemaState) o; + return tableId.equals(that.tableId) && schema.equals(that.schema); + } + + @Override + public int hashCode() { + return Objects.hash(tableId, schema); + } + + @Override + public String toString() { + return "TableSchemaState{" + "tableId=" + tableId + ", schema=" + schema + '}'; + } +} diff --git a/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/state/TableSchemaStateSerializer.java b/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/state/TableSchemaStateSerializer.java new file mode 100644 index 000000000..bc1f29dc4 --- /dev/null +++ b/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/state/TableSchemaStateSerializer.java @@ -0,0 +1,72 @@ +/* + * Copyright 2023 Ververica Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.runtime.state; + +import org.apache.flink.api.connector.sink2.StatefulSink; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.core.memory.DataInputDeserializer; +import org.apache.flink.core.memory.DataOutputSerializer; + +import com.ververica.cdc.runtime.serializer.TableIdSerializer; +import com.ververica.cdc.runtime.serializer.schema.SchemaSerializer; + +import java.io.IOException; + +/** A serializer for the {@link TableSchemaState}, use in {@link StatefulSink}. */ +public class TableSchemaStateSerializer implements SimpleVersionedSerializer { + + /** the default version for deserialize method. */ + public static final int DEFAULT_VERSION = 0; + + private static final ThreadLocal SERIALIZER_CACHE = + ThreadLocal.withInitial(() -> new DataOutputSerializer(64)); + + @Override + public int getVersion() { + return DEFAULT_VERSION; + } + + @Override + public byte[] serialize(TableSchemaState state) throws IOException { + // optimization: the splits lazily cache their own serialized form + if (state.serializedFormCache != null) { + return state.serializedFormCache; + } + + final DataOutputSerializer out = SERIALIZER_CACHE.get(); + TableIdSerializer.INSTANCE.serialize(state.getTableId(), out); + SchemaSerializer.INSTANCE.serialize(state.getSchema(), out); + final byte[] result = out.getCopyOfBuffer(); + out.clear(); + + // optimization: cache the serialized from, so we avoid the byte work during repeated + // serialization + state.serializedFormCache = result; + return result; + } + + @Override + public TableSchemaState deserialize(int version, byte[] serialized) throws IOException { + final DataInputDeserializer in = new DataInputDeserializer(serialized); + TableSchemaState event = + new TableSchemaState( + TableIdSerializer.INSTANCE.deserialize(in), + SchemaSerializer.INSTANCE.deserialize(in)); + in.releaseArrays(); + return event; + } +} diff --git a/flink-cdc-runtime/src/test/java/com/ververica/cdc/runtime/state/TableSchemaStateSerializerTest.java b/flink-cdc-runtime/src/test/java/com/ververica/cdc/runtime/state/TableSchemaStateSerializerTest.java new file mode 100644 index 000000000..c03b2c599 --- /dev/null +++ b/flink-cdc-runtime/src/test/java/com/ververica/cdc/runtime/state/TableSchemaStateSerializerTest.java @@ -0,0 +1,46 @@ +/* + * Copyright 2023 Ververica Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.runtime.state; + +import com.ververica.cdc.common.event.TableId; +import com.ververica.cdc.common.schema.Schema; +import com.ververica.cdc.common.types.DataTypes; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; + +/** A test for the {@link TableSchemaStateSerializer}. */ +public class TableSchemaStateSerializerTest { + + @Test + public void testDeserialize() throws IOException { + TableSchemaStateSerializer serializer = new TableSchemaStateSerializer(); + Schema schema = + new Schema.Builder() + .physicalColumn("col1", DataTypes.STRING()) + .physicalColumn("col2", DataTypes.STRING()) + .primaryKey("col1") + .build(); + TableSchemaState tableSchemaState = + new TableSchemaState(TableId.parse("default.default.table1"), schema); + byte[] bytes = serializer.serialize(tableSchemaState); + Assert.assertEquals( + tableSchemaState, + serializer.deserialize(TableSchemaStateSerializer.DEFAULT_VERSION, bytes)); + } +}