From 72d683e17beb9a9f3ebc8aaa762cfaf95e7c8396 Mon Sep 17 00:00:00 2001 From: mubai Date: Thu, 15 Aug 2024 00:10:07 +0800 Subject: [PATCH] check nioBufferCount and memoryAddress Signed-off-by: mubai --- .../java/org/redisson/codec/FuryCodec.java | 45 +++++++++++++------ 1 file changed, 32 insertions(+), 13 deletions(-) diff --git a/redisson/src/main/java/org/redisson/codec/FuryCodec.java b/redisson/src/main/java/org/redisson/codec/FuryCodec.java index 6e0a21f9e..32546a145 100644 --- a/redisson/src/main/java/org/redisson/codec/FuryCodec.java +++ b/redisson/src/main/java/org/redisson/codec/FuryCodec.java @@ -17,10 +17,13 @@ package org.redisson.codec; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.ByteBufInputStream; +import io.netty.buffer.ByteBufOutputStream; import org.apache.fury.Fury; import org.apache.fury.ThreadSafeFury; import org.apache.fury.config.FuryBuilder; import org.apache.fury.config.Language; +import org.apache.fury.io.FuryStreamReader; import org.apache.fury.memory.MemoryBuffer; import org.apache.fury.memory.MemoryUtils; import org.redisson.client.codec.BaseCodec; @@ -84,11 +87,15 @@ public class FuryCodec extends BaseCodec { private final Decoder decoder = new Decoder() { @Override public Object decode(ByteBuf buf, State state) throws IOException { - MemoryBuffer furyBuffer = MemoryUtils.wrap(buf.nioBuffer()); - try { - return fury.deserialize(furyBuffer); - } finally { - buf.readerIndex(buf.readerIndex() + furyBuffer.readerIndex()); + if (buf.nioBufferCount() == 1) { + MemoryBuffer furyBuffer = MemoryUtils.wrap(buf.nioBuffer()); + try { + return fury.deserialize(furyBuffer); + } finally { + buf.readerIndex(buf.readerIndex() + furyBuffer.readerIndex()); + } + } else { + return fury.deserialize(FuryStreamReader.of(new ByteBufInputStream(buf))); } } }; @@ -97,22 +104,34 @@ public class FuryCodec extends BaseCodec { @Override public ByteBuf encode(Object in) throws IOException { ByteBuf out = ByteBufAllocator.DEFAULT.buffer(); - MemoryBuffer furyBuffer; + MemoryBuffer furyBuffer = null; int remainingSize = out.capacity() - out.writerIndex(); if (out.hasArray()) { furyBuffer = MemoryUtils.wrap(out.array(), out.arrayOffset() + out.writerIndex(), remainingSize); - } else { + } else if (out.hasMemoryAddress()) { furyBuffer = MemoryUtils.buffer(out.memoryAddress() + out.writerIndex(), remainingSize); } - int size = furyBuffer.size(); - fury.serialize(furyBuffer, in); - if (furyBuffer.size() > size) { - out.writeBytes(furyBuffer.getHeapMemory(), 0, furyBuffer.size()); + if (furyBuffer != null) { + int size = furyBuffer.size(); + fury.serialize(furyBuffer, in); + if (furyBuffer.size() > size) { + out.writeBytes(furyBuffer.getHeapMemory(), 0, furyBuffer.size()); + } else { + out.writerIndex(out.writerIndex() + furyBuffer.writerIndex()); + } + return out; } else { - out.writerIndex(out.writerIndex() + furyBuffer.writerIndex()); + try { + ByteBufOutputStream baos = new ByteBufOutputStream(out); + fury.serialize(baos, in); + return baos.buffer(); + } catch (Exception e) { + out.release(); + throw e; + } + } - return out; } };