|
|
|
@ -28,9 +28,9 @@ import org.redisson.client.RedisConnectionException;
|
|
|
|
|
import org.redisson.client.protocol.RedisCommands;
|
|
|
|
|
import org.redisson.connection.ConnectionManager;
|
|
|
|
|
import org.redisson.connection.MasterSlaveEntry;
|
|
|
|
|
import org.redisson.connection.SubscribesConnectionEntry;
|
|
|
|
|
import org.redisson.connection.SubscribesConnectionEntry.FreezeReason;
|
|
|
|
|
import org.redisson.connection.SubscribesConnectionEntry.NodeType;
|
|
|
|
|
import org.redisson.connection.ClientConnectionsEntry;
|
|
|
|
|
import org.redisson.connection.ClientConnectionsEntry.FreezeReason;
|
|
|
|
|
import org.redisson.connection.ClientConnectionsEntry.NodeType;
|
|
|
|
|
|
|
|
|
|
import io.netty.util.Timeout;
|
|
|
|
|
import io.netty.util.TimerTask;
|
|
|
|
@ -40,7 +40,7 @@ import io.netty.util.concurrent.Promise;
|
|
|
|
|
|
|
|
|
|
public class ConnectionPool<T extends RedisConnection> {
|
|
|
|
|
|
|
|
|
|
protected final List<SubscribesConnectionEntry> entries = new CopyOnWriteArrayList<SubscribesConnectionEntry>();
|
|
|
|
|
protected final List<ClientConnectionsEntry> entries = new CopyOnWriteArrayList<ClientConnectionsEntry>();
|
|
|
|
|
|
|
|
|
|
final Deque<Promise<T>> promises = new LinkedBlockingDeque<Promise<T>>();
|
|
|
|
|
|
|
|
|
@ -67,7 +67,7 @@ public class ConnectionPool<T extends RedisConnection> {
|
|
|
|
|
// }, 1, 1, TimeUnit.SECONDS);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public void add(final SubscribesConnectionEntry entry) {
|
|
|
|
|
public void add(final ClientConnectionsEntry entry) {
|
|
|
|
|
initConnections(entry, new Runnable() {
|
|
|
|
|
@Override
|
|
|
|
|
public void run() {
|
|
|
|
@ -77,7 +77,7 @@ public class ConnectionPool<T extends RedisConnection> {
|
|
|
|
|
}, true);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void initConnections(final SubscribesConnectionEntry entry, final Runnable runnable, boolean checkFreezed) {
|
|
|
|
|
private void initConnections(final ClientConnectionsEntry entry, final Runnable runnable, boolean checkFreezed) {
|
|
|
|
|
int minimumIdleSize = getMinimumIdleSize(entry);
|
|
|
|
|
|
|
|
|
|
if (minimumIdleSize == 0) {
|
|
|
|
@ -110,21 +110,21 @@ public class ConnectionPool<T extends RedisConnection> {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
protected int getMinimumIdleSize(SubscribesConnectionEntry entry) {
|
|
|
|
|
protected int getMinimumIdleSize(ClientConnectionsEntry entry) {
|
|
|
|
|
return config.getSlaveConnectionMinimumIdleSize();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public void remove(SubscribesConnectionEntry entry) {
|
|
|
|
|
public void remove(ClientConnectionsEntry entry) {
|
|
|
|
|
entries.remove(entry);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
protected SubscribesConnectionEntry getEntry() {
|
|
|
|
|
protected ClientConnectionsEntry getEntry() {
|
|
|
|
|
return config.getLoadBalancer().getEntry(entries);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public Future<T> get() {
|
|
|
|
|
for (int j = entries.size() - 1; j >= 0; j--) {
|
|
|
|
|
SubscribesConnectionEntry entry = getEntry();
|
|
|
|
|
ClientConnectionsEntry entry = getEntry();
|
|
|
|
|
if (!entry.isFreezed() && tryAcquireConnection(entry)) {
|
|
|
|
|
Promise<T> promise = connectionManager.newPromise();
|
|
|
|
|
connect(entry, promise);
|
|
|
|
@ -137,7 +137,7 @@ public class ConnectionPool<T extends RedisConnection> {
|
|
|
|
|
return promise;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public Future<T> get(SubscribesConnectionEntry entry) {
|
|
|
|
|
public Future<T> get(ClientConnectionsEntry entry) {
|
|
|
|
|
if (((entry.getNodeType() == NodeType.MASTER && entry.getFreezeReason() == FreezeReason.SYSTEM) || !entry.isFreezed())
|
|
|
|
|
&& tryAcquireConnection(entry)) {
|
|
|
|
|
Promise<T> promise = connectionManager.newPromise();
|
|
|
|
@ -150,19 +150,19 @@ public class ConnectionPool<T extends RedisConnection> {
|
|
|
|
|
return connectionManager.newFailedFuture(exception);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
protected boolean tryAcquireConnection(SubscribesConnectionEntry entry) {
|
|
|
|
|
protected boolean tryAcquireConnection(ClientConnectionsEntry entry) {
|
|
|
|
|
return entry.getFailedAttempts() < config.getSlaveFailedAttempts() && entry.tryAcquireConnection();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
protected T poll(SubscribesConnectionEntry entry) {
|
|
|
|
|
protected T poll(ClientConnectionsEntry entry) {
|
|
|
|
|
return (T) entry.pollConnection();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
protected Future<T> connect(SubscribesConnectionEntry entry) {
|
|
|
|
|
protected Future<T> connect(ClientConnectionsEntry entry) {
|
|
|
|
|
return (Future<T>) entry.connect(config);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void connect(final SubscribesConnectionEntry entry, final Promise<T> promise) {
|
|
|
|
|
private void connect(final ClientConnectionsEntry entry, final Promise<T> promise) {
|
|
|
|
|
T conn = poll(entry);
|
|
|
|
|
if (conn != null) {
|
|
|
|
|
if (!conn.isActive()) {
|
|
|
|
@ -196,7 +196,7 @@ public class ConnectionPool<T extends RedisConnection> {
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void promiseSuccessful(final SubscribesConnectionEntry entry, final Promise<T> promise, T conn) {
|
|
|
|
|
private void promiseSuccessful(final ClientConnectionsEntry entry, final Promise<T> promise, T conn) {
|
|
|
|
|
entry.resetFailedAttempts();
|
|
|
|
|
if (!promise.trySuccess(conn)) {
|
|
|
|
|
releaseConnection(entry, conn);
|
|
|
|
@ -204,7 +204,7 @@ public class ConnectionPool<T extends RedisConnection> {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void promiseFailure(SubscribesConnectionEntry entry, Promise<T> promise, Throwable cause) {
|
|
|
|
|
private void promiseFailure(ClientConnectionsEntry entry, Promise<T> promise, Throwable cause) {
|
|
|
|
|
if (entry.incFailedAttempts() == config.getSlaveFailedAttempts()) {
|
|
|
|
|
if (entry.getNodeType() == NodeType.SLAVE) {
|
|
|
|
|
connectionManager.slaveDown(masterSlaveEntry, entry.getClient().getAddr().getHostName(),
|
|
|
|
@ -219,14 +219,14 @@ public class ConnectionPool<T extends RedisConnection> {
|
|
|
|
|
promise.tryFailure(cause);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void freezeMaster(SubscribesConnectionEntry entry) {
|
|
|
|
|
private void freezeMaster(ClientConnectionsEntry entry) {
|
|
|
|
|
if (entry.freezeMaster(FreezeReason.RECONNECT)) {
|
|
|
|
|
scheduleCheck(entry);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private void promiseFailure(SubscribesConnectionEntry entry, Promise<T> promise, T conn) {
|
|
|
|
|
private void promiseFailure(ClientConnectionsEntry entry, Promise<T> promise, T conn) {
|
|
|
|
|
int attempts = entry.incFailedAttempts();
|
|
|
|
|
if (attempts == config.getSlaveFailedAttempts()) {
|
|
|
|
|
if (entry.getNodeType() == NodeType.SLAVE) {
|
|
|
|
@ -247,7 +247,7 @@ public class ConnectionPool<T extends RedisConnection> {
|
|
|
|
|
promise.tryFailure(cause);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void scheduleCheck(final SubscribesConnectionEntry entry) {
|
|
|
|
|
private void scheduleCheck(final ClientConnectionsEntry entry) {
|
|
|
|
|
connectionManager.newTimeout(new TimerTask() {
|
|
|
|
|
@Override
|
|
|
|
|
public void run(Timeout timeout) throws Exception {
|
|
|
|
@ -321,7 +321,7 @@ public class ConnectionPool<T extends RedisConnection> {
|
|
|
|
|
}, config.getSlaveReconnectionTimeout(), TimeUnit.MILLISECONDS);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public void returnConnection(SubscribesConnectionEntry entry, T connection) {
|
|
|
|
|
public void returnConnection(ClientConnectionsEntry entry, T connection) {
|
|
|
|
|
if (entry.isFreezed()) {
|
|
|
|
|
connection.closeAsync();
|
|
|
|
|
} else {
|
|
|
|
@ -333,13 +333,13 @@ public class ConnectionPool<T extends RedisConnection> {
|
|
|
|
|
releaseConnection(entry);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
protected void releaseConnection(SubscribesConnectionEntry entry) {
|
|
|
|
|
protected void releaseConnection(ClientConnectionsEntry entry) {
|
|
|
|
|
entry.releaseConnection();
|
|
|
|
|
|
|
|
|
|
handleQueue(entry, true);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void handleQueue(SubscribesConnectionEntry entry, boolean checkFreezed) {
|
|
|
|
|
private void handleQueue(ClientConnectionsEntry entry, boolean checkFreezed) {
|
|
|
|
|
while (true) {
|
|
|
|
|
if (checkFreezed && entry.isFreezed()) {
|
|
|
|
|
return;
|
|
|
|
@ -361,7 +361,7 @@ public class ConnectionPool<T extends RedisConnection> {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
protected void releaseConnection(SubscribesConnectionEntry entry, T conn) {
|
|
|
|
|
protected void releaseConnection(ClientConnectionsEntry entry, T conn) {
|
|
|
|
|
entry.releaseConnection(conn);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|