diff --git a/src/main/java/org/redisson/Redisson.java b/src/main/java/org/redisson/Redisson.java index 1ce6afaea..f90f42ebe 100644 --- a/src/main/java/org/redisson/Redisson.java +++ b/src/main/java/org/redisson/Redisson.java @@ -69,6 +69,7 @@ public class Redisson implements RedissonClient { config.useSingleServer().setAddress("127.0.0.1:6379"); // config.useMasterSlaveConnection().setMasterAddress("127.0.0.1:6379").addSlaveAddress("127.0.0.1:6389").addSlaveAddress("127.0.0.1:6399"); // config.useSentinelConnection().setMasterName("mymaster").addSentinelAddress("127.0.0.1:26389", "127.0.0.1:26379"); +// config.useClusterServers().addNodeAddress("127.0.0.1:7000"); return create(config); } @@ -233,12 +234,12 @@ public class Redisson implements RedissonClient { } public void flushdb() { - connectionManager.write(new ResultOperation() { + connectionManager.writeAllAsync(new ResultOperation() { @Override protected Future execute(RedisAsyncConnection conn) { return conn.flushdb(); } - }); + }).awaitUninterruptibly(); } } diff --git a/src/main/java/org/redisson/RedissonCountDownLatch.java b/src/main/java/org/redisson/RedissonCountDownLatch.java index d61ffbfa8..3c7918272 100644 --- a/src/main/java/org/redisson/RedissonCountDownLatch.java +++ b/src/main/java/org/redisson/RedissonCountDownLatch.java @@ -101,7 +101,7 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown }; synchronized (ENTRIES) { - connectionManager.subscribeOnce(listener, getChannelName()); + connectionManager.subscribe(listener, getChannelName()); } return newPromise; } diff --git a/src/main/java/org/redisson/RedissonLock.java b/src/main/java/org/redisson/RedissonLock.java index 84fafaac8..ce533cb0f 100644 --- a/src/main/java/org/redisson/RedissonLock.java +++ b/src/main/java/org/redisson/RedissonLock.java @@ -199,7 +199,7 @@ public class RedissonLock extends RedissonObject implements RLock { }; synchronized (ENTRIES) { - connectionManager.subscribeOnce(listener, getChannelName()); + connectionManager.subscribe(listener, getChannelName()); } return newPromise; } diff --git a/src/main/java/org/redisson/connection/ConnectionManager.java b/src/main/java/org/redisson/connection/ConnectionManager.java index e10e73f79..459f63bc3 100644 --- a/src/main/java/org/redisson/connection/ConnectionManager.java +++ b/src/main/java/org/redisson/connection/ConnectionManager.java @@ -54,7 +54,7 @@ public interface ConnectionManager { PubSubConnectionEntry subscribe(String channelName); - PubSubConnectionEntry subscribeOnce(RedisPubSubAdapter listener, String channelName); + PubSubConnectionEntry subscribe(RedisPubSubAdapter listener, String channelName); Future unsubscribe(String channelName); diff --git a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index c0e424be7..3bc8eb9ce 100644 --- a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -447,13 +447,10 @@ public class MasterSlaveConnectionManager implements ConnectionManager { } @Override - public PubSubConnectionEntry subscribeOnce(RedisPubSubAdapter listener, String channelName) { + public PubSubConnectionEntry subscribe(RedisPubSubAdapter listener, String channelName) { PubSubConnectionEntry сonnEntry = name2PubSubConnection.get(channelName); if (сonnEntry != null) { - сonnEntry.addListener(channelName, listener); - // notify subscribed manually - listener.subscribed(channelName, 1); - listener.psubscribed(channelName, 1); + сonnEntry.subscribe(listener, channelName); return сonnEntry; } @@ -468,7 +465,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { synchronized (entry) { if (!entry.isActive()) { entry.release(); - return subscribeOnce(listener, channelName); + return subscribe(listener, channelName); } entry.subscribe(listener, channelName); return entry; @@ -489,7 +486,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { synchronized (entry) { if (!entry.isActive()) { entry.release(); - return subscribeOnce(listener, channelName); + return subscribe(listener, channelName); } entry.subscribe(listener, channelName); return entry;