[3.0][cdc-common] Introduce TableSchemaState and its Serializer for DataSink to hold the table schemas.
parent
40184ae19f
commit
e88bea3514
@ -0,0 +1,88 @@
|
||||
/*
|
||||
* Copyright 2023 Ververica Inc.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.ververica.cdc.runtime.state;
|
||||
|
||||
import org.apache.flink.api.connector.sink2.StatefulSink;
|
||||
|
||||
import com.ververica.cdc.common.event.TableId;
|
||||
import com.ververica.cdc.common.schema.Schema;
|
||||
|
||||
import javax.annotation.Nonnull;
|
||||
import javax.annotation.Nullable;
|
||||
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* Use to persistent the relationship of TableId and Schema by {@link
|
||||
* StatefulSink.StatefulSinkWriter}.
|
||||
*/
|
||||
public class TableSchemaState {
|
||||
|
||||
@Nonnull private final TableId tableId;
|
||||
|
||||
@Nonnull private final Schema schema;
|
||||
|
||||
/**
|
||||
* The splits are frequently serialized into checkpoints. Caching the byte representation makes
|
||||
* repeated serialization cheap. This field is used by {@link TableSchemaStateSerializer}.
|
||||
*/
|
||||
@Nullable transient byte[] serializedFormCache;
|
||||
|
||||
public TableSchemaState(TableId tableId, Schema schema) {
|
||||
this.tableId = tableId;
|
||||
this.schema = schema;
|
||||
}
|
||||
|
||||
public TableId getTableId() {
|
||||
return tableId;
|
||||
}
|
||||
|
||||
public Schema getSchema() {
|
||||
return schema;
|
||||
}
|
||||
|
||||
@Nullable
|
||||
public byte[] getSerializedFormCache() {
|
||||
return serializedFormCache;
|
||||
}
|
||||
|
||||
public void setSerializedFormCache(@Nullable byte[] serializedFormCache) {
|
||||
this.serializedFormCache = serializedFormCache;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) {
|
||||
return true;
|
||||
}
|
||||
if (o == null || getClass() != o.getClass()) {
|
||||
return false;
|
||||
}
|
||||
TableSchemaState that = (TableSchemaState) o;
|
||||
return tableId.equals(that.tableId) && schema.equals(that.schema);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(tableId, schema);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "TableSchemaState{" + "tableId=" + tableId + ", schema=" + schema + '}';
|
||||
}
|
||||
}
|
@ -0,0 +1,72 @@
|
||||
/*
|
||||
* Copyright 2023 Ververica Inc.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.ververica.cdc.runtime.state;
|
||||
|
||||
import org.apache.flink.api.connector.sink2.StatefulSink;
|
||||
import org.apache.flink.core.io.SimpleVersionedSerializer;
|
||||
import org.apache.flink.core.memory.DataInputDeserializer;
|
||||
import org.apache.flink.core.memory.DataOutputSerializer;
|
||||
|
||||
import com.ververica.cdc.runtime.serializer.TableIdSerializer;
|
||||
import com.ververica.cdc.runtime.serializer.schema.SchemaSerializer;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/** A serializer for the {@link TableSchemaState}, use in {@link StatefulSink}. */
|
||||
public class TableSchemaStateSerializer implements SimpleVersionedSerializer<TableSchemaState> {
|
||||
|
||||
/** the default version for deserialize method. */
|
||||
public static final int DEFAULT_VERSION = 0;
|
||||
|
||||
private static final ThreadLocal<DataOutputSerializer> SERIALIZER_CACHE =
|
||||
ThreadLocal.withInitial(() -> new DataOutputSerializer(64));
|
||||
|
||||
@Override
|
||||
public int getVersion() {
|
||||
return DEFAULT_VERSION;
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] serialize(TableSchemaState state) throws IOException {
|
||||
// optimization: the splits lazily cache their own serialized form
|
||||
if (state.serializedFormCache != null) {
|
||||
return state.serializedFormCache;
|
||||
}
|
||||
|
||||
final DataOutputSerializer out = SERIALIZER_CACHE.get();
|
||||
TableIdSerializer.INSTANCE.serialize(state.getTableId(), out);
|
||||
SchemaSerializer.INSTANCE.serialize(state.getSchema(), out);
|
||||
final byte[] result = out.getCopyOfBuffer();
|
||||
out.clear();
|
||||
|
||||
// optimization: cache the serialized from, so we avoid the byte work during repeated
|
||||
// serialization
|
||||
state.serializedFormCache = result;
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public TableSchemaState deserialize(int version, byte[] serialized) throws IOException {
|
||||
final DataInputDeserializer in = new DataInputDeserializer(serialized);
|
||||
TableSchemaState event =
|
||||
new TableSchemaState(
|
||||
TableIdSerializer.INSTANCE.deserialize(in),
|
||||
SchemaSerializer.INSTANCE.deserialize(in));
|
||||
in.releaseArrays();
|
||||
return event;
|
||||
}
|
||||
}
|
@ -0,0 +1,46 @@
|
||||
/*
|
||||
* Copyright 2023 Ververica Inc.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.ververica.cdc.runtime.state;
|
||||
|
||||
import com.ververica.cdc.common.event.TableId;
|
||||
import com.ververica.cdc.common.schema.Schema;
|
||||
import com.ververica.cdc.common.types.DataTypes;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/** A test for the {@link TableSchemaStateSerializer}. */
|
||||
public class TableSchemaStateSerializerTest {
|
||||
|
||||
@Test
|
||||
public void testDeserialize() throws IOException {
|
||||
TableSchemaStateSerializer serializer = new TableSchemaStateSerializer();
|
||||
Schema schema =
|
||||
new Schema.Builder()
|
||||
.physicalColumn("col1", DataTypes.STRING())
|
||||
.physicalColumn("col2", DataTypes.STRING())
|
||||
.primaryKey("col1")
|
||||
.build();
|
||||
TableSchemaState tableSchemaState =
|
||||
new TableSchemaState(TableId.parse("default.default.table1"), schema);
|
||||
byte[] bytes = serializer.serialize(tableSchemaState);
|
||||
Assert.assertEquals(
|
||||
tableSchemaState,
|
||||
serializer.deserialize(TableSchemaStateSerializer.DEFAULT_VERSION, bytes));
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue