diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java index c045ab6d4..74601ba13 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java @@ -213,28 +213,23 @@ public class PaimonMetadataApplier implements MetadataApplier { try { List tableChangeList = new ArrayList<>(); for (AddColumnEvent.ColumnWithPosition columnWithPosition : event.getAddedColumns()) { - SchemaChange tableChange; switch (columnWithPosition.getPosition()) { case FIRST: - tableChange = + tableChangeList.addAll( SchemaChangeProvider.add( columnWithPosition, SchemaChange.Move.first( - columnWithPosition.getAddColumn().getName())); - tableChangeList.add(tableChange); + columnWithPosition.getAddColumn().getName()))); break; case LAST: - SchemaChange schemaChangeWithLastPosition = - SchemaChangeProvider.add(columnWithPosition); - tableChangeList.add(schemaChangeWithLastPosition); + tableChangeList.addAll(SchemaChangeProvider.add(columnWithPosition)); break; case BEFORE: - SchemaChange schemaChangeWithBeforePosition = + tableChangeList.addAll( applyAddColumnWithBeforePosition( event.tableId().getSchemaName(), event.tableId().getTableName(), - columnWithPosition); - tableChangeList.add(schemaChangeWithBeforePosition); + columnWithPosition)); break; case AFTER: checkNotNull( @@ -244,8 +239,7 @@ public class PaimonMetadataApplier implements MetadataApplier { SchemaChange.Move.after( columnWithPosition.getAddColumn().getName(), columnWithPosition.getExistedColumnName()); - tableChange = SchemaChangeProvider.add(columnWithPosition, after); - tableChangeList.add(tableChange); + tableChangeList.addAll(SchemaChangeProvider.add(columnWithPosition, after)); break; default: throw new SchemaEvolveException( @@ -259,7 +253,7 @@ public class PaimonMetadataApplier implements MetadataApplier { } } - private SchemaChange applyAddColumnWithBeforePosition( + private List applyAddColumnWithBeforePosition( String schemaName, String tableName, AddColumnEvent.ColumnWithPosition columnWithPosition) @@ -288,7 +282,7 @@ public class PaimonMetadataApplier implements MetadataApplier { try { List tableChangeList = new ArrayList<>(); event.getDroppedColumnNames() - .forEach((column) -> tableChangeList.add(SchemaChangeProvider.drop(column))); + .forEach((column) -> tableChangeList.addAll(SchemaChangeProvider.drop(column))); catalog.alterTable(tableIdToIdentifier(event), tableChangeList, true); } catch (Catalog.TableNotExistException | Catalog.ColumnAlreadyExistException @@ -299,12 +293,19 @@ public class PaimonMetadataApplier implements MetadataApplier { private void applyRenameColumn(RenameColumnEvent event) throws SchemaEvolveException { try { + Map options = + catalog.getTable( + new Identifier( + event.tableId().getSchemaName(), + event.tableId().getTableName())) + .options(); List tableChangeList = new ArrayList<>(); event.getNameMapping() .forEach( (oldName, newName) -> - tableChangeList.add( - SchemaChangeProvider.rename(oldName, newName))); + tableChangeList.addAll( + SchemaChangeProvider.rename( + oldName, newName, options))); catalog.alterTable(tableIdToIdentifier(event), tableChangeList, true); } catch (Catalog.TableNotExistException | Catalog.ColumnAlreadyExistException diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/SchemaChangeProvider.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/SchemaChangeProvider.java index 0f966c0a9..8e0fdb00c 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/SchemaChangeProvider.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/SchemaChangeProvider.java @@ -18,12 +18,19 @@ package org.apache.flink.cdc.connectors.paimon.sink; import org.apache.flink.cdc.common.event.AddColumnEvent; +import org.apache.flink.cdc.common.schema.Column; import org.apache.flink.cdc.common.types.DataType; import org.apache.flink.cdc.common.types.utils.DataTypeUtils; +import org.apache.paimon.CoreOptions; import org.apache.paimon.flink.LogicalTypeConversion; import org.apache.paimon.schema.SchemaChange; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Optional; + /** * The SchemaChangeProvider class provides static methods to create SchemaChange objects that * represent different types of schema modifications. @@ -37,13 +44,31 @@ public class SchemaChangeProvider { * intended position within the schema. * @return A SchemaChange object representing the addition of a column. */ - public static SchemaChange add(AddColumnEvent.ColumnWithPosition columnWithPosition) { - return SchemaChange.addColumn( - columnWithPosition.getAddColumn().getName(), - LogicalTypeConversion.toDataType( - DataTypeUtils.toFlinkDataType(columnWithPosition.getAddColumn().getType()) - .getLogicalType()), - columnWithPosition.getAddColumn().getComment()); + public static List add(AddColumnEvent.ColumnWithPosition columnWithPosition) { + List result = new ArrayList<>(); + result.add( + SchemaChange.addColumn( + columnWithPosition.getAddColumn().getName(), + LogicalTypeConversion.toDataType( + DataTypeUtils.toFlinkDataType( + columnWithPosition.getAddColumn().getType()) + .getLogicalType()), + columnWithPosition.getAddColumn().getComment())); + // if default value express exists, we need to set the default value to the table + // option + Column column = columnWithPosition.getAddColumn(); + Optional.ofNullable(column.getDefaultValueExpression()) + .ifPresent( + value -> { + String key = + String.format( + "%s.%s.%s", + CoreOptions.FIELDS_PREFIX, + column.getName(), + CoreOptions.DEFAULT_VALUE_SUFFIX); + result.add(SchemaChangeProvider.setOption(key, value)); + }); + return result; } /** @@ -55,15 +80,33 @@ public class SchemaChangeProvider { * @return A SchemaChange object representing the addition of a column with position * information. */ - public static SchemaChange add( + public static List add( AddColumnEvent.ColumnWithPosition columnWithPosition, SchemaChange.Move move) { - return SchemaChange.addColumn( - columnWithPosition.getAddColumn().getName(), - LogicalTypeConversion.toDataType( - DataTypeUtils.toFlinkDataType(columnWithPosition.getAddColumn().getType()) - .getLogicalType()), - columnWithPosition.getAddColumn().getComment(), - move); + List result = new ArrayList<>(); + result.add( + SchemaChange.addColumn( + columnWithPosition.getAddColumn().getName(), + LogicalTypeConversion.toDataType( + DataTypeUtils.toFlinkDataType( + columnWithPosition.getAddColumn().getType()) + .getLogicalType()), + columnWithPosition.getAddColumn().getComment(), + move)); + // if default value express exists, we need to set the default value to the table + // option + Column column = columnWithPosition.getAddColumn(); + Optional.ofNullable(column.getDefaultValueExpression()) + .ifPresent( + value -> { + String key = + String.format( + "%s.%s.%s", + CoreOptions.FIELDS_PREFIX, + column.getName(), + CoreOptions.DEFAULT_VALUE_SUFFIX); + result.add(SchemaChangeProvider.setOption(key, value)); + }); + return result; } /** @@ -87,8 +130,16 @@ public class SchemaChangeProvider { * @param newColumnName The new name for the column. * @return A SchemaChange object representing the renaming of a column. */ - public static SchemaChange rename(String oldColumnName, String newColumnName) { - return SchemaChange.renameColumn(oldColumnName, newColumnName); + public static List rename( + String oldColumnName, String newColumnName, Map options) { + List result = new ArrayList<>(); + result.add(SchemaChange.renameColumn(oldColumnName, newColumnName)); + String defaultValue = options.get(defaultValueOptionKey(oldColumnName)); + if (defaultValue != null) { + result.add(SchemaChange.removeOption(defaultValueOptionKey(oldColumnName))); + result.add(SchemaChange.setOption(defaultValueOptionKey(newColumnName), defaultValue)); + } + return result; } /** @@ -97,7 +148,27 @@ public class SchemaChangeProvider { * @param columnName The name of the column to be dropped. * @return A SchemaChange object representing the deletion of a column. */ - public static SchemaChange drop(String columnName) { - return SchemaChange.dropColumn(columnName); + public static List drop(String columnName) { + List result = new ArrayList<>(); + result.add(SchemaChange.dropColumn(columnName)); + result.add(SchemaChange.removeOption(defaultValueOptionKey(columnName))); + return result; + } + + public static String defaultValueOptionKey(String columnName) { + return String.format( + "%s.%s.%s", + CoreOptions.FIELDS_PREFIX, columnName, CoreOptions.DEFAULT_VALUE_SUFFIX); + } + + /** + * Creates a SchemaChange object for setting an option. + * + * @param key The key of the option to be set. + * @param value The value of the option to be set. + * @return A SchemaChange object representing the setting of an option. + */ + public static SchemaChange setOption(String key, String value) { + return SchemaChange.setOption(key, value); } } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java index 7b362ee0d..07ddd3ec3 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java @@ -124,7 +124,10 @@ public class PaimonMetadataApplierTest { addedColumns.add( new AddColumnEvent.ColumnWithPosition( Column.physicalColumn( - "col3", org.apache.flink.cdc.common.types.DataTypes.STRING()))); + "col3", + org.apache.flink.cdc.common.types.DataTypes.STRING(), + null, + "col3DefValue"))); AddColumnEvent addColumnEvent = new AddColumnEvent(TableId.parse("test.table1"), addedColumns); metadataApplier.applySchemaChange(addColumnEvent); @@ -137,6 +140,12 @@ public class PaimonMetadataApplierTest { Assertions.assertEquals( tableSchema, catalog.getTable(Identifier.fromString("test.table1")).rowType()); + Assertions.assertEquals( + "col3DefValue", + catalog.getTable(Identifier.fromString("test.table1")) + .options() + .get("fields.col3.default-value")); + Map nameMapping = new HashMap<>(); nameMapping.put("col2", "newcol2"); nameMapping.put("col3", "newcol3");