From 40db6d5eb7661696797acba30384e37e56cdac5a Mon Sep 17 00:00:00 2001 From: Kunni Date: Mon, 18 Nov 2024 17:25:02 +0800 Subject: [PATCH] [FLINK-36509][pipeline-connector][paimon] Add partition columns to primary key This closes #3726. --- .../paimon/sink/PaimonMetadataApplier.java | 18 +++++++--- .../sink/PaimonMetadataApplierTest.java | 35 +++++++++++++++++-- 2 files changed, 45 insertions(+), 8 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java index 3ae3c9c4c..22d5f4922 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java @@ -153,14 +153,22 @@ public class PaimonMetadataApplier implements MetadataApplier { LogicalTypeConversion.toDataType( DataTypeUtils.toFlinkDataType(column.getType()) .getLogicalType()))); - builder.primaryKey(schema.primaryKeys().toArray(new String[0])); + List<String> partitionKeys = new ArrayList<>(); + List<String> primaryKeys = schema.primaryKeys(); if (partitionMaps.containsKey(event.tableId())) { - builder.partitionKeys(partitionMaps.get(event.tableId())); + partitionKeys.addAll(partitionMaps.get(event.tableId())); } else if (schema.partitionKeys() != null && !schema.partitionKeys().isEmpty()) { - builder.partitionKeys(schema.partitionKeys()); + partitionKeys.addAll(schema.partitionKeys()); } - builder.options(tableOptions); - builder.options(schema.options()); + for (String partitionColumn : partitionKeys) { + if
(!primaryKeys.contains(partitionColumn)) { + primaryKeys.add(partitionColumn); + } + } + builder.partitionKeys(partitionKeys) + .primaryKey(primaryKeys) + .options(tableOptions) + .options(schema.options()); catalog.createTable( new Identifier(event.tableId().getSchemaName(), event.tableId().getTableName()), builder.build(), diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java index 214f2051e..9f3cd806c 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java @@ -177,6 +177,35 @@ public class PaimonMetadataApplierTest { new DataField(2, "newcol3", DataTypes.STRING()))); Assertions.assertEquals( tableSchema, catalog.getTable(Identifier.fromString("test.table1")).rowType()); + + // Create table with partition column. 
+ createTableEvent = + new CreateTableEvent( + TableId.parse("test.table_with_partition"), + org.apache.flink.cdc.common.schema.Schema.newBuilder() + .physicalColumn( + "col1", + org.apache.flink.cdc.common.types.DataTypes.STRING() + .notNull()) + .physicalColumn( + "col2", org.apache.flink.cdc.common.types.DataTypes.INT()) + .physicalColumn( + "dt", + org.apache.flink.cdc.common.types.DataTypes.INT().notNull()) + .primaryKey("col1") + .partitionKey("dt") + .build()); + metadataApplier.applySchemaChange(createTableEvent); + tableSchema = + new RowType( + Arrays.asList( + new DataField(0, "col1", DataTypes.STRING().notNull()), + new DataField(1, "col2", DataTypes.INT()), + new DataField(2, "dt", DataTypes.INT().notNull()))); + Table tableWithPartition = + catalog.getTable(Identifier.fromString("test.table_with_partition")); + Assertions.assertEquals(tableSchema, tableWithPartition.rowType()); + Assertions.assertEquals(Arrays.asList("col1", "dt"), tableWithPartition.primaryKeys()); } @ParameterizedTest @@ -217,10 +246,10 @@ public class PaimonMetadataApplierTest { Arrays.asList( new DataField(0, "col1", DataTypes.STRING().notNull()), new DataField(1, "col2", DataTypes.STRING()), - new DataField(2, "col3", DataTypes.STRING()), - new DataField(3, "col4", DataTypes.STRING()))); + new DataField(2, "col3", DataTypes.STRING().notNull()), + new DataField(3, "col4", DataTypes.STRING().notNull()))); Assertions.assertEquals(tableSchema, table.rowType()); - Assertions.assertEquals(Collections.singletonList("col1"), table.primaryKeys()); + Assertions.assertEquals(Arrays.asList("col1", "col3", "col4"), table.primaryKeys()); Assertions.assertEquals(Arrays.asList("col3", "col4"), table.partitionKeys()); Assertions.assertEquals("-1", table.options().get("bucket")); }