From e2c134e1a6f14460106a73c4dbd9c1f67a24bac8 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Mon, 25 Feb 2019 12:49:21 +0300 Subject: [PATCH] Fixed - FSTCodec memory leak #1927 --- .../java/org/redisson/codec/FstCodec.java | 91 +++++++++++++++++-- 1 file changed, 83 insertions(+), 8 deletions(-) diff --git a/redisson/src/main/java/org/redisson/codec/FstCodec.java b/redisson/src/main/java/org/redisson/codec/FstCodec.java index fe184a41e..982a48c40 100644 --- a/redisson/src/main/java/org/redisson/codec/FstCodec.java +++ b/redisson/src/main/java/org/redisson/codec/FstCodec.java @@ -16,10 +16,15 @@ package org.redisson.codec; import java.io.IOException; +import java.lang.reflect.Field; import org.nustaq.serialization.FSTConfiguration; +import org.nustaq.serialization.FSTDecoder; +import org.nustaq.serialization.FSTEncoder; import org.nustaq.serialization.FSTObjectInput; import org.nustaq.serialization.FSTObjectOutput; +import org.nustaq.serialization.coders.FSTStreamDecoder; +import org.nustaq.serialization.coders.FSTStreamEncoder; import org.redisson.client.codec.BaseCodec; import org.redisson.client.handler.State; import org.redisson.client.protocol.Decoder; @@ -41,6 +46,74 @@ import io.netty.buffer.ByteBufOutputStream; */ public class FstCodec extends BaseCodec { + static class FSTDefaultStreamCoderFactory implements FSTConfiguration.StreamCoderFactory { + + Field chBufField; + Field ascStringCacheField; + + { + try { + chBufField = FSTStreamDecoder.class.getDeclaredField("chBufS"); + ascStringCacheField = FSTStreamDecoder.class.getDeclaredField("ascStringCache"); + } catch (Exception e) { + throw new IllegalStateException(e); + } + ascStringCacheField.setAccessible(true); + chBufField.setAccessible(true); + } + + private FSTConfiguration fstConfiguration; + + FSTDefaultStreamCoderFactory(FSTConfiguration fstConfiguration) { + this.fstConfiguration = fstConfiguration; + } + + @Override + public FSTEncoder createStreamEncoder() { + return new FSTStreamEncoder(fstConfiguration); + } + + @Override + public FSTDecoder createStreamDecoder() { + return new FSTStreamDecoder(fstConfiguration) { + public String readStringUTF() throws IOException { + try { + String res = super.readStringUTF(); + chBufField.set(this, null); + return res; + } catch (Exception e) { + throw new IOException(e); + } + } + + @Override + public String readStringAsc() throws IOException { + try { + String res = super.readStringAsc(); + ascStringCacheField.set(this, null); + return res; + } catch (Exception e) { + throw new IOException(e); + } + } + }; + } + + static ThreadLocal input = new ThreadLocal(); + static ThreadLocal output = new ThreadLocal(); + + @Override + public ThreadLocal getInput() { + return input; + } + + @Override + public ThreadLocal getOutput() { + return output; + } + + } + private final FSTConfiguration config; public FstCodec() { @@ -79,21 +152,23 @@ public class FstCodec extends BaseCodec { public FstCodec(FSTConfiguration fstConfiguration) { config = fstConfiguration; + config.setShareReferences(false); + config.setStreamCoderFactory(new FSTDefaultStreamCoderFactory(config)); } private final Decoder decoder = new Decoder() { @Override public Object decode(ByteBuf buf, State state) throws IOException { + ByteBufInputStream in = new ByteBufInputStream(buf); + FSTObjectInput inputStream = config.getObjectInput(in); try { - ByteBufInputStream in = new ByteBufInputStream(buf); - FSTObjectInput inputStream = config.getObjectInput(in); return inputStream.readObject(); } catch (IOException e) { throw e; } catch (Exception e) { throw new IOException(e); - } finally { - config.getStreamCoderFactory().getInput().remove(); +// } finally { +// inputStream.resetForReuseUseArray(empty); } } }; @@ -103,9 +178,9 @@ public class FstCodec extends BaseCodec { @Override public ByteBuf encode(Object in) throws IOException { ByteBuf out = ByteBufAllocator.DEFAULT.buffer(); + ByteBufOutputStream os = new ByteBufOutputStream(out); + FSTObjectOutput oos = config.getObjectOutput(os); try { - ByteBufOutputStream os = new ByteBufOutputStream(out); - FSTObjectOutput oos = config.getObjectOutput(os); oos.writeObject(in); oos.flush(); return os.buffer(); @@ -115,8 +190,8 @@ public class FstCodec extends BaseCodec { } catch (Exception e) { out.release(); throw new IOException(e); - } finally { - config.getStreamCoderFactory().getOutput().remove(); +// } finally { +// oos.resetForReUse(empty); } } };