check nioBufferCount and memoryAddress

Signed-off-by: mubai <chaokun.yck@antgroup.com>
pull/6077/head
mubai 8 months ago
parent 7f1c0e437c
commit 72d683e17b

@ -17,10 +17,13 @@ package org.redisson.codec;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.ByteBufOutputStream;
import org.apache.fury.Fury; import org.apache.fury.Fury;
import org.apache.fury.ThreadSafeFury; import org.apache.fury.ThreadSafeFury;
import org.apache.fury.config.FuryBuilder; import org.apache.fury.config.FuryBuilder;
import org.apache.fury.config.Language; import org.apache.fury.config.Language;
import org.apache.fury.io.FuryStreamReader;
import org.apache.fury.memory.MemoryBuffer; import org.apache.fury.memory.MemoryBuffer;
import org.apache.fury.memory.MemoryUtils; import org.apache.fury.memory.MemoryUtils;
import org.redisson.client.codec.BaseCodec; import org.redisson.client.codec.BaseCodec;
@ -84,11 +87,15 @@ public class FuryCodec extends BaseCodec {
private final Decoder<Object> decoder = new Decoder<Object>() { private final Decoder<Object> decoder = new Decoder<Object>() {
@Override @Override
public Object decode(ByteBuf buf, State state) throws IOException { public Object decode(ByteBuf buf, State state) throws IOException {
MemoryBuffer furyBuffer = MemoryUtils.wrap(buf.nioBuffer()); if (buf.nioBufferCount() == 1) {
try { MemoryBuffer furyBuffer = MemoryUtils.wrap(buf.nioBuffer());
return fury.deserialize(furyBuffer); try {
} finally { return fury.deserialize(furyBuffer);
buf.readerIndex(buf.readerIndex() + furyBuffer.readerIndex()); } finally {
buf.readerIndex(buf.readerIndex() + furyBuffer.readerIndex());
}
} else {
return fury.deserialize(FuryStreamReader.of(new ByteBufInputStream(buf)));
} }
} }
}; };
@ -97,22 +104,34 @@ public class FuryCodec extends BaseCodec {
@Override @Override
public ByteBuf encode(Object in) throws IOException { public ByteBuf encode(Object in) throws IOException {
ByteBuf out = ByteBufAllocator.DEFAULT.buffer(); ByteBuf out = ByteBufAllocator.DEFAULT.buffer();
MemoryBuffer furyBuffer; MemoryBuffer furyBuffer = null;
int remainingSize = out.capacity() - out.writerIndex(); int remainingSize = out.capacity() - out.writerIndex();
if (out.hasArray()) { if (out.hasArray()) {
furyBuffer = MemoryUtils.wrap(out.array(), out.arrayOffset() + out.writerIndex(), furyBuffer = MemoryUtils.wrap(out.array(), out.arrayOffset() + out.writerIndex(),
remainingSize); remainingSize);
} else { } else if (out.hasMemoryAddress()) {
furyBuffer = MemoryUtils.buffer(out.memoryAddress() + out.writerIndex(), remainingSize); furyBuffer = MemoryUtils.buffer(out.memoryAddress() + out.writerIndex(), remainingSize);
} }
int size = furyBuffer.size(); if (furyBuffer != null) {
fury.serialize(furyBuffer, in); int size = furyBuffer.size();
if (furyBuffer.size() > size) { fury.serialize(furyBuffer, in);
out.writeBytes(furyBuffer.getHeapMemory(), 0, furyBuffer.size()); if (furyBuffer.size() > size) {
out.writeBytes(furyBuffer.getHeapMemory(), 0, furyBuffer.size());
} else {
out.writerIndex(out.writerIndex() + furyBuffer.writerIndex());
}
return out;
} else { } else {
out.writerIndex(out.writerIndex() + furyBuffer.writerIndex()); try {
ByteBufOutputStream baos = new ByteBufOutputStream(out);
fury.serialize(baos, in);
return baos.buffer();
} catch (Exception e) {
out.release();
throw e;
}
} }
return out;
} }
}; };

Loading…
Cancel
Save