RedissonLock "promised"

pull/25/head
Nikita 11 years ago
parent 5cdf69aac4
commit 59e5749e11

@ -136,7 +136,6 @@ public class Redisson {
}
}
lock.subscribe();
return lock;
}

@ -15,12 +15,14 @@
*/
package org.redisson;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
import java.io.Serializable;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import org.redisson.connection.ConnectionManager;
@ -106,8 +108,7 @@ public class RedissonLock extends RedissonObject implements RLock {
private static final Integer unlockMessage = 0;
private final CountDownLatch subscribeLatch = new CountDownLatch(1);
private final AtomicBoolean subscribeOnce = new AtomicBoolean();
private final AtomicReference<Promise<Boolean>> promise = new AtomicReference<Promise<Boolean>>();
private final Semaphore msg = new Semaphore(1);
@ -118,40 +119,45 @@ public class RedissonLock extends RedissonObject implements RLock {
this.id = id;
}
public void subscribe() {
if (subscribeOnce.compareAndSet(false, true)) {
msg.acquireUninterruptibly();
private Future<Boolean> subscribe() {
Promise<Boolean> p = promise.get();
if (p != null) {
return p;
}
final Promise<Boolean> newPromise = newPromise();
if (!promise.compareAndSet(null, newPromise)) {
return promise.get();
}
RedisPubSubAdapter<String, Integer> listener = new RedisPubSubAdapter<String, Integer>() {
msg.acquireUninterruptibly();
@Override
public void subscribed(String channel, long count) {
if (getChannelName().equals(channel)) {
subscribeLatch.countDown();
}
}
RedisPubSubAdapter<String, Integer> listener = new RedisPubSubAdapter<String, Integer>() {
@Override
public void message(String channel, Integer message) {
if (message.equals(unlockMessage) && getChannelName().equals(channel)) {
msg.release();
}
@Override
public void subscribed(String channel, long count) {
if (getChannelName().equals(channel)) {
newPromise.setSuccess(true);
}
}
};
@Override
public void message(String channel, Integer message) {
if (message.equals(unlockMessage) && getChannelName().equals(channel)) {
msg.release();
}
}
pubSubEntry = connectionManager.subscribe(listener, getChannelName());
}
};
try {
subscribeLatch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
pubSubEntry = connectionManager.subscribe(listener, getChannelName());
return newPromise;
}
@Override
public void lock() {
subscribe().awaitUninterruptibly();
try {
lockInterruptibly();
} catch (InterruptedException e) {
@ -170,6 +176,8 @@ public class RedissonLock extends RedissonObject implements RLock {
@Override
public void lockInterruptibly() throws InterruptedException {
subscribe().awaitUninterruptibly();
while (!tryLock()) {
// waiting for message
msg.acquire();
@ -178,6 +186,8 @@ public class RedissonLock extends RedissonObject implements RLock {
@Override
public boolean tryLock() {
subscribe().awaitUninterruptibly();
LockValue currentLock = new LockValue(id, Thread.currentThread().getId());
currentLock.incCounter();
@ -200,6 +210,10 @@ public class RedissonLock extends RedissonObject implements RLock {
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
if (!subscribe().awaitUninterruptibly(time, unit)) {
return false;
}
time = unit.toMillis(time);
while (!tryLock()) {
if (time <= 0) {
@ -216,6 +230,8 @@ public class RedissonLock extends RedissonObject implements RLock {
@Override
public void unlock() {
subscribe().awaitUninterruptibly();
LockValue currentLock = new LockValue(id, Thread.currentThread().getId());
RedisConnection<Object, Object> connection = connectionManager.connectionWriteOp();
@ -265,6 +281,8 @@ public class RedissonLock extends RedissonObject implements RLock {
@Override
public void forceUnlock() {
subscribe().awaitUninterruptibly();
RedisConnection<Object, Object> connection = connectionManager.connectionWriteOp();
try {
while (true) {
@ -280,6 +298,8 @@ public class RedissonLock extends RedissonObject implements RLock {
@Override
public boolean isLocked() {
subscribe().awaitUninterruptibly();
RedisConnection<Object, Object> connection = connectionManager.connectionReadOp();
try {
LockValue lock = (LockValue) connection.get(getKeyName());
@ -291,6 +311,8 @@ public class RedissonLock extends RedissonObject implements RLock {
@Override
public boolean isHeldByCurrentThread() {
subscribe().awaitUninterruptibly();
LockValue currentLock = new LockValue(id, Thread.currentThread().getId());
RedisConnection<Object, Object> connection = connectionManager.connectionReadOp();
@ -304,6 +326,8 @@ public class RedissonLock extends RedissonObject implements RLock {
@Override
public int getHoldCount() {
subscribe().awaitUninterruptibly();
LockValue currentLock = new LockValue(id, Thread.currentThread().getId());
RedisConnection<Object, Object> connection = connectionManager.connectionReadOp();

Loading…
Cancel
Save