From 62f4e9a6bbb1ea1a3ce7be5e97affd1c75e85837 Mon Sep 17 00:00:00 2001 From: Nikita Date: Wed, 13 Jul 2016 14:49:04 +0300 Subject: [PATCH] refactoring --- .../MasterSlaveConnectionManager.java | 191 ++++++------------ 1 file changed, 60 insertions(+), 131 deletions(-) diff --git a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index 0a0280fe0..6ae43cac9 100644 --- a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -135,6 +135,10 @@ public class MasterSlaveConnectionManager implements ConnectionManager { private IdleConnectionWatcher connectionWatcher; private final ConnectionEventsHub connectionEventsHub = new ConnectionEventsHub(); + + private final Semaphore[] locks = new Semaphore[50]; + + private final Semaphore freePubSubLock = new Semaphore(1); public MasterSlaveConnectionManager(MasterSlaveServersConfig cfg, Config config) { this(config); @@ -305,9 +309,9 @@ public class MasterSlaveConnectionManager implements ConnectionManager { @Override public Future psubscribe(String channelName, Codec codec, RedisPubSubListener listener) { - Promise promise = newPromise(); - subscribe(codec, channelName, listener, promise, PubSubType.PSUBSCRIBE); - return promise; + Semaphore lock = locks[Math.abs(channelName.hashCode() % locks.length)]; + lock.acquireUninterruptibly(); + return psubscribe(channelName, codec, listener, lock); } public Future psubscribe(String channelName, Codec codec, RedisPubSubListener listener, Semaphore semaphore) { @@ -316,159 +320,84 @@ public class MasterSlaveConnectionManager implements ConnectionManager { return promise; } - - public Future subscribe(Codec codec, String channelName, final RedisPubSubListener listener) { - Promise promise = newPromise(); - subscribe(codec, channelName, listener, promise, PubSubType.SUBSCRIBE); - return promise; + public Future subscribe(Codec codec, String channelName, RedisPubSubListener listener) { + Semaphore lock = locks[Math.abs(channelName.hashCode() % locks.length)]; + lock.acquireUninterruptibly(); + return subscribe(codec, channelName, listener, lock); } - public Future subscribe(Codec codec, String channelName, final RedisPubSubListener listener, Semaphore semaphore) { + public Future subscribe(Codec codec, String channelName, RedisPubSubListener listener, Semaphore semaphore) { Promise promise = newPromise(); subscribe(codec, channelName, listener, promise, PubSubType.SUBSCRIBE, semaphore); return promise; } - - private Semaphore[] locks = new Semaphore[50]; - - private Semaphore freePubSubLock = new Semaphore(1); - public Semaphore getSemaphore(String channelName) { return locks[Math.abs(channelName.hashCode() % locks.length)]; } private void subscribe(final Codec codec, final String channelName, final RedisPubSubListener listener, final Promise promise, PubSubType type, final Semaphore lock) { - - final PubSubConnectionEntry сonnEntry = name2PubSubConnection.get(channelName); - if (сonnEntry != null) { - сonnEntry.addListener(channelName, listener); - сonnEntry.getSubscribeFuture(channelName, type).addListener(new FutureListener() { - @Override - public void operationComplete(Future future) throws Exception { - promise.setSuccess(сonnEntry); - } - }); - lock.release(); - return; - } - - freePubSubLock.acquireUninterruptibly(); - final PubSubConnectionEntry freeEntry = freePubSubConnections.peek(); - if (freeEntry == null) { - connect(codec, channelName, listener, promise, type, lock); - return; - } - - int remainFreeAmount = freeEntry.tryAcquire(); - if (remainFreeAmount == -1) { - throw new IllegalStateException(); - } - - final PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, freeEntry); - if (oldEntry != null) { - freeEntry.release(); - freePubSubLock.release(); - - oldEntry.addListener(channelName, listener); - oldEntry.getSubscribeFuture(channelName, type).addListener(new FutureListener() { - @Override - public void operationComplete(Future future) throws Exception { - promise.setSuccess(oldEntry); - } - }); - lock.release(); - return; - } - - if (remainFreeAmount == 0) { - freePubSubConnections.poll(); - } - freePubSubLock.release(); - - freeEntry.addListener(channelName, listener); - freeEntry.getSubscribeFuture(channelName, type).addListener(new FutureListener() { + final PubSubConnectionEntry сonnEntry = name2PubSubConnection.get(channelName); + if (сonnEntry != null) { + сonnEntry.addListener(channelName, listener); + сonnEntry.getSubscribeFuture(channelName, type).addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { - promise.setSuccess(freeEntry); - lock.release(); + promise.setSuccess(сonnEntry); } }); - - - if (PubSubType.PSUBSCRIBE == type) { - freeEntry.psubscribe(codec, channelName); - } else { - freeEntry.subscribe(codec, channelName); - } - } + lock.release(); + return; + } - - private void subscribe(final Codec codec, final String channelName, final RedisPubSubListener listener, final Promise promise, PubSubType type) { + freePubSubLock.acquireUninterruptibly(); + final PubSubConnectionEntry freeEntry = freePubSubConnections.peek(); + if (freeEntry == null) { + connect(codec, channelName, listener, promise, type, lock); + return; + } - final Semaphore lock = locks[Math.abs(channelName.hashCode() % locks.length)]; - lock.acquireUninterruptibly(); - final PubSubConnectionEntry сonnEntry = name2PubSubConnection.get(channelName); - if (сonnEntry != null) { - сonnEntry.addListener(channelName, listener); - сonnEntry.getSubscribeFuture(channelName, type).addListener(new FutureListener() { - @Override - public void operationComplete(Future future) throws Exception { - promise.setSuccess(сonnEntry); - } - }); - lock.release(); - return; - } - - freePubSubLock.acquireUninterruptibly(); - final PubSubConnectionEntry freeEntry = freePubSubConnections.peek(); - if (freeEntry == null) { - connect(codec, channelName, listener, promise, type, lock); - return; - } - - int remainFreeAmount = freeEntry.tryAcquire(); - if (remainFreeAmount == -1) { - throw new IllegalStateException(); - } - - final PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, freeEntry); - if (oldEntry != null) { - freeEntry.release(); - freePubSubLock.release(); - - oldEntry.addListener(channelName, listener); - oldEntry.getSubscribeFuture(channelName, type).addListener(new FutureListener() { - @Override - public void operationComplete(Future future) throws Exception { - promise.setSuccess(oldEntry); - } - }); - lock.release(); - return; - } - - if (remainFreeAmount == 0) { - freePubSubConnections.poll(); - } + int remainFreeAmount = freeEntry.tryAcquire(); + if (remainFreeAmount == -1) { + throw new IllegalStateException(); + } + + final PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, freeEntry); + if (oldEntry != null) { + freeEntry.release(); freePubSubLock.release(); - freeEntry.addListener(channelName, listener); - freeEntry.getSubscribeFuture(channelName, type).addListener(new FutureListener() { + oldEntry.addListener(channelName, listener); + oldEntry.getSubscribeFuture(channelName, type).addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { - promise.setSuccess(freeEntry); - lock.release(); + promise.setSuccess(oldEntry); } }); - - - if (PubSubType.PSUBSCRIBE == type) { - freeEntry.psubscribe(codec, channelName); - } else { - freeEntry.subscribe(codec, channelName); + lock.release(); + return; + } + + if (remainFreeAmount == 0) { + freePubSubConnections.poll(); + } + freePubSubLock.release(); + + freeEntry.addListener(channelName, listener); + freeEntry.getSubscribeFuture(channelName, type).addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + promise.setSuccess(freeEntry); + lock.release(); } + }); + + + if (PubSubType.PSUBSCRIBE == type) { + freeEntry.psubscribe(codec, channelName); + } else { + freeEntry.subscribe(codec, channelName); + } } private void connect(final Codec codec, final String channelName, final RedisPubSubListener listener,