[FLINK-34690] Cast decimal to VARCHAR as primary key in starrocks sink (#3150)

pull/3219/head
Hongshun Wang 10 months ago committed by GitHub
parent 80d461b1b8
commit abb98ee257
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -297,10 +297,20 @@ public class StarRocksUtils {
@Override
public StarRocksColumn.Builder visit(DecimalType decimalType) {
builder.setDataType(DECIMAL);
// StarRocks does not support Decimal as primary key, so decimal should be cast to
// VARCHAR.
if (!isPrimaryKeys) {
builder.setDataType(DECIMAL);
builder.setColumnSize(decimalType.getPrecision());
builder.setDecimalDigits(decimalType.getScale());
} else {
builder.setDataType(VARCHAR);
// For a DecimalType with precision N, we may need N + 1 or N + 2 characters to store it as a
// string (one for negative sign, and one for decimal point)
builder.setColumnSize(Math.min(
decimalType.getScale() != 0? decimalType.getPrecision() + 2:decimalType.getPrecision() + 1, MAX_VARCHAR_SIZE));
}
builder.setNullable(decimalType.isNullable());
builder.setColumnSize(decimalType.getPrecision());
builder.setDecimalDigits(decimalType.getScale());
return builder;
}

@ -18,6 +18,7 @@
package org.apache.flink.cdc.connectors.starrocks.sink;
import org.apache.flink.cdc.common.types.CharType;
import org.apache.flink.cdc.common.types.DecimalType;
import org.apache.flink.cdc.common.types.VarCharType;
import com.starrocks.connector.flink.catalog.StarRocksColumn;
@ -75,6 +76,50 @@ public class CdcDataTypeTransformerTest {
assertTrue(smallLengthColumn.isNullable());
}
@Test
public void testDecimalForPrimaryKey() {
// Map to DECIMAL of StarRocks if column is DECIMAL type and not primary key.
StarRocksColumn.Builder noPrimaryKeyBuilder =
new StarRocksColumn.Builder().setColumnName("no_primary_key").setOrdinalPosition(0);
new DecimalType(20, 1)
.accept(new StarRocksUtils.CdcDataTypeTransformer(false, noPrimaryKeyBuilder));
StarRocksColumn noPrimaryKeyColumn = noPrimaryKeyBuilder.build();
assertEquals("no_primary_key", noPrimaryKeyColumn.getColumnName());
assertEquals(0, noPrimaryKeyColumn.getOrdinalPosition());
assertEquals(StarRocksUtils.DECIMAL, noPrimaryKeyColumn.getDataType());
assertEquals(Integer.valueOf(20), noPrimaryKeyColumn.getColumnSize().orElse(null));
assertEquals(Integer.valueOf(1), noPrimaryKeyColumn.getDecimalDigits().get());
assertTrue(noPrimaryKeyColumn.isNullable());
// Map to VARCHAR of StarRocks if column is DECIMAL type and primary key.
StarRocksColumn.Builder primaryKeyBuilder =
new StarRocksColumn.Builder().setColumnName("primary_key").setOrdinalPosition(1);
new DecimalType(20, 1)
.notNull()
.accept(new StarRocksUtils.CdcDataTypeTransformer(true, primaryKeyBuilder));
StarRocksColumn primaryKeyColumn = primaryKeyBuilder.build();
assertEquals("primary_key", primaryKeyColumn.getColumnName());
assertEquals(1, primaryKeyColumn.getOrdinalPosition());
assertEquals(StarRocksUtils.VARCHAR, primaryKeyColumn.getDataType());
assertEquals(Integer.valueOf(22), primaryKeyColumn.getColumnSize().orElse(null));
assertTrue(!primaryKeyColumn.isNullable());
// Map to VARCHAR of StarRocks if column is DECIMAL type and primary key
// DECIMAL(20,0) is common in cdc pipeline, for example, the upstream cdc source is unsigned
// BIGINT.
StarRocksColumn.Builder unsignedBigIntKeyBuilder =
new StarRocksColumn.Builder().setColumnName("primary_key").setOrdinalPosition(1);
new DecimalType(20, 0)
.notNull()
.accept(new StarRocksUtils.CdcDataTypeTransformer(true, unsignedBigIntKeyBuilder));
StarRocksColumn unsignedBigIntColumn = unsignedBigIntKeyBuilder.build();
assertEquals("primary_key", unsignedBigIntColumn.getColumnName());
assertEquals(1, unsignedBigIntColumn.getOrdinalPosition());
assertEquals(StarRocksUtils.VARCHAR, unsignedBigIntColumn.getDataType());
assertEquals(Integer.valueOf(21), unsignedBigIntColumn.getColumnSize().orElse(null));
assertTrue(!unsignedBigIntColumn.isNullable());
}
@Test
public void testVarCharType() {
// the length fo StarRocks should be 3 times as that of CDC if CDC length * 3 <=

Loading…
Cancel
Save