[3.0][cdc-common] Introduce BinaryRecordDataGenerator to help to create BinaryRecordData

This closes #2734.
pull/2737/head
Hang Ruan 1 year ago committed by Leonard Xu
parent cad7424a1d
commit 70614be28c

@ -138,7 +138,7 @@ public final class TimestampData implements Comparable<TimestampData> {
* number is the number of milliseconds before {@code 1970-01-01 00:00:00}
* @param nanosOfMillisecond the nanoseconds within the millisecond, from 0 to 999,999
*/
public static TimestampData fromEpochMillis(long milliseconds, int nanosOfMillisecond) {
public static TimestampData fromMillis(long milliseconds, int nanosOfMillisecond) {
return new TimestampData(milliseconds, nanosOfMillisecond);
}
@ -150,7 +150,7 @@ public final class TimestampData implements Comparable<TimestampData> {
* @param milliseconds the number of milliseconds since {@code 1970-01-01 00:00:00}; a negative
* number is the number of milliseconds before {@code 1970-01-01 00:00:00}
*/
public static TimestampData fromEpochMillis(long milliseconds) {
public static TimestampData fromMillis(long milliseconds) {
return new TimestampData(milliseconds, 0);
}

@ -43,7 +43,7 @@ import static org.apache.flink.util.Preconditions.checkArgument;
* An implementation of {@link RecordData} which is backed by {@link MemorySegment} instead of
* Object. It can significantly reduce the serialization/deserialization of Java objects.
*
* <p>A Row has two part: Fixed-length part and variable-length part.
* <p>A BinaryRecordData has two part: Fixed-length part and variable-length part.
*
* <p>Fixed-length part contains 1 byte header and null bit set and field values. Null bit set is
* used for null tracking and is aligned to 8-byte word boundaries. `Field values` holds
@ -67,7 +67,7 @@ public final class BinaryRecordData extends BinarySection implements RecordData,
private static final long FIRST_BYTE_ZERO = LITTLE_ENDIAN ? ~0xFFL : ~(0xFFL << 56L);
public static final int HEADER_SIZE_IN_BITS = 8;
public static final String TIMESTAMP_DELIMITER = "|";
public static final String TIMESTAMP_DELIMITER = "//";
public static int calculateBitSetWidthInBytes(int arity) {
return ((arity + 63 + HEADER_SIZE_IN_BITS) / 64) * 8;
@ -78,7 +78,7 @@ public final class BinaryRecordData extends BinarySection implements RecordData,
}
/**
* If it is a fixed-length field, we can call this BinaryRowData's setXX method for in-place
* If it is a fixed-length field, we can call this BinaryRecordData's setXX method for in-place
* updates. If it is variable-length field, can't use this method, because the underlying data
* is stored continuously.
*/
@ -215,7 +215,7 @@ public final class BinaryRecordData extends BinarySection implements RecordData,
assertIndexIsValid(pos);
if (TimestampData.isCompact(precision)) {
return TimestampData.fromEpochMillis(segments[0].getLong(getFieldOffset(pos)));
return TimestampData.fromMillis(segments[0].getLong(getFieldOffset(pos)));
}
int fieldOffset = getFieldOffset(pos);
@ -319,7 +319,7 @@ public final class BinaryRecordData extends BinarySection implements RecordData,
if (this == o) {
return true;
}
// both BinaryRowData and NestedRowData have the same memory format
// both BinaryRecordData and NestedRowData have the same memory format
if (!(o instanceof BinaryRecordData)) {
return false;
}

@ -1015,7 +1015,7 @@ public final class BinarySegmentUtils {
final int nanoOfMillisecond = (int) offsetAndNanos;
final int subOffset = (int) (offsetAndNanos >> 32);
final long millisecond = getLong(segments, baseOffset + subOffset);
return TimestampData.fromEpochMillis(millisecond, nanoOfMillisecond);
return TimestampData.fromMillis(millisecond, nanoOfMillisecond);
}
/**

@ -29,7 +29,7 @@ import com.ververica.cdc.common.schema.Column;
import com.ververica.cdc.common.schema.Schema;
import com.ververica.cdc.common.types.DataTypes;
import com.ververica.cdc.common.types.RowType;
import com.ververica.cdc.runtime.typeutils.RecordDataUtil;
import com.ververica.cdc.runtime.typeutils.BinaryRecordDataGenerator;
import java.util.ArrayList;
import java.util.Collections;
@ -110,13 +110,13 @@ public class ValuesDataSourceHelper {
CreateTableEvent createTableEvent = new CreateTableEvent(table1, schema);
split1.add(createTableEvent);
RowType rowType = RowType.of(DataTypes.STRING(), DataTypes.STRING());
BinaryRecordDataGenerator generator =
new BinaryRecordDataGenerator(RowType.of(DataTypes.STRING(), DataTypes.STRING()));
// insert
DataChangeEvent insertEvent1 =
DataChangeEvent.insertEvent(
table1,
RecordDataUtil.of(
rowType,
generator.generate(
new Object[] {
BinaryStringData.fromString("1"),
BinaryStringData.fromString("1")
@ -125,8 +125,7 @@ public class ValuesDataSourceHelper {
DataChangeEvent insertEvent2 =
DataChangeEvent.insertEvent(
table1,
RecordDataUtil.of(
rowType,
generator.generate(
new Object[] {
BinaryStringData.fromString("2"),
BinaryStringData.fromString("2")
@ -135,8 +134,7 @@ public class ValuesDataSourceHelper {
DataChangeEvent insertEvent3 =
DataChangeEvent.insertEvent(
table1,
RecordDataUtil.of(
rowType,
generator.generate(
new Object[] {
BinaryStringData.fromString("3"),
BinaryStringData.fromString("3")
@ -170,8 +168,7 @@ public class ValuesDataSourceHelper {
split1.add(
DataChangeEvent.deleteEvent(
table1,
RecordDataUtil.of(
rowType,
generator.generate(
new Object[] {
BinaryStringData.fromString("1"),
BinaryStringData.fromString("1")
@ -181,14 +178,12 @@ public class ValuesDataSourceHelper {
split1.add(
DataChangeEvent.updateEvent(
table1,
RecordDataUtil.of(
rowType,
generator.generate(
new Object[] {
BinaryStringData.fromString("2"),
BinaryStringData.fromString("")
}),
RecordDataUtil.of(
rowType,
generator.generate(
new Object[] {
BinaryStringData.fromString("2"),
BinaryStringData.fromString("x")
@ -214,13 +209,13 @@ public class ValuesDataSourceHelper {
CreateTableEvent createTableEvent2 = new CreateTableEvent(table2, schema);
split1.add(createTableEvent2);
RowType rowType = RowType.of(DataTypes.STRING(), DataTypes.STRING());
BinaryRecordDataGenerator generator =
new BinaryRecordDataGenerator(RowType.of(DataTypes.STRING(), DataTypes.STRING()));
// insert into table1
DataChangeEvent insertEvent1 =
DataChangeEvent.insertEvent(
table1,
RecordDataUtil.of(
rowType,
generator.generate(
new Object[] {
BinaryStringData.fromString("1"),
BinaryStringData.fromString("1")
@ -229,8 +224,7 @@ public class ValuesDataSourceHelper {
DataChangeEvent insertEvent2 =
DataChangeEvent.insertEvent(
table1,
RecordDataUtil.of(
rowType,
generator.generate(
new Object[] {
BinaryStringData.fromString("2"),
BinaryStringData.fromString("2")
@ -239,8 +233,7 @@ public class ValuesDataSourceHelper {
DataChangeEvent insertEvent3 =
DataChangeEvent.insertEvent(
table1,
RecordDataUtil.of(
rowType,
generator.generate(
new Object[] {
BinaryStringData.fromString("3"),
BinaryStringData.fromString("3")
@ -259,8 +252,7 @@ public class ValuesDataSourceHelper {
insertEvent1 =
DataChangeEvent.insertEvent(
table2,
RecordDataUtil.of(
rowType,
generator.generate(
new Object[] {
BinaryStringData.fromString("1"),
BinaryStringData.fromString("1")
@ -269,8 +261,7 @@ public class ValuesDataSourceHelper {
insertEvent2 =
DataChangeEvent.insertEvent(
table2,
RecordDataUtil.of(
rowType,
generator.generate(
new Object[] {
BinaryStringData.fromString("2"),
BinaryStringData.fromString("2")
@ -279,8 +270,7 @@ public class ValuesDataSourceHelper {
insertEvent3 =
DataChangeEvent.insertEvent(
table2,
RecordDataUtil.of(
rowType,
generator.generate(
new Object[] {
BinaryStringData.fromString("3"),
BinaryStringData.fromString("3")
@ -306,8 +296,7 @@ public class ValuesDataSourceHelper {
split1.add(
DataChangeEvent.deleteEvent(
table1,
RecordDataUtil.of(
rowType,
generator.generate(
new Object[] {
BinaryStringData.fromString("1"),
BinaryStringData.fromString("1")
@ -317,14 +306,12 @@ public class ValuesDataSourceHelper {
split1.add(
DataChangeEvent.updateEvent(
table1,
RecordDataUtil.of(
rowType,
generator.generate(
new Object[] {
BinaryStringData.fromString("2"),
BinaryStringData.fromString("2")
}),
RecordDataUtil.of(
rowType,
generator.generate(
new Object[] {
BinaryStringData.fromString("2"),
BinaryStringData.fromString("x")

@ -33,7 +33,7 @@ import com.ververica.cdc.common.types.DataType;
import com.ververica.cdc.common.types.DataTypes;
import com.ververica.cdc.common.types.RowType;
import com.ververica.cdc.common.types.VarCharType;
import com.ververica.cdc.runtime.typeutils.RecordDataUtil;
import com.ververica.cdc.runtime.typeutils.BinaryRecordDataGenerator;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@ -106,12 +106,12 @@ public class ValuesDatabaseTest {
createTableEvent = new CreateTableEvent(TableId.parse("table3"), schema);
metadataApplier.applySchemaChange(createTableEvent);
RowType rowType = RowType.of(DataTypes.STRING(), DataTypes.STRING());
BinaryRecordDataGenerator generator =
new BinaryRecordDataGenerator(RowType.of(DataTypes.STRING(), DataTypes.STRING()));
DataChangeEvent insertEvent1 =
DataChangeEvent.insertEvent(
table1,
RecordDataUtil.of(
rowType,
generator.generate(
new Object[] {
BinaryStringData.fromString("1"),
BinaryStringData.fromString("1")
@ -120,8 +120,7 @@ public class ValuesDatabaseTest {
DataChangeEvent insertEvent2 =
DataChangeEvent.insertEvent(
table1,
RecordDataUtil.of(
rowType,
generator.generate(
new Object[] {
BinaryStringData.fromString("2"),
BinaryStringData.fromString("2")
@ -130,8 +129,7 @@ public class ValuesDatabaseTest {
DataChangeEvent insertEvent3 =
DataChangeEvent.insertEvent(
table1,
RecordDataUtil.of(
rowType,
generator.generate(
new Object[] {
BinaryStringData.fromString("3"),
BinaryStringData.fromString("3")
@ -224,12 +222,12 @@ public class ValuesDatabaseTest {
results.add("default.default.table1:col1=3;col2=3");
Assert.assertEquals(results, ValuesDatabase.getResults(table1));
RowType rowType = RowType.of(DataTypes.STRING(), DataTypes.STRING());
BinaryRecordDataGenerator generator =
new BinaryRecordDataGenerator(RowType.of(DataTypes.STRING(), DataTypes.STRING()));
DataChangeEvent deleteEvent =
DataChangeEvent.deleteEvent(
table1,
RecordDataUtil.of(
rowType,
generator.generate(
new Object[] {
BinaryStringData.fromString("1"),
BinaryStringData.fromString("1")
@ -243,14 +241,12 @@ public class ValuesDatabaseTest {
DataChangeEvent updateEvent =
DataChangeEvent.updateEvent(
table1,
RecordDataUtil.of(
rowType,
generator.generate(
new Object[] {
BinaryStringData.fromString("2"),
BinaryStringData.fromString("2")
}),
RecordDataUtil.of(
rowType,
generator.generate(
new Object[] {
BinaryStringData.fromString("2"),
BinaryStringData.fromString("x")

@ -33,8 +33,8 @@ import com.ververica.cdc.common.types.DataTypes;
import com.ververica.cdc.common.types.RowType;
import com.ververica.cdc.connectors.values.ValuesDatabase;
import com.ververica.cdc.connectors.values.factory.ValuesDataFactory;
import com.ververica.cdc.runtime.typeutils.BinaryRecordDataGenerator;
import com.ververica.cdc.runtime.typeutils.EventTypeInfo;
import com.ververica.cdc.runtime.typeutils.RecordDataUtil;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@ -149,12 +149,12 @@ public class ValuesDataSourceHelperTest {
CreateTableEvent createTableEvent = new CreateTableEvent(table1, schema);
split1.add(createTableEvent);
RowType rowType = RowType.of(DataTypes.STRING(), DataTypes.STRING());
BinaryRecordDataGenerator generator =
new BinaryRecordDataGenerator(RowType.of(DataTypes.STRING(), DataTypes.STRING()));
DataChangeEvent insertEvent1 =
DataChangeEvent.insertEvent(
table1,
RecordDataUtil.of(
rowType,
generator.generate(
new Object[] {
BinaryStringData.fromString("1"),
BinaryStringData.fromString("1")
@ -163,8 +163,7 @@ public class ValuesDataSourceHelperTest {
DataChangeEvent insertEvent2 =
DataChangeEvent.insertEvent(
table1,
RecordDataUtil.of(
rowType,
generator.generate(
new Object[] {
BinaryStringData.fromString("2"),
BinaryStringData.fromString("2")

@ -54,12 +54,12 @@ public class TimestampDataSerializer extends TypeSerializer<TimestampData> {
@Override
public TimestampData createInstance() {
return TimestampData.fromEpochMillis(0, 0);
return TimestampData.fromMillis(0, 0);
}
@Override
public TimestampData copy(TimestampData from) {
return TimestampData.fromEpochMillis(from.getMillisecond(), from.getNanoOfMillisecond());
return TimestampData.fromMillis(from.getMillisecond(), from.getNanoOfMillisecond());
}
@Override
@ -87,11 +87,11 @@ public class TimestampDataSerializer extends TypeSerializer<TimestampData> {
public TimestampData deserialize(DataInputView source) throws IOException {
if (TimestampData.isCompact(precision)) {
long val = source.readLong();
return TimestampData.fromEpochMillis(val, 0);
return TimestampData.fromMillis(val, 0);
} else {
long longVal = source.readLong();
int intVal = source.readInt();
return TimestampData.fromEpochMillis(longVal, intVal);
return TimestampData.fromMillis(longVal, intVal);
}
}

@ -35,7 +35,6 @@ import com.ververica.cdc.common.data.binary.BinaryRecordData;
import com.ververica.cdc.common.data.binary.BinarySegmentUtils;
import com.ververica.cdc.common.data.binary.BinaryStringData;
import com.ververica.cdc.runtime.serializer.data.ArrayDataSerializer;
import com.ververica.cdc.runtime.serializer.data.RecordDataSerializer;
import java.io.IOException;
import java.io.OutputStream;
@ -120,7 +119,7 @@ abstract class AbstractBinaryWriter implements BinaryWriter {
}
@Override
public void writeRecord(int pos, RecordData input, RecordDataSerializer serializer) {
public void writeRecord(int pos, RecordData input, TypeSerializer<RecordData> serializer) {
// BinaryRecordData is the only implementation of RecordData
BinaryRecordData recordData = (BinaryRecordData) input;
writeSegmentsToVarLenPart(

@ -33,7 +33,6 @@ import com.ververica.cdc.common.types.LocalZonedTimestampType;
import com.ververica.cdc.common.types.TimestampType;
import com.ververica.cdc.common.types.ZonedTimestampType;
import com.ververica.cdc.runtime.serializer.data.ArrayDataSerializer;
import com.ververica.cdc.runtime.serializer.data.RecordDataSerializer;
/**
* Writer to write a composite data format, like row, array. 1. Invoke {@link #reset()}. 2. Write
@ -79,7 +78,7 @@ public interface BinaryWriter {
void writeMap(int pos, MapData value, TypeSerializer<MapData> serializer);
void writeRecord(int pos, RecordData value, RecordDataSerializer serializer);
void writeRecord(int pos, RecordData value, TypeSerializer<RecordData> serializer);
/** Finally, complete write to set real size to binary. */
void complete();
@ -138,7 +137,7 @@ public interface BinaryWriter {
writer.writeMap(pos, (MapData) o, (TypeSerializer<MapData>) serializer);
break;
case ROW:
writer.writeRecord(pos, (RecordData) o, (RecordDataSerializer) serializer);
writer.writeRecord(pos, (RecordData) o, (TypeSerializer<RecordData>) serializer);
break;
case BINARY:
case VARBINARY:

@ -0,0 +1,95 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.runtime.typeutils;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import com.ververica.cdc.common.annotation.PublicEvolving;
import com.ververica.cdc.common.data.RecordData;
import com.ververica.cdc.common.data.binary.BinaryRecordData;
import com.ververica.cdc.common.types.DataType;
import com.ververica.cdc.common.types.RowType;
import com.ververica.cdc.runtime.serializer.InternalSerializers;
import com.ververica.cdc.runtime.serializer.NullableSerializerWrapper;
import com.ververica.cdc.runtime.serializer.data.writer.BinaryRecordDataWriter;
import com.ververica.cdc.runtime.serializer.data.writer.BinaryWriter;
import java.util.Arrays;
import static com.ververica.cdc.common.utils.Preconditions.checkArgument;
/** This class is used to create {@link BinaryRecordData}. */
@PublicEvolving
public class BinaryRecordDataGenerator {
private final DataType[] dataTypes;
private final TypeSerializer[] serializers;
private transient BinaryRecordData reuseRecordData;
private transient BinaryRecordDataWriter reuseWriter;
public BinaryRecordDataGenerator(RowType recordType) {
this(recordType.getChildren().toArray(new DataType[0]));
}
public BinaryRecordDataGenerator(DataType[] dataTypes) {
this(
dataTypes,
Arrays.stream(dataTypes)
.map(InternalSerializers::create)
.map(NullableSerializerWrapper::new)
.toArray(TypeSerializer[]::new));
}
public BinaryRecordDataGenerator(DataType[] dataTypes, TypeSerializer[] serializers) {
checkArgument(
dataTypes.length == serializers.length,
String.format(
"The types and serializers must have the same length. But types is %d and serializers is %d",
dataTypes.length, serializers.length));
this.dataTypes = dataTypes;
this.serializers = serializers;
this.reuseRecordData = new BinaryRecordData(dataTypes.length);
this.reuseWriter = new BinaryRecordDataWriter(reuseRecordData);
}
/**
* Creates an instance of {@link BinaryRecordData} with given field values.
*
* <p>Note: All fields of the record must be internal data structures. See {@link RecordData}.
*/
public BinaryRecordData generate(Object[] rowFields) {
checkArgument(
dataTypes.length == rowFields.length,
String.format(
"The types and values must have the same length. But types is %d and values is %d",
dataTypes.length, rowFields.length));
reuseWriter.reset();
for (int i = 0; i < dataTypes.length; i++) {
if (rowFields[i] == null) {
reuseWriter.setNullAt(i);
} else {
BinaryWriter.write(reuseWriter, i, rowFields[i], dataTypes[i], serializers[i]);
}
}
reuseWriter.complete();
return reuseRecordData.copy();
}
}

@ -1,77 +0,0 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.runtime.typeutils;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import com.ververica.cdc.common.data.binary.BinaryRecordData;
import com.ververica.cdc.common.types.DataType;
import com.ververica.cdc.common.types.RowType;
import com.ververica.cdc.runtime.serializer.InternalSerializers;
import com.ververica.cdc.runtime.serializer.NullableSerializerWrapper;
import com.ververica.cdc.runtime.serializer.data.writer.BinaryRecordDataWriter;
import com.ververica.cdc.runtime.serializer.data.writer.BinaryWriter;
import java.util.Arrays;
import static com.ververica.cdc.common.utils.Preconditions.checkArgument;
/** Utils to create {@link BinaryRecordData}. */
public class RecordDataUtil {
/**
* Creates an instance of {@link BinaryRecordData} with given field values and {@link RowType}.
*
* <p>Note: All fields of the record must be internal data structures.
*/
public static BinaryRecordData of(RowType rowType, Object[] values) {
return of(rowType.getChildren().toArray(new DataType[0]), values);
}
/**
* Creates an instance of {@link BinaryRecordData} with given field values and {@link
* DataType}s.
*
* <p>Note: All fields of the record must be internal data structures.
*/
public static BinaryRecordData of(DataType[] types, Object[] values) {
checkArgument(
types.length == values.length,
String.format(
"The types and values must have the same length. But types is %d and values is %d",
types.length, values.length));
TypeSerializer[] serializers =
Arrays.stream(types)
.map(InternalSerializers::create)
.map(NullableSerializerWrapper::new)
.toArray(TypeSerializer[]::new);
BinaryRecordData recordData = new BinaryRecordData(types.length);
BinaryRecordDataWriter writer = new BinaryRecordDataWriter(recordData);
writer.reset();
for (int i = 0; i < types.length; i++) {
if (values[i] == null) {
writer.setNullAt(i);
} else {
BinaryWriter.write(writer, i, values[i], types[i], serializers[i]);
}
}
writer.complete();
return recordData;
}
}

@ -27,7 +27,7 @@ import com.ververica.cdc.common.event.TableId;
import com.ververica.cdc.common.types.DataTypes;
import com.ververica.cdc.common.types.RowType;
import com.ververica.cdc.runtime.serializer.event.EventSerializer;
import com.ververica.cdc.runtime.typeutils.RecordDataUtil;
import com.ververica.cdc.runtime.typeutils.BinaryRecordDataGenerator;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
@ -40,7 +40,7 @@ import java.util.stream.Collectors;
import static org.assertj.core.api.Assertions.assertThat;
/** A test for the {@link SchemaOperator}. */
/** Unit tests for the {@link SchemaOperator}. */
public class SchemaOperatorTest {
@Test
void testProcessElement() throws Exception {
@ -60,24 +60,22 @@ public class SchemaOperatorTest {
Map<String, String> meta = new HashMap<>();
meta.put("subtask", String.valueOf(subtaskIndex));
BinaryRecordDataGenerator generator = new BinaryRecordDataGenerator(rowType);
List<Event> testData =
Arrays.asList(
DataChangeEvent.updateEvent(
tableId,
RecordDataUtil.of(
rowType,
generator.generate(
new Object[] {1L, BinaryStringData.fromString("1")}),
RecordDataUtil.of(
rowType,
generator.generate(
new Object[] {2L, BinaryStringData.fromString("2")}),
meta),
DataChangeEvent.updateEvent(
tableId,
RecordDataUtil.of(
rowType,
generator.generate(
new Object[] {3L, BinaryStringData.fromString("3")}),
RecordDataUtil.of(
rowType,
generator.generate(
new Object[] {4L, BinaryStringData.fromString("4")}),
meta));
for (Event event : testData) {

@ -21,7 +21,7 @@ import com.ververica.cdc.common.data.binary.BinaryStringData;
import com.ververica.cdc.common.types.DataTypes;
import com.ververica.cdc.common.types.RowType;
import com.ververica.cdc.runtime.serializer.SerializerTestBase;
import com.ververica.cdc.runtime.typeutils.RecordDataUtil;
import com.ververica.cdc.runtime.typeutils.BinaryRecordDataGenerator;
/** A test for the {@link StringDataSerializer}. */
public class RecordDataSerializerTest extends SerializerTestBase<RecordData> {
@ -43,11 +43,12 @@ public class RecordDataSerializerTest extends SerializerTestBase<RecordData> {
@Override
protected RecordData[] getTestData() {
RowType rowType = RowType.of(DataTypes.BIGINT(), DataTypes.STRING());
BinaryRecordDataGenerator generator =
new BinaryRecordDataGenerator(RowType.of(DataTypes.BIGINT(), DataTypes.STRING()));
return new RecordData[] {
RecordDataUtil.of(rowType, new Object[] {1L, BinaryStringData.fromString("test1")}),
RecordDataUtil.of(rowType, new Object[] {2L, BinaryStringData.fromString("test2")}),
RecordDataUtil.of(rowType, new Object[] {3L, null})
generator.generate(new Object[] {1L, BinaryStringData.fromString("test1")}),
generator.generate(new Object[] {2L, BinaryStringData.fromString("test2")}),
generator.generate(new Object[] {3L, null})
};
}
}

@ -42,17 +42,17 @@ abstract class TimestampDataSerializerTest extends SerializerTestBase<TimestampD
protected TimestampData[] getTestData() {
if (getPrecision() > 3) {
return new TimestampData[] {
TimestampData.fromEpochMillis(1, 1),
TimestampData.fromEpochMillis(2, 2),
TimestampData.fromEpochMillis(3, 3),
TimestampData.fromEpochMillis(4, 4)
TimestampData.fromMillis(1, 1),
TimestampData.fromMillis(2, 2),
TimestampData.fromMillis(3, 3),
TimestampData.fromMillis(4, 4)
};
} else {
return new TimestampData[] {
TimestampData.fromEpochMillis(1, 0),
TimestampData.fromEpochMillis(2, 0),
TimestampData.fromEpochMillis(3, 0),
TimestampData.fromEpochMillis(4, 0)
TimestampData.fromMillis(1, 0),
TimestampData.fromMillis(2, 0),
TimestampData.fromMillis(3, 0),
TimestampData.fromMillis(4, 0)
};
}
}

@ -25,7 +25,7 @@ import com.ververica.cdc.common.event.TableId;
import com.ververica.cdc.common.types.DataTypes;
import com.ververica.cdc.common.types.RowType;
import com.ververica.cdc.runtime.serializer.SerializerTestBase;
import com.ververica.cdc.runtime.typeutils.RecordDataUtil;
import com.ververica.cdc.runtime.typeutils.BinaryRecordDataGenerator;
import java.util.HashMap;
import java.util.Map;
@ -52,18 +52,18 @@ public class DataChangeEventSerializerTest extends SerializerTestBase<DataChange
Map<String, String> meta = new HashMap<>();
meta.put("option", "meta1");
RowType rowType = RowType.of(DataTypes.BIGINT(), DataTypes.STRING(), DataTypes.STRING());
BinaryRecordDataGenerator generator =
new BinaryRecordDataGenerator(
RowType.of(DataTypes.BIGINT(), DataTypes.STRING(), DataTypes.STRING()));
RecordData before =
RecordDataUtil.of(
rowType,
generator.generate(
new Object[] {
1L,
BinaryStringData.fromString("test"),
BinaryStringData.fromString("comment")
});
RecordData after =
RecordDataUtil.of(
rowType,
generator.generate(
new Object[] {1L, null, BinaryStringData.fromString("updateComment")});
return new DataChangeEvent[] {
DataChangeEvent.insertEvent(TableId.tableId("table"), after),

@ -0,0 +1,140 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.runtime.typeutils;
import com.ververica.cdc.common.data.DecimalData;
import com.ververica.cdc.common.data.LocalZonedTimestampData;
import com.ververica.cdc.common.data.TimestampData;
import com.ververica.cdc.common.data.ZonedTimestampData;
import com.ververica.cdc.common.data.binary.BinaryRecordData;
import com.ververica.cdc.common.data.binary.BinaryStringData;
import com.ververica.cdc.common.types.DataTypes;
import com.ververica.cdc.common.types.LocalZonedTimestampType;
import com.ververica.cdc.common.types.RowType;
import com.ververica.cdc.common.types.TimestampType;
import com.ververica.cdc.common.types.ZonedTimestampType;
import org.junit.jupiter.api.Test;
import java.math.BigDecimal;
import static org.assertj.core.api.Assertions.assertThat;
/** Unit tests for {@link BinaryRecordDataGenerator}. */
public class BinaryRecordDataGeneratorTest {
@Test
void testOf() {
RowType rowType =
RowType.of(
DataTypes.BOOLEAN(),
DataTypes.BINARY(3),
DataTypes.VARBINARY(10),
DataTypes.BYTES(),
DataTypes.TINYINT(),
DataTypes.SMALLINT(),
DataTypes.INT(),
DataTypes.BIGINT(),
DataTypes.FLOAT(),
DataTypes.DOUBLE(),
DataTypes.DECIMAL(6, 3),
DataTypes.CHAR(5),
DataTypes.VARCHAR(10),
DataTypes.STRING(),
DataTypes.DATE(),
DataTypes.TIME(),
DataTypes.TIME(6),
DataTypes.TIMESTAMP(),
DataTypes.TIMESTAMP(3),
DataTypes.TIMESTAMP_LTZ(),
DataTypes.TIMESTAMP_LTZ(3),
DataTypes.TIMESTAMP_TZ(),
DataTypes.TIMESTAMP_TZ(3),
DataTypes.ROW(
DataTypes.FIELD("t1", DataTypes.STRING()),
DataTypes.FIELD("t2", DataTypes.BIGINT())),
DataTypes.STRING());
Object[] testData =
new Object[] {
true,
new byte[] {1, 2},
new byte[] {3, 4},
new byte[] {5, 6, 7},
(byte) 1,
(short) 2,
3,
4L,
5.1f,
6.2,
DecimalData.fromBigDecimal(new BigDecimal(7.123), 6, 3),
BinaryStringData.fromString("test1"),
BinaryStringData.fromString("test2"),
BinaryStringData.fromString("test3"),
100,
200,
300,
TimestampData.fromMillis(100, 1),
TimestampData.fromMillis(200, 0),
LocalZonedTimestampData.fromEpochMillis(300, 1),
LocalZonedTimestampData.fromEpochMillis(400),
ZonedTimestampData.of(500, 1, "UTC"),
ZonedTimestampData.of(600, 0, "UTC"),
new BinaryRecordDataGenerator(
RowType.of(DataTypes.STRING(), DataTypes.BIGINT()))
.generate(new Object[] {BinaryStringData.fromString("test"), 23L}),
null
};
BinaryRecordData actual = new BinaryRecordDataGenerator(rowType).generate(testData);
assertThat(actual.getBoolean(0)).isTrue();
assertThat(actual.getBinary(1)).containsExactly((byte[]) testData[1]);
assertThat(actual.getBinary(2)).containsExactly((byte[]) testData[2]);
assertThat(actual.getBinary(3)).containsExactly((byte[]) testData[3]);
assertThat(actual.getByte(4)).isEqualTo(testData[4]);
assertThat(actual.getShort(5)).isEqualTo(testData[5]);
assertThat(actual.getInt(6)).isEqualTo(testData[6]);
assertThat(actual.getLong(7)).isEqualTo(testData[7]);
assertThat(actual.getFloat(8)).isEqualTo(testData[8]);
assertThat(actual.getDouble(9)).isEqualTo(testData[9]);
assertThat(actual.getDecimal(10, 6, 3)).isEqualTo(testData[10]);
assertThat(actual.getString(11)).isEqualTo(BinaryStringData.fromString("test1"));
assertThat(actual.getString(12)).isEqualTo(BinaryStringData.fromString("test2"));
assertThat(actual.getString(13)).isEqualTo(BinaryStringData.fromString("test3"));
assertThat(actual.getInt(14)).isEqualTo(testData[14]);
assertThat(actual.getInt(15)).isEqualTo(testData[15]);
assertThat(actual.getInt(16)).isEqualTo(testData[16]);
assertThat(actual.getTimestamp(17, TimestampType.DEFAULT_PRECISION))
.isEqualTo(testData[17]);
assertThat(actual.getTimestamp(18, 3)).isEqualTo(testData[18]);
assertThat(actual.getLocalZonedTimestampData(19, LocalZonedTimestampType.DEFAULT_PRECISION))
.isEqualTo(testData[19]);
assertThat(actual.getLocalZonedTimestampData(20, 3)).isEqualTo(testData[20]);
assertThat(actual.getZonedTimestamp(21, ZonedTimestampType.DEFAULT_PRECISION))
.isEqualTo(testData[21]);
assertThat(actual.getZonedTimestamp(22, 3)).isEqualTo(testData[22]);
assertThat(actual.getRow(23, 2).getString(0))
.isEqualTo(BinaryStringData.fromString("test"));
assertThat(actual.getRow(23, 2).getLong(1)).isEqualTo(23L);
assertThat(actual.isNullAt(24)).isTrue();
}
}
Loading…
Cancel
Save