1. Fixed - RTopic.addListener sometimes worked asynchronous

2. Fixed - ClastCastException occurred if multi-type PubSub channels were used with single connection
3. Fixed - PubSub status message decoding
4. Fixed - PubSub subscription may stuck in some cases #530
pull/537/head
Nikita 9 years ago
parent eeec753070
commit bb9a016d5b

@ -26,7 +26,7 @@ import org.redisson.core.MessageListener;
* @param <K>
* @param <V>
*/
public class PubSubMessageListener<V> implements RedisPubSubListener<V> {
public class PubSubMessageListener<V> implements RedisPubSubListener<Object> {
private final MessageListener<V> listener;
private final String name;
@ -67,18 +67,18 @@ public class PubSubMessageListener<V> implements RedisPubSubListener<V> {
}
@Override
public void onMessage(String channel, V message) {
public void onMessage(String channel, Object message) {
// could be subscribed to multiple channels
if (name.equals(channel)) {
listener.onMessage(channel, message);
listener.onMessage(channel, (V)message);
}
}
@Override
public void onPatternMessage(String pattern, String channel, V message) {
public void onPatternMessage(String pattern, String channel, Object message) {
// could be subscribed to multiple channels
if (name.equals(pattern)) {
listener.onMessage(channel, message);
listener.onMessage(channel, (V)message);
}
}

@ -53,7 +53,7 @@ public class RedissonPatternTopic<M> implements RPatternTopic<M> {
@Override
public int addListener(PatternStatusListener listener) {
return addListener(new PubSubPatternStatusListener(listener, name));
return addListener(new PubSubPatternStatusListener<Object>(listener, name));
};
@Override
@ -62,18 +62,10 @@ public class RedissonPatternTopic<M> implements RPatternTopic<M> {
return addListener(pubSubListener);
}
private int addListener(RedisPubSubListener<M> pubSubListener) {
Future<PubSubConnectionEntry> future = commandExecutor.getConnectionManager().psubscribe(name, codec);
private int addListener(RedisPubSubListener<?> pubSubListener) {
Future<PubSubConnectionEntry> future = commandExecutor.getConnectionManager().psubscribe(name, codec, pubSubListener);
future.syncUninterruptibly();
PubSubConnectionEntry entry = future.getNow();
synchronized (entry) {
if (entry.isActive()) {
entry.addListener(name, pubSubListener);
return System.identityHashCode(pubSubListener);
}
}
// entry is inactive trying add again
return addListener(pubSubListener);
return System.identityHashCode(pubSubListener);
}
@Override
@ -82,17 +74,21 @@ public class RedissonPatternTopic<M> implements RPatternTopic<M> {
if (entry == null) {
return;
}
synchronized (entry) {
entry.lock();
try {
if (entry.isActive()) {
entry.removeListener(name, listenerId);
if (entry.getListeners(name).isEmpty()) {
if (!entry.hasListeners(name)) {
commandExecutor.getConnectionManager().punsubscribe(name);
}
return;
}
} finally {
entry.unlock();
}
// entry is inactive trying add again
// listener has been re-attached
removeListener(listenerId);
}

@ -68,7 +68,7 @@ public class RedissonTopic<M> implements RTopic<M> {
@Override
public int addListener(StatusListener listener) {
return addListener(new PubSubStatusListener(listener, name));
return addListener(new PubSubStatusListener<Object>(listener, name));
};
@Override
@ -77,7 +77,7 @@ public class RedissonTopic<M> implements RTopic<M> {
return addListener(pubSubListener);
}
private int addListener(RedisPubSubListener<M> pubSubListener) {
private int addListener(RedisPubSubListener<?> pubSubListener) {
Future<PubSubConnectionEntry> future = commandExecutor.getConnectionManager().subscribe(codec, name, pubSubListener);
future.syncUninterruptibly();
return System.identityHashCode(pubSubListener);
@ -89,7 +89,9 @@ public class RedissonTopic<M> implements RTopic<M> {
if (entry == null) {
return;
}
synchronized (entry) {
entry.lock();
try {
if (entry.isActive()) {
entry.removeListener(name, listenerId);
if (!entry.hasListeners(name)) {
@ -97,6 +99,8 @@ public class RedissonTopic<M> implements RTopic<M> {
}
return;
}
} finally {
entry.unlock();
}
// listener has been re-attached

@ -17,7 +17,7 @@ package org.redisson.client;
import org.redisson.client.protocol.pubsub.PubSubType;
public class BaseRedisPubSubListener<V> implements RedisPubSubListener<V> {
public class BaseRedisPubSubListener implements RedisPubSubListener<Object> {
@Override
public boolean onStatus(PubSubType type, String channel) {
@ -25,11 +25,11 @@ public class BaseRedisPubSubListener<V> implements RedisPubSubListener<V> {
}
@Override
public void onMessage(String channel, V message) {
public void onMessage(String channel, Object message) {
}
@Override
public void onPatternMessage(String pattern, String channel, V message) {
public void onPatternMessage(String pattern, String channel, Object message) {
}
}

@ -0,0 +1,47 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client;
import org.redisson.client.protocol.pubsub.PubSubType;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.ImmediateEventExecutor;
import io.netty.util.concurrent.Promise;
public class SubscribeListener extends BaseRedisPubSubListener {
Promise<Void> promise = ImmediateEventExecutor.INSTANCE.newPromise();
String name;
PubSubType type;
public SubscribeListener(String name, PubSubType type) {
super();
this.name = name;
this.type = type;
}
public boolean onStatus(PubSubType type, String channel) {
if (name.equals(channel) && this.type.equals(type)) {
promise.trySuccess(null);
}
return true;
}
public Future<Void> getSuccessFuture() {
return promise;
}
}

@ -71,10 +71,11 @@ public class CommandDecoder extends ReplayingDecoder<State> {
// It is not needed to use concurrent map because responses are coming consecutive
private final Map<String, MultiDecoder<Object>> pubSubMessageDecoders = new HashMap<String, MultiDecoder<Object>>();
private final Map<String, CommandData<Object, Object>> pubSubChannels = PlatformDependent.newConcurrentHashMap();
private final Map<PubSubKey, CommandData<Object, Object>> pubSubChannels = PlatformDependent.newConcurrentHashMap();
public void addPubSubCommand(String channel, CommandData<Object, Object> data) {
pubSubChannels.put(channel, data);
String operation = data.getCommand().getName().toLowerCase();
pubSubChannels.put(new PubSubKey(channel, operation), data);
}
@Override
@ -313,13 +314,15 @@ public class CommandDecoder extends ReplayingDecoder<State> {
Channel channel, Object result) {
if (result instanceof PubSubStatusMessage) {
String channelName = ((PubSubStatusMessage) result).getChannel();
CommandData<Object, Object> d = pubSubChannels.get(channelName);
String operation = ((PubSubStatusMessage) result).getType().name().toLowerCase();
PubSubKey key = new PubSubKey(channelName, operation);
CommandData<Object, Object> d = pubSubChannels.get(key);
if (Arrays.asList("PSUBSCRIBE", "SUBSCRIBE").contains(d.getCommand().getName())) {
pubSubChannels.remove(channelName);
pubSubChannels.remove(key);
pubSubMessageDecoders.put(channelName, d.getMessageDecoder());
}
if (Arrays.asList("PUNSUBSCRIBE", "UNSUBSCRIBE").contains(d.getCommand().getName())) {
pubSubChannels.remove(channelName);
pubSubChannels.remove(key);
pubSubMessageDecoders.remove(channelName);
}
}
@ -353,9 +356,11 @@ public class CommandDecoder extends ReplayingDecoder<State> {
private MultiDecoder<Object> messageDecoder(CommandData<Object, Object> data, List<Object> parts, Channel channel) {
if (data == null) {
if (Arrays.asList("subscribe", "psubscribe", "punsubscribe", "unsubscribe").contains(parts.get(0))) {
String channelName = (String) parts.get(1);
CommandData<Object, Object> commandData = pubSubChannels.get(channelName);
String command = parts.get(0).toString();
if (Arrays.asList("subscribe", "psubscribe", "punsubscribe", "unsubscribe").contains(command)) {
String channelName = parts.get(1).toString();
PubSubKey key = new PubSubKey(channelName, command);
CommandData<Object, Object> commandData = pubSubChannels.get(key);
if (commandData == null) {
return null;
}

@ -0,0 +1,68 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client.handler;
public class PubSubKey {
private final String channel;
private final String operation;
public PubSubKey(String channel, String operation) {
super();
this.channel = channel;
this.operation = operation;
}
public String getChannel() {
return channel;
}
public String getOperation() {
return operation;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((channel == null) ? 0 : channel.hashCode());
result = prime * result + ((operation == null) ? 0 : operation.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
PubSubKey other = (PubSubKey) obj;
if (channel == null) {
if (other.channel != null)
return false;
} else if (!channel.equals(other.channel))
return false;
if (operation == null) {
if (other.operation != null)
return false;
} else if (!operation.equals(other.operation))
return false;
return true;
}
}

@ -15,12 +15,12 @@
*/
package org.redisson.client.protocol.pubsub;
public class PubSubMessage<V> implements Message {
public class PubSubMessage implements Message {
private final String channel;
private final V value;
private final Object value;
public PubSubMessage(String channel, V value) {
public PubSubMessage(String channel, Object value) {
super();
this.channel = channel;
this.value = value;
@ -30,7 +30,7 @@ public class PubSubMessage<V> implements Message {
return channel;
}
public V getValue() {
public Object getValue() {
return value;
}

@ -39,8 +39,8 @@ public class PubSubMessageDecoder implements MultiDecoder<Object> {
}
@Override
public PubSubMessage<Object> decode(List<Object> parts, State state) {
return new PubSubMessage<Object>(parts.get(1).toString(), parts.get(2));
public PubSubMessage decode(List<Object> parts, State state) {
return new PubSubMessage(parts.get(1).toString(), parts.get(2));
}
@Override

@ -91,9 +91,9 @@ public interface ConnectionManager {
PubSubConnectionEntry getPubSubEntry(String channelName);
Future<PubSubConnectionEntry> psubscribe(String pattern, Codec codec);
Future<PubSubConnectionEntry> psubscribe(String pattern, Codec codec, RedisPubSubListener<?> listener);
Future<Codec> unsubscribe(String channelName);
Codec unsubscribe(String channelName);
Codec punsubscribe(String channelName);

@ -26,6 +26,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.redisson.BaseMasterSlaveServersConfig;
@ -296,131 +297,113 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
}
@Override
public Future<PubSubConnectionEntry> psubscribe(String channelName, Codec codec) {
public Future<PubSubConnectionEntry> psubscribe(String channelName, Codec codec, RedisPubSubListener<?> listener) {
Promise<PubSubConnectionEntry> promise = newPromise();
psubscribe(channelName, codec, promise);
subscribe(codec, channelName, listener, promise, PubSubType.PSUBSCRIBE);
return promise;
}
private void psubscribe(final String channelName, final Codec codec, final Promise<PubSubConnectionEntry> promise) {
// multiple channel names per PubSubConnections are allowed
PubSubConnectionEntry сonnEntry = name2PubSubConnection.get(channelName);
if (сonnEntry != null) {
promise.setSuccess(сonnEntry);
return;
}
Set<PubSubConnectionEntry> entries = new HashSet<PubSubConnectionEntry>(name2PubSubConnection.values());
for (PubSubConnectionEntry entry : entries) {
if (entry.tryAcquire()) {
PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry);
if (oldEntry != null) {
entry.release();
promise.setSuccess(oldEntry);
return;
}
synchronized (entry) {
if (!entry.isActive()) {
entry.release();
psubscribe(channelName, codec, promise);
return;
}
entry.psubscribe(codec, channelName);
promise.setSuccess(entry);
return;
}
}
}
final int slot = 0;
Future<RedisPubSubConnection> connFuture = nextPubSubConnection(slot);
connFuture.addListener(new FutureListener<RedisPubSubConnection>() {
@Override
public void operationComplete(Future<RedisPubSubConnection> future) throws Exception {
if (!future.isSuccess()) {
promise.setFailure(future.cause());
return;
}
RedisPubSubConnection conn = future.getNow();
PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, config.getSubscriptionsPerConnection());
entry.tryAcquire();
PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry);
if (oldEntry != null) {
releaseSubscribeConnection(slot, entry);
promise.setSuccess(oldEntry);
return;
}
synchronized (entry) {
if (!entry.isActive()) {
entry.release();
psubscribe(channelName, codec, promise);
return;
}
entry.psubscribe(codec, channelName);
promise.setSuccess(entry);
}
}
});
}
public Promise<PubSubConnectionEntry> subscribe(final Codec codec, final String channelName, final RedisPubSubListener<?> listener) {
public Promise<PubSubConnectionEntry> subscribe(Codec codec, String channelName, final RedisPubSubListener<?> listener) {
Promise<PubSubConnectionEntry> promise = newPromise();
subscribe(codec, channelName, listener, promise);
subscribe(codec, channelName, listener, promise, PubSubType.SUBSCRIBE);
return promise;
}
private void subscribe(final Codec codec, final String channelName, final RedisPubSubListener<?> listener, final Promise<PubSubConnectionEntry> promise) {
PubSubConnectionEntry сonnEntry = name2PubSubConnection.get(channelName);
private void subscribe(final Codec codec, final String channelName, final RedisPubSubListener listener, final Promise<PubSubConnectionEntry> promise, PubSubType type) {
final PubSubConnectionEntry сonnEntry = name2PubSubConnection.get(channelName);
if (сonnEntry != null) {
synchronized (сonnEntry) {
сonnEntry.lock();
if (name2PubSubConnection.get(channelName) != сonnEntry) {
сonnEntry.unlock();
subscribe(codec, channelName, listener, promise, type);
return;
}
if (сonnEntry.isActive()) {
сonnEntry.addListener(channelName, listener);
promise.setSuccess(сonnEntry);
сonnEntry.getSubscribeFuture(channelName, type).addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
promise.setSuccess(сonnEntry);
}
});
сonnEntry.unlock();
return;
}
}
connect(codec, channelName, listener, promise);
сonnEntry.unlock();
connect(codec, channelName, listener, promise, type);
return;
}
Set<PubSubConnectionEntry> entries = new HashSet<PubSubConnectionEntry>(name2PubSubConnection.values());
for (PubSubConnectionEntry entry : entries) {
for (final PubSubConnectionEntry entry : entries) {
if (entry.tryAcquire()) {
PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry);
final PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry);
if (oldEntry != null) {
entry.release();
synchronized (oldEntry) {
if (oldEntry.isActive()) {
oldEntry.addListener(channelName, listener);
promise.setSuccess(oldEntry);
oldEntry.lock();
if (name2PubSubConnection.get(channelName) != oldEntry) {
oldEntry.unlock();
subscribe(codec, channelName, listener, promise, type);
return;
}
}
subscribe(codec, channelName, listener, promise);
if (oldEntry.isActive()) {
oldEntry.addListener(channelName, listener);
oldEntry.getSubscribeFuture(channelName, type).addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
promise.setSuccess(oldEntry);
}
});
oldEntry.unlock();
return;
}
oldEntry.unlock();
subscribe(codec, channelName, listener, promise, type);
return;
}
synchronized (entry) {
entry.lock();
if (name2PubSubConnection.get(channelName) != entry) {
entry.unlock();
subscribe(codec, channelName, listener, promise, type);
return;
}
if (!entry.isActive()) {
entry.release();
subscribe(codec, channelName, listener, promise);
entry.unlock();
subscribe(codec, channelName, listener, promise, type);
return;
}
entry.subscribe(codec, listener, channelName);
promise.setSuccess(entry);
return;
}
entry.getSubscribeFuture(channelName, type).addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
promise.setSuccess(entry);
}
});
entry.addListener(channelName, listener);
if (PubSubType.PSUBSCRIBE == type) {
entry.psubscribe(codec, channelName);
} else {
entry.subscribe(codec, channelName);
}
entry.unlock();
return;
}
}
connect(codec, channelName, listener, promise);
connect(codec, channelName, listener, promise, type);
}
private void connect(final Codec codec, final String channelName, final RedisPubSubListener<?> listener,
final Promise<PubSubConnectionEntry> promise) {
private void connect(final Codec codec, final String channelName, final RedisPubSubListener listener,
final Promise<PubSubConnectionEntry> promise, final PubSubType type) {
final int slot = 0;
Future<RedisPubSubConnection> connFuture = nextPubSubConnection(slot);
connFuture.addListener(new FutureListener<RedisPubSubConnection>() {
@ -433,65 +416,108 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
}
RedisPubSubConnection conn = future.getNow();
PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, config.getSubscriptionsPerConnection());
final PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, config.getSubscriptionsPerConnection());
entry.tryAcquire();
PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry);
final PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry);
if (oldEntry != null) {
releaseSubscribeConnection(slot, entry);
synchronized (oldEntry) {
oldEntry.lock();
if (name2PubSubConnection.get(channelName) != oldEntry) {
oldEntry.unlock();
subscribe(codec, channelName, listener, promise, type);
return;
}
if (oldEntry.isActive()) {
oldEntry.addListener(channelName, listener);
promise.setSuccess(oldEntry);
oldEntry.getSubscribeFuture(channelName, type).addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
promise.setSuccess(oldEntry);
}
});
oldEntry.unlock();
return;
}
}
subscribe(codec, channelName, listener, promise);
oldEntry.unlock();
subscribe(codec, channelName, listener, promise, type);
return;
}
synchronized (entry) {
entry.lock();
if (name2PubSubConnection.get(channelName) != entry) {
entry.unlock();
subscribe(codec, channelName, listener, promise, type);
return;
}
if (!entry.isActive()) {
entry.release();
subscribe(codec, channelName, listener, promise);
entry.unlock();
subscribe(codec, channelName, listener, promise, type);
return;
}
entry.subscribe(codec, listener, channelName);
promise.setSuccess(entry);
return;
}
entry.getSubscribeFuture(channelName, type).addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
promise.setSuccess(entry);
}
});
entry.addListener(channelName, listener);
if (PubSubType.PSUBSCRIBE == type) {
entry.psubscribe(codec, channelName);
} else {
entry.subscribe(codec, channelName);
}
entry.unlock();
return;
}
});
}
@Override
public Future<Codec> unsubscribe(final String channelName) {
final PubSubConnectionEntry entry = name2PubSubConnection.remove(channelName);
if (entry == null) {
return null;
}
final Promise<Codec> result = newPromise();
final Codec entryCodec = entry.getConnection().getChannels().get(channelName);
entry.unsubscribe(channelName, new BaseRedisPubSubListener() {
@Override
public boolean onStatus(PubSubType type, String channel) {
if (type == PubSubType.UNSUBSCRIBE && channel.equals(channelName)) {
synchronized (entry) {
if (entry.tryClose()) {
releaseSubscribeConnection(0, entry);
}
public Codec unsubscribe(final String channelName) {
final PubSubConnectionEntry entry = name2PubSubConnection.remove(channelName);
if (entry == null) {
return null;
}
Codec entryCodec = entry.getConnection().getChannels().get(channelName);
final CountDownLatch latch = new CountDownLatch(1);
entry.unsubscribe(channelName, new BaseRedisPubSubListener() {
@Override
public boolean onStatus(PubSubType type, String channel) {
if (type == PubSubType.UNSUBSCRIBE && channel.equals(channelName)) {
latch.countDown();
return true;
}
result.setSuccess(entryCodec);
return true;
return false;
}
return false;
});
try {
latch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
return result;
// same thread should be used
entry.lock();
try {
if (entry.tryClose()) {
releaseSubscribeConnection(0, entry);
}
} finally {
entry.unlock();
}
return entryCodec;
}
@Override
public Codec punsubscribe(final String channelName) {
final PubSubConnectionEntry entry = name2PubSubConnection.remove(channelName);
@ -500,22 +526,30 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
}
Codec entryCodec = entry.getConnection().getPatternChannels().get(channelName);
final CountDownLatch latch = new CountDownLatch(1);
entry.punsubscribe(channelName, new BaseRedisPubSubListener() {
@Override
public boolean onStatus(PubSubType type, String channel) {
if (type == PubSubType.PUNSUBSCRIBE && channel.equals(channelName)) {
synchronized (entry) {
if (entry.tryClose()) {
releaseSubscribeConnection(0, entry);
}
}
latch.countDown();
return true;
}
return false;
}
});
// same thread should be used
entry.lock();
try {
if (entry.tryClose()) {
releaseSubscribeConnection(0, entry);
}
} finally {
entry.unlock();
}
return entryCodec;
}

@ -159,53 +159,53 @@ public class MasterSlaveEntry {
for (String channelName : redisPubSubConnection.getChannels().keySet()) {
PubSubConnectionEntry pubSubEntry = connectionManager.getPubSubEntry(channelName);
synchronized (pubSubEntry) {
pubSubEntry.lock();
try {
pubSubEntry.close();
Collection<RedisPubSubListener> listeners = pubSubEntry.getListeners(channelName);
reattachPubSubListeners(channelName, listeners);
} finally {
pubSubEntry.unlock();
}
}
for (String channelName : redisPubSubConnection.getPatternChannels().keySet()) {
PubSubConnectionEntry pubSubEntry = connectionManager.getPubSubEntry(channelName);
synchronized (pubSubEntry) {
pubSubEntry.lock();
try {
pubSubEntry.close();
Collection<RedisPubSubListener> listeners = pubSubEntry.getListeners(channelName);
reattachPatternPubSubListeners(channelName, listeners);
} finally {
pubSubEntry.unlock();
}
}
}
private void reattachPubSubListeners(final String channelName, final Collection<RedisPubSubListener> listeners) {
Future<Codec> unsubscribeFuture = connectionManager.unsubscribe(channelName);
unsubscribeFuture.addListener(new FutureListener<Codec>() {
Codec subscribeCodec = connectionManager.unsubscribe(channelName);
if (listeners.isEmpty()) {
return;
}
Future<PubSubConnectionEntry> subscribeFuture = connectionManager.subscribe(subscribeCodec, channelName, null);
subscribeFuture.addListener(new FutureListener<PubSubConnectionEntry>() {
@Override
public void operationComplete(Future<Codec> future) throws Exception {
if (listeners.isEmpty()) {
public void operationComplete(Future<PubSubConnectionEntry> future)
throws Exception {
if (!future.isSuccess()) {
log.error("Can't resubscribe topic channel: " + channelName);
return;
}
Codec subscribeCodec = future.getNow();
Future<PubSubConnectionEntry> subscribeFuture = connectionManager.subscribe(subscribeCodec, channelName, null);
subscribeFuture.addListener(new FutureListener<PubSubConnectionEntry>() {
@Override
public void operationComplete(Future<PubSubConnectionEntry> future)
throws Exception {
if (!future.isSuccess()) {
log.error("Can't resubscribe topic channel: " + channelName);
return;
}
PubSubConnectionEntry newEntry = future.getNow();
for (RedisPubSubListener redisPubSubListener : listeners) {
newEntry.addListener(channelName, redisPubSubListener);
}
log.debug("resubscribed listeners for '{}' channel", channelName);
}
});
PubSubConnectionEntry newEntry = future.getNow();
for (RedisPubSubListener redisPubSubListener : listeners) {
newEntry.addListener(channelName, redisPubSubListener);
}
log.debug("resubscribed listeners for '{}' channel", channelName);
}
});
}
@ -214,7 +214,7 @@ public class MasterSlaveEntry {
final Collection<RedisPubSubListener> listeners) {
Codec subscribeCodec = connectionManager.punsubscribe(channelName);
if (!listeners.isEmpty()) {
Future<PubSubConnectionEntry> future = connectionManager.psubscribe(channelName, subscribeCodec);
Future<PubSubConnectionEntry> future = connectionManager.psubscribe(channelName, subscribeCodec, null);
future.addListener(new FutureListener<PubSubConnectionEntry>() {
@Override
public void operationComplete(Future<PubSubConnectionEntry> future)

@ -23,22 +23,28 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.ReentrantLock;
import org.redisson.client.BaseRedisPubSubListener;
import org.redisson.client.RedisPubSubConnection;
import org.redisson.client.RedisPubSubListener;
import org.redisson.client.SubscribeListener;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.pubsub.PubSubType;
import io.netty.util.concurrent.Future;
public class PubSubConnectionEntry {
public enum Status {ACTIVE, INACTIVE}
private ReentrantLock lock = new ReentrantLock();
private volatile Status status = Status.ACTIVE;
private final Semaphore subscribedChannelsAmount;
private final RedisPubSubConnection conn;
private final int subscriptionsPerConnection;
private final ConcurrentMap<String, SubscribeListener> subscribeChannelListeners = new ConcurrentHashMap<String, SubscribeListener>();
private final ConcurrentMap<String, Queue<RedisPubSubListener>> channelListeners = new ConcurrentHashMap<String, Queue<RedisPubSubListener>>();
public PubSubConnectionEntry(RedisPubSubConnection conn, int subscriptionsPerConnection) {
@ -57,7 +63,7 @@ public class PubSubConnectionEntry {
if (result == null) {
return Collections.emptyList();
}
return new ArrayList<RedisPubSubListener>(result);
return result;
}
public void addListener(String channelName, RedisPubSubListener<?> listener) {
@ -90,14 +96,15 @@ public class PubSubConnectionEntry {
}
// TODO optimize
public void removeListener(String channelName, int listenerId) {
public boolean removeListener(String channelName, int listenerId) {
Queue<RedisPubSubListener> listeners = channelListeners.get(channelName);
for (RedisPubSubListener listener : listeners) {
if (System.identityHashCode(listener) == listenerId) {
removeListener(channelName, listener);
break;
return true;
}
}
return false;
}
private void removeListener(String channelName, RedisPubSubListener listener) {
@ -126,28 +133,45 @@ public class PubSubConnectionEntry {
conn.psubscribe(codec, pattern);
}
public void subscribe(Codec codec, RedisPubSubListener listener, String channel) {
addListener(channel, listener);
conn.subscribe(codec, channel);
private SubscribeListener addSubscribeListener(String channel, PubSubType type) {
SubscribeListener subscribeListener = new SubscribeListener(channel, type);
SubscribeListener oldSubscribeListener = subscribeChannelListeners.putIfAbsent(channel, subscribeListener);
if (oldSubscribeListener != null) {
return oldSubscribeListener;
} else {
conn.addListener(subscribeListener);
return subscribeListener;
}
}
public void unsubscribe(final String channel, RedisPubSubListener listener) {
conn.addOneShotListener(new BaseRedisPubSubListener<Object>() {
public Future<Void> getSubscribeFuture(String channel, PubSubType type) {
SubscribeListener listener = subscribeChannelListeners.get(channel);
if (listener == null) {
listener = addSubscribeListener(channel, type);
}
return listener.getSuccessFuture();
}
public void unsubscribe(final String channel, final RedisPubSubListener listener) {
conn.addListener(new BaseRedisPubSubListener() {
@Override
public boolean onStatus(PubSubType type, String ch) {
if (type == PubSubType.UNSUBSCRIBE && channel.equals(ch)) {
removeListeners(channel);
listener.onStatus(type, channel);
conn.removeListener(this);
return true;
}
return false;
}
});
conn.addOneShotListener(listener);
conn.unsubscribe(channel);
}
private void removeListeners(String channel) {
public void removeListeners(String channel) {
SubscribeListener s = subscribeChannelListeners.remove(channel);
conn.removeListener(s);
Queue<RedisPubSubListener> queue = channelListeners.get(channel);
if (queue != null) {
synchronized (queue) {
@ -160,18 +184,19 @@ public class PubSubConnectionEntry {
subscribedChannelsAmount.release();
}
public void punsubscribe(final String channel, RedisPubSubListener listener) {
conn.addOneShotListener(new BaseRedisPubSubListener<Object>() {
public void punsubscribe(final String channel, final RedisPubSubListener listener) {
conn.addListener(new BaseRedisPubSubListener() {
@Override
public boolean onStatus(PubSubType type, String ch) {
if (type == PubSubType.PUNSUBSCRIBE && channel.equals(ch)) {
removeListeners(channel);
listener.onStatus(type, channel);
conn.removeListener(this);
return true;
}
return false;
}
});
conn.addOneShotListener(listener);
conn.punsubscribe(channel);
}
@ -187,5 +212,13 @@ public class PubSubConnectionEntry {
public RedisPubSubConnection getConnection() {
return conn;
}
public void lock() {
lock.lock();
}
public void unlock() {
lock.unlock();
}
}

@ -156,24 +156,24 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
}
RedisPubSubConnection pubsub = future.getNow();
pubsub.addListener(new BaseRedisPubSubListener<String>() {
pubsub.addListener(new BaseRedisPubSubListener() {
@Override
public void onMessage(String channel, String msg) {
public void onMessage(String channel, Object msg) {
if ("+sentinel".equals(channel)) {
onSentinelAdded(cfg, msg, c);
onSentinelAdded(cfg, (String) msg, c);
}
if ("+slave".equals(channel)) {
onSlaveAdded(addr, msg);
onSlaveAdded(addr, (String) msg);
}
if ("+sdown".equals(channel)) {
onNodeDown(addr, msg);
onNodeDown(addr, (String) msg);
}
if ("-sdown".equals(channel)) {
onNodeUp(addr, msg);
onNodeUp(addr, (String) msg);
}
if ("+switch-master".equals(channel)) {
onMasterChange(cfg, addr, msg);
onMasterChange(cfg, addr, (String) msg);
}
}

@ -23,6 +23,7 @@ import org.redisson.client.RedisPubSubListener;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.protocol.pubsub.PubSubType;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.PubSubConnectionEntry;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
@ -38,7 +39,13 @@ abstract class PublishSubscribe<E extends PubSubEntry<E>> {
// just an assertion
boolean removed = entries.remove(entryName) == entry;
if (removed) {
connectionManager.unsubscribe(channelName).syncUninterruptibly();
PubSubConnectionEntry e = connectionManager.getPubSubEntry(channelName);
e.lock();
try {
connectionManager.unsubscribe(channelName);
} finally {
e.unlock();
}
}
}
}
@ -66,7 +73,7 @@ abstract class PublishSubscribe<E extends PubSubEntry<E>> {
return oldValue.getPromise();
}
RedisPubSubListener<Long> listener = createListener(channelName, value);
RedisPubSubListener<Object> listener = createListener(channelName, value);
connectionManager.subscribe(LongCodec.INSTANCE, channelName, listener);
return newPromise;
}
@ -76,16 +83,16 @@ abstract class PublishSubscribe<E extends PubSubEntry<E>> {
protected abstract void onMessage(E value, Long message);
private RedisPubSubListener<Long> createListener(final String channelName, final E value) {
RedisPubSubListener<Long> listener = new BaseRedisPubSubListener<Long>() {
private RedisPubSubListener<Object> createListener(final String channelName, final E value) {
RedisPubSubListener<Object> listener = new BaseRedisPubSubListener() {
@Override
public void onMessage(String channel, Long message) {
public void onMessage(String channel, Object message) {
if (!channelName.equals(channel)) {
return;
}
PublishSubscribe.this.onMessage(value, message);
PublishSubscribe.this.onMessage(value, (Long)message);
}
@Override

@ -72,7 +72,7 @@ public class RedissonPatternTopicReactive<M> implements RPatternTopicReactive<M>
}
private void addListener(final RedisPubSubListener<M> pubSubListener, final Promise<Integer> promise) {
Future<PubSubConnectionEntry> future = commandExecutor.getConnectionManager().psubscribe(name, codec);
Future<PubSubConnectionEntry> future = commandExecutor.getConnectionManager().psubscribe(name, codec, pubSubListener);
future.addListener(new FutureListener<PubSubConnectionEntry>() {
@Override
public void operationComplete(Future<PubSubConnectionEntry> future) throws Exception {
@ -82,12 +82,15 @@ public class RedissonPatternTopicReactive<M> implements RPatternTopicReactive<M>
}
PubSubConnectionEntry entry = future.getNow();
synchronized (entry) {
entry.lock();
try {
if (entry.isActive()) {
entry.addListener(name, pubSubListener);
promise.setSuccess(pubSubListener.hashCode());
return;
}
} finally {
entry.unlock();
}
addListener(pubSubListener, promise);
}
@ -100,14 +103,18 @@ public class RedissonPatternTopicReactive<M> implements RPatternTopicReactive<M>
if (entry == null) {
return;
}
synchronized (entry) {
entry.lock();
try {
if (entry.isActive()) {
entry.removeListener(name, listenerId);
if (entry.getListeners(name).isEmpty()) {
if (!entry.hasListeners(name)) {
commandExecutor.getConnectionManager().punsubscribe(name);
}
return;
}
} finally {
entry.unlock();
}
// entry is inactive trying add again

@ -29,6 +29,7 @@ import org.redisson.command.CommandReactiveExecutor;
import org.redisson.connection.PubSubConnectionEntry;
import org.redisson.core.MessageListener;
import org.redisson.core.StatusListener;
import org.redisson.misc.ReclosableLatch;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
@ -69,7 +70,7 @@ public class RedissonTopicReactive<M> implements RTopicReactive<M> {
@Override
public Publisher<Integer> addListener(StatusListener listener) {
return addListener(new PubSubStatusListener(listener, name));
return addListener(new PubSubStatusListener<Object>(listener, name));
};
@Override
@ -78,7 +79,7 @@ public class RedissonTopicReactive<M> implements RTopicReactive<M> {
return addListener(pubSubListener);
}
private Publisher<Integer> addListener(final RedisPubSubListener<M> pubSubListener) {
private Publisher<Integer> addListener(final RedisPubSubListener<?> pubSubListener) {
final Promise<Integer> promise = commandExecutor.getConnectionManager().newPromise();
Future<PubSubConnectionEntry> future = commandExecutor.getConnectionManager().subscribe(codec, name, pubSubListener);
future.addListener(new FutureListener<PubSubConnectionEntry>() {
@ -102,7 +103,9 @@ public class RedissonTopicReactive<M> implements RTopicReactive<M> {
if (entry == null) {
return;
}
synchronized (entry) {
entry.lock();
try {
if (entry.isActive()) {
entry.removeListener(name, listenerId);
if (!entry.hasListeners(name)) {
@ -110,6 +113,8 @@ public class RedissonTopicReactive<M> implements RTopicReactive<M> {
}
return;
}
} finally {
entry.unlock();
}
// listener has been re-attached

@ -1,20 +1,25 @@
package org.redisson;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.redisson.core.BasePatternStatusListener;
import org.redisson.core.MessageListener;
import org.redisson.core.PatternMessageListener;
import org.redisson.core.PatternStatusListener;
import org.redisson.core.RPatternTopic;
import org.redisson.core.RTopic;
import org.redisson.core.StatusListener;
public class RedissonTopicPatternTest {
@ -84,7 +89,7 @@ public class RedissonTopicPatternTest {
}
}
@Test(timeout = 300 * 1000)
@Test
public void testUnsubscribe() throws InterruptedException {
final CountDownLatch messageRecieved = new CountDownLatch(1);
@ -109,7 +114,7 @@ public class RedissonTopicPatternTest {
redisson.shutdown();
}
@Test(timeout = 300 * 1000)
@Test
public void testLazyUnsubscribe() throws InterruptedException {
final CountDownLatch messageRecieved = new CountDownLatch(1);
@ -141,7 +146,7 @@ public class RedissonTopicPatternTest {
redisson2.shutdown();
}
@Test(timeout = 600 * 1000)
@Test
public void test() throws InterruptedException {
final CountDownLatch messageRecieved = new CountDownLatch(5);
@ -184,7 +189,7 @@ public class RedissonTopicPatternTest {
redisson2.shutdown();
}
@Test(timeout = 300 * 1000)
@Test
public void testListenerRemove() throws InterruptedException {
RedissonClient redisson1 = BaseTest.createInstance();
RPatternTopic<Message> topic1 = redisson1.getPatternTopic("topic.*");
@ -205,10 +210,55 @@ public class RedissonTopicPatternTest {
topic1.removeListener(id);
topic2.publish(new Message("123"));
Thread.sleep(1000);
redisson1.shutdown();
redisson2.shutdown();
}
@Test
public void testConcurrentTopic() throws Exception {
Config config = BaseTest.createConfig();
config.useSingleServer().setSubscriptionConnectionPoolSize(100);
RedissonClient redisson = Redisson.create(config);
int threads = 30;
int loops = 50000;
ExecutorService executor = Executors.newFixedThreadPool(threads);
List<Future<?>> futures = new ArrayList<>();
for (int i = 0; i < threads; i++) {
Runnable worker = new Runnable() {
@Override
public void run() {
for (int j = 0; j < loops; j++) {
RPatternTopic<String> t = redisson.getPatternTopic("PUBSUB*");
int listenerId = t.addListener(new PatternStatusListener() {
@Override
public void onPUnsubscribe(String channel) {
}
@Override
public void onPSubscribe(String channel) {
}
});
redisson.getTopic("PUBSUB_" + j).publish("message");
t.removeListener(listenerId);
}
}
};
Future<?> s = executor.submit(worker);
futures.add(s);
}
executor.shutdown();
Assert.assertTrue(executor.awaitTermination(threads * loops * 1000, TimeUnit.SECONDS));
for (Future<?> future : futures) {
future.get();
}
redisson.shutdown();
}
}

@ -1,20 +1,34 @@
package org.redisson;
import static org.assertj.core.api.Assertions.assertThat;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.StringCodec;
import org.redisson.core.BaseStatusListener;
import org.redisson.core.MessageListener;
import org.redisson.core.RSet;
import org.redisson.core.RTopic;
import org.redisson.core.StatusListener;
import com.jayway.awaitility.Awaitility;
import com.jayway.awaitility.Duration;
public class RedissonTopicTest {
@ -79,8 +93,124 @@ public class RedissonTopicTest {
}
}
@Test
public void testConcurrentTopic() throws Exception {
RedissonClient redisson = BaseTest.createInstance();
int threads = 30;
int loops = 50000;
ExecutorService executor = Executors.newFixedThreadPool(threads);
List<Future<?>> futures = new ArrayList<>();
for (int i = 0; i < threads; i++) {
Runnable worker = new Runnable() {
@Override
public void run() {
for (int j = 0; j < loops; j++) {
RTopic<String> t = redisson.getTopic("PUBSUB_" + j);
int listenerId = t.addListener(new StatusListener() {
@Override
public void onUnsubscribe(String channel) {
}
@Override
public void onSubscribe(String channel) {
}
});
t.publish("message");
t.removeListener(listenerId);
}
}
};
Future<?> s = executor.submit(worker);
futures.add(s);
}
executor.shutdown();
Assert.assertTrue(executor.awaitTermination(threads * loops * 1000, TimeUnit.SECONDS));
for (Future<?> future : futures) {
future.get();
}
redisson.shutdown();
}
@Test
public void testCommandsOrdering() throws InterruptedException {
RedissonClient redisson1 = BaseTest.createInstance();
RTopic<Long> topic1 = redisson1.getTopic("topic", LongCodec.INSTANCE);
AtomicBoolean stringMessageReceived = new AtomicBoolean();
topic1.addListener((channel, msg) -> {
assertThat(msg).isEqualTo(123);
stringMessageReceived.set(true);
});
topic1.publish(123L);
Awaitility.await().atMost(Duration.ONE_SECOND).untilTrue(stringMessageReceived);
redisson1.shutdown();
}
@Test(timeout = 300 * 1000)
@Test
public void testTopicState() throws InterruptedException {
RedissonClient redisson = BaseTest.createInstance();
RTopic<String> stringTopic = redisson.getTopic("test1", StringCodec.INSTANCE);
for (int i = 0; i < 3; i++) {
AtomicBoolean stringMessageReceived = new AtomicBoolean();
int listenerId = stringTopic.addListener(new MessageListener<String>() {
@Override
public void onMessage(String channel, String msg) {
assertThat(msg).isEqualTo("testmsg");
stringMessageReceived.set(true);
}
});
stringTopic.publish("testmsg");
Awaitility.await().atMost(Duration.ONE_SECOND).untilTrue(stringMessageReceived);
stringTopic.removeListener(listenerId);
}
redisson.shutdown();
}
@Test
public void testMultiTypeConnection() throws InterruptedException {
RedissonClient redisson = BaseTest.createInstance();
RTopic<String> stringTopic = redisson.getTopic("test1", StringCodec.INSTANCE);
AtomicBoolean stringMessageReceived = new AtomicBoolean();
stringTopic.addListener(new MessageListener<String>() {
@Override
public void onMessage(String channel, String msg) {
assertThat(msg).isEqualTo("testmsg");
stringMessageReceived.set(true);
}
});
stringTopic.publish("testmsg");
RTopic<Long> longTopic = redisson.getTopic("test2", LongCodec.INSTANCE);
AtomicBoolean longMessageReceived = new AtomicBoolean();
longTopic.addListener(new MessageListener<Long>() {
@Override
public void onMessage(String channel, Long msg) {
assertThat(msg).isEqualTo(1L);
longMessageReceived.set(true);
}
});
longTopic.publish(1L);
Awaitility.await().atMost(Duration.ONE_SECOND).untilTrue(stringMessageReceived);
Awaitility.await().atMost(Duration.ONE_SECOND).untilTrue(longMessageReceived);
}
@Test
public void testSyncCommands() throws InterruptedException {
RedissonClient redisson = BaseTest.createInstance();
RTopic<String> topic = redisson.getTopic("system_bus");
@ -99,7 +229,7 @@ public class RedissonTopicTest {
redisson.shutdown();
}
@Test(timeout = 300 * 1000)
@Test
public void testInnerPublish() throws InterruptedException {
RedissonClient redisson1 = BaseTest.createInstance();
@ -128,7 +258,7 @@ public class RedissonTopicTest {
redisson2.shutdown();
}
@Test(timeout = 300 * 1000)
@Test
public void testStatus() throws InterruptedException {
RedissonClient redisson = BaseTest.createInstance();
final RTopic<Message> topic1 = redisson.getTopic("topic1");
@ -156,7 +286,7 @@ public class RedissonTopicTest {
Assert.assertTrue(l.await(5, TimeUnit.SECONDS));
}
@Test(timeout = 300 * 1000)
@Test
public void testUnsubscribe() throws InterruptedException {
final CountDownLatch messageRecieved = new CountDownLatch(1);
@ -181,7 +311,7 @@ public class RedissonTopicTest {
}
@Test(timeout = 300 * 1000)
@Test
public void testLazyUnsubscribe() throws InterruptedException {
final CountDownLatch messageRecieved = new CountDownLatch(1);
@ -208,8 +338,7 @@ public class RedissonTopicTest {
redisson2.shutdown();
}
@Test(timeout = 300 * 1000)
@Test
public void test() throws InterruptedException {
final CountDownLatch messageRecieved = new CountDownLatch(2);
@ -236,7 +365,7 @@ public class RedissonTopicTest {
volatile long counter;
@Test(timeout = 600 * 1000)
@Test
public void testHeavyLoad() throws InterruptedException {
final CountDownLatch messageRecieved = new CountDownLatch(1000);
@ -268,7 +397,8 @@ public class RedissonTopicTest {
redisson1.shutdown();
redisson2.shutdown();
}
@Test(timeout = 300 * 1000)
@Test
public void testListenerRemove() throws InterruptedException {
RedissonClient redisson1 = BaseTest.createInstance();
RTopic<Message> topic1 = redisson1.getTopic("topic");

Loading…
Cancel
Save