diff --git a/redisson/src/main/java/org/redisson/client/RedisClientConfig.java b/redisson/src/main/java/org/redisson/client/RedisClientConfig.java index 04e5ee243..cc6c3842f 100644 --- a/redisson/src/main/java/org/redisson/client/RedisClientConfig.java +++ b/redisson/src/main/java/org/redisson/client/RedisClientConfig.java @@ -51,6 +51,7 @@ public class RedisClientConfig { private String clientName; private boolean readOnly; private boolean keepPubSubOrder = true; + private boolean decodeInExecutor; private int pingConnectionInterval; private boolean keepAlive; private boolean tcpNoDelay; @@ -92,6 +93,7 @@ public class RedisClientConfig { this.sslKeystorePassword = config.sslKeystorePassword; this.resolverGroup = config.resolverGroup; this.sslHostname = config.sslHostname; + this.decodeInExecutor = config.decodeInExecutor; } public String getSslHostname() { @@ -262,6 +264,14 @@ public class RedisClientConfig { return this; } + public boolean isDecodeInExecutor() { + return decodeInExecutor; + } + public RedisClientConfig setDecodeInExecutor(boolean decodeInExecutor) { + this.decodeInExecutor = decodeInExecutor; + return this; + } + public int getPingConnectionInterval() { return pingConnectionInterval; } diff --git a/redisson/src/main/java/org/redisson/client/handler/CommandDecoder.java b/redisson/src/main/java/org/redisson/client/handler/CommandDecoder.java index 44e65549f..7238926eb 100644 --- a/redisson/src/main/java/org/redisson/client/handler/CommandDecoder.java +++ b/redisson/src/main/java/org/redisson/client/handler/CommandDecoder.java @@ -35,6 +35,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import java.util.concurrent.ExecutorService; import org.redisson.client.RedisAskException; import org.redisson.client.RedisException; @@ -43,10 +44,14 @@ import org.redisson.client.RedisMovedException; import org.redisson.client.RedisOutOfMemoryException; import org.redisson.client.RedisTimeoutException; import org.redisson.client.RedisTryAgainException; +import org.redisson.client.codec.BaseCodec; +import org.redisson.client.codec.ByteArrayCodec; +import org.redisson.client.codec.Codec; import org.redisson.client.codec.StringCodec; import org.redisson.client.protocol.CommandData; import org.redisson.client.protocol.CommandsData; import org.redisson.client.protocol.Decoder; +import org.redisson.client.protocol.Encoder; import org.redisson.client.protocol.QueueCommand; import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommand.ValueType; @@ -58,6 +63,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ReplayingDecoder; @@ -70,131 +76,174 @@ import io.netty.util.CharsetUtil; * */ public class CommandDecoder extends ReplayingDecoder { + + public static class NullCodec extends BaseCodec { + + public static final NullCodec INSTANCE = new NullCodec(); + + private final Decoder decoder = new Decoder() { + @Override + public Object decode(ByteBuf buf, State state) { + return new Object(); + } + }; + + @Override + public Decoder getValueDecoder() { + return decoder; + } + + @Override + public Encoder getValueEncoder() { + throw new UnsupportedOperationException(); + } + + } + protected final Logger log = LoggerFactory.getLogger(getClass()); private static final char CR = '\r'; private static final char LF = '\n'; private static final char ZERO = '0'; + + public enum Status {NORMAL, FILL_BUFFER, DECODE_BUFFER} + + final ExecutorService executor; + private final boolean decodeInExecutor; + + private final ThreadLocal decoderStatus = new ThreadLocal() { + @Override + protected Status initialValue() { + return Status.NORMAL; + }; + }; + + private final ThreadLocal state = new ThreadLocal(); + + public CommandDecoder(ExecutorService executor, boolean decodeInExecutor) { + this.decodeInExecutor = decodeInExecutor; + this.executor = executor; + } @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { - QueueCommand data = ctx.channel().attr(CommandsQueue.CURRENT_COMMAND).get(); + final QueueCommand data = ctx.channel().attr(CommandsQueue.CURRENT_COMMAND).get(); if (log.isTraceEnabled()) { log.trace("reply: {}, channel: {}, command: {}", in.toString(0, in.writerIndex(), CharsetUtil.UTF_8), ctx.channel(), data); } - if (state() == null) { - boolean makeCheckpoint = false; -// commented out due to https://github.com/redisson/redisson/issues/1632. Reproduced with RedissonMapCacheTest -// -// boolean makeCheckpoint = data != null; -// if (data != null) { -// if (data instanceof CommandsData) { -// makeCheckpoint = false; -// } else { -// CommandData cmd = (CommandData)data; -// MultiDecoder decoder = cmd.getCommand().getReplayMultiDecoder(); -// if (decoder != null -// && (decoder instanceof SlotsDecoder -// || decoder instanceof ListMultiDecoder)) { -// makeCheckpoint = false; -// } -// } -// } - state(new State(makeCheckpoint)); + if (state.get() == null) { + state.set(new State()); } - state().setDecoderState(null); + state.get().setDecoderState(null); - decodeCommand(ctx, in, data); + in.markReaderIndex(); + decodeCommand(ctx.channel(), in, data); + + if (decoderStatus.get() == Status.FILL_BUFFER) { + in.resetReaderIndex(); + final ByteBuf copy = ByteBufAllocator.DEFAULT.buffer(in.writerIndex()); + in.readBytes(copy); + state.set(null); + decoderStatus.set(Status.NORMAL); + + final Channel channel = ctx.channel(); + executor.execute(new Runnable() { + @Override + public void run() { + decoderStatus.set(Status.DECODE_BUFFER); + state.set(new State()); + state.get().setDecoderState(null); + try { + decodeCommand(channel, copy, data); + } catch (Exception e) { + log.error("Unable to decode data in separate thread: " + LogHelper.toString(data), e); + } finally { + copy.release(); + decoderStatus.remove(); + state.remove(); + } + } + }); + } + } - protected void sendNext(ChannelHandlerContext ctx, QueueCommand data) { + protected void sendNext(Channel channel, QueueCommand data) { if (data != null) { - if (data.isExecuted()) { - sendNext(ctx); + if (decoderStatus.get() == Status.FILL_BUFFER || data.isExecuted()) { + sendNext(channel); } } else { - sendNext(ctx); + sendNext(channel); } } - protected void decodeCommand(ChannelHandlerContext ctx, ByteBuf in, QueueCommand data) throws Exception { + protected void decodeCommand(Channel channel, ByteBuf in, QueueCommand data) throws Exception { if (data instanceof CommandData) { CommandData cmd = (CommandData)data; try { - if (state().isMakeCheckpoint()) { - decodeFromCheckpoint(ctx, in, data, cmd); - } else { - decode(in, cmd, null, ctx, false); - } - sendNext(ctx, data); + decode(in, cmd, null, channel, false, null); + sendNext(channel, data); } catch (Exception e) { - log.error("Unable to decode data. channel: " + ctx.channel() + ", reply: " + LogHelper.toString(in) + ", command: " + LogHelper.toString(data), e); + log.error("Unable to decode data. channel: " + channel + ", reply: " + LogHelper.toString(in) + ", command: " + LogHelper.toString(data), e); cmd.tryFailure(e); - sendNext(ctx); + decoderStatus.set(Status.NORMAL); + sendNext(channel); throw e; } } else if (data instanceof CommandsData) { CommandsData commands = (CommandsData)data; try { - decodeCommandBatch(ctx, in, data, commands); + decodeCommandBatch(channel, in, data, commands); } catch (Exception e) { commands.getPromise().tryFailure(e); - sendNext(ctx); + decoderStatus.set(Status.NORMAL); + sendNext(channel); throw e; } } else { try { while (in.writerIndex() > in.readerIndex()) { - decode(in, null, null, ctx, false); + decode(in, null, null, channel, false, null); } - sendNext(ctx); + sendNext(channel); } catch (Exception e) { - log.error("Unable to decode data. channel: " + ctx.channel() + ", reply: " + LogHelper.toString(in), e); - sendNext(ctx); + log.error("Unable to decode data. channel: " + channel + ", reply: " + LogHelper.toString(in), e); + decoderStatus.set(Status.NORMAL); + sendNext(channel); throw e; } } } - protected void sendNext(ChannelHandlerContext ctx) { - ctx.pipeline().get(CommandsQueue.class).sendNextCommand(ctx.channel()); - state(null); - } - - protected void decodeFromCheckpoint(ChannelHandlerContext ctx, ByteBuf in, QueueCommand data, - CommandData cmd) throws IOException { - StateLevel level = state().getLastLevel(); - - List prevParts = null; - if (state().getLevels().size() > 1) { - StateLevel prevLevel = state().getLevels().get(state().getLevel() - 1); - prevParts = prevLevel.getParts(); - } - - decodeList(in, cmd, prevParts, ctx, level.getSize(), level.getParts(), false); - - if (state().getLastLevel() == level) { - state().removeLastLevel(); + protected void sendNext(Channel channel) { + if (decoderStatus.get() != Status.DECODE_BUFFER) { + channel.pipeline().get(CommandsQueue.class).sendNextCommand(channel); + state.set(null); } } - - ThreadLocal>> commandsData = new ThreadLocal>>(); - private void decodeCommandBatch(ChannelHandlerContext ctx, ByteBuf in, QueueCommand data, + private void decodeCommandBatch(Channel channel, ByteBuf in, QueueCommand data, CommandsData commandBatch) throws Exception { - int i = state().getBatchIndex(); + int i = 0; + if (decoderStatus.get() != Status.DECODE_BUFFER) { + i = state.get().getBatchIndex(); + } Throwable error = null; while (in.writerIndex() > in.readerIndex()) { CommandData commandData = null; try { - checkpoint(); - state().setBatchIndex(i); + if (decoderStatus.get() != Status.DECODE_BUFFER) { + checkpoint(); + state.get().setBatchIndex(i); + } RedisCommand cmd = commandBatch.getCommands().get(i).getCommand(); boolean skipConvertor = commandBatch.isQueued(); + List> commandsData = null; if (!commandBatch.isAtomic() || RedisCommands.EXEC.getName().equals(cmd.getName()) || RedisCommands.WAIT.getName().equals(cmd.getName())) { @@ -202,20 +251,14 @@ public class CommandDecoder extends ReplayingDecoder { if (RedisCommands.EXEC.getName().equals(cmd.getName())) { skipConvertor = false; if (commandBatch.getAttachedCommands() != null) { - commandsData.set(commandBatch.getAttachedCommands()); + commandsData = commandBatch.getAttachedCommands(); } else { - commandsData.set(commandBatch.getCommands()); + commandsData = commandBatch.getCommands(); } } } - try { - decode(in, commandData, null, ctx, skipConvertor); - } finally { - if (commandData != null && RedisCommands.EXEC.getName().equals(commandData.getCommand().getName())) { - commandsData.remove(); - } - } + decode(in, commandData, null, channel, skipConvertor, commandsData); if (commandData != null && RedisCommands.EXEC.getName().equals(commandData.getCommand().getName()) && commandData.getPromise().isSuccess()) { @@ -229,7 +272,7 @@ public class CommandDecoder extends ReplayingDecoder { } Object res = iter.next(); - completeResponse((CommandData) command, res, ctx.channel()); + completeResponse((CommandData) command, res, channel); } if (RedisCommands.MULTI.getName().equals(command.getCommand().getName())) { @@ -250,27 +293,30 @@ public class CommandDecoder extends ReplayingDecoder { } if (commandBatch.isSkipResult() || i == commandBatch.getCommands().size()) { - RPromise promise = commandBatch.getPromise(); - if (error != null) { - if (!promise.tryFailure(error) && promise.cause() instanceof RedisTimeoutException) { - log.warn("response has been skipped due to timeout! channel: {}, command: {}",ctx.channel(), LogHelper.toString(data)); - } - } else { - if (!promise.trySuccess(null) && promise.cause() instanceof RedisTimeoutException) { - log.warn("response has been skipped due to timeout! channel: {}, command: {}", ctx.channel(), LogHelper.toString(data)); + if (decoderStatus.get() != Status.FILL_BUFFER) { + RPromise promise = commandBatch.getPromise(); + if (error != null) { + if (!promise.tryFailure(error) && promise.cause() instanceof RedisTimeoutException) { + log.warn("response has been skipped due to timeout! channel: {}, command: {}", channel, LogHelper.toString(data)); + } + } else { + if (!promise.trySuccess(null) && promise.cause() instanceof RedisTimeoutException) { + log.warn("response has been skipped due to timeout! channel: {}, command: {}", channel, LogHelper.toString(data)); + } } } - sendNext(ctx); + sendNext(channel); } else { - checkpoint(); - state().setBatchIndex(i); + if (decoderStatus.get() != Status.DECODE_BUFFER) { + checkpoint(); + state.get().setBatchIndex(i); + } } } - protected void decode(ByteBuf in, CommandData data, List parts, ChannelHandlerContext ctx, boolean skipConvertor) throws IOException { + protected void decode(ByteBuf in, CommandData data, List parts, Channel channel, boolean skipConvertor, List> commandsData) throws IOException { int code = in.readByte(); - Channel channel = ctx.channel(); if (code == '+') { String result = readString(in); @@ -315,30 +361,18 @@ public class CommandDecoder extends ReplayingDecoder { Object result = null; if (buf != null) { Decoder decoder = selectDecoder(data, parts); - result = decoder.decode(buf, state()); + result = decoder.decode(buf, state.get()); } handleResult(data, parts, result, false, channel); } else if (code == '*') { long size = readLong(in); - final List respParts = new ArrayList(Math.max((int)size, 0)); + List respParts = new ArrayList(Math.max((int)size, 0)); - StateLevel lastLevel = null; - if (state().isMakeCheckpoint()) { - lastLevel = new StateLevel(size, respParts); - state().addLevel(lastLevel); - } - - state().incLevel(); + state.get().incLevel(); - decodeList(in, data, parts, ctx, size, respParts, skipConvertor); + decodeList(in, data, parts, channel, size, respParts, skipConvertor, commandsData); - state().decLevel(); - - if (state().isMakeCheckpoint()) { - if (lastLevel == state().getLastLevel() && lastLevel.isFull()) { - state().removeLastLevel(); - } - } + state.get().decLevel(); } else { String dataStr = in.toString(0, in.writerIndex(), CharsetUtil.UTF_8); @@ -355,31 +389,23 @@ public class CommandDecoder extends ReplayingDecoder { @SuppressWarnings("unchecked") private void decodeList(ByteBuf in, CommandData data, List parts, - ChannelHandlerContext ctx, long size, List respParts, boolean skipConvertor) + Channel channel, long size, List respParts, boolean skipConvertor, List> commandsData) throws IOException { - if (parts == null && commandsData.get() != null) { - List> commands = commandsData.get(); + if (parts == null && commandsData != null) { for (int i = respParts.size(); i < size; i++) { int suffix = 0; - if (RedisCommands.MULTI.getName().equals(commands.get(0).getCommand().getName())) { + if (RedisCommands.MULTI.getName().equals(commandsData.get(0).getCommand().getName())) { suffix = 1; } - CommandData commandData = (CommandData) commands.get(i+suffix); - decode(in, commandData, respParts, ctx, skipConvertor); + CommandData commandData = (CommandData) commandsData.get(i+suffix); + decode(in, commandData, respParts, channel, skipConvertor, commandsData); if (commandData.getPromise().isDone() && !commandData.getPromise().isSuccess()) { data.tryFailure(commandData.cause()); } - - if (state().isMakeCheckpoint()) { - checkpoint(); - } } } else { for (int i = respParts.size(); i < size; i++) { - decode(in, data, respParts, ctx, skipConvertor); - if (state().isMakeCheckpoint()) { - checkpoint(); - } + decode(in, data, respParts, channel, skipConvertor, null); } } @@ -388,19 +414,23 @@ public class CommandDecoder extends ReplayingDecoder { return; } - Object result = decoder.decode(respParts, state()); - decodeResult(data, parts, ctx, result); + if (decoderStatus.get() == Status.FILL_BUFFER) { + return; + } + + Object result = decoder.decode(respParts, state.get()); + decodeResult(data, parts, channel, result); } - protected void decodeResult(CommandData data, List parts, ChannelHandlerContext ctx, + protected void decodeResult(CommandData data, List parts, Channel channel, Object result) throws IOException { if (data != null) { - handleResult(data, parts, result, true, ctx.channel()); + handleResult(data, parts, result, true, channel); } } private void handleResult(CommandData data, List parts, Object result, boolean skipConvertor, Channel channel) { - if (data != null && !skipConvertor) { + if (data != null && !skipConvertor && decoderStatus.get() != Status.FILL_BUFFER) { result = data.getCommand().getConvertor().convert(result); } if (parts != null) { @@ -411,6 +441,10 @@ public class CommandDecoder extends ReplayingDecoder { } protected void completeResponse(CommandData data, Object result, Channel channel) { + if (decoderStatus.get() == Status.FILL_BUFFER) { + return; + } + if (data != null && !data.getPromise().trySuccess(result) && data.cause() instanceof RedisTimeoutException) { log.warn("response has been skipped due to timeout! channel: {}, command: {}, result: {}", channel, LogHelper.toString(data), LogHelper.toString(result)); } @@ -433,36 +467,46 @@ public class CommandDecoder extends ReplayingDecoder { if (parts != null) { MultiDecoder multiDecoder = data.getCommand().getReplayMultiDecoder(); if (multiDecoder != null) { - Decoder mDecoder = multiDecoder.getDecoder(parts.size(), state()); + Decoder mDecoder = multiDecoder.getDecoder(parts.size(), state.get()); if (mDecoder != null) { return mDecoder; } } } + Codec codec = data.getCodec(); + if (decodeInExecutor && !(codec instanceof StringCodec || codec instanceof ByteArrayCodec)) { + if (decoderStatus.get() == Status.NORMAL) { + decoderStatus.set(Status.FILL_BUFFER); + codec = NullCodec.INSTANCE; + } else if (decoderStatus.get() == Status.FILL_BUFFER) { + codec = NullCodec.INSTANCE; + } + } + Decoder decoder = data.getCommand().getReplayDecoder(); if (decoder == null) { - if (data.getCodec() == null) { + if (codec == null) { return StringCodec.INSTANCE.getValueDecoder(); } if (data.getCommand().getOutParamType() == ValueType.MAP) { if (parts != null && parts.size() % 2 != 0) { - return data.getCodec().getMapValueDecoder(); + return codec.getMapValueDecoder(); } else { - return data.getCodec().getMapKeyDecoder(); + return codec.getMapKeyDecoder(); } } else if (data.getCommand().getOutParamType() == ValueType.MAP_KEY) { - return data.getCodec().getMapKeyDecoder(); + return codec.getMapKeyDecoder(); } else if (data.getCommand().getOutParamType() == ValueType.MAP_VALUE) { - return data.getCodec().getMapValueDecoder(); + return codec.getMapValueDecoder(); } else { - return data.getCodec().getValueDecoder(); + return codec.getValueDecoder(); } } return decoder; } - public ByteBuf readBytes(ByteBuf is) throws IOException { + private ByteBuf readBytes(ByteBuf is) throws IOException { long l = readLong(is); if (l > Integer.MAX_VALUE) { throw new IllegalArgumentException( @@ -481,7 +525,7 @@ public class CommandDecoder extends ReplayingDecoder { return buffer; } - public static long readLong(ByteBuf is) throws IOException { + private long readLong(ByteBuf is) throws IOException { long size = 0; int sign = 1; int read = is.readByte(); diff --git a/redisson/src/main/java/org/redisson/client/handler/CommandPubSubDecoder.java b/redisson/src/main/java/org/redisson/client/handler/CommandPubSubDecoder.java index 38d28d63c..3f3c41a0d 100644 --- a/redisson/src/main/java/org/redisson/client/handler/CommandPubSubDecoder.java +++ b/redisson/src/main/java/org/redisson/client/handler/CommandPubSubDecoder.java @@ -40,8 +40,7 @@ import org.redisson.client.protocol.pubsub.PubSubStatusMessage; import org.redisson.misc.LogHelper; import io.netty.buffer.ByteBuf; -import io.netty.channel.ChannelHandlerContext; -import io.netty.util.CharsetUtil; +import io.netty.channel.Channel; import io.netty.util.internal.PlatformDependent; /** @@ -57,11 +56,10 @@ public class CommandPubSubDecoder extends CommandDecoder { private final Map entries = new HashMap(); private final Map> commands = PlatformDependent.newConcurrentHashMap(); - private final ExecutorService executor; private final boolean keepOrder; - public CommandPubSubDecoder(ExecutorService executor, boolean keepOrder) { - this.executor = executor; + public CommandPubSubDecoder(ExecutorService executor, boolean keepOrder, boolean decodeInExecutor) { + super(executor, decodeInExecutor); this.keepOrder = keepOrder; } @@ -71,40 +69,36 @@ public class CommandPubSubDecoder extends CommandDecoder { } @Override - protected void decodeCommand(ChannelHandlerContext ctx, ByteBuf in, QueueCommand data) throws Exception { + protected void decodeCommand(Channel channel, ByteBuf in, QueueCommand data) throws Exception { if (data == null) { try { while (in.writerIndex() > in.readerIndex()) { - decode(in, null, null, ctx, false); + decode(in, null, null, channel, false, null); } - sendNext(ctx); + sendNext(channel); } catch (Exception e) { - log.error("Unable to decode data. channel: " + ctx.channel() + ", reply: " + LogHelper.toString(in), e); - sendNext(ctx); + log.error("Unable to decode data. channel: " + channel + ", reply: " + LogHelper.toString(in), e); + sendNext(channel); throw e; } } else if (data instanceof CommandData) { CommandData cmd = (CommandData)data; try { - if (state().isMakeCheckpoint()) { - decodeFromCheckpoint(ctx, in, data, cmd); - } else { - while (in.writerIndex() > in.readerIndex()) { - decode(in, cmd, null, ctx, false); - } + while (in.writerIndex() > in.readerIndex()) { + decode(in, cmd, null, channel, false, null); } - sendNext(ctx, data); + sendNext(channel, data); } catch (Exception e) { - log.error("Unable to decode data. channel: " + ctx.channel() + ", reply: " + LogHelper.toString(in), e); + log.error("Unable to decode data. channel: " + channel + ", reply: " + LogHelper.toString(in), e); cmd.tryFailure(e); - sendNext(ctx); + sendNext(channel); throw e; } } } @Override - protected void decodeResult(CommandData data, List parts, ChannelHandlerContext ctx, + protected void decodeResult(CommandData data, List parts, Channel channel, final Object result) throws IOException { if (executor.isShutdown()) { return; @@ -113,7 +107,7 @@ public class CommandPubSubDecoder extends CommandDecoder { if (result instanceof Message) { checkpoint(); - final RedisPubSubConnection pubSubConnection = RedisPubSubConnection.getFrom(ctx.channel()); + final RedisPubSubConnection pubSubConnection = RedisPubSubConnection.getFrom(channel); ChannelName channelName = ((Message) result).getChannel(); if (result instanceof PubSubStatusMessage) { String operation = ((PubSubStatusMessage) result).getType().name().toLowerCase(); @@ -161,7 +155,7 @@ public class CommandPubSubDecoder extends CommandDecoder { } } else { if (data != null && data.getCommand().getName().equals("PING")) { - super.decodeResult(data, parts, ctx, result); + super.decodeResult(data, parts, channel, result); } } } diff --git a/redisson/src/main/java/org/redisson/client/handler/RedisChannelInitializer.java b/redisson/src/main/java/org/redisson/client/handler/RedisChannelInitializer.java index dcf8bf252..cb2db04fd 100644 --- a/redisson/src/main/java/org/redisson/client/handler/RedisChannelInitializer.java +++ b/redisson/src/main/java/org/redisson/client/handler/RedisChannelInitializer.java @@ -98,9 +98,9 @@ public class RedisChannelInitializer extends ChannelInitializer { } if (type == Type.PLAIN) { - ch.pipeline().addLast(new CommandDecoder()); + ch.pipeline().addLast(new CommandDecoder(config.getExecutor(), config.isDecodeInExecutor())); } else { - ch.pipeline().addLast(new CommandPubSubDecoder(config.getExecutor(), config.isKeepPubSubOrder())); + ch.pipeline().addLast(new CommandPubSubDecoder(config.getExecutor(), config.isKeepPubSubOrder(), config.isDecodeInExecutor())); } } diff --git a/redisson/src/main/java/org/redisson/client/handler/State.java b/redisson/src/main/java/org/redisson/client/handler/State.java index dbfa0a7d7..ff88b1226 100644 --- a/redisson/src/main/java/org/redisson/client/handler/State.java +++ b/redisson/src/main/java/org/redisson/client/handler/State.java @@ -33,16 +33,10 @@ public class State { private int level = -1; private List levels; - private final boolean makeCheckpoint; - public State(boolean makeCheckpoint) { - this.makeCheckpoint = makeCheckpoint; + public State() { } - public boolean isMakeCheckpoint() { - return makeCheckpoint; - } - public int getLevel() { return level; } diff --git a/redisson/src/main/java/org/redisson/config/Config.java b/redisson/src/main/java/org/redisson/config/Config.java index 1279a6c85..2b50a8211 100644 --- a/redisson/src/main/java/org/redisson/config/Config.java +++ b/redisson/src/main/java/org/redisson/config/Config.java @@ -80,6 +80,8 @@ public class Config { private boolean keepPubSubOrder = true; + private boolean decodeInExecutor = false; + private boolean useScriptCache = false; /** @@ -102,6 +104,7 @@ public class Config { oldConf.setCodec(new FstCodec()); } + setDecodeInExecutor(oldConf.isDecodeInExecutor()); setUseScriptCache(oldConf.isUseScriptCache()); setKeepPubSubOrder(oldConf.isKeepPubSubOrder()); setLockWatchdogTimeout(oldConf.getLockWatchdogTimeout()); @@ -714,5 +717,21 @@ public class Config { return useScriptCache; } - + public boolean isDecodeInExecutor() { + return decodeInExecutor; + } + /** + * Defines whether to decode data by codec in executor's threads or netty's threads. + * If decoding data process takes long time and netty thread is used then `RedisTimeoutException` could arise time to time. + *

+ * Default is false. + * + * @param decodeInExecutor - true to use executor's threads, false to use netty's threads. + * @return config + */ + public Config setDecodeInExecutor(boolean decodeInExecutor) { + this.decodeInExecutor = decodeInExecutor; + return this; + } + } diff --git a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index c6dde4a41..5c0fecb24 100644 --- a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -181,7 +181,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { } else if (cfg.getTransportMode() == TransportMode.KQUEUE) { if (cfg.getEventLoopGroup() == null) { this.group = new KQueueEventLoopGroup(cfg.getNettyThreads(), new DefaultThreadFactory("redisson-netty")); - } else { + } else { this.group = cfg.getEventLoopGroup(); } @@ -457,6 +457,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { .setSslKeystore(config.getSslKeystore()) .setSslKeystorePassword(config.getSslKeystorePassword()) .setClientName(config.getClientName()) + .setDecodeInExecutor(cfg.isDecodeInExecutor()) .setKeepPubSubOrder(cfg.isKeepPubSubOrder()) .setPingConnectionInterval(config.getPingConnectionInterval()) .setKeepAlive(config.isKeepAlive()) diff --git a/redisson/src/main/java/org/redisson/liveobject/resolver/DefaultNamingScheme.java b/redisson/src/main/java/org/redisson/liveobject/resolver/DefaultNamingScheme.java index b10da6610..886caa36d 100644 --- a/redisson/src/main/java/org/redisson/liveobject/resolver/DefaultNamingScheme.java +++ b/redisson/src/main/java/org/redisson/liveobject/resolver/DefaultNamingScheme.java @@ -72,7 +72,7 @@ public class DefaultNamingScheme extends AbstractNamingScheme implements NamingS ByteBuf b = Unpooled.wrappedBuffer(ByteBufUtil.decodeHexDump(decode)); try { - return codec.getMapKeyDecoder().decode(b, new State(false)); + return codec.getMapKeyDecoder().decode(b, new State()); } catch (IOException ex) { throw new IllegalStateException("Unable to decode [" + decode + "] into object", ex); } finally { diff --git a/redisson/src/test/java/org/redisson/RedissonBatchTest.java b/redisson/src/test/java/org/redisson/RedissonBatchTest.java index 06f3a3bc7..7eb96fafa 100644 --- a/redisson/src/test/java/org/redisson/RedissonBatchTest.java +++ b/redisson/src/test/java/org/redisson/RedissonBatchTest.java @@ -233,7 +233,7 @@ public class RedissonBatchTest extends BaseTest { redisson.shutdown(); } - @Test + @Test(timeout = 60000) public void testWriteTimeout() { Config config = createConfig(); config.useSingleServer().setTimeout(15000); diff --git a/redisson/src/test/java/org/redisson/RedissonRuntimeEnvironment.java b/redisson/src/test/java/org/redisson/RedissonRuntimeEnvironment.java index ac43085df..c26aa3560 100644 --- a/redisson/src/test/java/org/redisson/RedissonRuntimeEnvironment.java +++ b/redisson/src/test/java/org/redisson/RedissonRuntimeEnvironment.java @@ -9,7 +9,7 @@ import java.util.Locale; public class RedissonRuntimeEnvironment { public static final boolean isTravis = "true".equalsIgnoreCase(System.getProperty("travisEnv")); - public static final String redisBinaryPath = System.getProperty("redisBinary", "C:\\Devel\\projects\\redis\\redis-x64-4.0.2.2\\redis-server.exe"); + public static final String redisBinaryPath = System.getProperty("redisBinary", "C:\\Devel\\projects\\redis\\Redis-x64-3.2.100\\redis-server.exe"); public static final String tempDir = System.getProperty("java.io.tmpdir"); public static final String OS; public static final boolean isWindows; diff --git a/redisson/src/test/java/org/redisson/RedissonTest.java b/redisson/src/test/java/org/redisson/RedissonTest.java index c947d73ab..d263c44ef 100644 --- a/redisson/src/test/java/org/redisson/RedissonTest.java +++ b/redisson/src/test/java/org/redisson/RedissonTest.java @@ -44,7 +44,11 @@ import org.redisson.client.RedisClientConfig; import org.redisson.client.RedisConnection; import org.redisson.client.RedisConnectionException; import org.redisson.client.RedisOutOfMemoryException; +import org.redisson.client.codec.BaseCodec; import org.redisson.client.codec.StringCodec; +import org.redisson.client.handler.State; +import org.redisson.client.protocol.Decoder; +import org.redisson.client.protocol.Encoder; import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.Time; import org.redisson.cluster.ClusterNodeInfo; @@ -59,6 +63,10 @@ import org.redisson.connection.ConnectionListener; import org.redisson.connection.MasterSlaveConnectionManager; import org.redisson.connection.balancer.RandomLoadBalancer; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.util.CharsetUtil; + public class RedissonTest { protected RedissonClient redisson; @@ -390,6 +398,96 @@ public class RedissonTest { slave2.stop(); } + public static class SlowCodec extends BaseCodec { + + private final Encoder encoder = new Encoder() { + @Override + public ByteBuf encode(Object in) throws IOException { + ByteBuf out = ByteBufAllocator.DEFAULT.buffer(); + out.writeCharSequence(in.toString(), CharsetUtil.UTF_8); + return out; + } + }; + + public final Decoder decoder = new Decoder() { + @Override + public Object decode(ByteBuf buf, State state) throws IOException { + String str = buf.toString(CharsetUtil.UTF_8); + buf.readerIndex(buf.readableBytes()); + try { + Thread.sleep(2500); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + return str; + } + }; + + public SlowCodec() { + } + + public SlowCodec(ClassLoader classLoader) { + this(); + } + + + @Override + public Decoder getValueDecoder() { + return decoder; + } + + @Override + public Encoder getValueEncoder() { + return encoder; + } + + } + + @Test + public void testDecoderInExecutor() throws Exception { + Config config = new Config(); + config.setCodec(new SlowCodec()); + config.setReferenceEnabled(false); + config.setThreads(32); + config.setNettyThreads(8); + config.setDecodeInExecutor(true); + config.useSingleServer() + .setAddress(RedisRunner.getDefaultRedisServerBindAddressAndPort()); + RedissonClient redisson = Redisson.create(config); + + CountDownLatch latch = new CountDownLatch(16); + AtomicBoolean hasErrors = new AtomicBoolean(); + for (int i = 0; i < 16; i++) { + Thread t = new Thread() { + public void run() { + for (int i = 0; i < 10; i++) { + try { + redisson.getBucket("123").set("1"); + redisson.getBucket("123").get(); + if (hasErrors.get()) { + latch.countDown(); + return; + } + } catch (Exception e) { + e.printStackTrace(); + hasErrors.set(true); + } + + } + latch.countDown(); + }; + }; + t.start(); + } + + assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue(); + assertThat(hasErrors).isFalse(); + + redisson.shutdown(); + } + + @Test public void testFailoverWithoutErrorsInCluster() throws Exception { RedisRunner master1 = new RedisRunner().port(6890).randomDir().nosave(); diff --git a/redisson/src/test/java/org/redisson/codec/TypedJsonJacksonCodecTest.java b/redisson/src/test/java/org/redisson/codec/TypedJsonJacksonCodecTest.java index ebc1b7439..d5e621339 100644 --- a/redisson/src/test/java/org/redisson/codec/TypedJsonJacksonCodecTest.java +++ b/redisson/src/test/java/org/redisson/codec/TypedJsonJacksonCodecTest.java @@ -75,7 +75,7 @@ public class TypedJsonJacksonCodecTest extends BaseTest { public void shouldDeserializeTheMapCorrectly() throws Exception { ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(); buf.writeBytes(new ObjectMapper().writeValueAsBytes(map)); - assertThat(mapCodec.getMapValueDecoder().decode(buf, new State(false))) + assertThat(mapCodec.getMapValueDecoder().decode(buf, new State())) .isInstanceOf(Map.class) .isEqualTo(map); buf.release(); @@ -97,7 +97,7 @@ public class TypedJsonJacksonCodecTest extends BaseTest { public void shouldDeserializeTheStringCorrectly() throws Exception { ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(); buf.writeBytes(new ObjectMapper().writeValueAsBytes("axk")); - assertThat(stringCodec.getMapValueDecoder().decode(buf, new State(false))) + assertThat(stringCodec.getMapValueDecoder().decode(buf, new State())) .isInstanceOf(String.class) .isEqualTo("axk"); buf.release();