refactoring

pull/709/merge
Nikita 8 years ago
parent 952bbb708e
commit 40cf0f0c1a

@ -25,8 +25,8 @@ import org.redisson.client.RedisPubSubListener;
import org.redisson.client.codec.LongCodec; import org.redisson.client.codec.LongCodec;
import org.redisson.client.protocol.pubsub.PubSubType; import org.redisson.client.protocol.pubsub.PubSubType;
import org.redisson.connection.ConnectionManager; import org.redisson.connection.ConnectionManager;
import org.redisson.misc.PromiseDelegator;
import org.redisson.misc.RPromise; import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.redisson.misc.TransferListener; import org.redisson.misc.TransferListener;
import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.PlatformDependent;
@ -67,7 +67,7 @@ abstract class PublishSubscribe<E extends PubSubEntry<E>> {
public RFuture<E> subscribe(final String entryName, final String channelName, final ConnectionManager connectionManager) { public RFuture<E> subscribe(final String entryName, final String channelName, final ConnectionManager connectionManager) {
final AtomicReference<Runnable> listenerHolder = new AtomicReference<Runnable>(); final AtomicReference<Runnable> listenerHolder = new AtomicReference<Runnable>();
final AsyncSemaphore semaphore = connectionManager.getSemaphore(channelName); final AsyncSemaphore semaphore = connectionManager.getSemaphore(channelName);
final RPromise<E> newPromise = new PromiseDelegator<E>(connectionManager.<E>newPromise()) { final RPromise<E> newPromise = new RedissonPromise<E>() {
@Override @Override
public boolean cancel(boolean mayInterruptIfRunning) { public boolean cancel(boolean mayInterruptIfRunning) {
return semaphore.remove(listenerHolder.get()); return semaphore.remove(listenerHolder.get());

Loading…
Cancel
Save