resubscribe to channel-patterns during slave down

pull/255/head
Nikita 10 years ago
parent 387343e304
commit 40af48f4ae

@ -374,7 +374,8 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
return null;
}
return entry.unsubscribe(channelName, new BaseRedisPubSubListener() {
Codec entryCodec = entry.getConnection().getChannels().get(channelName);
entry.unsubscribe(channelName, new BaseRedisPubSubListener() {
@Override
public boolean onStatus(PubSubType type, String channel) {
@ -390,6 +391,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
}
});
return entryCodec;
}
@Override
@ -399,7 +401,8 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
return null;
}
return entry.punsubscribe(channelName, new BaseRedisPubSubListener() {
Codec entryCodec = entry.getConnection().getPatternChannels().get(channelName);
entry.punsubscribe(channelName, new BaseRedisPubSubListener() {
@Override
public boolean onStatus(PubSubType type, String channel) {
@ -415,6 +418,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
}
});
return entryCodec;
}
protected MasterSlaveEntry getEntry(int slot) {
@ -438,13 +442,24 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
entry.close();
Collection<RedisPubSubListener> listeners = entry.getListeners(channelName);
Codec subscribeCodec = unsubscribe(channelName);
if (!listeners.isEmpty()) {
PubSubConnectionEntry newEntry = subscribe(channelName, subscribeCodec);
for (RedisPubSubListener redisPubSubListener : listeners) {
newEntry.addListener(channelName, redisPubSubListener);
if (entry.getConnection().getPatternChannels().get(channelName) != null) {
Codec subscribeCodec = punsubscribe(channelName);
if (!listeners.isEmpty()) {
PubSubConnectionEntry newEntry = psubscribe(channelName, subscribeCodec);
for (RedisPubSubListener redisPubSubListener : listeners) {
newEntry.addListener(channelName, redisPubSubListener);
}
log.debug("resubscribed listeners for '{}' channel-pattern", channelName);
}
} else {
Codec subscribeCodec = unsubscribe(channelName);
if (!listeners.isEmpty()) {
PubSubConnectionEntry newEntry = subscribe(channelName, subscribeCodec);
for (RedisPubSubListener redisPubSubListener : listeners) {
newEntry.addListener(channelName, redisPubSubListener);
}
log.debug("resubscribed listeners for '{}' channel", channelName);
}
log.debug("resubscribed listeners for '{}' channel", channelName);
}
}
}

@ -18,7 +18,6 @@ package org.redisson.connection;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
@ -33,8 +32,6 @@ import org.redisson.client.protocol.pubsub.PubSubType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.util.internal.PlatformDependent;
public class PubSubConnectionEntry {
public enum Status {ACTIVE, INACTIVE}
@ -46,7 +43,6 @@ public class PubSubConnectionEntry {
private final RedisPubSubConnection conn;
private final int subscriptionsPerConnection;
private final Map<String, Codec> channel2Codec = PlatformDependent.newConcurrentHashMap();
private final ConcurrentMap<String, Queue<RedisPubSubListener>> channelListeners = new ConcurrentHashMap<String, Queue<RedisPubSubListener>>();
public PubSubConnectionEntry(RedisPubSubConnection conn, int subscriptionsPerConnection) {
@ -127,12 +123,10 @@ public class PubSubConnectionEntry {
}
public void subscribe(Codec codec, String channelName) {
channel2Codec.put(channelName, codec);
conn.subscribe(codec, channelName);
}
public void psubscribe(Codec codec, String pattern) {
channel2Codec.put(pattern, codec);
conn.psubscribe(codec, pattern);
}
@ -141,7 +135,7 @@ public class PubSubConnectionEntry {
conn.subscribe(codec, channel);
}
public Codec unsubscribe(final String channel, RedisPubSubListener listener) {
public void unsubscribe(final String channel, RedisPubSubListener listener) {
conn.addOneShotListener(new BaseRedisPubSubListener<Object>() {
@Override
public boolean onStatus(PubSubType type, String ch) {
@ -155,7 +149,6 @@ public class PubSubConnectionEntry {
});
conn.addOneShotListener(listener);
conn.unsubscribe(channel);
return channel2Codec.remove(channel);
}
private void removeListeners(String channel) {
@ -171,7 +164,7 @@ public class PubSubConnectionEntry {
subscribedChannelsAmount.release();
}
public Codec punsubscribe(final String channel, RedisPubSubListener listener) {
public void punsubscribe(final String channel, RedisPubSubListener listener) {
conn.addOneShotListener(new BaseRedisPubSubListener<Object>() {
@Override
public boolean onStatus(PubSubType type, String ch) {
@ -184,7 +177,6 @@ public class PubSubConnectionEntry {
});
conn.addOneShotListener(listener);
conn.punsubscribe(channel);
return channel2Codec.remove(channel);
}

Loading…
Cancel
Save