Merge pull request #6077 from chaokunyang/fury_zero_copy_deserialization

wrap ByteBuf into Fury Buffer for zero-copy deserialization
pull/6098/head
Nikita Koksharov 6 months ago committed by GitHub
commit b92a99ff66
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -24,6 +24,8 @@ import org.apache.fury.ThreadSafeFury;
import org.apache.fury.config.FuryBuilder;
import org.apache.fury.config.Language;
import org.apache.fury.io.FuryStreamReader;
import org.apache.fury.memory.MemoryBuffer;
import org.apache.fury.memory.MemoryUtils;
import org.redisson.client.codec.BaseCodec;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder;
@ -85,7 +87,16 @@ public class FuryCodec extends BaseCodec {
private final Decoder<Object> decoder = new Decoder<Object>() {
@Override
public Object decode(ByteBuf buf, State state) throws IOException {
return fury.deserialize(FuryStreamReader.of(new ByteBufInputStream(buf)));
if (buf.nioBufferCount() == 1) {
MemoryBuffer furyBuffer = MemoryUtils.wrap(buf.nioBuffer());
try {
return fury.deserialize(furyBuffer);
} finally {
buf.readerIndex(buf.readerIndex() + furyBuffer.readerIndex());
}
} else {
return fury.deserialize(FuryStreamReader.of(new ByteBufInputStream(buf)));
}
}
};
@ -93,13 +104,33 @@ public class FuryCodec extends BaseCodec {
@Override
public ByteBuf encode(Object in) throws IOException {
ByteBuf out = ByteBufAllocator.DEFAULT.buffer();
try {
ByteBufOutputStream baos = new ByteBufOutputStream(out);
fury.serialize(baos, in);
return baos.buffer();
} catch (Exception e) {
out.release();
throw e;
MemoryBuffer furyBuffer = null;
int remainingSize = out.capacity() - out.writerIndex();
if (out.hasArray()) {
furyBuffer = MemoryUtils.wrap(out.array(), out.arrayOffset() + out.writerIndex(),
remainingSize);
} else if (out.hasMemoryAddress()) {
furyBuffer = MemoryUtils.buffer(out.memoryAddress() + out.writerIndex(), remainingSize);
}
if (furyBuffer != null) {
int size = furyBuffer.size();
fury.serialize(furyBuffer, in);
if (furyBuffer.size() > size) {
out.writeBytes(furyBuffer.getHeapMemory(), 0, furyBuffer.size());
} else {
out.writerIndex(out.writerIndex() + furyBuffer.writerIndex());
}
return out;
} else {
try {
ByteBufOutputStream baos = new ByteBufOutputStream(out);
fury.serialize(baos, in);
return baos.buffer();
} catch (Exception e) {
out.release();
throw e;
}
}
}
};

Loading…
Cancel
Save