refactoring

pull/748/head
Nikita 8 years ago
parent d9081f2b39
commit 9d1fd5d610

@ -23,6 +23,7 @@ import java.util.Map.Entry;
import org.redisson.api.RFuture;
import org.redisson.api.RedissonClient;
import org.redisson.client.RedisConnection;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.config.RedissonNodeConfig;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.MasterSlaveEntry;
@ -146,7 +147,7 @@ public class RedissonNode {
private void retrieveAdresses() {
ConnectionManager connectionManager = ((Redisson)redisson).getConnectionManager();
for (MasterSlaveEntry entry : connectionManager.getEntrySet()) {
RFuture<RedisConnection> readFuture = entry.connectionReadOp();
RFuture<RedisConnection> readFuture = entry.connectionReadOp(RedisCommands.PUBLISH);
if (readFuture.awaitUninterruptibly((long)connectionManager.getConfig().getConnectTimeout())
&& readFuture.isSuccess()) {
RedisConnection connection = readFuture.getNow();
@ -155,7 +156,7 @@ public class RedissonNode {
localAddress = (InetSocketAddress) connection.getChannel().localAddress();
return;
}
RFuture<RedisConnection> writeFuture = entry.connectionWriteOp();
RFuture<RedisConnection> writeFuture = entry.connectionWriteOp(RedisCommands.PUBLISH);
if (writeFuture.awaitUninterruptibly((long)connectionManager.getConfig().getConnectTimeout())
&& writeFuture.isSuccess()) {
RedisConnection connection = writeFuture.getNow();

@ -36,6 +36,7 @@ import org.redisson.client.protocol.CommandData;
import org.redisson.client.protocol.CommandsData;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.QueueCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.decoder.ListMultiDecoder;
import org.redisson.client.protocol.decoder.MultiDecoder;
@ -345,11 +346,11 @@ public class CommandDecoder extends ReplayingDecoder<State> {
String operation = ((PubSubStatusMessage) result).getType().name().toLowerCase();
PubSubKey key = new PubSubKey(channelName, operation);
CommandData<Object, Object> d = pubSubChannels.get(key);
if (Arrays.asList("PSUBSCRIBE", "SUBSCRIBE").contains(d.getCommand().getName())) {
if (Arrays.asList(RedisCommands.PSUBSCRIBE.getName(), RedisCommands.SUBSCRIBE.getName()).contains(d.getCommand().getName())) {
pubSubChannels.remove(key);
pubSubMessageDecoders.put(channelName, d.getMessageDecoder());
}
if (Arrays.asList("PUNSUBSCRIBE", "UNSUBSCRIBE").contains(d.getCommand().getName())) {
if (Arrays.asList(RedisCommands.PUNSUBSCRIBE.getName(), RedisCommands.UNSUBSCRIBE.getName()).contains(d.getCommand().getName())) {
pubSubChannels.remove(key);
pubSubMessageDecoders.remove(channelName);
}

@ -15,7 +15,6 @@
*/
package org.redisson.client.protocol;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@ -91,14 +90,14 @@ public class CommandData<T, R> implements QueueCommand {
@Override
public List<CommandData<Object, Object>> getPubSubOperations() {
if (PUBSUB_COMMANDS.contains(getCommand().getName())) {
if (RedisCommands.PUBSUB_COMMANDS.contains(getCommand().getName())) {
return Collections.singletonList((CommandData<Object, Object>)this);
}
return Collections.emptyList();
}
public boolean isBlockingCommand() {
return QueueCommand.TIMEOUTLESS_COMMANDS.contains(command.getName()) && !promise.isDone();
return RedisCommands.BLOCKING_COMMANDS.contains(command.getName()) && !promise.isDone();
}
}

@ -58,7 +58,7 @@ public class CommandsData implements QueueCommand {
public List<CommandData<Object, Object>> getPubSubOperations() {
List<CommandData<Object, Object>> result = new ArrayList<CommandData<Object, Object>>();
for (CommandData<?, ?> commandData : commands) {
if (PUBSUB_COMMANDS.equals(commandData.getCommand().getName())) {
if (RedisCommands.PUBSUB_COMMANDS.equals(commandData.getCommand().getName())) {
result.add((CommandData<Object, Object>)commandData);
}
}

@ -15,10 +15,7 @@
*/
package org.redisson.client.protocol;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
/**
*
@ -26,12 +23,7 @@ import java.util.Set;
*
*/
public interface QueueCommand {
Set<String> PUBSUB_COMMANDS = new HashSet<String>(Arrays.asList("PSUBSCRIBE", "SUBSCRIBE", "PUNSUBSCRIBE", "UNSUBSCRIBE"));
Set<String> TIMEOUTLESS_COMMANDS = new HashSet<String>(Arrays.asList(RedisCommands.BLPOP_VALUE.getName(),
RedisCommands.BRPOP_VALUE.getName(), RedisCommands.BRPOPLPUSH.getName()));
List<CommandData<Object, Object>> getPubSubOperations();
boolean tryFailure(Throwable cause);

@ -22,6 +22,12 @@ import org.redisson.client.protocol.convertor.Convertor;
import org.redisson.client.protocol.convertor.EmptyConvertor;
import org.redisson.client.protocol.decoder.MultiDecoder;
/**
*
* @author Nikita Koksharov
*
* @param <R> return type
*/
public class RedisCommand<R> {
public enum ValueType {OBJECT, OBJECTS, MAP_VALUE, MAP_KEY, MAP, BINARY, STRING}

@ -16,6 +16,7 @@
package org.redisson.client.protocol;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@ -41,7 +42,6 @@ import org.redisson.client.protocol.convertor.TypeConvertor;
import org.redisson.client.protocol.convertor.VoidReplayConvertor;
import org.redisson.client.protocol.decoder.ClusterNodesDecoder;
import org.redisson.client.protocol.decoder.KeyValueObjectDecoder;
import org.redisson.client.protocol.decoder.ListFirstObjectDecoder;
import org.redisson.client.protocol.decoder.ListResultReplayDecoder;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.ListScanResultReplayDecoder;
@ -183,6 +183,9 @@ public interface RedisCommands {
RedisCommand<Object> BLPOP_VALUE = new RedisCommand<Object>("BLPOP", new KeyValueObjectDecoder(), new KeyValueConvertor());
RedisCommand<Object> BRPOP_VALUE = new RedisCommand<Object>("BRPOP", new KeyValueObjectDecoder(), new KeyValueConvertor());
Set<String> BLOCKING_COMMANDS = new HashSet<String>(
Arrays.asList(BLPOP_VALUE.getName(), BRPOP_VALUE.getName(), BRPOPLPUSH.getName()));
RedisCommand<Boolean> PFADD = new RedisCommand<Boolean>("PFADD", new BooleanReplayConvertor(), 2);
RedisStrictCommand<Long> PFCOUNT = new RedisStrictCommand<Long>("PFCOUNT");
RedisStrictCommand<Void> PFMERGE = new RedisStrictCommand<Void>("PFMERGE", new VoidReplayConvertor());
@ -292,6 +295,9 @@ public interface RedisCommands {
RedisCommand<Object> PSUBSCRIBE = new RedisCommand<Object>("PSUBSCRIBE", new PubSubStatusDecoder());
RedisCommand<Object> PUNSUBSCRIBE = new RedisCommand<Object>("PUNSUBSCRIBE", new PubSubStatusDecoder());
Set<String> PUBSUB_COMMANDS = new HashSet<String>(
Arrays.asList(PSUBSCRIBE.getName(), SUBSCRIBE.getName(), PUNSUBSCRIBE.getName(), UNSUBSCRIBE.getName()));
RedisStrictCommand<List<ClusterNodeInfo>> CLUSTER_NODES = new RedisStrictCommand<List<ClusterNodeInfo>>("CLUSTER", "NODES", new ClusterNodesDecoder());
RedisCommand<Object> TIME = new RedisCommand<Object>("TIME", new LongListObjectDecoder());
RedisStrictCommand<Map<String, String>> CLUSTER_INFO = new RedisStrictCommand<Map<String, String>>("CLUSTER", "INFO", new StringMapDataDecoder());

@ -20,7 +20,9 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@ -28,9 +30,12 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.redisson.RedisClientResult;
import org.redisson.RedissonReference;
import org.redisson.RedissonShutdownException;
import org.redisson.SlotCallback;
import org.redisson.api.RFuture;
import org.redisson.api.RedissonClient;
import org.redisson.api.RedissonReactiveClient;
import org.redisson.client.RedisAskException;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisException;
@ -42,16 +47,20 @@ import org.redisson.client.WriteRedisConnectionException;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.CommandData;
import org.redisson.client.protocol.CommandsData;
import org.redisson.client.protocol.QueueCommand;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.ScoredEntry;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.MapScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.MasterSlaveEntry;
import org.redisson.connection.NodeSource;
import org.redisson.connection.PubSubConnectionEntry;
import org.redisson.connection.NodeSource.Redirect;
import org.redisson.misc.LogHelper;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonObjectFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -62,17 +71,6 @@ import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import java.util.HashMap;
import java.util.Map;
import org.redisson.RedissonReference;
import org.redisson.api.RedissonClient;
import org.redisson.api.RedissonReactiveClient;
import org.redisson.client.protocol.ScoredEntry;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.MapScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.misc.RedissonObjectFactory;
/**
*
@ -629,7 +627,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
details.getTimeout().cancel();
long timeoutTime = connectionManager.getConfig().getTimeout();
if (QueueCommand.TIMEOUTLESS_COMMANDS.contains(details.getCommand().getName())) {
if (RedisCommands.BLOCKING_COMMANDS.contains(details.getCommand().getName())) {
Long popTimeout = Long.valueOf(details.getParams()[details.getParams().length - 1].toString());
handleBlockingOperations(details, connection, popTimeout);
if (popTimeout == 0) {

@ -680,7 +680,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
if (entry == null) {
entry = getEntry(source);
}
return entry.connectionWriteOp();
return entry.connectionWriteOp(command);
}
private MasterSlaveEntry getEntry(NodeSource source) {
@ -707,9 +707,9 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
entry = getEntry(source.getSlot());
}
if (source.getAddr() != null) {
return entry.connectionReadOp(source.getAddr());
return entry.connectionReadOp(command, source.getAddr());
}
return entry.connectionReadOp();
return entry.connectionReadOp(command);
}
RFuture<RedisPubSubConnection> nextPubSubConnection(int slot) {

@ -32,6 +32,8 @@ import org.redisson.client.RedisPubSubConnection;
import org.redisson.client.RedisPubSubListener;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.CommandData;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.cluster.ClusterSlotRange;
import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.config.ReadMode;
@ -255,7 +257,7 @@ public class MasterSlaveEntry {
return;
}
RFuture<RedisConnection> newConnection = connectionReadOp();
RFuture<RedisConnection> newConnection = connectionReadOp(RedisCommands.BLPOP_VALUE);
newConnection.addListener(new FutureListener<RedisConnection>() {
@Override
public void operationComplete(Future<RedisConnection> future) throws Exception {
@ -387,16 +389,16 @@ public class MasterSlaveEntry {
slaveBalancer.shutdownAsync();
}
public RFuture<RedisConnection> connectionWriteOp() {
return writeConnectionHolder.get();
public RFuture<RedisConnection> connectionWriteOp(RedisCommand<?> command) {
return writeConnectionHolder.get(command);
}
public RFuture<RedisConnection> connectionReadOp() {
return slaveBalancer.nextConnection();
public RFuture<RedisConnection> connectionReadOp(RedisCommand<?> command) {
return slaveBalancer.nextConnection(command);
}
public RFuture<RedisConnection> connectionReadOp(InetSocketAddress addr) {
return slaveBalancer.getConnection(addr);
public RFuture<RedisConnection> connectionReadOp(RedisCommand<?> command, InetSocketAddress addr) {
return slaveBalancer.getConnection(command, addr);
}
RFuture<RedisPubSubConnection> nextPubSubConnection() {

@ -24,6 +24,7 @@ import org.redisson.api.RFuture;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisPubSubConnection;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.cluster.ClusterSlotRange;
import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.connection.pool.PubSubConnectionPool;
@ -82,13 +83,13 @@ public class SingleEntry extends MasterSlaveEntry {
}
@Override
public RFuture<RedisConnection> connectionReadOp(InetSocketAddress addr) {
return super.connectionWriteOp();
public RFuture<RedisConnection> connectionReadOp(RedisCommand<?> command, InetSocketAddress addr) {
return super.connectionWriteOp(command);
}
@Override
public RFuture<RedisConnection> connectionReadOp() {
return super.connectionWriteOp();
public RFuture<RedisConnection> connectionReadOp(RedisCommand<?> command) {
return super.connectionWriteOp(command);
}
@Override

@ -23,6 +23,7 @@ import org.redisson.api.RFuture;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisConnectionException;
import org.redisson.client.RedisPubSubConnection;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.connection.ClientConnectionsEntry;
import org.redisson.connection.ClientConnectionsEntry.FreezeReason;
@ -141,17 +142,17 @@ public class LoadBalancerManager {
return pubSubConnectionPool.get();
}
public RFuture<RedisConnection> getConnection(InetSocketAddress addr) {
public RFuture<RedisConnection> getConnection(RedisCommand<?> command, InetSocketAddress addr) {
ClientConnectionsEntry entry = addr2Entry.get(addr);
if (entry != null) {
return slaveConnectionPool.get(entry);
return slaveConnectionPool.get(command, entry);
}
RedisConnectionException exception = new RedisConnectionException("Can't find entry for " + addr);
return connectionManager.newFailedFuture(exception);
}
public RFuture<RedisConnection> nextConnection() {
return slaveConnectionPool.get();
public RFuture<RedisConnection> nextConnection(RedisCommand<?> command) {
return slaveConnectionPool.get(command);
}
public void returnPubSubConnection(RedisPubSubConnection connection) {

@ -26,6 +26,7 @@ import org.redisson.api.NodeType;
import org.redisson.api.RFuture;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisConnectionException;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.connection.ClientConnectionsEntry;
@ -158,19 +159,12 @@ abstract class ConnectionPool<T extends RedisConnection> {
return config.getLoadBalancer().getEntry(entries);
}
public RFuture<T> get() {
public RFuture<T> get(RedisCommand<?> command) {
for (int j = entries.size() - 1; j >= 0; j--) {
final ClientConnectionsEntry entry = getEntry();
if (!entry.isFreezed()
&& tryAcquireConnection(entry)) {
final RPromise<T> result = connectionManager.newPromise();
acquireConnection(entry, new Runnable() {
@Override
public void run() {
connectTo(entry, result);
}
});
return result;
return acquireConnection(command, entry);
}
}
@ -196,17 +190,10 @@ abstract class ConnectionPool<T extends RedisConnection> {
return connectionManager.newFailedFuture(exception);
}
public RFuture<T> get(ClientConnectionsEntry entry) {
public RFuture<T> get(RedisCommand<?> command, ClientConnectionsEntry entry) {
if (((entry.getNodeType() == NodeType.MASTER && entry.getFreezeReason() == FreezeReason.SYSTEM) || !entry.isFreezed())
&& tryAcquireConnection(entry)) {
final RPromise<T> result = connectionManager.newPromise();
acquireConnection(entry, new Runnable() {
@Override
public void run() {
connectTo(entry, result);
}
});
return result;
return acquireConnection(command, entry);
}
RedisConnectionException exception = new RedisConnectionException(
@ -214,6 +201,17 @@ abstract class ConnectionPool<T extends RedisConnection> {
return connectionManager.newFailedFuture(exception);
}
private RFuture<T> acquireConnection(RedisCommand<?> command, ClientConnectionsEntry entry) {
final RPromise<T> result = connectionManager.newPromise();
acquireConnection(entry, new Runnable() {
@Override
public void run() {
connectTo(entry, result);
}
});
return result;
}
protected boolean tryAcquireConnection(ClientConnectionsEntry entry) {
return entry.getFailedAttempts() < config.getFailedAttempts();
}

@ -17,6 +17,8 @@ package org.redisson.connection.pool;
import org.redisson.api.RFuture;
import org.redisson.client.RedisPubSubConnection;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.connection.ClientConnectionsEntry;
import org.redisson.connection.ConnectionManager;
@ -34,6 +36,10 @@ public class PubSubConnectionPool extends ConnectionPool<RedisPubSubConnection>
super(config, connectionManager, masterSlaveEntry);
}
public RFuture<RedisPubSubConnection> get() {
return get(RedisCommands.PUBLISH);
}
@Override
protected RedisPubSubConnection poll(ClientConnectionsEntry entry) {
return entry.pollSubscribeConnection();

Loading…
Cancel
Save