MasterSlaveConnectionManager.subscribe fixed. #93

pull/99/head
Nikita 10 years ago
parent e887d97e20
commit de8ee69a14

@ -69,6 +69,7 @@ public class Redisson implements RedissonClient {
config.useSingleServer().setAddress("127.0.0.1:6379");
// config.useMasterSlaveConnection().setMasterAddress("127.0.0.1:6379").addSlaveAddress("127.0.0.1:6389").addSlaveAddress("127.0.0.1:6399");
// config.useSentinelConnection().setMasterName("mymaster").addSentinelAddress("127.0.0.1:26389", "127.0.0.1:26379");
// config.useClusterServers().addNodeAddress("127.0.0.1:7000");
return create(config);
}
@ -233,12 +234,12 @@ public class Redisson implements RedissonClient {
}
public void flushdb() {
connectionManager.write(new ResultOperation<String, Object>() {
connectionManager.writeAllAsync(new ResultOperation<String, Object>() {
@Override
protected Future<String> execute(RedisAsyncConnection<Object, Object> conn) {
return conn.flushdb();
}
});
}).awaitUninterruptibly();
}
}

@ -101,7 +101,7 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown
};
synchronized (ENTRIES) {
connectionManager.subscribeOnce(listener, getChannelName());
connectionManager.subscribe(listener, getChannelName());
}
return newPromise;
}

@ -199,7 +199,7 @@ public class RedissonLock extends RedissonObject implements RLock {
};
synchronized (ENTRIES) {
connectionManager.subscribeOnce(listener, getChannelName());
connectionManager.subscribe(listener, getChannelName());
}
return newPromise;
}

@ -54,7 +54,7 @@ public interface ConnectionManager {
<K, V> PubSubConnectionEntry subscribe(String channelName);
<K, V> PubSubConnectionEntry subscribeOnce(RedisPubSubAdapter<V> listener, String channelName);
<K, V> PubSubConnectionEntry subscribe(RedisPubSubAdapter<V> listener, String channelName);
Future unsubscribe(String channelName);

@ -447,13 +447,10 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
}
@Override
public <K, V> PubSubConnectionEntry subscribeOnce(RedisPubSubAdapter<V> listener, String channelName) {
public <K, V> PubSubConnectionEntry subscribe(RedisPubSubAdapter<V> listener, String channelName) {
PubSubConnectionEntry сonnEntry = name2PubSubConnection.get(channelName);
if (сonnEntry != null) {
сonnEntry.addListener(channelName, listener);
// notify subscribed manually
listener.subscribed(channelName, 1);
listener.psubscribed(channelName, 1);
сonnEntry.subscribe(listener, channelName);
return сonnEntry;
}
@ -468,7 +465,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
synchronized (entry) {
if (!entry.isActive()) {
entry.release();
return subscribeOnce(listener, channelName);
return subscribe(listener, channelName);
}
entry.subscribe(listener, channelName);
return entry;
@ -489,7 +486,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
synchronized (entry) {
if (!entry.isActive()) {
entry.release();
return subscribeOnce(listener, channelName);
return subscribe(listener, channelName);
}
entry.subscribe(listener, channelName);
return entry;

Loading…
Cancel
Save