[common] Shade kafka-related dependency (#386)

pull/403/head
Shengkai 3 years ago committed by GitHub
parent 81376843c2
commit 39b326fe26
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -45,6 +45,10 @@ under the License.
<artifactId>kafka-log4j-appender</artifactId>
<groupId>org.apache.kafka</groupId>
</exclusion>
<exclusion>
<groupId>org.glassfish.jersey.core</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>

@ -156,41 +156,71 @@ public final class RowDataDebeziumDeserializeSchema
return wrapIntoNullableConverter(createNotNullConverter(type));
}
// --------------------------------------------------------------------------------
// IMPORTANT! We use anonymous classes instead of lambdas for a reason here. It is
// necessary because the maven shade plugin cannot relocate classes in
// SerializedLambdas (MSHADE-260).
// --------------------------------------------------------------------------------
/** Creates a runtime converter which assuming input object is not null. */
private DeserializationRuntimeConverter createNotNullConverter(LogicalType type) {
switch (type.getTypeRoot()) {
case NULL:
return (dbzObj, schema) -> null;
return new DeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Object dbzObj, Schema schema) {
return null;
}
};
case BOOLEAN:
return this::convertToBoolean;
return convertToBoolean();
case TINYINT:
return (dbzObj, schema) -> Byte.parseByte(dbzObj.toString());
return new DeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Object dbzObj, Schema schema) {
return Byte.parseByte(dbzObj.toString());
}
};
case SMALLINT:
return (dbzObj, schema) -> Short.parseShort(dbzObj.toString());
return new DeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Object dbzObj, Schema schema) {
return Short.parseShort(dbzObj.toString());
}
};
case INTEGER:
case INTERVAL_YEAR_MONTH:
return this::convertToInt;
return convertToInt();
case BIGINT:
case INTERVAL_DAY_TIME:
return this::convertToLong;
return convertToLong();
case DATE:
return this::convertToDate;
return convertToDate();
case TIME_WITHOUT_TIME_ZONE:
return this::convertToTime;
return convertToTime();
case TIMESTAMP_WITHOUT_TIME_ZONE:
return this::convertToTimestamp;
return convertToTimestamp();
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
return this::convertToLocalTimeZoneTimestamp;
return convertToLocalTimeZoneTimestamp();
case FLOAT:
return this::convertToFloat;
return convertToFloat();
case DOUBLE:
return this::convertToDouble;
return convertToDouble();
case CHAR:
case VARCHAR:
return this::convertToString;
return convertToString();
case BINARY:
case VARBINARY:
return this::convertToBinary;
return convertToBinary();
case DECIMAL:
return createDecimalConverter((DecimalType) type);
case ROW:
@ -204,9 +234,15 @@ public final class RowDataDebeziumDeserializeSchema
}
}
private boolean convertToBoolean(Object dbzObj, Schema schema) {
private DeserializationRuntimeConverter convertToBoolean() {
return new DeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Object dbzObj, Schema schema) {
if (dbzObj instanceof Boolean) {
return (boolean) dbzObj;
return dbzObj;
} else if (dbzObj instanceof Byte) {
return (byte) dbzObj == 1;
} else if (dbzObj instanceof Short) {
@ -215,52 +251,100 @@ public final class RowDataDebeziumDeserializeSchema
return Boolean.parseBoolean(dbzObj.toString());
}
}
};
}
private DeserializationRuntimeConverter convertToInt() {
return new DeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
private int convertToInt(Object dbzObj, Schema schema) {
@Override
public Object convert(Object dbzObj, Schema schema) {
if (dbzObj instanceof Integer) {
return (int) dbzObj;
return dbzObj;
} else if (dbzObj instanceof Long) {
return ((Long) dbzObj).intValue();
} else {
return Integer.parseInt(dbzObj.toString());
}
}
};
}
private long convertToLong(Object dbzObj, Schema schema) {
private DeserializationRuntimeConverter convertToLong() {
return new DeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Object dbzObj, Schema schema) {
if (dbzObj instanceof Integer) {
return ((Integer) dbzObj).longValue();
} else if (dbzObj instanceof Long) {
return (long) dbzObj;
return dbzObj;
} else {
return Long.parseLong(dbzObj.toString());
}
}
};
}
private double convertToDouble(Object dbzObj, Schema schema) {
private DeserializationRuntimeConverter convertToDouble() {
return new DeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Object dbzObj, Schema schema) {
if (dbzObj instanceof Float) {
return ((Float) dbzObj).doubleValue();
} else if (dbzObj instanceof Double) {
return (double) dbzObj;
return dbzObj;
} else {
return Double.parseDouble(dbzObj.toString());
}
}
};
}
private float convertToFloat(Object dbzObj, Schema schema) {
private DeserializationRuntimeConverter convertToFloat() {
return new DeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Object dbzObj, Schema schema) {
if (dbzObj instanceof Float) {
return (float) dbzObj;
return dbzObj;
} else if (dbzObj instanceof Double) {
return ((Double) dbzObj).floatValue();
} else {
return Float.parseFloat(dbzObj.toString());
}
}
};
}
private int convertToDate(Object dbzObj, Schema schema) {
private DeserializationRuntimeConverter convertToDate() {
return new DeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Object dbzObj, Schema schema) {
return (int) TemporalConversions.toLocalDate(dbzObj).toEpochDay();
}
};
}
private DeserializationRuntimeConverter convertToTime() {
return new DeserializationRuntimeConverter() {
private int convertToTime(Object dbzObj, Schema schema) {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Object dbzObj, Schema schema) {
if (dbzObj instanceof Long) {
switch (schema.name()) {
case MicroTime.SCHEMA_NAME:
@ -269,30 +353,49 @@ public final class RowDataDebeziumDeserializeSchema
return (int) ((long) dbzObj / 1000_000);
}
} else if (dbzObj instanceof Integer) {
return (int) dbzObj;
return dbzObj;
}
// get number of milliseconds of the day
return TemporalConversions.toLocalTime(dbzObj).toSecondOfDay() * 1000;
}
};
}
private DeserializationRuntimeConverter convertToTimestamp() {
return new DeserializationRuntimeConverter() {
private TimestampData convertToTimestamp(Object dbzObj, Schema schema) {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Object dbzObj, Schema schema) {
if (dbzObj instanceof Long) {
switch (schema.name()) {
case Timestamp.SCHEMA_NAME:
return TimestampData.fromEpochMillis((Long) dbzObj);
case MicroTimestamp.SCHEMA_NAME:
long micro = (long) dbzObj;
return TimestampData.fromEpochMillis(micro / 1000, (int) (micro % 1000 * 1000));
return TimestampData.fromEpochMillis(
micro / 1000, (int) (micro % 1000 * 1000));
case NanoTimestamp.SCHEMA_NAME:
long nano = (long) dbzObj;
return TimestampData.fromEpochMillis(nano / 1000_000, (int) (nano % 1000_000));
return TimestampData.fromEpochMillis(
nano / 1000_000, (int) (nano % 1000_000));
}
}
LocalDateTime localDateTime = TemporalConversions.toLocalDateTime(dbzObj, serverTimeZone);
LocalDateTime localDateTime =
TemporalConversions.toLocalDateTime(dbzObj, serverTimeZone);
return TimestampData.fromLocalDateTime(localDateTime);
}
};
}
private DeserializationRuntimeConverter convertToLocalTimeZoneTimestamp() {
return new DeserializationRuntimeConverter() {
private TimestampData convertToLocalTimeZoneTimestamp(Object dbzObj, Schema schema) {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Object dbzObj, Schema schema) {
if (dbzObj instanceof String) {
String str = (String) dbzObj;
// TIMESTAMP type is encoded in string type
@ -306,14 +409,30 @@ public final class RowDataDebeziumDeserializeSchema
+ "' of type "
+ dbzObj.getClass().getName());
}
};
}
private DeserializationRuntimeConverter convertToString() {
return new DeserializationRuntimeConverter() {
private StringData convertToString(Object dbzObj, Schema schema) {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Object dbzObj, Schema schema) {
return StringData.fromString(dbzObj.toString());
}
};
}
private DeserializationRuntimeConverter convertToBinary() {
return new DeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
private byte[] convertToBinary(Object dbzObj, Schema schema) {
@Override
public Object convert(Object dbzObj, Schema schema) {
if (dbzObj instanceof byte[]) {
return (byte[]) dbzObj;
return dbzObj;
} else if (dbzObj instanceof ByteBuffer) {
ByteBuffer byteBuffer = (ByteBuffer) dbzObj;
byte[] bytes = new byte[byteBuffer.remaining()];
@ -324,11 +443,18 @@ public final class RowDataDebeziumDeserializeSchema
"Unsupported BYTES value type: " + dbzObj.getClass().getSimpleName());
}
}
};
}
private DeserializationRuntimeConverter createDecimalConverter(DecimalType decimalType) {
final int precision = decimalType.getPrecision();
final int scale = decimalType.getScale();
return (dbzObj, schema) -> {
return new DeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Object dbzObj, Schema schema) {
BigDecimal bigDecimal;
if (dbzObj instanceof byte[]) {
// decimal.handling.mode=precise
@ -341,7 +467,8 @@ public final class RowDataDebeziumDeserializeSchema
bigDecimal = BigDecimal.valueOf((Double) dbzObj);
} else {
if (VariableScaleDecimal.LOGICAL_NAME.equals(schema.name())) {
SpecialValueDecimal decimal = VariableScaleDecimal.toLogical((Struct) dbzObj);
SpecialValueDecimal decimal =
VariableScaleDecimal.toLogical((Struct) dbzObj);
bigDecimal = decimal.getDecimalValue().orElse(BigDecimal.ZERO);
} else {
// fallback to string
@ -349,6 +476,7 @@ public final class RowDataDebeziumDeserializeSchema
}
}
return DecimalData.fromBigDecimal(bigDecimal, precision, scale);
}
};
}
@ -360,7 +488,12 @@ public final class RowDataDebeziumDeserializeSchema
.toArray(DeserializationRuntimeConverter[]::new);
final String[] fieldNames = rowType.getFieldNames().toArray(new String[0]);
return (dbzObj, schema) -> {
return new DeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Object dbzObj, Schema schema) throws Exception {
Struct struct = (Struct) dbzObj;
int arity = fieldNames.length;
GenericRowData row = new GenericRowData(arity);
@ -368,10 +501,12 @@ public final class RowDataDebeziumDeserializeSchema
String fieldName = fieldNames[i];
Object fieldValue = struct.get(fieldName);
Schema fieldSchema = schema.field(fieldName).schema();
Object convertedField = convertField(fieldConverters[i], fieldValue, fieldSchema);
Object convertedField =
convertField(fieldConverters[i], fieldValue, fieldSchema);
row.setField(i, convertedField);
}
return row;
}
};
}
@ -387,11 +522,17 @@ public final class RowDataDebeziumDeserializeSchema
private DeserializationRuntimeConverter wrapIntoNullableConverter(
DeserializationRuntimeConverter converter) {
return (dbzObj, schema) -> {
return new DeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Object dbzObj, Schema schema) throws Exception {
if (dbzObj == null) {
return null;
}
return converter.convert(dbzObj, schema);
}
};
}
}

@ -69,12 +69,14 @@ under the License.
</excludes>
</filter>
</filters>
<!-- <relocations>-->
<!-- <relocation>-->
<!-- <pattern>org.apache.kafka</pattern>-->
<!-- <shadedPattern>com.ververica.cdc.connectors.kafka.shaded.org.apache.kafka</shadedPattern>-->
<!-- </relocation>-->
<!-- </relocations>-->
<relocations>
<relocation>
<pattern>org.apache.kafka</pattern>
<shadedPattern>
com.ververica.cdc.connectors.shaded.org.apache.kafka
</shadedPattern>
</relocation>
</relocations>
</configuration>
</execution>
</executions>

@ -69,6 +69,14 @@ under the License.
</excludes>
</filter>
</filters>
<relocations>
<relocation>
<pattern>org.apache.kafka</pattern>
<shadedPattern>
com.ververica.cdc.connectors.shaded.org.apache.kafka
</shadedPattern>
</relocation>
</relocations>
</configuration>
</execution>
</executions>

Loading…
Cancel
Save