expire, expireAt and clearExpire aren't implemented properly for RDelayedQueue, RFairLock, RLocalCachedMap and RPermitExpirableSemaphore objects

pull/1253/merge
Nikita 7 years ago
parent b177e94d5e
commit 38238cfe69

@ -33,7 +33,6 @@ import org.redisson.command.CommandAsyncExecutor;
import org.redisson.misc.RedissonPromise;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.ThreadLocalRandom;
/**
*
@ -44,9 +43,15 @@ import io.netty.util.internal.ThreadLocalRandom;
public class RedissonDelayedQueue<V> extends RedissonExpirable implements RDelayedQueue<V> {
private final QueueTransferService queueTransferService;
private final String channelName;
private final String queueName;
private final String timeoutSetName;
protected RedissonDelayedQueue(QueueTransferService queueTransferService, Codec codec, final CommandAsyncExecutor commandExecutor, String name) {
super(codec, commandExecutor, name);
channelName = prefixName("redisson_delay_queue_channel", getName());
queueName = prefixName("redisson_delay_queue", getName());
timeoutSetName = prefixName("redisson_delay_queue_timeout", getName());
QueueTransferTask task = new QueueTransferTask(commandExecutor.getConnectionManager()) {
@ -68,33 +73,21 @@ public class RedissonDelayedQueue<V> extends RedissonExpirable implements RDelay
+ "return v[2]; "
+ "end "
+ "return nil;",
Arrays.<Object>asList(getName(), getTimeoutSetName(), getQueueName()),
Arrays.<Object>asList(getName(), timeoutSetName, queueName),
System.currentTimeMillis(), 100);
}
@Override
protected RTopic<Long> getTopic() {
return new RedissonTopic<Long>(LongCodec.INSTANCE, commandExecutor, getChannelName());
return new RedissonTopic<Long>(LongCodec.INSTANCE, commandExecutor, channelName);
}
};
queueTransferService.schedule(getQueueName(), task);
queueTransferService.schedule(queueName, task);
this.queueTransferService = queueTransferService;
}
private String getChannelName() {
return prefixName("redisson_delay_queue_channel", getName());
}
private String getQueueName() {
return prefixName("redisson_delay_queue", getName());
}
private String getTimeoutSetName() {
return prefixName("redisson_delay_queue_timeout", getName());
}
public void offer(V e, long delay, TimeUnit timeUnit) {
get(offerAsync(e, delay, timeUnit));
}
@ -115,7 +108,7 @@ public class RedissonDelayedQueue<V> extends RedissonExpirable implements RDelay
+ "redis.call('publish', KEYS[4], ARGV[1]); "
+ "end;"
,
Arrays.<Object>asList(getName(), getTimeoutSetName(), getQueueName(), getChannelName()),
Arrays.<Object>asList(getName(), timeoutSetName, queueName, channelName),
timeout, randomId, encode(e));
}
@ -180,7 +173,7 @@ public class RedissonDelayedQueue<V> extends RedissonExpirable implements RDelay
+ "return value; "
+ "end "
+ "return nil;",
Arrays.<Object>asList(getQueueName()), index));
Arrays.<Object>asList(queueName), index));
}
void remove(int index) {
@ -191,7 +184,7 @@ public class RedissonDelayedQueue<V> extends RedissonExpirable implements RDelay
"redis.call('lrem', KEYS[1], 1, v);" +
"redis.call('zrem', KEYS[2], v);" +
"end; ",
Arrays.<Object>asList(getQueueName(), getTimeoutSetName()), index));
Arrays.<Object>asList(queueName, timeoutSetName), index));
}
@Override
@ -268,7 +261,7 @@ public class RedissonDelayedQueue<V> extends RedissonExpirable implements RDelay
+ "table.insert(result, value);"
+ "end; "
+ "return result; ",
Collections.<Object>singletonList(getQueueName()));
Collections.<Object>singletonList(queueName));
}
@Override
@ -294,7 +287,7 @@ public class RedissonDelayedQueue<V> extends RedissonExpirable implements RDelay
+ "end; "
+ "end;" +
"return 0;",
Arrays.<Object>asList(getQueueName(), getTimeoutSetName()), encode(o));
Arrays.<Object>asList(queueName, timeoutSetName), encode(o));
}
@Override
@ -316,7 +309,7 @@ public class RedissonDelayedQueue<V> extends RedissonExpirable implements RDelay
+ "end; "
+ "end;" +
"return #ARGV == 0 and 1 or 0;",
Collections.<Object>singletonList(getQueueName()), encode(c).toArray());
Collections.<Object>singletonList(queueName), encode(c).toArray());
}
@Override
@ -356,7 +349,7 @@ public class RedissonDelayedQueue<V> extends RedissonExpirable implements RDelay
+ "i = i + 1;"
+ "end; "
+ "return result;",
Arrays.<Object>asList(getQueueName(), getTimeoutSetName()), encode(c).toArray());
Arrays.<Object>asList(queueName, timeoutSetName), encode(c).toArray());
}
@Override
@ -395,7 +388,7 @@ public class RedissonDelayedQueue<V> extends RedissonExpirable implements RDelay
+ "i = i + 1; "
+ "end; "
+ "return changed; ",
Collections.<Object>singletonList(getQueueName()), encode(c).toArray());
Collections.<Object>singletonList(queueName), encode(c).toArray());
}
@Override
@ -405,9 +398,36 @@ public class RedissonDelayedQueue<V> extends RedissonExpirable implements RDelay
@Override
public RFuture<Boolean> deleteAsync() {
return commandExecutor.writeAsync(getName(), RedisCommands.DEL_OBJECTS, getQueueName(), getTimeoutSetName());
return commandExecutor.writeAsync(getName(), RedisCommands.DEL_OBJECTS, queueName, timeoutSetName);
}
@Override
public RFuture<Boolean> expireAsync(long timeToLive, TimeUnit timeUnit) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return redis.call('pexpire', KEYS[2], ARGV[1]); ",
Arrays.<Object>asList(queueName, timeoutSetName),
timeUnit.toMillis(timeToLive));
}
@Override
public RFuture<Boolean> expireAtAsync(long timestamp) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"redis.call('pexpireat', KEYS[1], ARGV[1]); " +
"return redis.call('pexpireat', KEYS[2], ARGV[1]); ",
Arrays.<Object>asList(queueName, timeoutSetName),
timestamp);
}
@Override
public RFuture<Boolean> clearExpireAsync() {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"redis.call('persist', KEYS[1]); " +
"return redis.call('persist', KEYS[2]); ",
Arrays.<Object>asList(queueName, timeoutSetName));
}
@Override
public RFuture<V> peekAsync() {
return commandExecutor.evalReadAsync(getName(), codec, RedisCommands.EVAL_OBJECT,
@ -417,7 +437,7 @@ public class RedissonDelayedQueue<V> extends RedissonExpirable implements RDelay
+ "return value; "
+ "end "
+ "return nil;",
Arrays.<Object>asList(getQueueName()));
Arrays.<Object>asList(queueName));
}
@Override
@ -430,7 +450,7 @@ public class RedissonDelayedQueue<V> extends RedissonExpirable implements RDelay
+ "return value; "
+ "end "
+ "return nil;",
Arrays.<Object>asList(getQueueName(), getTimeoutSetName()));
Arrays.<Object>asList(queueName, timeoutSetName));
}
@Override
@ -449,7 +469,7 @@ public class RedissonDelayedQueue<V> extends RedissonExpirable implements RDelay
+ "return value; "
+ "end "
+ "return nil;",
Arrays.<Object>asList(getQueueName(), getTimeoutSetName(), queueName));
Arrays.<Object>asList(this.queueName, timeoutSetName, queueName));
}
@Override
@ -464,12 +484,12 @@ public class RedissonDelayedQueue<V> extends RedissonExpirable implements RDelay
+ "end; "
+ "end;" +
"return 0;",
Collections.<Object>singletonList(getQueueName()), encode(o));
Collections.<Object>singletonList(queueName), encode(o));
}
@Override
public RFuture<Integer> sizeAsync() {
return commandExecutor.readAsync(getName(), codec, RedisCommands.LLEN_INT, getQueueName());
return commandExecutor.readAsync(getName(), codec, RedisCommands.LLEN_INT, queueName);
}
@Override
@ -489,7 +509,7 @@ public class RedissonDelayedQueue<V> extends RedissonExpirable implements RDelay
@Override
public void destroy() {
queueTransferService.remove(getQueueName());
queueTransferService.remove(queueName);
}
}

@ -42,18 +42,14 @@ public class RedissonFairLock extends RedissonLock implements RLock {
private final long threadWaitTime = 5000;
private final CommandExecutor commandExecutor;
private final String threadsQueueName;
private final String timeoutSetName;
protected RedissonFairLock(CommandExecutor commandExecutor, String name, UUID id) {
super(commandExecutor, name, id);
this.commandExecutor = commandExecutor;
}
String getThreadsQueueName() {
return prefixName("redisson_lock_queue", getName());
}
String getTimeoutSetName() {
return prefixName("redisson_lock_timeout", getName());
threadsQueueName = prefixName("redisson_lock_queue", name);
timeoutSetName = prefixName("redisson_lock_timeout", name);
}
@Override
@ -85,7 +81,7 @@ public class RedissonFairLock extends RedissonLock implements RLock {
"end;" +
"redis.call('zrem', KEYS[2], ARGV[1]); " +
"redis.call('lrem', KEYS[1], 0, ARGV[1]); ",
Arrays.<Object>asList(getThreadsQueueName(), getTimeoutSetName()),
Arrays.<Object>asList(threadsQueueName, timeoutSetName),
getLockName(threadId), threadWaitTime);
}
@ -126,7 +122,7 @@ public class RedissonFairLock extends RedissonLock implements RLock {
"return nil; " +
"end; " +
"return 1;",
Arrays.<Object>asList(getName(), getThreadsQueueName(), getTimeoutSetName()),
Arrays.<Object>asList(getName(), threadsQueueName, timeoutSetName),
internalLockLeaseTime, getLockName(threadId), currentTime);
}
@ -174,7 +170,7 @@ public class RedissonFairLock extends RedissonLock implements RLock {
"redis.call('rpush', KEYS[2], ARGV[2]);" +
"end; " +
"return ttl;",
Arrays.<Object>asList(getName(), getThreadsQueueName(), getTimeoutSetName()),
Arrays.<Object>asList(getName(), threadsQueueName, timeoutSetName),
internalLockLeaseTime, getLockName(threadId), currentTime + threadWaitTime, currentTime);
}
@ -221,7 +217,7 @@ public class RedissonFairLock extends RedissonLock implements RLock {
"redis.call('publish', KEYS[4] .. ':' .. nextThreadId, ARGV[1]); " +
"end; " +
"return 1; ",
Arrays.<Object>asList(getName(), getThreadsQueueName(), getTimeoutSetName(), getChannelName()),
Arrays.<Object>asList(getName(), threadsQueueName, timeoutSetName, getChannelName()),
LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId), System.currentTimeMillis());
}
@ -232,9 +228,39 @@ public class RedissonFairLock extends RedissonLock implements RLock {
@Override
public RFuture<Boolean> deleteAsync() {
return commandExecutor.writeAsync(getName(), RedisCommands.DEL_OBJECTS, getName(), getThreadsQueueName(), getTimeoutSetName());
return commandExecutor.writeAsync(getName(), RedisCommands.DEL_OBJECTS, getName(), threadsQueueName, timeoutSetName);
}
@Override
public RFuture<Boolean> expireAsync(long timeToLive, TimeUnit timeUnit) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"redis.call('pexpire', KEYS[2], ARGV[1]); " +
"return redis.call('pexpire', KEYS[3], ARGV[1]); ",
Arrays.<Object>asList(getName(), threadsQueueName, timeoutSetName),
timeUnit.toMillis(timeToLive));
}
@Override
public RFuture<Boolean> expireAtAsync(long timestamp) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"redis.call('pexpireat', KEYS[1], ARGV[1]); " +
"redis.call('pexpireat', KEYS[2], ARGV[1]); " +
"return redis.call('pexpireat', KEYS[3], ARGV[1]); ",
Arrays.<Object>asList(getName(), threadsQueueName, timeoutSetName),
timestamp);
}
@Override
public RFuture<Boolean> clearExpireAsync() {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"redis.call('persist', KEYS[1]); " +
"redis.call('persist', KEYS[2]); " +
"return redis.call('persist', KEYS[3]); ",
Arrays.<Object>asList(getName(), threadsQueueName, timeoutSetName));
}
@Override
public RFuture<Boolean> forceUnlockAsync() {
cancelExpirationRenewal();
@ -263,7 +289,7 @@ public class RedissonFairLock extends RedissonLock implements RLock {
"return 1; " +
"end; " +
"return 0;",
Arrays.<Object>asList(getName(), getThreadsQueueName(), getTimeoutSetName(), getChannelName()),
Arrays.<Object>asList(getName(), threadsQueueName, timeoutSetName, getChannelName()),
LockPubSub.unlockMessage, System.currentTimeMillis());
}

@ -180,8 +180,8 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
private RTopic<Object> invalidationTopic;
private Cache<CacheKey, CacheValue> cache;
private int invalidateEntryOnChange;
private int invalidationListenerId;
private int invalidationStatusListenerId;
private int syncListenerId;
private int reconnectionListenerId;
private volatile long lastInvalidate;
private SyncStrategy syncStrategy;
private final Codec topicCodec = new LocalCachedMessageCodec();
@ -218,7 +218,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
invalidationTopic = new RedissonTopic<Object>(topicCodec, commandExecutor, suffixName(name, "topic"));
if (options.getReconnectionStrategy() != ReconnectionStrategy.NONE) {
invalidationStatusListenerId = invalidationTopic.addListener(new BaseStatusListener() {
reconnectionListenerId = invalidationTopic.addListener(new BaseStatusListener() {
@Override
public void onSubscribe(String channel) {
if (options.getReconnectionStrategy() == ReconnectionStrategy.CLEAR) {
@ -272,7 +272,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
}
if (options.getSyncStrategy() != SyncStrategy.NONE) {
invalidationListenerId = invalidationTopic.addListener(new MessageListener<Object>() {
syncListenerId = invalidationTopic.addListener(new MessageListener<Object>() {
@Override
public void onMessage(String channel, Object msg) {
if (msg instanceof LocalCachedMapClear) {
@ -481,11 +481,11 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
@Override
public void destroy() {
if (invalidationListenerId != 0) {
invalidationTopic.removeListener(invalidationListenerId);
if (syncListenerId != 0) {
invalidationTopic.removeListener(syncListenerId);
}
if (invalidationStatusListenerId != 0) {
invalidationTopic.removeListener(invalidationStatusListenerId);
if (reconnectionListenerId != 0) {
invalidationTopic.removeListener(reconnectionListenerId);
}
}

@ -53,7 +53,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
protected RedissonPermitExpirableSemaphore(CommandExecutor commandExecutor, String name, SemaphorePubSub semaphorePubSub) {
super(commandExecutor, name);
this.timeoutName = "{" + name + "}:timeout";
this.timeoutName = suffixName(name, "timeout");
this.commandExecutor = commandExecutor;
this.semaphorePubSub = semaphorePubSub;
}
@ -635,6 +635,32 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
return commandExecutor.writeAsync(getName(), RedisCommands.DEL_OBJECTS, getName(), timeoutName);
}
@Override
public RFuture<Boolean> expireAsync(long timeToLive, TimeUnit timeUnit) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return redis.call('pexpire', KEYS[2], ARGV[1]); ",
Arrays.<Object>asList(getName(), timeoutName),
timeUnit.toMillis(timeToLive));
}
@Override
public RFuture<Boolean> expireAtAsync(long timestamp) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"redis.call('pexpireat', KEYS[1], ARGV[1]); " +
"return redis.call('pexpireat', KEYS[2], ARGV[1]); ",
Arrays.<Object>asList(getName(), timeoutName),
timestamp);
}
@Override
public RFuture<Boolean> clearExpireAsync() {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"redis.call('persist', KEYS[1]); " +
"return redis.call('persist', KEYS[2]); ",
Arrays.<Object>asList(getName(), timeoutName));
}
@Override
public RFuture<Void> releaseAsync(final String permitId) {
final RPromise<Void> result = new RedissonPromise<Void>();

Loading…
Cancel
Save