diff --git a/redisson/src/main/java/org/redisson/client/RedisAskException.java b/redisson/src/main/java/org/redisson/client/RedisAskException.java index 2f7593faf..63138570b 100644 --- a/redisson/src/main/java/org/redisson/client/RedisAskException.java +++ b/redisson/src/main/java/org/redisson/client/RedisAskException.java @@ -15,6 +15,8 @@ */ package org.redisson.client; +import org.redisson.misc.RedisURI; + /** * * @author Nikita Koksharov @@ -24,7 +26,7 @@ public class RedisAskException extends RedisRedirectException { private static final long serialVersionUID = -6969734163155547631L; - public RedisAskException(int slot, String url) { + public RedisAskException(int slot, RedisURI url) { super(slot, url); } diff --git a/redisson/src/main/java/org/redisson/client/RedisMovedException.java b/redisson/src/main/java/org/redisson/client/RedisMovedException.java index 5c86e1f02..c7e0ebd3c 100644 --- a/redisson/src/main/java/org/redisson/client/RedisMovedException.java +++ b/redisson/src/main/java/org/redisson/client/RedisMovedException.java @@ -15,6 +15,8 @@ */ package org.redisson.client; +import org.redisson.misc.RedisURI; + /** * * @author Nikita Koksharov @@ -24,7 +26,7 @@ public class RedisMovedException extends RedisRedirectException { private static final long serialVersionUID = -6969734163155547631L; - public RedisMovedException(int slot, String url) { + public RedisMovedException(int slot, RedisURI url) { super(slot, url); } diff --git a/redisson/src/main/java/org/redisson/client/RedisRedirectException.java b/redisson/src/main/java/org/redisson/client/RedisRedirectException.java index cc37e00b3..2afdc94ea 100644 --- a/redisson/src/main/java/org/redisson/client/RedisRedirectException.java +++ b/redisson/src/main/java/org/redisson/client/RedisRedirectException.java @@ -29,9 +29,9 @@ public class RedisRedirectException extends RedisException { private final int slot; private final RedisURI url; - public RedisRedirectException(int slot, String url) { + public RedisRedirectException(int slot, RedisURI url) { this.slot = slot; - this.url = new RedisURI("redis://" + url); + this.url = url; } public int getSlot() { diff --git a/redisson/src/main/java/org/redisson/client/handler/CommandDecoder.java b/redisson/src/main/java/org/redisson/client/handler/CommandDecoder.java index b85fc0a99..e2fbe5b76 100644 --- a/redisson/src/main/java/org/redisson/client/handler/CommandDecoder.java +++ b/redisson/src/main/java/org/redisson/client/handler/CommandDecoder.java @@ -44,6 +44,7 @@ import org.redisson.client.protocol.RedisCommand.ValueType; import org.redisson.client.protocol.decoder.MultiDecoder; import org.redisson.misc.LogHelper; import org.redisson.misc.RPromise; +import org.redisson.misc.RedisURI; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -67,10 +68,10 @@ public class CommandDecoder extends ReplayingDecoder { private static final char LF = '\n'; private static final char ZERO = '0'; - final ExecutorService executor; + final String scheme; - public CommandDecoder(ExecutorService executor) { - this.executor = executor; + public CommandDecoder(String scheme) { + this.scheme = scheme; } @Override @@ -314,12 +315,12 @@ public class CommandDecoder extends ReplayingDecoder { String[] errorParts = error.split(" "); int slot = Integer.valueOf(errorParts[1]); String addr = errorParts[2]; - data.tryFailure(new RedisMovedException(slot, addr)); + data.tryFailure(new RedisMovedException(slot, new RedisURI(scheme + "://" + addr))); } else if (error.startsWith("ASK")) { String[] errorParts = error.split(" "); int slot = Integer.valueOf(errorParts[1]); String addr = errorParts[2]; - data.tryFailure(new RedisAskException(slot, addr)); + data.tryFailure(new RedisAskException(slot, new RedisURI(scheme + "://" + addr))); } else if (error.startsWith("TRYAGAIN")) { data.tryFailure(new RedisTryAgainException(error + ". channel: " + channel + " data: " + data)); diff --git a/redisson/src/main/java/org/redisson/client/handler/CommandPubSubDecoder.java b/redisson/src/main/java/org/redisson/client/handler/CommandPubSubDecoder.java index ad6657ada..87451f8f0 100644 --- a/redisson/src/main/java/org/redisson/client/handler/CommandPubSubDecoder.java +++ b/redisson/src/main/java/org/redisson/client/handler/CommandPubSubDecoder.java @@ -18,6 +18,7 @@ package org.redisson.client.handler; import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; import org.redisson.client.ChannelName; +import org.redisson.client.RedisClientConfig; import org.redisson.client.RedisPubSubConnection; import org.redisson.client.codec.ByteArrayCodec; import org.redisson.client.codec.StringCodec; @@ -51,11 +52,11 @@ public class CommandPubSubDecoder extends CommandDecoder { private final Map entries = new HashMap<>(); private final Map> commands = new ConcurrentHashMap<>(); - private final boolean keepOrder; - - public CommandPubSubDecoder(ExecutorService executor, boolean keepOrder) { - super(executor); - this.keepOrder = keepOrder; + private final RedisClientConfig config; + + public CommandPubSubDecoder(RedisClientConfig config) { + super(config.getAddress().getScheme()); + this.config = config; } public void addPubSubCommand(ChannelName channel, CommandData data) { @@ -96,7 +97,7 @@ public class CommandPubSubDecoder extends CommandDecoder { protected void decodeResult(CommandData data, List parts, Channel channel, Object result) throws IOException { try { - if (executor.isShutdown()) { + if (config.getExecutor().isShutdown()) { return; } } catch (IllegalStateException e) { @@ -123,14 +124,14 @@ public class CommandPubSubDecoder extends CommandDecoder { channelName = ((PubSubPatternMessage) result).getPattern(); } PubSubEntry entry = entries.remove(channelName); - if (keepOrder) { + if (config.isKeepAlive()) { enqueueMessage(result, pubSubConnection, entry); } } } - if (keepOrder) { + if (config.isKeepAlive()) { if (result instanceof PubSubPatternMessage) { channelName = ((PubSubPatternMessage) result).getPattern(); } @@ -139,7 +140,7 @@ public class CommandPubSubDecoder extends CommandDecoder { enqueueMessage(result, pubSubConnection, entry); } } else { - executor.execute(new Runnable() { + config.getExecutor().execute(new Runnable() { @Override public void run() { if (result instanceof PubSubStatusMessage) { @@ -168,7 +169,7 @@ public class CommandPubSubDecoder extends CommandDecoder { return; } - executor.execute(() -> { + config.getExecutor().execute(() -> { try { while (true) { Message result = entry.getQueue().poll(); diff --git a/redisson/src/main/java/org/redisson/client/handler/RedisChannelInitializer.java b/redisson/src/main/java/org/redisson/client/handler/RedisChannelInitializer.java index 69dc1a61e..5e5311de4 100644 --- a/redisson/src/main/java/org/redisson/client/handler/RedisChannelInitializer.java +++ b/redisson/src/main/java/org/redisson/client/handler/RedisChannelInitializer.java @@ -95,9 +95,9 @@ public class RedisChannelInitializer extends ChannelInitializer { } if (type == Type.PLAIN) { - ch.pipeline().addLast(new CommandDecoder(config.getExecutor())); + ch.pipeline().addLast(new CommandDecoder(config.getAddress().getScheme())); } else { - ch.pipeline().addLast(new CommandPubSubDecoder(config.getExecutor(), config.isKeepPubSubOrder())); + ch.pipeline().addLast(new CommandPubSubDecoder(config)); } ch.pipeline().addLast(new ErrorsLoggingHandler());