[FLINK-36977][pipeline-connector/paimon] Apply default value when process add_column schema change envent

This closes #3824.

Co-authored-by: Leonard Xu <xbjtdcq@gmail.com>
pull/3810/merge
hiliuxg 2 weeks ago committed by GitHub
parent 551bdf395d
commit 8b554458d3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -213,28 +213,23 @@ public class PaimonMetadataApplier implements MetadataApplier {
try {
List<SchemaChange> tableChangeList = new ArrayList<>();
for (AddColumnEvent.ColumnWithPosition columnWithPosition : event.getAddedColumns()) {
SchemaChange tableChange;
switch (columnWithPosition.getPosition()) {
case FIRST:
tableChange =
tableChangeList.addAll(
SchemaChangeProvider.add(
columnWithPosition,
SchemaChange.Move.first(
columnWithPosition.getAddColumn().getName()));
tableChangeList.add(tableChange);
columnWithPosition.getAddColumn().getName())));
break;
case LAST:
SchemaChange schemaChangeWithLastPosition =
SchemaChangeProvider.add(columnWithPosition);
tableChangeList.add(schemaChangeWithLastPosition);
tableChangeList.addAll(SchemaChangeProvider.add(columnWithPosition));
break;
case BEFORE:
SchemaChange schemaChangeWithBeforePosition =
tableChangeList.addAll(
applyAddColumnWithBeforePosition(
event.tableId().getSchemaName(),
event.tableId().getTableName(),
columnWithPosition);
tableChangeList.add(schemaChangeWithBeforePosition);
columnWithPosition));
break;
case AFTER:
checkNotNull(
@ -244,8 +239,7 @@ public class PaimonMetadataApplier implements MetadataApplier {
SchemaChange.Move.after(
columnWithPosition.getAddColumn().getName(),
columnWithPosition.getExistedColumnName());
tableChange = SchemaChangeProvider.add(columnWithPosition, after);
tableChangeList.add(tableChange);
tableChangeList.addAll(SchemaChangeProvider.add(columnWithPosition, after));
break;
default:
throw new SchemaEvolveException(
@ -259,7 +253,7 @@ public class PaimonMetadataApplier implements MetadataApplier {
}
}
private SchemaChange applyAddColumnWithBeforePosition(
private List<SchemaChange> applyAddColumnWithBeforePosition(
String schemaName,
String tableName,
AddColumnEvent.ColumnWithPosition columnWithPosition)
@ -288,7 +282,7 @@ public class PaimonMetadataApplier implements MetadataApplier {
try {
List<SchemaChange> tableChangeList = new ArrayList<>();
event.getDroppedColumnNames()
.forEach((column) -> tableChangeList.add(SchemaChangeProvider.drop(column)));
.forEach((column) -> tableChangeList.addAll(SchemaChangeProvider.drop(column)));
catalog.alterTable(tableIdToIdentifier(event), tableChangeList, true);
} catch (Catalog.TableNotExistException
| Catalog.ColumnAlreadyExistException
@ -299,12 +293,19 @@ public class PaimonMetadataApplier implements MetadataApplier {
private void applyRenameColumn(RenameColumnEvent event) throws SchemaEvolveException {
try {
Map<String, String> options =
catalog.getTable(
new Identifier(
event.tableId().getSchemaName(),
event.tableId().getTableName()))
.options();
List<SchemaChange> tableChangeList = new ArrayList<>();
event.getNameMapping()
.forEach(
(oldName, newName) ->
tableChangeList.add(
SchemaChangeProvider.rename(oldName, newName)));
tableChangeList.addAll(
SchemaChangeProvider.rename(
oldName, newName, options)));
catalog.alterTable(tableIdToIdentifier(event), tableChangeList, true);
} catch (Catalog.TableNotExistException
| Catalog.ColumnAlreadyExistException

@ -18,12 +18,19 @@
package org.apache.flink.cdc.connectors.paimon.sink;
import org.apache.flink.cdc.common.event.AddColumnEvent;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.utils.DataTypeUtils;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.flink.LogicalTypeConversion;
import org.apache.paimon.schema.SchemaChange;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
/**
* The SchemaChangeProvider class provides static methods to create SchemaChange objects that
* represent different types of schema modifications.
@ -37,13 +44,31 @@ public class SchemaChangeProvider {
* intended position within the schema.
* @return A SchemaChange object representing the addition of a column.
*/
public static SchemaChange add(AddColumnEvent.ColumnWithPosition columnWithPosition) {
return SchemaChange.addColumn(
columnWithPosition.getAddColumn().getName(),
LogicalTypeConversion.toDataType(
DataTypeUtils.toFlinkDataType(columnWithPosition.getAddColumn().getType())
.getLogicalType()),
columnWithPosition.getAddColumn().getComment());
public static List<SchemaChange> add(AddColumnEvent.ColumnWithPosition columnWithPosition) {
List<SchemaChange> result = new ArrayList<>();
result.add(
SchemaChange.addColumn(
columnWithPosition.getAddColumn().getName(),
LogicalTypeConversion.toDataType(
DataTypeUtils.toFlinkDataType(
columnWithPosition.getAddColumn().getType())
.getLogicalType()),
columnWithPosition.getAddColumn().getComment()));
// if default value express exists, we need to set the default value to the table
// option
Column column = columnWithPosition.getAddColumn();
Optional.ofNullable(column.getDefaultValueExpression())
.ifPresent(
value -> {
String key =
String.format(
"%s.%s.%s",
CoreOptions.FIELDS_PREFIX,
column.getName(),
CoreOptions.DEFAULT_VALUE_SUFFIX);
result.add(SchemaChangeProvider.setOption(key, value));
});
return result;
}
/**
@ -55,15 +80,33 @@ public class SchemaChangeProvider {
* @return A SchemaChange object representing the addition of a column with position
* information.
*/
public static SchemaChange add(
public static List<SchemaChange> add(
AddColumnEvent.ColumnWithPosition columnWithPosition, SchemaChange.Move move) {
return SchemaChange.addColumn(
columnWithPosition.getAddColumn().getName(),
LogicalTypeConversion.toDataType(
DataTypeUtils.toFlinkDataType(columnWithPosition.getAddColumn().getType())
.getLogicalType()),
columnWithPosition.getAddColumn().getComment(),
move);
List<SchemaChange> result = new ArrayList<>();
result.add(
SchemaChange.addColumn(
columnWithPosition.getAddColumn().getName(),
LogicalTypeConversion.toDataType(
DataTypeUtils.toFlinkDataType(
columnWithPosition.getAddColumn().getType())
.getLogicalType()),
columnWithPosition.getAddColumn().getComment(),
move));
// if default value express exists, we need to set the default value to the table
// option
Column column = columnWithPosition.getAddColumn();
Optional.ofNullable(column.getDefaultValueExpression())
.ifPresent(
value -> {
String key =
String.format(
"%s.%s.%s",
CoreOptions.FIELDS_PREFIX,
column.getName(),
CoreOptions.DEFAULT_VALUE_SUFFIX);
result.add(SchemaChangeProvider.setOption(key, value));
});
return result;
}
/**
@ -87,8 +130,16 @@ public class SchemaChangeProvider {
* @param newColumnName The new name for the column.
* @return A SchemaChange object representing the renaming of a column.
*/
public static SchemaChange rename(String oldColumnName, String newColumnName) {
return SchemaChange.renameColumn(oldColumnName, newColumnName);
public static List<SchemaChange> rename(
String oldColumnName, String newColumnName, Map<String, String> options) {
List<SchemaChange> result = new ArrayList<>();
result.add(SchemaChange.renameColumn(oldColumnName, newColumnName));
String defaultValue = options.get(defaultValueOptionKey(oldColumnName));
if (defaultValue != null) {
result.add(SchemaChange.removeOption(defaultValueOptionKey(oldColumnName)));
result.add(SchemaChange.setOption(defaultValueOptionKey(newColumnName), defaultValue));
}
return result;
}
/**
@ -97,7 +148,27 @@ public class SchemaChangeProvider {
* @param columnName The name of the column to be dropped.
* @return A SchemaChange object representing the deletion of a column.
*/
public static SchemaChange drop(String columnName) {
return SchemaChange.dropColumn(columnName);
public static List<SchemaChange> drop(String columnName) {
List<SchemaChange> result = new ArrayList<>();
result.add(SchemaChange.dropColumn(columnName));
result.add(SchemaChange.removeOption(defaultValueOptionKey(columnName)));
return result;
}
public static String defaultValueOptionKey(String columnName) {
return String.format(
"%s.%s.%s",
CoreOptions.FIELDS_PREFIX, columnName, CoreOptions.DEFAULT_VALUE_SUFFIX);
}
/**
* Creates a SchemaChange object for setting an option.
*
* @param key The key of the option to be set.
* @param value The value of the option to be set.
* @return A SchemaChange object representing the setting of an option.
*/
public static SchemaChange setOption(String key, String value) {
return SchemaChange.setOption(key, value);
}
}

@ -124,7 +124,10 @@ public class PaimonMetadataApplierTest {
addedColumns.add(
new AddColumnEvent.ColumnWithPosition(
Column.physicalColumn(
"col3", org.apache.flink.cdc.common.types.DataTypes.STRING())));
"col3",
org.apache.flink.cdc.common.types.DataTypes.STRING(),
null,
"col3DefValue")));
AddColumnEvent addColumnEvent =
new AddColumnEvent(TableId.parse("test.table1"), addedColumns);
metadataApplier.applySchemaChange(addColumnEvent);
@ -137,6 +140,12 @@ public class PaimonMetadataApplierTest {
Assertions.assertEquals(
tableSchema, catalog.getTable(Identifier.fromString("test.table1")).rowType());
Assertions.assertEquals(
"col3DefValue",
catalog.getTable(Identifier.fromString("test.table1"))
.options()
.get("fields.col3.default-value"));
Map<String, String> nameMapping = new HashMap<>();
nameMapping.put("col2", "newcol2");
nameMapping.put("col3", "newcol3");

Loading…
Cancel
Save