Merge pull request #1025 from sulake/bytebuf_cleanup

Ensure bytebufs are released if various encoders throw exception.
pull/1027/head
Nikita Koksharov 8 years ago committed by GitHub
commit 93473bbefe

@ -36,8 +36,9 @@ public class ByteArrayCodec implements Codec {
private final Encoder encoder = new Encoder() {
@Override
public ByteBuf encode(Object in) throws IOException {
ByteBuf out = ByteBufAllocator.DEFAULT.buffer();
out.writeBytes((byte[])in);
byte[] payload = (byte[])in;
ByteBuf out = ByteBufAllocator.DEFAULT.buffer(payload.length);
out.writeBytes(payload);
return out;
}
};

@ -50,9 +50,14 @@ public class JsonJacksonMapCodec extends JsonJacksonCodec {
@Override
public ByteBuf encode(Object in) throws IOException {
ByteBuf out = ByteBufAllocator.DEFAULT.buffer();
try {
ByteBufOutputStream os = new ByteBufOutputStream(out);
mapper.writeValue(os, in);
return os.buffer();
} catch (IOException e) {
out.release();
throw e;
}
}
};

@ -18,6 +18,7 @@ package org.redisson.client.codec;
import java.io.IOException;
import java.nio.charset.Charset;
import io.netty.buffer.ByteBufUtil;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.Encoder;
@ -40,10 +41,17 @@ public class StringCodec implements Codec {
private final Encoder encoder = new Encoder() {
@Override
public ByteBuf encode(Object in) throws IOException {
if (CharsetUtil.UTF_8.equals(charset)) {
String payload = in.toString();
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(ByteBufUtil.utf8MaxBytes(payload));
ByteBufUtil.writeUtf8(buf, payload);
return buf;
} else {
ByteBuf out = ByteBufAllocator.DEFAULT.buffer();
out.writeCharSequence(in.toString(), charset);
return out;
}
}
};
private final Decoder<Object> decoder = new Decoder<Object>() {

@ -17,7 +17,7 @@ package org.redisson.client.protocol;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.util.CharsetUtil;
import io.netty.buffer.ByteBufUtil;
/**
*
@ -29,16 +29,18 @@ public class DefaultParamsEncoder implements Encoder {
@Override
public ByteBuf encode(Object in) {
if (in instanceof byte[]) {
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();
buf.writeBytes((byte[])in);
return buf;
byte[] payload = (byte[])in;
ByteBuf out = ByteBufAllocator.DEFAULT.buffer(payload.length);
out.writeBytes(payload);
return out;
}
if (in instanceof ByteBuf) {
return (ByteBuf) in;
}
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();
buf.writeCharSequence(in.toString(), CharsetUtil.UTF_8);
String payload = in.toString();
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(ByteBufUtil.utf8MaxBytes(payload));
ByteBufUtil.writeUtf8(buf, payload);
return buf;
}

@ -81,11 +81,16 @@ public class FstCodec implements Codec {
@Override
public ByteBuf encode(Object in) throws IOException {
ByteBuf out = ByteBufAllocator.DEFAULT.buffer();
try {
ByteBufOutputStream os = new ByteBufOutputStream(out);
FSTObjectOutput oos = config.getObjectOutput(os);
oos.writeObject(in);
oos.flush();
return os.buffer();
} catch (IOException e) {
out.release();
throw e;
}
}
};

@ -70,9 +70,14 @@ public class JsonJacksonCodec implements Codec {
@Override
public ByteBuf encode(Object in) throws IOException {
ByteBuf out = ByteBufAllocator.DEFAULT.buffer();
try {
ByteBufOutputStream os = new ByteBufOutputStream(out);
mapObjectMapper.writeValue(os, in);
return os.buffer();
} catch (IOException e) {
out.release();
throw e;
}
}
};

@ -129,8 +129,8 @@ public class KryoCodec implements Codec {
@Override
public ByteBuf encode(Object in) throws IOException {
Kryo kryo = null;
try {
ByteBuf out = ByteBufAllocator.DEFAULT.buffer();
try {
ByteBufOutputStream baos = new ByteBufOutputStream(out);
Output output = new Output(baos);
kryo = kryoPool.get();
@ -138,6 +138,7 @@ public class KryoCodec implements Codec {
output.close();
return baos.buffer();
} catch (Exception e) {
out.release();
if (e instanceof RuntimeException) {
throw (RuntimeException) e;
}

@ -61,14 +61,13 @@ public class LZ4Codec implements Codec {
public Object decode(ByteBuf buf, State state) throws IOException {
int decompressSize = buf.readInt();
ByteBuf out = ByteBufAllocator.DEFAULT.buffer(decompressSize);
try {
LZ4SafeDecompressor decompressor = factory.safeDecompressor();
ByteBuffer outBuffer = out.internalNioBuffer(out.writerIndex(), out.writableBytes());
int pos = outBuffer.position();
decompressor.decompress(buf.internalNioBuffer(buf.readerIndex(), buf.readableBytes()), outBuffer);
int compressedLength = outBuffer.position() - pos;
out.writerIndex(compressedLength);
try {
return innerCodec.getValueDecoder().decode(out, state);
} finally {
out.release();

@ -61,11 +61,16 @@ public class SerializationCodec implements Codec {
@Override
public ByteBuf encode(Object in) throws IOException {
ByteBuf out = ByteBufAllocator.DEFAULT.buffer();
try {
ByteBufOutputStream result = new ByteBufOutputStream(out);
ObjectOutputStream outputStream = new ObjectOutputStream(result);
outputStream.writeObject(in);
outputStream.close();
return result.buffer();
} catch (IOException e) {
out.release();
throw e;
}
}
};

@ -65,9 +65,9 @@ public class SnappyCodec implements Codec {
@Override
public Object decode(ByteBuf buf, State state) throws IOException {
ByteBuf out = ByteBufAllocator.DEFAULT.buffer();
try {
snappyDecoder.get().decode(buf, out);
snappyDecoder.get().reset();
try {
return innerCodec.getValueDecoder().decode(out, state);
} finally {
out.release();

@ -28,6 +28,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import io.netty.util.ReferenceCountUtil;
import org.redisson.RedisClientResult;
import org.redisson.RedissonReference;
import org.redisson.RedissonShutdownException;
@ -643,17 +644,13 @@ public class CommandAsyncService implements CommandAsyncExecutor {
protected void free(final Object[] params) {
for (Object obj : params) {
if (obj instanceof ByteBuf) {
((ByteBuf)obj).release();
}
ReferenceCountUtil.safeRelease(obj);
}
}
protected <V, R> void free(final AsyncDetails<V, R> details) {
for (Object obj : details.getParams()) {
if (obj instanceof ByteBuf) {
((ByteBuf)obj).release();
}
ReferenceCountUtil.safeRelease(obj);
}
}

@ -164,9 +164,8 @@ public class TasksRunnerService implements RemoteExecutorService, RemoteParams {
}
private Object executeCallable(String className, byte[] classBody, byte[] state, String scheduledRequestId) {
ByteBuf buf = null;
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(state.length);
try {
buf = ByteBufAllocator.DEFAULT.buffer(state.length);
buf.writeBytes(state);
RedissonClassLoader cl = new RedissonClassLoader(getClass().getClassLoader());
@ -201,9 +200,8 @@ public class TasksRunnerService implements RemoteExecutorService, RemoteParams {
}
private void executeRunnable(String className, byte[] classBody, byte[] state, String scheduledRequestId) {
ByteBuf buf = null;
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(state.length);
try {
buf = ByteBufAllocator.DEFAULT.buffer(state.length);
buf.writeBytes(state);
RedissonClassLoader cl = new RedissonClassLoader(getClass().getClassLoader());

@ -41,10 +41,10 @@ public class Hash {
long h2 = LongHashFunction.xx().hashBytes(b);
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer((2 * Long.SIZE) / Byte.SIZE);
try {
buf.writeLong(h1).writeLong(h2);
byte[] dst = new byte[buf.readableBytes()];
buf.readBytes(dst);
try {
return dst;
} finally {
buf.release();
@ -58,13 +58,18 @@ public class Hash {
long h2 = LongHashFunction.xx().hashBytes(bf);
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer((2 * Long.SIZE) / Byte.SIZE);
try {
buf.writeLong(h1).writeLong(h2);
ByteBuf b = Base64.encode(buf);
try {
String s = b.toString(CharsetUtil.UTF_8);
return s.substring(0, s.length() - 2);
} finally {
b.release();
}
} finally {
buf.release();
return s.substring(0, s.length() - 2);
}
}
}

Loading…
Cancel
Save