[FLINK-35325][cdc-connector][paimon] Support specify column order when adding new columns to a table

This closes #3323.
pull/3348/head
joyCurry30 8 months ago committed by GitHub
parent 5b28d1a579
commit 0214166049
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -44,6 +44,24 @@ public final class AddColumnEvent implements SchemaChangeEvent {
this.addedColumns = addedColumns;
}
public static AddColumnEvent.ColumnWithPosition first(Column addColumn) {
return new ColumnWithPosition(addColumn, ColumnPosition.FIRST, null);
}
public static AddColumnEvent.ColumnWithPosition last(Column addColumn) {
return new ColumnWithPosition(addColumn, ColumnPosition.LAST, null);
}
public static AddColumnEvent.ColumnWithPosition before(
Column addColumn, String existedColumnName) {
return new ColumnWithPosition(addColumn, ColumnPosition.BEFORE, existedColumnName);
}
public static AddColumnEvent.ColumnWithPosition after(
Column addColumn, String existedColumnName) {
return new ColumnWithPosition(addColumn, ColumnPosition.AFTER, existedColumnName);
}
/** Returns the added columns. */
public List<ColumnWithPosition> getAddedColumns() {
return addedColumns;

@ -34,12 +34,16 @@ import org.apache.paimon.flink.FlinkCatalogFactory;
import org.apache.paimon.flink.LogicalTypeConversion;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.table.Table;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.apache.flink.cdc.common.utils.Preconditions.checkArgument;
import static org.apache.flink.cdc.common.utils.Preconditions.checkNotNull;
/**
* A {@code MetadataApplier} that applies metadata changes to Paimon. Support primary key table
* only.
@ -129,35 +133,90 @@ public class PaimonMetadataApplier implements MetadataApplier {
private void applyAddColumn(AddColumnEvent event)
throws Catalog.TableNotExistException, Catalog.ColumnAlreadyExistException,
Catalog.ColumnNotExistException {
List<SchemaChange> tableChangeList = new ArrayList<>();
event.getAddedColumns()
.forEach(
(column) -> {
SchemaChange tableChange =
SchemaChange.addColumn(
column.getAddColumn().getName(),
LogicalTypeConversion.toDataType(
DataTypeUtils.toFlinkDataType(
column.getAddColumn().getType())
.getLogicalType()));
tableChangeList.add(tableChange);
});
List<SchemaChange> tableChangeList = applyAddColumnEventWithPosition(event);
catalog.alterTable(
new Identifier(event.tableId().getSchemaName(), event.tableId().getTableName()),
tableChangeList,
true);
}
private List<SchemaChange> applyAddColumnEventWithPosition(AddColumnEvent event)
throws Catalog.TableNotExistException {
List<SchemaChange> tableChangeList = new ArrayList<>();
for (AddColumnEvent.ColumnWithPosition columnWithPosition : event.getAddedColumns()) {
SchemaChange tableChange;
switch (columnWithPosition.getPosition()) {
case FIRST:
tableChange =
SchemaChangeProvider.add(
columnWithPosition,
SchemaChange.Move.first(
columnWithPosition.getAddColumn().getName()));
tableChangeList.add(tableChange);
break;
case LAST:
SchemaChange schemaChangeWithLastPosition =
SchemaChangeProvider.add(columnWithPosition);
tableChangeList.add(schemaChangeWithLastPosition);
break;
case BEFORE:
SchemaChange schemaChangeWithBeforePosition =
applyAddColumnWithBeforePosition(
event.tableId().getSchemaName(),
event.tableId().getTableName(),
columnWithPosition);
tableChangeList.add(schemaChangeWithBeforePosition);
break;
case AFTER:
checkNotNull(
columnWithPosition.getExistedColumnName(),
"Existing column name must be provided for AFTER position");
SchemaChange.Move after =
SchemaChange.Move.after(
columnWithPosition.getAddColumn().getName(),
columnWithPosition.getExistedColumnName());
tableChange = SchemaChangeProvider.add(columnWithPosition, after);
tableChangeList.add(tableChange);
break;
default:
throw new IllegalArgumentException(
"Unknown column position: " + columnWithPosition.getPosition());
}
}
return tableChangeList;
}
private SchemaChange applyAddColumnWithBeforePosition(
String schemaName,
String tableName,
AddColumnEvent.ColumnWithPosition columnWithPosition)
throws Catalog.TableNotExistException {
String existedColumnName = columnWithPosition.getExistedColumnName();
Table table = catalog.getTable(new Identifier(schemaName, tableName));
List<String> columnNames = table.rowType().getFieldNames();
int index = checkColumnPosition(existedColumnName, columnNames);
SchemaChange.Move after =
SchemaChange.Move.after(
columnWithPosition.getAddColumn().getName(), columnNames.get(index - 1));
return SchemaChangeProvider.add(columnWithPosition, after);
}
private int checkColumnPosition(String existedColumnName, List<String> columnNames) {
if (existedColumnName == null) {
return 0;
}
int index = columnNames.indexOf(existedColumnName);
checkArgument(index != -1, "Column %s not found", existedColumnName);
return index;
}
private void applyDropColumn(DropColumnEvent event)
throws Catalog.ColumnAlreadyExistException, Catalog.TableNotExistException,
Catalog.ColumnNotExistException {
List<SchemaChange> tableChangeList = new ArrayList<>();
event.getDroppedColumnNames()
.forEach(
(column) -> {
SchemaChange tableChange = SchemaChange.dropColumn(column);
tableChangeList.add(tableChange);
});
.forEach((column) -> tableChangeList.add(SchemaChangeProvider.drop(column)));
catalog.alterTable(
new Identifier(event.tableId().getSchemaName(), event.tableId().getTableName()),
tableChangeList,
@ -170,10 +229,8 @@ public class PaimonMetadataApplier implements MetadataApplier {
List<SchemaChange> tableChangeList = new ArrayList<>();
event.getNameMapping()
.forEach(
(oldName, newName) -> {
SchemaChange tableChange = SchemaChange.renameColumn(oldName, newName);
tableChangeList.add(tableChange);
});
(oldName, newName) ->
tableChangeList.add(SchemaChangeProvider.rename(oldName, newName)));
catalog.alterTable(
new Identifier(event.tableId().getSchemaName(), event.tableId().getTableName()),
tableChangeList,
@ -186,15 +243,9 @@ public class PaimonMetadataApplier implements MetadataApplier {
List<SchemaChange> tableChangeList = new ArrayList<>();
event.getTypeMapping()
.forEach(
(oldName, newType) -> {
SchemaChange tableChange =
SchemaChange.updateColumnType(
oldName,
LogicalTypeConversion.toDataType(
DataTypeUtils.toFlinkDataType(newType)
.getLogicalType()));
tableChangeList.add(tableChange);
});
(oldName, newType) ->
tableChangeList.add(
SchemaChangeProvider.updateColumnType(oldName, newType)));
catalog.alterTable(
new Identifier(event.tableId().getSchemaName(), event.tableId().getTableName()),
tableChangeList,

@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.connectors.paimon.sink;
import org.apache.flink.cdc.common.event.AddColumnEvent;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.utils.DataTypeUtils;
import org.apache.paimon.flink.LogicalTypeConversion;
import org.apache.paimon.schema.SchemaChange;
/**
* The SchemaChangeProvider class provides static methods to create SchemaChange objects that
* represent different types of schema modifications.
*/
public class SchemaChangeProvider {
/**
* Creates a SchemaChange object for adding a column without specifying its position.
*
* @param columnWithPosition The ColumnWithPosition object containing the column details and its
* intended position within the schema.
* @return A SchemaChange object representing the addition of a column.
*/
public static SchemaChange add(AddColumnEvent.ColumnWithPosition columnWithPosition) {
return SchemaChange.addColumn(
columnWithPosition.getAddColumn().getName(),
LogicalTypeConversion.toDataType(
DataTypeUtils.toFlinkDataType(columnWithPosition.getAddColumn().getType())
.getLogicalType()),
columnWithPosition.getAddColumn().getComment());
}
/**
* Creates a SchemaChange object for adding a column with a specified position.
*
* @param columnWithPosition The ColumnWithPosition object containing the column details and its
* intended position within the schema.
* @param move The move operation to indicate the column's new position.
* @return A SchemaChange object representing the addition of a column with position
* information.
*/
public static SchemaChange add(
AddColumnEvent.ColumnWithPosition columnWithPosition, SchemaChange.Move move) {
return SchemaChange.addColumn(
columnWithPosition.getAddColumn().getName(),
LogicalTypeConversion.toDataType(
DataTypeUtils.toFlinkDataType(columnWithPosition.getAddColumn().getType())
.getLogicalType()),
columnWithPosition.getAddColumn().getComment(),
move);
}
/**
* Creates a SchemaChange object to update the data type of a column.
*
* @param oldColumnName The name of the column whose data type is to be updated.
* @param newType The new DataType for the column.
* @return A SchemaChange object representing the update of the column's data type.
*/
public static SchemaChange updateColumnType(String oldColumnName, DataType newType) {
return SchemaChange.updateColumnType(
oldColumnName,
LogicalTypeConversion.toDataType(
DataTypeUtils.toFlinkDataType(newType).getLogicalType()));
}
/**
* Creates a SchemaChange object for renaming a column.
*
* @param oldColumnName The current name of the column to be renamed.
* @param newColumnName The new name for the column.
* @return A SchemaChange object representing the renaming of a column.
*/
public static SchemaChange rename(String oldColumnName, String newColumnName) {
return SchemaChange.renameColumn(oldColumnName, newColumnName);
}
/**
* Creates a SchemaChange object for dropping a column.
*
* @param columnName The name of the column to be dropped.
* @return A SchemaChange object representing the deletion of a column.
*/
public static SchemaChange drop(String columnName) {
return SchemaChange.dropColumn(columnName);
}
}

@ -333,4 +333,87 @@ public class PaimonMetadataApplierTest {
Assertions.assertEquals(
tableSchema, catalog.getTable(Identifier.fromString("test.table1")).rowType());
}
@ParameterizedTest
@ValueSource(strings = {"filesystem", "hive"})
public void testAddColumnWithPosition(String metastore)
throws Catalog.DatabaseNotEmptyException, Catalog.DatabaseNotExistException,
Catalog.TableNotExistException {
initialize(metastore);
MetadataApplier metadataApplier = new PaimonMetadataApplier(catalogOptions);
CreateTableEvent createTableEvent =
new CreateTableEvent(
TableId.parse("test.table1"),
org.apache.flink.cdc.common.schema.Schema.newBuilder()
.physicalColumn(
"col1",
org.apache.flink.cdc.common.types.DataTypes.STRING()
.notNull())
.physicalColumn(
"col2", org.apache.flink.cdc.common.types.DataTypes.INT())
.primaryKey("col1")
.build());
metadataApplier.applySchemaChange(createTableEvent);
List<AddColumnEvent.ColumnWithPosition> addedColumns = new ArrayList<>();
addedColumns.add(
new AddColumnEvent.ColumnWithPosition(
Column.physicalColumn(
"col3",
org.apache.flink.cdc.common.types.DataTypes
.STRING()))); // default last position.
AddColumnEvent addColumnEvent =
new AddColumnEvent(TableId.parse("test.table1"), addedColumns);
metadataApplier.applySchemaChange(addColumnEvent);
RowType tableSchema =
new RowType(
Arrays.asList(
new DataField(0, "col1", DataTypes.STRING().notNull()),
new DataField(1, "col2", DataTypes.INT()),
new DataField(2, "col3", DataTypes.STRING())));
Assertions.assertEquals(
tableSchema, catalog.getTable(Identifier.fromString("test.table1")).rowType());
addedColumns.clear();
addedColumns.add(
AddColumnEvent.first(
Column.physicalColumn(
"col4_first",
org.apache.flink.cdc.common.types.DataTypes.STRING())));
addedColumns.add(
AddColumnEvent.last(
Column.physicalColumn(
"col5_last",
org.apache.flink.cdc.common.types.DataTypes.STRING())));
addedColumns.add(
AddColumnEvent.before(
Column.physicalColumn(
"col6_before",
org.apache.flink.cdc.common.types.DataTypes.STRING()),
"col2"));
addedColumns.add(
AddColumnEvent.after(
Column.physicalColumn(
"col7_after", org.apache.flink.cdc.common.types.DataTypes.STRING()),
"col2"));
addColumnEvent = new AddColumnEvent(TableId.parse("test.table1"), addedColumns);
metadataApplier.applySchemaChange(addColumnEvent);
tableSchema =
new RowType(
Arrays.asList(
new DataField(3, "col4_first", DataTypes.STRING()),
new DataField(0, "col1", DataTypes.STRING().notNull()),
new DataField(5, "col6_before", DataTypes.STRING()),
new DataField(1, "col2", DataTypes.INT()),
new DataField(6, "col7_after", DataTypes.STRING()),
new DataField(2, "col3", DataTypes.STRING()),
new DataField(4, "col5_last", DataTypes.STRING())));
Assertions.assertEquals(
tableSchema, catalog.getTable(Identifier.fromString("test.table1")).rowType());
}
}

Loading…
Cancel
Save