Merge branch 'master' into 3.0.0

# Conflicts:
#	redisson/src/main/java/org/redisson/RedissonReactive.java
#	redisson/src/main/java/org/redisson/reactive/RedissonMapReactiveIterator.java
#	redisson/src/main/java/org/redisson/reactive/SetReactiveIterator.java
pull/1303/head
Nikita 7 years ago
commit dd8ac6c1a2

@ -15,12 +15,17 @@
*/
package org.redisson;
import java.net.InetSocketAddress;
import org.redisson.client.RedisClient;
/**
*
* @author Nikita Koksharov
*
*/
public interface RedisClientResult {
void setRedisClient(InetSocketAddress addr);
void setRedisClient(RedisClient addr);
InetSocketAddress getRedisClient();
RedisClient getRedisClient();
}

@ -15,7 +15,6 @@
*/
package org.redisson;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
@ -34,10 +33,10 @@ import org.redisson.client.protocol.RedisCommands;
import org.redisson.connection.ConnectionListener;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.RedisClientEntry;
import org.redisson.misc.URIBuilder;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import org.redisson.misc.URIBuilder;
/**
*
@ -56,10 +55,9 @@ public class RedisNodes<N extends Node> implements NodesGroup<N> {
@Override
public N getNode(String address) {
Collection<N> clients = (Collection<N>) connectionManager.getClients();
URI uri = URIBuilder.create(address);
InetSocketAddress addr = new InetSocketAddress(uri.getHost(), uri.getPort());
URI addr = URIBuilder.create(address);
for (N node : clients) {
if (node.getAddr().equals(addr)) {
if (URIBuilder.compare(node.getAddr(), addr)) {
return node;
}
}

@ -70,13 +70,12 @@ import org.redisson.api.RTopic;
import org.redisson.api.RedissonClient;
import org.redisson.api.RedissonReactiveClient;
import org.redisson.client.codec.Codec;
import org.redisson.codec.CodecProvider;
import org.redisson.codec.ReferenceCodecProvider;
import org.redisson.command.CommandExecutor;
import org.redisson.config.Config;
import org.redisson.config.ConfigSupport;
import org.redisson.connection.ConnectionManager;
import org.redisson.eviction.EvictionScheduler;
import org.redisson.liveobject.provider.ResolverProvider;
import org.redisson.misc.RedissonObjectFactory;
import org.redisson.pubsub.SemaphorePubSub;
import org.redisson.remote.ResponseEntry;
@ -102,8 +101,7 @@ public class Redisson implements RedissonClient {
protected final ConnectionManager connectionManager;
protected final ConcurrentMap<Class<?>, Class<?>> liveObjectClassCache = PlatformDependent.newConcurrentHashMap();
protected final CodecProvider codecProvider;
protected final ResolverProvider resolverProvider;
protected final ReferenceCodecProvider codecProvider;
protected final Config config;
protected final SemaphorePubSub semaphorePubSub = new SemaphorePubSub();
@ -116,8 +114,7 @@ public class Redisson implements RedissonClient {
connectionManager = ConfigSupport.createConnectionManager(configCopy);
evictionScheduler = new EvictionScheduler(connectionManager.getCommandExecutor());
codecProvider = configCopy.getCodecProvider();
resolverProvider = configCopy.getResolverProvider();
codecProvider = configCopy.getReferenceCodecProvider();
}
public EvictionScheduler getEvictionScheduler() {
@ -154,7 +151,7 @@ public class Redisson implements RedissonClient {
*/
public static RedissonClient create(Config config) {
Redisson redisson = new Redisson(config);
if (config.isRedissonReferenceEnabled()) {
if (config.isReferenceEnabled()) {
redisson.enableRedissonReferenceSupport();
}
return redisson;
@ -182,7 +179,7 @@ public class Redisson implements RedissonClient {
*/
public static RedissonReactiveClient createReactive(Config config) {
RedissonReactive react = new RedissonReactive(config);
if (config.isRedissonReferenceEnabled()) {
if (config.isReferenceEnabled()) {
react.enableRedissonReferenceSupport();
}
return react;
@ -566,7 +563,7 @@ public class Redisson implements RedissonClient {
@Override
public RBatch createBatch() {
RedissonBatch batch = new RedissonBatch(id, evictionScheduler, connectionManager);
if (config.isRedissonReferenceEnabled()) {
if (config.isReferenceEnabled()) {
batch.enableRedissonReferenceSupport(this);
}
return batch;
@ -574,7 +571,7 @@ public class Redisson implements RedissonClient {
@Override
public RLiveObjectService getLiveObjectService() {
return new RedissonLiveObjectService(this, liveObjectClassCache, codecProvider, resolverProvider);
return new RedissonLiveObjectService(this, liveObjectClassCache, codecProvider);
}
@Override
@ -594,15 +591,10 @@ public class Redisson implements RedissonClient {
}
@Override
public CodecProvider getCodecProvider() {
public ReferenceCodecProvider getCodecProvider() {
return codecProvider;
}
@Override
public ResolverProvider getResolverProvider() {
return resolverProvider;
}
@Override
public NodesGroup<Node> getNodesGroup() {
return new RedisNodes<Node>(connectionManager);

@ -15,12 +15,12 @@
*/
package org.redisson;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import org.redisson.client.RedisClient;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
@ -38,7 +38,7 @@ abstract class RedissonBaseIterator<V> implements Iterator<V> {
private List<ByteBuf> lastValues;
private Iterator<ScanObjectEntry> lastIter;
protected long nextIterPos;
protected InetSocketAddress client;
protected RedisClient client;
private boolean finished;
private boolean currentElementRemoved;
@ -145,7 +145,7 @@ abstract class RedissonBaseIterator<V> implements Iterator<V> {
return false;
}
abstract ListScanResult<ScanObjectEntry> iterator(InetSocketAddress client, long nextIterPos);
abstract ListScanResult<ScanObjectEntry> iterator(RedisClient client, long nextIterPos);
@Override
public V next() {

@ -15,7 +15,6 @@
*/
package org.redisson;
import java.net.InetSocketAddress;
import java.util.AbstractMap;
import java.util.HashMap;
import java.util.Iterator;
@ -23,6 +22,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
import org.redisson.client.RedisClient;
import org.redisson.client.protocol.decoder.MapScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
@ -42,7 +42,7 @@ public abstract class RedissonBaseMapIterator<K, V, M> implements Iterator<M> {
private Map<ByteBuf, ByteBuf> lastValues;
private Iterator<Map.Entry<ScanObjectEntry, ScanObjectEntry>> lastIter;
protected long nextIterPos;
protected InetSocketAddress client;
protected RedisClient client;
private boolean finished;
private boolean currentElementRemoved;

@ -15,7 +15,6 @@
*/
package org.redisson;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@ -31,6 +30,7 @@ import org.redisson.api.RFuture;
import org.redisson.api.RKeys;
import org.redisson.api.RObject;
import org.redisson.api.RType;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisException;
import org.redisson.client.codec.ScanCodec;
import org.redisson.client.codec.StringCodec;
@ -107,7 +107,7 @@ public class RedissonKeys implements RKeys {
return getKeysByPattern(null);
}
private ListScanResult<ScanObjectEntry> scanIterator(InetSocketAddress client, MasterSlaveEntry entry, long startPos, String pattern, int count) {
private ListScanResult<ScanObjectEntry> scanIterator(RedisClient client, MasterSlaveEntry entry, long startPos, String pattern, int count) {
if (pattern == null) {
RFuture<ListScanResult<ScanObjectEntry>> f = commandExecutor.readAsync(client, entry, new ScanCodec(StringCodec.INSTANCE), RedisCommands.SCAN, startPos, "COUNT", count);
return commandExecutor.get(f);
@ -120,7 +120,7 @@ public class RedissonKeys implements RKeys {
return new RedissonBaseIterator<String>() {
@Override
ListScanResult<ScanObjectEntry> iterator(InetSocketAddress client, long nextIterPos) {
ListScanResult<ScanObjectEntry> iterator(RedisClient client, long nextIterPos) {
return RedissonKeys.this.scanIterator(client, entry, nextIterPos, pattern, count);
}

@ -15,6 +15,7 @@
*/
package org.redisson;
import java.lang.annotation.Annotation;
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.util.ArrayList;
@ -53,7 +54,7 @@ import org.redisson.api.annotation.RCascade;
import org.redisson.api.annotation.REntity;
import org.redisson.api.annotation.RFieldAccessor;
import org.redisson.api.annotation.RId;
import org.redisson.codec.CodecProvider;
import org.redisson.codec.ReferenceCodecProvider;
import org.redisson.liveobject.LiveObjectTemplate;
import org.redisson.liveobject.core.AccessorInterceptor;
import org.redisson.liveobject.core.FieldAccessorInterceptor;
@ -65,9 +66,9 @@ import org.redisson.liveobject.core.RedissonObjectBuilder;
import org.redisson.liveobject.misc.AdvBeanCopy;
import org.redisson.liveobject.misc.ClassUtils;
import org.redisson.liveobject.misc.Introspectior;
import org.redisson.liveobject.provider.ResolverProvider;
import org.redisson.liveobject.resolver.Resolver;
import io.netty.util.internal.PlatformDependent;
import jodd.bean.BeanCopy;
import jodd.bean.BeanUtil;
import net.bytebuddy.ByteBuddy;
@ -81,17 +82,16 @@ import net.bytebuddy.matcher.ElementMatchers;
public class RedissonLiveObjectService implements RLiveObjectService {
private static final ConcurrentMap<Class<? extends Resolver>, Resolver<?, ?, ?>> providerCache = PlatformDependent.newConcurrentHashMap();
private final ConcurrentMap<Class<?>, Class<?>> classCache;
private final RedissonClient redisson;
private final CodecProvider codecProvider;
private final ResolverProvider resolverProvider;
private final ReferenceCodecProvider codecProvider;
private final RedissonObjectBuilder objectBuilder;
public RedissonLiveObjectService(RedissonClient redisson, ConcurrentMap<Class<?>, Class<?>> classCache, CodecProvider codecProvider, ResolverProvider resolverProvider) {
public RedissonLiveObjectService(RedissonClient redisson, ConcurrentMap<Class<?>, Class<?>> classCache, ReferenceCodecProvider codecProvider) {
this.redisson = redisson;
this.classCache = classCache;
this.codecProvider = codecProvider;
this.resolverProvider = resolverProvider;
this.objectBuilder = new RedissonObjectBuilder(redisson, codecProvider);
}
@ -114,12 +114,22 @@ public class RedissonLiveObjectService implements RLiveObjectService {
String idFieldName = getRIdFieldName(entityClass);
RId annotation = ClassUtils.getDeclaredField(entityClass, idFieldName)
.getAnnotation(RId.class);
Resolver resolver = resolverProvider.getResolver(entityClass,
annotation.generator(), annotation);
Resolver resolver = getResolver(entityClass, annotation.generator(), annotation);
Object id = resolver.resolve(entityClass, annotation, idFieldName, redisson);
return id;
}
private Resolver<?, ?, ?> getResolver(Class<?> cls, Class<? extends Resolver> resolverClass, Annotation anno) {
if (!providerCache.containsKey(resolverClass)) {
try {
providerCache.putIfAbsent(resolverClass, resolverClass.newInstance());
} catch (Exception ex) {
throw new RuntimeException(ex);
}
}
return providerCache.get(resolverClass);
}
@Override
public <T, K> T get(Class<T> entityClass, K id) {
try {

@ -16,7 +16,6 @@
package org.redisson;
import java.math.BigDecimal;
import java.net.InetSocketAddress;
import java.util.AbstractCollection;
import java.util.AbstractSet;
import java.util.ArrayList;
@ -40,6 +39,7 @@ import org.redisson.api.RMap;
import org.redisson.api.RReadWriteLock;
import org.redisson.api.RedissonClient;
import org.redisson.api.mapreduce.RMapReduce;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.MapScanCodec;
@ -949,7 +949,7 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
return get(fastRemoveAsync(keys));
}
MapScanResult<ScanObjectEntry, ScanObjectEntry> scanIterator(String name, InetSocketAddress client, long startPos, String pattern) {
MapScanResult<ScanObjectEntry, ScanObjectEntry> scanIterator(String name, RedisClient client, long startPos, String pattern) {
if (pattern == null) {
RFuture<MapScanResult<ScanObjectEntry, ScanObjectEntry>> f
= commandExecutor.readAsync(client, name, new MapScanCodec(codec), RedisCommands.HSCAN, name, startPos);

@ -37,6 +37,7 @@ import org.redisson.api.map.event.EntryExpiredListener;
import org.redisson.api.map.event.EntryRemovedListener;
import org.redisson.api.map.event.EntryUpdatedListener;
import org.redisson.api.map.event.MapEntryListener;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.MapScanCodec;
@ -1169,11 +1170,11 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
}
@Override
MapScanResult<ScanObjectEntry, ScanObjectEntry> scanIterator(String name, InetSocketAddress client, long startPos, String pattern) {
MapScanResult<ScanObjectEntry, ScanObjectEntry> scanIterator(String name, RedisClient client, long startPos, String pattern) {
return get(scanIteratorAsync(name, client, startPos, pattern));
}
public RFuture<MapScanResult<ScanObjectEntry, ScanObjectEntry>> scanIteratorAsync(final String name, InetSocketAddress client, long startPos, String pattern) {
public RFuture<MapScanResult<ScanObjectEntry, ScanObjectEntry>> scanIteratorAsync(final String name, RedisClient client, long startPos, String pattern) {
List<Object> params = new ArrayList<Object>();
params.add(System.currentTimeMillis());
params.add(startPos);

@ -23,6 +23,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.decoder.MapScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
@ -40,7 +41,7 @@ abstract class RedissonMultiMapIterator<K, V, M> implements Iterator<M> {
private Iterator<V> valuesIter;
protected long valuesIterPos = 0;
protected InetSocketAddress client;
protected RedisClient client;
private boolean finished;
private boolean removeExecuted;

@ -33,6 +33,7 @@ import org.redisson.api.RFuture;
import org.redisson.api.RLock;
import org.redisson.api.RMultimap;
import org.redisson.api.RReadWriteLock;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.MapScanCodec;
@ -298,7 +299,7 @@ public abstract class RedissonMultimap<K, V> extends RedissonExpirable implement
}
MapScanResult<ScanObjectEntry, ScanObjectEntry> scanIterator(InetSocketAddress client, long startPos) {
MapScanResult<ScanObjectEntry, ScanObjectEntry> scanIterator(RedisClient client, long startPos) {
RFuture<MapScanResult<ScanObjectEntry, ScanObjectEntry>> f = commandExecutor.readAsync(client, getName(), new MapScanCodec(codec, StringCodec.INSTANCE), RedisCommands.HSCAN, getName(), startPos);
return get(f);
}

@ -52,7 +52,7 @@ import org.redisson.api.RTopicReactive;
import org.redisson.api.RedissonReactiveClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.codec.CodecProvider;
import org.redisson.codec.ReferenceCodecProvider;
import org.redisson.command.CommandReactiveService;
import org.redisson.config.Config;
import org.redisson.config.ConfigSupport;
@ -97,7 +97,7 @@ public class RedissonReactive implements RedissonReactiveClient {
protected final CommandReactiveService commandExecutor;
protected final ConnectionManager connectionManager;
protected final Config config;
protected final CodecProvider codecProvider;
protected final ReferenceCodecProvider codecProvider;
protected final UUID id = UUID.randomUUID();
protected final SemaphorePubSub semaphorePubSub = new SemaphorePubSub();
@ -109,7 +109,7 @@ public class RedissonReactive implements RedissonReactiveClient {
connectionManager = ConfigSupport.createConnectionManager(configCopy);
commandExecutor = new CommandReactiveService(connectionManager);
evictionScheduler = new EvictionScheduler(commandExecutor);
codecProvider = config.getCodecProvider();
codecProvider = config.getReferenceCodecProvider();
}
@Override
@ -316,7 +316,7 @@ public class RedissonReactive implements RedissonReactiveClient {
@Override
public RBatchReactive createBatch() {
RedissonBatchReactive batch = new RedissonBatchReactive(evictionScheduler, connectionManager, commandExecutor);
if (config.isRedissonReferenceEnabled()) {
if (config.isReferenceEnabled()) {
batch.enableRedissonReferenceSupport(this);
}
return batch;
@ -333,7 +333,7 @@ public class RedissonReactive implements RedissonReactiveClient {
}
@Override
public CodecProvider getCodecProvider() {
public ReferenceCodecProvider getCodecProvider() {
return codecProvider;
}

@ -16,7 +16,6 @@
package org.redisson;
import java.math.BigDecimal;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@ -32,16 +31,14 @@ import org.redisson.api.RScoredSortedSet;
import org.redisson.api.RedissonClient;
import org.redisson.api.SortOrder;
import org.redisson.api.mapreduce.RCollectionMapReduce;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.DoubleCodec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.ScanCodec;
import org.redisson.client.codec.ScoredCodec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.ScoredEntry;
import org.redisson.client.protocol.convertor.BooleanReplayConvertor;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.command.CommandAsyncExecutor;
@ -315,7 +312,7 @@ public class RedissonScoredSortedSet<V> extends RedissonExpirable implements RSc
return commandExecutor.readAsync(getName(), codec, RedisCommands.ZRANK_INT, getName(), encode(o));
}
private ListScanResult<ScanObjectEntry> scanIterator(InetSocketAddress client, long startPos) {
private ListScanResult<ScanObjectEntry> scanIterator(RedisClient client, long startPos) {
RFuture<ListScanResult<ScanObjectEntry>> f = commandExecutor.readAsync(client, getName(), new ScanCodec(codec), RedisCommands.ZSCAN, getName(), startPos);
return get(f);
}
@ -325,7 +322,7 @@ public class RedissonScoredSortedSet<V> extends RedissonExpirable implements RSc
return new RedissonBaseIterator<V>() {
@Override
ListScanResult<ScanObjectEntry> iterator(InetSocketAddress client, long nextIterPos) {
ListScanResult<ScanObjectEntry> iterator(RedisClient client, long nextIterPos) {
return scanIterator(client, nextIterPos);
}

@ -29,6 +29,7 @@ import org.redisson.api.RSet;
import org.redisson.api.RedissonClient;
import org.redisson.api.SortOrder;
import org.redisson.api.mapreduce.RCollectionMapReduce;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.ScanCodec;
import org.redisson.client.protocol.RedisCommands;
@ -89,7 +90,7 @@ public class RedissonSet<V> extends RedissonExpirable implements RSet<V>, ScanIt
}
@Override
public ListScanResult<ScanObjectEntry> scanIterator(String name, InetSocketAddress client, long startPos, String pattern) {
public ListScanResult<ScanObjectEntry> scanIterator(String name, RedisClient client, long startPos, String pattern) {
if (pattern == null) {
RFuture<ListScanResult<ScanObjectEntry>> f = commandExecutor.readAsync(client, name, new ScanCodec(codec), RedisCommands.SSCAN, name, startPos);
return get(f);
@ -104,7 +105,7 @@ public class RedissonSet<V> extends RedissonExpirable implements RSet<V>, ScanIt
return new RedissonBaseIterator<V>() {
@Override
ListScanResult<ScanObjectEntry> iterator(InetSocketAddress client, long nextIterPos) {
ListScanResult<ScanObjectEntry> iterator(RedisClient client, long nextIterPos) {
return scanIterator(getName(), client, nextIterPos, pattern);
}

@ -29,6 +29,7 @@ import org.redisson.api.RFuture;
import org.redisson.api.RSetCache;
import org.redisson.api.RedissonClient;
import org.redisson.api.mapreduce.RCollectionMapReduce;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.ScanCodec;
import org.redisson.client.protocol.RedisCommand;
@ -123,12 +124,13 @@ public class RedissonSetCache<V> extends RedissonExpirable implements RSetCache<
Arrays.<Object>asList(getName(o)), System.currentTimeMillis(), encode(o));
}
public ListScanResult<ScanObjectEntry> scanIterator(String name, InetSocketAddress client, long startPos, String pattern) {
@Override
public ListScanResult<ScanObjectEntry> scanIterator(String name, RedisClient client, long startPos, String pattern) {
RFuture<ListScanResult<ScanObjectEntry>> f = scanIteratorAsync(name, client, startPos, pattern);
return get(f);
}
public RFuture<ListScanResult<ScanObjectEntry>> scanIteratorAsync(String name, InetSocketAddress client, long startPos, String pattern) {
public RFuture<ListScanResult<ScanObjectEntry>> scanIteratorAsync(String name, RedisClient client, long startPos, String pattern) {
List<Object> params = new ArrayList<Object>();
params.add(startPos);
params.add(System.currentTimeMillis());
@ -160,7 +162,7 @@ public class RedissonSetCache<V> extends RedissonExpirable implements RSetCache<
return new RedissonBaseIterator<V>() {
@Override
ListScanResult<ScanObjectEntry> iterator(InetSocketAddress client, long nextIterPos) {
ListScanResult<ScanObjectEntry> iterator(RedisClient client, long nextIterPos) {
return scanIterator(getName(), client, nextIterPos, pattern);
}

@ -15,7 +15,6 @@
*/
package org.redisson;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@ -28,6 +27,7 @@ import org.redisson.api.RFuture;
import org.redisson.api.RSet;
import org.redisson.api.SortOrder;
import org.redisson.api.mapreduce.RCollectionMapReduce;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.MapScanCodec;
import org.redisson.client.protocol.RedisCommand;
@ -165,7 +165,7 @@ public class RedissonSetMultimapValues<V> extends RedissonExpirable implements R
System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(o));
}
private ListScanResult<ScanObjectEntry> scanIterator(InetSocketAddress client, long startPos, String pattern) {
private ListScanResult<ScanObjectEntry> scanIterator(RedisClient client, long startPos, String pattern) {
List<Object> params = new ArrayList<Object>();
params.add(System.currentTimeMillis());
params.add(startPos);
@ -201,7 +201,7 @@ public class RedissonSetMultimapValues<V> extends RedissonExpirable implements R
return new RedissonBaseIterator<V>() {
@Override
ListScanResult<ScanObjectEntry> iterator(InetSocketAddress client, long nextIterPos) {
ListScanResult<ScanObjectEntry> iterator(RedisClient client, long nextIterPos) {
return scanIterator(client, nextIterPos, pattern);
}

@ -15,8 +15,7 @@
*/
package org.redisson;
import java.net.InetSocketAddress;
import org.redisson.client.RedisClient;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
@ -27,7 +26,7 @@ import org.redisson.client.protocol.decoder.ScanObjectEntry;
*/
public interface ScanIterator {
ListScanResult<ScanObjectEntry> scanIterator(String name, InetSocketAddress client, long startPos, String pattern);
ListScanResult<ScanObjectEntry> scanIterator(String name, RedisClient client, long startPos, String pattern);
boolean remove(Object value);

@ -18,9 +18,8 @@ package org.redisson.api;
import java.util.concurrent.TimeUnit;
import org.redisson.client.codec.Codec;
import org.redisson.codec.CodecProvider;
import org.redisson.codec.ReferenceCodecProvider;
import org.redisson.config.Config;
import org.redisson.liveobject.provider.ResolverProvider;
/**
* Main Redisson interface for access
@ -902,15 +901,7 @@ public interface RedissonClient {
*
* @return CodecProvider object
*/
public CodecProvider getCodecProvider();
/**
* Returns the ResolverProvider instance
*
* @return ResolverProvider object
*/
public ResolverProvider getResolverProvider();
public ReferenceCodecProvider getCodecProvider();
/**
* Get Redis nodes group for server operations

@ -18,7 +18,7 @@ package org.redisson.api;
import java.util.List;
import org.redisson.client.codec.Codec;
import org.redisson.codec.CodecProvider;
import org.redisson.codec.ReferenceCodecProvider;
import org.redisson.config.Config;
/**
@ -521,7 +521,7 @@ public interface RedissonReactiveClient {
*
* @return CodecProvider object
*/
CodecProvider getCodecProvider();
ReferenceCodecProvider getCodecProvider();
/**
* Get Redis nodes group for server operations

@ -20,13 +20,13 @@ import java.net.URI;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.redisson.api.RFuture;
import org.redisson.client.handler.RedisChannelInitializer;
import org.redisson.client.handler.RedisChannelInitializer.Type;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.redisson.misc.URIBuilder;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
@ -34,12 +34,16 @@ import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollDatagramChannel;
import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.ChannelGroupFuture;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.resolver.AddressResolver;
import io.netty.resolver.dns.DnsAddressResolverGroup;
import io.netty.resolver.dns.DnsServerAddressStreamProviders;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timer;
import io.netty.util.concurrent.Future;
@ -53,9 +57,11 @@ import io.netty.util.concurrent.FutureListener;
*/
public class RedisClient {
private final AtomicReference<RFuture<InetSocketAddress>> resolveFuture = new AtomicReference<RFuture<InetSocketAddress>>();
private final Bootstrap bootstrap;
private final Bootstrap pubSubBootstrap;
private final InetSocketAddress addr;
private final URI addr;
private InetSocketAddress resolvedAddr;
private final ChannelGroup channels;
private ExecutorService executor;
@ -66,6 +72,7 @@ public class RedisClient {
private boolean hasOwnTimer;
private boolean hasOwnExecutor;
private boolean hasOwnGroup;
private boolean hasOwnResolver;
public static RedisClient create(RedisClientConfig config) {
return new RedisClient(config);
@ -85,12 +92,20 @@ public class RedisClient {
copy.setExecutor(Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2));
hasOwnExecutor = true;
}
if (copy.getResolverGroup() == null) {
if (config.getSocketChannelClass() == EpollSocketChannel.class) {
copy.setResolverGroup(new DnsAddressResolverGroup(EpollDatagramChannel.class, DnsServerAddressStreamProviders.platformDefault()));
} else {
copy.setResolverGroup(new DnsAddressResolverGroup(NioDatagramChannel.class, DnsServerAddressStreamProviders.platformDefault()));
}
hasOwnResolver = true;
}
this.config = copy;
this.executor = copy.getExecutor();
this.timer = copy.getTimer();
addr = new InetSocketAddress(copy.getAddress().getHost(), copy.getAddress().getPort());
addr = copy.getAddress();
channels = new DefaultChannelGroup(copy.getGroup().next());
bootstrap = createBootstrap(copy, Type.PLAIN);
@ -101,9 +116,9 @@ public class RedisClient {
private Bootstrap createBootstrap(RedisClientConfig config, Type type) {
Bootstrap bootstrap = new Bootstrap()
.resolver(config.getResolverGroup())
.channel(config.getSocketChannelClass())
.group(config.getGroup())
.remoteAddress(addr);
.group(config.getGroup());
bootstrap.handler(new RedisChannelInitializer(bootstrap, config, this, channels, type));
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, config.getConnectTimeout());
@ -112,92 +127,8 @@ public class RedisClient {
return bootstrap;
}
/*
* Use {@link #create(RedisClientConfig)}
*
*/
@Deprecated
public RedisClient(String address) {
this(URIBuilder.create(address));
}
/*
* Use {@link #create(RedisClientConfig)}
*
*/
@Deprecated
public RedisClient(URI address) {
this(new HashedWheelTimer(), Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2), new NioEventLoopGroup(), address);
hasOwnGroup = true;
}
/*
* Use {@link #create(RedisClientConfig)}
*
*/
@Deprecated
public RedisClient(Timer timer, ExecutorService executor, EventLoopGroup group, URI address) {
this(timer, executor, group, address.getHost(), address.getPort());
}
/*
* Use {@link #create(RedisClientConfig)}
*
*/
@Deprecated
public RedisClient(String host, int port) {
this(new HashedWheelTimer(), Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2), new NioEventLoopGroup(), NioSocketChannel.class, host, port, 10000, 10000);
hasOwnGroup = true;
}
/*
* Use {@link #create(RedisClientConfig)}
*
*/
@Deprecated
public RedisClient(Timer timer, ExecutorService executor, EventLoopGroup group, String host, int port) {
this(timer, executor, group, NioSocketChannel.class, host, port, 10000, 10000);
}
/*
* Use {@link #create(RedisClientConfig)}
*
*/
@Deprecated
public RedisClient(String host, int port, int connectTimeout, int commandTimeout) {
this(new HashedWheelTimer(), Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2), new NioEventLoopGroup(), NioSocketChannel.class, host, port, connectTimeout, commandTimeout);
}
/*
* Use {@link #create(RedisClientConfig)}
*
*/
@Deprecated
public RedisClient(final Timer timer, ExecutorService executor, EventLoopGroup group, Class<? extends SocketChannel> socketChannelClass, String host, int port,
int connectTimeout, int commandTimeout) {
RedisClientConfig config = new RedisClientConfig();
config.setTimer(timer).setExecutor(executor).setGroup(group).setSocketChannelClass(socketChannelClass)
.setAddress(host, port).setConnectTimeout(connectTimeout).setCommandTimeout(commandTimeout);
this.config = config;
this.executor = config.getExecutor();
this.timer = config.getTimer();
addr = new InetSocketAddress(config.getAddress().getHost(), config.getAddress().getPort());
channels = new DefaultChannelGroup(config.getGroup().next());
bootstrap = createBootstrap(config, Type.PLAIN);
pubSubBootstrap = createBootstrap(config, Type.PUBSUB);
this.commandTimeout = config.getCommandTimeout();
}
public String getIpAddr() {
return addr.getAddress().getHostAddress() + ":" + addr.getPort();
}
public InetSocketAddress getAddr() {
return addr;
return resolvedAddr;
}
public long getCommandTimeout() {
@ -220,9 +151,42 @@ public class RedisClient {
}
}
public RFuture<InetSocketAddress> resolveAddr() {
final RPromise<InetSocketAddress> promise = new RedissonPromise<InetSocketAddress>();
if (!resolveFuture.compareAndSet(null, promise)) {
return resolveFuture.get();
}
AddressResolver<InetSocketAddress> resolver = (AddressResolver<InetSocketAddress>) bootstrap.config().resolver().getResolver(bootstrap.config().group().next());
Future<InetSocketAddress> resolveFuture = resolver.resolve(InetSocketAddress.createUnresolved(addr.getHost(), addr.getPort()));
resolveFuture.addListener(new FutureListener<InetSocketAddress>() {
@Override
public void operationComplete(Future<InetSocketAddress> future) throws Exception {
if (!future.isSuccess()) {
promise.tryFailure(future.cause());
return;
}
resolvedAddr = future.getNow();
promise.trySuccess(future.getNow());
}
});
return promise;
}
public RFuture<RedisConnection> connectAsync() {
final RPromise<RedisConnection> f = new RedissonPromise<RedisConnection>();
ChannelFuture channelFuture = bootstrap.connect();
RFuture<InetSocketAddress> addrFuture = resolveAddr();
addrFuture.addListener(new FutureListener<InetSocketAddress>() {
@Override
public void operationComplete(Future<InetSocketAddress> future) throws Exception {
if (!future.isSuccess()) {
f.tryFailure(future.cause());
return;
}
ChannelFuture channelFuture = bootstrap.connect(future.getNow());
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(final ChannelFuture future) throws Exception {
@ -255,6 +219,9 @@ public class RedisClient {
}
}
});
}
});
return f;
}
@ -268,7 +235,17 @@ public class RedisClient {
public RFuture<RedisPubSubConnection> connectPubSubAsync() {
final RPromise<RedisPubSubConnection> f = new RedissonPromise<RedisPubSubConnection>();
ChannelFuture channelFuture = pubSubBootstrap.connect();
RFuture<InetSocketAddress> nameFuture = resolveAddr();
nameFuture.addListener(new FutureListener<InetSocketAddress>() {
@Override
public void operationComplete(Future<InetSocketAddress> future) throws Exception {
if (!future.isSuccess()) {
f.tryFailure(future.cause());
return;
}
ChannelFuture channelFuture = pubSubBootstrap.connect(future.getNow());
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(final ChannelFuture future) throws Exception {
@ -277,7 +254,7 @@ public class RedisClient {
c.<RedisPubSubConnection>getConnectionPromise().addListener(new FutureListener<RedisPubSubConnection>() {
@Override
public void operationComplete(final Future<RedisPubSubConnection> future) throws Exception {
bootstrap.config().group().execute(new Runnable() {
pubSubBootstrap.config().group().execute(new Runnable() {
@Override
public void run() {
if (future.isSuccess()) {
@ -293,7 +270,7 @@ public class RedisClient {
}
});
} else {
bootstrap.config().group().execute(new Runnable() {
pubSubBootstrap.config().group().execute(new Runnable() {
public void run() {
f.tryFailure(future.cause());
}
@ -301,6 +278,9 @@ public class RedisClient {
}
}
});
}
});
return f;
}
@ -339,6 +319,9 @@ public class RedisClient {
executor.awaitTermination(15, TimeUnit.SECONDS);
}
if (hasOwnResolver) {
bootstrap.config().resolver().close();
}
if (hasOwnGroup) {
bootstrap.config().group().shutdownGracefully();
}

@ -15,6 +15,7 @@
*/
package org.redisson.client;
import java.net.InetAddress;
import java.net.URI;
import java.util.concurrent.ExecutorService;
@ -24,6 +25,7 @@ import org.redisson.misc.URIBuilder;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.resolver.dns.DnsAddressResolverGroup;
import io.netty.util.Timer;
/**
@ -34,10 +36,12 @@ import io.netty.util.Timer;
public class RedisClientConfig {
private URI address;
private InetAddress addr;
private Timer timer;
private ExecutorService executor;
private EventLoopGroup group;
private DnsAddressResolverGroup resolverGroup;
private Class<? extends SocketChannel> socketChannelClass = NioSocketChannel.class;
private int connectTimeout = 10000;
private int commandTimeout = 10000;
@ -84,6 +88,7 @@ public class RedisClientConfig {
this.sslTruststorePassword = config.sslTruststorePassword;
this.sslKeystore = config.sslKeystore;
this.sslKeystorePassword = config.sslKeystorePassword;
this.resolverGroup = config.resolverGroup;
}
public RedisClientConfig setAddress(String host, int port) {
@ -94,6 +99,11 @@ public class RedisClientConfig {
this.address = URIBuilder.create(address);
return this;
}
public RedisClientConfig setAddress(InetAddress addr, URI address) {
this.addr = addr;
this.address = address;
return this;
}
public RedisClientConfig setAddress(URI address) {
this.address = address;
return this;
@ -101,7 +111,9 @@ public class RedisClientConfig {
public URI getAddress() {
return address;
}
public InetAddress getAddr() {
return addr;
}
public Timer getTimer() {
return timer;
@ -263,4 +275,14 @@ public class RedisClientConfig {
return this;
}
public DnsAddressResolverGroup getResolverGroup() {
return resolverGroup;
}
public RedisClientConfig setResolverGroup(DnsAddressResolverGroup resolverGroup) {
this.resolverGroup = resolverGroup;
return this;
}
}

@ -15,8 +15,8 @@
*/
package org.redisson.client;
import java.net.InetSocketAddress;
import java.net.URI;
import org.redisson.misc.URIBuilder;
/**
@ -44,8 +44,4 @@ public class RedisRedirectException extends RedisException {
return url;
}
public InetSocketAddress getAddr() {
return new InetSocketAddress(url.getHost(), url.getPort());
}
}

@ -15,6 +15,7 @@
*/
package org.redisson.client.handler;
import java.net.SocketAddress;
import java.util.Map.Entry;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
@ -109,7 +110,7 @@ public class ConnectionWatchdog extends ChannelInboundHandlerAdapter {
log.debug("reconnecting {} to {} ", connection, connection.getRedisClient().getAddr(), connection);
try {
bootstrap.connect().addListener(new ChannelFutureListener() {
bootstrap.connect(connection.getRedisClient().getAddr()).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(final ChannelFuture future) throws Exception {

@ -19,6 +19,7 @@ import java.net.InetSocketAddress;
import java.util.List;
import org.redisson.RedisClientResult;
import org.redisson.client.RedisClient;
/**
*
@ -30,7 +31,7 @@ public class ListScanResult<V> implements RedisClientResult {
private final Long pos;
private final List<V> values;
private InetSocketAddress addr;
private RedisClient client;
public ListScanResult(Long pos, List<V> values) {
this.pos = pos;
@ -46,12 +47,12 @@ public class ListScanResult<V> implements RedisClientResult {
}
@Override
public void setRedisClient(InetSocketAddress addr) {
this.addr = addr;
public void setRedisClient(RedisClient client) {
this.client = client;
}
public InetSocketAddress getRedisClient() {
return addr;
public RedisClient getRedisClient() {
return client;
}
}

@ -19,6 +19,7 @@ import java.net.InetSocketAddress;
import java.util.Map;
import org.redisson.RedisClientResult;
import org.redisson.client.RedisClient;
/**
*
@ -31,7 +32,7 @@ public class MapScanResult<K, V> implements RedisClientResult {
private final Long pos;
private final Map<K, V> values;
private InetSocketAddress client;
private RedisClient client;
public MapScanResult(Long pos, Map<K, V> values) {
super();
@ -48,11 +49,11 @@ public class MapScanResult<K, V> implements RedisClientResult {
}
@Override
public void setRedisClient(InetSocketAddress client) {
public void setRedisClient(RedisClient client) {
this.client = client;
}
public InetSocketAddress getRedisClient() {
public RedisClient getRedisClient() {
return client;
}

@ -52,6 +52,7 @@ import org.redisson.connection.MasterSlaveConnectionManager;
import org.redisson.connection.MasterSlaveEntry;
import org.redisson.connection.SingleEntry;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -111,7 +112,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
List<RFuture<Collection<RFuture<Void>>>> futures = new ArrayList<RFuture<Collection<RFuture<Void>>>>();
for (ClusterPartition partition : partitions) {
if (partition.isMasterFail()) {
failedMasters.add(partition.getMasterAddr().toString());
failedMasters.add(partition.getMasterAddress().toString());
continue;
}
RFuture<Collection<RFuture<Void>>> masterFuture = addMasterEntry(partition, cfg);
@ -271,12 +272,12 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
}
}
RFuture<Void> f = e.setupMasterEntry(config.getMasterAddress());
final RPromise<Void> initFuture = newPromise();
RFuture<RedisClient> f = e.setupMasterEntry(config.getMasterAddress());
final RPromise<Void> initFuture = new RedissonPromise<Void>();
futures.add(initFuture);
f.addListener(new FutureListener<Void>() {
f.addListener(new FutureListener<RedisClient>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
public void operationComplete(Future<RedisClient> future) throws Exception {
if (!future.isSuccess()) {
log.error("Can't add master: {} for slot ranges: {}", partition.getMasterAddress(), partition.getSlotRanges());
initFuture.tryFailure(future.cause());
@ -393,8 +394,8 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
masterFuture.addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
checkSlotsMigration(newPartitions, nodesValue.toString());
checkSlotsChange(cfg, newPartitions, nodesValue.toString());
checkSlotsMigration(newPartitions);
checkSlotsChange(cfg, newPartitions);
getShutdownLatch().release();
scheduleClusterChangeCheck(cfg, null);
}
@ -410,7 +411,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
continue;
}
MasterSlaveEntry entry = getEntry(currentPart.getMasterAddr());
MasterSlaveEntry entry = getEntry(currentPart.getSlots().iterator().next());
// should be invoked first in order to remove stale failedSlaveAddresses
Set<URI> addedSlaves = addRemoveSlaves(entry, currentPart, newPart);
// Do some slaves have changed state from failed to alive?
@ -562,7 +563,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
return result;
}
private void checkSlotsChange(ClusterServersConfig cfg, Collection<ClusterPartition> newPartitions, String nodes) {
private void checkSlotsChange(ClusterServersConfig cfg, Collection<ClusterPartition> newPartitions) {
Collection<Integer> newPartitionsSlots = slots(newPartitions);
if (newPartitionsSlots.size() == lastPartitions.size() && lastPartitions.size() == MAX_SLOT) {
return;
@ -576,8 +577,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
}
for (Integer slot : removedSlots) {
MasterSlaveEntry entry = getEntry(slot);
removeMaster(slot);
MasterSlaveEntry entry = removeEntry(slot);
if (entry.getSlotRanges().isEmpty()) {
entry.shutdownMasterAsync();
log.info("{} master and slaves for it removed", entry.getClient().getAddr());
@ -592,50 +592,56 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
}
for (final Integer slot : addedSlots) {
ClusterPartition partition = find(newPartitions, slot);
MasterSlaveEntry entry = getEntry(partition.getMasterAddr());
if (entry != null && entry.getClient().getAddr().equals(partition.getMasterAddr())) {
Set<Integer> oldSlots = new HashSet<Integer>(partition.getSlots());
oldSlots.removeAll(addedSlots);
if (oldSlots.isEmpty()) {
continue;
}
MasterSlaveEntry entry = getEntry(oldSlots.iterator().next());
if (entry != null) {
addEntry(slot, entry);
lastPartitions.put(slot, partition);
break;
}
}
}
private void checkSlotsMigration(Collection<ClusterPartition> newPartitions, String nodes) {
private void checkSlotsMigration(Collection<ClusterPartition> newPartitions) {
Set<ClusterPartition> currentPartitions = getLastPartitions();
for (ClusterPartition currentPartition : currentPartitions) {
for (ClusterPartition newPartition : newPartitions) {
if (!currentPartition.getNodeId().equals(newPartition.getNodeId())
// skip master change case
|| !currentPartition.getMasterAddr().equals(newPartition.getMasterAddr())) {
|| !currentPartition.getMasterAddress().equals(newPartition.getMasterAddress())) {
continue;
}
MasterSlaveEntry entry = getEntry(currentPartition.getSlots().iterator().next());
Set<Integer> addedSlots = new HashSet<Integer>(newPartition.getSlots());
addedSlots.removeAll(currentPartition.getSlots());
currentPartition.addSlots(addedSlots);
MasterSlaveEntry entry = getEntry(currentPartition.getMasterAddr());
for (Integer slot : addedSlots) {
addEntry(slot, entry);
lastPartitions.put(slot, currentPartition);
}
if (!addedSlots.isEmpty()) {
log.info("{} slots added to {}", addedSlots.size(), currentPartition.getMasterAddr());
log.info("{} slots added to {}", addedSlots.size(), currentPartition.getMasterAddress());
}
Set<Integer> removedSlots = new HashSet<Integer>(currentPartition.getSlots());
removedSlots.removeAll(newPartition.getSlots());
for (Integer removeSlot : removedSlots) {
if (lastPartitions.remove(removeSlot, currentPartition)) {
removeMaster(removeSlot);
removeEntry(removeSlot);
}
}
currentPartition.removeSlots(removedSlots);
if (!removedSlots.isEmpty()) {
log.info("{} slots removed from {}", removedSlots.size(), currentPartition.getMasterAddr());
log.info("{} slots removed from {}", removedSlots.size(), currentPartition.getMasterAddress());
}
break;
}

@ -15,7 +15,6 @@
*/
package org.redisson.cluster;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.Collections;
import java.util.HashSet;
@ -106,10 +105,6 @@ public class ClusterPartition {
return slots;
}
public InetSocketAddress getMasterAddr() {
return new InetSocketAddress(masterAddress.getHost(), masterAddress.getPort());
}
public URI getMasterAddress() {
return masterAddress;
}

@ -27,7 +27,7 @@ import org.redisson.liveobject.misc.ClassUtils;
*
* @author Rui Gu (https://github.com/jackygurui)
*/
public class DefaultCodecProvider implements CodecProvider {
public class DefaultReferenceCodecProvider implements ReferenceCodecProvider {
public transient final ConcurrentMap<Class<? extends Codec>, Codec> codecCache = PlatformDependent.newConcurrentHashMap();

@ -24,7 +24,7 @@ import org.redisson.api.annotation.RObjectField;
*
* @author Rui Gu (https://github.com/jackygurui)
*/
public interface CodecProvider {
public interface ReferenceCodecProvider {
/**
* Get codec instance by its class.

@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit;
import org.redisson.SlotCallback;
import org.redisson.api.RFuture;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisException;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand;
@ -56,11 +57,11 @@ public interface CommandAsyncExecutor {
<T, R> RFuture<R> writeAsync(MasterSlaveEntry entry, Codec codec, RedisCommand<T> command, Object ... params);
<T, R> RFuture<R> writeAsync(Integer slot, Codec codec, RedisCommand<T> command, Object ... params);
<T, R> RFuture<R> readAsync(RedisClient client, MasterSlaveEntry entry, Codec codec, RedisCommand<T> command, Object ... params);
<T, R> RFuture<R> readAsync(InetSocketAddress client, MasterSlaveEntry entry, Codec codec, RedisCommand<T> command, Object ... params);
<T, R> RFuture<R> readAsync(RedisClient client, String name, Codec codec, RedisCommand<T> command, Object ... params);
<T, R> RFuture<R> readAsync(InetSocketAddress client, String key, Codec codec, RedisCommand<T> command, Object ... params);
<T, R> RFuture<R> readAsync(RedisClient client, Codec codec, RedisCommand<T> command, Object ... params);
<T, R> RFuture<R> evalWriteAllAsync(RedisCommand<T> command, SlotCallback<T, R> callback, String script, List<Object> keys, Object ... params);
@ -68,20 +69,16 @@ public interface CommandAsyncExecutor {
<R, T> RFuture<R> readAllAsync(RedisCommand<T> command, SlotCallback<T, R> callback, Object ... params);
<T, R> RFuture<R> evalReadAsync(InetSocketAddress client, String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params);
<T, R> RFuture<R> evalReadAsync(RedisClient client, String name, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params);
<T, R> RFuture<R> evalReadAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params);
<T, R> RFuture<R> evalReadAsync(MasterSlaveEntry entry, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params);
<T, R> RFuture<R> evalReadAsync(Integer slot, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params);
<T, R> RFuture<R> evalWriteAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params);
<T, R> RFuture<R> evalWriteAsync(MasterSlaveEntry entry, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params);
<T, R> RFuture<R> evalWriteAsync(Integer slot, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params);
<T, R> RFuture<R> readAsync(String key, Codec codec, RedisCommand<T> command, Object ... params);
<T, R> RFuture<R> writeAsync(String key, Codec codec, RedisCommand<T> command, Object ... params);
@ -96,9 +93,6 @@ public interface CommandAsyncExecutor {
<T, R> RFuture<R> readAsync(MasterSlaveEntry entry, Codec codec, RedisCommand<T> command, Object ... params);
<T, R> RFuture<R> readAsync(Integer slot, Codec codec, RedisCommand<T> command, Object ... params);
<T, R> RFuture<R> readRandomAsync(RedisCommand<T> command, Object ... params);
}

@ -15,7 +15,6 @@
*/
package org.redisson.command;
import java.net.InetSocketAddress;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
@ -38,6 +37,7 @@ import org.redisson.api.RFuture;
import org.redisson.api.RedissonClient;
import org.redisson.api.RedissonReactiveClient;
import org.redisson.client.RedisAskException;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisException;
import org.redisson.client.RedisLoadingException;
@ -180,20 +180,27 @@ public class CommandAsyncService implements CommandAsyncExecutor {
}
@Override
public <T, R> RFuture<R> readAsync(InetSocketAddress client, MasterSlaveEntry entry, Codec codec, RedisCommand<T> command, Object... params) {
public <T, R> RFuture<R> readAsync(RedisClient client, MasterSlaveEntry entry, Codec codec, RedisCommand<T> command, Object... params) {
RPromise<R> mainPromise = connectionManager.newPromise();
async(true, new NodeSource(entry, client), codec, command, params, mainPromise, 0);
return mainPromise;
}
@Override
public <T, R> RFuture<R> readAsync(InetSocketAddress client, String key, Codec codec, RedisCommand<T> command, Object... params) {
public <T, R> RFuture<R> readAsync(RedisClient client, String name, Codec codec, RedisCommand<T> command, Object... params) {
RPromise<R> mainPromise = connectionManager.newPromise();
int slot = connectionManager.calcSlot(key);
int slot = connectionManager.calcSlot(name);
async(true, new NodeSource(slot, client), codec, command, params, mainPromise, 0);
return mainPromise;
}
@Override
public <T, R> RFuture<R> readAsync(RedisClient client, Codec codec, RedisCommand<T> command, Object... params) {
RPromise<R> mainPromise = connectionManager.newPromise();
async(true, new NodeSource(client), codec, command, params, mainPromise, 0);
return mainPromise;
}
@Override
public <T, R> RFuture<Collection<R>> readAllAsync(RedisCommand<T> command, Object... params) {
final RPromise<Collection<R>> mainPromise = connectionManager.newPromise();
@ -344,12 +351,6 @@ public class CommandAsyncService implements CommandAsyncExecutor {
return mainPromise;
}
public <T, R> RFuture<R> readAsync(Integer slot, Codec codec, RedisCommand<T> command, Object... params) {
RPromise<R> mainPromise = connectionManager.newPromise();
async(true, new NodeSource(slot), codec, command, params, mainPromise, 0);
return mainPromise;
}
@Override
public <T, R> RFuture<R> writeAsync(MasterSlaveEntry entry, Codec codec, RedisCommand<T> command, Object... params) {
RPromise<R> mainPromise = connectionManager.newPromise();
@ -357,13 +358,6 @@ public class CommandAsyncService implements CommandAsyncExecutor {
return mainPromise;
}
@Override
public <T, R> RFuture<R> writeAsync(Integer slot, Codec codec, RedisCommand<T> command, Object... params) {
RPromise<R> mainPromise = connectionManager.newPromise();
async(false, new NodeSource(slot), codec, command, params, mainPromise, 0);
return mainPromise;
}
@Override
public <T, R> RFuture<R> readAsync(String key, RedisCommand<T> command, Object... params) {
return readAsync(key, connectionManager.getCodec(), command, params);
@ -381,13 +375,8 @@ public class CommandAsyncService implements CommandAsyncExecutor {
}
@Override
public <T, R> RFuture<R> evalReadAsync(Integer slot, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params) {
return evalAsync(new NodeSource(slot), true, codec, evalCommandType, script, keys, params);
}
@Override
public <T, R> RFuture<R> evalReadAsync(InetSocketAddress client, String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params) {
int slot = connectionManager.calcSlot(key);
public <T, R> RFuture<R> evalReadAsync(RedisClient client, String name, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params) {
int slot = connectionManager.calcSlot(name);
return evalAsync(new NodeSource(slot, client), true, codec, evalCommandType, script, keys, params);
}
@ -401,10 +390,6 @@ public class CommandAsyncService implements CommandAsyncExecutor {
return evalAsync(new NodeSource(entry), false, codec, evalCommandType, script, keys, params);
}
public <T, R> RFuture<R> evalWriteAsync(Integer slot, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params) {
return evalAsync(new NodeSource(slot), false, codec, evalCommandType, script, keys, params);
}
@Override
public <T, R> RFuture<R> evalWriteAllAsync(RedisCommand<T> command, SlotCallback<T, R> callback, String script, List<Object> keys, Object... params) {
return evalAllAsync(false, command, callback, script, keys, params);
@ -805,7 +790,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
if (future.cause() instanceof RedisMovedException) {
RedisMovedException ex = (RedisMovedException) future.cause();
async(details.isReadOnlyMode(), new NodeSource(ex.getSlot(), ex.getAddr(), Redirect.MOVED), details.getCodec(),
async(details.isReadOnlyMode(), new NodeSource(ex.getSlot(), ex.getUrl(), Redirect.MOVED), details.getCodec(),
details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt());
AsyncDetails.release(details);
return;
@ -813,7 +798,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
if (future.cause() instanceof RedisAskException) {
RedisAskException ex = (RedisAskException) future.cause();
async(details.isReadOnlyMode(), new NodeSource(ex.getSlot(), ex.getAddr(), Redirect.ASK), details.getCodec(),
async(details.isReadOnlyMode(), new NodeSource(ex.getSlot(), ex.getUrl(), Redirect.ASK), details.getCodec(),
details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt());
AsyncDetails.release(details);
return;
@ -844,11 +829,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
if (future.isSuccess()) {
R res = future.getNow();
if (res instanceof RedisClientResult) {
InetSocketAddress addr = source.getAddr();
if (addr == null) {
addr = details.getConnectionFuture().getNow().getRedisClient().getAddr();
}
((RedisClientResult) res).setRedisClient(addr);
((RedisClientResult) res).setRedisClient(details.getConnectionFuture().getNow().getRedisClient());
}
if (isRedissonReferenceSupportEnabled()) {

@ -371,14 +371,14 @@ public class CommandBatchService extends CommandAsyncService {
if (future.cause() instanceof RedisMovedException) {
RedisMovedException ex = (RedisMovedException)future.cause();
entry.clearErrors();
NodeSource nodeSource = new NodeSource(ex.getSlot(), ex.getAddr(), Redirect.MOVED);
NodeSource nodeSource = new NodeSource(ex.getSlot(), ex.getUrl(), Redirect.MOVED);
execute(entry, nodeSource, mainPromise, slots, attempt, noResult, responseTimeout, retryAttempts, retryInterval);
return;
}
if (future.cause() instanceof RedisAskException) {
RedisAskException ex = (RedisAskException)future.cause();
entry.clearErrors();
NodeSource nodeSource = new NodeSource(ex.getSlot(), ex.getAddr(), Redirect.ASK);
NodeSource nodeSource = new NodeSource(ex.getSlot(), ex.getUrl(), Redirect.ASK);
execute(entry, nodeSource, mainPromise, slots, attempt, noResult, responseTimeout, retryAttempts, retryInterval);
return;
}

@ -15,7 +15,6 @@
*/
package org.redisson.command;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.List;
import java.util.function.Supplier;
@ -23,6 +22,7 @@ import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import org.redisson.SlotCallback;
import org.redisson.api.RFuture;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.connection.MasterSlaveEntry;
@ -36,9 +36,6 @@ public interface CommandReactiveExecutor extends CommandAsyncExecutor {
<R> Publisher<R> reactive(Supplier<RFuture<R>> supplier);
<T, R> Publisher<R> evalReadReactive(InetSocketAddress client, String key, Codec codec, RedisCommand<T> evalCommandType,
String script, List<Object> keys, Object ... params);
<T, R> Publisher<R> evalWriteAllReactive(RedisCommand<T> command, SlotCallback<T, R> callback, String script, List<Object> keys, Object ... params);
<T, R> Publisher<Collection<R>> readAllReactive(RedisCommand<T> command, Object ... params);
@ -51,7 +48,7 @@ public interface CommandReactiveExecutor extends CommandAsyncExecutor {
<R, T> Publisher<R> writeAllReactive(RedisCommand<T> command, SlotCallback<T, R> callback, Object ... params);
<T, R> Publisher<R> readReactive(InetSocketAddress client, String key, Codec codec, RedisCommand<T> command, Object ... params);
<T, R> Publisher<R> readReactive(RedisClient client, String name, Codec codec, RedisCommand<T> command, Object ... params);
<T, R> Publisher<R> evalWriteReactive(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params);

@ -15,7 +15,6 @@
*/
package org.redisson.command;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.List;
import java.util.function.Supplier;
@ -23,6 +22,7 @@ import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import org.redisson.SlotCallback;
import org.redisson.api.RFuture;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.connection.ConnectionManager;
@ -90,11 +90,11 @@ public class CommandReactiveService extends CommandAsyncService implements Comma
}
@Override
public <T, R> Publisher<R> readReactive(final InetSocketAddress client, final String key, final Codec codec, final RedisCommand<T> command, final Object ... params) {
public <T, R> Publisher<R> readReactive(final RedisClient client, final String name, final Codec codec, final RedisCommand<T> command, final Object ... params) {
return reactive(new Supplier<RFuture<R>>() {
@Override
public RFuture<R> get() {
return readAsync(client, key, codec, command, params);
return readAsync(client, name, codec, command, params);
};
});
}
@ -150,18 +150,6 @@ public class CommandReactiveService extends CommandAsyncService implements Comma
});
}
@Override
public <T, R> Publisher<R> evalReadReactive(final InetSocketAddress client, final String key, final Codec codec, final RedisCommand<T> evalCommandType,
final String script, final List<Object> keys, final Object ... params) {
return reactive(new Supplier<RFuture<R>>() {
@Override
public RFuture<R> get() {
return evalReadAsync(client, key, codec, evalCommandType, script, keys, params);
};
});
}
@Override
public <T, R> Publisher<R> evalWriteReactive(final String key, final Codec codec, final RedisCommand<T> evalCommandType,
final String script, final List<Object> keys, final Object... params) {

@ -32,8 +32,6 @@ public interface CommandSyncExecutor {
<V> V get(RFuture<V> future);
<T, R> R write(Integer slot, Codec codec, RedisCommand<T> command, Object ... params);
<T, R> R write(String key, Codec codec, RedisCommand<T> command, Object ... params);
<T, R> R write(String key, RedisCommand<T> command, Object ... params);
@ -46,10 +44,6 @@ public interface CommandSyncExecutor {
<T, R> R evalRead(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params);
<T, R> R read(InetSocketAddress client, String key, Codec codec, RedisCommand<T> command, Object ... params);
<T, R> R read(InetSocketAddress client, String key, RedisCommand<T> command, Object ... params);
<T, R> R evalWrite(String key, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params);
<T, R> R evalWrite(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params);

@ -49,18 +49,6 @@ public class CommandSyncService extends CommandAsyncService implements CommandEx
return get(res);
}
@Override
public <T, R> R read(InetSocketAddress client, String key, RedisCommand<T> command, Object ... params) {
RFuture<R> res = readAsync(client, key, connectionManager.getCodec(), command, params);
return get(res);
}
@Override
public <T, R> R read(InetSocketAddress client, String key, Codec codec, RedisCommand<T> command, Object ... params) {
RFuture<R> res = readAsync(client, key, codec, command, params);
return get(res);
}
@Override
public <T, R> R evalRead(String key, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) {
return evalRead(key, connectionManager.getCodec(), evalCommandType, script, keys, params);
@ -83,12 +71,6 @@ public class CommandSyncService extends CommandAsyncService implements CommandEx
return get(res);
}
@Override
public <T, R> R write(Integer slot, Codec codec, RedisCommand<T> command, Object ... params) {
RFuture<R> res = writeAsync(slot, codec, command, params);
return get(res);
}
@Override
public <T, R> R write(String key, Codec codec, RedisCommand<T> command, Object ... params) {
RFuture<R> res = writeAsync(key, codec, command, params);

@ -23,13 +23,11 @@ import java.net.URL;
import java.util.concurrent.ExecutorService;
import org.redisson.client.codec.Codec;
import org.redisson.codec.CodecProvider;
import org.redisson.codec.DefaultCodecProvider;
import org.redisson.codec.DefaultReferenceCodecProvider;
import org.redisson.codec.JsonJacksonCodec;
import org.redisson.codec.ReferenceCodecProvider;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.ReplicatedConnectionManager;
import org.redisson.liveobject.provider.DefaultResolverProvider;
import org.redisson.liveobject.provider.ResolverProvider;
import io.netty.channel.EventLoopGroup;
@ -70,12 +68,7 @@ public class Config {
/**
* For codec registry and look up. DefaultCodecProvider used by default
*/
private CodecProvider codecProvider = new DefaultCodecProvider();
/**
* For resolver registry and look up. DefaultResolverProvider used by default
*/
private ResolverProvider resolverProvider = new DefaultResolverProvider();
private ReferenceCodecProvider referenceCodecProvider = new DefaultReferenceCodecProvider();
private ExecutorService executor;
@ -83,7 +76,7 @@ public class Config {
* Config option for enabling Redisson Reference feature.
* Default value is TRUE
*/
private boolean redissonReferenceEnabled = true;
private boolean referenceEnabled = true;
private boolean useLinuxNativeEpoll;
@ -110,9 +103,8 @@ public class Config {
setNettyThreads(oldConf.getNettyThreads());
setThreads(oldConf.getThreads());
setCodec(oldConf.getCodec());
setCodecProvider(oldConf.getCodecProvider());
setResolverProvider(oldConf.getResolverProvider());
setRedissonReferenceEnabled(oldConf.redissonReferenceEnabled);
setReferenceCodecProvider(oldConf.getReferenceCodecProvider());
setReferenceEnabled(oldConf.isReferenceEnabled());
setEventLoopGroup(oldConf.getEventLoopGroup());
if (oldConf.getSingleServerConfig() != null) {
setSingleServerConfig(new SingleServerConfig(oldConf.getSingleServerConfig()));
@ -156,14 +148,15 @@ public class Config {
}
/**
* For codec registry and look up. DefaultCodecProvider used by default.
* Reference objects codec provider used for codec registry and look up.
* <code>org.redisson.codec.DefaultReferenceCodecProvider</code> used by default.
*
* @param codecProvider object
* @return config
* @see org.redisson.codec.CodecProvider
* @see org.redisson.codec.ReferenceCodecProvider
*/
public Config setCodecProvider(CodecProvider codecProvider) {
this.codecProvider = codecProvider;
public Config setReferenceCodecProvider(ReferenceCodecProvider codecProvider) {
this.referenceCodecProvider = codecProvider;
return this;
}
@ -172,28 +165,8 @@ public class Config {
*
* @return CodecProvider
*/
public CodecProvider getCodecProvider() {
return codecProvider;
}
/**
* For resolver registry and look up. DefaultResolverProvider used by default.
*
* @param resolverProvider object
* @return this
*/
public Config setResolverProvider(ResolverProvider resolverProvider) {
this.resolverProvider = resolverProvider;
return this;
}
/**
* Returns the ResolverProvider instance
*
* @return resolverProvider
*/
public ResolverProvider getResolverProvider() {
return resolverProvider;
public ReferenceCodecProvider getReferenceCodecProvider() {
return referenceCodecProvider;
}
/**
@ -203,8 +176,8 @@ public class Config {
*
* @return <code>true</code> if Redisson Reference feature enabled
*/
public boolean isRedissonReferenceEnabled() {
return redissonReferenceEnabled;
public boolean isReferenceEnabled() {
return referenceEnabled;
}
/**
@ -214,8 +187,8 @@ public class Config {
*
* @param redissonReferenceEnabled flag
*/
public void setRedissonReferenceEnabled(boolean redissonReferenceEnabled) {
this.redissonReferenceEnabled = redissonReferenceEnabled;
public void setReferenceEnabled(boolean redissonReferenceEnabled) {
this.referenceEnabled = redissonReferenceEnabled;
}
/**

@ -29,7 +29,7 @@ import java.util.List;
import org.redisson.api.RedissonNodeInitializer;
import org.redisson.client.codec.Codec;
import org.redisson.cluster.ClusterConnectionManager;
import org.redisson.codec.CodecProvider;
import org.redisson.codec.ReferenceCodecProvider;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.ElasticacheConnectionManager;
import org.redisson.connection.MasterSlaveConnectionManager;
@ -37,7 +37,6 @@ import org.redisson.connection.ReplicatedConnectionManager;
import org.redisson.connection.SentinelConnectionManager;
import org.redisson.connection.SingleConnectionManager;
import org.redisson.connection.balancer.LoadBalancer;
import org.redisson.liveobject.provider.ResolverProvider;
import com.fasterxml.jackson.annotation.JsonFilter;
import com.fasterxml.jackson.annotation.JsonIgnore;
@ -271,8 +270,7 @@ public class ConfigSupport {
mapper.addMixIn(MasterSlaveServersConfig.class, MasterSlaveServersConfigMixIn.class);
mapper.addMixIn(SingleServerConfig.class, SingleSeverConfigMixIn.class);
mapper.addMixIn(Config.class, ConfigMixIn.class);
mapper.addMixIn(CodecProvider.class, ClassMixIn.class);
mapper.addMixIn(ResolverProvider.class, ClassMixIn.class);
mapper.addMixIn(ReferenceCodecProvider.class, ClassMixIn.class);
mapper.addMixIn(Codec.class, ClassMixIn.class);
mapper.addMixIn(RedissonNodeInitializer.class, ClassMixIn.class);
mapper.addMixIn(LoadBalancer.class, ClassMixIn.class);

@ -53,7 +53,7 @@ public class ConnectionEventsHub {
}
public void fireDisconnect(InetSocketAddress addr) {
if (maps.get(addr) == Status.DISCONNECTED) {
if (addr == null || maps.get(addr) == Status.DISCONNECTED) {
return;
}

@ -102,7 +102,7 @@ public interface ConnectionManager {
RedisClient createClient(NodeType type, URI address);
MasterSlaveEntry getEntry(InetSocketAddress addr);
MasterSlaveEntry getEntry(RedisClient redisClient);
PubSubConnectionEntry getPubSubEntry(String channelName);

@ -17,9 +17,7 @@ package org.redisson.connection;
import java.util.concurrent.atomic.AtomicInteger;
import org.redisson.api.RFuture;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
@ -29,35 +27,31 @@ import io.netty.util.concurrent.FutureListener;
* @author Nikita Koksharov
*
*/
public class CountListener implements FutureListener<Void> {
public class CountableListener<T> implements FutureListener<Object> {
private final RPromise<Void> res;
protected final AtomicInteger counter = new AtomicInteger();
protected final RPromise<T> result;
protected final T value;
private final AtomicInteger counter;
public static RPromise<Void> create(RFuture<Void>... futures) {
RPromise<Void> result = new RedissonPromise<Void>();
FutureListener<Void> listener = new CountListener(result, futures.length);
for (RFuture<Void> future : futures) {
future.addListener(listener);
}
return result;
public CountableListener(RPromise<T> result, T value) {
super();
this.result = result;
this.value = value;
}
public CountListener(RPromise<Void> res, int amount) {
super();
this.res = res;
this.counter = new AtomicInteger(amount);
public void incCounter() {
counter.incrementAndGet();
}
@Override
public void operationComplete(Future<Void> future) throws Exception {
public void operationComplete(Future<Object> future) throws Exception {
if (!future.isSuccess()) {
res.tryFailure(future.cause());
result.tryFailure(future.cause());
return;
}
if (counter.decrementAndGet() == 0) {
res.trySuccess(null);
result.trySuccess(value);
}
}

@ -18,21 +18,20 @@ package org.redisson.connection;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.redisson.api.RFuture;
import org.redisson.connection.ClientConnectionsEntry.FreezeReason;
import org.redisson.misc.URIBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.resolver.AddressResolver;
import io.netty.resolver.dns.DefaultDnsServerAddressStreamProvider;
import io.netty.resolver.dns.DnsAddressResolverGroup;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
@ -48,24 +47,20 @@ public class DNSMonitor {
private static final Logger log = LoggerFactory.getLogger(DNSMonitor.class);
private DnsAddressResolverGroup resolverGroup = new DnsAddressResolverGroup(NioDatagramChannel.class, DefaultDnsServerAddressStreamProvider.INSTANCE);
private ScheduledFuture<?> dnsMonitorFuture;
private ConnectionManager connectionManager;
private final AddressResolver<InetSocketAddress> resolver;
private final ConnectionManager connectionManager;
private final Map<URI, InetAddress> masters = new HashMap<URI, InetAddress>();
private final Map<URI, InetAddress> slaves = new HashMap<URI, InetAddress>();
private ScheduledFuture<?> dnsMonitorFuture;
private long dnsMonitoringInterval;
public DNSMonitor(ConnectionManager connectionManager, Set<URI> masterHosts, Set<URI> slaveHosts, long dnsMonitoringInterval) {
AddressResolver<InetSocketAddress> resolver = resolverGroup.getResolver(connectionManager.getGroup().next());
for (URI host : masterHosts) {
Future<InetSocketAddress> resolveFuture = resolver.resolve(InetSocketAddress.createUnresolved(host.getHost(), 0));
resolveFuture.syncUninterruptibly();
masters.put(host, resolveFuture.getNow().getAddress());
}
public DNSMonitor(ConnectionManager connectionManager, InetSocketAddress masterHost, Collection<URI> slaveHosts, long dnsMonitoringInterval, DnsAddressResolverGroup resolverGroup) {
this.resolver = resolverGroup.getResolver(connectionManager.getGroup().next());
URI uri = URIBuilder.create("redis://" + masterHost.getAddress().getHostAddress() + ":" + masterHost.getPort());
masters.put(uri, masterHost.getAddress());
for (URI host : slaveHosts) {
Future<InetSocketAddress> resolveFuture = resolver.resolve(InetSocketAddress.createUnresolved(host.getHost(), 0));
resolveFuture.syncUninterruptibly();
@ -90,7 +85,6 @@ public class DNSMonitor {
dnsMonitorFuture = connectionManager.getGroup().schedule(new Runnable() {
@Override
public void run() {
final AddressResolver<InetSocketAddress> resolver = resolverGroup.getResolver(connectionManager.getGroup().next());
final AtomicInteger counter = new AtomicInteger(masters.size() + slaves.size());
for (final Entry<URI, InetAddress> entry : masters.entrySet()) {
Future<InetSocketAddress> resolveFuture = resolver.resolve(InetSocketAddress.createUnresolved(entry.getKey().getHost(), 0));
@ -109,15 +103,15 @@ public class DNSMonitor {
InetAddress master = entry.getValue();
InetAddress now = future.get().getAddress();
if (!now.getHostAddress().equals(master.getHostAddress())) {
log.info("Detected DNS change. {} has changed from {} to {}", entry.getKey().getHost(), master.getHostAddress(), now.getHostAddress());
log.info("Detected DNS change. Master {} has changed ip from {} to {}", entry.getKey(), master.getHostAddress(), now.getHostAddress());
for (MasterSlaveEntry entrySet : connectionManager.getEntrySet()) {
if (entrySet.getClient().getAddr().getHostName().equals(entry.getKey().getHost())
&& entrySet.getClient().getAddr().getPort() == entry.getKey().getPort()) {
entrySet.changeMaster(entry.getKey());
break;
}
}
masters.put(entry.getKey(), now);
log.info("Master {} has been changed", entry.getKey().getHost());
}
}
});
@ -138,17 +132,29 @@ public class DNSMonitor {
}
InetAddress slave = entry.getValue();
InetAddress updatedSlave = future.get().getAddress();
final InetAddress updatedSlave = future.get().getAddress();
if (!updatedSlave.getHostAddress().equals(slave.getHostAddress())) {
log.info("Detected DNS change. {} has changed from {} to {}", entry.getKey().getHost(), slave.getHostAddress(), updatedSlave.getHostAddress());
for (MasterSlaveEntry masterSlaveEntry : connectionManager.getEntrySet()) {
URI uri = URIBuilder.create(slave.getHostAddress() + ":" + entry.getKey().getPort());
if (masterSlaveEntry.slaveDown(uri, FreezeReason.MANAGER)) {
masterSlaveEntry.slaveUp(entry.getKey(), FreezeReason.MANAGER);
log.info("Detected DNS change. Slave {} has changed ip from {} to {}", entry.getKey().getHost(), slave.getHostAddress(), updatedSlave.getHostAddress());
for (final MasterSlaveEntry masterSlaveEntry : connectionManager.getEntrySet()) {
final URI uri = URIBuilder.create(slave.getHostAddress() + ":" + entry.getKey().getPort());
if (masterSlaveEntry.hasSlave(uri)) {
RFuture<Void> addFuture = masterSlaveEntry.addSlave(entry.getKey());
addFuture.addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
if (!future.isSuccess()) {
log.error("Can't add slave: " + updatedSlave, future.cause());
return;
}
masterSlaveEntry.slaveDown(uri, FreezeReason.MANAGER);
}
});
break;
}
}
slaves.put(entry.getKey(), updatedSlave);
log.info("Slave {} has been changed", entry.getKey().getHost());
}
}
});

@ -54,16 +54,21 @@ import org.redisson.misc.InfinitySemaphoreLatch;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.redisson.misc.TransferListener;
import org.redisson.misc.URIBuilder;
import org.redisson.pubsub.AsyncSemaphore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollDatagramChannel;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.resolver.dns.DnsAddressResolverGroup;
import io.netty.resolver.dns.DnsServerAddressStreamProviders;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.Timer;
@ -117,9 +122,9 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
protected Codec codec;
protected EventLoopGroup group;
protected final EventLoopGroup group;
protected Class<? extends SocketChannel> socketChannelClass;
protected final Class<? extends SocketChannel> socketChannelClass;
protected final ConcurrentMap<String, PubSubConnectionEntry> name2PubSubConnection = PlatformDependent.newConcurrentHashMap();
@ -130,7 +135,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
protected MasterSlaveServersConfig config;
private final AtomicReferenceArray<MasterSlaveEntry> slot2entry = new AtomicReferenceArray<MasterSlaveEntry>(MAX_SLOT);
private final Map<InetSocketAddress, MasterSlaveEntry> addr2entry = PlatformDependent.newConcurrentHashMap();
private final Map<RedisClient, MasterSlaveEntry> client2entry = PlatformDependent.newConcurrentHashMap();
private final RPromise<Boolean> shutdownPromise;
@ -152,6 +157,8 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
private final Config cfg;
private final DnsAddressResolverGroup resolverGroup;
{
for (int i = 0; i < locks.length; i++) {
locks[i] = new AsyncSemaphore(1);
@ -176,6 +183,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
}
this.socketChannelClass = EpollSocketChannel.class;
this.resolverGroup = new DnsAddressResolverGroup(EpollDatagramChannel.class, DnsServerAddressStreamProviders.platformDefault());
} else {
if (cfg.getEventLoopGroup() == null) {
this.group = new NioEventLoopGroup(cfg.getNettyThreads(), new DefaultThreadFactory("redisson-netty"));
@ -184,14 +192,9 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
}
this.socketChannelClass = NioSocketChannel.class;
// if (cfg.getEventLoopGroup() == null) {
// this.group = new OioEventLoopGroup(cfg.getThreads());
// } else {
// this.group = cfg.getEventLoopGroup();
// }
//
// this.socketChannelClass = OioSocketChannel.class;
this.resolverGroup = new DnsAddressResolverGroup(NioDatagramChannel.class, DnsServerAddressStreamProviders.platformDefault());
}
if (cfg.getExecutor() == null) {
int threads = Runtime.getRuntime().availableProcessors() * 2;
if (cfg.getThreads() != 0) {
@ -236,7 +239,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
@Override
public Collection<MasterSlaveEntry> getEntrySet() {
return addr2entry.values();
return client2entry.values();
}
protected void initTimer(MasterSlaveServersConfig config) {
@ -273,19 +276,20 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
MasterSlaveEntry entry;
if (config.checkSkipSlavesInit()) {
entry = new SingleEntry(slots, this, config);
RFuture<Void> f = entry.setupMasterEntry(config.getMasterAddress());
f.syncUninterruptibly();
} else {
entry = createMasterSlaveEntry(config, slots);
}
RFuture<RedisClient> f = entry.setupMasterEntry(config.getMasterAddress());
f.syncUninterruptibly();
for (int slot = singleSlotRange.getStartSlot(); slot < singleSlotRange.getEndSlot() + 1; slot++) {
addEntry(slot, entry);
}
InetSocketAddress masterHost = f.getNow().resolveAddr().syncUninterruptibly().getNow();
if (config.getDnsMonitoringInterval() != -1) {
dnsMonitor = new DNSMonitor(this, Collections.singleton(config.getMasterAddress()),
config.getSlaveAddresses(), config.getDnsMonitoringInterval());
dnsMonitor = new DNSMonitor(this, masterHost,
config.getSlaveAddresses(), config.getDnsMonitoringInterval(), resolverGroup);
dnsMonitor.start();
}
} catch (RuntimeException e) {
@ -301,8 +305,6 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
for (RFuture<Void> future : fs) {
future.syncUninterruptibly();
}
RFuture<Void> f = entry.setupMasterEntry(config.getMasterAddress());
f.syncUninterruptibly();
return entry;
}
@ -367,6 +369,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
redisConfig.setAddress(address)
.setTimer(timer)
.setExecutor(executor)
.setResolverGroup(resolverGroup)
.setGroup(group)
.setSocketChannelClass(socketChannelClass)
.setConnectTimeout(timeout)
@ -674,9 +677,30 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
return newSucceededFuture(entryCodec);
}
@Override
public MasterSlaveEntry getEntry(InetSocketAddress addr) {
return addr2entry.get(addr);
public MasterSlaveEntry getEntry(URI addr) {
for (MasterSlaveEntry entry : client2entry.values()) {
if (URIBuilder.compare(entry.getClient().getAddr(), addr)) {
return entry;
}
if (entry.hasSlave(addr)) {
return entry;
}
}
return null;
}
public MasterSlaveEntry getEntry(RedisClient redisClient) {
MasterSlaveEntry entry = client2entry.get(redisClient);
if (entry != null) {
return entry;
}
for (MasterSlaveEntry mentry : client2entry.values()) {
if (mentry.hasSlave(redisClient)) {
return mentry;
}
}
return null;
}
@Override
@ -686,22 +710,22 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
protected final void changeMaster(int slot, URI address) {
MasterSlaveEntry entry = getEntry(slot);
addr2entry.remove(entry.getClient().getAddr());
client2entry.remove(entry.getClient());
entry.changeMaster(address);
addr2entry.put(entry.getClient().getAddr(), entry);
client2entry.put(entry.getClient(), entry);
}
protected final void addEntry(Integer slot, MasterSlaveEntry entry) {
slot2entry.set(slot, entry);
entry.addSlotRange(slot);
addr2entry.put(entry.getClient().getAddr(), entry);
client2entry.put(entry.getClient(), entry);
}
protected MasterSlaveEntry removeMaster(Integer slot) {
protected final MasterSlaveEntry removeEntry(Integer slot) {
MasterSlaveEntry entry = slot2entry.getAndSet(slot, null);
entry.removeSlotRange(slot);
if (entry.getSlotRanges().isEmpty()) {
addr2entry.remove(entry.getClient().getAddr());
client2entry.remove(entry.getClient());
}
return entry;
}
@ -734,6 +758,9 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
if (entry == null && source.getSlot() != null) {
entry = getEntry(source.getSlot());
}
if (source.getRedisClient() != null) {
entry = getEntry(source.getRedisClient());
}
if (source.getAddr() != null) {
entry = getEntry(source.getAddr());
if (entry == null) {
@ -836,6 +863,8 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
}
}
resolverGroup.close();
if (cfg.getEventLoopGroup() == null) {
group.shutdownGracefully(quietPeriod, timeout, unit).syncUninterruptibly();
}

@ -42,6 +42,9 @@ import org.redisson.connection.ClientConnectionsEntry.FreezeReason;
import org.redisson.connection.balancer.LoadBalancerManager;
import org.redisson.connection.pool.MasterConnectionPool;
import org.redisson.connection.pool.MasterPubSubConnectionPool;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.redisson.misc.TransferListener;
import org.redisson.misc.URIBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -66,10 +69,10 @@ public class MasterSlaveEntry {
final MasterSlaveServersConfig config;
final ConnectionManager connectionManager;
final MasterConnectionPool writeConnectionHolder;
final MasterConnectionPool writeConnectionPool;
final Set<Integer> slots = new HashSet<Integer>();
final MasterPubSubConnectionPool pubSubConnectionHolder;
final MasterPubSubConnectionPool pubSubConnectionPool;
final AtomicBoolean active = new AtomicBoolean(true);
@ -83,8 +86,8 @@ public class MasterSlaveEntry {
this.config = config;
slaveBalancer = new LoadBalancerManager(config, connectionManager, this);
writeConnectionHolder = new MasterConnectionPool(config, connectionManager, this);
pubSubConnectionHolder = new MasterPubSubConnectionPool(config, connectionManager, this);
writeConnectionPool = new MasterConnectionPool(config, connectionManager, this);
pubSubConnectionPool = new MasterPubSubConnectionPool(config, connectionManager, this);
}
public MasterSlaveServersConfig getConfig() {
@ -106,7 +109,7 @@ public class MasterSlaveEntry {
return result;
}
public RFuture<Void> setupMasterEntry(URI address) {
public RPromise<RedisClient> setupMasterEntry(URI address) {
RedisClient client = connectionManager.createClient(NodeType.MASTER, address);
masterEntry = new ClientConnectionsEntry(
client,
@ -117,12 +120,23 @@ public class MasterSlaveEntry {
connectionManager,
NodeType.MASTER);
RPromise<RedisClient> result = new RedissonPromise<RedisClient>();
CountableListener<RedisClient> listener = new CountableListener<RedisClient>(result, client);
RFuture<InetSocketAddress> addrFuture = client.resolveAddr();
listener.incCounter();
addrFuture.addListener(listener);
RFuture<Void> writeFuture = writeConnectionPool.add(masterEntry);
listener.incCounter();
writeFuture.addListener(listener);
if (config.getSubscriptionMode() == SubscriptionMode.MASTER) {
RFuture<Void> f = writeConnectionHolder.add(masterEntry);
RFuture<Void> s = pubSubConnectionHolder.add(masterEntry);
return CountListener.create(s, f);
RFuture<Void> pubSubFuture = pubSubConnectionPool.add(masterEntry);
listener.incCounter();
pubSubFuture.addListener(listener);
}
return writeConnectionHolder.add(masterEntry);
return result;
}
public boolean slaveDown(URI address, FreezeReason freezeReason) {
@ -134,6 +148,15 @@ public class MasterSlaveEntry {
return slaveDown(entry, freezeReason == FreezeReason.SYSTEM);
}
public boolean slaveDown(RedisClient redisClient, FreezeReason freezeReason) {
ClientConnectionsEntry entry = slaveBalancer.freeze(redisClient, freezeReason);
if (entry == null) {
return false;
}
return slaveDown(entry, freezeReason == FreezeReason.SYSTEM);
}
private boolean slaveDown(ClientConnectionsEntry entry, boolean temporaryDown) {
// add master as slave if no more slaves available
if (!config.checkSkipSlavesInit() && slaveBalancer.getAvailableClients() == 0) {
@ -201,6 +224,7 @@ public class MasterSlaveEntry {
return;
}
System.out.println("channelName " + channelName + " resubscribed!");
subscribeCodec.addListener(new FutureListener<Codec>() {
@Override
public void operationComplete(Future<Codec> future) throws Exception {
@ -308,11 +332,11 @@ public class MasterSlaveEntry {
});
}
public boolean hasSlave(InetSocketAddress addr) {
return slaveBalancer.contains(addr);
public boolean hasSlave(RedisClient redisClient) {
return slaveBalancer.contains(redisClient);
}
public boolean hasSlave(String addr) {
public boolean hasSlave(URI addr) {
return slaveBalancer.contains(addr);
}
@ -322,7 +346,7 @@ public class MasterSlaveEntry {
private RFuture<Void> addSlave(URI address, boolean freezed, NodeType nodeType) {
RedisClient client = connectionManager.createClient(NodeType.SLAVE, address);
ClientConnectionsEntry entry = new ClientConnectionsEntry(client,
final ClientConnectionsEntry entry = new ClientConnectionsEntry(client,
this.config.getSlaveConnectionMinimumIdleSize(),
this.config.getSlaveConnectionPoolSize(),
this.config.getSubscriptionConnectionMinimumIdleSize(),
@ -333,7 +357,22 @@ public class MasterSlaveEntry {
entry.setFreezeReason(FreezeReason.SYSTEM);
}
}
return slaveBalancer.add(entry);
final RPromise<Void> result = new RedissonPromise<Void>();
RFuture<InetSocketAddress> addrFuture = client.resolveAddr();
addrFuture.addListener(new FutureListener<InetSocketAddress>() {
@Override
public void operationComplete(Future<InetSocketAddress> future) throws Exception {
if (!future.isSuccess()) {
result.tryFailure(future.cause());
return;
}
RFuture<Void> addFuture = slaveBalancer.add(entry);
addFuture.addListener(new TransferListener<Void>(result));
}
});
return result;
}
public RedisClient getClient() {
@ -345,11 +384,10 @@ public class MasterSlaveEntry {
return false;
}
InetSocketAddress naddress = new InetSocketAddress(address.getHost(), address.getPort());
InetSocketAddress addr = masterEntry.getClient().getAddr();
// exclude master from slaves
if (!config.checkSkipSlavesInit()
&& (!addr.getAddress().getHostAddress().equals(naddress.getAddress().getHostAddress()) || naddress.getPort() != addr.getPort())) {
&& !URIBuilder.compare(addr, address)) {
slaveDown(masterEntry.getClient().getConfig().getAddress(), FreezeReason.SYSTEM);
log.info("master {} excluded from slaves", addr);
}
@ -365,27 +403,33 @@ public class MasterSlaveEntry {
*/
public void changeMaster(final URI address) {
final ClientConnectionsEntry oldMaster = masterEntry;
RFuture<Void> future = setupMasterEntry(address);
future.addListener(new FutureListener<Void>() {
RFuture<RedisClient> future = setupMasterEntry(address);
future.addListener(new FutureListener<RedisClient>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
writeConnectionHolder.remove(oldMaster);
pubSubConnectionHolder.remove(oldMaster);
public void operationComplete(Future<RedisClient> future) throws Exception {
if (!future.isSuccess()) {
log.error("Can't change master to: {}", address);
return;
}
RedisClient newMasterClient = future.getNow();
writeConnectionPool.remove(oldMaster);
pubSubConnectionPool.remove(oldMaster);
oldMaster.freezeMaster(FreezeReason.MANAGER);
slaveDown(oldMaster, false);
slaveDown(URIBuilder.create("redis://" + oldMaster.getClient().getIpAddr()), FreezeReason.MANAGER);
slaveBalancer.changeType(oldMaster.getClient().getAddr(), NodeType.SLAVE);
slaveBalancer.changeType(address, NodeType.MASTER);
slaveBalancer.changeType(oldMaster.getClient(), NodeType.SLAVE);
slaveBalancer.changeType(newMasterClient, NodeType.MASTER);
// more than one slave available, so master can be removed from slaves
if (!config.checkSkipSlavesInit()
&& slaveBalancer.getAvailableClients() > 1) {
slaveDown(address, FreezeReason.SYSTEM);
slaveDown(newMasterClient, FreezeReason.SYSTEM);
}
connectionManager.shutdownAsync(oldMaster.getClient());
log.info("master {} has changed to {}", oldMaster.getClient().getAddr(), address);
log.info("master {} has changed to {}", oldMaster.getClient().getAddr(), masterEntry.getClient().getAddr());
}
});
}
@ -420,7 +464,7 @@ public class MasterSlaveEntry {
}
public RFuture<RedisConnection> connectionWriteOp(RedisCommand<?> command) {
return writeConnectionHolder.get(command);
return writeConnectionPool.get(command);
}
public RFuture<RedisConnection> connectionReadOp(RedisCommand<?> command) {
@ -430,7 +474,7 @@ public class MasterSlaveEntry {
return slaveBalancer.nextConnection(command);
}
public RFuture<RedisConnection> connectionReadOp(RedisCommand<?> command, InetSocketAddress addr) {
public RFuture<RedisConnection> connectionReadOp(RedisCommand<?> command, URI addr) {
if (config.getReadMode() == ReadMode.MASTER) {
return connectionWriteOp(command);
}
@ -439,7 +483,7 @@ public class MasterSlaveEntry {
RFuture<RedisPubSubConnection> nextPubSubConnection() {
if (config.getSubscriptionMode() == SubscriptionMode.MASTER) {
return pubSubConnectionHolder.get();
return pubSubConnectionPool.get();
}
return slaveBalancer.nextPubSubConnection();
@ -447,14 +491,14 @@ public class MasterSlaveEntry {
public void returnPubSubConnection(PubSubConnectionEntry entry) {
if (config.getSubscriptionMode() == SubscriptionMode.MASTER) {
pubSubConnectionHolder.returnConnection(masterEntry, entry.getConnection());
pubSubConnectionPool.returnConnection(masterEntry, entry.getConnection());
return;
}
slaveBalancer.returnPubSubConnection(entry.getConnection());
}
public void releaseWrite(RedisConnection connection) {
writeConnectionHolder.returnConnection(masterEntry, connection);
writeConnectionPool.returnConnection(masterEntry, connection);
}
public void releaseRead(RedisConnection connection) {

@ -15,38 +15,44 @@
*/
package org.redisson.connection;
import java.net.InetSocketAddress;
import java.net.URI;
public class NodeSource {
import org.redisson.client.RedisClient;
public static final NodeSource ZERO = new NodeSource(0);
/**
*
* @author Nikita Koksharov
*
*/
public class NodeSource {
public enum Redirect {MOVED, ASK}
private final Integer slot;
private final InetSocketAddress addr;
private final Redirect redirect;
private Integer slot;
private URI addr;
private RedisClient redisClient;
private Redirect redirect;
private MasterSlaveEntry entry;
public NodeSource(MasterSlaveEntry entry) {
this(null, null, null);
this.entry = entry;
}
public NodeSource(MasterSlaveEntry entry, InetSocketAddress addr) {
this(null, addr, null);
public NodeSource(MasterSlaveEntry entry, RedisClient redisClient) {
this.entry = entry;
this.redisClient = redisClient;
}
public NodeSource(Integer slot) {
this(slot, null, null);
public NodeSource(RedisClient redisClient) {
this.redisClient = redisClient;
}
public NodeSource(Integer slot, InetSocketAddress addr) {
this(slot, addr, null);
public NodeSource(Integer slot, RedisClient redisClient) {
this.slot = slot;
this.redisClient = redisClient;
}
public NodeSource(Integer slot, InetSocketAddress addr, Redirect redirect) {
public NodeSource(Integer slot, URI addr, Redirect redirect) {
this.slot = slot;
this.addr = addr;
this.redirect = redirect;
@ -64,7 +70,11 @@ public class NodeSource {
return slot;
}
public InetSocketAddress getAddr() {
public RedisClient getRedisClient() {
return redisClient;
}
public URI getAddr() {
return addr;
}

@ -59,7 +59,7 @@ public class RedisClientEntry implements ClusterNode {
}
public RFuture<Boolean> pingAsync() {
return commandExecutor.readAsync(client.getAddr(), (String)null, null, RedisCommands.PING_BOOL);
return commandExecutor.readAsync(client, null, RedisCommands.PING_BOOL);
}
@Override
@ -94,7 +94,7 @@ public class RedisClientEntry implements ClusterNode {
@Override
public RFuture<Long> timeAsync() {
return commandExecutor.readAsync(client.getAddr(), (String)null, StringCodec.INSTANCE, RedisCommands.TIME);
return commandExecutor.readAsync(client, StringCodec.INSTANCE, RedisCommands.TIME);
}
@Override
@ -104,7 +104,7 @@ public class RedisClientEntry implements ClusterNode {
@Override
public RFuture<Map<String, String>> clusterInfoAsync() {
return commandExecutor.readAsync(client.getAddr(), (String)null, StringCodec.INSTANCE, RedisCommands.CLUSTER_INFO);
return commandExecutor.readAsync(client, StringCodec.INSTANCE, RedisCommands.CLUSTER_INFO);
}
@Override
@ -120,29 +120,29 @@ public class RedisClientEntry implements ClusterNode {
@Override
public RFuture<Map<String, String>> infoAsync(InfoSection section) {
if (section == InfoSection.ALL) {
return commandExecutor.readAsync(client.getAddr(), (String)null, StringCodec.INSTANCE, RedisCommands.INFO_ALL);
return commandExecutor.readAsync(client, StringCodec.INSTANCE, RedisCommands.INFO_ALL);
} else if (section == InfoSection.DEFAULT) {
return commandExecutor.readAsync(client.getAddr(), (String)null, StringCodec.INSTANCE, RedisCommands.INFO_DEFAULT);
return commandExecutor.readAsync(client, StringCodec.INSTANCE, RedisCommands.INFO_DEFAULT);
} else if (section == InfoSection.SERVER) {
return commandExecutor.readAsync(client.getAddr(), (String)null, StringCodec.INSTANCE, RedisCommands.INFO_SERVER);
return commandExecutor.readAsync(client, StringCodec.INSTANCE, RedisCommands.INFO_SERVER);
} else if (section == InfoSection.CLIENTS) {
return commandExecutor.readAsync(client.getAddr(), (String)null, StringCodec.INSTANCE, RedisCommands.INFO_CLIENTS);
return commandExecutor.readAsync(client, StringCodec.INSTANCE, RedisCommands.INFO_CLIENTS);
} else if (section == InfoSection.MEMORY) {
return commandExecutor.readAsync(client.getAddr(), (String)null, StringCodec.INSTANCE, RedisCommands.INFO_MEMORY);
return commandExecutor.readAsync(client, StringCodec.INSTANCE, RedisCommands.INFO_MEMORY);
} else if (section == InfoSection.PERSISTENCE) {
return commandExecutor.readAsync(client.getAddr(), (String)null, StringCodec.INSTANCE, RedisCommands.INFO_PERSISTENCE);
return commandExecutor.readAsync(client, StringCodec.INSTANCE, RedisCommands.INFO_PERSISTENCE);
} else if (section == InfoSection.STATS) {
return commandExecutor.readAsync(client.getAddr(), (String)null, StringCodec.INSTANCE, RedisCommands.INFO_STATS);
return commandExecutor.readAsync(client, StringCodec.INSTANCE, RedisCommands.INFO_STATS);
} else if (section == InfoSection.REPLICATION) {
return commandExecutor.readAsync(client.getAddr(), (String)null, StringCodec.INSTANCE, RedisCommands.INFO_REPLICATION);
return commandExecutor.readAsync(client, StringCodec.INSTANCE, RedisCommands.INFO_REPLICATION);
} else if (section == InfoSection.CPU) {
return commandExecutor.readAsync(client.getAddr(), (String)null, StringCodec.INSTANCE, RedisCommands.INFO_CPU);
return commandExecutor.readAsync(client, StringCodec.INSTANCE, RedisCommands.INFO_CPU);
} else if (section == InfoSection.COMMANDSTATS) {
return commandExecutor.readAsync(client.getAddr(), (String)null, StringCodec.INSTANCE, RedisCommands.INFO_COMMANDSTATS);
return commandExecutor.readAsync(client, StringCodec.INSTANCE, RedisCommands.INFO_COMMANDSTATS);
} else if (section == InfoSection.CLUSTER) {
return commandExecutor.readAsync(client.getAddr(), (String)null, StringCodec.INSTANCE, RedisCommands.INFO_CLUSTER);
return commandExecutor.readAsync(client, StringCodec.INSTANCE, RedisCommands.INFO_CLUSTER);
} else if (section == InfoSection.KEYSPACE) {
return commandExecutor.readAsync(client.getAddr(), (String)null, StringCodec.INSTANCE, RedisCommands.INFO_KEYSPACE);
return commandExecutor.readAsync(client, StringCodec.INSTANCE, RedisCommands.INFO_KEYSPACE);
}
throw new IllegalStateException();
}

@ -15,7 +15,6 @@
*/
package org.redisson.connection;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashSet;
@ -154,8 +153,6 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
for (RFuture<Void> future : fs) {
future.syncUninterruptibly();
}
RFuture<Void> f = entry.setupMasterEntry(config.getMasterAddress());
f.syncUninterruptibly();
return entry;
}
@ -355,9 +352,10 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
String ip = parts[2];
String port = parts[3];
URI uri = convert(ip, port);
MasterSlaveEntry entry = getEntry(singleSlotRange.getStartSlot());
if (entry.isFreezed()
&& entry.getClient().getAddr().equals(new InetSocketAddress(ip, Integer.valueOf(port)))) {
&& URIBuilder.compare(entry.getClient().getAddr(), uri)) {
entry.unfreeze();
String masterAddr = ip + ":" + port;
log.info("master: {} has up", masterAddr);

@ -15,7 +15,7 @@
*/
package org.redisson.connection;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.Set;
import org.redisson.api.RFuture;
@ -36,7 +36,7 @@ public class SingleEntry extends MasterSlaveEntry {
}
@Override
public RFuture<RedisConnection> connectionReadOp(RedisCommand<?> command, InetSocketAddress addr) {
public RFuture<RedisConnection> connectionReadOp(RedisCommand<?> command, URI addr) {
return super.connectionWriteOp(command);
}

@ -18,10 +18,10 @@ package org.redisson.connection.balancer;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.redisson.api.NodeType;
import org.redisson.api.RFuture;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisConnectionException;
import org.redisson.client.RedisPubSubConnection;
@ -31,15 +31,16 @@ import org.redisson.config.ReadMode;
import org.redisson.connection.ClientConnectionsEntry;
import org.redisson.connection.ClientConnectionsEntry.FreezeReason;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.CountableListener;
import org.redisson.connection.MasterSlaveEntry;
import org.redisson.connection.pool.PubSubConnectionPool;
import org.redisson.connection.pool.SlaveConnectionPool;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.redisson.misc.URIBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.internal.PlatformDependent;
/**
@ -55,7 +56,7 @@ public class LoadBalancerManager {
private final PubSubConnectionPool pubSubConnectionPool;
private final SlaveConnectionPool slaveConnectionPool;
private final Map<String, ClientConnectionsEntry> ip2Entry = PlatformDependent.newConcurrentHashMap();
private final Map<RedisClient, ClientConnectionsEntry> client2Entry = PlatformDependent.newConcurrentHashMap();
public LoadBalancerManager(MasterSlaveServersConfig config, ConnectionManager connectionManager, MasterSlaveEntry entry) {
this.connectionManager = connectionManager;
@ -63,12 +64,12 @@ public class LoadBalancerManager {
pubSubConnectionPool = new PubSubConnectionPool(config, connectionManager, entry);
}
public void changeType(InetSocketAddress addr, NodeType nodeType) {
ClientConnectionsEntry entry = ip2Entry.get(convert(addr));
changeType(addr, nodeType, entry);
public void changeType(RedisClient redisClient, NodeType nodeType) {
ClientConnectionsEntry entry = getEntry(redisClient);
changeType(nodeType, entry);
}
protected void changeType(Object addr, NodeType nodeType, ClientConnectionsEntry entry) {
protected void changeType(NodeType nodeType, ClientConnectionsEntry entry) {
if (entry != null) {
if (connectionManager.isClusterMode()) {
entry.getClient().getConfig().setReadOnly(nodeType == NodeType.SLAVE && connectionManager.getConfig().getReadMode() != ReadMode.MASTER);
@ -79,37 +80,34 @@ public class LoadBalancerManager {
public void changeType(URI address, NodeType nodeType) {
ClientConnectionsEntry entry = getEntry(address);
changeType(address, nodeType, entry);
changeType(nodeType, entry);
}
public RFuture<Void> add(final ClientConnectionsEntry entry) {
final RPromise<Void> result = connectionManager.newPromise();
FutureListener<Void> listener = new FutureListener<Void>() {
AtomicInteger counter = new AtomicInteger(2);
@Override
public void operationComplete(Future<Void> future) throws Exception {
if (!future.isSuccess()) {
result.tryFailure(future.cause());
return;
}
if (counter.decrementAndGet() == 0) {
String addr = entry.getClient().getIpAddr();
ip2Entry.put(addr, entry);
result.trySuccess(null);
}
RPromise<Void> result = new RedissonPromise<Void>();
CountableListener<Void> listener = new CountableListener<Void>(result, null) {
public void operationComplete(io.netty.util.concurrent.Future<Object> future) throws Exception {
super.operationComplete(future);
if (this.result.isSuccess()) {
client2Entry.put(entry.getClient(), entry);
}
};
};
RFuture<Void> slaveFuture = slaveConnectionPool.add(entry);
listener.incCounter();
slaveFuture.addListener(listener);
RFuture<Void> pubSubFuture = pubSubConnectionPool.add(entry);
listener.incCounter();
pubSubFuture.addListener(listener);
return result;
}
public int getAvailableClients() {
int count = 0;
for (ClientConnectionsEntry connectionEntry : ip2Entry.values()) {
for (ClientConnectionsEntry connectionEntry : client2Entry.values()) {
if (!connectionEntry.isFreezed()) {
count++;
}
@ -139,19 +137,14 @@ public class LoadBalancerManager {
return false;
}
private String convert(URI address) {
InetSocketAddress addr = new InetSocketAddress(address.getHost(), address.getPort());
return convert(addr);
}
public ClientConnectionsEntry freeze(URI address, FreezeReason freezeReason) {
ClientConnectionsEntry connectionEntry = getEntry(address);
return freeze(connectionEntry, freezeReason);
}
private ClientConnectionsEntry getEntry(URI address) {
String addr = convert(address);
return ip2Entry.get(addr);
public ClientConnectionsEntry freeze(RedisClient redisClient, FreezeReason freezeReason) {
ClientConnectionsEntry connectionEntry = getEntry(redisClient);
return freeze(connectionEntry, freezeReason);
}
public ClientConnectionsEntry freeze(ClientConnectionsEntry connectionEntry, FreezeReason freezeReason) {
@ -179,20 +172,34 @@ public class LoadBalancerManager {
return pubSubConnectionPool.get();
}
public boolean contains(InetSocketAddress addr) {
return ip2Entry.containsKey(convert(addr));
public boolean contains(URI addr) {
return getEntry(addr) != null;
}
protected String convert(InetSocketAddress addr) {
return addr.getAddress().getHostAddress() + ":" + addr.getPort();
public boolean contains(RedisClient redisClient) {
return getEntry(redisClient) != null;
}
public boolean contains(String addr) {
return ip2Entry.containsKey(addr);
protected ClientConnectionsEntry getEntry(URI addr) {
for (ClientConnectionsEntry entry : client2Entry.values()) {
InetSocketAddress entryAddr = entry.getClient().getAddr();
if (URIBuilder.compare(entryAddr, addr)) {
return entry;
}
}
return null;
}
protected ClientConnectionsEntry getEntry(RedisClient redisClient) {
return client2Entry.get(redisClient);
}
protected String convert(InetSocketAddress addr) {
return addr.getAddress().getHostAddress() + ":" + addr.getPort();
}
public RFuture<RedisConnection> getConnection(RedisCommand<?> command, InetSocketAddress addr) {
ClientConnectionsEntry entry = ip2Entry.get(convert(addr));
public RFuture<RedisConnection> getConnection(RedisCommand<?> command, URI addr) {
ClientConnectionsEntry entry = getEntry(addr);
if (entry != null) {
return slaveConnectionPool.get(command, entry);
}
@ -205,23 +212,23 @@ public class LoadBalancerManager {
}
public void returnPubSubConnection(RedisPubSubConnection connection) {
ClientConnectionsEntry entry = ip2Entry.get(convert(connection.getRedisClient().getAddr()));
ClientConnectionsEntry entry = getEntry(connection.getRedisClient());
pubSubConnectionPool.returnConnection(entry, connection);
}
public void returnConnection(RedisConnection connection) {
ClientConnectionsEntry entry = ip2Entry.get(convert(connection.getRedisClient().getAddr()));
ClientConnectionsEntry entry = getEntry(connection.getRedisClient());
slaveConnectionPool.returnConnection(entry, connection);
}
public void shutdown() {
for (ClientConnectionsEntry entry : ip2Entry.values()) {
for (ClientConnectionsEntry entry : client2Entry.values()) {
entry.getClient().shutdown();
}
}
public void shutdownAsync() {
for (ClientConnectionsEntry entry : ip2Entry.values()) {
for (ClientConnectionsEntry entry : client2Entry.values()) {
connectionManager.shutdownAsync(entry.getClient());
}
}

@ -15,7 +15,6 @@
*/
package org.redisson.jcache;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@ -58,6 +57,7 @@ import org.redisson.api.RLock;
import org.redisson.api.RSemaphore;
import org.redisson.api.RTopic;
import org.redisson.api.listener.MessageListener;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.MapScanCodec;
import org.redisson.client.protocol.RedisCommand;
@ -2082,7 +2082,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
cacheManager.getStatBean(this).addRemoveTime(currentNanoTime() - startTime);
}
MapScanResult<ScanObjectEntry, ScanObjectEntry> scanIterator(String name, InetSocketAddress client, long startPos) {
MapScanResult<ScanObjectEntry, ScanObjectEntry> scanIterator(String name, RedisClient client, long startPos) {
RFuture<MapScanResult<ScanObjectEntry, ScanObjectEntry>> f
= commandExecutor.readAsync(client, name, new MapScanCodec(codec), RedisCommands.HSCAN, name, startPos);
try {

@ -29,7 +29,7 @@ import org.redisson.api.annotation.REntity;
import org.redisson.api.annotation.REntity.TransformationMode;
import org.redisson.api.annotation.RId;
import org.redisson.client.codec.Codec;
import org.redisson.codec.CodecProvider;
import org.redisson.codec.ReferenceCodecProvider;
import org.redisson.liveobject.misc.ClassUtils;
import org.redisson.liveobject.misc.Introspectior;
import org.redisson.liveobject.resolver.NamingScheme;
@ -52,7 +52,7 @@ import net.bytebuddy.implementation.bind.annotation.This;
public class AccessorInterceptor {
private final RedissonClient redisson;
private final CodecProvider codecProvider;
private final ReferenceCodecProvider codecProvider;
private final RedissonObjectBuilder objectBuilder;
public AccessorInterceptor(RedissonClient redisson, RedissonObjectBuilder objectBuilder) {

@ -29,7 +29,7 @@ import org.redisson.client.codec.Codec;
import org.redisson.api.RMap;
import org.redisson.api.RedissonClient;
import org.redisson.api.annotation.REntity;
import org.redisson.codec.CodecProvider;
import org.redisson.codec.ReferenceCodecProvider;
import org.redisson.liveobject.misc.ClassUtils;
import org.redisson.liveobject.resolver.NamingScheme;
@ -50,14 +50,14 @@ public class LiveObjectInterceptor {
}
private final RedissonClient redisson;
private final CodecProvider codecProvider;
private final ReferenceCodecProvider codecProvider;
private final Class<?> originalClass;
private final String idFieldName;
private final Class<?> idFieldType;
private final NamingScheme namingScheme;
private final Class<? extends Codec> codecClass;
public LiveObjectInterceptor(RedissonClient redisson, CodecProvider codecProvider, Class<?> entityClass, String idFieldName) {
public LiveObjectInterceptor(RedissonClient redisson, ReferenceCodecProvider codecProvider, Class<?> entityClass, String idFieldName) {
this.redisson = redisson;
this.codecProvider = codecProvider;
this.originalClass = entityClass;

@ -43,7 +43,7 @@ import org.redisson.api.RedissonClient;
import org.redisson.api.annotation.REntity;
import org.redisson.api.annotation.RObjectField;
import org.redisson.client.codec.Codec;
import org.redisson.codec.CodecProvider;
import org.redisson.codec.ReferenceCodecProvider;
import org.redisson.liveobject.misc.ClassUtils;
import org.redisson.liveobject.resolver.NamingScheme;
import org.redisson.misc.RedissonObjectFactory;
@ -74,9 +74,9 @@ public class RedissonObjectBuilder {
}
private final RedissonClient redisson;
private final CodecProvider codecProvider;
private final ReferenceCodecProvider codecProvider;
public RedissonObjectBuilder(RedissonClient redisson, CodecProvider codecProvider) {
public RedissonObjectBuilder(RedissonClient redisson, ReferenceCodecProvider codecProvider) {
super();
this.redisson = redisson;
this.codecProvider = codecProvider;

@ -1,48 +0,0 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.liveobject.provider;
import io.netty.util.internal.PlatformDependent;
import java.lang.annotation.Annotation;
import java.util.concurrent.ConcurrentMap;
import org.redisson.liveobject.resolver.Resolver;
/**
*
* @author Rui Gu (https://github.com/jackygurui)
*/
public class DefaultResolverProvider implements ResolverProvider {
public transient final ConcurrentMap<Class<? extends Resolver>, Resolver<?, ?, ?>> providerCache = PlatformDependent.newConcurrentHashMap();
@Override
public Resolver<?, ?, ?> getResolver(Class<?> cls, Class<? extends Resolver> resolverClass, Annotation anno) {
if (!providerCache.containsKey(resolverClass)) {
try {
providerCache.putIfAbsent(resolverClass, resolverClass.newInstance());
} catch (Exception ex) {
throw new RuntimeException(ex);
}
}
return providerCache.get(resolverClass);
}
@Override
public void registerResolver(Class<?> cls, Class<? extends Resolver> resolverClass, Resolver resolver) {
providerCache.putIfAbsent(resolverClass, resolver);
}
}

@ -1,49 +0,0 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.liveobject.provider;
import java.lang.annotation.Annotation;
import org.redisson.liveobject.resolver.Resolver;
/**
*
* @author Rui Gu (https://github.com/jackygurui)
*/
public interface ResolverProvider {
/**
* To retrieve a resolver based on the the class requiring values to be
* resolved, the resolver type, and annotation which may carry any required
* configurations.
*
* @param cls the class requires value to be resolved
* @param resolverClass the resolver type
* @param anno annotation with configurations
* @return a Resolver instance
*/
Resolver<?, ?, ?> getResolver(Class<?> cls, Class<? extends Resolver> resolverClass, Annotation anno);
/**
* To register a resolver based on the the class it can provide value to,
* the resolver type, the resolver instance to be cached.
*
* @param cls object
* @param resolverClass object
* @param resolver object
*/
void registerResolver(Class<?> cls, Class<? extends Resolver> resolverClass, Resolver resolver);
}

@ -32,7 +32,7 @@ import org.redisson.api.RedissonReactiveClient;
import org.redisson.api.annotation.REntity;
import org.redisson.api.annotation.RId;
import org.redisson.client.codec.Codec;
import org.redisson.codec.CodecProvider;
import org.redisson.codec.ReferenceCodecProvider;
import org.redisson.config.Config;
import org.redisson.liveobject.misc.ClassUtils;
import org.redisson.liveobject.misc.Introspectior;
@ -103,7 +103,7 @@ public class RedissonObjectFactory {
public static <T> T fromReference(RedissonClient redisson, RedissonReference rr) throws Exception {
Class<? extends Object> type = rr.getType();
CodecProvider codecProvider = redisson.getConfig().getCodecProvider();
ReferenceCodecProvider codecProvider = redisson.getConfig().getReferenceCodecProvider();
if (type != null) {
if (ClassUtils.isAnnotationPresent(type, REntity.class)) {
RLiveObjectService liveObjectService = redisson.getLiveObjectService();
@ -133,7 +133,7 @@ public class RedissonObjectFactory {
public static <T> T fromReference(RedissonReactiveClient redisson, RedissonReference rr) throws Exception {
Class<? extends Object> type = rr.getReactiveType();
CodecProvider codecProvider = redisson.getConfig().getCodecProvider();
ReferenceCodecProvider codecProvider = redisson.getConfig().getReferenceCodecProvider();
/**
* Live Object from reference in reactive client is not supported yet.
*/
@ -158,12 +158,12 @@ public class RedissonObjectFactory {
if (object instanceof RObject && !(object instanceof RLiveObject)) {
RObject rObject = ((RObject) object);
config.getCodecProvider().registerCodec((Class) rObject.getCodec().getClass(), (Class) rObject.getClass(), rObject.getName(), rObject.getCodec());
config.getReferenceCodecProvider().registerCodec((Class) rObject.getCodec().getClass(), (Class) rObject.getClass(), rObject.getName(), rObject.getCodec());
return new RedissonReference(object.getClass(), ((RObject) object).getName(), ((RObject) object).getCodec());
}
if (object instanceof RObjectReactive && !(object instanceof RLiveObject)) {
RObjectReactive rObject = ((RObjectReactive) object);
config.getCodecProvider().registerCodec((Class) rObject.getCodec().getClass(), (Class) rObject.getClass(), rObject.getName(), rObject.getCodec());
config.getReferenceCodecProvider().registerCodec((Class) rObject.getCodec().getClass(), (Class) rObject.getClass(), rObject.getName(), rObject.getCodec());
return new RedissonReference(object.getClass(), ((RObjectReactive) object).getName(), ((RObjectReactive) object).getCodec());
}
@ -173,7 +173,7 @@ public class RedissonObjectFactory {
REntity anno = ClassUtils.getAnnotation(rEntity, REntity.class);
NamingScheme ns = anno.namingScheme()
.getDeclaredConstructor(Codec.class)
.newInstance(config.getCodecProvider().getCodec(anno, (Class) rEntity));
.newInstance(config.getReferenceCodecProvider().getCodec(anno, (Class) rEntity));
String name = Introspectior
.getFieldsWithAnnotation(rEntity, RId.class)
.getOnly().getName();

@ -15,6 +15,7 @@
*/
package org.redisson.misc;
import java.net.InetSocketAddress;
import java.net.URI;
/**
@ -37,4 +38,13 @@ public class URIBuilder {
return URI.create(uri.replace(s, "[" + s + "]"));
}
public static boolean compare(InetSocketAddress entryAddr, URI addr) {
if (((entryAddr.getHostName() != null && entryAddr.getHostName().equals(addr.getHost()))
|| entryAddr.getAddress().getHostAddress().equals(addr.getHost()))
&& entryAddr.getPort() == addr.getPort()) {
return true;
}
return false;
}
}

@ -18,6 +18,7 @@ package org.redisson.reactive;
import java.net.InetSocketAddress;
import org.reactivestreams.Publisher;
import org.redisson.client.RedisClient;
import org.redisson.client.protocol.decoder.MapScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
@ -30,7 +31,7 @@ import org.redisson.client.protocol.decoder.ScanObjectEntry;
*/
interface MapReactive<K, V> {
Publisher<MapScanResult<ScanObjectEntry, ScanObjectEntry>> scanIteratorReactive(InetSocketAddress client, long startPos);
Publisher<MapScanResult<ScanObjectEntry, ScanObjectEntry>> scanIteratorReactive(RedisClient client, long startPos);
Publisher<V> put(K key, V value);

@ -15,7 +15,6 @@
*/
package org.redisson.reactive;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.Map;
import java.util.Map.Entry;
@ -32,6 +31,7 @@ import org.redisson.api.RFuture;
import org.redisson.api.RMapCacheAsync;
import org.redisson.api.RMapCacheReactive;
import org.redisson.api.RMapReactive;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.decoder.MapScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
@ -165,7 +165,7 @@ public class RedissonMapCacheReactive<K, V> extends RedissonExpirableReactive im
}
@Override
public Publisher<MapScanResult<ScanObjectEntry, ScanObjectEntry>> scanIteratorReactive(final InetSocketAddress client, final long startPos) {
public Publisher<MapScanResult<ScanObjectEntry, ScanObjectEntry>> scanIteratorReactive(final RedisClient client, final long startPos) {
return reactive(new Supplier<RFuture<MapScanResult<ScanObjectEntry, ScanObjectEntry>>>() {
@Override
public RFuture<MapScanResult<ScanObjectEntry, ScanObjectEntry>> get() {

@ -30,6 +30,7 @@ import org.redisson.api.MapOptions;
import org.redisson.api.RFuture;
import org.redisson.api.RMapAsync;
import org.redisson.api.RMapReactive;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.MapScanCodec;
import org.redisson.client.protocol.RedisCommands;
@ -284,7 +285,7 @@ public class RedissonMapReactive<K, V> extends RedissonExpirableReactive impleme
});
}
public Publisher<MapScanResult<ScanObjectEntry, ScanObjectEntry>> scanIteratorReactive(InetSocketAddress client, long startPos) {
public Publisher<MapScanResult<ScanObjectEntry, ScanObjectEntry>> scanIteratorReactive(RedisClient client, long startPos) {
return commandExecutor.readReactive(client, getName(), new MapScanCodec(codec), RedisCommands.HSCAN, getName(), startPos);
}

@ -15,7 +15,6 @@
*/
package org.redisson.reactive;
import java.net.InetSocketAddress;
import java.util.AbstractMap;
import java.util.HashMap;
import java.util.Map;
@ -27,6 +26,7 @@ import java.util.function.LongConsumer;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.redisson.client.RedisClient;
import org.redisson.client.protocol.decoder.MapScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
@ -57,7 +57,7 @@ public class RedissonMapReactiveIterator<K, V, M> implements Consumer<FluxSink<M
private Map<ByteBuf, ByteBuf> firstValues;
private Map<ByteBuf, ByteBuf> lastValues;
private long nextIterPos;
private InetSocketAddress client;
private RedisClient client;
private AtomicLong elementsRead = new AtomicLong();
private boolean finished;

@ -26,6 +26,7 @@ import org.redisson.api.RFuture;
import org.redisson.api.RScoredSortedSet.Aggregate;
import org.redisson.api.RScoredSortedSetAsync;
import org.redisson.api.RScoredSortedSetReactive;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.ScanCodec;
import org.redisson.client.protocol.RedisCommands;
@ -176,7 +177,7 @@ public class RedissonScoredSortedSetReactive<V> extends RedissonExpirableReactiv
});
}
private Publisher<ListScanResult<ScanObjectEntry>> scanIteratorReactive(InetSocketAddress client, long startPos) {
private Publisher<ListScanResult<ScanObjectEntry>> scanIteratorReactive(RedisClient client, long startPos) {
return commandExecutor.readReactive(client, getName(), new ScanCodec(codec), RedisCommands.ZSCAN, getName(), startPos);
}
@ -184,7 +185,7 @@ public class RedissonScoredSortedSetReactive<V> extends RedissonExpirableReactiv
public Publisher<V> iterator() {
return Flux.create(new SetReactiveIterator<V>() {
@Override
protected Publisher<ListScanResult<ScanObjectEntry>> scanIteratorReactive(InetSocketAddress client, long nextIterPos) {
protected Publisher<ListScanResult<ScanObjectEntry>> scanIteratorReactive(RedisClient client, long nextIterPos) {
return RedissonScoredSortedSetReactive.this.scanIteratorReactive(client, nextIterPos);
}
});

@ -27,6 +27,7 @@ import org.reactivestreams.Publisher;
import org.redisson.RedissonSetCache;
import org.redisson.api.RFuture;
import org.redisson.api.RSetCacheReactive;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.decoder.ListScanResult;
@ -86,7 +87,7 @@ public class RedissonSetCacheReactive<V> extends RedissonExpirableReactive imple
});
}
Publisher<ListScanResult<ScanObjectEntry>> scanIterator(final InetSocketAddress client, final long startPos) {
Publisher<ListScanResult<ScanObjectEntry>> scanIterator(final RedisClient client, final long startPos) {
return reactive(new Supplier<RFuture<ListScanResult<ScanObjectEntry>>>() {
@Override
public RFuture<ListScanResult<ScanObjectEntry>> get() {
@ -99,7 +100,7 @@ public class RedissonSetCacheReactive<V> extends RedissonExpirableReactive imple
public Publisher<V> iterator() {
return Flux.create(new SetReactiveIterator<V>() {
@Override
protected Publisher<ListScanResult<ScanObjectEntry>> scanIteratorReactive(InetSocketAddress client, long nextIterPos) {
protected Publisher<ListScanResult<ScanObjectEntry>> scanIteratorReactive(RedisClient client, long nextIterPos) {
return RedissonSetCacheReactive.this.scanIterator(client, nextIterPos);
}
});

@ -27,6 +27,7 @@ import org.reactivestreams.Publisher;
import org.redisson.RedissonSet;
import org.redisson.api.RFuture;
import org.redisson.api.RSetReactive;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.ScanCodec;
import org.redisson.client.protocol.RedisCommands;
@ -77,7 +78,7 @@ public class RedissonSetReactive<V> extends RedissonExpirableReactive implements
});
}
private Publisher<ListScanResult<ScanObjectEntry>> scanIteratorReactive(InetSocketAddress client, long startPos) {
private Publisher<ListScanResult<ScanObjectEntry>> scanIteratorReactive(RedisClient client, long startPos) {
return commandExecutor.readReactive(client, getName(), new ScanCodec(codec), RedisCommands.SSCAN, getName(), startPos);
}
@ -212,7 +213,7 @@ public class RedissonSetReactive<V> extends RedissonExpirableReactive implements
public Publisher<V> iterator() {
return Flux.create(new SetReactiveIterator<V>() {
@Override
protected Publisher<ListScanResult<ScanObjectEntry>> scanIteratorReactive(InetSocketAddress client, long nextIterPos) {
protected Publisher<ListScanResult<ScanObjectEntry>> scanIteratorReactive(RedisClient client, long nextIterPos) {
return RedissonSetReactive.this.scanIteratorReactive(client, nextIterPos);
}
});

@ -25,6 +25,7 @@ import java.util.function.LongConsumer;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.redisson.client.RedisClient;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
@ -46,7 +47,7 @@ public abstract class SetReactiveIterator<V> implements Consumer<FluxSink<V>> {
private List<ByteBuf> firstValues;
private List<ByteBuf> lastValues;
private long nextIterPos;
private InetSocketAddress client;
private RedisClient client;
private AtomicLong elementsRead = new AtomicLong();
private boolean finished;
@ -183,6 +184,6 @@ public abstract class SetReactiveIterator<V> implements Consumer<FluxSink<V>> {
return result;
}
protected abstract Publisher<ListScanResult<ScanObjectEntry>> scanIteratorReactive(InetSocketAddress client, long nextIterPos);
protected abstract Publisher<ListScanResult<ScanObjectEntry>> scanIteratorReactive(RedisClient client, long nextIterPos);
}

@ -31,6 +31,7 @@ import org.redisson.api.Node.InfoSection;
import org.redisson.api.NodesGroup;
import org.redisson.api.RMap;
import org.redisson.api.RedissonClient;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisConnectionException;
import org.redisson.client.RedisOutOfMemoryException;
import org.redisson.client.protocol.decoder.ListScanResult;
@ -84,7 +85,7 @@ public class RedissonTest {
RedissonBaseIterator iter = new RedissonBaseIterator() {
int i;
@Override
ListScanResult iterator(InetSocketAddress client, long nextIterPos) {
ListScanResult iterator(RedisClient client, long nextIterPos) {
i++;
if (i == 1) {
return new ListScanResult(13L, Collections.emptyList());
@ -110,7 +111,7 @@ public class RedissonTest {
RedissonBaseIterator<Integer> iter = new RedissonBaseIterator<Integer>() {
int i;
@Override
ListScanResult<ScanObjectEntry> iterator(InetSocketAddress client, long nextIterPos) {
ListScanResult<ScanObjectEntry> iterator(RedisClient client, long nextIterPos) {
i++;
if (i == 1) {
return new ListScanResult<ScanObjectEntry>(14L, Arrays.asList(new ScanObjectEntry(Unpooled.wrappedBuffer(new byte[] {1}), 1)));

@ -1,25 +1,31 @@
package org.redisson.spring.support;
import io.netty.channel.EventLoopGroup;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertThat;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.Executor;
import static org.hamcrest.Matchers.*;
import org.junit.Test;
import org.redisson.ClusterRunner;
import org.redisson.RedisRunner;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import static org.junit.Assert.*;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisConnection;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.codec.CodecProvider;
import org.redisson.codec.ReferenceCodecProvider;
import org.redisson.config.Config;
import org.redisson.config.SingleServerConfig;
import org.redisson.liveobject.provider.ResolverProvider;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import io.netty.channel.EventLoopGroup;
/**
*
@ -116,9 +122,8 @@ public class SpringNamespaceWikiTest {
assertEquals(2, config.getNettyThreads());
assertSame(context.getBean("myCodec", Codec.class), config.getCodec());
assertEquals(false, config.isUseLinuxNativeEpoll());
assertEquals(false, config.isRedissonReferenceEnabled());
assertSame(context.getBean("myCodecProvider", CodecProvider.class), config.getCodecProvider());
assertSame(context.getBean("myResolverProvider", ResolverProvider.class), config.getResolverProvider());
assertEquals(false, config.isReferenceEnabled());
assertSame(context.getBean("myCodecProvider", ReferenceCodecProvider.class), config.getReferenceCodecProvider());
assertSame(context.getBean("myExecutor", Executor.class), config.getExecutor());
assertSame(context.getBean("myEventLoopGroup", EventLoopGroup.class), config.getEventLoopGroup());
Method method = Config.class.getDeclaredMethod("getSingleServerConfig", (Class<?>[]) null);

Loading…
Cancel
Save