support zero-copy for fury serialization

Signed-off-by: mubai <chaokun.yck@antgroup.com>
pull/6077/head
mubai 8 months ago
parent 601d0cb776
commit 7f1c0e437c

@ -17,13 +17,10 @@ package org.redisson.codec;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.ByteBufOutputStream;
import org.apache.fury.Fury; import org.apache.fury.Fury;
import org.apache.fury.ThreadSafeFury; import org.apache.fury.ThreadSafeFury;
import org.apache.fury.config.FuryBuilder; import org.apache.fury.config.FuryBuilder;
import org.apache.fury.config.Language; import org.apache.fury.config.Language;
import org.apache.fury.io.FuryStreamReader;
import org.apache.fury.memory.MemoryBuffer; import org.apache.fury.memory.MemoryBuffer;
import org.apache.fury.memory.MemoryUtils; import org.apache.fury.memory.MemoryUtils;
import org.redisson.client.codec.BaseCodec; import org.redisson.client.codec.BaseCodec;
@ -32,7 +29,6 @@ import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.Encoder; import org.redisson.client.protocol.Encoder;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer;
/** /**
* <a href="https://github.com/apache/fury">Apache Fury</a> codec * <a href="https://github.com/apache/fury">Apache Fury</a> codec
@ -92,7 +88,7 @@ public class FuryCodec extends BaseCodec {
try { try {
return fury.deserialize(furyBuffer); return fury.deserialize(furyBuffer);
} finally { } finally {
buf.writerIndex(buf.writerIndex() + furyBuffer.writerIndex()); buf.readerIndex(buf.readerIndex() + furyBuffer.readerIndex());
} }
} }
}; };
@ -101,14 +97,22 @@ public class FuryCodec extends BaseCodec {
@Override @Override
public ByteBuf encode(Object in) throws IOException { public ByteBuf encode(Object in) throws IOException {
ByteBuf out = ByteBufAllocator.DEFAULT.buffer(); ByteBuf out = ByteBufAllocator.DEFAULT.buffer();
try { MemoryBuffer furyBuffer;
ByteBufOutputStream baos = new ByteBufOutputStream(out); int remainingSize = out.capacity() - out.writerIndex();
fury.serialize(baos, in); if (out.hasArray()) {
return baos.buffer(); furyBuffer = MemoryUtils.wrap(out.array(), out.arrayOffset() + out.writerIndex(),
} catch (Exception e) { remainingSize);
out.release(); } else {
throw e; furyBuffer = MemoryUtils.buffer(out.memoryAddress() + out.writerIndex(), remainingSize);
}
int size = furyBuffer.size();
fury.serialize(furyBuffer, in);
if (furyBuffer.size() > size) {
out.writeBytes(furyBuffer.getHeapMemory(), 0, furyBuffer.size());
} else {
out.writerIndex(out.writerIndex() + furyBuffer.writerIndex());
} }
return out;
} }
}; };

Loading…
Cancel
Save