Connection pool further optimization. #223

pull/282/head
Nikita 9 years ago
parent 6bde9d2509
commit df928fe20f

@ -88,7 +88,7 @@ public class RedisClient {
future.syncUninterruptibly();
return new RedisConnection(this, future.channel());
} catch (Exception e) {
throw new RedisConnectionException("unable to connect", e);
throw new RedisConnectionException("Unable to connect to " + addr, e);
}
}
@ -115,10 +115,27 @@ public class RedisClient {
future.syncUninterruptibly();
return new RedisPubSubConnection(this, future.channel());
} catch (Exception e) {
throw new RedisConnectionException("unable to connect", e);
throw new RedisConnectionException("Unable to connect to " + addr, e);
}
}
public Future<RedisPubSubConnection> connectPubSubAsync() {
final Promise<RedisPubSubConnection> f = bootstrap.group().next().newPromise();
ChannelFuture channelFuture = bootstrap.connect();
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
RedisPubSubConnection c = new RedisPubSubConnection(RedisClient.this, future.channel());
f.setSuccess(c);
} else {
f.setFailure(future.cause());
}
}
});
return f;
}
public void shutdown() {
shutdownAsync().syncUninterruptibly();
}

@ -28,6 +28,9 @@ import org.redisson.client.protocol.RedisCommands;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
public class ConnectionEntry {
final Logger log = LoggerFactory.getLogger(getClass());
@ -82,19 +85,27 @@ public class ConnectionEntry {
connections.add(connection);
}
public RedisConnection connect(final MasterSlaveServersConfig config) {
RedisConnection conn = client.connect();
log.debug("new connection created: {}", conn);
prepareConnection(config, conn);
conn.setReconnectListener(new ReconnectListener() {
public Future<RedisConnection> connect(final MasterSlaveServersConfig config) {
Future<RedisConnection> future = client.connectAsync();
future.addListener(new FutureListener<RedisConnection>() {
@Override
public void onReconnect(RedisConnection conn) {
public void operationComplete(Future<RedisConnection> future) throws Exception {
if (!future.isSuccess()) {
return;
}
RedisConnection conn = future.getNow();
log.debug("new connection created: {}", conn);
prepareConnection(config, conn);
conn.setReconnectListener(new ReconnectListener() {
@Override
public void onReconnect(RedisConnection conn) {
prepareConnection(config, conn);
}
});
}
});
return conn;
return future;
}
private void prepareConnection(MasterSlaveServersConfig config, RedisConnection conn) {
@ -109,19 +120,27 @@ public class ConnectionEntry {
}
}
public RedisPubSubConnection connectPubSub(final MasterSlaveServersConfig config) {
RedisPubSubConnection conn = client.connectPubSub();
log.debug("new pubsub connection created: {}", conn);
prepareConnection(config, conn);
conn.setReconnectListener(new ReconnectListener() {
public Future<RedisPubSubConnection> connectPubSub(final MasterSlaveServersConfig config) {
Future<RedisPubSubConnection> future = client.connectPubSubAsync();
future.addListener(new FutureListener<RedisPubSubConnection>() {
@Override
public void onReconnect(RedisConnection conn) {
public void operationComplete(Future<RedisPubSubConnection> future) throws Exception {
if (!future.isSuccess()) {
return;
}
RedisPubSubConnection conn = future.getNow();
log.debug("new pubsub connection created: {}", conn);
prepareConnection(config, conn);
conn.setReconnectListener(new ReconnectListener() {
@Override
public void onReconnect(RedisConnection conn) {
prepareConnection(config, conn);
}
});
}
});
return conn;
return future;
}
@Override

@ -52,6 +52,7 @@ import io.netty.util.TimerTask;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;
import io.netty.util.internal.PlatformDependent;
/**
*
@ -73,7 +74,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
protected Class<? extends SocketChannel> socketChannelClass;
protected final ConcurrentMap<String, PubSubConnectionEntry> name2PubSubConnection = new ConcurrentHashMap<String, PubSubConnectionEntry>();
protected final ConcurrentMap<String, PubSubConnectionEntry> name2PubSubConnection = PlatformDependent.newConcurrentHashMap();
protected MasterSlaveServersConfig config;
@ -395,28 +396,24 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
final int slot = 0;
Future<RedisPubSubConnection> connFuture = nextPubSubConnection(slot);
connFuture.addListener(new FutureListener<RedisPubSubConnection>() {
@Override
public void operationComplete(Future<RedisPubSubConnection> future) throws Exception {
RedisPubSubConnection conn = future.getNow();
PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, config.getSubscriptionsPerConnection());
entry.tryAcquire();
PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry);
if (oldEntry != null) {
releaseSubscribeConnection(slot, entry);
return;
}
synchronized (entry) {
if (!entry.isActive()) {
entry.release();
subscribe(listener, channelName);
return;
}
entry.subscribe(codec, listener, channelName);
return;
}
connFuture.syncUninterruptibly();
RedisPubSubConnection conn = connFuture.getNow();
PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, config.getSubscriptionsPerConnection());
entry.tryAcquire();
PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry);
if (oldEntry != null) {
releaseSubscribeConnection(slot, entry);
return;
}
synchronized (entry) {
if (!entry.isActive()) {
entry.release();
subscribe(listener, channelName);
return;
}
}).syncUninterruptibly();
entry.subscribe(codec, listener, channelName);
return;
}
}

@ -23,6 +23,9 @@ import org.redisson.MasterSlaveServersConfig;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisPubSubConnection;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
public class SubscribesConnectionEntry extends ConnectionEntry {
private final Queue<RedisPubSubConnection> allSubscribeConnections = new ConcurrentLinkedQueue<RedisPubSubConnection>();
@ -65,10 +68,19 @@ public class SubscribesConnectionEntry extends ConnectionEntry {
connectionsCounter.incrementAndGet();
}
public RedisPubSubConnection connectPubSub(MasterSlaveServersConfig config) {
RedisPubSubConnection conn = super.connectPubSub(config);
allSubscribeConnections.add(conn);
return conn;
public Future<RedisPubSubConnection> connectPubSub(MasterSlaveServersConfig config) {
Future<RedisPubSubConnection> future = super.connectPubSub(config);
future.addListener(new FutureListener<RedisPubSubConnection>() {
@Override
public void operationComplete(Future<RedisPubSubConnection> future) throws Exception {
if (!future.isSuccess()) {
return;
}
RedisPubSubConnection conn = future.getNow();
allSubscribeConnections.add(conn);
}
});
return future;
}

@ -16,20 +16,20 @@
package org.redisson.misc;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import org.redisson.MasterSlaveServersConfig;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisConnectionException;
import org.redisson.client.RedisException;
import org.redisson.connection.LoadBalancer;
import org.redisson.connection.SubscribesConnectionEntry;
import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;
import io.netty.util.internal.OneTimeTask;
public class ConnectionPool<T extends RedisConnection> {
@ -41,6 +41,8 @@ public class ConnectionPool<T extends RedisConnection> {
LoadBalancer loadBalancer;
final ConcurrentLinkedQueue<Promise<T>> promises = new ConcurrentLinkedQueue<Promise<T>>();
public ConnectionPool(MasterSlaveServersConfig config, LoadBalancer loadBalancer, EventLoopGroup eventLoopGroup) {
this.config = config;
this.loadBalancer = loadBalancer;
@ -49,6 +51,7 @@ public class ConnectionPool<T extends RedisConnection> {
public void add(SubscribesConnectionEntry entry) {
entries.add(entry);
handleQueue(entry);
}
public void remove(SubscribesConnectionEntry entry) {
@ -70,8 +73,9 @@ public class ConnectionPool<T extends RedisConnection> {
}
}
RedisConnectionException exception = new RedisConnectionException("Connection pool exhausted!");
return executor.newFailedFuture(exception);
Promise<T> promise = executor.newPromise();
promises.add(promise);
return promise;
}
public Future<T> get(SubscribesConnectionEntry entry) {
@ -93,35 +97,36 @@ public class ConnectionPool<T extends RedisConnection> {
return (T) entry.pollConnection();
}
protected T connect(SubscribesConnectionEntry entry) {
return (T) entry.connect(config);
protected Future<T> connect(SubscribesConnectionEntry entry) {
return (Future<T>) entry.connect(config);
}
private Future<T> connect(final SubscribesConnectionEntry entry, final Promise<T> promise) {
private void connect(final SubscribesConnectionEntry entry, final Promise<T> promise) {
T conn = poll(entry);
if (conn != null) {
if (!promise.trySuccess(conn)) {
releaseConnection(entry, conn);
releaseConnection(entry);
}
} else {
executor.execute(new OneTimeTask() {
@Override
public void run() {
try {
T conn = connect(entry);
if (!promise.trySuccess(conn)) {
releaseConnection(entry, conn);
releaseConnection(entry);
}
} catch (RedisException e) {
releaseConnection(entry);
promise.setFailure(e);
}
}
});
return;
}
return promise;
Future<T> connFuture = connect(entry);
connFuture.addListener(new FutureListener<T>() {
@Override
public void operationComplete(Future<T> future) throws Exception {
if (!future.isSuccess()) {
releaseConnection(entry);
promise.setFailure(future.cause());
return;
}
T conn = future.getNow();
if (!promise.trySuccess(conn)) {
releaseConnection(entry, conn);
releaseConnection(entry);
}
}
});
}
public void returnConnection(SubscribesConnectionEntry entry, T connection) {
@ -138,6 +143,19 @@ public class ConnectionPool<T extends RedisConnection> {
protected void releaseConnection(SubscribesConnectionEntry entry) {
entry.releaseConnection();
handleQueue(entry);
}
private void handleQueue(SubscribesConnectionEntry entry) {
Promise<T> promise = promises.poll();
if (promise != null) {
if (!entry.isFreezed() && tryAcquireConnection(entry)) {
connect(entry, promise);
} else {
promises.add(promise);
}
}
}
protected void releaseConnection(SubscribesConnectionEntry entry, T conn) {

@ -21,6 +21,7 @@ import org.redisson.connection.LoadBalancer;
import org.redisson.connection.SubscribesConnectionEntry;
import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.Future;
public class PubSubConnectionPoll extends ConnectionPool<RedisPubSubConnection> {
@ -35,7 +36,7 @@ public class PubSubConnectionPoll extends ConnectionPool<RedisPubSubConnection>
}
@Override
protected RedisPubSubConnection connect(SubscribesConnectionEntry entry) {
protected Future<RedisPubSubConnection> connect(SubscribesConnectionEntry entry) {
return entry.connectPubSub(config);
}

Loading…
Cancel
Save