refactoring

pull/555/head
Nikita 9 years ago
parent 8ddd8cae7e
commit 62f4e9a6bb

@ -135,6 +135,10 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
private IdleConnectionWatcher connectionWatcher; private IdleConnectionWatcher connectionWatcher;
private final ConnectionEventsHub connectionEventsHub = new ConnectionEventsHub(); private final ConnectionEventsHub connectionEventsHub = new ConnectionEventsHub();
private final Semaphore[] locks = new Semaphore[50];
private final Semaphore freePubSubLock = new Semaphore(1);
public MasterSlaveConnectionManager(MasterSlaveServersConfig cfg, Config config) { public MasterSlaveConnectionManager(MasterSlaveServersConfig cfg, Config config) {
this(config); this(config);
@ -305,9 +309,9 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
@Override @Override
public Future<PubSubConnectionEntry> psubscribe(String channelName, Codec codec, RedisPubSubListener<?> listener) { public Future<PubSubConnectionEntry> psubscribe(String channelName, Codec codec, RedisPubSubListener<?> listener) {
Promise<PubSubConnectionEntry> promise = newPromise(); Semaphore lock = locks[Math.abs(channelName.hashCode() % locks.length)];
subscribe(codec, channelName, listener, promise, PubSubType.PSUBSCRIBE); lock.acquireUninterruptibly();
return promise; return psubscribe(channelName, codec, listener, lock);
} }
public Future<PubSubConnectionEntry> psubscribe(String channelName, Codec codec, RedisPubSubListener<?> listener, Semaphore semaphore) { public Future<PubSubConnectionEntry> psubscribe(String channelName, Codec codec, RedisPubSubListener<?> listener, Semaphore semaphore) {
@ -316,159 +320,84 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
return promise; return promise;
} }
public Future<PubSubConnectionEntry> subscribe(Codec codec, String channelName, RedisPubSubListener<?> listener) {
public Future<PubSubConnectionEntry> subscribe(Codec codec, String channelName, final RedisPubSubListener<?> listener) { Semaphore lock = locks[Math.abs(channelName.hashCode() % locks.length)];
Promise<PubSubConnectionEntry> promise = newPromise(); lock.acquireUninterruptibly();
subscribe(codec, channelName, listener, promise, PubSubType.SUBSCRIBE); return subscribe(codec, channelName, listener, lock);
return promise;
} }
public Future<PubSubConnectionEntry> subscribe(Codec codec, String channelName, final RedisPubSubListener<?> listener, Semaphore semaphore) { public Future<PubSubConnectionEntry> subscribe(Codec codec, String channelName, RedisPubSubListener<?> listener, Semaphore semaphore) {
Promise<PubSubConnectionEntry> promise = newPromise(); Promise<PubSubConnectionEntry> promise = newPromise();
subscribe(codec, channelName, listener, promise, PubSubType.SUBSCRIBE, semaphore); subscribe(codec, channelName, listener, promise, PubSubType.SUBSCRIBE, semaphore);
return promise; return promise;
} }
private Semaphore[] locks = new Semaphore[50];
private Semaphore freePubSubLock = new Semaphore(1);
public Semaphore getSemaphore(String channelName) { public Semaphore getSemaphore(String channelName) {
return locks[Math.abs(channelName.hashCode() % locks.length)]; return locks[Math.abs(channelName.hashCode() % locks.length)];
} }
private void subscribe(final Codec codec, final String channelName, final RedisPubSubListener listener, final Promise<PubSubConnectionEntry> promise, PubSubType type, final Semaphore lock) { private void subscribe(final Codec codec, final String channelName, final RedisPubSubListener listener, final Promise<PubSubConnectionEntry> promise, PubSubType type, final Semaphore lock) {
final PubSubConnectionEntry сonnEntry = name2PubSubConnection.get(channelName);
final PubSubConnectionEntry сonnEntry = name2PubSubConnection.get(channelName); if (сonnEntry != null) {
if (сonnEntry != null) { сonnEntry.addListener(channelName, listener);
сonnEntry.addListener(channelName, listener); сonnEntry.getSubscribeFuture(channelName, type).addListener(new FutureListener<Void>() {
сonnEntry.getSubscribeFuture(channelName, type).addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
promise.setSuccess(сonnEntry);
}
});
lock.release();
return;
}
freePubSubLock.acquireUninterruptibly();
final PubSubConnectionEntry freeEntry = freePubSubConnections.peek();
if (freeEntry == null) {
connect(codec, channelName, listener, promise, type, lock);
return;
}
int remainFreeAmount = freeEntry.tryAcquire();
if (remainFreeAmount == -1) {
throw new IllegalStateException();
}
final PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, freeEntry);
if (oldEntry != null) {
freeEntry.release();
freePubSubLock.release();
oldEntry.addListener(channelName, listener);
oldEntry.getSubscribeFuture(channelName, type).addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
promise.setSuccess(oldEntry);
}
});
lock.release();
return;
}
if (remainFreeAmount == 0) {
freePubSubConnections.poll();
}
freePubSubLock.release();
freeEntry.addListener(channelName, listener);
freeEntry.getSubscribeFuture(channelName, type).addListener(new FutureListener<Void>() {
@Override @Override
public void operationComplete(Future<Void> future) throws Exception { public void operationComplete(Future<Void> future) throws Exception {
promise.setSuccess(freeEntry); promise.setSuccess(сonnEntry);
lock.release();
} }
}); });
lock.release();
return;
if (PubSubType.PSUBSCRIBE == type) { }
freeEntry.psubscribe(codec, channelName);
} else {
freeEntry.subscribe(codec, channelName);
}
}
freePubSubLock.acquireUninterruptibly();
private void subscribe(final Codec codec, final String channelName, final RedisPubSubListener listener, final Promise<PubSubConnectionEntry> promise, PubSubType type) { final PubSubConnectionEntry freeEntry = freePubSubConnections.peek();
if (freeEntry == null) {
connect(codec, channelName, listener, promise, type, lock);
return;
}
final Semaphore lock = locks[Math.abs(channelName.hashCode() % locks.length)]; int remainFreeAmount = freeEntry.tryAcquire();
lock.acquireUninterruptibly(); if (remainFreeAmount == -1) {
final PubSubConnectionEntry сonnEntry = name2PubSubConnection.get(channelName); throw new IllegalStateException();
if (сonnEntry != null) { }
сonnEntry.addListener(channelName, listener);
сonnEntry.getSubscribeFuture(channelName, type).addListener(new FutureListener<Void>() { final PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, freeEntry);
@Override if (oldEntry != null) {
public void operationComplete(Future<Void> future) throws Exception { freeEntry.release();
promise.setSuccess(сonnEntry);
}
});
lock.release();
return;
}
freePubSubLock.acquireUninterruptibly();
final PubSubConnectionEntry freeEntry = freePubSubConnections.peek();
if (freeEntry == null) {
connect(codec, channelName, listener, promise, type, lock);
return;
}
int remainFreeAmount = freeEntry.tryAcquire();
if (remainFreeAmount == -1) {
throw new IllegalStateException();
}
final PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, freeEntry);
if (oldEntry != null) {
freeEntry.release();
freePubSubLock.release();
oldEntry.addListener(channelName, listener);
oldEntry.getSubscribeFuture(channelName, type).addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
promise.setSuccess(oldEntry);
}
});
lock.release();
return;
}
if (remainFreeAmount == 0) {
freePubSubConnections.poll();
}
freePubSubLock.release(); freePubSubLock.release();
freeEntry.addListener(channelName, listener); oldEntry.addListener(channelName, listener);
freeEntry.getSubscribeFuture(channelName, type).addListener(new FutureListener<Void>() { oldEntry.getSubscribeFuture(channelName, type).addListener(new FutureListener<Void>() {
@Override @Override
public void operationComplete(Future<Void> future) throws Exception { public void operationComplete(Future<Void> future) throws Exception {
promise.setSuccess(freeEntry); promise.setSuccess(oldEntry);
lock.release();
} }
}); });
lock.release();
return;
if (PubSubType.PSUBSCRIBE == type) { }
freeEntry.psubscribe(codec, channelName);
} else { if (remainFreeAmount == 0) {
freeEntry.subscribe(codec, channelName); freePubSubConnections.poll();
}
freePubSubLock.release();
freeEntry.addListener(channelName, listener);
freeEntry.getSubscribeFuture(channelName, type).addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
promise.setSuccess(freeEntry);
lock.release();
} }
});
if (PubSubType.PSUBSCRIBE == type) {
freeEntry.psubscribe(codec, channelName);
} else {
freeEntry.subscribe(codec, channelName);
}
} }
private void connect(final Codec codec, final String channelName, final RedisPubSubListener listener, private void connect(final Codec codec, final String channelName, final RedisPubSubListener listener,

Loading…
Cancel
Save