From 9d1fd5d610501d1bdb57d6d7bd2405b9870aceeb Mon Sep 17 00:00:00 2001 From: Nikita Date: Mon, 26 Dec 2016 14:34:31 +0300 Subject: [PATCH] refactoring --- .../main/java/org/redisson/RedissonNode.java | 5 +-- .../client/handler/CommandDecoder.java | 5 +-- .../redisson/client/protocol/CommandData.java | 5 ++- .../client/protocol/CommandsData.java | 2 +- .../client/protocol/QueueCommand.java | 8 ----- .../client/protocol/RedisCommand.java | 6 ++++ .../client/protocol/RedisCommands.java | 8 ++++- .../redisson/command/CommandAsyncService.java | 26 +++++++------- .../MasterSlaveConnectionManager.java | 6 ++-- .../redisson/connection/MasterSlaveEntry.java | 16 +++++---- .../org/redisson/connection/SingleEntry.java | 9 ++--- .../balancer/LoadBalancerManager.java | 9 ++--- .../connection/pool/ConnectionPool.java | 34 +++++++++---------- .../connection/pool/PubSubConnectionPool.java | 6 ++++ 14 files changed, 78 insertions(+), 67 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonNode.java b/redisson/src/main/java/org/redisson/RedissonNode.java index cb941788b..2ed91c3c8 100644 --- a/redisson/src/main/java/org/redisson/RedissonNode.java +++ b/redisson/src/main/java/org/redisson/RedissonNode.java @@ -23,6 +23,7 @@ import java.util.Map.Entry; import org.redisson.api.RFuture; import org.redisson.api.RedissonClient; import org.redisson.client.RedisConnection; +import org.redisson.client.protocol.RedisCommands; import org.redisson.config.RedissonNodeConfig; import org.redisson.connection.ConnectionManager; import org.redisson.connection.MasterSlaveEntry; @@ -146,7 +147,7 @@ public class RedissonNode { private void retrieveAdresses() { ConnectionManager connectionManager = ((Redisson)redisson).getConnectionManager(); for (MasterSlaveEntry entry : connectionManager.getEntrySet()) { - RFuture readFuture = entry.connectionReadOp(); + RFuture readFuture = entry.connectionReadOp(RedisCommands.PUBLISH); if (readFuture.awaitUninterruptibly((long)connectionManager.getConfig().getConnectTimeout()) && readFuture.isSuccess()) { RedisConnection connection = readFuture.getNow(); @@ -155,7 +156,7 @@ public class RedissonNode { localAddress = (InetSocketAddress) connection.getChannel().localAddress(); return; } - RFuture writeFuture = entry.connectionWriteOp(); + RFuture writeFuture = entry.connectionWriteOp(RedisCommands.PUBLISH); if (writeFuture.awaitUninterruptibly((long)connectionManager.getConfig().getConnectTimeout()) && writeFuture.isSuccess()) { RedisConnection connection = writeFuture.getNow(); diff --git a/redisson/src/main/java/org/redisson/client/handler/CommandDecoder.java b/redisson/src/main/java/org/redisson/client/handler/CommandDecoder.java index aa227cc66..b5024789f 100644 --- a/redisson/src/main/java/org/redisson/client/handler/CommandDecoder.java +++ b/redisson/src/main/java/org/redisson/client/handler/CommandDecoder.java @@ -36,6 +36,7 @@ import org.redisson.client.protocol.CommandData; import org.redisson.client.protocol.CommandsData; import org.redisson.client.protocol.Decoder; import org.redisson.client.protocol.QueueCommand; +import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisCommand.ValueType; import org.redisson.client.protocol.decoder.ListMultiDecoder; import org.redisson.client.protocol.decoder.MultiDecoder; @@ -345,11 +346,11 @@ public class CommandDecoder extends ReplayingDecoder { String operation = ((PubSubStatusMessage) result).getType().name().toLowerCase(); PubSubKey key = new PubSubKey(channelName, operation); CommandData d = pubSubChannels.get(key); - if (Arrays.asList("PSUBSCRIBE", "SUBSCRIBE").contains(d.getCommand().getName())) { + if (Arrays.asList(RedisCommands.PSUBSCRIBE.getName(), RedisCommands.SUBSCRIBE.getName()).contains(d.getCommand().getName())) { pubSubChannels.remove(key); pubSubMessageDecoders.put(channelName, d.getMessageDecoder()); } - if (Arrays.asList("PUNSUBSCRIBE", "UNSUBSCRIBE").contains(d.getCommand().getName())) { + if (Arrays.asList(RedisCommands.PUNSUBSCRIBE.getName(), RedisCommands.UNSUBSCRIBE.getName()).contains(d.getCommand().getName())) { pubSubChannels.remove(key); pubSubMessageDecoders.remove(channelName); } diff --git a/redisson/src/main/java/org/redisson/client/protocol/CommandData.java b/redisson/src/main/java/org/redisson/client/protocol/CommandData.java index 053160c02..7c0ca6bf5 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/CommandData.java +++ b/redisson/src/main/java/org/redisson/client/protocol/CommandData.java @@ -15,7 +15,6 @@ */ package org.redisson.client.protocol; -import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -91,14 +90,14 @@ public class CommandData implements QueueCommand { @Override public List> getPubSubOperations() { - if (PUBSUB_COMMANDS.contains(getCommand().getName())) { + if (RedisCommands.PUBSUB_COMMANDS.contains(getCommand().getName())) { return Collections.singletonList((CommandData)this); } return Collections.emptyList(); } public boolean isBlockingCommand() { - return QueueCommand.TIMEOUTLESS_COMMANDS.contains(command.getName()) && !promise.isDone(); + return RedisCommands.BLOCKING_COMMANDS.contains(command.getName()) && !promise.isDone(); } } diff --git a/redisson/src/main/java/org/redisson/client/protocol/CommandsData.java b/redisson/src/main/java/org/redisson/client/protocol/CommandsData.java index 375e5c9eb..8620cb025 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/CommandsData.java +++ b/redisson/src/main/java/org/redisson/client/protocol/CommandsData.java @@ -58,7 +58,7 @@ public class CommandsData implements QueueCommand { public List> getPubSubOperations() { List> result = new ArrayList>(); for (CommandData commandData : commands) { - if (PUBSUB_COMMANDS.equals(commandData.getCommand().getName())) { + if (RedisCommands.PUBSUB_COMMANDS.equals(commandData.getCommand().getName())) { result.add((CommandData)commandData); } } diff --git a/redisson/src/main/java/org/redisson/client/protocol/QueueCommand.java b/redisson/src/main/java/org/redisson/client/protocol/QueueCommand.java index 153ad7170..69e0745fc 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/QueueCommand.java +++ b/redisson/src/main/java/org/redisson/client/protocol/QueueCommand.java @@ -15,10 +15,7 @@ */ package org.redisson.client.protocol; -import java.util.Arrays; -import java.util.HashSet; import java.util.List; -import java.util.Set; /** * @@ -26,12 +23,7 @@ import java.util.Set; * */ public interface QueueCommand { - - Set PUBSUB_COMMANDS = new HashSet(Arrays.asList("PSUBSCRIBE", "SUBSCRIBE", "PUNSUBSCRIBE", "UNSUBSCRIBE")); - Set TIMEOUTLESS_COMMANDS = new HashSet(Arrays.asList(RedisCommands.BLPOP_VALUE.getName(), - RedisCommands.BRPOP_VALUE.getName(), RedisCommands.BRPOPLPUSH.getName())); - List> getPubSubOperations(); boolean tryFailure(Throwable cause); diff --git a/redisson/src/main/java/org/redisson/client/protocol/RedisCommand.java b/redisson/src/main/java/org/redisson/client/protocol/RedisCommand.java index 1d67de90e..06d16f45a 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/RedisCommand.java +++ b/redisson/src/main/java/org/redisson/client/protocol/RedisCommand.java @@ -22,6 +22,12 @@ import org.redisson.client.protocol.convertor.Convertor; import org.redisson.client.protocol.convertor.EmptyConvertor; import org.redisson.client.protocol.decoder.MultiDecoder; +/** + * + * @author Nikita Koksharov + * + * @param return type + */ public class RedisCommand { public enum ValueType {OBJECT, OBJECTS, MAP_VALUE, MAP_KEY, MAP, BINARY, STRING} diff --git a/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java b/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java index 6e7d9f3e6..8dbb828f6 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java +++ b/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java @@ -16,6 +16,7 @@ package org.redisson.client.protocol; import java.util.Arrays; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -41,7 +42,6 @@ import org.redisson.client.protocol.convertor.TypeConvertor; import org.redisson.client.protocol.convertor.VoidReplayConvertor; import org.redisson.client.protocol.decoder.ClusterNodesDecoder; import org.redisson.client.protocol.decoder.KeyValueObjectDecoder; -import org.redisson.client.protocol.decoder.ListFirstObjectDecoder; import org.redisson.client.protocol.decoder.ListResultReplayDecoder; import org.redisson.client.protocol.decoder.ListScanResult; import org.redisson.client.protocol.decoder.ListScanResultReplayDecoder; @@ -183,6 +183,9 @@ public interface RedisCommands { RedisCommand BLPOP_VALUE = new RedisCommand("BLPOP", new KeyValueObjectDecoder(), new KeyValueConvertor()); RedisCommand BRPOP_VALUE = new RedisCommand("BRPOP", new KeyValueObjectDecoder(), new KeyValueConvertor()); + Set BLOCKING_COMMANDS = new HashSet( + Arrays.asList(BLPOP_VALUE.getName(), BRPOP_VALUE.getName(), BRPOPLPUSH.getName())); + RedisCommand PFADD = new RedisCommand("PFADD", new BooleanReplayConvertor(), 2); RedisStrictCommand PFCOUNT = new RedisStrictCommand("PFCOUNT"); RedisStrictCommand PFMERGE = new RedisStrictCommand("PFMERGE", new VoidReplayConvertor()); @@ -292,6 +295,9 @@ public interface RedisCommands { RedisCommand PSUBSCRIBE = new RedisCommand("PSUBSCRIBE", new PubSubStatusDecoder()); RedisCommand PUNSUBSCRIBE = new RedisCommand("PUNSUBSCRIBE", new PubSubStatusDecoder()); + Set PUBSUB_COMMANDS = new HashSet( + Arrays.asList(PSUBSCRIBE.getName(), SUBSCRIBE.getName(), PUNSUBSCRIBE.getName(), UNSUBSCRIBE.getName())); + RedisStrictCommand> CLUSTER_NODES = new RedisStrictCommand>("CLUSTER", "NODES", new ClusterNodesDecoder()); RedisCommand TIME = new RedisCommand("TIME", new LongListObjectDecoder()); RedisStrictCommand> CLUSTER_INFO = new RedisStrictCommand>("CLUSTER", "INFO", new StringMapDataDecoder()); diff --git a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java index 2d23afab0..5b3a2721c 100644 --- a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java +++ b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java @@ -20,7 +20,9 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -28,9 +30,12 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import org.redisson.RedisClientResult; +import org.redisson.RedissonReference; import org.redisson.RedissonShutdownException; import org.redisson.SlotCallback; import org.redisson.api.RFuture; +import org.redisson.api.RedissonClient; +import org.redisson.api.RedissonReactiveClient; import org.redisson.client.RedisAskException; import org.redisson.client.RedisConnection; import org.redisson.client.RedisException; @@ -42,16 +47,20 @@ import org.redisson.client.WriteRedisConnectionException; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.CommandData; import org.redisson.client.protocol.CommandsData; -import org.redisson.client.protocol.QueueCommand; import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommands; +import org.redisson.client.protocol.ScoredEntry; +import org.redisson.client.protocol.decoder.ListScanResult; +import org.redisson.client.protocol.decoder.MapScanResult; +import org.redisson.client.protocol.decoder.ScanObjectEntry; +import org.redisson.config.MasterSlaveServersConfig; import org.redisson.connection.ConnectionManager; import org.redisson.connection.MasterSlaveEntry; import org.redisson.connection.NodeSource; -import org.redisson.connection.PubSubConnectionEntry; import org.redisson.connection.NodeSource.Redirect; import org.redisson.misc.LogHelper; import org.redisson.misc.RPromise; +import org.redisson.misc.RedissonObjectFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -62,17 +71,6 @@ import io.netty.util.Timeout; import io.netty.util.TimerTask; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; -import java.util.HashMap; -import java.util.Map; -import org.redisson.RedissonReference; -import org.redisson.api.RedissonClient; -import org.redisson.api.RedissonReactiveClient; -import org.redisson.client.protocol.ScoredEntry; -import org.redisson.client.protocol.decoder.ListScanResult; -import org.redisson.client.protocol.decoder.MapScanResult; -import org.redisson.client.protocol.decoder.ScanObjectEntry; -import org.redisson.config.MasterSlaveServersConfig; -import org.redisson.misc.RedissonObjectFactory; /** * @@ -629,7 +627,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { details.getTimeout().cancel(); long timeoutTime = connectionManager.getConfig().getTimeout(); - if (QueueCommand.TIMEOUTLESS_COMMANDS.contains(details.getCommand().getName())) { + if (RedisCommands.BLOCKING_COMMANDS.contains(details.getCommand().getName())) { Long popTimeout = Long.valueOf(details.getParams()[details.getParams().length - 1].toString()); handleBlockingOperations(details, connection, popTimeout); if (popTimeout == 0) { diff --git a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index 6b9c8d945..4ac86f6e1 100644 --- a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -680,7 +680,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { if (entry == null) { entry = getEntry(source); } - return entry.connectionWriteOp(); + return entry.connectionWriteOp(command); } private MasterSlaveEntry getEntry(NodeSource source) { @@ -707,9 +707,9 @@ public class MasterSlaveConnectionManager implements ConnectionManager { entry = getEntry(source.getSlot()); } if (source.getAddr() != null) { - return entry.connectionReadOp(source.getAddr()); + return entry.connectionReadOp(command, source.getAddr()); } - return entry.connectionReadOp(); + return entry.connectionReadOp(command); } RFuture nextPubSubConnection(int slot) { diff --git a/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java b/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java index 07b9b1943..403600572 100644 --- a/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java +++ b/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java @@ -32,6 +32,8 @@ import org.redisson.client.RedisPubSubConnection; import org.redisson.client.RedisPubSubListener; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.CommandData; +import org.redisson.client.protocol.RedisCommand; +import org.redisson.client.protocol.RedisCommands; import org.redisson.cluster.ClusterSlotRange; import org.redisson.config.MasterSlaveServersConfig; import org.redisson.config.ReadMode; @@ -255,7 +257,7 @@ public class MasterSlaveEntry { return; } - RFuture newConnection = connectionReadOp(); + RFuture newConnection = connectionReadOp(RedisCommands.BLPOP_VALUE); newConnection.addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { @@ -387,16 +389,16 @@ public class MasterSlaveEntry { slaveBalancer.shutdownAsync(); } - public RFuture connectionWriteOp() { - return writeConnectionHolder.get(); + public RFuture connectionWriteOp(RedisCommand command) { + return writeConnectionHolder.get(command); } - public RFuture connectionReadOp() { - return slaveBalancer.nextConnection(); + public RFuture connectionReadOp(RedisCommand command) { + return slaveBalancer.nextConnection(command); } - public RFuture connectionReadOp(InetSocketAddress addr) { - return slaveBalancer.getConnection(addr); + public RFuture connectionReadOp(RedisCommand command, InetSocketAddress addr) { + return slaveBalancer.getConnection(command, addr); } RFuture nextPubSubConnection() { diff --git a/redisson/src/main/java/org/redisson/connection/SingleEntry.java b/redisson/src/main/java/org/redisson/connection/SingleEntry.java index 6ccf53ae5..c5b0c7a5a 100644 --- a/redisson/src/main/java/org/redisson/connection/SingleEntry.java +++ b/redisson/src/main/java/org/redisson/connection/SingleEntry.java @@ -24,6 +24,7 @@ import org.redisson.api.RFuture; import org.redisson.client.RedisClient; import org.redisson.client.RedisConnection; import org.redisson.client.RedisPubSubConnection; +import org.redisson.client.protocol.RedisCommand; import org.redisson.cluster.ClusterSlotRange; import org.redisson.config.MasterSlaveServersConfig; import org.redisson.connection.pool.PubSubConnectionPool; @@ -82,13 +83,13 @@ public class SingleEntry extends MasterSlaveEntry { } @Override - public RFuture connectionReadOp(InetSocketAddress addr) { - return super.connectionWriteOp(); + public RFuture connectionReadOp(RedisCommand command, InetSocketAddress addr) { + return super.connectionWriteOp(command); } @Override - public RFuture connectionReadOp() { - return super.connectionWriteOp(); + public RFuture connectionReadOp(RedisCommand command) { + return super.connectionWriteOp(command); } @Override diff --git a/redisson/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java b/redisson/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java index fcfc778aa..8142da4a8 100644 --- a/redisson/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java +++ b/redisson/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java @@ -23,6 +23,7 @@ import org.redisson.api.RFuture; import org.redisson.client.RedisConnection; import org.redisson.client.RedisConnectionException; import org.redisson.client.RedisPubSubConnection; +import org.redisson.client.protocol.RedisCommand; import org.redisson.config.MasterSlaveServersConfig; import org.redisson.connection.ClientConnectionsEntry; import org.redisson.connection.ClientConnectionsEntry.FreezeReason; @@ -141,17 +142,17 @@ public class LoadBalancerManager { return pubSubConnectionPool.get(); } - public RFuture getConnection(InetSocketAddress addr) { + public RFuture getConnection(RedisCommand command, InetSocketAddress addr) { ClientConnectionsEntry entry = addr2Entry.get(addr); if (entry != null) { - return slaveConnectionPool.get(entry); + return slaveConnectionPool.get(command, entry); } RedisConnectionException exception = new RedisConnectionException("Can't find entry for " + addr); return connectionManager.newFailedFuture(exception); } - public RFuture nextConnection() { - return slaveConnectionPool.get(); + public RFuture nextConnection(RedisCommand command) { + return slaveConnectionPool.get(command); } public void returnPubSubConnection(RedisPubSubConnection connection) { diff --git a/redisson/src/main/java/org/redisson/connection/pool/ConnectionPool.java b/redisson/src/main/java/org/redisson/connection/pool/ConnectionPool.java index 4e30c1582..7ede38699 100644 --- a/redisson/src/main/java/org/redisson/connection/pool/ConnectionPool.java +++ b/redisson/src/main/java/org/redisson/connection/pool/ConnectionPool.java @@ -26,6 +26,7 @@ import org.redisson.api.NodeType; import org.redisson.api.RFuture; import org.redisson.client.RedisConnection; import org.redisson.client.RedisConnectionException; +import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommands; import org.redisson.config.MasterSlaveServersConfig; import org.redisson.connection.ClientConnectionsEntry; @@ -158,19 +159,12 @@ abstract class ConnectionPool { return config.getLoadBalancer().getEntry(entries); } - public RFuture get() { + public RFuture get(RedisCommand command) { for (int j = entries.size() - 1; j >= 0; j--) { final ClientConnectionsEntry entry = getEntry(); if (!entry.isFreezed() && tryAcquireConnection(entry)) { - final RPromise result = connectionManager.newPromise(); - acquireConnection(entry, new Runnable() { - @Override - public void run() { - connectTo(entry, result); - } - }); - return result; + return acquireConnection(command, entry); } } @@ -196,17 +190,10 @@ abstract class ConnectionPool { return connectionManager.newFailedFuture(exception); } - public RFuture get(ClientConnectionsEntry entry) { + public RFuture get(RedisCommand command, ClientConnectionsEntry entry) { if (((entry.getNodeType() == NodeType.MASTER && entry.getFreezeReason() == FreezeReason.SYSTEM) || !entry.isFreezed()) && tryAcquireConnection(entry)) { - final RPromise result = connectionManager.newPromise(); - acquireConnection(entry, new Runnable() { - @Override - public void run() { - connectTo(entry, result); - } - }); - return result; + return acquireConnection(command, entry); } RedisConnectionException exception = new RedisConnectionException( @@ -214,6 +201,17 @@ abstract class ConnectionPool { return connectionManager.newFailedFuture(exception); } + private RFuture acquireConnection(RedisCommand command, ClientConnectionsEntry entry) { + final RPromise result = connectionManager.newPromise(); + acquireConnection(entry, new Runnable() { + @Override + public void run() { + connectTo(entry, result); + } + }); + return result; + } + protected boolean tryAcquireConnection(ClientConnectionsEntry entry) { return entry.getFailedAttempts() < config.getFailedAttempts(); } diff --git a/redisson/src/main/java/org/redisson/connection/pool/PubSubConnectionPool.java b/redisson/src/main/java/org/redisson/connection/pool/PubSubConnectionPool.java index b3bef548c..859623a9b 100644 --- a/redisson/src/main/java/org/redisson/connection/pool/PubSubConnectionPool.java +++ b/redisson/src/main/java/org/redisson/connection/pool/PubSubConnectionPool.java @@ -17,6 +17,8 @@ package org.redisson.connection.pool; import org.redisson.api.RFuture; import org.redisson.client.RedisPubSubConnection; +import org.redisson.client.protocol.RedisCommand; +import org.redisson.client.protocol.RedisCommands; import org.redisson.config.MasterSlaveServersConfig; import org.redisson.connection.ClientConnectionsEntry; import org.redisson.connection.ConnectionManager; @@ -34,6 +36,10 @@ public class PubSubConnectionPool extends ConnectionPool super(config, connectionManager, masterSlaveEntry); } + public RFuture get() { + return get(RedisCommands.PUBLISH); + } + @Override protected RedisPubSubConnection poll(ClientConnectionsEntry entry) { return entry.pollSubscribeConnection();