NodeAsync interface, Node.info, Node.getNode methods added. #420

pull/748/head
Nikita 8 years ago
parent 33660f317a
commit d45af58259

@ -15,6 +15,7 @@
*/
package org.redisson;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
@ -33,10 +34,17 @@ import org.redisson.connection.ConnectionListener;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.RedisClientEntry;
import org.redisson.misc.RPromise;
import org.redisson.misc.URLBuilder;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
/**
*
* @author Nikita Koksharov
*
* @param <N> node type
*/
public class RedisNodes<N extends Node> implements NodesGroup<N> {
final ConnectionManager connectionManager;
@ -45,6 +53,18 @@ public class RedisNodes<N extends Node> implements NodesGroup<N> {
this.connectionManager = connectionManager;
}
@Override
public N getNode(String address) {
Collection<N> clients = (Collection<N>) connectionManager.getClients();
InetSocketAddress addr = URLBuilder.toAddress(address);
for (N node : clients) {
if (node.getAddr().equals(addr)) {
return node;
}
}
return null;
}
@Override
public Collection<N> getNodes(NodeType type) {
Collection<N> clients = (Collection<N>) connectionManager.getClients();

@ -51,6 +51,7 @@ import org.redisson.api.RLock;
import org.redisson.api.RMap;
import org.redisson.api.RMapCache;
import org.redisson.api.RPatternTopic;
import org.redisson.api.RPermitExpirableSemaphore;
import org.redisson.api.RQueue;
import org.redisson.api.RReadWriteLock;
import org.redisson.api.RRemoteService;
@ -58,7 +59,6 @@ import org.redisson.api.RScheduledExecutorService;
import org.redisson.api.RScoredSortedSet;
import org.redisson.api.RScript;
import org.redisson.api.RSemaphore;
import org.redisson.api.RPermitExpirableSemaphore;
import org.redisson.api.RSet;
import org.redisson.api.RSetCache;
import org.redisson.api.RSetMultimap;
@ -68,17 +68,16 @@ import org.redisson.api.RTopic;
import org.redisson.api.RedissonClient;
import org.redisson.api.RedissonReactiveClient;
import org.redisson.client.codec.Codec;
import org.redisson.codec.CodecProvider;
import org.redisson.command.CommandExecutor;
import org.redisson.command.CommandSyncService;
import org.redisson.config.Config;
import org.redisson.config.ConfigSupport;
import org.redisson.connection.ConnectionManager;
import org.redisson.codec.CodecProvider;
import org.redisson.liveobject.provider.ResolverProvider;
import org.redisson.misc.RedissonObjectFactory;
import org.redisson.pubsub.SemaphorePubSub;
import io.netty.util.internal.PlatformDependent;
import org.redisson.misc.RedissonObjectFactory;
/**
* Main infrastructure class allows to get access
@ -96,7 +95,6 @@ public class Redisson implements RedissonClient {
protected final QueueTransferService queueTransferService = new QueueTransferService();
protected final EvictionScheduler evictionScheduler;
protected final CommandExecutor commandExecutor;
protected final ConnectionManager connectionManager;
protected final ConcurrentMap<Class<?>, Class<?>> liveObjectClassCache = PlatformDependent.newConcurrentHashMap();
@ -112,8 +110,7 @@ public class Redisson implements RedissonClient {
Config configCopy = new Config(config);
connectionManager = ConfigSupport.createConnectionManager(configCopy);
commandExecutor = new CommandSyncService(connectionManager);
evictionScheduler = new EvictionScheduler(commandExecutor);
evictionScheduler = new EvictionScheduler(connectionManager.getCommandExecutor());
codecProvider = config.getCodecProvider();
resolverProvider = config.getResolverProvider();
}
@ -123,7 +120,7 @@ public class Redisson implements RedissonClient {
}
public CommandExecutor getCommandExecutor() {
return commandExecutor;
return connectionManager.getCommandExecutor();
}
public ConnectionManager getConnectionManager() {
@ -188,252 +185,252 @@ public class Redisson implements RedissonClient {
@Override
public RBinaryStream getBinaryStream(String name) {
return new RedissonBinaryStream(commandExecutor, name);
return new RedissonBinaryStream(connectionManager.getCommandExecutor(), name);
}
@Override
public <V> RGeo<V> getGeo(String name) {
return new RedissonGeo<V>(commandExecutor, name);
return new RedissonGeo<V>(connectionManager.getCommandExecutor(), name);
}
@Override
public <V> RGeo<V> getGeo(String name, Codec codec) {
return new RedissonGeo<V>(codec, commandExecutor, name);
return new RedissonGeo<V>(codec, connectionManager.getCommandExecutor(), name);
}
@Override
public <V> RBucket<V> getBucket(String name) {
return new RedissonBucket<V>(commandExecutor, name);
return new RedissonBucket<V>(connectionManager.getCommandExecutor(), name);
}
@Override
public <V> RBucket<V> getBucket(String name, Codec codec) {
return new RedissonBucket<V>(codec, commandExecutor, name);
return new RedissonBucket<V>(codec, connectionManager.getCommandExecutor(), name);
}
@Override
public RBuckets getBuckets() {
return new RedissonBuckets(this, commandExecutor);
return new RedissonBuckets(this, connectionManager.getCommandExecutor());
}
@Override
public RBuckets getBuckets(Codec codec) {
return new RedissonBuckets(this, codec, commandExecutor);
return new RedissonBuckets(this, codec, connectionManager.getCommandExecutor());
}
@Override
public <V> RHyperLogLog<V> getHyperLogLog(String name) {
return new RedissonHyperLogLog<V>(commandExecutor, name);
return new RedissonHyperLogLog<V>(connectionManager.getCommandExecutor(), name);
}
@Override
public <V> RHyperLogLog<V> getHyperLogLog(String name, Codec codec) {
return new RedissonHyperLogLog<V>(codec, commandExecutor, name);
return new RedissonHyperLogLog<V>(codec, connectionManager.getCommandExecutor(), name);
}
@Override
public <V> RList<V> getList(String name) {
return new RedissonList<V>(commandExecutor, name);
return new RedissonList<V>(connectionManager.getCommandExecutor(), name);
}
@Override
public <V> RList<V> getList(String name, Codec codec) {
return new RedissonList<V>(codec, commandExecutor, name);
return new RedissonList<V>(codec, connectionManager.getCommandExecutor(), name);
}
@Override
public <K, V> RListMultimap<K, V> getListMultimap(String name) {
return new RedissonListMultimap<K, V>(commandExecutor, name);
return new RedissonListMultimap<K, V>(connectionManager.getCommandExecutor(), name);
}
@Override
public <K, V> RListMultimap<K, V> getListMultimap(String name, Codec codec) {
return new RedissonListMultimap<K, V>(codec, commandExecutor, name);
return new RedissonListMultimap<K, V>(codec, connectionManager.getCommandExecutor(), name);
}
@Override
public <K, V> RLocalCachedMap<K, V> getLocalCachedMap(String name, LocalCachedMapOptions options) {
return new RedissonLocalCachedMap<K, V>(this, commandExecutor, name, options);
return new RedissonLocalCachedMap<K, V>(this, connectionManager.getCommandExecutor(), name, options);
}
@Override
public <K, V> RLocalCachedMap<K, V> getLocalCachedMap(String name, Codec codec, LocalCachedMapOptions options) {
return new RedissonLocalCachedMap<K, V>(this, codec, commandExecutor, name, options);
return new RedissonLocalCachedMap<K, V>(this, codec, connectionManager.getCommandExecutor(), name, options);
}
@Override
public <K, V> RMap<K, V> getMap(String name) {
return new RedissonMap<K, V>(commandExecutor, name);
return new RedissonMap<K, V>(connectionManager.getCommandExecutor(), name);
}
@Override
public <K, V> RSetMultimap<K, V> getSetMultimap(String name) {
return new RedissonSetMultimap<K, V>(commandExecutor, name);
return new RedissonSetMultimap<K, V>(connectionManager.getCommandExecutor(), name);
}
@Override
public <K, V> RSetMultimapCache<K, V> getSetMultimapCache(String name) {
return new RedissonSetMultimapCache<K, V>(evictionScheduler, commandExecutor, name);
return new RedissonSetMultimapCache<K, V>(evictionScheduler, connectionManager.getCommandExecutor(), name);
}
@Override
public <K, V> RSetMultimapCache<K, V> getSetMultimapCache(String name, Codec codec) {
return new RedissonSetMultimapCache<K, V>(evictionScheduler, codec, commandExecutor, name);
return new RedissonSetMultimapCache<K, V>(evictionScheduler, codec, connectionManager.getCommandExecutor(), name);
}
@Override
public <K, V> RListMultimapCache<K, V> getListMultimapCache(String name) {
return new RedissonListMultimapCache<K, V>(evictionScheduler, commandExecutor, name);
return new RedissonListMultimapCache<K, V>(evictionScheduler, connectionManager.getCommandExecutor(), name);
}
@Override
public <K, V> RListMultimapCache<K, V> getListMultimapCache(String name, Codec codec) {
return new RedissonListMultimapCache<K, V>(evictionScheduler, codec, commandExecutor, name);
return new RedissonListMultimapCache<K, V>(evictionScheduler, codec, connectionManager.getCommandExecutor(), name);
}
@Override
public <K, V> RSetMultimap<K, V> getSetMultimap(String name, Codec codec) {
return new RedissonSetMultimap<K, V>(codec, commandExecutor, name);
return new RedissonSetMultimap<K, V>(codec, connectionManager.getCommandExecutor(), name);
}
@Override
public <V> RSetCache<V> getSetCache(String name) {
return new RedissonSetCache<V>(evictionScheduler, commandExecutor, name);
return new RedissonSetCache<V>(evictionScheduler, connectionManager.getCommandExecutor(), name);
}
@Override
public <V> RSetCache<V> getSetCache(String name, Codec codec) {
return new RedissonSetCache<V>(codec, evictionScheduler, commandExecutor, name);
return new RedissonSetCache<V>(codec, evictionScheduler, connectionManager.getCommandExecutor(), name);
}
@Override
public <K, V> RMapCache<K, V> getMapCache(String name) {
return new RedissonMapCache<K, V>(evictionScheduler, commandExecutor, name);
return new RedissonMapCache<K, V>(evictionScheduler, connectionManager.getCommandExecutor(), name);
}
@Override
public <K, V> RMapCache<K, V> getMapCache(String name, Codec codec) {
return new RedissonMapCache<K, V>(codec, evictionScheduler, commandExecutor, name);
return new RedissonMapCache<K, V>(codec, evictionScheduler, connectionManager.getCommandExecutor(), name);
}
@Override
public <K, V> RMap<K, V> getMap(String name, Codec codec) {
return new RedissonMap<K, V>(codec, commandExecutor, name);
return new RedissonMap<K, V>(codec, connectionManager.getCommandExecutor(), name);
}
@Override
public RLock getLock(String name) {
return new RedissonLock(commandExecutor, name, id);
return new RedissonLock(connectionManager.getCommandExecutor(), name, id);
}
@Override
public RLock getFairLock(String name) {
return new RedissonFairLock(commandExecutor, name, id);
return new RedissonFairLock(connectionManager.getCommandExecutor(), name, id);
}
@Override
public RReadWriteLock getReadWriteLock(String name) {
return new RedissonReadWriteLock(commandExecutor, name, id);
return new RedissonReadWriteLock(connectionManager.getCommandExecutor(), name, id);
}
@Override
public <V> RSet<V> getSet(String name) {
return new RedissonSet<V>(commandExecutor, name);
return new RedissonSet<V>(connectionManager.getCommandExecutor(), name);
}
@Override
public <V> RSet<V> getSet(String name, Codec codec) {
return new RedissonSet<V>(codec, commandExecutor, name);
return new RedissonSet<V>(codec, connectionManager.getCommandExecutor(), name);
}
@Override
public RScript getScript() {
return new RedissonScript(commandExecutor);
return new RedissonScript(connectionManager.getCommandExecutor());
}
@Override
public RScheduledExecutorService getExecutorService(String name) {
return new RedissonExecutorService(connectionManager.getCodec(), commandExecutor, this, name);
return new RedissonExecutorService(connectionManager.getCodec(), connectionManager.getCommandExecutor(), this, name);
}
@Override
public RScheduledExecutorService getExecutorService(Codec codec, String name) {
return new RedissonExecutorService(codec, commandExecutor, this, name);
return new RedissonExecutorService(codec, connectionManager.getCommandExecutor(), this, name);
}
@Override
public RRemoteService getRemoteService() {
return new RedissonRemoteService(this, commandExecutor);
return new RedissonRemoteService(this, connectionManager.getCommandExecutor());
}
@Override
public RRemoteService getRemoteService(String name) {
return new RedissonRemoteService(this, name, commandExecutor);
return new RedissonRemoteService(this, name, connectionManager.getCommandExecutor());
}
@Override
public RRemoteService getRemoteService(Codec codec) {
return new RedissonRemoteService(codec, this, commandExecutor);
return new RedissonRemoteService(codec, this, connectionManager.getCommandExecutor());
}
@Override
public RRemoteService getRemoteService(String name, Codec codec) {
return new RedissonRemoteService(codec, this, name, commandExecutor);
return new RedissonRemoteService(codec, this, name, connectionManager.getCommandExecutor());
}
@Override
public <V> RSortedSet<V> getSortedSet(String name) {
return new RedissonSortedSet<V>(commandExecutor, name, this);
return new RedissonSortedSet<V>(connectionManager.getCommandExecutor(), name, this);
}
@Override
public <V> RSortedSet<V> getSortedSet(String name, Codec codec) {
return new RedissonSortedSet<V>(codec, commandExecutor, name, this);
return new RedissonSortedSet<V>(codec, connectionManager.getCommandExecutor(), name, this);
}
@Override
public <V> RScoredSortedSet<V> getScoredSortedSet(String name) {
return new RedissonScoredSortedSet<V>(commandExecutor, name);
return new RedissonScoredSortedSet<V>(connectionManager.getCommandExecutor(), name);
}
@Override
public <V> RScoredSortedSet<V> getScoredSortedSet(String name, Codec codec) {
return new RedissonScoredSortedSet<V>(codec, commandExecutor, name);
return new RedissonScoredSortedSet<V>(codec, connectionManager.getCommandExecutor(), name);
}
@Override
public RLexSortedSet getLexSortedSet(String name) {
return new RedissonLexSortedSet(commandExecutor, name);
return new RedissonLexSortedSet(connectionManager.getCommandExecutor(), name);
}
@Override
public <M> RTopic<M> getTopic(String name) {
return new RedissonTopic<M>(commandExecutor, name);
return new RedissonTopic<M>(connectionManager.getCommandExecutor(), name);
}
@Override
public <M> RTopic<M> getTopic(String name, Codec codec) {
return new RedissonTopic<M>(codec, commandExecutor, name);
return new RedissonTopic<M>(codec, connectionManager.getCommandExecutor(), name);
}
@Override
public <M> RPatternTopic<M> getPatternTopic(String pattern) {
return new RedissonPatternTopic<M>(commandExecutor, pattern);
return new RedissonPatternTopic<M>(connectionManager.getCommandExecutor(), pattern);
}
@Override
public <M> RPatternTopic<M> getPatternTopic(String pattern, Codec codec) {
return new RedissonPatternTopic<M>(codec, commandExecutor, pattern);
return new RedissonPatternTopic<M>(codec, connectionManager.getCommandExecutor(), pattern);
}
@Override
public <V> RBlockingFairQueue<V> getBlockingFairQueue(String name) {
return new RedissonBlockingFairQueue<V>(commandExecutor, name, semaphorePubSub, id);
return new RedissonBlockingFairQueue<V>(connectionManager.getCommandExecutor(), name, semaphorePubSub, id);
}
@Override
public <V> RBlockingFairQueue<V> getBlockingFairQueue(String name, Codec codec) {
return new RedissonBlockingFairQueue<V>(codec, commandExecutor, name, semaphorePubSub, id);
return new RedissonBlockingFairQueue<V>(codec, connectionManager.getCommandExecutor(), name, semaphorePubSub, id);
}
@Override
@ -441,102 +438,102 @@ public class Redisson implements RedissonClient {
if (destinationQueue == null) {
throw new NullPointerException();
}
return new RedissonDelayedQueue<V>(queueTransferService, destinationQueue.getCodec(), commandExecutor, destinationQueue.getName());
return new RedissonDelayedQueue<V>(queueTransferService, destinationQueue.getCodec(), connectionManager.getCommandExecutor(), destinationQueue.getName());
}
@Override
public <V> RQueue<V> getQueue(String name) {
return new RedissonQueue<V>(commandExecutor, name);
return new RedissonQueue<V>(connectionManager.getCommandExecutor(), name);
}
@Override
public <V> RQueue<V> getQueue(String name, Codec codec) {
return new RedissonQueue<V>(codec, commandExecutor, name);
return new RedissonQueue<V>(codec, connectionManager.getCommandExecutor(), name);
}
@Override
public <V> RBlockingQueue<V> getBlockingQueue(String name) {
return new RedissonBlockingQueue<V>(commandExecutor, name);
return new RedissonBlockingQueue<V>(connectionManager.getCommandExecutor(), name);
}
@Override
public <V> RBlockingQueue<V> getBlockingQueue(String name, Codec codec) {
return new RedissonBlockingQueue<V>(codec, commandExecutor, name);
return new RedissonBlockingQueue<V>(codec, connectionManager.getCommandExecutor(), name);
}
@Override
public <V> RBoundedBlockingQueue<V> getBoundedBlockingQueue(String name) {
return new RedissonBoundedBlockingQueue<V>(semaphorePubSub, commandExecutor, name);
return new RedissonBoundedBlockingQueue<V>(semaphorePubSub, connectionManager.getCommandExecutor(), name);
}
@Override
public <V> RBoundedBlockingQueue<V> getBoundedBlockingQueue(String name, Codec codec) {
return new RedissonBoundedBlockingQueue<V>(semaphorePubSub, codec, commandExecutor, name);
return new RedissonBoundedBlockingQueue<V>(semaphorePubSub, codec, connectionManager.getCommandExecutor(), name);
}
@Override
public <V> RDeque<V> getDeque(String name) {
return new RedissonDeque<V>(commandExecutor, name);
return new RedissonDeque<V>(connectionManager.getCommandExecutor(), name);
}
@Override
public <V> RDeque<V> getDeque(String name, Codec codec) {
return new RedissonDeque<V>(codec, commandExecutor, name);
return new RedissonDeque<V>(codec, connectionManager.getCommandExecutor(), name);
}
@Override
public <V> RBlockingDeque<V> getBlockingDeque(String name) {
return new RedissonBlockingDeque<V>(commandExecutor, name);
return new RedissonBlockingDeque<V>(connectionManager.getCommandExecutor(), name);
}
@Override
public <V> RBlockingDeque<V> getBlockingDeque(String name, Codec codec) {
return new RedissonBlockingDeque<V>(codec, commandExecutor, name);
return new RedissonBlockingDeque<V>(codec, connectionManager.getCommandExecutor(), name);
};
@Override
public RAtomicLong getAtomicLong(String name) {
return new RedissonAtomicLong(commandExecutor, name);
return new RedissonAtomicLong(connectionManager.getCommandExecutor(), name);
}
@Override
public RAtomicDouble getAtomicDouble(String name) {
return new RedissonAtomicDouble(commandExecutor, name);
return new RedissonAtomicDouble(connectionManager.getCommandExecutor(), name);
}
@Override
public RCountDownLatch getCountDownLatch(String name) {
return new RedissonCountDownLatch(commandExecutor, name, id);
return new RedissonCountDownLatch(connectionManager.getCommandExecutor(), name, id);
}
@Override
public RBitSet getBitSet(String name) {
return new RedissonBitSet(commandExecutor, name);
return new RedissonBitSet(connectionManager.getCommandExecutor(), name);
}
@Override
public RSemaphore getSemaphore(String name) {
return new RedissonSemaphore(commandExecutor, name, semaphorePubSub);
return new RedissonSemaphore(connectionManager.getCommandExecutor(), name, semaphorePubSub);
}
public RPermitExpirableSemaphore getPermitExpirableSemaphore(String name) {
return new RedissonPermitExpirableSemaphore(commandExecutor, name, semaphorePubSub);
return new RedissonPermitExpirableSemaphore(connectionManager.getCommandExecutor(), name, semaphorePubSub);
}
@Override
public <V> RBloomFilter<V> getBloomFilter(String name) {
return new RedissonBloomFilter<V>(commandExecutor, name);
return new RedissonBloomFilter<V>(connectionManager.getCommandExecutor(), name);
}
@Override
public <V> RBloomFilter<V> getBloomFilter(String name, Codec codec) {
return new RedissonBloomFilter<V>(codec, commandExecutor, name);
return new RedissonBloomFilter<V>(codec, connectionManager.getCommandExecutor(), name);
}
@Override
public RKeys getKeys() {
return new RedissonKeys(commandExecutor);
return new RedissonKeys(connectionManager.getCommandExecutor());
}
@Override
@ -603,7 +600,7 @@ public class Redisson implements RedissonClient {
}
protected void enableRedissonReferenceSupport() {
this.commandExecutor.enableRedissonReferenceSupport(this);
this.connectionManager.getCommandExecutor().enableRedissonReferenceSupport(this);
}
}

@ -25,8 +25,8 @@ import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.convertor.VoidReplayConvertor;
import org.redisson.client.protocol.decoder.ListFirstObjectDecoder;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.connection.decoder.ListFirstObjectDecoder;
/**
* Distributed and concurrent implementation of {@link java.util.Queue}

@ -25,12 +25,15 @@ import java.util.Map;
*/
public interface ClusterNode extends Node {
// Use {@link #clusterInfo()}
@Deprecated
Map<String, String> info();
/**
* Execute CLUSTER INFO operation.
*
* @return Map extracted via each response line splitting
* by ':' symbol
* @return value mapped by field
*/
Map<String, String> info();
Map<String, String> clusterInfo();
}

@ -16,6 +16,7 @@
package org.redisson.api;
import java.net.InetSocketAddress;
import java.util.Map;
/**
* Redis node interface
@ -23,8 +24,12 @@ import java.net.InetSocketAddress;
* @author Nikita Koksharov
*
*/
public interface Node {
public interface Node extends NodeAsync {
enum InfoSection {ALL, DEFAULT, SERVER, CLIENTS, MEMORY, PERSISTENCE, STATS, REPLICATION, CPU, COMMANDSTATS, CLUSTER, KEYSPACE}
Map<String, String> info(InfoSection section);
/**
* Returns current Redis server time in seconds
*
@ -52,5 +57,5 @@ public interface Node {
* @return <code>true</code> if PONG received, <code>false</code> otherwise
*/
boolean ping();
}

@ -0,0 +1,38 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api;
import java.util.Map;
import org.redisson.api.Node.InfoSection;
/**
* Redis node interface
*
* @author Nikita Koksharov
*
*/
public interface NodeAsync {
RFuture<Map<String, String>> infoAsync(InfoSection section);
RFuture<Long> timeAsync();
RFuture<Boolean> pingAsync();
RFuture<Map<String, String>> clusterInfoAsync();
}

@ -43,7 +43,15 @@ public interface NodesGroup<N extends Node> {
void removeConnectionListener(int listenerId);
/**
* Get all nodes by type
* Get Redis node by address in format: <code>host:port</code>
*
* @param address
* @return
*/
N getNode(String address);
/**
* Get all Redis nodes by type
*
* @param type - type of node
* @return collection of nodes

@ -231,11 +231,7 @@ public class RedisClient {
return channels.close();
}
/**
* Execute INFO SERVER operation.
*
* @return Map extracted from each response line splitting by ':' symbol
*/
@Deprecated
public Map<String, String> serverInfo() {
try {
return serverInfoAsync().sync().get();
@ -244,15 +240,10 @@ public class RedisClient {
}
}
/**
* Asynchronously execute INFO SERVER operation.
*
* @return A future for a map extracted from each response line splitting by
* ':' symbol
*/
@Deprecated
public RFuture<Map<String, String>> serverInfoAsync() {
final RedisConnection connection = connect();
RFuture<Map<String, String>> async = connection.async(RedisCommands.SERVER_INFO);
RFuture<Map<String, String>> async = connection.async(RedisCommands.INFO_SERVER);
async.addListener(new FutureListener<Map<String, String>>() {
@Override
public void operationComplete(Future<Map<String, String>> future) throws Exception {

@ -34,12 +34,14 @@ import org.redisson.client.protocol.convertor.BooleanReplayConvertor;
import org.redisson.client.protocol.convertor.DoubleReplayConvertor;
import org.redisson.client.protocol.convertor.IntegerReplayConvertor;
import org.redisson.client.protocol.convertor.KeyValueConvertor;
import org.redisson.client.protocol.convertor.LongListObjectDecoder;
import org.redisson.client.protocol.convertor.LongReplayConvertor;
import org.redisson.client.protocol.convertor.TrueReplayConvertor;
import org.redisson.client.protocol.convertor.TypeConvertor;
import org.redisson.client.protocol.convertor.VoidReplayConvertor;
import org.redisson.client.protocol.decoder.ClusterNodesDecoder;
import org.redisson.client.protocol.decoder.KeyValueObjectDecoder;
import org.redisson.client.protocol.decoder.ListFirstObjectDecoder;
import org.redisson.client.protocol.decoder.ListResultReplayDecoder;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.ListScanResultReplayDecoder;
@ -128,6 +130,7 @@ public interface RedisCommands {
RedisCommand<ListScanResult<String>> SCAN = new RedisCommand<ListScanResult<String>>("SCAN", new NestedMultiDecoder(new ObjectListReplayDecoder<String>(), new ListScanResultReplayDecoder()), ValueType.OBJECT);
RedisStrictCommand<String> RANDOM_KEY = new RedisStrictCommand<String>("RANDOMKEY", new StringDataDecoder());
RedisStrictCommand<String> PING = new RedisStrictCommand<String>("PING");
RedisStrictCommand<Boolean> PING_BOOL = new RedisStrictCommand<Boolean>("PING", new BooleanNotNullReplayConvertor());
RedisStrictCommand<Void> UNWATCH = new RedisStrictCommand<Void>("UNWATCH", new VoidReplayConvertor());
RedisStrictCommand<Void> WATCH = new RedisStrictCommand<Void>("WATCH", new VoidReplayConvertor());
@ -290,7 +293,7 @@ public interface RedisCommands {
RedisCommand<Object> PUNSUBSCRIBE = new RedisCommand<Object>("PUNSUBSCRIBE", new PubSubStatusDecoder());
RedisStrictCommand<List<ClusterNodeInfo>> CLUSTER_NODES = new RedisStrictCommand<List<ClusterNodeInfo>>("CLUSTER", "NODES", new ClusterNodesDecoder());
RedisStrictCommand<List<String>> TIME = new RedisStrictCommand<List<String>>("TIME", new StringListReplayDecoder());
RedisCommand<Object> TIME = new RedisCommand<Object>("TIME", new LongListObjectDecoder());
RedisStrictCommand<Map<String, String>> CLUSTER_INFO = new RedisStrictCommand<Map<String, String>>("CLUSTER", "INFO", new StringMapDataDecoder());
RedisStrictCommand<List<String>> SENTINEL_GET_MASTER_ADDR_BY_NAME = new RedisStrictCommand<List<String>>("SENTINEL", "GET-MASTER-ADDR-BY-NAME", new StringListReplayDecoder());
@ -307,9 +310,16 @@ public interface RedisCommands {
RedisStrictCommand<Void> CLUSTER_SETSLOT = new RedisStrictCommand<Void>("CLUSTER", "SETSLOT");
RedisStrictCommand<Void> CLUSTER_MEET = new RedisStrictCommand<Void>("CLUSTER", "MEET");
RedisStrictCommand<Map<String, String>> INFO_KEYSPACE = new RedisStrictCommand<Map<String, String>>("INFO", "KEYSPACE", new StringMapDataDecoder());
RedisStrictCommand<Map<String, String>> INFO_CLUSTER = new RedisStrictCommand<Map<String, String>>("INFO", "CLUSTER", new StringMapDataDecoder());
RedisStrictCommand<Map<String, String>> INFO_ALL = new RedisStrictCommand<Map<String, String>>("INFO", "ALL", new StringMapDataDecoder());
RedisStrictCommand<Map<String, String>> INFO_DEFAULT = new RedisStrictCommand<Map<String, String>>("INFO", "DEFAULT", new StringMapDataDecoder());
RedisStrictCommand<Map<String, String>> INFO_SERVER = new RedisStrictCommand<Map<String, String>>("INFO", "SERVER", new StringMapDataDecoder());
RedisStrictCommand<Map<String, String>> INFO_CLIENTS = new RedisStrictCommand<Map<String, String>>("INFO", "CLIENTS", new StringMapDataDecoder());
RedisStrictCommand<Map<String, String>> INFO_MEMORY = new RedisStrictCommand<Map<String, String>>("INFO", "MEMORY", new StringMapDataDecoder());
RedisStrictCommand<Map<String, String>> INFO_PERSISTENCE = new RedisStrictCommand<Map<String, String>>("INFO", "PERSISTENCE", new StringMapDataDecoder());
RedisStrictCommand<Map<String, String>> INFO_STATS = new RedisStrictCommand<Map<String, String>>("INFO", "STATS", new StringMapDataDecoder());
RedisStrictCommand<Map<String, String>> INFO_REPLICATION = new RedisStrictCommand<Map<String, String>>("INFO", "REPLICATION", new StringMapDataDecoder());
RedisStrictCommand<Map<String, String>> INFO_PERSISTENCE = new RedisStrictCommand<Map<String, String>>("INFO", "persistence", new StringMapDataDecoder());
RedisStrictCommand<Map<String, String>> SERVER_INFO = new RedisStrictCommand<Map<String, String>>("INFO", "SERVER", new StringMapDataDecoder());
RedisStrictCommand<Map<String, String>> INFO_CPU = new RedisStrictCommand<Map<String, String>>("INFO", "CPU", new StringMapDataDecoder());
RedisStrictCommand<Map<String, String>> INFO_COMMANDSTATS = new RedisStrictCommand<Map<String, String>>("INFO", "COMMANDSTATS", new StringMapDataDecoder());
RedisStrictCommand<Map<String, String>> INFO_CLUSTER = new RedisStrictCommand<Map<String, String>>("INFO", "CLUSTER", new StringMapDataDecoder());
RedisStrictCommand<Map<String, String>> INFO_KEYSPACE = new RedisStrictCommand<Map<String, String>>("INFO", "KEYSPACE", new StringMapDataDecoder());
}

@ -0,0 +1,40 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client.protocol.convertor;
import java.util.List;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.decoder.ListFirstObjectDecoder;
/**
*
* @author Nikita Koksharov
*
*/
public class LongListObjectDecoder extends ListFirstObjectDecoder {
@Override
public Object decode(List<Object> parts, State state) {
Object result = super.decode(parts, state);
if (result != null) {
return Long.valueOf(result.toString());
}
return result;
}
}

@ -13,12 +13,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.connection.decoder;
package org.redisson.client.protocol.decoder;
import java.util.List;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.decoder.MultiDecoder;
import io.netty.buffer.ByteBuf;

@ -29,6 +29,7 @@ import org.redisson.client.RedisConnection;
import org.redisson.client.RedisPubSubListener;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.command.CommandSyncService;
import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.misc.InfinitySemaphoreLatch;
import org.redisson.misc.RPromise;
@ -45,6 +46,8 @@ import io.netty.util.TimerTask;
*/
public interface ConnectionManager {
CommandSyncService getCommandExecutor();
ExecutorService getExecutor();
URL getLastClusterNode();

@ -45,6 +45,7 @@ import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.pubsub.PubSubType;
import org.redisson.cluster.ClusterSlotRange;
import org.redisson.command.CommandSyncService;
import org.redisson.config.BaseMasterSlaveServersConfig;
import org.redisson.config.Config;
import org.redisson.config.MasterSlaveServersConfig;
@ -150,6 +151,8 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
private final boolean sharedEventLoopGroup;
private final boolean sharedExecutor;
private final CommandSyncService commandExecutor;
{
for (int i = 0; i < locks.length; i++) {
@ -204,11 +207,16 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
this.shutdownPromise = newPromise();
this.sharedEventLoopGroup = cfg.getEventLoopGroup() != null;
this.sharedExecutor = cfg.getExecutor() != null;
this.commandExecutor = new CommandSyncService(this);
}
public boolean isClusterMode() {
return false;
}
public CommandSyncService getCommandExecutor() {
return commandExecutor;
}
public IdleConnectionWatcher getConnectionWatcher() {
return connectionWatcher;
@ -318,12 +326,12 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
@Override
public RedisClient createClient(NodeType type, String host, int port) {
RedisClient client = createClient(host, port, config.getConnectTimeout(), config.getRetryInterval() * config.getRetryAttempts());
clients.add(new RedisClientEntry(client, this, type));
clients.add(new RedisClientEntry(client, commandExecutor, type));
return client;
}
public void shutdownAsync(RedisClient client) {
clients.remove(new RedisClientEntry(client, this, null));
clients.remove(new RedisClientEntry(client, commandExecutor, null));
client.shutdownAsync();
}

@ -16,28 +16,31 @@
package org.redisson.connection;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;
import org.redisson.api.ClusterNode;
import org.redisson.api.NodeType;
import org.redisson.api.RFuture;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisException;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.misc.RPromise;
import org.redisson.command.CommandSyncService;
/**
*
* @author Nikita Koksharov
*
*/
public class RedisClientEntry implements ClusterNode {
private final RedisClient client;
private final ConnectionManager manager;
private final CommandSyncService commandExecutor;
private final NodeType type;
public RedisClientEntry(RedisClient client, ConnectionManager manager, NodeType type) {
public RedisClientEntry(RedisClient client, CommandSyncService commandExecutor, NodeType type) {
super();
this.client = client;
this.manager = manager;
this.commandExecutor = commandExecutor;
this.type = type;
}
@ -55,27 +58,13 @@ public class RedisClientEntry implements ClusterNode {
return client.getAddr();
}
private RedisConnection connect() {
RedisConnection c = client.connect();
RPromise<RedisConnection> future = manager.newPromise();
manager.getConnectListener().onConnect(future, c, null, manager.getConfig());
future.syncUninterruptibly();
return future.getNow();
public RFuture<Boolean> pingAsync() {
return commandExecutor.readAsync(client.getAddr(), (String)null, null, RedisCommands.PING_BOOL);
}
@Override
public boolean ping() {
RedisConnection c = null;
try {
c = connect();
return "PONG".equals(c.sync(RedisCommands.PING));
} catch (Exception e) {
return false;
} finally {
if (c != null) {
c.closeAsync();
}
}
return commandExecutor.get(pingAsync());
}
@Override
@ -103,34 +92,64 @@ public class RedisClientEntry implements ClusterNode {
return true;
}
@Override
public RFuture<Long> timeAsync() {
return commandExecutor.readAsync(client.getAddr(), (String)null, StringCodec.INSTANCE, RedisCommands.TIME);
}
@Override
public long time() {
RedisConnection c = null;
try {
c = connect();
List<String> parts = c.sync(RedisCommands.TIME);
return Long.valueOf(parts.get(0));
} catch (Exception e) {
throw new RedisException(e.getMessage(), e);
} finally {
if (c != null) {
c.closeAsync();
}
return commandExecutor.get(timeAsync());
}
@Override
public RFuture<Map<String, String>> clusterInfoAsync() {
return commandExecutor.readAsync(client.getAddr(), (String)null, StringCodec.INSTANCE, RedisCommands.CLUSTER_INFO);
}
@Override
public Map<String, String> clusterInfo() {
return commandExecutor.get(clusterInfoAsync());
}
@Override
public Map<String, String> info(InfoSection section) {
return commandExecutor.get(infoAsync(section));
}
@Override
public RFuture<Map<String, String>> infoAsync(InfoSection section) {
if (section == InfoSection.ALL) {
return commandExecutor.readAsync(client.getAddr(), (String)null, StringCodec.INSTANCE, RedisCommands.INFO_ALL);
} else if (section == InfoSection.DEFAULT) {
return commandExecutor.readAsync(client.getAddr(), (String)null, StringCodec.INSTANCE, RedisCommands.INFO_DEFAULT);
} else if (section == InfoSection.SERVER) {
return commandExecutor.readAsync(client.getAddr(), (String)null, StringCodec.INSTANCE, RedisCommands.INFO_SERVER);
} else if (section == InfoSection.CLIENTS) {
return commandExecutor.readAsync(client.getAddr(), (String)null, StringCodec.INSTANCE, RedisCommands.INFO_CLIENTS);
} else if (section == InfoSection.MEMORY) {
return commandExecutor.readAsync(client.getAddr(), (String)null, StringCodec.INSTANCE, RedisCommands.INFO_MEMORY);
} else if (section == InfoSection.PERSISTENCE) {
return commandExecutor.readAsync(client.getAddr(), (String)null, StringCodec.INSTANCE, RedisCommands.INFO_PERSISTENCE);
} else if (section == InfoSection.STATS) {
return commandExecutor.readAsync(client.getAddr(), (String)null, StringCodec.INSTANCE, RedisCommands.INFO_STATS);
} else if (section == InfoSection.REPLICATION) {
return commandExecutor.readAsync(client.getAddr(), (String)null, StringCodec.INSTANCE, RedisCommands.INFO_REPLICATION);
} else if (section == InfoSection.CPU) {
return commandExecutor.readAsync(client.getAddr(), (String)null, StringCodec.INSTANCE, RedisCommands.INFO_CPU);
} else if (section == InfoSection.COMMANDSTATS) {
return commandExecutor.readAsync(client.getAddr(), (String)null, StringCodec.INSTANCE, RedisCommands.INFO_COMMANDSTATS);
} else if (section == InfoSection.CLUSTER) {
return commandExecutor.readAsync(client.getAddr(), (String)null, StringCodec.INSTANCE, RedisCommands.INFO_CLUSTER);
} else if (section == InfoSection.KEYSPACE) {
return commandExecutor.readAsync(client.getAddr(), (String)null, StringCodec.INSTANCE, RedisCommands.INFO_KEYSPACE);
}
throw new IllegalStateException();
}
@Override
public Map<String, String> info() {
RedisConnection c = null;
try {
c = connect();
return c.sync(RedisCommands.CLUSTER_INFO);
} catch (Exception e) {
return null;
} finally {
if (c != null) {
c.closeAsync();
}
}
return clusterInfo();
}
}

@ -16,6 +16,7 @@
package org.redisson.misc;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLConnection;
@ -66,6 +67,21 @@ public class URLBuilder {
}
});
}
public static InetSocketAddress toAddress(String url) {
String[] parts = url.split(":");
if (parts.length-1 >= 3) {
String port = parts[parts.length-1];
String newPort = port.split("[^\\d]")[0];
String host = url.replace(":" + port, "");
return new InetSocketAddress(host, Integer.valueOf(newPort));
} else {
String port = parts[parts.length-1];
String newPort = port.split("[^\\d]")[0];
String host = url.replace(":" + port, "");
return new InetSocketAddress(host, Integer.valueOf(newPort));
}
}
public static URL create(String url) {
try {

@ -22,8 +22,8 @@ import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.convertor.VoidReplayConvertor;
import org.redisson.client.protocol.decoder.ListFirstObjectDecoder;
import org.redisson.command.CommandReactiveExecutor;
import org.redisson.connection.decoder.ListFirstObjectDecoder;
/**
* Distributed and concurrent implementation of {@link java.util.Queue}

@ -12,11 +12,13 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisConnection;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.RedisStrictCommand;
import org.redisson.client.protocol.convertor.VoidReplayConvertor;
@ -778,7 +780,10 @@ public class RedisRunner {
public RedisVersion getRedisVersion() {
if (redisVersion == null) {
redisVersion = new RedisVersion(createRedisClientInstance().serverInfo().get("redis_version"));
RedisConnection c = createRedisClientInstance().connect();
Map<String, String> serverMap = c.sync(RedisCommands.INFO_SERVER);
redisVersion = new RedisVersion(serverMap.get("redis_version"));
c.closeAsync();
}
return redisVersion;
}

@ -1,6 +1,9 @@
package org.redisson;
import java.io.File;
import static com.jayway.awaitility.Awaitility.await;
import static org.assertj.core.api.Assertions.assertThat;
import static org.redisson.BaseTest.createInstance;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Collections;
@ -11,9 +14,9 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
@ -22,6 +25,7 @@ import org.junit.Test;
import org.redisson.RedisRunner.RedisProcess;
import org.redisson.api.ClusterNode;
import org.redisson.api.Node;
import org.redisson.api.Node.InfoSection;
import org.redisson.api.NodesGroup;
import org.redisson.api.RMap;
import org.redisson.api.RedissonClient;
@ -33,10 +37,6 @@ import org.redisson.codec.SerializationCodec;
import org.redisson.config.Config;
import org.redisson.connection.ConnectionListener;
import static com.jayway.awaitility.Awaitility.await;
import static org.assertj.core.api.Assertions.assertThat;
import static org.redisson.BaseTest.createInstance;
public class RedissonTest {
protected RedissonClient redisson;
@ -267,6 +267,53 @@ public class RedissonTest {
Assert.assertTrue(r.isShutdown());
}
@Test
public void testNode() {
Node node = redisson.getNodesGroup().getNode(RedisRunner.getDefaultRedisServerBindAddressAndPort());
assertThat(node).isNotNull();
}
@Test
public void testInfo() {
Node node = redisson.getNodesGroup().getNodes().iterator().next();
Map<String, String> allResponse = node.info(InfoSection.ALL);
assertThat(allResponse).containsKeys("redis_version", "connected_clients");
Map<String, String> defaultResponse = node.info(InfoSection.DEFAULT);
assertThat(defaultResponse).containsKeys("redis_version", "connected_clients");
Map<String, String> serverResponse = node.info(InfoSection.SERVER);
assertThat(serverResponse).containsKey("redis_version");
Map<String, String> clientsResponse = node.info(InfoSection.CLIENTS);
assertThat(clientsResponse).containsKey("connected_clients");
Map<String, String> memoryResponse = node.info(InfoSection.MEMORY);
assertThat(memoryResponse).containsKey("used_memory_human");
Map<String, String> persistenceResponse = node.info(InfoSection.PERSISTENCE);
assertThat(persistenceResponse).containsKey("rdb_last_save_time");
Map<String, String> statsResponse = node.info(InfoSection.STATS);
assertThat(statsResponse).containsKey("pubsub_patterns");
Map<String, String> replicationResponse = node.info(InfoSection.REPLICATION);
assertThat(replicationResponse).containsKey("repl_backlog_first_byte_offset");
Map<String, String> cpuResponse = node.info(InfoSection.CPU);
assertThat(cpuResponse).containsKey("used_cpu_sys");
Map<String, String> commandStatsResponse = node.info(InfoSection.COMMANDSTATS);
assertThat(commandStatsResponse).containsKey("cmdstat_flushall");
Map<String, String> clusterResponse = node.info(InfoSection.CLUSTER);
assertThat(clusterResponse).containsKey("cluster_enabled");
Map<String, String> keyspaceResponse = node.info(InfoSection.KEYSPACE);
assertThat(keyspaceResponse).isEmpty();
}
@Test
public void testTime() {
NodesGroup<Node> nodes = redisson.getNodesGroup();

Loading…
Cancel
Save