refactoring

pull/1300/head
Nikita 7 years ago
parent a481259aa5
commit 4e82a6eae6

@ -447,7 +447,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
return subscribe(PubSubType.SUBSCRIBE, codec, channelName, listeners); return subscribe(PubSubType.SUBSCRIBE, codec, channelName, listeners);
} }
protected RFuture<PubSubConnectionEntry> subscribe(PubSubType type, final Codec codec, final String channelName, private RFuture<PubSubConnectionEntry> subscribe(final PubSubType type, final Codec codec, final String channelName,
final RedisPubSubListener<?>... listeners) { final RedisPubSubListener<?>... listeners) {
final AsyncSemaphore lock = getSemaphore(channelName); final AsyncSemaphore lock = getSemaphore(channelName);
final RPromise<PubSubConnectionEntry> result = new RedissonPromise<PubSubConnectionEntry>(); final RPromise<PubSubConnectionEntry> result = new RedissonPromise<PubSubConnectionEntry>();

@ -268,9 +268,12 @@ public class MasterSlaveEntry {
@Override @Override
public void operationComplete(Future<PubSubConnectionEntry> future) public void operationComplete(Future<PubSubConnectionEntry> future)
throws Exception { throws Exception {
if (future.isSuccess()) { if (!future.isSuccess()) {
log.debug("resubscribed listeners of '{}' channel to {}", channelName, future.getNow().getConnection().getRedisClient()); subscribe(channelName, listeners, subscribeCodec);
return;
} }
log.debug("resubscribed listeners of '{}' channel to '{}'", channelName, future.getNow().getConnection().getRedisClient());
} }
}); });
} }
@ -292,7 +295,7 @@ public class MasterSlaveEntry {
private void psubscribe(final String channelName, final Collection<RedisPubSubListener<?>> listeners, private void psubscribe(final String channelName, final Collection<RedisPubSubListener<?>> listeners,
final Codec subscribeCodec) { final Codec subscribeCodec) {
RFuture<PubSubConnectionEntry> subscribeFuture = connectionManager.psubscribe(channelName, subscribeCodec, null); RFuture<PubSubConnectionEntry> subscribeFuture = connectionManager.psubscribe(channelName, subscribeCodec, listeners.toArray(new RedisPubSubListener[listeners.size()]));
subscribeFuture.addListener(new FutureListener<PubSubConnectionEntry>() { subscribeFuture.addListener(new FutureListener<PubSubConnectionEntry>() {
@Override @Override
public void operationComplete(Future<PubSubConnectionEntry> future) public void operationComplete(Future<PubSubConnectionEntry> future)
@ -302,11 +305,7 @@ public class MasterSlaveEntry {
return; return;
} }
PubSubConnectionEntry newEntry = future.getNow(); log.debug("resubscribed listeners for '{}' channel-pattern to '{}'", channelName, future.getNow().getConnection().getRedisClient());
for (RedisPubSubListener<?> redisPubSubListener : listeners) {
newEntry.addListener(channelName, redisPubSubListener);
}
log.debug("resubscribed listeners for '{}' channel-pattern", channelName);
} }
}); });
} }

Loading…
Cancel
Save