) DataTypeUtils.toInternalConversionClass(elementType);
+ ArrayData.ElementGetter elementGetter = ArrayData.createElementGetter(elementType);
+ T[] values = (T[]) Array.newInstance(elementClass, size);
+ for (int i = 0; i < size; i++) {
+ if (!isNullAt(i)) {
+ values[i] = (T) elementGetter.getElementOrNull(this, i);
+ }
+ }
+ return values;
+ }
+
+ public BinaryArrayData copy() {
+ return copy(new BinaryArrayData());
+ }
+
+ public BinaryArrayData copy(BinaryArrayData reuse) {
+ byte[] bytes = BinarySegmentUtils.copyToBytes(segments, offset, sizeInBytes);
+ reuse.pointTo(MemorySegmentFactory.wrap(bytes), 0, sizeInBytes);
+ return reuse;
+ }
+
+ @Override
+ public int hashCode() {
+ return BinarySegmentUtils.hashByWords(segments, offset, sizeInBytes);
+ }
+
+ // ------------------------------------------------------------------------------------------
+ // Construction Utilities
+ // ------------------------------------------------------------------------------------------
+
+ public static BinaryArrayData fromPrimitiveArray(boolean[] arr) {
+ return fromPrimitiveArray(arr, BOOLEAN_ARRAY_OFFSET, arr.length, 1);
+ }
+
+ public static BinaryArrayData fromPrimitiveArray(byte[] arr) {
+ return fromPrimitiveArray(arr, BYTE_ARRAY_BASE_OFFSET, arr.length, 1);
+ }
+
+ public static BinaryArrayData fromPrimitiveArray(short[] arr) {
+ return fromPrimitiveArray(arr, SHORT_ARRAY_OFFSET, arr.length, 2);
+ }
+
+ public static BinaryArrayData fromPrimitiveArray(int[] arr) {
+ return fromPrimitiveArray(arr, INT_ARRAY_OFFSET, arr.length, 4);
+ }
+
+ public static BinaryArrayData fromPrimitiveArray(long[] arr) {
+ return fromPrimitiveArray(arr, LONG_ARRAY_OFFSET, arr.length, 8);
+ }
+
+ public static BinaryArrayData fromPrimitiveArray(float[] arr) {
+ return fromPrimitiveArray(arr, FLOAT_ARRAY_OFFSET, arr.length, 4);
+ }
+
+ public static BinaryArrayData fromPrimitiveArray(double[] arr) {
+ return fromPrimitiveArray(arr, DOUBLE_ARRAY_OFFSET, arr.length, 8);
+ }
+
+ private static BinaryArrayData fromPrimitiveArray(
+ Object arr, int offset, int length, int elementSize) {
+ final long headerInBytes = calculateHeaderInBytes(length);
+ final long valueRegionInBytes = (long) elementSize * length; // long math to avoid int overflow
+
+ // must align by 8 bytes
+ long totalSizeInLongs = (headerInBytes + valueRegionInBytes + 7) / 8;
+ if (totalSizeInLongs > Integer.MAX_VALUE / 8) {
+ throw new UnsupportedOperationException(
+ "Cannot convert this array to unsafe format as " + "it's too big.");
+ }
+ long totalSize = totalSizeInLongs * 8;
+
+ final byte[] data = new byte[(int) totalSize];
+
+ UNSAFE.putInt(data, (long) BYTE_ARRAY_BASE_OFFSET, length);
+ UNSAFE.copyMemory(
+ arr, offset, data, BYTE_ARRAY_BASE_OFFSET + headerInBytes, valueRegionInBytes);
+
+ BinaryArrayData result = new BinaryArrayData();
+ result.pointTo(MemorySegmentFactory.wrap(data), 0, (int) totalSize);
+ return result;
+ }
+}
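Not part of the patch: a minimal usage sketch of the factory methods above. `fromPrimitiveArray` produces a single heap segment laid out as [4-byte count] + [null bitmap] + [8-byte-aligned value region]; the sketch assumes the standard `ArrayData` accessors (`size()`, `getLong(...)`) behave as in Flink.

```java
import org.apache.flink.cdc.common.data.binary.BinaryArrayData;

public class BinaryArrayDataSketch {
    public static void main(String[] args) {
        // Backed by one heap segment: count, null bitmap, then 8-byte-aligned values.
        BinaryArrayData array = BinaryArrayData.fromPrimitiveArray(new long[] {1L, 2L, 3L});
        System.out.println(array.size());     // 3
        System.out.println(array.getLong(2)); // 3
    }
}
```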
diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/data/binary/BinaryMapData.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/data/binary/BinaryMapData.java
new file mode 100644
index 000000000..98cc19d34
--- /dev/null
+++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/data/binary/BinaryMapData.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.common.data.binary;
+
+import org.apache.flink.cdc.common.annotation.Internal;
+import org.apache.flink.cdc.common.data.MapData;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.cdc.common.utils.Preconditions.checkArgument;
+
+/**
+ * [4 byte(keyArray size in bytes)] + [Key BinaryArray] + [Value BinaryArray].
+ *
+ * <p>{@code BinaryMapData} is influenced by Apache Spark's UnsafeMapData.
+ */
+@Internal
+public class BinaryMapData extends BinarySection implements MapData {
+ private final BinaryArrayData keys;
+ private final BinaryArrayData values;
+
+ public BinaryMapData() {
+ keys = new BinaryArrayData();
+ values = new BinaryArrayData();
+ }
+
+ public int size() {
+ return keys.size();
+ }
+
+ @Override
+ public void pointTo(MemorySegment[] segments, int offset, int sizeInBytes) {
+ // Read the numBytes of key array from the first 4 bytes.
+ final int keyArrayBytes = BinarySegmentUtils.getInt(segments, offset);
+ assert keyArrayBytes >= 0 : "keyArraySize (" + keyArrayBytes + ") should >= 0";
+ final int valueArrayBytes = sizeInBytes - keyArrayBytes - 4;
+ assert valueArrayBytes >= 0 : "valueArraySize (" + valueArrayBytes + ") should >= 0";
+
+ keys.pointTo(segments, offset + 4, keyArrayBytes);
+ values.pointTo(segments, offset + 4 + keyArrayBytes, valueArrayBytes);
+
+ assert keys.size() == values.size();
+
+ this.segments = segments;
+ this.offset = offset;
+ this.sizeInBytes = sizeInBytes;
+ }
+
+ public BinaryArrayData keyArray() {
+ return keys;
+ }
+
+ public BinaryArrayData valueArray() {
+ return values;
+ }
+
+ public Map<?, ?> toJavaMap(DataType keyType, DataType valueType) {
+ Object[] keyArray = keys.toObjectArray(keyType);
+ Object[] valueArray = values.toObjectArray(valueType);
+
+ Map<Object, Object> map = new HashMap<>();
+ for (int i = 0; i < keyArray.length; i++) {
+ map.put(keyArray[i], valueArray[i]);
+ }
+ return map;
+ }
+
+ public BinaryMapData copy() {
+ return copy(new BinaryMapData());
+ }
+
+ public BinaryMapData copy(BinaryMapData reuse) {
+ byte[] bytes = BinarySegmentUtils.copyToBytes(segments, offset, sizeInBytes);
+ reuse.pointTo(MemorySegmentFactory.wrap(bytes), 0, sizeInBytes);
+ return reuse;
+ }
+
+ @Override
+ public int hashCode() {
+ return BinarySegmentUtils.hashByWords(segments, offset, sizeInBytes);
+ }
+
+ // ------------------------------------------------------------------------------------------
+ // Construction Utilities
+ // ------------------------------------------------------------------------------------------
+
+ public static BinaryMapData valueOf(BinaryArrayData key, BinaryArrayData value) {
+ checkArgument(key.segments.length == 1 && value.getSegments().length == 1);
+ byte[] bytes = new byte[4 + key.sizeInBytes + value.sizeInBytes];
+ MemorySegment segment = MemorySegmentFactory.wrap(bytes);
+ segment.putInt(0, key.sizeInBytes);
+ key.getSegments()[0].copyTo(key.getOffset(), segment, 4, key.sizeInBytes);
+ value.getSegments()[0].copyTo(
+ value.getOffset(), segment, 4 + key.sizeInBytes, value.sizeInBytes);
+ BinaryMapData map = new BinaryMapData();
+ map.pointTo(segment, 0, bytes.length);
+ return map;
+ }
+}
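Again a hedged sketch, not part of the patch: `valueOf` concatenates two single-segment arrays behind a 4-byte key-array length, matching the layout described in the class comment.

```java
import org.apache.flink.cdc.common.data.binary.BinaryArrayData;
import org.apache.flink.cdc.common.data.binary.BinaryMapData;

public class BinaryMapDataSketch {
    public static void main(String[] args) {
        // fromPrimitiveArray yields single-segment arrays, which valueOf requires.
        BinaryArrayData keys = BinaryArrayData.fromPrimitiveArray(new int[] {1, 2});
        BinaryArrayData values = BinaryArrayData.fromPrimitiveArray(new long[] {10L, 20L});
        BinaryMapData map = BinaryMapData.valueOf(keys, values);
        System.out.println(map.size());                  // 2
        System.out.println(map.keyArray().getInt(1));    // 2
        System.out.println(map.valueArray().getLong(1)); // 20
    }
}
```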
diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/data/binary/BinaryRecordData.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/data/binary/BinaryRecordData.java
index 0b02ad7e1..2cd4bc1f5 100644
--- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/data/binary/BinaryRecordData.java
+++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/data/binary/BinaryRecordData.java
@@ -200,12 +200,14 @@ public final class BinaryRecordData extends BinarySection implements RecordData,
@Override
public ArrayData getArray(int pos) {
- throw new UnsupportedOperationException("Not support ArrayData");
+ assertIndexIsValid(pos);
+ return BinarySegmentUtils.readArrayData(segments, offset, getLong(pos));
}
@Override
public MapData getMap(int pos) {
- throw new UnsupportedOperationException("Not support MapData.");
+ assertIndexIsValid(pos);
+ return BinarySegmentUtils.readMapData(segments, offset, getLong(pos));
}
@Override
diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/data/binary/BinarySegmentUtils.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/data/binary/BinarySegmentUtils.java
index 079527f32..f8cf849bd 100644
--- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/data/binary/BinarySegmentUtils.java
+++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/data/binary/BinarySegmentUtils.java
@@ -18,8 +18,10 @@
package org.apache.flink.cdc.common.data.binary;
import org.apache.flink.cdc.common.annotation.Internal;
+import org.apache.flink.cdc.common.data.ArrayData;
import org.apache.flink.cdc.common.data.DecimalData;
import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
+import org.apache.flink.cdc.common.data.MapData;
import org.apache.flink.cdc.common.data.RecordData;
import org.apache.flink.cdc.common.data.StringData;
import org.apache.flink.cdc.common.data.TimestampData;
@@ -318,7 +320,7 @@ public final class BinarySegmentUtils {
}
}
- static boolean equalsMultiSegments(
+ public static boolean equalsMultiSegments(
MemorySegment[] segments1,
int offset1,
MemorySegment[] segments2,
@@ -1154,4 +1156,24 @@ public final class BinarySegmentUtils {
}
return -1;
}
+
+ /** Gets an instance of {@link MapData} from underlying {@link MemorySegment}. */
+ public static MapData readMapData(
+ MemorySegment[] segments, int baseOffset, long offsetAndSize) {
+ final int size = ((int) offsetAndSize);
+ int offset = (int) (offsetAndSize >> 32);
+ BinaryMapData map = new BinaryMapData();
+ map.pointTo(segments, offset + baseOffset, size);
+ return map;
+ }
+
+ /** Gets an instance of {@link ArrayData} from underlying {@link MemorySegment}. */
+ public static ArrayData readArrayData(
+ MemorySegment[] segments, int baseOffset, long offsetAndSize) {
+ final int size = ((int) offsetAndSize);
+ int offset = (int) (offsetAndSize >> 32);
+ BinaryArrayData array = new BinaryArrayData();
+ array.pointTo(segments, offset + baseOffset, size);
+ return array;
+ }
}
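Both readers decode the packing produced by `BinaryWriter.setOffsetAndSize` (added later in this patch): one `long` carries the region offset in the high 32 bits and the byte size in the low 32 bits. A self-contained illustration of the arithmetic:

```java
public class OffsetAndSizeSketch {
    public static void main(String[] args) {
        int offset = 24;
        int size = 80;
        long offsetAndSize = ((long) offset << 32) | size; // writer side
        System.out.println((int) offsetAndSize);           // 80: low 32 bits carry the size
        System.out.println((int) (offsetAndSize >> 32));   // 24: high 32 bits carry the offset
    }
}
```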
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterHelper.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterHelper.java
index 2e7e5f437..986125780 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterHelper.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterHelper.java
@@ -17,20 +17,31 @@
package org.apache.flink.cdc.connectors.paimon.sink.v2;
+import org.apache.flink.cdc.common.data.ArrayData;
import org.apache.flink.cdc.common.data.DecimalData;
+import org.apache.flink.cdc.common.data.MapData;
import org.apache.flink.cdc.common.data.RecordData;
+import org.apache.flink.cdc.common.data.binary.BinaryArrayData;
+import org.apache.flink.cdc.common.data.binary.BinaryMapData;
+import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypeChecks;
+import org.apache.flink.cdc.common.types.DataTypeRoot;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.Decimal;
import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.memory.MemorySegmentUtils;
import org.apache.paimon.types.RowKind;
+import java.nio.ByteBuffer;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.List;
@@ -118,7 +129,11 @@ public class PaimonWriterHelper {
break;
case ROW:
final int rowFieldCount = getFieldCount(fieldType);
- fieldGetter = row -> row.getRow(fieldPos, rowFieldCount);
+ fieldGetter = new BinaryFieldDataGetter(fieldPos, DataTypeRoot.ROW, rowFieldCount);
+ break;
+ case ARRAY:
+ case MAP:
+ fieldGetter = new BinaryFieldDataGetter(fieldPos, fieldType.getTypeRoot());
break;
default:
throw new IllegalArgumentException(
@@ -163,4 +178,121 @@ public class PaimonWriterHelper {
}
return genericRow;
}
+
+ /** A {@link RecordData.FieldGetter} for ARRAY, MAP and ROW fields backed by binary data. */
+ public static class BinaryFieldDataGetter implements RecordData.FieldGetter {
+ private final int fieldPos;
+ private final DataTypeRoot dataTypeRoot;
+ private final int rowFieldCount;
+
+ BinaryFieldDataGetter(int fieldPos, DataTypeRoot dataTypeRoot) {
+ this(fieldPos, dataTypeRoot, -1);
+ }
+
+ BinaryFieldDataGetter(int fieldPos, DataTypeRoot dataTypeRoot, int rowFieldCount) {
+ this.fieldPos = fieldPos;
+ this.dataTypeRoot = dataTypeRoot;
+ this.rowFieldCount = rowFieldCount;
+ }
+
+ @Override
+ public Object getFieldOrNull(RecordData row) {
+ switch (dataTypeRoot) {
+ case ARRAY:
+ return getArrayField(row);
+ case MAP:
+ return getMapField(row);
+ case ROW:
+ return getRecordField(row);
+ default:
+ throw new IllegalArgumentException("Unsupported field type: " + dataTypeRoot);
+ }
+ }
+
+ private Object getArrayField(RecordData row) {
+ ArrayData arrayData = row.getArray(fieldPos);
+ if (!(arrayData instanceof BinaryArrayData)) {
+ throw new IllegalArgumentException(
+ "Expected BinaryArrayData but was " + arrayData.getClass().getSimpleName());
+ }
+ BinaryArrayData binaryArrayData = (BinaryArrayData) arrayData;
+ return convertSegments(
+ binaryArrayData.getSegments(),
+ binaryArrayData.getOffset(),
+ binaryArrayData.getSizeInBytes(),
+ MemorySegmentUtils::readArrayData);
+ }
+
+ private Object getMapField(RecordData row) {
+ MapData mapData = row.getMap(fieldPos);
+ if (!(mapData instanceof BinaryMapData)) {
+ throw new IllegalArgumentException(
+ "Expected BinaryMapData but was " + mapData.getClass().getSimpleName());
+ }
+ BinaryMapData binaryMapData = (BinaryMapData) mapData;
+ return convertSegments(
+ binaryMapData.getSegments(),
+ binaryMapData.getOffset(),
+ binaryMapData.getSizeInBytes(),
+ MemorySegmentUtils::readMapData);
+ }
+
+ private Object getRecordField(RecordData row) {
+ RecordData recordData = row.getRow(fieldPos, rowFieldCount);
+ if (!(recordData instanceof BinaryRecordData)) {
+ throw new IllegalArgumentException(
+ "Expected BinaryRecordData but was "
+ + recordData.getClass().getSimpleName());
+ }
+ BinaryRecordData binaryRecordData = (BinaryRecordData) recordData;
+ return convertSegments(
+ binaryRecordData.getSegments(),
+ binaryRecordData.getOffset(),
+ binaryRecordData.getSizeInBytes(),
+ (segments, offset, sizeInBytes) ->
+ MemorySegmentUtils.readRowData(
+ segments, rowFieldCount, offset, sizeInBytes));
+ }
+
+ private <T> T convertSegments(
+ MemorySegment[] segments,
+ int offset,
+ int sizeInBytes,
+ SegmentConverter<T> converter) {
+ org.apache.paimon.memory.MemorySegment[] paimonMemorySegments =
+ new org.apache.paimon.memory.MemorySegment[segments.length];
+ for (int i = 0; i < segments.length; i++) {
+ MemorySegment currMemorySegment = segments[i];
+ ByteBuffer byteBuffer = currMemorySegment.wrap(0, currMemorySegment.size());
+
+ // Allocate a new byte array and copy the data from the ByteBuffer
+ byte[] bytes = new byte[currMemorySegment.size()];
+ byteBuffer.get(bytes);
+
+ paimonMemorySegments[i] = org.apache.paimon.memory.MemorySegment.wrap(bytes);
+ }
+ return converter.convert(paimonMemorySegments, offset, sizeInBytes);
+ }
+
+ private interface SegmentConverter<T> {
+ T convert(
+ org.apache.paimon.memory.MemorySegment[] segments, int offset, int sizeInBytes);
+ }
+
+ /**
+ * Gets an instance of {@link InternalRow} from underlying {@link
+ * org.apache.paimon.memory.MemorySegment}.
+ */
+ public InternalRow readRowData(
+ org.apache.paimon.memory.MemorySegment[] segments,
+ int numFields,
+ int baseOffset,
+ long offsetAndSize) {
+ final int size = ((int) offsetAndSize);
+ int offset = (int) (offsetAndSize >> 32);
+ BinaryRow row = new BinaryRow(numFields);
+ row.pointTo(segments, offset + baseOffset, size);
+ return row;
+ }
+ }
}
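The heart of `convertSegments` above is a plain byte copy from a Flink CDC heap segment into a Paimon segment; a standalone sketch of just that bridge (both dependencies assumed on the classpath):

```java
import java.nio.ByteBuffer;

import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;

public class SegmentBridgeSketch {
    public static void main(String[] args) {
        MemorySegment flinkSegment = MemorySegmentFactory.wrap(new byte[] {1, 2, 3, 4});
        // Copy the segment's bytes out through a ByteBuffer view ...
        ByteBuffer view = flinkSegment.wrap(0, flinkSegment.size());
        byte[] bytes = new byte[flinkSegment.size()];
        view.get(bytes);
        // ... and rewrap them as a Paimon heap segment.
        org.apache.paimon.memory.MemorySegment paimonSegment =
                org.apache.paimon.memory.MemorySegment.wrap(bytes);
        System.out.println(paimonSegment.size()); // 4
    }
}
```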
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterHelperTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterHelperTest.java
index 01897dad5..28d9b54a6 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterHelperTest.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterHelperTest.java
@@ -18,9 +18,11 @@
package org.apache.flink.cdc.connectors.paimon.sink.v2;
import org.apache.flink.cdc.common.data.DecimalData;
+import org.apache.flink.cdc.common.data.GenericMapData;
import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
import org.apache.flink.cdc.common.data.RecordData;
import org.apache.flink.cdc.common.data.TimestampData;
+import org.apache.flink.cdc.common.data.binary.BinaryMapData;
import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
import org.apache.flink.cdc.common.data.binary.BinaryStringData;
import org.apache.flink.cdc.common.event.DataChangeEvent;
@@ -28,11 +30,14 @@ import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.common.types.RowType;
+import org.apache.flink.cdc.runtime.serializer.data.MapDataSerializer;
import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.Decimal;
import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalMap;
+import org.apache.paimon.data.NestedRow;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.types.RowKind;
import org.junit.jupiter.api.Assertions;
@@ -41,7 +46,9 @@ import org.junit.jupiter.api.Test;
import java.math.BigDecimal;
import java.time.Instant;
import java.time.ZoneId;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
/** Tests for {@link PaimonWriterHelper}. */
public class PaimonWriterHelperTest {
@@ -171,4 +178,78 @@ public class PaimonWriterHelperTest {
genericRow = PaimonWriterHelper.convertEventToGenericRow(dataChangeEvent, fieldGetters);
Assertions.assertEquals(genericRow.getRowKind(), RowKind.INSERT);
}
+
+ @Test
+ public void testConvertEventToGenericRowWithNestedRow() {
+ // Define the inner row type with an integer and a map
+ RowType innerRowType =
+ RowType.of(DataTypes.INT(), DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()));
+
+ // Define the outer row type with a nested row
+ RowType rowType = RowType.of(DataTypes.ROW(innerRowType));
+
+ // Create a map serializer and a map data instance
+ MapDataSerializer mapDataSerializer =
+ new MapDataSerializer(DataTypes.STRING(), DataTypes.STRING());
+ Map<Object, Object> map = new HashMap<>();
+ map.put(BinaryStringData.fromString("key1"), BinaryStringData.fromString("value1"));
+ map.put(BinaryStringData.fromString("key2"), BinaryStringData.fromString("value2"));
+ GenericMapData genMapData = new GenericMapData(map);
+ BinaryMapData binMap = mapDataSerializer.toBinaryMap(genMapData);
+
+ // Create inner row data
+ Object[] innerRowData =
+ new Object[] {
+ 42, // Integer field
+ binMap // Map field
+ };
+
+ // Generate inner BinaryRecordData
+ BinaryRecordData innerRecordData =
+ new BinaryRecordDataGenerator(innerRowType).generate(innerRowData);
+
+ // Create outer row data containing the inner record data
+ Object[] testData = new Object[] {innerRecordData};
+
+ // Generate outer BinaryRecordData
+ BinaryRecordData recordData = new BinaryRecordDataGenerator(rowType).generate(testData);
+
+ // Create schema and field getters
+ Schema schema = Schema.newBuilder().fromRowDataType(rowType).build();
+ List<RecordData.FieldGetter> fieldGetters =
+ PaimonWriterHelper.createFieldGetters(schema, ZoneId.of("UTC+8"));
+
+ // Create a data change event
+ DataChangeEvent dataChangeEvent =
+ DataChangeEvent.insertEvent(TableId.parse("database.table"), recordData);
+
+ // Convert the event to GenericRow
+ GenericRow genericRow =
+ PaimonWriterHelper.convertEventToGenericRow(dataChangeEvent, fieldGetters);
+
+ // Verify the nested row field
+ NestedRow innerRow = (NestedRow) genericRow.getField(0);
+ NestedRow nestedRow = (NestedRow) innerRow.getRow(0, 2);
+ int actual = nestedRow.getRow(0, 2).getInt(0); // 42
+ Assertions.assertEquals(42, actual);
+ Map<BinaryString, BinaryString> expectedMap = new HashMap<>();
+ expectedMap.put(BinaryString.fromString("key1"), BinaryString.fromString("value1"));
+ expectedMap.put(BinaryString.fromString("key2"), BinaryString.fromString("value2"));
+ InternalMap internalMap = nestedRow.getMap(1);
+ Map<BinaryString, BinaryString> extractedMap = extractMap(internalMap);
+
+ for (Map.Entry<BinaryString, BinaryString> entry : expectedMap.entrySet()) {
+ Assertions.assertEquals(entry.getValue(), extractedMap.get(entry.getKey()));
+ }
+ }
+
+ private static Map<BinaryString, BinaryString> extractMap(InternalMap internalMap) {
+ Map<BinaryString, BinaryString> resultMap = new HashMap<>();
+ for (int i = 0; i < internalMap.size(); i++) {
+ BinaryString key = internalMap.keyArray().getString(i);
+ BinaryString value = internalMap.valueArray().getString(i);
+ resultMap.put(key, value);
+ }
+ return resultMap;
+ }
}
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/InternalSerializers.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/InternalSerializers.java
index cdf903059..cc1522303 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/InternalSerializers.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/InternalSerializers.java
@@ -20,9 +20,11 @@ package org.apache.flink.cdc.runtime.serializer;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.cdc.common.types.ArrayType;
import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.MapType;
import org.apache.flink.cdc.runtime.serializer.data.ArrayDataSerializer;
import org.apache.flink.cdc.runtime.serializer.data.DecimalDataSerializer;
import org.apache.flink.cdc.runtime.serializer.data.LocalZonedTimestampDataSerializer;
+import org.apache.flink.cdc.runtime.serializer.data.MapDataSerializer;
import org.apache.flink.cdc.runtime.serializer.data.RecordDataSerializer;
import org.apache.flink.cdc.runtime.serializer.data.StringDataSerializer;
import org.apache.flink.cdc.runtime.serializer.data.TimestampDataSerializer;
@@ -79,6 +81,8 @@ public class InternalSerializers {
case ROW:
return new RecordDataSerializer();
case MAP:
+ MapType mapType = (MapType) type;
+ return new MapDataSerializer(mapType.getKeyType(), mapType.getValueType());
default:
throw new UnsupportedOperationException(
"Unsupported type '" + type + "' to get internal serializer");
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/NullableSerializerWrapper.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/NullableSerializerWrapper.java
index 9ace5ba20..c230c8c9a 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/NullableSerializerWrapper.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/NullableSerializerWrapper.java
@@ -123,6 +123,10 @@ public class NullableSerializerWrapper<T> extends TypeSerializer<T> {
return new NullableSerializerWrapperSnapshot<>(innerSerializer);
}
+ public TypeSerializer<T> getWrappedSerializer() {
+ return innerSerializer;
+ }
+
/** Serializer configuration snapshot for compatibility and format evolution. */
@SuppressWarnings("WeakerAccess")
public static final class NullableSerializerWrapperSnapshot
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/ArrayDataSerializer.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/ArrayDataSerializer.java
index 570df8c3c..39939375d 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/ArrayDataSerializer.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/ArrayDataSerializer.java
@@ -25,11 +25,14 @@ import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
import org.apache.flink.cdc.common.annotation.VisibleForTesting;
import org.apache.flink.cdc.common.data.ArrayData;
import org.apache.flink.cdc.common.data.GenericArrayData;
+import org.apache.flink.cdc.common.data.binary.BinaryArrayData;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.utils.DataTypeUtils;
import org.apache.flink.cdc.common.utils.InstantiationUtil;
import org.apache.flink.cdc.runtime.serializer.InternalSerializers;
import org.apache.flink.cdc.runtime.serializer.NullableSerializerWrapper;
+import org.apache.flink.cdc.runtime.serializer.data.writer.BinaryArrayWriter;
+import org.apache.flink.cdc.runtime.serializer.data.writer.BinaryWriter;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
@@ -45,6 +48,8 @@ public class ArrayDataSerializer extends TypeSerializer<ArrayData> {
private final DataType eleType;
private final TypeSerializer eleSer;
private final ArrayData.ElementGetter elementGetter;
+ private transient BinaryArrayData reuseArray;
+ private transient BinaryArrayWriter reuseWriter;
public ArrayDataSerializer(DataType eleType) {
this(eleType, InternalSerializers.create(eleType));
@@ -130,6 +135,38 @@ public class ArrayDataSerializer extends TypeSerializer {
}
}
+ public BinaryArrayData toBinaryArray(ArrayData from) {
+ if (from instanceof BinaryArrayData) {
+ return (BinaryArrayData) from;
+ }
+
+ int numElements = from.size();
+ if (reuseArray == null) {
+ reuseArray = new BinaryArrayData();
+ }
+ if (reuseWriter == null || reuseWriter.getNumElements() != numElements) {
+ reuseWriter =
+ new BinaryArrayWriter(
+ reuseArray,
+ numElements,
+ BinaryArrayData.calculateFixLengthPartSize(eleType));
+ } else {
+ reuseWriter.reset();
+ }
+
+ for (int i = 0; i < numElements; i++) {
+ if (from.isNullAt(i)) {
+ reuseWriter.setNullAt(i, eleType);
+ } else {
+ BinaryWriter.write(
+ reuseWriter, i, elementGetter.getElementOrNull(from, i), eleType, eleSer);
+ }
+ }
+ reuseWriter.complete();
+
+ return reuseArray;
+ }
+
@Override
public ArrayData deserialize(DataInputView source) throws IOException {
int size = source.readInt();
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/MapDataSerializer.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/MapDataSerializer.java
new file mode 100644
index 000000000..d8f92ee29
--- /dev/null
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/MapDataSerializer.java
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.runtime.serializer.data;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
+import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
+import org.apache.flink.cdc.common.annotation.Internal;
+import org.apache.flink.cdc.common.annotation.VisibleForTesting;
+import org.apache.flink.cdc.common.data.ArrayData;
+import org.apache.flink.cdc.common.data.MapData;
+import org.apache.flink.cdc.common.data.binary.BinaryArrayData;
+import org.apache.flink.cdc.common.data.binary.BinaryMapData;
+import org.apache.flink.cdc.common.data.binary.BinarySegmentUtils;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.runtime.serializer.InternalSerializers;
+import org.apache.flink.cdc.runtime.serializer.data.writer.BinaryArrayWriter;
+import org.apache.flink.cdc.runtime.serializer.data.writer.BinaryWriter;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+
+/** Serializer for {@link MapData}. */
+@Internal
+public class MapDataSerializer extends TypeSerializer<MapData> {
+
+ private final DataType keyType;
+ private final DataType valueType;
+
+ private final TypeSerializer keySerializer;
+ private final TypeSerializer valueSerializer;
+
+ private final ArrayData.ElementGetter keyGetter;
+ private final ArrayData.ElementGetter valueGetter;
+
+ private transient BinaryArrayData reuseKeyArray;
+ private transient BinaryArrayData reuseValueArray;
+ private transient BinaryArrayWriter reuseKeyWriter;
+ private transient BinaryArrayWriter reuseValueWriter;
+
+ public MapDataSerializer(DataType keyType, DataType valueType) {
+ this(
+ keyType,
+ valueType,
+ InternalSerializers.create(keyType),
+ InternalSerializers.create(valueType));
+ }
+
+ private MapDataSerializer(
+ DataType keyType,
+ DataType valueType,
+ TypeSerializer keySerializer,
+ TypeSerializer valueSerializer) {
+ this.keyType = keyType;
+ this.valueType = valueType;
+
+ this.keySerializer = keySerializer;
+ this.valueSerializer = valueSerializer;
+
+ this.keyGetter = ArrayData.createElementGetter(keyType);
+ this.valueGetter = ArrayData.createElementGetter(valueType);
+ }
+
+ @Override
+ public boolean isImmutableType() {
+ return false;
+ }
+
+ @Override
+ public TypeSerializer<MapData> duplicate() {
+ return new MapDataSerializer(
+ keyType, valueType, keySerializer.duplicate(), valueSerializer.duplicate());
+ }
+
+ @Override
+ public MapData createInstance() {
+ return new BinaryMapData();
+ }
+
+ /**
+ * NOTE: the map should be a HashMap; when the key/value pairs of a TreeMap are inserted
+ * into a HashMap, problems may occur.
+ */
+ @Override
+ public MapData copy(MapData from) {
+ if (from instanceof BinaryMapData) {
+ return ((BinaryMapData) from).copy();
+ } else {
+ return toBinaryMap(from);
+ }
+ }
+
+ @Override
+ public MapData copy(MapData from, MapData reuse) {
+ return copy(from);
+ }
+
+ @Override
+ public int getLength() {
+ return -1;
+ }
+
+ @Override
+ public void serialize(MapData record, DataOutputView target) throws IOException {
+ BinaryMapData binaryMap = toBinaryMap(record);
+ target.writeInt(binaryMap.getSizeInBytes());
+ BinarySegmentUtils.copyToView(
+ binaryMap.getSegments(), binaryMap.getOffset(), binaryMap.getSizeInBytes(), target);
+ }
+
+ public BinaryMapData toBinaryMap(MapData from) {
+ if (from instanceof BinaryMapData) {
+ return (BinaryMapData) from;
+ }
+
+ int numElements = from.size();
+ if (reuseKeyArray == null) {
+ reuseKeyArray = new BinaryArrayData();
+ }
+ if (reuseValueArray == null) {
+ reuseValueArray = new BinaryArrayData();
+ }
+ if (reuseKeyWriter == null || reuseKeyWriter.getNumElements() != numElements) {
+ reuseKeyWriter =
+ new BinaryArrayWriter(
+ reuseKeyArray,
+ numElements,
+ BinaryArrayData.calculateFixLengthPartSize(keyType));
+ } else {
+ reuseKeyWriter.reset();
+ }
+ if (reuseValueWriter == null || reuseValueWriter.getNumElements() != numElements) {
+ reuseValueWriter =
+ new BinaryArrayWriter(
+ reuseValueArray,
+ numElements,
+ BinaryArrayData.calculateFixLengthPartSize(valueType));
+ } else {
+ reuseValueWriter.reset();
+ }
+
+ ArrayData keyArray = from.keyArray();
+ ArrayData valueArray = from.valueArray();
+ for (int i = 0; i < from.size(); i++) {
+ Object key = keyGetter.getElementOrNull(keyArray, i);
+ Object value = valueGetter.getElementOrNull(valueArray, i);
+ if (key == null) {
+ reuseKeyWriter.setNullAt(i, keyType);
+ } else {
+ BinaryWriter.write(reuseKeyWriter, i, key, keyType, keySerializer);
+ }
+ if (value == null) {
+ reuseValueWriter.setNullAt(i, valueType);
+ } else {
+ BinaryWriter.write(reuseValueWriter, i, value, valueType, valueSerializer);
+ }
+ }
+
+ reuseKeyWriter.complete();
+ reuseValueWriter.complete();
+
+ return BinaryMapData.valueOf(reuseKeyArray, reuseValueArray);
+ }
+
+ @Override
+ public MapData deserialize(DataInputView source) throws IOException {
+ return deserializeReuse(new BinaryMapData(), source);
+ }
+
+ @Override
+ public MapData deserialize(MapData reuse, DataInputView source) throws IOException {
+ return deserializeReuse(
+ reuse instanceof BinaryMapData ? (BinaryMapData) reuse : new BinaryMapData(),
+ source);
+ }
+
+ private BinaryMapData deserializeReuse(BinaryMapData reuse, DataInputView source)
+ throws IOException {
+ int length = source.readInt();
+ byte[] bytes = new byte[length];
+ source.readFully(bytes);
+ reuse.pointTo(MemorySegmentFactory.wrap(bytes), 0, bytes.length);
+ return reuse;
+ }
+
+ @Override
+ public void copy(DataInputView source, DataOutputView target) throws IOException {
+ int length = source.readInt();
+ target.writeInt(length);
+ target.write(source, length);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ MapDataSerializer that = (MapDataSerializer) o;
+
+ return keyType.equals(that.keyType) && valueType.equals(that.valueType);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = keyType.hashCode();
+ result = 31 * result + valueType.hashCode();
+ return result;
+ }
+
+ @VisibleForTesting
+ public TypeSerializer getKeySerializer() {
+ return keySerializer;
+ }
+
+ @VisibleForTesting
+ public TypeSerializer getValueSerializer() {
+ return valueSerializer;
+ }
+
+ @Override
+ public TypeSerializerSnapshot<MapData> snapshotConfiguration() {
+ return new MapDataSerializer.MapDataSerializerSnapshot(
+ keyType, valueType, keySerializer, valueSerializer);
+ }
+
+ /** {@link TypeSerializerSnapshot} for {@link MapDataSerializer}. */
+ public static final class MapDataSerializerSnapshot implements TypeSerializerSnapshot<MapData> {
+ private static final int CURRENT_VERSION = 0;
+
+ private DataType keyType;
+ private DataType valueType;
+
+ private TypeSerializer keySerializer;
+ private TypeSerializer valueSerializer;
+
+ @SuppressWarnings("unused")
+ public MapDataSerializerSnapshot() {
+ // this constructor is used when restoring from a checkpoint/savepoint.
+ }
+
+ MapDataSerializerSnapshot(
+ DataType keyT, DataType valueT, TypeSerializer keySer, TypeSerializer valueSer) {
+ this.keyType = keyT;
+ this.valueType = valueT;
+
+ this.keySerializer = keySer;
+ this.valueSerializer = valueSer;
+ }
+
+ @Override
+ public int getCurrentVersion() {
+ return CURRENT_VERSION;
+ }
+
+ @Override
+ public void writeSnapshot(DataOutputView out) throws IOException {
+ DataOutputViewStream outStream = new DataOutputViewStream(out);
+ InstantiationUtil.serializeObject(outStream, keyType);
+ InstantiationUtil.serializeObject(outStream, valueType);
+ InstantiationUtil.serializeObject(outStream, keySerializer);
+ InstantiationUtil.serializeObject(outStream, valueSerializer);
+ }
+
+ @Override
+ public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader)
+ throws IOException {
+ try {
+ DataInputViewStream inStream = new DataInputViewStream(in);
+ this.keyType = InstantiationUtil.deserializeObject(inStream, userCodeClassLoader);
+ this.valueType = InstantiationUtil.deserializeObject(inStream, userCodeClassLoader);
+ this.keySerializer =
+ InstantiationUtil.deserializeObject(inStream, userCodeClassLoader);
+ this.valueSerializer =
+ InstantiationUtil.deserializeObject(inStream, userCodeClassLoader);
+ } catch (ClassNotFoundException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public TypeSerializer<MapData> restoreSerializer() {
+ return new MapDataSerializer(keyType, valueType, keySerializer, valueSerializer);
+ }
+
+ @Override
+ public TypeSerializerSchemaCompatibility<MapData> resolveSchemaCompatibility(
+ TypeSerializer<MapData> newSerializer) {
+ if (!(newSerializer instanceof MapDataSerializer)) {
+ return TypeSerializerSchemaCompatibility.incompatible();
+ }
+
+ MapDataSerializer newMapDataSerializer = (MapDataSerializer) newSerializer;
+ if (!keyType.equals(newMapDataSerializer.keyType)
+ || !valueType.equals(newMapDataSerializer.valueType)
+ || !keySerializer.equals(newMapDataSerializer.keySerializer)
+ || !valueSerializer.equals(newMapDataSerializer.valueSerializer)) {
+ return TypeSerializerSchemaCompatibility.incompatible();
+ } else {
+ return TypeSerializerSchemaCompatibility.compatibleAsIs();
+ }
+ }
+ }
+}
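A hedged round-trip sketch for the new serializer, using `DataOutputSerializer`/`DataInputDeserializer` from flink-core as in-memory views; everything else comes from this patch or existing flink-cdc classes:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.cdc.common.data.GenericMapData;
import org.apache.flink.cdc.common.data.MapData;
import org.apache.flink.cdc.common.data.binary.BinaryStringData;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.runtime.serializer.data.MapDataSerializer;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;

public class MapDataSerializerSketch {
    public static void main(String[] args) throws Exception {
        MapDataSerializer serializer =
                new MapDataSerializer(DataTypes.STRING(), DataTypes.STRING());

        Map<Object, Object> entries = new HashMap<>();
        entries.put(BinaryStringData.fromString("k"), BinaryStringData.fromString("v"));

        // serialize() first converts to BinaryMapData, then writes length + bytes.
        DataOutputSerializer out = new DataOutputSerializer(64);
        serializer.serialize(new GenericMapData(entries), out);

        MapData copy =
                serializer.deserialize(new DataInputDeserializer(out.getCopyOfBuffer()));
        System.out.println(copy.size()); // 1
    }
}
```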
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/binary/BinaryRecordDataDataUtil.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/binary/BinaryRecordDataDataUtil.java
new file mode 100644
index 000000000..d3c71912e
--- /dev/null
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/binary/BinaryRecordDataDataUtil.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.runtime.serializer.data.binary;
+
+import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.core.memory.MemoryUtils;
+
+/**
+ * Utilities for {@link BinaryRecordData}. Many of the methods in this class are used in
+ * code generation.
+ */
+public class BinaryRecordDataDataUtil {
+
+ public static final sun.misc.Unsafe UNSAFE = MemoryUtils.UNSAFE;
+ public static final int BYTE_ARRAY_BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class);
+
+ public static final BinaryRecordData EMPTY_ROW = new BinaryRecordData(0);
+
+ static {
+ int size = EMPTY_ROW.getFixedLengthPartSize();
+ byte[] bytes = new byte[size];
+ EMPTY_ROW.pointTo(MemorySegmentFactory.wrap(bytes), 0, size);
+ }
+
+ public static boolean byteArrayEquals(byte[] left, byte[] right, int length) {
+ return byteArrayEquals(left, BYTE_ARRAY_BASE_OFFSET, right, BYTE_ARRAY_BASE_OFFSET, length);
+ }
+
+ public static boolean byteArrayEquals(
+ Object left, long leftOffset, Object right, long rightOffset, int length) {
+ int i = 0;
+
+ while (i <= length - 8) {
+ if (UNSAFE.getLong(left, leftOffset + i) != UNSAFE.getLong(right, rightOffset + i)) {
+ return false;
+ }
+ i += 8;
+ }
+
+ while (i < length) {
+ if (UNSAFE.getByte(left, leftOffset + i) != UNSAFE.getByte(right, rightOffset + i)) {
+ return false;
+ }
+ i += 1;
+ }
+ return true;
+ }
+}
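`byteArrayEquals` compares eight bytes per step through `Unsafe` and finishes the tail byte-wise; a small sketch of the contract (only the first `length` bytes matter):

```java
import org.apache.flink.cdc.runtime.serializer.data.binary.BinaryRecordDataDataUtil;

public class ByteArrayEqualsSketch {
    public static void main(String[] args) {
        byte[] left = {1, 2, 3, 4, 5};
        byte[] right = {1, 2, 3, 4, 6};
        System.out.println(BinaryRecordDataDataUtil.byteArrayEquals(left, right, 4)); // true
        System.out.println(BinaryRecordDataDataUtil.byteArrayEquals(left, right, 5)); // false
    }
}
```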
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/util/MapDataUtil.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/util/MapDataUtil.java
new file mode 100644
index 000000000..6f8117ecf
--- /dev/null
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/util/MapDataUtil.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.runtime.serializer.data.util;
+
+import org.apache.flink.cdc.common.data.ArrayData;
+import org.apache.flink.cdc.common.data.MapData;
+import org.apache.flink.cdc.common.types.DataType;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** Utilities for {@link MapData}. */
+public final class MapDataUtil {
+
+ /**
+ * Converts a {@link MapData} into a Java {@link Map}; the keys and values of the Java map
+ * still hold objects of the internal data structures.
+ */
+ public static Map<Object, Object> convertToJavaMap(
+ MapData map, DataType keyType, DataType valueType) {
+ ArrayData keyArray = map.keyArray();
+ ArrayData valueArray = map.valueArray();
+ Map<Object, Object> javaMap = new HashMap<>();
+ ArrayData.ElementGetter keyGetter = ArrayData.createElementGetter(keyType);
+ ArrayData.ElementGetter valueGetter = ArrayData.createElementGetter(valueType);
+ for (int i = 0; i < map.size(); i++) {
+ Object key = keyGetter.getElementOrNull(keyArray, i);
+ Object value = valueGetter.getElementOrNull(valueArray, i);
+ javaMap.put(key, value);
+ }
+ return javaMap;
+ }
+}
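Usage note in sketch form: the resulting map still holds internal representations (for STRING keys that means `BinaryStringData`, not `java.lang.String`).

```java
import java.util.Map;

import org.apache.flink.cdc.common.data.MapData;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.runtime.serializer.data.util.MapDataUtil;

public class MapDataUtilSketch {
    // `map` may be a GenericMapData or a BinaryMapData; both expose key/value arrays.
    static Map<Object, Object> toJava(MapData map) {
        return MapDataUtil.convertToJavaMap(map, DataTypes.STRING(), DataTypes.INT());
    }
}
```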
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/writer/AbstractBinaryWriter.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/writer/AbstractBinaryWriter.java
index 44636fd7e..8a7b4fa04 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/writer/AbstractBinaryWriter.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/writer/AbstractBinaryWriter.java
@@ -27,11 +27,14 @@ import org.apache.flink.cdc.common.data.RecordData;
import org.apache.flink.cdc.common.data.StringData;
import org.apache.flink.cdc.common.data.TimestampData;
import org.apache.flink.cdc.common.data.ZonedTimestampData;
+import org.apache.flink.cdc.common.data.binary.BinaryArrayData;
import org.apache.flink.cdc.common.data.binary.BinaryFormat;
+import org.apache.flink.cdc.common.data.binary.BinaryMapData;
import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
import org.apache.flink.cdc.common.data.binary.BinarySegmentUtils;
import org.apache.flink.cdc.common.data.binary.BinaryStringData;
import org.apache.flink.cdc.runtime.serializer.data.ArrayDataSerializer;
+import org.apache.flink.cdc.runtime.serializer.data.MapDataSerializer;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
@@ -103,12 +106,16 @@ abstract class AbstractBinaryWriter implements BinaryWriter {
@Override
public void writeArray(int pos, ArrayData input, ArrayDataSerializer serializer) {
- throw new UnsupportedOperationException("Not support array data.");
+ BinaryArrayData binary = serializer.toBinaryArray(input);
+ writeSegmentsToVarLenPart(
+ pos, binary.getSegments(), binary.getOffset(), binary.getSizeInBytes());
}
@Override
- public void writeMap(int pos, MapData input, TypeSerializer serializer) {
- throw new UnsupportedOperationException("Not support map data.");
+ public void writeMap(int pos, MapData input, MapDataSerializer serializer) {
+ BinaryMapData binary = serializer.toBinaryMap(input);
+ writeSegmentsToVarLenPart(
+ pos, binary.getSegments(), binary.getOffset(), binary.getSizeInBytes());
}
private DataOutputViewStreamWrapper getOutputView() {
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/writer/BinaryArrayWriter.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/writer/BinaryArrayWriter.java
new file mode 100644
index 000000000..a2e79d52f
--- /dev/null
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/writer/BinaryArrayWriter.java
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.runtime.serializer.data.writer;
+
+import org.apache.flink.cdc.common.annotation.Internal;
+import org.apache.flink.cdc.common.data.binary.BinaryArrayData;
+import org.apache.flink.cdc.common.data.binary.BinarySegmentUtils;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+
+import java.io.Serializable;
+
+/** Writer for binary array. See {@link BinaryArrayData}. */
+@Internal
+public final class BinaryArrayWriter extends AbstractBinaryWriter {
+
+ private final int nullBitsSizeInBytes;
+ private final BinaryArrayData array;
+ private final int numElements;
+ private int fixedSize;
+
+ public BinaryArrayWriter(BinaryArrayData array, int numElements, int elementSize) {
+ this.nullBitsSizeInBytes = BinaryArrayData.calculateHeaderInBytes(numElements);
+ this.fixedSize =
+ roundNumberOfBytesToNearestWord(nullBitsSizeInBytes + elementSize * numElements);
+ this.cursor = fixedSize;
+ this.numElements = numElements;
+
+ this.segment = MemorySegmentFactory.wrap(new byte[fixedSize]);
+ this.segment.putInt(0, numElements);
+ this.array = array;
+ }
+
+ /** First, reset. */
+ @Override
+ public void reset() {
+ this.cursor = fixedSize;
+ for (int i = 0; i < nullBitsSizeInBytes; i += 8) {
+ segment.putLong(i, 0L);
+ }
+ this.segment.putInt(0, numElements);
+ }
+
+ @Override
+ public void setNullBit(int ordinal) {
+ BinarySegmentUtils.bitSet(segment, 4, ordinal);
+ }
+
+ public void setNullBoolean(int ordinal) {
+ setNullBit(ordinal);
+ // put zero into the corresponding field when set null
+ segment.putBoolean(getElementOffset(ordinal, 1), false);
+ }
+
+ public void setNullByte(int ordinal) {
+ setNullBit(ordinal);
+ // put zero into the corresponding field when set null
+ segment.put(getElementOffset(ordinal, 1), (byte) 0);
+ }
+
+ public void setNullShort(int ordinal) {
+ setNullBit(ordinal);
+ // put zero into the corresponding field when set null
+ segment.putShort(getElementOffset(ordinal, 2), (short) 0);
+ }
+
+ public void setNullInt(int ordinal) {
+ setNullBit(ordinal);
+ // put zero into the corresponding field when set null
+ segment.putInt(getElementOffset(ordinal, 4), 0);
+ }
+
+ public void setNullLong(int ordinal) {
+ setNullBit(ordinal);
+ // put zero into the corresponding field when set null
+ segment.putLong(getElementOffset(ordinal, 8), (long) 0);
+ }
+
+ public void setNullFloat(int ordinal) {
+ setNullBit(ordinal);
+ // put zero into the corresponding field when set null
+ segment.putFloat(getElementOffset(ordinal, 4), (float) 0);
+ }
+
+ public void setNullDouble(int ordinal) {
+ setNullBit(ordinal);
+ // put zero into the corresponding field when set null
+ segment.putDouble(getElementOffset(ordinal, 8), (double) 0);
+ }
+
+ @Override
+ public void setNullAt(int ordinal) {
+ setNullLong(ordinal);
+ }
+
+ /**
+ * @deprecated Use {@link #createNullSetter(DataType)} for avoiding logical types during
+ * runtime.
+ */
+ @Deprecated
+ public void setNullAt(int pos, DataType type) {
+ switch (type.getTypeRoot()) {
+ case BOOLEAN:
+ setNullBoolean(pos);
+ break;
+ case TINYINT:
+ setNullByte(pos);
+ break;
+ case SMALLINT:
+ setNullShort(pos);
+ break;
+ case INTEGER:
+ case DATE:
+ case TIME_WITHOUT_TIME_ZONE:
+ setNullInt(pos);
+ break;
+ case BIGINT:
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ setNullLong(pos);
+ break;
+ case FLOAT:
+ setNullFloat(pos);
+ break;
+ case DOUBLE:
+ setNullDouble(pos);
+ break;
+ default:
+ setNullAt(pos);
+ }
+ }
+
+ private int getElementOffset(int pos, int elementSize) {
+ return nullBitsSizeInBytes + elementSize * pos;
+ }
+
+ @Override
+ public int getFieldOffset(int pos) {
+ return getElementOffset(pos, 8);
+ }
+
+ @Override
+ public void setOffsetAndSize(int pos, int offset, long size) {
+ final long offsetAndSize = ((long) offset << 32) | size;
+ segment.putLong(getElementOffset(pos, 8), offsetAndSize);
+ }
+
+ @Override
+ public void writeBoolean(int pos, boolean value) {
+ segment.putBoolean(getElementOffset(pos, 1), value);
+ }
+
+ @Override
+ public void writeByte(int pos, byte value) {
+ segment.put(getElementOffset(pos, 1), value);
+ }
+
+ @Override
+ public void writeShort(int pos, short value) {
+ segment.putShort(getElementOffset(pos, 2), value);
+ }
+
+ @Override
+ public void writeInt(int pos, int value) {
+ segment.putInt(getElementOffset(pos, 4), value);
+ }
+
+ @Override
+ public void writeLong(int pos, long value) {
+ segment.putLong(getElementOffset(pos, 8), value);
+ }
+
+ @Override
+ public void writeFloat(int pos, float value) {
+ if (Float.isNaN(value)) {
+ value = Float.NaN;
+ }
+ segment.putFloat(getElementOffset(pos, 4), value);
+ }
+
+ @Override
+ public void writeDouble(int pos, double value) {
+ if (Double.isNaN(value)) {
+ value = Double.NaN;
+ }
+ segment.putDouble(getElementOffset(pos, 8), value);
+ }
+
+ @Override
+ public void afterGrow() {
+ array.pointTo(segment, 0, segment.size());
+ }
+
+ /** Finally, complete the write by setting the real size on the array. */
+ @Override
+ public void complete() {
+ array.pointTo(segment, 0, cursor);
+ }
+
+ public int getNumElements() {
+ return numElements;
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Creates an accessor for setting the elements of an array writer to {@code null} during
+ * runtime.
+ *
+ * @param elementType the element type of the array
+ */
+ public static BinaryArrayWriter.NullSetter createNullSetter(DataType elementType) {
+ // ordered by type root definition
+ switch (elementType.getTypeRoot()) {
+ case CHAR:
+ case VARCHAR:
+ case BINARY:
+ case VARBINARY:
+ case DECIMAL:
+ case BIGINT:
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ case ARRAY:
+ case MAP:
+ case ROW:
+ return BinaryArrayWriter::setNullLong;
+ case BOOLEAN:
+ return BinaryArrayWriter::setNullBoolean;
+ case TINYINT:
+ return BinaryArrayWriter::setNullByte;
+ case SMALLINT:
+ return BinaryArrayWriter::setNullShort;
+ case INTEGER:
+ case DATE:
+ case TIME_WITHOUT_TIME_ZONE:
+ return BinaryArrayWriter::setNullInt;
+ case FLOAT:
+ return BinaryArrayWriter::setNullFloat;
+ case DOUBLE:
+ return BinaryArrayWriter::setNullDouble;
+ case TIMESTAMP_WITH_TIME_ZONE:
+ throw new UnsupportedOperationException();
+ default:
+ throw new IllegalArgumentException();
+ }
+ }
+
+ /**
+ * Accessor for setting the elements of an array writer to {@code null} during runtime.
+ *
+ * @see #createNullSetter(DataType)
+ */
+ public interface NullSetter extends Serializable {
+ void setNull(BinaryArrayWriter writer, int pos);
+ }
+}
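A sketch of the writer protocol for a nullable INT array (construct a sized writer, write or null each position, then `complete()`); `calculateFixLengthPartSize` is the helper this patch already uses in `ArrayDataSerializer.toBinaryArray`:

```java
import org.apache.flink.cdc.common.data.binary.BinaryArrayData;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.runtime.serializer.data.writer.BinaryArrayWriter;

public class BinaryArrayWriterSketch {
    public static void main(String[] args) {
        BinaryArrayData array = new BinaryArrayData();
        BinaryArrayWriter writer =
                new BinaryArrayWriter(
                        array, 3, BinaryArrayData.calculateFixLengthPartSize(DataTypes.INT()));
        writer.writeInt(0, 7);
        writer.setNullInt(1); // sets the null bit and zeroes the slot
        writer.writeInt(2, 9);
        writer.complete(); // points `array` at the written region

        System.out.println(array.isNullAt(1)); // true
        System.out.println(array.getInt(2));   // 9
    }
}
```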
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/writer/BinaryWriter.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/writer/BinaryWriter.java
index ebd6c4ed2..f25626de9 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/writer/BinaryWriter.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/writer/BinaryWriter.java
@@ -32,7 +32,9 @@ import org.apache.flink.cdc.common.types.DecimalType;
import org.apache.flink.cdc.common.types.LocalZonedTimestampType;
import org.apache.flink.cdc.common.types.TimestampType;
import org.apache.flink.cdc.common.types.ZonedTimestampType;
+import org.apache.flink.cdc.runtime.serializer.NullableSerializerWrapper;
import org.apache.flink.cdc.runtime.serializer.data.ArrayDataSerializer;
+import org.apache.flink.cdc.runtime.serializer.data.MapDataSerializer;
/**
* Writer to write a composite data format, like row, array. 1. Invoke {@link #reset()}. 2. Write
@@ -76,7 +78,7 @@ public interface BinaryWriter {
void writeArray(int pos, ArrayData value, ArrayDataSerializer serializer);
- void writeMap(int pos, MapData value, TypeSerializer serializer);
+ void writeMap(int pos, MapData value, MapDataSerializer serializer);
void writeRecord(int pos, RecordData value, TypeSerializer serializer);
@@ -131,10 +133,16 @@ public interface BinaryWriter {
writer.writeDecimal(pos, (DecimalData) o, decimalType.getPrecision());
break;
case ARRAY:
+ if (serializer instanceof NullableSerializerWrapper) {
+ serializer = ((NullableSerializerWrapper) serializer).getWrappedSerializer();
+ }
writer.writeArray(pos, (ArrayData) o, (ArrayDataSerializer) serializer);
break;
case MAP:
- writer.writeMap(pos, (MapData) o, (TypeSerializer) serializer);
+ if (serializer instanceof NullableSerializerWrapper) {
+ serializer = ((NullableSerializerWrapper) serializer).getWrappedSerializer();
+ }
+ writer.writeMap(pos, (MapData) o, (MapDataSerializer) serializer);
break;
case ROW:
writer.writeRecord(pos, (RecordData) o, (TypeSerializer) serializer);
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/schema/DataFieldSerializer.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/schema/DataFieldSerializer.java
index a709dcb72..fb0b4a512 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/schema/DataFieldSerializer.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/schema/DataFieldSerializer.java
@@ -39,7 +39,14 @@ public class DataFieldSerializer extends TypeSerializerSingleton<DataField> {
public static final DataFieldSerializer INSTANCE = new DataFieldSerializer();
private final StringSerializer stringSerializer = StringSerializer.INSTANCE;
- private final DataTypeSerializer dataTypeSerializer = new DataTypeSerializer();
+ private DataTypeSerializer dataTypeSerializer;
+
+ private DataTypeSerializer getDataTypeSerializer() {
+ if (dataTypeSerializer == null) {
+ dataTypeSerializer = new DataTypeSerializer();
+ }
+ return dataTypeSerializer;
+ }
@Override
public boolean isImmutableType() {
@@ -55,7 +62,7 @@ public class DataFieldSerializer extends TypeSerializerSingleton<DataField> {
public DataField copy(DataField from) {
return new DataField(
stringSerializer.copy(from.getName()),
- dataTypeSerializer.copy(from.getType()),
+ getDataTypeSerializer().copy(from.getType()),
stringSerializer.copy(from.getDescription()));
}
@@ -72,14 +79,14 @@ public class DataFieldSerializer extends TypeSerializerSingleton {
@Override
public void serialize(DataField record, DataOutputView target) throws IOException {
stringSerializer.serialize(record.getName(), target);
- dataTypeSerializer.serialize(record.getType(), target);
+ getDataTypeSerializer().serialize(record.getType(), target);
stringSerializer.serialize(record.getDescription(), target);
}
@Override
public DataField deserialize(DataInputView source) throws IOException {
String name = stringSerializer.deserialize(source);
- DataType type = dataTypeSerializer.deserialize(source);
+ DataType type = getDataTypeSerializer().deserialize(source);
String desc = stringSerializer.deserialize(source);
return new DataField(name, type, desc);
}
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/schema/DataTypeSerializer.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/schema/DataTypeSerializer.java
index 5d6779cc0..214824132 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/schema/DataTypeSerializer.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/schema/DataTypeSerializer.java
@@ -55,7 +55,14 @@ public class DataTypeSerializer extends TypeSerializer<DataType> {
private final EnumSerializer<DataTypeClass> enumSerializer =
new EnumSerializer<>(DataTypeClass.class);
- private final RowTypeSerializer rowTypeSerializer = RowTypeSerializer.INSTANCE;
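+ // Resolved lazily for the same reason as in DataFieldSerializer: reading
+ // RowTypeSerializer.INSTANCE eagerly can recurse into this class's own init.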
+ private RowTypeSerializer rowTypeSerializer;
+
+ private RowTypeSerializer getRowTypeSerializer() {
+ if (rowTypeSerializer == null) {
+ rowTypeSerializer = RowTypeSerializer.INSTANCE;
+ }
+ return rowTypeSerializer;
+ }
@Override
public boolean isImmutableType() {
@@ -75,7 +82,7 @@ public class DataTypeSerializer extends TypeSerializer {
@Override
public DataType copy(DataType from) {
if (from instanceof RowType) {
- return rowTypeSerializer.copy((RowType) from);
+ return getRowTypeSerializer().copy((RowType) from);
}
return from;
}
@@ -94,7 +101,7 @@ public class DataTypeSerializer extends TypeSerializer {
public void serialize(DataType record, DataOutputView target) throws IOException {
if (record instanceof RowType) {
enumSerializer.serialize(DataTypeClass.ROW, target);
- rowTypeSerializer.serialize((RowType) record, target);
+ getRowTypeSerializer().serialize((RowType) record, target);
} else if (record instanceof BinaryType) {
enumSerializer.serialize(DataTypeClass.BINARY, target);
target.writeBoolean(record.isNullable());
@@ -178,7 +185,7 @@ public class DataTypeSerializer extends TypeSerializer {
public DataType deserialize(DataInputView source) throws IOException {
DataTypeClass dataTypeClass = enumSerializer.deserialize(source);
if (dataTypeClass == DataTypeClass.ROW) {
- return rowTypeSerializer.deserialize(source);
+ return getRowTypeSerializer().deserialize(source);
}
boolean isNullable = source.readBoolean();
switch (dataTypeClass) {
@@ -255,12 +262,12 @@ public class DataTypeSerializer extends TypeSerializer {
}
DataTypeSerializer that = (DataTypeSerializer) o;
return Objects.equals(enumSerializer, that.enumSerializer)
- && Objects.equals(rowTypeSerializer, that.rowTypeSerializer);
+ && Objects.equals(getRowTypeSerializer(), that.getRowTypeSerializer());
}
@Override
public int hashCode() {
- return Objects.hash(enumSerializer, rowTypeSerializer);
+ return Objects.hash(enumSerializer, getRowTypeSerializer());
}
@Override
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/schema/RowTypeSerializer.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/schema/RowTypeSerializer.java
index f2aa2aa5d..68523f809 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/schema/RowTypeSerializer.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/schema/RowTypeSerializer.java
@@ -38,8 +38,18 @@ public class RowTypeSerializer extends TypeSerializerSingleton<RowType> {
/** Sharable instance of the TableIdSerializer. */
public static final RowTypeSerializer INSTANCE = new RowTypeSerializer();
- private final ListSerializer<DataField> fieldsSerializer =
- new ListSerializer<>(DataFieldSerializer.INSTANCE);
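+ // Double-checked locking: the enclosing INSTANCE is shared across threads,
+ // so the lazily created field must be volatile and its init synchronized.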
+ private volatile ListSerializer<DataField> fieldsSerializer;
+
+ private ListSerializer<DataField> getFieldsSerializer() {
+ if (fieldsSerializer == null) {
+ synchronized (this) {
+ if (fieldsSerializer == null) {
+ fieldsSerializer = new ListSerializer<>(DataFieldSerializer.INSTANCE);
+ }
+ }
+ }
+ return fieldsSerializer;
+ }
@Override
public boolean isImmutableType() {
@@ -53,7 +63,7 @@ public class RowTypeSerializer extends TypeSerializerSingleton {
@Override
public RowType copy(RowType from) {
- return new RowType(from.isNullable(), fieldsSerializer.copy(from.getFields()));
+ return new RowType(from.isNullable(), getFieldsSerializer().copy(from.getFields()));
}
@Override
@@ -69,13 +79,13 @@ public class RowTypeSerializer extends TypeSerializerSingleton {
@Override
public void serialize(RowType record, DataOutputView target) throws IOException {
target.writeBoolean(record.isNullable());
- fieldsSerializer.serialize(record.getFields(), target);
+ getFieldsSerializer().serialize(record.getFields(), target);
}
@Override
public RowType deserialize(DataInputView source) throws IOException {
boolean nullable = source.readBoolean();
- return new RowType(nullable, fieldsSerializer.deserialize(source));
+ return new RowType(nullable, getFieldsSerializer().deserialize(source));
}
@Override
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/typeutils/BinaryRecordDataGenerator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/typeutils/BinaryRecordDataGenerator.java
index 5004fa4f2..cb57c5459 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/typeutils/BinaryRecordDataGenerator.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/typeutils/BinaryRecordDataGenerator.java
@@ -24,7 +24,6 @@ import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.RowType;
import org.apache.flink.cdc.runtime.serializer.InternalSerializers;
-import org.apache.flink.cdc.runtime.serializer.NullableSerializerWrapper;
import org.apache.flink.cdc.runtime.serializer.data.writer.BinaryRecordDataWriter;
import org.apache.flink.cdc.runtime.serializer.data.writer.BinaryWriter;
@@ -51,7 +50,6 @@ public class BinaryRecordDataGenerator {
dataTypes,
Arrays.stream(dataTypes)
.map(InternalSerializers::create)
- .map(NullableSerializerWrapper::new)
.toArray(TypeSerializer[]::new));
}
diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/data/ArrayDataSerializerTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/data/ArrayDataSerializerTest.java
index 61bcb6ccf..2015b9a5e 100644
--- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/data/ArrayDataSerializerTest.java
+++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/data/ArrayDataSerializerTest.java
@@ -19,12 +19,24 @@ package org.apache.flink.cdc.runtime.serializer.data;
import org.apache.flink.cdc.common.data.ArrayData;
import org.apache.flink.cdc.common.data.GenericArrayData;
+import org.apache.flink.cdc.common.data.GenericMapData;
+import org.apache.flink.cdc.common.data.MapData;
import org.apache.flink.cdc.common.data.StringData;
+import org.apache.flink.cdc.common.data.binary.BinaryArrayData;
import org.apache.flink.cdc.common.data.binary.BinaryStringData;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.runtime.serializer.SerializerTestBase;
+import org.apache.flink.cdc.runtime.serializer.data.writer.BinaryArrayWriter;
import org.apache.flink.testutils.DeeplyEqualsChecker;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
/** A test for the {@link ArrayDataSerializer}. */
class ArrayDataSerializerTest extends SerializerTestBase<ArrayData> {
@@ -79,4 +91,75 @@ class ArrayDataSerializerTest extends SerializerTestBase {
})
};
}
+
+ static BinaryArrayData createArray(String... vs) {
+ BinaryArrayData array = new BinaryArrayData();
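+ // 8 == size of the fixed-length slot used by variable-length types (offset + length)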
+ BinaryArrayWriter writer = new BinaryArrayWriter(array, vs.length, 8);
+ for (int i = 0; i < vs.length; i++) {
+ writer.writeString(i, BinaryStringData.fromString(vs[i]));
+ }
+ writer.complete();
+ return array;
+ }
+
+ @Test
+ public void testToBinaryArrayWithNestedTypes() {
+ // Create a nested ArrayData
+ Map<Object, Object> map = new HashMap<>();
+ map.put(BinaryStringData.fromString("key1"), BinaryStringData.fromString("value1"));
+ map.put(BinaryStringData.fromString("key2"), BinaryStringData.fromString("value2"));
+ GenericMapData genMapData = new GenericMapData(map);
+
+ ArrayData innerArrayData = new GenericArrayData(new Object[] {genMapData});
+
+ // Serialize to BinaryArrayData
+ ArrayDataSerializer serializer =
+ new ArrayDataSerializer(DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()));
+ BinaryArrayData binaryArrayData = serializer.toBinaryArray(innerArrayData);
+
+ // Verify the conversion
+ MapData mapData = binaryArrayData.getMap(0);
+ assertEquals(2, mapData.size());
+ }
+
+ @Test
+ public void testToBinaryArrayWithDeeplyNestedTypes() {
+ // Create a nested structure: MapData containing ArrayData elements
+ Map<Object, Object> nestedMap = new HashMap<>();
+ nestedMap.put(
+ BinaryStringData.fromString("key1"), new GenericArrayData(new Object[] {42, 43}));
+ nestedMap.put(
+ BinaryStringData.fromString("key2"), new GenericArrayData(new Object[] {44, 45}));
+
+ GenericMapData genMapData = new GenericMapData(nestedMap);
+
+ // Create an outer ArrayData containing the nested MapData
+ ArrayData outerArrayData = new GenericArrayData(new Object[] {genMapData});
+
+ // Serialize to BinaryArrayData
+ ArrayDataSerializer serializer =
+ new ArrayDataSerializer(
+ DataTypes.MAP(DataTypes.STRING(), DataTypes.ARRAY(DataTypes.INT())));
+ BinaryArrayData binaryArrayData = serializer.toBinaryArray(outerArrayData);
+
+ // Verify the conversion
+ MapData mapData = binaryArrayData.getMap(0);
+ assertEquals(2, mapData.size());
+
+ // Check nested arrays in map
+ ArrayData keys = mapData.keyArray();
+ ArrayData values = mapData.valueArray();
+
+ // Map iteration order is unspecified, so find which index holds "key1"
+ int keyIndex = keys.getString(0).toString().equals("key1") ? 0 : 1;
+ ArrayData arrayData1 = values.getArray(keyIndex);
+ assertEquals(42, arrayData1.getInt(0));
+ assertEquals(43, arrayData1.getInt(1));
+
+ // Check the second key-value pair ("key2" sits at whichever index "key1" did not)
+ keyIndex = keys.getString(0).toString().equals("key2") ? 0 : 1;
+ ArrayData arrayData2 = values.getArray(keyIndex);
+ assertEquals(44, arrayData2.getInt(0));
+ assertEquals(45, arrayData2.getInt(1));
+ }
}
diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/data/MapDataSerializerTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/data/MapDataSerializerTest.java
new file mode 100644
index 000000000..ed6f1a847
--- /dev/null
+++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/data/MapDataSerializerTest.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.runtime.serializer.data;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.cdc.common.data.ArrayData;
+import org.apache.flink.cdc.common.data.GenericArrayData;
+import org.apache.flink.cdc.common.data.GenericMapData;
+import org.apache.flink.cdc.common.data.MapData;
+import org.apache.flink.cdc.common.data.binary.BinaryArrayData;
+import org.apache.flink.cdc.common.data.binary.BinaryMapData;
+import org.apache.flink.cdc.common.data.binary.BinaryStringData;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.runtime.serializer.SerializerTestBase;
+import org.apache.flink.cdc.runtime.serializer.data.writer.BinaryArrayWriter;
+import org.apache.flink.testutils.DeeplyEqualsChecker;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.apache.flink.cdc.runtime.serializer.data.util.MapDataUtil.convertToJavaMap;
+
+/** A test for the {@link MapDataSerializer}. */
+public class MapDataSerializerTest extends SerializerTestBase<MapData> {
+ private static final DataType INT = DataTypes.INT();
+ private static final DataType STRING = DataTypes.STRING();
+
+ public MapDataSerializerTest() {
+ super(
+ new DeeplyEqualsChecker()
+ .withCustomCheck(
+ (o1, o2) -> o1 instanceof MapData && o2 instanceof MapData,
+ (o1, o2, checker) ->
+ // It is more proper to compare the maps after converting them to
+ // Java maps than as binary maps. For example, the maps
+ // {1: 'a', 2: 'b', 3: 'c'} and {3: 'c', 2: 'b', 1: 'a'} are the
+ // same map, but their key/value order may differ when stored in
+ // binary form, so the byte-wise equality check would return false.
+ convertToJavaMap((MapData) o1, INT, STRING)
+ .equals(
+ convertToJavaMap(
+ (MapData) o2, INT, STRING))));
+ }
+
+ /** @return MapDataSerializer */
+ @Override
+ protected TypeSerializer<MapData> createSerializer() {
+ return new MapDataSerializer(INT, STRING);
+ }
+
+ /**
+ * Gets the expected length for the serializer's {@link TypeSerializer#getLength()} method.
+ *
+ * The expected length should be positive for fixed-length data types, or {@code -1} for
+ * variable-length types.
+ */
+ @Override
+ protected int getLength() {
+ return -1;
+ }
+
+ /** @return MapData clazz */
+ @Override
+ protected Class<MapData> getTypeClass() {
+ return MapData.class;
+ }
+
+ /** @return MapData[] */
+ @Override
+ protected MapData[] getTestData() {
+ Map<Object, Object> first = new HashMap<>();
+ first.put(1, BinaryStringData.fromString(""));
+ return new MapData[] {
+ new GenericMapData(first),
+ new CustomMapData(first),
+ BinaryMapData.valueOf(
+ createArray(1, 2), ArrayDataSerializerTest.createArray("11", "haa")),
+ BinaryMapData.valueOf(
+ createArray(1, 3, 4), ArrayDataSerializerTest.createArray("11", "haa", "ke")),
+ BinaryMapData.valueOf(
+ createArray(1, 4, 2), ArrayDataSerializerTest.createArray("11", "haa", "ke")),
+ BinaryMapData.valueOf(
+ createArray(1, 5, 6, 7),
+ ArrayDataSerializerTest.createArray("11", "lele", "haa", "ke"))
+ };
+ }
+
+ private static BinaryArrayData createArray(int... vs) {
+ BinaryArrayData array = new BinaryArrayData();
+ BinaryArrayWriter writer = new BinaryArrayWriter(array, vs.length, 4);
+ for (int i = 0; i < vs.length; i++) {
+ writer.writeInt(i, vs[i]);
+ }
+ writer.complete();
+ return array;
+ }
+
+ /** A simple custom implementation for {@link MapData}. */
+ public static class CustomMapData implements MapData {
+
+ private final Map<?, ?> map;
+
+ public CustomMapData(Map<?, ?> map) {
+ this.map = map;
+ }
+
+ public Object get(Object key) {
+ return map.get(key);
+ }
+
+ @Override
+ public int size() {
+ return map.size();
+ }
+
+ @Override
+ public ArrayData keyArray() {
+ Object[] keys = map.keySet().toArray();
+ return new GenericArrayData(keys);
+ }
+
+ @Override
+ public ArrayData valueArray() {
+ Object[] values = map.values().toArray();
+ return new GenericArrayData(values);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == this) {
+ return true;
+ }
+ if (!(o instanceof CustomMapData)) {
+ return false;
+ }
+ return map.equals(((CustomMapData) o).map);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(map);
+ }
+ }
+}
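The DeeplyEqualsChecker in the constructor above is needed because binary map equality is byte-wise and therefore sensitive to entry order. A sketch of the failure mode, reusing createArray and convertToJavaMap from the file above:

    // The same logical map {1 -> "a", 2 -> "b"} built in two entry orders.
    MapData m1 = BinaryMapData.valueOf(createArray(1, 2), ArrayDataSerializerTest.createArray("a", "b"));
    MapData m2 = BinaryMapData.valueOf(createArray(2, 1), ArrayDataSerializerTest.createArray("b", "a"));
    // Their binary layouts differ, so byte-wise equality fails, but the
    // order-insensitive Java-map views compare equal.
    boolean sameMap = convertToJavaMap(m1, INT, STRING).equals(convertToJavaMap(m2, INT, STRING)); // true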
diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/data/binary/BinarySegmentUtilsTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/data/binary/BinarySegmentUtilsTest.java
new file mode 100644
index 000000000..825837f37
--- /dev/null
+++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/data/binary/BinarySegmentUtilsTest.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.runtime.serializer.data.binary;
+
+import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
+import org.apache.flink.cdc.common.data.binary.BinarySegmentUtils;
+import org.apache.flink.cdc.runtime.serializer.data.util.DataFormatTestUtil;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+
+import org.junit.Test;
+
+import static org.apache.flink.cdc.runtime.serializer.data.binary.BinaryRecordDataDataUtil.BYTE_ARRAY_BASE_OFFSET;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link BinarySegmentUtils}, which operates on {@link MemorySegment}s. */
+public class BinarySegmentUtilsTest {
+ @Test
+ public void testCopy() {
+ // test copy the content of the latter Seg
+ MemorySegment[] segments = new MemorySegment[2];
+ segments[0] = MemorySegmentFactory.wrap(new byte[] {0, 2, 5});
+ segments[1] = MemorySegmentFactory.wrap(new byte[] {6, 12, 15});
+
+ byte[] bytes = BinarySegmentUtils.copyToBytes(segments, 4, 2);
+ assertThat(bytes).isEqualTo(new byte[] {12, 15});
+ }
+
+ @Test
+ public void testEquals() {
+ // test equality of byte ranges spanning multiple segments
+ MemorySegment[] segments1 = new MemorySegment[3];
+ segments1[0] = MemorySegmentFactory.wrap(new byte[] {0, 2, 5});
+ segments1[1] = MemorySegmentFactory.wrap(new byte[] {6, 12, 15});
+ segments1[2] = MemorySegmentFactory.wrap(new byte[] {1, 1, 1});
+
+ MemorySegment[] segments2 = new MemorySegment[2];
+ segments2[0] = MemorySegmentFactory.wrap(new byte[] {6, 0, 2, 5});
+ segments2[1] = MemorySegmentFactory.wrap(new byte[] {6, 12, 15, 18});
+
+ assertThat(BinarySegmentUtils.equalsMultiSegments(segments1, 0, segments2, 0, 0)).isTrue();
+ assertThat(BinarySegmentUtils.equals(segments1, 0, segments2, 1, 3)).isTrue();
+ assertThat(BinarySegmentUtils.equals(segments1, 0, segments2, 1, 6)).isTrue();
+ assertThat(BinarySegmentUtils.equals(segments1, 0, segments2, 1, 7)).isFalse();
+ }
+
+ @Test
+ public void testBoundaryByteArrayEquals() {
+ byte[] bytes1 = new byte[5];
+ bytes1[3] = 81;
+ byte[] bytes2 = new byte[100];
+ bytes2[3] = 81;
+ bytes2[4] = 81;
+
+ assertThat(BinaryRecordDataDataUtil.byteArrayEquals(bytes1, bytes2, 4)).isTrue();
+ assertThat(BinaryRecordDataDataUtil.byteArrayEquals(bytes1, bytes2, 5)).isFalse();
+ assertThat(BinaryRecordDataDataUtil.byteArrayEquals(bytes1, bytes2, 0)).isTrue();
+ }
+
+ @Test
+ public void testBoundaryEquals() {
+ BinaryRecordData row24 = DataFormatTestUtil.get24BytesBinaryRow();
+ BinaryRecordData row160 = DataFormatTestUtil.get160BytesBinaryRow();
+ BinaryRecordData varRow160 = DataFormatTestUtil.getMultiSeg160BytesBinaryRow(row160);
+ BinaryRecordData varRow160InOne = DataFormatTestUtil.getMultiSeg160BytesInOneSegRow(row160);
+
+ assertThat(varRow160InOne).isEqualTo(row160);
+ assertThat(varRow160InOne).isEqualTo(varRow160);
+ assertThat(varRow160).isEqualTo(row160);
+ assertThat(varRow160).isEqualTo(varRow160InOne);
+
+ assertThat(row160).isNotEqualTo(row24);
+ assertThat(varRow160).isNotEqualTo(row24);
+ assertThat(varRow160InOne).isNotEqualTo(row24);
+
+ assertThat(BinarySegmentUtils.equals(row24.getSegments(), 0, row160.getSegments(), 0, 0))
+ .isTrue();
+ assertThat(BinarySegmentUtils.equals(row24.getSegments(), 0, varRow160.getSegments(), 0, 0))
+ .isTrue();
+
+ // test var segs
+ MemorySegment[] segments1 = new MemorySegment[2];
+ segments1[0] = MemorySegmentFactory.wrap(new byte[32]);
+ segments1[1] = MemorySegmentFactory.wrap(new byte[32]);
+ MemorySegment[] segments2 = new MemorySegment[3];
+ segments2[0] = MemorySegmentFactory.wrap(new byte[16]);
+ segments2[1] = MemorySegmentFactory.wrap(new byte[16]);
+ segments2[2] = MemorySegmentFactory.wrap(new byte[16]);
+
+ segments1[0].put(9, (byte) 1);
+ assertThat(BinarySegmentUtils.equals(segments1, 0, segments2, 14, 14)).isFalse();
+ segments2[1].put(7, (byte) 1);
+ assertThat(BinarySegmentUtils.equals(segments1, 0, segments2, 14, 14)).isTrue();
+ assertThat(BinarySegmentUtils.equals(segments1, 2, segments2, 16, 14)).isTrue();
+ assertThat(BinarySegmentUtils.equals(segments1, 2, segments2, 16, 16)).isTrue();
+
+ segments2[2].put(7, (byte) 1);
+ assertThat(BinarySegmentUtils.equals(segments1, 2, segments2, 32, 14)).isTrue();
+ }
+
+ @Test
+ public void testBoundaryCopy() {
+ MemorySegment[] segments1 = new MemorySegment[2];
+ segments1[0] = MemorySegmentFactory.wrap(new byte[32]);
+ segments1[1] = MemorySegmentFactory.wrap(new byte[32]);
+ segments1[0].put(15, (byte) 5);
+ segments1[1].put(15, (byte) 6);
+
+ {
+ byte[] bytes = new byte[64];
+ MemorySegment[] segments2 = new MemorySegment[] {MemorySegmentFactory.wrap(bytes)};
+
+ BinarySegmentUtils.copyToBytes(segments1, 0, bytes, 0, 64);
+ assertThat(BinarySegmentUtils.equals(segments1, 0, segments2, 0, 64)).isTrue();
+ }
+
+ {
+ byte[] bytes = new byte[64];
+ MemorySegment[] segments2 = new MemorySegment[] {MemorySegmentFactory.wrap(bytes)};
+
+ BinarySegmentUtils.copyToBytes(segments1, 32, bytes, 0, 14);
+ assertThat(BinarySegmentUtils.equals(segments1, 32, segments2, 0, 14)).isTrue();
+ }
+
+ {
+ byte[] bytes = new byte[64];
+ MemorySegment[] segments2 = new MemorySegment[] {MemorySegmentFactory.wrap(bytes)};
+
+ BinarySegmentUtils.copyToBytes(segments1, 34, bytes, 0, 14);
+ assertThat(BinarySegmentUtils.equals(segments1, 34, segments2, 0, 14)).isTrue();
+ }
+ }
+
+ @Test
+ public void testCopyToUnsafe() {
+ MemorySegment[] segments1 = new MemorySegment[2];
+ segments1[0] = MemorySegmentFactory.wrap(new byte[32]);
+ segments1[1] = MemorySegmentFactory.wrap(new byte[32]);
+ segments1[0].put(15, (byte) 5);
+ segments1[1].put(15, (byte) 6);
+
+ {
+ byte[] bytes = new byte[64];
+ MemorySegment[] segments2 = new MemorySegment[] {MemorySegmentFactory.wrap(bytes)};
+
+ BinarySegmentUtils.copyToUnsafe(segments1, 0, bytes, BYTE_ARRAY_BASE_OFFSET, 64);
+ assertThat(BinarySegmentUtils.equals(segments1, 0, segments2, 0, 64)).isTrue();
+ }
+
+ {
+ byte[] bytes = new byte[64];
+ MemorySegment[] segments2 = new MemorySegment[] {MemorySegmentFactory.wrap(bytes)};
+
+ BinarySegmentUtils.copyToUnsafe(segments1, 32, bytes, BYTE_ARRAY_BASE_OFFSET, 14);
+ assertThat(BinarySegmentUtils.equals(segments1, 32, segments2, 0, 14)).isTrue();
+ }
+
+ {
+ byte[] bytes = new byte[64];
+ MemorySegment[] segments2 = new MemorySegment[] {MemorySegmentFactory.wrap(bytes)};
+
+ BinarySegmentUtils.copyToUnsafe(segments1, 34, bytes, BYTE_ARRAY_BASE_OFFSET, 14);
+ assertThat(BinarySegmentUtils.equals(segments1, 34, segments2, 0, 14)).isTrue();
+ }
+ }
+
+ @Test
+ public void testFind() {
+ MemorySegment[] segments1 = new MemorySegment[2];
+ segments1[0] = MemorySegmentFactory.wrap(new byte[32]);
+ segments1[1] = MemorySegmentFactory.wrap(new byte[32]);
+ MemorySegment[] segments2 = new MemorySegment[3];
+ segments2[0] = MemorySegmentFactory.wrap(new byte[16]);
+ segments2[1] = MemorySegmentFactory.wrap(new byte[16]);
+ segments2[2] = MemorySegmentFactory.wrap(new byte[16]);
+
+ assertThat(BinarySegmentUtils.find(segments1, 34, 0, segments2, 0, 0)).isEqualTo(34);
+ assertThat(BinarySegmentUtils.find(segments1, 34, 0, segments2, 0, 15)).isEqualTo(-1);
+ }
+}
diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/data/util/DataFormatTestUtil.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/data/util/DataFormatTestUtil.java
new file mode 100644
index 000000000..71b3c84e6
--- /dev/null
+++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/data/util/DataFormatTestUtil.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.runtime.serializer.data.util;
+
+import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
+import org.apache.flink.cdc.common.data.binary.BinaryStringData;
+import org.apache.flink.cdc.runtime.serializer.data.writer.BinaryRecordDataWriter;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+
+import org.apache.commons.lang3.RandomStringUtils;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Utils for testing data formats. */
+public class DataFormatTestUtil {
+
+ /** Get a binary row that is 24 bytes long. */
+ public static BinaryRecordData get24BytesBinaryRow() {
+ // header (8 bytes) + 2 * string in fixed-length part (8 bytes each)
+ BinaryRecordData row = new BinaryRecordData(2);
+ BinaryRecordDataWriter writer = new BinaryRecordDataWriter(row);
+ writer.writeString(0, BinaryStringData.fromString(RandomStringUtils.randomNumeric(2)));
+ writer.writeString(1, BinaryStringData.fromString(RandomStringUtils.randomNumeric(2)));
+ writer.complete();
+ return row;
+ }
+
+ /** Get a binary row that is 160 bytes long. */
+ public static BinaryRecordData get160BytesBinaryRow() {
+ // header (8 bytes) +
+ // 72 byte length string (8 bytes in fixed-length, 72 bytes in variable-length) +
+ // 64 byte length string (8 bytes in fixed-length, 64 bytes in variable-length)
+ BinaryRecordData row = new BinaryRecordData(2);
+ BinaryRecordDataWriter writer = new BinaryRecordDataWriter(row);
+ writer.writeString(0, BinaryStringData.fromString(RandomStringUtils.randomNumeric(72)));
+ writer.writeString(1, BinaryStringData.fromString(RandomStringUtils.randomNumeric(64)));
+ writer.complete();
+ return row;
+ }
+
+ /**
+ * Get a binary row consisting of 6 segments. The bytes of the returned row are the same as
+ * those of the given input binary row.
+ */
+ public static BinaryRecordData getMultiSeg160BytesBinaryRow(BinaryRecordData row160) {
+ BinaryRecordData multiSegRow160 = new BinaryRecordData(2);
+ MemorySegment[] segments = new MemorySegment[6];
+ int baseOffset = 8;
+ int posInSeg = baseOffset;
+ int remainSize = 160;
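+ // Copy the 160 row bytes across six 32-byte segments; the first segment
+ // reserves baseOffset bytes of padding, so the row starts mid-segment.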
+ for (int i = 0; i < segments.length; i++) {
+ segments[i] = MemorySegmentFactory.wrap(new byte[32]);
+ int copy = Math.min(32 - posInSeg, remainSize);
+ row160.getSegments()[0].copyTo(160 - remainSize, segments[i], posInSeg, copy);
+ remainSize -= copy;
+ posInSeg = 0;
+ }
+ multiSegRow160.pointTo(segments, baseOffset, 160);
+ assertThat(multiSegRow160).isEqualTo(row160);
+ return multiSegRow160;
+ }
+
+ /**
+ * Get a binary row consisting of 2 segments. Its first segment is the same as that of the
+ * given input binary row, while its second segment is empty.
+ */
+ public static BinaryRecordData getMultiSeg160BytesInOneSegRow(BinaryRecordData row160) {
+ MemorySegment[] segments = new MemorySegment[2];
+ segments[0] = row160.getSegments()[0];
+ segments[1] = MemorySegmentFactory.wrap(new byte[row160.getSegments()[0].size()]);
+ row160.pointTo(segments, 0, row160.getSizeInBytes());
+ return row160;
+ }
+
+ /** Split the given byte array into two memory segments. */
+ public static MemorySegment[] splitBytes(byte[] bytes, int baseOffset) {
+ int newSize = (bytes.length + 1) / 2 + baseOffset;
+ MemorySegment[] ret = new MemorySegment[2];
+ ret[0] = MemorySegmentFactory.wrap(new byte[newSize]);
+ ret[1] = MemorySegmentFactory.wrap(new byte[newSize]);
+
+ ret[0].put(baseOffset, bytes, 0, newSize - baseOffset);
+ ret[1].put(0, bytes, newSize - baseOffset, bytes.length - (newSize - baseOffset));
+ return ret;
+ }
+
+ /** A simple class for testing generic type getting / setting on data formats. */
+ public static class MyObj {
+ public int i;
+ public double j;
+
+ public MyObj(int i, double j) {
+ this.i = i;
+ this.j = j;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ MyObj myObj = (MyObj) o;
+
+ return i == myObj.i && Double.compare(myObj.j, j) == 0;
+ }
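+
+ @Override
+ public int hashCode() {
+ // equals is overridden above; keep the equals/hashCode contract.
+ int result = i;
+ long temp = Double.doubleToLongBits(j);
+ result = 31 * result + (int) (temp ^ (temp >>> 32));
+ return result;
+ }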
+
+ @Override
+ public String toString() {
+ return "MyObj{" + "i=" + i + ", j=" + j + '}';
+ }
+ }
+}
diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/data/writer/BinaryArrayWriterTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/data/writer/BinaryArrayWriterTest.java
new file mode 100644
index 000000000..cb9cec3ef
--- /dev/null
+++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/data/writer/BinaryArrayWriterTest.java
@@ -0,0 +1,535 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.runtime.serializer.data.writer;
+
+import org.apache.flink.cdc.common.data.DecimalData;
+import org.apache.flink.cdc.common.data.RecordData;
+import org.apache.flink.cdc.common.data.TimestampData;
+import org.apache.flink.cdc.common.data.binary.BinaryArrayData;
+import org.apache.flink.cdc.common.data.binary.BinaryMapData;
+import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
+import org.apache.flink.cdc.common.data.binary.BinarySegmentUtils;
+import org.apache.flink.cdc.common.data.binary.BinaryStringData;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.runtime.serializer.data.ArrayDataSerializer;
+import org.apache.flink.cdc.runtime.serializer.data.MapDataSerializer;
+import org.apache.flink.cdc.runtime.serializer.data.RecordDataSerializer;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+
+import org.junit.Test;
+
+import java.math.BigDecimal;
+import java.sql.Timestamp;
+import java.time.LocalDateTime;
+
+import static org.apache.flink.cdc.common.data.binary.BinaryStringData.fromString;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test of {@link BinaryArrayData} and {@link BinaryArrayWriter}. */
+public class BinaryArrayWriterTest {
+ @Test
+ public void testArray() {
+ // 1.array test
+ BinaryArrayData array = new BinaryArrayData();
+ BinaryArrayWriter writer = new BinaryArrayWriter(array, 3, 4);
+
+ writer.writeInt(0, 6);
+ writer.setNullInt(1);
+ writer.writeInt(2, 666);
+ writer.complete();
+
+ assertThat(array.getInt(0)).isEqualTo(6);
+ assertThat(array.isNullAt(1)).isTrue();
+ assertThat(array.getInt(2)).isEqualTo(666);
+
+ // 2.test write to binary row.
+ {
+ BinaryRecordData row2 = new BinaryRecordData(1);
+ BinaryRecordDataWriter writer2 = new BinaryRecordDataWriter(row2);
+ writer2.writeArray(0, array, new ArrayDataSerializer(DataTypes.INT()));
+ writer2.complete();
+
+ BinaryArrayData array2 = (BinaryArrayData) row2.getArray(0);
+ assertThat(array).isEqualTo(array2);
+ assertThat(array2.getInt(0)).isEqualTo(6);
+ assertThat(array2.isNullAt(1)).isTrue();
+ assertThat(array2.getInt(2)).isEqualTo(666);
+ }
+
+ // 3.test write var seg array to binary row.
+ {
+ BinaryArrayData array3 = splitArray(array);
+
+ BinaryRecordData row2 = new BinaryRecordData(1);
+ BinaryRecordDataWriter writer2 = new BinaryRecordDataWriter(row2);
+ writer2.writeArray(0, array3, new ArrayDataSerializer(DataTypes.INT()));
+ writer2.complete();
+
+ BinaryArrayData array2 = (BinaryArrayData) row2.getArray(0);
+ assertThat(array).isEqualTo(array2);
+ assertThat(array2.getInt(0)).isEqualTo(6);
+ assertThat(array2.isNullAt(1)).isTrue();
+ assertThat(array2.getInt(2)).isEqualTo(666);
+ }
+ }
+
+ @Test
+ public void testArrayTypes() {
+ {
+ // test bool
+ BinaryArrayData array = new BinaryArrayData();
+ BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 1);
+ writer.setNullBoolean(0);
+ writer.writeBoolean(1, true);
+ writer.complete();
+
+ assertThat(array.isNullAt(0)).isTrue();
+ assertThat(array.getBoolean(1)).isTrue();
+ array.setBoolean(0, true);
+ assertThat(array.getBoolean(0)).isTrue();
+ array.setNullBoolean(0);
+ assertThat(array.isNullAt(0)).isTrue();
+
+ BinaryArrayData newArray = splitArray(array);
+ assertThat(newArray.isNullAt(0)).isTrue();
+ assertThat(newArray.getBoolean(1)).isTrue();
+ newArray.setBoolean(0, true);
+ assertThat(newArray.getBoolean(0)).isTrue();
+ newArray.setNullBoolean(0);
+ assertThat(newArray.isNullAt(0)).isTrue();
+
+ newArray.setBoolean(0, true);
+ assertThat(BinaryArrayData.fromPrimitiveArray(newArray.toBooleanArray()))
+ .isEqualTo(newArray);
+ }
+
+ {
+ // test byte
+ BinaryArrayData array = new BinaryArrayData();
+ BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 1);
+ writer.setNullByte(0);
+ writer.writeByte(1, (byte) 25);
+ writer.complete();
+
+ assertThat(array.isNullAt(0)).isTrue();
+ assertThat(array.getByte(1)).isEqualTo((byte) 25);
+ array.setByte(0, (byte) 5);
+ assertThat(array.getByte(0)).isEqualTo((byte) 5);
+ array.setNullByte(0);
+ assertThat(array.isNullAt(0)).isTrue();
+
+ BinaryArrayData newArray = splitArray(array);
+ assertThat(newArray.isNullAt(0)).isTrue();
+ assertThat(newArray.getByte(1)).isEqualTo((byte) 25);
+ newArray.setByte(0, (byte) 5);
+ assertThat(newArray.getByte(0)).isEqualTo((byte) 5);
+ newArray.setNullByte(0);
+ assertThat(newArray.isNullAt(0)).isTrue();
+
+ newArray.setByte(0, (byte) 3);
+ assertThat(BinaryArrayData.fromPrimitiveArray(newArray.toByteArray()))
+ .isEqualTo(newArray);
+ }
+
+ {
+ // test short
+ BinaryArrayData array = new BinaryArrayData();
+ BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 2);
+ writer.setNullShort(0);
+ writer.writeShort(1, (short) 25);
+ writer.complete();
+
+ assertThat(array.isNullAt(0)).isTrue();
+ assertThat(array.getShort(1)).isEqualTo((short) 25);
+ array.setShort(0, (short) 5);
+ assertThat(array.getShort(0)).isEqualTo((short) 5);
+ array.setNullShort(0);
+ assertThat(array.isNullAt(0)).isTrue();
+
+ BinaryArrayData newArray = splitArray(array);
+ assertThat(newArray.isNullAt(0)).isTrue();
+ assertThat(newArray.getShort(1)).isEqualTo((short) 25);
+ newArray.setShort(0, (short) 5);
+ assertThat(newArray.getShort(0)).isEqualTo((short) 5);
+ newArray.setNullShort(0);
+ assertThat(newArray.isNullAt(0)).isTrue();
+
+ newArray.setShort(0, (short) 3);
+ assertThat(BinaryArrayData.fromPrimitiveArray(newArray.toShortArray()))
+ .isEqualTo(newArray);
+ }
+
+ {
+ // test int
+ BinaryArrayData array = new BinaryArrayData();
+ BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 4);
+ writer.setNullInt(0);
+ writer.writeInt(1, 25);
+ writer.complete();
+
+ assertThat(array.isNullAt(0)).isTrue();
+ assertThat(array.getInt(1)).isEqualTo(25);
+ array.setInt(0, 5);
+ assertThat(array.getInt(0)).isEqualTo(5);
+ array.setNullInt(0);
+ assertThat(array.isNullAt(0)).isTrue();
+
+ BinaryArrayData newArray = splitArray(array);
+ assertThat(newArray.isNullAt(0)).isTrue();
+ assertThat(newArray.getInt(1)).isEqualTo(25);
+ newArray.setInt(0, 5);
+ assertThat(newArray.getInt(0)).isEqualTo(5);
+ newArray.setNullInt(0);
+ assertThat(newArray.isNullAt(0)).isTrue();
+
+ newArray.setInt(0, 3);
+ assertThat(BinaryArrayData.fromPrimitiveArray(newArray.toIntArray()))
+ .isEqualTo(newArray);
+ }
+
+ {
+ // test long
+ BinaryArrayData array = new BinaryArrayData();
+ BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8);
+ writer.setNullLong(0);
+ writer.writeLong(1, 25);
+ writer.complete();
+
+ assertThat(array.isNullAt(0)).isTrue();
+ assertThat(array.getLong(1)).isEqualTo(25);
+ array.setLong(0, 5);
+ assertThat(array.getLong(0)).isEqualTo(5);
+ array.setNullLong(0);
+ assertThat(array.isNullAt(0)).isTrue();
+
+ BinaryArrayData newArray = splitArray(array);
+ assertThat(newArray.isNullAt(0)).isTrue();
+ assertThat(newArray.getLong(1)).isEqualTo(25);
+ newArray.setLong(0, 5);
+ assertThat(newArray.getLong(0)).isEqualTo(5);
+ newArray.setNullLong(0);
+ assertThat(newArray.isNullAt(0)).isTrue();
+
+ newArray.setLong(0, 3);
+ assertThat(BinaryArrayData.fromPrimitiveArray(newArray.toLongArray()))
+ .isEqualTo(newArray);
+ }
+
+ {
+ // test float
+ BinaryArrayData array = new BinaryArrayData();
+ BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 4);
+ writer.setNullFloat(0);
+ writer.writeFloat(1, 25);
+ writer.complete();
+
+ assertThat(array.isNullAt(0)).isTrue();
+ assertThat(array.getFloat(1)).isEqualTo(25f);
+ array.setFloat(0, 5);
+ assertThat(array.getFloat(0)).isEqualTo(5f);
+ array.setNullFloat(0);
+ assertThat(array.isNullAt(0)).isTrue();
+
+ BinaryArrayData newArray = splitArray(array);
+ assertThat(newArray.isNullAt(0)).isTrue();
+ assertThat(newArray.getFloat(1)).isEqualTo(25f);
+ newArray.setFloat(0, 5);
+ assertThat(newArray.getFloat(0)).isEqualTo(5f);
+ newArray.setNullFloat(0);
+ assertThat(newArray.isNullAt(0)).isTrue();
+
+ newArray.setFloat(0, 3);
+ assertThat(BinaryArrayData.fromPrimitiveArray(newArray.toFloatArray()))
+ .isEqualTo(newArray);
+ }
+
+ {
+ // test double
+ BinaryArrayData array = new BinaryArrayData();
+ BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8);
+ writer.setNullDouble(0);
+ writer.writeDouble(1, 25);
+ writer.complete();
+
+ assertThat(array.isNullAt(0)).isTrue();
+ assertThat(array.getDouble(1)).isEqualTo(25d);
+ array.setDouble(0, 5);
+ assertThat(array.getDouble(0)).isEqualTo(5d);
+ array.setNullDouble(0);
+ assertThat(array.isNullAt(0)).isTrue();
+
+ BinaryArrayData newArray = splitArray(array);
+ assertThat(newArray.isNullAt(0)).isTrue();
+ assertThat(newArray.getDouble(1)).isEqualTo(25d);
+ newArray.setDouble(0, 5);
+ assertThat(newArray.getDouble(0)).isEqualTo(5d);
+ newArray.setNullDouble(0);
+ assertThat(newArray.isNullAt(0)).isTrue();
+
+ newArray.setDouble(0, 3);
+ assertThat(BinaryArrayData.fromPrimitiveArray(newArray.toDoubleArray()))
+ .isEqualTo(newArray);
+ }
+
+ {
+ // test string
+ BinaryArrayData array = new BinaryArrayData();
+ BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8);
+ writer.setNullAt(0);
+ writer.writeString(1, fromString("jaja"));
+ writer.complete();
+
+ assertThat(array.isNullAt(0)).isTrue();
+ assertThat(array.getString(1)).isEqualTo(fromString("jaja"));
+
+ BinaryArrayData newArray = splitArray(array);
+ assertThat(newArray.isNullAt(0)).isTrue();
+ assertThat(newArray.getString(1)).isEqualTo(fromString("jaja"));
+ }
+
+ BinaryArrayData subArray = new BinaryArrayData();
+ BinaryArrayWriter subWriter = new BinaryArrayWriter(subArray, 2, 8);
+ subWriter.setNullAt(0);
+ subWriter.writeString(1, fromString("hehehe"));
+ subWriter.complete();
+
+ {
+ // test array
+ BinaryArrayData array = new BinaryArrayData();
+ BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8);
+ writer.setNullAt(0);
+ writer.writeArray(1, subArray, new ArrayDataSerializer(DataTypes.INT()));
+ writer.complete();
+
+ assertThat(array.isNullAt(0)).isTrue();
+ assertThat(array.getArray(1)).isEqualTo(subArray);
+
+ BinaryArrayData newArray = splitArray(array);
+ assertThat(newArray.isNullAt(0)).isTrue();
+ assertThat(newArray.getArray(1)).isEqualTo(subArray);
+ }
+
+ {
+ // test map
+ BinaryArrayData array = new BinaryArrayData();
+ BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8);
+ writer.setNullAt(0);
+ writer.writeMap(
+ 1,
+ BinaryMapData.valueOf(subArray, subArray),
+ new MapDataSerializer(DataTypes.INT(), DataTypes.INT()));
+ writer.complete();
+
+ assertThat(array.isNullAt(0)).isTrue();
+ assertThat(array.getMap(1)).isEqualTo(BinaryMapData.valueOf(subArray, subArray));
+
+ BinaryArrayData newArray = splitArray(array);
+ assertThat(newArray.isNullAt(0)).isTrue();
+ assertThat(newArray.getMap(1)).isEqualTo(BinaryMapData.valueOf(subArray, subArray));
+ }
+ }
+
+ @Test
+ public void testMap() {
+ BinaryArrayData array1 = new BinaryArrayData();
+ BinaryArrayWriter writer1 = new BinaryArrayWriter(array1, 3, 4);
+ writer1.writeInt(0, 6);
+ writer1.writeInt(1, 5);
+ writer1.writeInt(2, 666);
+ writer1.complete();
+
+ BinaryArrayData array2 = new BinaryArrayData();
+ BinaryArrayWriter writer2 = new BinaryArrayWriter(array2, 3, 8);
+ writer2.writeString(0, BinaryStringData.fromString("6"));
+ writer2.writeString(1, BinaryStringData.fromString("5"));
+ writer2.writeString(2, BinaryStringData.fromString("666"));
+ writer2.complete();
+
+ BinaryMapData binaryMap = BinaryMapData.valueOf(array1, array2);
+
+ BinaryRecordData row = new BinaryRecordData(1);
+ BinaryRecordDataWriter rowWriter = new BinaryRecordDataWriter(row);
+ rowWriter.writeMap(0, binaryMap, new MapDataSerializer(DataTypes.INT(), DataTypes.INT()));
+ rowWriter.complete();
+
+ BinaryMapData map = (BinaryMapData) row.getMap(0);
+ BinaryArrayData key = map.keyArray();
+ BinaryArrayData value = map.valueArray();
+
+ assertThat(map).isEqualTo(binaryMap);
+ assertThat(key).isEqualTo(array1);
+ assertThat(value).isEqualTo(array2);
+
+ assertThat(key.getInt(1)).isEqualTo(5);
+ assertThat(value.getString(1)).isEqualTo(BinaryStringData.fromString("5"));
+ }
+
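+ /** Round-trips the array bytes through two segments to exercise multi-segment code paths. */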
+ private static BinaryArrayData splitArray(BinaryArrayData array) {
+ BinaryArrayData ret = new BinaryArrayData();
+ MemorySegment[] segments =
+ splitBytes(
+ BinarySegmentUtils.copyToBytes(
+ array.getSegments(), 0, array.getSizeInBytes()),
+ 0);
+ ret.pointTo(segments, 0, array.getSizeInBytes());
+ return ret;
+ }
+
+ private static MemorySegment[] splitBytes(byte[] bytes, int baseOffset) {
+ int newSize = (bytes.length + 1) / 2 + baseOffset;
+ MemorySegment[] ret = new MemorySegment[2];
+ ret[0] = MemorySegmentFactory.wrap(new byte[newSize]);
+ ret[1] = MemorySegmentFactory.wrap(new byte[newSize]);
+
+ ret[0].put(baseOffset, bytes, 0, newSize - baseOffset);
+ ret[1].put(0, bytes, newSize - baseOffset, bytes.length - (newSize - baseOffset));
+ return ret;
+ }
+
+ @Test
+ public void testToArray() {
+ BinaryArrayData array = new BinaryArrayData();
+ BinaryArrayWriter writer = new BinaryArrayWriter(array, 3, 2);
+ writer.writeShort(0, (short) 5);
+ writer.writeShort(1, (short) 10);
+ writer.writeShort(2, (short) 15);
+ writer.complete();
+
+ short[] shorts = array.toShortArray();
+ assertThat(shorts[0]).isEqualTo((short) 5);
+ assertThat(shorts[1]).isEqualTo((short) 10);
+ assertThat(shorts[2]).isEqualTo((short) 15);
+
+ MemorySegment[] segments = splitBytes(writer.getSegments().getArray(), 3);
+ array.pointTo(segments, 3, array.getSizeInBytes());
+ assertThat(array.getShort(0)).isEqualTo((short) 5);
+ assertThat(array.getShort(1)).isEqualTo((short) 10);
+ assertThat(array.getShort(2)).isEqualTo((short) 15);
+ short[] shorts2 = array.toShortArray();
+ assertThat(shorts2[0]).isEqualTo((short) 5);
+ assertThat(shorts2[1]).isEqualTo((short) 10);
+ assertThat(shorts2[2]).isEqualTo((short) 15);
+ }
+
+ @Test
+ public void testDecimal() {
+
+ BinaryArrayData array = new BinaryArrayData();
+ BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8);
+
+ // 1. compact: precision is small enough to store the unscaled value inline as a long
+ {
+ int precision = 4;
+ int scale = 2;
+ writer.reset();
+ writer.writeDecimal(0, DecimalData.fromUnscaledLong(5, precision, scale), precision);
+ writer.setNullAt(1);
+ writer.complete();
+
+ assertThat(array.getDecimal(0, precision, scale).toString()).isEqualTo("0.05");
+ assertThat(array.isNullAt(1)).isTrue();
+ array.setDecimal(0, DecimalData.fromUnscaledLong(6, precision, scale), precision);
+ assertThat(array.getDecimal(0, precision, scale).toString()).isEqualTo("0.06");
+ }
+
+ // 2. not compact: the unscaled value is stored in the variable-length part
+ {
+ int precision = 25;
+ int scale = 5;
+ DecimalData decimal1 =
+ DecimalData.fromBigDecimal(BigDecimal.valueOf(5.55), precision, scale);
+ DecimalData decimal2 =
+ DecimalData.fromBigDecimal(BigDecimal.valueOf(6.55), precision, scale);
+
+ writer.reset();
+ writer.writeDecimal(0, decimal1, precision);
+ writer.writeDecimal(1, null, precision);
+ writer.complete();
+
+ assertThat(array.getDecimal(0, precision, scale).toString()).isEqualTo("5.55000");
+ assertThat(array.isNullAt(1)).isTrue();
+ array.setDecimal(0, decimal2, precision);
+ assertThat(array.getDecimal(0, precision, scale).toString()).isEqualTo("6.55000");
+ }
+ }
+
+ @Test
+ public void testNested() {
+ BinaryArrayData array = new BinaryArrayData();
+ BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8);
+
+ BinaryRecordData row2 = new BinaryRecordData(2);
+ BinaryRecordDataWriter writer2 = new BinaryRecordDataWriter(row2);
+ writer2.writeString(0, BinaryStringData.fromString("1"));
+ writer2.writeInt(1, 1);
+ writer2.complete();
+
+ writer.writeRecord(0, row2, new RecordDataSerializer());
+ writer.setNullAt(1);
+ writer.complete();
+
+ RecordData nestedRow = array.getRecord(0, 2);
+ assertThat(nestedRow.getString(0).toString()).isEqualTo("1");
+ assertThat(nestedRow.getInt(1)).isEqualTo(1);
+ assertThat(array.isNullAt(1)).isTrue();
+ }
+
+ @Test
+ public void testBinary() {
+ BinaryArrayData array = new BinaryArrayData();
+ BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8);
+ byte[] bytes1 = new byte[] {1, -1, 5};
+ byte[] bytes2 = new byte[] {1, -1, 5, 5, 1, 5, 1, 5};
+ writer.writeBinary(0, bytes1);
+ writer.writeBinary(1, bytes2);
+ writer.complete();
+
+ assertThat(array.getBinary(0)).isEqualTo(bytes1);
+ assertThat(array.getBinary(1)).isEqualTo(bytes2);
+ }
+
+ @Test
+ public void testTimestampData() {
+ BinaryArrayData array = new BinaryArrayData();
+ BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8);
+
+ // not compact: precision 9 keeps nanoseconds, stored in the variable-length part
+ {
+ final int precision = 9;
+ TimestampData timestamp1 =
+ TimestampData.fromLocalDateTime(
+ LocalDateTime.of(1970, 1, 1, 0, 0, 0, 123456789));
+ TimestampData timestamp2 =
+ TimestampData.fromTimestamp(Timestamp.valueOf("1969-01-01 00:00:00.123456789"));
+
+ writer.reset();
+ writer.writeTimestamp(0, timestamp1, precision);
+ writer.writeTimestamp(1, null, precision);
+ writer.complete();
+
+ assertThat(array.getTimestamp(0, precision).toString())
+ .isEqualTo("1970-01-01T00:00:00.123456789");
+ assertThat(array.isNullAt(1)).isTrue();
+ array.setTimestamp(0, timestamp2, precision);
+ assertThat(array.getTimestamp(0, precision).toString())
+ .isEqualTo("1969-01-01T00:00:00.123456789");
+ }
+ }
+}
diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/schema/DataTypeSerializerTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/schema/DataTypeSerializerTest.java
index 3e6a2b91f..3e822dcef 100644
--- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/schema/DataTypeSerializerTest.java
+++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/schema/DataTypeSerializerTest.java
@@ -20,11 +20,24 @@ package org.apache.flink.cdc.runtime.serializer.schema;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.common.types.RowType;
import org.apache.flink.cdc.runtime.serializer.SerializerTestBase;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.junit.jupiter.api.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
import java.util.Arrays;
import java.util.stream.Stream;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
/** A test for the {@link DataTypeSerializer}. */
public class DataTypeSerializerTest extends SerializerTestBase<DataType> {
@Override
@@ -80,4 +93,39 @@ public class DataTypeSerializerTest extends SerializerTestBase {
Arrays.stream(allTypes), Arrays.stream(allTypes).map(DataType::notNull))
.toArray(DataType[]::new);
}
+
+ @Test
+ void testNestedRow() throws IOException {
+ RowType innerMostRowType = RowType.of(DataTypes.BIGINT());
+ RowType outerRowType =
+ RowType.of(DataTypes.ROW(DataTypes.FIELD("outerRow", innerMostRowType)));
+
+ DataTypeSerializer serializer = new DataTypeSerializer();
+
+ // Guard against the circular class-initialization issue: the shared
+ // RowTypeSerializer.INSTANCE must already be non-null here.
+ assertNotNull(RowTypeSerializer.INSTANCE, "RowTypeSerializer.INSTANCE is null");
+
+ // Copy the RowType
+ RowType copiedRow = (RowType) serializer.copy(outerRowType);
+ assertNotNull(copiedRow, "Copied RowType is null");
+
+ // Serialize the RowType
+ ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+ DataOutputView outputView = new DataOutputViewStreamWrapper(byteArrayOutputStream);
+ serializer.serialize(outerRowType, outputView);
+
+ // Deserialize the RowType
+ ByteArrayInputStream byteArrayInputStream =
+ new ByteArrayInputStream(byteArrayOutputStream.toByteArray());
+ DataInputView inputView = new DataInputViewStreamWrapper(byteArrayInputStream);
+ RowType deserializedRow = (RowType) serializer.deserialize(inputView);
+
+ // Assert that the deserialized RowType is not null and equals the original
+ assertNotNull(deserializedRow, "Deserialized RowType is null");
+ assertEquals(
+ outerRowType, deserializedRow, "Deserialized RowType does not match the original");
+ }
}