diff --git a/flink-cdc-common/src/main/java/com/ververica/cdc/common/data/TimestampData.java b/flink-cdc-common/src/main/java/com/ververica/cdc/common/data/TimestampData.java
index 3555ef96b..832163b4b 100644
--- a/flink-cdc-common/src/main/java/com/ververica/cdc/common/data/TimestampData.java
+++ b/flink-cdc-common/src/main/java/com/ververica/cdc/common/data/TimestampData.java
@@ -142,6 +142,18 @@ public final class TimestampData implements Comparable {
         return new TimestampData(milliseconds, nanosOfMillisecond);
     }
 
+    /**
+     * Creates an instance of {@link TimestampData} from milliseconds.
+     *
+     * <p>The nanos-of-millisecond field will be set to zero.
+     *
+     * @param milliseconds the number of milliseconds since {@code 1970-01-01 00:00:00}; a negative
+     *     number is the number of milliseconds before {@code 1970-01-01 00:00:00}
+     */
+    public static TimestampData fromEpochMillis(long milliseconds) {
+        return new TimestampData(milliseconds, 0);
+    }
+
     /**
      * Creates an instance of {@link TimestampData} from an instance of {@link Timestamp}.
      *
diff --git a/flink-cdc-common/src/main/java/com/ververica/cdc/common/data/binary/BinaryFormat.java b/flink-cdc-common/src/main/java/com/ververica/cdc/common/data/binary/BinaryFormat.java
new file mode 100644
index 000000000..117b87c1c
--- /dev/null
+++ b/flink-cdc-common/src/main/java/com/ververica/cdc/common/data/binary/BinaryFormat.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2023 Ververica Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ververica.cdc.common.data.binary;
+
+import org.apache.flink.core.memory.MemorySegment;
+
+import com.ververica.cdc.common.annotation.Internal;
+
+/** Binary format spanning {@link MemorySegment}s. */
+@Internal
+public interface BinaryFormat {
+
+    /**
+     * It decides whether to put data in FixLenPart or VarLenPart. See more in {@link
+     * BinaryRecordData}.
+     *
+     * <p>If len is less than 8, its binary format is: 1-bit mark(1) = 1, 7-bits len, and 7-bytes
+     * data. Data is stored in fix-length part.
+     *
+     * <p>If len is greater or equal to 8, its binary format is: 1-bit mark(1) = 0, 31-bits offset
+     * to the data, and 4-bytes length of data. Data is stored in variable-length part.
+     */
+    int MAX_FIX_PART_DATA_SIZE = 7;
+    /**
+     * To get the mark in highest bit of long. Form: 10000000 00000000 ... (8 bytes)
+     *
+     * <p>This is used to decide whether the data is stored in fixed-length part or variable-length
+     * part. See {@link #MAX_FIX_PART_DATA_SIZE} for more information.
+     */
+    long HIGHEST_FIRST_BIT = 0x80L << 56;
+    /**
+     * To get the 7 bits length in second bit to eighth bit out of a long. Form: 01111111 00000000
+     * ... (8 bytes)
+     *
+     * <p>This is used to get the length of the data which is stored in this long. See {@link
+     * #MAX_FIX_PART_DATA_SIZE} for more information.
+     */
+    long HIGHEST_SECOND_TO_EIGHTH_BIT = 0x7FL << 56;
+
+    /** Gets the underlying {@link MemorySegment}s this binary format spans. */
+    MemorySegment[] getSegments();
+
+    /** Gets the start offset of this binary data in the {@link MemorySegment}s. */
+    int getOffset();
+
+    /** Gets the size in bytes of this binary data. */
+    int getSizeInBytes();
+}
diff --git a/flink-cdc-common/src/main/java/com/ververica/cdc/common/data/binary/BinaryRecordData.java b/flink-cdc-common/src/main/java/com/ververica/cdc/common/data/binary/BinaryRecordData.java
new file mode 100644
index 000000000..c6f34450e
--- /dev/null
+++ b/flink-cdc-common/src/main/java/com/ververica/cdc/common/data/binary/BinaryRecordData.java
@@ -0,0 +1,340 @@
+/*
+ * Copyright 2023 Ververica Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ververica.cdc.common.data.binary;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+
+import com.ververica.cdc.common.annotation.Internal;
+import com.ververica.cdc.common.data.ArrayData;
+import com.ververica.cdc.common.data.DecimalData;
+import com.ververica.cdc.common.data.LocalZonedTimestampData;
+import com.ververica.cdc.common.data.MapData;
+import com.ververica.cdc.common.data.RecordData;
+import com.ververica.cdc.common.data.StringData;
+import com.ververica.cdc.common.data.TimestampData;
+import com.ververica.cdc.common.data.ZonedTimestampData;
+import com.ververica.cdc.common.types.DataType;
+import com.ververica.cdc.common.types.DecimalType;
+import com.ververica.cdc.common.types.LocalZonedTimestampType;
+import com.ververica.cdc.common.types.TimestampType;
+import com.ververica.cdc.common.types.ZonedTimestampType;
+
+import java.nio.ByteOrder;
+import java.util.regex.Pattern;
+
+import static com.ververica.cdc.common.types.DataTypeRoot.DECIMAL;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * An implementation of {@link RecordData} which is backed by {@link MemorySegment} instead of
+ * Object. It can significantly reduce the serialization/deserialization of Java objects.
+ *
+ * <p>A record has two parts: a fixed-length part and a variable-length part.
+ *
+ * <p>The fixed-length part contains a 1-byte header, the null bit set and the field values. The
+ * null bit set is used for null tracking and is aligned to 8-byte word boundaries. `Field values`
+ * holds fixed-length primitive types and variable-length values which can be stored inside 8
+ * bytes. If a variable-length value does not fit, the field stores the offset and length of the
+ * data in the variable-length part instead.
+ *
+ * <p>The fixed-length part will certainly fall into a single MemorySegment, which speeds up the
+ * read and write of fields. During the write phase, if the target memory segment has less space
+ * than the fixed-length part size, we will skip the space. So the number of fields in a single
+ * record cannot exceed the capacity of a single MemorySegment; if there are too many fields, we
+ * suggest that the user sets a bigger pageSize of MemorySegment.
+ *
+ * <p>The variable-length part may fall into multiple MemorySegments.
+ */
+@Internal
+public final class BinaryRecordData extends BinarySection implements RecordData, NullAwareGetters {
+
+    public static final boolean LITTLE_ENDIAN =
+            (ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN);
+    /** Mask that clears the header byte of the first 8-byte word, keeping only null bits. */
+    private static final long FIRST_BYTE_ZERO = LITTLE_ENDIAN ? ~0xFFL : ~(0xFFL << 56L);
+
+    public static final int HEADER_SIZE_IN_BITS = 8;
+
+    public static final String TIMESTAMP_DELIMITER = "|";
+
+    /**
+     * Literal matcher for {@link #TIMESTAMP_DELIMITER}. {@code String.split} interprets its
+     * argument as a regular expression, and {@code "|"} is an (empty) alternation there, so the
+     * delimiter must be compiled with {@link Pattern#LITERAL}.
+     */
+    private static final Pattern TIMESTAMP_DELIMITER_PATTERN =
+            Pattern.compile(TIMESTAMP_DELIMITER, Pattern.LITERAL);
+
+    /** Returns the byte width of the header plus null bit set, rounded up to 8-byte words. */
+    public static int calculateBitSetWidthInBytes(int arity) {
+        return ((arity + 63 + HEADER_SIZE_IN_BITS) / 64) * 8;
+    }
+
+    /** Returns the byte size of the fixed-length part: bit set plus one 8-byte slot per field. */
+    public static int calculateFixPartSizeInBytes(int arity) {
+        return calculateBitSetWidthInBytes(arity) + 8 * arity;
+    }
+
+    /**
+     * Tells whether values of the given type are stored entirely in the fixed-length part. Such
+     * fields can be updated in place; variable-length fields cannot, because the underlying data
+     * is stored continuously.
+     */
+    public static boolean isInFixedLengthPart(DataType type) {
+        switch (type.getTypeRoot()) {
+            case BOOLEAN:
+            case TINYINT:
+            case SMALLINT:
+            case INTEGER:
+            case DATE:
+            case TIME_WITHOUT_TIME_ZONE:
+            case BIGINT:
+            case FLOAT:
+            case DOUBLE:
+                return true;
+            case DECIMAL:
+                return DecimalData.isCompact(((DecimalType) type).getPrecision());
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+                return TimestampData.isCompact(((TimestampType) type).getPrecision());
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                return LocalZonedTimestampData.isCompact(
+                        ((LocalZonedTimestampType) type).getPrecision());
+            case TIMESTAMP_WITH_TIME_ZONE:
+                return ZonedTimestampData.isCompact(((ZonedTimestampType) type).getPrecision());
+            default:
+                return false;
+        }
+    }
+
+    /** A type is mutable when it lives in the fixed-length part or is a DECIMAL. */
+    public static boolean isMutable(DataType type) {
+        return isInFixedLengthPart(type) || type.getTypeRoot() == DECIMAL;
+    }
+
+    private final int arity;
+    private final int nullBitsSizeInBytes;
+
+    /**
+     * Creates a record with the given number of fields.
+     *
+     * @param arity number of fields, must not be negative
+     */
+    public BinaryRecordData(int arity) {
+        checkArgument(arity >= 0);
+        this.arity = arity;
+        this.nullBitsSizeInBytes = calculateBitSetWidthInBytes(arity);
+    }
+
+    /** Returns the absolute offset of the 8-byte slot of field {@code pos} in the first segment. */
+    private int getFieldOffset(int pos) {
+        return offset + nullBitsSizeInBytes + pos * 8;
+    }
+
+    private void assertIndexIsValid(int index) {
+        assert index >= 0 : "index (" + index + ") should >= 0";
+        assert index < arity : "index (" + index + ") should < " + arity;
+    }
+
+    public int getFixedLengthPartSize() {
+        return nullBitsSizeInBytes + 8 * arity;
+    }
+
+    @Override
+    public int getArity() {
+        return arity;
+    }
+
+    @Override
+    public boolean isNullAt(int pos) {
+        assertIndexIsValid(pos);
+        // Null bits start right after the header bits.
+        return BinarySegmentUtils.bitGet(segments[0], offset, pos + HEADER_SIZE_IN_BITS);
+    }
+
+    @Override
+    public boolean getBoolean(int pos) {
+        assertIndexIsValid(pos);
+        return segments[0].getBoolean(getFieldOffset(pos));
+    }
+
+    @Override
+    public byte getByte(int pos) {
+        assertIndexIsValid(pos);
+        return segments[0].get(getFieldOffset(pos));
+    }
+
+    @Override
+    public short getShort(int pos) {
+        assertIndexIsValid(pos);
+        return segments[0].getShort(getFieldOffset(pos));
+    }
+
+    @Override
+    public int getInt(int pos) {
+        assertIndexIsValid(pos);
+        return segments[0].getInt(getFieldOffset(pos));
+    }
+
+    @Override
+    public long getLong(int pos) {
+        assertIndexIsValid(pos);
+        return segments[0].getLong(getFieldOffset(pos));
+    }
+
+    @Override
+    public float getFloat(int pos) {
+        assertIndexIsValid(pos);
+        return segments[0].getFloat(getFieldOffset(pos));
+    }
+
+    @Override
+    public double getDouble(int pos) {
+        assertIndexIsValid(pos);
+        return segments[0].getDouble(getFieldOffset(pos));
+    }
+
+    @Override
+    public StringData getString(int pos) {
+        assertIndexIsValid(pos);
+        int fieldOffset = getFieldOffset(pos);
+        final long offsetAndLen = segments[0].getLong(fieldOffset);
+        return BinarySegmentUtils.readStringData(segments, offset, fieldOffset, offsetAndLen);
+    }
+
+    @Override
+    public DecimalData getDecimal(int pos, int precision, int scale) {
+        assertIndexIsValid(pos);
+
+        // Compact decimals keep their unscaled value directly in the field slot.
+        if (DecimalData.isCompact(precision)) {
+            return DecimalData.fromUnscaledLong(
+                    segments[0].getLong(getFieldOffset(pos)), precision, scale);
+        }
+
+        int fieldOffset = getFieldOffset(pos);
+        final long offsetAndSize = segments[0].getLong(fieldOffset);
+        return BinarySegmentUtils.readDecimalData(
+                segments, offset, offsetAndSize, precision, scale);
+    }
+
+    @Override
+    public TimestampData getTimestamp(int pos, int precision) {
+        assertIndexIsValid(pos);
+
+        // Compact timestamps keep their epoch milliseconds directly in the field slot.
+        if (TimestampData.isCompact(precision)) {
+            return TimestampData.fromEpochMillis(segments[0].getLong(getFieldOffset(pos)));
+        }
+
+        int fieldOffset = getFieldOffset(pos);
+        final long offsetAndNanoOfMilli = segments[0].getLong(fieldOffset);
+        return BinarySegmentUtils.readTimestampData(segments, offset, offsetAndNanoOfMilli);
+    }
+
+    /**
+     * Reads the field as a string of three parts joined by {@link #TIMESTAMP_DELIMITER} and
+     * passes them to {@code ZonedTimestampData.of} as a long, an int and a string.
+     */
+    @Override
+    public ZonedTimestampData getZonedTimestamp(int pos, int precision) {
+        // Split on the literal delimiter; a plain String.split("|") would split every character.
+        String[] parts = TIMESTAMP_DELIMITER_PATTERN.split(getString(pos).toString());
+        return ZonedTimestampData.of(
+                Long.parseLong(parts[0]), Integer.parseInt(parts[1]), parts[2]);
+    }
+
+    @Override
+    public LocalZonedTimestampData getLocalZonedTimestampData(int pos, int precision) {
+        assertIndexIsValid(pos);
+
+        if (LocalZonedTimestampData.isCompact(precision)) {
+            return LocalZonedTimestampData.fromEpochMillis(
+                    segments[0].getLong(getFieldOffset(pos)));
+        }
+
+        int fieldOffset = getFieldOffset(pos);
+        final long offsetAndNanoOfMilli = segments[0].getLong(fieldOffset);
+        return BinarySegmentUtils.readLocalZonedTimestampData(
+                segments, offset, offsetAndNanoOfMilli);
+    }
+
+    @Override
+    public byte[] getBinary(int pos) {
+        assertIndexIsValid(pos);
+        int fieldOffset = getFieldOffset(pos);
+        final long offsetAndLen = segments[0].getLong(fieldOffset);
+        return BinarySegmentUtils.readBinary(segments, offset, fieldOffset, offsetAndLen);
+    }
+
+    @Override
+    public ArrayData getArray(int pos) {
+        throw new UnsupportedOperationException("Not support ArrayData");
+    }
+
+    @Override
+    public MapData getMap(int pos) {
+        throw new UnsupportedOperationException("Not support MapData.");
+    }
+
+    @Override
+    public RecordData getRow(int pos, int numFields) {
+        assertIndexIsValid(pos);
+        return BinarySegmentUtils.readRecordData(segments, numFields, offset, getLong(pos));
+    }
+
+    /** The bit is 1 when the field is null. Default is 0. */
+    @Override
+    public boolean anyNull() {
+        // Null bits are located relative to this record's start offset, like in isNullAt().
+        // Skip the header byte of the first word.
+        if ((segments[0].getLong(offset) & FIRST_BYTE_ZERO) != 0) {
+            return true;
+        }
+        for (int i = 8; i < nullBitsSizeInBytes; i += 8) {
+            if (segments[0].getLong(offset + i) != 0) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    @Override
+    public boolean anyNull(int[] fields) {
+        for (int field : fields) {
+            if (isNullAt(field)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /** Copies this record into a newly created record backed by its own byte array. */
+    public BinaryRecordData copy() {
+        return copy(new BinaryRecordData(arity));
+    }
+
+    /** Copies this record's bytes into a fresh byte array and points {@code reuse} to it. */
+    public BinaryRecordData copy(BinaryRecordData reuse) {
+        return copyInternal(reuse);
+    }
+
+    private BinaryRecordData copyInternal(BinaryRecordData reuse) {
+        byte[] bytes = BinarySegmentUtils.copyToBytes(segments, offset, sizeInBytes);
+        reuse.pointTo(MemorySegmentFactory.wrap(bytes), 0, sizeInBytes);
+        return reuse;
+    }
+
+    /** Detaches this record from its memory segments and resets offset and size to zero. */
+    public void clear() {
+        segments = null;
+        offset = 0;
+        sizeInBytes = 0;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        // Only another BinaryRecordData can be equal; the comparison itself is byte-wise.
+        if (!(o instanceof BinaryRecordData)) {
+            return false;
+        }
+        final BinarySection that = (BinarySection) o;
+        return sizeInBytes == that.sizeInBytes
+                && BinarySegmentUtils.equals(
+                        segments, offset, that.segments, that.offset, sizeInBytes);
+    }
+
+    @Override
+    public int hashCode() {
+        return BinarySegmentUtils.hashByWords(segments, offset, sizeInBytes);
+    }
+
+    public void setTotalSize(int sizeInBytes) {
+        this.sizeInBytes = sizeInBytes;
+    }
+}
diff --git a/flink-cdc-common/src/main/java/com/ververica/cdc/common/data/binary/BinarySection.java
b/flink-cdc-common/src/main/java/com/ververica/cdc/common/data/binary/BinarySection.java
new file mode 100644
index 000000000..b9ed7e35a
--- /dev/null
+++ b/flink-cdc-common/src/main/java/com/ververica/cdc/common/data/binary/BinarySection.java
@@ -0,0 +1,82 @@
+/*
+ * Copyright 2023 Ververica Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ververica.cdc.common.data.binary;
+
+import org.apache.flink.core.memory.MemorySegment;
+
+import com.ververica.cdc.common.annotation.Internal;
+import com.ververica.cdc.common.utils.Preconditions;
+
+/** A basic implementation of {@link BinaryFormat} which describes a section of memory. */
+@Internal
+public class BinarySection implements BinaryFormat {
+
+    protected MemorySegment[] segments;
+    protected int offset;
+    protected int sizeInBytes;
+
+    /** Creates a section that does not point to any memory yet. */
+    public BinarySection() {}
+
+    /**
+     * Creates a section over the given segments.
+     *
+     * @param segments the segments holding the data, must not be {@code null}
+     * @param offset the start offset of the section in the segments
+     * @param sizeInBytes the size of the section in bytes
+     */
+    public BinarySection(MemorySegment[] segments, int offset, int sizeInBytes) {
+        Preconditions.checkArgument(segments != null);
+        this.segments = segments;
+        this.offset = offset;
+        this.sizeInBytes = sizeInBytes;
+    }
+
+    /** Points this section to a region of a single segment. */
+    public final void pointTo(MemorySegment segment, int offset, int sizeInBytes) {
+        pointTo(new MemorySegment[] {segment}, offset, sizeInBytes);
+    }
+
+    /**
+     * Points this section to a region of the given segments.
+     *
+     * @param segments the segments holding the data, must not be {@code null}
+     * @param offset the start offset of the section in the segments
+     * @param sizeInBytes the size of the section in bytes
+     */
+    public void pointTo(MemorySegment[] segments, int offset, int sizeInBytes) {
+        Preconditions.checkArgument(segments != null);
+        this.segments = segments;
+        this.offset = offset;
+        this.sizeInBytes = sizeInBytes;
+    }
+
+    @Override
+    public MemorySegment[] getSegments() {
+        return segments;
+    }
+
+    @Override
+    public int getOffset() {
+        return offset;
+    }
+
+    @Override
+    public int getSizeInBytes() {
+        return sizeInBytes;
+    }
+
+    /** Two sections are equal when they have the same class, size and bytes. */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        final BinarySection that = (BinarySection) o;
+        return sizeInBytes == that.sizeInBytes
+                && BinarySegmentUtils.equals(
+                        segments, offset, that.segments, that.offset, sizeInBytes);
+    }
+
+    @Override
+    public int hashCode() {
+        return BinarySegmentUtils.hash(segments, offset, sizeInBytes);
+    }
+}
diff --git a/flink-cdc-common/src/main/java/com/ververica/cdc/common/data/binary/BinarySegmentUtils.java b/flink-cdc-common/src/main/java/com/ververica/cdc/common/data/binary/BinarySegmentUtils.java
new file mode 100644
index 000000000..d06a92d96
--- /dev/null
+++ b/flink-cdc-common/src/main/java/com/ververica/cdc/common/data/binary/BinarySegmentUtils.java
@@ -0,0 +1,1157 @@
+/*
+ * Copyright 2023 Ververica Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ververica.cdc.common.data.binary;
+
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+
+import com.ververica.cdc.common.annotation.Internal;
+import com.ververica.cdc.common.data.DecimalData;
+import com.ververica.cdc.common.data.LocalZonedTimestampData;
+import com.ververica.cdc.common.data.RecordData;
+import com.ververica.cdc.common.data.StringData;
+import com.ververica.cdc.common.data.TimestampData;
+
+import java.io.IOException;
+import java.nio.ByteOrder;
+
+import static com.ververica.cdc.common.data.binary.BinaryFormat.HIGHEST_FIRST_BIT;
+import static com.ververica.cdc.common.data.binary.BinaryFormat.HIGHEST_SECOND_TO_EIGHTH_BIT;
+import static org.apache.flink.core.memory.MemoryUtils.UNSAFE;
+
+/** Utilities for binary data segments which heavily uses {@link MemorySegment}. */
+@Internal
+public final class BinarySegmentUtils {
+
+    /** Constant that flags the byte order. */
+    public static final boolean LITTLE_ENDIAN = ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN;
+
+    /** Shift that converts a bit index into the index of the byte containing it. */
+    private static final int ADDRESS_BITS_PER_WORD = 3;
+
+    /** Mask that extracts the position of a bit inside its byte. */
+    private static final int BIT_BYTE_INDEX_MASK = 7;
+
+    /**
+     * SQL execution threads is limited, not too many, so it can bear the overhead of 64K per
+     * thread.
+     */
+    private static final int MAX_BYTES_LENGTH = 1024 * 64;
+
+    private static final int MAX_CHARS_LENGTH = 1024 * 32;
+
+    private static final int BYTE_ARRAY_BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class);
+
+    // Typed thread-locals: a raw ThreadLocal would hand back Object from get().
+    private static final ThreadLocal<byte[]> BYTES_LOCAL = new ThreadLocal<>();
+    private static final ThreadLocal<char[]> CHARS_LOCAL = new ThreadLocal<>();
+
+    private BinarySegmentUtils() {
+        // do not instantiate
+    }
+
+    /**
+     * Allocate bytes that is only for temporary usage, it should not be stored in somewhere else.
+     * Use a {@link ThreadLocal} to reuse bytes to avoid overhead of byte[] new and gc.
+     *
+     * <p>If there are methods that can only accept a byte[], instead of a MemorySegment[]
+     * parameter, we can allocate a reuse bytes and copy the MemorySegment data to byte[], then call
+     * the method. Such as String deserialization.
+     *
+     * <p>The returned array may be longer than {@code length}. Only an array of {@code
+     * MAX_BYTES_LENGTH} bytes is cached per thread; larger requests get a fresh array each time.
+     *
+     * @param length the minimum number of bytes required
+     * @return an array with at least {@code length} bytes
+     */
+    public static byte[] allocateReuseBytes(int length) {
+        byte[] bytes = BYTES_LOCAL.get();
+
+        if (bytes == null) {
+            if (length <= MAX_BYTES_LENGTH) {
+                bytes = new byte[MAX_BYTES_LENGTH];
+                BYTES_LOCAL.set(bytes);
+            } else {
+                bytes = new byte[length];
+            }
+        } else if (bytes.length < length) {
+            bytes = new byte[length];
+        }
+
+        return bytes;
+    }
+
+    /**
+     * Char counterpart of {@link #allocateReuseBytes(int)}: returns a temporary array with at
+     * least {@code length} chars, reusing a per-thread array of {@code MAX_CHARS_LENGTH} chars
+     * when it is large enough.
+     *
+     * @param length the minimum number of chars required
+     * @return an array with at least {@code length} chars
+     */
+    public static char[] allocateReuseChars(int length) {
+        char[] chars = CHARS_LOCAL.get();
+
+        if (chars == null) {
+            if (length <= MAX_CHARS_LENGTH) {
+                chars = new char[MAX_CHARS_LENGTH];
+                CHARS_LOCAL.set(chars);
+            } else {
+                chars = new char[length];
+            }
+        } else if (chars.length < length) {
+            chars = new char[length];
+        }
+
+        return chars;
+    }
+
+    /**
+     * Copy segments to a new byte[].
+     *
+     * @param segments Source segments.
+     * @param offset Source segments offset.
+     * @param numBytes the number bytes to copy.
+     */
+    public static byte[] copyToBytes(MemorySegment[] segments, int offset, int numBytes) {
+        return copyToBytes(segments, offset, new byte[numBytes], 0, numBytes);
+    }
+
+    /**
+     * Copy segments to target byte[].
+     *
+     * @param segments Source segments.
+     * @param offset Source segments offset.
+     * @param bytes target byte[].
+     * @param bytesOffset target byte[] offset.
+     * @param numBytes the number bytes to copy.
+     */
+    public static byte[] copyToBytes(
+            MemorySegment[] segments, int offset, byte[] bytes, int bytesOffset, int numBytes) {
+        if (inFirstSegment(segments, offset, numBytes)) {
+            segments[0].get(offset, bytes, bytesOffset, numBytes);
+        } else {
+            copyMultiSegmentsToBytes(segments, offset, bytes, bytesOffset, numBytes);
+        }
+        return bytes;
+    }
+
+    /**
+     * Copies {@code numBytes} bytes starting at {@code offset} into {@code bytes}, walking over
+     * the segments in order. Segments that lie entirely before {@code offset} are skipped.
+     */
+    public static void copyMultiSegmentsToBytes(
+            MemorySegment[] segments, int offset, byte[] bytes, int bytesOffset, int numBytes) {
+        int remainSize = numBytes;
+        for (MemorySegment segment : segments) {
+            int remain = segment.size() - offset;
+            if (remain > 0) {
+                int nCopy = Math.min(remain, remainSize);
+                // numBytes - remainSize is the number of bytes already copied.
+                segment.get(offset, bytes, numBytes - remainSize + bytesOffset, nCopy);
+                remainSize -= nCopy;
+                // next new segment.
+                offset = 0;
+                if (remainSize == 0) {
+                    return;
+                }
+            } else {
+                // remain is negative, let's advance to next segment
+                // now the offset = offset - segmentSize (-remain)
+                offset = -remain;
+            }
+        }
+    }
+
+    /**
+     * Copy segments to target unsafe pointer.
+     *
+     * @param segments Source segments.
+     * @param offset The position where the bytes are started to be read from these memory segments.
+     * @param target The unsafe memory to copy the bytes to.
+     * @param pointer The position in the target unsafe memory to copy the chunk to.
+     * @param numBytes the number bytes to copy.
+     */
+    public static void copyToUnsafe(
+            MemorySegment[] segments, int offset, Object target, int pointer, int numBytes) {
+        if (inFirstSegment(segments, offset, numBytes)) {
+            segments[0].copyToUnsafe(offset, target, pointer, numBytes);
+        } else {
+            copyMultiSegmentsToUnsafe(segments, offset, target, pointer, numBytes);
+        }
+    }
+
+    /** Same walk as {@link #copyMultiSegmentsToBytes}, but the target is unsafe memory. */
+    private static void copyMultiSegmentsToUnsafe(
+            MemorySegment[] segments, int offset, Object target, int pointer, int numBytes) {
+        int remainSize = numBytes;
+        for (MemorySegment segment : segments) {
+            int remain = segment.size() - offset;
+            if (remain > 0) {
+                int nCopy = Math.min(remain, remainSize);
+                segment.copyToUnsafe(offset, target, numBytes - remainSize + pointer, nCopy);
+                remainSize -= nCopy;
+                // next new segment.
+                offset = 0;
+                if (remainSize == 0) {
+                    return;
+                }
+            } else {
+                // remain is negative, let's advance to next segment
+                // now the offset = offset - segmentSize (-remain)
+                offset = -remain;
+            }
+        }
+    }
+
+    /**
+     * Copy bytes of segments to output view.
+     *
+     * <p>Note: It just copies the data in, not include the length.
+     *
+     * @param segments source segments
+     * @param offset offset for segments
+     * @param sizeInBytes size in bytes
+     * @param target target output view
+     * @throws IOException if writing to the output view fails
+     * @throws RuntimeException if the segments end before {@code sizeInBytes} bytes were copied
+     */
+    public static void copyToView(
+            MemorySegment[] segments, int offset, int sizeInBytes, DataOutputView target)
+            throws IOException {
+        for (MemorySegment sourceSegment : segments) {
+            int curSegRemain = sourceSegment.size() - offset;
+            if (curSegRemain > 0) {
+                int copySize = Math.min(curSegRemain, sizeInBytes);
+
+                // Stage the chunk in the reusable buffer before writing it out.
+                byte[] bytes = allocateReuseBytes(copySize);
+                sourceSegment.get(offset, bytes, 0, copySize);
+                target.write(bytes, 0, copySize);
+
+                sizeInBytes -= copySize;
+                offset = 0;
+            } else {
+                // The start offset lies beyond this segment: skip it entirely.
+                offset -= sourceSegment.size();
+            }
+
+            if (sizeInBytes == 0) {
+                return;
+            }
+        }
+
+        if (sizeInBytes != 0) {
+            throw new RuntimeException(
+                    "No copy finished, this should be a bug, "
+                            + "The remaining length is: "
+                            + sizeInBytes);
+        }
+    }
+
+    /**
+     * Copy target segments from source byte[].
+     *
+     * @param segments target segments.
+     * @param offset target segments offset.
+     * @param bytes source byte[].
+     * @param bytesOffset source byte[] offset.
+     * @param numBytes the number bytes to copy.
+     */
+    public static void copyFromBytes(
+            MemorySegment[] segments, int offset, byte[] bytes, int bytesOffset, int numBytes) {
+        if (segments.length == 1) {
+            segments[0].put(offset, bytes, bytesOffset, numBytes);
+        } else {
+            copyMultiSegmentsFromBytes(segments, offset, bytes, bytesOffset, numBytes);
+        }
+    }
+
+    /** Writes {@code numBytes} bytes from {@code bytes} into the segments, walking them in order. */
+    private static void copyMultiSegmentsFromBytes(
+            MemorySegment[] segments, int offset, byte[] bytes, int bytesOffset, int numBytes) {
+        int remainSize = numBytes;
+        for (MemorySegment segment : segments) {
+            int remain = segment.size() - offset;
+            if (remain > 0) {
+                int nCopy = Math.min(remain, remainSize);
+                segment.put(offset, bytes, numBytes - remainSize + bytesOffset, nCopy);
+                remainSize -= nCopy;
+                // next new segment.
+                offset = 0;
+                if (remainSize == 0) {
+                    return;
+                }
+            } else {
+                // remain is negative, let's advance to next segment
+                // now the offset = offset - segmentSize (-remain)
+                offset = -remain;
+            }
+        }
+    }
+
+    /**
+     * Maybe not copied, if want copy, please use copyTo.
+     *
+     * <p>For a single segment the backing heap array itself is returned when it starts at offset
+     * 0 and has exactly {@code sizeInBytes} bytes; in every other case a new array is filled.
+     */
+    public static byte[] getBytes(MemorySegment[] segments, int baseOffset, int sizeInBytes) {
+        // avoid copy if `base` is `byte[]`
+        if (segments.length == 1) {
+            byte[] heapMemory = segments[0].getHeapMemory();
+            if (baseOffset == 0 && heapMemory != null && heapMemory.length == sizeInBytes) {
+                return heapMemory;
+            } else {
+                byte[] bytes = new byte[sizeInBytes];
+                segments[0].get(baseOffset, bytes, 0, sizeInBytes);
+                return bytes;
+            }
+        } else {
+            byte[] bytes = new byte[sizeInBytes];
+            copyMultiSegmentsToBytes(segments, baseOffset, bytes, 0, sizeInBytes);
+            return bytes;
+        }
+    }
+
+    /**
+     * Equals two memory segments regions.
+     *
+     * @param segments1 Segments 1
+     * @param offset1 Offset of segments1 to start equaling
+     * @param segments2 Segments 2
+     * @param offset2 Offset of segments2 to start equaling
+     * @param len Length of the equaled memory region
+     * @return true if equal, false otherwise
+     */
+    public static boolean equals(
+            MemorySegment[] segments1,
+            int offset1,
+            MemorySegment[] segments2,
+            int offset2,
+            int len) {
+        if (inFirstSegment(segments1, offset1, len) && inFirstSegment(segments2, offset2, len)) {
+            return segments1[0].equalTo(segments2[0], offset1, offset2, len);
+        } else {
+            return equalsMultiSegments(segments1, offset1, segments2, offset2, len);
+        }
+    }
+
+    /**
+     * Compares two regions chunk by chunk, where each chunk ends at the nearest segment boundary
+     * of either side. Segment positions are derived from the size of the first segment of each
+     * array, so every segment of an array is expected to have that size.
+     */
+    static boolean equalsMultiSegments(
+            MemorySegment[] segments1,
+            int offset1,
+            MemorySegment[] segments2,
+            int offset2,
+            int len) {
+        if (len == 0) {
+            // quick way and avoid segSize is zero.
+            return true;
+        }
+
+        int segSize1 = segments1[0].size();
+        int segSize2 = segments2[0].size();
+
+        // find first segIndex and segOffset of segments.
+        int segIndex1 = offset1 / segSize1;
+        int segIndex2 = offset2 / segSize2;
+        int segOffset1 = offset1 - segSize1 * segIndex1; // equal to %
+        int segOffset2 = offset2 - segSize2 * segIndex2; // equal to %
+
+        while (len > 0) {
+            int equalLen = Math.min(Math.min(len, segSize1 - segOffset1), segSize2 - segOffset2);
+            if (!segments1[segIndex1].equalTo(
+                    segments2[segIndex2], segOffset1, segOffset2, equalLen)) {
+                return false;
+            }
+            len -= equalLen;
+            segOffset1 += equalLen;
+            if (segOffset1 == segSize1) {
+                segOffset1 = 0;
+                segIndex1++;
+            }
+            segOffset2 += equalLen;
+            if (segOffset2 == segSize2) {
+                segOffset2 = 0;
+                segIndex2++;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * hash segments to int, numBytes must be aligned to 4 bytes.
+     *
+     * @param segments Source segments.
+     * @param offset Source segments offset.
+     * @param numBytes the number bytes to hash.
+     */
+    public static int hashByWords(MemorySegment[] segments, int offset, int numBytes) {
+        if (inFirstSegment(segments, offset, numBytes)) {
+            return MurmurHashUtils.hashBytesByWords(segments[0], offset, numBytes);
+        } else {
+            return hashMultiSegByWords(segments, offset, numBytes);
+        }
+    }
+
+    /** Copies the region into the reusable buffer first, then hashes that buffer by words. */
+    private static int hashMultiSegByWords(MemorySegment[] segments, int offset, int numBytes) {
+        byte[] bytes = allocateReuseBytes(numBytes);
+        copyMultiSegmentsToBytes(segments, offset, bytes, 0, numBytes);
+        return MurmurHashUtils.hashUnsafeBytesByWords(bytes, BYTE_ARRAY_BASE_OFFSET, numBytes);
+    }
+
+    /**
+     * hash segments to int.
+     *
+     * @param segments Source segments.
+     * @param offset Source segments offset.
+     * @param numBytes the number bytes to hash.
+ */ + public static int hash(MemorySegment[] segments, int offset, int numBytes) { + if (inFirstSegment(segments, offset, numBytes)) { + return MurmurHashUtils.hashBytes(segments[0], offset, numBytes); + } else { + return hashMultiSeg(segments, offset, numBytes); + } + } + + private static int hashMultiSeg(MemorySegment[] segments, int offset, int numBytes) { + byte[] bytes = allocateReuseBytes(numBytes); + copyMultiSegmentsToBytes(segments, offset, bytes, 0, numBytes); + return MurmurHashUtils.hashUnsafeBytes(bytes, BYTE_ARRAY_BASE_OFFSET, numBytes); + } + + /** Is it just in first MemorySegment, we use quick way to do something. */ + private static boolean inFirstSegment(MemorySegment[] segments, int offset, int numBytes) { + return numBytes + offset <= segments[0].size(); + } + + /** + * Given a bit index, return the byte index containing it. + * + * @param bitIndex the bit index. + * @return the byte index. + */ + private static int byteIndex(int bitIndex) { + return bitIndex >>> ADDRESS_BITS_PER_WORD; + } + + /** + * unset bit. + * + * @param segment target segment. + * @param baseOffset bits base offset. + * @param index bit index from base offset. + */ + public static void bitUnSet(MemorySegment segment, int baseOffset, int index) { + int offset = baseOffset + byteIndex(index); + byte current = segment.get(offset); + current &= ~(1 << (index & BIT_BYTE_INDEX_MASK)); + segment.put(offset, current); + } + + /** + * set bit. + * + * @param segment target segment. + * @param baseOffset bits base offset. + * @param index bit index from base offset. + */ + public static void bitSet(MemorySegment segment, int baseOffset, int index) { + int offset = baseOffset + byteIndex(index); + byte current = segment.get(offset); + current |= (1 << (index & BIT_BYTE_INDEX_MASK)); + segment.put(offset, current); + } + + /** + * read bit. + * + * @param segment target segment. + * @param baseOffset bits base offset. + * @param index bit index from base offset. 
+ */ + public static boolean bitGet(MemorySegment segment, int baseOffset, int index) { + int offset = baseOffset + byteIndex(index); + byte current = segment.get(offset); + return (current & (1 << (index & BIT_BYTE_INDEX_MASK))) != 0; + } + + /** + * unset bit from segments. + * + * @param segments target segments. + * @param baseOffset bits base offset. + * @param index bit index from base offset. + */ + public static void bitUnSet(MemorySegment[] segments, int baseOffset, int index) { + if (segments.length == 1) { + MemorySegment segment = segments[0]; + int offset = baseOffset + byteIndex(index); + byte current = segment.get(offset); + current &= ~(1 << (index & BIT_BYTE_INDEX_MASK)); + segment.put(offset, current); + } else { + bitUnSetMultiSegments(segments, baseOffset, index); + } + } + + private static void bitUnSetMultiSegments(MemorySegment[] segments, int baseOffset, int index) { + int offset = baseOffset + byteIndex(index); + int segSize = segments[0].size(); + int segIndex = offset / segSize; + int segOffset = offset - segIndex * segSize; // equal to % + MemorySegment segment = segments[segIndex]; + + byte current = segment.get(segOffset); + current &= ~(1 << (index & BIT_BYTE_INDEX_MASK)); + segment.put(segOffset, current); + } + + /** + * set bit from segments. + * + * @param segments target segments. + * @param baseOffset bits base offset. + * @param index bit index from base offset. 
+     */
+    public static void bitSet(MemorySegment[] segments, int baseOffset, int index) {
+        if (segments.length != 1) {
+            bitSetMultiSegments(segments, baseOffset, index);
+            return;
+        }
+        final int bytePos = baseOffset + byteIndex(index);
+        final MemorySegment only = segments[0];
+        final int mask = 1 << (index & BIT_BYTE_INDEX_MASK);
+        only.put(bytePos, (byte) (only.get(bytePos) | mask));
+    }
+
+    /** Locates the segment holding the target byte (by the first segment's size) and sets the bit. */
+    private static void bitSetMultiSegments(MemorySegment[] segments, int baseOffset, int index) {
+        final int bytePos = baseOffset + byteIndex(index);
+        final int segSize = segments[0].size();
+        final int segIndex = bytePos / segSize;
+        final int posInSeg = bytePos % segSize;
+        final MemorySegment target = segments[segIndex];
+
+        final int mask = 1 << (index & BIT_BYTE_INDEX_MASK);
+        target.put(posInSeg, (byte) (target.get(posInSeg) | mask));
+    }
+
+    /**
+     * Reads one bit from a bit set that may span several segments.
+     *
+     * @param segments target segments.
+     * @param baseOffset bits base offset.
+     * @param index bit index from base offset.
+     */
+    public static boolean bitGet(MemorySegment[] segments, int baseOffset, int index) {
+        final byte holder = getByte(segments, baseOffset + byteIndex(index));
+        final int mask = 1 << (index & BIT_BYTE_INDEX_MASK);
+        return (holder & mask) != 0;
+    }
+
+    /**
+     * Reads a boolean from the segments.
+     *
+     * @param segments target segments.
+     * @param offset value offset.
+     */
+    public static boolean getBoolean(MemorySegment[] segments, int offset) {
+        return inFirstSegment(segments, offset, 1)
+                ? segments[0].getBoolean(offset)
+                : getBooleanMultiSegments(segments, offset);
+    }
+
+    /** Reads the boolean from whichever segment {@code offset} falls into. */
+    private static boolean getBooleanMultiSegments(MemorySegment[] segments, int offset) {
+        final int segSize = segments[0].size();
+        final int segIndex = offset / segSize;
+        return segments[segIndex].getBoolean(offset % segSize);
+    }
+
+    /**
+     * Writes a boolean into the segments.
+     *
+     * @param segments target segments.
+     * @param offset value offset.
+     * @param value the boolean to store.
+     */
+    public static void setBoolean(MemorySegment[] segments, int offset, boolean value) {
+        if (!inFirstSegment(segments, offset, 1)) {
+            setBooleanMultiSegments(segments, offset, value);
+            return;
+        }
+        segments[0].putBoolean(offset, value);
+    }
+
+    /** Writes the boolean into whichever segment {@code offset} falls into. */
+    private static void setBooleanMultiSegments(
+            MemorySegment[] segments, int offset, boolean value) {
+        final int segSize = segments[0].size();
+        final int segIndex = offset / segSize;
+        segments[segIndex].putBoolean(offset % segSize, value);
+    }
+
+    /**
+     * Reads a byte from the segments.
+     *
+     * @param segments target segments.
+     * @param offset value offset.
+     */
+    public static byte getByte(MemorySegment[] segments, int offset) {
+        return inFirstSegment(segments, offset, 1)
+                ? segments[0].get(offset)
+                : getByteMultiSegments(segments, offset);
+    }
+
+    /** Reads the byte from whichever segment {@code offset} falls into. */
+    private static byte getByteMultiSegments(MemorySegment[] segments, int offset) {
+        final int segSize = segments[0].size();
+        final int segIndex = offset / segSize;
+        return segments[segIndex].get(offset % segSize);
+    }
+
+    /**
+     * Writes a byte into the segments.
+     *
+     * @param segments target segments.
+     * @param offset value offset.
+     * @param value the byte to store.
+     */
+    public static void setByte(MemorySegment[] segments, int offset, byte value) {
+        if (!inFirstSegment(segments, offset, 1)) {
+            setByteMultiSegments(segments, offset, value);
+            return;
+        }
+        segments[0].put(offset, value);
+    }
+
+    /** Writes the byte into whichever segment {@code offset} falls into. */
+    private static void setByteMultiSegments(MemorySegment[] segments, int offset, byte value) {
+        final int segSize = segments[0].size();
+        final int segIndex = offset / segSize;
+        segments[segIndex].put(offset % segSize, value);
+    }
+
+    /**
+     * Reads an int from the segments.
+     *
+     * @param segments target segments.
+     * @param offset value offset.
+     */
+    public static int getInt(MemorySegment[] segments, int offset) {
+        // Fast path: all 4 bytes end within the first segment.
+        if (inFirstSegment(segments, offset, 4)) {
+            return segments[0].getInt(offset);
+        } else {
+            return getIntMultiSegments(segments, offset);
+        }
+    }
+
+    /**
+     * Resolves {@code offset} to a segment index and an in-segment offset (using the size of the
+     * first segment) and reads the int from there.
+     */
+    private static int getIntMultiSegments(MemorySegment[] segments, int offset) {
+        int segSize = segments[0].size();
+        int segIndex = offset / segSize;
+        int segOffset = offset - segIndex * segSize; // equal to %
+
+        // The 4 bytes fit into one segment only when they start at or before segSize - 4.
+        if (segOffset < segSize - 3) {
+            return segments[segIndex].getInt(segOffset);
+        } else {
+            return getIntSlowly(segments, segSize, segIndex, segOffset);
+        }
+    }
+
+    /**
+     * Assembles an int one byte at a time, switching to the next segment whenever {@code
+     * segOffset} reaches {@code segSize}.
+     */
+    private static int getIntSlowly(
+            MemorySegment[] segments, int segSize, int segNum, int segOffset) {
+        MemorySegment segment = segments[segNum];
+        int ret = 0;
+        for (int i = 0; i < 4; i++) {
+            if (segOffset == segSize) {
+                segment = segments[++segNum];
+                segOffset = 0;
+            }
+            int unsignedByte = segment.get(segOffset) & 0xff;
+            // The i-th byte read is the i-th lowest byte when LITTLE_ENDIAN, else the i-th highest.
+            if (LITTLE_ENDIAN) {
+                ret |= (unsignedByte << (i * 8));
+            } else {
+                ret |= (unsignedByte << ((3 - i) * 8));
+            }
+            segOffset++;
+        }
+        return ret;
+    }
+
+    /**
+     * Writes an int into the segments.
+     *
+     * @param segments target segments.
+     * @param offset value offset.
+     * @param value the int to store.
+     */
+    public static void setInt(MemorySegment[] segments, int offset, int value) {
+        // Fast path: all 4 bytes end within the first segment.
+        if (inFirstSegment(segments, offset, 4)) {
+            segments[0].putInt(offset, value);
+        } else {
+            setIntMultiSegments(segments, offset, value);
+        }
+    }
+
+    /**
+     * Resolves {@code offset} to a segment index and an in-segment offset (using the size of the
+     * first segment) and writes the int there.
+     */
+    private static void setIntMultiSegments(MemorySegment[] segments, int offset, int value) {
+        int segSize = segments[0].size();
+        int segIndex = offset / segSize;
+        int segOffset = offset - segIndex * segSize; // equal to %
+
+        // The 4 bytes fit into one segment only when they start at or before segSize - 4.
+        if (segOffset < segSize - 3) {
+            segments[segIndex].putInt(segOffset, value);
+        } else {
+            setIntSlowly(segments, segSize, segIndex, segOffset, value);
+        }
+    }
+
+    /**
+     * Writes an int one byte at a time, switching to the next segment whenever {@code segOffset}
+     * reaches {@code segSize}.
+     */
+    private static void setIntSlowly(
+            MemorySegment[] segments, int segSize, int segNum, int segOffset, int value) {
+        MemorySegment segment = segments[segNum];
+        for (int i = 0; i < 4; i++) {
+            if (segOffset == segSize) {
+                segment = segments[++segNum];
+                segOffset = 0;
+            }
+            // Only the low 8 bits survive the (byte) cast below, despite the variable's name.
+            int unsignedByte;
+            if (LITTLE_ENDIAN) {
+                unsignedByte = value >> (i * 8);
+            } else {
+                unsignedByte = value >> ((3 - i) * 8);
+            }
+            segment.put(segOffset, (byte) unsignedByte);
+            segOffset++;
+        }
+    }
+
+    /**
+     * Reads a long from the segments.
+     *
+     * @param segments target segments.
+     * @param offset value offset.
+     */
+    public static long getLong(MemorySegment[] segments, int offset) {
+        // Fast path: all 8 bytes end within the first segment.
+        if (inFirstSegment(segments, offset, 8)) {
+            return segments[0].getLong(offset);
+        } else {
+            return getLongMultiSegments(segments, offset);
+        }
+    }
+
+    /**
+     * Resolves {@code offset} to a segment index and an in-segment offset (using the size of the
+     * first segment) and reads the long from there.
+     */
+    private static long getLongMultiSegments(MemorySegment[] segments, int offset) {
+        int segSize = segments[0].size();
+        int segIndex = offset / segSize;
+        int segOffset = offset - segIndex * segSize; // equal to %
+
+        // The 8 bytes fit into one segment only when they start at or before segSize - 8.
+        if (segOffset < segSize - 7) {
+            return segments[segIndex].getLong(segOffset);
+        } else {
+            return getLongSlowly(segments, segSize, segIndex, segOffset);
+        }
+    }
+
+    /**
+     * Assembles a long one byte at a time, switching to the next segment whenever {@code
+     * segOffset} reaches {@code segSize}.
+     */
+    private static long getLongSlowly(
+            MemorySegment[] segments, int segSize, int segNum, int segOffset) {
+        MemorySegment segment = segments[segNum];
+        long ret = 0;
+        for (int i = 0; i < 8; i++) {
+            if (segOffset == segSize) {
+                segment = segments[++segNum];
+                segOffset = 0;
+            }
+            // Held in a long so that the shifts below (up to 56 bits) are 64-bit shifts.
+            long unsignedByte = segment.get(segOffset) & 0xff;
+            if (LITTLE_ENDIAN) {
+                ret |= (unsignedByte << (i * 8));
+            } else {
+                ret |= (unsignedByte << ((7 - i) * 8));
+            }
+            segOffset++;
+        }
+        return ret;
+    }
+
+    /**
+     * Writes a long into the segments.
+     *
+     * @param segments target segments.
+     * @param offset value offset.
+     * @param value the long to store.
+     */
+    public static void setLong(MemorySegment[] segments, int offset, long value) {
+        // Fast path: all 8 bytes end within the first segment.
+        if (inFirstSegment(segments, offset, 8)) {
+            segments[0].putLong(offset, value);
+        } else {
+            setLongMultiSegments(segments, offset, value);
+        }
+    }
+
+    /**
+     * Resolves {@code offset} to a segment index and an in-segment offset (using the size of the
+     * first segment) and writes the long there.
+     */
+    private static void setLongMultiSegments(MemorySegment[] segments, int offset, long value) {
+        int segSize = segments[0].size();
+        int segIndex = offset / segSize;
+        int segOffset = offset - segIndex * segSize; // equal to %
+
+        // The 8 bytes fit into one segment only when they start at or before segSize - 8.
+        if (segOffset < segSize - 7) {
+            segments[segIndex].putLong(segOffset, value);
+        } else {
+            setLongSlowly(segments, segSize, segIndex, segOffset, value);
+        }
+    }
+
+    /**
+     * Writes a long one byte at a time, switching to the next segment whenever {@code segOffset}
+     * reaches {@code segSize}.
+     */
+    private static void setLongSlowly(
+            MemorySegment[] segments, int segSize, int segNum, int segOffset, long value) {
+        MemorySegment segment = segments[segNum];
+        for (int i = 0; i < 8; i++) {
+            if (segOffset == segSize) {
+                segment = segments[++segNum];
+                segOffset = 0;
+            }
+            // Only the low 8 bits survive the (byte) cast below, despite the variable's name.
+            long unsignedByte;
+            if (LITTLE_ENDIAN) {
+                unsignedByte = value >> (i * 8);
+            } else {
+                unsignedByte = value >> ((7 - i) * 8);
+            }
+            segment.put(segOffset, (byte) unsignedByte);
+            segOffset++;
+        }
+    }
+
+    /**
+     * Reads a short from the segments.
+     *
+     * @param segments target segments.
+     * @param offset value offset.
+     */
+    public static short getShort(MemorySegment[] segments, int offset) {
+        // Fast path: both bytes end within the first segment.
+        if (inFirstSegment(segments, offset, 2)) {
+            return segments[0].getShort(offset);
+        } else {
+            return getShortMultiSegments(segments, offset);
+        }
+    }
+
+    /**
+     * Resolves {@code offset} to a segment index and an in-segment offset (using the size of the
+     * first segment) and reads the short from there.
+     */
+    private static short getShortMultiSegments(MemorySegment[] segments, int offset) {
+        int segSize = segments[0].size();
+        int segIndex = offset / segSize;
+        int segOffset = offset - segIndex * segSize; // equal to %
+
+        // Both bytes are in one segment unless the value starts on the segment's last byte.
+        if (segOffset < segSize - 1) {
+            return segments[segIndex].getShort(segOffset);
+        } else {
+            return (short) getTwoByteSlowly(segments, segSize, segIndex, segOffset);
+        }
+    }
+
+    /**
+     * Writes a short into the segments.
+     *
+     * @param segments target segments.
+     * @param offset value offset.
+     * @param value the short to store.
+     */
+    public static void setShort(MemorySegment[] segments, int offset, short value) {
+        if (!inFirstSegment(segments, offset, 2)) {
+            setShortMultiSegments(segments, offset, value);
+            return;
+        }
+        segments[0].putShort(offset, value);
+    }
+
+    /** Writes the short into the segment {@code offset} falls into, byte-wise if it is split. */
+    private static void setShortMultiSegments(MemorySegment[] segments, int offset, short value) {
+        final int segSize = segments[0].size();
+        final int segIndex = offset / segSize;
+        final int posInSeg = offset % segSize;
+
+        if (posInSeg >= segSize - 1) {
+            // The value starts on the last byte of its segment: write the two bytes separately.
+            setTwoByteSlowly(segments, segSize, segIndex, posInSeg, value, value >> 8);
+            return;
+        }
+        segments[segIndex].putShort(posInSeg, value);
+    }
+
+    /**
+     * Reads a float from the segments.
+     *
+     * @param segments target segments.
+     * @param offset value offset.
+     */
+    public static float getFloat(MemorySegment[] segments, int offset) {
+        return inFirstSegment(segments, offset, 4)
+                ? segments[0].getFloat(offset)
+                : getFloatMultiSegments(segments, offset);
+    }
+
+    /** Reads the float from the segment {@code offset} falls into, byte-wise if it is split. */
+    private static float getFloatMultiSegments(MemorySegment[] segments, int offset) {
+        final int segSize = segments[0].size();
+        final int segIndex = offset / segSize;
+        final int posInSeg = offset % segSize;
+
+        if (posInSeg >= segSize - 3) {
+            // Split across segments: rebuild the raw int bits first.
+            return Float.intBitsToFloat(getIntSlowly(segments, segSize, segIndex, posInSeg));
+        }
+        return segments[segIndex].getFloat(posInSeg);
+    }
+
+    /**
+     * Writes a float into the segments.
+     *
+     * @param segments target segments.
+     * @param offset value offset.
+     * @param value the float to store.
+     */
+    public static void setFloat(MemorySegment[] segments, int offset, float value) {
+        if (!inFirstSegment(segments, offset, 4)) {
+            setFloatMultiSegments(segments, offset, value);
+            return;
+        }
+        segments[0].putFloat(offset, value);
+    }
+
+    /** Writes the float into the segment {@code offset} falls into, byte-wise if it is split. */
+    private static void setFloatMultiSegments(MemorySegment[] segments, int offset, float value) {
+        final int segSize = segments[0].size();
+        final int segIndex = offset / segSize;
+        final int posInSeg = offset % segSize;
+
+        if (posInSeg >= segSize - 3) {
+            // Split across segments: write the raw int bits byte by byte.
+            setIntSlowly(segments, segSize, segIndex, posInSeg, Float.floatToRawIntBits(value));
+            return;
+        }
+        segments[segIndex].putFloat(posInSeg, value);
+    }
+
+    /**
+     * Reads a double from the segments.
+     *
+     * @param segments target segments.
+     * @param offset value offset.
+     */
+    public static double getDouble(MemorySegment[] segments, int offset) {
+        return inFirstSegment(segments, offset, 8)
+                ? segments[0].getDouble(offset)
+                : getDoubleMultiSegments(segments, offset);
+    }
+
+    /** Reads the double from the segment {@code offset} falls into, byte-wise if it is split. */
+    private static double getDoubleMultiSegments(MemorySegment[] segments, int offset) {
+        final int segSize = segments[0].size();
+        final int segIndex = offset / segSize;
+        final int posInSeg = offset % segSize;
+
+        if (posInSeg >= segSize - 7) {
+            // Split across segments: rebuild the raw long bits first.
+            return Double.longBitsToDouble(getLongSlowly(segments, segSize, segIndex, posInSeg));
+        }
+        return segments[segIndex].getDouble(posInSeg);
+    }
+
+    /**
+     * Writes a double into the segments.
+     *
+     * @param segments target segments.
+     * @param offset value offset.
+     * @param value the double to store.
+     */
+    public static void setDouble(MemorySegment[] segments, int offset, double value) {
+        // Fast path: all 8 bytes end within the first segment.
+        if (inFirstSegment(segments, offset, 8)) {
+            segments[0].putDouble(offset, value);
+        } else {
+            setDoubleMultiSegments(segments, offset, value);
+        }
+    }
+
+    /**
+     * Resolves {@code offset} to a segment index and an in-segment offset (using the size of the
+     * first segment) and writes the double there.
+     */
+    private static void setDoubleMultiSegments(MemorySegment[] segments, int offset, double value) {
+        int segSize = segments[0].size();
+        int segIndex = offset / segSize;
+        int segOffset = offset - segIndex * segSize; // equal to %
+
+        // The 8 bytes fit into one segment only when they start at or before segSize - 8.
+        if (segOffset < segSize - 7) {
+            segments[segIndex].putDouble(segOffset, value);
+        } else {
+            setLongSlowly(
+                    segments, segSize, segIndex, segOffset, Double.doubleToRawLongBits(value));
+        }
+    }
+
+    /**
+     * Assembles a two-byte value one byte at a time, switching to the next segment when {@code
+     * segOffset} reaches {@code segSize}. The result is returned as an int in the range 0..0xffff.
+     */
+    private static int getTwoByteSlowly(
+            MemorySegment[] segments, int segSize, int segNum, int segOffset) {
+        MemorySegment segment = segments[segNum];
+        int ret = 0;
+        for (int i = 0; i < 2; i++) {
+            if (segOffset == segSize) {
+                segment = segments[++segNum];
+                segOffset = 0;
+            }
+            int unsignedByte = segment.get(segOffset) & 0xff;
+            if (LITTLE_ENDIAN) {
+                ret |= (unsignedByte << (i * 8));
+            } else {
+                ret |= (unsignedByte << ((1 - i) * 8));
+            }
+            segOffset++;
+        }
+        return ret;
+    }
+
+    /**
+     * Writes two bytes, switching to the next segment if the second byte falls past {@code
+     * segSize}. {@code b1} is written first when LITTLE_ENDIAN, otherwise {@code b2} is; only the
+     * low 8 bits of each argument are stored.
+     */
+    private static void setTwoByteSlowly(
+            MemorySegment[] segments, int segSize, int segNum, int segOffset, int b1, int b2) {
+        MemorySegment segment = segments[segNum];
+        segment.put(segOffset, (byte) (LITTLE_ENDIAN ? b1 : b2));
+        segOffset++;
+        if (segOffset == segSize) {
+            segment = segments[++segNum];
+            segOffset = 0;
+        }
+        segment.put(segOffset, (byte) (LITTLE_ENDIAN ? b2 : b1));
+    }
+
+    /** Gets an instance of {@link DecimalData} from underlying {@link MemorySegment}.
*/
+    public static DecimalData readDecimalData(
+            MemorySegment[] segments,
+            int baseOffset,
+            long offsetAndSize,
+            int precision,
+            int scale) {
+        // Low 32 bits carry the byte length, high 32 bits the offset relative to baseOffset.
+        final int length = (int) offsetAndSize;
+        final int relativeOffset = (int) (offsetAndSize >> 32);
+        final byte[] unscaled = new byte[length];
+        copyToBytes(segments, baseOffset + relativeOffset, unscaled, 0, length);
+        return DecimalData.fromUnscaledBytes(unscaled, precision, scale);
+    }
+
+    /**
+     * Builds a {@link TimestampData} from the underlying {@link MemorySegment}s.
+     *
+     * @param segments the underlying MemorySegments
+     * @param baseOffset the base offset of current instance of {@code TimestampData}
+     * @param offsetAndNanos the offset of milli-seconds part and nanoseconds
+     * @return an instance of {@link TimestampData}
+     */
+    public static TimestampData readTimestampData(
+            MemorySegment[] segments, int baseOffset, long offsetAndNanos) {
+        // High 32 bits: offset of the millisecond long; low 32 bits: nanos of millisecond.
+        final int millisOffset = baseOffset + (int) (offsetAndNanos >> 32);
+        final long epochMillis = getLong(segments, millisOffset);
+        return TimestampData.fromEpochMillis(epochMillis, (int) offsetAndNanos);
+    }
+
+    /**
+     * Gets an instance of {@link LocalZonedTimestampData} from underlying {@link MemorySegment}.
+     *
+     * @param segments the underlying MemorySegments
+     * @param baseOffset the base offset of current instance of {@code TimestampData}
+     * @param offsetAndNanos the offset of milli-seconds part and nanoseconds
+     * @return an instance of {@link LocalZonedTimestampData}
+     */
+    public static LocalZonedTimestampData readLocalZonedTimestampData(
+            MemorySegment[] segments, int baseOffset, long offsetAndNanos) {
+        // High 32 bits: offset of the millisecond long; low 32 bits: nanos of millisecond.
+        final int millisOffset = baseOffset + (int) (offsetAndNanos >> 32);
+        final long epochMillis = getLong(segments, millisOffset);
+        return LocalZonedTimestampData.fromEpochMillis(epochMillis, (int) offsetAndNanos);
+    }
+
+    /**
+     * Reads a binary value; if its length is less than 8 it is stored inline in
+     * variablePartOffsetAndLen.
+     *
+     *

<p>Note: Need to consider the ByteOrder.
+     *
+     * @param baseOffset base offset of composite binary format.
+     * @param fieldOffset absolute start offset of 'variablePartOffsetAndLen'.
+     * @param variablePartOffsetAndLen a long value, real data or offset and len.
+     */
+    public static byte[] readBinary(
+            MemorySegment[] segments,
+            int baseOffset,
+            int fieldOffset,
+            long variablePartOffsetAndLen) {
+        final boolean storedInline = (variablePartOffsetAndLen & HIGHEST_FIRST_BIT) != 0;
+        if (!storedInline) {
+            // Mark bit clear: high 32 bits are the offset, low 32 bits the length.
+            final int subOffset = (int) (variablePartOffsetAndLen >> 32);
+            final int len = (int) variablePartOffsetAndLen;
+            return BinarySegmentUtils.copyToBytes(segments, baseOffset + subOffset, len);
+        }
+
+        final int inlineLen =
+                (int) ((variablePartOffsetAndLen & HIGHEST_SECOND_TO_EIGHTH_BIT) >>> 56);
+        // fieldOffset + 1 to skip header when not little endian.
+        final int dataStart = BinarySegmentUtils.LITTLE_ENDIAN ? fieldOffset : fieldOffset + 1;
+        return BinarySegmentUtils.copyToBytes(segments, dataStart, inlineLen);
+    }
+
+    /**
+     * Reads a binary string; if its length is less than 8 it is stored inline in
+     * variablePartOffsetAndLen.
+     *
+     *

<p>Note: Need to consider the ByteOrder.
+     *
+     * @param baseOffset base offset of composite binary format.
+     * @param fieldOffset absolute start offset of 'variablePartOffsetAndLen'.
+     * @param variablePartOffsetAndLen a long value, real data or offset and len.
+     */
+    public static StringData readStringData(
+            MemorySegment[] segments,
+            int baseOffset,
+            int fieldOffset,
+            long variablePartOffsetAndLen) {
+        long mark = variablePartOffsetAndLen & HIGHEST_FIRST_BIT;
+        if (mark == 0) {
+            // Mark bit clear: high 32 bits are the offset, low 32 bits the length.
+            final int subOffset = (int) (variablePartOffsetAndLen >> 32);
+            final int len = (int) variablePartOffsetAndLen;
+            return BinaryStringData.fromAddress(segments, baseOffset + subOffset, len);
+        } else {
+            // Mark bit set: the data sits inside the 8-byte field itself; the length is taken
+            // from the bits selected by HIGHEST_SECOND_TO_EIGHTH_BIT.
+            int len = (int) ((variablePartOffsetAndLen & HIGHEST_SECOND_TO_EIGHTH_BIT) >>> 56);
+            if (BinarySegmentUtils.LITTLE_ENDIAN) {
+                return BinaryStringData.fromAddress(segments, fieldOffset, len);
+            } else {
+                // fieldOffset + 1 to skip header.
+                return BinaryStringData.fromAddress(segments, fieldOffset + 1, len);
+            }
+        }
+    }
+
+    /**
+     * Gets an instance of {@link RecordData} from underlying {@link MemorySegment}.
+     *
+     * <p>The low 32 bits of {@code offsetAndSize} are used as the size and the high 32 bits as
+     * the offset relative to {@code baseOffset}.
+     */
+    public static RecordData readRecordData(
+            MemorySegment[] segments, int numFields, int baseOffset, long offsetAndSize) {
+        final int size = ((int) offsetAndSize);
+        int offset = (int) (offsetAndSize >> 32);
+        BinaryRecordData recordData = new BinaryRecordData(numFields);
+        recordData.pointTo(segments, offset + baseOffset, size);
+        return recordData;
+    }
+
+    /**
+     * Searches the bytes of {@code segments1} for the first occurrence of the bytes of {@code
+     * segments2}.
+     *
+     * @param segments1 segments to search in.
+     * @param offset1 start offset of the searched range.
+     * @param numBytes1 length of the searched range.
+     * @param segments2 segments holding the byte sequence to look for.
+     * @param offset2 start offset of the sought sequence.
+     * @param numBytes2 length of the sought sequence.
+     * @return the offset of the match, {@code offset1} if {@code numBytes2} is 0, or -1 if there is
+     *     no match.
+     */
+    public static int find(
+            MemorySegment[] segments1,
+            int offset1,
+            int numBytes1,
+            MemorySegment[] segments2,
+            int offset2,
+            int numBytes2) {
+        if (numBytes2 == 0) { // quick way 1.
+            return offset1;
+        }
+        if (inFirstSegment(segments1, offset1, numBytes1)
+                && inFirstSegment(segments2, offset2, numBytes2)) {
+            byte first = segments2[0].get(offset2);
+            // Last start position at which the sought sequence still fits.
+            int end = numBytes1 - numBytes2 + offset1;
+            for (int i = offset1; i <= end; i++) {
+                // quick way 2: equal first byte.
+                if (segments1[0].get(i) == first
+                        && segments1[0].equalTo(segments2[0], i, offset2, numBytes2)) {
+                    return i;
+                }
+            }
+            return -1;
+        } else {
+            return findInMultiSegments(
+                    segments1, offset1, numBytes1, segments2, offset2, numBytes2);
+        }
+    }
+
+    /** Same search as {@code find}, comparing each candidate with {@code equalsMultiSegments}. */
+    private static int findInMultiSegments(
+            MemorySegment[] segments1,
+            int offset1,
+            int numBytes1,
+            MemorySegment[] segments2,
+            int offset2,
+            int numBytes2) {
+        // Last start position at which the sought sequence still fits.
+        int end = numBytes1 - numBytes2 + offset1;
+        for (int i = offset1; i <= end; i++) {
+            if (equalsMultiSegments(segments1, i, segments2, offset2, numBytes2)) {
+                return i;
+            }
+        }
+        return -1;
+    }
+}
diff --git a/flink-cdc-common/src/main/java/com/ververica/cdc/common/data/binary/BinaryStringData.java b/flink-cdc-common/src/main/java/com/ververica/cdc/common/data/binary/BinaryStringData.java
new file mode 100644
index 000000000..a923cdf59
--- /dev/null
+++ b/flink-cdc-common/src/main/java/com/ververica/cdc/common/data/binary/BinaryStringData.java
@@ -0,0 +1,878 @@
+/*
+ * Copyright 2023 Ververica Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package com.ververica.cdc.common.data.binary; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; + +import com.ververica.cdc.common.annotation.Internal; +import com.ververica.cdc.common.data.StringData; +import com.ververica.cdc.common.utils.StringUtf8Utils; + +import javax.annotation.Nonnull; + +import java.util.Arrays; + +import static com.ververica.cdc.common.data.binary.BinarySegmentUtils.getBytes; +import static com.ververica.cdc.common.utils.Preconditions.checkArgument; + +/** + * A lazily binary implementation of {@link StringData} which is backed by {@link MemorySegment}s + * and {@link String}. + * + *

<p>Either {@link MemorySegment}s or {@link String} must be provided when constructing {@link
+ * BinaryStringData}. The other representation will be materialized when needed.
+ *
+ *

<p>It provides many useful methods for comparison, search, and so on.
+ */
+@Internal
+public final class BinaryStringData extends LazyBinaryFormat implements StringData {
+
+    /** Shared instance backed by the UTF-8 encoding of the empty string. */
+    public static final BinaryStringData EMPTY_UTF8 =
+            BinaryStringData.fromBytes(StringUtf8Utils.encodeUTF8(""));
+
+    /** Creates an instance without handing any segments or Java string to the super class. */
+    public BinaryStringData() {}
+
+    /** Creates an instance from a Java string only. */
+    public BinaryStringData(String javaObject) {
+        super(javaObject);
+    }
+
+    /** Creates an instance from a region of memory segments only. */
+    public BinaryStringData(MemorySegment[] segments, int offset, int sizeInBytes) {
+        super(segments, offset, sizeInBytes);
+    }
+
+    /** Creates an instance from both a region of memory segments and a Java string. */
+    public BinaryStringData(
+            MemorySegment[] segments, int offset, int sizeInBytes, String javaObject) {
+        super(segments, offset, sizeInBytes, javaObject);
+    }
+
+    // ------------------------------------------------------------------------------------------
+    // Construction Utilities
+    // ------------------------------------------------------------------------------------------
+
+    /**
+     * Creates a {@link BinaryStringData} instance from the given address (base and offset) and
+     * length.
+     */
+    public static BinaryStringData fromAddress(MemorySegment[] segments, int offset, int numBytes) {
+        return new BinaryStringData(segments, offset, numBytes);
+    }
+
+    /**
+     * Creates a {@link BinaryStringData} instance from the given Java string.
+     *
+     * @return the new instance, or {@code null} if {@code str} is {@code null}.
+     */
+    public static BinaryStringData fromString(String str) {
+        if (str == null) {
+            return null;
+        } else {
+            return new BinaryStringData(str);
+        }
+    }
+
+    /** Creates a {@link BinaryStringData} instance from the given UTF-8 bytes. */
+    public static BinaryStringData fromBytes(byte[] bytes) {
+        return fromBytes(bytes, 0, bytes.length);
+    }
+
+    /**
+     * Creates a {@link BinaryStringData} instance from the given UTF-8 bytes with offset and number
+     * of bytes.
+     */
+    public static BinaryStringData fromBytes(byte[] bytes, int offset, int numBytes) {
+        // The array is wrapped, not copied.
+        return new BinaryStringData(
+                new MemorySegment[] {MemorySegmentFactory.wrap(bytes)}, offset, numBytes);
+    }
+
+    /** Creates a {@link BinaryStringData} instance that contains `length` spaces. */
+    public static BinaryStringData blankString(int length) {
+        byte[] spaces = new byte[length];
+        Arrays.fill(spaces, (byte) ' ');
+        return fromBytes(spaces);
+    }
+
+    // ------------------------------------------------------------------------------------------
+    // Public Interfaces
+    // ------------------------------------------------------------------------------------------
+
+    /** Returns the bytes of the binary section, materializing it first if necessary. */
+    @Override
+    public byte[] toBytes() {
+        ensureMaterialized();
+        return getBytes(binarySection.segments, binarySection.offset, binarySection.sizeInBytes);
+    }
+
+    /**
+     * Compares with another {@link BinaryStringData}: by Java string when both sides hold one,
+     * otherwise by binary section after materializing both sides.
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (o instanceof BinaryStringData) {
+            BinaryStringData other = (BinaryStringData) o;
+            if (javaObject != null && other.javaObject != null) {
+                return javaObject.equals(other.javaObject);
+            }
+
+            ensureMaterialized();
+            other.ensureMaterialized();
+            return binarySection.equals(other.binarySection);
+        } else {
+            return false;
+        }
+    }
+
+    /** Always hashes the binary section, materializing it first if necessary. */
+    @Override
+    public int hashCode() {
+        ensureMaterialized();
+        return binarySection.hashCode();
+    }
+
+    /**
+     * Returns the Java string, decoding the UTF-8 bytes of the binary section on first use and
+     * caching the result in {@code javaObject}.
+     */
+    @Override
+    public String toString() {
+        if (javaObject == null) {
+            // Use the BinarySegmentUtils of this package (the one toBytes() relies on as well)
+            // rather than the flink-table class of the same name.
+            byte[] bytes = BinarySegmentUtils.allocateReuseBytes(binarySection.sizeInBytes);
+            BinarySegmentUtils.copyToBytes(
+                    binarySection.segments,
+                    binarySection.offset,
+                    bytes,
+                    0,
+                    binarySection.sizeInBytes);
+            javaObject = StringUtf8Utils.decodeUTF8(bytes, 0, binarySection.sizeInBytes);
+        }
+        return javaObject;
+    }
+
+    /**
+     * Compares two strings lexicographically. Since UTF-8 uses groups of six bits, it is sometimes
+     * useful to use octal notation which uses 3-bit groups.
With a calculator which can convert
+     * between hexadecimal and octal it can be easier to manually create or interpret UTF-8 compared
+     * with using binary. So we just compare the binary.
+     */
+    @Override
+    public int compareTo(@Nonnull StringData o) {
+        // BinaryStringData is the only implementation of StringData
+        BinaryStringData other = (BinaryStringData) o;
+        // When both sides already hold a Java string, String.compareTo decides.
+        if (javaObject != null && other.javaObject != null) {
+            return javaObject.compareTo(other.javaObject);
+        }
+
+        ensureMaterialized();
+        other.ensureMaterialized();
+        if (binarySection.segments.length == 1 && other.binarySection.segments.length == 1) {
+
+            int len = Math.min(binarySection.sizeInBytes, other.binarySection.sizeInBytes);
+            MemorySegment seg1 = binarySection.segments[0];
+            MemorySegment seg2 = other.binarySection.segments[0];
+
+            // Compare byte by byte as unsigned values.
+            for (int i = 0; i < len; i++) {
+                int res =
+                        (seg1.get(binarySection.offset + i) & 0xFF)
+                                - (seg2.get(other.binarySection.offset + i) & 0xFF);
+                if (res != 0) {
+                    return res;
+                }
+            }
+            // Common prefix is equal: the shorter string sorts first.
+            return binarySection.sizeInBytes - other.binarySection.sizeInBytes;
+        }
+
+        // if there are multi segments.
+        return compareMultiSegments(other);
+    }
+
+    /** Find the boundaries of segments, and then compare MemorySegment. */
+    private int compareMultiSegments(BinaryStringData other) {
+
+        if (binarySection.sizeInBytes == 0 || other.binarySection.sizeInBytes == 0) {
+            return binarySection.sizeInBytes - other.binarySection.sizeInBytes;
+        }
+
+        // Number of bytes that still have to be compared.
+        int len = Math.min(binarySection.sizeInBytes, other.binarySection.sizeInBytes);
+
+        MemorySegment seg1 = binarySection.segments[0];
+        MemorySegment seg2 = other.binarySection.segments[0];
+
+        int segmentSize = binarySection.segments[0].size();
+        int otherSegmentSize = other.binarySection.segments[0].size();
+
+        // Bytes left in the current segment of each side, counted from the string's offset.
+        int sizeOfFirst1 = segmentSize - binarySection.offset;
+        int sizeOfFirst2 = otherSegmentSize - other.binarySection.offset;
+
+        // Index of the next segment to move to on each side.
+        int varSegIndex1 = 1;
+        int varSegIndex2 = 1;
+
+        // find the first segment of this string.
+        while (sizeOfFirst1 <= 0) {
+            sizeOfFirst1 += segmentSize;
+            seg1 = binarySection.segments[varSegIndex1++];
+        }
+
+        while (sizeOfFirst2 <= 0) {
+            sizeOfFirst2 += otherSegmentSize;
+            seg2 = other.binarySection.segments[varSegIndex2++];
+        }
+
+        int offset1 = segmentSize - sizeOfFirst1;
+        int offset2 = otherSegmentSize - sizeOfFirst2;
+
+        // Compare only as far as both current segments (and the remaining length) allow.
+        int needCompare = Math.min(Math.min(sizeOfFirst1, sizeOfFirst2), len);
+
+        while (needCompare > 0) {
+            // compare in one segment.
+            for (int i = 0; i < needCompare; i++) {
+                int res = (seg1.get(offset1 + i) & 0xFF) - (seg2.get(offset2 + i) & 0xFF);
+                if (res != 0) {
+                    return res;
+                }
+            }
+            if (needCompare == len) {
+                break;
+            }
+            len -= needCompare;
+            // next segment
+            if (sizeOfFirst1 < sizeOfFirst2) { // I am smaller
+                seg1 = binarySection.segments[varSegIndex1++];
+                offset1 = 0;
+                offset2 += needCompare;
+                sizeOfFirst1 = segmentSize;
+                sizeOfFirst2 -= needCompare;
+            } else if (sizeOfFirst1 > sizeOfFirst2) { // other is smaller
+                seg2 = other.binarySection.segments[varSegIndex2++];
+                offset2 = 0;
+                offset1 += needCompare;
+                sizeOfFirst2 = otherSegmentSize;
+                sizeOfFirst1 -= needCompare;
+            } else { // same, should go ahead both.
+                seg1 = binarySection.segments[varSegIndex1++];
+                seg2 = other.binarySection.segments[varSegIndex2++];
+                offset1 = 0;
+                offset2 = 0;
+                sizeOfFirst1 = segmentSize;
+                sizeOfFirst2 = otherSegmentSize;
+            }
+            needCompare = Math.min(Math.min(sizeOfFirst1, sizeOfFirst2), len);
+        }
+
+        // Sanity check: the loop must have ended on the final stretch of the common prefix.
+        checkArgument(needCompare == len);
+
+        // Common prefix is equal: the shorter string sorts first.
+        return binarySection.sizeInBytes - other.binarySection.sizeInBytes;
+    }
+
+    // ------------------------------------------------------------------------------------------
+    // Public methods on BinaryStringData
+    // ------------------------------------------------------------------------------------------
+
+    /** Returns the number of UTF-8 code points in the string.
*/
+    public int numChars() {
+        ensureMaterialized();
+        if (!inFirstSegment()) {
+            return numCharsMultiSegs();
+        }
+        int count = 0;
+        int pos = 0;
+        while (pos < binarySection.sizeInBytes) {
+            count++;
+            // Jump over the whole character, whose width is derived from its first byte.
+            pos += numBytesForFirstByte(getByteOneSegment(pos));
+        }
+        return count;
+    }
+
+    /** Counts characters while walking a segment cursor across segment boundaries. */
+    private int numCharsMultiSegs() {
+        final int segSize = binarySection.segments[0].size();
+        final SegmentAndOffset cursor = firstSegmentAndOffset(segSize);
+        int count = 0;
+        int consumed = 0;
+        while (consumed < binarySection.sizeInBytes) {
+            final int width = numBytesForFirstByte(cursor.value());
+            consumed += width;
+            count++;
+            cursor.skipBytes(width, segSize);
+        }
+        return count;
+    }
+
+    /**
+     * Returns the {@code byte} value at the specified index. An index ranges from {@code 0} to
+     * {@code binarySection.sizeInBytes - 1}.
+     *
+     * @param index the index of the {@code byte} value.
+     * @return the {@code byte} value at the specified index of this UTF-8 bytes.
+     * @exception IndexOutOfBoundsException if the {@code index} argument is negative or not less
+     *     than the length of this UTF-8 bytes.
+     */
+    public byte byteAt(int index) {
+        ensureMaterialized();
+        int globalOffset = binarySection.offset + index;
+        int size = binarySection.segments[0].size();
+        if (globalOffset < size) {
+            return binarySection.segments[0].get(globalOffset);
+        } else {
+            // Beyond the first segment: locate segment and in-segment position by segment size.
+            return binarySection.segments[globalOffset / size].get(globalOffset % size);
+        }
+    }
+
+    /** Returns the segments of the binary form, materializing it first if necessary. */
+    @Override
+    public MemorySegment[] getSegments() {
+        ensureMaterialized();
+        return super.getSegments();
+    }
+
+    /** Returns the offset of the binary form, materializing it first if necessary. */
+    @Override
+    public int getOffset() {
+        ensureMaterialized();
+        return super.getOffset();
+    }
+
+    /** Returns the size in bytes of the binary form, materializing it first if necessary. */
+    @Override
+    public int getSizeInBytes() {
+        ensureMaterialized();
+        return super.getSizeInBytes();
+    }
+
+    /** Materializes the binary form without a serializer. */
+    public void ensureMaterialized() {
+        ensureMaterialized(null);
+    }
+
+    /**
+     * Encodes {@code javaObject} as UTF-8 and wraps the bytes in a single segment.
+     *
+     * @throws IllegalArgumentException if a non-null serializer is passed.
+     */
+    @Override
+    protected BinarySection materialize(TypeSerializer serializer) {
+        if (serializer != null) {
+            throw new IllegalArgumentException(
+                    "BinaryStringData does not support custom serializers");
+        }
+
+        byte[] bytes = StringUtf8Utils.encodeUTF8(javaObject);
+        return new BinarySection(
+                new MemorySegment[] {MemorySegmentFactory.wrap(bytes)}, 0, bytes.length);
+    }
+
+    /**
+     * Copy a new {@code BinaryStringData}. The bytes are copied into a fresh single segment; the
+     * current {@code javaObject} reference is carried over.
+     */
+    public BinaryStringData copy() {
+        ensureMaterialized();
+        // Use the BinarySegmentUtils of this package rather than the flink-table class of the
+        // same name.
+        byte[] copy =
+                BinarySegmentUtils.copyToBytes(
+                        binarySection.segments, binarySection.offset, binarySection.sizeInBytes);
+        return new BinaryStringData(
+                new MemorySegment[] {MemorySegmentFactory.wrap(copy)},
+                0,
+                binarySection.sizeInBytes,
+                javaObject);
+    }
+
+    /**
+     * Returns a binary string that is a substring of this binary string. The substring begins at
+     * the specified {@code beginIndex} and extends to the character at index {@code endIndex - 1}.
+     *
+     *

Examples: + * + *

+ * + *
+     * fromString("hamburger").substring(4, 8) returns binary string "urge"
+     * fromString("smiles").substring(1, 5) returns binary string "mile"
+     * 
+ * + *
+ * + * @param beginIndex the beginning index, inclusive. + * @param endIndex the ending index, exclusive. + * @return the specified substring, return EMPTY_UTF8 when index out of bounds instead of + * StringIndexOutOfBoundsException. + */ + public BinaryStringData substring(int beginIndex, int endIndex) { + ensureMaterialized(); + if (endIndex <= beginIndex || beginIndex >= binarySection.sizeInBytes) { + return EMPTY_UTF8; + } + if (inFirstSegment()) { + MemorySegment segment = binarySection.segments[0]; + int i = 0; + int c = 0; + while (i < binarySection.sizeInBytes && c < beginIndex) { + i += numBytesForFirstByte(segment.get(i + binarySection.offset)); + c += 1; + } + + int j = i; + while (i < binarySection.sizeInBytes && c < endIndex) { + i += numBytesForFirstByte(segment.get(i + binarySection.offset)); + c += 1; + } + + if (i > j) { + byte[] bytes = new byte[i - j]; + segment.get(binarySection.offset + j, bytes, 0, i - j); + return fromBytes(bytes); + } else { + return EMPTY_UTF8; + } + } else { + return substringMultiSegs(beginIndex, endIndex); + } + } + + private BinaryStringData substringMultiSegs(final int start, final int until) { + int segSize = binarySection.segments[0].size(); + SegmentAndOffset index = firstSegmentAndOffset(segSize); + int i = 0; + int c = 0; + while (i < binarySection.sizeInBytes && c < start) { + int charSize = numBytesForFirstByte(index.value()); + i += charSize; + index.skipBytes(charSize, segSize); + c += 1; + } + + int j = i; + while (i < binarySection.sizeInBytes && c < until) { + int charSize = numBytesForFirstByte(index.value()); + i += charSize; + index.skipBytes(charSize, segSize); + c += 1; + } + + if (i > j) { + return fromBytes( + org.apache.flink.table.data.binary.BinarySegmentUtils.copyToBytes( + binarySection.segments, binarySection.offset + j, i - j)); + } else { + return EMPTY_UTF8; + } + } + + /** + * Returns true if and only if this BinaryStringData contains the specified sequence of bytes + * values. 
+ * + * @param s the sequence to search for + * @return true if this BinaryStringData contains {@code s}, false otherwise + */ + public boolean contains(final BinaryStringData s) { + ensureMaterialized(); + s.ensureMaterialized(); + if (s.binarySection.sizeInBytes == 0) { + return true; + } + int find = + org.apache.flink.table.data.binary.BinarySegmentUtils.find( + binarySection.segments, + binarySection.offset, + binarySection.sizeInBytes, + s.binarySection.segments, + s.binarySection.offset, + s.binarySection.sizeInBytes); + return find != -1; + } + + /** + * Tests if this BinaryStringData starts with the specified prefix. + * + * @param prefix the prefix. + * @return {@code true} if the bytes represented by the argument is a prefix of the bytes + * represented by this string; {@code false} otherwise. Note also that {@code true} will be + * returned if the argument is an empty BinaryStringData or is equal to this {@code + * BinaryStringData} object as determined by the {@link #equals(Object)} method. + */ + public boolean startsWith(final BinaryStringData prefix) { + ensureMaterialized(); + prefix.ensureMaterialized(); + return matchAt(prefix, 0); + } + + /** + * Tests if this BinaryStringData ends with the specified suffix. + * + * @param suffix the suffix. + * @return {@code true} if the bytes represented by the argument is a suffix of the bytes + * represented by this object; {@code false} otherwise. Note that the result will be {@code + * true} if the argument is the empty string or is equal to this {@code BinaryStringData} + * object as determined by the {@link #equals(Object)} method. + */ + public boolean endsWith(final BinaryStringData suffix) { + ensureMaterialized(); + suffix.ensureMaterialized(); + return matchAt(suffix, binarySection.sizeInBytes - suffix.binarySection.sizeInBytes); + } + + /** + * Returns a string whose value is this string, with any leading and trailing whitespace + * removed. 
+ * + * @return A string whose value is this string, with any leading and trailing white space + * removed, or this string if it has no leading or trailing white space. + */ + public BinaryStringData trim() { + ensureMaterialized(); + if (inFirstSegment()) { + int s = 0; + int e = this.binarySection.sizeInBytes - 1; + // skip all of the space (0x20) in the left side + while (s < this.binarySection.sizeInBytes && getByteOneSegment(s) == 0x20) { + s++; + } + // skip all of the space (0x20) in the right side + while (e >= s && getByteOneSegment(e) == 0x20) { + e--; + } + if (s > e) { + // empty string + return EMPTY_UTF8; + } else { + return copyBinaryStringInOneSeg(s, e - s + 1); + } + } else { + return trimMultiSegs(); + } + } + + private BinaryStringData trimMultiSegs() { + int s = 0; + int e = this.binarySection.sizeInBytes - 1; + int segSize = binarySection.segments[0].size(); + SegmentAndOffset front = firstSegmentAndOffset(segSize); + // skip all of the space (0x20) in the left side + while (s < this.binarySection.sizeInBytes && front.value() == 0x20) { + s++; + front.nextByte(segSize); + } + SegmentAndOffset behind = lastSegmentAndOffset(segSize); + // skip all of the space (0x20) in the right side + while (e >= s && behind.value() == 0x20) { + e--; + behind.previousByte(segSize); + } + if (s > e) { + // empty string + return EMPTY_UTF8; + } else { + return copyBinaryString(s, e); + } + } + + /** + * Returns the index within this string of the first occurrence of the specified substring, + * starting at the specified index. + * + * @param str the substring to search for. + * @param fromIndex the index from which to start the search. + * @return the index of the first occurrence of the specified substring, starting at the + * specified index, or {@code -1} if there is no such occurrence. 
+ */ + public int indexOf(BinaryStringData str, int fromIndex) { + ensureMaterialized(); + str.ensureMaterialized(); + if (str.binarySection.sizeInBytes == 0) { + return 0; + } + if (inFirstSegment()) { + // position in byte + int byteIdx = 0; + // position is char + int charIdx = 0; + while (byteIdx < binarySection.sizeInBytes && charIdx < fromIndex) { + byteIdx += numBytesForFirstByte(getByteOneSegment(byteIdx)); + charIdx++; + } + do { + if (byteIdx + str.binarySection.sizeInBytes > binarySection.sizeInBytes) { + return -1; + } + if (org.apache.flink.table.data.binary.BinarySegmentUtils.equals( + binarySection.segments, + binarySection.offset + byteIdx, + str.binarySection.segments, + str.binarySection.offset, + str.binarySection.sizeInBytes)) { + return charIdx; + } + byteIdx += numBytesForFirstByte(getByteOneSegment(byteIdx)); + charIdx++; + } while (byteIdx < binarySection.sizeInBytes); + + return -1; + } else { + return indexOfMultiSegs(str, fromIndex); + } + } + + private int indexOfMultiSegs(BinaryStringData str, int fromIndex) { + // position in byte + int byteIdx = 0; + // position is char + int charIdx = 0; + int segSize = binarySection.segments[0].size(); + SegmentAndOffset index = firstSegmentAndOffset(segSize); + while (byteIdx < binarySection.sizeInBytes && charIdx < fromIndex) { + int charBytes = numBytesForFirstByte(index.value()); + byteIdx += charBytes; + charIdx++; + index.skipBytes(charBytes, segSize); + } + do { + if (byteIdx + str.binarySection.sizeInBytes > binarySection.sizeInBytes) { + return -1; + } + if (org.apache.flink.table.data.binary.BinarySegmentUtils.equals( + binarySection.segments, + binarySection.offset + byteIdx, + str.binarySection.segments, + str.binarySection.offset, + str.binarySection.sizeInBytes)) { + return charIdx; + } + int charBytes = numBytesForFirstByte(index.segment.get(index.offset)); + byteIdx += charBytes; + charIdx++; + index.skipBytes(charBytes, segSize); + } while (byteIdx < binarySection.sizeInBytes); + + 
return -1; + } + + /** + * Converts all of the characters in this {@code BinaryStringData} to upper case. + * + * @return the {@code BinaryStringData}, converted to uppercase. + */ + public BinaryStringData toUpperCase() { + if (javaObject != null) { + return javaToUpperCase(); + } + if (binarySection.sizeInBytes == 0) { + return EMPTY_UTF8; + } + int size = binarySection.segments[0].size(); + SegmentAndOffset segmentAndOffset = startSegmentAndOffset(size); + byte[] bytes = new byte[binarySection.sizeInBytes]; + // Single-byte fast path: every byte is written inside the loop, anything else falls back. + for (int i = 0; i < binarySection.sizeInBytes; i++) { + byte b = segmentAndOffset.value(); + if (numBytesForFirstByte(b) != 1) { + // fallback + return javaToUpperCase(); + } + int upper = Character.toUpperCase((int) b); + if (upper > 127) { + // fallback + return javaToUpperCase(); + } + bytes[i] = (byte) upper; + segmentAndOffset.nextByte(size); + } + return fromBytes(bytes); + } + + private BinaryStringData javaToUpperCase() { + return fromString(toString().toUpperCase()); + } + + /** + * Converts all of the characters in this {@code BinaryStringData} to lower case. + * + * @return the {@code BinaryStringData}, converted to lowercase. 
+ */ + public BinaryStringData toLowerCase() { + if (javaObject != null) { + return javaToLowerCase(); + } + if (binarySection.sizeInBytes == 0) { + return EMPTY_UTF8; + } + int size = binarySection.segments[0].size(); + SegmentAndOffset segmentAndOffset = startSegmentAndOffset(size); + byte[] bytes = new byte[binarySection.sizeInBytes]; + // Single-byte fast path: every byte is written inside the loop, anything else falls back. + for (int i = 0; i < binarySection.sizeInBytes; i++) { + byte b = segmentAndOffset.value(); + if (numBytesForFirstByte(b) != 1) { + // fallback + return javaToLowerCase(); + } + int lower = Character.toLowerCase((int) b); + if (lower > 127) { + // fallback + return javaToLowerCase(); + } + bytes[i] = (byte) lower; + segmentAndOffset.nextByte(size); + } + return fromBytes(bytes); + } + + private BinaryStringData javaToLowerCase() { + return fromString(toString().toLowerCase()); + } + + // ------------------------------------------------------------------------------------------ + // Internal methods on BinaryStringData + // ------------------------------------------------------------------------------------------ + + byte getByteOneSegment(int i) { + return binarySection.segments[0].get(binarySection.offset + i); + } + + boolean inFirstSegment() { + return binarySection.sizeInBytes + binarySection.offset <= binarySection.segments[0].size(); + } + + private boolean matchAt(final BinaryStringData s, int pos) { + return (inFirstSegment() && s.inFirstSegment()) + ? 
matchAtOneSeg(s, pos) + : matchAtVarSeg(s, pos); + } + + private boolean matchAtOneSeg(final BinaryStringData s, int pos) { + return s.binarySection.sizeInBytes + pos <= binarySection.sizeInBytes + && pos >= 0 + && binarySection.segments[0].equalTo( + s.binarySection.segments[0], + binarySection.offset + pos, + s.binarySection.offset, + s.binarySection.sizeInBytes); + } + + private boolean matchAtVarSeg(final BinaryStringData s, int pos) { + return s.binarySection.sizeInBytes + pos <= binarySection.sizeInBytes + && pos >= 0 + && org.apache.flink.table.data.binary.BinarySegmentUtils.equals( + binarySection.segments, + binarySection.offset + pos, + s.binarySection.segments, + s.binarySection.offset, + s.binarySection.sizeInBytes); + } + + BinaryStringData copyBinaryStringInOneSeg(int start, int len) { + byte[] newBytes = new byte[len]; + binarySection.segments[0].get(binarySection.offset + start, newBytes, 0, len); + return fromBytes(newBytes); + } + + BinaryStringData copyBinaryString(int start, int end) { + int len = end - start + 1; + byte[] newBytes = new byte[len]; + BinarySegmentUtils.copyToBytes( + binarySection.segments, binarySection.offset + start, newBytes, 0, len); + return fromBytes(newBytes); + } + + SegmentAndOffset firstSegmentAndOffset(int segSize) { + int segIndex = binarySection.offset / segSize; + return new SegmentAndOffset(segIndex, binarySection.offset % segSize); + } + + SegmentAndOffset lastSegmentAndOffset(int segSize) { + int lastOffset = binarySection.offset + binarySection.sizeInBytes - 1; + int segIndex = lastOffset / segSize; + return new SegmentAndOffset(segIndex, lastOffset % segSize); + } + + private SegmentAndOffset startSegmentAndOffset(int segSize) { + return inFirstSegment() + ? new SegmentAndOffset(0, binarySection.offset) + : firstSegmentAndOffset(segSize); + } + + /** CurrentSegment and positionInSegment. 
*/ + class SegmentAndOffset { + int segIndex; + MemorySegment segment; + int offset; + + private SegmentAndOffset(int segIndex, int offset) { + this.segIndex = segIndex; + this.segment = binarySection.segments[segIndex]; + this.offset = offset; + } + + private void assignSegment() { + segment = + segIndex >= 0 && segIndex < binarySection.segments.length + ? binarySection.segments[segIndex] + : null; + } + + void previousByte(int segSize) { + offset--; + if (offset == -1) { + segIndex--; + assignSegment(); + offset = segSize - 1; + } + } + + void nextByte(int segSize) { + offset++; + checkAdvance(segSize); + } + + private void checkAdvance(int segSize) { + if (offset == segSize) { + advance(); + } + } + + private void advance() { + segIndex++; + assignSegment(); + offset = 0; + } + + void skipBytes(int n, int segSize) { + int remaining = segSize - this.offset; + if (remaining > n) { + this.offset += n; + } else { + while (true) { + int toSkip = Math.min(remaining, n); + n -= toSkip; + if (n <= 0) { + this.offset += toSkip; + checkAdvance(segSize); + return; + } + advance(); + remaining = segSize - this.offset; + } + } + } + + byte value() { + return this.segment.get(this.offset); + } + } + + /** + * Returns the number of bytes for a code point with the first byte as `b`. + * + * @param b The first byte of a code point + */ + static int numBytesForFirstByte(final byte b) { + if (b >= 0) { + // 1 byte, 7 bits: 0xxxxxxx + return 1; + } else if ((b >> 5) == -2 && (b & 0x1e) != 0) { + // 2 bytes, 11 bits: 110xxxxx 10xxxxxx + return 2; + } else if ((b >> 4) == -2) { + // 3 bytes, 16 bits: 1110xxxx 10xxxxxx 10xxxxxx + return 3; + } else if ((b >> 3) == -2) { + // 4 bytes, 21 bits: 11110xxx 10xxxxxx 10xxxxxx 10xxxxxx + return 4; + } else { + // Skip the first byte disallowed in UTF-8 + // Handling errors quietly, same semantics to java String. 
+ return 1; + } + } +} diff --git a/flink-cdc-common/src/main/java/com/ververica/cdc/common/data/binary/LazyBinaryFormat.java b/flink-cdc-common/src/main/java/com/ververica/cdc/common/data/binary/LazyBinaryFormat.java new file mode 100644 index 000000000..a3991f8ef --- /dev/null +++ b/flink-cdc-common/src/main/java/com/ververica/cdc/common/data/binary/LazyBinaryFormat.java @@ -0,0 +1,136 @@ +/* + * Copyright 2023 Ververica Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.common.data.binary; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.MemorySegment; + +import com.ververica.cdc.common.annotation.Internal; + +import java.io.IOException; + +/** + * An abstract implementation of {@link BinaryFormat} which is lazily serialized into binary or + * lazily deserialized into Java object. + * + *

The reason why we introduce this data structure is in order to save (de)serialization in + * nested function calls. Consider the following function call chain: + * + *

UDF0(input) -> UDF1(result0) -> UDF2(result1) -> UDF3(result2)
+ * + *

Such nested calls, if the return values of UDFs are Java object format, it will result in + * multiple conversions between Java object and binary format: + * + *

+ * converterToBinary(UDF0(converterToJavaObject(input))) ->
+ *   converterToBinary(UDF1(converterToJavaObject(result0))) ->
+ *     converterToBinary(UDF2(converterToJavaObject(result1))) ->
+ *       ...
+ * 
+ * + *

So we introduced {@link LazyBinaryFormat} to avoid the redundant cost, it has three forms: + * + *

+ * + *

It can lazy the conversions as much as possible. It will be converted into required form only + * when it is needed. + */ +@Internal +public abstract class LazyBinaryFormat implements BinaryFormat { + + T javaObject; + BinarySection binarySection; + + public LazyBinaryFormat() { + this(null, null); + } + + public LazyBinaryFormat(MemorySegment[] segments, int offset, int sizeInBytes, T javaObject) { + this(javaObject, new BinarySection(segments, offset, sizeInBytes)); + } + + public LazyBinaryFormat(MemorySegment[] segments, int offset, int sizeInBytes) { + this(null, new BinarySection(segments, offset, sizeInBytes)); + } + + public LazyBinaryFormat(T javaObject) { + this(javaObject, null); + } + + public LazyBinaryFormat(T javaObject, BinarySection binarySection) { + this.javaObject = javaObject; + this.binarySection = binarySection; + } + + public T getJavaObject() { + return javaObject; + } + + public BinarySection getBinarySection() { + return binarySection; + } + + /** Must be public as it is used during code generation. */ + public void setJavaObject(T javaObject) { + this.javaObject = javaObject; + } + + @Override + public MemorySegment[] getSegments() { + if (binarySection == null) { + throw new IllegalStateException("Lazy Binary Format was not materialized"); + } + return binarySection.segments; + } + + @Override + public int getOffset() { + if (binarySection == null) { + throw new IllegalStateException("Lazy Binary Format was not materialized"); + } + return binarySection.offset; + } + + @Override + public int getSizeInBytes() { + if (binarySection == null) { + throw new IllegalStateException("Lazy Binary Format was not materialized"); + } + return binarySection.sizeInBytes; + } + + /** Ensure we have materialized binary format. 
*/ + public final void ensureMaterialized(TypeSerializer serializer) { + if (binarySection == null) { + try { + this.binarySection = materialize(serializer); + } catch (IOException e) { + throw new IllegalStateException(e); + } + } + } + + /** + * Materialize java object to binary format. Inherited classes need to hold the information they + * need. + */ + protected abstract BinarySection materialize(TypeSerializer serializer) throws IOException; +} diff --git a/flink-cdc-common/src/main/java/com/ververica/cdc/common/data/binary/MurmurHashUtils.java b/flink-cdc-common/src/main/java/com/ververica/cdc/common/data/binary/MurmurHashUtils.java new file mode 100644 index 000000000..d665e57a8 --- /dev/null +++ b/flink-cdc-common/src/main/java/com/ververica/cdc/common/data/binary/MurmurHashUtils.java @@ -0,0 +1,179 @@ +/* + * Copyright 2023 Ververica Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.common.data.binary; + +import org.apache.flink.core.memory.MemorySegment; + +import com.ververica.cdc.common.annotation.Internal; + +import static org.apache.flink.core.memory.MemoryUtils.UNSAFE; + +/** Murmur Hash. This is inspired by Guava's Murmur3_32HashFunction. 
*/ +@Internal +final class MurmurHashUtils { + + private static final int C1 = 0xcc9e2d51; + private static final int C2 = 0x1b873593; + public static final int DEFAULT_SEED = 42; + + private MurmurHashUtils() { + // do not instantiate + } + + /** + * Hash unsafe bytes, length must be aligned to 4 bytes. + * + * @param base base unsafe object + * @param offset offset for unsafe object + * @param lengthInBytes length in bytes + * @return hash code + */ + public static int hashUnsafeBytesByWords(Object base, long offset, int lengthInBytes) { + return hashUnsafeBytesByWords(base, offset, lengthInBytes, DEFAULT_SEED); + } + + /** + * Hash unsafe bytes. + * + * @param base base unsafe object + * @param offset offset for unsafe object + * @param lengthInBytes length in bytes + * @return hash code + */ + public static int hashUnsafeBytes(Object base, long offset, int lengthInBytes) { + return hashUnsafeBytes(base, offset, lengthInBytes, DEFAULT_SEED); + } + + /** + * Hash bytes in MemorySegment, length must be aligned to 4 bytes. + * + * @param segment segment. + * @param offset offset for MemorySegment + * @param lengthInBytes length in MemorySegment + * @return hash code + */ + public static int hashBytesByWords(MemorySegment segment, int offset, int lengthInBytes) { + return hashBytesByWords(segment, offset, lengthInBytes, DEFAULT_SEED); + } + + /** + * Hash bytes in MemorySegment. + * + * @param segment segment. 
+ * @param offset offset for MemorySegment + * @param lengthInBytes length in MemorySegment + * @return hash code + */ + public static int hashBytes(MemorySegment segment, int offset, int lengthInBytes) { + return hashBytes(segment, offset, lengthInBytes, DEFAULT_SEED); + } + + private static int hashUnsafeBytesByWords( + Object base, long offset, int lengthInBytes, int seed) { + int h1 = hashUnsafeBytesByInt(base, offset, lengthInBytes, seed); + return fmix(h1, lengthInBytes); + } + + private static int hashBytesByWords( + MemorySegment segment, int offset, int lengthInBytes, int seed) { + int h1 = hashBytesByInt(segment, offset, lengthInBytes, seed); + return fmix(h1, lengthInBytes); + } + + private static int hashBytes(MemorySegment segment, int offset, int lengthInBytes, int seed) { + int lengthAligned = lengthInBytes - lengthInBytes % 4; + int h1 = hashBytesByInt(segment, offset, lengthAligned, seed); + for (int i = lengthAligned; i < lengthInBytes; i++) { + int k1 = mixK1(segment.get(offset + i)); + h1 = mixH1(h1, k1); + } + return fmix(h1, lengthInBytes); + } + + private static int hashUnsafeBytes(Object base, long offset, int lengthInBytes, int seed) { + assert (lengthInBytes >= 0) : "lengthInBytes cannot be negative"; + int lengthAligned = lengthInBytes - lengthInBytes % 4; + int h1 = hashUnsafeBytesByInt(base, offset, lengthAligned, seed); + for (int i = lengthAligned; i < lengthInBytes; i++) { + int halfWord = UNSAFE.getByte(base, offset + i); + int k1 = mixK1(halfWord); + h1 = mixH1(h1, k1); + } + return fmix(h1, lengthInBytes); + } + + private static int hashUnsafeBytesByInt(Object base, long offset, int lengthInBytes, int seed) { + assert (lengthInBytes % 4 == 0); + int h1 = seed; + for (int i = 0; i < lengthInBytes; i += 4) { + int halfWord = UNSAFE.getInt(base, offset + i); + int k1 = mixK1(halfWord); + h1 = mixH1(h1, k1); + } + return h1; + } + + private static int hashBytesByInt( + MemorySegment segment, int offset, int lengthInBytes, int seed) { 
+ assert (lengthInBytes % 4 == 0); + int h1 = seed; + for (int i = 0; i < lengthInBytes; i += 4) { + int halfWord = segment.getInt(offset + i); + int k1 = mixK1(halfWord); + h1 = mixH1(h1, k1); + } + return h1; + } + + private static int mixK1(int k1) { + k1 *= C1; + k1 = Integer.rotateLeft(k1, 15); + k1 *= C2; + return k1; + } + + private static int mixH1(int h1, int k1) { + h1 ^= k1; + h1 = Integer.rotateLeft(h1, 13); + h1 = h1 * 5 + 0xe6546b64; + return h1; + } + + // Finalization mix - force all bits of a hash block to avalanche + private static int fmix(int h1, int length) { + h1 ^= length; + return fmix(h1); + } + + public static int fmix(int h) { + h ^= h >>> 16; + h *= 0x85ebca6b; + h ^= h >>> 13; + h *= 0xc2b2ae35; + h ^= h >>> 16; + return h; + } + + public static long fmix(long h) { + h ^= (h >>> 33); + h *= 0xff51afd7ed558ccdL; + h ^= (h >>> 33); + h *= 0xc4ceb9fe1a85ec53L; + h ^= (h >>> 33); + return h; + } +} diff --git a/flink-cdc-common/src/main/java/com/ververica/cdc/common/data/binary/NullAwareGetters.java b/flink-cdc-common/src/main/java/com/ververica/cdc/common/data/binary/NullAwareGetters.java new file mode 100644 index 000000000..e3ed1e605 --- /dev/null +++ b/flink-cdc-common/src/main/java/com/ververica/cdc/common/data/binary/NullAwareGetters.java @@ -0,0 +1,33 @@ +/* + * Copyright 2023 Ververica Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.ververica.cdc.common.data.binary; + +import com.ververica.cdc.common.annotation.Internal; + +/** Provides null related getters. */ +@Internal +public interface NullAwareGetters { + + /** If no field is null, return false. Returns true if one of the columns is null. */ + boolean anyNull(); + + /** + * For the input fields, if no field is null, return false. Returns true if one of the columns + * is null. + */ + boolean anyNull(int[] fields); +} diff --git a/flink-cdc-runtime/pom.xml b/flink-cdc-runtime/pom.xml index 411d41335..7f9e0311f 100644 --- a/flink-cdc-runtime/pom.xml +++ b/flink-cdc-runtime/pom.xml @@ -51,6 +51,20 @@ under the License. ${flink.version} test + + org.apache.flink + flink-streaming-java + ${flink.version} + test + test-jar + + + org.apache.flink + flink-runtime + ${flink.version} + test + test-jar + \ No newline at end of file diff --git a/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/serializer/data/RecordDataSerializer.java b/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/serializer/data/RecordDataSerializer.java index 4fcc61eb1..f9e14a07a 100644 --- a/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/serializer/data/RecordDataSerializer.java +++ b/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/serializer/data/RecordDataSerializer.java @@ -27,12 +27,16 @@ import org.apache.flink.core.memory.DataOutputView; import com.ververica.cdc.common.data.GenericRecordData; import com.ververica.cdc.common.data.RecordData; +import com.ververica.cdc.common.data.binary.BinaryRecordData; import com.ververica.cdc.common.types.DataType; import com.ververica.cdc.common.types.RowType; import com.ververica.cdc.common.utils.InstantiationUtil; import com.ververica.cdc.runtime.serializer.InternalSerializers; import com.ververica.cdc.runtime.serializer.NestedSerializersSnapshotDelegate; import com.ververica.cdc.runtime.serializer.NullableSerializerWrapper; +import 
com.ververica.cdc.runtime.serializer.data.binary.BinaryRecordDataSerializer; +import com.ververica.cdc.runtime.serializer.data.writer.BinaryRecordDataWriter; +import com.ververica.cdc.runtime.serializer.data.writer.BinaryWriter; import com.ververica.cdc.runtime.serializer.schema.DataTypeSerializer; import java.io.IOException; @@ -47,6 +51,10 @@ public class RecordDataSerializer extends TypeSerializer { private final DataType[] types; private final TypeSerializer[] fieldSerializers; private final RecordData.FieldGetter[] fieldGetters; + private final BinaryRecordDataSerializer binarySerializer = BinaryRecordDataSerializer.INSTANCE; + + private transient BinaryRecordData reuseRow; + private transient BinaryRecordDataWriter reuseWriter; private final DataTypeSerializer dataTypeSerializer = new DataTypeSerializer(); @@ -96,19 +104,30 @@ public class RecordDataSerializer extends TypeSerializer { @Override public void serialize(RecordData recordData, DataOutputView target) throws IOException { - target.writeInt(types.length); - for (int i = 0; i < types.length; i++) { - fieldSerializers[i].serialize(fieldGetters[i].getFieldOrNull(recordData), target); + if (recordData instanceof BinaryRecordData) { + target.writeBoolean(true); + binarySerializer.serialize((BinaryRecordData) recordData, target); + } else { + target.writeBoolean(false); + target.writeInt(types.length); + for (int i = 0; i < types.length; i++) { + fieldSerializers[i].serialize(fieldGetters[i].getFieldOrNull(recordData), target); + } } } @Override public RecordData deserialize(DataInputView source) throws IOException { - Object[] fields = new Object[source.readInt()]; - for (int i = 0; i < fields.length; i++) { - fields[i] = fieldSerializers[i].deserialize(source); + boolean isBinary = source.readBoolean(); + if (isBinary) { + return binarySerializer.deserialize(source); + } else { + Object[] fields = new Object[source.readInt()]; + for (int i = 0; i < fields.length; i++) { + fields[i] = 
fieldSerializers[i].deserialize(source); + } + return GenericRecordData.of(fields); } - return GenericRecordData.of(fields); } @Override @@ -195,6 +214,32 @@ public class RecordDataSerializer extends TypeSerializer { return new RecordDataSerializerSnapshot(types, fieldSerializers); } + /** Convert {@link RecordData} into {@link BinaryRecordData}; non-binary rows are written into a single reused instance. */ + public BinaryRecordData toBinaryRecordData(RecordData row) { + if (row instanceof BinaryRecordData) { + return (BinaryRecordData) row; + } + if (reuseRow == null) { + reuseRow = new BinaryRecordData(types.length); + reuseWriter = new BinaryRecordDataWriter(reuseRow); + } + reuseWriter.reset(); + for (int i = 0; i < types.length; i++) { + if (row.isNullAt(i)) { + reuseWriter.setNullAt(i); + } else { + BinaryWriter.write( + reuseWriter, + i, + fieldGetters[i].getFieldOrNull(row), + types[i], + fieldSerializers[i]); + } + } + reuseWriter.complete(); + return reuseRow; + } + /** {@link TypeSerializerSnapshot} for {@link RecordDataSerializer}. */ public static final class RecordDataSerializerSnapshot implements TypeSerializerSnapshot { diff --git a/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/serializer/data/binary/BinaryRecordDataSerializer.java b/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/serializer/data/binary/BinaryRecordDataSerializer.java new file mode 100644 index 000000000..8fb77e44f --- /dev/null +++ b/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/serializer/data/binary/BinaryRecordDataSerializer.java @@ -0,0 +1,160 @@ +/* + * Copyright 2023 Ververica Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.runtime.serializer.data.binary; + +import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot; +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.core.memory.MemorySegmentWritable; + +import com.ververica.cdc.common.annotation.Internal; +import com.ververica.cdc.common.data.binary.BinaryRecordData; +import com.ververica.cdc.common.data.binary.BinarySegmentUtils; +import com.ververica.cdc.runtime.serializer.TypeSerializerSingleton; + +import java.io.IOException; + +import static com.ververica.cdc.common.utils.Preconditions.checkArgument; + +/** Serializer for {@link BinaryRecordData}. 
*/ +@Internal +public class BinaryRecordDataSerializer extends TypeSerializerSingleton { + + private static final long serialVersionUID = 1L; + + public static final BinaryRecordDataSerializer INSTANCE = new BinaryRecordDataSerializer(); + + @Override + public boolean isImmutableType() { + return false; + } + + @Override + public BinaryRecordData createInstance() { + return new BinaryRecordData(1); + } + + @Override + public BinaryRecordData copy(BinaryRecordData from) { + return copy(from, new BinaryRecordData(from.getArity())); + } + + @Override + public BinaryRecordData copy(BinaryRecordData from, BinaryRecordData reuse) { + return from.copy(reuse); + } + + @Override + public int getLength() { + return -1; + } + + /** + * Writes the arity and the size in bytes of {@code record} as two ints, followed by the + * record bytes. + */ + @Override + public void serialize(BinaryRecordData record, DataOutputView target) throws IOException { + target.writeInt(record.getArity()); + target.writeInt(record.getSizeInBytes()); + if (target instanceof MemorySegmentWritable) { + serializeWithoutLength(record, (MemorySegmentWritable) target); + } else { + BinarySegmentUtils.copyToView( + record.getSegments(), record.getOffset(), record.getSizeInBytes(), target); + } + } + + /** Reads the arity, the size and the record bytes into a newly allocated record. */ + @Override + public BinaryRecordData deserialize(DataInputView source) throws IOException { + BinaryRecordData row = new BinaryRecordData(source.readInt()); + int length = source.readInt(); + byte[] bytes = new byte[length]; + source.readFully(bytes); + row.pointTo(MemorySegmentFactory.wrap(bytes), 0, length); + return row; + } + + /** + * Reads a record written by {@code serialize}, reusing {@code reuse} and its buffer when + * possible. + * + * {@code reuse} is only used when its arity equals the serialized arity. Otherwise a new + * record of the serialized arity is returned, because the position of the fields inside the + * bytes depends on the arity. + */ + @Override + public BinaryRecordData deserialize(BinaryRecordData reuse, DataInputView source) + throws IOException { + MemorySegment[] segments = reuse.getSegments(); + checkArgument( + segments == null || (segments.length == 1 && reuse.getOffset() == 0), + "Reuse BinaryRecordData should have no segments or only one segment and offset start at 0."); + + int arity = source.readInt(); + int length = source.readInt(); + if (reuse.getArity() != arity) { + // The reuse instance was created for another field count and cannot hold this record. + byte[] bytes = new byte[length]; + source.readFully(bytes); + BinaryRecordData row = new BinaryRecordData(arity); + row.pointTo(MemorySegmentFactory.wrap(bytes), 0, length); + return row; + } + if (segments == null || segments[0].size() < length) { + segments = new
MemorySegment[] {MemorySegmentFactory.wrap(new byte[length])}; + } + source.readFully(segments[0].getArray(), 0, length); + reuse.pointTo(segments, 0, length); + return reuse; + } + + // ============================ Page related operations =================================== + + /** Writes only the record bytes; the caller has already written arity and size. */ + private static void serializeWithoutLength( + BinaryRecordData record, MemorySegmentWritable writable) throws IOException { + if (record.getSegments().length == 1) { + writable.write(record.getSegments()[0], record.getOffset(), record.getSizeInBytes()); + } else { + serializeWithoutLengthSlow(record, writable); + } + } + + /** + * Writes the bytes of a record that spans several segments, segment by segment. Only the + * first segment is read from the record offset; the following ones are read from position 0. + */ + public static void serializeWithoutLengthSlow( + BinaryRecordData record, MemorySegmentWritable out) throws IOException { + int remainSize = record.getSizeInBytes(); + int posInSegOfRecord = record.getOffset(); + int segmentSize = record.getSegments()[0].size(); + for (MemorySegment segOfRecord : record.getSegments()) { + int nWrite = Math.min(segmentSize - posInSegOfRecord, remainSize); + assert nWrite > 0; + out.write(segOfRecord, posInSegOfRecord, nWrite); + + // next new segment. + posInSegOfRecord = 0; + remainSize -= nWrite; + if (remainSize == 0) { + break; + } + } + checkArgument(remainSize == 0); + } + + /** + * Copies one serialized record from {@code source} to {@code target}. Both header ints + * written by {@code serialize} (arity, then size in bytes) are forwarded before the record + * bytes. + */ + @Override + public void copy(DataInputView source, DataOutputView target) throws IOException { + int arity = source.readInt(); + int length = source.readInt(); + target.writeInt(arity); + target.writeInt(length); + target.write(source, length); + } + + @Override + public TypeSerializerSnapshot snapshotConfiguration() { + return new BinaryRecordDataSerializerSnapshot(); + } + + /** {@link TypeSerializerSnapshot} for {@link BinaryRecordDataSerializer}.
*/ + public static final class BinaryRecordDataSerializerSnapshot + extends SimpleTypeSerializerSnapshot { + + public BinaryRecordDataSerializerSnapshot() { + super(() -> INSTANCE); + } + } +} diff --git a/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/serializer/data/writer/AbstractBinaryWriter.java b/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/serializer/data/writer/AbstractBinaryWriter.java new file mode 100644 index 000000000..88a830767 --- /dev/null +++ b/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/serializer/data/writer/AbstractBinaryWriter.java @@ -0,0 +1,392 @@ +/* + * Copyright 2023 Ververica Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package com.ververica.cdc.runtime.serializer.data.writer; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; + +import com.ververica.cdc.common.annotation.Internal; +import com.ververica.cdc.common.data.ArrayData; +import com.ververica.cdc.common.data.DecimalData; +import com.ververica.cdc.common.data.LocalZonedTimestampData; +import com.ververica.cdc.common.data.MapData; +import com.ververica.cdc.common.data.RecordData; +import com.ververica.cdc.common.data.StringData; +import com.ververica.cdc.common.data.TimestampData; +import com.ververica.cdc.common.data.ZonedTimestampData; +import com.ververica.cdc.common.data.binary.BinaryFormat; +import com.ververica.cdc.common.data.binary.BinaryRecordData; +import com.ververica.cdc.common.data.binary.BinarySegmentUtils; +import com.ververica.cdc.common.data.binary.BinaryStringData; +import com.ververica.cdc.runtime.serializer.data.ArrayDataSerializer; +import com.ververica.cdc.runtime.serializer.data.RecordDataSerializer; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; + +import static com.ververica.cdc.common.data.binary.BinaryRecordData.TIMESTAMP_DELIMITER; + +/** + * Use the special format to write data to a {@link MemorySegment} (its capacity grows + * automatically). + * + *

To write a binary format: 1. Create a new writer. 2. Write each field by writeXX or setNullAt. + * (Variable-length fields cannot be written more than once.) 3. Invoke {@link #complete()}. + * + *

To reuse this writer, invoke {@link #reset()} first. + */ +@Internal +abstract class AbstractBinaryWriter implements BinaryWriter { + + /** Buffer being written; replaced by a larger copy when more capacity is needed. */ + protected MemorySegment segment; + + /** Position in {@link #segment} where the next variable-length value is written. */ + protected int cursor; + + /** Lazily created output view that appends to {@link #segment} at {@link #cursor}. */ + protected DataOutputViewStreamWrapper outputView; + + /** Set offset and size to fix len part. */ + protected abstract void setOffsetAndSize(int pos, int offset, long size); + + /** Get field offset. */ + protected abstract int getFieldOffset(int pos); + + /** After grow, need point to new memory. */ + protected abstract void afterGrow(); + + /** Set the null bit of the field at {@code ordinal}. */ + protected abstract void setNullBit(int ordinal); + + /** + * Writes a string field. Values of at most {@link BinaryFormat#MAX_FIX_PART_DATA_SIZE} bytes + * are stored inline in the fixed-length slot of the field, longer values are appended to the + * variable-length part. A string without segments is encoded as UTF-8 first. + * + * See {@link BinarySegmentUtils#readStringData(MemorySegment[], int, int, long)}. + */ + @Override + public void writeString(int pos, StringData input) { + BinaryStringData string = (BinaryStringData) input; + if (string.getSegments() == null) { + String javaObject = string.toString(); + writeBytes(pos, javaObject.getBytes(StandardCharsets.UTF_8)); + } else { + int len = string.getSizeInBytes(); + // Same inline threshold as writeBytes, expressed through the shared constant. + if (len <= BinaryFormat.MAX_FIX_PART_DATA_SIZE) { + byte[] bytes = BinarySegmentUtils.allocateReuseBytes(len); + BinarySegmentUtils.copyToBytes( + string.getSegments(), string.getOffset(), bytes, 0, len); + writeBytesToFixLenPart(segment, getFieldOffset(pos), bytes, len); + } else { + writeSegmentsToVarLenPart(pos, string.getSegments(), string.getOffset(), len); + } + } + } + + /** + * Writes raw bytes inline when they fit into the fixed-length slot, otherwise into the + * variable-length part. + */ + private void writeBytes(int pos, byte[] bytes) { + int len = bytes.length; + if (len <= BinaryFormat.MAX_FIX_PART_DATA_SIZE) { + writeBytesToFixLenPart(segment, getFieldOffset(pos), bytes, len); + } else { + writeBytesToVarLenPart(pos, bytes, len); + } + } + + @Override + public void writeArray(int pos, ArrayData input, ArrayDataSerializer serializer) { + throw new UnsupportedOperationException("Not support array data."); + } + + @Override + public void writeMap(int pos, MapData input, TypeSerializer serializer) { + throw new UnsupportedOperationException("Not support map data."); + } + + /** Returns the output view over this writer, creating it on first use. */ + private DataOutputViewStreamWrapper
getOutputView() { + if (outputView == null) { + outputView = new DataOutputViewStreamWrapper(new BinaryRowWriterOutputView()); + } + return outputView; + } + + /** + * Writes a nested record into the variable-length part. Input that is not already in binary + * format is converted with {@code serializer} first. + */ + @Override + public void writeRecord(int pos, RecordData input, RecordDataSerializer serializer) { + if (input instanceof BinaryFormat) { + BinaryFormat row = (BinaryFormat) input; + writeSegmentsToVarLenPart( + pos, row.getSegments(), row.getOffset(), row.getSizeInBytes()); + } else { + BinaryRecordData row = serializer.toBinaryRecordData(input); + writeSegmentsToVarLenPart( + pos, row.getSegments(), row.getOffset(), row.getSizeInBytes()); + } + } + + /** Writes a byte array field; see {@code writeBytes} for the placement rule. */ + @Override + public void writeBinary(int pos, byte[] bytes) { + writeBytes(pos, bytes); + } + + /** + * Writes a decimal field. For a compact precision the unscaled long is stored in the + * fixed-length slot and {@code value} must not be null. Otherwise 16 zeroed bytes are + * reserved in the variable-length part and the unscaled bytes are stored there; a null value + * only sets the null bit and keeps the reserved space. + */ + @Override + public void writeDecimal(int pos, DecimalData value, int precision) { + assert value == null || (value.precision() == precision); + + if (DecimalData.isCompact(precision)) { + assert value != null; + writeLong(pos, value.toUnscaledLong()); + } else { + // grow the global buffer before writing data. + ensureCapacity(16); + + // zero-out the bytes + segment.putLong(cursor, 0L); + segment.putLong(cursor + 8, 0L); + + // Make sure Decimal object has the same scale as DecimalType. + // Note that we may pass in null Decimal object to set null for it. + if (value == null) { + setNullBit(pos); + // keep the offset for future update + setOffsetAndSize(pos, cursor, 0); + } else { + final byte[] bytes = value.toUnscaledBytes(); + assert bytes.length <= 16; + + // Write the bytes to the variable length portion. + segment.put(cursor, bytes, 0, bytes.length); + setOffsetAndSize(pos, cursor, bytes.length); + } + + // move the cursor forward.
+ cursor += 16; + } + } + + /** + * Writes a timestamp field. For a compact precision only the milliseconds are stored, in the + * fixed-length slot, and {@code value} must not be null. Otherwise the milliseconds take 8 + * bytes of the variable-length part and the nano-of-millisecond is stored in the size half + * of the field slot. + */ + @Override + public void writeTimestamp(int pos, TimestampData value, int precision) { + if (TimestampData.isCompact(precision)) { + writeLong(pos, value.getMillisecond()); + } else { + // store the nanoOfMillisecond in fixed-length part as offset and nanoOfMillisecond + ensureCapacity(8); + + if (value == null) { + setNullBit(pos); + // zero-out the bytes + segment.putLong(cursor, 0L); + setOffsetAndSize(pos, cursor, 0); + } else { + segment.putLong(cursor, value.getMillisecond()); + setOffsetAndSize(pos, cursor, value.getNanoOfMillisecond()); + } + + cursor += 8; + } + } + + /** Writes a local-zoned timestamp with the same layout as {@code writeTimestamp}. */ + @Override + public void writeLocalZonedTimestamp(int pos, LocalZonedTimestampData value, int precision) { + if (LocalZonedTimestampData.isCompact(precision)) { + writeLong(pos, value.getEpochMillisecond()); + } else { + // store the nanoOfMillisecond in fixed-length part as offset and nanoOfMillisecond + ensureCapacity(8); + + if (value == null) { + setNullBit(pos); + // zero-out the bytes + segment.putLong(cursor, 0L); + setOffsetAndSize(pos, cursor, 0); + } else { + segment.putLong(cursor, value.getEpochMillisecond()); + setOffsetAndSize(pos, cursor, value.getEpochNanoOfMillisecond()); + } + + cursor += 8; + } + } + + /** + * Writes a zoned timestamp as a string made of milliseconds, nano-of-millisecond and zone + * id joined by {@code TIMESTAMP_DELIMITER}. {@code precision} is not used. + */ + @Override + public void writeZonedTimestamp(int pos, ZonedTimestampData value, int precision) { + String timestampString = + String.join( + TIMESTAMP_DELIMITER, + Arrays.asList( + String.valueOf(value.getMillisecond()), + String.valueOf(value.getNanoOfMillisecond()), + value.getZoneId())); + writeString(pos, new BinaryStringData(timestampString)); + } + + private void zeroBytes(int offset, int size) { + for (int i = offset; i < offset + size; i++) { + segment.put(i, (byte) 0); + } + } + + /** + * Zeroes the last 8-byte word that a value of {@code numBytes} bytes written at the cursor + * only partially fills. Does nothing when {@code numBytes} is a multiple of 8. + */ + protected void zeroOutPaddingBytes(int numBytes) { + if ((numBytes & 0x07) > 0) { + segment.putLong(cursor + ((numBytes >> 3) << 3), 0L); + } + } + + /** Grows the buffer if fewer than {@code neededSize} bytes are left after the cursor. */ + protected void ensureCapacity(int neededSize) { + final int length = cursor + neededSize; + if (segment.size() <
length) { + grow(length); + } + } + + /** + * Appends {@code size} bytes taken from {@code segments} at {@code offset} to the + * variable-length part, padded to a multiple of 8 bytes, and records offset and size in the + * slot of field {@code pos}. + */ + private void writeSegmentsToVarLenPart( + int pos, MemorySegment[] segments, int offset, int size) { + final int roundedSize = roundNumberOfBytesToNearestWord(size); + + // grow the global buffer before writing data. + ensureCapacity(roundedSize); + + zeroOutPaddingBytes(size); + + if (segments.length == 1) { + segments[0].copyTo(offset, segment, cursor, size); + } else { + writeMultiSegmentsToVarLenPart(segments, offset, size); + } + + setOffsetAndSize(pos, cursor, size); + + // move the cursor forward. + cursor += roundedSize; + } + + /** Copies {@code size} bytes that span several source segments to the cursor position. */ + private void writeMultiSegmentsToVarLenPart(MemorySegment[] segments, int offset, int size) { + // Write the bytes to the variable length portion. + int needCopy = size; + int fromOffset = offset; + int toOffset = cursor; + for (MemorySegment sourceSegment : segments) { + int remain = sourceSegment.size() - fromOffset; + if (remain > 0) { + int copySize = remain > needCopy ? needCopy : remain; + sourceSegment.copyTo(fromOffset, segment, toOffset, copySize); + needCopy -= copySize; + toOffset += copySize; + fromOffset = 0; + } else { + fromOffset -= sourceSegment.size(); + } + } + } + + /** + * Appends {@code len} bytes of {@code bytes} to the variable-length part, padded to a + * multiple of 8 bytes, and records offset and size in the slot of field {@code pos}. + */ + private void writeBytesToVarLenPart(int pos, byte[] bytes, int len) { + final int roundedSize = roundNumberOfBytesToNearestWord(len); + + // grow the global buffer before writing data. + ensureCapacity(roundedSize); + + zeroOutPaddingBytes(len); + + // Write the bytes to the variable length portion. + segment.put(cursor, bytes, 0, len); + + setOffsetAndSize(pos, cursor, len); + + // move the cursor forward. + cursor += roundedSize; + } + + /** + * Increases the capacity to ensure that it can hold at least the minimum capacity argument. + * The buffer grows by half of its current size, or to {@code minCapacity} if that is larger. 
+ */ + private void grow(int minCapacity) { + int oldCapacity = segment.size(); + int newCapacity = oldCapacity + (oldCapacity >> 1); + if (newCapacity - minCapacity < 0) { + newCapacity = minCapacity; + } + segment = MemorySegmentFactory.wrap(Arrays.copyOf(segment.getArray(), newCapacity)); + afterGrow(); + } + + /** Rounds {@code numBytes} up to the next multiple of 8. */ + protected static int roundNumberOfBytesToNearestWord(int numBytes) { + int remainder = numBytes & 0x07; + if (remainder == 0) { + return numBytes; + } else { + return numBytes + (8 - remainder); + } + } + + /** + * Stores up to seven bytes inline in the 8-byte slot at {@code fieldOffset}. The top byte of + * the stored long holds the length with its highest bit set; the remaining seven bytes hold + * the data, ordered according to {@code BinaryRecordData.LITTLE_ENDIAN}. + */ + private static void writeBytesToFixLenPart( + MemorySegment segment, int fieldOffset, byte[] bytes, int len) { + long firstByte = len | 0x80; // first bit is 1, other bits is len + long sevenBytes = 0L; // real data + if (BinaryRecordData.LITTLE_ENDIAN) { + for (int i = 0; i < len; i++) { + sevenBytes |= ((0x00000000000000FFL & bytes[i]) << (i * 8L)); + } + } else { + for (int i = 0; i < len; i++) { + sevenBytes |= ((0x00000000000000FFL & bytes[i]) << ((6 - i) * 8L)); + } + } + + final long offsetAndSize = (firstByte << 56) | sevenBytes; + + segment.putLong(fieldOffset, offsetAndSize); + } + + /** Returns the buffer currently backing this writer. */ + @Internal + public MemorySegment getSegments() { + return segment; + } + + /** OutputView for write Generic. */ + private class BinaryRowWriterOutputView extends OutputStream { + + /** + * Writes the specified byte to this output stream. The general contract for {@code write} + * is that one byte is written to the output stream. The byte to be written is the + * eight low-order bits of the argument {@code b}. The 24 high-order bits of {@code b} + * are ignored.
+ */ + @Override + public void write(int b) throws IOException { + ensureCapacity(1); + segment.put(cursor, (byte) b); + cursor += 1; + } + + @Override + public void write(byte[] b) throws IOException { + ensureCapacity(b.length); + segment.put(cursor, b, 0, b.length); + cursor += b.length; + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + ensureCapacity(len); + segment.put(cursor, b, off, len); + cursor += len; + } + + public void write(MemorySegment seg, int off, int len) throws IOException { + ensureCapacity(len); + seg.copyTo(off, segment, cursor, len); + cursor += len; + } + } +} diff --git a/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/serializer/data/writer/BinaryRecordDataWriter.java b/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/serializer/data/writer/BinaryRecordDataWriter.java new file mode 100644 index 000000000..3ad104023 --- /dev/null +++ b/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/serializer/data/writer/BinaryRecordDataWriter.java @@ -0,0 +1,123 @@ +/* + * Copyright 2023 Ververica Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package com.ververica.cdc.runtime.serializer.data.writer; + +import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.table.data.binary.BinarySegmentUtils; + +import com.ververica.cdc.common.annotation.Internal; +import com.ververica.cdc.common.data.binary.BinaryRecordData; + +/** Writer for {@link BinaryRecordData}. The fixed-length part holds the null-bit region followed by one 8-byte slot per field; variable-length values are appended after it. */ +@Internal +public final class BinaryRecordDataWriter extends AbstractBinaryWriter { + + /** Width in bytes of the null-bit region, derived from the arity of the row. */ private final int nullBitsSizeInBytes; + /** Row that is pointed at the writer's segment on construction and after every grow. */ private final BinaryRecordData row; + /** Size of the fixed-length part; the cursor is reset to this offset. */ private final int fixedSize; + + /** Creates a writer that reserves no extra space beyond the fixed-length part of {@code row}. */ public BinaryRecordDataWriter(BinaryRecordData row) { + this(row, 0); + } + + /** Creates a writer whose buffer holds the fixed-length part of {@code row} plus {@code initialSize} bytes, and points {@code row} at that buffer. */ public BinaryRecordDataWriter(BinaryRecordData row, int initialSize) { + this.nullBitsSizeInBytes = BinaryRecordData.calculateBitSetWidthInBytes(row.getArity()); + this.fixedSize = row.getFixedLengthPartSize(); + this.cursor = fixedSize; + + this.segment = MemorySegmentFactory.wrap(new byte[fixedSize + initialSize]); + this.row = row; + this.row.pointTo(segment, 0, segment.size()); + } + + /** Moves the cursor back to the end of the fixed-length part and clears the null-bit region, 8 bytes at a time. Call this before writing a new record. */ + @Override + public void reset() { + this.cursor = fixedSize; + for (int i = 0; i < nullBitsSizeInBytes; i += 8) { + segment.putLong(i, 0L); + } + } + + /** Sets the null bit of the field and zeroes its 8-byte slot. Fields are not null by default.
*/ + @Override + public void setNullAt(int pos) { + setNullBit(pos); + segment.putLong(getFieldOffset(pos), 0L); + } + + /** Sets bit {@code pos + HEADER_SIZE_IN_BITS} of the null-bit region. NOTE(review): BinarySegmentUtils here is the Flink table class imported by this file, while AbstractBinaryWriter uses the CDC class of the same name; confirm this is intended. */ @Override + public void setNullBit(int pos) { + BinarySegmentUtils.bitSet(segment, 0, pos + BinaryRecordData.HEADER_SIZE_IN_BITS); + } + + @Override + public void writeBoolean(int pos, boolean value) { + segment.putBoolean(getFieldOffset(pos), value); + } + + @Override + public void writeByte(int pos, byte value) { + segment.put(getFieldOffset(pos), value); + } + + @Override + public void writeShort(int pos, short value) { + segment.putShort(getFieldOffset(pos), value); + } + + @Override + public void writeInt(int pos, int value) { + segment.putInt(getFieldOffset(pos), value); + } + + @Override + public void writeLong(int pos, long value) { + segment.putLong(getFieldOffset(pos), value); + } + + @Override + public void writeFloat(int pos, float value) { + segment.putFloat(getFieldOffset(pos), value); + } + + @Override + public void writeDouble(int pos, double value) { + segment.putDouble(getFieldOffset(pos), value); + } + + /** Records the current cursor position as the total size of the row. */ @Override + public void complete() { + row.setTotalSize(cursor); + } + + /** Returns the offset of the slot of field {@code pos}: the null-bit region followed by 8 bytes per preceding field. */ @Override + public int getFieldOffset(int pos) { + return nullBitsSizeInBytes + 8 * pos; + } + + /** Packs {@code offset} into the upper 32 bits and {@code size} into the lower bits of the slot of field {@code pos}. */ @Override + public void setOffsetAndSize(int pos, int offset, long size) { + final long offsetAndSize = ((long) offset << 32) | size; + segment.putLong(getFieldOffset(pos), offsetAndSize); + } + + /** Points the row at the segment that replaced the old one. */ @Override + public void afterGrow() { + row.pointTo(segment, 0, segment.size()); + } +} diff --git a/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/serializer/data/writer/BinaryWriter.java b/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/serializer/data/writer/BinaryWriter.java new file mode 100644 index 000000000..930f33567 --- /dev/null +++ b/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/serializer/data/writer/BinaryWriter.java @@ -0,0 +1,151 @@ +/* + * Copyright 2023 Ververica Inc.
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.runtime.serializer.data.writer; + +import org.apache.flink.api.common.typeutils.TypeSerializer; + +import com.ververica.cdc.common.annotation.Internal; +import com.ververica.cdc.common.data.ArrayData; +import com.ververica.cdc.common.data.DecimalData; +import com.ververica.cdc.common.data.LocalZonedTimestampData; +import com.ververica.cdc.common.data.MapData; +import com.ververica.cdc.common.data.RecordData; +import com.ververica.cdc.common.data.StringData; +import com.ververica.cdc.common.data.TimestampData; +import com.ververica.cdc.common.data.ZonedTimestampData; +import com.ververica.cdc.common.types.DataType; +import com.ververica.cdc.common.types.DecimalType; +import com.ververica.cdc.common.types.LocalZonedTimestampType; +import com.ververica.cdc.common.types.TimestampType; +import com.ververica.cdc.common.types.ZonedTimestampType; +import com.ververica.cdc.runtime.serializer.data.ArrayDataSerializer; +import com.ververica.cdc.runtime.serializer.data.RecordDataSerializer; + +/** + * Writer to write a composite data format, like row, array. 1. Invoke {@link #reset()}. 2. Write + * each field by writeXX or setNullAt. (Same field can not be written repeatedly.) 3. Invoke {@link + * #complete()}. + */ +@Internal +public interface BinaryWriter { + + /** Reset writer to prepare next write. */ + void reset(); + + /** Set null to this field.
*/ + void setNullAt(int pos); + + void writeBoolean(int pos, boolean value); + + void writeByte(int pos, byte value); + + void writeShort(int pos, short value); + + void writeInt(int pos, int value); + + void writeLong(int pos, long value); + + void writeFloat(int pos, float value); + + void writeDouble(int pos, double value); + + void writeString(int pos, StringData value); + + void writeBinary(int pos, byte[] bytes); + + void writeDecimal(int pos, DecimalData value, int precision); + + void writeTimestamp(int pos, TimestampData value, int precision); + + void writeLocalZonedTimestamp(int pos, LocalZonedTimestampData value, int precision); + + void writeZonedTimestamp(int pos, ZonedTimestampData value, int precision); + + void writeArray(int pos, ArrayData value, ArrayDataSerializer serializer); + + void writeMap(int pos, MapData value, TypeSerializer serializer); + + void writeRecord(int pos, RecordData value, RecordDataSerializer serializer); + + /** Finally, complete write to set real size to binary.
*/ + void complete(); + + /** Writes {@code o} to field {@code pos} of {@code writer} with the write method that matches the type root of {@code type}. The precision of decimal and timestamp types is taken from {@code type}; {@code serializer} is only used for ARRAY, MAP and ROW. For the primitive-valued types {@code o} is unboxed by a cast, so it must not be null there. Throws UnsupportedOperationException for any type root not listed. */ static void write( + BinaryWriter writer, int pos, Object o, DataType type, TypeSerializer serializer) { + switch (type.getTypeRoot()) { + case BOOLEAN: + writer.writeBoolean(pos, (boolean) o); + break; + case TINYINT: + writer.writeByte(pos, (byte) o); + break; + case SMALLINT: + writer.writeShort(pos, (short) o); + break; + case INTEGER: + case DATE: + case TIME_WITHOUT_TIME_ZONE: + writer.writeInt(pos, (int) o); + break; + case BIGINT: + writer.writeLong(pos, (long) o); + break; + case TIMESTAMP_WITHOUT_TIME_ZONE: + TimestampType timestampType = (TimestampType) type; + writer.writeTimestamp(pos, (TimestampData) o, timestampType.getPrecision()); + break; + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + LocalZonedTimestampType lzTs = (LocalZonedTimestampType) type; + writer.writeLocalZonedTimestamp( + pos, (LocalZonedTimestampData) o, lzTs.getPrecision()); + break; + case TIMESTAMP_WITH_TIME_ZONE: + ZonedTimestampType zTs = (ZonedTimestampType) type; + writer.writeZonedTimestamp(pos, (ZonedTimestampData) o, zTs.getPrecision()); + break; + case FLOAT: + writer.writeFloat(pos, (float) o); + break; + case DOUBLE: + writer.writeDouble(pos, (double) o); + break; + case CHAR: + case VARCHAR: + writer.writeString(pos, (StringData) o); + break; + case DECIMAL: + DecimalType decimalType = (DecimalType) type; + writer.writeDecimal(pos, (DecimalData) o, decimalType.getPrecision()); + break; + case ARRAY: + writer.writeArray(pos, (ArrayData) o, (ArrayDataSerializer) serializer); + break; + case MAP: + writer.writeMap(pos, (MapData) o, (TypeSerializer) serializer); + break; + case ROW: + writer.writeRecord(pos, (RecordData) o, (RecordDataSerializer) serializer); + break; + case BINARY: + case VARBINARY: + writer.writeBinary(pos, (byte[]) o); + break; + default: + throw new UnsupportedOperationException("Not support type: " + type); + } + } +} diff --git a/flink-cdc-runtime/src/test/java/com/ververica/cdc/runtime/operators/schema/SchemaOperatorTest.java
b/flink-cdc-runtime/src/test/java/com/ververica/cdc/runtime/operators/schema/SchemaOperatorTest.java new file mode 100644 index 000000000..281a56dbe --- /dev/null +++ b/flink-cdc-runtime/src/test/java/com/ververica/cdc/runtime/operators/schema/SchemaOperatorTest.java @@ -0,0 +1,102 @@ +/* + * Copyright 2023 Ververica Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.runtime.operators.schema; + +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; + +import com.ververica.cdc.common.data.GenericRecordData; +import com.ververica.cdc.common.data.GenericStringData; +import com.ververica.cdc.common.event.DataChangeEvent; +import com.ververica.cdc.common.event.Event; +import com.ververica.cdc.common.event.TableId; +import com.ververica.cdc.common.types.DataTypes; +import com.ververica.cdc.common.types.RowType; +import com.ververica.cdc.runtime.serializer.event.EventSerializer; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.assertj.core.api.Assertions.assertThat; + +/** A test for the {@link SchemaOperator}.
*/ +public class SchemaOperatorTest { + /** For each of two subtasks, feeds two update events into a SchemaOperator harness and checks that the emitted records equal the input events in the same order. All harnesses are closed at the end. */ @Test + void testProcessElement() throws Exception { + final int maxParallelism = 4; + final int parallelism = 2; + final OperatorID opID = new OperatorID(); + final TableId tableId = TableId.tableId("testProcessElement"); + final RowType rowType = DataTypes.ROW(DataTypes.BIGINT(), DataTypes.STRING()); + + List> testHarnesses = new ArrayList<>(); + for (int subtaskIndex = 0; subtaskIndex < parallelism; subtaskIndex++) { + OneInputStreamOperatorTestHarness testHarness = + createTestHarness(maxParallelism, parallelism, subtaskIndex, opID); + testHarnesses.add(testHarness); + testHarness.setup(EventSerializer.INSTANCE); + testHarness.open(); + + Map meta = new HashMap<>(); + meta.put("subtask", String.valueOf(subtaskIndex)); + List testData = + Arrays.asList( + DataChangeEvent.updateEvent( + tableId, + rowType, + GenericRecordData.of(1L, GenericStringData.fromString("1")), + GenericRecordData.of(2L, GenericStringData.fromString("2")), + meta), + DataChangeEvent.updateEvent( + tableId, + rowType, + GenericRecordData.of(3L, GenericStringData.fromString("3")), + GenericRecordData.of(4L, GenericStringData.fromString("4")), + meta)); + for (Event event : testData) { + testHarness.processElement(event, 0); + } + + Collection> result = testHarness.getRecordOutput(); + assertThat(result.stream().map(StreamRecord::getValue).collect(Collectors.toList())) + .isEqualTo(testData); + } + + for (int subtaskIndex = 0; subtaskIndex < parallelism; subtaskIndex++) { + testHarnesses.get(subtaskIndex).close(); + } + } + + /** Builds a harness around a new SchemaOperator for the given subtask, using {@code EventSerializer.INSTANCE} as the serializer passed to the harness. */ private OneInputStreamOperatorTestHarness createTestHarness( + int maxParallelism, int parallelism, int subtaskIndex, OperatorID opID) + throws Exception { + return new OneInputStreamOperatorTestHarness<>( + new SchemaOperator(), + maxParallelism, + parallelism, + subtaskIndex, + EventSerializer.INSTANCE, + opID); + } +}