[FLINK-37017][cdc-common] Supports map and array types for binary record data

This closes #3434.
pull/3830/merge
Umesh Dangat 4 weeks ago committed by GitHub
parent 6a345aaa80
commit baba9c6ae6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -0,0 +1,623 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.common.data.binary;
import org.apache.flink.cdc.common.data.ArrayData;
import org.apache.flink.cdc.common.data.DecimalData;
import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
import org.apache.flink.cdc.common.data.MapData;
import org.apache.flink.cdc.common.data.RecordData;
import org.apache.flink.cdc.common.data.StringData;
import org.apache.flink.cdc.common.data.TimestampData;
import org.apache.flink.cdc.common.data.ZonedTimestampData;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.utils.DataTypeUtils;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import java.lang.reflect.Array;
import static org.apache.flink.core.memory.MemoryUtils.UNSAFE;
/**
* A binary implementation of {@link ArrayData} which is backed by {@link MemorySegment}s.
*
* <p>This class provides a way to store array data in a binary format that is compact and
* efficient. It uses {@link MemorySegment}s to manage the binary representation of the data,
* allowing for efficient storage and access.
*
* <p>The binary layout of {@link BinaryArrayData} is structured as follows:
*
* <pre>
* [size(int)] + [null bits(4-byte word boundaries)] + [values or offset&length] + [variable length part].
* </pre>
*
* <ul>
* <li><b>size:</b> The first 4 bytes store the number of elements in the array.
* <li><b>null bits:</b> A bitmap to track null values, aligned to 4-byte word boundaries. Each
* bit represents whether an element is null.
* <li><b>values or offset&length:</b> The values of the array elements. For fixed-length
* primitive types, the values are stored directly. For variable-length types (e.g., strings,
* maps), this part stores the offset and length of the actual data in the variable length
* part.
* <li><b>variable length part:</b> This part of the memory segment stores the actual data for
* variable-length types (e.g., strings, maps).
* </ul>
*
* <p>The header size is calculated based on the number of elements in the array, ensuring efficient
* alignment and access.
*
* <p>For fields that hold fixed-length primitive types, such as long, double, or int, they are
* stored compactly in bytes, just like the original Java array.
*
* <p>The class also provides methods to convert the binary data back into Java primitive arrays,
* handling various types such as boolean, byte, short, int, long, float, and double.
*/
public final class BinaryArrayData extends BinarySection implements ArrayData {
/** Offset for Arrays. */
private static final int BYTE_ARRAY_BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class);
private static final int BOOLEAN_ARRAY_OFFSET = UNSAFE.arrayBaseOffset(boolean[].class);
private static final int SHORT_ARRAY_OFFSET = UNSAFE.arrayBaseOffset(short[].class);
private static final int INT_ARRAY_OFFSET = UNSAFE.arrayBaseOffset(int[].class);
private static final int LONG_ARRAY_OFFSET = UNSAFE.arrayBaseOffset(long[].class);
private static final int FLOAT_ARRAY_OFFSET = UNSAFE.arrayBaseOffset(float[].class);
private static final int DOUBLE_ARRAY_OFFSET = UNSAFE.arrayBaseOffset(double[].class);
/**
* Calculates the size of the header in bytes for an array with the specified number of fields.
*
* <p>The header consists of:
*
* <ul>
* <li>4 bytes to store the size of the array (number of elements).
* <li>A bitmap to track null values, where each bit represents whether an element is null.
* This bitmap is aligned to 4-byte word boundaries for efficient memory access and to
* facilitate the use of bitwise operations.
* </ul>
*
* <p>The size of the bitmap is determined by the number of elements in the array:
*
* <ul>
* <li>Each element requires 1 bit in the bitmap.
* <li>The total number of bits is rounded up to the nearest multiple of 32 to ensure
* alignment to 4-byte word boundaries (i.e., a 32-bit integer).
* </ul>
*
* <p>The formula for calculating the size of the header is:
*
* <pre>
* header size = 4 bytes (for array size) + ((numFields + 31) / 32) * 4 bytes (for null bitmap)
* </pre>
*
* @param numFields the number of elements in the array
* @return the size of the header in bytes
*/
public static int calculateHeaderInBytes(int numFields) {
return 4 + ((numFields + 31) / 32) * 4;
}
/**
* It store real value when type is primitive. It store the length and offset of variable-length
* part when type is string, map, etc.
*/
public static int calculateFixLengthPartSize(DataType type) {
// ordered by type root definition
switch (type.getTypeRoot()) {
case BOOLEAN:
case TINYINT:
return 1;
case CHAR:
case VARCHAR:
case BINARY:
case VARBINARY:
case DECIMAL:
case BIGINT:
case DOUBLE:
case TIMESTAMP_WITHOUT_TIME_ZONE:
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
case ARRAY:
case MAP:
case ROW:
// long and double are 8 bytes;
// otherwise it stores the length and offset of the variable-length part for types
// such as is string, map, etc.
return 8;
case TIMESTAMP_WITH_TIME_ZONE:
throw new UnsupportedOperationException();
case SMALLINT:
return 2;
case INTEGER:
case FLOAT:
case DATE:
case TIME_WITHOUT_TIME_ZONE:
return 4;
default:
throw new IllegalArgumentException();
}
}
// The number of elements in this array
private int size;
/** The position to start storing array elements. */
private int elementOffset;
public BinaryArrayData() {}
private void assertIndexIsValid(int index) {
assert index >= 0 : "index (" + index + ") should >= 0";
assert index < size : "index (" + index + ") should < " + size;
}
private int getElementOffset(int ordinal, int elementSize) {
return elementOffset + ordinal * elementSize;
}
@Override
public int size() {
return size;
}
@Override
public void pointTo(MemorySegment[] segments, int offset, int sizeInBytes) {
// Read the number of elements from the first 4 bytes.
final int size = BinarySegmentUtils.getInt(segments, offset);
assert size >= 0 : "size (" + size + ") should >= 0";
this.size = size;
super.pointTo(segments, offset, sizeInBytes);
this.elementOffset = offset + calculateHeaderInBytes(this.size);
}
@Override
public boolean isNullAt(int pos) {
assertIndexIsValid(pos);
return BinarySegmentUtils.bitGet(segments, offset + 4, pos);
}
public void setNullAt(int pos) {
assertIndexIsValid(pos);
BinarySegmentUtils.bitSet(segments, offset + 4, pos);
}
public void setNotNullAt(int pos) {
assertIndexIsValid(pos);
BinarySegmentUtils.bitUnSet(segments, offset + 4, pos);
}
@Override
public long getLong(int pos) {
assertIndexIsValid(pos);
return BinarySegmentUtils.getLong(segments, getElementOffset(pos, 8));
}
public void setLong(int pos, long value) {
assertIndexIsValid(pos);
setNotNullAt(pos);
BinarySegmentUtils.setLong(segments, getElementOffset(pos, 8), value);
}
public void setNullLong(int pos) {
assertIndexIsValid(pos);
BinarySegmentUtils.bitSet(segments, offset + 4, pos);
BinarySegmentUtils.setLong(segments, getElementOffset(pos, 8), 0L);
}
@Override
public int getInt(int pos) {
assertIndexIsValid(pos);
return BinarySegmentUtils.getInt(segments, getElementOffset(pos, 4));
}
public void setInt(int pos, int value) {
assertIndexIsValid(pos);
setNotNullAt(pos);
BinarySegmentUtils.setInt(segments, getElementOffset(pos, 4), value);
}
public void setNullInt(int pos) {
assertIndexIsValid(pos);
BinarySegmentUtils.bitSet(segments, offset + 4, pos);
BinarySegmentUtils.setInt(segments, getElementOffset(pos, 4), 0);
}
@Override
public StringData getString(int pos) {
assertIndexIsValid(pos);
int fieldOffset = getElementOffset(pos, 8);
final long offsetAndSize = BinarySegmentUtils.getLong(segments, fieldOffset);
return BinarySegmentUtils.readStringData(segments, offset, fieldOffset, offsetAndSize);
}
@Override
public DecimalData getDecimal(int pos, int precision, int scale) {
assertIndexIsValid(pos);
if (DecimalData.isCompact(precision)) {
return DecimalData.fromUnscaledLong(
BinarySegmentUtils.getLong(segments, getElementOffset(pos, 8)),
precision,
scale);
}
int fieldOffset = getElementOffset(pos, 8);
final long offsetAndSize = BinarySegmentUtils.getLong(segments, fieldOffset);
return BinarySegmentUtils.readDecimalData(
segments, offset, offsetAndSize, precision, scale);
}
@Override
public TimestampData getTimestamp(int pos, int precision) {
assertIndexIsValid(pos);
if (TimestampData.isCompact(precision)) {
return TimestampData.fromMillis(
BinarySegmentUtils.getLong(segments, getElementOffset(pos, 8)));
}
int fieldOffset = getElementOffset(pos, 8);
final long offsetAndNanoOfMilli = BinarySegmentUtils.getLong(segments, fieldOffset);
return BinarySegmentUtils.readTimestampData(segments, offset, offsetAndNanoOfMilli);
}
@Override
public LocalZonedTimestampData getLocalZonedTimestamp(int pos, int precision) {
throw new UnsupportedOperationException("Not support LocalZonedTimestampData");
}
@Override
public ZonedTimestampData getZonedTimestamp(int pos, int precision) {
throw new UnsupportedOperationException("Not support ZonedTimestampData");
}
@Override
public byte[] getBinary(int pos) {
assertIndexIsValid(pos);
int fieldOffset = getElementOffset(pos, 8);
final long offsetAndSize = BinarySegmentUtils.getLong(segments, fieldOffset);
return BinarySegmentUtils.readBinary(segments, offset, fieldOffset, offsetAndSize);
}
@Override
public ArrayData getArray(int pos) {
assertIndexIsValid(pos);
return BinarySegmentUtils.readArrayData(segments, offset, getLong(pos));
}
@Override
public MapData getMap(int pos) {
assertIndexIsValid(pos);
return BinarySegmentUtils.readMapData(segments, offset, getLong(pos));
}
@Override
public RecordData getRecord(int pos, int numFields) {
assertIndexIsValid(pos);
int fieldOffset = getElementOffset(pos, 8);
final long offsetAndSize = BinarySegmentUtils.getLong(segments, fieldOffset);
return BinarySegmentUtils.readRecordData(segments, numFields, offset, offsetAndSize);
}
@Override
public boolean getBoolean(int pos) {
assertIndexIsValid(pos);
return BinarySegmentUtils.getBoolean(segments, getElementOffset(pos, 1));
}
public void setBoolean(int pos, boolean value) {
assertIndexIsValid(pos);
setNotNullAt(pos);
BinarySegmentUtils.setBoolean(segments, getElementOffset(pos, 1), value);
}
public void setNullBoolean(int pos) {
assertIndexIsValid(pos);
BinarySegmentUtils.bitSet(segments, offset + 4, pos);
BinarySegmentUtils.setBoolean(segments, getElementOffset(pos, 1), false);
}
@Override
public byte getByte(int pos) {
assertIndexIsValid(pos);
return BinarySegmentUtils.getByte(segments, getElementOffset(pos, 1));
}
public void setByte(int pos, byte value) {
assertIndexIsValid(pos);
setNotNullAt(pos);
BinarySegmentUtils.setByte(segments, getElementOffset(pos, 1), value);
}
public void setNullByte(int pos) {
assertIndexIsValid(pos);
BinarySegmentUtils.bitSet(segments, offset + 4, pos);
BinarySegmentUtils.setByte(segments, getElementOffset(pos, 1), (byte) 0);
}
@Override
public short getShort(int pos) {
assertIndexIsValid(pos);
return BinarySegmentUtils.getShort(segments, getElementOffset(pos, 2));
}
public void setShort(int pos, short value) {
assertIndexIsValid(pos);
setNotNullAt(pos);
BinarySegmentUtils.setShort(segments, getElementOffset(pos, 2), value);
}
public void setNullShort(int pos) {
assertIndexIsValid(pos);
BinarySegmentUtils.bitSet(segments, offset + 4, pos);
BinarySegmentUtils.setShort(segments, getElementOffset(pos, 2), (short) 0);
}
@Override
public float getFloat(int pos) {
assertIndexIsValid(pos);
return BinarySegmentUtils.getFloat(segments, getElementOffset(pos, 4));
}
public void setFloat(int pos, float value) {
assertIndexIsValid(pos);
setNotNullAt(pos);
BinarySegmentUtils.setFloat(segments, getElementOffset(pos, 4), value);
}
public void setNullFloat(int pos) {
assertIndexIsValid(pos);
BinarySegmentUtils.bitSet(segments, offset + 4, pos);
BinarySegmentUtils.setFloat(segments, getElementOffset(pos, 4), 0F);
}
@Override
public double getDouble(int pos) {
assertIndexIsValid(pos);
return BinarySegmentUtils.getDouble(segments, getElementOffset(pos, 8));
}
public void setDouble(int pos, double value) {
assertIndexIsValid(pos);
setNotNullAt(pos);
BinarySegmentUtils.setDouble(segments, getElementOffset(pos, 8), value);
}
public void setNullDouble(int pos) {
assertIndexIsValid(pos);
BinarySegmentUtils.bitSet(segments, offset + 4, pos);
BinarySegmentUtils.setDouble(segments, getElementOffset(pos, 8), 0.0);
}
public void setDecimal(int pos, DecimalData value, int precision) {
assertIndexIsValid(pos);
if (DecimalData.isCompact(precision)) {
// compact format
setLong(pos, value.toUnscaledLong());
} else {
int fieldOffset = getElementOffset(pos, 8);
int cursor = (int) (BinarySegmentUtils.getLong(segments, fieldOffset) >>> 32);
assert cursor > 0 : "invalid cursor " + cursor;
// zero-out the bytes
BinarySegmentUtils.setLong(segments, offset + cursor, 0L);
BinarySegmentUtils.setLong(segments, offset + cursor + 8, 0L);
if (value == null) {
setNullAt(pos);
// keep the offset for future update
BinarySegmentUtils.setLong(segments, fieldOffset, ((long) cursor) << 32);
} else {
byte[] bytes = value.toUnscaledBytes();
assert (bytes.length <= 16);
// Write the bytes to the variable length portion.
BinarySegmentUtils.copyFromBytes(segments, offset + cursor, bytes, 0, bytes.length);
setLong(pos, ((long) cursor << 32) | ((long) bytes.length));
}
}
}
public void setTimestamp(int pos, TimestampData value, int precision) {
assertIndexIsValid(pos);
if (TimestampData.isCompact(precision)) {
setLong(pos, value.getMillisecond());
} else {
int fieldOffset = getElementOffset(pos, 8);
int cursor = (int) (BinarySegmentUtils.getLong(segments, fieldOffset) >>> 32);
assert cursor > 0 : "invalid cursor " + cursor;
if (value == null) {
setNullAt(pos);
// zero-out the bytes
BinarySegmentUtils.setLong(segments, offset + cursor, 0L);
// keep the offset for future update
BinarySegmentUtils.setLong(segments, fieldOffset, ((long) cursor) << 32);
} else {
// write millisecond to the variable length portion.
BinarySegmentUtils.setLong(segments, offset + cursor, value.getMillisecond());
// write nanoOfMillisecond to the fixed-length portion.
setLong(pos, ((long) cursor << 32) | (long) value.getNanoOfMillisecond());
}
}
}
public boolean anyNull() {
for (int i = offset + 4; i < elementOffset; i += 4) {
if (BinarySegmentUtils.getInt(segments, i) != 0) {
return true;
}
}
return false;
}
private void checkNoNull() {
if (anyNull()) {
throw new RuntimeException("Primitive array must not contain a null value.");
}
}
@Override
public boolean[] toBooleanArray() {
checkNoNull();
boolean[] values = new boolean[size];
BinarySegmentUtils.copyToUnsafe(
segments, elementOffset, values, BOOLEAN_ARRAY_OFFSET, size);
return values;
}
@Override
public byte[] toByteArray() {
checkNoNull();
byte[] values = new byte[size];
BinarySegmentUtils.copyToUnsafe(
segments, elementOffset, values, BYTE_ARRAY_BASE_OFFSET, size);
return values;
}
@Override
public short[] toShortArray() {
checkNoNull();
short[] values = new short[size];
BinarySegmentUtils.copyToUnsafe(
segments, elementOffset, values, SHORT_ARRAY_OFFSET, size * 2);
return values;
}
@Override
public int[] toIntArray() {
checkNoNull();
int[] values = new int[size];
BinarySegmentUtils.copyToUnsafe(
segments, elementOffset, values, INT_ARRAY_OFFSET, size * 4);
return values;
}
@Override
public long[] toLongArray() {
checkNoNull();
long[] values = new long[size];
BinarySegmentUtils.copyToUnsafe(
segments, elementOffset, values, LONG_ARRAY_OFFSET, size * 8);
return values;
}
@Override
public float[] toFloatArray() {
checkNoNull();
float[] values = new float[size];
BinarySegmentUtils.copyToUnsafe(
segments, elementOffset, values, FLOAT_ARRAY_OFFSET, size * 4);
return values;
}
@Override
public double[] toDoubleArray() {
checkNoNull();
double[] values = new double[size];
BinarySegmentUtils.copyToUnsafe(
segments, elementOffset, values, DOUBLE_ARRAY_OFFSET, size * 8);
return values;
}
@SuppressWarnings("unchecked")
public <T> T[] toObjectArray(DataType elementType) {
Class<T> elementClass = (Class<T>) DataTypeUtils.toInternalConversionClass(elementType);
ArrayData.ElementGetter elementGetter = ArrayData.createElementGetter(elementType);
T[] values = (T[]) Array.newInstance(elementClass, size);
for (int i = 0; i < size; i++) {
if (!isNullAt(i)) {
values[i] = (T) elementGetter.getElementOrNull(this, i);
}
}
return values;
}
public BinaryArrayData copy() {
return copy(new BinaryArrayData());
}
public BinaryArrayData copy(BinaryArrayData reuse) {
byte[] bytes = BinarySegmentUtils.copyToBytes(segments, offset, sizeInBytes);
reuse.pointTo(MemorySegmentFactory.wrap(bytes), 0, sizeInBytes);
return reuse;
}
@Override
public int hashCode() {
return BinarySegmentUtils.hashByWords(segments, offset, sizeInBytes);
}
// ------------------------------------------------------------------------------------------
// Construction Utilities
// ------------------------------------------------------------------------------------------
public static BinaryArrayData fromPrimitiveArray(boolean[] arr) {
return fromPrimitiveArray(arr, BOOLEAN_ARRAY_OFFSET, arr.length, 1);
}
public static BinaryArrayData fromPrimitiveArray(byte[] arr) {
return fromPrimitiveArray(arr, BYTE_ARRAY_BASE_OFFSET, arr.length, 1);
}
public static BinaryArrayData fromPrimitiveArray(short[] arr) {
return fromPrimitiveArray(arr, SHORT_ARRAY_OFFSET, arr.length, 2);
}
public static BinaryArrayData fromPrimitiveArray(int[] arr) {
return fromPrimitiveArray(arr, INT_ARRAY_OFFSET, arr.length, 4);
}
public static BinaryArrayData fromPrimitiveArray(long[] arr) {
return fromPrimitiveArray(arr, LONG_ARRAY_OFFSET, arr.length, 8);
}
public static BinaryArrayData fromPrimitiveArray(float[] arr) {
return fromPrimitiveArray(arr, FLOAT_ARRAY_OFFSET, arr.length, 4);
}
public static BinaryArrayData fromPrimitiveArray(double[] arr) {
return fromPrimitiveArray(arr, DOUBLE_ARRAY_OFFSET, arr.length, 8);
}
private static BinaryArrayData fromPrimitiveArray(
Object arr, int offset, int length, int elementSize) {
final long headerInBytes = calculateHeaderInBytes(length);
final long valueRegionInBytes = elementSize * length;
// must align by 8 bytes
long totalSizeInLongs = (headerInBytes + valueRegionInBytes + 7) / 8;
if (totalSizeInLongs > Integer.MAX_VALUE / 8) {
throw new UnsupportedOperationException(
"Cannot convert this array to unsafe format as " + "it's too big.");
}
long totalSize = totalSizeInLongs * 8;
final byte[] data = new byte[(int) totalSize];
UNSAFE.putInt(data, (long) BYTE_ARRAY_BASE_OFFSET, length);
UNSAFE.copyMemory(
arr, offset, data, BYTE_ARRAY_BASE_OFFSET + headerInBytes, valueRegionInBytes);
BinaryArrayData result = new BinaryArrayData();
result.pointTo(MemorySegmentFactory.wrap(data), 0, (int) totalSize);
return result;
}
}

@ -0,0 +1,118 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.common.data.binary;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.data.MapData;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import java.util.HashMap;
import java.util.Map;
import static org.apache.flink.cdc.common.utils.Preconditions.checkArgument;
/**
* [4 byte(keyArray size in bytes)] + [Key BinaryArray] + [Value BinaryArray].
*
* <p>{@code BinaryMap} are influenced by Apache Spark UnsafeMapData.
*/
@Internal
public class BinaryMapData extends BinarySection implements MapData {
private final BinaryArrayData keys;
private final BinaryArrayData values;
public BinaryMapData() {
keys = new BinaryArrayData();
values = new BinaryArrayData();
}
public int size() {
return keys.size();
}
@Override
public void pointTo(MemorySegment[] segments, int offset, int sizeInBytes) {
// Read the numBytes of key array from the first 4 bytes.
final int keyArrayBytes = BinarySegmentUtils.getInt(segments, offset);
assert keyArrayBytes >= 0 : "keyArraySize (" + keyArrayBytes + ") should >= 0";
final int valueArrayBytes = sizeInBytes - keyArrayBytes - 4;
assert valueArrayBytes >= 0 : "valueArraySize (" + valueArrayBytes + ") should >= 0";
keys.pointTo(segments, offset + 4, keyArrayBytes);
values.pointTo(segments, offset + 4 + keyArrayBytes, valueArrayBytes);
assert keys.size() == values.size();
this.segments = segments;
this.offset = offset;
this.sizeInBytes = sizeInBytes;
}
public BinaryArrayData keyArray() {
return keys;
}
public BinaryArrayData valueArray() {
return values;
}
public Map<?, ?> toJavaMap(DataType keyType, DataType valueType) {
Object[] keyArray = keys.toObjectArray(keyType);
Object[] valueArray = values.toObjectArray(valueType);
Map<Object, Object> map = new HashMap<>();
for (int i = 0; i < keyArray.length; i++) {
map.put(keyArray[i], valueArray[i]);
}
return map;
}
public BinaryMapData copy() {
return copy(new BinaryMapData());
}
public BinaryMapData copy(BinaryMapData reuse) {
byte[] bytes = BinarySegmentUtils.copyToBytes(segments, offset, sizeInBytes);
reuse.pointTo(MemorySegmentFactory.wrap(bytes), 0, sizeInBytes);
return reuse;
}
@Override
public int hashCode() {
return BinarySegmentUtils.hashByWords(segments, offset, sizeInBytes);
}
// ------------------------------------------------------------------------------------------
// Construction Utilities
// ------------------------------------------------------------------------------------------
public static BinaryMapData valueOf(BinaryArrayData key, BinaryArrayData value) {
checkArgument(key.segments.length == 1 && value.getSegments().length == 1);
byte[] bytes = new byte[4 + key.sizeInBytes + value.sizeInBytes];
MemorySegment segment = MemorySegmentFactory.wrap(bytes);
segment.putInt(0, key.sizeInBytes);
key.getSegments()[0].copyTo(key.getOffset(), segment, 4, key.sizeInBytes);
value.getSegments()[0].copyTo(
value.getOffset(), segment, 4 + key.sizeInBytes, value.sizeInBytes);
BinaryMapData map = new BinaryMapData();
map.pointTo(segment, 0, bytes.length);
return map;
}
}

@ -200,12 +200,14 @@ public final class BinaryRecordData extends BinarySection implements RecordData,
@Override @Override
public ArrayData getArray(int pos) { public ArrayData getArray(int pos) {
throw new UnsupportedOperationException("Not support ArrayData"); assertIndexIsValid(pos);
return BinarySegmentUtils.readArrayData(segments, offset, getLong(pos));
} }
@Override @Override
public MapData getMap(int pos) { public MapData getMap(int pos) {
throw new UnsupportedOperationException("Not support MapData."); assertIndexIsValid(pos);
return BinarySegmentUtils.readMapData(segments, offset, getLong(pos));
} }
@Override @Override

@ -18,8 +18,10 @@
package org.apache.flink.cdc.common.data.binary; package org.apache.flink.cdc.common.data.binary;
import org.apache.flink.cdc.common.annotation.Internal; import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.data.ArrayData;
import org.apache.flink.cdc.common.data.DecimalData; import org.apache.flink.cdc.common.data.DecimalData;
import org.apache.flink.cdc.common.data.LocalZonedTimestampData; import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
import org.apache.flink.cdc.common.data.MapData;
import org.apache.flink.cdc.common.data.RecordData; import org.apache.flink.cdc.common.data.RecordData;
import org.apache.flink.cdc.common.data.StringData; import org.apache.flink.cdc.common.data.StringData;
import org.apache.flink.cdc.common.data.TimestampData; import org.apache.flink.cdc.common.data.TimestampData;
@ -318,7 +320,7 @@ public final class BinarySegmentUtils {
} }
} }
static boolean equalsMultiSegments( public static boolean equalsMultiSegments(
MemorySegment[] segments1, MemorySegment[] segments1,
int offset1, int offset1,
MemorySegment[] segments2, MemorySegment[] segments2,
@ -1154,4 +1156,24 @@ public final class BinarySegmentUtils {
} }
return -1; return -1;
} }
/** Gets an instance of {@link MapData} from underlying {@link MemorySegment}. */
public static MapData readMapData(
MemorySegment[] segments, int baseOffset, long offsetAndSize) {
final int size = ((int) offsetAndSize);
int offset = (int) (offsetAndSize >> 32);
BinaryMapData map = new BinaryMapData();
map.pointTo(segments, offset + baseOffset, size);
return map;
}
/** Gets an instance of {@link ArrayData} from underlying {@link MemorySegment}. */
public static ArrayData readArrayData(
MemorySegment[] segments, int baseOffset, long offsetAndSize) {
final int size = ((int) offsetAndSize);
int offset = (int) (offsetAndSize >> 32);
BinaryArrayData array = new BinaryArrayData();
array.pointTo(segments, offset + baseOffset, size);
return array;
}
} }

@ -17,20 +17,31 @@
package org.apache.flink.cdc.connectors.paimon.sink.v2; package org.apache.flink.cdc.connectors.paimon.sink.v2;
import org.apache.flink.cdc.common.data.ArrayData;
import org.apache.flink.cdc.common.data.DecimalData; import org.apache.flink.cdc.common.data.DecimalData;
import org.apache.flink.cdc.common.data.MapData;
import org.apache.flink.cdc.common.data.RecordData; import org.apache.flink.cdc.common.data.RecordData;
import org.apache.flink.cdc.common.data.binary.BinaryArrayData;
import org.apache.flink.cdc.common.data.binary.BinaryMapData;
import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
import org.apache.flink.cdc.common.event.DataChangeEvent; import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.schema.Column; import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema; import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.DataType; import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypeChecks; import org.apache.flink.cdc.common.types.DataTypeChecks;
import org.apache.flink.cdc.common.types.DataTypeRoot;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.BinaryString; import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.Decimal; import org.apache.paimon.data.Decimal;
import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.Timestamp; import org.apache.paimon.data.Timestamp;
import org.apache.paimon.memory.MemorySegmentUtils;
import org.apache.paimon.types.RowKind; import org.apache.paimon.types.RowKind;
import java.nio.ByteBuffer;
import java.time.ZoneId; import java.time.ZoneId;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
@ -118,7 +129,11 @@ public class PaimonWriterHelper {
break; break;
case ROW: case ROW:
final int rowFieldCount = getFieldCount(fieldType); final int rowFieldCount = getFieldCount(fieldType);
fieldGetter = row -> row.getRow(fieldPos, rowFieldCount); fieldGetter = new BinaryFieldDataGetter(fieldPos, DataTypeRoot.ROW, rowFieldCount);
break;
case ARRAY:
case MAP:
fieldGetter = new BinaryFieldDataGetter(fieldPos, fieldType.getTypeRoot());
break; break;
default: default:
throw new IllegalArgumentException( throw new IllegalArgumentException(
@ -163,4 +178,121 @@ public class PaimonWriterHelper {
} }
return genericRow; return genericRow;
} }
/** A helper class for {@link PaimonWriter} to create FieldGetter and GenericRow. */
public static class BinaryFieldDataGetter implements RecordData.FieldGetter {
private final int fieldPos;
private final DataTypeRoot dataTypeRoot;
private final int rowFieldCount;
BinaryFieldDataGetter(int fieldPos, DataTypeRoot dataTypeRoot) {
this(fieldPos, dataTypeRoot, -1);
}
BinaryFieldDataGetter(int fieldPos, DataTypeRoot dataTypeRoot, int rowFieldCount) {
this.fieldPos = fieldPos;
this.dataTypeRoot = dataTypeRoot;
this.rowFieldCount = rowFieldCount;
}
@Override
public Object getFieldOrNull(RecordData row) {
switch (dataTypeRoot) {
case ARRAY:
return getArrayField(row);
case MAP:
return getMapField(row);
case ROW:
return getRecordField(row);
default:
throw new IllegalArgumentException("Unsupported field type: " + dataTypeRoot);
}
}
private Object getArrayField(RecordData row) {
ArrayData arrayData = row.getArray(fieldPos);
if (!(arrayData instanceof BinaryArrayData)) {
throw new IllegalArgumentException(
"Expected BinaryArrayData but was " + arrayData.getClass().getSimpleName());
}
BinaryArrayData binaryArrayData = (BinaryArrayData) arrayData;
return convertSegments(
binaryArrayData.getSegments(),
binaryArrayData.getOffset(),
binaryArrayData.getSizeInBytes(),
MemorySegmentUtils::readArrayData);
}
private Object getMapField(RecordData row) {
MapData mapData = row.getMap(fieldPos);
if (!(mapData instanceof BinaryMapData)) {
throw new IllegalArgumentException(
"Expected BinaryMapData but was " + mapData.getClass().getSimpleName());
}
BinaryMapData binaryMapData = (BinaryMapData) mapData;
return convertSegments(
binaryMapData.getSegments(),
binaryMapData.getOffset(),
binaryMapData.getSizeInBytes(),
MemorySegmentUtils::readMapData);
}
private Object getRecordField(RecordData row) {
RecordData recordData = row.getRow(fieldPos, rowFieldCount);
if (!(recordData instanceof BinaryRecordData)) {
throw new IllegalArgumentException(
"Expected BinaryRecordData but was "
+ recordData.getClass().getSimpleName());
}
BinaryRecordData binaryRecordData = (BinaryRecordData) recordData;
return convertSegments(
binaryRecordData.getSegments(),
binaryRecordData.getOffset(),
binaryRecordData.getSizeInBytes(),
(segments, offset, sizeInBytes) ->
MemorySegmentUtils.readRowData(
segments, rowFieldCount, offset, sizeInBytes));
}
private <T> T convertSegments(
MemorySegment[] segments,
int offset,
int sizeInBytes,
SegmentConverter<T> converter) {
org.apache.paimon.memory.MemorySegment[] paimonMemorySegments =
new org.apache.paimon.memory.MemorySegment[segments.length];
for (int i = 0; i < segments.length; i++) {
MemorySegment currMemorySegment = segments[i];
ByteBuffer byteBuffer = currMemorySegment.wrap(0, currMemorySegment.size());
// Allocate a new byte array and copy the data from the ByteBuffer
byte[] bytes = new byte[currMemorySegment.size()];
byteBuffer.get(bytes);
paimonMemorySegments[i] = org.apache.paimon.memory.MemorySegment.wrap(bytes);
}
return converter.convert(paimonMemorySegments, offset, sizeInBytes);
}
private interface SegmentConverter<T> {
T convert(
org.apache.paimon.memory.MemorySegment[] segments, int offset, int sizeInBytes);
}
/**
* Gets an instance of {@link InternalRow} from underlying {@link
* org.apache.paimon.memory.MemorySegment}.
*/
public InternalRow readRowData(
org.apache.paimon.memory.MemorySegment[] segments,
int numFields,
int baseOffset,
long offsetAndSize) {
final int size = ((int) offsetAndSize);
int offset = (int) (offsetAndSize >> 32);
BinaryRow row = new BinaryRow(numFields);
row.pointTo(segments, offset + baseOffset, size);
return row;
}
}
} }

@ -18,9 +18,11 @@
package org.apache.flink.cdc.connectors.paimon.sink.v2; package org.apache.flink.cdc.connectors.paimon.sink.v2;
import org.apache.flink.cdc.common.data.DecimalData; import org.apache.flink.cdc.common.data.DecimalData;
import org.apache.flink.cdc.common.data.GenericMapData;
import org.apache.flink.cdc.common.data.LocalZonedTimestampData; import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
import org.apache.flink.cdc.common.data.RecordData; import org.apache.flink.cdc.common.data.RecordData;
import org.apache.flink.cdc.common.data.TimestampData; import org.apache.flink.cdc.common.data.TimestampData;
import org.apache.flink.cdc.common.data.binary.BinaryMapData;
import org.apache.flink.cdc.common.data.binary.BinaryRecordData; import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
import org.apache.flink.cdc.common.data.binary.BinaryStringData; import org.apache.flink.cdc.common.data.binary.BinaryStringData;
import org.apache.flink.cdc.common.event.DataChangeEvent; import org.apache.flink.cdc.common.event.DataChangeEvent;
@ -28,11 +30,14 @@ import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Schema; import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.DataTypes; import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.common.types.RowType; import org.apache.flink.cdc.common.types.RowType;
import org.apache.flink.cdc.runtime.serializer.data.MapDataSerializer;
import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator; import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
import org.apache.paimon.data.BinaryString; import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.Decimal; import org.apache.paimon.data.Decimal;
import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalMap;
import org.apache.paimon.data.NestedRow;
import org.apache.paimon.data.Timestamp; import org.apache.paimon.data.Timestamp;
import org.apache.paimon.types.RowKind; import org.apache.paimon.types.RowKind;
import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Assertions;
@ -41,7 +46,9 @@ import org.junit.jupiter.api.Test;
import java.math.BigDecimal; import java.math.BigDecimal;
import java.time.Instant; import java.time.Instant;
import java.time.ZoneId; import java.time.ZoneId;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map;
/** Tests for {@link PaimonWriterHelper}. */ /** Tests for {@link PaimonWriterHelper}. */
public class PaimonWriterHelperTest { public class PaimonWriterHelperTest {
@ -171,4 +178,78 @@ public class PaimonWriterHelperTest {
genericRow = PaimonWriterHelper.convertEventToGenericRow(dataChangeEvent, fieldGetters); genericRow = PaimonWriterHelper.convertEventToGenericRow(dataChangeEvent, fieldGetters);
Assertions.assertEquals(genericRow.getRowKind(), RowKind.INSERT); Assertions.assertEquals(genericRow.getRowKind(), RowKind.INSERT);
} }
@Test
public void testConvertEventToGenericRowWithNestedRow() {
// Define the inner row type with an integer and a map
RowType innerRowType =
RowType.of(DataTypes.INT(), DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()));
// Define the outer row type with a nested row
RowType rowType = RowType.of(DataTypes.ROW(innerRowType));
// Create a map serializer and a map data instance
MapDataSerializer mapDataserializer =
new MapDataSerializer(DataTypes.STRING(), DataTypes.STRING());
Map<BinaryStringData, BinaryStringData> map = new HashMap<>();
map.put(BinaryStringData.fromString("key1"), BinaryStringData.fromString("value1"));
map.put(BinaryStringData.fromString("key2"), BinaryStringData.fromString("value2"));
GenericMapData genMapData = new GenericMapData(map);
BinaryMapData binMap = mapDataserializer.toBinaryMap(genMapData);
// Create inner row data
Object[] innerRowData =
new Object[] {
42, // Integer field
binMap // Map field
};
// Generate inner BinaryRecordData
BinaryRecordData innerRecordData =
new BinaryRecordDataGenerator(innerRowType).generate(innerRowData);
// Create outer row data containing the inner record data
Object[] testData = new Object[] {innerRecordData};
// Generate outer BinaryRecordData
BinaryRecordData recordData = new BinaryRecordDataGenerator(rowType).generate(testData);
// Create schema and field getters
Schema schema = Schema.newBuilder().fromRowDataType(rowType).build();
List<RecordData.FieldGetter> fieldGetters =
PaimonWriterHelper.createFieldGetters(schema, ZoneId.of("UTC+8"));
// Create a data change event
DataChangeEvent dataChangeEvent =
DataChangeEvent.insertEvent(TableId.parse("database.table"), recordData);
// Convert the event to GenericRow
GenericRow genericRow =
PaimonWriterHelper.convertEventToGenericRow(dataChangeEvent, fieldGetters);
// Verify the nested row field
NestedRow innerRow = (NestedRow) genericRow.getField(0);
NestedRow nestedRow = (NestedRow) innerRow.getRow(0, 2);
int actual = nestedRow.getRow(0, 2).getInt(0); // 42
Assertions.assertEquals(42, actual);
Map<BinaryString, BinaryString> expectedMap = new HashMap<>();
expectedMap.put(BinaryString.fromString("key1"), BinaryString.fromString("value1"));
expectedMap.put(BinaryString.fromString("key2"), BinaryString.fromString("value2"));
InternalMap internalMap = nestedRow.getMap(1);
Map<BinaryString, BinaryString> extractedMap = extractMap(internalMap);
for (Map.Entry<BinaryString, BinaryString> entry : expectedMap.entrySet()) {
Assertions.assertEquals(entry.getValue(), extractedMap.get(entry.getKey()));
}
}
private static Map<BinaryString, BinaryString> extractMap(InternalMap internalMap) {
Map<BinaryString, BinaryString> resultMap = new HashMap<>();
for (int i = 0; i < internalMap.size(); i++) {
BinaryString key = internalMap.keyArray().getString(i);
BinaryString value = internalMap.valueArray().getString(i);
resultMap.put(key, value);
}
return resultMap;
}
} }

@ -20,9 +20,11 @@ package org.apache.flink.cdc.runtime.serializer;
import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.cdc.common.types.ArrayType; import org.apache.flink.cdc.common.types.ArrayType;
import org.apache.flink.cdc.common.types.DataType; import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.MapType;
import org.apache.flink.cdc.runtime.serializer.data.ArrayDataSerializer; import org.apache.flink.cdc.runtime.serializer.data.ArrayDataSerializer;
import org.apache.flink.cdc.runtime.serializer.data.DecimalDataSerializer; import org.apache.flink.cdc.runtime.serializer.data.DecimalDataSerializer;
import org.apache.flink.cdc.runtime.serializer.data.LocalZonedTimestampDataSerializer; import org.apache.flink.cdc.runtime.serializer.data.LocalZonedTimestampDataSerializer;
import org.apache.flink.cdc.runtime.serializer.data.MapDataSerializer;
import org.apache.flink.cdc.runtime.serializer.data.RecordDataSerializer; import org.apache.flink.cdc.runtime.serializer.data.RecordDataSerializer;
import org.apache.flink.cdc.runtime.serializer.data.StringDataSerializer; import org.apache.flink.cdc.runtime.serializer.data.StringDataSerializer;
import org.apache.flink.cdc.runtime.serializer.data.TimestampDataSerializer; import org.apache.flink.cdc.runtime.serializer.data.TimestampDataSerializer;
@ -79,6 +81,8 @@ public class InternalSerializers {
case ROW: case ROW:
return new RecordDataSerializer(); return new RecordDataSerializer();
case MAP: case MAP:
MapType mapType = (MapType) type;
return new MapDataSerializer(mapType.getKeyType(), mapType.getValueType());
default: default:
throw new UnsupportedOperationException( throw new UnsupportedOperationException(
"Unsupported type '" + type + "' to get internal serializer"); "Unsupported type '" + type + "' to get internal serializer");

@ -123,6 +123,10 @@ public class NullableSerializerWrapper<T> extends TypeSerializer<T> {
return new NullableSerializerWrapperSnapshot<>(innerSerializer); return new NullableSerializerWrapperSnapshot<>(innerSerializer);
} }
public TypeSerializer<T> getWrappedSerializer() {
return innerSerializer;
}
/** Serializer configuration snapshot for compatibility and format evolution. */ /** Serializer configuration snapshot for compatibility and format evolution. */
@SuppressWarnings("WeakerAccess") @SuppressWarnings("WeakerAccess")
public static final class NullableSerializerWrapperSnapshot<T> public static final class NullableSerializerWrapperSnapshot<T>

@ -25,11 +25,14 @@ import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
import org.apache.flink.cdc.common.annotation.VisibleForTesting; import org.apache.flink.cdc.common.annotation.VisibleForTesting;
import org.apache.flink.cdc.common.data.ArrayData; import org.apache.flink.cdc.common.data.ArrayData;
import org.apache.flink.cdc.common.data.GenericArrayData; import org.apache.flink.cdc.common.data.GenericArrayData;
import org.apache.flink.cdc.common.data.binary.BinaryArrayData;
import org.apache.flink.cdc.common.types.DataType; import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.utils.DataTypeUtils; import org.apache.flink.cdc.common.types.utils.DataTypeUtils;
import org.apache.flink.cdc.common.utils.InstantiationUtil; import org.apache.flink.cdc.common.utils.InstantiationUtil;
import org.apache.flink.cdc.runtime.serializer.InternalSerializers; import org.apache.flink.cdc.runtime.serializer.InternalSerializers;
import org.apache.flink.cdc.runtime.serializer.NullableSerializerWrapper; import org.apache.flink.cdc.runtime.serializer.NullableSerializerWrapper;
import org.apache.flink.cdc.runtime.serializer.data.writer.BinaryArrayWriter;
import org.apache.flink.cdc.runtime.serializer.data.writer.BinaryWriter;
import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.core.memory.DataOutputView;
@ -45,6 +48,8 @@ public class ArrayDataSerializer extends TypeSerializer<ArrayData> {
private final DataType eleType; private final DataType eleType;
private final TypeSerializer<Object> eleSer; private final TypeSerializer<Object> eleSer;
private final ArrayData.ElementGetter elementGetter; private final ArrayData.ElementGetter elementGetter;
private transient BinaryArrayData reuseArray;
private transient BinaryArrayWriter reuseWriter;
public ArrayDataSerializer(DataType eleType) { public ArrayDataSerializer(DataType eleType) {
this(eleType, InternalSerializers.create(eleType)); this(eleType, InternalSerializers.create(eleType));
@ -130,6 +135,38 @@ public class ArrayDataSerializer extends TypeSerializer<ArrayData> {
} }
} }
public BinaryArrayData toBinaryArray(ArrayData from) {
if (from instanceof BinaryArrayData) {
return (BinaryArrayData) from;
}
int numElements = from.size();
if (reuseArray == null) {
reuseArray = new BinaryArrayData();
}
if (reuseWriter == null || reuseWriter.getNumElements() != numElements) {
reuseWriter =
new BinaryArrayWriter(
reuseArray,
numElements,
BinaryArrayData.calculateFixLengthPartSize(eleType));
} else {
reuseWriter.reset();
}
for (int i = 0; i < numElements; i++) {
if (from.isNullAt(i)) {
reuseWriter.setNullAt(i, eleType);
} else {
BinaryWriter.write(
reuseWriter, i, elementGetter.getElementOrNull(from, i), eleType, eleSer);
}
}
reuseWriter.complete();
return reuseArray;
}
@Override @Override
public ArrayData deserialize(DataInputView source) throws IOException { public ArrayData deserialize(DataInputView source) throws IOException {
int size = source.readInt(); int size = source.readInt();

@ -0,0 +1,327 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.runtime.serializer.data;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.annotation.VisibleForTesting;
import org.apache.flink.cdc.common.data.ArrayData;
import org.apache.flink.cdc.common.data.MapData;
import org.apache.flink.cdc.common.data.binary.BinaryArrayData;
import org.apache.flink.cdc.common.data.binary.BinaryMapData;
import org.apache.flink.cdc.common.data.binary.BinarySegmentUtils;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.runtime.serializer.InternalSerializers;
import org.apache.flink.cdc.runtime.serializer.data.writer.BinaryArrayWriter;
import org.apache.flink.cdc.runtime.serializer.data.writer.BinaryWriter;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.util.InstantiationUtil;
import java.io.IOException;
/** Serializer for {@link MapData}. */
@Internal
public class MapDataSerializer extends TypeSerializer<MapData> {
private final DataType keyType;
private final DataType valueType;
private final TypeSerializer keySerializer;
private final TypeSerializer valueSerializer;
private final ArrayData.ElementGetter keyGetter;
private final ArrayData.ElementGetter valueGetter;
private transient BinaryArrayData reuseKeyArray;
private transient BinaryArrayData reuseValueArray;
private transient BinaryArrayWriter reuseKeyWriter;
private transient BinaryArrayWriter reuseValueWriter;
public MapDataSerializer(DataType keyType, DataType valueType) {
this(
keyType,
valueType,
InternalSerializers.create(keyType),
InternalSerializers.create(valueType));
}
private MapDataSerializer(
DataType keyType,
DataType valueType,
TypeSerializer keySerializer,
TypeSerializer valueSerializer) {
this.keyType = keyType;
this.valueType = valueType;
this.keySerializer = keySerializer;
this.valueSerializer = valueSerializer;
this.keyGetter = ArrayData.createElementGetter(keyType);
this.valueGetter = ArrayData.createElementGetter(valueType);
}
@Override
public boolean isImmutableType() {
return false;
}
@Override
public TypeSerializer<MapData> duplicate() {
return new MapDataSerializer(
keyType, valueType, keySerializer.duplicate(), valueSerializer.duplicate());
}
@Override
public MapData createInstance() {
return new BinaryMapData();
}
/**
* NOTE: Map should be a HashMap, when we insert the key/value pairs of the TreeMap into a
* HashMap, problems maybe occur.
*/
@Override
public MapData copy(MapData from) {
if (from instanceof BinaryMapData) {
return ((BinaryMapData) from).copy();
} else {
return toBinaryMap(from);
}
}
@Override
public MapData copy(MapData from, MapData reuse) {
return copy(from);
}
@Override
public int getLength() {
return -1;
}
@Override
public void serialize(MapData record, DataOutputView target) throws IOException {
BinaryMapData binaryMap = toBinaryMap(record);
target.writeInt(binaryMap.getSizeInBytes());
BinarySegmentUtils.copyToView(
binaryMap.getSegments(), binaryMap.getOffset(), binaryMap.getSizeInBytes(), target);
}
public BinaryMapData toBinaryMap(MapData from) {
if (from instanceof BinaryMapData) {
return (BinaryMapData) from;
}
int numElements = from.size();
if (reuseKeyArray == null) {
reuseKeyArray = new BinaryArrayData();
}
if (reuseValueArray == null) {
reuseValueArray = new BinaryArrayData();
}
if (reuseKeyWriter == null || reuseKeyWriter.getNumElements() != numElements) {
reuseKeyWriter =
new BinaryArrayWriter(
reuseKeyArray,
numElements,
BinaryArrayData.calculateFixLengthPartSize(keyType));
} else {
reuseKeyWriter.reset();
}
if (reuseValueWriter == null || reuseValueWriter.getNumElements() != numElements) {
reuseValueWriter =
new BinaryArrayWriter(
reuseValueArray,
numElements,
BinaryArrayData.calculateFixLengthPartSize(valueType));
} else {
reuseValueWriter.reset();
}
ArrayData keyArray = from.keyArray();
ArrayData valueArray = from.valueArray();
for (int i = 0; i < from.size(); i++) {
Object key = keyGetter.getElementOrNull(keyArray, i);
Object value = valueGetter.getElementOrNull(valueArray, i);
if (key == null) {
reuseKeyWriter.setNullAt(i, keyType);
} else {
BinaryWriter.write(reuseKeyWriter, i, key, keyType, keySerializer);
}
if (value == null) {
reuseValueWriter.setNullAt(i, valueType);
} else {
BinaryWriter.write(reuseValueWriter, i, value, valueType, valueSerializer);
}
}
reuseKeyWriter.complete();
reuseValueWriter.complete();
return BinaryMapData.valueOf(reuseKeyArray, reuseValueArray);
}
@Override
public MapData deserialize(DataInputView source) throws IOException {
return deserializeReuse(new BinaryMapData(), source);
}
@Override
public MapData deserialize(MapData reuse, DataInputView source) throws IOException {
return deserializeReuse(
reuse instanceof BinaryMapData ? (BinaryMapData) reuse : new BinaryMapData(),
source);
}
private BinaryMapData deserializeReuse(BinaryMapData reuse, DataInputView source)
throws IOException {
int length = source.readInt();
byte[] bytes = new byte[length];
source.readFully(bytes);
reuse.pointTo(MemorySegmentFactory.wrap(bytes), 0, bytes.length);
return reuse;
}
@Override
public void copy(DataInputView source, DataOutputView target) throws IOException {
int length = source.readInt();
target.writeInt(length);
target.write(source, length);
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
MapDataSerializer that = (MapDataSerializer) o;
return keyType.equals(that.keyType) && valueType.equals(that.valueType);
}
@Override
public int hashCode() {
int result = keyType.hashCode();
result = 31 * result + valueType.hashCode();
return result;
}
@VisibleForTesting
public TypeSerializer getKeySerializer() {
return keySerializer;
}
@VisibleForTesting
public TypeSerializer getValueSerializer() {
return valueSerializer;
}
@Override
public TypeSerializerSnapshot<MapData> snapshotConfiguration() {
return new MapDataSerializer.MapDataSerializerSnapshot(
keyType, valueType, keySerializer, valueSerializer);
}
/** {@link TypeSerializerSnapshot} for {@link MapDataSerializer}. */
public static final class MapDataSerializerSnapshot implements TypeSerializerSnapshot<MapData> {
private static final int CURRENT_VERSION = 0;
private DataType keyType;
private DataType valueType;
private TypeSerializer keySerializer;
private TypeSerializer valueSerializer;
@SuppressWarnings("unused")
public MapDataSerializerSnapshot() {
// this constructor is used when restoring from a checkpoint/savepoint.
}
MapDataSerializerSnapshot(
DataType keyT, DataType valueT, TypeSerializer keySer, TypeSerializer valueSer) {
this.keyType = keyT;
this.valueType = valueT;
this.keySerializer = keySer;
this.valueSerializer = valueSer;
}
@Override
public int getCurrentVersion() {
return CURRENT_VERSION;
}
@Override
public void writeSnapshot(DataOutputView out) throws IOException {
DataOutputViewStream outStream = new DataOutputViewStream(out);
InstantiationUtil.serializeObject(outStream, keyType);
InstantiationUtil.serializeObject(outStream, valueType);
InstantiationUtil.serializeObject(outStream, keySerializer);
InstantiationUtil.serializeObject(outStream, valueSerializer);
}
@Override
public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader)
throws IOException {
try {
DataInputViewStream inStream = new DataInputViewStream(in);
this.keyType = InstantiationUtil.deserializeObject(inStream, userCodeClassLoader);
this.valueType = InstantiationUtil.deserializeObject(inStream, userCodeClassLoader);
this.keySerializer =
InstantiationUtil.deserializeObject(inStream, userCodeClassLoader);
this.valueSerializer =
InstantiationUtil.deserializeObject(inStream, userCodeClassLoader);
} catch (ClassNotFoundException e) {
throw new IOException(e);
}
}
@Override
public TypeSerializer<MapData> restoreSerializer() {
return new MapDataSerializer(keyType, valueType, keySerializer, valueSerializer);
}
@Override
public TypeSerializerSchemaCompatibility<MapData> resolveSchemaCompatibility(
TypeSerializer<MapData> newSerializer) {
if (!(newSerializer instanceof MapDataSerializer)) {
return TypeSerializerSchemaCompatibility.incompatible();
}
MapDataSerializer newMapDataSerializer = (MapDataSerializer) newSerializer;
if (!keyType.equals(newMapDataSerializer.keyType)
|| !valueType.equals(newMapDataSerializer.valueType)
|| !keySerializer.equals(newMapDataSerializer.keySerializer)
|| !valueSerializer.equals(newMapDataSerializer.valueSerializer)) {
return TypeSerializerSchemaCompatibility.incompatible();
} else {
return TypeSerializerSchemaCompatibility.compatibleAsIs();
}
}
}
}

@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.runtime.serializer.data.binary;
import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.core.memory.MemoryUtils;
/**
* Utilities for {@link BinaryRecordDataDataUtil}. Many of the methods in this class are used in
* code generation.
*/
public class BinaryRecordDataDataUtil {
public static final sun.misc.Unsafe UNSAFE = MemoryUtils.UNSAFE;
public static final int BYTE_ARRAY_BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class);
public static final BinaryRecordData EMPTY_ROW = new BinaryRecordData(0);
static {
int size = EMPTY_ROW.getFixedLengthPartSize();
byte[] bytes = new byte[size];
EMPTY_ROW.pointTo(MemorySegmentFactory.wrap(bytes), 0, size);
}
public static boolean byteArrayEquals(byte[] left, byte[] right, int length) {
return byteArrayEquals(left, BYTE_ARRAY_BASE_OFFSET, right, BYTE_ARRAY_BASE_OFFSET, length);
}
public static boolean byteArrayEquals(
Object left, long leftOffset, Object right, long rightOffset, int length) {
int i = 0;
while (i <= length - 8) {
if (UNSAFE.getLong(left, leftOffset + i) != UNSAFE.getLong(right, rightOffset + i)) {
return false;
}
i += 8;
}
while (i < length) {
if (UNSAFE.getByte(left, leftOffset + i) != UNSAFE.getByte(right, rightOffset + i)) {
return false;
}
i += 1;
}
return true;
}
}

@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.runtime.serializer.data.util;
import org.apache.flink.cdc.common.data.ArrayData;
import org.apache.flink.cdc.common.data.MapData;
import org.apache.flink.cdc.common.types.DataType;
import java.util.HashMap;
import java.util.Map;
/** Utilities for {@link MapData}. */
public final class MapDataUtil {
/**
* Converts a {@link MapData} into Java {@link Map}, the keys and values of the Java map still
* holds objects of internal data structures.
*/
public static Map<Object, Object> convertToJavaMap(
MapData map, DataType keyType, DataType valueType) {
ArrayData keyArray = map.keyArray();
ArrayData valueArray = map.valueArray();
Map<Object, Object> javaMap = new HashMap<>();
ArrayData.ElementGetter keyGetter = ArrayData.createElementGetter(keyType);
ArrayData.ElementGetter valueGetter = ArrayData.createElementGetter(valueType);
for (int i = 0; i < map.size(); i++) {
Object key = keyGetter.getElementOrNull(keyArray, i);
Object value = valueGetter.getElementOrNull(valueArray, i);
javaMap.put(key, value);
}
return javaMap;
}
}

@ -27,11 +27,14 @@ import org.apache.flink.cdc.common.data.RecordData;
import org.apache.flink.cdc.common.data.StringData; import org.apache.flink.cdc.common.data.StringData;
import org.apache.flink.cdc.common.data.TimestampData; import org.apache.flink.cdc.common.data.TimestampData;
import org.apache.flink.cdc.common.data.ZonedTimestampData; import org.apache.flink.cdc.common.data.ZonedTimestampData;
import org.apache.flink.cdc.common.data.binary.BinaryArrayData;
import org.apache.flink.cdc.common.data.binary.BinaryFormat; import org.apache.flink.cdc.common.data.binary.BinaryFormat;
import org.apache.flink.cdc.common.data.binary.BinaryMapData;
import org.apache.flink.cdc.common.data.binary.BinaryRecordData; import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
import org.apache.flink.cdc.common.data.binary.BinarySegmentUtils; import org.apache.flink.cdc.common.data.binary.BinarySegmentUtils;
import org.apache.flink.cdc.common.data.binary.BinaryStringData; import org.apache.flink.cdc.common.data.binary.BinaryStringData;
import org.apache.flink.cdc.runtime.serializer.data.ArrayDataSerializer; import org.apache.flink.cdc.runtime.serializer.data.ArrayDataSerializer;
import org.apache.flink.cdc.runtime.serializer.data.MapDataSerializer;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory; import org.apache.flink.core.memory.MemorySegmentFactory;
@ -103,12 +106,16 @@ abstract class AbstractBinaryWriter implements BinaryWriter {
@Override @Override
public void writeArray(int pos, ArrayData input, ArrayDataSerializer serializer) { public void writeArray(int pos, ArrayData input, ArrayDataSerializer serializer) {
throw new UnsupportedOperationException("Not support array data."); BinaryArrayData binary = serializer.toBinaryArray(input);
writeSegmentsToVarLenPart(
pos, binary.getSegments(), binary.getOffset(), binary.getSizeInBytes());
} }
@Override @Override
public void writeMap(int pos, MapData input, TypeSerializer<MapData> serializer) { public void writeMap(int pos, MapData input, MapDataSerializer serializer) {
throw new UnsupportedOperationException("Not support map data."); BinaryMapData binary = serializer.toBinaryMap(input);
writeSegmentsToVarLenPart(
pos, binary.getSegments(), binary.getOffset(), binary.getSizeInBytes());
} }
private DataOutputViewStreamWrapper getOutputView() { private DataOutputViewStreamWrapper getOutputView() {

@ -0,0 +1,271 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.runtime.serializer.data.writer;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.data.binary.BinaryArrayData;
import org.apache.flink.cdc.common.data.binary.BinarySegmentUtils;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.core.memory.MemorySegmentFactory;
import java.io.Serializable;
/** Writer for binary array. See {@link BinaryArrayData}. */
@Internal
public final class BinaryArrayWriter extends AbstractBinaryWriter {
private final int nullBitsSizeInBytes;
private final BinaryArrayData array;
private final int numElements;
private int fixedSize;
public BinaryArrayWriter(BinaryArrayData array, int numElements, int elementSize) {
this.nullBitsSizeInBytes = BinaryArrayData.calculateHeaderInBytes(numElements);
this.fixedSize =
roundNumberOfBytesToNearestWord(nullBitsSizeInBytes + elementSize * numElements);
this.cursor = fixedSize;
this.numElements = numElements;
this.segment = MemorySegmentFactory.wrap(new byte[fixedSize]);
this.segment.putInt(0, numElements);
this.array = array;
}
/** First, reset. */
@Override
public void reset() {
this.cursor = fixedSize;
for (int i = 0; i < nullBitsSizeInBytes; i += 8) {
segment.putLong(i, 0L);
}
this.segment.putInt(0, numElements);
}
@Override
public void setNullBit(int ordinal) {
BinarySegmentUtils.bitSet(segment, 4, ordinal);
}
public void setNullBoolean(int ordinal) {
setNullBit(ordinal);
// put zero into the corresponding field when set null
segment.putBoolean(getElementOffset(ordinal, 1), false);
}
public void setNullByte(int ordinal) {
setNullBit(ordinal);
// put zero into the corresponding field when set null
segment.put(getElementOffset(ordinal, 1), (byte) 0);
}
public void setNullShort(int ordinal) {
setNullBit(ordinal);
// put zero into the corresponding field when set null
segment.putShort(getElementOffset(ordinal, 2), (short) 0);
}
public void setNullInt(int ordinal) {
setNullBit(ordinal);
// put zero into the corresponding field when set null
segment.putInt(getElementOffset(ordinal, 4), 0);
}
public void setNullLong(int ordinal) {
setNullBit(ordinal);
// put zero into the corresponding field when set null
segment.putLong(getElementOffset(ordinal, 8), (long) 0);
}
public void setNullFloat(int ordinal) {
setNullBit(ordinal);
// put zero into the corresponding field when set null
segment.putFloat(getElementOffset(ordinal, 4), (float) 0);
}
public void setNullDouble(int ordinal) {
setNullBit(ordinal);
// put zero into the corresponding field when set null
segment.putDouble(getElementOffset(ordinal, 8), (double) 0);
}
@Override
public void setNullAt(int ordinal) {
setNullLong(ordinal);
}
/**
* @deprecated Use {@link #createNullSetter(DataType)} for avoiding logical types during
* runtime.
*/
@Deprecated
public void setNullAt(int pos, DataType type) {
switch (type.getTypeRoot()) {
case BOOLEAN:
setNullBoolean(pos);
break;
case TINYINT:
setNullByte(pos);
break;
case SMALLINT:
setNullShort(pos);
break;
case INTEGER:
case DATE:
case TIME_WITHOUT_TIME_ZONE:
setNullInt(pos);
break;
case BIGINT:
case TIMESTAMP_WITHOUT_TIME_ZONE:
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
setNullLong(pos);
break;
case FLOAT:
setNullFloat(pos);
break;
case DOUBLE:
setNullDouble(pos);
break;
default:
setNullAt(pos);
}
}
private int getElementOffset(int pos, int elementSize) {
return nullBitsSizeInBytes + elementSize * pos;
}
@Override
public int getFieldOffset(int pos) {
return getElementOffset(pos, 8);
}
@Override
public void setOffsetAndSize(int pos, int offset, long size) {
final long offsetAndSize = ((long) offset << 32) | size;
segment.putLong(getElementOffset(pos, 8), offsetAndSize);
}
@Override
public void writeBoolean(int pos, boolean value) {
segment.putBoolean(getElementOffset(pos, 1), value);
}
@Override
public void writeByte(int pos, byte value) {
segment.put(getElementOffset(pos, 1), value);
}
@Override
public void writeShort(int pos, short value) {
segment.putShort(getElementOffset(pos, 2), value);
}
@Override
public void writeInt(int pos, int value) {
segment.putInt(getElementOffset(pos, 4), value);
}
@Override
public void writeLong(int pos, long value) {
segment.putLong(getElementOffset(pos, 8), value);
}
@Override
public void writeFloat(int pos, float value) {
if (Float.isNaN(value)) {
value = Float.NaN;
}
segment.putFloat(getElementOffset(pos, 4), value);
}
@Override
public void writeDouble(int pos, double value) {
if (Double.isNaN(value)) {
value = Double.NaN;
}
segment.putDouble(getElementOffset(pos, 8), value);
}
@Override
public void afterGrow() {
array.pointTo(segment, 0, segment.size());
}
/** Finally, complete write to set real size to row. */
@Override
public void complete() {
array.pointTo(segment, 0, cursor);
}
public int getNumElements() {
return numElements;
}
// --------------------------------------------------------------------------------------------
/**
* Creates an for accessor setting the elements of an array writer to {@code null} during
* runtime.
*
* @param elementType the element type of the array
*/
public static BinaryArrayWriter.NullSetter createNullSetter(DataType elementType) {
// ordered by type root definition
switch (elementType.getTypeRoot()) {
case CHAR:
case VARCHAR:
case BINARY:
case VARBINARY:
case DECIMAL:
case BIGINT:
case TIMESTAMP_WITHOUT_TIME_ZONE:
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
case ARRAY:
case MAP:
case ROW:
return BinaryArrayWriter::setNullLong;
case BOOLEAN:
return BinaryArrayWriter::setNullBoolean;
case TINYINT:
return BinaryArrayWriter::setNullByte;
case SMALLINT:
return BinaryArrayWriter::setNullShort;
case INTEGER:
case DATE:
case TIME_WITHOUT_TIME_ZONE:
return BinaryArrayWriter::setNullInt;
case FLOAT:
return BinaryArrayWriter::setNullFloat;
case DOUBLE:
return BinaryArrayWriter::setNullDouble;
case TIMESTAMP_WITH_TIME_ZONE:
throw new UnsupportedOperationException();
default:
throw new IllegalArgumentException();
}
}
/**
* Accessor for setting the elements of an array writer to {@code null} during runtime.
*
* @see #createNullSetter(DataType)
*/
public interface NullSetter extends Serializable {
void setNull(BinaryArrayWriter writer, int pos);
}
}

@ -32,7 +32,9 @@ import org.apache.flink.cdc.common.types.DecimalType;
import org.apache.flink.cdc.common.types.LocalZonedTimestampType; import org.apache.flink.cdc.common.types.LocalZonedTimestampType;
import org.apache.flink.cdc.common.types.TimestampType; import org.apache.flink.cdc.common.types.TimestampType;
import org.apache.flink.cdc.common.types.ZonedTimestampType; import org.apache.flink.cdc.common.types.ZonedTimestampType;
import org.apache.flink.cdc.runtime.serializer.NullableSerializerWrapper;
import org.apache.flink.cdc.runtime.serializer.data.ArrayDataSerializer; import org.apache.flink.cdc.runtime.serializer.data.ArrayDataSerializer;
import org.apache.flink.cdc.runtime.serializer.data.MapDataSerializer;
/** /**
* Writer to write a composite data format, like row, array. 1. Invoke {@link #reset()}. 2. Write * Writer to write a composite data format, like row, array. 1. Invoke {@link #reset()}. 2. Write
@ -76,7 +78,7 @@ public interface BinaryWriter {
void writeArray(int pos, ArrayData value, ArrayDataSerializer serializer); void writeArray(int pos, ArrayData value, ArrayDataSerializer serializer);
void writeMap(int pos, MapData value, TypeSerializer<MapData> serializer); void writeMap(int pos, MapData value, MapDataSerializer serializer);
void writeRecord(int pos, RecordData value, TypeSerializer<RecordData> serializer); void writeRecord(int pos, RecordData value, TypeSerializer<RecordData> serializer);
@ -131,10 +133,16 @@ public interface BinaryWriter {
writer.writeDecimal(pos, (DecimalData) o, decimalType.getPrecision()); writer.writeDecimal(pos, (DecimalData) o, decimalType.getPrecision());
break; break;
case ARRAY: case ARRAY:
if (serializer instanceof NullableSerializerWrapper) {
serializer = ((NullableSerializerWrapper) serializer).getWrappedSerializer();
}
writer.writeArray(pos, (ArrayData) o, (ArrayDataSerializer) serializer); writer.writeArray(pos, (ArrayData) o, (ArrayDataSerializer) serializer);
break; break;
case MAP: case MAP:
writer.writeMap(pos, (MapData) o, (TypeSerializer<MapData>) serializer); if (serializer instanceof NullableSerializerWrapper) {
serializer = ((NullableSerializerWrapper) serializer).getWrappedSerializer();
}
writer.writeMap(pos, (MapData) o, (MapDataSerializer) serializer);
break; break;
case ROW: case ROW:
writer.writeRecord(pos, (RecordData) o, (TypeSerializer<RecordData>) serializer); writer.writeRecord(pos, (RecordData) o, (TypeSerializer<RecordData>) serializer);

@ -39,7 +39,14 @@ public class DataFieldSerializer extends TypeSerializerSingleton<DataField> {
public static final DataFieldSerializer INSTANCE = new DataFieldSerializer(); public static final DataFieldSerializer INSTANCE = new DataFieldSerializer();
private final StringSerializer stringSerializer = StringSerializer.INSTANCE; private final StringSerializer stringSerializer = StringSerializer.INSTANCE;
private final DataTypeSerializer dataTypeSerializer = new DataTypeSerializer(); private DataTypeSerializer dataTypeSerializer;
private DataTypeSerializer getDataTypeSerializer() {
if (dataTypeSerializer == null) {
dataTypeSerializer = new DataTypeSerializer();
}
return dataTypeSerializer;
}
@Override @Override
public boolean isImmutableType() { public boolean isImmutableType() {
@ -55,7 +62,7 @@ public class DataFieldSerializer extends TypeSerializerSingleton<DataField> {
public DataField copy(DataField from) { public DataField copy(DataField from) {
return new DataField( return new DataField(
stringSerializer.copy(from.getName()), stringSerializer.copy(from.getName()),
dataTypeSerializer.copy(from.getType()), getDataTypeSerializer().copy(from.getType()),
stringSerializer.copy(from.getDescription())); stringSerializer.copy(from.getDescription()));
} }
@ -72,14 +79,14 @@ public class DataFieldSerializer extends TypeSerializerSingleton<DataField> {
@Override @Override
public void serialize(DataField record, DataOutputView target) throws IOException { public void serialize(DataField record, DataOutputView target) throws IOException {
stringSerializer.serialize(record.getName(), target); stringSerializer.serialize(record.getName(), target);
dataTypeSerializer.serialize(record.getType(), target); getDataTypeSerializer().serialize(record.getType(), target);
stringSerializer.serialize(record.getDescription(), target); stringSerializer.serialize(record.getDescription(), target);
} }
@Override @Override
public DataField deserialize(DataInputView source) throws IOException { public DataField deserialize(DataInputView source) throws IOException {
String name = stringSerializer.deserialize(source); String name = stringSerializer.deserialize(source);
DataType type = dataTypeSerializer.deserialize(source); DataType type = getDataTypeSerializer().deserialize(source);
String desc = stringSerializer.deserialize(source); String desc = stringSerializer.deserialize(source);
return new DataField(name, type, desc); return new DataField(name, type, desc);
} }

@ -55,7 +55,14 @@ public class DataTypeSerializer extends TypeSerializer<DataType> {
private final EnumSerializer<DataTypeClass> enumSerializer = private final EnumSerializer<DataTypeClass> enumSerializer =
new EnumSerializer<>(DataTypeClass.class); new EnumSerializer<>(DataTypeClass.class);
private final RowTypeSerializer rowTypeSerializer = RowTypeSerializer.INSTANCE; private RowTypeSerializer rowTypeSerializer;
private RowTypeSerializer getRowTypeSerializer() {
if (rowTypeSerializer == null) {
rowTypeSerializer = RowTypeSerializer.INSTANCE;
}
return rowTypeSerializer;
}
@Override @Override
public boolean isImmutableType() { public boolean isImmutableType() {
@ -75,7 +82,7 @@ public class DataTypeSerializer extends TypeSerializer<DataType> {
@Override @Override
public DataType copy(DataType from) { public DataType copy(DataType from) {
if (from instanceof RowType) { if (from instanceof RowType) {
return rowTypeSerializer.copy((RowType) from); return getRowTypeSerializer().copy((RowType) from);
} }
return from; return from;
} }
@ -94,7 +101,7 @@ public class DataTypeSerializer extends TypeSerializer<DataType> {
public void serialize(DataType record, DataOutputView target) throws IOException { public void serialize(DataType record, DataOutputView target) throws IOException {
if (record instanceof RowType) { if (record instanceof RowType) {
enumSerializer.serialize(DataTypeClass.ROW, target); enumSerializer.serialize(DataTypeClass.ROW, target);
rowTypeSerializer.serialize((RowType) record, target); getRowTypeSerializer().serialize((RowType) record, target);
} else if (record instanceof BinaryType) { } else if (record instanceof BinaryType) {
enumSerializer.serialize(DataTypeClass.BINARY, target); enumSerializer.serialize(DataTypeClass.BINARY, target);
target.writeBoolean(record.isNullable()); target.writeBoolean(record.isNullable());
@ -178,7 +185,7 @@ public class DataTypeSerializer extends TypeSerializer<DataType> {
public DataType deserialize(DataInputView source) throws IOException { public DataType deserialize(DataInputView source) throws IOException {
DataTypeClass dataTypeClass = enumSerializer.deserialize(source); DataTypeClass dataTypeClass = enumSerializer.deserialize(source);
if (dataTypeClass == DataTypeClass.ROW) { if (dataTypeClass == DataTypeClass.ROW) {
return rowTypeSerializer.deserialize(source); return getRowTypeSerializer().deserialize(source);
} }
boolean isNullable = source.readBoolean(); boolean isNullable = source.readBoolean();
switch (dataTypeClass) { switch (dataTypeClass) {
@ -255,12 +262,12 @@ public class DataTypeSerializer extends TypeSerializer<DataType> {
} }
DataTypeSerializer that = (DataTypeSerializer) o; DataTypeSerializer that = (DataTypeSerializer) o;
return Objects.equals(enumSerializer, that.enumSerializer) return Objects.equals(enumSerializer, that.enumSerializer)
&& Objects.equals(rowTypeSerializer, that.rowTypeSerializer); && Objects.equals(getRowTypeSerializer(), that.getRowTypeSerializer());
} }
@Override @Override
public int hashCode() { public int hashCode() {
return Objects.hash(enumSerializer, rowTypeSerializer); return Objects.hash(enumSerializer, getRowTypeSerializer());
} }
@Override @Override

@ -38,8 +38,18 @@ public class RowTypeSerializer extends TypeSerializerSingleton<RowType> {
/** Sharable instance of the TableIdSerializer. */ /** Sharable instance of the TableIdSerializer. */
public static final RowTypeSerializer INSTANCE = new RowTypeSerializer(); public static final RowTypeSerializer INSTANCE = new RowTypeSerializer();
private final ListSerializer<DataField> fieldsSerializer = private volatile ListSerializer<DataField> fieldsSerializer;
new ListSerializer<>(DataFieldSerializer.INSTANCE);
private ListSerializer<DataField> getFieldsSerializer() {
if (fieldsSerializer == null) {
synchronized (this) {
if (fieldsSerializer == null) {
fieldsSerializer = new ListSerializer<>(DataFieldSerializer.INSTANCE);
}
}
}
return fieldsSerializer;
}
@Override @Override
public boolean isImmutableType() { public boolean isImmutableType() {
@ -53,7 +63,7 @@ public class RowTypeSerializer extends TypeSerializerSingleton<RowType> {
@Override @Override
public RowType copy(RowType from) { public RowType copy(RowType from) {
return new RowType(from.isNullable(), fieldsSerializer.copy(from.getFields())); return new RowType(from.isNullable(), getFieldsSerializer().copy(from.getFields()));
} }
@Override @Override
@ -69,13 +79,13 @@ public class RowTypeSerializer extends TypeSerializerSingleton<RowType> {
@Override @Override
public void serialize(RowType record, DataOutputView target) throws IOException { public void serialize(RowType record, DataOutputView target) throws IOException {
target.writeBoolean(record.isNullable()); target.writeBoolean(record.isNullable());
fieldsSerializer.serialize(record.getFields(), target); getFieldsSerializer().serialize(record.getFields(), target);
} }
@Override @Override
public RowType deserialize(DataInputView source) throws IOException { public RowType deserialize(DataInputView source) throws IOException {
boolean nullable = source.readBoolean(); boolean nullable = source.readBoolean();
return new RowType(nullable, fieldsSerializer.deserialize(source)); return new RowType(nullable, getFieldsSerializer().deserialize(source));
} }
@Override @Override

@ -24,7 +24,6 @@ import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
import org.apache.flink.cdc.common.types.DataType; import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.RowType; import org.apache.flink.cdc.common.types.RowType;
import org.apache.flink.cdc.runtime.serializer.InternalSerializers; import org.apache.flink.cdc.runtime.serializer.InternalSerializers;
import org.apache.flink.cdc.runtime.serializer.NullableSerializerWrapper;
import org.apache.flink.cdc.runtime.serializer.data.writer.BinaryRecordDataWriter; import org.apache.flink.cdc.runtime.serializer.data.writer.BinaryRecordDataWriter;
import org.apache.flink.cdc.runtime.serializer.data.writer.BinaryWriter; import org.apache.flink.cdc.runtime.serializer.data.writer.BinaryWriter;
@ -51,7 +50,6 @@ public class BinaryRecordDataGenerator {
dataTypes, dataTypes,
Arrays.stream(dataTypes) Arrays.stream(dataTypes)
.map(InternalSerializers::create) .map(InternalSerializers::create)
.map(NullableSerializerWrapper::new)
.toArray(TypeSerializer[]::new)); .toArray(TypeSerializer[]::new));
} }

@ -19,12 +19,24 @@ package org.apache.flink.cdc.runtime.serializer.data;
import org.apache.flink.cdc.common.data.ArrayData; import org.apache.flink.cdc.common.data.ArrayData;
import org.apache.flink.cdc.common.data.GenericArrayData; import org.apache.flink.cdc.common.data.GenericArrayData;
import org.apache.flink.cdc.common.data.GenericMapData;
import org.apache.flink.cdc.common.data.MapData;
import org.apache.flink.cdc.common.data.StringData; import org.apache.flink.cdc.common.data.StringData;
import org.apache.flink.cdc.common.data.binary.BinaryArrayData;
import org.apache.flink.cdc.common.data.binary.BinaryStringData; import org.apache.flink.cdc.common.data.binary.BinaryStringData;
import org.apache.flink.cdc.common.types.DataTypes; import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.runtime.serializer.SerializerTestBase; import org.apache.flink.cdc.runtime.serializer.SerializerTestBase;
import org.apache.flink.cdc.runtime.serializer.data.writer.BinaryArrayWriter;
import org.apache.flink.testutils.DeeplyEqualsChecker; import org.apache.flink.testutils.DeeplyEqualsChecker;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.HashMap;
import java.util.Map;
import static org.junit.Assert.assertEquals;
/** A test for the {@link ArrayDataSerializer}. */ /** A test for the {@link ArrayDataSerializer}. */
class ArrayDataSerializerTest extends SerializerTestBase<ArrayData> { class ArrayDataSerializerTest extends SerializerTestBase<ArrayData> {
@ -79,4 +91,75 @@ class ArrayDataSerializerTest extends SerializerTestBase<ArrayData> {
}) })
}; };
} }
static BinaryArrayData createArray(String... vs) {
BinaryArrayData array = new BinaryArrayData();
BinaryArrayWriter writer = new BinaryArrayWriter(array, vs.length, 8);
for (int i = 0; i < vs.length; i++) {
writer.writeString(i, BinaryStringData.fromString(vs[i]));
}
writer.complete();
return array;
}
@Test
public void testToBinaryArrayWithNestedTypes() {
// Create a nested ArrayData
Map<BinaryStringData, BinaryStringData> map = new HashMap<>();
map.put(BinaryStringData.fromString("key1"), BinaryStringData.fromString("value1"));
map.put(BinaryStringData.fromString("key2"), BinaryStringData.fromString("value2"));
GenericMapData genMapData = new GenericMapData(map);
ArrayData innerArrayData = new GenericArrayData(new Object[] {genMapData});
// Serialize to BinaryArrayData
ArrayDataSerializer serializer =
new ArrayDataSerializer(DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()));
BinaryArrayData binaryArrayData = serializer.toBinaryArray(innerArrayData);
// Verify the conversion
MapData mapData = binaryArrayData.getMap(0);
Assertions.assertEquals(2, mapData.size());
}
@Test
public void testToBinaryArrayWithDeeplyNestedTypes() {
// Create a nested structure: MapData containing ArrayData elements
Map<BinaryStringData, ArrayData> nestedMap = new HashMap<>();
nestedMap.put(
BinaryStringData.fromString("key1"), new GenericArrayData(new Object[] {42, 43}));
nestedMap.put(
BinaryStringData.fromString("key2"), new GenericArrayData(new Object[] {44, 45}));
GenericMapData genMapData = new GenericMapData(nestedMap);
// Create an outer ArrayData containing the nested MapData
ArrayData outerArrayData = new GenericArrayData(new Object[] {genMapData});
// Serialize to BinaryArrayData
ArrayDataSerializer serializer =
new ArrayDataSerializer(
DataTypes.MAP(DataTypes.STRING(), DataTypes.ARRAY(DataTypes.INT())));
BinaryArrayData binaryArrayData = serializer.toBinaryArray(outerArrayData);
// Verify the conversion
MapData mapData = binaryArrayData.getMap(0);
assertEquals(2, mapData.size());
// Check nested arrays in map
ArrayData keys = mapData.keyArray();
ArrayData values = mapData.valueArray();
// Check the first key-value pair
int keyIndex = keys.getString(0).toString().equals("key1") ? 0 : 1;
ArrayData arrayData1 = values.getArray(keyIndex);
assertEquals(42, arrayData1.getInt(0));
assertEquals(43, arrayData1.getInt(1));
// Check the second key-value pair
keyIndex = keys.getString(0).toString().equals("key2") ? 0 : 1;
ArrayData arrayData2 = values.getArray(keyIndex);
assertEquals(44, arrayData2.getInt(0));
assertEquals(45, arrayData2.getInt(1));
}
} }

@ -0,0 +1,166 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.runtime.serializer.data;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.cdc.common.data.ArrayData;
import org.apache.flink.cdc.common.data.GenericArrayData;
import org.apache.flink.cdc.common.data.GenericMapData;
import org.apache.flink.cdc.common.data.MapData;
import org.apache.flink.cdc.common.data.binary.BinaryArrayData;
import org.apache.flink.cdc.common.data.binary.BinaryMapData;
import org.apache.flink.cdc.common.data.binary.BinaryStringData;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.runtime.serializer.SerializerTestBase;
import org.apache.flink.cdc.runtime.serializer.data.writer.BinaryArrayWriter;
import org.apache.flink.testutils.DeeplyEqualsChecker;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import static org.apache.flink.cdc.runtime.serializer.data.util.MapDataUtil.convertToJavaMap;
/** A test for the {@link MapDataSerializer}. */
public class MapDataSerializerTest extends SerializerTestBase<MapData> {
private static final DataType INT = DataTypes.INT();
private static final DataType STRING = DataTypes.STRING();
public MapDataSerializerTest() {
super(
new DeeplyEqualsChecker()
.withCustomCheck(
(o1, o2) -> o1 instanceof MapData && o2 instanceof MapData,
(o1, o2, checker) ->
// Better is more proper to compare the maps after changing
// them to Java maps
// instead of binary maps. For example, consider the
// following two maps:
// {1: 'a', 2: 'b', 3: 'c'} and {3: 'c', 2: 'b', 1: 'a'}
// These are actually the same maps, but their key / value
// order will be
// different when stored as binary maps, and the equalsTo
// method of binary
// map will return false.
convertToJavaMap((MapData) o1, INT, STRING)
.equals(
convertToJavaMap(
(MapData) o2, INT, STRING))));
}
/** @return MapDataSerializer */
@Override
protected TypeSerializer<MapData> createSerializer() {
return new MapDataSerializer(INT, STRING);
}
/**
* Gets the expected length for the serializer's {@link TypeSerializer#getLength()} method.
*
* <p>The expected length should be positive, for fix-length data types, or {@code -1} for
* variable-length types.
*/
@Override
protected int getLength() {
return -1;
}
/** @return MapData clazz */
@Override
protected Class<MapData> getTypeClass() {
return MapData.class;
}
/** @return MapData[] */
@Override
protected MapData[] getTestData() {
Map<Object, Object> first = new HashMap<>();
first.put(1, BinaryStringData.fromString(""));
return new MapData[] {
new GenericMapData(first),
new CustomMapData(first),
BinaryMapData.valueOf(
createArray(1, 2), ArrayDataSerializerTest.createArray("11", "haa")),
BinaryMapData.valueOf(
createArray(1, 3, 4), ArrayDataSerializerTest.createArray("11", "haa", "ke")),
BinaryMapData.valueOf(
createArray(1, 4, 2), ArrayDataSerializerTest.createArray("11", "haa", "ke")),
BinaryMapData.valueOf(
createArray(1, 5, 6, 7),
ArrayDataSerializerTest.createArray("11", "lele", "haa", "ke"))
};
}
private static BinaryArrayData createArray(int... vs) {
BinaryArrayData array = new BinaryArrayData();
BinaryArrayWriter writer = new BinaryArrayWriter(array, vs.length, 4);
for (int i = 0; i < vs.length; i++) {
writer.writeInt(i, vs[i]);
}
writer.complete();
return array;
}
/** A simple custom implementation for {@link MapData}. */
public static class CustomMapData implements MapData {
private final Map<?, ?> map;
public CustomMapData(Map<?, ?> map) {
this.map = map;
}
public Object get(Object key) {
return map.get(key);
}
@Override
public int size() {
return map.size();
}
@Override
public ArrayData keyArray() {
Object[] keys = map.keySet().toArray();
return new GenericArrayData(keys);
}
@Override
public ArrayData valueArray() {
Object[] values = map.values().toArray();
return new GenericArrayData(values);
}
@Override
public boolean equals(Object o) {
if (o == this) {
return true;
}
if (!(o instanceof CustomMapData)) {
return false;
}
return map.equals(((CustomMapData) o).map);
}
@Override
public int hashCode() {
return Objects.hash(map);
}
}
}

@ -0,0 +1,195 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.runtime.serializer.data.binary;
import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
import org.apache.flink.cdc.common.data.binary.BinarySegmentUtils;
import org.apache.flink.cdc.runtime.serializer.data.util.DataFormatTestUtil;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.junit.Test;
import static org.apache.flink.cdc.runtime.serializer.data.binary.BinaryRecordDataDataUtil.BYTE_ARRAY_BASE_OFFSET;
import static org.assertj.core.api.Assertions.assertThat;
/** Utilities for binary data segments which heavily uses {@link MemorySegment}. */
public class BinarySegmentUtilsTest {
@Test
public void testCopy() {
// test copy the content of the latter Seg
MemorySegment[] segments = new MemorySegment[2];
segments[0] = MemorySegmentFactory.wrap(new byte[] {0, 2, 5});
segments[1] = MemorySegmentFactory.wrap(new byte[] {6, 12, 15});
byte[] bytes = BinarySegmentUtils.copyToBytes(segments, 4, 2);
assertThat(bytes).isEqualTo(new byte[] {12, 15});
}
@Test
public void testEquals() {
// test copy the content of the latter Seg
MemorySegment[] segments1 = new MemorySegment[3];
segments1[0] = MemorySegmentFactory.wrap(new byte[] {0, 2, 5});
segments1[1] = MemorySegmentFactory.wrap(new byte[] {6, 12, 15});
segments1[2] = MemorySegmentFactory.wrap(new byte[] {1, 1, 1});
MemorySegment[] segments2 = new MemorySegment[2];
segments2[0] = MemorySegmentFactory.wrap(new byte[] {6, 0, 2, 5});
segments2[1] = MemorySegmentFactory.wrap(new byte[] {6, 12, 15, 18});
assertThat(BinarySegmentUtils.equalsMultiSegments(segments1, 0, segments2, 0, 0)).isTrue();
assertThat(BinarySegmentUtils.equals(segments1, 0, segments2, 1, 3)).isTrue();
assertThat(BinarySegmentUtils.equals(segments1, 0, segments2, 1, 6)).isTrue();
assertThat(BinarySegmentUtils.equals(segments1, 0, segments2, 1, 7)).isFalse();
}
@Test
public void testBoundaryByteArrayEquals() {
byte[] bytes1 = new byte[5];
bytes1[3] = 81;
byte[] bytes2 = new byte[100];
bytes2[3] = 81;
bytes2[4] = 81;
assertThat(BinaryRecordDataDataUtil.byteArrayEquals(bytes1, bytes2, 4)).isTrue();
assertThat(BinaryRecordDataDataUtil.byteArrayEquals(bytes1, bytes2, 5)).isFalse();
assertThat(BinaryRecordDataDataUtil.byteArrayEquals(bytes1, bytes2, 0)).isTrue();
}
@Test
public void testBoundaryEquals() {
BinaryRecordData row24 = DataFormatTestUtil.get24BytesBinaryRow();
BinaryRecordData row160 = DataFormatTestUtil.get160BytesBinaryRow();
BinaryRecordData varRow160 = DataFormatTestUtil.getMultiSeg160BytesBinaryRow(row160);
BinaryRecordData varRow160InOne = DataFormatTestUtil.getMultiSeg160BytesInOneSegRow(row160);
assertThat(varRow160InOne).isEqualTo(row160);
assertThat(varRow160InOne).isEqualTo(varRow160);
assertThat(varRow160).isEqualTo(row160);
assertThat(varRow160).isEqualTo(varRow160InOne);
assertThat(row160).isNotEqualTo(row24);
assertThat(varRow160).isNotEqualTo(row24);
assertThat(varRow160InOne).isNotEqualTo(row24);
assertThat(BinarySegmentUtils.equals(row24.getSegments(), 0, row160.getSegments(), 0, 0))
.isTrue();
assertThat(BinarySegmentUtils.equals(row24.getSegments(), 0, varRow160.getSegments(), 0, 0))
.isTrue();
// test var segs
MemorySegment[] segments1 = new MemorySegment[2];
segments1[0] = MemorySegmentFactory.wrap(new byte[32]);
segments1[1] = MemorySegmentFactory.wrap(new byte[32]);
MemorySegment[] segments2 = new MemorySegment[3];
segments2[0] = MemorySegmentFactory.wrap(new byte[16]);
segments2[1] = MemorySegmentFactory.wrap(new byte[16]);
segments2[2] = MemorySegmentFactory.wrap(new byte[16]);
segments1[0].put(9, (byte) 1);
assertThat(BinarySegmentUtils.equals(segments1, 0, segments2, 14, 14)).isFalse();
segments2[1].put(7, (byte) 1);
assertThat(BinarySegmentUtils.equals(segments1, 0, segments2, 14, 14)).isTrue();
assertThat(BinarySegmentUtils.equals(segments1, 2, segments2, 16, 14)).isTrue();
assertThat(BinarySegmentUtils.equals(segments1, 2, segments2, 16, 16)).isTrue();
segments2[2].put(7, (byte) 1);
assertThat(BinarySegmentUtils.equals(segments1, 2, segments2, 32, 14)).isTrue();
}
@Test
public void testBoundaryCopy() {
MemorySegment[] segments1 = new MemorySegment[2];
segments1[0] = MemorySegmentFactory.wrap(new byte[32]);
segments1[1] = MemorySegmentFactory.wrap(new byte[32]);
segments1[0].put(15, (byte) 5);
segments1[1].put(15, (byte) 6);
{
byte[] bytes = new byte[64];
MemorySegment[] segments2 = new MemorySegment[] {MemorySegmentFactory.wrap(bytes)};
BinarySegmentUtils.copyToBytes(segments1, 0, bytes, 0, 64);
assertThat(BinarySegmentUtils.equals(segments1, 0, segments2, 0, 64)).isTrue();
}
{
byte[] bytes = new byte[64];
MemorySegment[] segments2 = new MemorySegment[] {MemorySegmentFactory.wrap(bytes)};
BinarySegmentUtils.copyToBytes(segments1, 32, bytes, 0, 14);
assertThat(BinarySegmentUtils.equals(segments1, 32, segments2, 0, 14)).isTrue();
}
{
byte[] bytes = new byte[64];
MemorySegment[] segments2 = new MemorySegment[] {MemorySegmentFactory.wrap(bytes)};
BinarySegmentUtils.copyToBytes(segments1, 34, bytes, 0, 14);
assertThat(BinarySegmentUtils.equals(segments1, 34, segments2, 0, 14)).isTrue();
}
}
@Test
public void testCopyToUnsafe() {
MemorySegment[] segments1 = new MemorySegment[2];
segments1[0] = MemorySegmentFactory.wrap(new byte[32]);
segments1[1] = MemorySegmentFactory.wrap(new byte[32]);
segments1[0].put(15, (byte) 5);
segments1[1].put(15, (byte) 6);
{
byte[] bytes = new byte[64];
MemorySegment[] segments2 = new MemorySegment[] {MemorySegmentFactory.wrap(bytes)};
BinarySegmentUtils.copyToUnsafe(segments1, 0, bytes, BYTE_ARRAY_BASE_OFFSET, 64);
assertThat(BinarySegmentUtils.equals(segments1, 0, segments2, 0, 64)).isTrue();
}
{
byte[] bytes = new byte[64];
MemorySegment[] segments2 = new MemorySegment[] {MemorySegmentFactory.wrap(bytes)};
BinarySegmentUtils.copyToUnsafe(segments1, 32, bytes, BYTE_ARRAY_BASE_OFFSET, 14);
assertThat(BinarySegmentUtils.equals(segments1, 32, segments2, 0, 14)).isTrue();
}
{
byte[] bytes = new byte[64];
MemorySegment[] segments2 = new MemorySegment[] {MemorySegmentFactory.wrap(bytes)};
BinarySegmentUtils.copyToUnsafe(segments1, 34, bytes, BYTE_ARRAY_BASE_OFFSET, 14);
assertThat(BinarySegmentUtils.equals(segments1, 34, segments2, 0, 14)).isTrue();
}
}
@Test
public void testFind() {
MemorySegment[] segments1 = new MemorySegment[2];
segments1[0] = MemorySegmentFactory.wrap(new byte[32]);
segments1[1] = MemorySegmentFactory.wrap(new byte[32]);
MemorySegment[] segments2 = new MemorySegment[3];
segments2[0] = MemorySegmentFactory.wrap(new byte[16]);
segments2[1] = MemorySegmentFactory.wrap(new byte[16]);
segments2[2] = MemorySegmentFactory.wrap(new byte[16]);
assertThat(BinarySegmentUtils.find(segments1, 34, 0, segments2, 0, 0)).isEqualTo(34);
assertThat(BinarySegmentUtils.find(segments1, 34, 0, segments2, 0, 15)).isEqualTo(-1);
}
}

@ -0,0 +1,132 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.runtime.serializer.data.util;
import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
import org.apache.flink.cdc.common.data.binary.BinaryStringData;
import org.apache.flink.cdc.runtime.serializer.data.writer.BinaryRecordDataWriter;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.commons.lang3.RandomStringUtils;
import static org.assertj.core.api.Assertions.assertThat;
/** Utils for testing data formats. */
public class DataFormatTestUtil {
/** Get a binary row of 24 bytes long. */
public static BinaryRecordData get24BytesBinaryRow() {
// header (8 bytes) + 2 * string in fixed-length part (8 bytes each)
BinaryRecordData row = new BinaryRecordData(2);
BinaryRecordDataWriter writer = new BinaryRecordDataWriter(row);
writer.writeString(0, BinaryStringData.fromString(RandomStringUtils.randomNumeric(2)));
writer.writeString(1, BinaryStringData.fromString(RandomStringUtils.randomNumeric(2)));
writer.complete();
return row;
}
/** Get a binary row of 160 bytes long. */
public static BinaryRecordData get160BytesBinaryRow() {
// header (8 bytes) +
// 72 byte length string (8 bytes in fixed-length, 72 bytes in variable-length) +
// 64 byte length string (8 bytes in fixed-length, 64 bytes in variable-length)
BinaryRecordData row = new BinaryRecordData(2);
BinaryRecordDataWriter writer = new BinaryRecordDataWriter(row);
writer.writeString(0, BinaryStringData.fromString(RandomStringUtils.randomNumeric(72)));
writer.writeString(1, BinaryStringData.fromString(RandomStringUtils.randomNumeric(64)));
writer.complete();
return row;
}
/**
* Get a binary row consisting of 6 segments. The bytes of the returned row is the same with the
* given input binary row.
*/
public static BinaryRecordData getMultiSeg160BytesBinaryRow(BinaryRecordData row160) {
BinaryRecordData multiSegRow160 = new BinaryRecordData(2);
MemorySegment[] segments = new MemorySegment[6];
int baseOffset = 8;
int posInSeg = baseOffset;
int remainSize = 160;
for (int i = 0; i < segments.length; i++) {
segments[i] = MemorySegmentFactory.wrap(new byte[32]);
int copy = Math.min(32 - posInSeg, remainSize);
row160.getSegments()[0].copyTo(160 - remainSize, segments[i], posInSeg, copy);
remainSize -= copy;
posInSeg = 0;
}
multiSegRow160.pointTo(segments, baseOffset, 160);
assertThat(multiSegRow160).isEqualTo(row160);
return multiSegRow160;
}
/**
* Get a binary row consisting of 2 segments. Its first segment is the same with the given input
* binary row, while its second segment is empty.
*/
public static BinaryRecordData getMultiSeg160BytesInOneSegRow(BinaryRecordData row160) {
MemorySegment[] segments = new MemorySegment[2];
segments[0] = row160.getSegments()[0];
segments[1] = MemorySegmentFactory.wrap(new byte[row160.getSegments()[0].size()]);
row160.pointTo(segments, 0, row160.getSizeInBytes());
return row160;
}
/** Split the given byte array into two memory segments. */
public static MemorySegment[] splitBytes(byte[] bytes, int baseOffset) {
int newSize = (bytes.length + 1) / 2 + baseOffset;
MemorySegment[] ret = new MemorySegment[2];
ret[0] = MemorySegmentFactory.wrap(new byte[newSize]);
ret[1] = MemorySegmentFactory.wrap(new byte[newSize]);
ret[0].put(baseOffset, bytes, 0, newSize - baseOffset);
ret[1].put(0, bytes, newSize - baseOffset, bytes.length - (newSize - baseOffset));
return ret;
}
/** A simple class for testing generic type getting / setting on data formats. */
public static class MyObj {
public int i;
public double j;
public MyObj(int i, double j) {
this.i = i;
this.j = j;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
MyObj myObj = (MyObj) o;
return i == myObj.i && Double.compare(myObj.j, j) == 0;
}
@Override
public String toString() {
return "MyObj{" + "i=" + i + ", j=" + j + '}';
}
}
}

@ -0,0 +1,535 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.runtime.serializer.data.writer;
import org.apache.flink.cdc.common.data.DecimalData;
import org.apache.flink.cdc.common.data.RecordData;
import org.apache.flink.cdc.common.data.TimestampData;
import org.apache.flink.cdc.common.data.binary.BinaryArrayData;
import org.apache.flink.cdc.common.data.binary.BinaryMapData;
import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
import org.apache.flink.cdc.common.data.binary.BinarySegmentUtils;
import org.apache.flink.cdc.common.data.binary.BinaryStringData;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.runtime.serializer.data.ArrayDataSerializer;
import org.apache.flink.cdc.runtime.serializer.data.MapDataSerializer;
import org.apache.flink.cdc.runtime.serializer.data.RecordDataSerializer;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.junit.Test;
import java.math.BigDecimal;
import java.sql.Timestamp;
import java.time.LocalDateTime;
import static org.apache.flink.cdc.common.data.binary.BinaryStringData.fromString;
import static org.assertj.core.api.Assertions.assertThat;
/** Test of {@link BinaryArrayData} and {@link BinaryArrayWriter}. */
public class BinaryArrayWriterTest {
@Test
public void testArray() {
// 1.array test
BinaryArrayData array = new BinaryArrayData();
BinaryArrayWriter writer = new BinaryArrayWriter(array, 3, 4);
writer.writeInt(0, 6);
writer.setNullInt(1);
writer.writeInt(2, 666);
writer.complete();
assertThat(6).isEqualTo(array.getInt(0));
assertThat(array.isNullAt(1)).isTrue();
assertThat(666).isEqualTo(array.getInt(2));
// 2.test write to binary row.
{
BinaryRecordData row2 = new BinaryRecordData(1);
BinaryRecordDataWriter writer2 = new BinaryRecordDataWriter(row2);
writer2.writeArray(0, array, new ArrayDataSerializer(DataTypes.INT()));
writer2.complete();
BinaryArrayData array2 = (BinaryArrayData) row2.getArray(0);
assertThat(array).isEqualTo(array2);
assertThat(6).isEqualTo(array2.getInt(0));
assertThat(array2.isNullAt(1)).isTrue();
assertThat(666).isEqualTo(array2.getInt(2));
}
// 3.test write var seg array to binary row.
{
BinaryArrayData array3 = splitArray(array);
BinaryRecordData row2 = new BinaryRecordData(1);
BinaryRecordDataWriter writer2 = new BinaryRecordDataWriter(row2);
writer2.writeArray(0, array3, new ArrayDataSerializer(DataTypes.INT()));
writer2.complete();
BinaryArrayData array2 = (BinaryArrayData) row2.getArray(0);
assertThat(array).isEqualTo(array2);
assertThat(6).isEqualTo(array2.getInt(0));
assertThat(array2.isNullAt(1)).isTrue();
assertThat(666).isEqualTo(array2.getInt(2));
}
}
@Test
public void testArrayTypes() {
{
// test bool
BinaryArrayData array = new BinaryArrayData();
BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 1);
writer.setNullBoolean(0);
writer.writeBoolean(1, true);
writer.complete();
assertThat(array.isNullAt(0)).isTrue();
assertThat(array.getBoolean(1)).isTrue();
array.setBoolean(0, true);
assertThat(array.getBoolean(0)).isTrue();
array.setNullBoolean(0);
assertThat(array.isNullAt(0)).isTrue();
BinaryArrayData newArray = splitArray(array);
assertThat(newArray.isNullAt(0)).isTrue();
assertThat(newArray.getBoolean(1)).isTrue();
newArray.setBoolean(0, true);
assertThat(newArray.getBoolean(0)).isTrue();
newArray.setNullBoolean(0);
assertThat(newArray.isNullAt(0)).isTrue();
newArray.setBoolean(0, true);
assertThat(BinaryArrayData.fromPrimitiveArray(newArray.toBooleanArray()))
.isEqualTo(newArray);
}
{
// test byte
BinaryArrayData array = new BinaryArrayData();
BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 1);
writer.setNullByte(0);
writer.writeByte(1, (byte) 25);
writer.complete();
assertThat(array.isNullAt(0)).isTrue();
assertThat(array.getByte(1)).isEqualTo((byte) 25);
array.setByte(0, (byte) 5);
assertThat(array.getByte(0)).isEqualTo((byte) 5);
array.setNullByte(0);
assertThat(array.isNullAt(0)).isTrue();
BinaryArrayData newArray = splitArray(array);
assertThat(newArray.isNullAt(0)).isTrue();
assertThat(newArray.getByte(1)).isEqualTo((byte) 25);
newArray.setByte(0, (byte) 5);
assertThat(newArray.getByte(0)).isEqualTo((byte) 5);
newArray.setNullByte(0);
assertThat(newArray.isNullAt(0)).isTrue();
newArray.setByte(0, (byte) 3);
assertThat(BinaryArrayData.fromPrimitiveArray(newArray.toByteArray()))
.isEqualTo(newArray);
}
{
// test short
BinaryArrayData array = new BinaryArrayData();
BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 2);
writer.setNullShort(0);
writer.writeShort(1, (short) 25);
writer.complete();
assertThat(array.isNullAt(0)).isTrue();
assertThat(array.getShort(1)).isEqualTo((short) 25);
array.setShort(0, (short) 5);
assertThat(array.getShort(0)).isEqualTo((short) 5);
array.setNullShort(0);
assertThat(array.isNullAt(0)).isTrue();
BinaryArrayData newArray = splitArray(array);
assertThat(newArray.isNullAt(0)).isTrue();
assertThat(newArray.getShort(1)).isEqualTo((short) 25);
newArray.setShort(0, (short) 5);
assertThat(newArray.getShort(0)).isEqualTo((short) 5);
newArray.setNullShort(0);
assertThat(newArray.isNullAt(0)).isTrue();
newArray.setShort(0, (short) 3);
assertThat(BinaryArrayData.fromPrimitiveArray(newArray.toShortArray()))
.isEqualTo(newArray);
}
{
// test int
BinaryArrayData array = new BinaryArrayData();
BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 4);
writer.setNullInt(0);
writer.writeInt(1, 25);
writer.complete();
assertThat(array.isNullAt(0)).isTrue();
assertThat(array.getInt(1)).isEqualTo(25);
array.setInt(0, 5);
assertThat(array.getInt(0)).isEqualTo(5);
array.setNullInt(0);
assertThat(array.isNullAt(0)).isTrue();
BinaryArrayData newArray = splitArray(array);
assertThat(newArray.isNullAt(0)).isTrue();
assertThat(newArray.getInt(1)).isEqualTo(25);
newArray.setInt(0, 5);
assertThat(newArray.getInt(0)).isEqualTo(5);
newArray.setNullInt(0);
assertThat(newArray.isNullAt(0)).isTrue();
newArray.setInt(0, 3);
assertThat(BinaryArrayData.fromPrimitiveArray(newArray.toIntArray()))
.isEqualTo(newArray);
}
{
// test long
BinaryArrayData array = new BinaryArrayData();
BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8);
writer.setNullLong(0);
writer.writeLong(1, 25);
writer.complete();
assertThat(array.isNullAt(0)).isTrue();
assertThat(array.getLong(1)).isEqualTo(25);
array.setLong(0, 5);
assertThat(array.getLong(0)).isEqualTo(5);
array.setNullLong(0);
assertThat(array.isNullAt(0)).isTrue();
BinaryArrayData newArray = splitArray(array);
assertThat(newArray.isNullAt(0)).isTrue();
assertThat(newArray.getLong(1)).isEqualTo(25);
newArray.setLong(0, 5);
assertThat(newArray.getLong(0)).isEqualTo(5);
newArray.setNullLong(0);
assertThat(newArray.isNullAt(0)).isTrue();
newArray.setLong(0, 3);
assertThat(BinaryArrayData.fromPrimitiveArray(newArray.toLongArray()))
.isEqualTo(newArray);
}
{
// test float
BinaryArrayData array = new BinaryArrayData();
BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 4);
writer.setNullFloat(0);
writer.writeFloat(1, 25);
writer.complete();
assertThat(array.isNullAt(0)).isTrue();
assertThat(array.getFloat(1)).isEqualTo(25f);
array.setFloat(0, 5);
assertThat(array.getFloat(0)).isEqualTo(5f);
array.setNullFloat(0);
assertThat(array.isNullAt(0)).isTrue();
BinaryArrayData newArray = splitArray(array);
assertThat(newArray.isNullAt(0)).isTrue();
assertThat(newArray.getFloat(1)).isEqualTo(25f);
newArray.setFloat(0, 5);
assertThat(newArray.getFloat(0)).isEqualTo(5f);
newArray.setNullFloat(0);
assertThat(newArray.isNullAt(0)).isTrue();
newArray.setFloat(0, 3);
assertThat(BinaryArrayData.fromPrimitiveArray(newArray.toFloatArray()))
.isEqualTo(newArray);
}
{
// test double
BinaryArrayData array = new BinaryArrayData();
BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8);
writer.setNullDouble(0);
writer.writeDouble(1, 25);
writer.complete();
assertThat(array.isNullAt(0)).isTrue();
assertThat(array.getDouble(1)).isEqualTo(25d);
array.setDouble(0, 5);
assertThat(array.getDouble(0)).isEqualTo(5d);
array.setNullDouble(0);
assertThat(array.isNullAt(0)).isTrue();
BinaryArrayData newArray = splitArray(array);
assertThat(newArray.isNullAt(0)).isTrue();
assertThat(newArray.getDouble(1)).isEqualTo(25d);
newArray.setDouble(0, 5);
assertThat(newArray.getDouble(0)).isEqualTo(5d);
newArray.setNullDouble(0);
assertThat(newArray.isNullAt(0)).isTrue();
newArray.setDouble(0, 3);
assertThat(BinaryArrayData.fromPrimitiveArray(newArray.toDoubleArray()))
.isEqualTo(newArray);
}
{
// test string
BinaryArrayData array = new BinaryArrayData();
BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8);
writer.setNullAt(0);
writer.writeString(1, fromString("jaja"));
writer.complete();
assertThat(array.isNullAt(0)).isTrue();
assertThat(array.getString(1)).isEqualTo(fromString("jaja"));
BinaryArrayData newArray = splitArray(array);
assertThat(newArray.isNullAt(0)).isTrue();
assertThat(newArray.getString(1)).isEqualTo(fromString("jaja"));
}
BinaryArrayData subArray = new BinaryArrayData();
BinaryArrayWriter subWriter = new BinaryArrayWriter(subArray, 2, 8);
subWriter.setNullAt(0);
subWriter.writeString(1, fromString("hehehe"));
subWriter.complete();
{
// test array
BinaryArrayData array = new BinaryArrayData();
BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8);
writer.setNullAt(0);
writer.writeArray(1, subArray, new ArrayDataSerializer(DataTypes.INT()));
writer.complete();
assertThat(array.isNullAt(0)).isTrue();
assertThat(array.getArray(1)).isEqualTo(subArray);
BinaryArrayData newArray = splitArray(array);
assertThat(newArray.isNullAt(0)).isTrue();
assertThat(newArray.getArray(1)).isEqualTo(subArray);
}
{
// test map
BinaryArrayData array = new BinaryArrayData();
BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8);
writer.setNullAt(0);
writer.writeMap(
1,
BinaryMapData.valueOf(subArray, subArray),
new MapDataSerializer(DataTypes.INT(), DataTypes.INT()));
writer.complete();
assertThat(array.isNullAt(0)).isTrue();
assertThat(array.getMap(1)).isEqualTo(BinaryMapData.valueOf(subArray, subArray));
BinaryArrayData newArray = splitArray(array);
assertThat(newArray.isNullAt(0)).isTrue();
assertThat(newArray.getMap(1)).isEqualTo(BinaryMapData.valueOf(subArray, subArray));
}
}
@Test
public void testMap() {
BinaryArrayData array1 = new BinaryArrayData();
BinaryArrayWriter writer1 = new BinaryArrayWriter(array1, 3, 4);
writer1.writeInt(0, 6);
writer1.writeInt(1, 5);
writer1.writeInt(2, 666);
writer1.complete();
BinaryArrayData array2 = new BinaryArrayData();
BinaryArrayWriter writer2 = new BinaryArrayWriter(array2, 3, 8);
writer2.writeString(0, BinaryStringData.fromString("6"));
writer2.writeString(1, BinaryStringData.fromString("5"));
writer2.writeString(2, BinaryStringData.fromString("666"));
writer2.complete();
BinaryMapData binaryMap = BinaryMapData.valueOf(array1, array2);
BinaryRecordData row = new BinaryRecordData(1);
BinaryRecordDataWriter rowWriter = new BinaryRecordDataWriter(row);
rowWriter.writeMap(0, binaryMap, new MapDataSerializer(DataTypes.INT(), DataTypes.INT()));
rowWriter.complete();
BinaryMapData map = (BinaryMapData) row.getMap(0);
BinaryArrayData key = map.keyArray();
BinaryArrayData value = map.valueArray();
assertThat(map).isEqualTo(binaryMap);
assertThat(key).isEqualTo(array1);
assertThat(value).isEqualTo(array2);
assertThat(5).isEqualTo(key.getInt(1));
assertThat(BinaryStringData.fromString("5")).isEqualTo(value.getString(1));
}
private static BinaryArrayData splitArray(BinaryArrayData array) {
BinaryArrayData ret = new BinaryArrayData();
MemorySegment[] segments =
splitBytes(
BinarySegmentUtils.copyToBytes(
array.getSegments(), 0, array.getSizeInBytes()),
0);
ret.pointTo(segments, 0, array.getSizeInBytes());
return ret;
}
private static MemorySegment[] splitBytes(byte[] bytes, int baseOffset) {
int newSize = (bytes.length + 1) / 2 + baseOffset;
MemorySegment[] ret = new MemorySegment[2];
ret[0] = MemorySegmentFactory.wrap(new byte[newSize]);
ret[1] = MemorySegmentFactory.wrap(new byte[newSize]);
ret[0].put(baseOffset, bytes, 0, newSize - baseOffset);
ret[1].put(0, bytes, newSize - baseOffset, bytes.length - (newSize - baseOffset));
return ret;
}
@Test
public void testToArray() {
BinaryArrayData array = new BinaryArrayData();
BinaryArrayWriter writer = new BinaryArrayWriter(array, 3, 2);
writer.writeShort(0, (short) 5);
writer.writeShort(1, (short) 10);
writer.writeShort(2, (short) 15);
writer.complete();
short[] shorts = array.toShortArray();
assertThat(shorts[0]).isEqualTo((short) 5);
assertThat(shorts[1]).isEqualTo((short) 10);
assertThat(shorts[2]).isEqualTo((short) 15);
MemorySegment[] segments = splitBytes(writer.getSegments().getArray(), 3);
array.pointTo(segments, 3, array.getSizeInBytes());
assertThat(array.getShort(0)).isEqualTo((short) 5);
assertThat(array.getShort(1)).isEqualTo((short) 10);
assertThat(array.getShort(2)).isEqualTo((short) 15);
short[] shorts2 = array.toShortArray();
assertThat(shorts2[0]).isEqualTo((short) 5);
assertThat(shorts2[1]).isEqualTo((short) 10);
assertThat(shorts2[2]).isEqualTo((short) 15);
}
@Test
public void testDecimal() {
BinaryArrayData array = new BinaryArrayData();
BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8);
// 1.compact
{
int precision = 4;
int scale = 2;
writer.reset();
writer.writeDecimal(0, DecimalData.fromUnscaledLong(5, precision, scale), precision);
writer.setNullAt(1);
writer.complete();
assertThat(array.getDecimal(0, precision, scale).toString()).isEqualTo("0.05");
assertThat(array.isNullAt(1)).isTrue();
array.setDecimal(0, DecimalData.fromUnscaledLong(6, precision, scale), precision);
assertThat(array.getDecimal(0, precision, scale).toString()).isEqualTo("0.06");
}
// 2.not compact
{
int precision = 25;
int scale = 5;
DecimalData decimal1 =
DecimalData.fromBigDecimal(BigDecimal.valueOf(5.55), precision, scale);
DecimalData decimal2 =
DecimalData.fromBigDecimal(BigDecimal.valueOf(6.55), precision, scale);
writer.reset();
writer.writeDecimal(0, decimal1, precision);
writer.writeDecimal(1, null, precision);
writer.complete();
assertThat(array.getDecimal(0, precision, scale).toString()).isEqualTo("5.55000");
assertThat(array.isNullAt(1)).isTrue();
array.setDecimal(0, decimal2, precision);
assertThat(array.getDecimal(0, precision, scale).toString()).isEqualTo("6.55000");
}
}
@Test
public void testNested() {
BinaryArrayData array = new BinaryArrayData();
BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8);
BinaryRecordData row2 = new BinaryRecordData(2);
BinaryRecordDataWriter writer2 = new BinaryRecordDataWriter(row2);
writer2.writeString(0, BinaryStringData.fromString("1"));
writer2.writeInt(1, 1);
writer2.complete();
writer.writeRecord(0, row2, new RecordDataSerializer());
writer.setNullAt(1);
writer.complete();
RecordData nestedRow = array.getRecord(0, 2);
assertThat(nestedRow.getString(0).toString()).isEqualTo("1");
assertThat(nestedRow.getInt(1)).isEqualTo(1);
assertThat(array.isNullAt(1)).isTrue();
}
@Test
public void testBinary() {
BinaryArrayData array = new BinaryArrayData();
BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8);
byte[] bytes1 = new byte[] {1, -1, 5};
byte[] bytes2 = new byte[] {1, -1, 5, 5, 1, 5, 1, 5};
writer.writeBinary(0, bytes1);
writer.writeBinary(1, bytes2);
writer.complete();
assertThat(array.getBinary(0)).isEqualTo(bytes1);
assertThat(array.getBinary(1)).isEqualTo(bytes2);
}
@Test
public void testTimestampData() {
BinaryArrayData array = new BinaryArrayData();
BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8);
// 2. not compact
{
final int precision = 9;
TimestampData timestamp1 =
TimestampData.fromLocalDateTime(
LocalDateTime.of(1970, 1, 1, 0, 0, 0, 123456789));
TimestampData timestamp2 =
TimestampData.fromTimestamp(Timestamp.valueOf("1969-01-01 00:00:00.123456789"));
writer.reset();
writer.writeTimestamp(0, timestamp1, precision);
writer.writeTimestamp(1, null, precision);
writer.complete();
assertThat(array.getTimestamp(0, precision).toString())
.isEqualTo("1970-01-01T00:00:00.123456789");
assertThat(array.isNullAt(1)).isTrue();
array.setTimestamp(0, timestamp2, precision);
assertThat(array.getTimestamp(0, precision).toString())
.isEqualTo("1969-01-01T00:00:00.123456789");
}
}
}

@ -20,11 +20,24 @@ package org.apache.flink.cdc.runtime.serializer.schema;
import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.cdc.common.types.DataType; import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypes; import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.common.types.RowType;
import org.apache.flink.cdc.runtime.serializer.SerializerTestBase; import org.apache.flink.cdc.runtime.serializer.SerializerTestBase;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
import java.util.stream.Stream; import java.util.stream.Stream;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
/** A test for the {@link DataTypeSerializer}. */ /** A test for the {@link DataTypeSerializer}. */
public class DataTypeSerializerTest extends SerializerTestBase<DataType> { public class DataTypeSerializerTest extends SerializerTestBase<DataType> {
@Override @Override
@ -80,4 +93,39 @@ public class DataTypeSerializerTest extends SerializerTestBase<DataType> {
Arrays.stream(allTypes), Arrays.stream(allTypes).map(DataType::notNull)) Arrays.stream(allTypes), Arrays.stream(allTypes).map(DataType::notNull))
.toArray(DataType[]::new); .toArray(DataType[]::new);
} }
@Test
void testNestedRow() throws IOException {
RowType innerMostRowType = RowType.of(DataTypes.BIGINT());
RowType outerRowType =
RowType.of(DataTypes.ROW(DataTypes.FIELD("outerRow", innerMostRowType)));
DataTypeSerializer serializer = new DataTypeSerializer();
// Log to ensure INSTANCE is initialized
assertNotNull(RowTypeSerializer.INSTANCE, "RowTypeSerializer.INSTANCE is null");
System.out.println("RowTypeSerializer.INSTANCE is initialized");
// Copy the RowType
RowType copiedRow = (RowType) serializer.copy(outerRowType);
assertNotNull(copiedRow, "Copied RowType is null");
System.out.println("Copied RowType: " + copiedRow);
// Serialize the RowType
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
DataOutputView outputView = new DataOutputViewStreamWrapper(byteArrayOutputStream);
serializer.serialize(outerRowType, outputView);
// Deserialize the RowType
ByteArrayInputStream byteArrayInputStream =
new ByteArrayInputStream(byteArrayOutputStream.toByteArray());
DataInputView inputView = new DataInputViewStreamWrapper(byteArrayInputStream);
RowType deserializedRow = (RowType) serializer.deserialize(inputView);
// Assert that the deserialized RowType is not null and equals the original
assertNotNull(deserializedRow, "Deserialized RowType is null");
assertEquals(
outerRowType, deserializedRow, "Deserialized RowType does not match the original");
System.out.println("Deserialized RowType: " + deserializedRow);
}
} }

Loading…
Cancel
Save