From 339c6e915478e1add38e7834e9dd7ecdfa440374 Mon Sep 17 00:00:00 2001
From: Nikita
Date: Fri, 11 Dec 2015 17:04:34 +0300
Subject: [PATCH] Expired values clean task "flooding" with many requests
amount
---
...nScheduler.java => EvictionScheduler.java} | 66 +++++++++++++++++--
src/main/java/org/redisson/Redisson.java | 12 ++--
src/main/java/org/redisson/RedissonBatch.java | 12 ++--
.../java/org/redisson/RedissonMapCache.java | 29 ++++----
.../java/org/redisson/RedissonReactive.java | 9 +--
.../java/org/redisson/RedissonSetCache.java | 29 ++++----
.../org/redisson/api/RMapCacheReactive.java | 2 +-
.../java/org/redisson/core/RMapCache.java | 2 +-
.../org/redisson/core/RMapCacheAsync.java | 2 +-
.../java/org/redisson/core/RSetCache.java | 2 +-
.../reactive/RedissonBatchReactive.java | 25 +++----
.../reactive/RedissonMapCacheReactive.java | 31 ++++-----
.../RedissonMapCacheReactiveTest.java | 4 +-
.../org/redisson/RedissonMapCacheTest.java | 10 +--
.../org/redisson/RedissonMapReactiveTest.java | 23 ++-----
.../org/redisson/RedissonSetCacheTest.java | 23 ++++++-
16 files changed, 165 insertions(+), 116 deletions(-)
rename src/main/java/org/redisson/{RedissonEvictionScheduler.java => EvictionScheduler.java} (65%)
diff --git a/src/main/java/org/redisson/RedissonEvictionScheduler.java b/src/main/java/org/redisson/EvictionScheduler.java
similarity index 65%
rename from src/main/java/org/redisson/RedissonEvictionScheduler.java
rename to src/main/java/org/redisson/EvictionScheduler.java
index 19e7e1d6d..968964239 100644
--- a/src/main/java/org/redisson/RedissonEvictionScheduler.java
+++ b/src/main/java/org/redisson/EvictionScheduler.java
@@ -18,12 +18,15 @@ package org.redisson;
import java.util.Arrays;
import java.util.Deque;
import java.util.LinkedList;
+import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandAsyncExecutor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
@@ -38,9 +41,9 @@ import io.netty.util.internal.PlatformDependent;
* @author Nikita Koksharov
*
*/
-public class RedissonEvictionScheduler {
+public class EvictionScheduler {
- public static final RedissonEvictionScheduler INSTANCE = new RedissonEvictionScheduler();
+ private static final Logger log = LoggerFactory.getLogger(RedissonSetCache.class);
public static class RedissonCacheTask implements Runnable {
@@ -117,12 +120,18 @@ public class RedissonEvictionScheduler {
}
- private RedissonEvictionScheduler() {
- }
-
private final ConcurrentMap tasks = PlatformDependent.newConcurrentHashMap();
+ private final CommandAsyncExecutor executor;
+
+ private final Map lastExpiredTime = PlatformDependent.newConcurrentHashMap();
+ private final int expireTaskExecutionDelay = 1000;
+ private final int valuesAmountToClean = 100;
+
+ public EvictionScheduler(CommandAsyncExecutor executor) {
+ this.executor = executor;
+ }
- public void schedule(String name, String timeoutSetName, CommandAsyncExecutor executor) {
+ public void schedule(String name, String timeoutSetName) {
RedissonCacheTask task = new RedissonCacheTask(name, timeoutSetName, executor);
RedissonCacheTask prevTask = tasks.putIfAbsent(name, task);
if (prevTask == null) {
@@ -130,4 +139,49 @@ public class RedissonEvictionScheduler {
}
}
+
+ public void runCleanTask(final String name, String timeoutSetName, long currentDate) {
+
+ final Long lastExpired = lastExpiredTime.get(name);
+ long now = System.currentTimeMillis();
+ if (lastExpired == null) {
+ if (lastExpiredTime.putIfAbsent(name, now) != null) {
+ return;
+ }
+ } else if (lastExpired + expireTaskExecutionDelay >= now) {
+ if (!lastExpiredTime.replace(name, lastExpired, now)) {
+ return;
+ }
+ } else {
+ return;
+ }
+
+ Future future = executor.evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_LONG,
+ "local expiredKeys = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]); "
+ + "if #expiredKeys > 0 then "
+ + "redis.call('zrem', KEYS[2], unpack(expiredKeys)); "
+ + "redis.call('hdel', KEYS[1], unpack(expiredKeys)); "
+ + "end;"
+ + "return #expiredKeys;",
+ Arrays.asList(name, timeoutSetName), currentDate, valuesAmountToClean);
+
+ future.addListener(new FutureListener() {
+ @Override
+ public void operationComplete(Future future) throws Exception {
+ executor.getConnectionManager().getGroup().schedule(new Runnable() {
+ @Override
+ public void run() {
+ lastExpiredTime.remove(name, lastExpired);
+ }
+ }, expireTaskExecutionDelay*3, TimeUnit.SECONDS);
+
+ if (!future.isSuccess()) {
+ log.warn("Can't execute clean task for expired values. RSetCache name: " + name, future.cause());
+ return;
+ }
+ }
+ });
+ }
+
+
}
diff --git a/src/main/java/org/redisson/Redisson.java b/src/main/java/org/redisson/Redisson.java
index 3817bb162..7d25bc4bb 100755
--- a/src/main/java/org/redisson/Redisson.java
+++ b/src/main/java/org/redisson/Redisson.java
@@ -69,6 +69,7 @@ import io.netty.util.concurrent.Future;
*/
public class Redisson implements RedissonClient {
+ private final EvictionScheduler evictionScheduler;
private final CommandExecutor commandExecutor;
private final ConnectionManager connectionManager;
private final Config config;
@@ -92,6 +93,7 @@ public class Redisson implements RedissonClient {
throw new IllegalArgumentException("server(s) address(es) not defined!");
}
commandExecutor = new CommandSyncService(connectionManager);
+ evictionScheduler = new EvictionScheduler(commandExecutor);
}
@@ -193,22 +195,22 @@ public class Redisson implements RedissonClient {
@Override
public RSetCache getSetCache(String name) {
- return new RedissonSetCache(commandExecutor, name);
+ return new RedissonSetCache(evictionScheduler, commandExecutor, name);
}
@Override
public RSetCache getSetCache(String name, Codec codec) {
- return new RedissonSetCache(codec, commandExecutor, name);
+ return new RedissonSetCache(codec, evictionScheduler, commandExecutor, name);
}
@Override
public RMapCache getMapCache(String name) {
- return new RedissonMapCache(commandExecutor, name);
+ return new RedissonMapCache(evictionScheduler, commandExecutor, name);
}
@Override
public RMapCache getMapCache(String name, Codec codec) {
- return new RedissonMapCache(codec, commandExecutor, name);
+ return new RedissonMapCache(codec, evictionScheduler, commandExecutor, name);
}
@Override
@@ -338,7 +340,7 @@ public class Redisson implements RedissonClient {
@Override
public RBatch createBatch() {
- return new RedissonBatch(connectionManager);
+ return new RedissonBatch(evictionScheduler, connectionManager);
}
@Override
diff --git a/src/main/java/org/redisson/RedissonBatch.java b/src/main/java/org/redisson/RedissonBatch.java
index 57fb896ba..ac1ce0fa3 100644
--- a/src/main/java/org/redisson/RedissonBatch.java
+++ b/src/main/java/org/redisson/RedissonBatch.java
@@ -49,10 +49,12 @@ import io.netty.util.concurrent.Future;
*/
public class RedissonBatch implements RBatch {
+ private final EvictionScheduler evictionScheduler;
private final CommandBatchService executorService;
- public RedissonBatch(ConnectionManager connectionManager) {
+ public RedissonBatch(EvictionScheduler evictionScheduler, ConnectionManager connectionManager) {
this.executorService = new CommandBatchService(connectionManager);
+ this.evictionScheduler = evictionScheduler;
}
@Override
@@ -172,12 +174,12 @@ public class RedissonBatch implements RBatch {
@Override
public RMapCacheAsync getMapCache(String name, Codec codec) {
- return new RedissonMapCache(codec, executorService, name);
+ return new RedissonMapCache(codec, evictionScheduler, executorService, name);
}
@Override
public RMapCacheAsync getMapCache(String name) {
- return new RedissonMapCache(executorService, name);
+ return new RedissonMapCache(evictionScheduler, executorService, name);
}
@Override
@@ -192,12 +194,12 @@ public class RedissonBatch implements RBatch {
@Override
public RSetCacheAsync getSetCache(String name) {
- return new RedissonSetCache(executorService, name);
+ return new RedissonSetCache(evictionScheduler, executorService, name);
}
@Override
public RSetCacheAsync getSetCache(String name, Codec codec) {
- return new RedissonSetCache(codec, executorService, name);
+ return new RedissonSetCache(codec, evictionScheduler, executorService, name);
}
@Override
diff --git a/src/main/java/org/redisson/RedissonMapCache.java b/src/main/java/org/redisson/RedissonMapCache.java
index 31d00fc6d..61de85b61 100644
--- a/src/main/java/org/redisson/RedissonMapCache.java
+++ b/src/main/java/org/redisson/RedissonMapCache.java
@@ -55,7 +55,7 @@ import io.netty.util.concurrent.Promise;
* Thus entries are checked for TTL expiration during any key/value/entry read operation.
* If key/value/entry expired then it doesn't returns and clean task runs asynchronous.
* Clean task deletes removes 100 expired entries at once.
- * In addition there is {@link org.redisson.RedissonEvictionScheduler}. This scheduler
+ * In addition there is {@link org.redisson.EvictionScheduler}. This scheduler
* deletes expired entries in time interval between 5 seconds to 2 hours.
*
* If eviction is not required then it's better to use {@link org.redisson.reactive.RedissonMapReactive}.
@@ -75,16 +75,19 @@ public class RedissonMapCache extends RedissonMap implements RMapCac
private static final RedisCommand> EVAL_CONTAINS_KEY = new RedisCommand>("EVAL", new ObjectListReplayDecoder(), 5, ValueType.MAP_KEY);
private static final RedisCommand> EVAL_CONTAINS_VALUE = new RedisCommand>("EVAL", new ObjectListReplayDecoder(), 5, ValueType.MAP_VALUE);
private static final RedisCommand EVAL_FAST_REMOVE = new RedisCommand("EVAL", 5, ValueType.MAP_KEY);
- private static final RedisCommand EVAL_REMOVE_EXPIRED = new RedisCommand("EVAL", 5);
- protected RedissonMapCache(CommandAsyncExecutor commandExecutor, String name) {
+ private final EvictionScheduler evictionScheduler;
+
+ protected RedissonMapCache(EvictionScheduler evictionScheduler, CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
- RedissonEvictionScheduler.INSTANCE.schedule(getName(), getTimeoutSetName(), commandExecutor);
+ this.evictionScheduler = evictionScheduler;
+ evictionScheduler.schedule(getName(), getTimeoutSetName());
}
- public RedissonMapCache(Codec codec, CommandAsyncExecutor commandExecutor, String name) {
+ public RedissonMapCache(Codec codec, EvictionScheduler evictionScheduler, CommandAsyncExecutor commandExecutor, String name) {
super(codec, commandExecutor, name);
- RedissonEvictionScheduler.INSTANCE.schedule(getName(), getTimeoutSetName(), commandExecutor);
+ this.evictionScheduler = evictionScheduler;
+ evictionScheduler.schedule(getName(), getTimeoutSetName());
}
@Override
@@ -173,7 +176,7 @@ public class RedissonMapCache extends RedissonMap implements RMapCac
Long expireDate = (Long) res.get(0);
long currentDate = System.currentTimeMillis();
if (expireDate <= currentDate) {
- expireMap(currentDate);
+ evictionScheduler.runCleanTask(getName(), getTimeoutSetName(), currentDate);
}
result.setSuccess((Map) res.get(1));
@@ -251,7 +254,7 @@ public class RedissonMapCache extends RedissonMap implements RMapCac
long currentDate = System.currentTimeMillis();
if (expireDate <= currentDate) {
result.setSuccess(nullValue);
- expireMap(currentDate);
+ evictionScheduler.runCleanTask(getName(), getTimeoutSetName(), currentDate);
return;
}
@@ -264,16 +267,6 @@ public class RedissonMapCache extends RedissonMap implements RMapCac
});
}
- private void expireMap(long currentDate) {
- commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, EVAL_REMOVE_EXPIRED,
- "local expiredKeys = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, 100); "
- + "if #expiredKeys > 0 then "
- + "redis.call('zrem', KEYS[2], unpack(expiredKeys)); "
- + "redis.call('hdel', KEYS[1], unpack(expiredKeys)); "
- + "end;",
- Arrays.asList(getName(), getTimeoutSetName()), currentDate);
- }
-
@Override
public V put(K key, V value, long ttl, TimeUnit unit) {
return get(putAsync(key, value, ttl, unit));
diff --git a/src/main/java/org/redisson/RedissonReactive.java b/src/main/java/org/redisson/RedissonReactive.java
index 9198a4665..ba1df5c91 100644
--- a/src/main/java/org/redisson/RedissonReactive.java
+++ b/src/main/java/org/redisson/RedissonReactive.java
@@ -80,6 +80,7 @@ import io.netty.util.concurrent.Future;
*/
public class RedissonReactive implements RedissonReactiveClient {
+ private final EvictionScheduler evictionScheduler;
private final CommandReactiveService commandExecutor;
private final ConnectionManager connectionManager;
private final Config config;
@@ -101,18 +102,18 @@ public class RedissonReactive implements RedissonReactiveClient {
throw new IllegalArgumentException("server(s) address(es) not defined!");
}
commandExecutor = new CommandReactiveService(connectionManager);
+ evictionScheduler = new EvictionScheduler(commandExecutor);
}
@Override
public RMapCacheReactive getMapCache(String name, Codec codec) {
- return new RedissonMapCacheReactive(codec, commandExecutor, name);
+ return new RedissonMapCacheReactive(codec, evictionScheduler, commandExecutor, name);
}
-
@Override
public RMapCacheReactive getMapCache(String name) {
- return new RedissonMapCacheReactive(commandExecutor, name);
+ return new RedissonMapCacheReactive(evictionScheduler, commandExecutor, name);
}
@Override
@@ -261,7 +262,7 @@ public class RedissonReactive implements RedissonReactiveClient {
@Override
public RBatchReactive createBatch() {
- return new RedissonBatchReactive(connectionManager);
+ return new RedissonBatchReactive(evictionScheduler, connectionManager);
}
@Override
diff --git a/src/main/java/org/redisson/RedissonSetCache.java b/src/main/java/org/redisson/RedissonSetCache.java
index 75ea79b56..02908f71b 100644
--- a/src/main/java/org/redisson/RedissonSetCache.java
+++ b/src/main/java/org/redisson/RedissonSetCache.java
@@ -56,7 +56,7 @@ import net.openhft.hashing.LongHashFunction;
* Thus values are checked for TTL expiration during any value read operation.
* If entry expired then it doesn't returns and clean task runs asynchronous.
* Clean task deletes removes 100 expired entries at once.
- * In addition there is {@link org.redisson.RedissonEvictionScheduler}. This scheduler
+ * In addition there is {@link org.redisson.EvictionScheduler}. This scheduler
* deletes expired entries in time interval between 5 seconds to 2 hours.
*
* If eviction is not required then it's better to use {@link org.redisson.reactive.RedissonSet}.
@@ -70,18 +70,21 @@ public class RedissonSetCache extends RedissonExpirable implements RSetCache<
private static final RedisCommand ADD_ALL = new RedisCommand("HMSET", new VoidReplayConvertor());
private static final RedisCommand EVAL_ADD = new RedisCommand("EVAL", new BooleanReplayConvertor(), 5);
- private static final RedisCommand EVAL_REMOVE_EXPIRED = new RedisCommand("EVAL", 5);
private static final RedisCommand> EVAL_CONTAINS_KEY = new RedisCommand>("EVAL", new ObjectListReplayDecoder());
private static final RedisStrictCommand HDEL = new RedisStrictCommand("HDEL", new BooleanReplayConvertor());
- protected RedissonSetCache(CommandAsyncExecutor commandExecutor, String name) {
+ private final EvictionScheduler evictionScheduler;
+
+ protected RedissonSetCache(EvictionScheduler evictionScheduler, CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
- RedissonEvictionScheduler.INSTANCE.schedule(getName(), getTimeoutSetName(), commandExecutor);
+ this.evictionScheduler = evictionScheduler;
+ evictionScheduler.schedule(getName(), getTimeoutSetName());
}
- protected RedissonSetCache(Codec codec, CommandAsyncExecutor commandExecutor, String name) {
+ protected RedissonSetCache(Codec codec, EvictionScheduler evictionScheduler, CommandAsyncExecutor commandExecutor, String name) {
super(codec, commandExecutor, name);
- RedissonEvictionScheduler.INSTANCE.schedule(getName(), getTimeoutSetName(), commandExecutor);
+ this.evictionScheduler = evictionScheduler;
+ evictionScheduler.schedule(getName(), getTimeoutSetName());
}
@Override
@@ -167,7 +170,7 @@ public class RedissonSetCache extends RedissonExpirable implements RSetCache<
long currentDate = System.currentTimeMillis();
if (expireDate <= currentDate) {
result.setSuccess(nullValue);
- expireMap(currentDate);
+ evictionScheduler.runCleanTask(getName(), getTimeoutSetName(), currentDate);
return;
}
@@ -180,16 +183,6 @@ public class RedissonSetCache extends RedissonExpirable implements RSetCache<
});
}
- private void expireMap(long currentDate) {
- commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, EVAL_REMOVE_EXPIRED,
- "local expiredKeys = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, 100); "
- + "if #expiredKeys > 0 then "
- + "redis.call('zrem', KEYS[2], unpack(expiredKeys)); "
- + "redis.call('hdel', KEYS[1], unpack(expiredKeys)); "
- + "end;",
- Arrays.asList(getName(), getTimeoutSetName()), currentDate);
- }
-
ListScanResult scanIterator(InetSocketAddress client, long startPos) {
Future> f = commandExecutor.evalReadAsync(client, getName(), codec, RedisCommands.EVAL_SSCAN,
"local result = {}; "
@@ -303,7 +296,7 @@ public class RedissonSetCache extends RedissonExpirable implements RSetCache<
Long expireDate = (Long) res.get(0);
long currentDate = System.currentTimeMillis();
if (expireDate <= currentDate) {
- expireMap(currentDate);
+ evictionScheduler.runCleanTask(getName(), getTimeoutSetName(), currentDate);
}
result.setSuccess((Collection) res.get(1));
diff --git a/src/main/java/org/redisson/api/RMapCacheReactive.java b/src/main/java/org/redisson/api/RMapCacheReactive.java
index 8ddf5fe99..6efc85273 100644
--- a/src/main/java/org/redisson/api/RMapCacheReactive.java
+++ b/src/main/java/org/redisson/api/RMapCacheReactive.java
@@ -28,7 +28,7 @@ import org.reactivestreams.Publisher;
* Thus entries are checked for TTL expiration during any key/value/entry read operation.
* If key/value/entry expired then it doesn't returns and clean task runs asynchronous.
* Clean task deletes removes 100 expired entries at once.
- * In addition there is {@link org.redisson.RedissonEvictionScheduler}. This scheduler
+ * In addition there is {@link org.redisson.EvictionScheduler}. This scheduler
* deletes expired entries in time interval between 5 seconds to 2 hours.
*
* If eviction is not required then it's better to use {@link org.redisson.reactive.RedissonMapReactive}.
diff --git a/src/main/java/org/redisson/core/RMapCache.java b/src/main/java/org/redisson/core/RMapCache.java
index 12f177454..176c6765d 100644
--- a/src/main/java/org/redisson/core/RMapCache.java
+++ b/src/main/java/org/redisson/core/RMapCache.java
@@ -26,7 +26,7 @@ import java.util.concurrent.TimeUnit;
* Thus entries are checked for TTL expiration during any key/value/entry read operation.
* If key/value/entry expired then it doesn't returns and clean task runs asynchronous.
* Clean task deletes removes 100 expired entries at once.
- * In addition there is {@link org.redisson.RedissonEvictionScheduler}. This scheduler
+ * In addition there is {@link org.redisson.EvictionScheduler}. This scheduler
* deletes expired entries in time interval between 5 seconds to 2 hours.
*
* If eviction is not required then it's better to use {@link org.redisson.reactive.RedissonMapReactive}.
diff --git a/src/main/java/org/redisson/core/RMapCacheAsync.java b/src/main/java/org/redisson/core/RMapCacheAsync.java
index 95d49f9ac..fae0bf58f 100644
--- a/src/main/java/org/redisson/core/RMapCacheAsync.java
+++ b/src/main/java/org/redisson/core/RMapCacheAsync.java
@@ -28,7 +28,7 @@ import io.netty.util.concurrent.Future;
* Thus entries are checked for TTL expiration during any key/value/entry read operation.
* If key/value/entry expired then it doesn't returns and clean task runs asynchronous.
* Clean task deletes removes 100 expired entries at once.
- * In addition there is {@link org.redisson.RedissonEvictionScheduler}. This scheduler
+ * In addition there is {@link org.redisson.EvictionScheduler}. This scheduler
* deletes expired entries in time interval between 5 seconds to 2 hours.
*
* If eviction is not required then it's better to use {@link org.redisson.reactive.RedissonMapReactive}.
diff --git a/src/main/java/org/redisson/core/RSetCache.java b/src/main/java/org/redisson/core/RSetCache.java
index c1c5cfc7a..22965b5d8 100644
--- a/src/main/java/org/redisson/core/RSetCache.java
+++ b/src/main/java/org/redisson/core/RSetCache.java
@@ -29,7 +29,7 @@ import java.util.concurrent.TimeUnit;
* Thus values are checked for TTL expiration during any value read operation.
* If entry expired then it doesn't returns and clean task runs asynchronous.
* Clean task deletes removes 100 expired entries at once.
- * In addition there is {@link org.redisson.RedissonEvictionScheduler}. This scheduler
+ * In addition there is {@link org.redisson.EvictionScheduler}. This scheduler
* deletes expired entries in time interval between 5 seconds to 2 hours.
*
* If eviction is not required then it's better to use {@link org.redisson.reactive.RedissonSet}.
diff --git a/src/main/java/org/redisson/reactive/RedissonBatchReactive.java b/src/main/java/org/redisson/reactive/RedissonBatchReactive.java
index 0248156a1..3ce533000 100644
--- a/src/main/java/org/redisson/reactive/RedissonBatchReactive.java
+++ b/src/main/java/org/redisson/reactive/RedissonBatchReactive.java
@@ -18,6 +18,7 @@ package org.redisson.reactive;
import java.util.List;
import org.reactivestreams.Publisher;
+import org.redisson.EvictionScheduler;
import org.redisson.api.RAtomicLongReactive;
import org.redisson.api.RBatchReactive;
import org.redisson.api.RBitSetReactive;
@@ -41,9 +42,11 @@ import org.redisson.connection.ConnectionManager;
public class RedissonBatchReactive implements RBatchReactive {
+ private final EvictionScheduler evictionScheduler;
private final CommandBatchService executorService;
- public RedissonBatchReactive(ConnectionManager connectionManager) {
+ public RedissonBatchReactive(EvictionScheduler evictionScheduler, ConnectionManager connectionManager) {
+ this.evictionScheduler = evictionScheduler;
this.executorService = new CommandBatchService(connectionManager);
}
@@ -87,6 +90,16 @@ public class RedissonBatchReactive implements RBatchReactive {
return new RedissonMapReactive(codec, executorService, name);
}
+ @Override
+ public RMapCacheReactive getMapCache(String name, Codec codec) {
+ return new RedissonMapCacheReactive(codec, evictionScheduler, executorService, name);
+ }
+
+ @Override
+ public RMapCacheReactive getMapCache(String name) {
+ return new RedissonMapCacheReactive(evictionScheduler, executorService, name);
+ }
+
@Override
public RSetReactive getSet(String name) {
return new RedissonSetReactive(executorService, name);
@@ -177,14 +190,4 @@ public class RedissonBatchReactive implements RBatchReactive {
return new NettyFuturePublisher>(executorService.executeAsync());
}
- @Override
- public RMapCacheReactive getMapCache(String name, Codec codec) {
- return new RedissonMapCacheReactive(codec, executorService, name);
- }
-
- @Override
- public RMapCacheReactive getMapCache(String name) {
- return new RedissonMapCacheReactive(executorService, name);
- }
-
}
diff --git a/src/main/java/org/redisson/reactive/RedissonMapCacheReactive.java b/src/main/java/org/redisson/reactive/RedissonMapCacheReactive.java
index 0dace7828..2f4f28190 100644
--- a/src/main/java/org/redisson/reactive/RedissonMapCacheReactive.java
+++ b/src/main/java/org/redisson/reactive/RedissonMapCacheReactive.java
@@ -26,7 +26,7 @@ import java.util.concurrent.TimeUnit;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
-import org.redisson.RedissonEvictionScheduler;
+import org.redisson.EvictionScheduler;
import org.redisson.api.RMapCacheReactive;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec;
@@ -58,7 +58,7 @@ import reactor.rx.action.support.DefaultSubscriber;
* Thus entries are checked for TTL expiration during any key/value/entry read operation.
* If key/value/entry expired then it doesn't returns and clean task runs asynchronous.
* Clean task deletes removes 100 expired entries at once.
- * In addition there is {@link org.redisson.RedissonEvictionScheduler}. This scheduler
+ * In addition there is {@link org.redisson.EvictionScheduler}. This scheduler
* deletes expired entries in time interval between 5 seconds to 2 hours.
*
* If eviction is not required then it's better to use {@link org.redisson.reactive.RedissonMapReactive}.
@@ -78,16 +78,19 @@ public class RedissonMapCacheReactive extends RedissonMapReactive im
private static final RedisCommand> EVAL_CONTAINS_KEY = new RedisCommand>("EVAL", new ObjectListReplayDecoder(), 5, ValueType.MAP_KEY);
private static final RedisCommand> EVAL_CONTAINS_VALUE = new RedisCommand>("EVAL", new ObjectListReplayDecoder(), 5, ValueType.MAP_VALUE);
private static final RedisCommand EVAL_FAST_REMOVE = new RedisCommand("EVAL", 5, ValueType.MAP_KEY);
- private static final RedisCommand EVAL_REMOVE_EXPIRED = new RedisCommand("EVAL", 5);
- public RedissonMapCacheReactive(CommandReactiveExecutor commandExecutor, String name) {
+ private final EvictionScheduler evictionScheduler;
+
+ public RedissonMapCacheReactive(EvictionScheduler evictionScheduler, CommandReactiveExecutor commandExecutor, String name) {
super(commandExecutor, name);
- RedissonEvictionScheduler.INSTANCE.schedule(getName(), getTimeoutSetName(), commandExecutor);
+ this.evictionScheduler = evictionScheduler;
+ evictionScheduler.schedule(getName(), getTimeoutSetName());
}
- public RedissonMapCacheReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name) {
+ public RedissonMapCacheReactive(Codec codec, EvictionScheduler evictionScheduler, CommandReactiveExecutor commandExecutor, String name) {
super(codec, commandExecutor, name);
- RedissonEvictionScheduler.INSTANCE.schedule(getName(), getTimeoutSetName(), commandExecutor);
+ this.evictionScheduler = evictionScheduler;
+ evictionScheduler.schedule(getName(), getTimeoutSetName());
}
@Override
@@ -176,7 +179,7 @@ public class RedissonMapCacheReactive extends RedissonMapReactive im
Long expireDate = (Long) res.get(0);
long currentDate = System.currentTimeMillis();
if (expireDate <= currentDate) {
- expireMap(currentDate);
+ evictionScheduler.runCleanTask(getName(), getTimeoutSetName(), currentDate);
}
result.onNext((Map) res.get(1));
@@ -257,7 +260,7 @@ public class RedissonMapCacheReactive extends RedissonMapReactive im
if (expireDate <= currentDate) {
result.onNext(nullValue);
result.onComplete();
- expireMap(currentDate);
+ evictionScheduler.runCleanTask(getName(), getTimeoutSetName(), currentDate);
return;
}
@@ -278,16 +281,6 @@ public class RedissonMapCacheReactive extends RedissonMapReactive im
}
- private void expireMap(long currentDate) {
- commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, EVAL_REMOVE_EXPIRED,
- "local expiredKeys = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, 100); "
- + "if #expiredKeys > 0 then "
- + "redis.call('zrem', KEYS[2], unpack(expiredKeys)); "
- + "redis.call('hdel', KEYS[1], unpack(expiredKeys)); "
- + "end;",
- Arrays.asList(getName(), getTimeoutSetName()), currentDate);
- }
-
@Override
public Publisher put(K key, V value, long ttl, TimeUnit unit) {
if (unit == null) {
diff --git a/src/test/java/org/redisson/RedissonMapCacheReactiveTest.java b/src/test/java/org/redisson/RedissonMapCacheReactiveTest.java
index ea46509c1..b53ccf678 100644
--- a/src/test/java/org/redisson/RedissonMapCacheReactiveTest.java
+++ b/src/test/java/org/redisson/RedissonMapCacheReactiveTest.java
@@ -244,7 +244,7 @@ public class RedissonMapCacheReactiveTest extends BaseReactiveTest {
@Test
public void testContainsValue() throws InterruptedException {
- RMapCacheReactive map = redisson.getMapCache("simple", new MsgPackJacksonCodec());
+ RMapCacheReactive map = redisson.getMapCache("simple31", new MsgPackJacksonCodec());
Assert.assertFalse(sync(map.containsValue(new SimpleValue("34"))));
sync(map.put(new SimpleKey("33"), new SimpleValue("44"), 1, TimeUnit.SECONDS));
@@ -302,7 +302,7 @@ public class RedissonMapCacheReactiveTest extends BaseReactiveTest {
@Test
public void testPutGet() throws InterruptedException {
- RMapCacheReactive map = redisson.getMapCache("simple", new MsgPackJacksonCodec());
+ RMapCacheReactive map = redisson.getMapCache("simple01", new MsgPackJacksonCodec());
Assert.assertNull(sync(map.get(new SimpleKey("33"))));
sync(map.put(new SimpleKey("33"), new SimpleValue("44"), 2, TimeUnit.SECONDS));
diff --git a/src/test/java/org/redisson/RedissonMapCacheTest.java b/src/test/java/org/redisson/RedissonMapCacheTest.java
index 1b70e6263..2d0d019d6 100644
--- a/src/test/java/org/redisson/RedissonMapCacheTest.java
+++ b/src/test/java/org/redisson/RedissonMapCacheTest.java
@@ -266,7 +266,7 @@ public class RedissonMapCacheTest extends BaseTest {
@Test
public void testKeySet() throws InterruptedException {
- RMapCache map = redisson.getMapCache("simple");
+ RMapCache map = redisson.getMapCache("simple03");
map.put(new SimpleKey("33"), new SimpleValue("44"), 1, TimeUnit.SECONDS);
map.put(new SimpleKey("1"), new SimpleValue("2"));
@@ -285,7 +285,7 @@ public class RedissonMapCacheTest extends BaseTest {
@Test
public void testValues() throws InterruptedException {
- RMapCache map = redisson.getMapCache("simple");
+ RMapCache map = redisson.getMapCache("simple05");
map.put(new SimpleKey("33"), new SimpleValue("44"), 1, TimeUnit.SECONDS);
map.put(new SimpleKey("1"), new SimpleValue("2"));
@@ -303,7 +303,7 @@ public class RedissonMapCacheTest extends BaseTest {
@Test
public void testContainsValue() throws InterruptedException {
- RMapCache map = redisson.getMapCache("simple", new MsgPackJacksonCodec());
+ RMapCache map = redisson.getMapCache("simple01", new MsgPackJacksonCodec());
Assert.assertFalse(map.containsValue(new SimpleValue("34")));
map.put(new SimpleKey("33"), new SimpleValue("44"), 1, TimeUnit.SECONDS);
@@ -319,7 +319,7 @@ public class RedissonMapCacheTest extends BaseTest {
@Test
public void testContainsKey() throws InterruptedException {
- RMapCache map = redisson.getMapCache("simple");
+ RMapCache map = redisson.getMapCache("simple30");
map.put(new SimpleKey("33"), new SimpleValue("44"), 1, TimeUnit.SECONDS);
Assert.assertTrue(map.containsKey(new SimpleKey("33")));
@@ -432,7 +432,7 @@ public class RedissonMapCacheTest extends BaseTest {
@Test
public void testPutGet() throws InterruptedException {
- RMapCache map = redisson.getMapCache("simple", new MsgPackJacksonCodec());
+ RMapCache map = redisson.getMapCache("simple04", new MsgPackJacksonCodec());
Assert.assertNull(map.get(new SimpleKey("33")));
map.put(new SimpleKey("33"), new SimpleValue("44"), 2, TimeUnit.SECONDS);
diff --git a/src/test/java/org/redisson/RedissonMapReactiveTest.java b/src/test/java/org/redisson/RedissonMapReactiveTest.java
index e49f1854b..18e9c306b 100644
--- a/src/test/java/org/redisson/RedissonMapReactiveTest.java
+++ b/src/test/java/org/redisson/RedissonMapReactiveTest.java
@@ -9,13 +9,18 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;
import org.reactivestreams.Publisher;
+import org.redisson.RedissonMapCacheTest.SimpleKey;
+import org.redisson.RedissonMapCacheTest.SimpleValue;
import org.redisson.api.RMapReactive;
+import org.redisson.codec.MsgPackJacksonCodec;
+import org.redisson.core.RMapCache;
import reactor.rx.Streams;
@@ -512,24 +517,6 @@ public class RedissonMapReactiveTest extends BaseReactiveTest {
Assert.assertEquals(1, sync(map.size()).intValue());
}
-// @Test(timeout = 5000)
-// public void testDeserializationErrorReturnsErrorImmediately() throws Exception {
-// redisson.getConfig().setCodec(new JsonJacksonCodec());
-//
-// RMap map = redisson.getMap("deserializationFailure");
-// SimpleObjectWithoutDefaultConstructor object = new SimpleObjectWithoutDefaultConstructor("test-val");
-//
-// Assert.assertEquals("test-val", object.getTestField());
-// map.put("test-key", object);
-//
-// try {
-// map.get("test-key");
-// Assert.fail("Expected exception from map.get() call");
-// } catch (Exception e) {
-// e.printStackTrace();
-// }
-// }
-
public static class SimpleObjectWithoutDefaultConstructor {
private String testField;
diff --git a/src/test/java/org/redisson/RedissonSetCacheTest.java b/src/test/java/org/redisson/RedissonSetCacheTest.java
index 7cfd08680..60c19215d 100644
--- a/src/test/java/org/redisson/RedissonSetCacheTest.java
+++ b/src/test/java/org/redisson/RedissonSetCacheTest.java
@@ -43,7 +43,7 @@ public class RedissonSetCacheTest extends BaseTest {
@Test
public void testAddExpire() throws InterruptedException, ExecutionException {
- RSetCache set = redisson.getSetCache("simple");
+ RSetCache set = redisson.getSetCache("simple3");
set.add("123", 1, TimeUnit.SECONDS);
Assert.assertThat(set, Matchers.contains("123"));
@@ -56,6 +56,27 @@ public class RedissonSetCacheTest extends BaseTest {
Assert.assertEquals(0, set.size());
}
+ @Test
+ public void testAddExpireTwise() throws InterruptedException, ExecutionException {
+ RSetCache set = redisson.getSetCache("simple31");
+ set.add("123", 1, TimeUnit.SECONDS);
+ Thread.sleep(1000);
+
+ Assert.assertFalse(set.contains("123"));
+
+ Thread.sleep(50);
+
+ Assert.assertEquals(0, set.size());
+
+ set.add("4341", 1, TimeUnit.SECONDS);
+ Thread.sleep(1000);
+
+ Assert.assertFalse(set.contains("4341"));
+
+ // can't be evicted due to 1 sec delay
+ Assert.assertEquals(1, set.size());
+ }
+
@Test
public void testExpireOverwrite() throws InterruptedException, ExecutionException {
RSetCache set = redisson.getSetCache("simple");