Fixed an issue where values written to DECIMAL columns by the StarRocks sink in the pipeline were null.

pull/3674/head
zhuyayun 3 months ago
parent aa4f15a636
commit 2b93b2cf0e

@@ -40,6 +40,8 @@ import org.apache.flink.cdc.common.types.VarCharType;
 import com.starrocks.connector.flink.catalog.StarRocksColumn;
 import com.starrocks.connector.flink.catalog.StarRocksTable;
+import java.math.BigDecimal;
+import java.math.RoundingMode;
 import java.time.LocalDate;
 import java.time.ZoneId;
 import java.time.ZonedDateTime;
@@ -50,10 +52,14 @@ import java.util.List;
 import static org.apache.flink.cdc.common.types.DataTypeChecks.getPrecision;
 import static org.apache.flink.cdc.common.types.DataTypeChecks.getScale;
 
-/** Utilities for conversion from source table to StarRocks table. */
+/**
+ * Utilities for conversion from source table to StarRocks table.
+ */
 public class StarRocksUtils {
 
-    /** Convert a source table to {@link StarRocksTable}. */
+    /**
+     * Convert a source table to {@link StarRocksTable}.
+     */
     public static StarRocksTable toStarRocksTable(
             TableId tableId, Schema schema, TableCreateConfig tableCreateConfig) {
         if (schema.primaryKeys().isEmpty()) {
@@ -107,7 +113,9 @@ public class StarRocksUtils {
         return tableBuilder.build();
     }
 
-    /** Convert CDC data type to StarRocks data type. */
+    /**
+     * Convert CDC data type to StarRocks data type.
+     */
     public static void toStarRocksDataType(
             Column cdcColumn, boolean isPrimaryKeys, StarRocksColumn.Builder builder) {
         CdcDataTypeTransformer dataTypeTransformer =
@@ -115,11 +123,15 @@ public class StarRocksUtils {
         cdcColumn.getType().accept(dataTypeTransformer);
     }
 
-    /** Format DATE type data. */
+    /**
+     * Format DATE type data.
+     */
     private static final DateTimeFormatter DATE_FORMATTER =
             DateTimeFormatter.ofPattern("yyyy-MM-dd");
 
-    /** Format timestamp-related type data. */
+    /**
+     * Format timestamp-related type data.
+     */
     private static final DateTimeFormatter DATETIME_FORMATTER =
             DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
@ -128,9 +140,9 @@ public class StarRocksUtils {
* position.
*
* @param fieldType the element type of the RecordData
* @param fieldPos the element position of the RecordData
* @param zoneId the time zone used when converting from <code>TIMESTAMP WITH LOCAL TIME ZONE
* </code>
* @param fieldPos the element position of the RecordData
* @param zoneId the time zone used when converting from <code>TIMESTAMP WITH LOCAL TIME ZONE
* </code>
*/
public static RecordData.FieldGetter createFieldGetter(
DataType fieldType, int fieldPos, ZoneId zoneId) {
@@ -162,9 +174,15 @@ public class StarRocksUtils {
                 final int decimalPrecision = getPrecision(fieldType);
                 final int decimalScale = getScale(fieldType);
                 fieldGetter =
-                        record ->
-                                record.getDecimal(fieldPos, decimalPrecision, decimalScale)
-                                        .toBigDecimal();
+                        record -> {
+                            /* Converting the binary array to a decimal here is not accurate. */
+                            double value = record.getDouble(fieldPos);
+                            BigDecimal resultValue = BigDecimal.valueOf(value).setScale(decimalScale, RoundingMode.HALF_UP);
+                            if (resultValue.compareTo(BigDecimal.ZERO) == 0 && decimalScale == 0) {
+                                resultValue = record.getDecimal(fieldPos, decimalPrecision, decimalScale).toBigDecimal();
+                            }
+                            return resultValue;
+                        };
                 break;
             case CHAR:
             case VARCHAR:
@@ -229,13 +247,19 @@ public class StarRocksUtils {
     public static final String DATETIME = "DATETIME";
     public static final String JSON = "JSON";
 
-    /** Max size of char type of StarRocks. */
+    /**
+     * Max size of char type of StarRocks.
+     */
     public static final int MAX_CHAR_SIZE = 255;
 
-    /** Max size of varchar type of StarRocks. */
+    /**
+     * Max size of varchar type of StarRocks.
+     */
     public static final int MAX_VARCHAR_SIZE = 1048576;
 
-    /** Transforms CDC {@link DataType} to StarRocks data type. */
+    /**
+     * Transforms CDC {@link DataType} to StarRocks data type.
+     */
     public static class CdcDataTypeTransformer
             extends DataTypeDefaultVisitor<StarRocksColumn.Builder> {
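
For context, a minimal standalone sketch of the DECIMAL conversion strategy the last code hunk introduces: read the field as a double, rescale with HALF_UP rounding, and fall back to the binary decimal when the double reads as zero at scale 0. The sampleDouble and sampleDecimal helpers and their values are illustrative stand-ins, not part of the patch, which reads the real values from Flink CDC's RecordData (getDouble / getDecimal):

    import java.math.BigDecimal;
    import java.math.RoundingMode;

    public class DecimalGetterSketch {

        // Illustrative stand-in for RecordData#getDouble(fieldPos).
        private static double sampleDouble() {
            return 12345.6789;
        }

        // Illustrative stand-in for RecordData#getDecimal(fieldPos, precision, scale).toBigDecimal().
        private static BigDecimal sampleDecimal(int precision, int scale) {
            return new BigDecimal("12345.6789").setScale(scale, RoundingMode.HALF_UP);
        }

        public static void main(String[] args) {
            int decimalPrecision = 10;
            int decimalScale = 2;

            // Mirror of the new field getter: read the field as a double and
            // rescale it to the column's declared scale with HALF_UP rounding.
            BigDecimal resultValue =
                    BigDecimal.valueOf(sampleDouble()).setScale(decimalScale, RoundingMode.HALF_UP);

            // Fallback from the patch: if the double reads as zero and the column
            // has scale 0, use the binary decimal representation instead.
            if (resultValue.compareTo(BigDecimal.ZERO) == 0 && decimalScale == 0) {
                resultValue = sampleDecimal(decimalPrecision, decimalScale);
            }

            System.out.println(resultValue); // prints 12345.68
        }
    }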
