Feature - lockWatchdogBatchSize setting added

Improvement - locks watchdog process optimization #6303
pull/6394/head
Nikita Koksharov 2 weeks ago
parent 1ce49b7a2e
commit 597d604a37

@ -34,6 +34,7 @@ import org.redisson.redisnode.RedissonClusterNodes;
import org.redisson.redisnode.RedissonMasterSlaveNodes;
import org.redisson.redisnode.RedissonSentinelMasterSlaveNodes;
import org.redisson.redisnode.RedissonSingleNode;
import org.redisson.renewal.LockRenewalScheduler;
import org.redisson.transaction.RedissonTransaction;
import java.time.Duration;
@ -71,6 +72,8 @@ public final class Redisson implements RedissonClient {
commandExecutor = connectionManager.createCommandExecutor(objectBuilder, RedissonObjectBuilder.ReferenceType.DEFAULT);
evictionScheduler = new EvictionScheduler(commandExecutor);
writeBehindService = new WriteBehindService(commandExecutor);
connectionManager.getServiceManager().register(new LockRenewalScheduler(commandExecutor));
}
public EvictionScheduler getEvictionScheduler() {

@ -15,8 +15,6 @@
*/
package org.redisson;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import org.redisson.api.RFuture;
import org.redisson.api.RLock;
import org.redisson.client.RedisException;
@ -30,11 +28,15 @@ import org.redisson.command.CommandAsyncExecutor;
import org.redisson.command.CommandBatchService;
import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.misc.CompletableFutureWrapper;
import org.redisson.renewal.LockRenewalScheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
import java.util.concurrent.*;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
/**
@ -45,66 +47,18 @@ import java.util.concurrent.locks.Condition;
*/
public abstract class RedissonBaseLock extends RedissonExpirable implements RLock {
static class ExpirationEntry {
private final Queue<Long> threadsQueue = new ConcurrentLinkedQueue<>();
private final Map<Long, Integer> threadIds = new ConcurrentHashMap<>();
private volatile Timeout timeout;
ExpirationEntry() {
super();
}
public void addThreadId(long threadId) {
threadIds.compute(threadId, (t, counter) -> {
counter = Optional.ofNullable(counter).orElse(0);
counter++;
threadsQueue.add(threadId);
return counter;
});
}
public boolean hasNoThreads() {
return threadsQueue.isEmpty();
}
public Long getFirstThreadId() {
return threadsQueue.peek();
}
public void removeThreadId(long threadId) {
threadIds.computeIfPresent(threadId, (t, counter) -> {
counter--;
if (counter == 0) {
threadsQueue.remove(threadId);
return null;
}
return counter;
});
}
public void setTimeout(Timeout timeout) {
this.timeout = timeout;
}
public Timeout getTimeout() {
return timeout;
}
}
private static final Logger log = LoggerFactory.getLogger(RedissonBaseLock.class);
private static final ConcurrentMap<String, ExpirationEntry> EXPIRATION_RENEWAL_MAP = new ConcurrentHashMap<>();
final long internalLockLeaseTime;
final String id;
final String entryName;
final LockRenewalScheduler renewalScheduler;
public RedissonBaseLock(CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
this.id = getServiceManager().getId();
this.internalLockLeaseTime = getServiceManager().getCfg().getLockWatchdogTimeout();
this.entryName = id + ":" + name;
this.renewalScheduler = getServiceManager().getRenewalScheduler();
}
protected String getEntryName() {
@ -115,94 +69,12 @@ public abstract class RedissonBaseLock extends RedissonExpirable implements RLoc
return id + ":" + threadId;
}
private void renewExpiration() {
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}
Timeout task = getServiceManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
CompletionStage<Boolean> future = renewExpirationAsync(threadId);
future.whenComplete((res, e) -> {
if (e != null) {
if (getServiceManager().isShuttingDown(e)) {
return;
}
log.error("Can't update lock {} expiration", getRawName(), e);
EXPIRATION_RENEWAL_MAP.remove(getEntryName());
return;
}
if (res) {
// reschedule itself
renewExpiration();
} else {
cancelExpirationRenewal(null, null);
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
protected final void scheduleExpirationRenewal(long threadId) {
ExpirationEntry entry = new ExpirationEntry();
entry.addThreadId(threadId);
ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
if (oldEntry != null) {
oldEntry.addThreadId(threadId);
} else {
try {
renewExpiration();
} finally {
if (Thread.currentThread().isInterrupted()) {
cancelExpirationRenewal(threadId, null);
}
}
}
}
protected CompletionStage<Boolean> renewExpirationAsync(long threadId) {
return commandExecutor.syncedEval(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.singletonList(getRawName()),
internalLockLeaseTime, getLockName(threadId));
protected void scheduleExpirationRenewal(long threadId) {
renewalScheduler.renewLock(getRawName(), threadId, getLockName(threadId));
}
protected void cancelExpirationRenewal(Long threadId, Boolean unlockResult) {
ExpirationEntry task = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (task == null) {
return;
}
if (threadId != null) {
task.removeThreadId(threadId);
}
if (threadId == null || task.hasNoThreads()) {
Timeout timeout = task.getTimeout();
if (timeout != null) {
timeout.cancel();
}
EXPIRATION_RENEWAL_MAP.remove(getEntryName());
}
renewalScheduler.cancelLockRenewal(getRawName(), threadId);
}
protected final <T> RFuture<T> evalWriteSyncedNoRetryAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params) {

@ -101,11 +101,6 @@ public class RedissonFasterMultiLock extends RedissonBaseLock {
}
@Override
public String getName() {
throw new UnsupportedOperationException();
}
@Override
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
lock(leaseTime, unit, true);
@ -321,39 +316,13 @@ public class RedissonFasterMultiLock extends RedissonBaseLock {
}
@Override
protected CompletionStage<Boolean> renewExpirationAsync(long threadId) {
List<Object> params = new ArrayList<>();
params.add(getLockName(threadId));
params.add(internalLockLeaseTime);
params.add(System.currentTimeMillis());
params.addAll(fields);
return commandExecutor.syncedEval(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"local leaseTime = tonumber(ARGV[2]);" +
"local currentTime = tonumber(ARGV[3]);" +
"local currentThread = ARGV[1];" +
"if (redis.call('exists',KEYS[1]) > 0) then" +
" local newExpireTime = leaseTime + currentTime;" +
" for i=4, #ARGV, 1 do " +
" local lockThread = redis.call('hget', KEYS[1], ARGV[i]);" +
" if(lockThread ~= false and lockThread == currentThread) then " +
" local expireFieldName = ARGV[i]..':'..lockThread..':expire_time';" +
" local expireTime = redis.call('hget', KEYS[1], expireFieldName);" +
" if(tonumber(expireTime) < newExpireTime) then " +
" redis.call('hset', KEYS[1],expireFieldName, newExpireTime);" +
" end;" +
" else" +
" return 0;" +
" end;" +
" end; " +
" local expireTime = redis.call('pttl',KEYS[1]);" +
" if(tonumber(expireTime) < tonumber(leaseTime)) then " +
" redis.call('pexpire',KEYS[1], leaseTime);" +
" end;" +
" return 1;" +
"end;" +
"return 0;",
Collections.singletonList(getRawName()),
params.toArray());
protected void cancelExpirationRenewal(Long threadId, Boolean unlockResult) {
renewalScheduler.cancelFastMultilockRenewl(getRawName(), threadId);
}
@Override
protected void scheduleExpirationRenewal(long threadId) {
renewalScheduler.renewFastMultiLock(getRawName(), threadId, getLockName(threadId), fields);
}
private <T> RFuture<T> tryLockOnceInnerAsync(long leaseTime, TimeUnit unit, RedisStrictCommand<T> command, long threadId) {

@ -24,7 +24,6 @@ import org.redisson.command.CommandAsyncExecutor;
import org.redisson.pubsub.LockPubSub;
import java.util.Arrays;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
@ -143,36 +142,20 @@ public class RedissonReadLock extends RedissonLock implements RLock {
protected String getKeyPrefix(long threadId, String timeoutPrefix) {
return timeoutPrefix.split(":" + getLockName(threadId))[0];
}
@Override
protected CompletionStage<Boolean> renewExpirationAsync(long threadId) {
protected void scheduleExpirationRenewal(long threadId) {
String timeoutPrefix = getReadWriteTimeoutNamePrefix(threadId);
String keyPrefix = getKeyPrefix(threadId, timeoutPrefix);
return commandExecutor.syncedEval(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"local counter = redis.call('hget', KEYS[1], ARGV[2]); " +
"if (counter ~= false) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"if (redis.call('hlen', KEYS[1]) > 1) then " +
"local keys = redis.call('hkeys', KEYS[1]); " +
"for n, key in ipairs(keys) do " +
"counter = tonumber(redis.call('hget', KEYS[1], key)); " +
"if type(counter) == 'number' then " +
"for i=counter, 1, -1 do " +
"redis.call('pexpire', KEYS[2] .. ':' .. key .. ':rwlock_timeout:' .. i, ARGV[1]); " +
"end; " +
"end; " +
"end; " +
"end; " +
"return 1; " +
"end; " +
"return 0;",
Arrays.<Object>asList(getRawName(), keyPrefix),
internalLockLeaseTime, getLockName(threadId));
renewalScheduler.renewReadLock(getRawName(), threadId, getLockName(threadId), keyPrefix);
}
@Override
protected void cancelExpirationRenewal(Long threadId, Boolean unlockResult) {
super.cancelExpirationRenewal(threadId, unlockResult);
renewalScheduler.cancelReadLockRenewal(getRawName(), threadId);
}
@Override
public Condition newCondition() {
throw new UnsupportedOperationException();

@ -15,12 +15,6 @@
*/
package org.redisson;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import org.redisson.api.RFuture;
import org.redisson.api.RLock;
import org.redisson.client.codec.LongCodec;
@ -30,6 +24,10 @@ import org.redisson.client.protocol.RedisStrictCommand;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.pubsub.LockPubSub;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
/**
* Lock will be removed automatically if client disconnects.
*
@ -123,18 +121,6 @@ public class RedissonWriteLock extends RedissonLock implements RLock {
throw new UnsupportedOperationException();
}
@Override
protected CompletionStage<Boolean> renewExpirationAsync(long threadId) {
CompletionStage<Boolean> f = super.renewExpirationAsync(threadId);
return f.thenCompose(r -> {
if (!r) {
RedissonReadLock lock = new RedissonReadLock(commandExecutor, getRawName());
return lock.renewExpirationAsync(threadId);
}
return CompletableFuture.completedFuture(r);
});
}
@Override
public RFuture<Boolean> forceUnlockAsync() {
cancelExpirationRenewal(null, null);

@ -73,6 +73,8 @@ public class Config {
private long lockWatchdogTimeout = 30 * 1000;
private int lockWatchdogBatchSize = 100;
private boolean checkLockSyncedSlaves = true;
private long slavesSyncTimeout = 1000;
@ -122,6 +124,7 @@ public class Config {
setUseScriptCache(oldConf.isUseScriptCache());
setKeepPubSubOrder(oldConf.isKeepPubSubOrder());
setLockWatchdogTimeout(oldConf.getLockWatchdogTimeout());
setLockWatchdogBatchSize(oldConf.getLockWatchdogBatchSize());
setCheckLockSyncedSlaves(oldConf.isCheckLockSyncedSlaves());
setSlavesSyncTimeout(oldConf.getSlavesSyncTimeout());
setNettyThreads(oldConf.getNettyThreads());
@ -539,6 +542,24 @@ public class Config {
return lockWatchdogTimeout;
}
/**
* This parameter is only used if lock has been acquired without leaseTimeout parameter definition.
* Defines amount of locks utilized in a single lock watchdog execution.
* <p>
* Default is 100
*
* @param lockWatchdogBatchSize amount of locks used by a single lock watchdog execution
* @return config
*/
public Config setLockWatchdogBatchSize(int lockWatchdogBatchSize) {
this.lockWatchdogBatchSize = lockWatchdogBatchSize;
return this;
}
public int getLockWatchdogBatchSize() {
return lockWatchdogBatchSize;
}
/**
* Defines whether to check synchronized slaves amount
* with actual slaves amount after lock acquisition.

@ -63,6 +63,7 @@ import org.redisson.misc.CompletableFutureWrapper;
import org.redisson.misc.RandomXoshiro256PlusPlus;
import org.redisson.misc.RedisURI;
import org.redisson.remote.ResponseEntry;
import org.redisson.renewal.LockRenewalScheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -151,6 +152,8 @@ public final class ServiceManager {
private final QueueTransferService queueTransferService = new QueueTransferService();
private LockRenewalScheduler renewalScheduler;
public ServiceManager(MasterSlaveServersConfig config, Config cfg) {
Version.logVersion();
@ -666,4 +669,11 @@ public final class ServiceManager {
}
});
public void register(LockRenewalScheduler renewalScheduler) {
this.renewalScheduler = renewalScheduler;
}
public LockRenewalScheduler getRenewalScheduler() {
return renewalScheduler;
}
}

@ -0,0 +1,37 @@
/**
* Copyright (c) 2013-2024 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.renewal;
import java.util.Collection;
/**
*
* @author Nikita Koksharov
*
*/
public class FastMultilockEntry extends LockEntry {
private final Collection<String> fields;
public FastMultilockEntry(Collection<String> fields) {
this.fields = fields;
}
public Collection<String> getFields() {
return fields;
}
}

@ -0,0 +1,121 @@
/**
* Copyright (c) 2013-2024 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.renewal;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandAsyncExecutor;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
/**
*
* @author Nikita Koksharov
*
*/
public class FastMultilockTask extends LockTask {
public FastMultilockTask(long internalLockLeaseTime, CommandAsyncExecutor executor) {
super(internalLockLeaseTime, executor, 1);
}
@Override
CompletionStage<Void> renew(Iterator<String> iter, int chunkSize) {
if (!iter.hasNext()) {
return CompletableFuture.completedFuture(null);
}
Map<String, Long> name2lockName = new HashMap<>();
List<Object> args = new ArrayList<>();
args.add(internalLockLeaseTime);
args.add(System.currentTimeMillis());
List<String> keys = new ArrayList<>(chunkSize);
while (iter.hasNext()) {
String key = iter.next();
FastMultilockEntry entry = (FastMultilockEntry) name2entry.get(key);
if (entry == null) {
continue;
}
Long threadId = entry.getFirstThreadId();
if (threadId == null) {
continue;
}
keys.add(key);
args.add(entry.getLockName(threadId));
args.addAll(entry.getFields());
name2lockName.put(key, threadId);
if (keys.size() == chunkSize) {
break;
}
}
if (keys.isEmpty()) {
return CompletableFuture.completedFuture(null);
}
String firstName = keys.get(0);
CompletionStage<Boolean> f = executor.syncedEval(firstName, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"local leaseTime = tonumber(ARGV[1]);" +
"local currentTime = tonumber(ARGV[2]);" +
"local currentThread = ARGV[3];" +
"if (redis.call('exists',KEYS[1]) > 0) then" +
" local newExpireTime = leaseTime + currentTime;" +
" for i=4, #ARGV, 1 do " +
" local lockThread = redis.call('hget', KEYS[1], ARGV[i]);" +
" if(lockThread ~= false and lockThread == currentThread) then " +
" local expireFieldName = ARGV[i]..':'..lockThread..':expire_time';" +
" local expireTime = redis.call('hget', KEYS[1], expireFieldName);" +
" if(tonumber(expireTime) < newExpireTime) then " +
" redis.call('hset', KEYS[1],expireFieldName, newExpireTime);" +
" end;" +
" else" +
" return 0;" +
" end;" +
" end; " +
" local expireTime = redis.call('pttl',KEYS[1]);" +
" if(tonumber(expireTime) < tonumber(leaseTime)) then " +
" redis.call('pexpire',KEYS[1], leaseTime);" +
" end;" +
" return 1;" +
"end;" +
"return 0;",
Collections.singletonList(firstName),
args.toArray());
return f.thenCompose(exists -> {
if (!exists) {
cancelExpirationRenewal(firstName, name2lockName.get(firstName));
}
return renew(iter, chunkSize);
});
}
public void add(String rawName, String lockName, long threadId, Collection<String> fields) {
FastMultilockEntry entry = new FastMultilockEntry(fields);
entry.addThreadId(threadId, lockName);
add(rawName, lockName, threadId, entry);
}
}

@ -0,0 +1,73 @@
/**
* Copyright (c) 2013-2024 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.renewal;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
*
* @author Nikita Koksharov
*
*/
public class LockEntry {
final Queue<Long> threadsQueue = new ConcurrentLinkedQueue<>();
final Map<Long, Integer> threadId2counter = new ConcurrentHashMap<>();
final Map<Long, String> threadId2lockName = new ConcurrentHashMap<>();
LockEntry() {
super();
}
public String getLockName(long threadId) {
return threadId2lockName.get(threadId);
}
public void addThreadId(long threadId, String lockName) {
threadId2counter.compute(threadId, (t, counter) -> {
counter = Optional.ofNullable(counter).orElse(0);
counter++;
threadsQueue.add(threadId);
return counter;
});
threadId2lockName.putIfAbsent(threadId, lockName);
}
public boolean hasNoThreads() {
return threadsQueue.isEmpty();
}
public Long getFirstThreadId() {
return threadsQueue.peek();
}
public void removeThreadId(long threadId) {
threadId2counter.computeIfPresent(threadId, (t, counter) -> {
counter--;
if (counter == 0) {
threadsQueue.remove(threadId);
threadId2lockName.remove(threadId);
return null;
}
return counter;
});
}
}

@ -0,0 +1,83 @@
/**
* Copyright (c) 2013-2024 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.renewal;
import org.redisson.command.CommandAsyncExecutor;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicReference;
/**
*
* @author Nikita Koksharov
*
*/
public final class LockRenewalScheduler {
private final AtomicReference<LockTask> reference = new AtomicReference<>();
private final AtomicReference<FastMultilockTask> multilockReference = new AtomicReference<>();
private final AtomicReference<ReadLockTask> readLockReference = new AtomicReference<>();
private final CommandAsyncExecutor executor;
private final int batchSize;
private final long internalLockLeaseTime;
public LockRenewalScheduler(CommandAsyncExecutor executor) {
this.executor = executor;
this.internalLockLeaseTime = executor.getServiceManager().getCfg().getLockWatchdogTimeout();
this.batchSize = executor.getServiceManager().getCfg().getLockWatchdogBatchSize();
}
public void renewReadLock(String name, Long threadId, String lockName, String keyPrefix) {
readLockReference.compareAndSet(null, new ReadLockTask(internalLockLeaseTime, executor, batchSize));
ReadLockTask task = readLockReference.get();
task.add(name, lockName, threadId, keyPrefix);
}
public void renewFastMultiLock(String name, Long threadId, String lockName, Collection<String> fields) {
multilockReference.compareAndSet(null, new FastMultilockTask(internalLockLeaseTime, executor));
FastMultilockTask task = multilockReference.get();
task.add(name, lockName, threadId, fields);
}
public void renewLock(String name, Long threadId, String lockName) {
reference.compareAndSet(null, new LockTask(internalLockLeaseTime, executor, batchSize));
LockTask task = reference.get();
task.add(name, lockName, threadId);
}
public void cancelReadLockRenewal(String name, Long threadId) {
ReadLockTask rtask = readLockReference.get();
if (rtask != null) {
rtask.cancelExpirationRenewal(name, threadId);
}
}
public void cancelFastMultilockRenewl(String name, Long threadId) {
FastMultilockTask mtask = multilockReference.get();
if (mtask != null) {
mtask.cancelExpirationRenewal(name, threadId);
}
}
public void cancelLockRenewal(String name, Long threadId) {
LockTask task = reference.get();
if (task != null) {
task.cancelExpirationRenewal(name, threadId);
}
}
}

@ -0,0 +1,101 @@
/**
* Copyright (c) 2013-2024 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.renewal;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.decoder.ContainsDecoder;
import org.redisson.command.CommandAsyncExecutor;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
/**
*
* @author Nikita Koksharov
*
*/
public class LockTask extends RenewalTask {
public LockTask(long internalLockLeaseTime,
CommandAsyncExecutor executor, int chunkSize) {
super(internalLockLeaseTime, executor, chunkSize);
}
@Override
CompletionStage<Void> renew(Iterator<String> iter, int chunkSize) {
if (!iter.hasNext()) {
return CompletableFuture.completedFuture(null);
}
Map<String, Long> name2threadId = new HashMap<>(chunkSize);
List<Object> args = new ArrayList<>(chunkSize + 1);
args.add(internalLockLeaseTime);
List<String> keys = new ArrayList<>(chunkSize);
while (iter.hasNext()) {
String key = iter.next();
LockEntry entry = name2entry.get(key);
Long threadId = entry.getFirstThreadId();
if (threadId == null) {
continue;
}
keys.add(key);
args.add(entry.getLockName(threadId));
name2threadId.put(key, threadId);
if (keys.size() == chunkSize) {
break;
}
}
String firstName = keys.get(0);
CompletionStage<List<String>> f = executor.syncedEval(firstName, LongCodec.INSTANCE,
new RedisCommand<>("EVAL", new ContainsDecoder<>(keys)),
"local result = {} " +
"for i = 1, #KEYS, 1 do " +
"if (redis.call('hexists', KEYS[i], ARGV[i + 1]) == 1) then " +
"redis.call('pexpire', KEYS[i], ARGV[1]); " +
"table.insert(result, 1); " +
"else " +
"table.insert(result, 0); " +
"end; " +
"end; " +
"return result;",
new ArrayList<>(keys),
args.toArray());
return f.thenCompose(existingNames -> {
keys.removeAll(existingNames);
for (String key : keys) {
cancelExpirationRenewal(key, name2threadId.get(key));
}
return renew(iter, chunkSize);
});
}
public void add(String rawName, String lockName, long threadId) {
LockEntry entry = new LockEntry();
entry.addThreadId(threadId, lockName);
add(rawName, lockName, threadId, entry);
}
}

@ -0,0 +1,60 @@
/**
* Copyright (c) 2013-2024 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.renewal;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
/**
*
* @author Nikita Koksharov
*
*/
public class ReadLockEntry extends LockEntry {
private final Map<Long, String> threadId2keyPrefix = new ConcurrentHashMap<>();
public String getKeyPrefix(Long threadId) {
return threadId2keyPrefix.get(threadId);
}
public void addThreadId(long threadId, String lockName, String keyPrefix) {
threadId2counter.compute(threadId, (t, counter) -> {
counter = Optional.ofNullable(counter).orElse(0);
counter++;
threadsQueue.add(threadId);
return counter;
});
threadId2lockName.putIfAbsent(threadId, lockName);
threadId2keyPrefix.putIfAbsent(threadId, keyPrefix);
}
@Override
public void removeThreadId(long threadId) {
threadId2counter.computeIfPresent(threadId, (t, counter) -> {
counter--;
if (counter == 0) {
threadsQueue.remove(threadId);
threadId2lockName.remove(threadId);
threadId2keyPrefix.remove(threadId);
return null;
}
return counter;
});
}
}

@ -0,0 +1,136 @@
/**
* Copyright (c) 2013-2024 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.renewal;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.decoder.ContainsDecoder;
import org.redisson.command.CommandAsyncExecutor;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
/**
*
* @author Nikita Koksharov
*
*/
public class ReadLockTask extends LockTask {
public ReadLockTask(long internalLockLeaseTime, CommandAsyncExecutor executor, int chunkSize) {
super(internalLockLeaseTime, executor, chunkSize);
}
@Override
CompletionStage<Void> renew(Iterator<String> iter, int chunkSize) {
if (!iter.hasNext()) {
return CompletableFuture.completedFuture(null);
}
Map<String, Long> name2lockName = new HashMap<>();
List<Object> args = new ArrayList<>();
args.add(internalLockLeaseTime);
List<Object> keys = new ArrayList<>(chunkSize);
List<Object> keysArgs = new ArrayList<>(chunkSize);
while (iter.hasNext()) {
String key = iter.next();
ReadLockEntry entry = (ReadLockEntry) name2entry.get(key);
if (entry == null) {
continue;
}
Long threadId = entry.getFirstThreadId();
if (threadId == null) {
continue;
}
keys.add(key);
keysArgs.add(key);
keysArgs.add(entry.getKeyPrefix(threadId));
args.add(entry.getLockName(threadId));
name2lockName.put(key, threadId);
if (keys.size() == chunkSize) {
break;
}
}
if (keys.isEmpty()) {
return CompletableFuture.completedFuture(null);
}
String firstName = keys.get(0).toString();
CompletionStage<List<Object>> f = executor.syncedEval(firstName, LongCodec.INSTANCE,
new RedisCommand<>("EVAL", new ContainsDecoder<>(keys)),
"local result = {} " +
"local j = 1 " +
"for i = 1, #KEYS, 2 do " +
"j = j + 1; " +
"local counter = redis.call('hget', KEYS[i], ARGV[j]); " +
"if (counter ~= false) then " +
"redis.call('pexpire', KEYS[i], ARGV[1]); " +
"if (redis.call('hlen', KEYS[i]) > 1) then " +
"local keys = redis.call('hkeys', KEYS[i]); " +
"for n, key in ipairs(keys) do " +
"counter = tonumber(redis.call('hget', KEYS[i], key)); " +
"if type(counter) == 'number' then " +
"for i=counter, 1, -1 do " +
"redis.call('pexpire', KEYS[i+1] .. ':' .. key .. ':rwlock_timeout:' .. i, ARGV[1]); " +
"end; " +
"end; " +
"end; " +
"end; " +
"table.insert(result, 1); " +
"else " +
"table.insert(result, 0); " +
"end; " +
"end; " +
"return result;",
keysArgs,
args.toArray());
return f.thenCompose(existingNames -> {
keys.removeAll(existingNames);
for (Object k : keys) {
String key = k.toString();
cancelExpirationRenewal(key, name2lockName.get(key));
}
return renew(iter, chunkSize);
});
}
public void add(String rawName, String lockName, long threadId, String keyPrefix) {
addSlotName(rawName);
ReadLockEntry entry = new ReadLockEntry();
entry.addThreadId(threadId, lockName, keyPrefix);
ReadLockEntry oldEntry = (ReadLockEntry) name2entry.putIfAbsent(rawName, entry);
if (oldEntry != null) {
oldEntry.addThreadId(threadId, lockName, keyPrefix);
} else {
if (tryRun()) {
schedule();
}
}
}
}

@ -0,0 +1,182 @@
/**
* Copyright (c) 2013-2024 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.renewal;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import org.redisson.command.CommandAsyncExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
*
* @author Nikita Koksharov
*
*/
abstract class RenewalTask implements TimerTask {
private final Logger log = LoggerFactory.getLogger(getClass());
final CommandAsyncExecutor executor;
AtomicBoolean running = new AtomicBoolean();
final Map<Integer, Set<String>> slot2names = new ConcurrentHashMap<>();
final Map<String, LockEntry> name2entry = new ConcurrentHashMap<>();
final long internalLockLeaseTime;
final int chunkSize;
boolean tryRun() {
return running.compareAndSet(false, true);
}
void stop() {
running.set(false);
}
public void schedule() {
if (!running.get()) {
return;
}
long internalLockLeaseTime = executor.getServiceManager().getCfg().getLockWatchdogTimeout();
executor.getServiceManager().newTimeout(this, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
}
RenewalTask(long internalLockLeaseTime,
CommandAsyncExecutor executor, int chunkSize) {
this.executor = executor;
this.internalLockLeaseTime = internalLockLeaseTime;
this.chunkSize = chunkSize;
}
final CompletionStage<Void> execute() {
if (name2entry.isEmpty()) {
return CompletableFuture.completedFuture(null);
}
if (!executor.getServiceManager().getCfg().isClusterConfig()) {
return renew(name2entry.keySet().iterator(), chunkSize);
}
return renewSlots(slot2names.values().iterator(), chunkSize);
}
private CompletionStage<Void> renewSlots(Iterator<Set<String>> iter, int chunkSize) {
if (!iter.hasNext()) {
return CompletableFuture.completedFuture(null);
}
CompletionStage<Void> c = renew(iter.next().iterator(), chunkSize);
return c.thenCompose(r -> renewSlots(iter, chunkSize));
}
abstract CompletionStage<Void> renew(Iterator<String> iter, int chunkSize);
void cancelExpirationRenewal(String name, Long threadId) {
LockEntry task = name2entry.get(name);
if (task == null) {
return;
}
if (threadId != null) {
task.removeThreadId(threadId);
}
if (threadId == null || task.hasNoThreads()) {
name2entry.remove(name);
if (executor.getServiceManager().getCfg().isClusterConfig()) {
int slot = executor.getConnectionManager().calcSlot(name);
slot2names.computeIfPresent(slot, (k, v) -> {
v.remove(name);
if (v.isEmpty()) {
return null;
}
return v;
});
}
if (!name2entry.isEmpty()) {
return;
}
stop();
if (!name2entry.isEmpty() && tryRun()) {
schedule();
}
}
}
final void add(String rawName, String lockName, long threadId, LockEntry entry) {
addSlotName(rawName);
LockEntry oldEntry = name2entry.putIfAbsent(rawName, entry);
if (oldEntry != null) {
oldEntry.addThreadId(threadId, lockName);
} else {
if (tryRun()) {
schedule();
}
}
}
void addSlotName(String rawName) {
if (!executor.getServiceManager().getCfg().isClusterConfig()) {
return;
}
int slot = executor.getConnectionManager().calcSlot(rawName);
slot2names.compute(slot, (k, v) -> {
if (v == null) {
v = Collections.newSetFromMap(new ConcurrentHashMap<>());
}
v.add(rawName);
return v;
});
}
@Override
public void run(Timeout timeout) {
if (executor.getServiceManager().isShuttingDown()) {
return;
}
CompletionStage<Void> future = execute();
future.whenComplete((result, e) -> {
if (e != null) {
log.error("Can't update locks {} expiration", name2entry.keySet(), e);
schedule();
return;
}
schedule();
});
}
}

@ -1,7 +1,10 @@
package org.redisson;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.redisson.api.RLock;
import org.redisson.api.RReadWriteLock;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.testcontainers.containers.GenericContainer;
@ -13,15 +16,90 @@ public class RedissonLockExpirationRenewalTest extends RedisDockerTest {
private static final String LOCK_KEY = "LOCK_KEY";
public static final long LOCK_WATCHDOG_TIMEOUT = 1_000L;
@Test
public void testExpirationRenewalIsWorkingAfterTimeout() throws InterruptedException {
GenericContainer<?> redis = createRedis();
RedissonClient redisson;
GenericContainer<?> redis;
@BeforeEach
public void beforeEachTest() {
redis = createRedis();
redis.start();
Config c = createConfig(redis);
c.setLockWatchdogTimeout(LOCK_WATCHDOG_TIMEOUT);
RedissonClient redisson = Redisson.create(c);
c.setLockWatchdogBatchSize(50);
redisson = Redisson.create(c);
}
@AfterEach
public void afterEachTest() {
redisson.shutdown();
redis.stop();
}
@Test
public void testWriteLockAfterTimeout() throws InterruptedException {
RReadWriteLock rw = redisson.getReadWriteLock(LOCK_KEY);
RLock lock = rw.writeLock();
lock.lock();
try {
// force expiration renewal error
restart(redis);
// wait for timeout
Thread.sleep(LOCK_WATCHDOG_TIMEOUT * 2);
} finally {
assertThatThrownBy(() -> {
lock.unlock();
}).isInstanceOf(IllegalMonitorStateException.class);
}
RReadWriteLock lock2 = redisson.getReadWriteLock(LOCK_KEY);
lock2.writeLock().lock();
try {
// wait for timeout
Thread.sleep(LOCK_WATCHDOG_TIMEOUT * 2);
} finally {
lock2.writeLock().unlock();
}
Thread.sleep(1000);
lock2.writeLock().lock();
try {
// wait for timeout
Thread.sleep(LOCK_WATCHDOG_TIMEOUT * 2);
} finally {
lock2.writeLock().unlock();
}
}
@Test
public void testReadLockAfterTimeout() throws InterruptedException {
RReadWriteLock rw = redisson.getReadWriteLock(LOCK_KEY);
RLock lock = rw.readLock();
lock.lock();
try {
// force expiration renewal error
restart(redis);
// wait for timeout
Thread.sleep(LOCK_WATCHDOG_TIMEOUT * 2);
} finally {
assertThatThrownBy(() -> {
lock.unlock();
}).isInstanceOf(IllegalMonitorStateException.class);
}
RReadWriteLock lock2 = redisson.getReadWriteLock(LOCK_KEY);
lock2.readLock().lock();
try {
// wait for timeout
Thread.sleep(LOCK_WATCHDOG_TIMEOUT * 2);
} finally {
lock2.readLock().unlock();
}
}
@Test
public void testLockAfterTimeout() throws InterruptedException {
RLock lock = redisson.getLock(LOCK_KEY);
lock.lock();
try {
@ -41,9 +119,6 @@ public class RedissonLockExpirationRenewalTest extends RedisDockerTest {
} finally {
lock2.unlock();
}
redisson.shutdown();
redis.stop();
}
}

Loading…
Cancel
Save