[3.0][cdc-common] Introduce internal data structures and generic implementations (#2688)

This closes #2651.
Leonard Xu 1 year ago committed by GitHub
parent 98c929e6e4
commit 4930698a9b

@@ -0,0 +1,242 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.common.data;
import org.apache.flink.annotation.PublicEvolving;
import com.ververica.cdc.common.types.ArrayType;
import com.ververica.cdc.common.types.DataType;
import javax.annotation.Nullable;
import java.io.Serializable;
import static com.ververica.cdc.common.types.DataTypeChecks.getFieldCount;
import static com.ververica.cdc.common.types.DataTypeChecks.getPrecision;
import static com.ververica.cdc.common.types.DataTypeChecks.getScale;
/**
* Base interface of an internal data structure representing data of {@link ArrayType}.
*
* <p>Note: All elements of this data structure must be internal data structures and must be of the
* same type. See {@link RecordData} for more information about internal data structures.
*
* <p>Use {@link GenericArrayData} to construct instances of this interface from regular Java
* arrays.
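*
* <p>A usage sketch (illustrative only):
*
* <pre>{@code
* // wrap a regular Java int[] and read it back through the ArrayData view
* ArrayData array = new GenericArrayData(new int[] {1, 2, 3});
* int first = array.getInt(0);         // 1
* boolean nullAt2 = array.isNullAt(2); // false; primitive arrays cannot hold nulls
* }</pre>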
*/
@PublicEvolving
public interface ArrayData {
/** Returns the number of elements in this array. */
int size();
// ------------------------------------------------------------------------------------------
// Read-only accessor methods
// ------------------------------------------------------------------------------------------
/** Returns true if the element is null at the given position. */
boolean isNullAt(int pos);
/** Returns the boolean value at the given position. */
boolean getBoolean(int pos);
/** Returns the byte value at the given position. */
byte getByte(int pos);
/** Returns the short value at the given position. */
short getShort(int pos);
/** Returns the integer value at the given position. */
int getInt(int pos);
/** Returns the long value at the given position. */
long getLong(int pos);
/** Returns the float value at the given position. */
float getFloat(int pos);
/** Returns the double value at the given position. */
double getDouble(int pos);
/** Returns the string value at the given position. */
StringData getString(int pos);
/**
* Returns the decimal value at the given position.
*
* <p>The precision and scale are required to determine whether the decimal value was stored in
* a compact representation (see {@link DecimalData}).
*/
DecimalData getDecimal(int pos, int precision, int scale);
/**
* Returns the timestamp value at the given position.
*
* <p>The precision is required to determine whether the timestamp value was stored in a compact
* representation (see {@link TimestampData}).
*/
TimestampData getTimestamp(int pos, int precision);
/**
* Returns the local zoned timestamp value at the given position.
*
* <p>The precision is required to determine whether the timestamp value was stored in a compact
* representation (see {@link LocalZonedTimestampData}).
*/
LocalZonedTimestampData getLocalZonedTimestamp(int pos, int precision);
/**
* Returns the zoned timestamp value at the given position.
*
* <p>The precision is required to determine whether the timestamp value was stored in a compact
* representation (see {@link ZonedTimestampData}).
*/
ZonedTimestampData getZonedTimestamp(int pos, int precision);
/** Returns the binary value at the given position. */
byte[] getBinary(int pos);
/** Returns the array value at the given position. */
ArrayData getArray(int pos);
/** Returns the map value at the given position. */
MapData getMap(int pos);
/**
* Returns the record value at the given position.
*
* <p>The number of fields is required to correctly extract the row.
*/
RecordData getRecord(int pos, int numFields);
// ------------------------------------------------------------------------------------------
// Conversion Utilities
// ------------------------------------------------------------------------------------------
boolean[] toBooleanArray();
byte[] toByteArray();
short[] toShortArray();
int[] toIntArray();
long[] toLongArray();
float[] toFloatArray();
double[] toDoubleArray();
// ------------------------------------------------------------------------------------------
// Access Utilities
// ------------------------------------------------------------------------------------------
/**
* Creates an accessor for getting elements in an internal array data structure at the given
* position.
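*
* <p>A usage sketch (illustrative; {@code elementType} and {@code array} are assumed to be in
* scope):
*
* <pre>{@code
* ArrayData.ElementGetter getter = ArrayData.createElementGetter(elementType);
* Object element = getter.getElementOrNull(array, 0); // null if the element at position 0 is null
* }</pre>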
*
* @param elementType the element type of the array
*/
static ElementGetter createElementGetter(DataType elementType) {
final ElementGetter elementGetter;
// ordered by type root definition
switch (elementType.getTypeRoot()) {
case CHAR:
case VARCHAR:
elementGetter = ArrayData::getString;
break;
case BOOLEAN:
elementGetter = ArrayData::getBoolean;
break;
case BINARY:
case VARBINARY:
elementGetter = ArrayData::getBinary;
break;
case DECIMAL:
final int decimalPrecision = getPrecision(elementType);
final int decimalScale = getScale(elementType);
elementGetter =
(array, pos) -> array.getDecimal(pos, decimalPrecision, decimalScale);
break;
case TINYINT:
elementGetter = ArrayData::getByte;
break;
case SMALLINT:
elementGetter = ArrayData::getShort;
break;
case INTEGER:
case DATE:
case TIME_WITHOUT_TIME_ZONE:
elementGetter = ArrayData::getInt;
break;
case BIGINT:
elementGetter = ArrayData::getLong;
break;
case FLOAT:
elementGetter = ArrayData::getFloat;
break;
case DOUBLE:
elementGetter = ArrayData::getDouble;
break;
case TIMESTAMP_WITHOUT_TIME_ZONE:
final int timestampPrecision = getPrecision(elementType);
elementGetter = (array, pos) -> array.getTimestamp(pos, timestampPrecision);
break;
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
final int timestampLtzPrecision = getPrecision(elementType);
elementGetter =
(array, pos) -> array.getLocalZonedTimestamp(pos, timestampLtzPrecision);
break;
case TIMESTAMP_WITH_TIME_ZONE:
final int timestampTzPrecision = getPrecision(elementType);
elementGetter = (array, pos) -> array.getZonedTimestamp(pos, timestampTzPrecision);
break;
case ARRAY:
elementGetter = ArrayData::getArray;
break;
case MAP:
elementGetter = ArrayData::getMap;
break;
case ROW:
final int rowFieldCount = getFieldCount(elementType);
elementGetter = (array, pos) -> array.getRecord(pos, rowFieldCount);
break;
default:
throw new IllegalArgumentException();
}
if (!elementType.isNullable()) {
return elementGetter;
}
return (array, pos) -> {
if (array.isNullAt(pos)) {
return null;
}
return elementGetter.getElementOrNull(array, pos);
};
}
/**
* Accessor for getting the elements of an array during runtime.
*
* @see #createElementGetter(DataType)
*/
@PublicEvolving
interface ElementGetter extends Serializable {
@Nullable
Object getElementOrNull(ArrayData array, int pos);
}
}

@@ -0,0 +1,233 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.common.data;
import org.apache.flink.annotation.PublicEvolving;
import com.ververica.cdc.common.types.DecimalType;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.math.RoundingMode;
import static com.ververica.cdc.common.utils.Preconditions.checkArgument;
/**
* An internal data structure representing data of {@link DecimalType}.
*
* <p>This data structure is immutable and might store decimal values in a compact representation
* (as a long value) if values are small enough.
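*
* <p>A construction sketch (illustrative):
*
* <pre>{@code
* // DECIMAL(10, 2): precision 10 <= 18, so the value is backed by a compact long
* DecimalData price = DecimalData.fromBigDecimal(new BigDecimal("12.50"), 10, 2);
* BigDecimal back = price.toBigDecimal(); // 12.50
* }</pre>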
*/
@PublicEvolving
public final class DecimalData implements Comparable<DecimalData> {
static final int MAX_COMPACT_PRECISION = 18;
/** Maximum number of decimal digits an Int can represent. (1e9 < Int.MaxValue < 1e10) */
static final int MAX_INT_DIGITS = 9;
/** Maximum number of decimal digits a Long can represent. (1e18 < Long.MaxValue < 1e19) */
static final int MAX_LONG_DIGITS = 18;
static final long[] POW10 = new long[MAX_COMPACT_PRECISION + 1];
static {
POW10[0] = 1;
for (int i = 1; i < POW10.length; i++) {
POW10[i] = 10 * POW10[i - 1];
}
}
// The semantics of the fields are as follows:
// - `precision` and `scale` represent the precision and scale of SQL decimal type
// - If `decimalVal` is set, it represents the whole decimal value
// - Otherwise, the decimal value is longVal/(10^scale).
//
// Note that the (precision, scale) must be correct.
// if precision > MAX_COMPACT_PRECISION,
// `decimalVal` represents the value. `longVal` is undefined
// otherwise, (longVal, scale) represents the value
// `decimalVal` may be set and cached
final int precision;
final int scale;
final long longVal;
BigDecimal decimalVal;
// this constructor does not perform any sanity check.
DecimalData(int precision, int scale, long longVal, BigDecimal decimalVal) {
this.precision = precision;
this.scale = scale;
this.longVal = longVal;
this.decimalVal = decimalVal;
}
// ------------------------------------------------------------------------------------------
// Public Interfaces
// ------------------------------------------------------------------------------------------
/**
* Returns the <i>precision</i> of this {@link DecimalData}.
*
* <p>The precision is the number of digits in the unscaled value.
*/
public int precision() {
return precision;
}
/** Returns the <i>scale</i> of this {@link DecimalData}. */
public int scale() {
return scale;
}
/** Converts this {@link DecimalData} into an instance of {@link BigDecimal}. */
public BigDecimal toBigDecimal() {
BigDecimal bd = decimalVal;
if (bd == null) {
decimalVal = bd = BigDecimal.valueOf(longVal, scale);
}
return bd;
}
/**
* Returns a long describing the <i>unscaled value</i> of this {@link DecimalData}.
*
* @throws ArithmeticException if this {@link DecimalData} does not exactly fit in a long.
*/
public long toUnscaledLong() {
if (isCompact()) {
return longVal;
} else {
return toBigDecimal().unscaledValue().longValueExact();
}
}
/**
* Returns a byte array describing the <i>unscaled value</i> of this {@link DecimalData}.
*
* @return the unscaled byte array of this {@link DecimalData}.
*/
public byte[] toUnscaledBytes() {
return toBigDecimal().unscaledValue().toByteArray();
}
/** Returns whether the decimal value is small enough to be stored in a long. */
public boolean isCompact() {
return precision <= MAX_COMPACT_PRECISION;
}
/** Returns a copy of this {@link DecimalData} object. */
public DecimalData copy() {
return new DecimalData(precision, scale, longVal, decimalVal);
}
@Override
public int hashCode() {
return toBigDecimal().hashCode();
}
@Override
public int compareTo(@Nonnull DecimalData that) {
if (this.isCompact() && that.isCompact() && this.scale == that.scale) {
return Long.compare(this.longVal, that.longVal);
}
return this.toBigDecimal().compareTo(that.toBigDecimal());
}
@Override
public boolean equals(final Object o) {
if (!(o instanceof DecimalData)) {
return false;
}
DecimalData that = (DecimalData) o;
return this.compareTo(that) == 0;
}
@Override
public String toString() {
return toBigDecimal().toPlainString();
}
// ------------------------------------------------------------------------------------------
// Constructor Utilities
// ------------------------------------------------------------------------------------------
/**
* Creates an instance of {@link DecimalData} from a {@link BigDecimal} and the given precision
* and scale.
*
* <p>The returned decimal value may be rounded to have the desired scale. The precision will be
* checked. If the precision overflows, null will be returned.
*/
public static @Nullable DecimalData fromBigDecimal(BigDecimal bd, int precision, int scale) {
bd = bd.setScale(scale, RoundingMode.HALF_UP);
if (bd.precision() > precision) {
return null;
}
long longVal = -1;
if (precision <= MAX_COMPACT_PRECISION) {
longVal = bd.movePointRight(scale).longValueExact();
}
return new DecimalData(precision, scale, longVal, bd);
}
/**
* Creates an instance of {@link DecimalData} from an unscaled long value and the given
* precision and scale.
*/
public static DecimalData fromUnscaledLong(long unscaledLong, int precision, int scale) {
checkArgument(precision > 0 && precision <= MAX_LONG_DIGITS);
return new DecimalData(precision, scale, unscaledLong, null);
}
/**
* Creates an instance of {@link DecimalData} from an unscaled byte array value and the given
* precision and scale.
*/
public static DecimalData fromUnscaledBytes(byte[] unscaledBytes, int precision, int scale) {
BigDecimal bd = new BigDecimal(new BigInteger(unscaledBytes), scale);
return fromBigDecimal(bd, precision, scale);
}
/**
* Creates an instance of {@link DecimalData} for a zero value with the given precision and
* scale.
*
* <p>The precision will be checked. If the precision overflows, null will be returned.
*/
public static @Nullable DecimalData zero(int precision, int scale) {
if (precision <= MAX_COMPACT_PRECISION) {
return new DecimalData(precision, scale, 0, null);
} else {
return fromBigDecimal(BigDecimal.ZERO, precision, scale);
}
}
// ------------------------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------------------------
/** Returns whether the decimal value is small enough to be stored in a long. */
public static boolean isCompact(int precision) {
return precision <= MAX_COMPACT_PRECISION;
}
}

@@ -0,0 +1,343 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.common.data;
import org.apache.flink.annotation.PublicEvolving;
import com.ververica.cdc.common.types.ArrayType;
import org.apache.commons.lang3.ArrayUtils;
import java.util.Arrays;
import java.util.Objects;
/**
* An internal data structure representing data of {@link ArrayType}.
*
* <p>Note: All elements of this data structure must be internal data structures and must be of the
* same type. See {@link RecordData} for more information about internal data structures.
*
* <p>{@link GenericArrayData} is a generic implementation of {@link ArrayData} which wraps regular
* Java arrays.
*
* <p>Every instance wraps a one-dimensional Java array. Non-primitive arrays can be used for
* representing element nullability. The Java array might be a primitive array such as {@code int[]}
* or an object array (i.e. instance of {@code Object[]}). Object arrays that contain boxed types
* (e.g. {@link Integer}) MUST be boxed arrays (i.e. {@code new Integer[]{1, 2, 3}}, not {@code new
* Object[]{1, 2, 3}}). For multidimensional arrays, an array of {@link GenericArrayData} MUST be
* passed. For example:
*
* <pre>{@code
* // ARRAY < ARRAY < INT NOT NULL > >
* new GenericArrayData(
* new GenericArrayData[]{
* new GenericArrayData(new int[3]),
* new GenericArrayData(new int[5])
* }
* )
* }</pre>
*/
@PublicEvolving
public final class GenericArrayData implements ArrayData {
private final Object array;
private final int size;
private final boolean isPrimitiveArray;
/**
* Creates an instance of {@link GenericArrayData} using the given Java array.
*
* <p>Note: All elements of the array must be internal data structures.
*/
public GenericArrayData(Object[] array) {
this(array, array.length, false);
}
public GenericArrayData(int[] primitiveArray) {
this(primitiveArray, primitiveArray.length, true);
}
public GenericArrayData(long[] primitiveArray) {
this(primitiveArray, primitiveArray.length, true);
}
public GenericArrayData(float[] primitiveArray) {
this(primitiveArray, primitiveArray.length, true);
}
public GenericArrayData(double[] primitiveArray) {
this(primitiveArray, primitiveArray.length, true);
}
public GenericArrayData(short[] primitiveArray) {
this(primitiveArray, primitiveArray.length, true);
}
public GenericArrayData(byte[] primitiveArray) {
this(primitiveArray, primitiveArray.length, true);
}
public GenericArrayData(boolean[] primitiveArray) {
this(primitiveArray, primitiveArray.length, true);
}
private GenericArrayData(Object array, int size, boolean isPrimitiveArray) {
this.array = array;
this.size = size;
this.isPrimitiveArray = isPrimitiveArray;
}
/**
* Returns true if this is a primitive array.
*
* <p>A primitive array is an array whose elements are of primitive type.
*/
public boolean isPrimitiveArray() {
return isPrimitiveArray;
}
/**
* Converts this {@link GenericArrayData} into an array of Java {@link Object}.
*
* <p>The method will convert a primitive array into an object array. But it will not convert
* internal data structures into external data structures (e.g. {@link StringData} to {@link
* String}).
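*
* <p>A usage sketch (illustrative):
*
* <pre>{@code
* Object[] boxed = new GenericArrayData(new int[] {1, 2, 3}).toObjectArray(); // Integer[] {1, 2, 3}
* }</pre>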
*/
public Object[] toObjectArray() {
if (isPrimitiveArray) {
Class<?> arrayClass = array.getClass();
if (int[].class.equals(arrayClass)) {
return ArrayUtils.toObject((int[]) array);
} else if (long[].class.equals(arrayClass)) {
return ArrayUtils.toObject((long[]) array);
} else if (float[].class.equals(arrayClass)) {
return ArrayUtils.toObject((float[]) array);
} else if (double[].class.equals(arrayClass)) {
return ArrayUtils.toObject((double[]) array);
} else if (short[].class.equals(arrayClass)) {
return ArrayUtils.toObject((short[]) array);
} else if (byte[].class.equals(arrayClass)) {
return ArrayUtils.toObject((byte[]) array);
} else if (boolean[].class.equals(arrayClass)) {
return ArrayUtils.toObject((boolean[]) array);
}
throw new RuntimeException("Unsupported primitive array: " + arrayClass);
} else {
return (Object[]) array;
}
}
@Override
public int size() {
return size;
}
@Override
public boolean isNullAt(int pos) {
return !isPrimitiveArray && ((Object[]) array)[pos] == null;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
GenericArrayData that = (GenericArrayData) o;
return size == that.size
&& isPrimitiveArray == that.isPrimitiveArray
&& Objects.deepEquals(array, that.array);
}
@Override
public int hashCode() {
int result = Objects.hash(size, isPrimitiveArray);
result = 31 * result + Arrays.deepHashCode(new Object[] {array});
return result;
}
// ------------------------------------------------------------------------------------------
// Read-only accessor methods
// ------------------------------------------------------------------------------------------
@Override
public boolean getBoolean(int pos) {
return isPrimitiveArray ? ((boolean[]) array)[pos] : (boolean) getObject(pos);
}
@Override
public byte getByte(int pos) {
return isPrimitiveArray ? ((byte[]) array)[pos] : (byte) getObject(pos);
}
@Override
public short getShort(int pos) {
return isPrimitiveArray ? ((short[]) array)[pos] : (short) getObject(pos);
}
@Override
public int getInt(int pos) {
return isPrimitiveArray ? ((int[]) array)[pos] : (int) getObject(pos);
}
@Override
public long getLong(int pos) {
return isPrimitiveArray ? ((long[]) array)[pos] : (long) getObject(pos);
}
@Override
public float getFloat(int pos) {
return isPrimitiveArray ? ((float[]) array)[pos] : (float) getObject(pos);
}
@Override
public double getDouble(int pos) {
return isPrimitiveArray ? ((double[]) array)[pos] : (double) getObject(pos);
}
@Override
public byte[] getBinary(int pos) {
return (byte[]) getObject(pos);
}
@Override
public StringData getString(int pos) {
return (StringData) getObject(pos);
}
@Override
public DecimalData getDecimal(int pos, int precision, int scale) {
return (DecimalData) getObject(pos);
}
@Override
public TimestampData getTimestamp(int pos, int precision) {
return (TimestampData) getObject(pos);
}
@Override
public LocalZonedTimestampData getLocalZonedTimestamp(int pos, int precision) {
return (LocalZonedTimestampData) getObject(pos);
}
@Override
public ZonedTimestampData getZonedTimestamp(int pos, int precision) {
return (ZonedTimestampData) getObject(pos);
}
@Override
public RecordData getRecord(int pos, int numFields) {
return (RecordData) getObject(pos);
}
@Override
public ArrayData getArray(int pos) {
return (ArrayData) getObject(pos);
}
@Override
public MapData getMap(int pos) {
return (MapData) getObject(pos);
}
private Object getObject(int pos) {
return ((Object[]) array)[pos];
}
// ------------------------------------------------------------------------------------------
// Conversion Utilities
// ------------------------------------------------------------------------------------------
private boolean anyNull() {
for (Object element : (Object[]) array) {
if (element == null) {
return true;
}
}
return false;
}
private void checkNoNull() {
if (anyNull()) {
throw new RuntimeException("Primitive array must not contain a null value.");
}
}
@Override
public boolean[] toBooleanArray() {
if (isPrimitiveArray) {
return (boolean[]) array;
}
checkNoNull();
return ArrayUtils.toPrimitive((Boolean[]) array);
}
@Override
public byte[] toByteArray() {
if (isPrimitiveArray) {
return (byte[]) array;
}
checkNoNull();
return ArrayUtils.toPrimitive((Byte[]) array);
}
@Override
public short[] toShortArray() {
if (isPrimitiveArray) {
return (short[]) array;
}
checkNoNull();
return ArrayUtils.toPrimitive((Short[]) array);
}
@Override
public int[] toIntArray() {
if (isPrimitiveArray) {
return (int[]) array;
}
checkNoNull();
return ArrayUtils.toPrimitive((Integer[]) array);
}
@Override
public long[] toLongArray() {
if (isPrimitiveArray) {
return (long[]) array;
}
checkNoNull();
return ArrayUtils.toPrimitive((Long[]) array);
}
@Override
public float[] toFloatArray() {
if (isPrimitiveArray) {
return (float[]) array;
}
checkNoNull();
return ArrayUtils.toPrimitive((Float[]) array);
}
@Override
public double[] toDoubleArray() {
if (isPrimitiveArray) {
return (double[]) array;
}
checkNoNull();
return ArrayUtils.toPrimitive((Double[]) array);
}
}

@@ -0,0 +1,129 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.common.data;
import org.apache.flink.annotation.PublicEvolving;
import com.ververica.cdc.common.types.MapType;
import java.util.Map;
import java.util.Objects;
/**
* An internal data structure representing data of {@link MapType}.
*
* <p>{@link GenericMapData} is a generic implementation of {@link MapData} which wraps regular Java
* maps.
*
* <p>Note: All keys and values of this data structure must be internal data structures. All keys
* must be of the same type; same for values. See {@link RecordData} for more information about
* internal data structures.
*
* <p>Both keys and values can contain null for representing nullability.
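*
* <p>A usage sketch (illustrative; assumes STRING keys and INT values):
*
* <pre>{@code
* Map<StringData, Integer> javaMap = new HashMap<>();
* javaMap.put(GenericStringData.fromString("k"), 1);
* GenericMapData map = new GenericMapData(javaMap);
* Object value = map.get(GenericStringData.fromString("k")); // Integer 1
* }</pre>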
*/
@PublicEvolving
public final class GenericMapData implements MapData {
private final Map<?, ?> map;
/**
* Creates an instance of {@link GenericMapData} using the given Java map.
*
* <p>Note: All keys and values of the map must be internal data structures.
*/
public GenericMapData(Map<?, ?> map) {
this.map = map;
}
/**
* Returns the value to which the specified key is mapped, or {@code null} if this map contains
* no mapping for the key. The returned value is in internal data structure.
*/
public Object get(Object key) {
return map.get(key);
}
@Override
public int size() {
return map.size();
}
@Override
public ArrayData keyArray() {
Object[] keys = map.keySet().toArray();
return new GenericArrayData(keys);
}
@Override
public ArrayData valueArray() {
Object[] values = map.values().toArray();
return new GenericArrayData(values);
}
@Override
public boolean equals(Object o) {
if (o == this) {
return true;
}
if (!(o instanceof GenericMapData)) {
return false;
}
// deepEquals for values of byte[]
return deepEquals(map, ((GenericMapData) o).map);
}
private static <K, V> boolean deepEquals(Map<K, V> m1, Map<?, ?> m2) {
// copied from HashMap.equals but with deepEquals comparison
if (m1.size() != m2.size()) {
return false;
}
try {
for (Map.Entry<K, V> e : m1.entrySet()) {
K key = e.getKey();
V value = e.getValue();
if (value == null) {
if (!(m2.get(key) == null && m2.containsKey(key))) {
return false;
}
} else {
if (!Objects.deepEquals(value, m2.get(key))) {
return false;
}
}
}
} catch (ClassCastException | NullPointerException unused) {
return false;
}
return true;
}
@Override
public int hashCode() {
int result = 0;
for (Object key : map.keySet()) {
// only include key because values can contain byte[]
result += 31 * Objects.hashCode(key);
}
return result;
}
@Override
public String toString() {
return map.toString();
}
}

@@ -14,16 +14,36 @@
* limitations under the License.
*/
package com.ververica.cdc.common.event;
package com.ververica.cdc.common.data;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.util.StringUtils;
import com.ververica.cdc.common.event.OperationType;
import com.ververica.cdc.common.types.ArrayType;
import com.ververica.cdc.common.types.MapType;
import com.ververica.cdc.common.types.RowType;
import java.io.Serializable;
import java.util.Arrays;
/** Class {@code GenericRecordData} describes the data of changed record in the external system. */
/**
* An internal data structure representing data of {@link RowType} and other (possibly nested)
* structured types such as {@link MapType}, {@link ArrayType}.
*
* <p>{@link GenericRecordData} is a generic implementation of {@link RecordData} which is backed by
* an array of Java {@link Object}. A {@link GenericRecordData} can have an arbitrary number of
* fields of different types. The fields in a row can be accessed by position (0-based) using either
* the generic {@link #getField(int)} or type-specific getters (such as {@link #getInt(int)}). A
* field can be updated by the generic {@link #setField(int, Object)}.
*
* <p>Note: All fields of this data structure must be internal data structures. See {@link
* RecordData} for more information about internal data structures.
*
* <p>The fields in {@link GenericRecordData} can be null for representing nullability.
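*
* <p>A construction sketch (illustrative; uses the {@code of(...)} factory shown below):
*
* <pre>{@code
* // a record with an INT field and a STRING field
* GenericRecordData record = GenericRecordData.of(1, GenericStringData.fromString("flink"));
* int id = record.getInt(0);
* StringData name = record.getString(1);
* }</pre>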
*/
@PublicEvolving
public final class GenericRecordData implements RecordData {
public final class GenericRecordData implements RecordData, Serializable {
/** The array to store the actual internal format values. */
private final Object[] fields;
@@ -113,13 +133,48 @@ public final class GenericRecordData implements RecordData {
}
@Override
public String getString(int pos) {
return (String) this.fields[pos];
public byte[] getBinary(int pos) {
return (byte[]) this.fields[pos];
}
@Override
public byte[] getBinary(int pos) {
return (byte[]) this.fields[pos];
public StringData getString(int pos) {
return (StringData) this.fields[pos];
}
@Override
public DecimalData getDecimal(int pos, int precision, int scale) {
return (DecimalData) this.fields[pos];
}
@Override
public TimestampData getTimestamp(int pos, int precision) {
return (TimestampData) this.fields[pos];
}
@Override
public ZonedTimestampData getZonedTimestamp(int pos, int precision) {
return (ZonedTimestampData) this.fields[pos];
}
@Override
public LocalZonedTimestampData getLocalZonedTimestampData(int pos, int precision) {
return (LocalZonedTimestampData) this.fields[pos];
}
@Override
public ArrayData getArray(int pos) {
return (ArrayData) this.fields[pos];
}
@Override
public MapData getMap(int pos) {
return (MapData) this.fields[pos];
}
@Override
public RecordData getRow(int pos, int numFields) {
return (RecordData) this.fields[pos];
}
@Override
@@ -153,15 +208,13 @@ public final class GenericRecordData implements RecordData {
return sb.toString();
}
// ----------------------------------------------------------------------------------------
// Utilities
// ----------------------------------------------------------------------------------------
// ------------------------------------------------------------------------------------------
// Constructor Utilities
// ------------------------------------------------------------------------------------------
/**
* Creates an instance of {@link GenericRecordData} with given field values.
*
* <p>By default, the record describes a {@link OperationType#INSERT} in a changelog.
*
* <p>Note: All fields of the record must be internal data structures.
*/
public static GenericRecordData of(Object... values) {

@@ -0,0 +1,78 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.common.data;
import org.apache.flink.annotation.PublicEvolving;
import com.ververica.cdc.common.types.CharType;
import com.ververica.cdc.common.types.VarCharType;
import com.ververica.cdc.common.utils.StringUtf8Utils;
import javax.annotation.Nonnull;
import java.util.Objects;
/** An internal data structure representing data of {@link VarCharType} and {@link CharType}. */
@PublicEvolving
public final class GenericStringData implements StringData {
private final String javaStr;
private GenericStringData(String javaStr) {
this.javaStr = javaStr;
}
@Override
public byte[] toBytes() {
if (javaStr == null) {
return null;
} else {
return StringUtf8Utils.encodeUTF8(javaStr);
}
}
@Override
public int compareTo(@Nonnull StringData o) {
GenericStringData other = (GenericStringData) o;
return javaStr.compareTo(other.javaStr);
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof GenericStringData)) {
return false;
}
GenericStringData that = (GenericStringData) o;
return Objects.equals(javaStr, that.javaStr);
}
@Override
public int hashCode() {
return Objects.hash(javaStr);
}
// ------------------------------------------------------------------------------------------
// Constructor Utilities
// ------------------------------------------------------------------------------------------
public static GenericStringData fromString(String javaStr) {
return new GenericStringData(javaStr);
}
}

@@ -0,0 +1,181 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.common.data;
import org.apache.flink.annotation.PublicEvolving;
import com.ververica.cdc.common.types.LocalZonedTimestampType;
import com.ververica.cdc.common.utils.Preconditions;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
/**
* An internal data structure representing data of {@link LocalZonedTimestampType}.
*
* <p>This data structure is immutable and consists of the epoch milliseconds and the epoch
* nanos-of-millisecond since {@code 1970-01-01 00:00:00}. It might be stored in a compact
* representation (as a long value) if values are small enough.
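*
* <p>A construction sketch (illustrative):
*
* <pre>{@code
* LocalZonedTimestampData ts =
*         LocalZonedTimestampData.fromInstant(Instant.parse("2023-01-01T00:00:00Z"));
* long millis = ts.getEpochMillisecond(); // 1672531200000
* }</pre>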
*/
@PublicEvolving
public final class LocalZonedTimestampData implements Comparable<LocalZonedTimestampData> {
// the number of milliseconds in a day
private static final long MILLIS_PER_DAY = 86400000; // = 24 * 60 * 60 * 1000
// this field holds the epoch second and the milli-of-second
private final long epochMillisecond;
// this field holds the epoch nano-of-millisecond
private final int epochNanoOfMillisecond;
private LocalZonedTimestampData(long epochMillisecond, int epochNanoOfMillisecond) {
Preconditions.checkArgument(
epochNanoOfMillisecond >= 0 && epochNanoOfMillisecond <= 999_999);
this.epochMillisecond = epochMillisecond;
this.epochNanoOfMillisecond = epochNanoOfMillisecond;
}
/** Returns the number of epoch milliseconds since epoch {@code 1970-01-01 00:00:00}. */
public long getEpochMillisecond() {
return epochMillisecond;
}
/**
* Returns the number of epoch nanoseconds (the nanoseconds within the milliseconds).
*
* <p>The value range is from 0 to 999,999.
*/
public int getEpochNanoOfMillisecond() {
return epochNanoOfMillisecond;
}
/** Converts this {@link LocalZonedTimestampData} object to an {@link Instant}. */
public Instant toInstant() {
long epochSecond = epochMillisecond / 1000;
int milliOfSecond = (int) (epochMillisecond % 1000);
if (milliOfSecond < 0) {
--epochSecond;
milliOfSecond += 1000;
}
long nanoAdjustment = milliOfSecond * 1_000_000 + epochNanoOfMillisecond;
return Instant.ofEpochSecond(epochSecond, nanoAdjustment);
}
@Override
public int compareTo(LocalZonedTimestampData that) {
int cmp = Long.compare(this.epochMillisecond, that.epochMillisecond);
if (cmp == 0) {
cmp = this.epochNanoOfMillisecond - that.epochNanoOfMillisecond;
}
return cmp;
}
@Override
public boolean equals(Object obj) {
if (!(obj instanceof LocalZonedTimestampData)) {
return false;
}
LocalZonedTimestampData that = (LocalZonedTimestampData) obj;
return this.epochMillisecond == that.epochMillisecond
&& this.epochNanoOfMillisecond == that.epochNanoOfMillisecond;
}
@Override
public int hashCode() {
int ret = (int) epochMillisecond ^ (int) (epochMillisecond >> 32);
return 31 * ret + epochNanoOfMillisecond;
}
@Override
public String toString() {
return describeLocalZonedTimestampInUTC0().toString();
}
/**
* Describes this {@link LocalZonedTimestampData} object in {@link LocalDateTime} under UTC0
* time zone.
*/
private LocalDateTime describeLocalZonedTimestampInUTC0() {
int date = (int) (epochMillisecond / MILLIS_PER_DAY);
int time = (int) (epochMillisecond % MILLIS_PER_DAY);
if (time < 0) {
--date;
time += MILLIS_PER_DAY;
}
long nanoOfDay = time * 1_000_000L + epochNanoOfMillisecond;
LocalDate localDate = LocalDate.ofEpochDay(date);
LocalTime localTime = LocalTime.ofNanoOfDay(nanoOfDay);
return LocalDateTime.of(localDate, localTime);
}
// ------------------------------------------------------------------------------------------
// Constructor Utilities
// ------------------------------------------------------------------------------------------
/**
* Creates an instance of {@link LocalZonedTimestampData} from epoch milliseconds.
*
* <p>The nanos-of-millisecond field will be set to zero.
*
* @param millisecond the number of epoch milliseconds since epoch {@code 1970-01-01 00:00:00};
* a negative number is the number of epoch milliseconds before epoch {@code 1970-01-01
* 00:00:00}
*/
public static LocalZonedTimestampData fromEpochMillis(long millisecond) {
return new LocalZonedTimestampData(millisecond, 0);
}
/**
* Creates an instance of {@link LocalZonedTimestampData} from epoch milliseconds and an epoch
* nanos-of-millisecond.
*
* @param millisecond the number of epoch milliseconds since epoch {@code 1970-01-01 00:00:00};
* a negative number is the number of epoch milliseconds before epoch {@code 1970-01-01
* 00:00:00}
* @param epochNanoOfMillisecond the epoch nanoseconds within the millisecond, from 0 to 999,999
*/
public static LocalZonedTimestampData fromEpochMillis(
long millisecond, int epochNanoOfMillisecond) {
return new LocalZonedTimestampData(millisecond, epochNanoOfMillisecond);
}
/**
* Creates an instance of {@link LocalZonedTimestampData} from an instance of {@link Instant}.
*
* @param instant an instance of {@link Instant}
*/
public static LocalZonedTimestampData fromInstant(Instant instant) {
long epochSecond = instant.getEpochSecond();
int nanoSecond = instant.getNano();
long millisecond = epochSecond * 1_000 + nanoSecond / 1_000_000;
int nanoOfMillisecond = nanoSecond % 1_000_000;
return new LocalZonedTimestampData(millisecond, nanoOfMillisecond);
}
/**
* Returns whether the local zoned timestamp data is small enough to be stored in a long of
* milliseconds.
*/
public static boolean isCompact(int precision) {
return precision <= 3;
}
}

@@ -0,0 +1,51 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.common.data;
import org.apache.flink.annotation.PublicEvolving;
import com.ververica.cdc.common.types.MapType;
/**
* Base interface of an internal data structure representing data of {@link MapType}.
*
* <p>Note: All keys and values of this data structure must be internal data structures. All keys
* must be of the same type; same for values. See {@link RecordData} for more information about
* internal data structures.
*
* <p>Use {@link GenericMapData} to construct instances of this interface from regular Java maps.
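*
* <p>A read sketch (illustrative; assumes STRING keys and INT values, with {@code map} in scope):
*
* <pre>{@code
* ArrayData keys = map.keyArray();
* ArrayData values = map.valueArray();
* for (int i = 0; i < map.size(); i++) {
*     StringData key = keys.getString(i);
*     Integer value = values.isNullAt(i) ? null : values.getInt(i);
* }
* }</pre>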
*/
@PublicEvolving
public interface MapData {
/** Returns the number of key-value mappings in this map. */
int size();
/**
* Returns an array view of the keys contained in this map.
*
* <p>A key-value pair has the same index in the key array and value array.
*/
ArrayData keyArray();
/**
* Returns an array view of the values contained in this map.
*
* <p>A key-value pair has the same index in the key array and value array.
*/
ArrayData valueArray();
}

@@ -0,0 +1,155 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.common.data;
import org.apache.flink.annotation.PublicEvolving;
/**
* Class {@code RecordData} describes the data of a changed record (i.e. row) in the external system.
*
* <p>The mappings from external SQL data types to the internal data structures are listed in the
* following table:
*
* <pre>
* +--------------------------------+-----------------------------------------+
* | SQL Data Types | Internal Data Structures |
* +--------------------------------+-----------------------------------------+
* | BOOLEAN | boolean |
* +--------------------------------+-----------------------------------------+
* | CHAR / VARCHAR / STRING | {@link StringData} |
* +--------------------------------+-----------------------------------------+
* | BINARY / VARBINARY / BYTES | byte[] |
* +--------------------------------+-----------------------------------------+
* | DECIMAL | {@link DecimalData} |
* +--------------------------------+-----------------------------------------+
* | TINYINT | byte |
* +--------------------------------+-----------------------------------------+
* | SMALLINT | short |
* +--------------------------------+-----------------------------------------+
* | INT | int |
* +--------------------------------+-----------------------------------------+
* | BIGINT | long |
* +--------------------------------+-----------------------------------------+
* | FLOAT | float |
* +--------------------------------+-----------------------------------------+
* | DOUBLE | double |
* +--------------------------------+-----------------------------------------+
* | DATE | int (number of days since epoch) |
* +--------------------------------+-----------------------------------------+
* | TIME | int (number of milliseconds of the day) |
* +--------------------------------+-----------------------------------------+
* | TIMESTAMP | {@link TimestampData} |
* +--------------------------------+-----------------------------------------+
* | TIMESTAMP WITH LOCAL TIME ZONE | {@link LocalZonedTimestampData} |
* +--------------------------------+-----------------------------------------+
* | TIMESTAMP WITH TIME ZONE | {@link ZonedTimestampData} |
* +--------------------------------+-----------------------------------------+
* | ROW | {@link RecordData} |
* +--------------------------------+-----------------------------------------+
* | ARRAY | {@link ArrayData} |
* +--------------------------------+-----------------------------------------+
* | MAP | {@link MapData} |
* +--------------------------------+-----------------------------------------+
* </pre>
*
* <p>Nullability is always handled by the container data structure.
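*
* <p>A read sketch (illustrative; assumes field 0 is INT and field 1 is STRING, with {@code record}
* in scope):
*
* <pre>{@code
* Integer id = record.isNullAt(0) ? null : record.getInt(0);
* StringData name = record.getString(1);
* }</pre>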
*/
@PublicEvolving
public interface RecordData {
/** Returns the number of fields in this record. */
int getArity();
// ------------------------------------------------------------------------------------------
// Read-only accessor methods
// ------------------------------------------------------------------------------------------
/** Returns true if the field is null at the given position. */
boolean isNullAt(int pos);
/** Returns the boolean value at the given position. */
boolean getBoolean(int pos);
/** Returns the byte value at the given position. */
byte getByte(int pos);
/** Returns the short value at the given position. */
short getShort(int pos);
/** Returns the integer value at the given position. */
int getInt(int pos);
/** Returns the long value at the given position. */
long getLong(int pos);
/** Returns the float value at the given position. */
float getFloat(int pos);
/** Returns the double value at the given position. */
double getDouble(int pos);
/** Returns the binary value at the given position. */
byte[] getBinary(int pos);
/** Returns the string value at the given position. */
StringData getString(int pos);
/**
* Returns the decimal value at the given position.
*
* <p>The precision and scale are required to determine whether the decimal value was stored in
* a compact representation (see {@link DecimalData}).
*/
DecimalData getDecimal(int pos, int precision, int scale);
/**
* Returns the timestamp value at the given position.
*
* <p>The precision is required to determine whether the timestamp value was stored in a compact
* representation (see {@link TimestampData}).
*/
TimestampData getTimestamp(int pos, int precision);
/**
* Returns the zoned timestamp value at the given position.
*
* <p>The precision is required to determine whether the zoned timestamp value was stored in a
* compact representation (see {@link ZonedTimestampData}).
*/
ZonedTimestampData getZonedTimestamp(int pos, int precision);
/**
* Returns the local zoned timestamp value at the given position.
*
* <p>The precision is required to determine whether the local zoned timestamp value was stored
* in a compact representation (see {@link LocalZonedTimestampData}).
*/
LocalZonedTimestampData getLocalZonedTimestampData(int pos, int precision);
/** Returns the array value at the given position. */
ArrayData getArray(int pos);
/** Returns the map value at the given position. */
MapData getMap(int pos);
/**
* Returns the row value at the given position.
*
* <p>The number of fields is required to correctly extract the record.
*/
RecordData getRow(int pos, int numFields);
}

@@ -0,0 +1,37 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.common.data;
import org.apache.flink.annotation.PublicEvolving;
import com.ververica.cdc.common.types.CharType;
import com.ververica.cdc.common.types.VarCharType;
/** An internal data structure representing data of {@link CharType} and {@link VarCharType}. */
@PublicEvolving
public interface StringData extends Comparable<StringData> {
/**
* Converts this {@link StringData} object to a UTF-8 byte array.
*
* <p>Note: The returned byte array may be reused.
*/
byte[] toBytes();
/** Converts this {@link StringData} object to a {@link String}. */
String toString();
}

@@ -0,0 +1,150 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.common.data;
import org.apache.flink.annotation.PublicEvolving;
import com.ververica.cdc.common.types.TimestampType;
import com.ververica.cdc.common.utils.Preconditions;
import java.sql.Timestamp;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
/**
* An internal data structure representing data of {@link TimestampType}.
*
* <p>This data structure is immutable and consists of the milliseconds and nanos-of-millisecond since
* {@code 1970-01-01 00:00:00} of UTC+0. It might be stored in a compact representation (as a long
* value) if values are small enough.
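*
* <p>A construction sketch (illustrative):
*
* <pre>{@code
* TimestampData ts = TimestampData.fromLocalDateTime(LocalDateTime.of(2023, 1, 1, 12, 0));
* LocalDateTime back = ts.toLocalDateTime(); // 2023-01-01T12:00
* }</pre>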
*/
@PublicEvolving
public final class TimestampData implements Comparable<TimestampData> {
// the number of milliseconds in a day
private static final long MILLIS_PER_DAY = 86400000; // = 24 * 60 * 60 * 1000
// this field holds the integral second and the milli-of-second
private final long millisecond;
// this field holds the nano-of-millisecond
private final int nanoOfMillisecond;
private TimestampData(long millisecond, int nanoOfMillisecond) {
Preconditions.checkArgument(nanoOfMillisecond >= 0 && nanoOfMillisecond <= 999_999);
this.millisecond = millisecond;
this.nanoOfMillisecond = nanoOfMillisecond;
}
/** Returns the number of milliseconds since {@code 1970-01-01 00:00:00}. */
public long getMillisecond() {
return millisecond;
}
/**
* Returns the number of nanoseconds (the nanoseconds within the milliseconds).
*
* <p>The value range is from 0 to 999,999.
*/
public int getNanoOfMillisecond() {
return nanoOfMillisecond;
}
/** Converts this {@link TimestampData} object to a {@link Timestamp}. */
public Timestamp toTimestamp() {
return Timestamp.valueOf(toLocalDateTime());
}
/** Converts this {@link TimestampData} object to a {@link LocalDateTime}. */
public LocalDateTime toLocalDateTime() {
int date = (int) (millisecond / MILLIS_PER_DAY);
int time = (int) (millisecond % MILLIS_PER_DAY);
if (time < 0) {
--date;
time += MILLIS_PER_DAY;
}
long nanoOfDay = time * 1_000_000L + nanoOfMillisecond;
LocalDate localDate = LocalDate.ofEpochDay(date);
LocalTime localTime = LocalTime.ofNanoOfDay(nanoOfDay);
return LocalDateTime.of(localDate, localTime);
}
@Override
public int compareTo(TimestampData that) {
int cmp = Long.compare(this.millisecond, that.millisecond);
if (cmp == 0) {
cmp = this.nanoOfMillisecond - that.nanoOfMillisecond;
}
return cmp;
}
@Override
public boolean equals(Object obj) {
if (!(obj instanceof TimestampData)) {
return false;
}
TimestampData that = (TimestampData) obj;
return this.millisecond == that.millisecond
&& this.nanoOfMillisecond == that.nanoOfMillisecond;
}
@Override
public String toString() {
return toLocalDateTime().toString();
}
@Override
public int hashCode() {
int ret = (int) millisecond ^ (int) (millisecond >> 32);
return 31 * ret + nanoOfMillisecond;
}
// ------------------------------------------------------------------------------------------
// Constructor Utilities
// ------------------------------------------------------------------------------------------
/**
* Creates an instance of {@link TimestampData} from an instance of {@link LocalDateTime}.
*
* @param dateTime an instance of {@link LocalDateTime}
*/
public static TimestampData fromLocalDateTime(LocalDateTime dateTime) {
long epochDay = dateTime.toLocalDate().toEpochDay();
long nanoOfDay = dateTime.toLocalTime().toNanoOfDay();
long millisecond = epochDay * MILLIS_PER_DAY + nanoOfDay / 1_000_000;
int nanoOfMillisecond = (int) (nanoOfDay % 1_000_000);
return new TimestampData(millisecond, nanoOfMillisecond);
}
/**
* Creates an instance of {@link TimestampData} from an instance of {@link Timestamp}.
*
* @param timestamp an instance of {@link Timestamp}
*/
public static TimestampData fromTimestamp(Timestamp timestamp) {
return fromLocalDateTime(timestamp.toLocalDateTime());
}
/**
* Returns whether the timestamp data is small enough to be stored in a long of milliseconds.
*/
public static boolean isCompact(int precision) {
return precision <= 3;
}
}

@@ -0,0 +1,234 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.common.data;
import org.apache.flink.annotation.PublicEvolving;
import com.ververica.cdc.common.types.ZonedTimestampType;
import com.ververica.cdc.common.utils.Preconditions;
import java.sql.Timestamp;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.OffsetDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Objects;
/**
* An internal data structure representing data of {@link ZonedTimestampType}. It is intended for
* converting various Java time representations into a date-time in a particular time zone.
*
* <p>The ISO date-time format is used by default; it includes the date, time (including fractional
* parts), and offset from UTC, such as '2011-12-03T10:15:30+01:00'.
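*
* <p>A construction sketch (illustrative):
*
* <pre>{@code
* ZonedTimestampData ts =
*         ZonedTimestampData.fromZonedDateTime(ZonedDateTime.parse("2011-12-03T10:15:30+01:00"));
* String formatted = ts.toString(); // "2011-12-03T10:15:30+01:00"
* }</pre>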
*/
@PublicEvolving
public final class ZonedTimestampData implements Comparable<ZonedTimestampData> {
/**
* The ISO date-time format includes the date, time (including fractional parts), and offset
* from UTC, such as '2011-12-03T10:15:30.030431+01:00'.
*/
public static final DateTimeFormatter ISO_FORMATTER = DateTimeFormatter.ISO_OFFSET_DATE_TIME;
// the number of milliseconds in a day
private static final long MILLIS_PER_DAY = 86400000; // = 24 * 60 * 60 * 1000
// this field holds the integral second and the milli-of-second
private final long millisecond;
// this field holds the nano-of-millisecond
private final int nanoOfMillisecond;
// this field holds time zone id
private final String zoneId;
private ZonedTimestampData(long millisecond, int nanoOfMillisecond, String zoneId) {
Preconditions.checkArgument(nanoOfMillisecond >= 0 && nanoOfMillisecond <= 999_999);
Preconditions.checkNotNull(zoneId);
this.millisecond = millisecond;
this.nanoOfMillisecond = nanoOfMillisecond;
this.zoneId = zoneId;
}
/** Returns the zoned date-time with time-zone. */
public ZonedDateTime getZonedDateTime() {
return ZonedDateTime.of(getLocalDateTimePart(), ZoneId.of(zoneId));
}
/** Converts this {@link ZonedTimestampData} object to an {@link Instant}. */
public Instant toInstant() {
return ZonedDateTime.of(getLocalDateTimePart(), ZoneId.of(zoneId)).toInstant();
}
/** Converts this {@link ZonedTimestampData} object to a {@link Timestamp}. */
public Timestamp toTimestamp() {
return Timestamp.from(toInstant());
}
/** Returns the number of milliseconds since {@code 1970-01-01 00:00:00}. */
public long getMillisecond() {
return millisecond;
}
/**
* Returns the number of nanoseconds (the nanoseconds within the milliseconds).
*
* <p>The value range is from 0 to 999,999.
*/
public int getNanoOfMillisecond() {
return nanoOfMillisecond;
}
/** Returns the {@code LocalDateTime} part of this zoned date-time. */
public LocalDateTime getLocalDateTimePart() {
int date = (int) (millisecond / MILLIS_PER_DAY);
int time = (int) (millisecond % MILLIS_PER_DAY);
if (time < 0) {
--date;
time += MILLIS_PER_DAY;
}
long nanoOfDay = time * 1_000_000L + nanoOfMillisecond;
LocalDate localDate = LocalDate.ofEpochDay(date);
LocalTime localTime = LocalTime.ofNanoOfDay(nanoOfDay);
return LocalDateTime.of(localDate, localTime);
}
/** Returns the {@code ZoneId} part of this zoned date-time. */
public ZoneId getTimeZoneId() {
return ZoneId.of(zoneId);
}
@Override
public int compareTo(ZonedTimestampData that) {
// convert both to instants, then compare epoch seconds followed by nanos-of-second
Instant thisInstant = this.toInstant();
Instant thatInstant = that.toInstant();
int cmp = Long.compare(thisInstant.getEpochSecond(), thatInstant.getEpochSecond());
if (cmp == 0) {
cmp = thisInstant.getNano() - thatInstant.getNano();
}
return cmp;
}
@Override
public boolean equals(Object obj) {
if (!(obj instanceof ZonedTimestampData)) {
return false;
}
ZonedTimestampData that = (ZonedTimestampData) obj;
return this.millisecond == that.millisecond
&& this.nanoOfMillisecond == that.nanoOfMillisecond
&& this.zoneId.equals(that.zoneId);
}
@Override
public String toString() {
return getZonedDateTime().format(ISO_FORMATTER);
}
@Override
public int hashCode() {
return Objects.hash(millisecond, nanoOfMillisecond, zoneId);
}
// ------------------------------------------------------------------------------------------
// Constructor Utilities
// ------------------------------------------------------------------------------------------
/**
* Creates an instance of {@link ZonedTimestampData} from an instance of {@link ZonedDateTime}.
*
* @param zonedDateTime an instance of {@link ZonedDateTime}
*/
public static ZonedTimestampData fromZonedDateTime(ZonedDateTime zonedDateTime) {
LocalDateTime dateTimePart = zonedDateTime.toLocalDateTime();
long epochDay = dateTimePart.toLocalDate().toEpochDay();
long nanoOfDay = dateTimePart.toLocalTime().toNanoOfDay();
long millisecond = epochDay * MILLIS_PER_DAY + nanoOfDay / 1_000_000;
int nanoOfMillisecond = (int) (nanoOfDay % 1_000_000);
String zoneId = zonedDateTime.getZone().toString();
return new ZonedTimestampData(millisecond, nanoOfMillisecond, zoneId);
}
/**
* Creates an instance of {@link ZonedTimestampData} from an instance of {@link OffsetDateTime}.
*
* @param offsetDateTime an instance of {@link OffsetDateTime}
*/
public static ZonedTimestampData fromOffsetDateTime(OffsetDateTime offsetDateTime) {
return fromZonedDateTime(offsetDateTime.toZonedDateTime());
}
/**
* Creates an instance of {@link ZonedTimestampData} from epoch milliseconds.
*
* <p>The epoch nanos-of-millisecond field will be set to zero.
*
* @param epochMillisecond the number of epoch milliseconds since epoch {@code 1970-01-01
* 00:00:00}; a negative number is the number of epoch milliseconds before epoch {@code
* 1970-01-01 00:00:00}
*/
public static ZonedTimestampData fromEpochMillis(long epochMillisecond) {
return fromEpochMillis(epochMillisecond, 0);
}
/**
* Creates an instance of {@link ZonedTimestampData} from epoch milliseconds and an epoch
* nanos-of-millisecond.
*
* @param epochMillisecond the number of epoch milliseconds since epoch {@code 1970-01-01
* 00:00:00}; a negative number is the number of epoch milliseconds before epoch {@code
* 1970-01-01 00:00:00}
* @param epochNanoOfMillisecond the nanoseconds within the epoch millisecond, from 0 to 999,999
*/
public static ZonedTimestampData fromEpochMillis(
long epochMillisecond, int epochNanoOfMillisecond) {
long epochSecond = epochMillisecond / 1000;
int milliOfSecond = (int) (epochMillisecond % 1000);
if (milliOfSecond < 0) {
--epochSecond;
milliOfSecond += 1000;
}
long nanoAdjustment = milliOfSecond * 1_000_000 + epochNanoOfMillisecond;
return fromInstant(Instant.ofEpochSecond(epochSecond, nanoAdjustment));
}
/**
* Creates an instance of {@link ZonedTimestampData} from an instance of {@link Instant}.
*
* @param instant an instance of {@link Instant}
*/
public static ZonedTimestampData fromInstant(Instant instant) {
// An Instant carries no time-zone, so ZonedDateTime.from(instant) would throw a
// DateTimeException; interpret the instant in UTC instead.
return fromZonedDateTime(instant.atZone(ZoneId.of("UTC")));
}
/**
* Returns whether a timestamp of the given precision can be stored compactly in a long of milliseconds.
*/
public static boolean isCompact(int precision) {
return precision <= 3;
}
}
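A minimal usage sketch for the factory and accessor methods above; the sample date-time value below is illustrative only:

    ZonedDateTime zdt =
            ZonedDateTime.of(2023, 11, 9, 12, 30, 15, 123_456_789, ZoneId.of("Asia/Shanghai"));
    ZonedTimestampData data = ZonedTimestampData.fromZonedDateTime(zdt);
    long millis = data.getMillisecond();               // milliseconds of the local date-time part
    int nanos = data.getNanoOfMillisecond();           // 456_789, the nanos within that millisecond
    ZonedDateTime restored = data.getZonedDateTime();  // equal to zdt
    boolean compact = ZonedTimestampData.isCompact(3); // true: millisecond precision is compact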

@ -18,6 +18,8 @@ package com.ververica.cdc.common.event;
import org.apache.flink.annotation.PublicEvolving;
import com.ververica.cdc.common.data.RecordData;
import java.io.Serializable;
import java.util.Map;

@ -1,63 +0,0 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.common.event;
import org.apache.flink.annotation.PublicEvolving;
/** Class {@code RecordData} describes the data of a changed record in the external system. */
@PublicEvolving
public interface RecordData {
/** Returns the number of fields in this row. */
int getArity();
// ------------------------------------------------------------------------------------------
// Read-only accessor methods
// ------------------------------------------------------------------------------------------
/** Returns true if the field is null at the given position. */
boolean isNullAt(int pos);
/** Returns the boolean value at the given position. */
boolean getBoolean(int pos);
/** Returns the byte value at the given position. */
byte getByte(int pos);
/** Returns the short value at the given position. */
short getShort(int pos);
/** Returns the integer value at the given position. */
int getInt(int pos);
/** Returns the long value at the given position. */
long getLong(int pos);
/** Returns the float value at the given position. */
float getFloat(int pos);
/** Returns the double value at the given position. */
double getDouble(int pos);
/** Returns the string value at the given position. */
String getString(int pos);
/** Returns the binary value at the given position. */
byte[] getBinary(int pos);
// TODO: add more methods for other types
}

@ -39,8 +39,6 @@ import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static com.ververica.cdc.common.types.DataTypes.ROW;
/** Schema of a table or data collection. */
@PublicEvolving
public class Schema implements Serializable {
@ -125,7 +123,7 @@ public class Schema implements Serializable {
final DataField[] fields =
columns.stream().map(Schema::columnToField).toArray(DataField[]::new);
// the row should never be null
return ROW(fields).notNull();
return DataTypes.ROW(fields).notNull();
}
// -----------------------------------------------------------------------------------
@ -155,10 +153,7 @@ public class Schema implements Serializable {
// -----------------------------------------------------------------------------------
/**
* A builder for constructing an immutable but still unresolved {@link
* org.apache.flink.table.api.Schema}.
*/
/** A builder for constructing an immutable but still unresolved {@link Schema}. */
@PublicEvolving
public static final class Builder {

@ -16,6 +16,11 @@
package com.ververica.cdc.common.types;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.OptionalInt;
/**
* Utilities for checking {@link DataType} and avoiding a lot of type casting and repetitive work.
*/
@ -23,35 +28,220 @@ public final class DataTypeChecks {
private static final LengthExtractor LENGTH_EXTRACTOR = new LengthExtractor();
private static final PrecisionExtractor PRECISION_EXTRACTOR = new PrecisionExtractor();
private static final ScaleExtractor SCALE_EXTRACTOR = new ScaleExtractor();
private static final FieldCountExtractor FIELD_COUNT_EXTRACTOR = new FieldCountExtractor();
private static final FieldNamesExtractor FIELD_NAMES_EXTRACTOR = new FieldNamesExtractor();
private static final FieldTypesExtractor FIELD_TYPES_EXTRACTOR = new FieldTypesExtractor();
private static final NestedTypesExtractor NESTED_TYPES_EXTRACTOR = new NestedTypesExtractor();
/**
* Checks if the given type is a composite type.
*
* <p>Use {@link #getFieldCount(DataType)}, {@link #getFieldNames(DataType)}, {@link
* #getFieldTypes(DataType)} for unified handling of composite types.
*
* @return True if the type is composite type.
*/
public static boolean isCompositeType(DataType dataType) {
return dataType.getTypeRoot() == DataTypeRoot.ROW;
}
public static int getLength(DataType dataType) {
return dataType.accept(LENGTH_EXTRACTOR);
}
public static boolean hasLength(DataType dataType, int length) {
return getLength(dataType) == length;
}
/** Returns the precision of all types that define a precision implicitly or explicitly. */
public static int getPrecision(DataType dataType) {
return dataType.accept(PRECISION_EXTRACTOR);
}
/** Checks the precision of a type that defines a precision implicitly or explicitly. */
public static boolean hasPrecision(DataType dataType, int precision) {
return getPrecision(dataType) == precision;
}
/** Returns the scale of all types that define a scale implicitly or explicitly. */
public static int getScale(DataType dataType) {
return dataType.accept(SCALE_EXTRACTOR);
}
/** Checks the scale of all types that define a scale implicitly or explicitly. */
public static boolean hasScale(DataType dataType, int scale) {
return getScale(dataType) == scale;
}
/** Returns the field count of row and structured types. Other types return 1. */
public static int getFieldCount(DataType dataType) {
return dataType.accept(FIELD_COUNT_EXTRACTOR);
}
/** Returns the field names of row and structured types. */
public static List<String> getFieldNames(DataType dataType) {
return dataType.accept(FIELD_NAMES_EXTRACTOR);
}
/** Returns the field types of row and structured types. */
public static List<DataType> getFieldTypes(DataType dataType) {
return dataType.accept(FIELD_TYPES_EXTRACTOR);
}
public static List<DataType> getNestedTypes(DataType dataType) {
return dataType.accept(NESTED_TYPES_EXTRACTOR);
}
/**
* Checks whether the given {@link DataType} has a well-defined string representation when
* calling {@link Object#toString()} on the internal data structure. The string representation
* would be similar to its representation in SQL or in a programming language.
*
* <p>Note: This method might not be necessary anymore, once we have implemented a utility that
* can convert any internal data structure to a well-defined string representation.
*/
public static boolean hasWellDefinedString(DataType dataType) {
switch (dataType.getTypeRoot()) {
case CHAR:
case VARCHAR:
case BOOLEAN:
case TINYINT:
case SMALLINT:
case INTEGER:
case BIGINT:
case FLOAT:
case DOUBLE:
return true;
default:
return false;
}
}
private DataTypeChecks() {
// no instantiation
}
// --------------------------------------------------------------------------------------------
/** Extracts an attribute of data types that define that attribute. */
/** Extracts an attribute of logical types that define that attribute. */
private static class Extractor<T> extends DataTypeDefaultVisitor<T> {
@Override
protected T defaultMethod(DataType dataType) {
throw new IllegalArgumentException(
String.format(
"Invalid use of extractor %s. Called on data type: %s",
this.getClass().getSimpleName(), dataType));
"Invalid use of extractor %s. Called on logical type: %s",
this.getClass().getName(), dataType));
}
}
private static class LengthExtractor extends Extractor<Integer> {
@Override
public Integer visit(CharType charType) {
return charType.getLength();
protected Integer defaultMethod(DataType dataType) {
OptionalInt length = DataTypes.getLength(dataType);
if (length.isPresent()) {
return length.getAsInt();
}
throw new IllegalArgumentException(
String.format(
"Invalid use of extractor %s. Called on logical type: %s",
this.getClass().getName(), dataType));
}
}
private static class PrecisionExtractor extends Extractor<Integer> {
@Override
protected Integer defaultMethod(DataType dataType) {
OptionalInt precision = DataTypes.getPrecision(dataType);
if (precision.isPresent()) {
return precision.getAsInt();
}
throw new IllegalArgumentException(
String.format(
"Invalid use of extractor %s. Called on logical type: %s",
this.getClass().getName(), dataType));
}
}
private static class ScaleExtractor extends Extractor<Integer> {
@Override
public Integer visit(DecimalType decimalType) {
return decimalType.getScale();
}
@Override
public Integer visit(TinyIntType tinyIntType) {
return 0;
}
@Override
public Integer visit(SmallIntType smallIntType) {
return 0;
}
@Override
public Integer visit(IntType intType) {
return 0;
}
@Override
public Integer visit(BigIntType bigIntType) {
return 0;
}
}
private static class FieldCountExtractor extends Extractor<Integer> {
@Override
public Integer visit(RowType rowType) {
return rowType.getFieldCount();
}
@Override
protected Integer defaultMethod(DataType dataType) {
return 1;
}
}
private static class FieldNamesExtractor extends Extractor<List<String>> {
@Override
public List<String> visit(RowType rowType) {
return rowType.getFieldNames();
}
}
private static class FieldTypesExtractor extends Extractor<List<DataType>> {
@Override
public List<DataType> visit(RowType rowType) {
return rowType.getFieldTypes();
}
}
private static class NestedTypesExtractor extends Extractor<List<DataType>> {
@Override
public List<DataType> visit(ArrayType arrayType) {
return Collections.singletonList(arrayType.getElementType());
}
@Override
public List<DataType> visit(MapType mapType) {
return Arrays.asList(mapType.getKeyType(), mapType.getValueType());
}
@Override
public Integer visit(BinaryType binaryType) {
return binaryType.getLength();
public List<DataType> visit(RowType rowType) {
return rowType.getFieldTypes();
}
}
}
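A brief sketch of how the composite-type helpers above work together; DataTypes.ROW appears in this change, while the leaf constructors DataTypes.INT() and DataTypes.DECIMAL(10, 2) are assumed helpers:

    // ROW<f0 INT, f1 DECIMAL(10, 2)> with generated field names f0, f1
    DataType rowType = DataTypes.ROW(DataTypes.INT(), DataTypes.DECIMAL(10, 2));
    boolean composite = DataTypeChecks.isCompositeType(rowType);       // true, the type root is ROW
    int fieldCount = DataTypeChecks.getFieldCount(rowType);            // 2
    List<String> fieldNames = DataTypeChecks.getFieldNames(rowType);   // [f0, f1]
    List<DataType> fieldTypes = DataTypeChecks.getFieldTypes(rowType); // [INT, DECIMAL(10, 2)]
    int scale = DataTypeChecks.getScale(fieldTypes.get(1));            // 2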

@ -19,6 +19,7 @@ package com.ververica.cdc.common.types;
import org.apache.flink.annotation.PublicEvolving;
import java.util.Arrays;
import java.util.OptionalInt;
/**
* A {@link DataType} can be used to declare input and/or output types of operations. This class *
@ -304,11 +305,10 @@ public class DataTypes {
* +14:59} to {@code 9999-12-31 23:59:59.999999999 -14:59}. Leap seconds (23:59:60 and 23:59:61)
* are not supported as the semantics are closer to {@link java.time.OffsetDateTime}.
*
* <p>Compared to {@link org.apache.flink.table.types.logical.ZonedTimestampType}, the time zone
* offset information is not stored physically in every datum. Instead, the type assumes {@link
* java.time.Instant} semantics in UTC time zone at the edges of the table ecosystem. Every
* datum is interpreted in the local time zone configured in the current session for computation
* and visualization.
* <p>Compared to {@link ZonedTimestampType}, the time zone offset information is not stored
* physically in every datum. Instead, the type assumes {@link java.time.Instant} semantics in
* UTC time zone at the edges of the table ecosystem. Every datum is interpreted in the local
* time zone configured in the current session for computation and visualization.
*
* <p>This type fills the gap between time zone free and time zone mandatory timestamp types by
* allowing the interpretation of UTC timestamps according to the configured session timezone.
@ -330,11 +330,10 @@ public class DataTypes {
* to {@code 9999-12-31 23:59:59.999999 -14:59}. Leap seconds (23:59:60 and 23:59:61) are not
* supported as the semantics are closer to {@link java.time.OffsetDateTime}.
*
* <p>Compared to {@link org.apache.flink.table.types.logical.ZonedTimestampType}, the time zone
* offset information is not stored physically in every datum. Instead, the type assumes {@link
* java.time.Instant} semantics in UTC time zone at the edges of the table ecosystem. Every
* datum is interpreted in the local time zone configured in the current session for computation
* and visualization.
* <p>Compared to {@link ZonedTimestampType}, the time zone offset information is not stored
* physically in every datum. Instead, the type assumes {@link java.time.Instant} semantics in
* UTC time zone at the edges of the table ecosystem. Every datum is interpreted in the local
* time zone configured in the current session for computation and visualization.
*
* <p>This type fills the gap between time zone free and time zone mandatory timestamp types by
* allowing the interpretation of UTC timestamps according to the configured session timezone.
@ -413,9 +412,82 @@ public class DataTypes {
* <p>This is shortcut for {@link #ROW(DataField...)} where the field names will be generated
* using {@code f0, f1, f2, ...}.
*
* <p>Note: Flink CDC currently doesn't support defining nested row in columns.
* <p>Note: Flink CDC currently doesn't support defining nested record in columns.
*/
public static RowType ROW(DataType... fieldTypes) {
return RowType.builder().fields(fieldTypes).build();
}
public static OptionalInt getPrecision(DataType dataType) {
return dataType.accept(PRECISION_EXTRACTOR);
}
public static OptionalInt getLength(DataType dataType) {
return dataType.accept(LENGTH_EXTRACTOR);
}
private static final PrecisionExtractor PRECISION_EXTRACTOR = new PrecisionExtractor();
private static final LengthExtractor LENGTH_EXTRACTOR = new LengthExtractor();
private static class PrecisionExtractor extends DataTypeDefaultVisitor<OptionalInt> {
@Override
public OptionalInt visit(DecimalType decimalType) {
return OptionalInt.of(decimalType.getPrecision());
}
@Override
public OptionalInt visit(TimeType timeType) {
return OptionalInt.of(timeType.getPrecision());
}
@Override
public OptionalInt visit(TimestampType timestampType) {
return OptionalInt.of(timestampType.getPrecision());
}
@Override
public OptionalInt visit(LocalZonedTimestampType localZonedTimestampType) {
return OptionalInt.of(localZonedTimestampType.getPrecision());
}
@Override
public OptionalInt visit(ZonedTimestampType zonedTimestampType) {
return OptionalInt.of(zonedTimestampType.getPrecision());
}
@Override
protected OptionalInt defaultMethod(DataType dataType) {
return OptionalInt.empty();
}
}
private static class LengthExtractor extends DataTypeDefaultVisitor<OptionalInt> {
@Override
public OptionalInt visit(CharType charType) {
return OptionalInt.of(charType.getLength());
}
@Override
public OptionalInt visit(VarCharType varCharType) {
return OptionalInt.of(varCharType.getLength());
}
@Override
public OptionalInt visit(BinaryType binaryType) {
return OptionalInt.of(binaryType.getLength());
}
@Override
public OptionalInt visit(VarBinaryType varBinaryType) {
return OptionalInt.of(varBinaryType.getLength());
}
@Override
protected OptionalInt defaultMethod(DataType dataType) {
return OptionalInt.empty();
}
}
}
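A short sketch of the new OptionalInt helpers above; the CHAR and TIMESTAMP constructors used here are assumed to be defined elsewhere in DataTypes:

    OptionalInt tsPrecision = DataTypes.getPrecision(DataTypes.TIMESTAMP(6)); // OptionalInt.of(6)
    OptionalInt charLength = DataTypes.getLength(DataTypes.CHAR(10));         // OptionalInt.of(10)
    OptionalInt noLength = DataTypes.getLength(DataTypes.TIMESTAMP(6));       // OptionalInt.empty()
    int precisionOrDefault = tsPrecision.orElse(3);                           // 6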

@ -30,10 +30,10 @@ import java.util.stream.Collectors;
/**
* Data type of a sequence of fields. A field consists of a field name, field type, and an optional
* description. The most specific type of a row of a table is a row type. In this case, each column
* of the row corresponds to the field of the row type that has the same ordinal position as the
* column. Compared to the SQL standard, an optional field description simplifies the handling with
* complex structures.
* description. The most specific type of a record of a table is a record type. In this case, each
* column of the record corresponds to the field of the record type that has the same ordinal
* position as the column. Compared to the SQL standard, an optional field description simplifies
* the handling with complex structures.
*/
@PublicEvolving
public final class RowType extends DataType {
@ -66,6 +66,10 @@ public final class RowType extends DataType {
return fields.stream().map(DataField::getName).collect(Collectors.toList());
}
public List<DataType> getFieldTypes() {
return fields.stream().map(DataField::getType).collect(Collectors.toList());
}
public DataType getTypeAt(int i) {
return fields.get(i).getType();
}
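A small sketch of the accessors shown above, reusing the builder pattern from DataTypes.ROW; the BIGINT and STRING leaf types are assumed helpers:

    DataType[] fieldTypes = new DataType[] {DataTypes.BIGINT(), DataTypes.STRING()};
    RowType rowType = RowType.builder().fields(fieldTypes).build();
    List<String> names = rowType.getFieldNames();   // generated names, e.g. [f0, f1]
    List<DataType> types = rowType.getFieldTypes(); // [BIGINT, STRING]
    DataType second = rowType.getTypeAt(1);         // STRING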

@ -0,0 +1,265 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.common.utils;
import org.apache.flink.annotation.Internal;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
/** Utilities for String UTF-8. */
@Internal
public final class StringUtf8Utils {
private static final int MAX_BYTES_PER_CHAR = 3;
private static final int MAX_BYTES_LENGTH = 1024 * 64;
private static final int MAX_CHARS_LENGTH = 1024 * 32;
private static final ThreadLocal<byte[]> BYTES_LOCAL = new ThreadLocal<>();
private static final ThreadLocal<char[]> CHARS_LOCAL = new ThreadLocal<>();
private StringUtf8Utils() {
// do not instantiate
}
/** This method must have the same result as JDK's String.getBytes. */
public static byte[] encodeUTF8(String str) {
byte[] bytes = allocateReuseBytes(str.length() * MAX_BYTES_PER_CHAR);
int len = encodeUTF8(str, bytes);
return Arrays.copyOf(bytes, len);
}
public static int encodeUTF8(String str, byte[] bytes) {
int offset = 0;
int len = str.length();
int sl = offset + len;
int dp = 0;
int dlASCII = dp + Math.min(len, bytes.length);
// ASCII only optimized loop
while (dp < dlASCII && str.charAt(offset) < '\u0080') {
bytes[dp++] = (byte) str.charAt(offset++);
}
while (offset < sl) {
char c = str.charAt(offset++);
if (c < 0x80) {
// Have at most seven bits
bytes[dp++] = (byte) c;
} else if (c < 0x800) {
// 2 bytes, 11 bits
bytes[dp++] = (byte) (0xc0 | (c >> 6));
bytes[dp++] = (byte) (0x80 | (c & 0x3f));
} else if (Character.isSurrogate(c)) {
final int uc;
int ip = offset - 1;
if (Character.isHighSurrogate(c)) {
if (sl - ip < 2) {
uc = -1;
} else {
char d = str.charAt(ip + 1);
if (Character.isLowSurrogate(d)) {
uc = Character.toCodePoint(c, d);
} else {
// for an illegal character the JDK ignores the original character
// and replaces it with '?'; fall back to the JDK behavior here
return defaultEncodeUTF8(str, bytes);
}
}
} else {
if (Character.isLowSurrogate(c)) {
// for an illegal character the JDK ignores the original character
// and replaces it with '?'; fall back to the JDK behavior here
return defaultEncodeUTF8(str, bytes);
} else {
uc = c;
}
}
if (uc < 0) {
bytes[dp++] = (byte) '?';
} else {
bytes[dp++] = (byte) (0xf0 | ((uc >> 18)));
bytes[dp++] = (byte) (0x80 | ((uc >> 12) & 0x3f));
bytes[dp++] = (byte) (0x80 | ((uc >> 6) & 0x3f));
bytes[dp++] = (byte) (0x80 | (uc & 0x3f));
offset++; // 2 chars
}
} else {
// 3 bytes, 16 bits
bytes[dp++] = (byte) (0xe0 | ((c >> 12)));
bytes[dp++] = (byte) (0x80 | ((c >> 6) & 0x3f));
bytes[dp++] = (byte) (0x80 | (c & 0x3f));
}
}
return dp;
}
public static int defaultEncodeUTF8(String str, byte[] bytes) {
try {
byte[] buffer = str.getBytes("UTF-8");
System.arraycopy(buffer, 0, bytes, 0, buffer.length);
return buffer.length;
} catch (UnsupportedEncodingException e) {
throw new RuntimeException("encodeUTF8 error", e);
}
}
public static String decodeUTF8(byte[] input, int offset, int byteLen) {
char[] chars = allocateReuseChars(byteLen);
int len = decodeUTF8Strict(input, offset, byteLen, chars);
if (len < 0) {
return defaultDecodeUTF8(input, offset, byteLen);
}
return new String(chars, 0, len);
}
public static int decodeUTF8Strict(byte[] sa, int sp, int len, char[] da) {
final int sl = sp + len;
int dp = 0;
int dlASCII = Math.min(len, da.length);
// ASCII only optimized loop
while (dp < dlASCII && sa[sp] >= 0) {
da[dp++] = (char) sa[sp++];
}
while (sp < sl) {
int b1 = sa[sp++];
if (b1 >= 0) {
// 1 byte, 7 bits: 0xxxxxxx
da[dp++] = (char) b1;
} else if ((b1 >> 5) == -2 && (b1 & 0x1e) != 0) {
// 2 bytes, 11 bits: 110xxxxx 10xxxxxx
if (sp < sl) {
int b2 = sa[sp++];
if ((b2 & 0xc0) != 0x80) { // isNotContinuation(b2)
return -1;
} else {
da[dp++] = (char) (((b1 << 6) ^ b2) ^ (((byte) 0xC0 << 6) ^ ((byte) 0x80)));
}
continue;
}
return -1;
} else if ((b1 >> 4) == -2) {
// 3 bytes, 16 bits: 1110xxxx 10xxxxxx 10xxxxxx
if (sp + 1 < sl) {
int b2 = sa[sp++];
int b3 = sa[sp++];
if ((b1 == (byte) 0xe0 && (b2 & 0xe0) == 0x80)
|| (b2 & 0xc0) != 0x80
|| (b3 & 0xc0) != 0x80) { // isMalformed3(b1, b2, b3)
return -1;
} else {
char c =
(char)
((b1 << 12)
^ (b2 << 6)
^ (b3
^ (((byte) 0xE0 << 12)
^ ((byte) 0x80 << 6)
^ ((byte) 0x80))));
if (Character.isSurrogate(c)) {
return -1;
} else {
da[dp++] = c;
}
}
continue;
}
return -1;
} else if ((b1 >> 3) == -2) {
// 4 bytes, 21 bits: 11110xxx 10xxxxxx 10xxxxxx 10xxxxxx
if (sp + 2 < sl) {
int b2 = sa[sp++];
int b3 = sa[sp++];
int b4 = sa[sp++];
int uc =
((b1 << 18)
^ (b2 << 12)
^ (b3 << 6)
^ (b4
^ (((byte) 0xF0 << 18)
^ ((byte) 0x80 << 12)
^ ((byte) 0x80 << 6)
^ ((byte) 0x80))));
// isMalformed4 and shortest form check
if (((b2 & 0xc0) != 0x80 || (b3 & 0xc0) != 0x80 || (b4 & 0xc0) != 0x80)
|| !Character.isSupplementaryCodePoint(uc)) {
return -1;
} else {
da[dp++] = Character.highSurrogate(uc);
da[dp++] = Character.lowSurrogate(uc);
}
continue;
}
return -1;
} else {
return -1;
}
}
return dp;
}
public static String defaultDecodeUTF8(byte[] bytes, int offset, int len) {
return new String(bytes, offset, len, StandardCharsets.UTF_8);
}
/**
* Allocates bytes that are intended only for temporary use; the returned array must not be
* stored anywhere else. A {@link ThreadLocal} is used to reuse the buffer and avoid the
* overhead of repeated byte[] allocation and GC.
*
* <p>If a method only accepts a byte[] rather than a MemorySegment[] parameter, we can
* allocate a reusable byte[], copy the MemorySegment data into it, and then call the method,
* e.g. for String deserialization.
*/
public static byte[] allocateReuseBytes(int length) {
byte[] bytes = BYTES_LOCAL.get();
if (bytes == null) {
if (length <= MAX_BYTES_LENGTH) {
bytes = new byte[MAX_BYTES_LENGTH];
BYTES_LOCAL.set(bytes);
} else {
bytes = new byte[length];
}
} else if (bytes.length < length) {
bytes = new byte[length];
}
return bytes;
}
public static char[] allocateReuseChars(int length) {
char[] chars = CHARS_LOCAL.get();
if (chars == null) {
if (length <= MAX_CHARS_LENGTH) {
chars = new char[MAX_CHARS_LENGTH];
CHARS_LOCAL.set(chars);
} else {
chars = new char[length];
}
} else if (chars.length < length) {
chars = new char[length];
}
return chars;
}
}
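A round-trip sketch for the encoder and decoder above; the sample string is arbitrary:

    String original = "CDC 数据流";                      // mixed ASCII and multi-byte characters
    byte[] utf8 = StringUtf8Utils.encodeUTF8(original);  // same bytes as original.getBytes(UTF_8)
    String decoded = StringUtf8Utils.decodeUTF8(utf8, 0, utf8.length);
    // decoded.equals(original) is true; malformed input falls back to defaultDecodeUTF8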

@ -101,7 +101,8 @@ public class TypeCheckUtils {
// ordered by type root definition
switch (type.getTypeRoot()) {
case CHAR:
case VARCHAR: // The internal representation of String is BinaryString which is mutable
case VARCHAR: // The internal representation of String is BinaryString which is
// mutable
case ARRAY:
case MAP:
case ROW:
