pull connection in async way. #223

pull/261/head
Nikita 9 years ago
parent 994bb2da47
commit 3f09bcb4cb

@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisException;
import org.redisson.client.RedisMovedException;
import org.redisson.client.RedisTimeoutException;
@ -192,43 +193,55 @@ public class CommandBatchExecutorService extends CommandExecutorService {
}
};
try {
org.redisson.client.RedisConnection connection;
if (entry.isReadOnlyMode()) {
connection = connectionManager.connectionReadOp(slot);
} else {
connection = connectionManager.connectionWriteOp(slot);
}
Future<RedisConnection> connectionFuture;
if (entry.isReadOnlyMode()) {
connectionFuture = connectionManager.connectionReadOp(slot);
} else {
connectionFuture = connectionManager.connectionWriteOp(slot);
}
ArrayList<CommandData<?, ?>> list = new ArrayList<CommandData<?, ?>>(entry.getCommands().size());
for (CommandEntry c : entry.getCommands()) {
list.add(c.getCommand());
}
ChannelFuture future = connection.send(new CommandsData(attemptPromise, list));
ex.set(new RedisTimeoutException());
final Timeout timeout = connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getTimeout(), TimeUnit.MILLISECONDS);
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
timeout.cancel();
ex.set(new WriteRedisConnectionException("channel: " + future.channel() + " closed"));
connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);
}
connectionFuture.addListener(new FutureListener<RedisConnection>() {
@Override
public void operationComplete(Future<RedisConnection> connFuture) throws Exception {
if (attemptPromise.isCancelled()) {
return;
}
if (!connFuture.isSuccess()) {
ex.set((RedisException)connFuture.cause());
connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);
return;
}
});
if (entry.isReadOnlyMode()) {
attemptPromise.addListener(connectionManager.createReleaseReadListener(slot, connection, timeout));
} else {
attemptPromise.addListener(connectionManager.createReleaseWriteListener(slot, connection, timeout));
RedisConnection connection = connFuture.getNow();
ArrayList<CommandData<?, ?>> list = new ArrayList<CommandData<?, ?>>(entry.getCommands().size());
for (CommandEntry c : entry.getCommands()) {
list.add(c.getCommand());
}
ChannelFuture future = connection.send(new CommandsData(attemptPromise, list));
ex.set(new RedisTimeoutException());
final Timeout timeout = connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getTimeout(), TimeUnit.MILLISECONDS);
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
timeout.cancel();
ex.set(new WriteRedisConnectionException("channel: " + future.channel() + " closed"));
connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);
}
}
});
if (entry.isReadOnlyMode()) {
attemptPromise.addListener(connectionManager.createReleaseReadListener(slot, connection, timeout));
} else {
attemptPromise.addListener(connectionManager.createReleaseWriteListener(slot, connection, timeout));
}
}
} catch (RedisException e) {
ex.set(e);
connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);
}
});
attemptPromise.addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {

@ -252,12 +252,16 @@ public class CommandExecutorService implements CommandExecutor {
}
try {
RedisConnection connection;
Future<RedisConnection> connectionFuture;
if (readOnlyMode) {
connection = connectionManager.connectionReadOp(slot);
connectionFuture = connectionManager.connectionReadOp(slot);
} else {
connection = connectionManager.connectionWriteOp(slot);
connectionFuture = connectionManager.connectionWriteOp(slot);
}
connectionFuture.syncUninterruptibly();
RedisConnection connection = connectionFuture.getNow();
try {
return operation.execute(codec, connection);
} catch (RedisMovedException e) {
@ -422,51 +426,64 @@ public class CommandExecutorService implements CommandExecutor {
}
};
try {
RedisConnection connection;
if (readOnlyMode) {
if (client != null) {
connection = connectionManager.connectionReadOp(slot, client);
} else {
connection = connectionManager.connectionReadOp(slot);
}
ex.set(new RedisTimeoutException());
final Timeout timeout = connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getTimeout(), TimeUnit.MILLISECONDS);
Future<RedisConnection> connectionFuture;
if (readOnlyMode) {
if (client != null) {
connectionFuture = connectionManager.connectionReadOp(slot, client);
} else {
connection = connectionManager.connectionWriteOp(slot);
connectionFuture = connectionManager.connectionReadOp(slot);
}
log.debug("getting connection for command {} from slot {} using node {}", command, slot, connection.getRedisClient().getAddr());
ChannelFuture future = connection.send(new CommandData<V, R>(attemptPromise, messageDecoder, codec, command, params));
ex.set(new RedisTimeoutException());
final Timeout timeout = connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getTimeout(), TimeUnit.MILLISECONDS);
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
timeout.cancel();
ex.set(new WriteRedisConnectionException(
"Can't send command: " + command + ", params: " + params + ", channel: " + future.channel(), future.cause()));
connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);
}
} else {
connectionFuture = connectionManager.connectionWriteOp(slot);
}
connectionFuture.addListener(new FutureListener<RedisConnection>() {
@Override
public void operationComplete(Future<RedisConnection> connFuture) throws Exception {
if (attemptPromise.isCancelled()) {
return;
}
if (!connFuture.isSuccess()) {
timeout.cancel();
ex.set((RedisException)connFuture.cause());
connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);
return;
}
});
if (readOnlyMode) {
attemptPromise.addListener(connectionManager.createReleaseReadListener(slot, connection, timeout));
} else {
attemptPromise.addListener(connectionManager.createReleaseWriteListener(slot, connection, timeout));
RedisConnection connection = connFuture.getNow();
log.debug("getting connection for command {} from slot {} using node {}", command, slot, connection.getRedisClient().getAddr());
ChannelFuture future = connection.send(new CommandData<V, R>(attemptPromise, messageDecoder, codec, command, params));
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
timeout.cancel();
ex.set(new WriteRedisConnectionException(
"Can't send command: " + command + ", params: " + params + ", channel: " + future.channel(), future.cause()));
connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);
}
}
});
if (readOnlyMode) {
attemptPromise.addListener(connectionManager.createReleaseReadListener(slot, connection, timeout));
} else {
attemptPromise.addListener(connectionManager.createReleaseWriteListener(slot, connection, timeout));
}
}
} catch (RedisException e) {
ex.set(e);
connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);
}
});
attemptPromise.addListener(new FutureListener<R>() {
@Override
public void operationComplete(Future<R> future) throws Exception {
timeout.cancel();
if (future.isCancelled()) {
return;
}
// TODO cancel timeout
if (future.cause() instanceof RedisMovedException) {
RedisMovedException ex = (RedisMovedException)future.cause();

@ -25,6 +25,8 @@ import org.redisson.core.PatternMessageListener;
import org.redisson.core.PatternStatusListener;
import org.redisson.core.RPatternTopic;
import io.netty.util.concurrent.Future;
/**
* Distributed topic implementation. Messages are delivered to all message listeners across Redis cluster.
*
@ -60,7 +62,9 @@ public class RedissonPatternTopic<M> implements RPatternTopic<M> {
}
private int addListener(RedisPubSubListener<M> pubSubListener) {
PubSubConnectionEntry entry = commandExecutor.getConnectionManager().psubscribe(name, codec);
Future<PubSubConnectionEntry> future = commandExecutor.getConnectionManager().psubscribe(name, codec);
future.syncUninterruptibly();
PubSubConnectionEntry entry = future.getNow();
synchronized (entry) {
if (entry.isActive()) {
entry.addListener(name, pubSubListener);

@ -28,6 +28,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.SortedSet;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import org.redisson.client.RedisConnection;
@ -40,7 +41,9 @@ import org.redisson.core.RSortedSet;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GlobalEventExecutor;
import io.netty.util.concurrent.Promise;
/**
@ -310,21 +313,18 @@ public class RedissonSortedSet<V> extends RedissonObject implements RSortedSet<V
}
public Future<Boolean> addAsync(final V value) {
EventLoop loop = commandExecutor.getConnectionManager().getGroup().next();
final Promise<Boolean> promise = loop.newPromise();
loop.execute(new Runnable() {
final Promise<Boolean> promise = new DefaultPromise<Boolean>(){};
GlobalEventExecutor.INSTANCE.execute(new Runnable() {
@Override
public void run() {
try {
boolean result = add(value);
promise.setSuccess(result);
boolean res = add(value);
promise.setSuccess(res);
} catch (Exception e) {
promise.setFailure(e);
}
}
});
return promise;
}

@ -77,7 +77,9 @@ public class RedissonTopic<M> implements RTopic<M> {
}
private int addListener(RedisPubSubListener<M> pubSubListener) {
PubSubConnectionEntry entry = commandExecutor.getConnectionManager().subscribe(name, codec);
Future<PubSubConnectionEntry> future = commandExecutor.getConnectionManager().subscribe(name, codec);
future.syncUninterruptibly();
PubSubConnectionEntry entry = future.getNow();
synchronized (entry) {
if (entry.isActive()) {
entry.addListener(name, pubSubListener);

@ -52,7 +52,8 @@ public class CommandsQueue extends ChannelDuplexHandler {
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
if (msg instanceof QueueCommand) {
QueueCommand data = (QueueCommand) msg;
if (queue.peek() != null && queue.peek().getCommand() == data) {
QueueCommandHolder holder = queue.peek();
if (holder != null && holder.getCommand() == data) {
super.write(ctx, msg, promise);
} else {
queue.add(new QueueCommandHolder(data, promise));

@ -26,39 +26,41 @@ import org.redisson.MasterSlaveServersConfig;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisConnectionException;
import org.redisson.client.RedisException;
import org.redisson.client.RedisPubSubConnection;
import org.redisson.misc.ReclosableLatch;
import org.redisson.misc.ConnectionPool;
import org.redisson.misc.PubSubConnectionPoll;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.util.concurrent.Future;
import io.netty.util.internal.PlatformDependent;
abstract class BaseLoadBalancer implements LoadBalancer {
private final Logger log = LoggerFactory.getLogger(getClass());
private MasterSlaveServersConfig config;
private ConnectionManager connectionManager;
private final ReclosableLatch clientsEmpty = new ReclosableLatch();
final Map<RedisClient, SubscribesConnectionEntry> clients = PlatformDependent.newConcurrentHashMap();
final Map<RedisClient, SubscribesConnectionEntry> client2Entry = PlatformDependent.newConcurrentHashMap();
PubSubConnectionPoll pubSubEntries;
ConnectionPool<RedisConnection> entries;
public void init(MasterSlaveServersConfig config, ConnectionManager connectionManager) {
this.config = config;
this.connectionManager = connectionManager;
entries = new ConnectionPool<RedisConnection>(config, this, connectionManager.getGroup());
pubSubEntries = new PubSubConnectionPoll(config, this, connectionManager.getGroup());
}
public synchronized void add(SubscribesConnectionEntry entry) {
clients.put(entry.getClient(), entry);
if (!entry.isFreezed()) {
clientsEmpty.open();
}
client2Entry.put(entry.getClient(), entry);
entries.add(entry);
pubSubEntries.add(entry);
}
public int getAvailableClients() {
int count = 0;
for (SubscribesConnectionEntry connectionEntry : clients.values()) {
for (SubscribesConnectionEntry connectionEntry : client2Entry.values()) {
if (!connectionEntry.isFreezed()) {
count++;
}
@ -68,12 +70,11 @@ abstract class BaseLoadBalancer implements LoadBalancer {
public synchronized void unfreeze(String host, int port) {
InetSocketAddress addr = new InetSocketAddress(host, port);
for (SubscribesConnectionEntry connectionEntry : clients.values()) {
for (SubscribesConnectionEntry connectionEntry : client2Entry.values()) {
if (!connectionEntry.getClient().getAddr().equals(addr)) {
continue;
}
connectionEntry.setFreezed(false);
clientsEmpty.open();
return;
}
throw new IllegalStateException("Can't find " + addr + " in slaves!");
@ -81,7 +82,7 @@ abstract class BaseLoadBalancer implements LoadBalancer {
public synchronized Collection<RedisPubSubConnection> freeze(String host, int port) {
InetSocketAddress addr = new InetSocketAddress(host, port);
for (SubscribesConnectionEntry connectionEntry : clients.values()) {
for (SubscribesConnectionEntry connectionEntry : client2Entry.values()) {
if (connectionEntry.isFreezed()
|| !connectionEntry.getClient().getAddr().equals(addr)) {
continue;
@ -92,7 +93,7 @@ abstract class BaseLoadBalancer implements LoadBalancer {
// close all connections
while (true) {
RedisConnection connection = connectionEntry.getConnections().poll();
RedisConnection connection = connectionEntry.pollConnection();
if (connection == null) {
break;
}
@ -109,17 +110,6 @@ abstract class BaseLoadBalancer implements LoadBalancer {
}
boolean allFreezed = true;
for (SubscribesConnectionEntry entry : clients.values()) {
if (!entry.isFreezed()) {
allFreezed = false;
break;
}
}
if (allFreezed) {
clientsEmpty.close();
}
List<RedisPubSubConnection> list = new ArrayList<RedisPubSubConnection>(connectionEntry.getAllSubscribeConnections());
connectionEntry.getAllSubscribeConnections().clear();
return list;
@ -128,122 +118,41 @@ abstract class BaseLoadBalancer implements LoadBalancer {
return Collections.emptyList();
}
public RedisPubSubConnection nextPubSubConnection() {
clientsEmpty.awaitUninterruptibly();
List<SubscribesConnectionEntry> clientsCopy = new ArrayList<SubscribesConnectionEntry>(clients.values());
while (true) {
if (clientsCopy.isEmpty()) {
throw new RedisConnectionException("Slave subscribe-connection pool gets exhausted!");
}
int index = getIndex(clientsCopy);
SubscribesConnectionEntry entry = clientsCopy.get(index);
if (entry.isFreezed()
|| !entry.getSubscribeConnectionsSemaphore().tryAcquire()) {
clientsCopy.remove(index);
} else {
try {
RedisPubSubConnection conn = entry.pollFreeSubscribeConnection();
if (conn != null) {
return conn;
}
return entry.connectPubSub(config);
} catch (RedisConnectionException e) {
entry.getSubscribeConnectionsSemaphore().release();
// TODO connection scoring
log.warn("Can't connect to {}, trying next connection!", entry.getClient().getAddr());
clientsCopy.remove(index);
}
}
}
public Future<RedisPubSubConnection> nextPubSubConnection() {
return pubSubEntries.get();
}
public RedisConnection getConnection(RedisClient client) {
SubscribesConnectionEntry entry = clients.get(client);
public Future<RedisConnection> getConnection(RedisClient client) {
SubscribesConnectionEntry entry = client2Entry.get(client);
if (entry != null) {
RedisConnection conn = retrieveConnection(entry);
if (conn == null) {
throw new RedisConnectionException("Slave connection pool gets exhausted for " + client);
}
return conn;
return entries.get(entry);
}
throw new RedisConnectionException("Can't find entry for " + client);
RedisConnectionException exception = new RedisConnectionException("Can't find entry for " + client);
return connectionManager.getGroup().next().newFailedFuture(exception);
}
public RedisConnection nextConnection() {
clientsEmpty.awaitUninterruptibly();
List<SubscribesConnectionEntry> clientsCopy = new ArrayList<SubscribesConnectionEntry>(clients.values());
while (true) {
if (clientsCopy.isEmpty()) {
throw new RedisConnectionException("Slave connection pool gets exhausted!");
}
int index = getIndex(clientsCopy);
SubscribesConnectionEntry entry = clientsCopy.get(index);
RedisConnection conn = retrieveConnection(entry);
if (conn == null) {
clientsCopy.remove(index);
} else {
return conn;
}
}
}
private RedisConnection retrieveConnection(SubscribesConnectionEntry entry) {
if (entry.isFreezed()
|| !entry.getConnectionsSemaphore().tryAcquire()) {
return null;
} else {
RedisConnection conn = entry.getConnections().poll();
if (conn != null) {
return conn;
}
try {
return entry.connect(config);
} catch (RedisException e) {
entry.getConnectionsSemaphore().release();
// TODO connection scoring
log.warn("Can't connect to {}, trying next connection!", entry.getClient().getAddr());
return null;
}
}
public Future<RedisConnection> nextConnection() {
return entries.get();
}
abstract int getIndex(List<SubscribesConnectionEntry> clientsCopy);
public void returnSubscribeConnection(RedisPubSubConnection connection) {
SubscribesConnectionEntry entry = clients.get(connection.getRedisClient());
if (entry.isFreezed()) {
connection.closeAsync();
} else {
entry.offerFreeSubscribeConnection(connection);
}
entry.getSubscribeConnectionsSemaphore().release();
SubscribesConnectionEntry entry = client2Entry.get(connection.getRedisClient());
pubSubEntries.returnConnection(entry, connection);
}
public void returnConnection(RedisConnection connection) {
SubscribesConnectionEntry entry = clients.get(connection.getRedisClient());
if (entry.isFreezed()) {
connection.closeAsync();
} else {
if (connection.getFailAttempts() == config.getRefreshConnectionAfterFails()) {
connection.forceReconnect();
}
entry.getConnections().add(connection);
}
entry.getConnectionsSemaphore().release();
SubscribesConnectionEntry entry = client2Entry.get(connection.getRedisClient());
entries.returnConnection(entry, connection);
}
public void shutdown() {
for (SubscribesConnectionEntry entry : clients.values()) {
for (SubscribesConnectionEntry entry : client2Entry.values()) {
entry.getClient().shutdown();
}
}
public void shutdownAsync() {
for (RedisClient client : clients.keySet()) {
for (RedisClient client : client2Entry.keySet()) {
connectionManager.shutdownAsync(client);
}
}

@ -117,6 +117,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
config.setMasterAddress(partition.getMasterAddress());
SingleEntry entry = new SingleEntry(partition.getStartSlot(), partition.getEndSlot(), this, config);
entry.setupMasterEntry(config.getMasterAddress().getHost(), config.getMasterAddress().getPort());
entries.put(partition.getEndSlot(), entry);
lastPartitions.put(partition.getEndSlot(), partition);
}
@ -142,7 +143,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
if (newPart.isMasterFail()) {
ClusterPartition newMasterPart = partitions.get(part.getEndSlot());
if (!newMasterPart.getMasterAddress().equals(part.getMasterAddress())) {
log.debug("changing master from {} to {} for {}",
log.info("changing master from {} to {} for {}",
part.getMasterAddress(), newMasterPart.getMasterAddress(), newMasterPart.getEndSlot());
URI newUri = newMasterPart.getMasterAddress();
URI oldUri = part.getMasterAddress();
@ -208,6 +209,11 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
Map<Integer, ClusterPartition> result = new HashMap<Integer, ClusterPartition>();
List<ClusterNodeInfo> nodes = parse(nodesValue);
for (ClusterNodeInfo clusterNodeInfo : nodes) {
if (clusterNodeInfo.getFlags().contains(Flag.NOADDR)) {
// skip it
continue;
}
String id = clusterNodeInfo.getNodeId();
if (clusterNodeInfo.getFlags().contains(Flag.SLAVE)) {
id = clusterNodeInfo.getSlaveOf();

@ -17,7 +17,7 @@ package org.redisson.connection;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import org.redisson.MasterSlaveServersConfig;
import org.redisson.client.ReconnectListener;
@ -36,11 +36,11 @@ public class ConnectionEntry {
final RedisClient client;
private final Queue<RedisConnection> connections = new ConcurrentLinkedQueue<RedisConnection>();
private final Semaphore connectionsSemaphore;
private final AtomicInteger connectionsCounter = new AtomicInteger();
public ConnectionEntry(RedisClient client, int poolSize) {
this.client = client;
this.connectionsSemaphore = new Semaphore(poolSize);
this.connectionsCounter.set(poolSize);
}
public RedisClient getClient() {
@ -55,12 +55,31 @@ public class ConnectionEntry {
this.freezed = freezed;
}
public Semaphore getConnectionsSemaphore() {
return connectionsSemaphore;
public int getFreeAmount() {
return connectionsCounter.get();
}
public Queue<RedisConnection> getConnections() {
return connections;
public boolean tryAcquireConnection() {
while (true) {
if (connectionsCounter.get() == 0) {
return false;
}
if (connectionsCounter.compareAndSet(connectionsCounter.get(), connectionsCounter.get() - 1)) {
return true;
}
}
}
public void releaseConnection() {
connectionsCounter.incrementAndGet();
}
public RedisConnection pollConnection() {
return connections.poll();
}
public void releaseConnection(RedisConnection connection) {
connections.add(connection);
}
public RedisConnection connect(final MasterSlaveServersConfig config) {

@ -30,6 +30,7 @@ import io.netty.channel.EventLoopGroup;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;
@ -61,11 +62,11 @@ public interface ConnectionManager {
void releaseWrite(int slot, RedisConnection connection);
RedisConnection connectionReadOp(int slot);
Future<RedisConnection> connectionReadOp(int slot);
RedisConnection connectionReadOp(int slot, RedisClient client);
Future<RedisConnection> connectionReadOp(int slot, RedisClient client);
RedisConnection connectionWriteOp(int slot);
Future<RedisConnection> connectionWriteOp(int slot);
<T> FutureListener<T> createReleaseReadListener(int slot,
RedisConnection conn, Timeout timeout);
@ -79,9 +80,9 @@ public interface ConnectionManager {
PubSubConnectionEntry getEntry(String channelName);
PubSubConnectionEntry subscribe(String channelName, Codec codec);
Future<PubSubConnectionEntry> subscribe(String channelName, Codec codec);
PubSubConnectionEntry psubscribe(String pattern, Codec codec);
Future<PubSubConnectionEntry> psubscribe(String pattern, Codec codec);
<V> void subscribe(RedisPubSubListener<V> listener, String channelName);

@ -16,15 +16,20 @@
package org.redisson.connection;
import java.util.Collection;
import java.util.List;
import org.redisson.MasterSlaveServersConfig;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisPubSubConnection;
import io.netty.util.concurrent.Future;
public interface LoadBalancer {
RedisConnection getConnection(RedisClient client);
SubscribesConnectionEntry getEntry(List<SubscribesConnectionEntry> clientsCopy);
Future<RedisConnection> getConnection(RedisClient client);
int getAvailableClients();
@ -40,9 +45,9 @@ public interface LoadBalancer {
void add(SubscribesConnectionEntry entry);
RedisConnection nextConnection();
Future<RedisConnection> nextConnection();
RedisPubSubConnection nextPubSubConnection();
Future<RedisPubSubConnection> nextPubSubConnection();
void returnConnection(RedisConnection connection);

@ -49,6 +49,7 @@ import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;
@ -129,6 +130,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
protected void initEntry(MasterSlaveServersConfig config) {
MasterSlaveEntry entry = new MasterSlaveEntry(0, MAX_SLOT, this, config);
entry.setupMasterEntry(config.getMasterAddress().getHost(), config.getMasterAddress().getPort());
entries.put(MAX_SLOT, entry);
}
@ -220,12 +222,18 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
return name2PubSubConnection.get(channelName);
}
@Override
public PubSubConnectionEntry subscribe(String channelName, Codec codec) {
public Future<PubSubConnectionEntry> subscribe(String channelName, Codec codec) {
Promise<PubSubConnectionEntry> promise = group.next().newPromise();
subscribe(channelName, codec, promise);
return promise;
}
private void subscribe(final String channelName, final Codec codec, final Promise<PubSubConnectionEntry> promise) {
// multiple channel names per PubSubConnections allowed
PubSubConnectionEntry сonnEntry = name2PubSubConnection.get(channelName);
if (сonnEntry != null) {
return сonnEntry;
promise.setSuccess(сonnEntry);
return;
}
Set<PubSubConnectionEntry> entries = new HashSet<PubSubConnectionEntry>(name2PubSubConnection.values());
@ -234,47 +242,70 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry);
if (oldEntry != null) {
entry.release();
return oldEntry;
promise.setSuccess(oldEntry);
return;
}
synchronized (entry) {
if (!entry.isActive()) {
entry.release();
return subscribe(channelName, codec);
subscribe(channelName, codec, promise);
return;
}
entry.subscribe(codec, channelName);
return entry;
promise.setSuccess(entry);
return;
}
}
}
int slot = 0;
RedisPubSubConnection conn = nextPubSubConnection(slot);
final int slot = 0;
Future<RedisPubSubConnection> connFuture = nextPubSubConnection(slot);
connFuture.addListener(new FutureListener<RedisPubSubConnection>() {
@Override
public void operationComplete(Future<RedisPubSubConnection> future) throws Exception {
if (!future.isSuccess()) {
promise.setFailure(future.cause());
return;
}
RedisPubSubConnection conn = future.getNow();
PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, config.getSubscriptionsPerConnection());
entry.tryAcquire();
PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry);
if (oldEntry != null) {
releaseSubscribeConnection(slot, entry);
return oldEntry;
}
PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, config.getSubscriptionsPerConnection());
entry.tryAcquire();
PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry);
if (oldEntry != null) {
releaseSubscribeConnection(slot, entry);
promise.setSuccess(oldEntry);
return;
}
synchronized (entry) {
if (!entry.isActive()) {
entry.release();
return subscribe(channelName, codec);
synchronized (entry) {
if (!entry.isActive()) {
entry.release();
subscribe(channelName, codec, promise);
return;
}
entry.subscribe(codec, channelName);
promise.setSuccess(entry);
}
}
entry.subscribe(codec, channelName);
return entry;
}
});
}
@Override
public PubSubConnectionEntry psubscribe(String channelName, Codec codec) {
// multiple channel names per PubSubConnections allowed
public Future<PubSubConnectionEntry> psubscribe(final String channelName, final Codec codec) {
Promise<PubSubConnectionEntry> promise = group.next().newPromise();
psubscribe(channelName, codec, promise);
return promise;
}
private void psubscribe(final String channelName, final Codec codec, final Promise<PubSubConnectionEntry> promise) {
// multiple channel names per PubSubConnections are allowed
PubSubConnectionEntry сonnEntry = name2PubSubConnection.get(channelName);
if (сonnEntry != null) {
return сonnEntry;
promise.setSuccess(сonnEntry);
return;
}
Set<PubSubConnectionEntry> entries = new HashSet<PubSubConnectionEntry>(name2PubSubConnection.values());
@ -283,43 +314,59 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry);
if (oldEntry != null) {
entry.release();
return oldEntry;
promise.setSuccess(oldEntry);
return;
}
synchronized (entry) {
if (!entry.isActive()) {
entry.release();
return psubscribe(channelName, codec);
psubscribe(channelName, codec, promise);
return;
}
entry.psubscribe(codec, channelName);
return entry;
promise.setSuccess(entry);
return;
}
}
}
int slot = 0;
RedisPubSubConnection conn = nextPubSubConnection(slot);
final int slot = 0;
Future<RedisPubSubConnection> connFuture = nextPubSubConnection(slot);
connFuture.addListener(new FutureListener<RedisPubSubConnection>() {
@Override
public void operationComplete(Future<RedisPubSubConnection> future) throws Exception {
if (!future.isSuccess()) {
promise.setFailure(future.cause());
return;
}
RedisPubSubConnection conn = future.getNow();
PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, config.getSubscriptionsPerConnection());
entry.tryAcquire();
PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry);
if (oldEntry != null) {
releaseSubscribeConnection(slot, entry);
return oldEntry;
}
PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, config.getSubscriptionsPerConnection());
entry.tryAcquire();
PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry);
if (oldEntry != null) {
releaseSubscribeConnection(slot, entry);
promise.setSuccess(oldEntry);
return;
}
synchronized (entry) {
if (!entry.isActive()) {
entry.release();
return psubscribe(channelName, codec);
synchronized (entry) {
if (!entry.isActive()) {
entry.release();
psubscribe(channelName, codec, promise);
return;
}
entry.psubscribe(codec, channelName);
promise.setSuccess(entry);
}
}
entry.psubscribe(codec, channelName);
return entry;
}
});
}
@Override
public void subscribe(RedisPubSubListener listener, String channelName) {
public void subscribe(final RedisPubSubListener listener, final String channelName) {
PubSubConnectionEntry сonnEntry = name2PubSubConnection.get(channelName);
if (сonnEntry != null) {
сonnEntry.subscribe(codec, listener, channelName);
@ -346,25 +393,31 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
}
}
int slot = 0;
RedisPubSubConnection conn = nextPubSubConnection(slot);
PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, config.getSubscriptionsPerConnection());
entry.tryAcquire();
PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry);
if (oldEntry != null) {
releaseSubscribeConnection(slot, entry);
return;
}
synchronized (entry) {
if (!entry.isActive()) {
entry.release();
subscribe(listener, channelName);
return;
final int slot = 0;
Future<RedisPubSubConnection> connFuture = nextPubSubConnection(slot);
connFuture.addListener(new FutureListener<RedisPubSubConnection>() {
@Override
public void operationComplete(Future<RedisPubSubConnection> future) throws Exception {
RedisPubSubConnection conn = future.getNow();
PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, config.getSubscriptionsPerConnection());
entry.tryAcquire();
PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry);
if (oldEntry != null) {
releaseSubscribeConnection(slot, entry);
return;
}
synchronized (entry) {
if (!entry.isActive()) {
entry.release();
subscribe(listener, channelName);
return;
}
entry.subscribe(codec, listener, channelName);
return;
}
}
entry.subscribe(codec, listener, channelName);
return;
}
}).syncUninterruptibly();
}
@Override
@ -432,7 +485,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
for (Entry<String, PubSubConnectionEntry> mapEntry : name2PubSubConnection.entrySet()) {
for (RedisPubSubConnection redisPubSubConnection : allPubSubConnections) {
PubSubConnectionEntry entry = mapEntry.getValue();
String channelName = mapEntry.getKey();
final String channelName = mapEntry.getKey();
if (!entry.getConnection().equals(redisPubSubConnection)) {
continue;
@ -441,24 +494,39 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
synchronized (entry) {
entry.close();
Collection<RedisPubSubListener> listeners = entry.getListeners(channelName);
final Collection<RedisPubSubListener> listeners = entry.getListeners(channelName);
if (entry.getConnection().getPatternChannels().get(channelName) != null) {
Codec subscribeCodec = punsubscribe(channelName);
if (!listeners.isEmpty()) {
PubSubConnectionEntry newEntry = psubscribe(channelName, subscribeCodec);
for (RedisPubSubListener redisPubSubListener : listeners) {
newEntry.addListener(channelName, redisPubSubListener);
}
log.debug("resubscribed listeners for '{}' channel-pattern", channelName);
Future<PubSubConnectionEntry> future = psubscribe(channelName, subscribeCodec);
future.addListener(new FutureListener<PubSubConnectionEntry>() {
@Override
public void operationComplete(Future<PubSubConnectionEntry> future)
throws Exception {
PubSubConnectionEntry newEntry = future.getNow();
for (RedisPubSubListener redisPubSubListener : listeners) {
newEntry.addListener(channelName, redisPubSubListener);
}
log.debug("resubscribed listeners for '{}' channel-pattern", channelName);
}
});
}
} else {
Codec subscribeCodec = unsubscribe(channelName);
if (!listeners.isEmpty()) {
PubSubConnectionEntry newEntry = subscribe(channelName, subscribeCodec);
for (RedisPubSubListener redisPubSubListener : listeners) {
newEntry.addListener(channelName, redisPubSubListener);
}
log.debug("resubscribed listeners for '{}' channel", channelName);
Future<PubSubConnectionEntry> future = subscribe(channelName, subscribeCodec);
future.addListener(new FutureListener<PubSubConnectionEntry>() {
@Override
public void operationComplete(Future<PubSubConnectionEntry> future)
throws Exception {
PubSubConnectionEntry newEntry = future.getNow();
for (RedisPubSubListener redisPubSubListener : listeners) {
newEntry.addListener(channelName, redisPubSubListener);
}
log.debug("resubscribed listeners for '{}' channel", channelName);
}
});
}
}
}
@ -475,7 +543,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
}
@Override
public RedisConnection connectionWriteOp(int slot) {
public Future<RedisConnection> connectionWriteOp(int slot) {
MasterSlaveEntry e = getEntry(slot);
if (!e.isOwn(slot)) {
throw new RedisEmptySlotException("No node for slot: " + slot, slot);
@ -484,7 +552,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
}
@Override
public RedisConnection connectionReadOp(int slot) {
public Future<RedisConnection> connectionReadOp(int slot) {
MasterSlaveEntry e = getEntry(slot);
if (!e.isOwn(slot)) {
throw new RedisEmptySlotException("No node for slot: " + slot, slot);
@ -493,7 +561,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
}
@Override
public RedisConnection connectionReadOp(int slot, RedisClient client) {
public Future<RedisConnection> connectionReadOp(int slot, RedisClient client) {
MasterSlaveEntry e = getEntry(slot);
if (!e.isOwn(slot)) {
throw new RedisEmptySlotException("No node for slot: " + slot, slot);
@ -501,7 +569,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
return e.connectionReadOp(client);
}
RedisPubSubConnection nextPubSubConnection(int slot) {
Future<RedisPubSubConnection> nextPubSubConnection(int slot) {
return getEntry(slot).nextPubSubConnection();
}

@ -15,6 +15,7 @@
*/
package org.redisson.connection;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
@ -23,23 +24,25 @@ import java.util.List;
import org.redisson.MasterSlaveServersConfig;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisException;
import org.redisson.client.RedisPubSubConnection;
import org.redisson.misc.ConnectionPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.util.concurrent.Future;
/**
*
* @author Nikita Koksharov
*
*/
//TODO ping support
public class MasterSlaveEntry {
public class MasterSlaveEntry<E extends ConnectionEntry> {
final Logger log = LoggerFactory.getLogger(getClass());
LoadBalancer slaveBalancer;
volatile ConnectionEntry masterEntry;
SubscribesConnectionEntry masterEntry;
final MasterSlaveServersConfig config;
final ConnectionManager connectionManager;
@ -47,6 +50,8 @@ public class MasterSlaveEntry {
final int startSlot;
final int endSlot;
final ConnectionPool<RedisConnection> writeConnectionHolder;
public MasterSlaveEntry(int startSlot, int endSlot, ConnectionManager connectionManager, MasterSlaveServersConfig config) {
this.startSlot = startSlot;
this.endSlot = endSlot;
@ -68,18 +73,20 @@ public class MasterSlaveEntry {
slaveDown(config.getMasterAddress().getHost(), config.getMasterAddress().getPort());
}
setupMasterEntry(config.getMasterAddress().getHost(), config.getMasterAddress().getPort());
writeConnectionHolder = new ConnectionPool<RedisConnection>(config, null, connectionManager.getGroup());
}
public void setupMasterEntry(String host, int port) {
protected void setupMasterEntry(String host, int port) {
RedisClient client = connectionManager.createClient(host, port);
masterEntry = new ConnectionEntry(client, config.getMasterConnectionPoolSize());
masterEntry = new SubscribesConnectionEntry(client, config.getMasterConnectionPoolSize(), 0);
writeConnectionHolder.add(masterEntry);
}
public Collection<RedisPubSubConnection> slaveDown(String host, int port) {
Collection<RedisPubSubConnection> conns = slaveBalancer.freeze(host, port);
if (slaveBalancer.getAvailableClients() == 0) {
slaveUp(masterEntry.getClient().getAddr().getHostName(), masterEntry.getClient().getAddr().getPort());
InetSocketAddress addr = masterEntry.getClient().getAddr();
slaveUp(addr.getHostName(), addr.getPort());
}
return conns;
}
@ -98,8 +105,9 @@ public class MasterSlaveEntry {
}
public void slaveUp(String host, int port) {
if (!masterEntry.getClient().getAddr().getHostName().equals(host) && port != masterEntry.getClient().getAddr().getPort()) {
slaveDown(masterEntry.getClient().getAddr().getHostName(), masterEntry.getClient().getAddr().getPort());
InetSocketAddress addr = masterEntry.getClient().getAddr();
if (!addr.getHostName().equals(host) && port != addr.getPort()) {
slaveDown(addr.getHostName(), addr.getPort());
}
slaveBalancer.unfreeze(host, port);
}
@ -111,8 +119,9 @@ public class MasterSlaveEntry {
*
*/
public void changeMaster(String host, int port) {
ConnectionEntry oldMaster = masterEntry;
SubscribesConnectionEntry oldMaster = masterEntry;
setupMasterEntry(host, port);
writeConnectionHolder.remove(oldMaster);
if (slaveBalancer.getAvailableClients() > 1) {
slaveDown(host, port);
}
@ -124,63 +133,29 @@ public class MasterSlaveEntry {
slaveBalancer.shutdownAsync();
}
public RedisConnection connectionWriteOp() {
// may changed during changeMaster call
ConnectionEntry entry = masterEntry;
acquireMasterConnection(entry);
RedisConnection conn = entry.getConnections().poll();
if (conn != null) {
return conn;
}
try {
return entry.connect(config);
} catch (RedisException e) {
entry.getConnectionsSemaphore().release();
throw e;
}
public Future<RedisConnection> connectionWriteOp() {
return writeConnectionHolder.get();
}
public RedisConnection connectionReadOp() {
public Future<RedisConnection> connectionReadOp() {
return slaveBalancer.nextConnection();
}
public RedisConnection connectionReadOp(RedisClient client) {
public Future<RedisConnection> connectionReadOp(RedisClient client) {
return slaveBalancer.getConnection(client);
}
RedisPubSubConnection nextPubSubConnection() {
Future<RedisPubSubConnection> nextPubSubConnection() {
return slaveBalancer.nextPubSubConnection();
}
void acquireMasterConnection(ConnectionEntry entry) {
if (!entry.getConnectionsSemaphore().tryAcquire()) {
log.warn("Master connection pool gets exhausted! Trying to acquire connection ...");
long time = System.currentTimeMillis();
entry.getConnectionsSemaphore().acquireUninterruptibly();
long endTime = System.currentTimeMillis() - time;
log.warn("Master connection acquired, time spended: {} ms", endTime);
}
}
public void returnSubscribeConnection(PubSubConnectionEntry entry) {
slaveBalancer.returnSubscribeConnection(entry.getConnection());
}
public void releaseWrite(RedisConnection connection) {
// may changed during changeMaster call
ConnectionEntry entry = masterEntry;
if (!entry.getClient().equals(connection.getRedisClient())) {
connection.closeAsync();
return;
} else if (connection.getFailAttempts() == config.getRefreshConnectionAfterFails()) {
connection.forceReconnect();
}
entry.getConnections().add(connection);
entry.getConnectionsSemaphore().release();
writeConnectionHolder.returnConnection(masterEntry, connection);
}
public void releaseRead(RedisConnection сonnection) {

@ -15,15 +15,17 @@
*/
package org.redisson.connection;
import java.security.SecureRandom;
import java.util.List;
import java.util.Random;
public class RandomLoadBalancer extends BaseLoadBalancer {
private final Random random = new Random();
private final Random random = new SecureRandom();
int getIndex(List<SubscribesConnectionEntry> clientsCopy) {
return random.nextInt(clientsCopy.size());
public SubscribesConnectionEntry getEntry(List<SubscribesConnectionEntry> clientsCopy) {
int ind = random.nextInt(clientsCopy.size());
return clientsCopy.get(ind);
}
}

@ -23,8 +23,9 @@ public class RoundRobinLoadBalancer extends BaseLoadBalancer {
private final AtomicInteger index = new AtomicInteger(-1);
@Override
int getIndex(List<SubscribesConnectionEntry> clientsCopy) {
return Math.abs(index.incrementAndGet() % clientsCopy.size());
public SubscribesConnectionEntry getEntry(List<SubscribesConnectionEntry> clientsCopy) {
int ind = Math.abs(index.incrementAndGet() % clientsCopy.size());
return clientsCopy.get(ind);
}
}

@ -70,6 +70,7 @@ public class SingleConnectionManager extends MasterSlaveConnectionManager {
@Override
protected void initEntry(MasterSlaveServersConfig config) {
SingleEntry entry = new SingleEntry(0, MAX_SLOT, this, config);
entry.setupMasterEntry(config.getMasterAddress().getHost(), config.getMasterAddress().getPort());
entries.put(MAX_SLOT, entry);
}

@ -18,56 +18,41 @@ package org.redisson.connection;
import org.redisson.MasterSlaveServersConfig;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisConnectionException;
import org.redisson.client.RedisPubSubConnection;
import org.redisson.misc.ConnectionPool;
import org.redisson.misc.PubSubConnectionPoll;
public class SingleEntry extends MasterSlaveEntry {
import io.netty.util.concurrent.Future;
public class SingleEntry extends MasterSlaveEntry<SubscribesConnectionEntry> {
final ConnectionPool<RedisPubSubConnection> pubSubConnectionHolder;
public SingleEntry(int startSlot, int endSlot, ConnectionManager connectionManager, MasterSlaveServersConfig config) {
super(startSlot, endSlot, connectionManager, config);
pubSubConnectionHolder = new PubSubConnectionPoll(config, null, connectionManager.getGroup());
}
@Override
public void setupMasterEntry(String host, int port) {
RedisClient masterClient = connectionManager.createClient(host, port);
masterEntry = new SubscribesConnectionEntry(masterClient, config.getMasterConnectionPoolSize(), config.getSlaveSubscriptionConnectionPoolSize());
}
private void acquireSubscribeConnection() {
if (!((SubscribesConnectionEntry)masterEntry).getSubscribeConnectionsSemaphore().tryAcquire()) {
log.warn("Subscribe connection pool gets exhausted! Trying to acquire connection ...");
long time = System.currentTimeMillis();
((SubscribesConnectionEntry)masterEntry).getSubscribeConnectionsSemaphore().acquireUninterruptibly();
long endTime = System.currentTimeMillis() - time;
log.warn("Subscribe connection acquired, time spended: {} ms", endTime);
}
writeConnectionHolder.add(masterEntry);
pubSubConnectionHolder.add(masterEntry);
}
@Override
RedisPubSubConnection nextPubSubConnection() {
acquireSubscribeConnection();
RedisPubSubConnection conn = ((SubscribesConnectionEntry)masterEntry).pollFreeSubscribeConnection();
if (conn != null) {
return conn;
}
try {
return masterEntry.connectPubSub(config);
} catch (RedisConnectionException e) {
((SubscribesConnectionEntry)masterEntry).getSubscribeConnectionsSemaphore().release();
throw e;
}
Future<RedisPubSubConnection> nextPubSubConnection() {
return pubSubConnectionHolder.get();
}
@Override
public void returnSubscribeConnection(PubSubConnectionEntry entry) {
((SubscribesConnectionEntry)masterEntry).offerFreeSubscribeConnection(entry.getConnection());
((SubscribesConnectionEntry)masterEntry).getSubscribeConnectionsSemaphore().release();
pubSubConnectionHolder.returnConnection(masterEntry, entry.getConnection());
}
@Override
public RedisConnection connectionReadOp() {
public Future<RedisConnection> connectionReadOp() {
return super.connectionWriteOp();
}
@ -75,4 +60,5 @@ public class SingleEntry extends MasterSlaveEntry {
public void releaseRead(RedisConnection сonnection) {
super.releaseWrite(сonnection);
}
}

@ -17,7 +17,7 @@ package org.redisson.connection;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import org.redisson.MasterSlaveServersConfig;
import org.redisson.client.RedisClient;
@ -25,38 +25,49 @@ import org.redisson.client.RedisPubSubConnection;
public class SubscribesConnectionEntry extends ConnectionEntry {
private final Semaphore subscribeConnectionsSemaphore;
private final Queue<RedisPubSubConnection> allSubscribeConnections = new ConcurrentLinkedQueue<RedisPubSubConnection>();
private final Queue<RedisPubSubConnection> freeSubscribeConnections = new ConcurrentLinkedQueue<RedisPubSubConnection>();
private final AtomicInteger connectionsCounter = new AtomicInteger();
public SubscribesConnectionEntry(RedisClient client, int poolSize, int subscribePoolSize) {
super(client, poolSize);
this.subscribeConnectionsSemaphore = new Semaphore(subscribePoolSize);
connectionsCounter.set(subscribePoolSize);
}
public Queue<RedisPubSubConnection> getAllSubscribeConnections() {
return allSubscribeConnections;
}
public void registerSubscribeConnection(RedisPubSubConnection connection) {
allSubscribeConnections.offer(connection);
}
public RedisPubSubConnection pollFreeSubscribeConnection() {
return freeSubscribeConnections.poll();
}
public void offerFreeSubscribeConnection(RedisPubSubConnection connection) {
freeSubscribeConnections.offer(connection);
public void releaseSubscribeConnection(RedisPubSubConnection connection) {
freeSubscribeConnections.add(connection);
}
public int getFreeSubscribeAmount() {
return connectionsCounter.get();
}
public boolean tryAcquireSubscribeConnection() {
while (true) {
if (connectionsCounter.get() == 0) {
return false;
}
if (connectionsCounter.compareAndSet(connectionsCounter.get(), connectionsCounter.get() - 1)) {
return true;
}
}
}
public Semaphore getSubscribeConnectionsSemaphore() {
return subscribeConnectionsSemaphore;
public void releaseSubscribeConnection() {
connectionsCounter.incrementAndGet();
}
public RedisPubSubConnection connectPubSub(MasterSlaveServersConfig config) {
RedisPubSubConnection conn = super.connectPubSub(config);
allSubscribeConnections.offer(conn);
allSubscribeConnections.add(conn);
return conn;
}

@ -0,0 +1,147 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.misc;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import org.redisson.MasterSlaveServersConfig;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisConnectionException;
import org.redisson.client.RedisException;
import org.redisson.connection.LoadBalancer;
import org.redisson.connection.SubscribesConnectionEntry;
import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
import io.netty.util.internal.OneTimeTask;
public class ConnectionPool<T extends RedisConnection> {
final List<SubscribesConnectionEntry> entries = new CopyOnWriteArrayList<SubscribesConnectionEntry>();
EventExecutor executor;
MasterSlaveServersConfig config;
LoadBalancer loadBalancer;
public ConnectionPool(MasterSlaveServersConfig config, LoadBalancer loadBalancer, EventLoopGroup eventLoopGroup) {
this.config = config;
this.loadBalancer = loadBalancer;
this.executor = eventLoopGroup.next();
}
public void add(SubscribesConnectionEntry entry) {
entries.add(entry);
}
public void remove(SubscribesConnectionEntry entry) {
entries.remove(entry);
}
public Future<T> get() {
for (int j = entries.size()-1; j >= 0 ; j--) {
SubscribesConnectionEntry entry;
if (ConnectionPool.this.loadBalancer != null) {
entry = ConnectionPool.this.loadBalancer.getEntry(entries);
} else {
entry = entries.get(0);
}
if (!entry.isFreezed() && tryAcquireConnection(entry)) {
Promise<T> promise = executor.newPromise();
connect(entry, promise);
return promise;
}
}
RedisConnectionException exception = new RedisConnectionException("Connection pool exhausted!");
return executor.newFailedFuture(exception);
}
public Future<T> get(SubscribesConnectionEntry entry) {
if (!entry.isFreezed() && tryAcquireConnection(entry)) {
Promise<T> promise = executor.newPromise();
connect(entry, promise);
return promise;
}
RedisConnectionException exception = new RedisConnectionException("Can't aquire connection for " + entry.getClient());
return executor.newFailedFuture(exception);
}
protected boolean tryAcquireConnection(SubscribesConnectionEntry entry) {
return entry.tryAcquireConnection();
}
protected T poll(SubscribesConnectionEntry entry) {
return (T) entry.pollConnection();
}
protected T connect(SubscribesConnectionEntry entry) {
return (T) entry.connect(config);
}
private Future<T> connect(final SubscribesConnectionEntry entry, final Promise<T> promise) {
T conn = poll(entry);
if (conn != null) {
if (!promise.trySuccess(conn)) {
releaseConnection(entry, conn);
releaseConnection(entry);
}
} else {
executor.execute(new OneTimeTask() {
@Override
public void run() {
try {
T conn = connect(entry);
if (!promise.trySuccess(conn)) {
releaseConnection(entry, conn);
releaseConnection(entry);
}
} catch (RedisException e) {
releaseConnection(entry);
promise.setFailure(e);
}
}
});
}
return promise;
}
public void returnConnection(SubscribesConnectionEntry entry, T connection) {
if (entry.isFreezed()) {
connection.closeAsync();
} else {
if (connection.getFailAttempts() == config.getRefreshConnectionAfterFails()) {
connection.forceReconnect();
}
releaseConnection(entry, connection);
}
releaseConnection(entry);
}
protected void releaseConnection(SubscribesConnectionEntry entry) {
entry.releaseConnection();
}
protected void releaseConnection(SubscribesConnectionEntry entry, T conn) {
entry.releaseConnection(conn);
}
}

@ -0,0 +1,57 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.misc;
import org.redisson.MasterSlaveServersConfig;
import org.redisson.client.RedisPubSubConnection;
import org.redisson.connection.LoadBalancer;
import org.redisson.connection.SubscribesConnectionEntry;
import io.netty.channel.EventLoopGroup;
public class PubSubConnectionPoll extends ConnectionPool<RedisPubSubConnection> {
public PubSubConnectionPoll(MasterSlaveServersConfig config,
LoadBalancer loadBalancer, EventLoopGroup eventLoopGroup) {
super(config, loadBalancer, eventLoopGroup);
}
@Override
protected RedisPubSubConnection poll(SubscribesConnectionEntry entry) {
return entry.pollFreeSubscribeConnection();
}
@Override
protected RedisPubSubConnection connect(SubscribesConnectionEntry entry) {
return entry.connectPubSub(config);
}
@Override
protected boolean tryAcquireConnection(SubscribesConnectionEntry entry) {
return entry.tryAcquireSubscribeConnection();
}
@Override
protected void releaseConnection(SubscribesConnectionEntry entry) {
entry.releaseSubscribeConnection();
}
@Override
protected void releaseConnection(SubscribesConnectionEntry entry, RedisPubSubConnection conn) {
entry.releaseSubscribeConnection(conn);
}
}
Loading…
Cancel
Save