diff --git a/redisson/src/main/java/org/redisson/Redisson.java b/redisson/src/main/java/org/redisson/Redisson.java index e68ef3654..559f3ec27 100755 --- a/redisson/src/main/java/org/redisson/Redisson.java +++ b/redisson/src/main/java/org/redisson/Redisson.java @@ -34,6 +34,7 @@ import org.redisson.redisnode.RedissonClusterNodes; import org.redisson.redisnode.RedissonMasterSlaveNodes; import org.redisson.redisnode.RedissonSentinelMasterSlaveNodes; import org.redisson.redisnode.RedissonSingleNode; +import org.redisson.renewal.LockRenewalScheduler; import org.redisson.transaction.RedissonTransaction; import java.time.Duration; @@ -71,6 +72,8 @@ public final class Redisson implements RedissonClient { commandExecutor = connectionManager.createCommandExecutor(objectBuilder, RedissonObjectBuilder.ReferenceType.DEFAULT); evictionScheduler = new EvictionScheduler(commandExecutor); writeBehindService = new WriteBehindService(commandExecutor); + + connectionManager.getServiceManager().register(new LockRenewalScheduler(commandExecutor)); } public EvictionScheduler getEvictionScheduler() { diff --git a/redisson/src/main/java/org/redisson/RedissonBaseLock.java b/redisson/src/main/java/org/redisson/RedissonBaseLock.java index 4abe6dded..ad2866d5d 100644 --- a/redisson/src/main/java/org/redisson/RedissonBaseLock.java +++ b/redisson/src/main/java/org/redisson/RedissonBaseLock.java @@ -15,8 +15,6 @@ */ package org.redisson; -import io.netty.util.Timeout; -import io.netty.util.TimerTask; import org.redisson.api.RFuture; import org.redisson.api.RLock; import org.redisson.client.RedisException; @@ -30,11 +28,15 @@ import org.redisson.command.CommandAsyncExecutor; import org.redisson.command.CommandBatchService; import org.redisson.config.MasterSlaveServersConfig; import org.redisson.misc.CompletableFutureWrapper; +import org.redisson.renewal.LockRenewalScheduler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.*; -import java.util.concurrent.*; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; /** @@ -45,66 +47,18 @@ import java.util.concurrent.locks.Condition; */ public abstract class RedissonBaseLock extends RedissonExpirable implements RLock { - static class ExpirationEntry { - - private final Queue threadsQueue = new ConcurrentLinkedQueue<>(); - private final Map threadIds = new ConcurrentHashMap<>(); - private volatile Timeout timeout; - - ExpirationEntry() { - super(); - } - - public void addThreadId(long threadId) { - threadIds.compute(threadId, (t, counter) -> { - counter = Optional.ofNullable(counter).orElse(0); - counter++; - threadsQueue.add(threadId); - return counter; - }); - } - - public boolean hasNoThreads() { - return threadsQueue.isEmpty(); - } - - public Long getFirstThreadId() { - return threadsQueue.peek(); - } - - public void removeThreadId(long threadId) { - threadIds.computeIfPresent(threadId, (t, counter) -> { - counter--; - if (counter == 0) { - threadsQueue.remove(threadId); - return null; - } - return counter; - }); - } - - public void setTimeout(Timeout timeout) { - this.timeout = timeout; - } - public Timeout getTimeout() { - return timeout; - } - - } - private static final Logger log = LoggerFactory.getLogger(RedissonBaseLock.class); - private static final ConcurrentMap EXPIRATION_RENEWAL_MAP = new ConcurrentHashMap<>(); - final long internalLockLeaseTime; - final String id; final String entryName; + final LockRenewalScheduler renewalScheduler; + public RedissonBaseLock(CommandAsyncExecutor commandExecutor, String name) { super(commandExecutor, name); this.id = getServiceManager().getId(); - this.internalLockLeaseTime = getServiceManager().getCfg().getLockWatchdogTimeout(); this.entryName = id + ":" + name; + this.renewalScheduler = getServiceManager().getRenewalScheduler(); } protected String getEntryName() { @@ -115,94 +69,12 @@ public abstract class RedissonBaseLock extends RedissonExpirable implements RLoc return id + ":" + threadId; } - private void renewExpiration() { - ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName()); - if (ee == null) { - return; - } - - Timeout task = getServiceManager().newTimeout(new TimerTask() { - @Override - public void run(Timeout timeout) throws Exception { - ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName()); - if (ent == null) { - return; - } - Long threadId = ent.getFirstThreadId(); - if (threadId == null) { - return; - } - - CompletionStage future = renewExpirationAsync(threadId); - future.whenComplete((res, e) -> { - if (e != null) { - if (getServiceManager().isShuttingDown(e)) { - return; - } - - log.error("Can't update lock {} expiration", getRawName(), e); - EXPIRATION_RENEWAL_MAP.remove(getEntryName()); - return; - } - - if (res) { - // reschedule itself - renewExpiration(); - } else { - cancelExpirationRenewal(null, null); - } - }); - } - }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS); - - ee.setTimeout(task); - } - - protected final void scheduleExpirationRenewal(long threadId) { - ExpirationEntry entry = new ExpirationEntry(); - entry.addThreadId(threadId); - ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry); - if (oldEntry != null) { - oldEntry.addThreadId(threadId); - } else { - try { - renewExpiration(); - } finally { - if (Thread.currentThread().isInterrupted()) { - cancelExpirationRenewal(threadId, null); - } - } - } - } - - protected CompletionStage renewExpirationAsync(long threadId) { - return commandExecutor.syncedEval(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, - "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + - "redis.call('pexpire', KEYS[1], ARGV[1]); " + - "return 1; " + - "end; " + - "return 0;", - Collections.singletonList(getRawName()), - internalLockLeaseTime, getLockName(threadId)); + protected void scheduleExpirationRenewal(long threadId) { + renewalScheduler.renewLock(getRawName(), threadId, getLockName(threadId)); } protected void cancelExpirationRenewal(Long threadId, Boolean unlockResult) { - ExpirationEntry task = EXPIRATION_RENEWAL_MAP.get(getEntryName()); - if (task == null) { - return; - } - - if (threadId != null) { - task.removeThreadId(threadId); - } - - if (threadId == null || task.hasNoThreads()) { - Timeout timeout = task.getTimeout(); - if (timeout != null) { - timeout.cancel(); - } - EXPIRATION_RENEWAL_MAP.remove(getEntryName()); - } + renewalScheduler.cancelLockRenewal(getRawName(), threadId); } protected final RFuture evalWriteSyncedNoRetryAsync(String key, Codec codec, RedisCommand evalCommandType, String script, List keys, Object... params) { diff --git a/redisson/src/main/java/org/redisson/RedissonFasterMultiLock.java b/redisson/src/main/java/org/redisson/RedissonFasterMultiLock.java index 3dda4b822..5224aef12 100644 --- a/redisson/src/main/java/org/redisson/RedissonFasterMultiLock.java +++ b/redisson/src/main/java/org/redisson/RedissonFasterMultiLock.java @@ -101,11 +101,6 @@ public class RedissonFasterMultiLock extends RedissonBaseLock { } - @Override - public String getName() { - throw new UnsupportedOperationException(); - } - @Override public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException { lock(leaseTime, unit, true); @@ -321,39 +316,13 @@ public class RedissonFasterMultiLock extends RedissonBaseLock { } @Override - protected CompletionStage renewExpirationAsync(long threadId) { - List params = new ArrayList<>(); - params.add(getLockName(threadId)); - params.add(internalLockLeaseTime); - params.add(System.currentTimeMillis()); - params.addAll(fields); - return commandExecutor.syncedEval(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, - "local leaseTime = tonumber(ARGV[2]);" + - "local currentTime = tonumber(ARGV[3]);" + - "local currentThread = ARGV[1];" + - "if (redis.call('exists',KEYS[1]) > 0) then" + - " local newExpireTime = leaseTime + currentTime;" + - " for i=4, #ARGV, 1 do " + - " local lockThread = redis.call('hget', KEYS[1], ARGV[i]);" + - " if(lockThread ~= false and lockThread == currentThread) then " + - " local expireFieldName = ARGV[i]..':'..lockThread..':expire_time';" + - " local expireTime = redis.call('hget', KEYS[1], expireFieldName);" + - " if(tonumber(expireTime) < newExpireTime) then " + - " redis.call('hset', KEYS[1],expireFieldName, newExpireTime);" + - " end;" + - " else" + - " return 0;" + - " end;" + - " end; " + - " local expireTime = redis.call('pttl',KEYS[1]);" + - " if(tonumber(expireTime) < tonumber(leaseTime)) then " + - " redis.call('pexpire',KEYS[1], leaseTime);" + - " end;" + - " return 1;" + - "end;" + - "return 0;", - Collections.singletonList(getRawName()), - params.toArray()); + protected void cancelExpirationRenewal(Long threadId, Boolean unlockResult) { + renewalScheduler.cancelFastMultilockRenewl(getRawName(), threadId); + } + + @Override + protected void scheduleExpirationRenewal(long threadId) { + renewalScheduler.renewFastMultiLock(getRawName(), threadId, getLockName(threadId), fields); } private RFuture tryLockOnceInnerAsync(long leaseTime, TimeUnit unit, RedisStrictCommand command, long threadId) { diff --git a/redisson/src/main/java/org/redisson/RedissonReadLock.java b/redisson/src/main/java/org/redisson/RedissonReadLock.java index 173d6bd53..efa77f4f8 100644 --- a/redisson/src/main/java/org/redisson/RedissonReadLock.java +++ b/redisson/src/main/java/org/redisson/RedissonReadLock.java @@ -24,7 +24,6 @@ import org.redisson.command.CommandAsyncExecutor; import org.redisson.pubsub.LockPubSub; import java.util.Arrays; -import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; @@ -143,36 +142,20 @@ public class RedissonReadLock extends RedissonLock implements RLock { protected String getKeyPrefix(long threadId, String timeoutPrefix) { return timeoutPrefix.split(":" + getLockName(threadId))[0]; } - + @Override - protected CompletionStage renewExpirationAsync(long threadId) { + protected void scheduleExpirationRenewal(long threadId) { String timeoutPrefix = getReadWriteTimeoutNamePrefix(threadId); String keyPrefix = getKeyPrefix(threadId, timeoutPrefix); - - return commandExecutor.syncedEval(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, - "local counter = redis.call('hget', KEYS[1], ARGV[2]); " + - "if (counter ~= false) then " + - "redis.call('pexpire', KEYS[1], ARGV[1]); " + - - "if (redis.call('hlen', KEYS[1]) > 1) then " + - "local keys = redis.call('hkeys', KEYS[1]); " + - "for n, key in ipairs(keys) do " + - "counter = tonumber(redis.call('hget', KEYS[1], key)); " + - "if type(counter) == 'number' then " + - "for i=counter, 1, -1 do " + - "redis.call('pexpire', KEYS[2] .. ':' .. key .. ':rwlock_timeout:' .. i, ARGV[1]); " + - "end; " + - "end; " + - "end; " + - "end; " + - - "return 1; " + - "end; " + - "return 0;", - Arrays.asList(getRawName(), keyPrefix), - internalLockLeaseTime, getLockName(threadId)); + renewalScheduler.renewReadLock(getRawName(), threadId, getLockName(threadId), keyPrefix); } - + + @Override + protected void cancelExpirationRenewal(Long threadId, Boolean unlockResult) { + super.cancelExpirationRenewal(threadId, unlockResult); + renewalScheduler.cancelReadLockRenewal(getRawName(), threadId); + } + @Override public Condition newCondition() { throw new UnsupportedOperationException(); diff --git a/redisson/src/main/java/org/redisson/RedissonWriteLock.java b/redisson/src/main/java/org/redisson/RedissonWriteLock.java index 5bf6bf408..3fde03776 100644 --- a/redisson/src/main/java/org/redisson/RedissonWriteLock.java +++ b/redisson/src/main/java/org/redisson/RedissonWriteLock.java @@ -15,12 +15,6 @@ */ package org.redisson; -import java.util.Arrays; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionStage; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Condition; - import org.redisson.api.RFuture; import org.redisson.api.RLock; import org.redisson.client.codec.LongCodec; @@ -30,6 +24,10 @@ import org.redisson.client.protocol.RedisStrictCommand; import org.redisson.command.CommandAsyncExecutor; import org.redisson.pubsub.LockPubSub; +import java.util.Arrays; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; + /** * Lock will be removed automatically if client disconnects. * @@ -123,18 +121,6 @@ public class RedissonWriteLock extends RedissonLock implements RLock { throw new UnsupportedOperationException(); } - @Override - protected CompletionStage renewExpirationAsync(long threadId) { - CompletionStage f = super.renewExpirationAsync(threadId); - return f.thenCompose(r -> { - if (!r) { - RedissonReadLock lock = new RedissonReadLock(commandExecutor, getRawName()); - return lock.renewExpirationAsync(threadId); - } - return CompletableFuture.completedFuture(r); - }); - } - @Override public RFuture forceUnlockAsync() { cancelExpirationRenewal(null, null); diff --git a/redisson/src/main/java/org/redisson/config/Config.java b/redisson/src/main/java/org/redisson/config/Config.java index fb3cba5e1..5feb8a664 100644 --- a/redisson/src/main/java/org/redisson/config/Config.java +++ b/redisson/src/main/java/org/redisson/config/Config.java @@ -73,6 +73,8 @@ public class Config { private long lockWatchdogTimeout = 30 * 1000; + private int lockWatchdogBatchSize = 100; + private boolean checkLockSyncedSlaves = true; private long slavesSyncTimeout = 1000; @@ -122,6 +124,7 @@ public class Config { setUseScriptCache(oldConf.isUseScriptCache()); setKeepPubSubOrder(oldConf.isKeepPubSubOrder()); setLockWatchdogTimeout(oldConf.getLockWatchdogTimeout()); + setLockWatchdogBatchSize(oldConf.getLockWatchdogBatchSize()); setCheckLockSyncedSlaves(oldConf.isCheckLockSyncedSlaves()); setSlavesSyncTimeout(oldConf.getSlavesSyncTimeout()); setNettyThreads(oldConf.getNettyThreads()); @@ -539,6 +542,24 @@ public class Config { return lockWatchdogTimeout; } + + /** + * This parameter is only used if lock has been acquired without leaseTimeout parameter definition. + * Defines amount of locks utilized in a single lock watchdog execution. + *

+ * Default is 100 + * + * @param lockWatchdogBatchSize amount of locks used by a single lock watchdog execution + * @return config + */ + public Config setLockWatchdogBatchSize(int lockWatchdogBatchSize) { + this.lockWatchdogBatchSize = lockWatchdogBatchSize; + return this; + } + public int getLockWatchdogBatchSize() { + return lockWatchdogBatchSize; + } + /** * Defines whether to check synchronized slaves amount * with actual slaves amount after lock acquisition. diff --git a/redisson/src/main/java/org/redisson/connection/ServiceManager.java b/redisson/src/main/java/org/redisson/connection/ServiceManager.java index c83875169..2cf74fae6 100644 --- a/redisson/src/main/java/org/redisson/connection/ServiceManager.java +++ b/redisson/src/main/java/org/redisson/connection/ServiceManager.java @@ -63,6 +63,7 @@ import org.redisson.misc.CompletableFutureWrapper; import org.redisson.misc.RandomXoshiro256PlusPlus; import org.redisson.misc.RedisURI; import org.redisson.remote.ResponseEntry; +import org.redisson.renewal.LockRenewalScheduler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -151,6 +152,8 @@ public final class ServiceManager { private final QueueTransferService queueTransferService = new QueueTransferService(); + private LockRenewalScheduler renewalScheduler; + public ServiceManager(MasterSlaveServersConfig config, Config cfg) { Version.logVersion(); @@ -666,4 +669,11 @@ public final class ServiceManager { } }); + public void register(LockRenewalScheduler renewalScheduler) { + this.renewalScheduler = renewalScheduler; + } + + public LockRenewalScheduler getRenewalScheduler() { + return renewalScheduler; + } } diff --git a/redisson/src/main/java/org/redisson/renewal/FastMultilockEntry.java b/redisson/src/main/java/org/redisson/renewal/FastMultilockEntry.java new file mode 100644 index 000000000..cb851a415 --- /dev/null +++ b/redisson/src/main/java/org/redisson/renewal/FastMultilockEntry.java @@ -0,0 +1,37 @@ +/** + * Copyright (c) 2013-2024 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.renewal; + +import java.util.Collection; + +/** + * + * @author Nikita Koksharov + * + */ +public class FastMultilockEntry extends LockEntry { + + private final Collection fields; + + public FastMultilockEntry(Collection fields) { + this.fields = fields; + } + + public Collection getFields() { + return fields; + } + +} diff --git a/redisson/src/main/java/org/redisson/renewal/FastMultilockTask.java b/redisson/src/main/java/org/redisson/renewal/FastMultilockTask.java new file mode 100644 index 000000000..4aa989a82 --- /dev/null +++ b/redisson/src/main/java/org/redisson/renewal/FastMultilockTask.java @@ -0,0 +1,121 @@ +/** + * Copyright (c) 2013-2024 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.renewal; + +import org.redisson.client.codec.LongCodec; +import org.redisson.client.protocol.RedisCommands; +import org.redisson.command.CommandAsyncExecutor; + +import java.util.*; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; + +/** + * + * @author Nikita Koksharov + * + */ +public class FastMultilockTask extends LockTask { + + public FastMultilockTask(long internalLockLeaseTime, CommandAsyncExecutor executor) { + super(internalLockLeaseTime, executor, 1); + } + + @Override + CompletionStage renew(Iterator iter, int chunkSize) { + if (!iter.hasNext()) { + return CompletableFuture.completedFuture(null); + } + + Map name2lockName = new HashMap<>(); + List args = new ArrayList<>(); + args.add(internalLockLeaseTime); + args.add(System.currentTimeMillis()); + + List keys = new ArrayList<>(chunkSize); + while (iter.hasNext()) { + String key = iter.next(); + + FastMultilockEntry entry = (FastMultilockEntry) name2entry.get(key); + if (entry == null) { + continue; + } + + Long threadId = entry.getFirstThreadId(); + if (threadId == null) { + continue; + } + + keys.add(key); + args.add(entry.getLockName(threadId)); + args.addAll(entry.getFields()); + name2lockName.put(key, threadId); + + if (keys.size() == chunkSize) { + break; + } + } + + if (keys.isEmpty()) { + return CompletableFuture.completedFuture(null); + } + + String firstName = keys.get(0); + + CompletionStage f = executor.syncedEval(firstName, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, + "local leaseTime = tonumber(ARGV[1]);" + + "local currentTime = tonumber(ARGV[2]);" + + "local currentThread = ARGV[3];" + + "if (redis.call('exists',KEYS[1]) > 0) then" + + " local newExpireTime = leaseTime + currentTime;" + + " for i=4, #ARGV, 1 do " + + " local lockThread = redis.call('hget', KEYS[1], ARGV[i]);" + + " if(lockThread ~= false and lockThread == currentThread) then " + + " local expireFieldName = ARGV[i]..':'..lockThread..':expire_time';" + + " local expireTime = redis.call('hget', KEYS[1], expireFieldName);" + + " if(tonumber(expireTime) < newExpireTime) then " + + " redis.call('hset', KEYS[1],expireFieldName, newExpireTime);" + + " end;" + + " else" + + " return 0;" + + " end;" + + " end; " + + " local expireTime = redis.call('pttl',KEYS[1]);" + + " if(tonumber(expireTime) < tonumber(leaseTime)) then " + + " redis.call('pexpire',KEYS[1], leaseTime);" + + " end;" + + " return 1;" + + "end;" + + "return 0;", + Collections.singletonList(firstName), + args.toArray()); + + return f.thenCompose(exists -> { + if (!exists) { + cancelExpirationRenewal(firstName, name2lockName.get(firstName)); + } + return renew(iter, chunkSize); + }); + } + + public void add(String rawName, String lockName, long threadId, Collection fields) { + FastMultilockEntry entry = new FastMultilockEntry(fields); + entry.addThreadId(threadId, lockName); + + add(rawName, lockName, threadId, entry); + } + +} diff --git a/redisson/src/main/java/org/redisson/renewal/LockEntry.java b/redisson/src/main/java/org/redisson/renewal/LockEntry.java new file mode 100644 index 000000000..a7e494f1f --- /dev/null +++ b/redisson/src/main/java/org/redisson/renewal/LockEntry.java @@ -0,0 +1,73 @@ +/** + * Copyright (c) 2013-2024 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.renewal; + +import java.util.Map; +import java.util.Optional; +import java.util.Queue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; + +/** + * + * @author Nikita Koksharov + * + */ +public class LockEntry { + + final Queue threadsQueue = new ConcurrentLinkedQueue<>(); + final Map threadId2counter = new ConcurrentHashMap<>(); + final Map threadId2lockName = new ConcurrentHashMap<>(); + + LockEntry() { + super(); + } + + public String getLockName(long threadId) { + return threadId2lockName.get(threadId); + } + + public void addThreadId(long threadId, String lockName) { + threadId2counter.compute(threadId, (t, counter) -> { + counter = Optional.ofNullable(counter).orElse(0); + counter++; + threadsQueue.add(threadId); + return counter; + }); + threadId2lockName.putIfAbsent(threadId, lockName); + } + + public boolean hasNoThreads() { + return threadsQueue.isEmpty(); + } + + public Long getFirstThreadId() { + return threadsQueue.peek(); + } + + public void removeThreadId(long threadId) { + threadId2counter.computeIfPresent(threadId, (t, counter) -> { + counter--; + if (counter == 0) { + threadsQueue.remove(threadId); + threadId2lockName.remove(threadId); + return null; + } + return counter; + }); + } + +} diff --git a/redisson/src/main/java/org/redisson/renewal/LockRenewalScheduler.java b/redisson/src/main/java/org/redisson/renewal/LockRenewalScheduler.java new file mode 100644 index 000000000..846bdf1f5 --- /dev/null +++ b/redisson/src/main/java/org/redisson/renewal/LockRenewalScheduler.java @@ -0,0 +1,83 @@ +/** + * Copyright (c) 2013-2024 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.renewal; + +import org.redisson.command.CommandAsyncExecutor; + +import java.util.Collection; +import java.util.concurrent.atomic.AtomicReference; + +/** + * + * @author Nikita Koksharov + * + */ +public final class LockRenewalScheduler { + + private final AtomicReference reference = new AtomicReference<>(); + private final AtomicReference multilockReference = new AtomicReference<>(); + private final AtomicReference readLockReference = new AtomicReference<>(); + private final CommandAsyncExecutor executor; + + private final int batchSize; + private final long internalLockLeaseTime; + + public LockRenewalScheduler(CommandAsyncExecutor executor) { + this.executor = executor; + this.internalLockLeaseTime = executor.getServiceManager().getCfg().getLockWatchdogTimeout(); + this.batchSize = executor.getServiceManager().getCfg().getLockWatchdogBatchSize(); + } + + public void renewReadLock(String name, Long threadId, String lockName, String keyPrefix) { + readLockReference.compareAndSet(null, new ReadLockTask(internalLockLeaseTime, executor, batchSize)); + ReadLockTask task = readLockReference.get(); + task.add(name, lockName, threadId, keyPrefix); + } + + public void renewFastMultiLock(String name, Long threadId, String lockName, Collection fields) { + multilockReference.compareAndSet(null, new FastMultilockTask(internalLockLeaseTime, executor)); + FastMultilockTask task = multilockReference.get(); + task.add(name, lockName, threadId, fields); + } + + public void renewLock(String name, Long threadId, String lockName) { + reference.compareAndSet(null, new LockTask(internalLockLeaseTime, executor, batchSize)); + LockTask task = reference.get(); + task.add(name, lockName, threadId); + } + + public void cancelReadLockRenewal(String name, Long threadId) { + ReadLockTask rtask = readLockReference.get(); + if (rtask != null) { + rtask.cancelExpirationRenewal(name, threadId); + } + } + + public void cancelFastMultilockRenewl(String name, Long threadId) { + FastMultilockTask mtask = multilockReference.get(); + if (mtask != null) { + mtask.cancelExpirationRenewal(name, threadId); + } + } + + public void cancelLockRenewal(String name, Long threadId) { + LockTask task = reference.get(); + if (task != null) { + task.cancelExpirationRenewal(name, threadId); + } + } + +} diff --git a/redisson/src/main/java/org/redisson/renewal/LockTask.java b/redisson/src/main/java/org/redisson/renewal/LockTask.java new file mode 100644 index 000000000..c6220d38f --- /dev/null +++ b/redisson/src/main/java/org/redisson/renewal/LockTask.java @@ -0,0 +1,101 @@ +/** + * Copyright (c) 2013-2024 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.renewal; + +import org.redisson.client.codec.LongCodec; +import org.redisson.client.protocol.RedisCommand; +import org.redisson.client.protocol.decoder.ContainsDecoder; +import org.redisson.command.CommandAsyncExecutor; + +import java.util.*; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; + +/** + * + * @author Nikita Koksharov + * + */ +public class LockTask extends RenewalTask { + + public LockTask(long internalLockLeaseTime, + CommandAsyncExecutor executor, int chunkSize) { + super(internalLockLeaseTime, executor, chunkSize); + } + + @Override + CompletionStage renew(Iterator iter, int chunkSize) { + if (!iter.hasNext()) { + return CompletableFuture.completedFuture(null); + } + + Map name2threadId = new HashMap<>(chunkSize); + List args = new ArrayList<>(chunkSize + 1); + args.add(internalLockLeaseTime); + + List keys = new ArrayList<>(chunkSize); + while (iter.hasNext()) { + String key = iter.next(); + + LockEntry entry = name2entry.get(key); + Long threadId = entry.getFirstThreadId(); + if (threadId == null) { + continue; + } + + keys.add(key); + args.add(entry.getLockName(threadId)); + name2threadId.put(key, threadId); + + if (keys.size() == chunkSize) { + break; + } + } + + String firstName = keys.get(0); + + CompletionStage> f = executor.syncedEval(firstName, LongCodec.INSTANCE, + new RedisCommand<>("EVAL", new ContainsDecoder<>(keys)), + "local result = {} " + + "for i = 1, #KEYS, 1 do " + + "if (redis.call('hexists', KEYS[i], ARGV[i + 1]) == 1) then " + + "redis.call('pexpire', KEYS[i], ARGV[1]); " + + "table.insert(result, 1); " + + "else " + + "table.insert(result, 0); " + + "end; " + + "end; " + + "return result;", + new ArrayList<>(keys), + args.toArray()); + + return f.thenCompose(existingNames -> { + keys.removeAll(existingNames); + for (String key : keys) { + cancelExpirationRenewal(key, name2threadId.get(key)); + } + return renew(iter, chunkSize); + }); + } + + public void add(String rawName, String lockName, long threadId) { + LockEntry entry = new LockEntry(); + entry.addThreadId(threadId, lockName); + + add(rawName, lockName, threadId, entry); + } + +} diff --git a/redisson/src/main/java/org/redisson/renewal/ReadLockEntry.java b/redisson/src/main/java/org/redisson/renewal/ReadLockEntry.java new file mode 100644 index 000000000..77af1160d --- /dev/null +++ b/redisson/src/main/java/org/redisson/renewal/ReadLockEntry.java @@ -0,0 +1,60 @@ +/** + * Copyright (c) 2013-2024 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.renewal; + +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; + +/** + * + * @author Nikita Koksharov + * + */ +public class ReadLockEntry extends LockEntry { + + private final Map threadId2keyPrefix = new ConcurrentHashMap<>(); + + public String getKeyPrefix(Long threadId) { + return threadId2keyPrefix.get(threadId); + } + + public void addThreadId(long threadId, String lockName, String keyPrefix) { + threadId2counter.compute(threadId, (t, counter) -> { + counter = Optional.ofNullable(counter).orElse(0); + counter++; + threadsQueue.add(threadId); + return counter; + }); + threadId2lockName.putIfAbsent(threadId, lockName); + threadId2keyPrefix.putIfAbsent(threadId, keyPrefix); + } + + @Override + public void removeThreadId(long threadId) { + threadId2counter.computeIfPresent(threadId, (t, counter) -> { + counter--; + if (counter == 0) { + threadsQueue.remove(threadId); + threadId2lockName.remove(threadId); + threadId2keyPrefix.remove(threadId); + return null; + } + return counter; + }); + } + +} diff --git a/redisson/src/main/java/org/redisson/renewal/ReadLockTask.java b/redisson/src/main/java/org/redisson/renewal/ReadLockTask.java new file mode 100644 index 000000000..6c3f5d17d --- /dev/null +++ b/redisson/src/main/java/org/redisson/renewal/ReadLockTask.java @@ -0,0 +1,136 @@ +/** + * Copyright (c) 2013-2024 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.renewal; + +import org.redisson.client.codec.LongCodec; +import org.redisson.client.protocol.RedisCommand; +import org.redisson.client.protocol.decoder.ContainsDecoder; +import org.redisson.command.CommandAsyncExecutor; + +import java.util.*; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; + +/** + * + * @author Nikita Koksharov + * + */ +public class ReadLockTask extends LockTask { + + public ReadLockTask(long internalLockLeaseTime, CommandAsyncExecutor executor, int chunkSize) { + super(internalLockLeaseTime, executor, chunkSize); + } + + @Override + CompletionStage renew(Iterator iter, int chunkSize) { + if (!iter.hasNext()) { + return CompletableFuture.completedFuture(null); + } + + Map name2lockName = new HashMap<>(); + List args = new ArrayList<>(); + args.add(internalLockLeaseTime); + + List keys = new ArrayList<>(chunkSize); + List keysArgs = new ArrayList<>(chunkSize); + while (iter.hasNext()) { + String key = iter.next(); + + ReadLockEntry entry = (ReadLockEntry) name2entry.get(key); + if (entry == null) { + continue; + } + + Long threadId = entry.getFirstThreadId(); + if (threadId == null) { + continue; + } + + keys.add(key); + keysArgs.add(key); + keysArgs.add(entry.getKeyPrefix(threadId)); + args.add(entry.getLockName(threadId)); + name2lockName.put(key, threadId); + + if (keys.size() == chunkSize) { + break; + } + } + + if (keys.isEmpty()) { + return CompletableFuture.completedFuture(null); + } + + String firstName = keys.get(0).toString(); + + CompletionStage> f = executor.syncedEval(firstName, LongCodec.INSTANCE, + new RedisCommand<>("EVAL", new ContainsDecoder<>(keys)), + "local result = {} " + + "local j = 1 " + + "for i = 1, #KEYS, 2 do " + + "j = j + 1; " + + "local counter = redis.call('hget', KEYS[i], ARGV[j]); " + + "if (counter ~= false) then " + + "redis.call('pexpire', KEYS[i], ARGV[1]); " + + + "if (redis.call('hlen', KEYS[i]) > 1) then " + + "local keys = redis.call('hkeys', KEYS[i]); " + + "for n, key in ipairs(keys) do " + + "counter = tonumber(redis.call('hget', KEYS[i], key)); " + + "if type(counter) == 'number' then " + + "for i=counter, 1, -1 do " + + "redis.call('pexpire', KEYS[i+1] .. ':' .. key .. ':rwlock_timeout:' .. i, ARGV[1]); " + + "end; " + + "end; " + + "end; " + + "end; " + + "table.insert(result, 1); " + + "else " + + "table.insert(result, 0); " + + "end; " + + "end; " + + "return result;", + keysArgs, + args.toArray()); + + return f.thenCompose(existingNames -> { + keys.removeAll(existingNames); + for (Object k : keys) { + String key = k.toString(); + cancelExpirationRenewal(key, name2lockName.get(key)); + } + return renew(iter, chunkSize); + }); + } + + public void add(String rawName, String lockName, long threadId, String keyPrefix) { + addSlotName(rawName); + + ReadLockEntry entry = new ReadLockEntry(); + entry.addThreadId(threadId, lockName, keyPrefix); + + ReadLockEntry oldEntry = (ReadLockEntry) name2entry.putIfAbsent(rawName, entry); + if (oldEntry != null) { + oldEntry.addThreadId(threadId, lockName, keyPrefix); + } else { + if (tryRun()) { + schedule(); + } + } + } + +} diff --git a/redisson/src/main/java/org/redisson/renewal/RenewalTask.java b/redisson/src/main/java/org/redisson/renewal/RenewalTask.java new file mode 100644 index 000000000..d0179e73e --- /dev/null +++ b/redisson/src/main/java/org/redisson/renewal/RenewalTask.java @@ -0,0 +1,182 @@ +/** + * Copyright (c) 2013-2024 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.renewal; + +import io.netty.util.Timeout; +import io.netty.util.TimerTask; +import org.redisson.command.CommandAsyncExecutor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * + * @author Nikita Koksharov + * + */ +abstract class RenewalTask implements TimerTask { + + private final Logger log = LoggerFactory.getLogger(getClass()); + + final CommandAsyncExecutor executor; + + AtomicBoolean running = new AtomicBoolean(); + + final Map> slot2names = new ConcurrentHashMap<>(); + final Map name2entry = new ConcurrentHashMap<>(); + + final long internalLockLeaseTime; + final int chunkSize; + + boolean tryRun() { + return running.compareAndSet(false, true); + } + + void stop() { + running.set(false); + } + + public void schedule() { + if (!running.get()) { + return; + } + + long internalLockLeaseTime = executor.getServiceManager().getCfg().getLockWatchdogTimeout(); + executor.getServiceManager().newTimeout(this, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS); + } + + RenewalTask(long internalLockLeaseTime, + CommandAsyncExecutor executor, int chunkSize) { + this.executor = executor; + this.internalLockLeaseTime = internalLockLeaseTime; + this.chunkSize = chunkSize; + } + + final CompletionStage execute() { + if (name2entry.isEmpty()) { + return CompletableFuture.completedFuture(null); + } + + if (!executor.getServiceManager().getCfg().isClusterConfig()) { + return renew(name2entry.keySet().iterator(), chunkSize); + } + + return renewSlots(slot2names.values().iterator(), chunkSize); + } + + private CompletionStage renewSlots(Iterator> iter, int chunkSize) { + if (!iter.hasNext()) { + return CompletableFuture.completedFuture(null); + } + + CompletionStage c = renew(iter.next().iterator(), chunkSize); + return c.thenCompose(r -> renewSlots(iter, chunkSize)); + } + + abstract CompletionStage renew(Iterator iter, int chunkSize); + + void cancelExpirationRenewal(String name, Long threadId) { + LockEntry task = name2entry.get(name); + if (task == null) { + return; + } + + if (threadId != null) { + task.removeThreadId(threadId); + } + + if (threadId == null || task.hasNoThreads()) { + name2entry.remove(name); + + if (executor.getServiceManager().getCfg().isClusterConfig()) { + int slot = executor.getConnectionManager().calcSlot(name); + slot2names.computeIfPresent(slot, (k, v) -> { + v.remove(name); + if (v.isEmpty()) { + return null; + } + return v; + }); + } + + if (!name2entry.isEmpty()) { + return; + } + + stop(); + + if (!name2entry.isEmpty() && tryRun()) { + schedule(); + } + } + } + + final void add(String rawName, String lockName, long threadId, LockEntry entry) { + addSlotName(rawName); + + LockEntry oldEntry = name2entry.putIfAbsent(rawName, entry); + if (oldEntry != null) { + oldEntry.addThreadId(threadId, lockName); + } else { + if (tryRun()) { + schedule(); + } + } + } + + void addSlotName(String rawName) { + if (!executor.getServiceManager().getCfg().isClusterConfig()) { + return; + } + + int slot = executor.getConnectionManager().calcSlot(rawName); + slot2names.compute(slot, (k, v) -> { + if (v == null) { + v = Collections.newSetFromMap(new ConcurrentHashMap<>()); + } + v.add(rawName); + return v; + }); + } + + @Override + public void run(Timeout timeout) { + if (executor.getServiceManager().isShuttingDown()) { + return; + } + + CompletionStage future = execute(); + future.whenComplete((result, e) -> { + if (e != null) { + log.error("Can't update locks {} expiration", name2entry.keySet(), e); + schedule(); + return; + } + + schedule(); + }); + } + +} diff --git a/redisson/src/test/java/org/redisson/RedissonLockExpirationRenewalTest.java b/redisson/src/test/java/org/redisson/RedissonLockExpirationRenewalTest.java index 25466db1c..866145f31 100644 --- a/redisson/src/test/java/org/redisson/RedissonLockExpirationRenewalTest.java +++ b/redisson/src/test/java/org/redisson/RedissonLockExpirationRenewalTest.java @@ -1,7 +1,10 @@ package org.redisson; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.redisson.api.RLock; +import org.redisson.api.RReadWriteLock; import org.redisson.api.RedissonClient; import org.redisson.config.Config; import org.testcontainers.containers.GenericContainer; @@ -13,15 +16,90 @@ public class RedissonLockExpirationRenewalTest extends RedisDockerTest { private static final String LOCK_KEY = "LOCK_KEY"; public static final long LOCK_WATCHDOG_TIMEOUT = 1_000L; - @Test - public void testExpirationRenewalIsWorkingAfterTimeout() throws InterruptedException { - GenericContainer redis = createRedis(); + RedissonClient redisson; + GenericContainer redis; + + @BeforeEach + public void beforeEachTest() { + redis = createRedis(); redis.start(); Config c = createConfig(redis); c.setLockWatchdogTimeout(LOCK_WATCHDOG_TIMEOUT); - RedissonClient redisson = Redisson.create(c); + c.setLockWatchdogBatchSize(50); + redisson = Redisson.create(c); + } + + @AfterEach + public void afterEachTest() { + redisson.shutdown(); + redis.stop(); + } + + @Test + public void testWriteLockAfterTimeout() throws InterruptedException { + RReadWriteLock rw = redisson.getReadWriteLock(LOCK_KEY); + RLock lock = rw.writeLock(); + lock.lock(); + try { + // force expiration renewal error + restart(redis); + // wait for timeout + Thread.sleep(LOCK_WATCHDOG_TIMEOUT * 2); + } finally { + assertThatThrownBy(() -> { + lock.unlock(); + }).isInstanceOf(IllegalMonitorStateException.class); + } + + RReadWriteLock lock2 = redisson.getReadWriteLock(LOCK_KEY); + lock2.writeLock().lock(); + try { + // wait for timeout + Thread.sleep(LOCK_WATCHDOG_TIMEOUT * 2); + } finally { + lock2.writeLock().unlock(); + } + + Thread.sleep(1000); + + lock2.writeLock().lock(); + try { + // wait for timeout + Thread.sleep(LOCK_WATCHDOG_TIMEOUT * 2); + } finally { + lock2.writeLock().unlock(); + } + } + + @Test + public void testReadLockAfterTimeout() throws InterruptedException { + RReadWriteLock rw = redisson.getReadWriteLock(LOCK_KEY); + RLock lock = rw.readLock(); + lock.lock(); + try { + // force expiration renewal error + restart(redis); + // wait for timeout + Thread.sleep(LOCK_WATCHDOG_TIMEOUT * 2); + } finally { + assertThatThrownBy(() -> { + lock.unlock(); + }).isInstanceOf(IllegalMonitorStateException.class); + } + RReadWriteLock lock2 = redisson.getReadWriteLock(LOCK_KEY); + lock2.readLock().lock(); + try { + // wait for timeout + Thread.sleep(LOCK_WATCHDOG_TIMEOUT * 2); + } finally { + lock2.readLock().unlock(); + } + } + + @Test + public void testLockAfterTimeout() throws InterruptedException { RLock lock = redisson.getLock(LOCK_KEY); lock.lock(); try { @@ -41,9 +119,6 @@ public class RedissonLockExpirationRenewalTest extends RedisDockerTest { } finally { lock2.unlock(); } - - redisson.shutdown(); - redis.stop(); } }