[FLINK-36852][pipeline-connector-paimon] Set independent MemoryPoolFactory for each table to avoid StackOverflow error.

pull/3778/head
lvyanquan 2 months ago
parent 06154e9674
commit 2d24ed0c4a

@ -56,9 +56,6 @@ public class PaimonWriter<InputT>
// Each job can only have one user name and this name must be consistent across restarts. // Each job can only have one user name and this name must be consistent across restarts.
private final String commitUser; private final String commitUser;
// all table write should share one write buffer so that writers can preempt memory
// from those of other tables
private MemoryPoolFactory memoryPoolFactory;
// deserializer that converts Input into PaimonEvent. // deserializer that converts Input into PaimonEvent.
private final PaimonRecordSerializer<InputT> serializer; private final PaimonRecordSerializer<InputT> serializer;
@ -127,38 +124,37 @@ public class PaimonWriter<InputT>
} }
} }
if (paimonEvent.getGenericRow() != null) { if (paimonEvent.getGenericRow() != null) {
FileStoreTable table; StoreSinkWrite write = getWrite(tableId);
table = getTable(tableId); try {
if (memoryPoolFactory == null) { write.write(paimonEvent.getGenericRow(), paimonEvent.getBucket());
memoryPoolFactory = } catch (Exception e) {
throw new IOException(e);
}
}
}
private StoreSinkWrite getWrite(Identifier tableId) {
FileStoreTable table = getTable(tableId);
if (writes.containsKey(tableId)) {
return writes.get(tableId);
} else {
MemoryPoolFactory memoryPoolFactory =
new MemoryPoolFactory( new MemoryPoolFactory(
// currently, the options of all tables are the same in CDC
new HeapMemorySegmentPool( new HeapMemorySegmentPool(
table.coreOptions().writeBufferSize(), table.coreOptions().writeBufferSize(),
table.coreOptions().pageSize())); table.coreOptions().pageSize()));
}
StoreSinkWrite write =
writes.computeIfAbsent(
tableId,
id -> {
StoreSinkWriteImpl storeSinkWrite = StoreSinkWriteImpl storeSinkWrite =
new StoreSinkWriteImpl( new StoreSinkWriteImpl(
table, table,
commitUser, commitUser,
ioManager, ioManager,
false, false,
false, true,
true, true,
memoryPoolFactory, memoryPoolFactory,
metricGroup); metricGroup);
storeSinkWrite.withCompactExecutor(compactExecutor); storeSinkWrite.withCompactExecutor(compactExecutor);
return storeSinkWrite; return storeSinkWrite;
});
try {
write.write(paimonEvent.getGenericRow(), paimonEvent.getBucket());
} catch (Exception e) {
throw new IOException(e);
}
} }
} }

Loading…
Cancel
Save