Fixed - FSTCodec memory leak #1927

pull/1979/head
Nikita Koksharov 6 years ago
parent c16a31b09b
commit e2c134e1a6

@ -16,10 +16,15 @@
package org.redisson.codec;
import java.io.IOException;
import java.lang.reflect.Field;
import org.nustaq.serialization.FSTConfiguration;
import org.nustaq.serialization.FSTDecoder;
import org.nustaq.serialization.FSTEncoder;
import org.nustaq.serialization.FSTObjectInput;
import org.nustaq.serialization.FSTObjectOutput;
import org.nustaq.serialization.coders.FSTStreamDecoder;
import org.nustaq.serialization.coders.FSTStreamEncoder;
import org.redisson.client.codec.BaseCodec;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder;
@ -41,6 +46,74 @@ import io.netty.buffer.ByteBufOutputStream;
*/
public class FstCodec extends BaseCodec {
static class FSTDefaultStreamCoderFactory implements FSTConfiguration.StreamCoderFactory {
Field chBufField;
Field ascStringCacheField;
{
try {
chBufField = FSTStreamDecoder.class.getDeclaredField("chBufS");
ascStringCacheField = FSTStreamDecoder.class.getDeclaredField("ascStringCache");
} catch (Exception e) {
throw new IllegalStateException(e);
}
ascStringCacheField.setAccessible(true);
chBufField.setAccessible(true);
}
private FSTConfiguration fstConfiguration;
FSTDefaultStreamCoderFactory(FSTConfiguration fstConfiguration) {
this.fstConfiguration = fstConfiguration;
}
@Override
public FSTEncoder createStreamEncoder() {
return new FSTStreamEncoder(fstConfiguration);
}
@Override
public FSTDecoder createStreamDecoder() {
return new FSTStreamDecoder(fstConfiguration) {
public String readStringUTF() throws IOException {
try {
String res = super.readStringUTF();
chBufField.set(this, null);
return res;
} catch (Exception e) {
throw new IOException(e);
}
}
@Override
public String readStringAsc() throws IOException {
try {
String res = super.readStringAsc();
ascStringCacheField.set(this, null);
return res;
} catch (Exception e) {
throw new IOException(e);
}
}
};
}
static ThreadLocal input = new ThreadLocal();
static ThreadLocal output = new ThreadLocal();
@Override
public ThreadLocal getInput() {
return input;
}
@Override
public ThreadLocal getOutput() {
return output;
}
}
private final FSTConfiguration config;
public FstCodec() {
@ -79,21 +152,23 @@ public class FstCodec extends BaseCodec {
public FstCodec(FSTConfiguration fstConfiguration) {
config = fstConfiguration;
config.setShareReferences(false);
config.setStreamCoderFactory(new FSTDefaultStreamCoderFactory(config));
}
private final Decoder<Object> decoder = new Decoder<Object>() {
@Override
public Object decode(ByteBuf buf, State state) throws IOException {
ByteBufInputStream in = new ByteBufInputStream(buf);
FSTObjectInput inputStream = config.getObjectInput(in);
try {
ByteBufInputStream in = new ByteBufInputStream(buf);
FSTObjectInput inputStream = config.getObjectInput(in);
return inputStream.readObject();
} catch (IOException e) {
throw e;
} catch (Exception e) {
throw new IOException(e);
} finally {
config.getStreamCoderFactory().getInput().remove();
// } finally {
// inputStream.resetForReuseUseArray(empty);
}
}
};
@ -103,9 +178,9 @@ public class FstCodec extends BaseCodec {
@Override
public ByteBuf encode(Object in) throws IOException {
ByteBuf out = ByteBufAllocator.DEFAULT.buffer();
ByteBufOutputStream os = new ByteBufOutputStream(out);
FSTObjectOutput oos = config.getObjectOutput(os);
try {
ByteBufOutputStream os = new ByteBufOutputStream(out);
FSTObjectOutput oos = config.getObjectOutput(os);
oos.writeObject(in);
oos.flush();
return os.buffer();
@ -115,8 +190,8 @@ public class FstCodec extends BaseCodec {
} catch (Exception e) {
out.release();
throw new IOException(e);
} finally {
config.getStreamCoderFactory().getOutput().remove();
// } finally {
// oos.resetForReUse(empty);
}
}
};

Loading…
Cancel
Save