|
|
|
@ -15,18 +15,20 @@
|
|
|
|
|
*/
|
|
|
|
|
package org.redisson;
|
|
|
|
|
|
|
|
|
|
import io.netty.util.concurrent.Future;
|
|
|
|
|
import io.netty.util.concurrent.FutureListener;
|
|
|
|
|
import io.netty.util.concurrent.Promise;
|
|
|
|
|
|
|
|
|
|
import java.util.Map;
|
|
|
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
|
|
|
import java.util.concurrent.Future;
|
|
|
|
|
import java.util.concurrent.atomic.AtomicReference;
|
|
|
|
|
|
|
|
|
|
import org.redisson.connection.ConnectionManager;
|
|
|
|
|
import org.redisson.connection.PubSubConnectionEntry;
|
|
|
|
|
import org.redisson.core.MessageListener;
|
|
|
|
|
import org.redisson.core.RTopic;
|
|
|
|
|
import org.slf4j.Logger;
|
|
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
|
|
|
|
|
import com.lambdaworks.redis.RedisConnection;
|
|
|
|
|
import com.lambdaworks.redis.pubsub.RedisPubSubAdapter;
|
|
|
|
@ -41,6 +43,8 @@ import com.lambdaworks.redis.pubsub.RedisPubSubListener;
|
|
|
|
|
*/
|
|
|
|
|
public class RedissonTopic<M> extends RedissonObject implements RTopic<M> {
|
|
|
|
|
|
|
|
|
|
private final Logger log = LoggerFactory.getLogger(getClass());
|
|
|
|
|
|
|
|
|
|
private final AtomicReference<Promise<Boolean>> promise = new AtomicReference<Promise<Boolean>>();
|
|
|
|
|
|
|
|
|
|
private final Map<Integer, RedisPubSubTopicListenerWrapper<String, M>> listeners =
|
|
|
|
@ -68,6 +72,7 @@ public class RedissonTopic<M> extends RedissonObject implements RTopic<M> {
|
|
|
|
|
@Override
|
|
|
|
|
public void subscribed(String channel, long count) {
|
|
|
|
|
if (channel.equals(getName())) {
|
|
|
|
|
log.debug("subscribed to '{}' channel", getName());
|
|
|
|
|
newPromise.setSuccess(true);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@ -78,6 +83,7 @@ public class RedissonTopic<M> extends RedissonObject implements RTopic<M> {
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public long publish(M message) {
|
|
|
|
|
// TODO refactor to publishAsync usage
|
|
|
|
|
RedisConnection<String, Object> conn = connectionManager.connectionWriteOp();
|
|
|
|
|
try {
|
|
|
|
|
return conn.publish(getName(), message);
|
|
|
|
@ -89,11 +95,7 @@ public class RedissonTopic<M> extends RedissonObject implements RTopic<M> {
|
|
|
|
|
@Override
|
|
|
|
|
public Future<Long> publishAsync(M message) {
|
|
|
|
|
RedisConnection<String, Object> conn = connectionManager.connectionWriteOp();
|
|
|
|
|
try {
|
|
|
|
|
return conn.getAsync().publish(getName(), message);
|
|
|
|
|
} finally {
|
|
|
|
|
connectionManager.release(conn);
|
|
|
|
|
}
|
|
|
|
|
return conn.getAsync().publish(getName(), message).addListener(connectionManager.createListener(conn));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
@ -138,6 +140,7 @@ public class RedissonTopic<M> extends RedissonObject implements RTopic<M> {
|
|
|
|
|
@Override
|
|
|
|
|
public void operationComplete(io.netty.util.concurrent.Future<Boolean> future) throws Exception {
|
|
|
|
|
connectionManager.unsubscribe(oldPubSubEntry, getName());
|
|
|
|
|
log.debug("unsubscribed from '{}' channel", getName());
|
|
|
|
|
|
|
|
|
|
// reattach eventually added listeners
|
|
|
|
|
for (RedisPubSubListener listener : oldPubSubEntry.getListeners()) {
|
|
|
|
|