Code cleanup

pull/303/head
Nikita 9 years ago
parent 2826623dfd
commit a8ae0b892b

@ -15,10 +15,8 @@
*/ */
package org.redisson.misc; package org.redisson.misc;
import java.util.Deque;
import java.util.List; import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@ -26,11 +24,11 @@ import org.redisson.MasterSlaveServersConfig;
import org.redisson.client.RedisConnection; import org.redisson.client.RedisConnection;
import org.redisson.client.RedisConnectionException; import org.redisson.client.RedisConnectionException;
import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisCommands;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.MasterSlaveEntry;
import org.redisson.connection.ClientConnectionsEntry; import org.redisson.connection.ClientConnectionsEntry;
import org.redisson.connection.ClientConnectionsEntry.FreezeReason; import org.redisson.connection.ClientConnectionsEntry.FreezeReason;
import org.redisson.connection.ClientConnectionsEntry.NodeType; import org.redisson.connection.ClientConnectionsEntry.NodeType;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.MasterSlaveEntry;
import io.netty.util.Timeout; import io.netty.util.Timeout;
import io.netty.util.TimerTask; import io.netty.util.TimerTask;
@ -42,8 +40,6 @@ public class ConnectionPool<T extends RedisConnection> {
protected final List<ClientConnectionsEntry> entries = new CopyOnWriteArrayList<ClientConnectionsEntry>(); protected final List<ClientConnectionsEntry> entries = new CopyOnWriteArrayList<ClientConnectionsEntry>();
final Deque<Promise<T>> promises = new LinkedBlockingDeque<Promise<T>>();
final ConnectionManager connectionManager; final ConnectionManager connectionManager;
final MasterSlaveServersConfig config; final MasterSlaveServersConfig config;
@ -54,17 +50,6 @@ public class ConnectionPool<T extends RedisConnection> {
this.config = config; this.config = config;
this.masterSlaveEntry = masterSlaveEntry; this.masterSlaveEntry = masterSlaveEntry;
this.connectionManager = connectionManager; this.connectionManager = connectionManager;
// Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(new Runnable() {
//
// @Override
// public void run() {
// if (promises.size() > 0) {
// System.out.println("promises " + promises.size());
// }
//
// }
// }, 1, 1, TimeUnit.SECONDS);
} }
public void add(final ClientConnectionsEntry entry) { public void add(final ClientConnectionsEntry entry) {
@ -72,7 +57,6 @@ public class ConnectionPool<T extends RedisConnection> {
@Override @Override
public void run() { public void run() {
entries.add(entry); entries.add(entry);
handleQueue(entry, true);
} }
}, true); }, true);
} }
@ -129,11 +113,8 @@ public class ConnectionPool<T extends RedisConnection> {
} }
RedisConnectionException exception = new RedisConnectionException( RedisConnectionException exception = new RedisConnectionException(
"Can't aquire connection from pool"); "Can't aquire connection from pool!");
return connectionManager.newFailedFuture(exception); return connectionManager.newFailedFuture(exception);
// Promise<T> promise = connectionManager.newPromise();
// promises.add(promise);
// return promise;
} }
public Future<T> get(ClientConnectionsEntry entry) { public Future<T> get(ClientConnectionsEntry entry) {
@ -289,13 +270,10 @@ public class ConnectionPool<T extends RedisConnection> {
@Override @Override
public void run() { public void run() {
if (entry.getNodeType() == NodeType.SLAVE) { if (entry.getNodeType() == NodeType.SLAVE) {
handleQueue(entry, false);
masterSlaveEntry.slaveUp(entry.getClient().getAddr().getHostName(), entry.getClient().getAddr().getPort(), FreezeReason.RECONNECT); masterSlaveEntry.slaveUp(entry.getClient().getAddr().getHostName(), entry.getClient().getAddr().getPort(), FreezeReason.RECONNECT);
} else { } else {
synchronized (entry) { synchronized (entry) {
if (entry.getFreezeReason() == FreezeReason.RECONNECT) { if (entry.getFreezeReason() == FreezeReason.RECONNECT) {
handleQueue(entry, false);
entry.setFreezed(false); entry.setFreezed(false);
entry.setFreezeReason(null); entry.setFreezeReason(null);
} }
@ -332,30 +310,6 @@ public class ConnectionPool<T extends RedisConnection> {
protected void releaseConnection(ClientConnectionsEntry entry) { protected void releaseConnection(ClientConnectionsEntry entry) {
entry.releaseConnection(); entry.releaseConnection();
handleQueue(entry, true);
}
private void handleQueue(ClientConnectionsEntry entry, boolean checkFreezed) {
while (true) {
if (checkFreezed && entry.isFreezed()) {
return;
}
Promise<T> promise = promises.poll();
if (promise == null) {
return;
}
if (promise.isCancelled()) {
continue;
}
if (!tryAcquireConnection(entry)) {
promises.addFirst(promise);
} else {
connect(entry, promise);
}
return;
}
} }
protected void releaseConnection(ClientConnectionsEntry entry, T conn) { protected void releaseConnection(ClientConnectionsEntry entry, T conn) {

Loading…
Cancel
Save