diff --git a/redisson/src/main/java/org/redisson/RedissonDelayedQueue.java b/redisson/src/main/java/org/redisson/RedissonDelayedQueue.java index d5e3c8aaa..f4791c994 100644 --- a/redisson/src/main/java/org/redisson/RedissonDelayedQueue.java +++ b/redisson/src/main/java/org/redisson/RedissonDelayedQueue.java @@ -33,7 +33,6 @@ import org.redisson.command.CommandAsyncExecutor; import org.redisson.misc.RedissonPromise; import io.netty.util.internal.PlatformDependent; -import io.netty.util.internal.ThreadLocalRandom; /** * @@ -44,9 +43,15 @@ import io.netty.util.internal.ThreadLocalRandom; public class RedissonDelayedQueue extends RedissonExpirable implements RDelayedQueue { private final QueueTransferService queueTransferService; + private final String channelName; + private final String queueName; + private final String timeoutSetName; protected RedissonDelayedQueue(QueueTransferService queueTransferService, Codec codec, final CommandAsyncExecutor commandExecutor, String name) { super(codec, commandExecutor, name); + channelName = prefixName("redisson_delay_queue_channel", getName()); + queueName = prefixName("redisson_delay_queue", getName()); + timeoutSetName = prefixName("redisson_delay_queue_timeout", getName()); QueueTransferTask task = new QueueTransferTask(commandExecutor.getConnectionManager()) { @@ -68,33 +73,21 @@ public class RedissonDelayedQueue extends RedissonExpirable implements RDelay + "return v[2]; " + "end " + "return nil;", - Arrays.asList(getName(), getTimeoutSetName(), getQueueName()), + Arrays.asList(getName(), timeoutSetName, queueName), System.currentTimeMillis(), 100); } @Override protected RTopic getTopic() { - return new RedissonTopic(LongCodec.INSTANCE, commandExecutor, getChannelName()); + return new RedissonTopic(LongCodec.INSTANCE, commandExecutor, channelName); } }; - queueTransferService.schedule(getQueueName(), task); + queueTransferService.schedule(queueName, task); this.queueTransferService = queueTransferService; } - private String getChannelName() { - return prefixName("redisson_delay_queue_channel", getName()); - } - - private String getQueueName() { - return prefixName("redisson_delay_queue", getName()); - } - - private String getTimeoutSetName() { - return prefixName("redisson_delay_queue_timeout", getName()); - } - public void offer(V e, long delay, TimeUnit timeUnit) { get(offerAsync(e, delay, timeUnit)); } @@ -115,7 +108,7 @@ public class RedissonDelayedQueue extends RedissonExpirable implements RDelay + "redis.call('publish', KEYS[4], ARGV[1]); " + "end;" , - Arrays.asList(getName(), getTimeoutSetName(), getQueueName(), getChannelName()), + Arrays.asList(getName(), timeoutSetName, queueName, channelName), timeout, randomId, encode(e)); } @@ -180,7 +173,7 @@ public class RedissonDelayedQueue extends RedissonExpirable implements RDelay + "return value; " + "end " + "return nil;", - Arrays.asList(getQueueName()), index)); + Arrays.asList(queueName), index)); } void remove(int index) { @@ -191,7 +184,7 @@ public class RedissonDelayedQueue extends RedissonExpirable implements RDelay "redis.call('lrem', KEYS[1], 1, v);" + "redis.call('zrem', KEYS[2], v);" + "end; ", - Arrays.asList(getQueueName(), getTimeoutSetName()), index)); + Arrays.asList(queueName, timeoutSetName), index)); } @Override @@ -268,7 +261,7 @@ public class RedissonDelayedQueue extends RedissonExpirable implements RDelay + "table.insert(result, value);" + "end; " + "return result; ", - Collections.singletonList(getQueueName())); + Collections.singletonList(queueName)); } @Override @@ -294,7 +287,7 @@ public class RedissonDelayedQueue extends RedissonExpirable implements RDelay + "end; " + "end;" + "return 0;", - Arrays.asList(getQueueName(), getTimeoutSetName()), encode(o)); + Arrays.asList(queueName, timeoutSetName), encode(o)); } @Override @@ -316,7 +309,7 @@ public class RedissonDelayedQueue extends RedissonExpirable implements RDelay + "end; " + "end;" + "return #ARGV == 0 and 1 or 0;", - Collections.singletonList(getQueueName()), encode(c).toArray()); + Collections.singletonList(queueName), encode(c).toArray()); } @Override @@ -356,7 +349,7 @@ public class RedissonDelayedQueue extends RedissonExpirable implements RDelay + "i = i + 1;" + "end; " + "return result;", - Arrays.asList(getQueueName(), getTimeoutSetName()), encode(c).toArray()); + Arrays.asList(queueName, timeoutSetName), encode(c).toArray()); } @Override @@ -395,7 +388,7 @@ public class RedissonDelayedQueue extends RedissonExpirable implements RDelay + "i = i + 1; " + "end; " + "return changed; ", - Collections.singletonList(getQueueName()), encode(c).toArray()); + Collections.singletonList(queueName), encode(c).toArray()); } @Override @@ -405,9 +398,36 @@ public class RedissonDelayedQueue extends RedissonExpirable implements RDelay @Override public RFuture deleteAsync() { - return commandExecutor.writeAsync(getName(), RedisCommands.DEL_OBJECTS, getQueueName(), getTimeoutSetName()); + return commandExecutor.writeAsync(getName(), RedisCommands.DEL_OBJECTS, queueName, timeoutSetName); + } + + @Override + public RFuture expireAsync(long timeToLive, TimeUnit timeUnit) { + return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, + "redis.call('pexpire', KEYS[1], ARGV[1]); " + + "return redis.call('pexpire', KEYS[2], ARGV[1]); ", + Arrays.asList(queueName, timeoutSetName), + timeUnit.toMillis(timeToLive)); + } + + @Override + public RFuture expireAtAsync(long timestamp) { + return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, + "redis.call('pexpireat', KEYS[1], ARGV[1]); " + + "return redis.call('pexpireat', KEYS[2], ARGV[1]); ", + Arrays.asList(queueName, timeoutSetName), + timestamp); } + @Override + public RFuture clearExpireAsync() { + return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, + "redis.call('persist', KEYS[1]); " + + "return redis.call('persist', KEYS[2]); ", + Arrays.asList(queueName, timeoutSetName)); + } + + @Override public RFuture peekAsync() { return commandExecutor.evalReadAsync(getName(), codec, RedisCommands.EVAL_OBJECT, @@ -417,7 +437,7 @@ public class RedissonDelayedQueue extends RedissonExpirable implements RDelay + "return value; " + "end " + "return nil;", - Arrays.asList(getQueueName())); + Arrays.asList(queueName)); } @Override @@ -430,7 +450,7 @@ public class RedissonDelayedQueue extends RedissonExpirable implements RDelay + "return value; " + "end " + "return nil;", - Arrays.asList(getQueueName(), getTimeoutSetName())); + Arrays.asList(queueName, timeoutSetName)); } @Override @@ -449,7 +469,7 @@ public class RedissonDelayedQueue extends RedissonExpirable implements RDelay + "return value; " + "end " + "return nil;", - Arrays.asList(getQueueName(), getTimeoutSetName(), queueName)); + Arrays.asList(this.queueName, timeoutSetName, queueName)); } @Override @@ -464,12 +484,12 @@ public class RedissonDelayedQueue extends RedissonExpirable implements RDelay + "end; " + "end;" + "return 0;", - Collections.singletonList(getQueueName()), encode(o)); + Collections.singletonList(queueName), encode(o)); } @Override public RFuture sizeAsync() { - return commandExecutor.readAsync(getName(), codec, RedisCommands.LLEN_INT, getQueueName()); + return commandExecutor.readAsync(getName(), codec, RedisCommands.LLEN_INT, queueName); } @Override @@ -489,7 +509,7 @@ public class RedissonDelayedQueue extends RedissonExpirable implements RDelay @Override public void destroy() { - queueTransferService.remove(getQueueName()); + queueTransferService.remove(queueName); } } diff --git a/redisson/src/main/java/org/redisson/RedissonFairLock.java b/redisson/src/main/java/org/redisson/RedissonFairLock.java index f7872cfde..85680ac17 100644 --- a/redisson/src/main/java/org/redisson/RedissonFairLock.java +++ b/redisson/src/main/java/org/redisson/RedissonFairLock.java @@ -42,18 +42,14 @@ public class RedissonFairLock extends RedissonLock implements RLock { private final long threadWaitTime = 5000; private final CommandExecutor commandExecutor; + private final String threadsQueueName; + private final String timeoutSetName; protected RedissonFairLock(CommandExecutor commandExecutor, String name, UUID id) { super(commandExecutor, name, id); this.commandExecutor = commandExecutor; - } - - String getThreadsQueueName() { - return prefixName("redisson_lock_queue", getName()); - } - - String getTimeoutSetName() { - return prefixName("redisson_lock_timeout", getName()); + threadsQueueName = prefixName("redisson_lock_queue", name); + timeoutSetName = prefixName("redisson_lock_timeout", name); } @Override @@ -85,7 +81,7 @@ public class RedissonFairLock extends RedissonLock implements RLock { "end;" + "redis.call('zrem', KEYS[2], ARGV[1]); " + "redis.call('lrem', KEYS[1], 0, ARGV[1]); ", - Arrays.asList(getThreadsQueueName(), getTimeoutSetName()), + Arrays.asList(threadsQueueName, timeoutSetName), getLockName(threadId), threadWaitTime); } @@ -126,7 +122,7 @@ public class RedissonFairLock extends RedissonLock implements RLock { "return nil; " + "end; " + "return 1;", - Arrays.asList(getName(), getThreadsQueueName(), getTimeoutSetName()), + Arrays.asList(getName(), threadsQueueName, timeoutSetName), internalLockLeaseTime, getLockName(threadId), currentTime); } @@ -174,7 +170,7 @@ public class RedissonFairLock extends RedissonLock implements RLock { "redis.call('rpush', KEYS[2], ARGV[2]);" + "end; " + "return ttl;", - Arrays.asList(getName(), getThreadsQueueName(), getTimeoutSetName()), + Arrays.asList(getName(), threadsQueueName, timeoutSetName), internalLockLeaseTime, getLockName(threadId), currentTime + threadWaitTime, currentTime); } @@ -221,7 +217,7 @@ public class RedissonFairLock extends RedissonLock implements RLock { "redis.call('publish', KEYS[4] .. ':' .. nextThreadId, ARGV[1]); " + "end; " + "return 1; ", - Arrays.asList(getName(), getThreadsQueueName(), getTimeoutSetName(), getChannelName()), + Arrays.asList(getName(), threadsQueueName, timeoutSetName, getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId), System.currentTimeMillis()); } @@ -232,9 +228,39 @@ public class RedissonFairLock extends RedissonLock implements RLock { @Override public RFuture deleteAsync() { - return commandExecutor.writeAsync(getName(), RedisCommands.DEL_OBJECTS, getName(), getThreadsQueueName(), getTimeoutSetName()); + return commandExecutor.writeAsync(getName(), RedisCommands.DEL_OBJECTS, getName(), threadsQueueName, timeoutSetName); } + @Override + public RFuture expireAsync(long timeToLive, TimeUnit timeUnit) { + return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, + "redis.call('pexpire', KEYS[1], ARGV[1]); " + + "redis.call('pexpire', KEYS[2], ARGV[1]); " + + "return redis.call('pexpire', KEYS[3], ARGV[1]); ", + Arrays.asList(getName(), threadsQueueName, timeoutSetName), + timeUnit.toMillis(timeToLive)); + } + + @Override + public RFuture expireAtAsync(long timestamp) { + return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, + "redis.call('pexpireat', KEYS[1], ARGV[1]); " + + "redis.call('pexpireat', KEYS[2], ARGV[1]); " + + "return redis.call('pexpireat', KEYS[3], ARGV[1]); ", + Arrays.asList(getName(), threadsQueueName, timeoutSetName), + timestamp); + } + + @Override + public RFuture clearExpireAsync() { + return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, + "redis.call('persist', KEYS[1]); " + + "redis.call('persist', KEYS[2]); " + + "return redis.call('persist', KEYS[3]); ", + Arrays.asList(getName(), threadsQueueName, timeoutSetName)); + } + + @Override public RFuture forceUnlockAsync() { cancelExpirationRenewal(); @@ -263,7 +289,7 @@ public class RedissonFairLock extends RedissonLock implements RLock { "return 1; " + "end; " + "return 0;", - Arrays.asList(getName(), getThreadsQueueName(), getTimeoutSetName(), getChannelName()), + Arrays.asList(getName(), threadsQueueName, timeoutSetName, getChannelName()), LockPubSub.unlockMessage, System.currentTimeMillis()); } diff --git a/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java b/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java index 2d3e70b04..ce9871e60 100644 --- a/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java +++ b/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java @@ -180,8 +180,8 @@ public class RedissonLocalCachedMap extends RedissonMap implements R private RTopic invalidationTopic; private Cache cache; private int invalidateEntryOnChange; - private int invalidationListenerId; - private int invalidationStatusListenerId; + private int syncListenerId; + private int reconnectionListenerId; private volatile long lastInvalidate; private SyncStrategy syncStrategy; private final Codec topicCodec = new LocalCachedMessageCodec(); @@ -218,7 +218,7 @@ public class RedissonLocalCachedMap extends RedissonMap implements R invalidationTopic = new RedissonTopic(topicCodec, commandExecutor, suffixName(name, "topic")); if (options.getReconnectionStrategy() != ReconnectionStrategy.NONE) { - invalidationStatusListenerId = invalidationTopic.addListener(new BaseStatusListener() { + reconnectionListenerId = invalidationTopic.addListener(new BaseStatusListener() { @Override public void onSubscribe(String channel) { if (options.getReconnectionStrategy() == ReconnectionStrategy.CLEAR) { @@ -272,7 +272,7 @@ public class RedissonLocalCachedMap extends RedissonMap implements R } if (options.getSyncStrategy() != SyncStrategy.NONE) { - invalidationListenerId = invalidationTopic.addListener(new MessageListener() { + syncListenerId = invalidationTopic.addListener(new MessageListener() { @Override public void onMessage(String channel, Object msg) { if (msg instanceof LocalCachedMapClear) { @@ -481,11 +481,11 @@ public class RedissonLocalCachedMap extends RedissonMap implements R @Override public void destroy() { - if (invalidationListenerId != 0) { - invalidationTopic.removeListener(invalidationListenerId); + if (syncListenerId != 0) { + invalidationTopic.removeListener(syncListenerId); } - if (invalidationStatusListenerId != 0) { - invalidationTopic.removeListener(invalidationStatusListenerId); + if (reconnectionListenerId != 0) { + invalidationTopic.removeListener(reconnectionListenerId); } } diff --git a/redisson/src/main/java/org/redisson/RedissonPermitExpirableSemaphore.java b/redisson/src/main/java/org/redisson/RedissonPermitExpirableSemaphore.java index 8741bc213..00dc71702 100644 --- a/redisson/src/main/java/org/redisson/RedissonPermitExpirableSemaphore.java +++ b/redisson/src/main/java/org/redisson/RedissonPermitExpirableSemaphore.java @@ -53,7 +53,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen protected RedissonPermitExpirableSemaphore(CommandExecutor commandExecutor, String name, SemaphorePubSub semaphorePubSub) { super(commandExecutor, name); - this.timeoutName = "{" + name + "}:timeout"; + this.timeoutName = suffixName(name, "timeout"); this.commandExecutor = commandExecutor; this.semaphorePubSub = semaphorePubSub; } @@ -635,6 +635,32 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen return commandExecutor.writeAsync(getName(), RedisCommands.DEL_OBJECTS, getName(), timeoutName); } + @Override + public RFuture expireAsync(long timeToLive, TimeUnit timeUnit) { + return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, + "redis.call('pexpire', KEYS[1], ARGV[1]); " + + "return redis.call('pexpire', KEYS[2], ARGV[1]); ", + Arrays.asList(getName(), timeoutName), + timeUnit.toMillis(timeToLive)); + } + + @Override + public RFuture expireAtAsync(long timestamp) { + return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, + "redis.call('pexpireat', KEYS[1], ARGV[1]); " + + "return redis.call('pexpireat', KEYS[2], ARGV[1]); ", + Arrays.asList(getName(), timeoutName), + timestamp); + } + + @Override + public RFuture clearExpireAsync() { + return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, + "redis.call('persist', KEYS[1]); " + + "return redis.call('persist', KEYS[2]); ", + Arrays.asList(getName(), timeoutName)); + } + @Override public RFuture releaseAsync(final String permitId) { final RPromise result = new RedissonPromise();