From f5e3618b330117200f483865eab485d8cae7a695 Mon Sep 17 00:00:00 2001 From: Nikita Date: Sat, 8 Sep 2018 17:29:37 +0300 Subject: [PATCH] Fixed - Lock expiration renewal should be canceled if unlock method failed to execute. #1602 --- .../java/org/redisson/RedissonFairLock.java | 2 +- .../main/java/org/redisson/RedissonLock.java | 62 +++++++++++++------ .../java/org/redisson/RedissonReadLock.java | 17 +---- .../java/org/redisson/RedissonWriteLock.java | 17 +---- 4 files changed, 49 insertions(+), 49 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonFairLock.java b/redisson/src/main/java/org/redisson/RedissonFairLock.java index 747e50f61..9c5a6b0a2 100644 --- a/redisson/src/main/java/org/redisson/RedissonFairLock.java +++ b/redisson/src/main/java/org/redisson/RedissonFairLock.java @@ -262,7 +262,7 @@ public class RedissonFairLock extends RedissonLock implements RLock { @Override public RFuture forceUnlockAsync() { - cancelExpirationRenewal(); + cancelExpirationRenewal(null); return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, // remove stale threads "while true do " diff --git a/redisson/src/main/java/org/redisson/RedissonLock.java b/redisson/src/main/java/org/redisson/RedissonLock.java index 1015d4782..168d51757 100644 --- a/redisson/src/main/java/org/redisson/RedissonLock.java +++ b/redisson/src/main/java/org/redisson/RedissonLock.java @@ -27,11 +27,12 @@ import java.util.concurrent.locks.Condition; import org.redisson.api.RFuture; import org.redisson.api.RLock; +import org.redisson.client.RedisException; import org.redisson.client.codec.LongCodec; import org.redisson.client.protocol.RedisCommand; +import org.redisson.client.protocol.RedisCommand.ValueType; import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisStrictCommand; -import org.redisson.client.protocol.RedisCommand.ValueType; import org.redisson.client.protocol.convertor.IntegerReplayConvertor; import org.redisson.command.CommandAsyncExecutor; import org.redisson.misc.RPromise; @@ -58,12 +59,34 @@ import io.netty.util.internal.PlatformDependent; */ public class RedissonLock extends RedissonExpirable implements RLock { + public static class ExpirationEntry { + + private long threadId; + private Timeout timeout; + + public ExpirationEntry(long threadId, Timeout timeout) { + super(); + this.threadId = threadId; + this.timeout = timeout; + } + + public long getThreadId() { + return threadId; + } + + public Timeout getTimeout() { + return timeout; + } + + } + private static final Logger log = LoggerFactory.getLogger(RedissonLock.class); - private static final ConcurrentMap expirationRenewalMap = PlatformDependent.newConcurrentHashMap(); + private static final ConcurrentMap expirationRenewalMap = PlatformDependent.newConcurrentHashMap(); protected long internalLockLeaseTime; final UUID id; + final String entryName; protected static final LockPubSub PUBSUB = new LockPubSub(); @@ -74,10 +97,11 @@ public class RedissonLock extends RedissonExpirable implements RLock { this.commandExecutor = commandExecutor; this.id = commandExecutor.getConnectionManager().getId(); this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(); + this.entryName = id + ":" + name; } protected String getEntryName() { - return id + ":" + getName(); + return entryName; } String getChannelName() { @@ -233,15 +257,15 @@ public class RedissonLock extends RedissonExpirable implements RLock { } }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS); - if (expirationRenewalMap.putIfAbsent(getEntryName(), task) != null) { + if (expirationRenewalMap.putIfAbsent(getEntryName(), new ExpirationEntry(threadId, task)) != null) { task.cancel(); } } - void cancelExpirationRenewal() { - Timeout task = expirationRenewalMap.remove(getEntryName()); - if (task != null) { - task.cancel(); + void cancelExpirationRenewal(Long threadId) { + ExpirationEntry task = expirationRenewalMap.remove(getEntryName()); + if (task != null && (threadId == null || task.getThreadId() == threadId)) { + task.getTimeout().cancel(); } } @@ -365,15 +389,16 @@ public class RedissonLock extends RedissonExpirable implements RLock { @Override public void unlock() { - Boolean opStatus = get(unlockInnerAsync(Thread.currentThread().getId())); - if (opStatus == null) { - throw new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " - + id + " thread-id: " + Thread.currentThread().getId()); - } - if (opStatus) { - cancelExpirationRenewal(); + try { + get(unlockAsync(Thread.currentThread().getId())); + } catch (RedisException e) { + if (e.getCause() instanceof IllegalMonitorStateException) { + throw (IllegalMonitorStateException)e.getCause(); + } else { + throw e; + } } - + // Future future = unlockAsync(); // future.awaitUninterruptibly(); // if (future.isSuccess()) { @@ -398,7 +423,7 @@ public class RedissonLock extends RedissonExpirable implements RLock { @Override public RFuture forceUnlockAsync() { - cancelExpirationRenewal(); + cancelExpirationRenewal(null); return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('del', KEYS[1]) == 1) then " + "redis.call('publish', KEYS[2], ARGV[1]); " @@ -479,6 +504,7 @@ public class RedissonLock extends RedissonExpirable implements RLock { @Override public void operationComplete(Future future) throws Exception { if (!future.isSuccess()) { + cancelExpirationRenewal(threadId); result.tryFailure(future.cause()); return; } @@ -491,7 +517,7 @@ public class RedissonLock extends RedissonExpirable implements RLock { return; } if (opStatus) { - cancelExpirationRenewal(); + cancelExpirationRenewal(null); } result.trySuccess(null); } diff --git a/redisson/src/main/java/org/redisson/RedissonReadLock.java b/redisson/src/main/java/org/redisson/RedissonReadLock.java index bee326e5c..d7907d01d 100644 --- a/redisson/src/main/java/org/redisson/RedissonReadLock.java +++ b/redisson/src/main/java/org/redisson/RedissonReadLock.java @@ -28,9 +28,6 @@ import org.redisson.client.protocol.RedisStrictCommand; import org.redisson.command.CommandAsyncExecutor; import org.redisson.pubsub.LockPubSub; -import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.FutureListener; - /** * Lock will be removed automatically if client disconnects. * @@ -141,7 +138,8 @@ public class RedissonReadLock extends RedissonLock implements RLock { @Override public RFuture forceUnlockAsync() { - RFuture result = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, + cancelExpirationRenewal(null); + return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hget', KEYS[1], 'mode') == 'read') then " + "redis.call('del', KEYS[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " + @@ -149,17 +147,6 @@ public class RedissonReadLock extends RedissonLock implements RLock { "end; " + "return 0; ", Arrays.asList(getName(), getChannelName()), LockPubSub.unlockMessage); - - result.addListener(new FutureListener() { - @Override - public void operationComplete(Future future) throws Exception { - if (future.isSuccess() && future.getNow()) { - cancelExpirationRenewal(); - } - } - }); - - return result; } @Override diff --git a/redisson/src/main/java/org/redisson/RedissonWriteLock.java b/redisson/src/main/java/org/redisson/RedissonWriteLock.java index cd74d7afb..ab3374380 100644 --- a/redisson/src/main/java/org/redisson/RedissonWriteLock.java +++ b/redisson/src/main/java/org/redisson/RedissonWriteLock.java @@ -28,9 +28,6 @@ import org.redisson.client.protocol.RedisStrictCommand; import org.redisson.command.CommandAsyncExecutor; import org.redisson.pubsub.LockPubSub; -import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.FutureListener; - /** * Lock will be removed automatically if client disconnects. * @@ -120,7 +117,8 @@ public class RedissonWriteLock extends RedissonLock implements RLock { @Override public RFuture forceUnlockAsync() { - RFuture result = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, + cancelExpirationRenewal(null); + return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hget', KEYS[1], 'mode') == 'write') then " + "redis.call('del', KEYS[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " + @@ -128,17 +126,6 @@ public class RedissonWriteLock extends RedissonLock implements RLock { "end; " + "return 0; ", Arrays.asList(getName(), getChannelName()), LockPubSub.unlockMessage); - - result.addListener(new FutureListener() { - @Override - public void operationComplete(Future future) throws Exception { - if (future.isSuccess() && future.getNow()) { - cancelExpirationRenewal(); - } - } - }); - - return result; } @Override