diff --git a/src/main/java/org/redisson/RedissonTopic.java b/src/main/java/org/redisson/RedissonTopic.java index f9465124f..782f6ab2c 100644 --- a/src/main/java/org/redisson/RedissonTopic.java +++ b/src/main/java/org/redisson/RedissonTopic.java @@ -15,18 +15,20 @@ */ package org.redisson; +import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.Promise; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicReference; import org.redisson.connection.ConnectionManager; import org.redisson.connection.PubSubConnectionEntry; import org.redisson.core.MessageListener; import org.redisson.core.RTopic; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.lambdaworks.redis.RedisConnection; import com.lambdaworks.redis.pubsub.RedisPubSubAdapter; @@ -41,6 +43,8 @@ import com.lambdaworks.redis.pubsub.RedisPubSubListener; */ public class RedissonTopic extends RedissonObject implements RTopic { + private final Logger log = LoggerFactory.getLogger(getClass()); + private final AtomicReference> promise = new AtomicReference>(); private final Map> listeners = @@ -68,6 +72,7 @@ public class RedissonTopic extends RedissonObject implements RTopic { @Override public void subscribed(String channel, long count) { if (channel.equals(getName())) { + log.debug("subscribed to '{}' channel", getName()); newPromise.setSuccess(true); } } @@ -78,6 +83,7 @@ public class RedissonTopic extends RedissonObject implements RTopic { @Override public long publish(M message) { + // TODO refactor to publishAsync usage RedisConnection conn = connectionManager.connectionWriteOp(); try { return conn.publish(getName(), message); @@ -89,11 +95,7 @@ public class RedissonTopic extends RedissonObject implements RTopic { @Override public Future publishAsync(M message) { RedisConnection conn = connectionManager.connectionWriteOp(); - try { - return conn.getAsync().publish(getName(), message); - } finally { - connectionManager.release(conn); - } + return conn.getAsync().publish(getName(), message).addListener(connectionManager.createListener(conn)); } @Override @@ -138,6 +140,7 @@ public class RedissonTopic extends RedissonObject implements RTopic { @Override public void operationComplete(io.netty.util.concurrent.Future future) throws Exception { connectionManager.unsubscribe(oldPubSubEntry, getName()); + log.debug("unsubscribed from '{}' channel", getName()); // reattach eventually added listeners for (RedisPubSubListener listener : oldPubSubEntry.getListeners()) {