Merge branch 'master' into 3.0.0

pull/985/head
Nikita 8 years ago
commit 56625f461a

@ -56,9 +56,8 @@ public class RedissonLock extends RedissonExpirable implements RLock {
private static final Logger log = LoggerFactory.getLogger(RedissonLock.class);
public static final long LOCK_EXPIRATION_INTERVAL_SECONDS = 30;
private static final ConcurrentMap<String, Timeout> expirationRenewalMap = PlatformDependent.newConcurrentHashMap();
protected long internalLockLeaseTime = TimeUnit.SECONDS.toMillis(LOCK_EXPIRATION_INTERVAL_SECONDS);
protected long internalLockLeaseTime;
final UUID id;
@ -70,6 +69,7 @@ public class RedissonLock extends RedissonExpirable implements RLock {
super(commandExecutor, name);
this.commandExecutor = commandExecutor;
this.id = id;
this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
}
protected String getEntryName() {
@ -149,7 +149,7 @@ public class RedissonLock extends RedissonExpirable implements RLock {
if (leaseTime != -1) {
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
}
RFuture<Boolean> ttlRemainingFuture = tryLockInnerAsync(LOCK_EXPIRATION_INTERVAL_SECONDS, TimeUnit.SECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
RFuture<Boolean> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
ttlRemainingFuture.addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
@ -171,7 +171,7 @@ public class RedissonLock extends RedissonExpirable implements RLock {
if (leaseTime != -1) {
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(LOCK_EXPIRATION_INTERVAL_SECONDS, TimeUnit.SECONDS, threadId, RedisCommands.EVAL_LONG);
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.addListener(new FutureListener<Long>() {
@Override
public void operationComplete(Future<Long> future) throws Exception {

@ -89,6 +89,8 @@ public class Config {
private EventLoopGroup eventLoopGroup;
private long lockWatchdogTimeout = 30 * 1000;
public Config() {
}
@ -101,6 +103,7 @@ public class Config {
oldConf.setCodec(new JsonJacksonCodec());
}
setLockWatchdogTimeout(oldConf.getLockWatchdogTimeout());
setNettyThreads(oldConf.getNettyThreads());
setThreads(oldConf.getThreads());
setCodec(oldConf.getCodec());
@ -558,6 +561,25 @@ public class Config {
return eventLoopGroup;
}
/**
* Works only if lock has been acquired without leaseTimeout parameter definition.
* Lock will be expired after <code>lockWatchdogTimeout</code> if watchdog
* didn't extend it to next <code>lockWatchdogTimeout</code> time interval.
* <p>
* This prevents against infinity locked locks due to Redisson client crush or
* any other reason when lock can't be released in proper way.
*
* @param lockWatchdogTimeout timeout in milliseconds
* @return config
*/
public Config setLockWatchdogTimeout(long lockWatchdogTimeout) {
this.lockWatchdogTimeout = lockWatchdogTimeout;
return this;
}
public long getLockWatchdogTimeout() {
return lockWatchdogTimeout;
}
/**
* Read config object stored in JSON format from <code>String</code>
*

@ -13,6 +13,8 @@ import org.junit.Assert;
import org.junit.Test;
import org.redisson.api.RLock;
import com.jayway.awaitility.Awaitility;
public class RedissonFairLockTest extends BaseConcurrentTest {
@Test
@ -141,8 +143,8 @@ public class RedissonFairLockTest extends BaseConcurrentTest {
Assert.assertTrue(latch.await(1, TimeUnit.SECONDS));
RLock lock = redisson.getFairLock("lock");
Thread.sleep(TimeUnit.SECONDS.toMillis(RedissonLock.LOCK_EXPIRATION_INTERVAL_SECONDS + 1));
Assert.assertFalse("Transient lock has not expired automatically", lock.isLocked());
Awaitility.await().atMost(redisson.getConfig().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS).until(() -> !lock.isLocked());
}
@Test

@ -12,6 +12,8 @@ import org.junit.Test;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import com.jayway.awaitility.Awaitility;
public class RedissonLockTest extends BaseConcurrentTest {
@Test
@ -97,8 +99,8 @@ public class RedissonLockTest extends BaseConcurrentTest {
RLock lock = redisson.getLock("lock");
t.join();
r.shutdown();
Thread.sleep(TimeUnit.SECONDS.toMillis(RedissonLock.LOCK_EXPIRATION_INTERVAL_SECONDS));
Assert.assertFalse("Transient lock has not expired automatically", lock.isLocked());
Awaitility.await().atMost(redisson.getConfig().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS).until(() -> !lock.isLocked());
}
@Test

@ -15,6 +15,8 @@ import org.junit.Test;
import org.redisson.api.RLock;
import org.redisson.api.RReadWriteLock;
import com.jayway.awaitility.Awaitility;
public class RedissonReadWriteLockTest extends BaseConcurrentTest {
@Test
@ -317,8 +319,7 @@ public class RedissonReadWriteLockTest extends BaseConcurrentTest {
});
RReadWriteLock lock1 = redisson.getReadWriteLock("lock");
Thread.sleep(TimeUnit.SECONDS.toMillis(RedissonLock.LOCK_EXPIRATION_INTERVAL_SECONDS + 1));
Assert.assertFalse("Transient lock expired automatically", lock1.writeLock().isLocked());
Awaitility.await().atMost(redisson.getConfig().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS).until(() -> !lock1.writeLock().isLocked());
}
@Test

@ -1,5 +1,7 @@
package org.redisson;
import static org.assertj.core.api.Assertions.assertThat;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.Serializable;
@ -10,12 +12,9 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static org.assertj.core.api.Assertions.assertThat;
import org.junit.Assert;
import org.junit.Test;
import static org.redisson.BaseTest.createConfig;
import static org.redisson.BaseTest.createInstance;
import org.redisson.api.RFuture;
import org.redisson.api.RedissonClient;
import org.redisson.api.RemoteInvocationOptions;

Loading…
Cancel
Save