Fixed - Lock expiration renewal should be canceled if unlock method failed to execute. #1602

pull/1639/head
Nikita 7 years ago
parent 583fb3cf3f
commit f5e3618b33

@ -262,7 +262,7 @@ public class RedissonFairLock extends RedissonLock implements RLock {
@Override @Override
public RFuture<Boolean> forceUnlockAsync() { public RFuture<Boolean> forceUnlockAsync() {
cancelExpirationRenewal(); cancelExpirationRenewal(null);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
// remove stale threads // remove stale threads
"while true do " "while true do "

@ -27,11 +27,12 @@ import java.util.concurrent.locks.Condition;
import org.redisson.api.RFuture; import org.redisson.api.RFuture;
import org.redisson.api.RLock; import org.redisson.api.RLock;
import org.redisson.client.RedisException;
import org.redisson.client.codec.LongCodec; import org.redisson.client.codec.LongCodec;
import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.RedisStrictCommand; import org.redisson.client.protocol.RedisStrictCommand;
import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.convertor.IntegerReplayConvertor; import org.redisson.client.protocol.convertor.IntegerReplayConvertor;
import org.redisson.command.CommandAsyncExecutor; import org.redisson.command.CommandAsyncExecutor;
import org.redisson.misc.RPromise; import org.redisson.misc.RPromise;
@ -58,12 +59,34 @@ import io.netty.util.internal.PlatformDependent;
*/ */
public class RedissonLock extends RedissonExpirable implements RLock { public class RedissonLock extends RedissonExpirable implements RLock {
public static class ExpirationEntry {
private long threadId;
private Timeout timeout;
public ExpirationEntry(long threadId, Timeout timeout) {
super();
this.threadId = threadId;
this.timeout = timeout;
}
public long getThreadId() {
return threadId;
}
public Timeout getTimeout() {
return timeout;
}
}
private static final Logger log = LoggerFactory.getLogger(RedissonLock.class); private static final Logger log = LoggerFactory.getLogger(RedissonLock.class);
private static final ConcurrentMap<String, Timeout> expirationRenewalMap = PlatformDependent.newConcurrentHashMap(); private static final ConcurrentMap<String, ExpirationEntry> expirationRenewalMap = PlatformDependent.newConcurrentHashMap();
protected long internalLockLeaseTime; protected long internalLockLeaseTime;
final UUID id; final UUID id;
final String entryName;
protected static final LockPubSub PUBSUB = new LockPubSub(); protected static final LockPubSub PUBSUB = new LockPubSub();
@ -74,10 +97,11 @@ public class RedissonLock extends RedissonExpirable implements RLock {
this.commandExecutor = commandExecutor; this.commandExecutor = commandExecutor;
this.id = commandExecutor.getConnectionManager().getId(); this.id = commandExecutor.getConnectionManager().getId();
this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(); this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
this.entryName = id + ":" + name;
} }
protected String getEntryName() { protected String getEntryName() {
return id + ":" + getName(); return entryName;
} }
String getChannelName() { String getChannelName() {
@ -233,15 +257,15 @@ public class RedissonLock extends RedissonExpirable implements RLock {
} }
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS); }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
if (expirationRenewalMap.putIfAbsent(getEntryName(), task) != null) { if (expirationRenewalMap.putIfAbsent(getEntryName(), new ExpirationEntry(threadId, task)) != null) {
task.cancel(); task.cancel();
} }
} }
void cancelExpirationRenewal() { void cancelExpirationRenewal(Long threadId) {
Timeout task = expirationRenewalMap.remove(getEntryName()); ExpirationEntry task = expirationRenewalMap.remove(getEntryName());
if (task != null) { if (task != null && (threadId == null || task.getThreadId() == threadId)) {
task.cancel(); task.getTimeout().cancel();
} }
} }
@ -365,15 +389,16 @@ public class RedissonLock extends RedissonExpirable implements RLock {
@Override @Override
public void unlock() { public void unlock() {
Boolean opStatus = get(unlockInnerAsync(Thread.currentThread().getId())); try {
if (opStatus == null) { get(unlockAsync(Thread.currentThread().getId()));
throw new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " } catch (RedisException e) {
+ id + " thread-id: " + Thread.currentThread().getId()); if (e.getCause() instanceof IllegalMonitorStateException) {
} throw (IllegalMonitorStateException)e.getCause();
if (opStatus) { } else {
cancelExpirationRenewal(); throw e;
}
} }
// Future<Void> future = unlockAsync(); // Future<Void> future = unlockAsync();
// future.awaitUninterruptibly(); // future.awaitUninterruptibly();
// if (future.isSuccess()) { // if (future.isSuccess()) {
@ -398,7 +423,7 @@ public class RedissonLock extends RedissonExpirable implements RLock {
@Override @Override
public RFuture<Boolean> forceUnlockAsync() { public RFuture<Boolean> forceUnlockAsync() {
cancelExpirationRenewal(); cancelExpirationRenewal(null);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('del', KEYS[1]) == 1) then " "if (redis.call('del', KEYS[1]) == 1) then "
+ "redis.call('publish', KEYS[2], ARGV[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); "
@ -479,6 +504,7 @@ public class RedissonLock extends RedissonExpirable implements RLock {
@Override @Override
public void operationComplete(Future<Boolean> future) throws Exception { public void operationComplete(Future<Boolean> future) throws Exception {
if (!future.isSuccess()) { if (!future.isSuccess()) {
cancelExpirationRenewal(threadId);
result.tryFailure(future.cause()); result.tryFailure(future.cause());
return; return;
} }
@ -491,7 +517,7 @@ public class RedissonLock extends RedissonExpirable implements RLock {
return; return;
} }
if (opStatus) { if (opStatus) {
cancelExpirationRenewal(); cancelExpirationRenewal(null);
} }
result.trySuccess(null); result.trySuccess(null);
} }

@ -28,9 +28,6 @@ import org.redisson.client.protocol.RedisStrictCommand;
import org.redisson.command.CommandAsyncExecutor; import org.redisson.command.CommandAsyncExecutor;
import org.redisson.pubsub.LockPubSub; import org.redisson.pubsub.LockPubSub;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
/** /**
* Lock will be removed automatically if client disconnects. * Lock will be removed automatically if client disconnects.
* *
@ -141,7 +138,8 @@ public class RedissonReadLock extends RedissonLock implements RLock {
@Override @Override
public RFuture<Boolean> forceUnlockAsync() { public RFuture<Boolean> forceUnlockAsync() {
RFuture<Boolean> result = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, cancelExpirationRenewal(null);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hget', KEYS[1], 'mode') == 'read') then " + "if (redis.call('hget', KEYS[1], 'mode') == 'read') then " +
"redis.call('del', KEYS[1]); " + "redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " +
@ -149,17 +147,6 @@ public class RedissonReadLock extends RedissonLock implements RLock {
"end; " + "end; " +
"return 0; ", "return 0; ",
Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage); Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage);
result.addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
if (future.isSuccess() && future.getNow()) {
cancelExpirationRenewal();
}
}
});
return result;
} }
@Override @Override

@ -28,9 +28,6 @@ import org.redisson.client.protocol.RedisStrictCommand;
import org.redisson.command.CommandAsyncExecutor; import org.redisson.command.CommandAsyncExecutor;
import org.redisson.pubsub.LockPubSub; import org.redisson.pubsub.LockPubSub;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
/** /**
* Lock will be removed automatically if client disconnects. * Lock will be removed automatically if client disconnects.
* *
@ -120,7 +117,8 @@ public class RedissonWriteLock extends RedissonLock implements RLock {
@Override @Override
public RFuture<Boolean> forceUnlockAsync() { public RFuture<Boolean> forceUnlockAsync() {
RFuture<Boolean> result = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, cancelExpirationRenewal(null);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hget', KEYS[1], 'mode') == 'write') then " + "if (redis.call('hget', KEYS[1], 'mode') == 'write') then " +
"redis.call('del', KEYS[1]); " + "redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " +
@ -128,17 +126,6 @@ public class RedissonWriteLock extends RedissonLock implements RLock {
"end; " + "end; " +
"return 0; ", "return 0; ",
Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage); Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage);
result.addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
if (future.isSuccess() && future.getNow()) {
cancelExpirationRenewal();
}
}
});
return result;
} }
@Override @Override

Loading…
Cancel
Save