[FLINK-36524][pipeline-connector][paimon] Bump Paimon version to 0.9.0

This closes #3644
Kunni committed 3 weeks ago · commit d3c049d8a7 · parent fc71888d7a

@@ -29,7 +29,7 @@ limitations under the License.
     <artifactId>flink-cdc-pipeline-connector-paimon</artifactId>
     <properties>
-        <paimon.version>0.8.2</paimon.version>
+        <paimon.version>0.9.0</paimon.version>
         <hadoop.version>2.8.5</hadoop.version>
         <hive.version>2.3.9</hive.version>
         <mockito.version>3.4.6</mockito.version>

@@ -69,6 +69,8 @@ public class PaimonDataSinkFactory implements DataSinkFactory {
                     }
                 });
         Options options = Options.fromMap(catalogOptions);
+        // Avoid using previous table schema.
+        options.setString("cache-enabled", "false");
         try (Catalog catalog = FlinkCatalogFactory.createPaimonCatalog(options)) {
             Preconditions.checkNotNull(
                     catalog.listDatabases(), "catalog option of Paimon is invalid.");

@@ -45,8 +45,8 @@ public class PaimonCommitter implements Committer<MultiTableCommittable> {
         storeMultiCommitter =
                 new StoreMultiCommitter(
                         () -> FlinkCatalogFactory.createPaimonCatalog(catalogOptions),
-                        commitUser,
-                        null);
+                        org.apache.paimon.flink.sink.Committer.createContext(
+                                commitUser, null, true, false, null));
     }

     @Override

@@ -142,6 +142,9 @@ public class StoreSinkWriteImpl implements StoreSinkWrite {
             write.withCompactExecutor(compactExecutor);
         }

+    @Override
+    public void withInsertOnly(boolean b) {}
+
     @Override
     public SinkRecord write(InternalRow internalRow) throws Exception {
         return write.writeAndReturn(internalRow);

@@ -151,7 +151,7 @@ public class BucketAssignOperator extends AbstractStreamOperator<Event>
                             dataChangeEvent,
                             schemaMaps.get(dataChangeEvent.tableId()).getFieldGetters());
             switch (tuple4.f0) {
-                case DYNAMIC:
+                case HASH_DYNAMIC:
                     {
                         bucket =
                                 tuple4.f2.assign(
@@ -159,18 +159,18 @@ public class BucketAssignOperator extends AbstractStreamOperator<Event>
                                         tuple4.f3.trimmedPrimaryKey(genericRow).hashCode());
                         break;
                     }
-                case FIXED:
+                case HASH_FIXED:
                     {
                         tuple4.f1.setRecord(genericRow);
                         bucket = tuple4.f1.bucket();
                         break;
                     }
-                case UNAWARE:
+                case BUCKET_UNAWARE:
                     {
                         bucket = 0;
                         break;
                     }
-                case GLOBAL_DYNAMIC:
+                case CROSS_PARTITION:
                 default:
                     {
                         throw new RuntimeException("Unsupported bucket mode: " + tuple4.f0);
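Note: Paimon 0.9.0 renames the BucketMode constants matched in this switch (DYNAMIC → HASH_DYNAMIC, FIXED → HASH_FIXED, UNAWARE → BUCKET_UNAWARE, GLOBAL_DYNAMIC → CROSS_PARTITION). A minimal sketch of that mapping follows; it assumes the enum still lives at org.apache.paimon.table.BucketMode, and the helper class itself is hypothetical, for illustration only.

import org.apache.paimon.table.BucketMode;

final class BucketModeNames {
    // Map a pre-0.9 BucketMode name onto its Paimon 0.9.0 equivalent.
    static BucketMode fromLegacyName(String legacyName) {
        switch (legacyName) {
            case "DYNAMIC":        return BucketMode.HASH_DYNAMIC;    // primary-key table, dynamic bucket
            case "FIXED":          return BucketMode.HASH_FIXED;      // primary-key table, fixed bucket count
            case "UNAWARE":        return BucketMode.BUCKET_UNAWARE;  // append table, no bucket key
            case "GLOBAL_DYNAMIC": return BucketMode.CROSS_PARTITION; // cross-partition update, unsupported by this operator
            default:
                throw new IllegalArgumentException("Unknown legacy bucket mode: " + legacyName);
        }
    }
}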

@@ -87,6 +87,7 @@ public class PaimonMetadataApplierTest {
         }
         catalogOptions.setString("metastore", metastore);
         catalogOptions.setString("warehouse", warehouse);
+        catalogOptions.setString("cache-enabled", "false");
         this.catalog = FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
         this.catalog.dropDatabase(TEST_DATABASE, true, true);
     }
@@ -206,6 +207,30 @@ public class PaimonMetadataApplierTest {
                 catalog.getTable(Identifier.fromString("test.table_with_partition"));
         Assertions.assertEquals(tableSchema, tableWithPartition.rowType());
         Assertions.assertEquals(Arrays.asList("col1", "dt"), tableWithPartition.primaryKeys());
+        // Create table with upper case.
+        catalogOptions.setString("allow-upper-case", "true");
+        metadataApplier = new PaimonMetadataApplier(catalogOptions);
+        createTableEvent =
+                new CreateTableEvent(
+                        TableId.parse("test.table_with_upper_case"),
+                        org.apache.flink.cdc.common.schema.Schema.newBuilder()
+                                .physicalColumn(
+                                        "COL1",
+                                        org.apache.flink.cdc.common.types.DataTypes.STRING()
+                                                .notNull())
+                                .physicalColumn(
+                                        "col2", org.apache.flink.cdc.common.types.DataTypes.INT())
+                                .primaryKey("COL1")
+                                .build());
+        metadataApplier.applySchemaChange(createTableEvent);
+        tableSchema =
+                new RowType(
+                        Arrays.asList(
+                                new DataField(0, "COL1", DataTypes.STRING().notNull()),
+                                new DataField(1, "col2", DataTypes.INT())));
+        Assertions.assertEquals(
+                tableSchema,
+                catalog.getTable(Identifier.fromString("test.table_with_upper_case")).rowType());
     }

     @ParameterizedTest

@@ -127,7 +127,7 @@ public class PaimonSinkITCase {
                             + "'metastore'='hive', "
                             + "'hadoop-conf-dir'='%s', "
                             + "'hive-conf-dir'='%s', "
-                            + "'cache-enabled'='false' "
+                            + "'cache-enabled'='false'"
                             + ")",
                         warehouse, HADOOP_CONF_DIR, HIVE_CONF_DIR));
         } else {
