[FLINK-36509][pipeline-connector][paimon] Add partition columns to primary key

This closes  #3726.
pull/3737/head
Kunni 3 months ago committed by GitHub
parent d7d7f42c0b
commit 40db6d5eb7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -153,14 +153,22 @@ public class PaimonMetadataApplier implements MetadataApplier {
LogicalTypeConversion.toDataType(
DataTypeUtils.toFlinkDataType(column.getType())
.getLogicalType())));
builder.primaryKey(schema.primaryKeys().toArray(new String[0]));
List<String> partitionKeys = new ArrayList<>();
List<String> primaryKeys = schema.primaryKeys();
if (partitionMaps.containsKey(event.tableId())) {
builder.partitionKeys(partitionMaps.get(event.tableId()));
partitionKeys.addAll(partitionMaps.get(event.tableId()));
} else if (schema.partitionKeys() != null && !schema.partitionKeys().isEmpty()) {
builder.partitionKeys(schema.partitionKeys());
partitionKeys.addAll(schema.partitionKeys());
}
builder.options(tableOptions);
builder.options(schema.options());
for (String partitionColumn : partitionKeys) {
if (!primaryKeys.contains(partitionColumn)) {
primaryKeys.add(partitionColumn);
}
}
builder.partitionKeys(partitionKeys)
.primaryKey(primaryKeys)
.options(tableOptions)
.options(schema.options());
catalog.createTable(
new Identifier(event.tableId().getSchemaName(), event.tableId().getTableName()),
builder.build(),

@ -177,6 +177,35 @@ public class PaimonMetadataApplierTest {
new DataField(2, "newcol3", DataTypes.STRING())));
Assertions.assertEquals(
tableSchema, catalog.getTable(Identifier.fromString("test.table1")).rowType());
// Create table with partition column.
createTableEvent =
new CreateTableEvent(
TableId.parse("test.table_with_partition"),
org.apache.flink.cdc.common.schema.Schema.newBuilder()
.physicalColumn(
"col1",
org.apache.flink.cdc.common.types.DataTypes.STRING()
.notNull())
.physicalColumn(
"col2", org.apache.flink.cdc.common.types.DataTypes.INT())
.physicalColumn(
"dt",
org.apache.flink.cdc.common.types.DataTypes.INT().notNull())
.primaryKey("col1")
.partitionKey("dt")
.build());
metadataApplier.applySchemaChange(createTableEvent);
tableSchema =
new RowType(
Arrays.asList(
new DataField(0, "col1", DataTypes.STRING().notNull()),
new DataField(1, "col2", DataTypes.INT()),
new DataField(2, "dt", DataTypes.INT().notNull())));
Table tableWithPartition =
catalog.getTable(Identifier.fromString("test.table_with_partition"));
Assertions.assertEquals(tableSchema, tableWithPartition.rowType());
Assertions.assertEquals(Arrays.asList("col1", "dt"), tableWithPartition.primaryKeys());
}
@ParameterizedTest
@ -217,10 +246,10 @@ public class PaimonMetadataApplierTest {
Arrays.asList(
new DataField(0, "col1", DataTypes.STRING().notNull()),
new DataField(1, "col2", DataTypes.STRING()),
new DataField(2, "col3", DataTypes.STRING()),
new DataField(3, "col4", DataTypes.STRING())));
new DataField(2, "col3", DataTypes.STRING().notNull()),
new DataField(3, "col4", DataTypes.STRING().notNull())));
Assertions.assertEquals(tableSchema, table.rowType());
Assertions.assertEquals(Collections.singletonList("col1"), table.primaryKeys());
Assertions.assertEquals(Arrays.asList("col1", "col3", "col4"), table.primaryKeys());
Assertions.assertEquals(Arrays.asList("col3", "col4"), table.partitionKeys());
Assertions.assertEquals("-1", table.options().get("bucket"));
}

Loading…
Cancel
Save