diff --git a/src/main/java/org/redisson/CommandBatchExecutorService.java b/src/main/java/org/redisson/CommandBatchExecutorService.java index 2ce4924b8..d6f043f0d 100644 --- a/src/main/java/org/redisson/CommandBatchExecutorService.java +++ b/src/main/java/org/redisson/CommandBatchExecutorService.java @@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import org.redisson.client.RedisClient; +import org.redisson.client.RedisConnection; import org.redisson.client.RedisException; import org.redisson.client.RedisMovedException; import org.redisson.client.RedisTimeoutException; @@ -192,43 +193,55 @@ public class CommandBatchExecutorService extends CommandExecutorService { } }; - try { - org.redisson.client.RedisConnection connection; - if (entry.isReadOnlyMode()) { - connection = connectionManager.connectionReadOp(slot); - } else { - connection = connectionManager.connectionWriteOp(slot); - } + Future connectionFuture; + if (entry.isReadOnlyMode()) { + connectionFuture = connectionManager.connectionReadOp(slot); + } else { + connectionFuture = connectionManager.connectionWriteOp(slot); + } - ArrayList> list = new ArrayList>(entry.getCommands().size()); - for (CommandEntry c : entry.getCommands()) { - list.add(c.getCommand()); - } - ChannelFuture future = connection.send(new CommandsData(attemptPromise, list)); - - ex.set(new RedisTimeoutException()); - final Timeout timeout = connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getTimeout(), TimeUnit.MILLISECONDS); - - future.addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - if (!future.isSuccess()) { - timeout.cancel(); - ex.set(new WriteRedisConnectionException("channel: " + future.channel() + " closed")); - connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS); - } + connectionFuture.addListener(new FutureListener() { + @Override + public void operationComplete(Future connFuture) throws Exception { + if (attemptPromise.isCancelled()) { + return; + } + if (!connFuture.isSuccess()) { + ex.set((RedisException)connFuture.cause()); + connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS); + return; } - }); - if (entry.isReadOnlyMode()) { - attemptPromise.addListener(connectionManager.createReleaseReadListener(slot, connection, timeout)); - } else { - attemptPromise.addListener(connectionManager.createReleaseWriteListener(slot, connection, timeout)); + RedisConnection connection = connFuture.getNow(); + + ArrayList> list = new ArrayList>(entry.getCommands().size()); + for (CommandEntry c : entry.getCommands()) { + list.add(c.getCommand()); + } + ChannelFuture future = connection.send(new CommandsData(attemptPromise, list)); + + ex.set(new RedisTimeoutException()); + final Timeout timeout = connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getTimeout(), TimeUnit.MILLISECONDS); + + future.addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (!future.isSuccess()) { + timeout.cancel(); + ex.set(new WriteRedisConnectionException("channel: " + future.channel() + " closed")); + connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS); + } + } + }); + + if (entry.isReadOnlyMode()) { + attemptPromise.addListener(connectionManager.createReleaseReadListener(slot, connection, timeout)); + } else { + attemptPromise.addListener(connectionManager.createReleaseWriteListener(slot, connection, timeout)); + } } - } catch (RedisException e) { - ex.set(e); - connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS); - } + }); + attemptPromise.addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { diff --git a/src/main/java/org/redisson/CommandExecutorService.java b/src/main/java/org/redisson/CommandExecutorService.java index d09e80365..be3a96544 100644 --- a/src/main/java/org/redisson/CommandExecutorService.java +++ b/src/main/java/org/redisson/CommandExecutorService.java @@ -252,12 +252,16 @@ public class CommandExecutorService implements CommandExecutor { } try { - RedisConnection connection; + Future connectionFuture; if (readOnlyMode) { - connection = connectionManager.connectionReadOp(slot); + connectionFuture = connectionManager.connectionReadOp(slot); } else { - connection = connectionManager.connectionWriteOp(slot); + connectionFuture = connectionManager.connectionWriteOp(slot); } + connectionFuture.syncUninterruptibly(); + + RedisConnection connection = connectionFuture.getNow(); + try { return operation.execute(codec, connection); } catch (RedisMovedException e) { @@ -422,51 +426,64 @@ public class CommandExecutorService implements CommandExecutor { } }; - try { - RedisConnection connection; - if (readOnlyMode) { - if (client != null) { - connection = connectionManager.connectionReadOp(slot, client); - } else { - connection = connectionManager.connectionReadOp(slot); - } + ex.set(new RedisTimeoutException()); + final Timeout timeout = connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getTimeout(), TimeUnit.MILLISECONDS); + + Future connectionFuture; + if (readOnlyMode) { + if (client != null) { + connectionFuture = connectionManager.connectionReadOp(slot, client); } else { - connection = connectionManager.connectionWriteOp(slot); + connectionFuture = connectionManager.connectionReadOp(slot); } - log.debug("getting connection for command {} from slot {} using node {}", command, slot, connection.getRedisClient().getAddr()); - ChannelFuture future = connection.send(new CommandData(attemptPromise, messageDecoder, codec, command, params)); - - ex.set(new RedisTimeoutException()); - final Timeout timeout = connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getTimeout(), TimeUnit.MILLISECONDS); - - future.addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - if (!future.isSuccess()) { - timeout.cancel(); - ex.set(new WriteRedisConnectionException( - "Can't send command: " + command + ", params: " + params + ", channel: " + future.channel(), future.cause())); - connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS); - } + } else { + connectionFuture = connectionManager.connectionWriteOp(slot); + } + + connectionFuture.addListener(new FutureListener() { + @Override + public void operationComplete(Future connFuture) throws Exception { + if (attemptPromise.isCancelled()) { + return; + } + if (!connFuture.isSuccess()) { + timeout.cancel(); + ex.set((RedisException)connFuture.cause()); + connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS); + return; } - }); - if (readOnlyMode) { - attemptPromise.addListener(connectionManager.createReleaseReadListener(slot, connection, timeout)); - } else { - attemptPromise.addListener(connectionManager.createReleaseWriteListener(slot, connection, timeout)); + RedisConnection connection = connFuture.getNow(); + + log.debug("getting connection for command {} from slot {} using node {}", command, slot, connection.getRedisClient().getAddr()); + ChannelFuture future = connection.send(new CommandData(attemptPromise, messageDecoder, codec, command, params)); + future.addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (!future.isSuccess()) { + timeout.cancel(); + ex.set(new WriteRedisConnectionException( + "Can't send command: " + command + ", params: " + params + ", channel: " + future.channel(), future.cause())); + connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS); + } + } + }); + + if (readOnlyMode) { + attemptPromise.addListener(connectionManager.createReleaseReadListener(slot, connection, timeout)); + } else { + attemptPromise.addListener(connectionManager.createReleaseWriteListener(slot, connection, timeout)); + } } - } catch (RedisException e) { - ex.set(e); - connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS); - } + }); + attemptPromise.addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { + timeout.cancel(); if (future.isCancelled()) { return; } - // TODO cancel timeout if (future.cause() instanceof RedisMovedException) { RedisMovedException ex = (RedisMovedException)future.cause(); diff --git a/src/main/java/org/redisson/RedissonPatternTopic.java b/src/main/java/org/redisson/RedissonPatternTopic.java index 508d9299c..86878187f 100644 --- a/src/main/java/org/redisson/RedissonPatternTopic.java +++ b/src/main/java/org/redisson/RedissonPatternTopic.java @@ -25,6 +25,8 @@ import org.redisson.core.PatternMessageListener; import org.redisson.core.PatternStatusListener; import org.redisson.core.RPatternTopic; +import io.netty.util.concurrent.Future; + /** * Distributed topic implementation. Messages are delivered to all message listeners across Redis cluster. * @@ -60,7 +62,9 @@ public class RedissonPatternTopic implements RPatternTopic { } private int addListener(RedisPubSubListener pubSubListener) { - PubSubConnectionEntry entry = commandExecutor.getConnectionManager().psubscribe(name, codec); + Future future = commandExecutor.getConnectionManager().psubscribe(name, codec); + future.syncUninterruptibly(); + PubSubConnectionEntry entry = future.getNow(); synchronized (entry) { if (entry.isActive()) { entry.addListener(name, pubSubListener); diff --git a/src/main/java/org/redisson/RedissonSortedSet.java b/src/main/java/org/redisson/RedissonSortedSet.java index 0705156bc..0a2178372 100644 --- a/src/main/java/org/redisson/RedissonSortedSet.java +++ b/src/main/java/org/redisson/RedissonSortedSet.java @@ -28,6 +28,7 @@ import java.util.Iterator; import java.util.List; import java.util.NoSuchElementException; import java.util.SortedSet; +import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; import org.redisson.client.RedisConnection; @@ -40,7 +41,9 @@ import org.redisson.core.RSortedSet; import io.netty.channel.EventLoop; import io.netty.channel.EventLoopGroup; +import io.netty.util.concurrent.DefaultPromise; import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.GlobalEventExecutor; import io.netty.util.concurrent.Promise; /** @@ -310,21 +313,18 @@ public class RedissonSortedSet extends RedissonObject implements RSortedSet addAsync(final V value) { - EventLoop loop = commandExecutor.getConnectionManager().getGroup().next(); - final Promise promise = loop.newPromise(); - - loop.execute(new Runnable() { + final Promise promise = new DefaultPromise(){}; + GlobalEventExecutor.INSTANCE.execute(new Runnable() { @Override public void run() { try { - boolean result = add(value); - promise.setSuccess(result); + boolean res = add(value); + promise.setSuccess(res); } catch (Exception e) { promise.setFailure(e); } } }); - return promise; } diff --git a/src/main/java/org/redisson/RedissonTopic.java b/src/main/java/org/redisson/RedissonTopic.java index 6332f3a05..e1ccdafb8 100644 --- a/src/main/java/org/redisson/RedissonTopic.java +++ b/src/main/java/org/redisson/RedissonTopic.java @@ -77,7 +77,9 @@ public class RedissonTopic implements RTopic { } private int addListener(RedisPubSubListener pubSubListener) { - PubSubConnectionEntry entry = commandExecutor.getConnectionManager().subscribe(name, codec); + Future future = commandExecutor.getConnectionManager().subscribe(name, codec); + future.syncUninterruptibly(); + PubSubConnectionEntry entry = future.getNow(); synchronized (entry) { if (entry.isActive()) { entry.addListener(name, pubSubListener); diff --git a/src/main/java/org/redisson/client/handler/CommandsQueue.java b/src/main/java/org/redisson/client/handler/CommandsQueue.java index c16409746..880371ffe 100644 --- a/src/main/java/org/redisson/client/handler/CommandsQueue.java +++ b/src/main/java/org/redisson/client/handler/CommandsQueue.java @@ -52,7 +52,8 @@ public class CommandsQueue extends ChannelDuplexHandler { public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { if (msg instanceof QueueCommand) { QueueCommand data = (QueueCommand) msg; - if (queue.peek() != null && queue.peek().getCommand() == data) { + QueueCommandHolder holder = queue.peek(); + if (holder != null && holder.getCommand() == data) { super.write(ctx, msg, promise); } else { queue.add(new QueueCommandHolder(data, promise)); diff --git a/src/main/java/org/redisson/connection/BaseLoadBalancer.java b/src/main/java/org/redisson/connection/BaseLoadBalancer.java index 2f0588f05..9963842c9 100644 --- a/src/main/java/org/redisson/connection/BaseLoadBalancer.java +++ b/src/main/java/org/redisson/connection/BaseLoadBalancer.java @@ -26,39 +26,41 @@ import org.redisson.MasterSlaveServersConfig; import org.redisson.client.RedisClient; import org.redisson.client.RedisConnection; import org.redisson.client.RedisConnectionException; -import org.redisson.client.RedisException; import org.redisson.client.RedisPubSubConnection; -import org.redisson.misc.ReclosableLatch; +import org.redisson.misc.ConnectionPool; +import org.redisson.misc.PubSubConnectionPoll; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.netty.util.concurrent.Future; import io.netty.util.internal.PlatformDependent; abstract class BaseLoadBalancer implements LoadBalancer { private final Logger log = LoggerFactory.getLogger(getClass()); - private MasterSlaveServersConfig config; - private ConnectionManager connectionManager; - private final ReclosableLatch clientsEmpty = new ReclosableLatch(); - final Map clients = PlatformDependent.newConcurrentHashMap(); + final Map client2Entry = PlatformDependent.newConcurrentHashMap(); + + PubSubConnectionPoll pubSubEntries; + + ConnectionPool entries; public void init(MasterSlaveServersConfig config, ConnectionManager connectionManager) { - this.config = config; this.connectionManager = connectionManager; + entries = new ConnectionPool(config, this, connectionManager.getGroup()); + pubSubEntries = new PubSubConnectionPoll(config, this, connectionManager.getGroup()); } public synchronized void add(SubscribesConnectionEntry entry) { - clients.put(entry.getClient(), entry); - if (!entry.isFreezed()) { - clientsEmpty.open(); - } + client2Entry.put(entry.getClient(), entry); + entries.add(entry); + pubSubEntries.add(entry); } public int getAvailableClients() { int count = 0; - for (SubscribesConnectionEntry connectionEntry : clients.values()) { + for (SubscribesConnectionEntry connectionEntry : client2Entry.values()) { if (!connectionEntry.isFreezed()) { count++; } @@ -68,12 +70,11 @@ abstract class BaseLoadBalancer implements LoadBalancer { public synchronized void unfreeze(String host, int port) { InetSocketAddress addr = new InetSocketAddress(host, port); - for (SubscribesConnectionEntry connectionEntry : clients.values()) { + for (SubscribesConnectionEntry connectionEntry : client2Entry.values()) { if (!connectionEntry.getClient().getAddr().equals(addr)) { continue; } connectionEntry.setFreezed(false); - clientsEmpty.open(); return; } throw new IllegalStateException("Can't find " + addr + " in slaves!"); @@ -81,7 +82,7 @@ abstract class BaseLoadBalancer implements LoadBalancer { public synchronized Collection freeze(String host, int port) { InetSocketAddress addr = new InetSocketAddress(host, port); - for (SubscribesConnectionEntry connectionEntry : clients.values()) { + for (SubscribesConnectionEntry connectionEntry : client2Entry.values()) { if (connectionEntry.isFreezed() || !connectionEntry.getClient().getAddr().equals(addr)) { continue; @@ -92,7 +93,7 @@ abstract class BaseLoadBalancer implements LoadBalancer { // close all connections while (true) { - RedisConnection connection = connectionEntry.getConnections().poll(); + RedisConnection connection = connectionEntry.pollConnection(); if (connection == null) { break; } @@ -109,17 +110,6 @@ abstract class BaseLoadBalancer implements LoadBalancer { } - boolean allFreezed = true; - for (SubscribesConnectionEntry entry : clients.values()) { - if (!entry.isFreezed()) { - allFreezed = false; - break; - } - } - if (allFreezed) { - clientsEmpty.close(); - } - List list = new ArrayList(connectionEntry.getAllSubscribeConnections()); connectionEntry.getAllSubscribeConnections().clear(); return list; @@ -128,122 +118,41 @@ abstract class BaseLoadBalancer implements LoadBalancer { return Collections.emptyList(); } - public RedisPubSubConnection nextPubSubConnection() { - clientsEmpty.awaitUninterruptibly(); - List clientsCopy = new ArrayList(clients.values()); - while (true) { - if (clientsCopy.isEmpty()) { - throw new RedisConnectionException("Slave subscribe-connection pool gets exhausted!"); - } - - int index = getIndex(clientsCopy); - SubscribesConnectionEntry entry = clientsCopy.get(index); - - if (entry.isFreezed() - || !entry.getSubscribeConnectionsSemaphore().tryAcquire()) { - clientsCopy.remove(index); - } else { - try { - RedisPubSubConnection conn = entry.pollFreeSubscribeConnection(); - if (conn != null) { - return conn; - } - return entry.connectPubSub(config); - } catch (RedisConnectionException e) { - entry.getSubscribeConnectionsSemaphore().release(); - // TODO connection scoring - log.warn("Can't connect to {}, trying next connection!", entry.getClient().getAddr()); - clientsCopy.remove(index); - } - } - } + public Future nextPubSubConnection() { + return pubSubEntries.get(); } - public RedisConnection getConnection(RedisClient client) { - SubscribesConnectionEntry entry = clients.get(client); + public Future getConnection(RedisClient client) { + SubscribesConnectionEntry entry = client2Entry.get(client); if (entry != null) { - RedisConnection conn = retrieveConnection(entry); - if (conn == null) { - throw new RedisConnectionException("Slave connection pool gets exhausted for " + client); - } - return conn; + return entries.get(entry); } - throw new RedisConnectionException("Can't find entry for " + client); + RedisConnectionException exception = new RedisConnectionException("Can't find entry for " + client); + return connectionManager.getGroup().next().newFailedFuture(exception); } - public RedisConnection nextConnection() { - clientsEmpty.awaitUninterruptibly(); - List clientsCopy = new ArrayList(clients.values()); - while (true) { - if (clientsCopy.isEmpty()) { - throw new RedisConnectionException("Slave connection pool gets exhausted!"); - } - - int index = getIndex(clientsCopy); - SubscribesConnectionEntry entry = clientsCopy.get(index); - - RedisConnection conn = retrieveConnection(entry); - if (conn == null) { - clientsCopy.remove(index); - } else { - return conn; - } - } - } - - private RedisConnection retrieveConnection(SubscribesConnectionEntry entry) { - if (entry.isFreezed() - || !entry.getConnectionsSemaphore().tryAcquire()) { - return null; - } else { - RedisConnection conn = entry.getConnections().poll(); - if (conn != null) { - return conn; - } - try { - return entry.connect(config); - } catch (RedisException e) { - entry.getConnectionsSemaphore().release(); - // TODO connection scoring - log.warn("Can't connect to {}, trying next connection!", entry.getClient().getAddr()); - return null; - } - } + public Future nextConnection() { + return entries.get(); } - abstract int getIndex(List clientsCopy); - public void returnSubscribeConnection(RedisPubSubConnection connection) { - SubscribesConnectionEntry entry = clients.get(connection.getRedisClient()); - if (entry.isFreezed()) { - connection.closeAsync(); - } else { - entry.offerFreeSubscribeConnection(connection); - } - entry.getSubscribeConnectionsSemaphore().release(); + SubscribesConnectionEntry entry = client2Entry.get(connection.getRedisClient()); + pubSubEntries.returnConnection(entry, connection); } public void returnConnection(RedisConnection connection) { - SubscribesConnectionEntry entry = clients.get(connection.getRedisClient()); - if (entry.isFreezed()) { - connection.closeAsync(); - } else { - if (connection.getFailAttempts() == config.getRefreshConnectionAfterFails()) { - connection.forceReconnect(); - } - entry.getConnections().add(connection); - } - entry.getConnectionsSemaphore().release(); + SubscribesConnectionEntry entry = client2Entry.get(connection.getRedisClient()); + entries.returnConnection(entry, connection); } public void shutdown() { - for (SubscribesConnectionEntry entry : clients.values()) { + for (SubscribesConnectionEntry entry : client2Entry.values()) { entry.getClient().shutdown(); } } public void shutdownAsync() { - for (RedisClient client : clients.keySet()) { + for (RedisClient client : client2Entry.keySet()) { connectionManager.shutdownAsync(client); } } diff --git a/src/main/java/org/redisson/connection/ClusterConnectionManager.java b/src/main/java/org/redisson/connection/ClusterConnectionManager.java index 557ccc318..3ce9d253e 100644 --- a/src/main/java/org/redisson/connection/ClusterConnectionManager.java +++ b/src/main/java/org/redisson/connection/ClusterConnectionManager.java @@ -117,6 +117,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { config.setMasterAddress(partition.getMasterAddress()); SingleEntry entry = new SingleEntry(partition.getStartSlot(), partition.getEndSlot(), this, config); + entry.setupMasterEntry(config.getMasterAddress().getHost(), config.getMasterAddress().getPort()); entries.put(partition.getEndSlot(), entry); lastPartitions.put(partition.getEndSlot(), partition); } @@ -142,7 +143,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { if (newPart.isMasterFail()) { ClusterPartition newMasterPart = partitions.get(part.getEndSlot()); if (!newMasterPart.getMasterAddress().equals(part.getMasterAddress())) { - log.debug("changing master from {} to {} for {}", + log.info("changing master from {} to {} for {}", part.getMasterAddress(), newMasterPart.getMasterAddress(), newMasterPart.getEndSlot()); URI newUri = newMasterPart.getMasterAddress(); URI oldUri = part.getMasterAddress(); @@ -208,6 +209,11 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { Map result = new HashMap(); List nodes = parse(nodesValue); for (ClusterNodeInfo clusterNodeInfo : nodes) { + if (clusterNodeInfo.getFlags().contains(Flag.NOADDR)) { + // skip it + continue; + } + String id = clusterNodeInfo.getNodeId(); if (clusterNodeInfo.getFlags().contains(Flag.SLAVE)) { id = clusterNodeInfo.getSlaveOf(); diff --git a/src/main/java/org/redisson/connection/ConnectionEntry.java b/src/main/java/org/redisson/connection/ConnectionEntry.java index 3adbe3cad..d965f98d0 100644 --- a/src/main/java/org/redisson/connection/ConnectionEntry.java +++ b/src/main/java/org/redisson/connection/ConnectionEntry.java @@ -17,7 +17,7 @@ package org.redisson.connection; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicInteger; import org.redisson.MasterSlaveServersConfig; import org.redisson.client.ReconnectListener; @@ -36,11 +36,11 @@ public class ConnectionEntry { final RedisClient client; private final Queue connections = new ConcurrentLinkedQueue(); - private final Semaphore connectionsSemaphore; + private final AtomicInteger connectionsCounter = new AtomicInteger(); public ConnectionEntry(RedisClient client, int poolSize) { this.client = client; - this.connectionsSemaphore = new Semaphore(poolSize); + this.connectionsCounter.set(poolSize); } public RedisClient getClient() { @@ -55,12 +55,31 @@ public class ConnectionEntry { this.freezed = freezed; } - public Semaphore getConnectionsSemaphore() { - return connectionsSemaphore; + public int getFreeAmount() { + return connectionsCounter.get(); } - public Queue getConnections() { - return connections; + public boolean tryAcquireConnection() { + while (true) { + if (connectionsCounter.get() == 0) { + return false; + } + if (connectionsCounter.compareAndSet(connectionsCounter.get(), connectionsCounter.get() - 1)) { + return true; + } + } + } + + public void releaseConnection() { + connectionsCounter.incrementAndGet(); + } + + public RedisConnection pollConnection() { + return connections.poll(); + } + + public void releaseConnection(RedisConnection connection) { + connections.add(connection); } public RedisConnection connect(final MasterSlaveServersConfig config) { diff --git a/src/main/java/org/redisson/connection/ConnectionManager.java b/src/main/java/org/redisson/connection/ConnectionManager.java index c3ecea282..734f95c9b 100644 --- a/src/main/java/org/redisson/connection/ConnectionManager.java +++ b/src/main/java/org/redisson/connection/ConnectionManager.java @@ -30,6 +30,7 @@ import io.netty.channel.EventLoopGroup; import io.netty.util.HashedWheelTimer; import io.netty.util.Timeout; import io.netty.util.TimerTask; +import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.Promise; @@ -61,11 +62,11 @@ public interface ConnectionManager { void releaseWrite(int slot, RedisConnection connection); - RedisConnection connectionReadOp(int slot); + Future connectionReadOp(int slot); - RedisConnection connectionReadOp(int slot, RedisClient client); + Future connectionReadOp(int slot, RedisClient client); - RedisConnection connectionWriteOp(int slot); + Future connectionWriteOp(int slot); FutureListener createReleaseReadListener(int slot, RedisConnection conn, Timeout timeout); @@ -79,9 +80,9 @@ public interface ConnectionManager { PubSubConnectionEntry getEntry(String channelName); - PubSubConnectionEntry subscribe(String channelName, Codec codec); + Future subscribe(String channelName, Codec codec); - PubSubConnectionEntry psubscribe(String pattern, Codec codec); + Future psubscribe(String pattern, Codec codec); void subscribe(RedisPubSubListener listener, String channelName); diff --git a/src/main/java/org/redisson/connection/LoadBalancer.java b/src/main/java/org/redisson/connection/LoadBalancer.java index 8c009ef8d..d53690389 100644 --- a/src/main/java/org/redisson/connection/LoadBalancer.java +++ b/src/main/java/org/redisson/connection/LoadBalancer.java @@ -16,15 +16,20 @@ package org.redisson.connection; import java.util.Collection; +import java.util.List; import org.redisson.MasterSlaveServersConfig; import org.redisson.client.RedisClient; import org.redisson.client.RedisConnection; import org.redisson.client.RedisPubSubConnection; +import io.netty.util.concurrent.Future; + public interface LoadBalancer { - RedisConnection getConnection(RedisClient client); + SubscribesConnectionEntry getEntry(List clientsCopy); + + Future getConnection(RedisClient client); int getAvailableClients(); @@ -40,9 +45,9 @@ public interface LoadBalancer { void add(SubscribesConnectionEntry entry); - RedisConnection nextConnection(); + Future nextConnection(); - RedisPubSubConnection nextPubSubConnection(); + Future nextPubSubConnection(); void returnConnection(RedisConnection connection); diff --git a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index 8078b8262..640e05f46 100644 --- a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -49,6 +49,7 @@ import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.util.HashedWheelTimer; import io.netty.util.Timeout; import io.netty.util.TimerTask; +import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.Promise; @@ -129,6 +130,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { protected void initEntry(MasterSlaveServersConfig config) { MasterSlaveEntry entry = new MasterSlaveEntry(0, MAX_SLOT, this, config); + entry.setupMasterEntry(config.getMasterAddress().getHost(), config.getMasterAddress().getPort()); entries.put(MAX_SLOT, entry); } @@ -220,12 +222,18 @@ public class MasterSlaveConnectionManager implements ConnectionManager { return name2PubSubConnection.get(channelName); } - @Override - public PubSubConnectionEntry subscribe(String channelName, Codec codec) { + public Future subscribe(String channelName, Codec codec) { + Promise promise = group.next().newPromise(); + subscribe(channelName, codec, promise); + return promise; + } + + private void subscribe(final String channelName, final Codec codec, final Promise promise) { // multiple channel names per PubSubConnections allowed PubSubConnectionEntry сonnEntry = name2PubSubConnection.get(channelName); if (сonnEntry != null) { - return сonnEntry; + promise.setSuccess(сonnEntry); + return; } Set entries = new HashSet(name2PubSubConnection.values()); @@ -234,47 +242,70 @@ public class MasterSlaveConnectionManager implements ConnectionManager { PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry); if (oldEntry != null) { entry.release(); - return oldEntry; + promise.setSuccess(oldEntry); + return; } synchronized (entry) { if (!entry.isActive()) { entry.release(); - return subscribe(channelName, codec); + subscribe(channelName, codec, promise); + return; } entry.subscribe(codec, channelName); - return entry; + promise.setSuccess(entry); + return; } } } - int slot = 0; - RedisPubSubConnection conn = nextPubSubConnection(slot); + final int slot = 0; + Future connFuture = nextPubSubConnection(slot); + connFuture.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + promise.setFailure(future.cause()); + return; + } + + RedisPubSubConnection conn = future.getNow(); - PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, config.getSubscriptionsPerConnection()); - entry.tryAcquire(); - PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry); - if (oldEntry != null) { - releaseSubscribeConnection(slot, entry); - return oldEntry; - } + PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, config.getSubscriptionsPerConnection()); + entry.tryAcquire(); + PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry); + if (oldEntry != null) { + releaseSubscribeConnection(slot, entry); + promise.setSuccess(oldEntry); + return; + } - synchronized (entry) { - if (!entry.isActive()) { - entry.release(); - return subscribe(channelName, codec); + synchronized (entry) { + if (!entry.isActive()) { + entry.release(); + subscribe(channelName, codec, promise); + return; + } + entry.subscribe(codec, channelName); + promise.setSuccess(entry); + } } - entry.subscribe(codec, channelName); - return entry; - } + }); } @Override - public PubSubConnectionEntry psubscribe(String channelName, Codec codec) { - // multiple channel names per PubSubConnections allowed + public Future psubscribe(final String channelName, final Codec codec) { + Promise promise = group.next().newPromise(); + psubscribe(channelName, codec, promise); + return promise; + } + + private void psubscribe(final String channelName, final Codec codec, final Promise promise) { + // multiple channel names per PubSubConnections are allowed PubSubConnectionEntry сonnEntry = name2PubSubConnection.get(channelName); if (сonnEntry != null) { - return сonnEntry; + promise.setSuccess(сonnEntry); + return; } Set entries = new HashSet(name2PubSubConnection.values()); @@ -283,43 +314,59 @@ public class MasterSlaveConnectionManager implements ConnectionManager { PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry); if (oldEntry != null) { entry.release(); - return oldEntry; + promise.setSuccess(oldEntry); + return; } synchronized (entry) { if (!entry.isActive()) { entry.release(); - return psubscribe(channelName, codec); + psubscribe(channelName, codec, promise); + return; } entry.psubscribe(codec, channelName); - return entry; + promise.setSuccess(entry); + return; } } } - int slot = 0; - RedisPubSubConnection conn = nextPubSubConnection(slot); + final int slot = 0; + Future connFuture = nextPubSubConnection(slot); + connFuture.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + promise.setFailure(future.cause()); + return; + } + + RedisPubSubConnection conn = future.getNow(); - PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, config.getSubscriptionsPerConnection()); - entry.tryAcquire(); - PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry); - if (oldEntry != null) { - releaseSubscribeConnection(slot, entry); - return oldEntry; - } + PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, config.getSubscriptionsPerConnection()); + entry.tryAcquire(); + PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry); + if (oldEntry != null) { + releaseSubscribeConnection(slot, entry); + promise.setSuccess(oldEntry); + return; + } - synchronized (entry) { - if (!entry.isActive()) { - entry.release(); - return psubscribe(channelName, codec); + synchronized (entry) { + if (!entry.isActive()) { + entry.release(); + psubscribe(channelName, codec, promise); + return; + } + entry.psubscribe(codec, channelName); + promise.setSuccess(entry); + } } - entry.psubscribe(codec, channelName); - return entry; - } + }); } @Override - public void subscribe(RedisPubSubListener listener, String channelName) { + public void subscribe(final RedisPubSubListener listener, final String channelName) { PubSubConnectionEntry сonnEntry = name2PubSubConnection.get(channelName); if (сonnEntry != null) { сonnEntry.subscribe(codec, listener, channelName); @@ -346,25 +393,31 @@ public class MasterSlaveConnectionManager implements ConnectionManager { } } - int slot = 0; - RedisPubSubConnection conn = nextPubSubConnection(slot); - - PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, config.getSubscriptionsPerConnection()); - entry.tryAcquire(); - PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry); - if (oldEntry != null) { - releaseSubscribeConnection(slot, entry); - return; - } - synchronized (entry) { - if (!entry.isActive()) { - entry.release(); - subscribe(listener, channelName); - return; + final int slot = 0; + Future connFuture = nextPubSubConnection(slot); + connFuture.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + RedisPubSubConnection conn = future.getNow(); + PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, config.getSubscriptionsPerConnection()); + entry.tryAcquire(); + PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry); + if (oldEntry != null) { + releaseSubscribeConnection(slot, entry); + return; + } + synchronized (entry) { + if (!entry.isActive()) { + entry.release(); + subscribe(listener, channelName); + return; + } + entry.subscribe(codec, listener, channelName); + return; + } } - entry.subscribe(codec, listener, channelName); - return; - } + }).syncUninterruptibly(); + } @Override @@ -432,7 +485,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { for (Entry mapEntry : name2PubSubConnection.entrySet()) { for (RedisPubSubConnection redisPubSubConnection : allPubSubConnections) { PubSubConnectionEntry entry = mapEntry.getValue(); - String channelName = mapEntry.getKey(); + final String channelName = mapEntry.getKey(); if (!entry.getConnection().equals(redisPubSubConnection)) { continue; @@ -441,24 +494,39 @@ public class MasterSlaveConnectionManager implements ConnectionManager { synchronized (entry) { entry.close(); - Collection listeners = entry.getListeners(channelName); + final Collection listeners = entry.getListeners(channelName); if (entry.getConnection().getPatternChannels().get(channelName) != null) { Codec subscribeCodec = punsubscribe(channelName); if (!listeners.isEmpty()) { - PubSubConnectionEntry newEntry = psubscribe(channelName, subscribeCodec); - for (RedisPubSubListener redisPubSubListener : listeners) { - newEntry.addListener(channelName, redisPubSubListener); - } - log.debug("resubscribed listeners for '{}' channel-pattern", channelName); + Future future = psubscribe(channelName, subscribeCodec); + future.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) + throws Exception { + PubSubConnectionEntry newEntry = future.getNow(); + for (RedisPubSubListener redisPubSubListener : listeners) { + newEntry.addListener(channelName, redisPubSubListener); + } + log.debug("resubscribed listeners for '{}' channel-pattern", channelName); + } + }); } } else { Codec subscribeCodec = unsubscribe(channelName); if (!listeners.isEmpty()) { - PubSubConnectionEntry newEntry = subscribe(channelName, subscribeCodec); - for (RedisPubSubListener redisPubSubListener : listeners) { - newEntry.addListener(channelName, redisPubSubListener); - } - log.debug("resubscribed listeners for '{}' channel", channelName); + Future future = subscribe(channelName, subscribeCodec); + future.addListener(new FutureListener() { + + @Override + public void operationComplete(Future future) + throws Exception { + PubSubConnectionEntry newEntry = future.getNow(); + for (RedisPubSubListener redisPubSubListener : listeners) { + newEntry.addListener(channelName, redisPubSubListener); + } + log.debug("resubscribed listeners for '{}' channel", channelName); + } + }); } } } @@ -475,7 +543,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { } @Override - public RedisConnection connectionWriteOp(int slot) { + public Future connectionWriteOp(int slot) { MasterSlaveEntry e = getEntry(slot); if (!e.isOwn(slot)) { throw new RedisEmptySlotException("No node for slot: " + slot, slot); @@ -484,7 +552,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { } @Override - public RedisConnection connectionReadOp(int slot) { + public Future connectionReadOp(int slot) { MasterSlaveEntry e = getEntry(slot); if (!e.isOwn(slot)) { throw new RedisEmptySlotException("No node for slot: " + slot, slot); @@ -493,7 +561,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { } @Override - public RedisConnection connectionReadOp(int slot, RedisClient client) { + public Future connectionReadOp(int slot, RedisClient client) { MasterSlaveEntry e = getEntry(slot); if (!e.isOwn(slot)) { throw new RedisEmptySlotException("No node for slot: " + slot, slot); @@ -501,7 +569,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { return e.connectionReadOp(client); } - RedisPubSubConnection nextPubSubConnection(int slot) { + Future nextPubSubConnection(int slot) { return getEntry(slot).nextPubSubConnection(); } diff --git a/src/main/java/org/redisson/connection/MasterSlaveEntry.java b/src/main/java/org/redisson/connection/MasterSlaveEntry.java index 988247d1c..9a0e67b0f 100644 --- a/src/main/java/org/redisson/connection/MasterSlaveEntry.java +++ b/src/main/java/org/redisson/connection/MasterSlaveEntry.java @@ -15,6 +15,7 @@ */ package org.redisson.connection; +import java.net.InetSocketAddress; import java.net.URI; import java.util.ArrayList; import java.util.Collection; @@ -23,23 +24,25 @@ import java.util.List; import org.redisson.MasterSlaveServersConfig; import org.redisson.client.RedisClient; import org.redisson.client.RedisConnection; -import org.redisson.client.RedisException; import org.redisson.client.RedisPubSubConnection; +import org.redisson.misc.ConnectionPool; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.netty.util.concurrent.Future; + /** * * @author Nikita Koksharov * */ //TODO ping support -public class MasterSlaveEntry { +public class MasterSlaveEntry { final Logger log = LoggerFactory.getLogger(getClass()); LoadBalancer slaveBalancer; - volatile ConnectionEntry masterEntry; + SubscribesConnectionEntry masterEntry; final MasterSlaveServersConfig config; final ConnectionManager connectionManager; @@ -47,6 +50,8 @@ public class MasterSlaveEntry { final int startSlot; final int endSlot; + final ConnectionPool writeConnectionHolder; + public MasterSlaveEntry(int startSlot, int endSlot, ConnectionManager connectionManager, MasterSlaveServersConfig config) { this.startSlot = startSlot; this.endSlot = endSlot; @@ -68,18 +73,20 @@ public class MasterSlaveEntry { slaveDown(config.getMasterAddress().getHost(), config.getMasterAddress().getPort()); } - setupMasterEntry(config.getMasterAddress().getHost(), config.getMasterAddress().getPort()); + writeConnectionHolder = new ConnectionPool(config, null, connectionManager.getGroup()); } - public void setupMasterEntry(String host, int port) { + protected void setupMasterEntry(String host, int port) { RedisClient client = connectionManager.createClient(host, port); - masterEntry = new ConnectionEntry(client, config.getMasterConnectionPoolSize()); + masterEntry = new SubscribesConnectionEntry(client, config.getMasterConnectionPoolSize(), 0); + writeConnectionHolder.add(masterEntry); } public Collection slaveDown(String host, int port) { Collection conns = slaveBalancer.freeze(host, port); if (slaveBalancer.getAvailableClients() == 0) { - slaveUp(masterEntry.getClient().getAddr().getHostName(), masterEntry.getClient().getAddr().getPort()); + InetSocketAddress addr = masterEntry.getClient().getAddr(); + slaveUp(addr.getHostName(), addr.getPort()); } return conns; } @@ -98,8 +105,9 @@ public class MasterSlaveEntry { } public void slaveUp(String host, int port) { - if (!masterEntry.getClient().getAddr().getHostName().equals(host) && port != masterEntry.getClient().getAddr().getPort()) { - slaveDown(masterEntry.getClient().getAddr().getHostName(), masterEntry.getClient().getAddr().getPort()); + InetSocketAddress addr = masterEntry.getClient().getAddr(); + if (!addr.getHostName().equals(host) && port != addr.getPort()) { + slaveDown(addr.getHostName(), addr.getPort()); } slaveBalancer.unfreeze(host, port); } @@ -111,8 +119,9 @@ public class MasterSlaveEntry { * */ public void changeMaster(String host, int port) { - ConnectionEntry oldMaster = masterEntry; + SubscribesConnectionEntry oldMaster = masterEntry; setupMasterEntry(host, port); + writeConnectionHolder.remove(oldMaster); if (slaveBalancer.getAvailableClients() > 1) { slaveDown(host, port); } @@ -124,63 +133,29 @@ public class MasterSlaveEntry { slaveBalancer.shutdownAsync(); } - public RedisConnection connectionWriteOp() { - // may changed during changeMaster call - ConnectionEntry entry = masterEntry; - acquireMasterConnection(entry); - - RedisConnection conn = entry.getConnections().poll(); - if (conn != null) { - return conn; - } - - try { - return entry.connect(config); - } catch (RedisException e) { - entry.getConnectionsSemaphore().release(); - throw e; - } + public Future connectionWriteOp() { + return writeConnectionHolder.get(); } - public RedisConnection connectionReadOp() { + public Future connectionReadOp() { return slaveBalancer.nextConnection(); } - public RedisConnection connectionReadOp(RedisClient client) { + public Future connectionReadOp(RedisClient client) { return slaveBalancer.getConnection(client); } - RedisPubSubConnection nextPubSubConnection() { + Future nextPubSubConnection() { return slaveBalancer.nextPubSubConnection(); } - void acquireMasterConnection(ConnectionEntry entry) { - if (!entry.getConnectionsSemaphore().tryAcquire()) { - log.warn("Master connection pool gets exhausted! Trying to acquire connection ..."); - long time = System.currentTimeMillis(); - entry.getConnectionsSemaphore().acquireUninterruptibly(); - long endTime = System.currentTimeMillis() - time; - log.warn("Master connection acquired, time spended: {} ms", endTime); - } - } - public void returnSubscribeConnection(PubSubConnectionEntry entry) { slaveBalancer.returnSubscribeConnection(entry.getConnection()); } public void releaseWrite(RedisConnection connection) { - // may changed during changeMaster call - ConnectionEntry entry = masterEntry; - if (!entry.getClient().equals(connection.getRedisClient())) { - connection.closeAsync(); - return; - } else if (connection.getFailAttempts() == config.getRefreshConnectionAfterFails()) { - connection.forceReconnect(); - } - - entry.getConnections().add(connection); - entry.getConnectionsSemaphore().release(); + writeConnectionHolder.returnConnection(masterEntry, connection); } public void releaseRead(RedisConnection сonnection) { diff --git a/src/main/java/org/redisson/connection/RandomLoadBalancer.java b/src/main/java/org/redisson/connection/RandomLoadBalancer.java index b4f153099..47ce7c144 100644 --- a/src/main/java/org/redisson/connection/RandomLoadBalancer.java +++ b/src/main/java/org/redisson/connection/RandomLoadBalancer.java @@ -15,15 +15,17 @@ */ package org.redisson.connection; +import java.security.SecureRandom; import java.util.List; import java.util.Random; public class RandomLoadBalancer extends BaseLoadBalancer { - private final Random random = new Random(); + private final Random random = new SecureRandom(); - int getIndex(List clientsCopy) { - return random.nextInt(clientsCopy.size()); + public SubscribesConnectionEntry getEntry(List clientsCopy) { + int ind = random.nextInt(clientsCopy.size()); + return clientsCopy.get(ind); } } diff --git a/src/main/java/org/redisson/connection/RoundRobinLoadBalancer.java b/src/main/java/org/redisson/connection/RoundRobinLoadBalancer.java index 88075f350..6bcec102a 100644 --- a/src/main/java/org/redisson/connection/RoundRobinLoadBalancer.java +++ b/src/main/java/org/redisson/connection/RoundRobinLoadBalancer.java @@ -23,8 +23,9 @@ public class RoundRobinLoadBalancer extends BaseLoadBalancer { private final AtomicInteger index = new AtomicInteger(-1); @Override - int getIndex(List clientsCopy) { - return Math.abs(index.incrementAndGet() % clientsCopy.size()); + public SubscribesConnectionEntry getEntry(List clientsCopy) { + int ind = Math.abs(index.incrementAndGet() % clientsCopy.size()); + return clientsCopy.get(ind); } } diff --git a/src/main/java/org/redisson/connection/SingleConnectionManager.java b/src/main/java/org/redisson/connection/SingleConnectionManager.java index 2b5a18de0..ae7ff578c 100644 --- a/src/main/java/org/redisson/connection/SingleConnectionManager.java +++ b/src/main/java/org/redisson/connection/SingleConnectionManager.java @@ -70,6 +70,7 @@ public class SingleConnectionManager extends MasterSlaveConnectionManager { @Override protected void initEntry(MasterSlaveServersConfig config) { SingleEntry entry = new SingleEntry(0, MAX_SLOT, this, config); + entry.setupMasterEntry(config.getMasterAddress().getHost(), config.getMasterAddress().getPort()); entries.put(MAX_SLOT, entry); } diff --git a/src/main/java/org/redisson/connection/SingleEntry.java b/src/main/java/org/redisson/connection/SingleEntry.java index 62159402f..8b257ea98 100644 --- a/src/main/java/org/redisson/connection/SingleEntry.java +++ b/src/main/java/org/redisson/connection/SingleEntry.java @@ -18,56 +18,41 @@ package org.redisson.connection; import org.redisson.MasterSlaveServersConfig; import org.redisson.client.RedisClient; import org.redisson.client.RedisConnection; -import org.redisson.client.RedisConnectionException; import org.redisson.client.RedisPubSubConnection; +import org.redisson.misc.ConnectionPool; +import org.redisson.misc.PubSubConnectionPoll; -public class SingleEntry extends MasterSlaveEntry { +import io.netty.util.concurrent.Future; + +public class SingleEntry extends MasterSlaveEntry { + + final ConnectionPool pubSubConnectionHolder; public SingleEntry(int startSlot, int endSlot, ConnectionManager connectionManager, MasterSlaveServersConfig config) { super(startSlot, endSlot, connectionManager, config); + pubSubConnectionHolder = new PubSubConnectionPoll(config, null, connectionManager.getGroup()); } @Override public void setupMasterEntry(String host, int port) { RedisClient masterClient = connectionManager.createClient(host, port); masterEntry = new SubscribesConnectionEntry(masterClient, config.getMasterConnectionPoolSize(), config.getSlaveSubscriptionConnectionPoolSize()); - } - - private void acquireSubscribeConnection() { - if (!((SubscribesConnectionEntry)masterEntry).getSubscribeConnectionsSemaphore().tryAcquire()) { - log.warn("Subscribe connection pool gets exhausted! Trying to acquire connection ..."); - long time = System.currentTimeMillis(); - ((SubscribesConnectionEntry)masterEntry).getSubscribeConnectionsSemaphore().acquireUninterruptibly(); - long endTime = System.currentTimeMillis() - time; - log.warn("Subscribe connection acquired, time spended: {} ms", endTime); - } + writeConnectionHolder.add(masterEntry); + pubSubConnectionHolder.add(masterEntry); } @Override - RedisPubSubConnection nextPubSubConnection() { - acquireSubscribeConnection(); - - RedisPubSubConnection conn = ((SubscribesConnectionEntry)masterEntry).pollFreeSubscribeConnection(); - if (conn != null) { - return conn; - } - - try { - return masterEntry.connectPubSub(config); - } catch (RedisConnectionException e) { - ((SubscribesConnectionEntry)masterEntry).getSubscribeConnectionsSemaphore().release(); - throw e; - } + Future nextPubSubConnection() { + return pubSubConnectionHolder.get(); } @Override public void returnSubscribeConnection(PubSubConnectionEntry entry) { - ((SubscribesConnectionEntry)masterEntry).offerFreeSubscribeConnection(entry.getConnection()); - ((SubscribesConnectionEntry)masterEntry).getSubscribeConnectionsSemaphore().release(); + pubSubConnectionHolder.returnConnection(masterEntry, entry.getConnection()); } @Override - public RedisConnection connectionReadOp() { + public Future connectionReadOp() { return super.connectionWriteOp(); } @@ -75,4 +60,5 @@ public class SingleEntry extends MasterSlaveEntry { public void releaseRead(RedisConnection сonnection) { super.releaseWrite(сonnection); } + } diff --git a/src/main/java/org/redisson/connection/SubscribesConnectionEntry.java b/src/main/java/org/redisson/connection/SubscribesConnectionEntry.java index 9fe0f8e82..48f66a0f7 100644 --- a/src/main/java/org/redisson/connection/SubscribesConnectionEntry.java +++ b/src/main/java/org/redisson/connection/SubscribesConnectionEntry.java @@ -17,7 +17,7 @@ package org.redisson.connection; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicInteger; import org.redisson.MasterSlaveServersConfig; import org.redisson.client.RedisClient; @@ -25,38 +25,49 @@ import org.redisson.client.RedisPubSubConnection; public class SubscribesConnectionEntry extends ConnectionEntry { - private final Semaphore subscribeConnectionsSemaphore; private final Queue allSubscribeConnections = new ConcurrentLinkedQueue(); private final Queue freeSubscribeConnections = new ConcurrentLinkedQueue(); + private final AtomicInteger connectionsCounter = new AtomicInteger(); public SubscribesConnectionEntry(RedisClient client, int poolSize, int subscribePoolSize) { super(client, poolSize); - this.subscribeConnectionsSemaphore = new Semaphore(subscribePoolSize); + connectionsCounter.set(subscribePoolSize); } public Queue getAllSubscribeConnections() { return allSubscribeConnections; } - public void registerSubscribeConnection(RedisPubSubConnection connection) { - allSubscribeConnections.offer(connection); - } - public RedisPubSubConnection pollFreeSubscribeConnection() { return freeSubscribeConnections.poll(); } - public void offerFreeSubscribeConnection(RedisPubSubConnection connection) { - freeSubscribeConnections.offer(connection); + public void releaseSubscribeConnection(RedisPubSubConnection connection) { + freeSubscribeConnections.add(connection); + } + + public int getFreeSubscribeAmount() { + return connectionsCounter.get(); + } + + public boolean tryAcquireSubscribeConnection() { + while (true) { + if (connectionsCounter.get() == 0) { + return false; + } + if (connectionsCounter.compareAndSet(connectionsCounter.get(), connectionsCounter.get() - 1)) { + return true; + } + } } - public Semaphore getSubscribeConnectionsSemaphore() { - return subscribeConnectionsSemaphore; + public void releaseSubscribeConnection() { + connectionsCounter.incrementAndGet(); } public RedisPubSubConnection connectPubSub(MasterSlaveServersConfig config) { RedisPubSubConnection conn = super.connectPubSub(config); - allSubscribeConnections.offer(conn); + allSubscribeConnections.add(conn); return conn; } diff --git a/src/main/java/org/redisson/misc/ConnectionPool.java b/src/main/java/org/redisson/misc/ConnectionPool.java new file mode 100644 index 000000000..7c0c3603a --- /dev/null +++ b/src/main/java/org/redisson/misc/ConnectionPool.java @@ -0,0 +1,147 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.misc; + +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; + +import org.redisson.MasterSlaveServersConfig; +import org.redisson.client.RedisConnection; +import org.redisson.client.RedisConnectionException; +import org.redisson.client.RedisException; +import org.redisson.connection.LoadBalancer; +import org.redisson.connection.SubscribesConnectionEntry; + +import io.netty.channel.EventLoopGroup; +import io.netty.util.concurrent.EventExecutor; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.Promise; +import io.netty.util.internal.OneTimeTask; + +public class ConnectionPool { + + final List entries = new CopyOnWriteArrayList(); + + EventExecutor executor; + + MasterSlaveServersConfig config; + + LoadBalancer loadBalancer; + + public ConnectionPool(MasterSlaveServersConfig config, LoadBalancer loadBalancer, EventLoopGroup eventLoopGroup) { + this.config = config; + this.loadBalancer = loadBalancer; + this.executor = eventLoopGroup.next(); + } + + public void add(SubscribesConnectionEntry entry) { + entries.add(entry); + } + + public void remove(SubscribesConnectionEntry entry) { + entries.remove(entry); + } + + public Future get() { + for (int j = entries.size()-1; j >= 0 ; j--) { + SubscribesConnectionEntry entry; + if (ConnectionPool.this.loadBalancer != null) { + entry = ConnectionPool.this.loadBalancer.getEntry(entries); + } else { + entry = entries.get(0); + } + if (!entry.isFreezed() && tryAcquireConnection(entry)) { + Promise promise = executor.newPromise(); + connect(entry, promise); + return promise; + } + } + + RedisConnectionException exception = new RedisConnectionException("Connection pool exhausted!"); + return executor.newFailedFuture(exception); + } + + public Future get(SubscribesConnectionEntry entry) { + if (!entry.isFreezed() && tryAcquireConnection(entry)) { + Promise promise = executor.newPromise(); + connect(entry, promise); + return promise; + } + + RedisConnectionException exception = new RedisConnectionException("Can't aquire connection for " + entry.getClient()); + return executor.newFailedFuture(exception); + } + + protected boolean tryAcquireConnection(SubscribesConnectionEntry entry) { + return entry.tryAcquireConnection(); + } + + protected T poll(SubscribesConnectionEntry entry) { + return (T) entry.pollConnection(); + } + + protected T connect(SubscribesConnectionEntry entry) { + return (T) entry.connect(config); + } + + private Future connect(final SubscribesConnectionEntry entry, final Promise promise) { + T conn = poll(entry); + if (conn != null) { + if (!promise.trySuccess(conn)) { + releaseConnection(entry, conn); + releaseConnection(entry); + } + } else { + executor.execute(new OneTimeTask() { + @Override + public void run() { + try { + T conn = connect(entry); + if (!promise.trySuccess(conn)) { + releaseConnection(entry, conn); + releaseConnection(entry); + } + } catch (RedisException e) { + releaseConnection(entry); + promise.setFailure(e); + } + } + }); + } + return promise; + } + + public void returnConnection(SubscribesConnectionEntry entry, T connection) { + if (entry.isFreezed()) { + connection.closeAsync(); + } else { + if (connection.getFailAttempts() == config.getRefreshConnectionAfterFails()) { + connection.forceReconnect(); + } + releaseConnection(entry, connection); + } + releaseConnection(entry); + } + + protected void releaseConnection(SubscribesConnectionEntry entry) { + entry.releaseConnection(); + } + + protected void releaseConnection(SubscribesConnectionEntry entry, T conn) { + entry.releaseConnection(conn); + } + +} diff --git a/src/main/java/org/redisson/misc/PubSubConnectionPoll.java b/src/main/java/org/redisson/misc/PubSubConnectionPoll.java new file mode 100644 index 000000000..d5119087c --- /dev/null +++ b/src/main/java/org/redisson/misc/PubSubConnectionPoll.java @@ -0,0 +1,57 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.misc; + +import org.redisson.MasterSlaveServersConfig; +import org.redisson.client.RedisPubSubConnection; +import org.redisson.connection.LoadBalancer; +import org.redisson.connection.SubscribesConnectionEntry; + +import io.netty.channel.EventLoopGroup; + +public class PubSubConnectionPoll extends ConnectionPool { + + public PubSubConnectionPoll(MasterSlaveServersConfig config, + LoadBalancer loadBalancer, EventLoopGroup eventLoopGroup) { + super(config, loadBalancer, eventLoopGroup); + } + + @Override + protected RedisPubSubConnection poll(SubscribesConnectionEntry entry) { + return entry.pollFreeSubscribeConnection(); + } + + @Override + protected RedisPubSubConnection connect(SubscribesConnectionEntry entry) { + return entry.connectPubSub(config); + } + + @Override + protected boolean tryAcquireConnection(SubscribesConnectionEntry entry) { + return entry.tryAcquireSubscribeConnection(); + } + + @Override + protected void releaseConnection(SubscribesConnectionEntry entry) { + entry.releaseSubscribeConnection(); + } + + @Override + protected void releaseConnection(SubscribesConnectionEntry entry, RedisPubSubConnection conn) { + entry.releaseSubscribeConnection(conn); + } + +}