Expired values clean task "flooding" with many requests amount

pull/337/head
Nikita 9 years ago
parent c96776c551
commit 339c6e9154

@ -18,12 +18,15 @@ package org.redisson;
import java.util.Arrays; import java.util.Arrays;
import java.util.Deque; import java.util.Deque;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.redisson.client.codec.LongCodec; import org.redisson.client.codec.LongCodec;
import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandAsyncExecutor; import org.redisson.command.CommandAsyncExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.FutureListener;
@ -38,9 +41,9 @@ import io.netty.util.internal.PlatformDependent;
* @author Nikita Koksharov * @author Nikita Koksharov
* *
*/ */
public class RedissonEvictionScheduler { public class EvictionScheduler {
public static final RedissonEvictionScheduler INSTANCE = new RedissonEvictionScheduler(); private static final Logger log = LoggerFactory.getLogger(RedissonSetCache.class);
public static class RedissonCacheTask implements Runnable { public static class RedissonCacheTask implements Runnable {
@ -117,12 +120,18 @@ public class RedissonEvictionScheduler {
} }
private RedissonEvictionScheduler() {
}
private final ConcurrentMap<String, RedissonCacheTask> tasks = PlatformDependent.newConcurrentHashMap(); private final ConcurrentMap<String, RedissonCacheTask> tasks = PlatformDependent.newConcurrentHashMap();
private final CommandAsyncExecutor executor;
private final Map<String, Long> lastExpiredTime = PlatformDependent.newConcurrentHashMap();
private final int expireTaskExecutionDelay = 1000;
private final int valuesAmountToClean = 100;
public EvictionScheduler(CommandAsyncExecutor executor) {
this.executor = executor;
}
public void schedule(String name, String timeoutSetName, CommandAsyncExecutor executor) { public void schedule(String name, String timeoutSetName) {
RedissonCacheTask task = new RedissonCacheTask(name, timeoutSetName, executor); RedissonCacheTask task = new RedissonCacheTask(name, timeoutSetName, executor);
RedissonCacheTask prevTask = tasks.putIfAbsent(name, task); RedissonCacheTask prevTask = tasks.putIfAbsent(name, task);
if (prevTask == null) { if (prevTask == null) {
@ -130,4 +139,49 @@ public class RedissonEvictionScheduler {
} }
} }
public void runCleanTask(final String name, String timeoutSetName, long currentDate) {
final Long lastExpired = lastExpiredTime.get(name);
long now = System.currentTimeMillis();
if (lastExpired == null) {
if (lastExpiredTime.putIfAbsent(name, now) != null) {
return;
}
} else if (lastExpired + expireTaskExecutionDelay >= now) {
if (!lastExpiredTime.replace(name, lastExpired, now)) {
return;
}
} else {
return;
}
Future<Long> future = executor.evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_LONG,
"local expiredKeys = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]); "
+ "if #expiredKeys > 0 then "
+ "redis.call('zrem', KEYS[2], unpack(expiredKeys)); "
+ "redis.call('hdel', KEYS[1], unpack(expiredKeys)); "
+ "end;"
+ "return #expiredKeys;",
Arrays.<Object>asList(name, timeoutSetName), currentDate, valuesAmountToClean);
future.addListener(new FutureListener<Long>() {
@Override
public void operationComplete(Future<Long> future) throws Exception {
executor.getConnectionManager().getGroup().schedule(new Runnable() {
@Override
public void run() {
lastExpiredTime.remove(name, lastExpired);
}
}, expireTaskExecutionDelay*3, TimeUnit.SECONDS);
if (!future.isSuccess()) {
log.warn("Can't execute clean task for expired values. RSetCache name: " + name, future.cause());
return;
}
}
});
}
} }

@ -69,6 +69,7 @@ import io.netty.util.concurrent.Future;
*/ */
public class Redisson implements RedissonClient { public class Redisson implements RedissonClient {
private final EvictionScheduler evictionScheduler;
private final CommandExecutor commandExecutor; private final CommandExecutor commandExecutor;
private final ConnectionManager connectionManager; private final ConnectionManager connectionManager;
private final Config config; private final Config config;
@ -92,6 +93,7 @@ public class Redisson implements RedissonClient {
throw new IllegalArgumentException("server(s) address(es) not defined!"); throw new IllegalArgumentException("server(s) address(es) not defined!");
} }
commandExecutor = new CommandSyncService(connectionManager); commandExecutor = new CommandSyncService(connectionManager);
evictionScheduler = new EvictionScheduler(commandExecutor);
} }
@ -193,22 +195,22 @@ public class Redisson implements RedissonClient {
@Override @Override
public <V> RSetCache<V> getSetCache(String name) { public <V> RSetCache<V> getSetCache(String name) {
return new RedissonSetCache<V>(commandExecutor, name); return new RedissonSetCache<V>(evictionScheduler, commandExecutor, name);
} }
@Override @Override
public <V> RSetCache<V> getSetCache(String name, Codec codec) { public <V> RSetCache<V> getSetCache(String name, Codec codec) {
return new RedissonSetCache<V>(codec, commandExecutor, name); return new RedissonSetCache<V>(codec, evictionScheduler, commandExecutor, name);
} }
@Override @Override
public <K, V> RMapCache<K, V> getMapCache(String name) { public <K, V> RMapCache<K, V> getMapCache(String name) {
return new RedissonMapCache<K, V>(commandExecutor, name); return new RedissonMapCache<K, V>(evictionScheduler, commandExecutor, name);
} }
@Override @Override
public <K, V> RMapCache<K, V> getMapCache(String name, Codec codec) { public <K, V> RMapCache<K, V> getMapCache(String name, Codec codec) {
return new RedissonMapCache<K, V>(codec, commandExecutor, name); return new RedissonMapCache<K, V>(codec, evictionScheduler, commandExecutor, name);
} }
@Override @Override
@ -338,7 +340,7 @@ public class Redisson implements RedissonClient {
@Override @Override
public RBatch createBatch() { public RBatch createBatch() {
return new RedissonBatch(connectionManager); return new RedissonBatch(evictionScheduler, connectionManager);
} }
@Override @Override

@ -49,10 +49,12 @@ import io.netty.util.concurrent.Future;
*/ */
public class RedissonBatch implements RBatch { public class RedissonBatch implements RBatch {
private final EvictionScheduler evictionScheduler;
private final CommandBatchService executorService; private final CommandBatchService executorService;
public RedissonBatch(ConnectionManager connectionManager) { public RedissonBatch(EvictionScheduler evictionScheduler, ConnectionManager connectionManager) {
this.executorService = new CommandBatchService(connectionManager); this.executorService = new CommandBatchService(connectionManager);
this.evictionScheduler = evictionScheduler;
} }
@Override @Override
@ -172,12 +174,12 @@ public class RedissonBatch implements RBatch {
@Override @Override
public <K, V> RMapCacheAsync<K, V> getMapCache(String name, Codec codec) { public <K, V> RMapCacheAsync<K, V> getMapCache(String name, Codec codec) {
return new RedissonMapCache<K, V>(codec, executorService, name); return new RedissonMapCache<K, V>(codec, evictionScheduler, executorService, name);
} }
@Override @Override
public <K, V> RMapCacheAsync<K, V> getMapCache(String name) { public <K, V> RMapCacheAsync<K, V> getMapCache(String name) {
return new RedissonMapCache<K, V>(executorService, name); return new RedissonMapCache<K, V>(evictionScheduler, executorService, name);
} }
@Override @Override
@ -192,12 +194,12 @@ public class RedissonBatch implements RBatch {
@Override @Override
public <V> RSetCacheAsync<V> getSetCache(String name) { public <V> RSetCacheAsync<V> getSetCache(String name) {
return new RedissonSetCache<V>(executorService, name); return new RedissonSetCache<V>(evictionScheduler, executorService, name);
} }
@Override @Override
public <V> RSetCacheAsync<V> getSetCache(String name, Codec codec) { public <V> RSetCacheAsync<V> getSetCache(String name, Codec codec) {
return new RedissonSetCache<V>(codec, executorService, name); return new RedissonSetCache<V>(codec, evictionScheduler, executorService, name);
} }
@Override @Override

@ -55,7 +55,7 @@ import io.netty.util.concurrent.Promise;
* Thus entries are checked for TTL expiration during any key/value/entry read operation. * Thus entries are checked for TTL expiration during any key/value/entry read operation.
* If key/value/entry expired then it doesn't returns and clean task runs asynchronous. * If key/value/entry expired then it doesn't returns and clean task runs asynchronous.
* Clean task deletes removes 100 expired entries at once. * Clean task deletes removes 100 expired entries at once.
* In addition there is {@link org.redisson.RedissonEvictionScheduler}. This scheduler * In addition there is {@link org.redisson.EvictionScheduler}. This scheduler
* deletes expired entries in time interval between 5 seconds to 2 hours.</p> * deletes expired entries in time interval between 5 seconds to 2 hours.</p>
* *
* <p>If eviction is not required then it's better to use {@link org.redisson.reactive.RedissonMapReactive}.</p> * <p>If eviction is not required then it's better to use {@link org.redisson.reactive.RedissonMapReactive}.</p>
@ -75,16 +75,19 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
private static final RedisCommand<List<Object>> EVAL_CONTAINS_KEY = new RedisCommand<List<Object>>("EVAL", new ObjectListReplayDecoder<Object>(), 5, ValueType.MAP_KEY); private static final RedisCommand<List<Object>> EVAL_CONTAINS_KEY = new RedisCommand<List<Object>>("EVAL", new ObjectListReplayDecoder<Object>(), 5, ValueType.MAP_KEY);
private static final RedisCommand<List<Object>> EVAL_CONTAINS_VALUE = new RedisCommand<List<Object>>("EVAL", new ObjectListReplayDecoder<Object>(), 5, ValueType.MAP_VALUE); private static final RedisCommand<List<Object>> EVAL_CONTAINS_VALUE = new RedisCommand<List<Object>>("EVAL", new ObjectListReplayDecoder<Object>(), 5, ValueType.MAP_VALUE);
private static final RedisCommand<Long> EVAL_FAST_REMOVE = new RedisCommand<Long>("EVAL", 5, ValueType.MAP_KEY); private static final RedisCommand<Long> EVAL_FAST_REMOVE = new RedisCommand<Long>("EVAL", 5, ValueType.MAP_KEY);
private static final RedisCommand<Long> EVAL_REMOVE_EXPIRED = new RedisCommand<Long>("EVAL", 5);
protected RedissonMapCache(CommandAsyncExecutor commandExecutor, String name) { private final EvictionScheduler evictionScheduler;
protected RedissonMapCache(EvictionScheduler evictionScheduler, CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name); super(commandExecutor, name);
RedissonEvictionScheduler.INSTANCE.schedule(getName(), getTimeoutSetName(), commandExecutor); this.evictionScheduler = evictionScheduler;
evictionScheduler.schedule(getName(), getTimeoutSetName());
} }
public RedissonMapCache(Codec codec, CommandAsyncExecutor commandExecutor, String name) { public RedissonMapCache(Codec codec, EvictionScheduler evictionScheduler, CommandAsyncExecutor commandExecutor, String name) {
super(codec, commandExecutor, name); super(codec, commandExecutor, name);
RedissonEvictionScheduler.INSTANCE.schedule(getName(), getTimeoutSetName(), commandExecutor); this.evictionScheduler = evictionScheduler;
evictionScheduler.schedule(getName(), getTimeoutSetName());
} }
@Override @Override
@ -173,7 +176,7 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
Long expireDate = (Long) res.get(0); Long expireDate = (Long) res.get(0);
long currentDate = System.currentTimeMillis(); long currentDate = System.currentTimeMillis();
if (expireDate <= currentDate) { if (expireDate <= currentDate) {
expireMap(currentDate); evictionScheduler.runCleanTask(getName(), getTimeoutSetName(), currentDate);
} }
result.setSuccess((Map<K, V>) res.get(1)); result.setSuccess((Map<K, V>) res.get(1));
@ -251,7 +254,7 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
long currentDate = System.currentTimeMillis(); long currentDate = System.currentTimeMillis();
if (expireDate <= currentDate) { if (expireDate <= currentDate) {
result.setSuccess(nullValue); result.setSuccess(nullValue);
expireMap(currentDate); evictionScheduler.runCleanTask(getName(), getTimeoutSetName(), currentDate);
return; return;
} }
@ -264,16 +267,6 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
}); });
} }
private void expireMap(long currentDate) {
commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, EVAL_REMOVE_EXPIRED,
"local expiredKeys = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, 100); "
+ "if #expiredKeys > 0 then "
+ "redis.call('zrem', KEYS[2], unpack(expiredKeys)); "
+ "redis.call('hdel', KEYS[1], unpack(expiredKeys)); "
+ "end;",
Arrays.<Object>asList(getName(), getTimeoutSetName()), currentDate);
}
@Override @Override
public V put(K key, V value, long ttl, TimeUnit unit) { public V put(K key, V value, long ttl, TimeUnit unit) {
return get(putAsync(key, value, ttl, unit)); return get(putAsync(key, value, ttl, unit));

@ -80,6 +80,7 @@ import io.netty.util.concurrent.Future;
*/ */
public class RedissonReactive implements RedissonReactiveClient { public class RedissonReactive implements RedissonReactiveClient {
private final EvictionScheduler evictionScheduler;
private final CommandReactiveService commandExecutor; private final CommandReactiveService commandExecutor;
private final ConnectionManager connectionManager; private final ConnectionManager connectionManager;
private final Config config; private final Config config;
@ -101,18 +102,18 @@ public class RedissonReactive implements RedissonReactiveClient {
throw new IllegalArgumentException("server(s) address(es) not defined!"); throw new IllegalArgumentException("server(s) address(es) not defined!");
} }
commandExecutor = new CommandReactiveService(connectionManager); commandExecutor = new CommandReactiveService(connectionManager);
evictionScheduler = new EvictionScheduler(commandExecutor);
} }
@Override @Override
public <K, V> RMapCacheReactive<K, V> getMapCache(String name, Codec codec) { public <K, V> RMapCacheReactive<K, V> getMapCache(String name, Codec codec) {
return new RedissonMapCacheReactive<K, V>(codec, commandExecutor, name); return new RedissonMapCacheReactive<K, V>(codec, evictionScheduler, commandExecutor, name);
} }
@Override @Override
public <K, V> RMapCacheReactive<K, V> getMapCache(String name) { public <K, V> RMapCacheReactive<K, V> getMapCache(String name) {
return new RedissonMapCacheReactive<K, V>(commandExecutor, name); return new RedissonMapCacheReactive<K, V>(evictionScheduler, commandExecutor, name);
} }
@Override @Override
@ -261,7 +262,7 @@ public class RedissonReactive implements RedissonReactiveClient {
@Override @Override
public RBatchReactive createBatch() { public RBatchReactive createBatch() {
return new RedissonBatchReactive(connectionManager); return new RedissonBatchReactive(evictionScheduler, connectionManager);
} }
@Override @Override

@ -56,7 +56,7 @@ import net.openhft.hashing.LongHashFunction;
* Thus values are checked for TTL expiration during any value read operation. * Thus values are checked for TTL expiration during any value read operation.
* If entry expired then it doesn't returns and clean task runs asynchronous. * If entry expired then it doesn't returns and clean task runs asynchronous.
* Clean task deletes removes 100 expired entries at once. * Clean task deletes removes 100 expired entries at once.
* In addition there is {@link org.redisson.RedissonEvictionScheduler}. This scheduler * In addition there is {@link org.redisson.EvictionScheduler}. This scheduler
* deletes expired entries in time interval between 5 seconds to 2 hours.</p> * deletes expired entries in time interval between 5 seconds to 2 hours.</p>
* *
* <p>If eviction is not required then it's better to use {@link org.redisson.reactive.RedissonSet}.</p> * <p>If eviction is not required then it's better to use {@link org.redisson.reactive.RedissonSet}.</p>
@ -70,18 +70,21 @@ public class RedissonSetCache<V> extends RedissonExpirable implements RSetCache<
private static final RedisCommand<Void> ADD_ALL = new RedisCommand<Void>("HMSET", new VoidReplayConvertor()); private static final RedisCommand<Void> ADD_ALL = new RedisCommand<Void>("HMSET", new VoidReplayConvertor());
private static final RedisCommand<Boolean> EVAL_ADD = new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 5); private static final RedisCommand<Boolean> EVAL_ADD = new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 5);
private static final RedisCommand<Long> EVAL_REMOVE_EXPIRED = new RedisCommand<Long>("EVAL", 5);
private static final RedisCommand<List<Object>> EVAL_CONTAINS_KEY = new RedisCommand<List<Object>>("EVAL", new ObjectListReplayDecoder<Object>()); private static final RedisCommand<List<Object>> EVAL_CONTAINS_KEY = new RedisCommand<List<Object>>("EVAL", new ObjectListReplayDecoder<Object>());
private static final RedisStrictCommand<Boolean> HDEL = new RedisStrictCommand<Boolean>("HDEL", new BooleanReplayConvertor()); private static final RedisStrictCommand<Boolean> HDEL = new RedisStrictCommand<Boolean>("HDEL", new BooleanReplayConvertor());
protected RedissonSetCache(CommandAsyncExecutor commandExecutor, String name) { private final EvictionScheduler evictionScheduler;
protected RedissonSetCache(EvictionScheduler evictionScheduler, CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name); super(commandExecutor, name);
RedissonEvictionScheduler.INSTANCE.schedule(getName(), getTimeoutSetName(), commandExecutor); this.evictionScheduler = evictionScheduler;
evictionScheduler.schedule(getName(), getTimeoutSetName());
} }
protected RedissonSetCache(Codec codec, CommandAsyncExecutor commandExecutor, String name) { protected RedissonSetCache(Codec codec, EvictionScheduler evictionScheduler, CommandAsyncExecutor commandExecutor, String name) {
super(codec, commandExecutor, name); super(codec, commandExecutor, name);
RedissonEvictionScheduler.INSTANCE.schedule(getName(), getTimeoutSetName(), commandExecutor); this.evictionScheduler = evictionScheduler;
evictionScheduler.schedule(getName(), getTimeoutSetName());
} }
@Override @Override
@ -167,7 +170,7 @@ public class RedissonSetCache<V> extends RedissonExpirable implements RSetCache<
long currentDate = System.currentTimeMillis(); long currentDate = System.currentTimeMillis();
if (expireDate <= currentDate) { if (expireDate <= currentDate) {
result.setSuccess(nullValue); result.setSuccess(nullValue);
expireMap(currentDate); evictionScheduler.runCleanTask(getName(), getTimeoutSetName(), currentDate);
return; return;
} }
@ -180,16 +183,6 @@ public class RedissonSetCache<V> extends RedissonExpirable implements RSetCache<
}); });
} }
private void expireMap(long currentDate) {
commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, EVAL_REMOVE_EXPIRED,
"local expiredKeys = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, 100); "
+ "if #expiredKeys > 0 then "
+ "redis.call('zrem', KEYS[2], unpack(expiredKeys)); "
+ "redis.call('hdel', KEYS[1], unpack(expiredKeys)); "
+ "end;",
Arrays.<Object>asList(getName(), getTimeoutSetName()), currentDate);
}
ListScanResult<V> scanIterator(InetSocketAddress client, long startPos) { ListScanResult<V> scanIterator(InetSocketAddress client, long startPos) {
Future<ListScanResult<V>> f = commandExecutor.evalReadAsync(client, getName(), codec, RedisCommands.EVAL_SSCAN, Future<ListScanResult<V>> f = commandExecutor.evalReadAsync(client, getName(), codec, RedisCommands.EVAL_SSCAN,
"local result = {}; " "local result = {}; "
@ -303,7 +296,7 @@ public class RedissonSetCache<V> extends RedissonExpirable implements RSetCache<
Long expireDate = (Long) res.get(0); Long expireDate = (Long) res.get(0);
long currentDate = System.currentTimeMillis(); long currentDate = System.currentTimeMillis();
if (expireDate <= currentDate) { if (expireDate <= currentDate) {
expireMap(currentDate); evictionScheduler.runCleanTask(getName(), getTimeoutSetName(), currentDate);
} }
result.setSuccess((Collection<V>) res.get(1)); result.setSuccess((Collection<V>) res.get(1));

@ -28,7 +28,7 @@ import org.reactivestreams.Publisher;
* Thus entries are checked for TTL expiration during any key/value/entry read operation. * Thus entries are checked for TTL expiration during any key/value/entry read operation.
* If key/value/entry expired then it doesn't returns and clean task runs asynchronous. * If key/value/entry expired then it doesn't returns and clean task runs asynchronous.
* Clean task deletes removes 100 expired entries at once. * Clean task deletes removes 100 expired entries at once.
* In addition there is {@link org.redisson.RedissonEvictionScheduler}. This scheduler * In addition there is {@link org.redisson.EvictionScheduler}. This scheduler
* deletes expired entries in time interval between 5 seconds to 2 hours.</p> * deletes expired entries in time interval between 5 seconds to 2 hours.</p>
* *
* <p>If eviction is not required then it's better to use {@link org.redisson.reactive.RedissonMapReactive}.</p> * <p>If eviction is not required then it's better to use {@link org.redisson.reactive.RedissonMapReactive}.</p>

@ -26,7 +26,7 @@ import java.util.concurrent.TimeUnit;
* Thus entries are checked for TTL expiration during any key/value/entry read operation. * Thus entries are checked for TTL expiration during any key/value/entry read operation.
* If key/value/entry expired then it doesn't returns and clean task runs asynchronous. * If key/value/entry expired then it doesn't returns and clean task runs asynchronous.
* Clean task deletes removes 100 expired entries at once. * Clean task deletes removes 100 expired entries at once.
* In addition there is {@link org.redisson.RedissonEvictionScheduler}. This scheduler * In addition there is {@link org.redisson.EvictionScheduler}. This scheduler
* deletes expired entries in time interval between 5 seconds to 2 hours.</p> * deletes expired entries in time interval between 5 seconds to 2 hours.</p>
* *
* <p>If eviction is not required then it's better to use {@link org.redisson.reactive.RedissonMapReactive}.</p> * <p>If eviction is not required then it's better to use {@link org.redisson.reactive.RedissonMapReactive}.</p>

@ -28,7 +28,7 @@ import io.netty.util.concurrent.Future;
* Thus entries are checked for TTL expiration during any key/value/entry read operation. * Thus entries are checked for TTL expiration during any key/value/entry read operation.
* If key/value/entry expired then it doesn't returns and clean task runs asynchronous. * If key/value/entry expired then it doesn't returns and clean task runs asynchronous.
* Clean task deletes removes 100 expired entries at once. * Clean task deletes removes 100 expired entries at once.
* In addition there is {@link org.redisson.RedissonEvictionScheduler}. This scheduler * In addition there is {@link org.redisson.EvictionScheduler}. This scheduler
* deletes expired entries in time interval between 5 seconds to 2 hours.</p> * deletes expired entries in time interval between 5 seconds to 2 hours.</p>
* *
* <p>If eviction is not required then it's better to use {@link org.redisson.reactive.RedissonMapReactive}.</p> * <p>If eviction is not required then it's better to use {@link org.redisson.reactive.RedissonMapReactive}.</p>

@ -29,7 +29,7 @@ import java.util.concurrent.TimeUnit;
* Thus values are checked for TTL expiration during any value read operation. * Thus values are checked for TTL expiration during any value read operation.
* If entry expired then it doesn't returns and clean task runs asynchronous. * If entry expired then it doesn't returns and clean task runs asynchronous.
* Clean task deletes removes 100 expired entries at once. * Clean task deletes removes 100 expired entries at once.
* In addition there is {@link org.redisson.RedissonEvictionScheduler}. This scheduler * In addition there is {@link org.redisson.EvictionScheduler}. This scheduler
* deletes expired entries in time interval between 5 seconds to 2 hours.</p> * deletes expired entries in time interval between 5 seconds to 2 hours.</p>
* *
* <p>If eviction is not required then it's better to use {@link org.redisson.reactive.RedissonSet}.</p> * <p>If eviction is not required then it's better to use {@link org.redisson.reactive.RedissonSet}.</p>

@ -18,6 +18,7 @@ package org.redisson.reactive;
import java.util.List; import java.util.List;
import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;
import org.redisson.EvictionScheduler;
import org.redisson.api.RAtomicLongReactive; import org.redisson.api.RAtomicLongReactive;
import org.redisson.api.RBatchReactive; import org.redisson.api.RBatchReactive;
import org.redisson.api.RBitSetReactive; import org.redisson.api.RBitSetReactive;
@ -41,9 +42,11 @@ import org.redisson.connection.ConnectionManager;
public class RedissonBatchReactive implements RBatchReactive { public class RedissonBatchReactive implements RBatchReactive {
private final EvictionScheduler evictionScheduler;
private final CommandBatchService executorService; private final CommandBatchService executorService;
public RedissonBatchReactive(ConnectionManager connectionManager) { public RedissonBatchReactive(EvictionScheduler evictionScheduler, ConnectionManager connectionManager) {
this.evictionScheduler = evictionScheduler;
this.executorService = new CommandBatchService(connectionManager); this.executorService = new CommandBatchService(connectionManager);
} }
@ -87,6 +90,16 @@ public class RedissonBatchReactive implements RBatchReactive {
return new RedissonMapReactive<K, V>(codec, executorService, name); return new RedissonMapReactive<K, V>(codec, executorService, name);
} }
@Override
public <K, V> RMapCacheReactive<K, V> getMapCache(String name, Codec codec) {
return new RedissonMapCacheReactive<K, V>(codec, evictionScheduler, executorService, name);
}
@Override
public <K, V> RMapCacheReactive<K, V> getMapCache(String name) {
return new RedissonMapCacheReactive<K, V>(evictionScheduler, executorService, name);
}
@Override @Override
public <V> RSetReactive<V> getSet(String name) { public <V> RSetReactive<V> getSet(String name) {
return new RedissonSetReactive<V>(executorService, name); return new RedissonSetReactive<V>(executorService, name);
@ -177,14 +190,4 @@ public class RedissonBatchReactive implements RBatchReactive {
return new NettyFuturePublisher<List<?>>(executorService.executeAsync()); return new NettyFuturePublisher<List<?>>(executorService.executeAsync());
} }
@Override
public <K, V> RMapCacheReactive<K, V> getMapCache(String name, Codec codec) {
return new RedissonMapCacheReactive<K, V>(codec, executorService, name);
}
@Override
public <K, V> RMapCacheReactive<K, V> getMapCache(String name) {
return new RedissonMapCacheReactive<K, V>(executorService, name);
}
} }

@ -26,7 +26,7 @@ import java.util.concurrent.TimeUnit;
import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription; import org.reactivestreams.Subscription;
import org.redisson.RedissonEvictionScheduler; import org.redisson.EvictionScheduler;
import org.redisson.api.RMapCacheReactive; import org.redisson.api.RMapCacheReactive;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec; import org.redisson.client.codec.LongCodec;
@ -58,7 +58,7 @@ import reactor.rx.action.support.DefaultSubscriber;
* Thus entries are checked for TTL expiration during any key/value/entry read operation. * Thus entries are checked for TTL expiration during any key/value/entry read operation.
* If key/value/entry expired then it doesn't returns and clean task runs asynchronous. * If key/value/entry expired then it doesn't returns and clean task runs asynchronous.
* Clean task deletes removes 100 expired entries at once. * Clean task deletes removes 100 expired entries at once.
* In addition there is {@link org.redisson.RedissonEvictionScheduler}. This scheduler * In addition there is {@link org.redisson.EvictionScheduler}. This scheduler
* deletes expired entries in time interval between 5 seconds to 2 hours.</p> * deletes expired entries in time interval between 5 seconds to 2 hours.</p>
* *
* <p>If eviction is not required then it's better to use {@link org.redisson.reactive.RedissonMapReactive}.</p> * <p>If eviction is not required then it's better to use {@link org.redisson.reactive.RedissonMapReactive}.</p>
@ -78,16 +78,19 @@ public class RedissonMapCacheReactive<K, V> extends RedissonMapReactive<K, V> im
private static final RedisCommand<List<Object>> EVAL_CONTAINS_KEY = new RedisCommand<List<Object>>("EVAL", new ObjectListReplayDecoder<Object>(), 5, ValueType.MAP_KEY); private static final RedisCommand<List<Object>> EVAL_CONTAINS_KEY = new RedisCommand<List<Object>>("EVAL", new ObjectListReplayDecoder<Object>(), 5, ValueType.MAP_KEY);
private static final RedisCommand<List<Object>> EVAL_CONTAINS_VALUE = new RedisCommand<List<Object>>("EVAL", new ObjectListReplayDecoder<Object>(), 5, ValueType.MAP_VALUE); private static final RedisCommand<List<Object>> EVAL_CONTAINS_VALUE = new RedisCommand<List<Object>>("EVAL", new ObjectListReplayDecoder<Object>(), 5, ValueType.MAP_VALUE);
private static final RedisCommand<Long> EVAL_FAST_REMOVE = new RedisCommand<Long>("EVAL", 5, ValueType.MAP_KEY); private static final RedisCommand<Long> EVAL_FAST_REMOVE = new RedisCommand<Long>("EVAL", 5, ValueType.MAP_KEY);
private static final RedisCommand<Long> EVAL_REMOVE_EXPIRED = new RedisCommand<Long>("EVAL", 5);
public RedissonMapCacheReactive(CommandReactiveExecutor commandExecutor, String name) { private final EvictionScheduler evictionScheduler;
public RedissonMapCacheReactive(EvictionScheduler evictionScheduler, CommandReactiveExecutor commandExecutor, String name) {
super(commandExecutor, name); super(commandExecutor, name);
RedissonEvictionScheduler.INSTANCE.schedule(getName(), getTimeoutSetName(), commandExecutor); this.evictionScheduler = evictionScheduler;
evictionScheduler.schedule(getName(), getTimeoutSetName());
} }
public RedissonMapCacheReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name) { public RedissonMapCacheReactive(Codec codec, EvictionScheduler evictionScheduler, CommandReactiveExecutor commandExecutor, String name) {
super(codec, commandExecutor, name); super(codec, commandExecutor, name);
RedissonEvictionScheduler.INSTANCE.schedule(getName(), getTimeoutSetName(), commandExecutor); this.evictionScheduler = evictionScheduler;
evictionScheduler.schedule(getName(), getTimeoutSetName());
} }
@Override @Override
@ -176,7 +179,7 @@ public class RedissonMapCacheReactive<K, V> extends RedissonMapReactive<K, V> im
Long expireDate = (Long) res.get(0); Long expireDate = (Long) res.get(0);
long currentDate = System.currentTimeMillis(); long currentDate = System.currentTimeMillis();
if (expireDate <= currentDate) { if (expireDate <= currentDate) {
expireMap(currentDate); evictionScheduler.runCleanTask(getName(), getTimeoutSetName(), currentDate);
} }
result.onNext((Map<K, V>) res.get(1)); result.onNext((Map<K, V>) res.get(1));
@ -257,7 +260,7 @@ public class RedissonMapCacheReactive<K, V> extends RedissonMapReactive<K, V> im
if (expireDate <= currentDate) { if (expireDate <= currentDate) {
result.onNext(nullValue); result.onNext(nullValue);
result.onComplete(); result.onComplete();
expireMap(currentDate); evictionScheduler.runCleanTask(getName(), getTimeoutSetName(), currentDate);
return; return;
} }
@ -278,16 +281,6 @@ public class RedissonMapCacheReactive<K, V> extends RedissonMapReactive<K, V> im
} }
private void expireMap(long currentDate) {
commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, EVAL_REMOVE_EXPIRED,
"local expiredKeys = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, 100); "
+ "if #expiredKeys > 0 then "
+ "redis.call('zrem', KEYS[2], unpack(expiredKeys)); "
+ "redis.call('hdel', KEYS[1], unpack(expiredKeys)); "
+ "end;",
Arrays.<Object>asList(getName(), getTimeoutSetName()), currentDate);
}
@Override @Override
public Publisher<V> put(K key, V value, long ttl, TimeUnit unit) { public Publisher<V> put(K key, V value, long ttl, TimeUnit unit) {
if (unit == null) { if (unit == null) {

@ -244,7 +244,7 @@ public class RedissonMapCacheReactiveTest extends BaseReactiveTest {
@Test @Test
public void testContainsValue() throws InterruptedException { public void testContainsValue() throws InterruptedException {
RMapCacheReactive<SimpleKey, SimpleValue> map = redisson.getMapCache("simple", new MsgPackJacksonCodec()); RMapCacheReactive<SimpleKey, SimpleValue> map = redisson.getMapCache("simple31", new MsgPackJacksonCodec());
Assert.assertFalse(sync(map.containsValue(new SimpleValue("34")))); Assert.assertFalse(sync(map.containsValue(new SimpleValue("34"))));
sync(map.put(new SimpleKey("33"), new SimpleValue("44"), 1, TimeUnit.SECONDS)); sync(map.put(new SimpleKey("33"), new SimpleValue("44"), 1, TimeUnit.SECONDS));
@ -302,7 +302,7 @@ public class RedissonMapCacheReactiveTest extends BaseReactiveTest {
@Test @Test
public void testPutGet() throws InterruptedException { public void testPutGet() throws InterruptedException {
RMapCacheReactive<SimpleKey, SimpleValue> map = redisson.getMapCache("simple", new MsgPackJacksonCodec()); RMapCacheReactive<SimpleKey, SimpleValue> map = redisson.getMapCache("simple01", new MsgPackJacksonCodec());
Assert.assertNull(sync(map.get(new SimpleKey("33")))); Assert.assertNull(sync(map.get(new SimpleKey("33"))));
sync(map.put(new SimpleKey("33"), new SimpleValue("44"), 2, TimeUnit.SECONDS)); sync(map.put(new SimpleKey("33"), new SimpleValue("44"), 2, TimeUnit.SECONDS));

@ -266,7 +266,7 @@ public class RedissonMapCacheTest extends BaseTest {
@Test @Test
public void testKeySet() throws InterruptedException { public void testKeySet() throws InterruptedException {
RMapCache<SimpleKey, SimpleValue> map = redisson.getMapCache("simple"); RMapCache<SimpleKey, SimpleValue> map = redisson.getMapCache("simple03");
map.put(new SimpleKey("33"), new SimpleValue("44"), 1, TimeUnit.SECONDS); map.put(new SimpleKey("33"), new SimpleValue("44"), 1, TimeUnit.SECONDS);
map.put(new SimpleKey("1"), new SimpleValue("2")); map.put(new SimpleKey("1"), new SimpleValue("2"));
@ -285,7 +285,7 @@ public class RedissonMapCacheTest extends BaseTest {
@Test @Test
public void testValues() throws InterruptedException { public void testValues() throws InterruptedException {
RMapCache<SimpleKey, SimpleValue> map = redisson.getMapCache("simple"); RMapCache<SimpleKey, SimpleValue> map = redisson.getMapCache("simple05");
map.put(new SimpleKey("33"), new SimpleValue("44"), 1, TimeUnit.SECONDS); map.put(new SimpleKey("33"), new SimpleValue("44"), 1, TimeUnit.SECONDS);
map.put(new SimpleKey("1"), new SimpleValue("2")); map.put(new SimpleKey("1"), new SimpleValue("2"));
@ -303,7 +303,7 @@ public class RedissonMapCacheTest extends BaseTest {
@Test @Test
public void testContainsValue() throws InterruptedException { public void testContainsValue() throws InterruptedException {
RMapCache<SimpleKey, SimpleValue> map = redisson.getMapCache("simple", new MsgPackJacksonCodec()); RMapCache<SimpleKey, SimpleValue> map = redisson.getMapCache("simple01", new MsgPackJacksonCodec());
Assert.assertFalse(map.containsValue(new SimpleValue("34"))); Assert.assertFalse(map.containsValue(new SimpleValue("34")));
map.put(new SimpleKey("33"), new SimpleValue("44"), 1, TimeUnit.SECONDS); map.put(new SimpleKey("33"), new SimpleValue("44"), 1, TimeUnit.SECONDS);
@ -319,7 +319,7 @@ public class RedissonMapCacheTest extends BaseTest {
@Test @Test
public void testContainsKey() throws InterruptedException { public void testContainsKey() throws InterruptedException {
RMapCache<SimpleKey, SimpleValue> map = redisson.getMapCache("simple"); RMapCache<SimpleKey, SimpleValue> map = redisson.getMapCache("simple30");
map.put(new SimpleKey("33"), new SimpleValue("44"), 1, TimeUnit.SECONDS); map.put(new SimpleKey("33"), new SimpleValue("44"), 1, TimeUnit.SECONDS);
Assert.assertTrue(map.containsKey(new SimpleKey("33"))); Assert.assertTrue(map.containsKey(new SimpleKey("33")));
@ -432,7 +432,7 @@ public class RedissonMapCacheTest extends BaseTest {
@Test @Test
public void testPutGet() throws InterruptedException { public void testPutGet() throws InterruptedException {
RMapCache<SimpleKey, SimpleValue> map = redisson.getMapCache("simple", new MsgPackJacksonCodec()); RMapCache<SimpleKey, SimpleValue> map = redisson.getMapCache("simple04", new MsgPackJacksonCodec());
Assert.assertNull(map.get(new SimpleKey("33"))); Assert.assertNull(map.get(new SimpleKey("33")));
map.put(new SimpleKey("33"), new SimpleValue("44"), 2, TimeUnit.SECONDS); map.put(new SimpleKey("33"), new SimpleValue("44"), 2, TimeUnit.SECONDS);

@ -9,13 +9,18 @@ import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.hamcrest.MatcherAssert; import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers; import org.hamcrest.Matchers;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;
import org.redisson.RedissonMapCacheTest.SimpleKey;
import org.redisson.RedissonMapCacheTest.SimpleValue;
import org.redisson.api.RMapReactive; import org.redisson.api.RMapReactive;
import org.redisson.codec.MsgPackJacksonCodec;
import org.redisson.core.RMapCache;
import reactor.rx.Streams; import reactor.rx.Streams;
@ -512,24 +517,6 @@ public class RedissonMapReactiveTest extends BaseReactiveTest {
Assert.assertEquals(1, sync(map.size()).intValue()); Assert.assertEquals(1, sync(map.size()).intValue());
} }
// @Test(timeout = 5000)
// public void testDeserializationErrorReturnsErrorImmediately() throws Exception {
// redisson.getConfig().setCodec(new JsonJacksonCodec());
//
// RMap<String, SimpleObjectWithoutDefaultConstructor> map = redisson.getMap("deserializationFailure");
// SimpleObjectWithoutDefaultConstructor object = new SimpleObjectWithoutDefaultConstructor("test-val");
//
// Assert.assertEquals("test-val", object.getTestField());
// map.put("test-key", object);
//
// try {
// map.get("test-key");
// Assert.fail("Expected exception from map.get() call");
// } catch (Exception e) {
// e.printStackTrace();
// }
// }
public static class SimpleObjectWithoutDefaultConstructor { public static class SimpleObjectWithoutDefaultConstructor {
private String testField; private String testField;

@ -43,7 +43,7 @@ public class RedissonSetCacheTest extends BaseTest {
@Test @Test
public void testAddExpire() throws InterruptedException, ExecutionException { public void testAddExpire() throws InterruptedException, ExecutionException {
RSetCache<String> set = redisson.getSetCache("simple"); RSetCache<String> set = redisson.getSetCache("simple3");
set.add("123", 1, TimeUnit.SECONDS); set.add("123", 1, TimeUnit.SECONDS);
Assert.assertThat(set, Matchers.contains("123")); Assert.assertThat(set, Matchers.contains("123"));
@ -56,6 +56,27 @@ public class RedissonSetCacheTest extends BaseTest {
Assert.assertEquals(0, set.size()); Assert.assertEquals(0, set.size());
} }
@Test
public void testAddExpireTwise() throws InterruptedException, ExecutionException {
RSetCache<String> set = redisson.getSetCache("simple31");
set.add("123", 1, TimeUnit.SECONDS);
Thread.sleep(1000);
Assert.assertFalse(set.contains("123"));
Thread.sleep(50);
Assert.assertEquals(0, set.size());
set.add("4341", 1, TimeUnit.SECONDS);
Thread.sleep(1000);
Assert.assertFalse(set.contains("4341"));
// can't be evicted due to 1 sec delay
Assert.assertEquals(1, set.size());
}
@Test @Test
public void testExpireOverwrite() throws InterruptedException, ExecutionException { public void testExpireOverwrite() throws InterruptedException, ExecutionException {
RSetCache<String> set = redisson.getSetCache("simple"); RSetCache<String> set = redisson.getSetCache("simple");

Loading…
Cancel
Save