From 822f8c8529f39dbcf67931f8a3cabdb7ea74304d Mon Sep 17 00:00:00 2001 From: Johno Crawford Date: Mon, 4 Sep 2017 14:41:18 +0200 Subject: [PATCH] Ensure bytebufs are released if various encoders throw exception. Performance boost for utf8 string encoding. --- .../redisson/client/codec/ByteArrayCodec.java | 5 ++-- .../client/codec/JsonJacksonMapCodec.java | 11 +++++--- .../redisson/client/codec/StringCodec.java | 14 +++++++--- .../client/protocol/DefaultParamsEncoder.java | 16 ++++++----- .../java/org/redisson/codec/FstCodec.java | 15 +++++++---- .../org/redisson/codec/JsonJacksonCodec.java | 11 +++++--- .../java/org/redisson/codec/KryoCodec.java | 3 ++- .../java/org/redisson/codec/LZ4Codec.java | 15 +++++------ .../redisson/codec/SerializationCodec.java | 15 +++++++---- .../java/org/redisson/codec/SnappyCodec.java | 4 +-- .../redisson/command/CommandAsyncService.java | 9 +++---- .../redisson/executor/TasksRunnerService.java | 6 ++--- .../src/main/java/org/redisson/misc/Hash.java | 27 +++++++++++-------- 13 files changed, 91 insertions(+), 60 deletions(-) diff --git a/redisson/src/main/java/org/redisson/client/codec/ByteArrayCodec.java b/redisson/src/main/java/org/redisson/client/codec/ByteArrayCodec.java index eaee61141..be9e1d3f3 100644 --- a/redisson/src/main/java/org/redisson/client/codec/ByteArrayCodec.java +++ b/redisson/src/main/java/org/redisson/client/codec/ByteArrayCodec.java @@ -36,8 +36,9 @@ public class ByteArrayCodec implements Codec { private final Encoder encoder = new Encoder() { @Override public ByteBuf encode(Object in) throws IOException { - ByteBuf out = ByteBufAllocator.DEFAULT.buffer(); - out.writeBytes((byte[])in); + byte[] payload = (byte[])in; + ByteBuf out = ByteBufAllocator.DEFAULT.buffer(payload.length); + out.writeBytes(payload); return out; } }; diff --git a/redisson/src/main/java/org/redisson/client/codec/JsonJacksonMapCodec.java b/redisson/src/main/java/org/redisson/client/codec/JsonJacksonMapCodec.java index aada88164..441559979 100644 --- a/redisson/src/main/java/org/redisson/client/codec/JsonJacksonMapCodec.java +++ b/redisson/src/main/java/org/redisson/client/codec/JsonJacksonMapCodec.java @@ -50,9 +50,14 @@ public class JsonJacksonMapCodec extends JsonJacksonCodec { @Override public ByteBuf encode(Object in) throws IOException { ByteBuf out = ByteBufAllocator.DEFAULT.buffer(); - ByteBufOutputStream os = new ByteBufOutputStream(out); - mapper.writeValue(os, in); - return os.buffer(); + try { + ByteBufOutputStream os = new ByteBufOutputStream(out); + mapper.writeValue(os, in); + return os.buffer(); + } catch (IOException e) { + out.release(); + throw e; + } } }; diff --git a/redisson/src/main/java/org/redisson/client/codec/StringCodec.java b/redisson/src/main/java/org/redisson/client/codec/StringCodec.java index 84eb0270a..07a20e3d4 100644 --- a/redisson/src/main/java/org/redisson/client/codec/StringCodec.java +++ b/redisson/src/main/java/org/redisson/client/codec/StringCodec.java @@ -18,6 +18,7 @@ package org.redisson.client.codec; import java.io.IOException; import java.nio.charset.Charset; +import io.netty.buffer.ByteBufUtil; import org.redisson.client.handler.State; import org.redisson.client.protocol.Decoder; import org.redisson.client.protocol.Encoder; @@ -40,9 +41,16 @@ public class StringCodec implements Codec { private final Encoder encoder = new Encoder() { @Override public ByteBuf encode(Object in) throws IOException { - ByteBuf out = ByteBufAllocator.DEFAULT.buffer(); - out.writeCharSequence(in.toString(), charset); - return out; + if (CharsetUtil.UTF_8.equals(charset)) { + String payload = in.toString(); + ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(ByteBufUtil.utf8MaxBytes(payload)); + ByteBufUtil.writeUtf8(buf, payload); + return buf; + } else { + ByteBuf out = ByteBufAllocator.DEFAULT.buffer(); + out.writeCharSequence(in.toString(), charset); + return out; + } } }; diff --git a/redisson/src/main/java/org/redisson/client/protocol/DefaultParamsEncoder.java b/redisson/src/main/java/org/redisson/client/protocol/DefaultParamsEncoder.java index 83d08c8ca..98a81bd16 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/DefaultParamsEncoder.java +++ b/redisson/src/main/java/org/redisson/client/protocol/DefaultParamsEncoder.java @@ -17,7 +17,7 @@ package org.redisson.client.protocol; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; -import io.netty.util.CharsetUtil; +import io.netty.buffer.ByteBufUtil; /** * @@ -29,16 +29,18 @@ public class DefaultParamsEncoder implements Encoder { @Override public ByteBuf encode(Object in) { if (in instanceof byte[]) { - ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(); - buf.writeBytes((byte[])in); - return buf; + byte[] payload = (byte[])in; + ByteBuf out = ByteBufAllocator.DEFAULT.buffer(payload.length); + out.writeBytes(payload); + return out; } if (in instanceof ByteBuf) { return (ByteBuf) in; } - - ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(); - buf.writeCharSequence(in.toString(), CharsetUtil.UTF_8); + + String payload = in.toString(); + ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(ByteBufUtil.utf8MaxBytes(payload)); + ByteBufUtil.writeUtf8(buf, payload); return buf; } diff --git a/redisson/src/main/java/org/redisson/codec/FstCodec.java b/redisson/src/main/java/org/redisson/codec/FstCodec.java index f756ffc99..ef4c9c7bb 100644 --- a/redisson/src/main/java/org/redisson/codec/FstCodec.java +++ b/redisson/src/main/java/org/redisson/codec/FstCodec.java @@ -81,11 +81,16 @@ public class FstCodec implements Codec { @Override public ByteBuf encode(Object in) throws IOException { ByteBuf out = ByteBufAllocator.DEFAULT.buffer(); - ByteBufOutputStream os = new ByteBufOutputStream(out); - FSTObjectOutput oos = config.getObjectOutput(os); - oos.writeObject(in); - oos.flush(); - return os.buffer(); + try { + ByteBufOutputStream os = new ByteBufOutputStream(out); + FSTObjectOutput oos = config.getObjectOutput(os); + oos.writeObject(in); + oos.flush(); + return os.buffer(); + } catch (IOException e) { + out.release(); + throw e; + } } }; diff --git a/redisson/src/main/java/org/redisson/codec/JsonJacksonCodec.java b/redisson/src/main/java/org/redisson/codec/JsonJacksonCodec.java index f3122dafc..747194ae7 100755 --- a/redisson/src/main/java/org/redisson/codec/JsonJacksonCodec.java +++ b/redisson/src/main/java/org/redisson/codec/JsonJacksonCodec.java @@ -70,9 +70,14 @@ public class JsonJacksonCodec implements Codec { @Override public ByteBuf encode(Object in) throws IOException { ByteBuf out = ByteBufAllocator.DEFAULT.buffer(); - ByteBufOutputStream os = new ByteBufOutputStream(out); - mapObjectMapper.writeValue(os, in); - return os.buffer(); + try { + ByteBufOutputStream os = new ByteBufOutputStream(out); + mapObjectMapper.writeValue(os, in); + return os.buffer(); + } catch (IOException e) { + out.release(); + throw e; + } } }; diff --git a/redisson/src/main/java/org/redisson/codec/KryoCodec.java b/redisson/src/main/java/org/redisson/codec/KryoCodec.java index 5f4defb39..a4c779bbc 100755 --- a/redisson/src/main/java/org/redisson/codec/KryoCodec.java +++ b/redisson/src/main/java/org/redisson/codec/KryoCodec.java @@ -129,8 +129,8 @@ public class KryoCodec implements Codec { @Override public ByteBuf encode(Object in) throws IOException { Kryo kryo = null; + ByteBuf out = ByteBufAllocator.DEFAULT.buffer(); try { - ByteBuf out = ByteBufAllocator.DEFAULT.buffer(); ByteBufOutputStream baos = new ByteBufOutputStream(out); Output output = new Output(baos); kryo = kryoPool.get(); @@ -138,6 +138,7 @@ public class KryoCodec implements Codec { output.close(); return baos.buffer(); } catch (Exception e) { + out.release(); if (e instanceof RuntimeException) { throw (RuntimeException) e; } diff --git a/redisson/src/main/java/org/redisson/codec/LZ4Codec.java b/redisson/src/main/java/org/redisson/codec/LZ4Codec.java index 630e18e07..a37e8a017 100644 --- a/redisson/src/main/java/org/redisson/codec/LZ4Codec.java +++ b/redisson/src/main/java/org/redisson/codec/LZ4Codec.java @@ -61,14 +61,13 @@ public class LZ4Codec implements Codec { public Object decode(ByteBuf buf, State state) throws IOException { int decompressSize = buf.readInt(); ByteBuf out = ByteBufAllocator.DEFAULT.buffer(decompressSize); - - LZ4SafeDecompressor decompressor = factory.safeDecompressor(); - ByteBuffer outBuffer = out.internalNioBuffer(out.writerIndex(), out.writableBytes()); - int pos = outBuffer.position(); - decompressor.decompress(buf.internalNioBuffer(buf.readerIndex(), buf.readableBytes()), outBuffer); - int compressedLength = outBuffer.position() - pos; - out.writerIndex(compressedLength); try { + LZ4SafeDecompressor decompressor = factory.safeDecompressor(); + ByteBuffer outBuffer = out.internalNioBuffer(out.writerIndex(), out.writableBytes()); + int pos = outBuffer.position(); + decompressor.decompress(buf.internalNioBuffer(buf.readerIndex(), buf.readableBytes()), outBuffer); + int compressedLength = outBuffer.position() - pos; + out.writerIndex(compressedLength); return innerCodec.getValueDecoder().decode(out, state); } finally { out.release(); @@ -91,7 +90,7 @@ public class LZ4Codec implements Codec { int pos = outBuf.position(); compressor.compress(srcBuf, outBuf); - + int compressedLength = outBuf.position() - pos; out.writerIndex(out.writerIndex() + compressedLength); return out; diff --git a/redisson/src/main/java/org/redisson/codec/SerializationCodec.java b/redisson/src/main/java/org/redisson/codec/SerializationCodec.java index 542d3aac7..b3799f2fc 100644 --- a/redisson/src/main/java/org/redisson/codec/SerializationCodec.java +++ b/redisson/src/main/java/org/redisson/codec/SerializationCodec.java @@ -61,11 +61,16 @@ public class SerializationCodec implements Codec { @Override public ByteBuf encode(Object in) throws IOException { ByteBuf out = ByteBufAllocator.DEFAULT.buffer(); - ByteBufOutputStream result = new ByteBufOutputStream(out); - ObjectOutputStream outputStream = new ObjectOutputStream(result); - outputStream.writeObject(in); - outputStream.close(); - return result.buffer(); + try { + ByteBufOutputStream result = new ByteBufOutputStream(out); + ObjectOutputStream outputStream = new ObjectOutputStream(result); + outputStream.writeObject(in); + outputStream.close(); + return result.buffer(); + } catch (IOException e) { + out.release(); + throw e; + } } }; diff --git a/redisson/src/main/java/org/redisson/codec/SnappyCodec.java b/redisson/src/main/java/org/redisson/codec/SnappyCodec.java index dfb2938ae..87feec32c 100644 --- a/redisson/src/main/java/org/redisson/codec/SnappyCodec.java +++ b/redisson/src/main/java/org/redisson/codec/SnappyCodec.java @@ -65,9 +65,9 @@ public class SnappyCodec implements Codec { @Override public Object decode(ByteBuf buf, State state) throws IOException { ByteBuf out = ByteBufAllocator.DEFAULT.buffer(); - snappyDecoder.get().decode(buf, out); - snappyDecoder.get().reset(); try { + snappyDecoder.get().decode(buf, out); + snappyDecoder.get().reset(); return innerCodec.getValueDecoder().decode(out, state); } finally { out.release(); diff --git a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java index 4be6943a7..f093cca7d 100644 --- a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java +++ b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java @@ -28,6 +28,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import io.netty.util.ReferenceCountUtil; import org.redisson.RedisClientResult; import org.redisson.RedissonReference; import org.redisson.RedissonShutdownException; @@ -643,17 +644,13 @@ public class CommandAsyncService implements CommandAsyncExecutor { protected void free(final Object[] params) { for (Object obj : params) { - if (obj instanceof ByteBuf) { - ((ByteBuf)obj).release(); - } + ReferenceCountUtil.safeRelease(obj); } } protected void free(final AsyncDetails details) { for (Object obj : details.getParams()) { - if (obj instanceof ByteBuf) { - ((ByteBuf)obj).release(); - } + ReferenceCountUtil.safeRelease(obj); } } diff --git a/redisson/src/main/java/org/redisson/executor/TasksRunnerService.java b/redisson/src/main/java/org/redisson/executor/TasksRunnerService.java index 685038ddf..69254bb0d 100644 --- a/redisson/src/main/java/org/redisson/executor/TasksRunnerService.java +++ b/redisson/src/main/java/org/redisson/executor/TasksRunnerService.java @@ -164,9 +164,8 @@ public class TasksRunnerService implements RemoteExecutorService, RemoteParams { } private Object executeCallable(String className, byte[] classBody, byte[] state, String scheduledRequestId) { - ByteBuf buf = null; + ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(state.length); try { - buf = ByteBufAllocator.DEFAULT.buffer(state.length); buf.writeBytes(state); RedissonClassLoader cl = new RedissonClassLoader(getClass().getClassLoader()); @@ -201,9 +200,8 @@ public class TasksRunnerService implements RemoteExecutorService, RemoteParams { } private void executeRunnable(String className, byte[] classBody, byte[] state, String scheduledRequestId) { - ByteBuf buf = null; + ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(state.length); try { - buf = ByteBufAllocator.DEFAULT.buffer(state.length); buf.writeBytes(state); RedissonClassLoader cl = new RedissonClassLoader(getClass().getClassLoader()); diff --git a/redisson/src/main/java/org/redisson/misc/Hash.java b/redisson/src/main/java/org/redisson/misc/Hash.java index 931d29555..13f4292fa 100644 --- a/redisson/src/main/java/org/redisson/misc/Hash.java +++ b/redisson/src/main/java/org/redisson/misc/Hash.java @@ -41,30 +41,35 @@ public class Hash { long h2 = LongHashFunction.xx().hashBytes(b); ByteBuf buf = ByteBufAllocator.DEFAULT.buffer((2 * Long.SIZE) / Byte.SIZE); - buf.writeLong(h1).writeLong(h2); - byte[] dst = new byte[buf.readableBytes()]; - buf.readBytes(dst); try { + buf.writeLong(h1).writeLong(h2); + byte[] dst = new byte[buf.readableBytes()]; + buf.readBytes(dst); return dst; } finally { buf.release(); } } - + public static String hashToBase64(ByteBuf objectState) { ByteBuffer bf = objectState.internalNioBuffer(objectState.readerIndex(), objectState.readableBytes()); long h1 = LongHashFunction.farmUo().hashBytes(bf); long h2 = LongHashFunction.xx().hashBytes(bf); ByteBuf buf = ByteBufAllocator.DEFAULT.buffer((2 * Long.SIZE) / Byte.SIZE); - buf.writeLong(h1).writeLong(h2); - - ByteBuf b = Base64.encode(buf); - String s = b.toString(CharsetUtil.UTF_8); - b.release(); - buf.release(); - return s.substring(0, s.length() - 2); + try { + buf.writeLong(h1).writeLong(h2); + ByteBuf b = Base64.encode(buf); + try { + String s = b.toString(CharsetUtil.UTF_8); + return s.substring(0, s.length() - 2); + } finally { + b.release(); + } + } finally { + buf.release(); + } } }