PubSub connection re-subscription doesn't work in some cases of when there are only one slave available. #663

pull/673/head
Nikita 8 years ago
parent e5b996ea7d
commit 1173c9590c

@ -109,9 +109,9 @@ public interface ConnectionManager {
Codec unsubscribe(String channelName, AsyncSemaphore lock);
Codec unsubscribe(String channelName);
RFuture<Codec> unsubscribe(String channelName, boolean temporaryDown);
Codec punsubscribe(String channelName);
RFuture<Codec> punsubscribe(String channelName, boolean temporaryDown);
Codec punsubscribe(String channelName, AsyncSemaphore lock);

@ -53,7 +53,6 @@ import org.redisson.connection.ClientConnectionsEntry.FreezeReason;
import org.redisson.misc.InfinitySemaphoreLatch;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.redisson.misc.RedissonThreadFactory;
import org.redisson.pubsub.AsyncSemaphore;
import org.redisson.pubsub.TransferListener;
import org.slf4j.Logger;
@ -540,16 +539,32 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
}
@Override
public Codec unsubscribe(String channelName) {
public RFuture<Codec> unsubscribe(final String channelName, boolean temporaryDown) {
final PubSubConnectionEntry entry = name2PubSubConnection.remove(channelName);
if (entry == null) {
return null;
}
freePubSubConnections.remove(entry);
Codec entryCodec = entry.getConnection().getChannels().get(channelName);
final Codec entryCodec = entry.getConnection().getChannels().get(channelName);
if (temporaryDown) {
final RPromise<Codec> result = newPromise();
entry.unsubscribe(channelName, new BaseRedisPubSubListener() {
@Override
public boolean onStatus(PubSubType type, String channel) {
if (type == PubSubType.UNSUBSCRIBE && channel.equals(channelName)) {
result.trySuccess(entryCodec);
return true;
}
return false;
}
});
return result;
}
entry.unsubscribe(channelName, null);
return entryCodec;
return newSucceededFuture(entryCodec);
}
public Codec punsubscribe(final String channelName, final AsyncSemaphore lock) {
@ -583,16 +598,32 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
@Override
public Codec punsubscribe(final String channelName) {
public RFuture<Codec> punsubscribe(final String channelName, boolean temporaryDown) {
final PubSubConnectionEntry entry = name2PubSubConnection.remove(channelName);
if (entry == null) {
return null;
}
freePubSubConnections.remove(entry);
Codec entryCodec = entry.getConnection().getPatternChannels().get(channelName);
final Codec entryCodec = entry.getConnection().getChannels().get(channelName);
if (temporaryDown) {
final RPromise<Codec> result = newPromise();
entry.punsubscribe(channelName, new BaseRedisPubSubListener() {
@Override
public boolean onStatus(PubSubType type, String channel) {
if (type == PubSubType.PUNSUBSCRIBE && channel.equals(channelName)) {
result.trySuccess(entryCodec);
return true;
}
return false;
}
});
return result;
}
entry.punsubscribe(channelName, null);
return entryCodec;
return newSucceededFuture(entryCodec);
}
@Override

@ -108,7 +108,7 @@ public class MasterSlaveEntry {
return false;
}
return slaveDown(e);
return slaveDown(e, freezeReason == FreezeReason.SYSTEM);
}
public boolean slaveDown(String host, int port, FreezeReason freezeReason) {
@ -117,10 +117,10 @@ public class MasterSlaveEntry {
return false;
}
return slaveDown(entry);
return slaveDown(entry, freezeReason == FreezeReason.SYSTEM);
}
private boolean slaveDown(ClientConnectionsEntry entry) {
private boolean slaveDown(ClientConnectionsEntry entry, boolean temporaryDown) {
// add master as slave if no more slaves available
if (config.getReadMode() == ReadMode.SLAVE && slaveBalancer.getAvailableClients() == 0) {
InetSocketAddress addr = masterEntry.getClient().getAddr();
@ -154,33 +154,45 @@ public class MasterSlaveEntry {
}
for (RedisPubSubConnection connection : entry.getAllSubscribeConnections()) {
reattachPubSub(connection);
reattachPubSub(connection, temporaryDown);
}
entry.getAllSubscribeConnections().clear();
return true;
}
private void reattachPubSub(RedisPubSubConnection redisPubSubConnection) {
private void reattachPubSub(RedisPubSubConnection redisPubSubConnection, boolean temporaryDown) {
for (String channelName : redisPubSubConnection.getChannels().keySet()) {
PubSubConnectionEntry pubSubEntry = connectionManager.getPubSubEntry(channelName);
Collection<RedisPubSubListener<?>> listeners = pubSubEntry.getListeners(channelName);
reattachPubSubListeners(channelName, listeners);
reattachPubSubListeners(channelName, listeners, temporaryDown);
}
for (String channelName : redisPubSubConnection.getPatternChannels().keySet()) {
PubSubConnectionEntry pubSubEntry = connectionManager.getPubSubEntry(channelName);
Collection<RedisPubSubListener<?>> listeners = pubSubEntry.getListeners(channelName);
reattachPatternPubSubListeners(channelName, listeners);
reattachPatternPubSubListeners(channelName, listeners, temporaryDown);
}
}
private void reattachPubSubListeners(final String channelName, final Collection<RedisPubSubListener<?>> listeners) {
Codec subscribeCodec = connectionManager.unsubscribe(channelName);
private void reattachPubSubListeners(final String channelName, final Collection<RedisPubSubListener<?>> listeners, boolean temporaryDown) {
RFuture<Codec> subscribeCodec = connectionManager.unsubscribe(channelName, temporaryDown);
if (listeners.isEmpty()) {
return;
}
subscribeCodec.addListener(new FutureListener<Codec>() {
@Override
public void operationComplete(Future<Codec> future) throws Exception {
Codec subscribeCodec = future.get();
subscribe(channelName, listeners, subscribeCodec);
}
});
}
private void subscribe(final String channelName, final Collection<RedisPubSubListener<?>> listeners,
final Codec subscribeCodec) {
RFuture<PubSubConnectionEntry> subscribeFuture = connectionManager.subscribe(subscribeCodec, channelName, null);
subscribeFuture.addListener(new FutureListener<PubSubConnectionEntry>() {
@ -188,42 +200,54 @@ public class MasterSlaveEntry {
public void operationComplete(Future<PubSubConnectionEntry> future)
throws Exception {
if (!future.isSuccess()) {
log.error("Can't resubscribe topic channel: " + channelName);
subscribe(channelName, listeners, subscribeCodec);
return;
}
PubSubConnectionEntry newEntry = future.getNow();
for (RedisPubSubListener<?> redisPubSubListener : listeners) {
newEntry.addListener(channelName, redisPubSubListener);
}
log.debug("resubscribed listeners for '{}' channel", channelName);
log.debug("resubscribed listeners of '{}' channel to {}", channelName, newEntry.getConnection().getRedisClient());
}
});
}
private void reattachPatternPubSubListeners(final String channelName,
final Collection<RedisPubSubListener<?>> listeners) {
Codec subscribeCodec = connectionManager.punsubscribe(channelName);
if (!listeners.isEmpty()) {
RFuture<PubSubConnectionEntry> future = connectionManager.psubscribe(channelName, subscribeCodec, null);
future.addListener(new FutureListener<PubSubConnectionEntry>() {
@Override
public void operationComplete(Future<PubSubConnectionEntry> future)
throws Exception {
if (!future.isSuccess()) {
log.error("Can't resubscribe topic channel: " + channelName);
return;
}
PubSubConnectionEntry newEntry = future.getNow();
for (RedisPubSubListener<?> redisPubSubListener : listeners) {
newEntry.addListener(channelName, redisPubSubListener);
}
log.debug("resubscribed listeners for '{}' channel-pattern", channelName);
}
});
private void reattachPatternPubSubListeners(final String channelName, final Collection<RedisPubSubListener<?>> listeners, boolean temporaryDown) {
RFuture<Codec> subscribeCodec = connectionManager.punsubscribe(channelName, temporaryDown);
if (listeners.isEmpty()) {
return;
}
subscribeCodec.addListener(new FutureListener<Codec>() {
@Override
public void operationComplete(Future<Codec> future) throws Exception {
Codec subscribeCodec = future.get();
psubscribe(channelName, listeners, subscribeCodec);
}
});
}
private void psubscribe(final String channelName, final Collection<RedisPubSubListener<?>> listeners,
final Codec subscribeCodec) {
RFuture<PubSubConnectionEntry> subscribeFuture = connectionManager.psubscribe(channelName, subscribeCodec, null);
subscribeFuture.addListener(new FutureListener<PubSubConnectionEntry>() {
@Override
public void operationComplete(Future<PubSubConnectionEntry> future)
throws Exception {
if (!future.isSuccess()) {
psubscribe(channelName, listeners, subscribeCodec);
return;
}
PubSubConnectionEntry newEntry = future.getNow();
for (RedisPubSubListener<?> redisPubSubListener : listeners) {
newEntry.addListener(channelName, redisPubSubListener);
}
log.debug("resubscribed listeners for '{}' channel-pattern", channelName);
}
});
}
private void reattachBlockingQueue(RedisConnection connection) {
final CommandData<?, ?> commandData = connection.getCurrentCommand();
@ -272,7 +296,7 @@ public class MasterSlaveEntry {
public RFuture<Void> addSlave(String host, int port) {
return addSlave(host, port, true, NodeType.SLAVE);
}
private RFuture<Void> addSlave(String host, int port, boolean freezed, NodeType mode) {
RedisClient client = connectionManager.createClient(NodeType.SLAVE, host, port);
ClientConnectionsEntry entry = new ClientConnectionsEntry(client,
@ -316,18 +340,23 @@ public class MasterSlaveEntry {
* @param host of Redis
* @param port of Redis
*/
public void changeMaster(String host, int port) {
ClientConnectionsEntry oldMaster = masterEntry;
setupMasterEntry(host, port);
writeConnectionHolder.remove(oldMaster);
slaveDown(oldMaster, FreezeReason.MANAGER);
// more than one slave available, so master can be removed from slaves
if (config.getReadMode() == ReadMode.SLAVE
&& slaveBalancer.getAvailableClients() > 1) {
slaveDown(host, port, FreezeReason.SYSTEM);
}
connectionManager.shutdownAsync(oldMaster.getClient());
public void changeMaster(final String host, final int port) {
final ClientConnectionsEntry oldMaster = masterEntry;
RFuture<Void> future = setupMasterEntry(host, port);
future.addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
writeConnectionHolder.remove(oldMaster);
slaveDown(oldMaster, FreezeReason.MANAGER);
// more than one slave available, so master can be removed from slaves
if (config.getReadMode() == ReadMode.SLAVE
&& slaveBalancer.getAvailableClients() > 1) {
slaveDown(host, port, FreezeReason.SYSTEM);
}
connectionManager.shutdownAsync(oldMaster.getClient());
}
});
}
public boolean isFreezed() {

@ -34,8 +34,6 @@ import io.netty.util.concurrent.Future;
public class PubSubConnectionEntry {
public enum Status {ACTIVE, INACTIVE}
private final AtomicInteger subscribedChannelsAmount;
private final RedisPubSubConnection conn;

@ -218,8 +218,9 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
final String slaveAddr = ip + ":" + port;
// to avoid addition twice
if (slaves.putIfAbsent(slaveAddr, true) == null && config.getReadMode() != ReadMode.MASTER) {
RFuture<Void> future = getEntry(singleSlotRange.getStartSlot()).addSlave(ip, Integer.valueOf(port));
if (slaves.putIfAbsent(slaveAddr, true) == null) {
final MasterSlaveEntry entry = getEntry(singleSlotRange.getStartSlot());
RFuture<Void> future = entry.addSlave(ip, Integer.valueOf(port));
future.addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
@ -229,7 +230,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
return;
}
if (getEntry(singleSlotRange.getStartSlot()).slaveUp(ip, Integer.valueOf(port), FreezeReason.MANAGER)) {
if (entry.slaveUp(ip, Integer.valueOf(port), FreezeReason.MANAGER)) {
String slaveAddr = ip + ":" + port;
log.info("slave: {} added", slaveAddr);
}
@ -266,12 +267,14 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
String ip = parts[2];
String port = parts[3];
MasterSlaveEntry entry = getEntry(singleSlotRange.getStartSlot());
if (entry.getFreezeReason() != FreezeReason.MANAGER) {
entry.freeze();
String addr = ip + ":" + port;
log.warn("master: {} has down", addr);
}
// should be resolved by master switch event
//
// MasterSlaveEntry entry = getEntry(singleSlotRange.getStartSlot());
// if (entry.getFreezeReason() != FreezeReason.MANAGER) {
// entry.freeze();
// String addr = ip + ":" + port;
// log.warn("master: {} has down", addr);
// }
}
} else {
log.warn("onSlaveDown. Invalid message: {} from Sentinel {}:{}", msg, sentinelAddr.getHost(), sentinelAddr.getPort());

Loading…
Cancel
Save