Fixed - memory leak caused by FastThreadLocal used in CodecDecoder. #2309

pull/2348/head
Nikita Koksharov 6 years ago
parent cbfb32ee7b
commit 3a47ed40c0

@ -65,7 +65,6 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ReplayingDecoder;
import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.FastThreadLocal;
/**
* Redis protocol command decoder
@ -84,8 +83,6 @@ public class CommandDecoder extends ReplayingDecoder<State> {
final ExecutorService executor;
private final boolean decodeInExecutor;
private final FastThreadLocal<State> state = new FastThreadLocal<State>();
public CommandDecoder(ExecutorService executor, boolean decodeInExecutor) {
this.decodeInExecutor = decodeInExecutor;
this.executor = executor;
@ -95,12 +92,10 @@ public class CommandDecoder extends ReplayingDecoder<State> {
protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
QueueCommand data = ctx.channel().attr(CommandsQueue.CURRENT_COMMAND).get();
if (state.get() == null) {
state.set(new State());
if (state() == null) {
state(new State());
}
state.get().setDecoderState(null);
if (data == null) {
while (in.writerIndex() > in.readerIndex()) {
int endIndex = skipCommand(in);
@ -138,8 +133,7 @@ public class CommandDecoder extends ReplayingDecoder<State> {
ByteBuf copy = in.copy(in.readerIndex(), in.writerIndex() - in.readerIndex());
in.skipBytes(in.writerIndex() - in.readerIndex());
executor.execute(() -> {
state.set(new State());
state.get().setDecoderState(null);
state(new State());
try {
decodeCommand(ctx.channel(), copy, data);
@ -245,19 +239,19 @@ public class CommandDecoder extends ReplayingDecoder<State> {
protected void sendNext(Channel channel) {
channel.pipeline().get(CommandsQueue.class).sendNextCommand(channel);
state.set(null);
state(null);
}
private void decodeCommandBatch(Channel channel, ByteBuf in, QueueCommand data,
CommandsData commandBatch) throws Exception {
int i = state.get().getBatchIndex();
int i = state().getBatchIndex();
Throwable error = null;
while (in.writerIndex() > in.readerIndex()) {
CommandData<Object, Object> commandData = null;
checkpoint();
state.get().setBatchIndex(i);
state().setBatchIndex(i);
int endIndex = skipCommand(in);
try {
@ -328,7 +322,7 @@ public class CommandDecoder extends ReplayingDecoder<State> {
sendNext(channel);
} else {
checkpoint();
state.get().setBatchIndex(i);
state().setBatchIndex(i);
}
}
@ -381,18 +375,18 @@ public class CommandDecoder extends ReplayingDecoder<State> {
Object result = null;
if (buf != null) {
Decoder<Object> decoder = selectDecoder(data, parts);
result = decoder.decode(buf, state.get());
result = decoder.decode(buf, state());
}
handleResult(data, parts, result, false, channel);
} else if (code == '*') {
long size = readLong(in);
List<Object> respParts = new ArrayList<Object>(Math.max((int) size, 0));
state.get().incLevel();
state().incLevel();
decodeList(in, data, parts, channel, size, respParts, skipConvertor, commandsData);
state.get().decLevel();
state().decLevel();
} else {
String dataStr = in.toString(0, in.writerIndex(), CharsetUtil.UTF_8);
@ -434,7 +428,7 @@ public class CommandDecoder extends ReplayingDecoder<State> {
return;
}
Object result = decoder.decode(respParts, state.get());
Object result = decoder.decode(respParts, state());
decodeResult(data, parts, channel, result);
}
@ -479,7 +473,7 @@ public class CommandDecoder extends ReplayingDecoder<State> {
if (parts != null) {
MultiDecoder<Object> multiDecoder = data.getCommand().getReplayMultiDecoder();
if (multiDecoder != null) {
Decoder<Object> mDecoder = multiDecoder.getDecoder(parts.size(), state.get());
Decoder<Object> mDecoder = multiDecoder.getDecoder(parts.size(), state());
if (mDecoder != null) {
return mDecoder;
}

Loading…
Cancel
Save