diff --git a/redisson/src/main/java/org/redisson/RedisClientResult.java b/redisson/src/main/java/org/redisson/RedisClientResult.java index 48e9a6fa0..69df8e495 100644 --- a/redisson/src/main/java/org/redisson/RedisClientResult.java +++ b/redisson/src/main/java/org/redisson/RedisClientResult.java @@ -15,12 +15,17 @@ */ package org.redisson; -import java.net.InetSocketAddress; +import org.redisson.client.RedisClient; +/** + * + * @author Nikita Koksharov + * + */ public interface RedisClientResult { - void setRedisClient(InetSocketAddress addr); + void setRedisClient(RedisClient addr); - InetSocketAddress getRedisClient(); + RedisClient getRedisClient(); } diff --git a/redisson/src/main/java/org/redisson/RedisNodes.java b/redisson/src/main/java/org/redisson/RedisNodes.java index 10a5861a3..7a5d4a689 100644 --- a/redisson/src/main/java/org/redisson/RedisNodes.java +++ b/redisson/src/main/java/org/redisson/RedisNodes.java @@ -15,7 +15,6 @@ */ package org.redisson; -import java.net.InetSocketAddress; import java.net.URI; import java.util.ArrayList; import java.util.Collection; @@ -34,10 +33,10 @@ import org.redisson.client.protocol.RedisCommands; import org.redisson.connection.ConnectionListener; import org.redisson.connection.ConnectionManager; import org.redisson.connection.RedisClientEntry; +import org.redisson.misc.URIBuilder; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; -import org.redisson.misc.URIBuilder; /** * @@ -56,10 +55,9 @@ public class RedisNodes implements NodesGroup { @Override public N getNode(String address) { Collection clients = (Collection) connectionManager.getClients(); - URI uri = URIBuilder.create(address); - InetSocketAddress addr = new InetSocketAddress(uri.getHost(), uri.getPort()); + URI addr = URIBuilder.create(address); for (N node : clients) { - if (node.getAddr().equals(addr)) { + if (URIBuilder.compare(node.getAddr(), addr)) { return node; } } diff --git a/redisson/src/main/java/org/redisson/RedissonBaseIterator.java b/redisson/src/main/java/org/redisson/RedissonBaseIterator.java index 4410270cf..521d445b1 100644 --- a/redisson/src/main/java/org/redisson/RedissonBaseIterator.java +++ b/redisson/src/main/java/org/redisson/RedissonBaseIterator.java @@ -15,12 +15,12 @@ */ package org.redisson; -import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.NoSuchElementException; +import org.redisson.client.RedisClient; import org.redisson.client.protocol.decoder.ListScanResult; import org.redisson.client.protocol.decoder.ScanObjectEntry; @@ -38,7 +38,7 @@ abstract class RedissonBaseIterator implements Iterator { private List lastValues; private Iterator lastIter; protected long nextIterPos; - protected InetSocketAddress client; + protected RedisClient client; private boolean finished; private boolean currentElementRemoved; @@ -145,7 +145,7 @@ abstract class RedissonBaseIterator implements Iterator { return false; } - abstract ListScanResult iterator(InetSocketAddress client, long nextIterPos); + abstract ListScanResult iterator(RedisClient client, long nextIterPos); @Override public V next() { diff --git a/redisson/src/main/java/org/redisson/RedissonBaseMapIterator.java b/redisson/src/main/java/org/redisson/RedissonBaseMapIterator.java index de2363299..c86380aa3 100644 --- a/redisson/src/main/java/org/redisson/RedissonBaseMapIterator.java +++ b/redisson/src/main/java/org/redisson/RedissonBaseMapIterator.java @@ -15,7 +15,6 @@ */ package org.redisson; -import java.net.InetSocketAddress; import java.util.AbstractMap; import java.util.HashMap; import java.util.Iterator; @@ -23,6 +22,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.NoSuchElementException; +import org.redisson.client.RedisClient; import org.redisson.client.protocol.decoder.MapScanResult; import org.redisson.client.protocol.decoder.ScanObjectEntry; @@ -42,7 +42,7 @@ public abstract class RedissonBaseMapIterator implements Iterator { private Map lastValues; private Iterator> lastIter; protected long nextIterPos; - protected InetSocketAddress client; + protected RedisClient client; private boolean finished; private boolean currentElementRemoved; diff --git a/redisson/src/main/java/org/redisson/RedissonKeys.java b/redisson/src/main/java/org/redisson/RedissonKeys.java index 1aae21912..58bc92891 100644 --- a/redisson/src/main/java/org/redisson/RedissonKeys.java +++ b/redisson/src/main/java/org/redisson/RedissonKeys.java @@ -15,7 +15,6 @@ */ package org.redisson; -import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -31,6 +30,7 @@ import org.redisson.api.RFuture; import org.redisson.api.RKeys; import org.redisson.api.RObject; import org.redisson.api.RType; +import org.redisson.client.RedisClient; import org.redisson.client.RedisException; import org.redisson.client.codec.ScanCodec; import org.redisson.client.codec.StringCodec; @@ -107,7 +107,7 @@ public class RedissonKeys implements RKeys { return getKeysByPattern(null); } - private ListScanResult scanIterator(InetSocketAddress client, MasterSlaveEntry entry, long startPos, String pattern, int count) { + private ListScanResult scanIterator(RedisClient client, MasterSlaveEntry entry, long startPos, String pattern, int count) { if (pattern == null) { RFuture> f = commandExecutor.readAsync(client, entry, new ScanCodec(StringCodec.INSTANCE), RedisCommands.SCAN, startPos, "COUNT", count); return commandExecutor.get(f); @@ -120,7 +120,7 @@ public class RedissonKeys implements RKeys { return new RedissonBaseIterator() { @Override - ListScanResult iterator(InetSocketAddress client, long nextIterPos) { + ListScanResult iterator(RedisClient client, long nextIterPos) { return RedissonKeys.this.scanIterator(client, entry, nextIterPos, pattern, count); } diff --git a/redisson/src/main/java/org/redisson/RedissonMap.java b/redisson/src/main/java/org/redisson/RedissonMap.java index 489546c80..fed493b12 100644 --- a/redisson/src/main/java/org/redisson/RedissonMap.java +++ b/redisson/src/main/java/org/redisson/RedissonMap.java @@ -16,7 +16,6 @@ package org.redisson; import java.math.BigDecimal; -import java.net.InetSocketAddress; import java.util.AbstractCollection; import java.util.AbstractSet; import java.util.ArrayList; @@ -40,6 +39,7 @@ import org.redisson.api.RMap; import org.redisson.api.RReadWriteLock; import org.redisson.api.RedissonClient; import org.redisson.api.mapreduce.RMapReduce; +import org.redisson.client.RedisClient; import org.redisson.client.codec.Codec; import org.redisson.client.codec.LongCodec; import org.redisson.client.codec.MapScanCodec; @@ -949,7 +949,7 @@ public class RedissonMap extends RedissonExpirable implements RMap { return get(fastRemoveAsync(keys)); } - MapScanResult scanIterator(String name, InetSocketAddress client, long startPos, String pattern) { + MapScanResult scanIterator(String name, RedisClient client, long startPos, String pattern) { if (pattern == null) { RFuture> f = commandExecutor.readAsync(client, name, new MapScanCodec(codec), RedisCommands.HSCAN, name, startPos); diff --git a/redisson/src/main/java/org/redisson/RedissonMapCache.java b/redisson/src/main/java/org/redisson/RedissonMapCache.java index 482769042..8e025f209 100644 --- a/redisson/src/main/java/org/redisson/RedissonMapCache.java +++ b/redisson/src/main/java/org/redisson/RedissonMapCache.java @@ -37,6 +37,7 @@ import org.redisson.api.map.event.EntryExpiredListener; import org.redisson.api.map.event.EntryRemovedListener; import org.redisson.api.map.event.EntryUpdatedListener; import org.redisson.api.map.event.MapEntryListener; +import org.redisson.client.RedisClient; import org.redisson.client.codec.Codec; import org.redisson.client.codec.LongCodec; import org.redisson.client.codec.MapScanCodec; @@ -1169,11 +1170,11 @@ public class RedissonMapCache extends RedissonMap implements RMapCac } @Override - MapScanResult scanIterator(String name, InetSocketAddress client, long startPos, String pattern) { + MapScanResult scanIterator(String name, RedisClient client, long startPos, String pattern) { return get(scanIteratorAsync(name, client, startPos, pattern)); } - public RFuture> scanIteratorAsync(final String name, InetSocketAddress client, long startPos, String pattern) { + public RFuture> scanIteratorAsync(final String name, RedisClient client, long startPos, String pattern) { List params = new ArrayList(); params.add(System.currentTimeMillis()); params.add(startPos); diff --git a/redisson/src/main/java/org/redisson/RedissonMultiMapIterator.java b/redisson/src/main/java/org/redisson/RedissonMultiMapIterator.java index c0a947614..9326eaa3e 100644 --- a/redisson/src/main/java/org/redisson/RedissonMultiMapIterator.java +++ b/redisson/src/main/java/org/redisson/RedissonMultiMapIterator.java @@ -23,6 +23,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.NoSuchElementException; +import org.redisson.client.RedisClient; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.decoder.MapScanResult; import org.redisson.client.protocol.decoder.ScanObjectEntry; @@ -40,7 +41,7 @@ abstract class RedissonMultiMapIterator implements Iterator { private Iterator valuesIter; protected long valuesIterPos = 0; - protected InetSocketAddress client; + protected RedisClient client; private boolean finished; private boolean removeExecuted; diff --git a/redisson/src/main/java/org/redisson/RedissonMultimap.java b/redisson/src/main/java/org/redisson/RedissonMultimap.java index dd64af9d6..734bc7d79 100644 --- a/redisson/src/main/java/org/redisson/RedissonMultimap.java +++ b/redisson/src/main/java/org/redisson/RedissonMultimap.java @@ -33,6 +33,7 @@ import org.redisson.api.RFuture; import org.redisson.api.RLock; import org.redisson.api.RMultimap; import org.redisson.api.RReadWriteLock; +import org.redisson.client.RedisClient; import org.redisson.client.codec.Codec; import org.redisson.client.codec.LongCodec; import org.redisson.client.codec.MapScanCodec; @@ -298,7 +299,7 @@ public abstract class RedissonMultimap extends RedissonExpirable implement } - MapScanResult scanIterator(InetSocketAddress client, long startPos) { + MapScanResult scanIterator(RedisClient client, long startPos) { RFuture> f = commandExecutor.readAsync(client, getName(), new MapScanCodec(codec, StringCodec.INSTANCE), RedisCommands.HSCAN, getName(), startPos); return get(f); } diff --git a/redisson/src/main/java/org/redisson/RedissonScoredSortedSet.java b/redisson/src/main/java/org/redisson/RedissonScoredSortedSet.java index 1373d68f8..d42557046 100644 --- a/redisson/src/main/java/org/redisson/RedissonScoredSortedSet.java +++ b/redisson/src/main/java/org/redisson/RedissonScoredSortedSet.java @@ -16,7 +16,6 @@ package org.redisson; import java.math.BigDecimal; -import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -32,16 +31,14 @@ import org.redisson.api.RScoredSortedSet; import org.redisson.api.RedissonClient; import org.redisson.api.SortOrder; import org.redisson.api.mapreduce.RCollectionMapReduce; +import org.redisson.client.RedisClient; import org.redisson.client.codec.Codec; import org.redisson.client.codec.DoubleCodec; import org.redisson.client.codec.LongCodec; import org.redisson.client.codec.ScanCodec; import org.redisson.client.codec.ScoredCodec; -import org.redisson.client.protocol.RedisCommand; -import org.redisson.client.protocol.RedisCommand.ValueType; import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.ScoredEntry; -import org.redisson.client.protocol.convertor.BooleanReplayConvertor; import org.redisson.client.protocol.decoder.ListScanResult; import org.redisson.client.protocol.decoder.ScanObjectEntry; import org.redisson.command.CommandAsyncExecutor; @@ -315,7 +312,7 @@ public class RedissonScoredSortedSet extends RedissonExpirable implements RSc return commandExecutor.readAsync(getName(), codec, RedisCommands.ZRANK_INT, getName(), encode(o)); } - private ListScanResult scanIterator(InetSocketAddress client, long startPos) { + private ListScanResult scanIterator(RedisClient client, long startPos) { RFuture> f = commandExecutor.readAsync(client, getName(), new ScanCodec(codec), RedisCommands.ZSCAN, getName(), startPos); return get(f); } @@ -325,7 +322,7 @@ public class RedissonScoredSortedSet extends RedissonExpirable implements RSc return new RedissonBaseIterator() { @Override - ListScanResult iterator(InetSocketAddress client, long nextIterPos) { + ListScanResult iterator(RedisClient client, long nextIterPos) { return scanIterator(client, nextIterPos); } diff --git a/redisson/src/main/java/org/redisson/RedissonSet.java b/redisson/src/main/java/org/redisson/RedissonSet.java index dd91c6f6e..d59270423 100644 --- a/redisson/src/main/java/org/redisson/RedissonSet.java +++ b/redisson/src/main/java/org/redisson/RedissonSet.java @@ -29,6 +29,7 @@ import org.redisson.api.RSet; import org.redisson.api.RedissonClient; import org.redisson.api.SortOrder; import org.redisson.api.mapreduce.RCollectionMapReduce; +import org.redisson.client.RedisClient; import org.redisson.client.codec.Codec; import org.redisson.client.codec.ScanCodec; import org.redisson.client.protocol.RedisCommands; @@ -89,7 +90,7 @@ public class RedissonSet extends RedissonExpirable implements RSet, ScanIt } @Override - public ListScanResult scanIterator(String name, InetSocketAddress client, long startPos, String pattern) { + public ListScanResult scanIterator(String name, RedisClient client, long startPos, String pattern) { if (pattern == null) { RFuture> f = commandExecutor.readAsync(client, name, new ScanCodec(codec), RedisCommands.SSCAN, name, startPos); return get(f); @@ -104,7 +105,7 @@ public class RedissonSet extends RedissonExpirable implements RSet, ScanIt return new RedissonBaseIterator() { @Override - ListScanResult iterator(InetSocketAddress client, long nextIterPos) { + ListScanResult iterator(RedisClient client, long nextIterPos) { return scanIterator(getName(), client, nextIterPos, pattern); } diff --git a/redisson/src/main/java/org/redisson/RedissonSetCache.java b/redisson/src/main/java/org/redisson/RedissonSetCache.java index 1d9ac9cc3..a75f61034 100644 --- a/redisson/src/main/java/org/redisson/RedissonSetCache.java +++ b/redisson/src/main/java/org/redisson/RedissonSetCache.java @@ -29,6 +29,7 @@ import org.redisson.api.RFuture; import org.redisson.api.RSetCache; import org.redisson.api.RedissonClient; import org.redisson.api.mapreduce.RCollectionMapReduce; +import org.redisson.client.RedisClient; import org.redisson.client.codec.Codec; import org.redisson.client.codec.ScanCodec; import org.redisson.client.protocol.RedisCommand; @@ -123,12 +124,13 @@ public class RedissonSetCache extends RedissonExpirable implements RSetCache< Arrays.asList(getName(o)), System.currentTimeMillis(), encode(o)); } - public ListScanResult scanIterator(String name, InetSocketAddress client, long startPos, String pattern) { + @Override + public ListScanResult scanIterator(String name, RedisClient client, long startPos, String pattern) { RFuture> f = scanIteratorAsync(name, client, startPos, pattern); return get(f); } - public RFuture> scanIteratorAsync(String name, InetSocketAddress client, long startPos, String pattern) { + public RFuture> scanIteratorAsync(String name, RedisClient client, long startPos, String pattern) { List params = new ArrayList(); params.add(startPos); params.add(System.currentTimeMillis()); @@ -160,7 +162,7 @@ public class RedissonSetCache extends RedissonExpirable implements RSetCache< return new RedissonBaseIterator() { @Override - ListScanResult iterator(InetSocketAddress client, long nextIterPos) { + ListScanResult iterator(RedisClient client, long nextIterPos) { return scanIterator(getName(), client, nextIterPos, pattern); } diff --git a/redisson/src/main/java/org/redisson/RedissonSetMultimapValues.java b/redisson/src/main/java/org/redisson/RedissonSetMultimapValues.java index bdf62a37b..c365fc1bf 100644 --- a/redisson/src/main/java/org/redisson/RedissonSetMultimapValues.java +++ b/redisson/src/main/java/org/redisson/RedissonSetMultimapValues.java @@ -15,7 +15,6 @@ */ package org.redisson; -import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -28,6 +27,7 @@ import org.redisson.api.RFuture; import org.redisson.api.RSet; import org.redisson.api.SortOrder; import org.redisson.api.mapreduce.RCollectionMapReduce; +import org.redisson.client.RedisClient; import org.redisson.client.codec.Codec; import org.redisson.client.codec.MapScanCodec; import org.redisson.client.protocol.RedisCommand; @@ -165,7 +165,7 @@ public class RedissonSetMultimapValues extends RedissonExpirable implements R System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(o)); } - private ListScanResult scanIterator(InetSocketAddress client, long startPos, String pattern) { + private ListScanResult scanIterator(RedisClient client, long startPos, String pattern) { List params = new ArrayList(); params.add(System.currentTimeMillis()); params.add(startPos); @@ -201,7 +201,7 @@ public class RedissonSetMultimapValues extends RedissonExpirable implements R return new RedissonBaseIterator() { @Override - ListScanResult iterator(InetSocketAddress client, long nextIterPos) { + ListScanResult iterator(RedisClient client, long nextIterPos) { return scanIterator(client, nextIterPos, pattern); } diff --git a/redisson/src/main/java/org/redisson/ScanIterator.java b/redisson/src/main/java/org/redisson/ScanIterator.java index 047dbcc18..f962f502d 100644 --- a/redisson/src/main/java/org/redisson/ScanIterator.java +++ b/redisson/src/main/java/org/redisson/ScanIterator.java @@ -15,8 +15,7 @@ */ package org.redisson; -import java.net.InetSocketAddress; - +import org.redisson.client.RedisClient; import org.redisson.client.protocol.decoder.ListScanResult; import org.redisson.client.protocol.decoder.ScanObjectEntry; @@ -27,7 +26,7 @@ import org.redisson.client.protocol.decoder.ScanObjectEntry; */ public interface ScanIterator { - ListScanResult scanIterator(String name, InetSocketAddress client, long startPos, String pattern); + ListScanResult scanIterator(String name, RedisClient client, long startPos, String pattern); boolean remove(Object value); diff --git a/redisson/src/main/java/org/redisson/client/RedisClient.java b/redisson/src/main/java/org/redisson/client/RedisClient.java index 1728f1133..1da0bd527 100644 --- a/redisson/src/main/java/org/redisson/client/RedisClient.java +++ b/redisson/src/main/java/org/redisson/client/RedisClient.java @@ -20,13 +20,13 @@ import java.net.URI; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import org.redisson.api.RFuture; import org.redisson.client.handler.RedisChannelInitializer; import org.redisson.client.handler.RedisChannelInitializer.Type; import org.redisson.misc.RPromise; import org.redisson.misc.RedissonPromise; -import org.redisson.misc.URIBuilder; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; @@ -34,12 +34,16 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; +import io.netty.channel.epoll.EpollDatagramChannel; +import io.netty.channel.epoll.EpollSocketChannel; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.ChannelGroupFuture; import io.netty.channel.group.DefaultChannelGroup; import io.netty.channel.nio.NioEventLoopGroup; -import io.netty.channel.socket.SocketChannel; -import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.channel.socket.nio.NioDatagramChannel; +import io.netty.resolver.AddressResolver; +import io.netty.resolver.dns.DnsAddressResolverGroup; +import io.netty.resolver.dns.DnsServerAddressStreamProviders; import io.netty.util.HashedWheelTimer; import io.netty.util.Timer; import io.netty.util.concurrent.Future; @@ -53,9 +57,11 @@ import io.netty.util.concurrent.FutureListener; */ public class RedisClient { + private final AtomicReference> resolveFuture = new AtomicReference>(); private final Bootstrap bootstrap; private final Bootstrap pubSubBootstrap; - private final InetSocketAddress addr; + private final URI addr; + private InetSocketAddress resolvedAddr; private final ChannelGroup channels; private ExecutorService executor; @@ -66,6 +72,7 @@ public class RedisClient { private boolean hasOwnTimer; private boolean hasOwnExecutor; private boolean hasOwnGroup; + private boolean hasOwnResolver; public static RedisClient create(RedisClientConfig config) { return new RedisClient(config); @@ -85,12 +92,20 @@ public class RedisClient { copy.setExecutor(Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2)); hasOwnExecutor = true; } + if (copy.getResolverGroup() == null) { + if (config.getSocketChannelClass() == EpollSocketChannel.class) { + copy.setResolverGroup(new DnsAddressResolverGroup(EpollDatagramChannel.class, DnsServerAddressStreamProviders.platformDefault())); + } else { + copy.setResolverGroup(new DnsAddressResolverGroup(NioDatagramChannel.class, DnsServerAddressStreamProviders.platformDefault())); + } + hasOwnResolver = true; + } this.config = copy; this.executor = copy.getExecutor(); this.timer = copy.getTimer(); - addr = new InetSocketAddress(copy.getAddress().getHost(), copy.getAddress().getPort()); + addr = copy.getAddress(); channels = new DefaultChannelGroup(copy.getGroup().next()); bootstrap = createBootstrap(copy, Type.PLAIN); @@ -101,9 +116,9 @@ public class RedisClient { private Bootstrap createBootstrap(RedisClientConfig config, Type type) { Bootstrap bootstrap = new Bootstrap() + .resolver(config.getResolverGroup()) .channel(config.getSocketChannelClass()) - .group(config.getGroup()) - .remoteAddress(addr); + .group(config.getGroup()); bootstrap.handler(new RedisChannelInitializer(bootstrap, config, this, channels, type)); bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, config.getConnectTimeout()); @@ -112,92 +127,8 @@ public class RedisClient { return bootstrap; } - /* - * Use {@link #create(RedisClientConfig)} - * - */ - @Deprecated - public RedisClient(String address) { - this(URIBuilder.create(address)); - } - - /* - * Use {@link #create(RedisClientConfig)} - * - */ - @Deprecated - public RedisClient(URI address) { - this(new HashedWheelTimer(), Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2), new NioEventLoopGroup(), address); - hasOwnGroup = true; - } - - /* - * Use {@link #create(RedisClientConfig)} - * - */ - @Deprecated - public RedisClient(Timer timer, ExecutorService executor, EventLoopGroup group, URI address) { - this(timer, executor, group, address.getHost(), address.getPort()); - } - - /* - * Use {@link #create(RedisClientConfig)} - * - */ - @Deprecated - public RedisClient(String host, int port) { - this(new HashedWheelTimer(), Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2), new NioEventLoopGroup(), NioSocketChannel.class, host, port, 10000, 10000); - hasOwnGroup = true; - } - - /* - * Use {@link #create(RedisClientConfig)} - * - */ - @Deprecated - public RedisClient(Timer timer, ExecutorService executor, EventLoopGroup group, String host, int port) { - this(timer, executor, group, NioSocketChannel.class, host, port, 10000, 10000); - } - - /* - * Use {@link #create(RedisClientConfig)} - * - */ - @Deprecated - public RedisClient(String host, int port, int connectTimeout, int commandTimeout) { - this(new HashedWheelTimer(), Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2), new NioEventLoopGroup(), NioSocketChannel.class, host, port, connectTimeout, commandTimeout); - } - - /* - * Use {@link #create(RedisClientConfig)} - * - */ - @Deprecated - public RedisClient(final Timer timer, ExecutorService executor, EventLoopGroup group, Class socketChannelClass, String host, int port, - int connectTimeout, int commandTimeout) { - RedisClientConfig config = new RedisClientConfig(); - config.setTimer(timer).setExecutor(executor).setGroup(group).setSocketChannelClass(socketChannelClass) - .setAddress(host, port).setConnectTimeout(connectTimeout).setCommandTimeout(commandTimeout); - - this.config = config; - this.executor = config.getExecutor(); - this.timer = config.getTimer(); - - addr = new InetSocketAddress(config.getAddress().getHost(), config.getAddress().getPort()); - - channels = new DefaultChannelGroup(config.getGroup().next()); - bootstrap = createBootstrap(config, Type.PLAIN); - pubSubBootstrap = createBootstrap(config, Type.PUBSUB); - - this.commandTimeout = config.getCommandTimeout(); - } - - public String getIpAddr() { - return addr.getAddress().getHostAddress() + ":" + addr.getPort(); - } - public InetSocketAddress getAddr() { - return addr; + return resolvedAddr; } public long getCommandTimeout() { @@ -219,42 +150,78 @@ public class RedisClient { throw new RedisConnectionException("Unable to connect to: " + addr, e); } } - + + public RFuture resolveAddr() { + final RPromise promise = new RedissonPromise(); + if (!resolveFuture.compareAndSet(null, promise)) { + return resolveFuture.get(); + } + + AddressResolver resolver = (AddressResolver) bootstrap.config().resolver().getResolver(bootstrap.config().group().next()); + Future resolveFuture = resolver.resolve(InetSocketAddress.createUnresolved(addr.getHost(), addr.getPort())); + resolveFuture.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + promise.tryFailure(future.cause()); + return; + } + + resolvedAddr = future.getNow(); + promise.trySuccess(future.getNow()); + } + }); + return promise; + } + public RFuture connectAsync() { final RPromise f = new RedissonPromise(); - ChannelFuture channelFuture = bootstrap.connect(); - channelFuture.addListener(new ChannelFutureListener() { + + RFuture addrFuture = resolveAddr(); + addrFuture.addListener(new FutureListener() { @Override - public void operationComplete(final ChannelFuture future) throws Exception { - if (future.isSuccess()) { - final RedisConnection c = RedisConnection.getFrom(future.channel()); - c.getConnectionPromise().addListener(new FutureListener() { - @Override - public void operationComplete(final Future future) throws Exception { - bootstrap.config().group().execute(new Runnable() { + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + f.tryFailure(future.cause()); + return; + } + + ChannelFuture channelFuture = bootstrap.connect(future.getNow()); + channelFuture.addListener(new ChannelFutureListener() { + @Override + public void operationComplete(final ChannelFuture future) throws Exception { + if (future.isSuccess()) { + final RedisConnection c = RedisConnection.getFrom(future.channel()); + c.getConnectionPromise().addListener(new FutureListener() { @Override - public void run() { - if (future.isSuccess()) { - if (!f.trySuccess(c)) { - c.closeAsync(); + public void operationComplete(final Future future) throws Exception { + bootstrap.config().group().execute(new Runnable() { + @Override + public void run() { + if (future.isSuccess()) { + if (!f.trySuccess(c)) { + c.closeAsync(); + } + } else { + f.tryFailure(future.cause()); + c.closeAsync(); + } } - } else { - f.tryFailure(future.cause()); - c.closeAsync(); - } + }); + } + }); + } else { + bootstrap.config().group().execute(new Runnable() { + public void run() { + f.tryFailure(future.cause()); } }); } - }); - } else { - bootstrap.config().group().execute(new Runnable() { - public void run() { - f.tryFailure(future.cause()); - } - }); - } + } + }); } }); + return f; } @@ -268,39 +235,52 @@ public class RedisClient { public RFuture connectPubSubAsync() { final RPromise f = new RedissonPromise(); - ChannelFuture channelFuture = pubSubBootstrap.connect(); - channelFuture.addListener(new ChannelFutureListener() { + + RFuture nameFuture = resolveAddr(); + nameFuture.addListener(new FutureListener() { @Override - public void operationComplete(final ChannelFuture future) throws Exception { - if (future.isSuccess()) { - final RedisPubSubConnection c = RedisPubSubConnection.getFrom(future.channel()); - c.getConnectionPromise().addListener(new FutureListener() { - @Override - public void operationComplete(final Future future) throws Exception { - bootstrap.config().group().execute(new Runnable() { + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + f.tryFailure(future.cause()); + return; + } + + ChannelFuture channelFuture = pubSubBootstrap.connect(future.getNow()); + channelFuture.addListener(new ChannelFutureListener() { + @Override + public void operationComplete(final ChannelFuture future) throws Exception { + if (future.isSuccess()) { + final RedisPubSubConnection c = RedisPubSubConnection.getFrom(future.channel()); + c.getConnectionPromise().addListener(new FutureListener() { @Override - public void run() { - if (future.isSuccess()) { - if (!f.trySuccess(c)) { - c.closeAsync(); + public void operationComplete(final Future future) throws Exception { + pubSubBootstrap.config().group().execute(new Runnable() { + @Override + public void run() { + if (future.isSuccess()) { + if (!f.trySuccess(c)) { + c.closeAsync(); + } + } else { + f.tryFailure(future.cause()); + c.closeAsync(); + } } - } else { - f.tryFailure(future.cause()); - c.closeAsync(); - } + }); + } + }); + } else { + pubSubBootstrap.config().group().execute(new Runnable() { + public void run() { + f.tryFailure(future.cause()); } }); } - }); - } else { - bootstrap.config().group().execute(new Runnable() { - public void run() { - f.tryFailure(future.cause()); - } - }); - } + } + }); } }); + return f; } @@ -339,6 +319,9 @@ public class RedisClient { executor.awaitTermination(15, TimeUnit.SECONDS); } + if (hasOwnResolver) { + bootstrap.config().resolver().close(); + } if (hasOwnGroup) { bootstrap.config().group().shutdownGracefully(); } diff --git a/redisson/src/main/java/org/redisson/client/RedisClientConfig.java b/redisson/src/main/java/org/redisson/client/RedisClientConfig.java index 7d864129b..88987e588 100644 --- a/redisson/src/main/java/org/redisson/client/RedisClientConfig.java +++ b/redisson/src/main/java/org/redisson/client/RedisClientConfig.java @@ -15,6 +15,7 @@ */ package org.redisson.client; +import java.net.InetAddress; import java.net.URI; import java.util.concurrent.ExecutorService; @@ -24,6 +25,7 @@ import org.redisson.misc.URIBuilder; import io.netty.channel.EventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.resolver.dns.DnsAddressResolverGroup; import io.netty.util.Timer; /** @@ -34,10 +36,12 @@ import io.netty.util.Timer; public class RedisClientConfig { private URI address; + private InetAddress addr; private Timer timer; private ExecutorService executor; private EventLoopGroup group; + private DnsAddressResolverGroup resolverGroup; private Class socketChannelClass = NioSocketChannel.class; private int connectTimeout = 10000; private int commandTimeout = 10000; @@ -84,6 +88,7 @@ public class RedisClientConfig { this.sslTruststorePassword = config.sslTruststorePassword; this.sslKeystore = config.sslKeystore; this.sslKeystorePassword = config.sslKeystorePassword; + this.resolverGroup = config.resolverGroup; } public RedisClientConfig setAddress(String host, int port) { @@ -94,6 +99,11 @@ public class RedisClientConfig { this.address = URIBuilder.create(address); return this; } + public RedisClientConfig setAddress(InetAddress addr, URI address) { + this.addr = addr; + this.address = address; + return this; + } public RedisClientConfig setAddress(URI address) { this.address = address; return this; @@ -101,7 +111,9 @@ public class RedisClientConfig { public URI getAddress() { return address; } - + public InetAddress getAddr() { + return addr; + } public Timer getTimer() { return timer; @@ -262,5 +274,15 @@ public class RedisClientConfig { this.tcpNoDelay = tcpNoDelay; return this; } + + public DnsAddressResolverGroup getResolverGroup() { + return resolverGroup; + } + public RedisClientConfig setResolverGroup(DnsAddressResolverGroup resolverGroup) { + this.resolverGroup = resolverGroup; + return this; + } + + } diff --git a/redisson/src/main/java/org/redisson/client/RedisRedirectException.java b/redisson/src/main/java/org/redisson/client/RedisRedirectException.java index 5f9f0d872..28ce15433 100644 --- a/redisson/src/main/java/org/redisson/client/RedisRedirectException.java +++ b/redisson/src/main/java/org/redisson/client/RedisRedirectException.java @@ -15,8 +15,8 @@ */ package org.redisson.client; -import java.net.InetSocketAddress; import java.net.URI; + import org.redisson.misc.URIBuilder; /** @@ -44,8 +44,4 @@ public class RedisRedirectException extends RedisException { return url; } - public InetSocketAddress getAddr() { - return new InetSocketAddress(url.getHost(), url.getPort()); - } - } diff --git a/redisson/src/main/java/org/redisson/client/handler/ConnectionWatchdog.java b/redisson/src/main/java/org/redisson/client/handler/ConnectionWatchdog.java index 7b22b479a..16087e9e6 100644 --- a/redisson/src/main/java/org/redisson/client/handler/ConnectionWatchdog.java +++ b/redisson/src/main/java/org/redisson/client/handler/ConnectionWatchdog.java @@ -15,6 +15,7 @@ */ package org.redisson.client.handler; +import java.net.SocketAddress; import java.util.Map.Entry; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; @@ -109,7 +110,7 @@ public class ConnectionWatchdog extends ChannelInboundHandlerAdapter { log.debug("reconnecting {} to {} ", connection, connection.getRedisClient().getAddr(), connection); try { - bootstrap.connect().addListener(new ChannelFutureListener() { + bootstrap.connect(connection.getRedisClient().getAddr()).addListener(new ChannelFutureListener() { @Override public void operationComplete(final ChannelFuture future) throws Exception { diff --git a/redisson/src/main/java/org/redisson/client/protocol/decoder/ListScanResult.java b/redisson/src/main/java/org/redisson/client/protocol/decoder/ListScanResult.java index 269b86e64..4af4ae217 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/decoder/ListScanResult.java +++ b/redisson/src/main/java/org/redisson/client/protocol/decoder/ListScanResult.java @@ -19,6 +19,7 @@ import java.net.InetSocketAddress; import java.util.List; import org.redisson.RedisClientResult; +import org.redisson.client.RedisClient; /** * @@ -30,7 +31,7 @@ public class ListScanResult implements RedisClientResult { private final Long pos; private final List values; - private InetSocketAddress addr; + private RedisClient client; public ListScanResult(Long pos, List values) { this.pos = pos; @@ -46,12 +47,12 @@ public class ListScanResult implements RedisClientResult { } @Override - public void setRedisClient(InetSocketAddress addr) { - this.addr = addr; + public void setRedisClient(RedisClient client) { + this.client = client; } - public InetSocketAddress getRedisClient() { - return addr; + public RedisClient getRedisClient() { + return client; } } diff --git a/redisson/src/main/java/org/redisson/client/protocol/decoder/MapScanResult.java b/redisson/src/main/java/org/redisson/client/protocol/decoder/MapScanResult.java index d50c6b846..d27a969b5 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/decoder/MapScanResult.java +++ b/redisson/src/main/java/org/redisson/client/protocol/decoder/MapScanResult.java @@ -19,6 +19,7 @@ import java.net.InetSocketAddress; import java.util.Map; import org.redisson.RedisClientResult; +import org.redisson.client.RedisClient; /** * @@ -31,7 +32,7 @@ public class MapScanResult implements RedisClientResult { private final Long pos; private final Map values; - private InetSocketAddress client; + private RedisClient client; public MapScanResult(Long pos, Map values) { super(); @@ -48,11 +49,11 @@ public class MapScanResult implements RedisClientResult { } @Override - public void setRedisClient(InetSocketAddress client) { + public void setRedisClient(RedisClient client) { this.client = client; } - public InetSocketAddress getRedisClient() { + public RedisClient getRedisClient() { return client; } diff --git a/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java b/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java index c4a5dd0ba..705c2df76 100644 --- a/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java +++ b/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java @@ -52,6 +52,7 @@ import org.redisson.connection.MasterSlaveConnectionManager; import org.redisson.connection.MasterSlaveEntry; import org.redisson.connection.SingleEntry; import org.redisson.misc.RPromise; +import org.redisson.misc.RedissonPromise; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -111,7 +112,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { List>>> futures = new ArrayList>>>(); for (ClusterPartition partition : partitions) { if (partition.isMasterFail()) { - failedMasters.add(partition.getMasterAddr().toString()); + failedMasters.add(partition.getMasterAddress().toString()); continue; } RFuture>> masterFuture = addMasterEntry(partition, cfg); @@ -271,12 +272,12 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { } } - RFuture f = e.setupMasterEntry(config.getMasterAddress()); - final RPromise initFuture = newPromise(); + RFuture f = e.setupMasterEntry(config.getMasterAddress()); + final RPromise initFuture = new RedissonPromise(); futures.add(initFuture); - f.addListener(new FutureListener() { + f.addListener(new FutureListener() { @Override - public void operationComplete(Future future) throws Exception { + public void operationComplete(Future future) throws Exception { if (!future.isSuccess()) { log.error("Can't add master: {} for slot ranges: {}", partition.getMasterAddress(), partition.getSlotRanges()); initFuture.tryFailure(future.cause()); @@ -393,8 +394,8 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { masterFuture.addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { - checkSlotsMigration(newPartitions, nodesValue.toString()); - checkSlotsChange(cfg, newPartitions, nodesValue.toString()); + checkSlotsMigration(newPartitions); + checkSlotsChange(cfg, newPartitions); getShutdownLatch().release(); scheduleClusterChangeCheck(cfg, null); } @@ -410,7 +411,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { continue; } - MasterSlaveEntry entry = getEntry(currentPart.getMasterAddr()); + MasterSlaveEntry entry = getEntry(currentPart.getSlots().iterator().next()); // should be invoked first in order to remove stale failedSlaveAddresses Set addedSlaves = addRemoveSlaves(entry, currentPart, newPart); // Do some slaves have changed state from failed to alive? @@ -562,7 +563,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { return result; } - private void checkSlotsChange(ClusterServersConfig cfg, Collection newPartitions, String nodes) { + private void checkSlotsChange(ClusterServersConfig cfg, Collection newPartitions) { Collection newPartitionsSlots = slots(newPartitions); if (newPartitionsSlots.size() == lastPartitions.size() && lastPartitions.size() == MAX_SLOT) { return; @@ -576,8 +577,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { } for (Integer slot : removedSlots) { - MasterSlaveEntry entry = getEntry(slot); - removeMaster(slot); + MasterSlaveEntry entry = removeEntry(slot); if (entry.getSlotRanges().isEmpty()) { entry.shutdownMasterAsync(); log.info("{} master and slaves for it removed", entry.getClient().getAddr()); @@ -592,50 +592,56 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { } for (final Integer slot : addedSlots) { ClusterPartition partition = find(newPartitions, slot); - MasterSlaveEntry entry = getEntry(partition.getMasterAddr()); - if (entry != null && entry.getClient().getAddr().equals(partition.getMasterAddr())) { + + Set oldSlots = new HashSet(partition.getSlots()); + oldSlots.removeAll(addedSlots); + if (oldSlots.isEmpty()) { + continue; + } + + MasterSlaveEntry entry = getEntry(oldSlots.iterator().next()); + if (entry != null) { addEntry(slot, entry); lastPartitions.put(slot, partition); - break; } } } - private void checkSlotsMigration(Collection newPartitions, String nodes) { + private void checkSlotsMigration(Collection newPartitions) { Set currentPartitions = getLastPartitions(); for (ClusterPartition currentPartition : currentPartitions) { for (ClusterPartition newPartition : newPartitions) { if (!currentPartition.getNodeId().equals(newPartition.getNodeId()) // skip master change case - || !currentPartition.getMasterAddr().equals(newPartition.getMasterAddr())) { + || !currentPartition.getMasterAddress().equals(newPartition.getMasterAddress())) { continue; } + MasterSlaveEntry entry = getEntry(currentPartition.getSlots().iterator().next()); Set addedSlots = new HashSet(newPartition.getSlots()); addedSlots.removeAll(currentPartition.getSlots()); currentPartition.addSlots(addedSlots); - MasterSlaveEntry entry = getEntry(currentPartition.getMasterAddr()); for (Integer slot : addedSlots) { addEntry(slot, entry); lastPartitions.put(slot, currentPartition); } if (!addedSlots.isEmpty()) { - log.info("{} slots added to {}", addedSlots.size(), currentPartition.getMasterAddr()); + log.info("{} slots added to {}", addedSlots.size(), currentPartition.getMasterAddress()); } Set removedSlots = new HashSet(currentPartition.getSlots()); removedSlots.removeAll(newPartition.getSlots()); for (Integer removeSlot : removedSlots) { if (lastPartitions.remove(removeSlot, currentPartition)) { - removeMaster(removeSlot); + removeEntry(removeSlot); } } currentPartition.removeSlots(removedSlots); if (!removedSlots.isEmpty()) { - log.info("{} slots removed from {}", removedSlots.size(), currentPartition.getMasterAddr()); + log.info("{} slots removed from {}", removedSlots.size(), currentPartition.getMasterAddress()); } break; } diff --git a/redisson/src/main/java/org/redisson/cluster/ClusterPartition.java b/redisson/src/main/java/org/redisson/cluster/ClusterPartition.java index 92259ddc6..76d48df98 100644 --- a/redisson/src/main/java/org/redisson/cluster/ClusterPartition.java +++ b/redisson/src/main/java/org/redisson/cluster/ClusterPartition.java @@ -15,7 +15,6 @@ */ package org.redisson.cluster; -import java.net.InetSocketAddress; import java.net.URI; import java.util.Collections; import java.util.HashSet; @@ -106,10 +105,6 @@ public class ClusterPartition { return slots; } - public InetSocketAddress getMasterAddr() { - return new InetSocketAddress(masterAddress.getHost(), masterAddress.getPort()); - } - public URI getMasterAddress() { return masterAddress; } diff --git a/redisson/src/main/java/org/redisson/command/CommandAsyncExecutor.java b/redisson/src/main/java/org/redisson/command/CommandAsyncExecutor.java index e36726524..832502023 100644 --- a/redisson/src/main/java/org/redisson/command/CommandAsyncExecutor.java +++ b/redisson/src/main/java/org/redisson/command/CommandAsyncExecutor.java @@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit; import org.redisson.SlotCallback; import org.redisson.api.RFuture; +import org.redisson.client.RedisClient; import org.redisson.client.RedisException; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommand; @@ -56,11 +57,11 @@ public interface CommandAsyncExecutor { RFuture writeAsync(MasterSlaveEntry entry, Codec codec, RedisCommand command, Object ... params); - RFuture writeAsync(Integer slot, Codec codec, RedisCommand command, Object ... params); - - RFuture readAsync(InetSocketAddress client, MasterSlaveEntry entry, Codec codec, RedisCommand command, Object ... params); + RFuture readAsync(RedisClient client, MasterSlaveEntry entry, Codec codec, RedisCommand command, Object ... params); + + RFuture readAsync(RedisClient client, String name, Codec codec, RedisCommand command, Object ... params); - RFuture readAsync(InetSocketAddress client, String key, Codec codec, RedisCommand command, Object ... params); + RFuture readAsync(RedisClient client, Codec codec, RedisCommand command, Object ... params); RFuture evalWriteAllAsync(RedisCommand command, SlotCallback callback, String script, List keys, Object ... params); @@ -68,20 +69,16 @@ public interface CommandAsyncExecutor { RFuture readAllAsync(RedisCommand command, SlotCallback callback, Object ... params); - RFuture evalReadAsync(InetSocketAddress client, String key, Codec codec, RedisCommand evalCommandType, String script, List keys, Object ... params); + RFuture evalReadAsync(RedisClient client, String name, Codec codec, RedisCommand evalCommandType, String script, List keys, Object ... params); RFuture evalReadAsync(String key, Codec codec, RedisCommand evalCommandType, String script, List keys, Object ... params); RFuture evalReadAsync(MasterSlaveEntry entry, Codec codec, RedisCommand evalCommandType, String script, List keys, Object ... params); - RFuture evalReadAsync(Integer slot, Codec codec, RedisCommand evalCommandType, String script, List keys, Object ... params); - RFuture evalWriteAsync(String key, Codec codec, RedisCommand evalCommandType, String script, List keys, Object ... params); RFuture evalWriteAsync(MasterSlaveEntry entry, Codec codec, RedisCommand evalCommandType, String script, List keys, Object ... params); - RFuture evalWriteAsync(Integer slot, Codec codec, RedisCommand evalCommandType, String script, List keys, Object ... params); - RFuture readAsync(String key, Codec codec, RedisCommand command, Object ... params); RFuture writeAsync(String key, Codec codec, RedisCommand command, Object ... params); @@ -96,9 +93,6 @@ public interface CommandAsyncExecutor { RFuture readAsync(MasterSlaveEntry entry, Codec codec, RedisCommand command, Object ... params); - RFuture readAsync(Integer slot, Codec codec, RedisCommand command, Object ... params); - RFuture readRandomAsync(RedisCommand command, Object ... params); - } diff --git a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java index 699290e2b..750df9db9 100644 --- a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java +++ b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java @@ -15,7 +15,6 @@ */ package org.redisson.command; -import java.net.InetSocketAddress; import java.util.AbstractMap; import java.util.ArrayList; import java.util.Arrays; @@ -38,6 +37,7 @@ import org.redisson.api.RFuture; import org.redisson.api.RedissonClient; import org.redisson.api.RedissonReactiveClient; import org.redisson.client.RedisAskException; +import org.redisson.client.RedisClient; import org.redisson.client.RedisConnection; import org.redisson.client.RedisException; import org.redisson.client.RedisLoadingException; @@ -180,20 +180,27 @@ public class CommandAsyncService implements CommandAsyncExecutor { } @Override - public RFuture readAsync(InetSocketAddress client, MasterSlaveEntry entry, Codec codec, RedisCommand command, Object... params) { + public RFuture readAsync(RedisClient client, MasterSlaveEntry entry, Codec codec, RedisCommand command, Object... params) { RPromise mainPromise = connectionManager.newPromise(); async(true, new NodeSource(entry, client), codec, command, params, mainPromise, 0); return mainPromise; } - + @Override - public RFuture readAsync(InetSocketAddress client, String key, Codec codec, RedisCommand command, Object... params) { + public RFuture readAsync(RedisClient client, String name, Codec codec, RedisCommand command, Object... params) { RPromise mainPromise = connectionManager.newPromise(); - int slot = connectionManager.calcSlot(key); + int slot = connectionManager.calcSlot(name); async(true, new NodeSource(slot, client), codec, command, params, mainPromise, 0); return mainPromise; } + @Override + public RFuture readAsync(RedisClient client, Codec codec, RedisCommand command, Object... params) { + RPromise mainPromise = connectionManager.newPromise(); + async(true, new NodeSource(client), codec, command, params, mainPromise, 0); + return mainPromise; + } + @Override public RFuture> readAllAsync(RedisCommand command, Object... params) { final RPromise> mainPromise = connectionManager.newPromise(); @@ -344,12 +351,6 @@ public class CommandAsyncService implements CommandAsyncExecutor { return mainPromise; } - public RFuture readAsync(Integer slot, Codec codec, RedisCommand command, Object... params) { - RPromise mainPromise = connectionManager.newPromise(); - async(true, new NodeSource(slot), codec, command, params, mainPromise, 0); - return mainPromise; - } - @Override public RFuture writeAsync(MasterSlaveEntry entry, Codec codec, RedisCommand command, Object... params) { RPromise mainPromise = connectionManager.newPromise(); @@ -357,13 +358,6 @@ public class CommandAsyncService implements CommandAsyncExecutor { return mainPromise; } - @Override - public RFuture writeAsync(Integer slot, Codec codec, RedisCommand command, Object... params) { - RPromise mainPromise = connectionManager.newPromise(); - async(false, new NodeSource(slot), codec, command, params, mainPromise, 0); - return mainPromise; - } - @Override public RFuture readAsync(String key, RedisCommand command, Object... params) { return readAsync(key, connectionManager.getCodec(), command, params); @@ -381,13 +375,8 @@ public class CommandAsyncService implements CommandAsyncExecutor { } @Override - public RFuture evalReadAsync(Integer slot, Codec codec, RedisCommand evalCommandType, String script, List keys, Object... params) { - return evalAsync(new NodeSource(slot), true, codec, evalCommandType, script, keys, params); - } - - @Override - public RFuture evalReadAsync(InetSocketAddress client, String key, Codec codec, RedisCommand evalCommandType, String script, List keys, Object... params) { - int slot = connectionManager.calcSlot(key); + public RFuture evalReadAsync(RedisClient client, String name, Codec codec, RedisCommand evalCommandType, String script, List keys, Object... params) { + int slot = connectionManager.calcSlot(name); return evalAsync(new NodeSource(slot, client), true, codec, evalCommandType, script, keys, params); } @@ -401,10 +390,6 @@ public class CommandAsyncService implements CommandAsyncExecutor { return evalAsync(new NodeSource(entry), false, codec, evalCommandType, script, keys, params); } - public RFuture evalWriteAsync(Integer slot, Codec codec, RedisCommand evalCommandType, String script, List keys, Object... params) { - return evalAsync(new NodeSource(slot), false, codec, evalCommandType, script, keys, params); - } - @Override public RFuture evalWriteAllAsync(RedisCommand command, SlotCallback callback, String script, List keys, Object... params) { return evalAllAsync(false, command, callback, script, keys, params); @@ -805,7 +790,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { if (future.cause() instanceof RedisMovedException) { RedisMovedException ex = (RedisMovedException) future.cause(); - async(details.isReadOnlyMode(), new NodeSource(ex.getSlot(), ex.getAddr(), Redirect.MOVED), details.getCodec(), + async(details.isReadOnlyMode(), new NodeSource(ex.getSlot(), ex.getUrl(), Redirect.MOVED), details.getCodec(), details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt()); AsyncDetails.release(details); return; @@ -813,7 +798,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { if (future.cause() instanceof RedisAskException) { RedisAskException ex = (RedisAskException) future.cause(); - async(details.isReadOnlyMode(), new NodeSource(ex.getSlot(), ex.getAddr(), Redirect.ASK), details.getCodec(), + async(details.isReadOnlyMode(), new NodeSource(ex.getSlot(), ex.getUrl(), Redirect.ASK), details.getCodec(), details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt()); AsyncDetails.release(details); return; @@ -844,11 +829,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { if (future.isSuccess()) { R res = future.getNow(); if (res instanceof RedisClientResult) { - InetSocketAddress addr = source.getAddr(); - if (addr == null) { - addr = details.getConnectionFuture().getNow().getRedisClient().getAddr(); - } - ((RedisClientResult) res).setRedisClient(addr); + ((RedisClientResult) res).setRedisClient(details.getConnectionFuture().getNow().getRedisClient()); } if (isRedissonReferenceSupportEnabled()) { diff --git a/redisson/src/main/java/org/redisson/command/CommandBatchService.java b/redisson/src/main/java/org/redisson/command/CommandBatchService.java index ef89d87bc..38085376e 100644 --- a/redisson/src/main/java/org/redisson/command/CommandBatchService.java +++ b/redisson/src/main/java/org/redisson/command/CommandBatchService.java @@ -371,14 +371,14 @@ public class CommandBatchService extends CommandAsyncService { if (future.cause() instanceof RedisMovedException) { RedisMovedException ex = (RedisMovedException)future.cause(); entry.clearErrors(); - NodeSource nodeSource = new NodeSource(ex.getSlot(), ex.getAddr(), Redirect.MOVED); + NodeSource nodeSource = new NodeSource(ex.getSlot(), ex.getUrl(), Redirect.MOVED); execute(entry, nodeSource, mainPromise, slots, attempt, noResult, responseTimeout, retryAttempts, retryInterval); return; } if (future.cause() instanceof RedisAskException) { RedisAskException ex = (RedisAskException)future.cause(); entry.clearErrors(); - NodeSource nodeSource = new NodeSource(ex.getSlot(), ex.getAddr(), Redirect.ASK); + NodeSource nodeSource = new NodeSource(ex.getSlot(), ex.getUrl(), Redirect.ASK); execute(entry, nodeSource, mainPromise, slots, attempt, noResult, responseTimeout, retryAttempts, retryInterval); return; } diff --git a/redisson/src/main/java/org/redisson/command/CommandReactiveExecutor.java b/redisson/src/main/java/org/redisson/command/CommandReactiveExecutor.java index b5bbf36b9..b86b1e3ab 100644 --- a/redisson/src/main/java/org/redisson/command/CommandReactiveExecutor.java +++ b/redisson/src/main/java/org/redisson/command/CommandReactiveExecutor.java @@ -15,13 +15,13 @@ */ package org.redisson.command; -import java.net.InetSocketAddress; import java.util.Collection; import java.util.List; import org.reactivestreams.Publisher; import org.redisson.SlotCallback; import org.redisson.api.RFuture; +import org.redisson.client.RedisClient; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommand; import org.redisson.connection.MasterSlaveEntry; @@ -37,9 +37,6 @@ public interface CommandReactiveExecutor extends CommandAsyncExecutor { Publisher reactive(Supplier> supplier); - Publisher evalReadReactive(InetSocketAddress client, String key, Codec codec, RedisCommand evalCommandType, - String script, List keys, Object ... params); - Publisher evalWriteAllReactive(RedisCommand command, SlotCallback callback, String script, List keys, Object ... params); Publisher> readAllReactive(RedisCommand command, Object ... params); @@ -52,7 +49,7 @@ public interface CommandReactiveExecutor extends CommandAsyncExecutor { Publisher writeAllReactive(RedisCommand command, SlotCallback callback, Object ... params); - Publisher readReactive(InetSocketAddress client, String key, Codec codec, RedisCommand command, Object ... params); + Publisher readReactive(RedisClient client, String name, Codec codec, RedisCommand command, Object ... params); Publisher evalWriteReactive(String key, Codec codec, RedisCommand evalCommandType, String script, List keys, Object... params); diff --git a/redisson/src/main/java/org/redisson/command/CommandReactiveService.java b/redisson/src/main/java/org/redisson/command/CommandReactiveService.java index 70a712db0..abe339d44 100644 --- a/redisson/src/main/java/org/redisson/command/CommandReactiveService.java +++ b/redisson/src/main/java/org/redisson/command/CommandReactiveService.java @@ -15,13 +15,13 @@ */ package org.redisson.command; -import java.net.InetSocketAddress; import java.util.Collection; import java.util.List; import org.reactivestreams.Publisher; import org.redisson.SlotCallback; import org.redisson.api.RFuture; +import org.redisson.client.RedisClient; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommand; import org.redisson.connection.ConnectionManager; @@ -76,11 +76,11 @@ public class CommandReactiveService extends CommandAsyncService implements Comma } @Override - public Publisher readReactive(final InetSocketAddress client, final String key, final Codec codec, final RedisCommand command, final Object ... params) { + public Publisher readReactive(final RedisClient client, final String name, final Codec codec, final RedisCommand command, final Object ... params) { return reactive(new Supplier>() { @Override public RFuture get() { - return readAsync(client, key, codec, command, params); + return readAsync(client, name, codec, command, params); }; }); } @@ -136,18 +136,6 @@ public class CommandReactiveService extends CommandAsyncService implements Comma }); } - @Override - public Publisher evalReadReactive(final InetSocketAddress client, final String key, final Codec codec, final RedisCommand evalCommandType, - final String script, final List keys, final Object ... params) { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return evalReadAsync(client, key, codec, evalCommandType, script, keys, params); - }; - }); - } - - @Override public Publisher evalWriteReactive(final String key, final Codec codec, final RedisCommand evalCommandType, final String script, final List keys, final Object... params) { diff --git a/redisson/src/main/java/org/redisson/command/CommandSyncExecutor.java b/redisson/src/main/java/org/redisson/command/CommandSyncExecutor.java index 104171b52..7fd403b7e 100644 --- a/redisson/src/main/java/org/redisson/command/CommandSyncExecutor.java +++ b/redisson/src/main/java/org/redisson/command/CommandSyncExecutor.java @@ -32,8 +32,6 @@ public interface CommandSyncExecutor { V get(RFuture future); - R write(Integer slot, Codec codec, RedisCommand command, Object ... params); - R write(String key, Codec codec, RedisCommand command, Object ... params); R write(String key, RedisCommand command, Object ... params); @@ -46,10 +44,6 @@ public interface CommandSyncExecutor { R evalRead(String key, Codec codec, RedisCommand evalCommandType, String script, List keys, Object ... params); - R read(InetSocketAddress client, String key, Codec codec, RedisCommand command, Object ... params); - - R read(InetSocketAddress client, String key, RedisCommand command, Object ... params); - R evalWrite(String key, RedisCommand evalCommandType, String script, List keys, Object ... params); R evalWrite(String key, Codec codec, RedisCommand evalCommandType, String script, List keys, Object ... params); diff --git a/redisson/src/main/java/org/redisson/command/CommandSyncService.java b/redisson/src/main/java/org/redisson/command/CommandSyncService.java index 8a05f4821..de6399a72 100644 --- a/redisson/src/main/java/org/redisson/command/CommandSyncService.java +++ b/redisson/src/main/java/org/redisson/command/CommandSyncService.java @@ -49,18 +49,6 @@ public class CommandSyncService extends CommandAsyncService implements CommandEx return get(res); } - @Override - public R read(InetSocketAddress client, String key, RedisCommand command, Object ... params) { - RFuture res = readAsync(client, key, connectionManager.getCodec(), command, params); - return get(res); - } - - @Override - public R read(InetSocketAddress client, String key, Codec codec, RedisCommand command, Object ... params) { - RFuture res = readAsync(client, key, codec, command, params); - return get(res); - } - @Override public R evalRead(String key, RedisCommand evalCommandType, String script, List keys, Object ... params) { return evalRead(key, connectionManager.getCodec(), evalCommandType, script, keys, params); @@ -83,12 +71,6 @@ public class CommandSyncService extends CommandAsyncService implements CommandEx return get(res); } - @Override - public R write(Integer slot, Codec codec, RedisCommand command, Object ... params) { - RFuture res = writeAsync(slot, codec, command, params); - return get(res); - } - @Override public R write(String key, Codec codec, RedisCommand command, Object ... params) { RFuture res = writeAsync(key, codec, command, params); diff --git a/redisson/src/main/java/org/redisson/connection/ConnectionEventsHub.java b/redisson/src/main/java/org/redisson/connection/ConnectionEventsHub.java index 7b3c44b9e..11594c881 100644 --- a/redisson/src/main/java/org/redisson/connection/ConnectionEventsHub.java +++ b/redisson/src/main/java/org/redisson/connection/ConnectionEventsHub.java @@ -53,7 +53,7 @@ public class ConnectionEventsHub { } public void fireDisconnect(InetSocketAddress addr) { - if (maps.get(addr) == Status.DISCONNECTED) { + if (addr == null || maps.get(addr) == Status.DISCONNECTED) { return; } diff --git a/redisson/src/main/java/org/redisson/connection/ConnectionManager.java b/redisson/src/main/java/org/redisson/connection/ConnectionManager.java index b89edb115..a6cc33447 100644 --- a/redisson/src/main/java/org/redisson/connection/ConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/ConnectionManager.java @@ -102,8 +102,8 @@ public interface ConnectionManager { RedisClient createClient(NodeType type, URI address); - MasterSlaveEntry getEntry(InetSocketAddress addr); - + MasterSlaveEntry getEntry(RedisClient redisClient); + PubSubConnectionEntry getPubSubEntry(String channelName); RFuture psubscribe(String pattern, Codec codec, RedisPubSubListener... listeners); diff --git a/redisson/src/main/java/org/redisson/connection/CountListener.java b/redisson/src/main/java/org/redisson/connection/CountableListener.java similarity index 55% rename from redisson/src/main/java/org/redisson/connection/CountListener.java rename to redisson/src/main/java/org/redisson/connection/CountableListener.java index ec449beb3..d653d9a4b 100644 --- a/redisson/src/main/java/org/redisson/connection/CountListener.java +++ b/redisson/src/main/java/org/redisson/connection/CountableListener.java @@ -17,9 +17,7 @@ package org.redisson.connection; import java.util.concurrent.atomic.AtomicInteger; -import org.redisson.api.RFuture; import org.redisson.misc.RPromise; -import org.redisson.misc.RedissonPromise; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; @@ -29,35 +27,31 @@ import io.netty.util.concurrent.FutureListener; * @author Nikita Koksharov * */ -public class CountListener implements FutureListener { +public class CountableListener implements FutureListener { - private final RPromise res; + protected final AtomicInteger counter = new AtomicInteger(); + protected final RPromise result; + protected final T value; - private final AtomicInteger counter; - - public static RPromise create(RFuture... futures) { - RPromise result = new RedissonPromise(); - FutureListener listener = new CountListener(result, futures.length); - for (RFuture future : futures) { - future.addListener(listener); - } - return result; + public CountableListener(RPromise result, T value) { + super(); + this.result = result; + this.value = value; } - public CountListener(RPromise res, int amount) { - super(); - this.res = res; - this.counter = new AtomicInteger(amount); + public void incCounter() { + counter.incrementAndGet(); } - + @Override - public void operationComplete(Future future) throws Exception { + public void operationComplete(Future future) throws Exception { if (!future.isSuccess()) { - res.tryFailure(future.cause()); + result.tryFailure(future.cause()); return; } + if (counter.decrementAndGet() == 0) { - res.trySuccess(null); + result.trySuccess(value); } } diff --git a/redisson/src/main/java/org/redisson/connection/DNSMonitor.java b/redisson/src/main/java/org/redisson/connection/DNSMonitor.java index 7a57a1684..ef7176e28 100644 --- a/redisson/src/main/java/org/redisson/connection/DNSMonitor.java +++ b/redisson/src/main/java/org/redisson/connection/DNSMonitor.java @@ -18,23 +18,20 @@ package org.redisson.connection; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.URI; +import java.util.Collection; import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; -import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import org.redisson.api.RFuture; import org.redisson.connection.ClientConnectionsEntry.FreezeReason; import org.redisson.misc.URIBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import io.netty.channel.epoll.EpollDatagramChannel; -import io.netty.channel.socket.DatagramChannel; -import io.netty.channel.socket.nio.NioDatagramChannel; import io.netty.resolver.AddressResolver; -import io.netty.resolver.dns.DefaultDnsServerAddressStreamProvider; import io.netty.resolver.dns.DnsAddressResolverGroup; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; @@ -50,33 +47,20 @@ public class DNSMonitor { private static final Logger log = LoggerFactory.getLogger(DNSMonitor.class); - private final DnsAddressResolverGroup resolverGroup; - - private ScheduledFuture dnsMonitorFuture; - - private ConnectionManager connectionManager; - + private final AddressResolver resolver; + private final ConnectionManager connectionManager; private final Map masters = new HashMap(); private final Map slaves = new HashMap(); + private ScheduledFuture dnsMonitorFuture; private long dnsMonitoringInterval; - public DNSMonitor(ConnectionManager connectionManager, Set masterHosts, Set slaveHosts, long dnsMonitoringInterval) { - Class channelClass; - if (connectionManager.getCfg().isUseLinuxNativeEpoll()) { - channelClass = EpollDatagramChannel.class; - } else { - channelClass = NioDatagramChannel.class; - } + public DNSMonitor(ConnectionManager connectionManager, InetSocketAddress masterHost, Collection slaveHosts, long dnsMonitoringInterval, DnsAddressResolverGroup resolverGroup) { + this.resolver = resolverGroup.getResolver(connectionManager.getGroup().next()); - resolverGroup = new DnsAddressResolverGroup(channelClass, DefaultDnsServerAddressStreamProvider.INSTANCE); + URI uri = URIBuilder.create("redis://" + masterHost.getAddress().getHostAddress() + ":" + masterHost.getPort()); + masters.put(uri, masterHost.getAddress()); - AddressResolver resolver = resolverGroup.getResolver(connectionManager.getGroup().next()); - for (URI host : masterHosts) { - Future resolveFuture = resolver.resolve(InetSocketAddress.createUnresolved(host.getHost(), 0)); - resolveFuture.syncUninterruptibly(); - masters.put(host, resolveFuture.getNow().getAddress()); - } for (URI host : slaveHosts) { Future resolveFuture = resolver.resolve(InetSocketAddress.createUnresolved(host.getHost(), 0)); resolveFuture.syncUninterruptibly(); @@ -101,7 +85,6 @@ public class DNSMonitor { dnsMonitorFuture = connectionManager.getGroup().schedule(new Runnable() { @Override public void run() { - final AddressResolver resolver = resolverGroup.getResolver(connectionManager.getGroup().next()); final AtomicInteger counter = new AtomicInteger(masters.size() + slaves.size()); for (final Entry entry : masters.entrySet()) { Future resolveFuture = resolver.resolve(InetSocketAddress.createUnresolved(entry.getKey().getHost(), 0)); @@ -120,15 +103,15 @@ public class DNSMonitor { InetAddress master = entry.getValue(); InetAddress now = future.get().getAddress(); if (!now.getHostAddress().equals(master.getHostAddress())) { - log.info("Detected DNS change. {} has changed from {} to {}", entry.getKey().getHost(), master.getHostAddress(), now.getHostAddress()); + log.info("Detected DNS change. Master {} has changed ip from {} to {}", entry.getKey(), master.getHostAddress(), now.getHostAddress()); for (MasterSlaveEntry entrySet : connectionManager.getEntrySet()) { if (entrySet.getClient().getAddr().getHostName().equals(entry.getKey().getHost()) && entrySet.getClient().getAddr().getPort() == entry.getKey().getPort()) { entrySet.changeMaster(entry.getKey()); + break; } } masters.put(entry.getKey(), now); - log.info("Master {} has been changed", entry.getKey().getHost()); } } }); @@ -149,17 +132,29 @@ public class DNSMonitor { } InetAddress slave = entry.getValue(); - InetAddress updatedSlave = future.get().getAddress(); + final InetAddress updatedSlave = future.get().getAddress(); if (!updatedSlave.getHostAddress().equals(slave.getHostAddress())) { - log.info("Detected DNS change. {} has changed from {} to {}", entry.getKey().getHost(), slave.getHostAddress(), updatedSlave.getHostAddress()); - for (MasterSlaveEntry masterSlaveEntry : connectionManager.getEntrySet()) { - URI uri = URIBuilder.create(slave.getHostAddress() + ":" + entry.getKey().getPort()); - if (masterSlaveEntry.slaveDown(uri, FreezeReason.MANAGER)) { - masterSlaveEntry.slaveUp(entry.getKey(), FreezeReason.MANAGER); + log.info("Detected DNS change. Slave {} has changed ip from {} to {}", entry.getKey().getHost(), slave.getHostAddress(), updatedSlave.getHostAddress()); + for (final MasterSlaveEntry masterSlaveEntry : connectionManager.getEntrySet()) { + final URI uri = URIBuilder.create(slave.getHostAddress() + ":" + entry.getKey().getPort()); + + if (masterSlaveEntry.hasSlave(uri)) { + RFuture addFuture = masterSlaveEntry.addSlave(entry.getKey()); + addFuture.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + log.error("Can't add slave: " + updatedSlave, future.cause()); + return; + } + + masterSlaveEntry.slaveDown(uri, FreezeReason.MANAGER); + } + }); + break; } } slaves.put(entry.getKey(), updatedSlave); - log.info("Slave {} has been changed", entry.getKey().getHost()); } } }); diff --git a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index 3538107eb..47308e9e3 100644 --- a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -54,16 +54,21 @@ import org.redisson.misc.InfinitySemaphoreLatch; import org.redisson.misc.RPromise; import org.redisson.misc.RedissonPromise; import org.redisson.misc.TransferListener; +import org.redisson.misc.URIBuilder; import org.redisson.pubsub.AsyncSemaphore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import io.netty.channel.EventLoopGroup; +import io.netty.channel.epoll.EpollDatagramChannel; import io.netty.channel.epoll.EpollEventLoopGroup; import io.netty.channel.epoll.EpollSocketChannel; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioDatagramChannel; import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.resolver.dns.DnsAddressResolverGroup; +import io.netty.resolver.dns.DnsServerAddressStreamProviders; import io.netty.util.HashedWheelTimer; import io.netty.util.Timeout; import io.netty.util.Timer; @@ -117,9 +122,9 @@ public class MasterSlaveConnectionManager implements ConnectionManager { protected Codec codec; - protected EventLoopGroup group; + protected final EventLoopGroup group; - protected Class socketChannelClass; + protected final Class socketChannelClass; protected final ConcurrentMap name2PubSubConnection = PlatformDependent.newConcurrentHashMap(); @@ -130,7 +135,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { protected MasterSlaveServersConfig config; private final AtomicReferenceArray slot2entry = new AtomicReferenceArray(MAX_SLOT); - private final Map addr2entry = PlatformDependent.newConcurrentHashMap(); + private final Map client2entry = PlatformDependent.newConcurrentHashMap(); private final RPromise shutdownPromise; @@ -151,6 +156,8 @@ public class MasterSlaveConnectionManager implements ConnectionManager { private final CommandSyncService commandExecutor; private final Config cfg; + + private final DnsAddressResolverGroup resolverGroup; { for (int i = 0; i < locks.length; i++) { @@ -176,6 +183,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { } this.socketChannelClass = EpollSocketChannel.class; + this.resolverGroup = new DnsAddressResolverGroup(EpollDatagramChannel.class, DnsServerAddressStreamProviders.platformDefault()); } else { if (cfg.getEventLoopGroup() == null) { this.group = new NioEventLoopGroup(cfg.getNettyThreads(), new DefaultThreadFactory("redisson-netty")); @@ -184,14 +192,9 @@ public class MasterSlaveConnectionManager implements ConnectionManager { } this.socketChannelClass = NioSocketChannel.class; -// if (cfg.getEventLoopGroup() == null) { -// this.group = new OioEventLoopGroup(cfg.getThreads()); -// } else { -// this.group = cfg.getEventLoopGroup(); -// } -// -// this.socketChannelClass = OioSocketChannel.class; + this.resolverGroup = new DnsAddressResolverGroup(NioDatagramChannel.class, DnsServerAddressStreamProviders.platformDefault()); } + if (cfg.getExecutor() == null) { int threads = Runtime.getRuntime().availableProcessors() * 2; if (cfg.getThreads() != 0) { @@ -236,7 +239,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { @Override public Collection getEntrySet() { - return addr2entry.values(); + return client2entry.values(); } protected void initTimer(MasterSlaveServersConfig config) { @@ -273,19 +276,20 @@ public class MasterSlaveConnectionManager implements ConnectionManager { MasterSlaveEntry entry; if (config.checkSkipSlavesInit()) { entry = new SingleEntry(slots, this, config); - RFuture f = entry.setupMasterEntry(config.getMasterAddress()); - f.syncUninterruptibly(); } else { entry = createMasterSlaveEntry(config, slots); } + RFuture f = entry.setupMasterEntry(config.getMasterAddress()); + f.syncUninterruptibly(); for (int slot = singleSlotRange.getStartSlot(); slot < singleSlotRange.getEndSlot() + 1; slot++) { addEntry(slot, entry); } + InetSocketAddress masterHost = f.getNow().resolveAddr().syncUninterruptibly().getNow(); if (config.getDnsMonitoringInterval() != -1) { - dnsMonitor = new DNSMonitor(this, Collections.singleton(config.getMasterAddress()), - config.getSlaveAddresses(), config.getDnsMonitoringInterval()); + dnsMonitor = new DNSMonitor(this, masterHost, + config.getSlaveAddresses(), config.getDnsMonitoringInterval(), resolverGroup); dnsMonitor.start(); } } catch (RuntimeException e) { @@ -301,8 +305,6 @@ public class MasterSlaveConnectionManager implements ConnectionManager { for (RFuture future : fs) { future.syncUninterruptibly(); } - RFuture f = entry.setupMasterEntry(config.getMasterAddress()); - f.syncUninterruptibly(); return entry; } @@ -367,6 +369,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { redisConfig.setAddress(address) .setTimer(timer) .setExecutor(executor) + .setResolverGroup(resolverGroup) .setGroup(group) .setSocketChannelClass(socketChannelClass) .setConnectTimeout(timeout) @@ -674,11 +677,32 @@ public class MasterSlaveConnectionManager implements ConnectionManager { return newSucceededFuture(entryCodec); } - @Override - public MasterSlaveEntry getEntry(InetSocketAddress addr) { - return addr2entry.get(addr); + public MasterSlaveEntry getEntry(URI addr) { + for (MasterSlaveEntry entry : client2entry.values()) { + if (URIBuilder.compare(entry.getClient().getAddr(), addr)) { + return entry; + } + if (entry.hasSlave(addr)) { + return entry; + } + } + return null; } - + + public MasterSlaveEntry getEntry(RedisClient redisClient) { + MasterSlaveEntry entry = client2entry.get(redisClient); + if (entry != null) { + return entry; + } + + for (MasterSlaveEntry mentry : client2entry.values()) { + if (mentry.hasSlave(redisClient)) { + return mentry; + } + } + return null; + } + @Override public MasterSlaveEntry getEntry(int slot) { return slot2entry.get(slot); @@ -686,22 +710,22 @@ public class MasterSlaveConnectionManager implements ConnectionManager { protected final void changeMaster(int slot, URI address) { MasterSlaveEntry entry = getEntry(slot); - addr2entry.remove(entry.getClient().getAddr()); + client2entry.remove(entry.getClient()); entry.changeMaster(address); - addr2entry.put(entry.getClient().getAddr(), entry); + client2entry.put(entry.getClient(), entry); } protected final void addEntry(Integer slot, MasterSlaveEntry entry) { slot2entry.set(slot, entry); entry.addSlotRange(slot); - addr2entry.put(entry.getClient().getAddr(), entry); + client2entry.put(entry.getClient(), entry); } - protected MasterSlaveEntry removeMaster(Integer slot) { + protected final MasterSlaveEntry removeEntry(Integer slot) { MasterSlaveEntry entry = slot2entry.getAndSet(slot, null); entry.removeSlotRange(slot); if (entry.getSlotRanges().isEmpty()) { - addr2entry.remove(entry.getClient().getAddr()); + client2entry.remove(entry.getClient()); } return entry; } @@ -734,6 +758,9 @@ public class MasterSlaveConnectionManager implements ConnectionManager { if (entry == null && source.getSlot() != null) { entry = getEntry(source.getSlot()); } + if (source.getRedisClient() != null) { + entry = getEntry(source.getRedisClient()); + } if (source.getAddr() != null) { entry = getEntry(source.getAddr()); if (entry == null) { @@ -836,6 +863,8 @@ public class MasterSlaveConnectionManager implements ConnectionManager { } } + resolverGroup.close(); + if (cfg.getEventLoopGroup() == null) { group.shutdownGracefully(quietPeriod, timeout, unit).syncUninterruptibly(); } diff --git a/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java b/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java index 0555a103a..68a52c069 100644 --- a/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java +++ b/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java @@ -42,6 +42,9 @@ import org.redisson.connection.ClientConnectionsEntry.FreezeReason; import org.redisson.connection.balancer.LoadBalancerManager; import org.redisson.connection.pool.MasterConnectionPool; import org.redisson.connection.pool.MasterPubSubConnectionPool; +import org.redisson.misc.RPromise; +import org.redisson.misc.RedissonPromise; +import org.redisson.misc.TransferListener; import org.redisson.misc.URIBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -66,10 +69,10 @@ public class MasterSlaveEntry { final MasterSlaveServersConfig config; final ConnectionManager connectionManager; - final MasterConnectionPool writeConnectionHolder; + final MasterConnectionPool writeConnectionPool; final Set slots = new HashSet(); - final MasterPubSubConnectionPool pubSubConnectionHolder; + final MasterPubSubConnectionPool pubSubConnectionPool; final AtomicBoolean active = new AtomicBoolean(true); @@ -83,8 +86,8 @@ public class MasterSlaveEntry { this.config = config; slaveBalancer = new LoadBalancerManager(config, connectionManager, this); - writeConnectionHolder = new MasterConnectionPool(config, connectionManager, this); - pubSubConnectionHolder = new MasterPubSubConnectionPool(config, connectionManager, this); + writeConnectionPool = new MasterConnectionPool(config, connectionManager, this); + pubSubConnectionPool = new MasterPubSubConnectionPool(config, connectionManager, this); } public MasterSlaveServersConfig getConfig() { @@ -106,7 +109,7 @@ public class MasterSlaveEntry { return result; } - public RFuture setupMasterEntry(URI address) { + public RPromise setupMasterEntry(URI address) { RedisClient client = connectionManager.createClient(NodeType.MASTER, address); masterEntry = new ClientConnectionsEntry( client, @@ -117,12 +120,23 @@ public class MasterSlaveEntry { connectionManager, NodeType.MASTER); + RPromise result = new RedissonPromise(); + CountableListener listener = new CountableListener(result, client); + RFuture addrFuture = client.resolveAddr(); + listener.incCounter(); + addrFuture.addListener(listener); + + RFuture writeFuture = writeConnectionPool.add(masterEntry); + listener.incCounter(); + writeFuture.addListener(listener); + if (config.getSubscriptionMode() == SubscriptionMode.MASTER) { - RFuture f = writeConnectionHolder.add(masterEntry); - RFuture s = pubSubConnectionHolder.add(masterEntry); - return CountListener.create(s, f); + RFuture pubSubFuture = pubSubConnectionPool.add(masterEntry); + listener.incCounter(); + pubSubFuture.addListener(listener); } - return writeConnectionHolder.add(masterEntry); + + return result; } public boolean slaveDown(URI address, FreezeReason freezeReason) { @@ -134,6 +148,15 @@ public class MasterSlaveEntry { return slaveDown(entry, freezeReason == FreezeReason.SYSTEM); } + public boolean slaveDown(RedisClient redisClient, FreezeReason freezeReason) { + ClientConnectionsEntry entry = slaveBalancer.freeze(redisClient, freezeReason); + if (entry == null) { + return false; + } + + return slaveDown(entry, freezeReason == FreezeReason.SYSTEM); + } + private boolean slaveDown(ClientConnectionsEntry entry, boolean temporaryDown) { // add master as slave if no more slaves available if (!config.checkSkipSlavesInit() && slaveBalancer.getAvailableClients() == 0) { @@ -201,6 +224,7 @@ public class MasterSlaveEntry { return; } + System.out.println("channelName " + channelName + " resubscribed!"); subscribeCodec.addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { @@ -308,11 +332,11 @@ public class MasterSlaveEntry { }); } - public boolean hasSlave(InetSocketAddress addr) { - return slaveBalancer.contains(addr); + public boolean hasSlave(RedisClient redisClient) { + return slaveBalancer.contains(redisClient); } - public boolean hasSlave(String addr) { + public boolean hasSlave(URI addr) { return slaveBalancer.contains(addr); } @@ -322,7 +346,7 @@ public class MasterSlaveEntry { private RFuture addSlave(URI address, boolean freezed, NodeType nodeType) { RedisClient client = connectionManager.createClient(NodeType.SLAVE, address); - ClientConnectionsEntry entry = new ClientConnectionsEntry(client, + final ClientConnectionsEntry entry = new ClientConnectionsEntry(client, this.config.getSlaveConnectionMinimumIdleSize(), this.config.getSlaveConnectionPoolSize(), this.config.getSubscriptionConnectionMinimumIdleSize(), @@ -333,7 +357,22 @@ public class MasterSlaveEntry { entry.setFreezeReason(FreezeReason.SYSTEM); } } - return slaveBalancer.add(entry); + + final RPromise result = new RedissonPromise(); + RFuture addrFuture = client.resolveAddr(); + addrFuture.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + result.tryFailure(future.cause()); + return; + } + + RFuture addFuture = slaveBalancer.add(entry); + addFuture.addListener(new TransferListener(result)); + } + }); + return result; } public RedisClient getClient() { @@ -345,11 +384,10 @@ public class MasterSlaveEntry { return false; } - InetSocketAddress naddress = new InetSocketAddress(address.getHost(), address.getPort()); InetSocketAddress addr = masterEntry.getClient().getAddr(); // exclude master from slaves if (!config.checkSkipSlavesInit() - && (!addr.getAddress().getHostAddress().equals(naddress.getAddress().getHostAddress()) || naddress.getPort() != addr.getPort())) { + && !URIBuilder.compare(addr, address)) { slaveDown(masterEntry.getClient().getConfig().getAddress(), FreezeReason.SYSTEM); log.info("master {} excluded from slaves", addr); } @@ -365,27 +403,33 @@ public class MasterSlaveEntry { */ public void changeMaster(final URI address) { final ClientConnectionsEntry oldMaster = masterEntry; - RFuture future = setupMasterEntry(address); - future.addListener(new FutureListener() { + RFuture future = setupMasterEntry(address); + future.addListener(new FutureListener() { @Override - public void operationComplete(Future future) throws Exception { - writeConnectionHolder.remove(oldMaster); - pubSubConnectionHolder.remove(oldMaster); + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + log.error("Can't change master to: {}", address); + return; + } + + RedisClient newMasterClient = future.getNow(); + + writeConnectionPool.remove(oldMaster); + pubSubConnectionPool.remove(oldMaster); oldMaster.freezeMaster(FreezeReason.MANAGER); slaveDown(oldMaster, false); - slaveDown(URIBuilder.create("redis://" + oldMaster.getClient().getIpAddr()), FreezeReason.MANAGER); - slaveBalancer.changeType(oldMaster.getClient().getAddr(), NodeType.SLAVE); - slaveBalancer.changeType(address, NodeType.MASTER); + slaveBalancer.changeType(oldMaster.getClient(), NodeType.SLAVE); + slaveBalancer.changeType(newMasterClient, NodeType.MASTER); // more than one slave available, so master can be removed from slaves if (!config.checkSkipSlavesInit() && slaveBalancer.getAvailableClients() > 1) { - slaveDown(address, FreezeReason.SYSTEM); + slaveDown(newMasterClient, FreezeReason.SYSTEM); } connectionManager.shutdownAsync(oldMaster.getClient()); - log.info("master {} has changed to {}", oldMaster.getClient().getAddr(), address); + log.info("master {} has changed to {}", oldMaster.getClient().getAddr(), masterEntry.getClient().getAddr()); } }); } @@ -420,7 +464,7 @@ public class MasterSlaveEntry { } public RFuture connectionWriteOp(RedisCommand command) { - return writeConnectionHolder.get(command); + return writeConnectionPool.get(command); } public RFuture connectionReadOp(RedisCommand command) { @@ -430,7 +474,7 @@ public class MasterSlaveEntry { return slaveBalancer.nextConnection(command); } - public RFuture connectionReadOp(RedisCommand command, InetSocketAddress addr) { + public RFuture connectionReadOp(RedisCommand command, URI addr) { if (config.getReadMode() == ReadMode.MASTER) { return connectionWriteOp(command); } @@ -439,7 +483,7 @@ public class MasterSlaveEntry { RFuture nextPubSubConnection() { if (config.getSubscriptionMode() == SubscriptionMode.MASTER) { - return pubSubConnectionHolder.get(); + return pubSubConnectionPool.get(); } return slaveBalancer.nextPubSubConnection(); @@ -447,14 +491,14 @@ public class MasterSlaveEntry { public void returnPubSubConnection(PubSubConnectionEntry entry) { if (config.getSubscriptionMode() == SubscriptionMode.MASTER) { - pubSubConnectionHolder.returnConnection(masterEntry, entry.getConnection()); + pubSubConnectionPool.returnConnection(masterEntry, entry.getConnection()); return; } slaveBalancer.returnPubSubConnection(entry.getConnection()); } public void releaseWrite(RedisConnection connection) { - writeConnectionHolder.returnConnection(masterEntry, connection); + writeConnectionPool.returnConnection(masterEntry, connection); } public void releaseRead(RedisConnection connection) { diff --git a/redisson/src/main/java/org/redisson/connection/NodeSource.java b/redisson/src/main/java/org/redisson/connection/NodeSource.java index 3fa173650..f7953e77c 100644 --- a/redisson/src/main/java/org/redisson/connection/NodeSource.java +++ b/redisson/src/main/java/org/redisson/connection/NodeSource.java @@ -15,38 +15,44 @@ */ package org.redisson.connection; -import java.net.InetSocketAddress; +import java.net.URI; -public class NodeSource { +import org.redisson.client.RedisClient; - public static final NodeSource ZERO = new NodeSource(0); +/** + * + * @author Nikita Koksharov + * + */ +public class NodeSource { public enum Redirect {MOVED, ASK} - private final Integer slot; - private final InetSocketAddress addr; - private final Redirect redirect; + private Integer slot; + private URI addr; + private RedisClient redisClient; + private Redirect redirect; private MasterSlaveEntry entry; public NodeSource(MasterSlaveEntry entry) { - this(null, null, null); this.entry = entry; } - public NodeSource(MasterSlaveEntry entry, InetSocketAddress addr) { - this(null, addr, null); + public NodeSource(MasterSlaveEntry entry, RedisClient redisClient) { this.entry = entry; + this.redisClient = redisClient; } - public NodeSource(Integer slot) { - this(slot, null, null); + public NodeSource(RedisClient redisClient) { + this.redisClient = redisClient; } - - public NodeSource(Integer slot, InetSocketAddress addr) { - this(slot, addr, null); + + public NodeSource(Integer slot, RedisClient redisClient) { + this.slot = slot; + this.redisClient = redisClient; } - - public NodeSource(Integer slot, InetSocketAddress addr, Redirect redirect) { + + public NodeSource(Integer slot, URI addr, Redirect redirect) { this.slot = slot; this.addr = addr; this.redirect = redirect; @@ -64,7 +70,11 @@ public class NodeSource { return slot; } - public InetSocketAddress getAddr() { + public RedisClient getRedisClient() { + return redisClient; + } + + public URI getAddr() { return addr; } diff --git a/redisson/src/main/java/org/redisson/connection/RedisClientEntry.java b/redisson/src/main/java/org/redisson/connection/RedisClientEntry.java index 5d92c82ae..e00f44118 100644 --- a/redisson/src/main/java/org/redisson/connection/RedisClientEntry.java +++ b/redisson/src/main/java/org/redisson/connection/RedisClientEntry.java @@ -59,7 +59,7 @@ public class RedisClientEntry implements ClusterNode { } public RFuture pingAsync() { - return commandExecutor.readAsync(client.getAddr(), (String)null, null, RedisCommands.PING_BOOL); + return commandExecutor.readAsync(client, null, RedisCommands.PING_BOOL); } @Override @@ -94,7 +94,7 @@ public class RedisClientEntry implements ClusterNode { @Override public RFuture timeAsync() { - return commandExecutor.readAsync(client.getAddr(), (String)null, StringCodec.INSTANCE, RedisCommands.TIME); + return commandExecutor.readAsync(client, StringCodec.INSTANCE, RedisCommands.TIME); } @Override @@ -104,7 +104,7 @@ public class RedisClientEntry implements ClusterNode { @Override public RFuture> clusterInfoAsync() { - return commandExecutor.readAsync(client.getAddr(), (String)null, StringCodec.INSTANCE, RedisCommands.CLUSTER_INFO); + return commandExecutor.readAsync(client, StringCodec.INSTANCE, RedisCommands.CLUSTER_INFO); } @Override @@ -120,29 +120,29 @@ public class RedisClientEntry implements ClusterNode { @Override public RFuture> infoAsync(InfoSection section) { if (section == InfoSection.ALL) { - return commandExecutor.readAsync(client.getAddr(), (String)null, StringCodec.INSTANCE, RedisCommands.INFO_ALL); + return commandExecutor.readAsync(client, StringCodec.INSTANCE, RedisCommands.INFO_ALL); } else if (section == InfoSection.DEFAULT) { - return commandExecutor.readAsync(client.getAddr(), (String)null, StringCodec.INSTANCE, RedisCommands.INFO_DEFAULT); + return commandExecutor.readAsync(client, StringCodec.INSTANCE, RedisCommands.INFO_DEFAULT); } else if (section == InfoSection.SERVER) { - return commandExecutor.readAsync(client.getAddr(), (String)null, StringCodec.INSTANCE, RedisCommands.INFO_SERVER); + return commandExecutor.readAsync(client, StringCodec.INSTANCE, RedisCommands.INFO_SERVER); } else if (section == InfoSection.CLIENTS) { - return commandExecutor.readAsync(client.getAddr(), (String)null, StringCodec.INSTANCE, RedisCommands.INFO_CLIENTS); + return commandExecutor.readAsync(client, StringCodec.INSTANCE, RedisCommands.INFO_CLIENTS); } else if (section == InfoSection.MEMORY) { - return commandExecutor.readAsync(client.getAddr(), (String)null, StringCodec.INSTANCE, RedisCommands.INFO_MEMORY); + return commandExecutor.readAsync(client, StringCodec.INSTANCE, RedisCommands.INFO_MEMORY); } else if (section == InfoSection.PERSISTENCE) { - return commandExecutor.readAsync(client.getAddr(), (String)null, StringCodec.INSTANCE, RedisCommands.INFO_PERSISTENCE); + return commandExecutor.readAsync(client, StringCodec.INSTANCE, RedisCommands.INFO_PERSISTENCE); } else if (section == InfoSection.STATS) { - return commandExecutor.readAsync(client.getAddr(), (String)null, StringCodec.INSTANCE, RedisCommands.INFO_STATS); + return commandExecutor.readAsync(client, StringCodec.INSTANCE, RedisCommands.INFO_STATS); } else if (section == InfoSection.REPLICATION) { - return commandExecutor.readAsync(client.getAddr(), (String)null, StringCodec.INSTANCE, RedisCommands.INFO_REPLICATION); + return commandExecutor.readAsync(client, StringCodec.INSTANCE, RedisCommands.INFO_REPLICATION); } else if (section == InfoSection.CPU) { - return commandExecutor.readAsync(client.getAddr(), (String)null, StringCodec.INSTANCE, RedisCommands.INFO_CPU); + return commandExecutor.readAsync(client, StringCodec.INSTANCE, RedisCommands.INFO_CPU); } else if (section == InfoSection.COMMANDSTATS) { - return commandExecutor.readAsync(client.getAddr(), (String)null, StringCodec.INSTANCE, RedisCommands.INFO_COMMANDSTATS); + return commandExecutor.readAsync(client, StringCodec.INSTANCE, RedisCommands.INFO_COMMANDSTATS); } else if (section == InfoSection.CLUSTER) { - return commandExecutor.readAsync(client.getAddr(), (String)null, StringCodec.INSTANCE, RedisCommands.INFO_CLUSTER); + return commandExecutor.readAsync(client, StringCodec.INSTANCE, RedisCommands.INFO_CLUSTER); } else if (section == InfoSection.KEYSPACE) { - return commandExecutor.readAsync(client.getAddr(), (String)null, StringCodec.INSTANCE, RedisCommands.INFO_KEYSPACE); + return commandExecutor.readAsync(client, StringCodec.INSTANCE, RedisCommands.INFO_KEYSPACE); } throw new IllegalStateException(); } diff --git a/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java b/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java index d630723ca..97d0e08f5 100755 --- a/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java @@ -15,7 +15,6 @@ */ package org.redisson.connection; -import java.net.InetSocketAddress; import java.net.URI; import java.util.ArrayList; import java.util.HashSet; @@ -154,8 +153,6 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { for (RFuture future : fs) { future.syncUninterruptibly(); } - RFuture f = entry.setupMasterEntry(config.getMasterAddress()); - f.syncUninterruptibly(); return entry; } @@ -355,9 +352,10 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { String ip = parts[2]; String port = parts[3]; + URI uri = convert(ip, port); MasterSlaveEntry entry = getEntry(singleSlotRange.getStartSlot()); if (entry.isFreezed() - && entry.getClient().getAddr().equals(new InetSocketAddress(ip, Integer.valueOf(port)))) { + && URIBuilder.compare(entry.getClient().getAddr(), uri)) { entry.unfreeze(); String masterAddr = ip + ":" + port; log.info("master: {} has up", masterAddr); diff --git a/redisson/src/main/java/org/redisson/connection/SingleEntry.java b/redisson/src/main/java/org/redisson/connection/SingleEntry.java index 5fffe82f6..4bf33e4a8 100644 --- a/redisson/src/main/java/org/redisson/connection/SingleEntry.java +++ b/redisson/src/main/java/org/redisson/connection/SingleEntry.java @@ -15,7 +15,7 @@ */ package org.redisson.connection; -import java.net.InetSocketAddress; +import java.net.URI; import java.util.Set; import org.redisson.api.RFuture; @@ -36,7 +36,7 @@ public class SingleEntry extends MasterSlaveEntry { } @Override - public RFuture connectionReadOp(RedisCommand command, InetSocketAddress addr) { + public RFuture connectionReadOp(RedisCommand command, URI addr) { return super.connectionWriteOp(command); } diff --git a/redisson/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java b/redisson/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java index 07cb78526..3fafa2014 100644 --- a/redisson/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java +++ b/redisson/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java @@ -18,10 +18,10 @@ package org.redisson.connection.balancer; import java.net.InetSocketAddress; import java.net.URI; import java.util.Map; -import java.util.concurrent.atomic.AtomicInteger; import org.redisson.api.NodeType; import org.redisson.api.RFuture; +import org.redisson.client.RedisClient; import org.redisson.client.RedisConnection; import org.redisson.client.RedisConnectionException; import org.redisson.client.RedisPubSubConnection; @@ -31,15 +31,16 @@ import org.redisson.config.ReadMode; import org.redisson.connection.ClientConnectionsEntry; import org.redisson.connection.ClientConnectionsEntry.FreezeReason; import org.redisson.connection.ConnectionManager; +import org.redisson.connection.CountableListener; import org.redisson.connection.MasterSlaveEntry; import org.redisson.connection.pool.PubSubConnectionPool; import org.redisson.connection.pool.SlaveConnectionPool; import org.redisson.misc.RPromise; +import org.redisson.misc.RedissonPromise; +import org.redisson.misc.URIBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.FutureListener; import io.netty.util.internal.PlatformDependent; /** @@ -55,7 +56,7 @@ public class LoadBalancerManager { private final PubSubConnectionPool pubSubConnectionPool; private final SlaveConnectionPool slaveConnectionPool; - private final Map ip2Entry = PlatformDependent.newConcurrentHashMap(); + private final Map client2Entry = PlatformDependent.newConcurrentHashMap(); public LoadBalancerManager(MasterSlaveServersConfig config, ConnectionManager connectionManager, MasterSlaveEntry entry) { this.connectionManager = connectionManager; @@ -63,12 +64,12 @@ public class LoadBalancerManager { pubSubConnectionPool = new PubSubConnectionPool(config, connectionManager, entry); } - public void changeType(InetSocketAddress addr, NodeType nodeType) { - ClientConnectionsEntry entry = ip2Entry.get(convert(addr)); - changeType(addr, nodeType, entry); + public void changeType(RedisClient redisClient, NodeType nodeType) { + ClientConnectionsEntry entry = getEntry(redisClient); + changeType(nodeType, entry); } - protected void changeType(Object addr, NodeType nodeType, ClientConnectionsEntry entry) { + protected void changeType(NodeType nodeType, ClientConnectionsEntry entry) { if (entry != null) { if (connectionManager.isClusterMode()) { entry.getClient().getConfig().setReadOnly(nodeType == NodeType.SLAVE && connectionManager.getConfig().getReadMode() != ReadMode.MASTER); @@ -79,37 +80,34 @@ public class LoadBalancerManager { public void changeType(URI address, NodeType nodeType) { ClientConnectionsEntry entry = getEntry(address); - changeType(address, nodeType, entry); + changeType(nodeType, entry); } public RFuture add(final ClientConnectionsEntry entry) { - final RPromise result = connectionManager.newPromise(); - FutureListener listener = new FutureListener() { - AtomicInteger counter = new AtomicInteger(2); - @Override - public void operationComplete(Future future) throws Exception { - if (!future.isSuccess()) { - result.tryFailure(future.cause()); - return; + RPromise result = new RedissonPromise(); + + CountableListener listener = new CountableListener(result, null) { + public void operationComplete(io.netty.util.concurrent.Future future) throws Exception { + super.operationComplete(future); + if (this.result.isSuccess()) { + client2Entry.put(entry.getClient(), entry); } - if (counter.decrementAndGet() == 0) { - String addr = entry.getClient().getIpAddr(); - ip2Entry.put(addr, entry); - result.trySuccess(null); - } - } + }; }; RFuture slaveFuture = slaveConnectionPool.add(entry); + listener.incCounter(); slaveFuture.addListener(listener); + RFuture pubSubFuture = pubSubConnectionPool.add(entry); + listener.incCounter(); pubSubFuture.addListener(listener); return result; } public int getAvailableClients() { int count = 0; - for (ClientConnectionsEntry connectionEntry : ip2Entry.values()) { + for (ClientConnectionsEntry connectionEntry : client2Entry.values()) { if (!connectionEntry.isFreezed()) { count++; } @@ -139,19 +137,14 @@ public class LoadBalancerManager { return false; } - private String convert(URI address) { - InetSocketAddress addr = new InetSocketAddress(address.getHost(), address.getPort()); - return convert(addr); - } - public ClientConnectionsEntry freeze(URI address, FreezeReason freezeReason) { ClientConnectionsEntry connectionEntry = getEntry(address); return freeze(connectionEntry, freezeReason); } - - private ClientConnectionsEntry getEntry(URI address) { - String addr = convert(address); - return ip2Entry.get(addr); + + public ClientConnectionsEntry freeze(RedisClient redisClient, FreezeReason freezeReason) { + ClientConnectionsEntry connectionEntry = getEntry(redisClient); + return freeze(connectionEntry, freezeReason); } public ClientConnectionsEntry freeze(ClientConnectionsEntry connectionEntry, FreezeReason freezeReason) { @@ -179,20 +172,34 @@ public class LoadBalancerManager { return pubSubConnectionPool.get(); } - public boolean contains(InetSocketAddress addr) { - return ip2Entry.containsKey(convert(addr)); + public boolean contains(URI addr) { + return getEntry(addr) != null; + } + + public boolean contains(RedisClient redisClient) { + return getEntry(redisClient) != null; } - protected String convert(InetSocketAddress addr) { - return addr.getAddress().getHostAddress() + ":" + addr.getPort(); + protected ClientConnectionsEntry getEntry(URI addr) { + for (ClientConnectionsEntry entry : client2Entry.values()) { + InetSocketAddress entryAddr = entry.getClient().getAddr(); + if (URIBuilder.compare(entryAddr, addr)) { + return entry; + } + } + return null; } - public boolean contains(String addr) { - return ip2Entry.containsKey(addr); + protected ClientConnectionsEntry getEntry(RedisClient redisClient) { + return client2Entry.get(redisClient); + } + + protected String convert(InetSocketAddress addr) { + return addr.getAddress().getHostAddress() + ":" + addr.getPort(); } - public RFuture getConnection(RedisCommand command, InetSocketAddress addr) { - ClientConnectionsEntry entry = ip2Entry.get(convert(addr)); + public RFuture getConnection(RedisCommand command, URI addr) { + ClientConnectionsEntry entry = getEntry(addr); if (entry != null) { return slaveConnectionPool.get(command, entry); } @@ -205,23 +212,23 @@ public class LoadBalancerManager { } public void returnPubSubConnection(RedisPubSubConnection connection) { - ClientConnectionsEntry entry = ip2Entry.get(convert(connection.getRedisClient().getAddr())); + ClientConnectionsEntry entry = getEntry(connection.getRedisClient()); pubSubConnectionPool.returnConnection(entry, connection); } public void returnConnection(RedisConnection connection) { - ClientConnectionsEntry entry = ip2Entry.get(convert(connection.getRedisClient().getAddr())); + ClientConnectionsEntry entry = getEntry(connection.getRedisClient()); slaveConnectionPool.returnConnection(entry, connection); } public void shutdown() { - for (ClientConnectionsEntry entry : ip2Entry.values()) { + for (ClientConnectionsEntry entry : client2Entry.values()) { entry.getClient().shutdown(); } } public void shutdownAsync() { - for (ClientConnectionsEntry entry : ip2Entry.values()) { + for (ClientConnectionsEntry entry : client2Entry.values()) { connectionManager.shutdownAsync(entry.getClient()); } } diff --git a/redisson/src/main/java/org/redisson/jcache/JCache.java b/redisson/src/main/java/org/redisson/jcache/JCache.java index a1fb785d9..768e2f754 100644 --- a/redisson/src/main/java/org/redisson/jcache/JCache.java +++ b/redisson/src/main/java/org/redisson/jcache/JCache.java @@ -15,7 +15,6 @@ */ package org.redisson.jcache; -import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -58,6 +57,7 @@ import org.redisson.api.RLock; import org.redisson.api.RSemaphore; import org.redisson.api.RTopic; import org.redisson.api.listener.MessageListener; +import org.redisson.client.RedisClient; import org.redisson.client.codec.Codec; import org.redisson.client.codec.MapScanCodec; import org.redisson.client.protocol.RedisCommand; @@ -2082,7 +2082,7 @@ public class JCache extends RedissonObject implements Cache { cacheManager.getStatBean(this).addRemoveTime(currentNanoTime() - startTime); } - MapScanResult scanIterator(String name, InetSocketAddress client, long startPos) { + MapScanResult scanIterator(String name, RedisClient client, long startPos) { RFuture> f = commandExecutor.readAsync(client, name, new MapScanCodec(codec), RedisCommands.HSCAN, name, startPos); try { diff --git a/redisson/src/main/java/org/redisson/misc/URIBuilder.java b/redisson/src/main/java/org/redisson/misc/URIBuilder.java index 1d952b476..ab9610898 100644 --- a/redisson/src/main/java/org/redisson/misc/URIBuilder.java +++ b/redisson/src/main/java/org/redisson/misc/URIBuilder.java @@ -15,6 +15,7 @@ */ package org.redisson.misc; +import java.net.InetSocketAddress; import java.net.URI; /** @@ -37,4 +38,13 @@ public class URIBuilder { return URI.create(uri.replace(s, "[" + s + "]")); } + public static boolean compare(InetSocketAddress entryAddr, URI addr) { + if (((entryAddr.getHostName() != null && entryAddr.getHostName().equals(addr.getHost())) + || entryAddr.getAddress().getHostAddress().equals(addr.getHost())) + && entryAddr.getPort() == addr.getPort()) { + return true; + } + return false; + } + } diff --git a/redisson/src/main/java/org/redisson/reactive/ReactiveIterator.java b/redisson/src/main/java/org/redisson/reactive/ReactiveIterator.java index 768d80c01..9a041523a 100644 --- a/redisson/src/main/java/org/redisson/reactive/ReactiveIterator.java +++ b/redisson/src/main/java/org/redisson/reactive/ReactiveIterator.java @@ -18,6 +18,7 @@ package org.redisson.reactive; import java.net.InetSocketAddress; import org.reactivestreams.Publisher; +import org.redisson.client.RedisClient; import org.redisson.client.protocol.decoder.MapScanResult; import org.redisson.client.protocol.decoder.ScanObjectEntry; @@ -30,7 +31,7 @@ import org.redisson.client.protocol.decoder.ScanObjectEntry; */ interface MapReactive { - Publisher> scanIteratorReactive(InetSocketAddress client, long startPos); + Publisher> scanIteratorReactive(RedisClient client, long startPos); Publisher put(K key, V value); diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonMapCacheReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonMapCacheReactive.java index 540865835..f74aed938 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonMapCacheReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonMapCacheReactive.java @@ -15,7 +15,6 @@ */ package org.redisson.reactive; -import java.net.InetSocketAddress; import java.util.Collection; import java.util.Map; import java.util.Map.Entry; @@ -29,6 +28,7 @@ import org.redisson.api.RFuture; import org.redisson.api.RMapCacheAsync; import org.redisson.api.RMapCacheReactive; import org.redisson.api.RMapReactive; +import org.redisson.client.RedisClient; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.decoder.MapScanResult; import org.redisson.client.protocol.decoder.ScanObjectEntry; @@ -164,7 +164,7 @@ public class RedissonMapCacheReactive extends RedissonExpirableReactive im } @Override - public Publisher> scanIteratorReactive(final InetSocketAddress client, final long startPos) { + public Publisher> scanIteratorReactive(final RedisClient client, final long startPos) { return reactive(new Supplier>>() { @Override public RFuture> get() { diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonMapReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonMapReactive.java index e24738a40..694fce885 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonMapReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonMapReactive.java @@ -27,6 +27,7 @@ import org.redisson.api.MapOptions; import org.redisson.api.RFuture; import org.redisson.api.RMapAsync; import org.redisson.api.RMapReactive; +import org.redisson.client.RedisClient; import org.redisson.client.codec.Codec; import org.redisson.client.codec.MapScanCodec; import org.redisson.client.protocol.RedisCommands; @@ -284,7 +285,7 @@ public class RedissonMapReactive extends RedissonExpirableReactive impleme }); } - public Publisher> scanIteratorReactive(InetSocketAddress client, long startPos) { + public Publisher> scanIteratorReactive(RedisClient client, long startPos) { return commandExecutor.readReactive(client, getName(), new MapScanCodec(codec), RedisCommands.HSCAN, getName(), startPos); } diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonMapReactiveIterator.java b/redisson/src/main/java/org/redisson/reactive/RedissonMapReactiveIterator.java index a2199ed74..9904a733c 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonMapReactiveIterator.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonMapReactiveIterator.java @@ -15,7 +15,6 @@ */ package org.redisson.reactive; -import java.net.InetSocketAddress; import java.util.AbstractMap; import java.util.HashMap; import java.util.Map; @@ -24,6 +23,7 @@ import java.util.Map.Entry; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; +import org.redisson.client.RedisClient; import org.redisson.client.protocol.decoder.MapScanResult; import org.redisson.client.protocol.decoder.ScanObjectEntry; @@ -56,7 +56,7 @@ public class RedissonMapReactiveIterator { private Map firstValues; private long iterPos = 0; - private InetSocketAddress client; + private RedisClient client; private long currentIndex; diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonScoredSortedSetReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonScoredSortedSetReactive.java index 1c6ea272d..5a5c89864 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonScoredSortedSetReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonScoredSortedSetReactive.java @@ -25,6 +25,7 @@ import org.redisson.api.RFuture; import org.redisson.api.RScoredSortedSet.Aggregate; import org.redisson.api.RScoredSortedSetAsync; import org.redisson.api.RScoredSortedSetReactive; +import org.redisson.client.RedisClient; import org.redisson.client.codec.Codec; import org.redisson.client.codec.ScanCodec; import org.redisson.client.protocol.RedisCommands; @@ -175,7 +176,7 @@ public class RedissonScoredSortedSetReactive extends RedissonExpirableReactiv }); } - private Publisher> scanIteratorReactive(InetSocketAddress client, long startPos) { + private Publisher> scanIteratorReactive(RedisClient client, long startPos) { return commandExecutor.readReactive(client, getName(), new ScanCodec(codec), RedisCommands.ZSCAN, getName(), startPos); } @@ -183,7 +184,7 @@ public class RedissonScoredSortedSetReactive extends RedissonExpirableReactiv public Publisher iterator() { return new SetReactiveIterator() { @Override - protected Publisher> scanIteratorReactive(InetSocketAddress client, long nextIterPos) { + protected Publisher> scanIteratorReactive(RedisClient client, long nextIterPos) { return RedissonScoredSortedSetReactive.this.scanIteratorReactive(client, nextIterPos); } }; diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonSetCacheReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonSetCacheReactive.java index c37173ef6..7102d205e 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonSetCacheReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonSetCacheReactive.java @@ -26,6 +26,7 @@ import org.reactivestreams.Publisher; import org.redisson.RedissonSetCache; import org.redisson.api.RFuture; import org.redisson.api.RSetCacheReactive; +import org.redisson.client.RedisClient; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.decoder.ListScanResult; @@ -85,7 +86,7 @@ public class RedissonSetCacheReactive extends RedissonExpirableReactive imple }); } - Publisher> scanIterator(final InetSocketAddress client, final long startPos) { + Publisher> scanIterator(final RedisClient client, final long startPos) { return reactive(new Supplier>>() { @Override public RFuture> get() { @@ -98,7 +99,7 @@ public class RedissonSetCacheReactive extends RedissonExpirableReactive imple public Publisher iterator() { return new SetReactiveIterator() { @Override - protected Publisher> scanIteratorReactive(InetSocketAddress client, long nextIterPos) { + protected Publisher> scanIteratorReactive(RedisClient client, long nextIterPos) { return RedissonSetCacheReactive.this.scanIterator(client, nextIterPos); } }; diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonSetReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonSetReactive.java index 99c8d7c18..f66ec167c 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonSetReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonSetReactive.java @@ -26,6 +26,7 @@ import org.reactivestreams.Publisher; import org.redisson.RedissonSet; import org.redisson.api.RFuture; import org.redisson.api.RSetReactive; +import org.redisson.client.RedisClient; import org.redisson.client.codec.Codec; import org.redisson.client.codec.ScanCodec; import org.redisson.client.protocol.RedisCommands; @@ -76,7 +77,7 @@ public class RedissonSetReactive extends RedissonExpirableReactive implements }); } - private Publisher> scanIteratorReactive(InetSocketAddress client, long startPos) { + private Publisher> scanIteratorReactive(RedisClient client, long startPos) { return commandExecutor.readReactive(client, getName(), new ScanCodec(codec), RedisCommands.SSCAN, getName(), startPos); } @@ -211,7 +212,7 @@ public class RedissonSetReactive extends RedissonExpirableReactive implements public Publisher iterator() { return new SetReactiveIterator() { @Override - protected Publisher> scanIteratorReactive(InetSocketAddress client, long nextIterPos) { + protected Publisher> scanIteratorReactive(RedisClient client, long nextIterPos) { return RedissonSetReactive.this.scanIteratorReactive(client, nextIterPos); } }; diff --git a/redisson/src/main/java/org/redisson/reactive/SetReactiveIterator.java b/redisson/src/main/java/org/redisson/reactive/SetReactiveIterator.java index 439018495..450b1afa3 100644 --- a/redisson/src/main/java/org/redisson/reactive/SetReactiveIterator.java +++ b/redisson/src/main/java/org/redisson/reactive/SetReactiveIterator.java @@ -22,6 +22,7 @@ import java.util.List; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; +import org.redisson.client.RedisClient; import org.redisson.client.protocol.decoder.ListScanResult; import org.redisson.client.protocol.decoder.ScanObjectEntry; @@ -44,7 +45,7 @@ public abstract class SetReactiveIterator extends Stream { private List firstValues; private List lastValues; private long nextIterPos; - private InetSocketAddress client; + private RedisClient client; private boolean finished; @@ -168,6 +169,6 @@ public abstract class SetReactiveIterator extends Stream { return result; } - protected abstract Publisher> scanIteratorReactive(InetSocketAddress client, long nextIterPos); + protected abstract Publisher> scanIteratorReactive(RedisClient client, long nextIterPos); } diff --git a/redisson/src/test/java/org/redisson/RedissonTest.java b/redisson/src/test/java/org/redisson/RedissonTest.java index c35c02ed4..8400d4770 100644 --- a/redisson/src/test/java/org/redisson/RedissonTest.java +++ b/redisson/src/test/java/org/redisson/RedissonTest.java @@ -31,6 +31,7 @@ import org.redisson.api.Node.InfoSection; import org.redisson.api.NodesGroup; import org.redisson.api.RMap; import org.redisson.api.RedissonClient; +import org.redisson.client.RedisClient; import org.redisson.client.RedisConnectionException; import org.redisson.client.RedisOutOfMemoryException; import org.redisson.client.protocol.decoder.ListScanResult; @@ -84,7 +85,7 @@ public class RedissonTest { RedissonBaseIterator iter = new RedissonBaseIterator() { int i; @Override - ListScanResult iterator(InetSocketAddress client, long nextIterPos) { + ListScanResult iterator(RedisClient client, long nextIterPos) { i++; if (i == 1) { return new ListScanResult(13L, Collections.emptyList()); @@ -110,7 +111,7 @@ public class RedissonTest { RedissonBaseIterator iter = new RedissonBaseIterator() { int i; @Override - ListScanResult iterator(InetSocketAddress client, long nextIterPos) { + ListScanResult iterator(RedisClient client, long nextIterPos) { i++; if (i == 1) { return new ListScanResult(14L, Arrays.asList(new ScanObjectEntry(Unpooled.wrappedBuffer(new byte[] {1}), 1)));