Fixed - IP address renew process. #1178

pull/1204/head
Nikita 7 years ago
parent ecd78802b2
commit b9d9f87043

@ -15,12 +15,17 @@
*/ */
package org.redisson; package org.redisson;
import java.net.InetSocketAddress; import org.redisson.client.RedisClient;
/**
*
* @author Nikita Koksharov
*
*/
public interface RedisClientResult { public interface RedisClientResult {
void setRedisClient(InetSocketAddress addr); void setRedisClient(RedisClient addr);
InetSocketAddress getRedisClient(); RedisClient getRedisClient();
} }

@ -15,7 +15,6 @@
*/ */
package org.redisson; package org.redisson;
import java.net.InetSocketAddress;
import java.net.URI; import java.net.URI;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
@ -34,10 +33,10 @@ import org.redisson.client.protocol.RedisCommands;
import org.redisson.connection.ConnectionListener; import org.redisson.connection.ConnectionListener;
import org.redisson.connection.ConnectionManager; import org.redisson.connection.ConnectionManager;
import org.redisson.connection.RedisClientEntry; import org.redisson.connection.RedisClientEntry;
import org.redisson.misc.URIBuilder;
import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.FutureListener;
import org.redisson.misc.URIBuilder;
/** /**
* *
@ -56,10 +55,9 @@ public class RedisNodes<N extends Node> implements NodesGroup<N> {
@Override @Override
public N getNode(String address) { public N getNode(String address) {
Collection<N> clients = (Collection<N>) connectionManager.getClients(); Collection<N> clients = (Collection<N>) connectionManager.getClients();
URI uri = URIBuilder.create(address); URI addr = URIBuilder.create(address);
InetSocketAddress addr = new InetSocketAddress(uri.getHost(), uri.getPort());
for (N node : clients) { for (N node : clients) {
if (node.getAddr().equals(addr)) { if (URIBuilder.compare(node.getAddr(), addr)) {
return node; return node;
} }
} }

@ -15,12 +15,12 @@
*/ */
package org.redisson; package org.redisson;
import java.net.InetSocketAddress;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.NoSuchElementException; import java.util.NoSuchElementException;
import org.redisson.client.RedisClient;
import org.redisson.client.protocol.decoder.ListScanResult; import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry; import org.redisson.client.protocol.decoder.ScanObjectEntry;
@ -38,7 +38,7 @@ abstract class RedissonBaseIterator<V> implements Iterator<V> {
private List<ByteBuf> lastValues; private List<ByteBuf> lastValues;
private Iterator<ScanObjectEntry> lastIter; private Iterator<ScanObjectEntry> lastIter;
protected long nextIterPos; protected long nextIterPos;
protected InetSocketAddress client; protected RedisClient client;
private boolean finished; private boolean finished;
private boolean currentElementRemoved; private boolean currentElementRemoved;
@ -145,7 +145,7 @@ abstract class RedissonBaseIterator<V> implements Iterator<V> {
return false; return false;
} }
abstract ListScanResult<ScanObjectEntry> iterator(InetSocketAddress client, long nextIterPos); abstract ListScanResult<ScanObjectEntry> iterator(RedisClient client, long nextIterPos);
@Override @Override
public V next() { public V next() {

@ -15,7 +15,6 @@
*/ */
package org.redisson; package org.redisson;
import java.net.InetSocketAddress;
import java.util.AbstractMap; import java.util.AbstractMap;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
@ -23,6 +22,7 @@ import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.NoSuchElementException; import java.util.NoSuchElementException;
import org.redisson.client.RedisClient;
import org.redisson.client.protocol.decoder.MapScanResult; import org.redisson.client.protocol.decoder.MapScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry; import org.redisson.client.protocol.decoder.ScanObjectEntry;
@ -42,7 +42,7 @@ public abstract class RedissonBaseMapIterator<K, V, M> implements Iterator<M> {
private Map<ByteBuf, ByteBuf> lastValues; private Map<ByteBuf, ByteBuf> lastValues;
private Iterator<Map.Entry<ScanObjectEntry, ScanObjectEntry>> lastIter; private Iterator<Map.Entry<ScanObjectEntry, ScanObjectEntry>> lastIter;
protected long nextIterPos; protected long nextIterPos;
protected InetSocketAddress client; protected RedisClient client;
private boolean finished; private boolean finished;
private boolean currentElementRemoved; private boolean currentElementRemoved;

@ -15,7 +15,6 @@
*/ */
package org.redisson; package org.redisson;
import java.net.InetSocketAddress;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
@ -31,6 +30,7 @@ import org.redisson.api.RFuture;
import org.redisson.api.RKeys; import org.redisson.api.RKeys;
import org.redisson.api.RObject; import org.redisson.api.RObject;
import org.redisson.api.RType; import org.redisson.api.RType;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisException; import org.redisson.client.RedisException;
import org.redisson.client.codec.ScanCodec; import org.redisson.client.codec.ScanCodec;
import org.redisson.client.codec.StringCodec; import org.redisson.client.codec.StringCodec;
@ -107,7 +107,7 @@ public class RedissonKeys implements RKeys {
return getKeysByPattern(null); return getKeysByPattern(null);
} }
private ListScanResult<ScanObjectEntry> scanIterator(InetSocketAddress client, MasterSlaveEntry entry, long startPos, String pattern, int count) { private ListScanResult<ScanObjectEntry> scanIterator(RedisClient client, MasterSlaveEntry entry, long startPos, String pattern, int count) {
if (pattern == null) { if (pattern == null) {
RFuture<ListScanResult<ScanObjectEntry>> f = commandExecutor.readAsync(client, entry, new ScanCodec(StringCodec.INSTANCE), RedisCommands.SCAN, startPos, "COUNT", count); RFuture<ListScanResult<ScanObjectEntry>> f = commandExecutor.readAsync(client, entry, new ScanCodec(StringCodec.INSTANCE), RedisCommands.SCAN, startPos, "COUNT", count);
return commandExecutor.get(f); return commandExecutor.get(f);
@ -120,7 +120,7 @@ public class RedissonKeys implements RKeys {
return new RedissonBaseIterator<String>() { return new RedissonBaseIterator<String>() {
@Override @Override
ListScanResult<ScanObjectEntry> iterator(InetSocketAddress client, long nextIterPos) { ListScanResult<ScanObjectEntry> iterator(RedisClient client, long nextIterPos) {
return RedissonKeys.this.scanIterator(client, entry, nextIterPos, pattern, count); return RedissonKeys.this.scanIterator(client, entry, nextIterPos, pattern, count);
} }

@ -16,7 +16,6 @@
package org.redisson; package org.redisson;
import java.math.BigDecimal; import java.math.BigDecimal;
import java.net.InetSocketAddress;
import java.util.AbstractCollection; import java.util.AbstractCollection;
import java.util.AbstractSet; import java.util.AbstractSet;
import java.util.ArrayList; import java.util.ArrayList;
@ -40,6 +39,7 @@ import org.redisson.api.RMap;
import org.redisson.api.RReadWriteLock; import org.redisson.api.RReadWriteLock;
import org.redisson.api.RedissonClient; import org.redisson.api.RedissonClient;
import org.redisson.api.mapreduce.RMapReduce; import org.redisson.api.mapreduce.RMapReduce;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec; import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.MapScanCodec; import org.redisson.client.codec.MapScanCodec;
@ -949,7 +949,7 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
return get(fastRemoveAsync(keys)); return get(fastRemoveAsync(keys));
} }
MapScanResult<ScanObjectEntry, ScanObjectEntry> scanIterator(String name, InetSocketAddress client, long startPos, String pattern) { MapScanResult<ScanObjectEntry, ScanObjectEntry> scanIterator(String name, RedisClient client, long startPos, String pattern) {
if (pattern == null) { if (pattern == null) {
RFuture<MapScanResult<ScanObjectEntry, ScanObjectEntry>> f RFuture<MapScanResult<ScanObjectEntry, ScanObjectEntry>> f
= commandExecutor.readAsync(client, name, new MapScanCodec(codec), RedisCommands.HSCAN, name, startPos); = commandExecutor.readAsync(client, name, new MapScanCodec(codec), RedisCommands.HSCAN, name, startPos);

@ -37,6 +37,7 @@ import org.redisson.api.map.event.EntryExpiredListener;
import org.redisson.api.map.event.EntryRemovedListener; import org.redisson.api.map.event.EntryRemovedListener;
import org.redisson.api.map.event.EntryUpdatedListener; import org.redisson.api.map.event.EntryUpdatedListener;
import org.redisson.api.map.event.MapEntryListener; import org.redisson.api.map.event.MapEntryListener;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec; import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.MapScanCodec; import org.redisson.client.codec.MapScanCodec;
@ -1169,11 +1170,11 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
} }
@Override @Override
MapScanResult<ScanObjectEntry, ScanObjectEntry> scanIterator(String name, InetSocketAddress client, long startPos, String pattern) { MapScanResult<ScanObjectEntry, ScanObjectEntry> scanIterator(String name, RedisClient client, long startPos, String pattern) {
return get(scanIteratorAsync(name, client, startPos, pattern)); return get(scanIteratorAsync(name, client, startPos, pattern));
} }
public RFuture<MapScanResult<ScanObjectEntry, ScanObjectEntry>> scanIteratorAsync(final String name, InetSocketAddress client, long startPos, String pattern) { public RFuture<MapScanResult<ScanObjectEntry, ScanObjectEntry>> scanIteratorAsync(final String name, RedisClient client, long startPos, String pattern) {
List<Object> params = new ArrayList<Object>(); List<Object> params = new ArrayList<Object>();
params.add(System.currentTimeMillis()); params.add(System.currentTimeMillis());
params.add(startPos); params.add(startPos);

@ -23,6 +23,7 @@ import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.NoSuchElementException; import java.util.NoSuchElementException;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.decoder.MapScanResult; import org.redisson.client.protocol.decoder.MapScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry; import org.redisson.client.protocol.decoder.ScanObjectEntry;
@ -40,7 +41,7 @@ abstract class RedissonMultiMapIterator<K, V, M> implements Iterator<M> {
private Iterator<V> valuesIter; private Iterator<V> valuesIter;
protected long valuesIterPos = 0; protected long valuesIterPos = 0;
protected InetSocketAddress client; protected RedisClient client;
private boolean finished; private boolean finished;
private boolean removeExecuted; private boolean removeExecuted;

@ -33,6 +33,7 @@ import org.redisson.api.RFuture;
import org.redisson.api.RLock; import org.redisson.api.RLock;
import org.redisson.api.RMultimap; import org.redisson.api.RMultimap;
import org.redisson.api.RReadWriteLock; import org.redisson.api.RReadWriteLock;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec; import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.MapScanCodec; import org.redisson.client.codec.MapScanCodec;
@ -298,7 +299,7 @@ public abstract class RedissonMultimap<K, V> extends RedissonExpirable implement
} }
MapScanResult<ScanObjectEntry, ScanObjectEntry> scanIterator(InetSocketAddress client, long startPos) { MapScanResult<ScanObjectEntry, ScanObjectEntry> scanIterator(RedisClient client, long startPos) {
RFuture<MapScanResult<ScanObjectEntry, ScanObjectEntry>> f = commandExecutor.readAsync(client, getName(), new MapScanCodec(codec, StringCodec.INSTANCE), RedisCommands.HSCAN, getName(), startPos); RFuture<MapScanResult<ScanObjectEntry, ScanObjectEntry>> f = commandExecutor.readAsync(client, getName(), new MapScanCodec(codec, StringCodec.INSTANCE), RedisCommands.HSCAN, getName(), startPos);
return get(f); return get(f);
} }

@ -16,7 +16,6 @@
package org.redisson; package org.redisson;
import java.math.BigDecimal; import java.math.BigDecimal;
import java.net.InetSocketAddress;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
@ -32,16 +31,14 @@ import org.redisson.api.RScoredSortedSet;
import org.redisson.api.RedissonClient; import org.redisson.api.RedissonClient;
import org.redisson.api.SortOrder; import org.redisson.api.SortOrder;
import org.redisson.api.mapreduce.RCollectionMapReduce; import org.redisson.api.mapreduce.RCollectionMapReduce;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
import org.redisson.client.codec.DoubleCodec; import org.redisson.client.codec.DoubleCodec;
import org.redisson.client.codec.LongCodec; import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.ScanCodec; import org.redisson.client.codec.ScanCodec;
import org.redisson.client.codec.ScoredCodec; import org.redisson.client.codec.ScoredCodec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.ScoredEntry; import org.redisson.client.protocol.ScoredEntry;
import org.redisson.client.protocol.convertor.BooleanReplayConvertor;
import org.redisson.client.protocol.decoder.ListScanResult; import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry; import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.command.CommandAsyncExecutor; import org.redisson.command.CommandAsyncExecutor;
@ -315,7 +312,7 @@ public class RedissonScoredSortedSet<V> extends RedissonExpirable implements RSc
return commandExecutor.readAsync(getName(), codec, RedisCommands.ZRANK_INT, getName(), encode(o)); return commandExecutor.readAsync(getName(), codec, RedisCommands.ZRANK_INT, getName(), encode(o));
} }
private ListScanResult<ScanObjectEntry> scanIterator(InetSocketAddress client, long startPos) { private ListScanResult<ScanObjectEntry> scanIterator(RedisClient client, long startPos) {
RFuture<ListScanResult<ScanObjectEntry>> f = commandExecutor.readAsync(client, getName(), new ScanCodec(codec), RedisCommands.ZSCAN, getName(), startPos); RFuture<ListScanResult<ScanObjectEntry>> f = commandExecutor.readAsync(client, getName(), new ScanCodec(codec), RedisCommands.ZSCAN, getName(), startPos);
return get(f); return get(f);
} }
@ -325,7 +322,7 @@ public class RedissonScoredSortedSet<V> extends RedissonExpirable implements RSc
return new RedissonBaseIterator<V>() { return new RedissonBaseIterator<V>() {
@Override @Override
ListScanResult<ScanObjectEntry> iterator(InetSocketAddress client, long nextIterPos) { ListScanResult<ScanObjectEntry> iterator(RedisClient client, long nextIterPos) {
return scanIterator(client, nextIterPos); return scanIterator(client, nextIterPos);
} }

@ -29,6 +29,7 @@ import org.redisson.api.RSet;
import org.redisson.api.RedissonClient; import org.redisson.api.RedissonClient;
import org.redisson.api.SortOrder; import org.redisson.api.SortOrder;
import org.redisson.api.mapreduce.RCollectionMapReduce; import org.redisson.api.mapreduce.RCollectionMapReduce;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
import org.redisson.client.codec.ScanCodec; import org.redisson.client.codec.ScanCodec;
import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisCommands;
@ -89,7 +90,7 @@ public class RedissonSet<V> extends RedissonExpirable implements RSet<V>, ScanIt
} }
@Override @Override
public ListScanResult<ScanObjectEntry> scanIterator(String name, InetSocketAddress client, long startPos, String pattern) { public ListScanResult<ScanObjectEntry> scanIterator(String name, RedisClient client, long startPos, String pattern) {
if (pattern == null) { if (pattern == null) {
RFuture<ListScanResult<ScanObjectEntry>> f = commandExecutor.readAsync(client, name, new ScanCodec(codec), RedisCommands.SSCAN, name, startPos); RFuture<ListScanResult<ScanObjectEntry>> f = commandExecutor.readAsync(client, name, new ScanCodec(codec), RedisCommands.SSCAN, name, startPos);
return get(f); return get(f);
@ -104,7 +105,7 @@ public class RedissonSet<V> extends RedissonExpirable implements RSet<V>, ScanIt
return new RedissonBaseIterator<V>() { return new RedissonBaseIterator<V>() {
@Override @Override
ListScanResult<ScanObjectEntry> iterator(InetSocketAddress client, long nextIterPos) { ListScanResult<ScanObjectEntry> iterator(RedisClient client, long nextIterPos) {
return scanIterator(getName(), client, nextIterPos, pattern); return scanIterator(getName(), client, nextIterPos, pattern);
} }

@ -29,6 +29,7 @@ import org.redisson.api.RFuture;
import org.redisson.api.RSetCache; import org.redisson.api.RSetCache;
import org.redisson.api.RedissonClient; import org.redisson.api.RedissonClient;
import org.redisson.api.mapreduce.RCollectionMapReduce; import org.redisson.api.mapreduce.RCollectionMapReduce;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
import org.redisson.client.codec.ScanCodec; import org.redisson.client.codec.ScanCodec;
import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommand;
@ -123,12 +124,13 @@ public class RedissonSetCache<V> extends RedissonExpirable implements RSetCache<
Arrays.<Object>asList(getName(o)), System.currentTimeMillis(), encode(o)); Arrays.<Object>asList(getName(o)), System.currentTimeMillis(), encode(o));
} }
public ListScanResult<ScanObjectEntry> scanIterator(String name, InetSocketAddress client, long startPos, String pattern) { @Override
public ListScanResult<ScanObjectEntry> scanIterator(String name, RedisClient client, long startPos, String pattern) {
RFuture<ListScanResult<ScanObjectEntry>> f = scanIteratorAsync(name, client, startPos, pattern); RFuture<ListScanResult<ScanObjectEntry>> f = scanIteratorAsync(name, client, startPos, pattern);
return get(f); return get(f);
} }
public RFuture<ListScanResult<ScanObjectEntry>> scanIteratorAsync(String name, InetSocketAddress client, long startPos, String pattern) { public RFuture<ListScanResult<ScanObjectEntry>> scanIteratorAsync(String name, RedisClient client, long startPos, String pattern) {
List<Object> params = new ArrayList<Object>(); List<Object> params = new ArrayList<Object>();
params.add(startPos); params.add(startPos);
params.add(System.currentTimeMillis()); params.add(System.currentTimeMillis());
@ -160,7 +162,7 @@ public class RedissonSetCache<V> extends RedissonExpirable implements RSetCache<
return new RedissonBaseIterator<V>() { return new RedissonBaseIterator<V>() {
@Override @Override
ListScanResult<ScanObjectEntry> iterator(InetSocketAddress client, long nextIterPos) { ListScanResult<ScanObjectEntry> iterator(RedisClient client, long nextIterPos) {
return scanIterator(getName(), client, nextIterPos, pattern); return scanIterator(getName(), client, nextIterPos, pattern);
} }

@ -15,7 +15,6 @@
*/ */
package org.redisson; package org.redisson;
import java.net.InetSocketAddress;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
@ -28,6 +27,7 @@ import org.redisson.api.RFuture;
import org.redisson.api.RSet; import org.redisson.api.RSet;
import org.redisson.api.SortOrder; import org.redisson.api.SortOrder;
import org.redisson.api.mapreduce.RCollectionMapReduce; import org.redisson.api.mapreduce.RCollectionMapReduce;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
import org.redisson.client.codec.MapScanCodec; import org.redisson.client.codec.MapScanCodec;
import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommand;
@ -165,7 +165,7 @@ public class RedissonSetMultimapValues<V> extends RedissonExpirable implements R
System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(o)); System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(o));
} }
private ListScanResult<ScanObjectEntry> scanIterator(InetSocketAddress client, long startPos, String pattern) { private ListScanResult<ScanObjectEntry> scanIterator(RedisClient client, long startPos, String pattern) {
List<Object> params = new ArrayList<Object>(); List<Object> params = new ArrayList<Object>();
params.add(System.currentTimeMillis()); params.add(System.currentTimeMillis());
params.add(startPos); params.add(startPos);
@ -201,7 +201,7 @@ public class RedissonSetMultimapValues<V> extends RedissonExpirable implements R
return new RedissonBaseIterator<V>() { return new RedissonBaseIterator<V>() {
@Override @Override
ListScanResult<ScanObjectEntry> iterator(InetSocketAddress client, long nextIterPos) { ListScanResult<ScanObjectEntry> iterator(RedisClient client, long nextIterPos) {
return scanIterator(client, nextIterPos, pattern); return scanIterator(client, nextIterPos, pattern);
} }

@ -15,8 +15,7 @@
*/ */
package org.redisson; package org.redisson;
import java.net.InetSocketAddress; import org.redisson.client.RedisClient;
import org.redisson.client.protocol.decoder.ListScanResult; import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry; import org.redisson.client.protocol.decoder.ScanObjectEntry;
@ -27,7 +26,7 @@ import org.redisson.client.protocol.decoder.ScanObjectEntry;
*/ */
public interface ScanIterator { public interface ScanIterator {
ListScanResult<ScanObjectEntry> scanIterator(String name, InetSocketAddress client, long startPos, String pattern); ListScanResult<ScanObjectEntry> scanIterator(String name, RedisClient client, long startPos, String pattern);
boolean remove(Object value); boolean remove(Object value);

@ -20,13 +20,13 @@ import java.net.URI;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.redisson.api.RFuture; import org.redisson.api.RFuture;
import org.redisson.client.handler.RedisChannelInitializer; import org.redisson.client.handler.RedisChannelInitializer;
import org.redisson.client.handler.RedisChannelInitializer.Type; import org.redisson.client.handler.RedisChannelInitializer.Type;
import org.redisson.misc.RPromise; import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise; import org.redisson.misc.RedissonPromise;
import org.redisson.misc.URIBuilder;
import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel; import io.netty.channel.Channel;
@ -34,12 +34,16 @@ import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelOption; import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup; import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollDatagramChannel;
import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.ChannelGroupFuture; import io.netty.channel.group.ChannelGroupFuture;
import io.netty.channel.group.DefaultChannelGroup; import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.resolver.AddressResolver;
import io.netty.resolver.dns.DnsAddressResolverGroup;
import io.netty.resolver.dns.DnsServerAddressStreamProviders;
import io.netty.util.HashedWheelTimer; import io.netty.util.HashedWheelTimer;
import io.netty.util.Timer; import io.netty.util.Timer;
import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Future;
@ -53,9 +57,11 @@ import io.netty.util.concurrent.FutureListener;
*/ */
public class RedisClient { public class RedisClient {
private final AtomicReference<RFuture<InetSocketAddress>> resolveFuture = new AtomicReference<RFuture<InetSocketAddress>>();
private final Bootstrap bootstrap; private final Bootstrap bootstrap;
private final Bootstrap pubSubBootstrap; private final Bootstrap pubSubBootstrap;
private final InetSocketAddress addr; private final URI addr;
private InetSocketAddress resolvedAddr;
private final ChannelGroup channels; private final ChannelGroup channels;
private ExecutorService executor; private ExecutorService executor;
@ -66,6 +72,7 @@ public class RedisClient {
private boolean hasOwnTimer; private boolean hasOwnTimer;
private boolean hasOwnExecutor; private boolean hasOwnExecutor;
private boolean hasOwnGroup; private boolean hasOwnGroup;
private boolean hasOwnResolver;
public static RedisClient create(RedisClientConfig config) { public static RedisClient create(RedisClientConfig config) {
return new RedisClient(config); return new RedisClient(config);
@ -85,12 +92,20 @@ public class RedisClient {
copy.setExecutor(Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2)); copy.setExecutor(Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2));
hasOwnExecutor = true; hasOwnExecutor = true;
} }
if (copy.getResolverGroup() == null) {
if (config.getSocketChannelClass() == EpollSocketChannel.class) {
copy.setResolverGroup(new DnsAddressResolverGroup(EpollDatagramChannel.class, DnsServerAddressStreamProviders.platformDefault()));
} else {
copy.setResolverGroup(new DnsAddressResolverGroup(NioDatagramChannel.class, DnsServerAddressStreamProviders.platformDefault()));
}
hasOwnResolver = true;
}
this.config = copy; this.config = copy;
this.executor = copy.getExecutor(); this.executor = copy.getExecutor();
this.timer = copy.getTimer(); this.timer = copy.getTimer();
addr = new InetSocketAddress(copy.getAddress().getHost(), copy.getAddress().getPort()); addr = copy.getAddress();
channels = new DefaultChannelGroup(copy.getGroup().next()); channels = new DefaultChannelGroup(copy.getGroup().next());
bootstrap = createBootstrap(copy, Type.PLAIN); bootstrap = createBootstrap(copy, Type.PLAIN);
@ -101,9 +116,9 @@ public class RedisClient {
private Bootstrap createBootstrap(RedisClientConfig config, Type type) { private Bootstrap createBootstrap(RedisClientConfig config, Type type) {
Bootstrap bootstrap = new Bootstrap() Bootstrap bootstrap = new Bootstrap()
.resolver(config.getResolverGroup())
.channel(config.getSocketChannelClass()) .channel(config.getSocketChannelClass())
.group(config.getGroup()) .group(config.getGroup());
.remoteAddress(addr);
bootstrap.handler(new RedisChannelInitializer(bootstrap, config, this, channels, type)); bootstrap.handler(new RedisChannelInitializer(bootstrap, config, this, channels, type));
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, config.getConnectTimeout()); bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, config.getConnectTimeout());
@ -112,92 +127,8 @@ public class RedisClient {
return bootstrap; return bootstrap;
} }
/*
* Use {@link #create(RedisClientConfig)}
*
*/
@Deprecated
public RedisClient(String address) {
this(URIBuilder.create(address));
}
/*
* Use {@link #create(RedisClientConfig)}
*
*/
@Deprecated
public RedisClient(URI address) {
this(new HashedWheelTimer(), Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2), new NioEventLoopGroup(), address);
hasOwnGroup = true;
}
/*
* Use {@link #create(RedisClientConfig)}
*
*/
@Deprecated
public RedisClient(Timer timer, ExecutorService executor, EventLoopGroup group, URI address) {
this(timer, executor, group, address.getHost(), address.getPort());
}
/*
* Use {@link #create(RedisClientConfig)}
*
*/
@Deprecated
public RedisClient(String host, int port) {
this(new HashedWheelTimer(), Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2), new NioEventLoopGroup(), NioSocketChannel.class, host, port, 10000, 10000);
hasOwnGroup = true;
}
/*
* Use {@link #create(RedisClientConfig)}
*
*/
@Deprecated
public RedisClient(Timer timer, ExecutorService executor, EventLoopGroup group, String host, int port) {
this(timer, executor, group, NioSocketChannel.class, host, port, 10000, 10000);
}
/*
* Use {@link #create(RedisClientConfig)}
*
*/
@Deprecated
public RedisClient(String host, int port, int connectTimeout, int commandTimeout) {
this(new HashedWheelTimer(), Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2), new NioEventLoopGroup(), NioSocketChannel.class, host, port, connectTimeout, commandTimeout);
}
/*
* Use {@link #create(RedisClientConfig)}
*
*/
@Deprecated
public RedisClient(final Timer timer, ExecutorService executor, EventLoopGroup group, Class<? extends SocketChannel> socketChannelClass, String host, int port,
int connectTimeout, int commandTimeout) {
RedisClientConfig config = new RedisClientConfig();
config.setTimer(timer).setExecutor(executor).setGroup(group).setSocketChannelClass(socketChannelClass)
.setAddress(host, port).setConnectTimeout(connectTimeout).setCommandTimeout(commandTimeout);
this.config = config;
this.executor = config.getExecutor();
this.timer = config.getTimer();
addr = new InetSocketAddress(config.getAddress().getHost(), config.getAddress().getPort());
channels = new DefaultChannelGroup(config.getGroup().next());
bootstrap = createBootstrap(config, Type.PLAIN);
pubSubBootstrap = createBootstrap(config, Type.PUBSUB);
this.commandTimeout = config.getCommandTimeout();
}
public String getIpAddr() {
return addr.getAddress().getHostAddress() + ":" + addr.getPort();
}
public InetSocketAddress getAddr() { public InetSocketAddress getAddr() {
return addr; return resolvedAddr;
} }
public long getCommandTimeout() { public long getCommandTimeout() {
@ -219,42 +150,78 @@ public class RedisClient {
throw new RedisConnectionException("Unable to connect to: " + addr, e); throw new RedisConnectionException("Unable to connect to: " + addr, e);
} }
} }
public RFuture<InetSocketAddress> resolveAddr() {
final RPromise<InetSocketAddress> promise = new RedissonPromise<InetSocketAddress>();
if (!resolveFuture.compareAndSet(null, promise)) {
return resolveFuture.get();
}
AddressResolver<InetSocketAddress> resolver = (AddressResolver<InetSocketAddress>) bootstrap.config().resolver().getResolver(bootstrap.config().group().next());
Future<InetSocketAddress> resolveFuture = resolver.resolve(InetSocketAddress.createUnresolved(addr.getHost(), addr.getPort()));
resolveFuture.addListener(new FutureListener<InetSocketAddress>() {
@Override
public void operationComplete(Future<InetSocketAddress> future) throws Exception {
if (!future.isSuccess()) {
promise.tryFailure(future.cause());
return;
}
resolvedAddr = future.getNow();
promise.trySuccess(future.getNow());
}
});
return promise;
}
public RFuture<RedisConnection> connectAsync() { public RFuture<RedisConnection> connectAsync() {
final RPromise<RedisConnection> f = new RedissonPromise<RedisConnection>(); final RPromise<RedisConnection> f = new RedissonPromise<RedisConnection>();
ChannelFuture channelFuture = bootstrap.connect();
channelFuture.addListener(new ChannelFutureListener() { RFuture<InetSocketAddress> addrFuture = resolveAddr();
addrFuture.addListener(new FutureListener<InetSocketAddress>() {
@Override @Override
public void operationComplete(final ChannelFuture future) throws Exception { public void operationComplete(Future<InetSocketAddress> future) throws Exception {
if (future.isSuccess()) { if (!future.isSuccess()) {
final RedisConnection c = RedisConnection.getFrom(future.channel()); f.tryFailure(future.cause());
c.getConnectionPromise().addListener(new FutureListener<RedisConnection>() { return;
@Override }
public void operationComplete(final Future<RedisConnection> future) throws Exception {
bootstrap.config().group().execute(new Runnable() { ChannelFuture channelFuture = bootstrap.connect(future.getNow());
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(final ChannelFuture future) throws Exception {
if (future.isSuccess()) {
final RedisConnection c = RedisConnection.getFrom(future.channel());
c.getConnectionPromise().addListener(new FutureListener<RedisConnection>() {
@Override @Override
public void run() { public void operationComplete(final Future<RedisConnection> future) throws Exception {
if (future.isSuccess()) { bootstrap.config().group().execute(new Runnable() {
if (!f.trySuccess(c)) { @Override
c.closeAsync(); public void run() {
if (future.isSuccess()) {
if (!f.trySuccess(c)) {
c.closeAsync();
}
} else {
f.tryFailure(future.cause());
c.closeAsync();
}
} }
} else { });
f.tryFailure(future.cause()); }
c.closeAsync(); });
} } else {
bootstrap.config().group().execute(new Runnable() {
public void run() {
f.tryFailure(future.cause());
} }
}); });
} }
}); }
} else { });
bootstrap.config().group().execute(new Runnable() {
public void run() {
f.tryFailure(future.cause());
}
});
}
} }
}); });
return f; return f;
} }
@ -268,39 +235,52 @@ public class RedisClient {
public RFuture<RedisPubSubConnection> connectPubSubAsync() { public RFuture<RedisPubSubConnection> connectPubSubAsync() {
final RPromise<RedisPubSubConnection> f = new RedissonPromise<RedisPubSubConnection>(); final RPromise<RedisPubSubConnection> f = new RedissonPromise<RedisPubSubConnection>();
ChannelFuture channelFuture = pubSubBootstrap.connect();
channelFuture.addListener(new ChannelFutureListener() { RFuture<InetSocketAddress> nameFuture = resolveAddr();
nameFuture.addListener(new FutureListener<InetSocketAddress>() {
@Override @Override
public void operationComplete(final ChannelFuture future) throws Exception { public void operationComplete(Future<InetSocketAddress> future) throws Exception {
if (future.isSuccess()) { if (!future.isSuccess()) {
final RedisPubSubConnection c = RedisPubSubConnection.getFrom(future.channel()); f.tryFailure(future.cause());
c.<RedisPubSubConnection>getConnectionPromise().addListener(new FutureListener<RedisPubSubConnection>() { return;
@Override }
public void operationComplete(final Future<RedisPubSubConnection> future) throws Exception {
bootstrap.config().group().execute(new Runnable() { ChannelFuture channelFuture = pubSubBootstrap.connect(future.getNow());
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(final ChannelFuture future) throws Exception {
if (future.isSuccess()) {
final RedisPubSubConnection c = RedisPubSubConnection.getFrom(future.channel());
c.<RedisPubSubConnection>getConnectionPromise().addListener(new FutureListener<RedisPubSubConnection>() {
@Override @Override
public void run() { public void operationComplete(final Future<RedisPubSubConnection> future) throws Exception {
if (future.isSuccess()) { pubSubBootstrap.config().group().execute(new Runnable() {
if (!f.trySuccess(c)) { @Override
c.closeAsync(); public void run() {
if (future.isSuccess()) {
if (!f.trySuccess(c)) {
c.closeAsync();
}
} else {
f.tryFailure(future.cause());
c.closeAsync();
}
} }
} else { });
f.tryFailure(future.cause()); }
c.closeAsync(); });
} } else {
pubSubBootstrap.config().group().execute(new Runnable() {
public void run() {
f.tryFailure(future.cause());
} }
}); });
} }
}); }
} else { });
bootstrap.config().group().execute(new Runnable() {
public void run() {
f.tryFailure(future.cause());
}
});
}
} }
}); });
return f; return f;
} }
@ -339,6 +319,9 @@ public class RedisClient {
executor.awaitTermination(15, TimeUnit.SECONDS); executor.awaitTermination(15, TimeUnit.SECONDS);
} }
if (hasOwnResolver) {
bootstrap.config().resolver().close();
}
if (hasOwnGroup) { if (hasOwnGroup) {
bootstrap.config().group().shutdownGracefully(); bootstrap.config().group().shutdownGracefully();
} }

@ -15,6 +15,7 @@
*/ */
package org.redisson.client; package org.redisson.client;
import java.net.InetAddress;
import java.net.URI; import java.net.URI;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
@ -24,6 +25,7 @@ import org.redisson.misc.URIBuilder;
import io.netty.channel.EventLoopGroup; import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.resolver.dns.DnsAddressResolverGroup;
import io.netty.util.Timer; import io.netty.util.Timer;
/** /**
@ -34,10 +36,12 @@ import io.netty.util.Timer;
public class RedisClientConfig { public class RedisClientConfig {
private URI address; private URI address;
private InetAddress addr;
private Timer timer; private Timer timer;
private ExecutorService executor; private ExecutorService executor;
private EventLoopGroup group; private EventLoopGroup group;
private DnsAddressResolverGroup resolverGroup;
private Class<? extends SocketChannel> socketChannelClass = NioSocketChannel.class; private Class<? extends SocketChannel> socketChannelClass = NioSocketChannel.class;
private int connectTimeout = 10000; private int connectTimeout = 10000;
private int commandTimeout = 10000; private int commandTimeout = 10000;
@ -84,6 +88,7 @@ public class RedisClientConfig {
this.sslTruststorePassword = config.sslTruststorePassword; this.sslTruststorePassword = config.sslTruststorePassword;
this.sslKeystore = config.sslKeystore; this.sslKeystore = config.sslKeystore;
this.sslKeystorePassword = config.sslKeystorePassword; this.sslKeystorePassword = config.sslKeystorePassword;
this.resolverGroup = config.resolverGroup;
} }
public RedisClientConfig setAddress(String host, int port) { public RedisClientConfig setAddress(String host, int port) {
@ -94,6 +99,11 @@ public class RedisClientConfig {
this.address = URIBuilder.create(address); this.address = URIBuilder.create(address);
return this; return this;
} }
public RedisClientConfig setAddress(InetAddress addr, URI address) {
this.addr = addr;
this.address = address;
return this;
}
public RedisClientConfig setAddress(URI address) { public RedisClientConfig setAddress(URI address) {
this.address = address; this.address = address;
return this; return this;
@ -101,7 +111,9 @@ public class RedisClientConfig {
public URI getAddress() { public URI getAddress() {
return address; return address;
} }
public InetAddress getAddr() {
return addr;
}
public Timer getTimer() { public Timer getTimer() {
return timer; return timer;
@ -262,5 +274,15 @@ public class RedisClientConfig {
this.tcpNoDelay = tcpNoDelay; this.tcpNoDelay = tcpNoDelay;
return this; return this;
} }
public DnsAddressResolverGroup getResolverGroup() {
return resolverGroup;
}
public RedisClientConfig setResolverGroup(DnsAddressResolverGroup resolverGroup) {
this.resolverGroup = resolverGroup;
return this;
}
} }

@ -15,8 +15,8 @@
*/ */
package org.redisson.client; package org.redisson.client;
import java.net.InetSocketAddress;
import java.net.URI; import java.net.URI;
import org.redisson.misc.URIBuilder; import org.redisson.misc.URIBuilder;
/** /**
@ -44,8 +44,4 @@ public class RedisRedirectException extends RedisException {
return url; return url;
} }
public InetSocketAddress getAddr() {
return new InetSocketAddress(url.getHost(), url.getPort());
}
} }

@ -15,6 +15,7 @@
*/ */
package org.redisson.client.handler; package org.redisson.client.handler;
import java.net.SocketAddress;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -109,7 +110,7 @@ public class ConnectionWatchdog extends ChannelInboundHandlerAdapter {
log.debug("reconnecting {} to {} ", connection, connection.getRedisClient().getAddr(), connection); log.debug("reconnecting {} to {} ", connection, connection.getRedisClient().getAddr(), connection);
try { try {
bootstrap.connect().addListener(new ChannelFutureListener() { bootstrap.connect(connection.getRedisClient().getAddr()).addListener(new ChannelFutureListener() {
@Override @Override
public void operationComplete(final ChannelFuture future) throws Exception { public void operationComplete(final ChannelFuture future) throws Exception {

@ -19,6 +19,7 @@ import java.net.InetSocketAddress;
import java.util.List; import java.util.List;
import org.redisson.RedisClientResult; import org.redisson.RedisClientResult;
import org.redisson.client.RedisClient;
/** /**
* *
@ -30,7 +31,7 @@ public class ListScanResult<V> implements RedisClientResult {
private final Long pos; private final Long pos;
private final List<V> values; private final List<V> values;
private InetSocketAddress addr; private RedisClient client;
public ListScanResult(Long pos, List<V> values) { public ListScanResult(Long pos, List<V> values) {
this.pos = pos; this.pos = pos;
@ -46,12 +47,12 @@ public class ListScanResult<V> implements RedisClientResult {
} }
@Override @Override
public void setRedisClient(InetSocketAddress addr) { public void setRedisClient(RedisClient client) {
this.addr = addr; this.client = client;
} }
public InetSocketAddress getRedisClient() { public RedisClient getRedisClient() {
return addr; return client;
} }
} }

@ -19,6 +19,7 @@ import java.net.InetSocketAddress;
import java.util.Map; import java.util.Map;
import org.redisson.RedisClientResult; import org.redisson.RedisClientResult;
import org.redisson.client.RedisClient;
/** /**
* *
@ -31,7 +32,7 @@ public class MapScanResult<K, V> implements RedisClientResult {
private final Long pos; private final Long pos;
private final Map<K, V> values; private final Map<K, V> values;
private InetSocketAddress client; private RedisClient client;
public MapScanResult(Long pos, Map<K, V> values) { public MapScanResult(Long pos, Map<K, V> values) {
super(); super();
@ -48,11 +49,11 @@ public class MapScanResult<K, V> implements RedisClientResult {
} }
@Override @Override
public void setRedisClient(InetSocketAddress client) { public void setRedisClient(RedisClient client) {
this.client = client; this.client = client;
} }
public InetSocketAddress getRedisClient() { public RedisClient getRedisClient() {
return client; return client;
} }

@ -52,6 +52,7 @@ import org.redisson.connection.MasterSlaveConnectionManager;
import org.redisson.connection.MasterSlaveEntry; import org.redisson.connection.MasterSlaveEntry;
import org.redisson.connection.SingleEntry; import org.redisson.connection.SingleEntry;
import org.redisson.misc.RPromise; import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -111,7 +112,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
List<RFuture<Collection<RFuture<Void>>>> futures = new ArrayList<RFuture<Collection<RFuture<Void>>>>(); List<RFuture<Collection<RFuture<Void>>>> futures = new ArrayList<RFuture<Collection<RFuture<Void>>>>();
for (ClusterPartition partition : partitions) { for (ClusterPartition partition : partitions) {
if (partition.isMasterFail()) { if (partition.isMasterFail()) {
failedMasters.add(partition.getMasterAddr().toString()); failedMasters.add(partition.getMasterAddress().toString());
continue; continue;
} }
RFuture<Collection<RFuture<Void>>> masterFuture = addMasterEntry(partition, cfg); RFuture<Collection<RFuture<Void>>> masterFuture = addMasterEntry(partition, cfg);
@ -271,12 +272,12 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
} }
} }
RFuture<Void> f = e.setupMasterEntry(config.getMasterAddress()); RFuture<RedisClient> f = e.setupMasterEntry(config.getMasterAddress());
final RPromise<Void> initFuture = newPromise(); final RPromise<Void> initFuture = new RedissonPromise<Void>();
futures.add(initFuture); futures.add(initFuture);
f.addListener(new FutureListener<Void>() { f.addListener(new FutureListener<RedisClient>() {
@Override @Override
public void operationComplete(Future<Void> future) throws Exception { public void operationComplete(Future<RedisClient> future) throws Exception {
if (!future.isSuccess()) { if (!future.isSuccess()) {
log.error("Can't add master: {} for slot ranges: {}", partition.getMasterAddress(), partition.getSlotRanges()); log.error("Can't add master: {} for slot ranges: {}", partition.getMasterAddress(), partition.getSlotRanges());
initFuture.tryFailure(future.cause()); initFuture.tryFailure(future.cause());
@ -393,8 +394,8 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
masterFuture.addListener(new FutureListener<Void>() { masterFuture.addListener(new FutureListener<Void>() {
@Override @Override
public void operationComplete(Future<Void> future) throws Exception { public void operationComplete(Future<Void> future) throws Exception {
checkSlotsMigration(newPartitions, nodesValue.toString()); checkSlotsMigration(newPartitions);
checkSlotsChange(cfg, newPartitions, nodesValue.toString()); checkSlotsChange(cfg, newPartitions);
getShutdownLatch().release(); getShutdownLatch().release();
scheduleClusterChangeCheck(cfg, null); scheduleClusterChangeCheck(cfg, null);
} }
@ -410,7 +411,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
continue; continue;
} }
MasterSlaveEntry entry = getEntry(currentPart.getMasterAddr()); MasterSlaveEntry entry = getEntry(currentPart.getSlots().iterator().next());
// should be invoked first in order to remove stale failedSlaveAddresses // should be invoked first in order to remove stale failedSlaveAddresses
Set<URI> addedSlaves = addRemoveSlaves(entry, currentPart, newPart); Set<URI> addedSlaves = addRemoveSlaves(entry, currentPart, newPart);
// Do some slaves have changed state from failed to alive? // Do some slaves have changed state from failed to alive?
@ -562,7 +563,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
return result; return result;
} }
private void checkSlotsChange(ClusterServersConfig cfg, Collection<ClusterPartition> newPartitions, String nodes) { private void checkSlotsChange(ClusterServersConfig cfg, Collection<ClusterPartition> newPartitions) {
Collection<Integer> newPartitionsSlots = slots(newPartitions); Collection<Integer> newPartitionsSlots = slots(newPartitions);
if (newPartitionsSlots.size() == lastPartitions.size() && lastPartitions.size() == MAX_SLOT) { if (newPartitionsSlots.size() == lastPartitions.size() && lastPartitions.size() == MAX_SLOT) {
return; return;
@ -576,8 +577,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
} }
for (Integer slot : removedSlots) { for (Integer slot : removedSlots) {
MasterSlaveEntry entry = getEntry(slot); MasterSlaveEntry entry = removeEntry(slot);
removeMaster(slot);
if (entry.getSlotRanges().isEmpty()) { if (entry.getSlotRanges().isEmpty()) {
entry.shutdownMasterAsync(); entry.shutdownMasterAsync();
log.info("{} master and slaves for it removed", entry.getClient().getAddr()); log.info("{} master and slaves for it removed", entry.getClient().getAddr());
@ -592,50 +592,56 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
} }
for (final Integer slot : addedSlots) { for (final Integer slot : addedSlots) {
ClusterPartition partition = find(newPartitions, slot); ClusterPartition partition = find(newPartitions, slot);
MasterSlaveEntry entry = getEntry(partition.getMasterAddr());
if (entry != null && entry.getClient().getAddr().equals(partition.getMasterAddr())) { Set<Integer> oldSlots = new HashSet<Integer>(partition.getSlots());
oldSlots.removeAll(addedSlots);
if (oldSlots.isEmpty()) {
continue;
}
MasterSlaveEntry entry = getEntry(oldSlots.iterator().next());
if (entry != null) {
addEntry(slot, entry); addEntry(slot, entry);
lastPartitions.put(slot, partition); lastPartitions.put(slot, partition);
break;
} }
} }
} }
private void checkSlotsMigration(Collection<ClusterPartition> newPartitions, String nodes) { private void checkSlotsMigration(Collection<ClusterPartition> newPartitions) {
Set<ClusterPartition> currentPartitions = getLastPartitions(); Set<ClusterPartition> currentPartitions = getLastPartitions();
for (ClusterPartition currentPartition : currentPartitions) { for (ClusterPartition currentPartition : currentPartitions) {
for (ClusterPartition newPartition : newPartitions) { for (ClusterPartition newPartition : newPartitions) {
if (!currentPartition.getNodeId().equals(newPartition.getNodeId()) if (!currentPartition.getNodeId().equals(newPartition.getNodeId())
// skip master change case // skip master change case
|| !currentPartition.getMasterAddr().equals(newPartition.getMasterAddr())) { || !currentPartition.getMasterAddress().equals(newPartition.getMasterAddress())) {
continue; continue;
} }
MasterSlaveEntry entry = getEntry(currentPartition.getSlots().iterator().next());
Set<Integer> addedSlots = new HashSet<Integer>(newPartition.getSlots()); Set<Integer> addedSlots = new HashSet<Integer>(newPartition.getSlots());
addedSlots.removeAll(currentPartition.getSlots()); addedSlots.removeAll(currentPartition.getSlots());
currentPartition.addSlots(addedSlots); currentPartition.addSlots(addedSlots);
MasterSlaveEntry entry = getEntry(currentPartition.getMasterAddr());
for (Integer slot : addedSlots) { for (Integer slot : addedSlots) {
addEntry(slot, entry); addEntry(slot, entry);
lastPartitions.put(slot, currentPartition); lastPartitions.put(slot, currentPartition);
} }
if (!addedSlots.isEmpty()) { if (!addedSlots.isEmpty()) {
log.info("{} slots added to {}", addedSlots.size(), currentPartition.getMasterAddr()); log.info("{} slots added to {}", addedSlots.size(), currentPartition.getMasterAddress());
} }
Set<Integer> removedSlots = new HashSet<Integer>(currentPartition.getSlots()); Set<Integer> removedSlots = new HashSet<Integer>(currentPartition.getSlots());
removedSlots.removeAll(newPartition.getSlots()); removedSlots.removeAll(newPartition.getSlots());
for (Integer removeSlot : removedSlots) { for (Integer removeSlot : removedSlots) {
if (lastPartitions.remove(removeSlot, currentPartition)) { if (lastPartitions.remove(removeSlot, currentPartition)) {
removeMaster(removeSlot); removeEntry(removeSlot);
} }
} }
currentPartition.removeSlots(removedSlots); currentPartition.removeSlots(removedSlots);
if (!removedSlots.isEmpty()) { if (!removedSlots.isEmpty()) {
log.info("{} slots removed from {}", removedSlots.size(), currentPartition.getMasterAddr()); log.info("{} slots removed from {}", removedSlots.size(), currentPartition.getMasterAddress());
} }
break; break;
} }

@ -15,7 +15,6 @@
*/ */
package org.redisson.cluster; package org.redisson.cluster;
import java.net.InetSocketAddress;
import java.net.URI; import java.net.URI;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
@ -106,10 +105,6 @@ public class ClusterPartition {
return slots; return slots;
} }
public InetSocketAddress getMasterAddr() {
return new InetSocketAddress(masterAddress.getHost(), masterAddress.getPort());
}
public URI getMasterAddress() { public URI getMasterAddress() {
return masterAddress; return masterAddress;
} }

@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit;
import org.redisson.SlotCallback; import org.redisson.SlotCallback;
import org.redisson.api.RFuture; import org.redisson.api.RFuture;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisException; import org.redisson.client.RedisException;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommand;
@ -56,11 +57,11 @@ public interface CommandAsyncExecutor {
<T, R> RFuture<R> writeAsync(MasterSlaveEntry entry, Codec codec, RedisCommand<T> command, Object ... params); <T, R> RFuture<R> writeAsync(MasterSlaveEntry entry, Codec codec, RedisCommand<T> command, Object ... params);
<T, R> RFuture<R> writeAsync(Integer slot, Codec codec, RedisCommand<T> command, Object ... params); <T, R> RFuture<R> readAsync(RedisClient client, MasterSlaveEntry entry, Codec codec, RedisCommand<T> command, Object ... params);
<T, R> RFuture<R> readAsync(InetSocketAddress client, MasterSlaveEntry entry, Codec codec, RedisCommand<T> command, Object ... params); <T, R> RFuture<R> readAsync(RedisClient client, String name, Codec codec, RedisCommand<T> command, Object ... params);
<T, R> RFuture<R> readAsync(InetSocketAddress client, String key, Codec codec, RedisCommand<T> command, Object ... params); <T, R> RFuture<R> readAsync(RedisClient client, Codec codec, RedisCommand<T> command, Object ... params);
<T, R> RFuture<R> evalWriteAllAsync(RedisCommand<T> command, SlotCallback<T, R> callback, String script, List<Object> keys, Object ... params); <T, R> RFuture<R> evalWriteAllAsync(RedisCommand<T> command, SlotCallback<T, R> callback, String script, List<Object> keys, Object ... params);
@ -68,20 +69,16 @@ public interface CommandAsyncExecutor {
<R, T> RFuture<R> readAllAsync(RedisCommand<T> command, SlotCallback<T, R> callback, Object ... params); <R, T> RFuture<R> readAllAsync(RedisCommand<T> command, SlotCallback<T, R> callback, Object ... params);
<T, R> RFuture<R> evalReadAsync(InetSocketAddress client, String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params); <T, R> RFuture<R> evalReadAsync(RedisClient client, String name, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params);
<T, R> RFuture<R> evalReadAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params); <T, R> RFuture<R> evalReadAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params);
<T, R> RFuture<R> evalReadAsync(MasterSlaveEntry entry, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params); <T, R> RFuture<R> evalReadAsync(MasterSlaveEntry entry, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params);
<T, R> RFuture<R> evalReadAsync(Integer slot, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params);
<T, R> RFuture<R> evalWriteAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params); <T, R> RFuture<R> evalWriteAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params);
<T, R> RFuture<R> evalWriteAsync(MasterSlaveEntry entry, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params); <T, R> RFuture<R> evalWriteAsync(MasterSlaveEntry entry, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params);
<T, R> RFuture<R> evalWriteAsync(Integer slot, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params);
<T, R> RFuture<R> readAsync(String key, Codec codec, RedisCommand<T> command, Object ... params); <T, R> RFuture<R> readAsync(String key, Codec codec, RedisCommand<T> command, Object ... params);
<T, R> RFuture<R> writeAsync(String key, Codec codec, RedisCommand<T> command, Object ... params); <T, R> RFuture<R> writeAsync(String key, Codec codec, RedisCommand<T> command, Object ... params);
@ -96,9 +93,6 @@ public interface CommandAsyncExecutor {
<T, R> RFuture<R> readAsync(MasterSlaveEntry entry, Codec codec, RedisCommand<T> command, Object ... params); <T, R> RFuture<R> readAsync(MasterSlaveEntry entry, Codec codec, RedisCommand<T> command, Object ... params);
<T, R> RFuture<R> readAsync(Integer slot, Codec codec, RedisCommand<T> command, Object ... params);
<T, R> RFuture<R> readRandomAsync(RedisCommand<T> command, Object ... params); <T, R> RFuture<R> readRandomAsync(RedisCommand<T> command, Object ... params);
} }

@ -15,7 +15,6 @@
*/ */
package org.redisson.command; package org.redisson.command;
import java.net.InetSocketAddress;
import java.util.AbstractMap; import java.util.AbstractMap;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
@ -38,6 +37,7 @@ import org.redisson.api.RFuture;
import org.redisson.api.RedissonClient; import org.redisson.api.RedissonClient;
import org.redisson.api.RedissonReactiveClient; import org.redisson.api.RedissonReactiveClient;
import org.redisson.client.RedisAskException; import org.redisson.client.RedisAskException;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisConnection; import org.redisson.client.RedisConnection;
import org.redisson.client.RedisException; import org.redisson.client.RedisException;
import org.redisson.client.RedisLoadingException; import org.redisson.client.RedisLoadingException;
@ -180,20 +180,27 @@ public class CommandAsyncService implements CommandAsyncExecutor {
} }
@Override @Override
public <T, R> RFuture<R> readAsync(InetSocketAddress client, MasterSlaveEntry entry, Codec codec, RedisCommand<T> command, Object... params) { public <T, R> RFuture<R> readAsync(RedisClient client, MasterSlaveEntry entry, Codec codec, RedisCommand<T> command, Object... params) {
RPromise<R> mainPromise = connectionManager.newPromise(); RPromise<R> mainPromise = connectionManager.newPromise();
async(true, new NodeSource(entry, client), codec, command, params, mainPromise, 0); async(true, new NodeSource(entry, client), codec, command, params, mainPromise, 0);
return mainPromise; return mainPromise;
} }
@Override @Override
public <T, R> RFuture<R> readAsync(InetSocketAddress client, String key, Codec codec, RedisCommand<T> command, Object... params) { public <T, R> RFuture<R> readAsync(RedisClient client, String name, Codec codec, RedisCommand<T> command, Object... params) {
RPromise<R> mainPromise = connectionManager.newPromise(); RPromise<R> mainPromise = connectionManager.newPromise();
int slot = connectionManager.calcSlot(key); int slot = connectionManager.calcSlot(name);
async(true, new NodeSource(slot, client), codec, command, params, mainPromise, 0); async(true, new NodeSource(slot, client), codec, command, params, mainPromise, 0);
return mainPromise; return mainPromise;
} }
@Override
public <T, R> RFuture<R> readAsync(RedisClient client, Codec codec, RedisCommand<T> command, Object... params) {
RPromise<R> mainPromise = connectionManager.newPromise();
async(true, new NodeSource(client), codec, command, params, mainPromise, 0);
return mainPromise;
}
@Override @Override
public <T, R> RFuture<Collection<R>> readAllAsync(RedisCommand<T> command, Object... params) { public <T, R> RFuture<Collection<R>> readAllAsync(RedisCommand<T> command, Object... params) {
final RPromise<Collection<R>> mainPromise = connectionManager.newPromise(); final RPromise<Collection<R>> mainPromise = connectionManager.newPromise();
@ -344,12 +351,6 @@ public class CommandAsyncService implements CommandAsyncExecutor {
return mainPromise; return mainPromise;
} }
public <T, R> RFuture<R> readAsync(Integer slot, Codec codec, RedisCommand<T> command, Object... params) {
RPromise<R> mainPromise = connectionManager.newPromise();
async(true, new NodeSource(slot), codec, command, params, mainPromise, 0);
return mainPromise;
}
@Override @Override
public <T, R> RFuture<R> writeAsync(MasterSlaveEntry entry, Codec codec, RedisCommand<T> command, Object... params) { public <T, R> RFuture<R> writeAsync(MasterSlaveEntry entry, Codec codec, RedisCommand<T> command, Object... params) {
RPromise<R> mainPromise = connectionManager.newPromise(); RPromise<R> mainPromise = connectionManager.newPromise();
@ -357,13 +358,6 @@ public class CommandAsyncService implements CommandAsyncExecutor {
return mainPromise; return mainPromise;
} }
@Override
public <T, R> RFuture<R> writeAsync(Integer slot, Codec codec, RedisCommand<T> command, Object... params) {
RPromise<R> mainPromise = connectionManager.newPromise();
async(false, new NodeSource(slot), codec, command, params, mainPromise, 0);
return mainPromise;
}
@Override @Override
public <T, R> RFuture<R> readAsync(String key, RedisCommand<T> command, Object... params) { public <T, R> RFuture<R> readAsync(String key, RedisCommand<T> command, Object... params) {
return readAsync(key, connectionManager.getCodec(), command, params); return readAsync(key, connectionManager.getCodec(), command, params);
@ -381,13 +375,8 @@ public class CommandAsyncService implements CommandAsyncExecutor {
} }
@Override @Override
public <T, R> RFuture<R> evalReadAsync(Integer slot, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params) { public <T, R> RFuture<R> evalReadAsync(RedisClient client, String name, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params) {
return evalAsync(new NodeSource(slot), true, codec, evalCommandType, script, keys, params); int slot = connectionManager.calcSlot(name);
}
@Override
public <T, R> RFuture<R> evalReadAsync(InetSocketAddress client, String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params) {
int slot = connectionManager.calcSlot(key);
return evalAsync(new NodeSource(slot, client), true, codec, evalCommandType, script, keys, params); return evalAsync(new NodeSource(slot, client), true, codec, evalCommandType, script, keys, params);
} }
@ -401,10 +390,6 @@ public class CommandAsyncService implements CommandAsyncExecutor {
return evalAsync(new NodeSource(entry), false, codec, evalCommandType, script, keys, params); return evalAsync(new NodeSource(entry), false, codec, evalCommandType, script, keys, params);
} }
public <T, R> RFuture<R> evalWriteAsync(Integer slot, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params) {
return evalAsync(new NodeSource(slot), false, codec, evalCommandType, script, keys, params);
}
@Override @Override
public <T, R> RFuture<R> evalWriteAllAsync(RedisCommand<T> command, SlotCallback<T, R> callback, String script, List<Object> keys, Object... params) { public <T, R> RFuture<R> evalWriteAllAsync(RedisCommand<T> command, SlotCallback<T, R> callback, String script, List<Object> keys, Object... params) {
return evalAllAsync(false, command, callback, script, keys, params); return evalAllAsync(false, command, callback, script, keys, params);
@ -805,7 +790,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
if (future.cause() instanceof RedisMovedException) { if (future.cause() instanceof RedisMovedException) {
RedisMovedException ex = (RedisMovedException) future.cause(); RedisMovedException ex = (RedisMovedException) future.cause();
async(details.isReadOnlyMode(), new NodeSource(ex.getSlot(), ex.getAddr(), Redirect.MOVED), details.getCodec(), async(details.isReadOnlyMode(), new NodeSource(ex.getSlot(), ex.getUrl(), Redirect.MOVED), details.getCodec(),
details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt()); details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt());
AsyncDetails.release(details); AsyncDetails.release(details);
return; return;
@ -813,7 +798,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
if (future.cause() instanceof RedisAskException) { if (future.cause() instanceof RedisAskException) {
RedisAskException ex = (RedisAskException) future.cause(); RedisAskException ex = (RedisAskException) future.cause();
async(details.isReadOnlyMode(), new NodeSource(ex.getSlot(), ex.getAddr(), Redirect.ASK), details.getCodec(), async(details.isReadOnlyMode(), new NodeSource(ex.getSlot(), ex.getUrl(), Redirect.ASK), details.getCodec(),
details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt()); details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt());
AsyncDetails.release(details); AsyncDetails.release(details);
return; return;
@ -844,11 +829,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
if (future.isSuccess()) { if (future.isSuccess()) {
R res = future.getNow(); R res = future.getNow();
if (res instanceof RedisClientResult) { if (res instanceof RedisClientResult) {
InetSocketAddress addr = source.getAddr(); ((RedisClientResult) res).setRedisClient(details.getConnectionFuture().getNow().getRedisClient());
if (addr == null) {
addr = details.getConnectionFuture().getNow().getRedisClient().getAddr();
}
((RedisClientResult) res).setRedisClient(addr);
} }
if (isRedissonReferenceSupportEnabled()) { if (isRedissonReferenceSupportEnabled()) {

@ -371,14 +371,14 @@ public class CommandBatchService extends CommandAsyncService {
if (future.cause() instanceof RedisMovedException) { if (future.cause() instanceof RedisMovedException) {
RedisMovedException ex = (RedisMovedException)future.cause(); RedisMovedException ex = (RedisMovedException)future.cause();
entry.clearErrors(); entry.clearErrors();
NodeSource nodeSource = new NodeSource(ex.getSlot(), ex.getAddr(), Redirect.MOVED); NodeSource nodeSource = new NodeSource(ex.getSlot(), ex.getUrl(), Redirect.MOVED);
execute(entry, nodeSource, mainPromise, slots, attempt, noResult, responseTimeout, retryAttempts, retryInterval); execute(entry, nodeSource, mainPromise, slots, attempt, noResult, responseTimeout, retryAttempts, retryInterval);
return; return;
} }
if (future.cause() instanceof RedisAskException) { if (future.cause() instanceof RedisAskException) {
RedisAskException ex = (RedisAskException)future.cause(); RedisAskException ex = (RedisAskException)future.cause();
entry.clearErrors(); entry.clearErrors();
NodeSource nodeSource = new NodeSource(ex.getSlot(), ex.getAddr(), Redirect.ASK); NodeSource nodeSource = new NodeSource(ex.getSlot(), ex.getUrl(), Redirect.ASK);
execute(entry, nodeSource, mainPromise, slots, attempt, noResult, responseTimeout, retryAttempts, retryInterval); execute(entry, nodeSource, mainPromise, slots, attempt, noResult, responseTimeout, retryAttempts, retryInterval);
return; return;
} }

@ -15,13 +15,13 @@
*/ */
package org.redisson.command; package org.redisson.command;
import java.net.InetSocketAddress;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;
import org.redisson.SlotCallback; import org.redisson.SlotCallback;
import org.redisson.api.RFuture; import org.redisson.api.RFuture;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommand;
import org.redisson.connection.MasterSlaveEntry; import org.redisson.connection.MasterSlaveEntry;
@ -37,9 +37,6 @@ public interface CommandReactiveExecutor extends CommandAsyncExecutor {
<R> Publisher<R> reactive(Supplier<RFuture<R>> supplier); <R> Publisher<R> reactive(Supplier<RFuture<R>> supplier);
<T, R> Publisher<R> evalReadReactive(InetSocketAddress client, String key, Codec codec, RedisCommand<T> evalCommandType,
String script, List<Object> keys, Object ... params);
<T, R> Publisher<R> evalWriteAllReactive(RedisCommand<T> command, SlotCallback<T, R> callback, String script, List<Object> keys, Object ... params); <T, R> Publisher<R> evalWriteAllReactive(RedisCommand<T> command, SlotCallback<T, R> callback, String script, List<Object> keys, Object ... params);
<T, R> Publisher<Collection<R>> readAllReactive(RedisCommand<T> command, Object ... params); <T, R> Publisher<Collection<R>> readAllReactive(RedisCommand<T> command, Object ... params);
@ -52,7 +49,7 @@ public interface CommandReactiveExecutor extends CommandAsyncExecutor {
<R, T> Publisher<R> writeAllReactive(RedisCommand<T> command, SlotCallback<T, R> callback, Object ... params); <R, T> Publisher<R> writeAllReactive(RedisCommand<T> command, SlotCallback<T, R> callback, Object ... params);
<T, R> Publisher<R> readReactive(InetSocketAddress client, String key, Codec codec, RedisCommand<T> command, Object ... params); <T, R> Publisher<R> readReactive(RedisClient client, String name, Codec codec, RedisCommand<T> command, Object ... params);
<T, R> Publisher<R> evalWriteReactive(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params); <T, R> Publisher<R> evalWriteReactive(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params);

@ -15,13 +15,13 @@
*/ */
package org.redisson.command; package org.redisson.command;
import java.net.InetSocketAddress;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;
import org.redisson.SlotCallback; import org.redisson.SlotCallback;
import org.redisson.api.RFuture; import org.redisson.api.RFuture;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommand;
import org.redisson.connection.ConnectionManager; import org.redisson.connection.ConnectionManager;
@ -76,11 +76,11 @@ public class CommandReactiveService extends CommandAsyncService implements Comma
} }
@Override @Override
public <T, R> Publisher<R> readReactive(final InetSocketAddress client, final String key, final Codec codec, final RedisCommand<T> command, final Object ... params) { public <T, R> Publisher<R> readReactive(final RedisClient client, final String name, final Codec codec, final RedisCommand<T> command, final Object ... params) {
return reactive(new Supplier<RFuture<R>>() { return reactive(new Supplier<RFuture<R>>() {
@Override @Override
public RFuture<R> get() { public RFuture<R> get() {
return readAsync(client, key, codec, command, params); return readAsync(client, name, codec, command, params);
}; };
}); });
} }
@ -136,18 +136,6 @@ public class CommandReactiveService extends CommandAsyncService implements Comma
}); });
} }
@Override
public <T, R> Publisher<R> evalReadReactive(final InetSocketAddress client, final String key, final Codec codec, final RedisCommand<T> evalCommandType,
final String script, final List<Object> keys, final Object ... params) {
return reactive(new Supplier<RFuture<R>>() {
@Override
public RFuture<R> get() {
return evalReadAsync(client, key, codec, evalCommandType, script, keys, params);
};
});
}
@Override @Override
public <T, R> Publisher<R> evalWriteReactive(final String key, final Codec codec, final RedisCommand<T> evalCommandType, public <T, R> Publisher<R> evalWriteReactive(final String key, final Codec codec, final RedisCommand<T> evalCommandType,
final String script, final List<Object> keys, final Object... params) { final String script, final List<Object> keys, final Object... params) {

@ -32,8 +32,6 @@ public interface CommandSyncExecutor {
<V> V get(RFuture<V> future); <V> V get(RFuture<V> future);
<T, R> R write(Integer slot, Codec codec, RedisCommand<T> command, Object ... params);
<T, R> R write(String key, Codec codec, RedisCommand<T> command, Object ... params); <T, R> R write(String key, Codec codec, RedisCommand<T> command, Object ... params);
<T, R> R write(String key, RedisCommand<T> command, Object ... params); <T, R> R write(String key, RedisCommand<T> command, Object ... params);
@ -46,10 +44,6 @@ public interface CommandSyncExecutor {
<T, R> R evalRead(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params); <T, R> R evalRead(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params);
<T, R> R read(InetSocketAddress client, String key, Codec codec, RedisCommand<T> command, Object ... params);
<T, R> R read(InetSocketAddress client, String key, RedisCommand<T> command, Object ... params);
<T, R> R evalWrite(String key, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params); <T, R> R evalWrite(String key, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params);
<T, R> R evalWrite(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params); <T, R> R evalWrite(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params);

@ -49,18 +49,6 @@ public class CommandSyncService extends CommandAsyncService implements CommandEx
return get(res); return get(res);
} }
@Override
public <T, R> R read(InetSocketAddress client, String key, RedisCommand<T> command, Object ... params) {
RFuture<R> res = readAsync(client, key, connectionManager.getCodec(), command, params);
return get(res);
}
@Override
public <T, R> R read(InetSocketAddress client, String key, Codec codec, RedisCommand<T> command, Object ... params) {
RFuture<R> res = readAsync(client, key, codec, command, params);
return get(res);
}
@Override @Override
public <T, R> R evalRead(String key, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) { public <T, R> R evalRead(String key, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) {
return evalRead(key, connectionManager.getCodec(), evalCommandType, script, keys, params); return evalRead(key, connectionManager.getCodec(), evalCommandType, script, keys, params);
@ -83,12 +71,6 @@ public class CommandSyncService extends CommandAsyncService implements CommandEx
return get(res); return get(res);
} }
@Override
public <T, R> R write(Integer slot, Codec codec, RedisCommand<T> command, Object ... params) {
RFuture<R> res = writeAsync(slot, codec, command, params);
return get(res);
}
@Override @Override
public <T, R> R write(String key, Codec codec, RedisCommand<T> command, Object ... params) { public <T, R> R write(String key, Codec codec, RedisCommand<T> command, Object ... params) {
RFuture<R> res = writeAsync(key, codec, command, params); RFuture<R> res = writeAsync(key, codec, command, params);

@ -53,7 +53,7 @@ public class ConnectionEventsHub {
} }
public void fireDisconnect(InetSocketAddress addr) { public void fireDisconnect(InetSocketAddress addr) {
if (maps.get(addr) == Status.DISCONNECTED) { if (addr == null || maps.get(addr) == Status.DISCONNECTED) {
return; return;
} }

@ -102,8 +102,8 @@ public interface ConnectionManager {
RedisClient createClient(NodeType type, URI address); RedisClient createClient(NodeType type, URI address);
MasterSlaveEntry getEntry(InetSocketAddress addr); MasterSlaveEntry getEntry(RedisClient redisClient);
PubSubConnectionEntry getPubSubEntry(String channelName); PubSubConnectionEntry getPubSubEntry(String channelName);
RFuture<PubSubConnectionEntry> psubscribe(String pattern, Codec codec, RedisPubSubListener<?>... listeners); RFuture<PubSubConnectionEntry> psubscribe(String pattern, Codec codec, RedisPubSubListener<?>... listeners);

@ -17,9 +17,7 @@ package org.redisson.connection;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.redisson.api.RFuture;
import org.redisson.misc.RPromise; import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.FutureListener;
@ -29,35 +27,31 @@ import io.netty.util.concurrent.FutureListener;
* @author Nikita Koksharov * @author Nikita Koksharov
* *
*/ */
public class CountListener implements FutureListener<Void> { public class CountableListener<T> implements FutureListener<Object> {
private final RPromise<Void> res; protected final AtomicInteger counter = new AtomicInteger();
protected final RPromise<T> result;
protected final T value;
private final AtomicInteger counter; public CountableListener(RPromise<T> result, T value) {
super();
public static RPromise<Void> create(RFuture<Void>... futures) { this.result = result;
RPromise<Void> result = new RedissonPromise<Void>(); this.value = value;
FutureListener<Void> listener = new CountListener(result, futures.length);
for (RFuture<Void> future : futures) {
future.addListener(listener);
}
return result;
} }
public CountListener(RPromise<Void> res, int amount) { public void incCounter() {
super(); counter.incrementAndGet();
this.res = res;
this.counter = new AtomicInteger(amount);
} }
@Override @Override
public void operationComplete(Future<Void> future) throws Exception { public void operationComplete(Future<Object> future) throws Exception {
if (!future.isSuccess()) { if (!future.isSuccess()) {
res.tryFailure(future.cause()); result.tryFailure(future.cause());
return; return;
} }
if (counter.decrementAndGet() == 0) { if (counter.decrementAndGet() == 0) {
res.trySuccess(null); result.trySuccess(value);
} }
} }

@ -18,23 +18,20 @@ package org.redisson.connection;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.URI; import java.net.URI;
import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.redisson.api.RFuture;
import org.redisson.connection.ClientConnectionsEntry.FreezeReason; import org.redisson.connection.ClientConnectionsEntry.FreezeReason;
import org.redisson.misc.URIBuilder; import org.redisson.misc.URIBuilder;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import io.netty.channel.epoll.EpollDatagramChannel;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.resolver.AddressResolver; import io.netty.resolver.AddressResolver;
import io.netty.resolver.dns.DefaultDnsServerAddressStreamProvider;
import io.netty.resolver.dns.DnsAddressResolverGroup; import io.netty.resolver.dns.DnsAddressResolverGroup;
import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.FutureListener;
@ -50,33 +47,20 @@ public class DNSMonitor {
private static final Logger log = LoggerFactory.getLogger(DNSMonitor.class); private static final Logger log = LoggerFactory.getLogger(DNSMonitor.class);
private final DnsAddressResolverGroup resolverGroup; private final AddressResolver<InetSocketAddress> resolver;
private final ConnectionManager connectionManager;
private ScheduledFuture<?> dnsMonitorFuture;
private ConnectionManager connectionManager;
private final Map<URI, InetAddress> masters = new HashMap<URI, InetAddress>(); private final Map<URI, InetAddress> masters = new HashMap<URI, InetAddress>();
private final Map<URI, InetAddress> slaves = new HashMap<URI, InetAddress>(); private final Map<URI, InetAddress> slaves = new HashMap<URI, InetAddress>();
private ScheduledFuture<?> dnsMonitorFuture;
private long dnsMonitoringInterval; private long dnsMonitoringInterval;
public DNSMonitor(ConnectionManager connectionManager, Set<URI> masterHosts, Set<URI> slaveHosts, long dnsMonitoringInterval) { public DNSMonitor(ConnectionManager connectionManager, InetSocketAddress masterHost, Collection<URI> slaveHosts, long dnsMonitoringInterval, DnsAddressResolverGroup resolverGroup) {
Class<? extends DatagramChannel> channelClass; this.resolver = resolverGroup.getResolver(connectionManager.getGroup().next());
if (connectionManager.getCfg().isUseLinuxNativeEpoll()) {
channelClass = EpollDatagramChannel.class;
} else {
channelClass = NioDatagramChannel.class;
}
resolverGroup = new DnsAddressResolverGroup(channelClass, DefaultDnsServerAddressStreamProvider.INSTANCE); URI uri = URIBuilder.create("redis://" + masterHost.getAddress().getHostAddress() + ":" + masterHost.getPort());
masters.put(uri, masterHost.getAddress());
AddressResolver<InetSocketAddress> resolver = resolverGroup.getResolver(connectionManager.getGroup().next());
for (URI host : masterHosts) {
Future<InetSocketAddress> resolveFuture = resolver.resolve(InetSocketAddress.createUnresolved(host.getHost(), 0));
resolveFuture.syncUninterruptibly();
masters.put(host, resolveFuture.getNow().getAddress());
}
for (URI host : slaveHosts) { for (URI host : slaveHosts) {
Future<InetSocketAddress> resolveFuture = resolver.resolve(InetSocketAddress.createUnresolved(host.getHost(), 0)); Future<InetSocketAddress> resolveFuture = resolver.resolve(InetSocketAddress.createUnresolved(host.getHost(), 0));
resolveFuture.syncUninterruptibly(); resolveFuture.syncUninterruptibly();
@ -101,7 +85,6 @@ public class DNSMonitor {
dnsMonitorFuture = connectionManager.getGroup().schedule(new Runnable() { dnsMonitorFuture = connectionManager.getGroup().schedule(new Runnable() {
@Override @Override
public void run() { public void run() {
final AddressResolver<InetSocketAddress> resolver = resolverGroup.getResolver(connectionManager.getGroup().next());
final AtomicInteger counter = new AtomicInteger(masters.size() + slaves.size()); final AtomicInteger counter = new AtomicInteger(masters.size() + slaves.size());
for (final Entry<URI, InetAddress> entry : masters.entrySet()) { for (final Entry<URI, InetAddress> entry : masters.entrySet()) {
Future<InetSocketAddress> resolveFuture = resolver.resolve(InetSocketAddress.createUnresolved(entry.getKey().getHost(), 0)); Future<InetSocketAddress> resolveFuture = resolver.resolve(InetSocketAddress.createUnresolved(entry.getKey().getHost(), 0));
@ -120,15 +103,15 @@ public class DNSMonitor {
InetAddress master = entry.getValue(); InetAddress master = entry.getValue();
InetAddress now = future.get().getAddress(); InetAddress now = future.get().getAddress();
if (!now.getHostAddress().equals(master.getHostAddress())) { if (!now.getHostAddress().equals(master.getHostAddress())) {
log.info("Detected DNS change. {} has changed from {} to {}", entry.getKey().getHost(), master.getHostAddress(), now.getHostAddress()); log.info("Detected DNS change. Master {} has changed ip from {} to {}", entry.getKey(), master.getHostAddress(), now.getHostAddress());
for (MasterSlaveEntry entrySet : connectionManager.getEntrySet()) { for (MasterSlaveEntry entrySet : connectionManager.getEntrySet()) {
if (entrySet.getClient().getAddr().getHostName().equals(entry.getKey().getHost()) if (entrySet.getClient().getAddr().getHostName().equals(entry.getKey().getHost())
&& entrySet.getClient().getAddr().getPort() == entry.getKey().getPort()) { && entrySet.getClient().getAddr().getPort() == entry.getKey().getPort()) {
entrySet.changeMaster(entry.getKey()); entrySet.changeMaster(entry.getKey());
break;
} }
} }
masters.put(entry.getKey(), now); masters.put(entry.getKey(), now);
log.info("Master {} has been changed", entry.getKey().getHost());
} }
} }
}); });
@ -149,17 +132,29 @@ public class DNSMonitor {
} }
InetAddress slave = entry.getValue(); InetAddress slave = entry.getValue();
InetAddress updatedSlave = future.get().getAddress(); final InetAddress updatedSlave = future.get().getAddress();
if (!updatedSlave.getHostAddress().equals(slave.getHostAddress())) { if (!updatedSlave.getHostAddress().equals(slave.getHostAddress())) {
log.info("Detected DNS change. {} has changed from {} to {}", entry.getKey().getHost(), slave.getHostAddress(), updatedSlave.getHostAddress()); log.info("Detected DNS change. Slave {} has changed ip from {} to {}", entry.getKey().getHost(), slave.getHostAddress(), updatedSlave.getHostAddress());
for (MasterSlaveEntry masterSlaveEntry : connectionManager.getEntrySet()) { for (final MasterSlaveEntry masterSlaveEntry : connectionManager.getEntrySet()) {
URI uri = URIBuilder.create(slave.getHostAddress() + ":" + entry.getKey().getPort()); final URI uri = URIBuilder.create(slave.getHostAddress() + ":" + entry.getKey().getPort());
if (masterSlaveEntry.slaveDown(uri, FreezeReason.MANAGER)) {
masterSlaveEntry.slaveUp(entry.getKey(), FreezeReason.MANAGER); if (masterSlaveEntry.hasSlave(uri)) {
RFuture<Void> addFuture = masterSlaveEntry.addSlave(entry.getKey());
addFuture.addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
if (!future.isSuccess()) {
log.error("Can't add slave: " + updatedSlave, future.cause());
return;
}
masterSlaveEntry.slaveDown(uri, FreezeReason.MANAGER);
}
});
break;
} }
} }
slaves.put(entry.getKey(), updatedSlave); slaves.put(entry.getKey(), updatedSlave);
log.info("Slave {} has been changed", entry.getKey().getHost());
} }
} }
}); });

@ -54,16 +54,21 @@ import org.redisson.misc.InfinitySemaphoreLatch;
import org.redisson.misc.RPromise; import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise; import org.redisson.misc.RedissonPromise;
import org.redisson.misc.TransferListener; import org.redisson.misc.TransferListener;
import org.redisson.misc.URIBuilder;
import org.redisson.pubsub.AsyncSemaphore; import org.redisson.pubsub.AsyncSemaphore;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import io.netty.channel.EventLoopGroup; import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollDatagramChannel;
import io.netty.channel.epoll.EpollEventLoopGroup; import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollSocketChannel; import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.resolver.dns.DnsAddressResolverGroup;
import io.netty.resolver.dns.DnsServerAddressStreamProviders;
import io.netty.util.HashedWheelTimer; import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout; import io.netty.util.Timeout;
import io.netty.util.Timer; import io.netty.util.Timer;
@ -117,9 +122,9 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
protected Codec codec; protected Codec codec;
protected EventLoopGroup group; protected final EventLoopGroup group;
protected Class<? extends SocketChannel> socketChannelClass; protected final Class<? extends SocketChannel> socketChannelClass;
protected final ConcurrentMap<String, PubSubConnectionEntry> name2PubSubConnection = PlatformDependent.newConcurrentHashMap(); protected final ConcurrentMap<String, PubSubConnectionEntry> name2PubSubConnection = PlatformDependent.newConcurrentHashMap();
@ -130,7 +135,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
protected MasterSlaveServersConfig config; protected MasterSlaveServersConfig config;
private final AtomicReferenceArray<MasterSlaveEntry> slot2entry = new AtomicReferenceArray<MasterSlaveEntry>(MAX_SLOT); private final AtomicReferenceArray<MasterSlaveEntry> slot2entry = new AtomicReferenceArray<MasterSlaveEntry>(MAX_SLOT);
private final Map<InetSocketAddress, MasterSlaveEntry> addr2entry = PlatformDependent.newConcurrentHashMap(); private final Map<RedisClient, MasterSlaveEntry> client2entry = PlatformDependent.newConcurrentHashMap();
private final RPromise<Boolean> shutdownPromise; private final RPromise<Boolean> shutdownPromise;
@ -151,6 +156,8 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
private final CommandSyncService commandExecutor; private final CommandSyncService commandExecutor;
private final Config cfg; private final Config cfg;
private final DnsAddressResolverGroup resolverGroup;
{ {
for (int i = 0; i < locks.length; i++) { for (int i = 0; i < locks.length; i++) {
@ -176,6 +183,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
} }
this.socketChannelClass = EpollSocketChannel.class; this.socketChannelClass = EpollSocketChannel.class;
this.resolverGroup = new DnsAddressResolverGroup(EpollDatagramChannel.class, DnsServerAddressStreamProviders.platformDefault());
} else { } else {
if (cfg.getEventLoopGroup() == null) { if (cfg.getEventLoopGroup() == null) {
this.group = new NioEventLoopGroup(cfg.getNettyThreads(), new DefaultThreadFactory("redisson-netty")); this.group = new NioEventLoopGroup(cfg.getNettyThreads(), new DefaultThreadFactory("redisson-netty"));
@ -184,14 +192,9 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
} }
this.socketChannelClass = NioSocketChannel.class; this.socketChannelClass = NioSocketChannel.class;
// if (cfg.getEventLoopGroup() == null) { this.resolverGroup = new DnsAddressResolverGroup(NioDatagramChannel.class, DnsServerAddressStreamProviders.platformDefault());
// this.group = new OioEventLoopGroup(cfg.getThreads());
// } else {
// this.group = cfg.getEventLoopGroup();
// }
//
// this.socketChannelClass = OioSocketChannel.class;
} }
if (cfg.getExecutor() == null) { if (cfg.getExecutor() == null) {
int threads = Runtime.getRuntime().availableProcessors() * 2; int threads = Runtime.getRuntime().availableProcessors() * 2;
if (cfg.getThreads() != 0) { if (cfg.getThreads() != 0) {
@ -236,7 +239,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
@Override @Override
public Collection<MasterSlaveEntry> getEntrySet() { public Collection<MasterSlaveEntry> getEntrySet() {
return addr2entry.values(); return client2entry.values();
} }
protected void initTimer(MasterSlaveServersConfig config) { protected void initTimer(MasterSlaveServersConfig config) {
@ -273,19 +276,20 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
MasterSlaveEntry entry; MasterSlaveEntry entry;
if (config.checkSkipSlavesInit()) { if (config.checkSkipSlavesInit()) {
entry = new SingleEntry(slots, this, config); entry = new SingleEntry(slots, this, config);
RFuture<Void> f = entry.setupMasterEntry(config.getMasterAddress());
f.syncUninterruptibly();
} else { } else {
entry = createMasterSlaveEntry(config, slots); entry = createMasterSlaveEntry(config, slots);
} }
RFuture<RedisClient> f = entry.setupMasterEntry(config.getMasterAddress());
f.syncUninterruptibly();
for (int slot = singleSlotRange.getStartSlot(); slot < singleSlotRange.getEndSlot() + 1; slot++) { for (int slot = singleSlotRange.getStartSlot(); slot < singleSlotRange.getEndSlot() + 1; slot++) {
addEntry(slot, entry); addEntry(slot, entry);
} }
InetSocketAddress masterHost = f.getNow().resolveAddr().syncUninterruptibly().getNow();
if (config.getDnsMonitoringInterval() != -1) { if (config.getDnsMonitoringInterval() != -1) {
dnsMonitor = new DNSMonitor(this, Collections.singleton(config.getMasterAddress()), dnsMonitor = new DNSMonitor(this, masterHost,
config.getSlaveAddresses(), config.getDnsMonitoringInterval()); config.getSlaveAddresses(), config.getDnsMonitoringInterval(), resolverGroup);
dnsMonitor.start(); dnsMonitor.start();
} }
} catch (RuntimeException e) { } catch (RuntimeException e) {
@ -301,8 +305,6 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
for (RFuture<Void> future : fs) { for (RFuture<Void> future : fs) {
future.syncUninterruptibly(); future.syncUninterruptibly();
} }
RFuture<Void> f = entry.setupMasterEntry(config.getMasterAddress());
f.syncUninterruptibly();
return entry; return entry;
} }
@ -367,6 +369,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
redisConfig.setAddress(address) redisConfig.setAddress(address)
.setTimer(timer) .setTimer(timer)
.setExecutor(executor) .setExecutor(executor)
.setResolverGroup(resolverGroup)
.setGroup(group) .setGroup(group)
.setSocketChannelClass(socketChannelClass) .setSocketChannelClass(socketChannelClass)
.setConnectTimeout(timeout) .setConnectTimeout(timeout)
@ -674,11 +677,32 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
return newSucceededFuture(entryCodec); return newSucceededFuture(entryCodec);
} }
@Override public MasterSlaveEntry getEntry(URI addr) {
public MasterSlaveEntry getEntry(InetSocketAddress addr) { for (MasterSlaveEntry entry : client2entry.values()) {
return addr2entry.get(addr); if (URIBuilder.compare(entry.getClient().getAddr(), addr)) {
return entry;
}
if (entry.hasSlave(addr)) {
return entry;
}
}
return null;
} }
public MasterSlaveEntry getEntry(RedisClient redisClient) {
MasterSlaveEntry entry = client2entry.get(redisClient);
if (entry != null) {
return entry;
}
for (MasterSlaveEntry mentry : client2entry.values()) {
if (mentry.hasSlave(redisClient)) {
return mentry;
}
}
return null;
}
@Override @Override
public MasterSlaveEntry getEntry(int slot) { public MasterSlaveEntry getEntry(int slot) {
return slot2entry.get(slot); return slot2entry.get(slot);
@ -686,22 +710,22 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
protected final void changeMaster(int slot, URI address) { protected final void changeMaster(int slot, URI address) {
MasterSlaveEntry entry = getEntry(slot); MasterSlaveEntry entry = getEntry(slot);
addr2entry.remove(entry.getClient().getAddr()); client2entry.remove(entry.getClient());
entry.changeMaster(address); entry.changeMaster(address);
addr2entry.put(entry.getClient().getAddr(), entry); client2entry.put(entry.getClient(), entry);
} }
protected final void addEntry(Integer slot, MasterSlaveEntry entry) { protected final void addEntry(Integer slot, MasterSlaveEntry entry) {
slot2entry.set(slot, entry); slot2entry.set(slot, entry);
entry.addSlotRange(slot); entry.addSlotRange(slot);
addr2entry.put(entry.getClient().getAddr(), entry); client2entry.put(entry.getClient(), entry);
} }
protected MasterSlaveEntry removeMaster(Integer slot) { protected final MasterSlaveEntry removeEntry(Integer slot) {
MasterSlaveEntry entry = slot2entry.getAndSet(slot, null); MasterSlaveEntry entry = slot2entry.getAndSet(slot, null);
entry.removeSlotRange(slot); entry.removeSlotRange(slot);
if (entry.getSlotRanges().isEmpty()) { if (entry.getSlotRanges().isEmpty()) {
addr2entry.remove(entry.getClient().getAddr()); client2entry.remove(entry.getClient());
} }
return entry; return entry;
} }
@ -734,6 +758,9 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
if (entry == null && source.getSlot() != null) { if (entry == null && source.getSlot() != null) {
entry = getEntry(source.getSlot()); entry = getEntry(source.getSlot());
} }
if (source.getRedisClient() != null) {
entry = getEntry(source.getRedisClient());
}
if (source.getAddr() != null) { if (source.getAddr() != null) {
entry = getEntry(source.getAddr()); entry = getEntry(source.getAddr());
if (entry == null) { if (entry == null) {
@ -836,6 +863,8 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
} }
} }
resolverGroup.close();
if (cfg.getEventLoopGroup() == null) { if (cfg.getEventLoopGroup() == null) {
group.shutdownGracefully(quietPeriod, timeout, unit).syncUninterruptibly(); group.shutdownGracefully(quietPeriod, timeout, unit).syncUninterruptibly();
} }

@ -42,6 +42,9 @@ import org.redisson.connection.ClientConnectionsEntry.FreezeReason;
import org.redisson.connection.balancer.LoadBalancerManager; import org.redisson.connection.balancer.LoadBalancerManager;
import org.redisson.connection.pool.MasterConnectionPool; import org.redisson.connection.pool.MasterConnectionPool;
import org.redisson.connection.pool.MasterPubSubConnectionPool; import org.redisson.connection.pool.MasterPubSubConnectionPool;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.redisson.misc.TransferListener;
import org.redisson.misc.URIBuilder; import org.redisson.misc.URIBuilder;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -66,10 +69,10 @@ public class MasterSlaveEntry {
final MasterSlaveServersConfig config; final MasterSlaveServersConfig config;
final ConnectionManager connectionManager; final ConnectionManager connectionManager;
final MasterConnectionPool writeConnectionHolder; final MasterConnectionPool writeConnectionPool;
final Set<Integer> slots = new HashSet<Integer>(); final Set<Integer> slots = new HashSet<Integer>();
final MasterPubSubConnectionPool pubSubConnectionHolder; final MasterPubSubConnectionPool pubSubConnectionPool;
final AtomicBoolean active = new AtomicBoolean(true); final AtomicBoolean active = new AtomicBoolean(true);
@ -83,8 +86,8 @@ public class MasterSlaveEntry {
this.config = config; this.config = config;
slaveBalancer = new LoadBalancerManager(config, connectionManager, this); slaveBalancer = new LoadBalancerManager(config, connectionManager, this);
writeConnectionHolder = new MasterConnectionPool(config, connectionManager, this); writeConnectionPool = new MasterConnectionPool(config, connectionManager, this);
pubSubConnectionHolder = new MasterPubSubConnectionPool(config, connectionManager, this); pubSubConnectionPool = new MasterPubSubConnectionPool(config, connectionManager, this);
} }
public MasterSlaveServersConfig getConfig() { public MasterSlaveServersConfig getConfig() {
@ -106,7 +109,7 @@ public class MasterSlaveEntry {
return result; return result;
} }
public RFuture<Void> setupMasterEntry(URI address) { public RPromise<RedisClient> setupMasterEntry(URI address) {
RedisClient client = connectionManager.createClient(NodeType.MASTER, address); RedisClient client = connectionManager.createClient(NodeType.MASTER, address);
masterEntry = new ClientConnectionsEntry( masterEntry = new ClientConnectionsEntry(
client, client,
@ -117,12 +120,23 @@ public class MasterSlaveEntry {
connectionManager, connectionManager,
NodeType.MASTER); NodeType.MASTER);
RPromise<RedisClient> result = new RedissonPromise<RedisClient>();
CountableListener<RedisClient> listener = new CountableListener<RedisClient>(result, client);
RFuture<InetSocketAddress> addrFuture = client.resolveAddr();
listener.incCounter();
addrFuture.addListener(listener);
RFuture<Void> writeFuture = writeConnectionPool.add(masterEntry);
listener.incCounter();
writeFuture.addListener(listener);
if (config.getSubscriptionMode() == SubscriptionMode.MASTER) { if (config.getSubscriptionMode() == SubscriptionMode.MASTER) {
RFuture<Void> f = writeConnectionHolder.add(masterEntry); RFuture<Void> pubSubFuture = pubSubConnectionPool.add(masterEntry);
RFuture<Void> s = pubSubConnectionHolder.add(masterEntry); listener.incCounter();
return CountListener.create(s, f); pubSubFuture.addListener(listener);
} }
return writeConnectionHolder.add(masterEntry);
return result;
} }
public boolean slaveDown(URI address, FreezeReason freezeReason) { public boolean slaveDown(URI address, FreezeReason freezeReason) {
@ -134,6 +148,15 @@ public class MasterSlaveEntry {
return slaveDown(entry, freezeReason == FreezeReason.SYSTEM); return slaveDown(entry, freezeReason == FreezeReason.SYSTEM);
} }
public boolean slaveDown(RedisClient redisClient, FreezeReason freezeReason) {
ClientConnectionsEntry entry = slaveBalancer.freeze(redisClient, freezeReason);
if (entry == null) {
return false;
}
return slaveDown(entry, freezeReason == FreezeReason.SYSTEM);
}
private boolean slaveDown(ClientConnectionsEntry entry, boolean temporaryDown) { private boolean slaveDown(ClientConnectionsEntry entry, boolean temporaryDown) {
// add master as slave if no more slaves available // add master as slave if no more slaves available
if (!config.checkSkipSlavesInit() && slaveBalancer.getAvailableClients() == 0) { if (!config.checkSkipSlavesInit() && slaveBalancer.getAvailableClients() == 0) {
@ -201,6 +224,7 @@ public class MasterSlaveEntry {
return; return;
} }
System.out.println("channelName " + channelName + " resubscribed!");
subscribeCodec.addListener(new FutureListener<Codec>() { subscribeCodec.addListener(new FutureListener<Codec>() {
@Override @Override
public void operationComplete(Future<Codec> future) throws Exception { public void operationComplete(Future<Codec> future) throws Exception {
@ -308,11 +332,11 @@ public class MasterSlaveEntry {
}); });
} }
public boolean hasSlave(InetSocketAddress addr) { public boolean hasSlave(RedisClient redisClient) {
return slaveBalancer.contains(addr); return slaveBalancer.contains(redisClient);
} }
public boolean hasSlave(String addr) { public boolean hasSlave(URI addr) {
return slaveBalancer.contains(addr); return slaveBalancer.contains(addr);
} }
@ -322,7 +346,7 @@ public class MasterSlaveEntry {
private RFuture<Void> addSlave(URI address, boolean freezed, NodeType nodeType) { private RFuture<Void> addSlave(URI address, boolean freezed, NodeType nodeType) {
RedisClient client = connectionManager.createClient(NodeType.SLAVE, address); RedisClient client = connectionManager.createClient(NodeType.SLAVE, address);
ClientConnectionsEntry entry = new ClientConnectionsEntry(client, final ClientConnectionsEntry entry = new ClientConnectionsEntry(client,
this.config.getSlaveConnectionMinimumIdleSize(), this.config.getSlaveConnectionMinimumIdleSize(),
this.config.getSlaveConnectionPoolSize(), this.config.getSlaveConnectionPoolSize(),
this.config.getSubscriptionConnectionMinimumIdleSize(), this.config.getSubscriptionConnectionMinimumIdleSize(),
@ -333,7 +357,22 @@ public class MasterSlaveEntry {
entry.setFreezeReason(FreezeReason.SYSTEM); entry.setFreezeReason(FreezeReason.SYSTEM);
} }
} }
return slaveBalancer.add(entry);
final RPromise<Void> result = new RedissonPromise<Void>();
RFuture<InetSocketAddress> addrFuture = client.resolveAddr();
addrFuture.addListener(new FutureListener<InetSocketAddress>() {
@Override
public void operationComplete(Future<InetSocketAddress> future) throws Exception {
if (!future.isSuccess()) {
result.tryFailure(future.cause());
return;
}
RFuture<Void> addFuture = slaveBalancer.add(entry);
addFuture.addListener(new TransferListener<Void>(result));
}
});
return result;
} }
public RedisClient getClient() { public RedisClient getClient() {
@ -345,11 +384,10 @@ public class MasterSlaveEntry {
return false; return false;
} }
InetSocketAddress naddress = new InetSocketAddress(address.getHost(), address.getPort());
InetSocketAddress addr = masterEntry.getClient().getAddr(); InetSocketAddress addr = masterEntry.getClient().getAddr();
// exclude master from slaves // exclude master from slaves
if (!config.checkSkipSlavesInit() if (!config.checkSkipSlavesInit()
&& (!addr.getAddress().getHostAddress().equals(naddress.getAddress().getHostAddress()) || naddress.getPort() != addr.getPort())) { && !URIBuilder.compare(addr, address)) {
slaveDown(masterEntry.getClient().getConfig().getAddress(), FreezeReason.SYSTEM); slaveDown(masterEntry.getClient().getConfig().getAddress(), FreezeReason.SYSTEM);
log.info("master {} excluded from slaves", addr); log.info("master {} excluded from slaves", addr);
} }
@ -365,27 +403,33 @@ public class MasterSlaveEntry {
*/ */
public void changeMaster(final URI address) { public void changeMaster(final URI address) {
final ClientConnectionsEntry oldMaster = masterEntry; final ClientConnectionsEntry oldMaster = masterEntry;
RFuture<Void> future = setupMasterEntry(address); RFuture<RedisClient> future = setupMasterEntry(address);
future.addListener(new FutureListener<Void>() { future.addListener(new FutureListener<RedisClient>() {
@Override @Override
public void operationComplete(Future<Void> future) throws Exception { public void operationComplete(Future<RedisClient> future) throws Exception {
writeConnectionHolder.remove(oldMaster); if (!future.isSuccess()) {
pubSubConnectionHolder.remove(oldMaster); log.error("Can't change master to: {}", address);
return;
}
RedisClient newMasterClient = future.getNow();
writeConnectionPool.remove(oldMaster);
pubSubConnectionPool.remove(oldMaster);
oldMaster.freezeMaster(FreezeReason.MANAGER); oldMaster.freezeMaster(FreezeReason.MANAGER);
slaveDown(oldMaster, false); slaveDown(oldMaster, false);
slaveDown(URIBuilder.create("redis://" + oldMaster.getClient().getIpAddr()), FreezeReason.MANAGER);
slaveBalancer.changeType(oldMaster.getClient().getAddr(), NodeType.SLAVE); slaveBalancer.changeType(oldMaster.getClient(), NodeType.SLAVE);
slaveBalancer.changeType(address, NodeType.MASTER); slaveBalancer.changeType(newMasterClient, NodeType.MASTER);
// more than one slave available, so master can be removed from slaves // more than one slave available, so master can be removed from slaves
if (!config.checkSkipSlavesInit() if (!config.checkSkipSlavesInit()
&& slaveBalancer.getAvailableClients() > 1) { && slaveBalancer.getAvailableClients() > 1) {
slaveDown(address, FreezeReason.SYSTEM); slaveDown(newMasterClient, FreezeReason.SYSTEM);
} }
connectionManager.shutdownAsync(oldMaster.getClient()); connectionManager.shutdownAsync(oldMaster.getClient());
log.info("master {} has changed to {}", oldMaster.getClient().getAddr(), address); log.info("master {} has changed to {}", oldMaster.getClient().getAddr(), masterEntry.getClient().getAddr());
} }
}); });
} }
@ -420,7 +464,7 @@ public class MasterSlaveEntry {
} }
public RFuture<RedisConnection> connectionWriteOp(RedisCommand<?> command) { public RFuture<RedisConnection> connectionWriteOp(RedisCommand<?> command) {
return writeConnectionHolder.get(command); return writeConnectionPool.get(command);
} }
public RFuture<RedisConnection> connectionReadOp(RedisCommand<?> command) { public RFuture<RedisConnection> connectionReadOp(RedisCommand<?> command) {
@ -430,7 +474,7 @@ public class MasterSlaveEntry {
return slaveBalancer.nextConnection(command); return slaveBalancer.nextConnection(command);
} }
public RFuture<RedisConnection> connectionReadOp(RedisCommand<?> command, InetSocketAddress addr) { public RFuture<RedisConnection> connectionReadOp(RedisCommand<?> command, URI addr) {
if (config.getReadMode() == ReadMode.MASTER) { if (config.getReadMode() == ReadMode.MASTER) {
return connectionWriteOp(command); return connectionWriteOp(command);
} }
@ -439,7 +483,7 @@ public class MasterSlaveEntry {
RFuture<RedisPubSubConnection> nextPubSubConnection() { RFuture<RedisPubSubConnection> nextPubSubConnection() {
if (config.getSubscriptionMode() == SubscriptionMode.MASTER) { if (config.getSubscriptionMode() == SubscriptionMode.MASTER) {
return pubSubConnectionHolder.get(); return pubSubConnectionPool.get();
} }
return slaveBalancer.nextPubSubConnection(); return slaveBalancer.nextPubSubConnection();
@ -447,14 +491,14 @@ public class MasterSlaveEntry {
public void returnPubSubConnection(PubSubConnectionEntry entry) { public void returnPubSubConnection(PubSubConnectionEntry entry) {
if (config.getSubscriptionMode() == SubscriptionMode.MASTER) { if (config.getSubscriptionMode() == SubscriptionMode.MASTER) {
pubSubConnectionHolder.returnConnection(masterEntry, entry.getConnection()); pubSubConnectionPool.returnConnection(masterEntry, entry.getConnection());
return; return;
} }
slaveBalancer.returnPubSubConnection(entry.getConnection()); slaveBalancer.returnPubSubConnection(entry.getConnection());
} }
public void releaseWrite(RedisConnection connection) { public void releaseWrite(RedisConnection connection) {
writeConnectionHolder.returnConnection(masterEntry, connection); writeConnectionPool.returnConnection(masterEntry, connection);
} }
public void releaseRead(RedisConnection connection) { public void releaseRead(RedisConnection connection) {

@ -15,38 +15,44 @@
*/ */
package org.redisson.connection; package org.redisson.connection;
import java.net.InetSocketAddress; import java.net.URI;
public class NodeSource { import org.redisson.client.RedisClient;
public static final NodeSource ZERO = new NodeSource(0); /**
*
* @author Nikita Koksharov
*
*/
public class NodeSource {
public enum Redirect {MOVED, ASK} public enum Redirect {MOVED, ASK}
private final Integer slot; private Integer slot;
private final InetSocketAddress addr; private URI addr;
private final Redirect redirect; private RedisClient redisClient;
private Redirect redirect;
private MasterSlaveEntry entry; private MasterSlaveEntry entry;
public NodeSource(MasterSlaveEntry entry) { public NodeSource(MasterSlaveEntry entry) {
this(null, null, null);
this.entry = entry; this.entry = entry;
} }
public NodeSource(MasterSlaveEntry entry, InetSocketAddress addr) { public NodeSource(MasterSlaveEntry entry, RedisClient redisClient) {
this(null, addr, null);
this.entry = entry; this.entry = entry;
this.redisClient = redisClient;
} }
public NodeSource(Integer slot) { public NodeSource(RedisClient redisClient) {
this(slot, null, null); this.redisClient = redisClient;
} }
public NodeSource(Integer slot, InetSocketAddress addr) { public NodeSource(Integer slot, RedisClient redisClient) {
this(slot, addr, null); this.slot = slot;
this.redisClient = redisClient;
} }
public NodeSource(Integer slot, InetSocketAddress addr, Redirect redirect) { public NodeSource(Integer slot, URI addr, Redirect redirect) {
this.slot = slot; this.slot = slot;
this.addr = addr; this.addr = addr;
this.redirect = redirect; this.redirect = redirect;
@ -64,7 +70,11 @@ public class NodeSource {
return slot; return slot;
} }
public InetSocketAddress getAddr() { public RedisClient getRedisClient() {
return redisClient;
}
public URI getAddr() {
return addr; return addr;
} }

@ -59,7 +59,7 @@ public class RedisClientEntry implements ClusterNode {
} }
public RFuture<Boolean> pingAsync() { public RFuture<Boolean> pingAsync() {
return commandExecutor.readAsync(client.getAddr(), (String)null, null, RedisCommands.PING_BOOL); return commandExecutor.readAsync(client, null, RedisCommands.PING_BOOL);
} }
@Override @Override
@ -94,7 +94,7 @@ public class RedisClientEntry implements ClusterNode {
@Override @Override
public RFuture<Long> timeAsync() { public RFuture<Long> timeAsync() {
return commandExecutor.readAsync(client.getAddr(), (String)null, StringCodec.INSTANCE, RedisCommands.TIME); return commandExecutor.readAsync(client, StringCodec.INSTANCE, RedisCommands.TIME);
} }
@Override @Override
@ -104,7 +104,7 @@ public class RedisClientEntry implements ClusterNode {
@Override @Override
public RFuture<Map<String, String>> clusterInfoAsync() { public RFuture<Map<String, String>> clusterInfoAsync() {
return commandExecutor.readAsync(client.getAddr(), (String)null, StringCodec.INSTANCE, RedisCommands.CLUSTER_INFO); return commandExecutor.readAsync(client, StringCodec.INSTANCE, RedisCommands.CLUSTER_INFO);
} }
@Override @Override
@ -120,29 +120,29 @@ public class RedisClientEntry implements ClusterNode {
@Override @Override
public RFuture<Map<String, String>> infoAsync(InfoSection section) { public RFuture<Map<String, String>> infoAsync(InfoSection section) {
if (section == InfoSection.ALL) { if (section == InfoSection.ALL) {
return commandExecutor.readAsync(client.getAddr(), (String)null, StringCodec.INSTANCE, RedisCommands.INFO_ALL); return commandExecutor.readAsync(client, StringCodec.INSTANCE, RedisCommands.INFO_ALL);
} else if (section == InfoSection.DEFAULT) { } else if (section == InfoSection.DEFAULT) {
return commandExecutor.readAsync(client.getAddr(), (String)null, StringCodec.INSTANCE, RedisCommands.INFO_DEFAULT); return commandExecutor.readAsync(client, StringCodec.INSTANCE, RedisCommands.INFO_DEFAULT);
} else if (section == InfoSection.SERVER) { } else if (section == InfoSection.SERVER) {
return commandExecutor.readAsync(client.getAddr(), (String)null, StringCodec.INSTANCE, RedisCommands.INFO_SERVER); return commandExecutor.readAsync(client, StringCodec.INSTANCE, RedisCommands.INFO_SERVER);
} else if (section == InfoSection.CLIENTS) { } else if (section == InfoSection.CLIENTS) {
return commandExecutor.readAsync(client.getAddr(), (String)null, StringCodec.INSTANCE, RedisCommands.INFO_CLIENTS); return commandExecutor.readAsync(client, StringCodec.INSTANCE, RedisCommands.INFO_CLIENTS);
} else if (section == InfoSection.MEMORY) { } else if (section == InfoSection.MEMORY) {
return commandExecutor.readAsync(client.getAddr(), (String)null, StringCodec.INSTANCE, RedisCommands.INFO_MEMORY); return commandExecutor.readAsync(client, StringCodec.INSTANCE, RedisCommands.INFO_MEMORY);
} else if (section == InfoSection.PERSISTENCE) { } else if (section == InfoSection.PERSISTENCE) {
return commandExecutor.readAsync(client.getAddr(), (String)null, StringCodec.INSTANCE, RedisCommands.INFO_PERSISTENCE); return commandExecutor.readAsync(client, StringCodec.INSTANCE, RedisCommands.INFO_PERSISTENCE);
} else if (section == InfoSection.STATS) { } else if (section == InfoSection.STATS) {
return commandExecutor.readAsync(client.getAddr(), (String)null, StringCodec.INSTANCE, RedisCommands.INFO_STATS); return commandExecutor.readAsync(client, StringCodec.INSTANCE, RedisCommands.INFO_STATS);
} else if (section == InfoSection.REPLICATION) { } else if (section == InfoSection.REPLICATION) {
return commandExecutor.readAsync(client.getAddr(), (String)null, StringCodec.INSTANCE, RedisCommands.INFO_REPLICATION); return commandExecutor.readAsync(client, StringCodec.INSTANCE, RedisCommands.INFO_REPLICATION);
} else if (section == InfoSection.CPU) { } else if (section == InfoSection.CPU) {
return commandExecutor.readAsync(client.getAddr(), (String)null, StringCodec.INSTANCE, RedisCommands.INFO_CPU); return commandExecutor.readAsync(client, StringCodec.INSTANCE, RedisCommands.INFO_CPU);
} else if (section == InfoSection.COMMANDSTATS) { } else if (section == InfoSection.COMMANDSTATS) {
return commandExecutor.readAsync(client.getAddr(), (String)null, StringCodec.INSTANCE, RedisCommands.INFO_COMMANDSTATS); return commandExecutor.readAsync(client, StringCodec.INSTANCE, RedisCommands.INFO_COMMANDSTATS);
} else if (section == InfoSection.CLUSTER) { } else if (section == InfoSection.CLUSTER) {
return commandExecutor.readAsync(client.getAddr(), (String)null, StringCodec.INSTANCE, RedisCommands.INFO_CLUSTER); return commandExecutor.readAsync(client, StringCodec.INSTANCE, RedisCommands.INFO_CLUSTER);
} else if (section == InfoSection.KEYSPACE) { } else if (section == InfoSection.KEYSPACE) {
return commandExecutor.readAsync(client.getAddr(), (String)null, StringCodec.INSTANCE, RedisCommands.INFO_KEYSPACE); return commandExecutor.readAsync(client, StringCodec.INSTANCE, RedisCommands.INFO_KEYSPACE);
} }
throw new IllegalStateException(); throw new IllegalStateException();
} }

@ -15,7 +15,6 @@
*/ */
package org.redisson.connection; package org.redisson.connection;
import java.net.InetSocketAddress;
import java.net.URI; import java.net.URI;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashSet; import java.util.HashSet;
@ -154,8 +153,6 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
for (RFuture<Void> future : fs) { for (RFuture<Void> future : fs) {
future.syncUninterruptibly(); future.syncUninterruptibly();
} }
RFuture<Void> f = entry.setupMasterEntry(config.getMasterAddress());
f.syncUninterruptibly();
return entry; return entry;
} }
@ -355,9 +352,10 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
String ip = parts[2]; String ip = parts[2];
String port = parts[3]; String port = parts[3];
URI uri = convert(ip, port);
MasterSlaveEntry entry = getEntry(singleSlotRange.getStartSlot()); MasterSlaveEntry entry = getEntry(singleSlotRange.getStartSlot());
if (entry.isFreezed() if (entry.isFreezed()
&& entry.getClient().getAddr().equals(new InetSocketAddress(ip, Integer.valueOf(port)))) { && URIBuilder.compare(entry.getClient().getAddr(), uri)) {
entry.unfreeze(); entry.unfreeze();
String masterAddr = ip + ":" + port; String masterAddr = ip + ":" + port;
log.info("master: {} has up", masterAddr); log.info("master: {} has up", masterAddr);

@ -15,7 +15,7 @@
*/ */
package org.redisson.connection; package org.redisson.connection;
import java.net.InetSocketAddress; import java.net.URI;
import java.util.Set; import java.util.Set;
import org.redisson.api.RFuture; import org.redisson.api.RFuture;
@ -36,7 +36,7 @@ public class SingleEntry extends MasterSlaveEntry {
} }
@Override @Override
public RFuture<RedisConnection> connectionReadOp(RedisCommand<?> command, InetSocketAddress addr) { public RFuture<RedisConnection> connectionReadOp(RedisCommand<?> command, URI addr) {
return super.connectionWriteOp(command); return super.connectionWriteOp(command);
} }

@ -18,10 +18,10 @@ package org.redisson.connection.balancer;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.URI; import java.net.URI;
import java.util.Map; import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.redisson.api.NodeType; import org.redisson.api.NodeType;
import org.redisson.api.RFuture; import org.redisson.api.RFuture;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisConnection; import org.redisson.client.RedisConnection;
import org.redisson.client.RedisConnectionException; import org.redisson.client.RedisConnectionException;
import org.redisson.client.RedisPubSubConnection; import org.redisson.client.RedisPubSubConnection;
@ -31,15 +31,16 @@ import org.redisson.config.ReadMode;
import org.redisson.connection.ClientConnectionsEntry; import org.redisson.connection.ClientConnectionsEntry;
import org.redisson.connection.ClientConnectionsEntry.FreezeReason; import org.redisson.connection.ClientConnectionsEntry.FreezeReason;
import org.redisson.connection.ConnectionManager; import org.redisson.connection.ConnectionManager;
import org.redisson.connection.CountableListener;
import org.redisson.connection.MasterSlaveEntry; import org.redisson.connection.MasterSlaveEntry;
import org.redisson.connection.pool.PubSubConnectionPool; import org.redisson.connection.pool.PubSubConnectionPool;
import org.redisson.connection.pool.SlaveConnectionPool; import org.redisson.connection.pool.SlaveConnectionPool;
import org.redisson.misc.RPromise; import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.redisson.misc.URIBuilder;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.PlatformDependent;
/** /**
@ -55,7 +56,7 @@ public class LoadBalancerManager {
private final PubSubConnectionPool pubSubConnectionPool; private final PubSubConnectionPool pubSubConnectionPool;
private final SlaveConnectionPool slaveConnectionPool; private final SlaveConnectionPool slaveConnectionPool;
private final Map<String, ClientConnectionsEntry> ip2Entry = PlatformDependent.newConcurrentHashMap(); private final Map<RedisClient, ClientConnectionsEntry> client2Entry = PlatformDependent.newConcurrentHashMap();
public LoadBalancerManager(MasterSlaveServersConfig config, ConnectionManager connectionManager, MasterSlaveEntry entry) { public LoadBalancerManager(MasterSlaveServersConfig config, ConnectionManager connectionManager, MasterSlaveEntry entry) {
this.connectionManager = connectionManager; this.connectionManager = connectionManager;
@ -63,12 +64,12 @@ public class LoadBalancerManager {
pubSubConnectionPool = new PubSubConnectionPool(config, connectionManager, entry); pubSubConnectionPool = new PubSubConnectionPool(config, connectionManager, entry);
} }
public void changeType(InetSocketAddress addr, NodeType nodeType) { public void changeType(RedisClient redisClient, NodeType nodeType) {
ClientConnectionsEntry entry = ip2Entry.get(convert(addr)); ClientConnectionsEntry entry = getEntry(redisClient);
changeType(addr, nodeType, entry); changeType(nodeType, entry);
} }
protected void changeType(Object addr, NodeType nodeType, ClientConnectionsEntry entry) { protected void changeType(NodeType nodeType, ClientConnectionsEntry entry) {
if (entry != null) { if (entry != null) {
if (connectionManager.isClusterMode()) { if (connectionManager.isClusterMode()) {
entry.getClient().getConfig().setReadOnly(nodeType == NodeType.SLAVE && connectionManager.getConfig().getReadMode() != ReadMode.MASTER); entry.getClient().getConfig().setReadOnly(nodeType == NodeType.SLAVE && connectionManager.getConfig().getReadMode() != ReadMode.MASTER);
@ -79,37 +80,34 @@ public class LoadBalancerManager {
public void changeType(URI address, NodeType nodeType) { public void changeType(URI address, NodeType nodeType) {
ClientConnectionsEntry entry = getEntry(address); ClientConnectionsEntry entry = getEntry(address);
changeType(address, nodeType, entry); changeType(nodeType, entry);
} }
public RFuture<Void> add(final ClientConnectionsEntry entry) { public RFuture<Void> add(final ClientConnectionsEntry entry) {
final RPromise<Void> result = connectionManager.newPromise(); RPromise<Void> result = new RedissonPromise<Void>();
FutureListener<Void> listener = new FutureListener<Void>() {
AtomicInteger counter = new AtomicInteger(2); CountableListener<Void> listener = new CountableListener<Void>(result, null) {
@Override public void operationComplete(io.netty.util.concurrent.Future<Object> future) throws Exception {
public void operationComplete(Future<Void> future) throws Exception { super.operationComplete(future);
if (!future.isSuccess()) { if (this.result.isSuccess()) {
result.tryFailure(future.cause()); client2Entry.put(entry.getClient(), entry);
return;
} }
if (counter.decrementAndGet() == 0) { };
String addr = entry.getClient().getIpAddr();
ip2Entry.put(addr, entry);
result.trySuccess(null);
}
}
}; };
RFuture<Void> slaveFuture = slaveConnectionPool.add(entry); RFuture<Void> slaveFuture = slaveConnectionPool.add(entry);
listener.incCounter();
slaveFuture.addListener(listener); slaveFuture.addListener(listener);
RFuture<Void> pubSubFuture = pubSubConnectionPool.add(entry); RFuture<Void> pubSubFuture = pubSubConnectionPool.add(entry);
listener.incCounter();
pubSubFuture.addListener(listener); pubSubFuture.addListener(listener);
return result; return result;
} }
public int getAvailableClients() { public int getAvailableClients() {
int count = 0; int count = 0;
for (ClientConnectionsEntry connectionEntry : ip2Entry.values()) { for (ClientConnectionsEntry connectionEntry : client2Entry.values()) {
if (!connectionEntry.isFreezed()) { if (!connectionEntry.isFreezed()) {
count++; count++;
} }
@ -139,19 +137,14 @@ public class LoadBalancerManager {
return false; return false;
} }
private String convert(URI address) {
InetSocketAddress addr = new InetSocketAddress(address.getHost(), address.getPort());
return convert(addr);
}
public ClientConnectionsEntry freeze(URI address, FreezeReason freezeReason) { public ClientConnectionsEntry freeze(URI address, FreezeReason freezeReason) {
ClientConnectionsEntry connectionEntry = getEntry(address); ClientConnectionsEntry connectionEntry = getEntry(address);
return freeze(connectionEntry, freezeReason); return freeze(connectionEntry, freezeReason);
} }
private ClientConnectionsEntry getEntry(URI address) { public ClientConnectionsEntry freeze(RedisClient redisClient, FreezeReason freezeReason) {
String addr = convert(address); ClientConnectionsEntry connectionEntry = getEntry(redisClient);
return ip2Entry.get(addr); return freeze(connectionEntry, freezeReason);
} }
public ClientConnectionsEntry freeze(ClientConnectionsEntry connectionEntry, FreezeReason freezeReason) { public ClientConnectionsEntry freeze(ClientConnectionsEntry connectionEntry, FreezeReason freezeReason) {
@ -179,20 +172,34 @@ public class LoadBalancerManager {
return pubSubConnectionPool.get(); return pubSubConnectionPool.get();
} }
public boolean contains(InetSocketAddress addr) { public boolean contains(URI addr) {
return ip2Entry.containsKey(convert(addr)); return getEntry(addr) != null;
}
public boolean contains(RedisClient redisClient) {
return getEntry(redisClient) != null;
} }
protected String convert(InetSocketAddress addr) { protected ClientConnectionsEntry getEntry(URI addr) {
return addr.getAddress().getHostAddress() + ":" + addr.getPort(); for (ClientConnectionsEntry entry : client2Entry.values()) {
InetSocketAddress entryAddr = entry.getClient().getAddr();
if (URIBuilder.compare(entryAddr, addr)) {
return entry;
}
}
return null;
} }
public boolean contains(String addr) { protected ClientConnectionsEntry getEntry(RedisClient redisClient) {
return ip2Entry.containsKey(addr); return client2Entry.get(redisClient);
}
protected String convert(InetSocketAddress addr) {
return addr.getAddress().getHostAddress() + ":" + addr.getPort();
} }
public RFuture<RedisConnection> getConnection(RedisCommand<?> command, InetSocketAddress addr) { public RFuture<RedisConnection> getConnection(RedisCommand<?> command, URI addr) {
ClientConnectionsEntry entry = ip2Entry.get(convert(addr)); ClientConnectionsEntry entry = getEntry(addr);
if (entry != null) { if (entry != null) {
return slaveConnectionPool.get(command, entry); return slaveConnectionPool.get(command, entry);
} }
@ -205,23 +212,23 @@ public class LoadBalancerManager {
} }
public void returnPubSubConnection(RedisPubSubConnection connection) { public void returnPubSubConnection(RedisPubSubConnection connection) {
ClientConnectionsEntry entry = ip2Entry.get(convert(connection.getRedisClient().getAddr())); ClientConnectionsEntry entry = getEntry(connection.getRedisClient());
pubSubConnectionPool.returnConnection(entry, connection); pubSubConnectionPool.returnConnection(entry, connection);
} }
public void returnConnection(RedisConnection connection) { public void returnConnection(RedisConnection connection) {
ClientConnectionsEntry entry = ip2Entry.get(convert(connection.getRedisClient().getAddr())); ClientConnectionsEntry entry = getEntry(connection.getRedisClient());
slaveConnectionPool.returnConnection(entry, connection); slaveConnectionPool.returnConnection(entry, connection);
} }
public void shutdown() { public void shutdown() {
for (ClientConnectionsEntry entry : ip2Entry.values()) { for (ClientConnectionsEntry entry : client2Entry.values()) {
entry.getClient().shutdown(); entry.getClient().shutdown();
} }
} }
public void shutdownAsync() { public void shutdownAsync() {
for (ClientConnectionsEntry entry : ip2Entry.values()) { for (ClientConnectionsEntry entry : client2Entry.values()) {
connectionManager.shutdownAsync(entry.getClient()); connectionManager.shutdownAsync(entry.getClient());
} }
} }

@ -15,7 +15,6 @@
*/ */
package org.redisson.jcache; package org.redisson.jcache;
import java.net.InetSocketAddress;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
@ -58,6 +57,7 @@ import org.redisson.api.RLock;
import org.redisson.api.RSemaphore; import org.redisson.api.RSemaphore;
import org.redisson.api.RTopic; import org.redisson.api.RTopic;
import org.redisson.api.listener.MessageListener; import org.redisson.api.listener.MessageListener;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
import org.redisson.client.codec.MapScanCodec; import org.redisson.client.codec.MapScanCodec;
import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommand;
@ -2082,7 +2082,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
cacheManager.getStatBean(this).addRemoveTime(currentNanoTime() - startTime); cacheManager.getStatBean(this).addRemoveTime(currentNanoTime() - startTime);
} }
MapScanResult<ScanObjectEntry, ScanObjectEntry> scanIterator(String name, InetSocketAddress client, long startPos) { MapScanResult<ScanObjectEntry, ScanObjectEntry> scanIterator(String name, RedisClient client, long startPos) {
RFuture<MapScanResult<ScanObjectEntry, ScanObjectEntry>> f RFuture<MapScanResult<ScanObjectEntry, ScanObjectEntry>> f
= commandExecutor.readAsync(client, name, new MapScanCodec(codec), RedisCommands.HSCAN, name, startPos); = commandExecutor.readAsync(client, name, new MapScanCodec(codec), RedisCommands.HSCAN, name, startPos);
try { try {

@ -15,6 +15,7 @@
*/ */
package org.redisson.misc; package org.redisson.misc;
import java.net.InetSocketAddress;
import java.net.URI; import java.net.URI;
/** /**
@ -37,4 +38,13 @@ public class URIBuilder {
return URI.create(uri.replace(s, "[" + s + "]")); return URI.create(uri.replace(s, "[" + s + "]"));
} }
public static boolean compare(InetSocketAddress entryAddr, URI addr) {
if (((entryAddr.getHostName() != null && entryAddr.getHostName().equals(addr.getHost()))
|| entryAddr.getAddress().getHostAddress().equals(addr.getHost()))
&& entryAddr.getPort() == addr.getPort()) {
return true;
}
return false;
}
} }

@ -18,6 +18,7 @@ package org.redisson.reactive;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;
import org.redisson.client.RedisClient;
import org.redisson.client.protocol.decoder.MapScanResult; import org.redisson.client.protocol.decoder.MapScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry; import org.redisson.client.protocol.decoder.ScanObjectEntry;
@ -30,7 +31,7 @@ import org.redisson.client.protocol.decoder.ScanObjectEntry;
*/ */
interface MapReactive<K, V> { interface MapReactive<K, V> {
Publisher<MapScanResult<ScanObjectEntry, ScanObjectEntry>> scanIteratorReactive(InetSocketAddress client, long startPos); Publisher<MapScanResult<ScanObjectEntry, ScanObjectEntry>> scanIteratorReactive(RedisClient client, long startPos);
Publisher<V> put(K key, V value); Publisher<V> put(K key, V value);

@ -15,7 +15,6 @@
*/ */
package org.redisson.reactive; package org.redisson.reactive;
import java.net.InetSocketAddress;
import java.util.Collection; import java.util.Collection;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
@ -29,6 +28,7 @@ import org.redisson.api.RFuture;
import org.redisson.api.RMapCacheAsync; import org.redisson.api.RMapCacheAsync;
import org.redisson.api.RMapCacheReactive; import org.redisson.api.RMapCacheReactive;
import org.redisson.api.RMapReactive; import org.redisson.api.RMapReactive;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.decoder.MapScanResult; import org.redisson.client.protocol.decoder.MapScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry; import org.redisson.client.protocol.decoder.ScanObjectEntry;
@ -164,7 +164,7 @@ public class RedissonMapCacheReactive<K, V> extends RedissonExpirableReactive im
} }
@Override @Override
public Publisher<MapScanResult<ScanObjectEntry, ScanObjectEntry>> scanIteratorReactive(final InetSocketAddress client, final long startPos) { public Publisher<MapScanResult<ScanObjectEntry, ScanObjectEntry>> scanIteratorReactive(final RedisClient client, final long startPos) {
return reactive(new Supplier<RFuture<MapScanResult<ScanObjectEntry, ScanObjectEntry>>>() { return reactive(new Supplier<RFuture<MapScanResult<ScanObjectEntry, ScanObjectEntry>>>() {
@Override @Override
public RFuture<MapScanResult<ScanObjectEntry, ScanObjectEntry>> get() { public RFuture<MapScanResult<ScanObjectEntry, ScanObjectEntry>> get() {

@ -27,6 +27,7 @@ import org.redisson.api.MapOptions;
import org.redisson.api.RFuture; import org.redisson.api.RFuture;
import org.redisson.api.RMapAsync; import org.redisson.api.RMapAsync;
import org.redisson.api.RMapReactive; import org.redisson.api.RMapReactive;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
import org.redisson.client.codec.MapScanCodec; import org.redisson.client.codec.MapScanCodec;
import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisCommands;
@ -284,7 +285,7 @@ public class RedissonMapReactive<K, V> extends RedissonExpirableReactive impleme
}); });
} }
public Publisher<MapScanResult<ScanObjectEntry, ScanObjectEntry>> scanIteratorReactive(InetSocketAddress client, long startPos) { public Publisher<MapScanResult<ScanObjectEntry, ScanObjectEntry>> scanIteratorReactive(RedisClient client, long startPos) {
return commandExecutor.readReactive(client, getName(), new MapScanCodec(codec), RedisCommands.HSCAN, getName(), startPos); return commandExecutor.readReactive(client, getName(), new MapScanCodec(codec), RedisCommands.HSCAN, getName(), startPos);
} }

@ -15,7 +15,6 @@
*/ */
package org.redisson.reactive; package org.redisson.reactive;
import java.net.InetSocketAddress;
import java.util.AbstractMap; import java.util.AbstractMap;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
@ -24,6 +23,7 @@ import java.util.Map.Entry;
import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber; import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription; import org.reactivestreams.Subscription;
import org.redisson.client.RedisClient;
import org.redisson.client.protocol.decoder.MapScanResult; import org.redisson.client.protocol.decoder.MapScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry; import org.redisson.client.protocol.decoder.ScanObjectEntry;
@ -56,7 +56,7 @@ public class RedissonMapReactiveIterator<K, V, M> {
private Map<ByteBuf, ByteBuf> firstValues; private Map<ByteBuf, ByteBuf> firstValues;
private long iterPos = 0; private long iterPos = 0;
private InetSocketAddress client; private RedisClient client;
private long currentIndex; private long currentIndex;

@ -25,6 +25,7 @@ import org.redisson.api.RFuture;
import org.redisson.api.RScoredSortedSet.Aggregate; import org.redisson.api.RScoredSortedSet.Aggregate;
import org.redisson.api.RScoredSortedSetAsync; import org.redisson.api.RScoredSortedSetAsync;
import org.redisson.api.RScoredSortedSetReactive; import org.redisson.api.RScoredSortedSetReactive;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
import org.redisson.client.codec.ScanCodec; import org.redisson.client.codec.ScanCodec;
import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisCommands;
@ -175,7 +176,7 @@ public class RedissonScoredSortedSetReactive<V> extends RedissonExpirableReactiv
}); });
} }
private Publisher<ListScanResult<ScanObjectEntry>> scanIteratorReactive(InetSocketAddress client, long startPos) { private Publisher<ListScanResult<ScanObjectEntry>> scanIteratorReactive(RedisClient client, long startPos) {
return commandExecutor.readReactive(client, getName(), new ScanCodec(codec), RedisCommands.ZSCAN, getName(), startPos); return commandExecutor.readReactive(client, getName(), new ScanCodec(codec), RedisCommands.ZSCAN, getName(), startPos);
} }
@ -183,7 +184,7 @@ public class RedissonScoredSortedSetReactive<V> extends RedissonExpirableReactiv
public Publisher<V> iterator() { public Publisher<V> iterator() {
return new SetReactiveIterator<V>() { return new SetReactiveIterator<V>() {
@Override @Override
protected Publisher<ListScanResult<ScanObjectEntry>> scanIteratorReactive(InetSocketAddress client, long nextIterPos) { protected Publisher<ListScanResult<ScanObjectEntry>> scanIteratorReactive(RedisClient client, long nextIterPos) {
return RedissonScoredSortedSetReactive.this.scanIteratorReactive(client, nextIterPos); return RedissonScoredSortedSetReactive.this.scanIteratorReactive(client, nextIterPos);
} }
}; };

@ -26,6 +26,7 @@ import org.reactivestreams.Publisher;
import org.redisson.RedissonSetCache; import org.redisson.RedissonSetCache;
import org.redisson.api.RFuture; import org.redisson.api.RFuture;
import org.redisson.api.RSetCacheReactive; import org.redisson.api.RSetCacheReactive;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.decoder.ListScanResult; import org.redisson.client.protocol.decoder.ListScanResult;
@ -85,7 +86,7 @@ public class RedissonSetCacheReactive<V> extends RedissonExpirableReactive imple
}); });
} }
Publisher<ListScanResult<ScanObjectEntry>> scanIterator(final InetSocketAddress client, final long startPos) { Publisher<ListScanResult<ScanObjectEntry>> scanIterator(final RedisClient client, final long startPos) {
return reactive(new Supplier<RFuture<ListScanResult<ScanObjectEntry>>>() { return reactive(new Supplier<RFuture<ListScanResult<ScanObjectEntry>>>() {
@Override @Override
public RFuture<ListScanResult<ScanObjectEntry>> get() { public RFuture<ListScanResult<ScanObjectEntry>> get() {
@ -98,7 +99,7 @@ public class RedissonSetCacheReactive<V> extends RedissonExpirableReactive imple
public Publisher<V> iterator() { public Publisher<V> iterator() {
return new SetReactiveIterator<V>() { return new SetReactiveIterator<V>() {
@Override @Override
protected Publisher<ListScanResult<ScanObjectEntry>> scanIteratorReactive(InetSocketAddress client, long nextIterPos) { protected Publisher<ListScanResult<ScanObjectEntry>> scanIteratorReactive(RedisClient client, long nextIterPos) {
return RedissonSetCacheReactive.this.scanIterator(client, nextIterPos); return RedissonSetCacheReactive.this.scanIterator(client, nextIterPos);
} }
}; };

@ -26,6 +26,7 @@ import org.reactivestreams.Publisher;
import org.redisson.RedissonSet; import org.redisson.RedissonSet;
import org.redisson.api.RFuture; import org.redisson.api.RFuture;
import org.redisson.api.RSetReactive; import org.redisson.api.RSetReactive;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
import org.redisson.client.codec.ScanCodec; import org.redisson.client.codec.ScanCodec;
import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisCommands;
@ -76,7 +77,7 @@ public class RedissonSetReactive<V> extends RedissonExpirableReactive implements
}); });
} }
private Publisher<ListScanResult<ScanObjectEntry>> scanIteratorReactive(InetSocketAddress client, long startPos) { private Publisher<ListScanResult<ScanObjectEntry>> scanIteratorReactive(RedisClient client, long startPos) {
return commandExecutor.readReactive(client, getName(), new ScanCodec(codec), RedisCommands.SSCAN, getName(), startPos); return commandExecutor.readReactive(client, getName(), new ScanCodec(codec), RedisCommands.SSCAN, getName(), startPos);
} }
@ -211,7 +212,7 @@ public class RedissonSetReactive<V> extends RedissonExpirableReactive implements
public Publisher<V> iterator() { public Publisher<V> iterator() {
return new SetReactiveIterator<V>() { return new SetReactiveIterator<V>() {
@Override @Override
protected Publisher<ListScanResult<ScanObjectEntry>> scanIteratorReactive(InetSocketAddress client, long nextIterPos) { protected Publisher<ListScanResult<ScanObjectEntry>> scanIteratorReactive(RedisClient client, long nextIterPos) {
return RedissonSetReactive.this.scanIteratorReactive(client, nextIterPos); return RedissonSetReactive.this.scanIteratorReactive(client, nextIterPos);
} }
}; };

@ -22,6 +22,7 @@ import java.util.List;
import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber; import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription; import org.reactivestreams.Subscription;
import org.redisson.client.RedisClient;
import org.redisson.client.protocol.decoder.ListScanResult; import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry; import org.redisson.client.protocol.decoder.ScanObjectEntry;
@ -44,7 +45,7 @@ public abstract class SetReactiveIterator<V> extends Stream<V> {
private List<ByteBuf> firstValues; private List<ByteBuf> firstValues;
private List<ByteBuf> lastValues; private List<ByteBuf> lastValues;
private long nextIterPos; private long nextIterPos;
private InetSocketAddress client; private RedisClient client;
private boolean finished; private boolean finished;
@ -168,6 +169,6 @@ public abstract class SetReactiveIterator<V> extends Stream<V> {
return result; return result;
} }
protected abstract Publisher<ListScanResult<ScanObjectEntry>> scanIteratorReactive(InetSocketAddress client, long nextIterPos); protected abstract Publisher<ListScanResult<ScanObjectEntry>> scanIteratorReactive(RedisClient client, long nextIterPos);
} }

@ -31,6 +31,7 @@ import org.redisson.api.Node.InfoSection;
import org.redisson.api.NodesGroup; import org.redisson.api.NodesGroup;
import org.redisson.api.RMap; import org.redisson.api.RMap;
import org.redisson.api.RedissonClient; import org.redisson.api.RedissonClient;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisConnectionException; import org.redisson.client.RedisConnectionException;
import org.redisson.client.RedisOutOfMemoryException; import org.redisson.client.RedisOutOfMemoryException;
import org.redisson.client.protocol.decoder.ListScanResult; import org.redisson.client.protocol.decoder.ListScanResult;
@ -84,7 +85,7 @@ public class RedissonTest {
RedissonBaseIterator iter = new RedissonBaseIterator() { RedissonBaseIterator iter = new RedissonBaseIterator() {
int i; int i;
@Override @Override
ListScanResult iterator(InetSocketAddress client, long nextIterPos) { ListScanResult iterator(RedisClient client, long nextIterPos) {
i++; i++;
if (i == 1) { if (i == 1) {
return new ListScanResult(13L, Collections.emptyList()); return new ListScanResult(13L, Collections.emptyList());
@ -110,7 +111,7 @@ public class RedissonTest {
RedissonBaseIterator<Integer> iter = new RedissonBaseIterator<Integer>() { RedissonBaseIterator<Integer> iter = new RedissonBaseIterator<Integer>() {
int i; int i;
@Override @Override
ListScanResult<ScanObjectEntry> iterator(InetSocketAddress client, long nextIterPos) { ListScanResult<ScanObjectEntry> iterator(RedisClient client, long nextIterPos) {
i++; i++;
if (i == 1) { if (i == 1) {
return new ListScanResult<ScanObjectEntry>(14L, Arrays.asList(new ScanObjectEntry(Unpooled.wrappedBuffer(new byte[] {1}), 1))); return new ListScanResult<ScanObjectEntry>(14L, Arrays.asList(new ScanObjectEntry(Unpooled.wrappedBuffer(new byte[] {1}), 1)));

Loading…
Cancel
Save