RedissonMultiLock deadlock fixed. #467

pull/472/head
Nikita 9 years ago
parent 11eaf5d62e
commit 5274e762b5

@ -377,6 +377,11 @@ public class RedissonLock extends RedissonExpirable implements RLock {
}
public Future<Void> unlockAsync() {
long threadId = Thread.currentThread().getId();
return unlockAsync(threadId);
}
public Future<Void> unlockAsync(final long threadId) {
final Promise<Void> result = newPromise();
Future<Boolean> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('exists', KEYS[1]) == 0) then " +
@ -396,7 +401,7 @@ public class RedissonLock extends RedissonExpirable implements RLock {
"return 1; "+
"end; " +
"return nil;",
Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(Thread.currentThread().getId()));
Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));
future.addListener(new FutureListener<Boolean>() {
@Override
@ -409,7 +414,7 @@ public class RedissonLock extends RedissonExpirable implements RLock {
Boolean opStatus = future.getNow();
if (opStatus == null) {
IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
+ id + " thread-id: " + Thread.currentThread().getId());
+ id + " thread-id: " + threadId);
result.setFailure(cause);
return;
}
@ -428,8 +433,12 @@ public class RedissonLock extends RedissonExpirable implements RLock {
}
public Future<Void> lockAsync(final long leaseTime, final TimeUnit unit) {
final Promise<Void> result = newPromise();
final long currentThreadId = Thread.currentThread().getId();
return lockAsync(leaseTime, unit, currentThreadId);
}
public Future<Void> lockAsync(final long leaseTime, final TimeUnit unit, final long currentThreadId) {
final Promise<Void> result = newPromise();
Future<Long> ttlFuture = tryAcquireAsync(leaseTime, unit, currentThreadId);
ttlFuture.addListener(new FutureListener<Long>() {
@Override
@ -525,8 +534,13 @@ public class RedissonLock extends RedissonExpirable implements RLock {
}
public Future<Boolean> tryLockAsync() {
long threadId = Thread.currentThread().getId();
return tryLockAsync(threadId);
}
public Future<Boolean> tryLockAsync(long threadId) {
final Promise<Boolean> result = newPromise();
Future<Long> future = tryLockInnerAsync(Thread.currentThread().getId());
Future<Long> future = tryLockInnerAsync(threadId);
future.addListener(new FutureListener<Long>() {
@Override
public void operationComplete(Future<Long> future) throws Exception {
@ -546,10 +560,15 @@ public class RedissonLock extends RedissonExpirable implements RLock {
}
public Future<Boolean> tryLockAsync(final long waitTime, final long leaseTime, final TimeUnit unit) {
final long currentThreadId = Thread.currentThread().getId();
return tryLockAsync(waitTime, leaseTime, unit, currentThreadId);
}
public Future<Boolean> tryLockAsync(final long waitTime, final long leaseTime, final TimeUnit unit,
final long currentThreadId) {
final Promise<Boolean> result = newPromise();
final AtomicLong time = new AtomicLong(unit.toMillis(waitTime));
final long currentThreadId = Thread.currentThread().getId();
Future<Long> ttlFuture = tryAcquireAsync(leaseTime, unit, currentThreadId);
ttlFuture.addListener(new FutureListener<Long>() {
@Override

@ -21,11 +21,13 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import org.redisson.RedissonLock;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.ImmediateEventExecutor;
@ -74,7 +76,8 @@ public class RedissonMultiLock implements Lock {
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
Promise<Void> promise = ImmediateEventExecutor.INSTANCE.newPromise();
lock(promise, 0, leaseTime, unit);
long currentThreadId = Thread.currentThread().getId();
lock(promise, 0, leaseTime, unit, locks, currentThreadId);
promise.sync();
}
@ -84,37 +87,45 @@ public class RedissonMultiLock implements Lock {
lockInterruptibly(-1, null);
}
private void lock(final Promise<Void> promise, final long waitTime, final long leaseTime, final TimeUnit unit) throws InterruptedException {
private void lock(final Promise<Void> promise, final long waitTime, final long leaseTime, final TimeUnit unit, final List<RLock> locks, final long currentThreadId) throws InterruptedException {
final AtomicInteger tryLockRequestsAmount = new AtomicInteger();
final Map<Future<Boolean>, RLock> tryLockFutures = new HashMap<Future<Boolean>, RLock>(locks.size());
FutureListener<Boolean> listener = new FutureListener<Boolean>() {
AtomicBoolean unlock = new AtomicBoolean();
AtomicReference<RLock> lockedLockHolder = new AtomicReference<RLock>();
AtomicReference<Throwable> failed = new AtomicReference<Throwable>();
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
public void operationComplete(final Future<Boolean> future) throws Exception {
if (!future.isSuccess()) {
// unlock once
if (unlock.compareAndSet(false, true)) {
for (RLock lock : locks) {
lock.unlockAsync();
failed.compareAndSet(null, future.cause());
}
promise.setFailure(future.cause());
Boolean res = future.getNow();
if (res != null && !res) {
RLock lock = tryLockFutures.get(future);
lockedLockHolder.compareAndSet(null, lock);
}
if (tryLockRequestsAmount.decrementAndGet() == 0) {
if (lockedLockHolder.get() == null && failed.get() == null) {
promise.setSuccess(null);
return;
}
Boolean res = future.getNow();
// unlock once
if (!res && unlock.compareAndSet(false, true)) {
for (RLock lock : locks) {
lock.unlockAsync();
tryLockRequestsAmount.set(tryLockFutures.size());
for (RLock lock : tryLockFutures.values()) {
lock.unlockAsync().addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
if (tryLockRequestsAmount.decrementAndGet() == 0) {
if (failed.get() != null) {
promise.setFailure(failed.get());
}
RLock lock = tryLockFutures.get(future);
lock.lockAsync().addListener(new FutureListener<Void>() {
if (lockedLockHolder.get() != null) {
final RedissonLock lockedLock = (RedissonLock) lockedLockHolder.get();
lockedLock.lockAsync(leaseTime, unit, currentThreadId).addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
if (!future.isSuccess()) {
@ -122,12 +133,16 @@ public class RedissonMultiLock implements Lock {
return;
}
lock(promise, waitTime, leaseTime, unit);
List<RLock> newLocks = new ArrayList<RLock>(tryLockFutures.values());
newLocks.remove(lockedLock);
lock(promise, waitTime, leaseTime, unit, newLocks, currentThreadId);
}
});
}
}
}
});
}
if (!unlock.get() && tryLockRequestsAmount.decrementAndGet() == 0) {
promise.setSuccess(null);
}
}
};
@ -138,11 +153,13 @@ public class RedissonMultiLock implements Lock {
}
tryLockRequestsAmount.incrementAndGet();
Future<Boolean> future;
if (waitTime > 0 || leaseTime > 0) {
tryLockFutures.put(lock.tryLockAsync(waitTime, leaseTime, unit), lock);
future = ((RedissonLock)lock).tryLockAsync(waitTime, leaseTime, unit, currentThreadId);
} else {
tryLockFutures.put(lock.tryLockAsync(), lock);
future = ((RedissonLock)lock).tryLockAsync(currentThreadId);
}
tryLockFutures.put(future, lock);
}
for (Future<Boolean> future : tryLockFutures.keySet()) {

@ -16,6 +16,41 @@ import org.redisson.RedisRunner.RedisProcess;
public class RedissonMultiLockTest {
@Test
public void testMultiThreads() throws IOException, InterruptedException {
RedisProcess redis1 = redisTestMultilockInstance1();
Config config1 = new Config();
config1.useSingleServer().setAddress("127.0.0.1:6320");
RedissonClient client = Redisson.create(config1);
RLock lock1 = client.getLock("lock1");
RLock lock2 = client.getLock("lock2");
RLock lock3 = client.getLock("lock3");
Thread t = new Thread() {
public void run() {
RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3);
lock.lock();
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
}
lock.unlock();
};
};
t.start();
t.join(1000);
RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3);
lock.lock();
lock.unlock();
assertThat(redis1.stop()).isEqualTo(0);
}
@Test
public void test() throws IOException, InterruptedException {
RedisProcess redis1 = redisTestMultilockInstance1();

Loading…
Cancel
Save