diff --git a/src/main/java/org/redisson/Redisson.java b/src/main/java/org/redisson/Redisson.java index eae66c979..cddfa7254 100644 --- a/src/main/java/org/redisson/Redisson.java +++ b/src/main/java/org/redisson/Redisson.java @@ -136,7 +136,6 @@ public class Redisson { } } - lock.subscribe(); return lock; } diff --git a/src/main/java/org/redisson/RedissonLock.java b/src/main/java/org/redisson/RedissonLock.java index f5c489284..2d9351a32 100644 --- a/src/main/java/org/redisson/RedissonLock.java +++ b/src/main/java/org/redisson/RedissonLock.java @@ -15,12 +15,14 @@ */ package org.redisson; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.Promise; + import java.io.Serializable; import java.util.UUID; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Condition; import org.redisson.connection.ConnectionManager; @@ -106,8 +108,7 @@ public class RedissonLock extends RedissonObject implements RLock { private static final Integer unlockMessage = 0; - private final CountDownLatch subscribeLatch = new CountDownLatch(1); - private final AtomicBoolean subscribeOnce = new AtomicBoolean(); + private final AtomicReference> promise = new AtomicReference>(); private final Semaphore msg = new Semaphore(1); @@ -118,40 +119,45 @@ public class RedissonLock extends RedissonObject implements RLock { this.id = id; } - public void subscribe() { - if (subscribeOnce.compareAndSet(false, true)) { - msg.acquireUninterruptibly(); + private Future subscribe() { + Promise p = promise.get(); + if (p != null) { + return p; + } + + final Promise newPromise = newPromise(); + if (!promise.compareAndSet(null, newPromise)) { + return promise.get(); + } - RedisPubSubAdapter listener = new RedisPubSubAdapter() { + msg.acquireUninterruptibly(); - @Override - public void subscribed(String channel, long count) { - if (getChannelName().equals(channel)) { - subscribeLatch.countDown(); - } - } + RedisPubSubAdapter listener = new RedisPubSubAdapter() { - @Override - public void message(String channel, Integer message) { - if (message.equals(unlockMessage) && getChannelName().equals(channel)) { - msg.release(); - } + @Override + public void subscribed(String channel, long count) { + if (getChannelName().equals(channel)) { + newPromise.setSuccess(true); } + } - }; + @Override + public void message(String channel, Integer message) { + if (message.equals(unlockMessage) && getChannelName().equals(channel)) { + msg.release(); + } + } - pubSubEntry = connectionManager.subscribe(listener, getChannelName()); - } + }; - try { - subscribeLatch.await(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } + pubSubEntry = connectionManager.subscribe(listener, getChannelName()); + return newPromise; } @Override public void lock() { + subscribe().awaitUninterruptibly(); + try { lockInterruptibly(); } catch (InterruptedException e) { @@ -170,6 +176,8 @@ public class RedissonLock extends RedissonObject implements RLock { @Override public void lockInterruptibly() throws InterruptedException { + subscribe().awaitUninterruptibly(); + while (!tryLock()) { // waiting for message msg.acquire(); @@ -178,6 +186,8 @@ public class RedissonLock extends RedissonObject implements RLock { @Override public boolean tryLock() { + subscribe().awaitUninterruptibly(); + LockValue currentLock = new LockValue(id, Thread.currentThread().getId()); currentLock.incCounter(); @@ -200,6 +210,10 @@ public class RedissonLock extends RedissonObject implements RLock { @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { + if (!subscribe().awaitUninterruptibly(time, unit)) { + return false; + } + time = unit.toMillis(time); while (!tryLock()) { if (time <= 0) { @@ -216,6 +230,8 @@ public class RedissonLock extends RedissonObject implements RLock { @Override public void unlock() { + subscribe().awaitUninterruptibly(); + LockValue currentLock = new LockValue(id, Thread.currentThread().getId()); RedisConnection connection = connectionManager.connectionWriteOp(); @@ -265,6 +281,8 @@ public class RedissonLock extends RedissonObject implements RLock { @Override public void forceUnlock() { + subscribe().awaitUninterruptibly(); + RedisConnection connection = connectionManager.connectionWriteOp(); try { while (true) { @@ -280,6 +298,8 @@ public class RedissonLock extends RedissonObject implements RLock { @Override public boolean isLocked() { + subscribe().awaitUninterruptibly(); + RedisConnection connection = connectionManager.connectionReadOp(); try { LockValue lock = (LockValue) connection.get(getKeyName()); @@ -291,6 +311,8 @@ public class RedissonLock extends RedissonObject implements RLock { @Override public boolean isHeldByCurrentThread() { + subscribe().awaitUninterruptibly(); + LockValue currentLock = new LockValue(id, Thread.currentThread().getId()); RedisConnection connection = connectionManager.connectionReadOp(); @@ -304,6 +326,8 @@ public class RedissonLock extends RedissonObject implements RLock { @Override public int getHoldCount() { + subscribe().awaitUninterruptibly(); + LockValue currentLock = new LockValue(id, Thread.currentThread().getId()); RedisConnection connection = connectionManager.connectionReadOp();