[cdc-common] Introduce createFieldGetters method in SchemaUtils to build FieldGetters of given Schema. (#2762)

This closes #2762.
pull/2777/head
Kunni 1 year ago committed by GitHub
parent 99ffbe03d2
commit a81d4a16b9

@@ -17,6 +17,7 @@
package com.ververica.cdc.common.utils;
import com.ververica.cdc.common.annotation.PublicEvolving;
import com.ververica.cdc.common.data.RecordData;
import com.ververica.cdc.common.event.AddColumnEvent;
import com.ververica.cdc.common.event.AlterColumnTypeEvent;
import com.ververica.cdc.common.event.DropColumnEvent;
@@ -34,6 +35,18 @@ import java.util.stream.Collectors;
@PublicEvolving
public class SchemaUtils {
/**
* Creates a list of {@link RecordData.FieldGetter} from the given {@link Schema} to read field
* objects from a {@link RecordData}.
*/
public static List<RecordData.FieldGetter> createFieldGetters(Schema schema) {
List<RecordData.FieldGetter> fieldGetters = new ArrayList<>(schema.getColumns().size());
for (int i = 0; i < schema.getColumns().size(); i++) {
fieldGetters.add(RecordData.createFieldGetter(schema.getColumns().get(i).getType(), i));
}
return fieldGetters;
}
/** Applies a SchemaChangeEvent to the old schema and returns the schema after the change. */
public static Schema applySchemaChangeEvent(Schema schema, SchemaChangeEvent event) {
if (event instanceof AddColumnEvent) {
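For reference, a minimal usage sketch (not part of the diff) showing how the new createFieldGetters pairs with RecordData.FieldGetter#getFieldOrNull. The schema, generator, and values mirror the ValuesDataSinkHelperTest added later in this commit and are purely illustrative:

// Illustrative only; imports as in ValuesDataSinkHelperTest below.
Schema schema =
    Schema.newBuilder()
        .physicalColumn("col1", DataTypes.STRING())
        .physicalColumn("col2", DataTypes.STRING())
        .primaryKey("col1")
        .build();
// One getter per column, in column order.
List<RecordData.FieldGetter> fieldGetters = SchemaUtils.createFieldGetters(schema);
RecordData record =
    new BinaryRecordDataGenerator(RowType.of(DataTypes.STRING(), DataTypes.STRING()))
        .generate(new Object[] {
            BinaryStringData.fromString("1"), BinaryStringData.fromString("2")});
// Each getter reads the Object at its column position, or null for a null field.
Object col1 = fieldGetters.get(0).getFieldOrNull(record); // StringData "1"
Object col2 = fieldGetters.get(1).getFieldOrNull(record); // StringData "2"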

@@ -50,7 +50,8 @@ public class ValuesDataFactory implements DataSourceFactory, DataSinkFactory {
@Override
public DataSink createDataSink(Context context) {
return new ValuesDataSink(
context.getConfiguration().get(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY));
context.getConfiguration().get(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY),
context.getConfiguration().get(ValuesDataSinkOptions.PRINT_ENABLED));
}
@Override

@@ -16,12 +16,12 @@
package com.ververica.cdc.connectors.values.sink;
import org.apache.flink.api.common.functions.util.PrintSinkOutputWriter;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.StatefulSink;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.api.connector.sink2.SinkWriter;
import com.ververica.cdc.common.annotation.Internal;
import com.ververica.cdc.common.data.RecordData;
import com.ververica.cdc.common.event.ChangeEvent;
import com.ververica.cdc.common.event.CreateTableEvent;
import com.ververica.cdc.common.event.DataChangeEvent;
import com.ververica.cdc.common.event.Event;
@@ -34,12 +34,8 @@ import com.ververica.cdc.common.sink.FlinkSinkProvider;
import com.ververica.cdc.common.sink.MetadataApplier;
import com.ververica.cdc.common.utils.SchemaUtils;
import com.ververica.cdc.connectors.values.ValuesDatabase;
import com.ververica.cdc.runtime.state.TableSchemaState;
import com.ververica.cdc.runtime.state.TableSchemaStateSerializer;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -51,13 +47,16 @@ public class ValuesDataSink implements DataSink, Serializable {
/** {@link ValuesDataSinkOptions#MATERIALIZED_IN_MEMORY}. */
private final boolean materializedInMemory;
public ValuesDataSink(boolean materializedInMemory) {
private final boolean print;
public ValuesDataSink(boolean materializedInMemory, boolean print) {
this.materializedInMemory = materializedInMemory;
this.print = print;
}
@Override
public EventSinkProvider getEventSinkProvider() {
return FlinkSinkProvider.of(new ValuesSink(materializedInMemory));
return FlinkSinkProvider.of(new ValuesSink(materializedInMemory, print));
}
@Override
@@ -66,91 +65,82 @@ public class ValuesDataSink implements DataSink, Serializable {
}
/** An e2e {@link Sink} implementation that prints all {@link DataChangeEvent}s. */
private static class ValuesSink implements StatefulSink<Event, TableSchemaState> {
private static class ValuesSink implements Sink<Event> {
private final boolean materializedInMemory;
public ValuesSink(boolean materializedInMemory) {
this.materializedInMemory = materializedInMemory;
}
private final boolean print;
@Override
public StatefulSinkWriter<Event, TableSchemaState> createWriter(InitContext context) {
final ValuesSinkWriter writer = new ValuesSinkWriter(materializedInMemory);
writer.open(context.getSubtaskId(), context.getNumberOfParallelSubtasks());
return writer;
}
@Override
public StatefulSinkWriter<Event, TableSchemaState> restoreWriter(
InitContext context, Collection<TableSchemaState> recoveredState) {
final ValuesSinkWriter writer = new ValuesSinkWriter(materializedInMemory);
writer.initializeState(recoveredState);
return writer;
public ValuesSink(boolean materializedInMemory, boolean print) {
this.materializedInMemory = materializedInMemory;
this.print = print;
}
@Override
public SimpleVersionedSerializer<TableSchemaState> getWriterStateSerializer() {
return new TableSchemaStateSerializer();
public SinkWriter<Event> createWriter(InitContext context) {
return new ValuesSinkWriter(materializedInMemory, print);
}
}
/**
* Prints {@link DataChangeEvent}s to the console and updates table records in {@link ValuesDatabase}.
*/
private static class ValuesSinkWriter extends PrintSinkOutputWriter<Event>
implements StatefulSink.StatefulSinkWriter<Event, TableSchemaState> {
private static class ValuesSinkWriter implements SinkWriter<Event> {
private final boolean materializedInMemory;
private final boolean print;
/**
* Keeps the mapping from TableId to Schema, as the write method may rely on the schema
* information of a DataChangeEvent.
*/
private final Map<TableId, Schema> schemaMaps;
public ValuesSinkWriter(boolean materializedInMemory) {
private final Map<TableId, List<RecordData.FieldGetter>> fieldGetterMaps;
public ValuesSinkWriter(boolean materializedInMemory, boolean print) {
super();
this.materializedInMemory = materializedInMemory;
this.print = print;
schemaMaps = new HashMap<>();
}
private void initializeState(Collection<TableSchemaState> bucketStates) {
bucketStates.forEach(
TableSchemaState ->
schemaMaps.put(
TableSchemaState.getTableId(), TableSchemaState.getSchema()));
fieldGetterMaps = new HashMap<>();
}
@Override
public void write(Event event) {
super.write(event);
public void write(Event event, Context context) {
if (event instanceof SchemaChangeEvent) {
SchemaChangeEvent schemaChangeEvent = (SchemaChangeEvent) event;
TableId tableId = schemaChangeEvent.tableId();
if (event instanceof CreateTableEvent) {
schemaMaps.put(tableId, ((CreateTableEvent) event).getSchema());
Schema schema = ((CreateTableEvent) event).getSchema();
schemaMaps.put(tableId, schema);
fieldGetterMaps.put(tableId, SchemaUtils.createFieldGetters(schema));
} else {
if (!schemaMaps.containsKey(tableId)) {
throw new RuntimeException("schema of " + tableId + " does not exist.");
}
schemaMaps.put(
tableId,
Schema schema =
SchemaUtils.applySchemaChangeEvent(
schemaMaps.get(tableId), schemaChangeEvent));
schemaMaps.get(tableId), schemaChangeEvent);
schemaMaps.put(tableId, schema);
fieldGetterMaps.put(tableId, SchemaUtils.createFieldGetters(schema));
}
} else if (materializedInMemory && event instanceof DataChangeEvent) {
ValuesDatabase.applyDataChangeEvent((DataChangeEvent) event);
}
if (print) {
// Print the detailed message to the console for verification.
System.out.println(
ValuesDataSinkHelper.convertEventToStr(
event, fieldGetterMaps.get(((ChangeEvent) event).tableId())));
}
}
@Override
public List<TableSchemaState> snapshotState(long checkpointId) {
List<TableSchemaState> states = new ArrayList<>();
for (Map.Entry<TableId, Schema> entry : schemaMaps.entrySet()) {
states.add(new TableSchemaState(entry.getKey(), entry.getValue()));
}
return states;
}
public void flush(boolean endOfInput) {}
@Override
public void close() {}
}
}

@@ -0,0 +1,65 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.values.sink;
import com.ververica.cdc.common.data.RecordData;
import com.ververica.cdc.common.event.DataChangeEvent;
import com.ververica.cdc.common.event.Event;
import com.ververica.cdc.common.event.SchemaChangeEvent;
import java.util.ArrayList;
import java.util.List;
/** A helper class for {@link ValuesDataSink} to process {@link Event}. */
public class ValuesDataSinkHelper {
/** Converts an {@link Event} to a String for {@link ValuesDataSink} to display a detailed message. */
public static String convertEventToStr(Event event, List<RecordData.FieldGetter> fieldGetters) {
if (event instanceof SchemaChangeEvent) {
return event.toString();
} else if (event instanceof DataChangeEvent) {
DataChangeEvent dataChangeEvent = (DataChangeEvent) event;
String eventStr =
"DataChangeEvent{"
+ "tableId="
+ dataChangeEvent.tableId()
+ ", before="
+ getFields(fieldGetters, dataChangeEvent.before())
+ ", after="
+ getFields(fieldGetters, dataChangeEvent.after())
+ ", op="
+ dataChangeEvent.op()
+ ", meta="
+ dataChangeEvent.describeMeta()
+ '}';
return eventStr;
}
return "Event{}";
}
private static List<Object> getFields(
List<RecordData.FieldGetter> fieldGetters, RecordData recordData) {
List<Object> fields = new ArrayList<>(fieldGetters.size());
if (recordData == null) {
return fields;
}
for (RecordData.FieldGetter fieldGetter : fieldGetters) {
fields.add(fieldGetter.getFieldOrNull(recordData));
}
return fields;
}
}

@@ -28,4 +28,10 @@ public class ValuesDataSinkOptions {
.defaultValue(false)
.withDescription(
"True if the DataChangeEvent need to be materialized in memory.");
public static final ConfigOption<Boolean> PRINT_ENABLED =
ConfigOptions.key("print.enabled")
.booleanType()
.defaultValue(true)
.withDescription("True if the Event should be print to console.");
}
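A hedged sketch (not part of the diff) of setting and reading the new print.enabled option. It assumes the CDC Configuration class exposes Flink-style fromMap/get accessors, which this change does not show:

// Hypothetical usage; Configuration.fromMap is assumed here, mirroring Flink's API.
Map<String, String> props = new HashMap<>();
props.put("print.enabled", "false"); // the key declared by ValuesDataSinkOptions.PRINT_ENABLED
Configuration config = Configuration.fromMap(props);
boolean printEnabled = config.get(ValuesDataSinkOptions.PRINT_ENABLED); // false (default is true)
// ValuesDataFactory#createDataSink reads this flag and passes it to ValuesDataSink, as shown above.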

@@ -0,0 +1,94 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.values.sink;
import com.ververica.cdc.common.data.RecordData;
import com.ververica.cdc.common.data.binary.BinaryStringData;
import com.ververica.cdc.common.event.CreateTableEvent;
import com.ververica.cdc.common.event.DataChangeEvent;
import com.ververica.cdc.common.event.TableId;
import com.ververica.cdc.common.schema.Schema;
import com.ververica.cdc.common.types.DataTypes;
import com.ververica.cdc.common.types.RowType;
import com.ververica.cdc.common.utils.SchemaUtils;
import com.ververica.cdc.runtime.typeutils.BinaryRecordDataGenerator;
import org.junit.Assert;
import org.junit.Test;
import java.util.List;
/** A test for the {@link ValuesDataSinkHelper}. */
public class ValuesDataSinkHelperTest {
@Test
public void testConvertEventToStr() {
Schema schema =
Schema.newBuilder()
.physicalColumn("col1", DataTypes.STRING())
.physicalColumn("col2", DataTypes.STRING())
.primaryKey("col1")
.build();
TableId tableId = TableId.parse("default.default.table1");
BinaryRecordDataGenerator generator =
new BinaryRecordDataGenerator(RowType.of(DataTypes.STRING(), DataTypes.STRING()));
List<RecordData.FieldGetter> fieldGetters = SchemaUtils.createFieldGetters(schema);
Assert.assertEquals(
"CreateTableEvent{tableId=default.default.table1, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, options=()}",
ValuesDataSinkHelper.convertEventToStr(
new CreateTableEvent(tableId, schema), fieldGetters));
DataChangeEvent insertEvent =
DataChangeEvent.insertEvent(
tableId,
generator.generate(
new Object[] {
BinaryStringData.fromString("1"),
BinaryStringData.fromString("1")
}));
Assert.assertEquals(
"DataChangeEvent{tableId=default.default.table1, before=[], after=[1, 1], op=INSERT, meta=()}",
ValuesDataSinkHelper.convertEventToStr(insertEvent, fieldGetters));
DataChangeEvent deleteEvent =
DataChangeEvent.deleteEvent(
tableId,
generator.generate(
new Object[] {
BinaryStringData.fromString("1"),
BinaryStringData.fromString("1")
}));
Assert.assertEquals(
"DataChangeEvent{tableId=default.default.table1, before=[1, 1], after=[], op=DELETE, meta=()}",
ValuesDataSinkHelper.convertEventToStr(deleteEvent, fieldGetters));
DataChangeEvent updateEvent =
DataChangeEvent.updateEvent(
tableId,
generator.generate(
new Object[] {
BinaryStringData.fromString("1"),
BinaryStringData.fromString("1")
}),
generator.generate(
new Object[] {
BinaryStringData.fromString("1"),
BinaryStringData.fromString("x")
}));
Assert.assertEquals(
"DataChangeEvent{tableId=default.default.table1, before=[1, 1], after=[1, x], op=UPDATE, meta=()}",
ValuesDataSinkHelper.convertEventToStr(updateEvent, fieldGetters));
}
}

@@ -1,88 +0,0 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.runtime.state;
import org.apache.flink.api.connector.sink2.StatefulSink;
import com.ververica.cdc.common.event.TableId;
import com.ververica.cdc.common.schema.Schema;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.Objects;
/**
* Used to persist the relationship between TableId and Schema by a {@link
* StatefulSink.StatefulSinkWriter}.
*/
public class TableSchemaState {
@Nonnull private final TableId tableId;
@Nonnull private final Schema schema;
/**
* The splits are frequently serialized into checkpoints. Caching the byte representation makes
* repeated serialization cheap. This field is used by {@link TableSchemaStateSerializer}.
*/
@Nullable transient byte[] serializedFormCache;
public TableSchemaState(TableId tableId, Schema schema) {
this.tableId = tableId;
this.schema = schema;
}
public TableId getTableId() {
return tableId;
}
public Schema getSchema() {
return schema;
}
@Nullable
public byte[] getSerializedFormCache() {
return serializedFormCache;
}
public void setSerializedFormCache(@Nullable byte[] serializedFormCache) {
this.serializedFormCache = serializedFormCache;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
TableSchemaState that = (TableSchemaState) o;
return tableId.equals(that.tableId) && schema.equals(that.schema);
}
@Override
public int hashCode() {
return Objects.hash(tableId, schema);
}
@Override
public String toString() {
return "TableSchemaState{" + "tableId=" + tableId + ", schema=" + schema + '}';
}
}

@@ -1,72 +0,0 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.runtime.state;
import org.apache.flink.api.connector.sink2.StatefulSink;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;
import com.ververica.cdc.runtime.serializer.TableIdSerializer;
import com.ververica.cdc.runtime.serializer.schema.SchemaSerializer;
import java.io.IOException;
/** A serializer for the {@link TableSchemaState}, used in {@link StatefulSink}. */
public class TableSchemaStateSerializer implements SimpleVersionedSerializer<TableSchemaState> {
/** the default version for deserialize method. */
public static final int DEFAULT_VERSION = 0;
private static final ThreadLocal<DataOutputSerializer> SERIALIZER_CACHE =
ThreadLocal.withInitial(() -> new DataOutputSerializer(64));
@Override
public int getVersion() {
return DEFAULT_VERSION;
}
@Override
public byte[] serialize(TableSchemaState state) throws IOException {
// optimization: the splits lazily cache their own serialized form
if (state.serializedFormCache != null) {
return state.serializedFormCache;
}
final DataOutputSerializer out = SERIALIZER_CACHE.get();
TableIdSerializer.INSTANCE.serialize(state.getTableId(), out);
SchemaSerializer.INSTANCE.serialize(state.getSchema(), out);
final byte[] result = out.getCopyOfBuffer();
out.clear();
// optimization: cache the serialized form, so we avoid the byte work during repeated
// serialization
state.serializedFormCache = result;
return result;
}
@Override
public TableSchemaState deserialize(int version, byte[] serialized) throws IOException {
final DataInputDeserializer in = new DataInputDeserializer(serialized);
TableSchemaState event =
new TableSchemaState(
TableIdSerializer.INSTANCE.deserialize(in),
SchemaSerializer.INSTANCE.deserialize(in));
in.releaseArrays();
return event;
}
}

@@ -1,46 +0,0 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.runtime.state;
import com.ververica.cdc.common.event.TableId;
import com.ververica.cdc.common.schema.Schema;
import com.ververica.cdc.common.types.DataTypes;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
/** A test for the {@link TableSchemaStateSerializer}. */
public class TableSchemaStateSerializerTest {
@Test
public void testDeserialize() throws IOException {
TableSchemaStateSerializer serializer = new TableSchemaStateSerializer();
Schema schema =
new Schema.Builder()
.physicalColumn("col1", DataTypes.STRING())
.physicalColumn("col2", DataTypes.STRING())
.primaryKey("col1")
.build();
TableSchemaState tableSchemaState =
new TableSchemaState(TableId.parse("default.default.table1"), schema);
byte[] bytes = serializer.serialize(tableSchemaState);
Assert.assertEquals(
tableSchemaState,
serializer.deserialize(TableSchemaStateSerializer.DEFAULT_VERSION, bytes));
}
}