ByteBufs are not released properly in SnappyCodec and LZ4Codec #1046

pull/1051/head
Nikita 8 years ago
parent 45236c4b4d
commit a45a476bd3

@ -55,6 +55,10 @@ public class LZ4Codec implements Codec {
public LZ4Codec(Codec innerCodec) { public LZ4Codec(Codec innerCodec) {
this.innerCodec = innerCodec; this.innerCodec = innerCodec;
} }
public LZ4Codec(ClassLoader classLoader) {
this(new FstCodec(classLoader));
}
private final Decoder<Object> decoder = new Decoder<Object>() { private final Decoder<Object> decoder = new Decoder<Object>() {
@Override @Override
@ -79,21 +83,28 @@ public class LZ4Codec implements Codec {
@Override @Override
public ByteBuf encode(Object in) throws IOException { public ByteBuf encode(Object in) throws IOException {
LZ4Compressor compressor = factory.fastCompressor(); ByteBuf bytes = null;
ByteBuf bytes = innerCodec.getValueEncoder().encode(in); try {
ByteBuffer srcBuf = bytes.internalNioBuffer(bytes.readerIndex(), bytes.readableBytes()); LZ4Compressor compressor = factory.fastCompressor();
bytes = innerCodec.getValueEncoder().encode(in);
int outMaxLength = compressor.maxCompressedLength(bytes.readableBytes()); ByteBuffer srcBuf = bytes.internalNioBuffer(bytes.readerIndex(), bytes.readableBytes());
ByteBuf out = ByteBufAllocator.DEFAULT.buffer(outMaxLength + DECOMPRESSION_HEADER_SIZE);
out.writeInt(bytes.readableBytes()); int outMaxLength = compressor.maxCompressedLength(bytes.readableBytes());
ByteBuffer outBuf = out.internalNioBuffer(out.writerIndex(), out.writableBytes()); ByteBuf out = ByteBufAllocator.DEFAULT.buffer(outMaxLength + DECOMPRESSION_HEADER_SIZE);
int pos = outBuf.position(); out.writeInt(bytes.readableBytes());
ByteBuffer outBuf = out.internalNioBuffer(out.writerIndex(), out.writableBytes());
compressor.compress(srcBuf, outBuf); int pos = outBuf.position();
int compressedLength = outBuf.position() - pos; compressor.compress(srcBuf, outBuf);
out.writerIndex(out.writerIndex() + compressedLength);
return out; int compressedLength = outBuf.position() - pos;
out.writerIndex(out.writerIndex() + compressedLength);
return out;
} finally {
if (bytes != null) {
bytes.release();
}
}
} }
}; };

@ -60,6 +60,10 @@ public class SnappyCodec implements Codec {
this.innerCodec = innerCodec; this.innerCodec = innerCodec;
} }
public SnappyCodec(ClassLoader classLoader) {
this(new FstCodec(classLoader));
}
private final Decoder<Object> decoder = new Decoder<Object>() { private final Decoder<Object> decoder = new Decoder<Object>() {
@Override @Override
@ -67,9 +71,9 @@ public class SnappyCodec implements Codec {
ByteBuf out = ByteBufAllocator.DEFAULT.buffer(); ByteBuf out = ByteBufAllocator.DEFAULT.buffer();
try { try {
snappyDecoder.get().decode(buf, out); snappyDecoder.get().decode(buf, out);
snappyDecoder.get().reset();
return innerCodec.getValueDecoder().decode(out, state); return innerCodec.getValueDecoder().decode(out, state);
} finally { } finally {
snappyDecoder.get().reset();
out.release(); out.release();
} }
} }
@ -80,10 +84,14 @@ public class SnappyCodec implements Codec {
@Override @Override
public ByteBuf encode(Object in) throws IOException { public ByteBuf encode(Object in) throws IOException {
ByteBuf buf = innerCodec.getValueEncoder().encode(in); ByteBuf buf = innerCodec.getValueEncoder().encode(in);
ByteBuf out = ByteBufAllocator.DEFAULT.buffer(1024*100); ByteBuf out = ByteBufAllocator.DEFAULT.buffer(buf.readableBytes() + 128);
snappyEncoder.get().encode(buf, out, buf.readableBytes()); try {
snappyEncoder.get().reset(); snappyEncoder.get().encode(buf, out, buf.readableBytes());
return out; return out;
} finally {
buf.release();
snappyEncoder.get().reset();
}
} }
}; };

Loading…
Cancel
Save