OneShot listener handling fixed

pull/243/head
Nikita 10 years ago
parent c13edde71c
commit 7c9e7c4239

@ -80,7 +80,8 @@ public class RedisPubSubTopicListenerWrapper<V> implements RedisPubSubListener<V
}
@Override
public void onStatus(Type type, String channel) {
public boolean onStatus(Type type, String channel) {
return false;
}
}

@ -88,10 +88,12 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown
}
@Override
public void onStatus(Type type, String channel) {
public boolean onStatus(Type type, String channel) {
if (channel.equals(getChannelName()) && !value.getPromise().isSuccess()) {
value.getPromise().setSuccess(true);
return true;
}
return false;
}
};

@ -129,10 +129,12 @@ public class RedissonLock extends RedissonExpirable implements RLock {
}
@Override
public void onStatus(Type type, String channel) {
public boolean onStatus(Type type, String channel) {
if (channel.equals(getChannelName()) && !value.getPromise().isSuccess()) {
value.getPromise().setSuccess(true);
return true;
}
return false;
}
};

@ -20,7 +20,8 @@ import org.redisson.client.protocol.pubsub.PubSubStatusMessage.Type;
public class BaseRedisPubSubListener<V> implements RedisPubSubListener<V> {
@Override
public void onStatus(Type type, String channel) {
public boolean onStatus(Type type, String channel) {
return false;
}
@Override

@ -17,21 +17,24 @@ package org.redisson.client;
import org.redisson.client.protocol.pubsub.PubSubStatusMessage.Type;
public class OnceRedisPubSubListener<V> implements RedisPubSubListener<V> {
public class OneShotPubSubListener<V> implements RedisPubSubListener<V> {
private RedisPubSubConnection connection;
private RedisPubSubListener<V> listener;
public OnceRedisPubSubListener(RedisPubSubConnection connection, RedisPubSubListener<V> listener) {
public OneShotPubSubListener(RedisPubSubConnection connection, RedisPubSubListener<V> listener) {
super();
this.connection = connection;
this.listener = listener;
}
@Override
public void onStatus(Type type, String channel) {
listener.onStatus(type, channel);
connection.removeListener(this);
public boolean onStatus(Type type, String channel) {
if (listener.onStatus(type, channel)) {
connection.removeListener(this);
return true;
}
return false;
}
@Override

@ -44,7 +44,7 @@ public class RedisPubSubConnection extends RedisConnection {
}
public void addOneShotListener(RedisPubSubListener listener) {
listeners.add(new OnceRedisPubSubListener<Object>(this, listener));
listeners.add(new OneShotPubSubListener<Object>(this, listener));
}
public void removeListener(RedisPubSubListener<?> listener) {

@ -19,7 +19,7 @@ import org.redisson.client.protocol.pubsub.PubSubStatusMessage;
public interface RedisPubSubListener<V> {
void onStatus(PubSubStatusMessage.Type type, String channel);
boolean onStatus(PubSubStatusMessage.Type type, String channel);
void onMessage(String channel, V message);

@ -23,14 +23,12 @@ import org.redisson.client.RedisClient;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisPubSubListener;
import org.redisson.client.protocol.Codec;
import org.redisson.client.protocol.pubsub.PubSubStatusMessage;
import org.redisson.misc.InfinitySemaphoreLatch;
import io.netty.channel.EventLoopGroup;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;

@ -337,14 +337,16 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
entry.unsubscribe(channelName, new BaseRedisPubSubListener() {
@Override
public void onStatus(Type type, String channel) {
public boolean onStatus(Type type, String channel) {
if (type == Type.UNSUBSCRIBE && channel.equals(channelName)) {
synchronized (entry) {
if (entry.tryClose()) {
returnSubscribeConnection(-1, entry);
}
}
return true;
}
return false;
}
});
@ -360,14 +362,16 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
entry.punsubscribe(channelName, new BaseRedisPubSubListener() {
@Override
public void onStatus(Type type, String channel) {
public boolean onStatus(Type type, String channel) {
if (type == Type.PUNSUBSCRIBE && channel.equals(channelName)) {
synchronized (entry) {
if (entry.tryClose()) {
returnSubscribeConnection(-1, entry);
}
}
return true;
}
return false;
}
});

@ -137,7 +137,7 @@ public class PubSubConnectionEntry {
public void unsubscribe(final String channel, RedisPubSubListener listener) {
conn.addOneShotListener(new BaseRedisPubSubListener<Object>() {
@Override
public void onStatus(Type type, String ch) {
public boolean onStatus(Type type, String ch) {
if (type == Type.UNSUBSCRIBE && channel.equals(ch)) {
Queue<RedisPubSubListener> listeners = channelListeners.get(channel);
if (listeners != null) {
@ -146,7 +146,9 @@ public class PubSubConnectionEntry {
}
}
subscribedChannelsAmount.release();
return true;
}
return false;
}
});
conn.addOneShotListener(listener);
@ -156,7 +158,7 @@ public class PubSubConnectionEntry {
public void punsubscribe(final String channel, RedisPubSubListener listener) {
conn.addOneShotListener(new BaseRedisPubSubListener<Object>() {
@Override
public void onStatus(Type type, String ch) {
public boolean onStatus(Type type, String ch) {
if (type == Type.PUNSUBSCRIBE && channel.equals(ch)) {
Queue<RedisPubSubListener> listeners = channelListeners.get(channel);
if (listeners != null) {
@ -165,7 +167,9 @@ public class PubSubConnectionEntry {
}
}
subscribedChannelsAmount.release();
return true;
}
return false;
}
});
conn.addOneShotListener(listener);

@ -33,14 +33,10 @@ import org.redisson.client.RedisPubSubConnection;
import org.redisson.client.RedisPubSubListener;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.StringCodec;
import org.redisson.client.protocol.pubsub.PubSubStatusMessage;
import org.redisson.client.protocol.pubsub.PubSubStatusMessage.Type;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
public class SentinelConnectionManager extends MasterSlaveConnectionManager {
private final Logger log = LoggerFactory.getLogger(getClass());
@ -133,10 +129,11 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
}
@Override
public void onStatus(Type type, String channel) {
public boolean onStatus(Type type, String channel) {
if (type == Type.SUBSCRIBE) {
log.info("subscribed to channel: {} from Sentinel {}:{}", channel, addr.getHost(), addr.getPort());
}
return true;
}
});

@ -21,10 +21,8 @@ import org.redisson.client.protocol.CommandsData;
import org.redisson.client.protocol.LongCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.StringCodec;
import org.redisson.client.protocol.pubsub.PubSubStatusMessage;
import org.redisson.client.protocol.pubsub.PubSubStatusMessage.Type;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
public class RedisClientTest {
@ -37,10 +35,11 @@ public class RedisClientTest {
pubSubConnection.addListener(new RedisPubSubListener<Object>() {
@Override
public void onStatus(Type type, String channel) {
public boolean onStatus(Type type, String channel) {
Assert.assertEquals(Type.SUBSCRIBE, type);
Assert.assertTrue(Arrays.asList("test1", "test2").contains(channel));
latch.countDown();
return true;
}
@Override

Loading…
Cancel
Save