[3.0][cdc-common] Introduce BinaryRecordData to avoid type serialization and improve performance

This closes #2748.
pull/2737/head
Hang Ruan, committed by Leonard Xu
parent 6a92546340
commit ac9bab3dce

@ -142,6 +142,18 @@ public final class TimestampData implements Comparable<TimestampData> {
return new TimestampData(milliseconds, nanosOfMillisecond);
}
/**
* Creates an instance of {@link TimestampData} from milliseconds.
*
* <p>The nanos-of-millisecond field will be set to zero.
*
* @param milliseconds the number of milliseconds since {@code 1970-01-01 00:00:00}; a negative
* number is the number of milliseconds before {@code 1970-01-01 00:00:00}
*/
public static TimestampData fromEpochMillis(long milliseconds) {
return new TimestampData(milliseconds, 0);
}
/**
* Creates an instance of {@link TimestampData} from an instance of {@link Timestamp}.
*
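Note: a minimal usage sketch of the new factory (values illustrative):

TimestampData ts = TimestampData.fromEpochMillis(1_700_000_000_000L); // nanos-of-millisecond is 0
TimestampData beforeEpoch = TimestampData.fromEpochMillis(-1L); // 1969-12-31 23:59:59.999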

@ -0,0 +1,62 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.common.data.binary;
import org.apache.flink.core.memory.MemorySegment;
import com.ververica.cdc.common.annotation.Internal;
/** Binary format spanning {@link MemorySegment}s. */
@Internal
public interface BinaryFormat {
/**
* Decides whether to put data in the fixed-length part or the variable-length part. See more
* in {@link BinaryRecordData}.
*
* <p>If len is less than 8, its binary format is: 1-bit mark(1) = 1, 7-bit len, and 7 bytes of
* data. The data is stored in the fixed-length part.
*
* <p>If len is greater than or equal to 8, its binary format is: 1-bit mark(1) = 0, 31-bit
* offset to the data, and 4-byte length of the data. The data is stored in the variable-length
* part.
*/
int MAX_FIX_PART_DATA_SIZE = 7;
/**
* Mask to get the mark in the highest bit of a long. Form: 10000000 00000000 ... (8 bytes)
*
* <p>This is used to decide whether the data is stored in the fixed-length part or the
* variable-length part. See {@link #MAX_FIX_PART_DATA_SIZE} for more information.
*/
long HIGHEST_FIRST_BIT = 0x80L << 56;
/**
* Mask to get the 7-bit length stored in the second through eighth bits of a long. Form:
* 01111111 00000000 ... (8 bytes)
*
* <p>This is used to get the length of the data stored in this long. See {@link
* #MAX_FIX_PART_DATA_SIZE} for more information.
*/
long HIGHEST_SECOND_TO_EIGHTH_BIT = 0x7FL << 56;
/** Gets the underlying {@link MemorySegment}s this binary format spans. */
MemorySegment[] getSegments();
/** Gets the start offset of this binary data in the {@link MemorySegment}s. */
int getOffset();
/** Gets the size in bytes of this binary data. */
int getSizeInBytes();
}
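Note: to make the bit layout above concrete, a hypothetical decoding helper (not part of this commit) built only from the two masks defined in this interface:

// Hypothetical helpers, for illustration only.
static boolean isStoredInFixedLengthPart(long fieldLong) {
// mark bit (highest bit) = 1 means the data is inlined in the fixed-length part
return (fieldLong & BinaryFormat.HIGHEST_FIRST_BIT) != 0;
}
static int fixedLengthPartDataLength(long fieldLong) {
// the 7-bit length lives in bits 56..62, so shift it down to the low bits
return (int) ((fieldLong & BinaryFormat.HIGHEST_SECOND_TO_EIGHTH_BIT) >>> 56);
}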

@ -0,0 +1,340 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.common.data.binary;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import com.ververica.cdc.common.annotation.Internal;
import com.ververica.cdc.common.data.ArrayData;
import com.ververica.cdc.common.data.DecimalData;
import com.ververica.cdc.common.data.LocalZonedTimestampData;
import com.ververica.cdc.common.data.MapData;
import com.ververica.cdc.common.data.RecordData;
import com.ververica.cdc.common.data.StringData;
import com.ververica.cdc.common.data.TimestampData;
import com.ververica.cdc.common.data.ZonedTimestampData;
import com.ververica.cdc.common.types.DataType;
import com.ververica.cdc.common.types.DecimalType;
import com.ververica.cdc.common.types.LocalZonedTimestampType;
import com.ververica.cdc.common.types.TimestampType;
import com.ververica.cdc.common.types.ZonedTimestampType;
import java.nio.ByteOrder;
import static com.ververica.cdc.common.types.DataTypeRoot.DECIMAL;
import static org.apache.flink.util.Preconditions.checkArgument;
/**
* An implementation of {@link RecordData} which is backed by {@link MemorySegment} instead of
* Object. It can significantly reduce the serialization/deserialization of Java objects.
*
* <p>A record has two parts: a fixed-length part and a variable-length part.
*
* <p>The fixed-length part contains a 1-byte header, the null bit set, and the field values.
* The null bit set is used for null tracking and is aligned to 8-byte word boundaries. The
* field values hold fixed-length primitive types and those variable-length values that fit
* within 8 bytes. If a variable-length value does not fit, the field instead stores the length
* and offset of the data in the variable-length part.
*
* <p>The fixed-length part always falls into a single MemorySegment, which speeds up reading
* and writing fields. During the write phase, if the target memory segment has less remaining
* space than the fixed-length part needs, that space is skipped. Consequently, the fixed-length
* part of a single record cannot exceed the capacity of a single MemorySegment; if a record has
* too many fields, we suggest configuring a larger MemorySegment page size.
*
* <p>The variable-length part may span multiple MemorySegments.
*/
@Internal
public final class BinaryRecordData extends BinarySection implements RecordData, NullAwareGetters {
public static final boolean LITTLE_ENDIAN =
(ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN);
private static final long FIRST_BYTE_ZERO = LITTLE_ENDIAN ? ~0xFFL : ~(0xFFL << 56L);
public static final int HEADER_SIZE_IN_BITS = 8;
public static final String TIMESTAMP_DELIMITER = "|";
public static int calculateBitSetWidthInBytes(int arity) {
return ((arity + 63 + HEADER_SIZE_IN_BITS) / 64) * 8;
}
public static int calculateFixPartSizeInBytes(int arity) {
return calculateBitSetWidthInBytes(arity) + 8 * arity;
}
/**
* If it is a fixed-length field, we can use a setXX method of this BinaryRecordData for
* in-place updates. If it is a variable-length field, this is not possible, because the
* underlying data is stored contiguously.
*/
public static boolean isInFixedLengthPart(DataType type) {
switch (type.getTypeRoot()) {
case BOOLEAN:
case TINYINT:
case SMALLINT:
case INTEGER:
case DATE:
case TIME_WITHOUT_TIME_ZONE:
case BIGINT:
case FLOAT:
case DOUBLE:
return true;
case DECIMAL:
return DecimalData.isCompact(((DecimalType) type).getPrecision());
case TIMESTAMP_WITHOUT_TIME_ZONE:
return TimestampData.isCompact(((TimestampType) type).getPrecision());
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
return LocalZonedTimestampData.isCompact(
((LocalZonedTimestampType) type).getPrecision());
case TIMESTAMP_WITH_TIME_ZONE:
return ZonedTimestampData.isCompact(((ZonedTimestampType) type).getPrecision());
default:
return false;
}
}
public static boolean isMutable(DataType type) {
return isInFixedLengthPart(type) || type.getTypeRoot() == DECIMAL;
}
private final int arity;
private final int nullBitsSizeInBytes;
public BinaryRecordData(int arity) {
checkArgument(arity >= 0);
this.arity = arity;
this.nullBitsSizeInBytes = calculateBitSetWidthInBytes(arity);
}
private int getFieldOffset(int pos) {
return offset + nullBitsSizeInBytes + pos * 8;
}
private void assertIndexIsValid(int index) {
assert index >= 0 : "index (" + index + ") should be >= 0";
assert index < arity : "index (" + index + ") should be < " + arity;
}
public int getFixedLengthPartSize() {
return nullBitsSizeInBytes + 8 * arity;
}
@Override
public int getArity() {
return arity;
}
@Override
public boolean isNullAt(int pos) {
assertIndexIsValid(pos);
return BinarySegmentUtils.bitGet(segments[0], offset, pos + HEADER_SIZE_IN_BITS);
}
@Override
public boolean getBoolean(int pos) {
assertIndexIsValid(pos);
return segments[0].getBoolean(getFieldOffset(pos));
}
@Override
public byte getByte(int pos) {
assertIndexIsValid(pos);
return segments[0].get(getFieldOffset(pos));
}
@Override
public short getShort(int pos) {
assertIndexIsValid(pos);
return segments[0].getShort(getFieldOffset(pos));
}
@Override
public int getInt(int pos) {
assertIndexIsValid(pos);
return segments[0].getInt(getFieldOffset(pos));
}
@Override
public long getLong(int pos) {
assertIndexIsValid(pos);
return segments[0].getLong(getFieldOffset(pos));
}
@Override
public float getFloat(int pos) {
assertIndexIsValid(pos);
return segments[0].getFloat(getFieldOffset(pos));
}
@Override
public double getDouble(int pos) {
assertIndexIsValid(pos);
return segments[0].getDouble(getFieldOffset(pos));
}
@Override
public StringData getString(int pos) {
assertIndexIsValid(pos);
int fieldOffset = getFieldOffset(pos);
final long offsetAndLen = segments[0].getLong(fieldOffset);
return BinarySegmentUtils.readStringData(segments, offset, fieldOffset, offsetAndLen);
}
@Override
public DecimalData getDecimal(int pos, int precision, int scale) {
assertIndexIsValid(pos);
if (DecimalData.isCompact(precision)) {
return DecimalData.fromUnscaledLong(
segments[0].getLong(getFieldOffset(pos)), precision, scale);
}
int fieldOffset = getFieldOffset(pos);
final long offsetAndSize = segments[0].getLong(fieldOffset);
return BinarySegmentUtils.readDecimalData(
segments, offset, offsetAndSize, precision, scale);
}
@Override
public TimestampData getTimestamp(int pos, int precision) {
assertIndexIsValid(pos);
if (TimestampData.isCompact(precision)) {
return TimestampData.fromEpochMillis(segments[0].getLong(getFieldOffset(pos)));
}
int fieldOffset = getFieldOffset(pos);
final long offsetAndNanoOfMilli = segments[0].getLong(fieldOffset);
return BinarySegmentUtils.readTimestampData(segments, offset, offsetAndNanoOfMilli);
}
@Override
public ZonedTimestampData getZonedTimestamp(int pos, int precision) {
assertIndexIsValid(pos);
// TIMESTAMP_DELIMITER ("|") is a regex metacharacter, so quote it before splitting.
String[] parts =
getString(pos).toString().split(java.util.regex.Pattern.quote(TIMESTAMP_DELIMITER));
return ZonedTimestampData.of(
Long.parseLong(parts[0]), Integer.parseInt(parts[1]), parts[2]);
}
@Override
public LocalZonedTimestampData getLocalZonedTimestampData(int pos, int precision) {
assertIndexIsValid(pos);
if (LocalZonedTimestampData.isCompact(precision)) {
return LocalZonedTimestampData.fromEpochMillis(
segments[0].getLong(getFieldOffset(pos)));
}
int fieldOffset = getFieldOffset(pos);
final long offsetAndNanoOfMilli = segments[0].getLong(fieldOffset);
return BinarySegmentUtils.readLocalZonedTimestampData(
segments, offset, offsetAndNanoOfMilli);
}
@Override
public byte[] getBinary(int pos) {
assertIndexIsValid(pos);
int fieldOffset = getFieldOffset(pos);
final long offsetAndLen = segments[0].getLong(fieldOffset);
return BinarySegmentUtils.readBinary(segments, offset, fieldOffset, offsetAndLen);
}
@Override
public ArrayData getArray(int pos) {
throw new UnsupportedOperationException("Not support ArrayData");
}
@Override
public MapData getMap(int pos) {
throw new UnsupportedOperationException("Not support MapData.");
}
@Override
public RecordData getRow(int pos, int numFields) {
assertIndexIsValid(pos);
return BinarySegmentUtils.readRecordData(segments, numFields, offset, getLong(pos));
}
/** The bit is 1 when the field is null. Default is 0. */
@Override
public boolean anyNull() {
// Skip the header.
if ((segments[0].getLong(0) & FIRST_BYTE_ZERO) != 0) {
return true;
}
for (int i = 8; i < nullBitsSizeInBytes; i += 8) {
if (segments[0].getLong(i) != 0) {
return true;
}
}
return false;
}
@Override
public boolean anyNull(int[] fields) {
for (int field : fields) {
if (isNullAt(field)) {
return true;
}
}
return false;
}
public BinaryRecordData copy() {
return copy(new BinaryRecordData(arity));
}
public BinaryRecordData copy(BinaryRecordData reuse) {
return copyInternal(reuse);
}
private BinaryRecordData copyInternal(BinaryRecordData reuse) {
byte[] bytes = BinarySegmentUtils.copyToBytes(segments, offset, sizeInBytes);
reuse.pointTo(MemorySegmentFactory.wrap(bytes), 0, sizeInBytes);
return reuse;
}
public void clear() {
segments = null;
offset = 0;
sizeInBytes = 0;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
// all records backed by a BinarySection share the same memory format
if (!(o instanceof BinaryRecordData)) {
return false;
}
final BinarySection that = (BinarySection) o;
return sizeInBytes == that.sizeInBytes
&& BinarySegmentUtils.equals(
segments, offset, that.segments, that.offset, sizeInBytes);
}
@Override
public int hashCode() {
return BinarySegmentUtils.hashByWords(segments, offset, sizeInBytes);
}
public void setTotalSize(int sizeInBytes) {
this.sizeInBytes = sizeInBytes;
}
}
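Note: a worked example of the layout formulas above, assuming a 10-field record:

int arity = 10;
// null bit set: 8-bit header + 10 null bits, rounded up to an 8-byte word
int nullBits = BinaryRecordData.calculateBitSetWidthInBytes(arity); // ((10 + 63 + 8) / 64) * 8 = 8
// fixed-length part: null bit set plus one 8-byte slot per field
int fixPart = BinaryRecordData.calculateFixPartSizeInBytes(arity); // 8 + 8 * 10 = 88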

@ -0,0 +1,82 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.common.data.binary;
import org.apache.flink.core.memory.MemorySegment;
import com.ververica.cdc.common.annotation.Internal;
import com.ververica.cdc.common.utils.Preconditions;
/** A basic implementation of {@link BinaryFormat} which describes a section of memory. */
@Internal
public class BinarySection implements BinaryFormat {
protected MemorySegment[] segments;
protected int offset;
protected int sizeInBytes;
public BinarySection() {}
public BinarySection(MemorySegment[] segments, int offset, int sizeInBytes) {
Preconditions.checkArgument(segments != null);
this.segments = segments;
this.offset = offset;
this.sizeInBytes = sizeInBytes;
}
public final void pointTo(MemorySegment segment, int offset, int sizeInBytes) {
pointTo(new MemorySegment[] {segment}, offset, sizeInBytes);
}
public void pointTo(MemorySegment[] segments, int offset, int sizeInBytes) {
Preconditions.checkArgument(segments != null);
this.segments = segments;
this.offset = offset;
this.sizeInBytes = sizeInBytes;
}
public MemorySegment[] getSegments() {
return segments;
}
public int getOffset() {
return offset;
}
public int getSizeInBytes() {
return sizeInBytes;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final BinarySection that = (BinarySection) o;
return sizeInBytes == that.sizeInBytes
&& BinarySegmentUtils.equals(
segments, offset, that.segments, that.offset, sizeInBytes);
}
@Override
public int hashCode() {
return BinarySegmentUtils.hash(segments, offset, sizeInBytes);
}
}
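Note: a minimal sketch of binding a BinarySection to memory, assuming a heap-backed segment:

// Wrap a heap byte array in a single MemorySegment and point the section at a sub-range.
byte[] bytes = new byte[64];
BinarySection section = new BinarySection();
section.pointTo(MemorySegmentFactory.wrap(bytes), 16, 32); // offset 16, 32 bytes long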

@ -0,0 +1,878 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.common.data.binary;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import com.ververica.cdc.common.annotation.Internal;
import com.ververica.cdc.common.data.StringData;
import com.ververica.cdc.common.utils.StringUtf8Utils;
import javax.annotation.Nonnull;
import java.util.Arrays;
import static com.ververica.cdc.common.data.binary.BinarySegmentUtils.getBytes;
import static com.ververica.cdc.common.utils.Preconditions.checkArgument;
/**
* A lazily binary implementation of {@link StringData} which is backed by {@link MemorySegment}s
* and {@link String}.
*
* <p>Either {@link MemorySegment}s or {@link String} must be provided when constructing {@link
* BinaryStringData}. The other representation will be materialized when needed.
*
* <p>It provides many useful methods for comparison, search, and so on.
*/
@Internal
public final class BinaryStringData extends LazyBinaryFormat<String> implements StringData {
public static final BinaryStringData EMPTY_UTF8 =
BinaryStringData.fromBytes(StringUtf8Utils.encodeUTF8(""));
public BinaryStringData() {}
public BinaryStringData(String javaObject) {
super(javaObject);
}
public BinaryStringData(MemorySegment[] segments, int offset, int sizeInBytes) {
super(segments, offset, sizeInBytes);
}
public BinaryStringData(
MemorySegment[] segments, int offset, int sizeInBytes, String javaObject) {
super(segments, offset, sizeInBytes, javaObject);
}
// ------------------------------------------------------------------------------------------
// Construction Utilities
// ------------------------------------------------------------------------------------------
/**
* Creates a {@link BinaryStringData} instance from the given address (base and offset) and
* length.
*/
public static BinaryStringData fromAddress(MemorySegment[] segments, int offset, int numBytes) {
return new BinaryStringData(segments, offset, numBytes);
}
/** Creates a {@link BinaryStringData} instance from the given Java string. */
public static BinaryStringData fromString(String str) {
if (str == null) {
return null;
} else {
return new BinaryStringData(str);
}
}
/** Creates a {@link BinaryStringData} instance from the given UTF-8 bytes. */
public static BinaryStringData fromBytes(byte[] bytes) {
return fromBytes(bytes, 0, bytes.length);
}
/**
* Creates a {@link BinaryStringData} instance from the given UTF-8 bytes with offset and number
* of bytes.
*/
public static BinaryStringData fromBytes(byte[] bytes, int offset, int numBytes) {
return new BinaryStringData(
new MemorySegment[] {MemorySegmentFactory.wrap(bytes)}, offset, numBytes);
}
/** Creates a {@link BinaryStringData} instance that contains `length` spaces. */
public static BinaryStringData blankString(int length) {
byte[] spaces = new byte[length];
Arrays.fill(spaces, (byte) ' ');
return fromBytes(spaces);
}
// ------------------------------------------------------------------------------------------
// Public Interfaces
// ------------------------------------------------------------------------------------------
@Override
public byte[] toBytes() {
ensureMaterialized();
return getBytes(binarySection.segments, binarySection.offset, binarySection.sizeInBytes);
}
@Override
public boolean equals(Object o) {
if (o instanceof BinaryStringData) {
BinaryStringData other = (BinaryStringData) o;
if (javaObject != null && other.javaObject != null) {
return javaObject.equals(other.javaObject);
}
ensureMaterialized();
other.ensureMaterialized();
return binarySection.equals(other.binarySection);
} else {
return false;
}
}
@Override
public int hashCode() {
ensureMaterialized();
return binarySection.hashCode();
}
@Override
public String toString() {
if (javaObject == null) {
byte[] bytes =
org.apache.flink.table.data.binary.BinarySegmentUtils.allocateReuseBytes(
binarySection.sizeInBytes);
org.apache.flink.table.data.binary.BinarySegmentUtils.copyToBytes(
binarySection.segments,
binarySection.offset,
bytes,
0,
binarySection.sizeInBytes);
javaObject = StringUtf8Utils.decodeUTF8(bytes, 0, binarySection.sizeInBytes);
}
return javaObject;
}
/**
* Compares two strings lexicographically. UTF-8 is designed so that byte-wise comparison of
* the encoded bytes yields the same ordering as comparing code points, so we simply compare
* the underlying binary representations.
*/
@Override
public int compareTo(@Nonnull StringData o) {
// BinaryStringData is the only implementation of StringData
BinaryStringData other = (BinaryStringData) o;
if (javaObject != null && other.javaObject != null) {
return javaObject.compareTo(other.javaObject);
}
ensureMaterialized();
other.ensureMaterialized();
if (binarySection.segments.length == 1 && other.binarySection.segments.length == 1) {
int len = Math.min(binarySection.sizeInBytes, other.binarySection.sizeInBytes);
MemorySegment seg1 = binarySection.segments[0];
MemorySegment seg2 = other.binarySection.segments[0];
for (int i = 0; i < len; i++) {
int res =
(seg1.get(binarySection.offset + i) & 0xFF)
- (seg2.get(other.binarySection.offset + i) & 0xFF);
if (res != 0) {
return res;
}
}
return binarySection.sizeInBytes - other.binarySection.sizeInBytes;
}
// if there are multi segments.
return compareMultiSegments(other);
}
/** Find the boundaries of the segments, then compare the bytes segment by segment. */
private int compareMultiSegments(BinaryStringData other) {
if (binarySection.sizeInBytes == 0 || other.binarySection.sizeInBytes == 0) {
return binarySection.sizeInBytes - other.binarySection.sizeInBytes;
}
int len = Math.min(binarySection.sizeInBytes, other.binarySection.sizeInBytes);
MemorySegment seg1 = binarySection.segments[0];
MemorySegment seg2 = other.binarySection.segments[0];
int segmentSize = binarySection.segments[0].size();
int otherSegmentSize = other.binarySection.segments[0].size();
int sizeOfFirst1 = segmentSize - binarySection.offset;
int sizeOfFirst2 = otherSegmentSize - other.binarySection.offset;
int varSegIndex1 = 1;
int varSegIndex2 = 1;
// find the first segment of this string.
while (sizeOfFirst1 <= 0) {
sizeOfFirst1 += segmentSize;
seg1 = binarySection.segments[varSegIndex1++];
}
while (sizeOfFirst2 <= 0) {
sizeOfFirst2 += otherSegmentSize;
seg2 = other.binarySection.segments[varSegIndex2++];
}
int offset1 = segmentSize - sizeOfFirst1;
int offset2 = otherSegmentSize - sizeOfFirst2;
int needCompare = Math.min(Math.min(sizeOfFirst1, sizeOfFirst2), len);
while (needCompare > 0) {
// compare in one segment.
for (int i = 0; i < needCompare; i++) {
int res = (seg1.get(offset1 + i) & 0xFF) - (seg2.get(offset2 + i) & 0xFF);
if (res != 0) {
return res;
}
}
if (needCompare == len) {
break;
}
len -= needCompare;
// next segment
if (sizeOfFirst1 < sizeOfFirst2) { // I am smaller
seg1 = binarySection.segments[varSegIndex1++];
offset1 = 0;
offset2 += needCompare;
sizeOfFirst1 = segmentSize;
sizeOfFirst2 -= needCompare;
} else if (sizeOfFirst1 > sizeOfFirst2) { // other is smaller
seg2 = other.binarySection.segments[varSegIndex2++];
offset2 = 0;
offset1 += needCompare;
sizeOfFirst2 = otherSegmentSize;
sizeOfFirst1 -= needCompare;
} else { // same, should go ahead both.
seg1 = binarySection.segments[varSegIndex1++];
seg2 = other.binarySection.segments[varSegIndex2++];
offset1 = 0;
offset2 = 0;
sizeOfFirst1 = segmentSize;
sizeOfFirst2 = otherSegmentSize;
}
needCompare = Math.min(Math.min(sizeOfFirst1, sizeOfFirst2), len);
}
checkArgument(needCompare == len);
return binarySection.sizeInBytes - other.binarySection.sizeInBytes;
}
// ------------------------------------------------------------------------------------------
// Public methods on BinaryStringData
// ------------------------------------------------------------------------------------------
/** Returns the number of UTF-8 code points in the string. */
public int numChars() {
ensureMaterialized();
if (inFirstSegment()) {
int len = 0;
for (int i = 0;
i < binarySection.sizeInBytes;
i += numBytesForFirstByte(getByteOneSegment(i))) {
len++;
}
return len;
} else {
return numCharsMultiSegs();
}
}
private int numCharsMultiSegs() {
int len = 0;
int segSize = binarySection.segments[0].size();
SegmentAndOffset index = firstSegmentAndOffset(segSize);
int i = 0;
while (i < binarySection.sizeInBytes) {
int charBytes = numBytesForFirstByte(index.value());
i += charBytes;
len++;
index.skipBytes(charBytes, segSize);
}
return len;
}
/**
* Returns the {@code byte} value at the specified index. An index ranges from {@code 0} to
* {@code binarySection.sizeInBytes - 1}.
*
* @param index the index of the {@code byte} value.
* @return the {@code byte} value at the specified index of this UTF-8 bytes.
* @exception IndexOutOfBoundsException if the {@code index} argument is negative or not less
* than the length of this UTF-8 bytes.
*/
public byte byteAt(int index) {
ensureMaterialized();
int globalOffset = binarySection.offset + index;
int size = binarySection.segments[0].size();
if (globalOffset < size) {
return binarySection.segments[0].get(globalOffset);
} else {
return binarySection.segments[globalOffset / size].get(globalOffset % size);
}
}
@Override
public MemorySegment[] getSegments() {
ensureMaterialized();
return super.getSegments();
}
@Override
public int getOffset() {
ensureMaterialized();
return super.getOffset();
}
@Override
public int getSizeInBytes() {
ensureMaterialized();
return super.getSizeInBytes();
}
public void ensureMaterialized() {
ensureMaterialized(null);
}
@Override
protected BinarySection materialize(TypeSerializer<String> serializer) {
if (serializer != null) {
throw new IllegalArgumentException(
"BinaryStringData does not support custom serializers");
}
byte[] bytes = StringUtf8Utils.encodeUTF8(javaObject);
return new BinarySection(
new MemorySegment[] {MemorySegmentFactory.wrap(bytes)}, 0, bytes.length);
}
/** Copies this string into a new {@code BinaryStringData}. */
public BinaryStringData copy() {
ensureMaterialized();
byte[] copy =
org.apache.flink.table.data.binary.BinarySegmentUtils.copyToBytes(
binarySection.segments, binarySection.offset, binarySection.sizeInBytes);
return new BinaryStringData(
new MemorySegment[] {MemorySegmentFactory.wrap(copy)},
0,
binarySection.sizeInBytes,
javaObject);
}
/**
* Returns a binary string that is a substring of this binary string. The substring begins at
* the specified {@code beginIndex} and extends to the character at index {@code endIndex - 1}.
*
* <p>Examples:
*
* <blockquote>
*
* <pre>
* fromString("hamburger").substring(4, 8) returns binary string "urge"
* fromString("smiles").substring(1, 5) returns binary string "mile"
* </pre>
*
* </blockquote>
*
* @param beginIndex the beginning index, inclusive.
* @param endIndex the ending index, exclusive.
* @return the specified substring; returns EMPTY_UTF8 when an index is out of bounds instead
* of throwing StringIndexOutOfBoundsException.
*/
public BinaryStringData substring(int beginIndex, int endIndex) {
ensureMaterialized();
if (endIndex <= beginIndex || beginIndex >= binarySection.sizeInBytes) {
return EMPTY_UTF8;
}
if (inFirstSegment()) {
MemorySegment segment = binarySection.segments[0];
int i = 0;
int c = 0;
while (i < binarySection.sizeInBytes && c < beginIndex) {
i += numBytesForFirstByte(segment.get(i + binarySection.offset));
c += 1;
}
int j = i;
while (i < binarySection.sizeInBytes && c < endIndex) {
i += numBytesForFirstByte(segment.get(i + binarySection.offset));
c += 1;
}
if (i > j) {
byte[] bytes = new byte[i - j];
segment.get(binarySection.offset + j, bytes, 0, i - j);
return fromBytes(bytes);
} else {
return EMPTY_UTF8;
}
} else {
return substringMultiSegs(beginIndex, endIndex);
}
}
private BinaryStringData substringMultiSegs(final int start, final int until) {
int segSize = binarySection.segments[0].size();
SegmentAndOffset index = firstSegmentAndOffset(segSize);
int i = 0;
int c = 0;
while (i < binarySection.sizeInBytes && c < start) {
int charSize = numBytesForFirstByte(index.value());
i += charSize;
index.skipBytes(charSize, segSize);
c += 1;
}
int j = i;
while (i < binarySection.sizeInBytes && c < until) {
int charSize = numBytesForFirstByte(index.value());
i += charSize;
index.skipBytes(charSize, segSize);
c += 1;
}
if (i > j) {
return fromBytes(
org.apache.flink.table.data.binary.BinarySegmentUtils.copyToBytes(
binarySection.segments, binarySection.offset + j, i - j));
} else {
return EMPTY_UTF8;
}
}
/**
* Returns true if and only if this BinaryStringData contains the specified sequence of byte
* values.
*
* @param s the sequence to search for
* @return true if this BinaryStringData contains {@code s}, false otherwise
*/
public boolean contains(final BinaryStringData s) {
ensureMaterialized();
s.ensureMaterialized();
if (s.binarySection.sizeInBytes == 0) {
return true;
}
int find =
org.apache.flink.table.data.binary.BinarySegmentUtils.find(
binarySection.segments,
binarySection.offset,
binarySection.sizeInBytes,
s.binarySection.segments,
s.binarySection.offset,
s.binarySection.sizeInBytes);
return find != -1;
}
/**
* Tests if this BinaryStringData starts with the specified prefix.
*
* @param prefix the prefix.
* @return {@code true} if the bytes represented by the argument are a prefix of the bytes
* represented by this string; {@code false} otherwise. Note also that {@code true} will be
* returned if the argument is an empty BinaryStringData or is equal to this {@code
* BinaryStringData} object as determined by the {@link #equals(Object)} method.
*/
public boolean startsWith(final BinaryStringData prefix) {
ensureMaterialized();
prefix.ensureMaterialized();
return matchAt(prefix, 0);
}
/**
* Tests if this BinaryStringData ends with the specified suffix.
*
* @param suffix the suffix.
* @return {@code true} if the bytes represented by the argument are a suffix of the bytes
* represented by this object; {@code false} otherwise. Note that the result will be {@code
* true} if the argument is the empty string or is equal to this {@code BinaryStringData}
* object as determined by the {@link #equals(Object)} method.
*/
public boolean endsWith(final BinaryStringData suffix) {
ensureMaterialized();
suffix.ensureMaterialized();
return matchAt(suffix, binarySection.sizeInBytes - suffix.binarySection.sizeInBytes);
}
/**
* Returns a string whose value is this string, with any leading and trailing whitespace
* removed.
*
* @return A string whose value is this string, with any leading and trailing white space
* removed, or this string if it has no leading or trailing white space.
*/
public BinaryStringData trim() {
ensureMaterialized();
if (inFirstSegment()) {
int s = 0;
int e = this.binarySection.sizeInBytes - 1;
// skip all spaces (0x20) on the left side
while (s < this.binarySection.sizeInBytes && getByteOneSegment(s) == 0x20) {
s++;
}
// skip all spaces (0x20) on the right side
while (e >= s && getByteOneSegment(e) == 0x20) {
e--;
}
if (s > e) {
// empty string
return EMPTY_UTF8;
} else {
return copyBinaryStringInOneSeg(s, e - s + 1);
}
} else {
return trimMultiSegs();
}
}
private BinaryStringData trimMultiSegs() {
int s = 0;
int e = this.binarySection.sizeInBytes - 1;
int segSize = binarySection.segments[0].size();
SegmentAndOffset front = firstSegmentAndOffset(segSize);
// skip all spaces (0x20) on the left side
while (s < this.binarySection.sizeInBytes && front.value() == 0x20) {
s++;
front.nextByte(segSize);
}
SegmentAndOffset behind = lastSegmentAndOffset(segSize);
// skip all spaces (0x20) on the right side
while (e >= s && behind.value() == 0x20) {
e--;
behind.previousByte(segSize);
}
if (s > e) {
// empty string
return EMPTY_UTF8;
} else {
return copyBinaryString(s, e);
}
}
/**
* Returns the index within this string of the first occurrence of the specified substring,
* starting at the specified index.
*
* @param str the substring to search for.
* @param fromIndex the index from which to start the search.
* @return the index of the first occurrence of the specified substring, starting at the
* specified index, or {@code -1} if there is no such occurrence.
*/
public int indexOf(BinaryStringData str, int fromIndex) {
ensureMaterialized();
str.ensureMaterialized();
if (str.binarySection.sizeInBytes == 0) {
return 0;
}
if (inFirstSegment()) {
// position in bytes
int byteIdx = 0;
// position in chars
int charIdx = 0;
while (byteIdx < binarySection.sizeInBytes && charIdx < fromIndex) {
byteIdx += numBytesForFirstByte(getByteOneSegment(byteIdx));
charIdx++;
}
do {
if (byteIdx + str.binarySection.sizeInBytes > binarySection.sizeInBytes) {
return -1;
}
if (org.apache.flink.table.data.binary.BinarySegmentUtils.equals(
binarySection.segments,
binarySection.offset + byteIdx,
str.binarySection.segments,
str.binarySection.offset,
str.binarySection.sizeInBytes)) {
return charIdx;
}
byteIdx += numBytesForFirstByte(getByteOneSegment(byteIdx));
charIdx++;
} while (byteIdx < binarySection.sizeInBytes);
return -1;
} else {
return indexOfMultiSegs(str, fromIndex);
}
}
private int indexOfMultiSegs(BinaryStringData str, int fromIndex) {
// position in bytes
int byteIdx = 0;
// position in chars
int charIdx = 0;
int segSize = binarySection.segments[0].size();
SegmentAndOffset index = firstSegmentAndOffset(segSize);
while (byteIdx < binarySection.sizeInBytes && charIdx < fromIndex) {
int charBytes = numBytesForFirstByte(index.value());
byteIdx += charBytes;
charIdx++;
index.skipBytes(charBytes, segSize);
}
do {
if (byteIdx + str.binarySection.sizeInBytes > binarySection.sizeInBytes) {
return -1;
}
if (org.apache.flink.table.data.binary.BinarySegmentUtils.equals(
binarySection.segments,
binarySection.offset + byteIdx,
str.binarySection.segments,
str.binarySection.offset,
str.binarySection.sizeInBytes)) {
return charIdx;
}
int charBytes = numBytesForFirstByte(index.segment.get(index.offset));
byteIdx += charBytes;
charIdx++;
index.skipBytes(charBytes, segSize);
} while (byteIdx < binarySection.sizeInBytes);
return -1;
}
/**
* Converts all of the characters in this {@code BinaryStringData} to upper case.
*
* @return the {@code BinaryStringData}, converted to uppercase.
*/
public BinaryStringData toUpperCase() {
if (javaObject != null) {
return javaToUpperCase();
}
if (binarySection.sizeInBytes == 0) {
return EMPTY_UTF8;
}
int size = binarySection.segments[0].size();
SegmentAndOffset segmentAndOffset = startSegmentAndOffset(size);
byte[] bytes = new byte[binarySection.sizeInBytes];
bytes[0] = (byte) Character.toTitleCase(segmentAndOffset.value());
for (int i = 0; i < binarySection.sizeInBytes; i++) {
byte b = segmentAndOffset.value();
if (numBytesForFirstByte(b) != 1) {
// fallback
return javaToUpperCase();
}
int upper = Character.toUpperCase((int) b);
if (upper > 127) {
// fallback
return javaToUpperCase();
}
bytes[i] = (byte) upper;
segmentAndOffset.nextByte(size);
}
return fromBytes(bytes);
}
private BinaryStringData javaToUpperCase() {
return fromString(toString().toUpperCase());
}
/**
* Converts all of the characters in this {@code BinaryStringData} to lower case.
*
* @return the {@code BinaryStringData}, converted to lowercase.
*/
public BinaryStringData toLowerCase() {
if (javaObject != null) {
return javaToLowerCase();
}
if (binarySection.sizeInBytes == 0) {
return EMPTY_UTF8;
}
int size = binarySection.segments[0].size();
SegmentAndOffset segmentAndOffset = startSegmentAndOffset(size);
byte[] bytes = new byte[binarySection.sizeInBytes];
bytes[0] = (byte) Character.toTitleCase(segmentAndOffset.value());
for (int i = 0; i < binarySection.sizeInBytes; i++) {
byte b = segmentAndOffset.value();
if (numBytesForFirstByte(b) != 1) {
// fallback
return javaToLowerCase();
}
int lower = Character.toLowerCase((int) b);
if (lower > 127) {
// fallback
return javaToLowerCase();
}
bytes[i] = (byte) lower;
segmentAndOffset.nextByte(size);
}
return fromBytes(bytes);
}
private BinaryStringData javaToLowerCase() {
return fromString(toString().toLowerCase());
}
// ------------------------------------------------------------------------------------------
// Internal methods on BinaryStringData
// ------------------------------------------------------------------------------------------
byte getByteOneSegment(int i) {
return binarySection.segments[0].get(binarySection.offset + i);
}
boolean inFirstSegment() {
return binarySection.sizeInBytes + binarySection.offset <= binarySection.segments[0].size();
}
private boolean matchAt(final BinaryStringData s, int pos) {
return (inFirstSegment() && s.inFirstSegment())
? matchAtOneSeg(s, pos)
: matchAtVarSeg(s, pos);
}
private boolean matchAtOneSeg(final BinaryStringData s, int pos) {
return s.binarySection.sizeInBytes + pos <= binarySection.sizeInBytes
&& pos >= 0
&& binarySection.segments[0].equalTo(
s.binarySection.segments[0],
binarySection.offset + pos,
s.binarySection.offset,
s.binarySection.sizeInBytes);
}
private boolean matchAtVarSeg(final BinaryStringData s, int pos) {
return s.binarySection.sizeInBytes + pos <= binarySection.sizeInBytes
&& pos >= 0
&& org.apache.flink.table.data.binary.BinarySegmentUtils.equals(
binarySection.segments,
binarySection.offset + pos,
s.binarySection.segments,
s.binarySection.offset,
s.binarySection.sizeInBytes);
}
BinaryStringData copyBinaryStringInOneSeg(int start, int len) {
byte[] newBytes = new byte[len];
binarySection.segments[0].get(binarySection.offset + start, newBytes, 0, len);
return fromBytes(newBytes);
}
BinaryStringData copyBinaryString(int start, int end) {
int len = end - start + 1;
byte[] newBytes = new byte[len];
BinarySegmentUtils.copyToBytes(
binarySection.segments, binarySection.offset + start, newBytes, 0, len);
return fromBytes(newBytes);
}
SegmentAndOffset firstSegmentAndOffset(int segSize) {
int segIndex = binarySection.offset / segSize;
return new SegmentAndOffset(segIndex, binarySection.offset % segSize);
}
SegmentAndOffset lastSegmentAndOffset(int segSize) {
int lastOffset = binarySection.offset + binarySection.sizeInBytes - 1;
int segIndex = lastOffset / segSize;
return new SegmentAndOffset(segIndex, lastOffset % segSize);
}
private SegmentAndOffset startSegmentAndOffset(int segSize) {
return inFirstSegment()
? new SegmentAndOffset(0, binarySection.offset)
: firstSegmentAndOffset(segSize);
}
/** CurrentSegment and positionInSegment. */
class SegmentAndOffset {
int segIndex;
MemorySegment segment;
int offset;
private SegmentAndOffset(int segIndex, int offset) {
this.segIndex = segIndex;
this.segment = binarySection.segments[segIndex];
this.offset = offset;
}
private void assignSegment() {
segment =
segIndex >= 0 && segIndex < binarySection.segments.length
? binarySection.segments[segIndex]
: null;
}
void previousByte(int segSize) {
offset--;
if (offset == -1) {
segIndex--;
assignSegment();
offset = segSize - 1;
}
}
void nextByte(int segSize) {
offset++;
checkAdvance(segSize);
}
private void checkAdvance(int segSize) {
if (offset == segSize) {
advance();
}
}
private void advance() {
segIndex++;
assignSegment();
offset = 0;
}
void skipBytes(int n, int segSize) {
int remaining = segSize - this.offset;
if (remaining > n) {
this.offset += n;
} else {
while (true) {
int toSkip = Math.min(remaining, n);
n -= toSkip;
if (n <= 0) {
this.offset += toSkip;
checkAdvance(segSize);
return;
}
advance();
remaining = segSize - this.offset;
}
}
}
byte value() {
return this.segment.get(this.offset);
}
}
/**
* Returns the number of bytes for a code point with the first byte as `b`.
*
* @param b The first byte of a code point
*/
static int numBytesForFirstByte(final byte b) {
if (b >= 0) {
// 1 byte, 7 bits: 0xxxxxxx
return 1;
} else if ((b >> 5) == -2 && (b & 0x1e) != 0) {
// 2 bytes, 11 bits: 110xxxxx 10xxxxxx
return 2;
} else if ((b >> 4) == -2) {
// 3 bytes, 16 bits: 1110xxxx 10xxxxxx 10xxxxxx
return 3;
} else if ((b >> 3) == -2) {
// 4 bytes, 21 bits: 11110xxx 10xxxxxx 10xxxxxx 10xxxxxx
return 4;
} else {
// Skip the first byte disallowed in UTF-8.
// Handle errors quietly, same semantics as java.lang.String.
return 1;
}
}
}
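Note: a short usage sketch of the public methods above; expected values follow the javadoc examples:

BinaryStringData s = BinaryStringData.fromString("hamburger");
BinaryStringData urge = s.substring(4, 8); // "urge"
boolean hasBurg = s.contains(BinaryStringData.fromString("burg")); // true
boolean startsHam = s.startsWith(BinaryStringData.fromString("ham")); // true
int firstU = s.indexOf(BinaryStringData.fromString("u"), 0); // 4
int chars = s.numChars(); // 9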

@ -0,0 +1,136 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.common.data.binary;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.MemorySegment;
import com.ververica.cdc.common.annotation.Internal;
import java.io.IOException;
/**
* An abstract implementation of {@link BinaryFormat} which is lazily serialized into binary
* format or lazily deserialized into a Java object.
*
* <p>We introduce this data structure to avoid redundant (de)serialization in nested function
* calls. Consider the following function call chain:
*
* <pre>UDF0(input) -> UDF1(result0) -> UDF2(result1) -> UDF3(result2)</pre>
*
* <p>In such nested calls, if the UDFs return values in Java object format, the chain results
* in repeated conversions between Java objects and the binary format:
*
* <pre>
* converterToBinary(UDF0(converterToJavaObject(input))) ->
* converterToBinary(UDF1(converterToJavaObject(result0))) ->
* converterToBinary(UDF2(converterToJavaObject(result1))) ->
* ...
* </pre>
*
* <p>So we introduce {@link LazyBinaryFormat} to avoid this redundant cost. It has three forms:
*
* <ul>
* <li>Binary form
* <li>Java object form
* <li>Binary and Java object both exist
* </ul>
*
* <p>It delays conversions as long as possible: a value is converted into the required form
* only when that form is actually needed.
*/
@Internal
public abstract class LazyBinaryFormat<T> implements BinaryFormat {
T javaObject;
BinarySection binarySection;
public LazyBinaryFormat() {
this(null, null);
}
public LazyBinaryFormat(MemorySegment[] segments, int offset, int sizeInBytes, T javaObject) {
this(javaObject, new BinarySection(segments, offset, sizeInBytes));
}
public LazyBinaryFormat(MemorySegment[] segments, int offset, int sizeInBytes) {
this(null, new BinarySection(segments, offset, sizeInBytes));
}
public LazyBinaryFormat(T javaObject) {
this(javaObject, null);
}
public LazyBinaryFormat(T javaObject, BinarySection binarySection) {
this.javaObject = javaObject;
this.binarySection = binarySection;
}
public T getJavaObject() {
return javaObject;
}
public BinarySection getBinarySection() {
return binarySection;
}
/** Must be public as it is used during code generation. */
public void setJavaObject(T javaObject) {
this.javaObject = javaObject;
}
@Override
public MemorySegment[] getSegments() {
if (binarySection == null) {
throw new IllegalStateException("Lazy Binary Format was not materialized");
}
return binarySection.segments;
}
@Override
public int getOffset() {
if (binarySection == null) {
throw new IllegalStateException("Lazy Binary Format was not materialized");
}
return binarySection.offset;
}
@Override
public int getSizeInBytes() {
if (binarySection == null) {
throw new IllegalStateException("Lazy Binary Format was not materialized");
}
return binarySection.sizeInBytes;
}
/** Ensure we have materialized binary format. */
public final void ensureMaterialized(TypeSerializer<T> serializer) {
if (binarySection == null) {
try {
this.binarySection = materialize(serializer);
} catch (IOException e) {
throw new IllegalStateException(e);
}
}
}
/**
* Materializes the Java object into binary format. Subclasses need to hold whatever
* information they need for the conversion.
*/
protected abstract BinarySection materialize(TypeSerializer<T> serializer) throws IOException;
}
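Note: the three forms can be illustrated with the BinaryStringData subclass added earlier in this commit:

// Java-object form only: no binary section yet.
BinaryStringData s = BinaryStringData.fromString("abc");
// getSegments() materializes on demand (encodes UTF-8 and wraps it in a segment);
// afterwards the Java object and the binary form coexist.
MemorySegment[] segments = s.getSegments();
String back = s.toString(); // served from the cached Java object, no decoding needed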

@ -0,0 +1,179 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.common.data.binary;
import org.apache.flink.core.memory.MemorySegment;
import com.ververica.cdc.common.annotation.Internal;
import static org.apache.flink.core.memory.MemoryUtils.UNSAFE;
/** Murmur Hash. This is inspired by Guava's Murmur3_32HashFunction. */
@Internal
final class MurmurHashUtils {
private static final int C1 = 0xcc9e2d51;
private static final int C2 = 0x1b873593;
public static final int DEFAULT_SEED = 42;
private MurmurHashUtils() {
// do not instantiate
}
/**
* Hash unsafe bytes, length must be aligned to 4 bytes.
*
* @param base base unsafe object
* @param offset offset for unsafe object
* @param lengthInBytes length in bytes
* @return hash code
*/
public static int hashUnsafeBytesByWords(Object base, long offset, int lengthInBytes) {
return hashUnsafeBytesByWords(base, offset, lengthInBytes, DEFAULT_SEED);
}
/**
* Hash unsafe bytes.
*
* @param base base unsafe object
* @param offset offset for unsafe object
* @param lengthInBytes length in bytes
* @return hash code
*/
public static int hashUnsafeBytes(Object base, long offset, int lengthInBytes) {
return hashUnsafeBytes(base, offset, lengthInBytes, DEFAULT_SEED);
}
/**
* Hash bytes in MemorySegment, length must be aligned to 4 bytes.
*
* @param segment segment.
* @param offset offset for MemorySegment
* @param lengthInBytes length in bytes
* @return hash code
*/
public static int hashBytesByWords(MemorySegment segment, int offset, int lengthInBytes) {
return hashBytesByWords(segment, offset, lengthInBytes, DEFAULT_SEED);
}
/**
* Hash bytes in MemorySegment.
*
* @param segment segment.
* @param offset offset for MemorySegment
* @param lengthInBytes length in bytes
* @return hash code
*/
public static int hashBytes(MemorySegment segment, int offset, int lengthInBytes) {
return hashBytes(segment, offset, lengthInBytes, DEFAULT_SEED);
}
private static int hashUnsafeBytesByWords(
Object base, long offset, int lengthInBytes, int seed) {
int h1 = hashUnsafeBytesByInt(base, offset, lengthInBytes, seed);
return fmix(h1, lengthInBytes);
}
private static int hashBytesByWords(
MemorySegment segment, int offset, int lengthInBytes, int seed) {
int h1 = hashBytesByInt(segment, offset, lengthInBytes, seed);
return fmix(h1, lengthInBytes);
}
private static int hashBytes(MemorySegment segment, int offset, int lengthInBytes, int seed) {
int lengthAligned = lengthInBytes - lengthInBytes % 4;
int h1 = hashBytesByInt(segment, offset, lengthAligned, seed);
for (int i = lengthAligned; i < lengthInBytes; i++) {
int k1 = mixK1(segment.get(offset + i));
h1 = mixH1(h1, k1);
}
return fmix(h1, lengthInBytes);
}
private static int hashUnsafeBytes(Object base, long offset, int lengthInBytes, int seed) {
assert (lengthInBytes >= 0) : "lengthInBytes cannot be negative";
int lengthAligned = lengthInBytes - lengthInBytes % 4;
int h1 = hashUnsafeBytesByInt(base, offset, lengthAligned, seed);
for (int i = lengthAligned; i < lengthInBytes; i++) {
int halfWord = UNSAFE.getByte(base, offset + i);
int k1 = mixK1(halfWord);
h1 = mixH1(h1, k1);
}
return fmix(h1, lengthInBytes);
}
private static int hashUnsafeBytesByInt(Object base, long offset, int lengthInBytes, int seed) {
assert (lengthInBytes % 4 == 0);
int h1 = seed;
for (int i = 0; i < lengthInBytes; i += 4) {
int halfWord = UNSAFE.getInt(base, offset + i);
int k1 = mixK1(halfWord);
h1 = mixH1(h1, k1);
}
return h1;
}
private static int hashBytesByInt(
MemorySegment segment, int offset, int lengthInBytes, int seed) {
assert (lengthInBytes % 4 == 0);
int h1 = seed;
for (int i = 0; i < lengthInBytes; i += 4) {
int halfWord = segment.getInt(offset + i);
int k1 = mixK1(halfWord);
h1 = mixH1(h1, k1);
}
return h1;
}
private static int mixK1(int k1) {
k1 *= C1;
k1 = Integer.rotateLeft(k1, 15);
k1 *= C2;
return k1;
}
private static int mixH1(int h1, int k1) {
h1 ^= k1;
h1 = Integer.rotateLeft(h1, 13);
h1 = h1 * 5 + 0xe6546b64;
return h1;
}
// Finalization mix - force all bits of a hash block to avalanche
private static int fmix(int h1, int length) {
h1 ^= length;
return fmix(h1);
}
public static int fmix(int h) {
h ^= h >>> 16;
h *= 0x85ebca6b;
h ^= h >>> 13;
h *= 0xc2b2ae35;
h ^= h >>> 16;
return h;
}
public static long fmix(long h) {
h ^= (h >>> 33);
h *= 0xff51afd7ed558ccdL;
h ^= (h >>> 33);
h *= 0xc4ceb9fe1a85ec53L;
h ^= (h >>> 33);
return h;
}
}
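Note: a small sketch of the two entry points (callable only from this package, since the class is package-private); the word-aligned variant assumes length % 4 == 0:

byte[] data = "12345678".getBytes(java.nio.charset.StandardCharsets.UTF_8); // 8 bytes, 4-byte aligned
MemorySegment seg = MemorySegmentFactory.wrap(data);
int hashAligned = MurmurHashUtils.hashBytesByWords(seg, 0, data.length); // fast path for aligned input
int hashAny = MurmurHashUtils.hashBytes(seg, 0, data.length); // also handles trailing bytes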

@ -0,0 +1,33 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.common.data.binary;
import com.ververica.cdc.common.annotation.Internal;
/** Provides null related getters. */
@Internal
public interface NullAwareGetters {
/** Returns true if any field is null, false otherwise. */
boolean anyNull();
/**
* For the given field positions, returns true if any of those fields is null, false
* otherwise.
*/
boolean anyNull(int[] fields);
}
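Note: a typical call site, sketched with a hypothetical helper (BinaryRecordData above implements this interface):

// Hypothetical helper: detect records whose key columns contain NULLs before hashing or joining.
static boolean hasNullKey(NullAwareGetters record, int[] keyFields) {
return record.anyNull(keyFields);
}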

@ -51,6 +51,20 @@ under the License.
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>
</dependencies>
</project>

@ -27,12 +27,16 @@ import org.apache.flink.core.memory.DataOutputView;
import com.ververica.cdc.common.data.GenericRecordData;
import com.ververica.cdc.common.data.RecordData;
import com.ververica.cdc.common.data.binary.BinaryRecordData;
import com.ververica.cdc.common.types.DataType;
import com.ververica.cdc.common.types.RowType;
import com.ververica.cdc.common.utils.InstantiationUtil;
import com.ververica.cdc.runtime.serializer.InternalSerializers;
import com.ververica.cdc.runtime.serializer.NestedSerializersSnapshotDelegate;
import com.ververica.cdc.runtime.serializer.NullableSerializerWrapper;
import com.ververica.cdc.runtime.serializer.data.binary.BinaryRecordDataSerializer;
import com.ververica.cdc.runtime.serializer.data.writer.BinaryRecordDataWriter;
import com.ververica.cdc.runtime.serializer.data.writer.BinaryWriter;
import com.ververica.cdc.runtime.serializer.schema.DataTypeSerializer;
import java.io.IOException;
@ -47,6 +51,10 @@ public class RecordDataSerializer extends TypeSerializer<RecordData> {
private final DataType[] types;
private final TypeSerializer[] fieldSerializers;
private final RecordData.FieldGetter[] fieldGetters;
private final BinaryRecordDataSerializer binarySerializer = BinaryRecordDataSerializer.INSTANCE;
private transient BinaryRecordData reuseRow;
private transient BinaryRecordDataWriter reuseWriter;
private final DataTypeSerializer dataTypeSerializer = new DataTypeSerializer();
@ -96,19 +104,30 @@ public class RecordDataSerializer extends TypeSerializer<RecordData> {
@Override
public void serialize(RecordData recordData, DataOutputView target) throws IOException {
if (recordData instanceof BinaryRecordData) {
target.writeBoolean(true);
binarySerializer.serialize((BinaryRecordData) recordData, target);
} else {
target.writeBoolean(false);
target.writeInt(types.length);
for (int i = 0; i < types.length; i++) {
fieldSerializers[i].serialize(fieldGetters[i].getFieldOrNull(recordData), target);
}
}
}
@Override
public RecordData deserialize(DataInputView source) throws IOException {
boolean isBinary = source.readBoolean();
if (isBinary) {
return binarySerializer.deserialize(source);
} else {
Object[] fields = new Object[source.readInt()];
for (int i = 0; i < fields.length; i++) {
fields[i] = fieldSerializers[i].deserialize(source);
}
return GenericRecordData.of(fields);
}
}
@Override
@ -195,6 +214,32 @@ public class RecordDataSerializer extends TypeSerializer<RecordData> {
return new RecordDataSerializerSnapshot(types, fieldSerializers);
}
/** Converts a {@link RecordData} into a {@link BinaryRecordData}. */
public BinaryRecordData toBinaryRecordData(RecordData row) {
if (row instanceof BinaryRecordData) {
return (BinaryRecordData) row;
}
if (reuseRow == null) {
reuseRow = new BinaryRecordData(types.length);
reuseWriter = new BinaryRecordDataWriter(reuseRow);
}
reuseWriter.reset();
for (int i = 0; i < types.length; i++) {
if (row.isNullAt(i)) {
reuseWriter.setNullAt(i);
} else {
BinaryWriter.write(
reuseWriter,
i,
fieldGetters[i].getFieldOrNull(row),
types[i],
fieldSerializers[i]);
}
}
reuseWriter.complete();
return reuseRow;
}
/** {@link TypeSerializerSnapshot} for {@link RecordDataSerializer}. */
public static final class RecordDataSerializerSnapshot
implements TypeSerializerSnapshot<RecordData> {
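Note: a hedged round-trip sketch of the new flag-byte protocol; the roundTrip helper is illustrative, while DataOutputSerializer and DataInputDeserializer are Flink's in-memory I/O views:

static RecordData roundTrip(RecordDataSerializer serializer, RecordData record) throws IOException {
// serialize() writes 'true' + binary payload for BinaryRecordData,
// or 'false' + per-field values for any other RecordData.
DataOutputSerializer out = new DataOutputSerializer(64);
serializer.serialize(record, out);
DataInputDeserializer in = new DataInputDeserializer(out.getCopyOfBuffer());
return serializer.deserialize(in);
}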

@ -0,0 +1,160 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.runtime.serializer.data.binary;
import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.core.memory.MemorySegmentWritable;
import com.ververica.cdc.common.annotation.Internal;
import com.ververica.cdc.common.data.binary.BinaryRecordData;
import com.ververica.cdc.common.data.binary.BinarySegmentUtils;
import com.ververica.cdc.runtime.serializer.TypeSerializerSingleton;
import java.io.IOException;
import static com.ververica.cdc.common.utils.Preconditions.checkArgument;
/** Serializer for {@link BinaryRecordData}. */
@Internal
public class BinaryRecordDataSerializer extends TypeSerializerSingleton<BinaryRecordData> {
private static final long serialVersionUID = 1L;
public static final BinaryRecordDataSerializer INSTANCE = new BinaryRecordDataSerializer();
@Override
public boolean isImmutableType() {
return false;
}
@Override
public BinaryRecordData createInstance() {
return new BinaryRecordData(1);
}
@Override
public BinaryRecordData copy(BinaryRecordData from) {
return copy(from, new BinaryRecordData(from.getArity()));
}
@Override
public BinaryRecordData copy(BinaryRecordData from, BinaryRecordData reuse) {
return from.copy(reuse);
}
@Override
public int getLength() {
return -1;
}
@Override
public void serialize(BinaryRecordData record, DataOutputView target) throws IOException {
target.writeInt(record.getArity());
target.writeInt(record.getSizeInBytes());
if (target instanceof MemorySegmentWritable) {
serializeWithoutLength(record, (MemorySegmentWritable) target);
} else {
BinarySegmentUtils.copyToView(
record.getSegments(), record.getOffset(), record.getSizeInBytes(), target);
}
}
@Override
public BinaryRecordData deserialize(DataInputView source) throws IOException {
BinaryRecordData row = new BinaryRecordData(source.readInt());
int length = source.readInt();
byte[] bytes = new byte[length];
source.readFully(bytes);
row.pointTo(MemorySegmentFactory.wrap(bytes), 0, length);
return row;
}
@Override
public BinaryRecordData deserialize(BinaryRecordData reuse, DataInputView source)
throws IOException {
MemorySegment[] segments = reuse.getSegments();
checkArgument(
segments == null || (segments.length == 1 && reuse.getOffset() == 0),
"Reuse BinaryRecordData should have no segments or only one segment and offset start at 0.");
int arity = source.readInt();
int length = source.readInt();
if (segments == null || segments[0].size() < length) {
segments = new MemorySegment[] {MemorySegmentFactory.wrap(new byte[length])};
}
source.readFully(segments[0].getArray(), 0, length);
reuse.pointTo(segments, 0, length);
return reuse;
}
// ============================ Page related operations ===================================
private static void serializeWithoutLength(
BinaryRecordData record, MemorySegmentWritable writable) throws IOException {
if (record.getSegments().length == 1) {
writable.write(record.getSegments()[0], record.getOffset(), record.getSizeInBytes());
} else {
serializeWithoutLengthSlow(record, writable);
}
}
public static void serializeWithoutLengthSlow(
BinaryRecordData record, MemorySegmentWritable out) throws IOException {
int remainSize = record.getSizeInBytes();
int posInSegOfRecord = record.getOffset();
int segmentSize = record.getSegments()[0].size();
for (MemorySegment segOfRecord : record.getSegments()) {
int nWrite = Math.min(segmentSize - posInSegOfRecord, remainSize);
assert nWrite > 0;
out.write(segOfRecord, posInSegOfRecord, nWrite);
// next new segment.
posInSegOfRecord = 0;
remainSize -= nWrite;
if (remainSize == 0) {
break;
}
}
checkArgument(remainSize == 0);
}
@Override
public void copy(DataInputView source, DataOutputView target) throws IOException {
int arity = source.readInt();
int length = source.readInt();
target.writeInt(arity);
target.writeInt(length);
target.write(source, length);
}
@Override
public TypeSerializerSnapshot<BinaryRecordData> snapshotConfiguration() {
return new BinaryRecordDataSerializerSnapshot();
}
/** {@link TypeSerializerSnapshot} for {@link BinaryRecordDataSerializer}. */
public static final class BinaryRecordDataSerializerSnapshot
extends SimpleTypeSerializerSnapshot<BinaryRecordData> {
public BinaryRecordDataSerializerSnapshot() {
super(() -> INSTANCE);
}
}
}
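As a usage illustration of the arity + size + bytes wire format above, a minimal round-trip sketch. DataOutputSerializer and DataInputDeserializer are Flink's in-memory views; BinaryRecordDataWriter is added later in this commit, and BinaryStringData.fromString is assumed to mirror the fromString factory of the other data classes:
// Hedged sketch, not part of the commit.
BinaryRecordData record = new BinaryRecordData(2);
BinaryRecordDataWriter writer = new BinaryRecordDataWriter(record);
writer.reset();
writer.writeLong(0, 42L);
writer.writeString(1, BinaryStringData.fromString("hello"));
writer.complete();
DataOutputSerializer out = new DataOutputSerializer(64);
BinaryRecordDataSerializer.INSTANCE.serialize(record, out); // writes arity, size, raw bytes
DataInputDeserializer in = new DataInputDeserializer(out.getCopyOfBuffer());
BinaryRecordData copy = BinaryRecordDataSerializer.INSTANCE.deserialize(in);
// copy now owns a fresh byte[] holding the same binary layout as record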

@ -0,0 +1,392 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.runtime.serializer.data.writer;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import com.ververica.cdc.common.annotation.Internal;
import com.ververica.cdc.common.data.ArrayData;
import com.ververica.cdc.common.data.DecimalData;
import com.ververica.cdc.common.data.LocalZonedTimestampData;
import com.ververica.cdc.common.data.MapData;
import com.ververica.cdc.common.data.RecordData;
import com.ververica.cdc.common.data.StringData;
import com.ververica.cdc.common.data.TimestampData;
import com.ververica.cdc.common.data.ZonedTimestampData;
import com.ververica.cdc.common.data.binary.BinaryFormat;
import com.ververica.cdc.common.data.binary.BinaryRecordData;
import com.ververica.cdc.common.data.binary.BinarySegmentUtils;
import com.ververica.cdc.common.data.binary.BinaryStringData;
import com.ververica.cdc.runtime.serializer.data.ArrayDataSerializer;
import com.ververica.cdc.runtime.serializer.data.RecordDataSerializer;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import static com.ververica.cdc.common.data.binary.BinaryRecordData.TIMESTAMP_DELIMITER;
/**
* Use the special format to write data to a {@link MemorySegment} (its capacity grows
* automatically).
*
* <p>To write a record in binary format: 1. Create a writer. 2. Write each field via writeXX or
* setNullAt (variable-length fields cannot be written repeatedly). 3. Invoke {@link #complete()}.
*
* <p>To reuse this writer, invoke {@link #reset()} first.
*/
@Internal
abstract class AbstractBinaryWriter implements BinaryWriter {
protected MemorySegment segment;
protected int cursor;
protected DataOutputViewStreamWrapper outputView;
/** Set offset and size to fix len part. */
protected abstract void setOffsetAndSize(int pos, int offset, long size);
/** Get field offset. */
protected abstract int getFieldOffset(int pos);
/** After grow, need point to new memory. */
protected abstract void afterGrow();
protected abstract void setNullBit(int ordinal);
/** See {@link BinarySegmentUtils#readStringData(MemorySegment[], int, int, long)}. */
@Override
public void writeString(int pos, StringData input) {
BinaryStringData string = (BinaryStringData) input;
if (string.getSegments() == null) {
String javaObject = string.toString();
writeBytes(pos, javaObject.getBytes(StandardCharsets.UTF_8));
} else {
int len = string.getSizeInBytes();
if (len <= 7) {
byte[] bytes = BinarySegmentUtils.allocateReuseBytes(len);
BinarySegmentUtils.copyToBytes(
string.getSegments(), string.getOffset(), bytes, 0, len);
writeBytesToFixLenPart(segment, getFieldOffset(pos), bytes, len);
} else {
writeSegmentsToVarLenPart(pos, string.getSegments(), string.getOffset(), len);
}
}
}
private void writeBytes(int pos, byte[] bytes) {
int len = bytes.length;
if (len <= BinaryFormat.MAX_FIX_PART_DATA_SIZE) {
writeBytesToFixLenPart(segment, getFieldOffset(pos), bytes, len);
} else {
writeBytesToVarLenPart(pos, bytes, len);
}
}
@Override
public void writeArray(int pos, ArrayData input, ArrayDataSerializer serializer) {
throw new UnsupportedOperationException("Not support array data.");
}
@Override
public void writeMap(int pos, MapData input, TypeSerializer<MapData> serializer) {
throw new UnsupportedOperationException("Not support map data.");
}
private DataOutputViewStreamWrapper getOutputView() {
if (outputView == null) {
outputView = new DataOutputViewStreamWrapper(new BinaryRowWriterOutputView());
}
return outputView;
}
@Override
public void writeRecord(int pos, RecordData input, RecordDataSerializer serializer) {
if (input instanceof BinaryFormat) {
BinaryFormat row = (BinaryFormat) input;
writeSegmentsToVarLenPart(
pos, row.getSegments(), row.getOffset(), row.getSizeInBytes());
} else {
BinaryRecordData row = serializer.toBinaryRecordData(input);
writeSegmentsToVarLenPart(
pos, row.getSegments(), row.getOffset(), row.getSizeInBytes());
}
}
@Override
public void writeBinary(int pos, byte[] bytes) {
int len = bytes.length;
if (len <= BinaryFormat.MAX_FIX_PART_DATA_SIZE) {
writeBytesToFixLenPart(segment, getFieldOffset(pos), bytes, len);
} else {
writeBytesToVarLenPart(pos, bytes, len);
}
}
@Override
public void writeDecimal(int pos, DecimalData value, int precision) {
assert value == null || (value.precision() == precision);
if (DecimalData.isCompact(precision)) {
assert value != null;
writeLong(pos, value.toUnscaledLong());
} else {
// grow the global buffer before writing data.
ensureCapacity(16);
// zero-out the bytes
segment.putLong(cursor, 0L);
segment.putLong(cursor + 8, 0L);
// Make sure Decimal object has the same scale as DecimalType.
// Note that we may pass in null Decimal object to set null for it.
if (value == null) {
setNullBit(pos);
// keep the offset for future update
setOffsetAndSize(pos, cursor, 0);
} else {
final byte[] bytes = value.toUnscaledBytes();
assert bytes.length <= 16;
// Write the bytes to the variable length portion.
segment.put(cursor, bytes, 0, bytes.length);
setOffsetAndSize(pos, cursor, bytes.length);
}
// move the cursor forward.
cursor += 16;
}
}
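// For illustration: when DecimalData.isCompact(precision) holds (the unscaled value fits in
// a long), writeLong above stores it directly in the 8-byte fixed slot; otherwise a 16-byte
// var-part slot is reserved up front so that a null written now can be updated in place later
// at the recorded offset.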
@Override
public void writeTimestamp(int pos, TimestampData value, int precision) {
if (TimestampData.isCompact(precision)) {
writeLong(pos, value.getMillisecond());
} else {
// write the millisecond to the variable-length part; the fixed-length slot stores the offset together with nanoOfMillisecond
ensureCapacity(8);
if (value == null) {
setNullBit(pos);
// zero-out the bytes
segment.putLong(cursor, 0L);
setOffsetAndSize(pos, cursor, 0);
} else {
segment.putLong(cursor, value.getMillisecond());
setOffsetAndSize(pos, cursor, value.getNanoOfMillisecond());
}
cursor += 8;
}
}
@Override
public void writeLocalZonedTimestamp(int pos, LocalZonedTimestampData value, int precision) {
if (LocalZonedTimestampData.isCompact(precision)) {
writeLong(pos, value.getEpochMillisecond());
} else {
// write the epochMillisecond to the variable-length part; the fixed-length slot stores the offset together with epochNanoOfMillisecond
ensureCapacity(8);
if (value == null) {
setNullBit(pos);
// zero-out the bytes
segment.putLong(cursor, 0L);
setOffsetAndSize(pos, cursor, 0);
} else {
segment.putLong(cursor, value.getEpochMillisecond());
setOffsetAndSize(pos, cursor, value.getEpochNanoOfMillisecond());
}
cursor += 8;
}
}
@Override
public void writeZonedTimestamp(int pos, ZonedTimestampData value, int precision) {
String timestampString =
String.join(
TIMESTAMP_DELIMITER,
Arrays.asList(
String.valueOf(value.getMillisecond()),
String.valueOf(value.getNanoOfMillisecond()),
value.getZoneId()));
writeString(pos, new BinaryStringData(timestampString));
}
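// For illustration: millisecond=1700000000000, nanoOfMillisecond=123, zoneId="UTC" would be
// encoded as "1700000000000" + TIMESTAMP_DELIMITER + "123" + TIMESTAMP_DELIMITER + "UTC" and
// stored through the variable-length string path above.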
private void zeroBytes(int offset, int size) {
for (int i = offset; i < offset + size; i++) {
segment.put(i, (byte) 0);
}
}
protected void zeroOutPaddingBytes(int numBytes) {
if ((numBytes & 0x07) > 0) {
segment.putLong(cursor + ((numBytes >> 3) << 3), 0L);
}
}
protected void ensureCapacity(int neededSize) {
final int length = cursor + neededSize;
if (segment.size() < length) {
grow(length);
}
}
private void writeSegmentsToVarLenPart(
int pos, MemorySegment[] segments, int offset, int size) {
final int roundedSize = roundNumberOfBytesToNearestWord(size);
// grow the global buffer before writing data.
ensureCapacity(roundedSize);
zeroOutPaddingBytes(size);
if (segments.length == 1) {
segments[0].copyTo(offset, segment, cursor, size);
} else {
writeMultiSegmentsToVarLenPart(segments, offset, size);
}
setOffsetAndSize(pos, cursor, size);
// move the cursor forward.
cursor += roundedSize;
}
private void writeMultiSegmentsToVarLenPart(MemorySegment[] segments, int offset, int size) {
// Write the bytes to the variable length portion.
int needCopy = size;
int fromOffset = offset;
int toOffset = cursor;
for (MemorySegment sourceSegment : segments) {
int remain = sourceSegment.size() - fromOffset;
if (remain > 0) {
int copySize = remain > needCopy ? needCopy : remain;
sourceSegment.copyTo(fromOffset, segment, toOffset, copySize);
needCopy -= copySize;
toOffset += copySize;
fromOffset = 0;
} else {
fromOffset -= sourceSegment.size();
}
}
}
private void writeBytesToVarLenPart(int pos, byte[] bytes, int len) {
final int roundedSize = roundNumberOfBytesToNearestWord(len);
// grow the global buffer before writing data.
ensureCapacity(roundedSize);
zeroOutPaddingBytes(len);
// Write the bytes to the variable length portion.
segment.put(cursor, bytes, 0, len);
setOffsetAndSize(pos, cursor, len);
// move the cursor forward.
cursor += roundedSize;
}
/** Increases the capacity to ensure that it can hold at least the minimum capacity argument. */
private void grow(int minCapacity) {
int oldCapacity = segment.size();
int newCapacity = oldCapacity + (oldCapacity >> 1);
if (newCapacity - minCapacity < 0) {
newCapacity = minCapacity;
}
segment = MemorySegmentFactory.wrap(Arrays.copyOf(segment.getArray(), newCapacity));
afterGrow();
}
protected static int roundNumberOfBytesToNearestWord(int numBytes) {
int remainder = numBytes & 0x07;
if (remainder == 0) {
return numBytes;
} else {
return numBytes + (8 - remainder);
}
}
private static void writeBytesToFixLenPart(
MemorySegment segment, int fieldOffset, byte[] bytes, int len) {
long firstByte = len | 0x80; // highest bit is 1, lower 7 bits hold len
long sevenBytes = 0L; // real data
if (BinaryRecordData.LITTLE_ENDIAN) {
for (int i = 0; i < len; i++) {
sevenBytes |= ((0x00000000000000FFL & bytes[i]) << (i * 8L));
}
} else {
for (int i = 0; i < len; i++) {
sevenBytes |= ((0x00000000000000FFL & bytes[i]) << ((6 - i) * 8L));
}
}
final long offsetAndSize = (firstByte << 56) | sevenBytes;
segment.putLong(fieldOffset, offsetAndSize);
}
@Internal
public MemorySegment getSegments() {
return segment;
}
/** OutputStream backing the {@link DataOutputViewStreamWrapper} used to write generic objects. */
private class BinaryRowWriterOutputView extends OutputStream {
/**
* Writes the specified byte to this output stream. The general contract for <code>write
* </code> is that one byte is written to the output stream. The byte to be written is the
* eight low-order bits of the argument <code>b</code>. The 24 high-order bits of <code>b
* </code> are ignored.
*/
@Override
public void write(int b) throws IOException {
ensureCapacity(1);
segment.put(cursor, (byte) b);
cursor += 1;
}
@Override
public void write(byte[] b) throws IOException {
ensureCapacity(b.length);
segment.put(cursor, b, 0, b.length);
cursor += b.length;
}
@Override
public void write(byte[] b, int off, int len) throws IOException {
ensureCapacity(len);
segment.put(cursor, b, off, len);
cursor += len;
}
public void write(MemorySegment seg, int off, int len) throws IOException {
ensureCapacity(len);
seg.copyTo(off, segment, cursor, len);
cursor += len;
}
}
}
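To make the fixed-length packing and word alignment above concrete, a small worked sketch (little-endian branch, illustrative values):
// writeBytesToFixLenPart with bytes = {0x01, 0x02}, len = 2:
long firstByte = 2 | 0x80; // mark bit set, 7-bit length = 2
long sevenBytes = 0x01L | (0x02L << 8); // data packed little-endian
long offsetAndSize = (firstByte << 56) | sevenBytes; // 0x8200000000000201
// length is recovered as (offsetAndSize >>> 56) & 0x7F == 2, and
// roundNumberOfBytesToNearestWord(13) == 16, so var-len data always ends on an 8-byte word.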

@ -0,0 +1,123 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.runtime.serializer.data.writer;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.table.data.binary.BinarySegmentUtils;
import com.ververica.cdc.common.annotation.Internal;
import com.ververica.cdc.common.data.binary.BinaryRecordData;
/** Writer for {@link BinaryRecordData}. */
@Internal
public final class BinaryRecordDataWriter extends AbstractBinaryWriter {
private final int nullBitsSizeInBytes;
private final BinaryRecordData row;
private final int fixedSize;
public BinaryRecordDataWriter(BinaryRecordData row) {
this(row, 0);
}
public BinaryRecordDataWriter(BinaryRecordData row, int initialSize) {
this.nullBitsSizeInBytes = BinaryRecordData.calculateBitSetWidthInBytes(row.getArity());
this.fixedSize = row.getFixedLengthPartSize();
this.cursor = fixedSize;
this.segment = MemorySegmentFactory.wrap(new byte[fixedSize + initialSize]);
this.row = row;
this.row.pointTo(segment, 0, segment.size());
}
/** Resets the writer; call this before writing a new record. */
@Override
public void reset() {
this.cursor = fixedSize;
for (int i = 0; i < nullBitsSizeInBytes; i += 8) {
segment.putLong(i, 0L);
}
}
/** Sets the field at the given position to null (fields are non-null by default). */
@Override
public void setNullAt(int pos) {
setNullBit(pos);
segment.putLong(getFieldOffset(pos), 0L);
}
@Override
public void setNullBit(int pos) {
BinarySegmentUtils.bitSet(segment, 0, pos + BinaryRecordData.HEADER_SIZE_IN_BITS);
}
@Override
public void writeBoolean(int pos, boolean value) {
segment.putBoolean(getFieldOffset(pos), value);
}
@Override
public void writeByte(int pos, byte value) {
segment.put(getFieldOffset(pos), value);
}
@Override
public void writeShort(int pos, short value) {
segment.putShort(getFieldOffset(pos), value);
}
@Override
public void writeInt(int pos, int value) {
segment.putInt(getFieldOffset(pos), value);
}
@Override
public void writeLong(int pos, long value) {
segment.putLong(getFieldOffset(pos), value);
}
@Override
public void writeFloat(int pos, float value) {
segment.putFloat(getFieldOffset(pos), value);
}
@Override
public void writeDouble(int pos, double value) {
segment.putDouble(getFieldOffset(pos), value);
}
@Override
public void complete() {
row.setTotalSize(cursor);
}
@Override
public int getFieldOffset(int pos) {
return nullBitsSizeInBytes + 8 * pos;
}
@Override
public void setOffsetAndSize(int pos, int offset, long size) {
final long offsetAndSize = ((long) offset << 32) | size;
segment.putLong(getFieldOffset(pos), offsetAndSize);
}
@Override
public void afterGrow() {
row.pointTo(segment, 0, segment.size());
}
}
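For orientation, a hedged sketch of the fixed-length layout implied by getFieldOffset above, for a two-field record:
// Illustration only; assumes calculateBitSetWidthInBytes covers the header bits plus
// one null bit per field, rounded to whole 8-byte words (as used in the constructor).
int arity = 2;
int nullBits = BinaryRecordData.calculateBitSetWidthInBytes(arity);
int field0 = nullBits; // getFieldOffset(0)
int field1 = nullBits + 8; // getFieldOffset(1): every field takes an 8-byte slot
int initialCursor = nullBits + 8 * arity; // variable-length data is appended from here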

@ -0,0 +1,151 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.runtime.serializer.data.writer;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import com.ververica.cdc.common.annotation.Internal;
import com.ververica.cdc.common.data.ArrayData;
import com.ververica.cdc.common.data.DecimalData;
import com.ververica.cdc.common.data.LocalZonedTimestampData;
import com.ververica.cdc.common.data.MapData;
import com.ververica.cdc.common.data.RecordData;
import com.ververica.cdc.common.data.StringData;
import com.ververica.cdc.common.data.TimestampData;
import com.ververica.cdc.common.data.ZonedTimestampData;
import com.ververica.cdc.common.types.DataType;
import com.ververica.cdc.common.types.DecimalType;
import com.ververica.cdc.common.types.LocalZonedTimestampType;
import com.ververica.cdc.common.types.TimestampType;
import com.ververica.cdc.common.types.ZonedTimestampType;
import com.ververica.cdc.runtime.serializer.data.ArrayDataSerializer;
import com.ververica.cdc.runtime.serializer.data.RecordDataSerializer;
/**
* Writer to write a composite data format, like row, array. 1. Invoke {@link #reset()}. 2. Write
* each field by writeXX or setNullAt. (Same field can not be written repeatedly.) 3. Invoke {@link
* #complete()}.
*/
@Internal
public interface BinaryWriter {
/** Reset writer to prepare next write. */
void reset();
/** Set null to this field. */
void setNullAt(int pos);
void writeBoolean(int pos, boolean value);
void writeByte(int pos, byte value);
void writeShort(int pos, short value);
void writeInt(int pos, int value);
void writeLong(int pos, long value);
void writeFloat(int pos, float value);
void writeDouble(int pos, double value);
void writeString(int pos, StringData value);
void writeBinary(int pos, byte[] bytes);
void writeDecimal(int pos, DecimalData value, int precision);
void writeTimestamp(int pos, TimestampData value, int precision);
void writeLocalZonedTimestamp(int pos, LocalZonedTimestampData value, int precision);
void writeZonedTimestamp(int pos, ZonedTimestampData value, int precision);
void writeArray(int pos, ArrayData value, ArrayDataSerializer serializer);
void writeMap(int pos, MapData value, TypeSerializer<MapData> serializer);
void writeRecord(int pos, RecordData value, RecordDataSerializer serializer);
/** Finally, complete write to set real size to binary. */
void complete();
static void write(
BinaryWriter writer, int pos, Object o, DataType type, TypeSerializer<?> serializer) {
switch (type.getTypeRoot()) {
case BOOLEAN:
writer.writeBoolean(pos, (boolean) o);
break;
case TINYINT:
writer.writeByte(pos, (byte) o);
break;
case SMALLINT:
writer.writeShort(pos, (short) o);
break;
case INTEGER:
case DATE:
case TIME_WITHOUT_TIME_ZONE:
writer.writeInt(pos, (int) o);
break;
case BIGINT:
writer.writeLong(pos, (long) o);
break;
case TIMESTAMP_WITHOUT_TIME_ZONE:
TimestampType timestampType = (TimestampType) type;
writer.writeTimestamp(pos, (TimestampData) o, timestampType.getPrecision());
break;
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
LocalZonedTimestampType lzTs = (LocalZonedTimestampType) type;
writer.writeLocalZonedTimestamp(
pos, (LocalZonedTimestampData) o, lzTs.getPrecision());
break;
case TIMESTAMP_WITH_TIME_ZONE:
ZonedTimestampType zTs = (ZonedTimestampType) type;
writer.writeZonedTimestamp(pos, (ZonedTimestampData) o, zTs.getPrecision());
break;
case FLOAT:
writer.writeFloat(pos, (float) o);
break;
case DOUBLE:
writer.writeDouble(pos, (double) o);
break;
case CHAR:
case VARCHAR:
writer.writeString(pos, (StringData) o);
break;
case DECIMAL:
DecimalType decimalType = (DecimalType) type;
writer.writeDecimal(pos, (DecimalData) o, decimalType.getPrecision());
break;
case ARRAY:
writer.writeArray(pos, (ArrayData) o, (ArrayDataSerializer) serializer);
break;
case MAP:
writer.writeMap(pos, (MapData) o, (TypeSerializer<MapData>) serializer);
break;
case ROW:
writer.writeRecord(pos, (RecordData) o, (RecordDataSerializer) serializer);
break;
case BINARY:
case VARBINARY:
writer.writeBinary(pos, (byte[]) o);
break;
default:
throw new UnsupportedOperationException("Not support type: " + type);
}
}
}
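A brief usage sketch of the static write dispatch above; serializers are consulted only for nested ARRAY/MAP/ROW types, so null suffices for the primitive and string fields here:
// Hedged sketch, not part of the commit.
BinaryRecordData row = new BinaryRecordData(2);
BinaryRecordDataWriter writer = new BinaryRecordDataWriter(row);
writer.reset();
BinaryWriter.write(writer, 0, 42L, DataTypes.BIGINT(), null);
BinaryWriter.write(writer, 1, BinaryStringData.fromString("a"), DataTypes.STRING(), null);
writer.complete();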

@ -0,0 +1,102 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.runtime.operators.schema;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import com.ververica.cdc.common.data.GenericRecordData;
import com.ververica.cdc.common.data.GenericStringData;
import com.ververica.cdc.common.event.DataChangeEvent;
import com.ververica.cdc.common.event.Event;
import com.ververica.cdc.common.event.TableId;
import com.ververica.cdc.common.types.DataTypes;
import com.ververica.cdc.common.types.RowType;
import com.ververica.cdc.runtime.serializer.event.EventSerializer;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import static org.assertj.core.api.Assertions.assertThat;
/** A test for the {@link SchemaOperator}. */
public class SchemaOperatorTest {
@Test
void testProcessElement() throws Exception {
final int maxParallelism = 4;
final int parallelism = 2;
final OperatorID opID = new OperatorID();
final TableId tableId = TableId.tableId("testProcessElement");
final RowType rowType = DataTypes.ROW(DataTypes.BIGINT(), DataTypes.STRING());
List<OneInputStreamOperatorTestHarness<Event, Event>> testHarnesses = new ArrayList<>();
for (int subtaskIndex = 0; subtaskIndex < parallelism; subtaskIndex++) {
OneInputStreamOperatorTestHarness<Event, Event> testHarness =
createTestHarness(maxParallelism, parallelism, subtaskIndex, opID);
testHarnesses.add(testHarness);
testHarness.setup(EventSerializer.INSTANCE);
testHarness.open();
Map<String, String> meta = new HashMap<>();
meta.put("subtask", String.valueOf(subtaskIndex));
List<Event> testData =
Arrays.asList(
DataChangeEvent.updateEvent(
tableId,
rowType,
GenericRecordData.of(1L, GenericStringData.fromString("1")),
GenericRecordData.of(2L, GenericStringData.fromString("2")),
meta),
DataChangeEvent.updateEvent(
tableId,
rowType,
GenericRecordData.of(3L, GenericStringData.fromString("3")),
GenericRecordData.of(4L, GenericStringData.fromString("4")),
meta));
for (Event event : testData) {
testHarness.processElement(event, 0);
}
Collection<StreamRecord<Event>> result = testHarness.getRecordOutput();
assertThat(result.stream().map(StreamRecord::getValue).collect(Collectors.toList()))
.isEqualTo(testData);
}
for (int subtaskIndex = 0; subtaskIndex < parallelism; subtaskIndex++) {
testHarnesses.get(subtaskIndex).close();
}
}
private OneInputStreamOperatorTestHarness<Event, Event> createTestHarness(
int maxParallelism, int parallelism, int subtaskIndex, OperatorID opID)
throws Exception {
return new OneInputStreamOperatorTestHarness<>(
new SchemaOperator(),
maxParallelism,
parallelism,
subtaskIndex,
EventSerializer.INSTANCE,
opID);
}
}