diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtils.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtils.java index 50dab2ac4..67a179e89 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtils.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtils.java @@ -40,6 +40,8 @@ import org.apache.flink.cdc.common.types.VarCharType; import com.starrocks.connector.flink.catalog.StarRocksColumn; import com.starrocks.connector.flink.catalog.StarRocksTable; +import java.math.BigDecimal; +import java.math.RoundingMode; import java.time.LocalDate; import java.time.ZoneId; import java.time.ZonedDateTime; @@ -50,10 +52,14 @@ import java.util.List; import static org.apache.flink.cdc.common.types.DataTypeChecks.getPrecision; import static org.apache.flink.cdc.common.types.DataTypeChecks.getScale; -/** Utilities for conversion from source table to StarRocks table. */ +/** + * Utilities for conversion from source table to StarRocks table. + */ public class StarRocksUtils { - /** Convert a source table to {@link StarRocksTable}. */ + /** + * Convert a source table to {@link StarRocksTable}. + */ public static StarRocksTable toStarRocksTable( TableId tableId, Schema schema, TableCreateConfig tableCreateConfig) { if (schema.primaryKeys().isEmpty()) { @@ -107,7 +113,9 @@ public class StarRocksUtils { return tableBuilder.build(); } - /** Convert CDC data type to StarRocks data type. */ + /** + * Convert CDC data type to StarRocks data type. + */ public static void toStarRocksDataType( Column cdcColumn, boolean isPrimaryKeys, StarRocksColumn.Builder builder) { CdcDataTypeTransformer dataTypeTransformer = @@ -115,11 +123,15 @@ public class StarRocksUtils { cdcColumn.getType().accept(dataTypeTransformer); } - /** Format DATE type data. */ + /** + * Format DATE type data. + */ private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd"); - /** Format timestamp-related type data. */ + /** + * Format timestamp-related type data. + */ private static final DateTimeFormatter DATETIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); @@ -128,9 +140,9 @@ public class StarRocksUtils { * position. * * @param fieldType the element type of the RecordData - * @param fieldPos the element position of the RecordData - * @param zoneId the time zone used when converting from TIMESTAMP WITH LOCAL TIME ZONE - * + * @param fieldPos the element position of the RecordData + * @param zoneId the time zone used when converting from TIMESTAMP WITH LOCAL TIME ZONE + * */ public static RecordData.FieldGetter createFieldGetter( DataType fieldType, int fieldPos, ZoneId zoneId) { @@ -162,9 +174,15 @@ public class StarRocksUtils { final int decimalPrecision = getPrecision(fieldType); final int decimalScale = getScale(fieldType); fieldGetter = - record -> - record.getDecimal(fieldPos, decimalPrecision, decimalScale) - .toBigDecimal(); + record -> { + /*When converting the binary array to decimal here, the result is not accurate*/ + double value = record.getDouble(fieldPos); + BigDecimal resultValue = BigDecimal.valueOf(value).setScale(decimalScale, RoundingMode.HALF_UP); + if (resultValue.compareTo(BigDecimal.ZERO) == 0 && decimalScale == 0) { + resultValue = record.getDecimal(fieldPos, decimalPrecision, decimalScale).toBigDecimal(); + } + return resultValue; + }; break; case CHAR: case VARCHAR: @@ -229,13 +247,19 @@ public class StarRocksUtils { public static final String DATETIME = "DATETIME"; public static final String JSON = "JSON"; - /** Max size of char type of StarRocks. */ + /** + * Max size of char type of StarRocks. + */ public static final int MAX_CHAR_SIZE = 255; - /** Max size of varchar type of StarRocks. */ + /** + * Max size of varchar type of StarRocks. + */ public static final int MAX_VARCHAR_SIZE = 1048576; - /** Transforms CDC {@link DataType} to StarRocks data type. */ + /** + * Transforms CDC {@link DataType} to StarRocks data type. + */ public static class CdcDataTypeTransformer extends DataTypeDefaultVisitor {