RSetCache with TTL added. #319

pull/337/head
Nikita 9 years ago
parent 02ea5da718
commit 8b080e4be0

@ -19,7 +19,6 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.locks.ReadWriteLock;
import org.redisson.api.RedissonReactiveClient;
import org.redisson.client.codec.Codec;
@ -40,7 +39,6 @@ import org.redisson.core.RBatch;
import org.redisson.core.RBitSet;
import org.redisson.core.RBlockingQueue;
import org.redisson.core.RBucket;
import org.redisson.core.RMapCache;
import org.redisson.core.RCountDownLatch;
import org.redisson.core.RDeque;
import org.redisson.core.RHyperLogLog;
@ -49,12 +47,14 @@ import org.redisson.core.RLexSortedSet;
import org.redisson.core.RList;
import org.redisson.core.RLock;
import org.redisson.core.RMap;
import org.redisson.core.RMapCache;
import org.redisson.core.RPatternTopic;
import org.redisson.core.RQueue;
import org.redisson.core.RReadWriteLock;
import org.redisson.core.RScoredSortedSet;
import org.redisson.core.RScript;
import org.redisson.core.RSet;
import org.redisson.core.RSetCache;
import org.redisson.core.RSortedSet;
import org.redisson.core.RTopic;
@ -143,11 +143,6 @@ public class Redisson implements RedissonClient {
return new RedissonReactive(config);
}
@Override
public RReadWriteLock getReadWriteLock(String name) {
return new RedissonReadWriteLock(commandExecutor, name, id);
}
@Override
public <V> RBucket<V> getBucket(String name) {
return new RedissonBucket<V>(commandExecutor, name);
@ -196,6 +191,16 @@ public class Redisson implements RedissonClient {
return new RedissonMap<K, V>(commandExecutor, name);
}
@Override
public <V> RSetCache<V> getSetCache(String name) {
return new RedissonSetCache<V>(commandExecutor, name);
}
@Override
public <V> RSetCache<V> getSetCache(String name, Codec codec) {
return new RedissonSetCache<V>(codec, commandExecutor, name);
}
@Override
public <K, V> RMapCache<K, V> getMapCache(String name) {
return new RedissonMapCache<K, V>(commandExecutor, name);
@ -216,6 +221,11 @@ public class Redisson implements RedissonClient {
return new RedissonLock(commandExecutor, name, id);
}
@Override
public RReadWriteLock getReadWriteLock(String name) {
return new RedissonReadWriteLock(commandExecutor, name, id);
}
@Override
public <V> RSet<V> getSet(String name) {
return new RedissonSet<V>(commandExecutor, name);
@ -326,6 +336,11 @@ public class Redisson implements RedissonClient {
return new RedissonKeys(commandExecutor);
}
@Override
public RBatch createBatch() {
return new RedissonBatch(connectionManager);
}
@Override
public void shutdown() {
connectionManager.shutdown();
@ -357,10 +372,5 @@ public class Redisson implements RedissonClient {
commandExecutor.get(commandExecutor.writeAllAsync(RedisCommands.FLUSHALL));
}
@Override
public RBatch createBatch() {
return new RedissonBatch(connectionManager);
}
}

@ -36,10 +36,17 @@ import org.redisson.core.RQueueAsync;
import org.redisson.core.RScoredSortedSetAsync;
import org.redisson.core.RScriptAsync;
import org.redisson.core.RSetAsync;
import org.redisson.core.RSetCacheAsync;
import org.redisson.core.RTopicAsync;
import io.netty.util.concurrent.Future;
/**
*
*
* @author Nikita Koksharov
*
*/
public class RedissonBatch implements RBatch {
private final CommandBatchService executorService;
@ -183,6 +190,16 @@ public class RedissonBatch implements RBatch {
return new RedissonKeys(executorService);
}
@Override
public <V> RSetCacheAsync<V> getSetCache(String name) {
return new RedissonSetCache<V>(executorService, name);
}
@Override
public <V> RSetCacheAsync<V> getSetCache(String name, Codec codec) {
return new RedissonSetCache<V>(codec, executorService, name);
}
@Override
public List<?> execute() {
return executorService.execute();
@ -193,4 +210,5 @@ public class RedissonBatch implements RBatch {
return executorService.executeAsync();
}
}

@ -41,6 +41,7 @@ import org.redisson.core.RReadWriteLock;
import org.redisson.core.RScoredSortedSet;
import org.redisson.core.RScript;
import org.redisson.core.RSet;
import org.redisson.core.RSetCache;
import org.redisson.core.RSortedSet;
import org.redisson.core.RTopic;
@ -54,12 +55,28 @@ import org.redisson.core.RTopic;
public interface RedissonClient {
/**
* Returns readWriteLock instance by name.
* Returns set-based cache instance by <code>name</code>.
* Supports value eviction with a given TTL value.
*
* <p>If eviction is not required then it's better to use regular map {@link #getSet(String, Codec)}.</p>
*
* @param name
* @param codec
* @return
*/
RReadWriteLock getReadWriteLock(String name);
<V> RSetCache<V> getSetCache(String name);
/**
* Returns set-based cache instance by <code>name</code>.
* Supports value eviction with a given TTL value.
*
* <p>If eviction is not required then it's better to use regular map {@link #getSet(String, Codec)}.</p>
*
* @param name
* @param codec
* @return
*/
<V> RSetCache<V> getSetCache(String name, Codec codec);
/**
* Returns map-based cache instance by <code>name</code>
@ -170,6 +187,14 @@ public interface RedissonClient {
*/
RLock getLock(String name);
/**
* Returns readWriteLock instance by name.
*
* @param name
* @return
*/
RReadWriteLock getReadWriteLock(String name);
/**
* Returns set instance by name.
*

@ -38,7 +38,9 @@ import io.netty.util.internal.PlatformDependent;
* @author Nikita Koksharov
*
*/
public class RedissonCacheEvictionScheduler {
public class RedissonEvictionScheduler {
public static final RedissonEvictionScheduler INSTANCE = new RedissonEvictionScheduler();
public static class RedissonCacheTask implements Runnable {
@ -86,7 +88,7 @@ public class RedissonCacheEvictionScheduler {
if (sizeHistory.size() == 2) {
if (sizeHistory.peekFirst() > sizeHistory.peekLast()
&& sizeHistory.peekLast() > size) {
delay = Math.min(maxDelay, delay*2);
delay = Math.min(maxDelay, (int)(delay*1.5));
}
// if (sizeHistory.peekFirst() < sizeHistory.peekLast()
@ -97,10 +99,10 @@ public class RedissonCacheEvictionScheduler {
if (sizeHistory.peekFirst() == sizeHistory.peekLast()
&& sizeHistory.peekLast() == size) {
if (size == keysLimit) {
delay = Math.max(minDelay, delay/2);
delay = Math.max(minDelay, delay/4);
}
if (size == 0) {
delay = Math.min(maxDelay, delay*2);
delay = Math.min(maxDelay, (int)(delay*1.5));
}
}
@ -115,6 +117,9 @@ public class RedissonCacheEvictionScheduler {
}
private RedissonEvictionScheduler() {
}
private final ConcurrentMap<String, RedissonCacheTask> tasks = PlatformDependent.newConcurrentHashMap();
public void schedule(String name, String timeoutSetName, CommandAsyncExecutor executor) {

@ -291,7 +291,7 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
@Override
public Future<Long> fastRemoveAsync(K ... keys) {
if (keys == null || keys.length == 0) {
return commandExecutor.getConnectionManager().getGroup().next().newSucceededFuture(0L);
return newSucceededFuture(0L);
}
List<Object> args = new ArrayList<Object>(keys.length + 1);

@ -48,14 +48,14 @@ import io.netty.util.concurrent.Promise;
/**
* <p>Map-based cache with ability to set TTL for each entry via
* {@link #put(Object, Object, long, TimeUnit)} or {@link #putIfAbsent(Object, Object, long, TimeUnit)}
* {@link #put(Object, Object, long, TimeUnit)} or {@link #putIfAbsent(Object, Object, long, TimeUnit)} methods.
* And therefore has an complex lua-scripts inside.</p>
*
* <p>Current redis implementation doesnt have eviction functionality.
* <p>Current redis implementation doesnt have map entry eviction functionality.
* Thus entries are checked for TTL expiration during any key/value/entry read operation.
* If key/value/entry expired then it doesn't returns and clean task runs asynchronous.
* Clean task deletes removes 100 expired entries at once.
* In addition there is {@link org.redisson.RedissonCacheEvictionScheduler}. This scheduler
* In addition there is {@link org.redisson.RedissonEvictionScheduler}. This scheduler
* deletes expired entries in time interval between 5 seconds to 2 hours.</p>
*
* <p>If eviction is not required then it's better to use {@link org.redisson.reactive.RedissonMapReactive}.</p>
@ -77,16 +77,14 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
private static final RedisCommand<Long> EVAL_FAST_REMOVE = new RedisCommand<Long>("EVAL", 5, ValueType.MAP_KEY);
private static final RedisCommand<Long> EVAL_REMOVE_EXPIRED = new RedisCommand<Long>("EVAL", 5);
private static final RedissonCacheEvictionScheduler SCHEDULER = new RedissonCacheEvictionScheduler();
protected RedissonMapCache(CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
SCHEDULER.schedule(getName(), getTimeoutSetName(), commandExecutor);
RedissonEvictionScheduler.INSTANCE.schedule(getName(), getTimeoutSetName(), commandExecutor);
}
public RedissonMapCache(Codec codec, CommandAsyncExecutor commandExecutor, String name) {
super(codec, commandExecutor, name);
SCHEDULER.schedule(getName(), getTimeoutSetName(), commandExecutor);
RedissonEvictionScheduler.INSTANCE.schedule(getName(), getTimeoutSetName(), commandExecutor);
}
@Override

@ -24,9 +24,7 @@ import java.util.List;
import java.util.NoSuchElementException;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.convertor.BooleanReplayConvertor;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.core.RSet;
@ -42,8 +40,6 @@ import io.netty.util.concurrent.Future;
*/
public class RedissonSet<V> extends RedissonExpirable implements RSet<V> {
private static final RedisCommand<Boolean> EVAL_OBJECTS = new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 4);
protected RedissonSet(CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
}
@ -200,7 +196,7 @@ public class RedissonSet<V> extends RedissonExpirable implements RSet<V> {
@Override
public Future<Boolean> containsAllAsync(Collection<?> c) {
return commandExecutor.evalReadAsync(getName(), codec, EVAL_OBJECTS,
return commandExecutor.evalReadAsync(getName(), codec, RedisCommands.EVAL_BOOLEAN_WITH_VALUES,
"local s = redis.call('smembers', KEYS[1]);" +
"for i = 0, table.getn(s), 1 do " +
"for j = 0, table.getn(ARGV), 1 do "
@ -236,7 +232,7 @@ public class RedissonSet<V> extends RedissonExpirable implements RSet<V> {
@Override
public Future<Boolean> retainAllAsync(Collection<?> c) {
return commandExecutor.evalWriteAsync(getName(), codec, EVAL_OBJECTS,
return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_BOOLEAN_WITH_VALUES,
"local changed = 0 " +
"local s = redis.call('smembers', KEYS[1]) "
+ "local i = 0 "
@ -261,7 +257,7 @@ public class RedissonSet<V> extends RedissonExpirable implements RSet<V> {
@Override
public Future<Boolean> removeAllAsync(Collection<?> c) {
return commandExecutor.evalWriteAsync(getName(), codec, EVAL_OBJECTS,
return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_BOOLEAN_WITH_VALUES,
"local v = 0 " +
"for i = 0, table.getn(ARGV), 1 do "
+ "if redis.call('srem', KEYS[1], ARGV[i]) == 1 "

@ -0,0 +1,513 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.TimeUnit;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.RedisStrictCommand;
import org.redisson.client.protocol.convertor.BooleanReplayConvertor;
import org.redisson.client.protocol.convertor.Convertor;
import org.redisson.client.protocol.convertor.VoidReplayConvertor;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.ObjectListReplayDecoder;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.core.RSetCache;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;
import net.openhft.hashing.LongHashFunction;
/**
* <p>Set-based cache with ability to set TTL for each entry via
* {@link #put(Object, Object, long, TimeUnit)} method.
* And therefore has an complex lua-scripts inside.
* Uses map (value_hash, value) to tie with expiration sorted set under the hood.
* </p>
*
* <p>Current Redis implementation doesn't have set entry eviction functionality.
* Thus values are checked for TTL expiration during any value read operation.
* If entry expired then it doesn't returns and clean task runs asynchronous.
* Clean task deletes removes 100 expired entries at once.
* In addition there is {@link org.redisson.RedissonEvictionScheduler}. This scheduler
* deletes expired entries in time interval between 5 seconds to 2 hours.</p>
*
* <p>If eviction is not required then it's better to use {@link org.redisson.reactive.RedissonSet}.</p>
*
* @author Nikita Koksharov
*
* @param <K> key
* @param <V> value
*/
public class RedissonSetCache<V> extends RedissonExpirable implements RSetCache<V> {
private static final RedisCommand<Boolean> EVAL_ADD = new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 5);
private static final RedisCommand<Boolean> EVAL_ADD_TTL = new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 7);
private static final RedisCommand<Boolean> EVAL_OBJECTS = new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 4);
private static final RedisCommand<Long> EVAL_REMOVE_EXPIRED = new RedisCommand<Long>("EVAL", 5);
private static final RedisCommand<List<Object>> EVAL_CONTAINS_KEY = new RedisCommand<List<Object>>("EVAL", new ObjectListReplayDecoder<Object>());
private static final RedisStrictCommand<Boolean> HDEL = new RedisStrictCommand<Boolean>("HDEL", new BooleanReplayConvertor());
protected RedissonSetCache(CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
RedissonEvictionScheduler.INSTANCE.schedule(getName(), getTimeoutSetName(), commandExecutor);
}
protected RedissonSetCache(Codec codec, CommandAsyncExecutor commandExecutor, String name) {
super(codec, commandExecutor, name);
RedissonEvictionScheduler.INSTANCE.schedule(getName(), getTimeoutSetName(), commandExecutor);
}
@Override
public int size() {
return get(sizeAsync());
}
@Override
public Future<Integer> sizeAsync() {
return commandExecutor.readAsync(getName(), codec, RedisCommands.HLEN, getName());
}
@Override
public boolean isEmpty() {
return size() == 0;
}
@Override
public boolean contains(Object o) {
return get(containsAsync(o));
}
private byte[] hash(Object o) {
if (o == null) {
throw new NullPointerException("Value can't be null");
}
try {
byte[] objectState = codec.getValueEncoder().encode(o);
long h1 = LongHashFunction.farmUo().hashBytes(objectState);
long h2 = LongHashFunction.xx_r39().hashBytes(objectState);
return ByteBuffer.allocate((2 * Long.SIZE) / Byte.SIZE)
.putLong(h1)
.putLong(h2)
.array();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
String getTimeoutSetName() {
return "redisson__timeout__set__{" + getName() + "}";
}
@Override
public Future<Boolean> containsAsync(Object o) {
Promise<Boolean> result = newPromise();
byte[] key = hash(o);
Future<List<Object>> future = commandExecutor.evalReadAsync(getName(), codec, EVAL_CONTAINS_KEY,
"local value = redis.call('hexists', KEYS[1], KEYS[3]); " +
"local expireDate = 92233720368547758; " +
"if value == 1 then " +
"local expireDateScore = redis.call('zscore', KEYS[2], KEYS[3]); "
+ "if expireDateScore ~= false then "
+ "expireDate = tonumber(expireDateScore) "
+ "end; " +
"end;" +
"return {expireDate, value}; ",
Arrays.<Object>asList(getName(), getTimeoutSetName(), key));
addExpireListener(result, future, new BooleanReplayConvertor(), false);
return result;
}
private <T> void addExpireListener(final Promise<T> result, Future<List<Object>> future, final Convertor<T> convertor, final T nullValue) {
future.addListener(new FutureListener<List<Object>>() {
@Override
public void operationComplete(Future<List<Object>> future) throws Exception {
if (!future.isSuccess()) {
result.setFailure(future.cause());
return;
}
List<Object> res = future.getNow();
Long expireDate = (Long) res.get(0);
long currentDate = System.currentTimeMillis();
if (expireDate <= currentDate) {
result.setSuccess(nullValue);
expireMap(currentDate);
return;
}
if (convertor != null) {
result.setSuccess((T) convertor.convert(res.get(1)));
} else {
result.setSuccess((T) res.get(1));
}
}
});
}
private void expireMap(long currentDate) {
commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, EVAL_REMOVE_EXPIRED,
"local expiredKeys = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, 100); "
+ "if #expiredKeys > 0 then "
+ "local s = redis.call('zrem', KEYS[2], unpack(expiredKeys)); "
+ "redis.call('hdel', KEYS[1], unpack(expiredKeys)); "
+ "end;",
Arrays.<Object>asList(getName(), getTimeoutSetName()), currentDate);
}
ListScanResult<V> scanIterator(InetSocketAddress client, long startPos) {
Future<ListScanResult<V>> f = commandExecutor.evalReadAsync(client, getName(), codec, RedisCommands.EVAL_SSCAN,
"local result = {}; "
+ "local res = redis.call('hscan', KEYS[1], ARGV[1]); "
+ "for i, value in ipairs(res[2]) do "
+ "if i % 2 == 0 then "
+ "local key = res[2][i-1]; "
+ "local expireDate = redis.call('zscore', KEYS[2], key); "
+ "if (expireDate == false) or (expireDate ~= false and expireDate > ARGV[2]) then "
+ "table.insert(result, value); "
+ "end; "
+ "end; "
+ "end;"
+ "return {res[1], result};", Arrays.<Object>asList(getName(), getTimeoutSetName()), startPos, System.currentTimeMillis());
return get(f);
}
@Override
public Iterator<V> iterator() {
return new Iterator<V>() {
private List<V> firstValues;
private Iterator<V> iter;
private InetSocketAddress client;
private long nextIterPos;
private boolean currentElementRemoved;
private boolean removeExecuted;
private V value;
@Override
public boolean hasNext() {
if (iter == null || !iter.hasNext()) {
if (nextIterPos == -1) {
return false;
}
long prevIterPos = nextIterPos;
ListScanResult<V> res = scanIterator(client, nextIterPos);
client = res.getRedisClient();
if (nextIterPos == 0 && firstValues == null) {
firstValues = res.getValues();
} else if (res.getValues().equals(firstValues)) {
return false;
}
iter = res.getValues().iterator();
nextIterPos = res.getPos();
if (prevIterPos == nextIterPos && !removeExecuted) {
nextIterPos = -1;
}
}
return iter.hasNext();
}
@Override
public V next() {
if (!hasNext()) {
throw new NoSuchElementException("No such element at index");
}
value = iter.next();
currentElementRemoved = false;
return value;
}
@Override
public void remove() {
if (currentElementRemoved) {
throw new IllegalStateException("Element been already deleted");
}
if (iter == null) {
throw new IllegalStateException();
}
iter.remove();
RedissonSetCache.this.remove(value);
currentElementRemoved = true;
removeExecuted = true;
}
};
}
private Future<Collection<V>> readAllAsync() {
final Promise<Collection<V>> result = newPromise();
Future<List<Object>> future = commandExecutor.evalReadAsync(getName(), codec, new RedisCommand<List<Object>>("EVAL", new ObjectListReplayDecoder<Object>()),
"local keys = redis.call('hkeys', KEYS[1]);" +
"local expireSize = redis.call('zcard', KEYS[2]); " +
"local maxDate = ARGV[1]; " +
"local minExpireDate = 92233720368547758;" +
"if expireSize > 0 then " +
"for i, key in pairs(keys) do " +
"local expireDate = redis.call('zscore', KEYS[2], key); " +
"if expireDate ~= false and expireDate <= maxDate then " +
"minExpireDate = math.min(tonumber(expireDate), minExpireDate); " +
"table.remove(keys, i); " +
"end;" +
"end;" +
"end; " +
"return {minExpireDate, redis.call('hmget', KEYS[1], unpack(keys))};",
Arrays.<Object>asList(getName(), getTimeoutSetName()), System.currentTimeMillis());
future.addListener(new FutureListener<List<Object>>() {
@Override
public void operationComplete(Future<List<Object>> future) throws Exception {
if (!future.isSuccess()) {
result.setFailure(future.cause());
return;
}
List<Object> res = future.getNow();
Long expireDate = (Long) res.get(0);
long currentDate = System.currentTimeMillis();
if (expireDate <= currentDate) {
expireMap(currentDate);
}
result.setSuccess((Collection<V>) res.get(1));
}
});
return result;
}
@Override
public Object[] toArray() {
List<Object> res = (List<Object>) get(readAllAsync());
return res.toArray();
}
@Override
public <T> T[] toArray(T[] a) {
List<Object> res = (List<Object>) get(readAllAsync());
return res.toArray(a);
}
@Override
public boolean add(V e) {
return get(addAsync(e));
}
@Override
public boolean add(V value, long ttl, TimeUnit unit) {
return get(addAsync(value, ttl, unit));
}
@Override
public Future<Boolean> addAsync(V value, long ttl, TimeUnit unit) {
if (unit == null) {
throw new NullPointerException("TimeUnit param can't be null");
}
byte[] key = hash(value);
long timeoutDate = System.currentTimeMillis() + unit.toMillis(ttl);
return commandExecutor.evalWriteAsync(getName(), codec, EVAL_ADD_TTL,
"redis.call('zadd', KEYS[2], ARGV[1], KEYS[3]); " +
"if redis.call('hexists', KEYS[1], KEYS[3]) == 0 then " +
"redis.call('hset', KEYS[1], KEYS[3], ARGV[2]); " +
"return 1; " +
"end;" +
"return 0; ",
Arrays.<Object>asList(getName(), getTimeoutSetName(), key), timeoutDate, value);
}
@Override
public Future<Boolean> addAsync(V e) {
byte[] key = hash(e);
return commandExecutor.evalWriteAsync(getName(), codec, EVAL_ADD,
"if redis.call('hexists', KEYS[1], KEYS[2]) == 0 then " +
"redis.call('hset', KEYS[1], KEYS[2], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0; ",
Arrays.<Object>asList(getName(), key), e);
}
@Override
public Future<Boolean> removeAsync(Object o) {
byte[] key = hash(o);
return commandExecutor.writeAsync(getName(), codec, HDEL, getName(), key);
}
@Override
public boolean remove(Object value) {
return get(removeAsync((V)value));
}
@Override
public boolean containsAll(Collection<?> c) {
return get(containsAllAsync(c));
}
@Override
public Future<Boolean> containsAllAsync(Collection<?> c) {
return commandExecutor.evalReadAsync(getName(), codec, EVAL_OBJECTS,
"local s = redis.call('hvals', KEYS[1]);" +
"for i = 0, table.getn(s), 1 do " +
"for j = 0, table.getn(ARGV), 1 do "
+ "if ARGV[j] == s[i] then "
+ "table.remove(ARGV, j) "
+ "end "
+ "end; "
+ "end;"
+ "return table.getn(ARGV) == 0 and 1 or 0; ",
Collections.<Object>singletonList(getName()), c.toArray());
}
@Override
public boolean addAll(Collection<? extends V> c) {
return get(addAllAsync(c));
}
@Override
public Future<Boolean> addAllAsync(Collection<? extends V> c) {
if (c.isEmpty()) {
return newSucceededFuture(false);
}
List<ValueType> inParamTypes = new ArrayList<ValueType>(c.size()*2);
List<Object> params = new ArrayList<Object>(c.size()*2 + 1);
params.add(getName());
for (V value : c) {
inParamTypes.add(ValueType.BINARY);
inParamTypes.add(ValueType.OBJECTS);
params.add(hash(value));
params.add(value);
}
RedisCommand<Void> command = new RedisCommand<Void>("HMSET", new VoidReplayConvertor(), 2, inParamTypes);
return commandExecutor.writeAsync(getName(), codec, command, params.toArray());
}
@Override
public boolean retainAll(Collection<?> c) {
return get(retainAllAsync(c));
}
@Override
public Future<Boolean> retainAllAsync(Collection<?> c) {
List<byte[]> hashes = new ArrayList<byte[]>(c.size());
for (Object object : c) {
hashes.add(hash(object));
}
return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_BOOLEAN,
"local keys = redis.call('hkeys', KEYS[1]); " +
"local i=1;" +
"while i <= #keys do "
+ "local changed = false;"
+ "local element = keys[i];"
+ "for j, argElement in pairs(ARGV) do "
+ "if argElement == element then "
+ "changed = true;"
+ "table.remove(keys, i); "
+ "table.remove(ARGV, j); "
+ "break; "
+ "end; "
+ "end; " +
"if changed == false then " +
"i = i + 1 " +
"end " +
"end " +
"if #keys > 0 then "
+ "for i=1, table.getn(keys),5000 do "
+ "redis.call('hdel', KEYS[1], unpack(keys, i, math.min(i+4999, table.getn(keys)))); "
+ "redis.call('zrem', KEYS[2], unpack(keys, i, math.min(i+4999, table.getn(keys)))); "
+ "end "
+ "return 1;"
+ "end; "
+ "return 0; ",
Arrays.<Object>asList(getName(), getTimeoutSetName()), hashes.toArray());
}
@Override
public Future<Boolean> removeAllAsync(Collection<?> c) {
List<Object> hashes = new ArrayList<Object>(c.size()+1);
hashes.add(getName());
for (Object object : c) {
hashes.add(hash(object));
}
return commandExecutor.writeAsync(getName(), codec, HDEL, hashes.toArray());
}
@Override
public boolean removeAll(Collection<?> c) {
return get(removeAllAsync(c));
}
@Override
public void clear() {
delete();
}
@Override
public Future<Boolean> deleteAsync() {
return commandExecutor.writeAsync(getName(), RedisCommands.DEL_SINGLE, getName(), getTimeoutSetName());
}
@Override
public Future<Boolean> expireAsync(long timeToLive, TimeUnit timeUnit) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"redis.call('pexpire', KEYS[2], ARGV[1]); "
+ "return redis.call('pexpire', KEYS[1], ARGV[1]); ",
Arrays.<Object>asList(getName(), getTimeoutSetName()), timeUnit.toSeconds(timeToLive));
}
@Override
public Future<Boolean> expireAtAsync(long timestamp) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"redis.call('pexpireat', KEYS[2], ARGV[1]); "
+ "return redis.call('pexpireat', KEYS[1], ARGV[1]); ",
Arrays.<Object>asList(getName(), getTimeoutSetName()), timestamp);
}
@Override
public Future<Boolean> clearExpireAsync() {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"redis.call('persist', KEYS[2]); "
+ "return redis.call('persist', KEYS[1]); ",
Arrays.<Object>asList(getName(), getTimeoutSetName()));
}
}

@ -23,10 +23,9 @@ import org.redisson.client.codec.Codec;
/**
* Interface for using pipeline feature.
*
* All methods invocations via Reactive objects
* which have gotten from this interface are batched
* to separate queue and could be executed later
* with <code>execute()</code> or <code>executeReactive()</code> methods.
* All method invocations on objects
* from this interface are batched to separate queue and could be executed later
* with <code>execute()</code> method.
*
*
* @author Nikita Koksharov
@ -201,6 +200,6 @@ public interface RBatchReactive {
*
* @return List with result object for each command
*/
Publisher<List<?>> executeReactive();
Publisher<List<?>> execute();
}

@ -21,14 +21,14 @@ import org.reactivestreams.Publisher;
/**
* <p>Map-based cache with ability to set TTL for each entry via
* {@link #put(Object, Object, long, TimeUnit)} or {@link #putIfAbsent(Object, Object, long, TimeUnit)}
* {@link #put(Object, Object, long, TimeUnit)} or {@link #putIfAbsent(Object, Object, long, TimeUnit)} method.
* And therefore has an complex lua-scripts inside.</p>
*
* <p>Current redis implementation doesnt have eviction functionality.
* <p>Current redis implementation doesnt have map entry eviction functionality.
* Thus entries are checked for TTL expiration during any key/value/entry read operation.
* If key/value/entry expired then it doesn't returns and clean task runs asynchronous.
* Clean task deletes removes 100 expired entries at once.
* In addition there is {@link org.redisson.RedissonCacheEvictionScheduler}. This scheduler
* In addition there is {@link org.redisson.RedissonEvictionScheduler}. This scheduler
* deletes expired entries in time interval between 5 seconds to 2 hours.</p>
*
* <p>If eviction is not required then it's better to use {@link org.redisson.reactive.RedissonMapReactive}.</p>

@ -15,9 +15,12 @@
*/
package org.redisson.client.handler;
import java.util.List;
import org.redisson.client.codec.ByteArrayCodec;
import org.redisson.client.protocol.CommandData;
import org.redisson.client.protocol.Encoder;
import org.redisson.client.protocol.StringParamsEncoder;
import org.redisson.client.protocol.DefaultParamsEncoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.redisson.client.protocol.RedisCommand.ValueType;
@ -39,11 +42,11 @@ public class CommandEncoder extends MessageToByteEncoder<CommandData<Object, Obj
private final Logger log = LoggerFactory.getLogger(getClass());
private final Encoder paramsEncoder = new StringParamsEncoder();
private final Encoder paramsEncoder = new DefaultParamsEncoder();
final char ARGS_PREFIX = '*';
final char BYTES_PREFIX = '$';
final byte[] CRLF = "\r\n".getBytes();
private static final char ARGS_PREFIX = '*';
private static final char BYTES_PREFIX = '$';
private static final byte[] CRLF = "\r\n".getBytes();
@Override
protected void encode(ChannelHandlerContext ctx, CommandData<Object, Object> msg, ByteBuf out) throws Exception {
@ -66,12 +69,12 @@ public class CommandEncoder extends MessageToByteEncoder<CommandData<Object, Obj
if (msg.getCommand().getInParamIndex() == i && msg.getCommand().getInParamType().get(0) == ValueType.OBJECT) {
encoder = msg.getCodec().getValueEncoder();
} else if (msg.getCommand().getInParamIndex() <= i && msg.getCommand().getInParamType().get(0) != ValueType.OBJECT) {
encoder = encoder(msg, i - msg.getCommand().getInParamIndex());
encoder = selectEncoder(msg, i - msg.getCommand().getInParamIndex());
}
} else {
int paramNum = i - msg.getCommand().getInParamIndex();
if (msg.getCommand().getInParamIndex() <= i) {
encoder = encoder(msg, paramNum);
int paramNum = i - msg.getCommand().getInParamIndex();
encoder = selectEncoder(msg, paramNum);
}
}
@ -85,27 +88,31 @@ public class CommandEncoder extends MessageToByteEncoder<CommandData<Object, Obj
}
}
private Encoder encoder(CommandData<Object, Object> msg, int param) {
private Encoder selectEncoder(CommandData<Object, Object> msg, int param) {
int typeIndex = 0;
if (msg.getCommand().getInParamType().size() > 1) {
List<ValueType> inParamType = msg.getCommand().getInParamType();
if (inParamType.size() > 1) {
typeIndex = param;
}
if (msg.getCommand().getInParamType().get(typeIndex) == ValueType.MAP) {
if (inParamType.get(typeIndex) == ValueType.MAP) {
if (param % 2 != 0) {
return msg.getCodec().getMapValueEncoder();
} else {
return msg.getCodec().getMapKeyEncoder();
}
}
if (msg.getCommand().getInParamType().get(typeIndex) == ValueType.MAP_KEY) {
if (inParamType.get(typeIndex) == ValueType.MAP_KEY) {
return msg.getCodec().getMapKeyEncoder();
}
if (msg.getCommand().getInParamType().get(typeIndex) == ValueType.MAP_VALUE) {
if (inParamType.get(typeIndex) == ValueType.MAP_VALUE) {
return msg.getCodec().getMapValueEncoder();
}
if (msg.getCommand().getInParamType().get(typeIndex) == ValueType.OBJECTS) {
if (inParamType.get(typeIndex) == ValueType.OBJECTS) {
return msg.getCodec().getValueEncoder();
}
if (inParamType.get(typeIndex) == ValueType.BINARY) {
return ByteArrayCodec.INSTANCE.getValueEncoder();
}
throw new IllegalStateException();
}

@ -17,10 +17,13 @@ package org.redisson.client.protocol;
import java.io.UnsupportedEncodingException;
public class StringParamsEncoder implements Encoder {
public class DefaultParamsEncoder implements Encoder {
@Override
public byte[] encode(Object in) {
if (in instanceof byte[]) {
return (byte[]) in;
}
try {
return in.toString().getBytes("UTF-8");
} catch (UnsupportedEncodingException e) {

@ -24,7 +24,7 @@ import org.redisson.client.protocol.decoder.MultiDecoder;
public class RedisCommand<R> {
public enum ValueType {OBJECT, OBJECTS, MAP_VALUE, MAP_KEY, MAP}
public enum ValueType {OBJECT, OBJECTS, MAP_VALUE, MAP_KEY, MAP, BINARY}
private ValueType outParamType = ValueType.OBJECT;
private List<ValueType> inParamType = Arrays.asList(ValueType.OBJECT);

@ -103,6 +103,7 @@ public interface RedisCommands {
RedisCommand<Boolean> SREM_SINGLE = new RedisCommand<Boolean>("SREM", new BooleanReplayConvertor(), 2);
RedisCommand<List<Object>> SMEMBERS = new RedisCommand<List<Object>>("SMEMBERS", new ObjectListReplayDecoder<Object>());
RedisCommand<ListScanResult<Object>> SSCAN = new RedisCommand<ListScanResult<Object>>("SSCAN", new NestedMultiDecoder(new ObjectListReplayDecoder<Object>(), new ListScanResultReplayDecoder()), ValueType.OBJECT);
RedisCommand<ListScanResult<Object>> EVAL_SSCAN = new RedisCommand<ListScanResult<Object>>("EVAL", new NestedMultiDecoder(new ObjectListReplayDecoder<Object>(), new ListScanResultReplayDecoder()), ValueType.OBJECT);
RedisCommand<Boolean> SISMEMBER = new RedisCommand<Boolean>("SISMEMBER", new BooleanReplayConvertor(), 2);
RedisStrictCommand<Integer> SCARD_INT = new RedisStrictCommand<Integer>("SCARD", new IntegerReplayConvertor());
RedisStrictCommand<Long> SCARD = new RedisStrictCommand<Long>("SCARD");
@ -144,6 +145,7 @@ public interface RedisCommands {
RedisStrictCommand<List<Boolean>> SCRIPT_EXISTS = new RedisStrictCommand<List<Boolean>>("SCRIPT", "EXISTS", new ObjectListReplayDecoder<Boolean>(), new BooleanReplayConvertor());
RedisStrictCommand<Boolean> EVAL_BOOLEAN = new RedisStrictCommand<Boolean>("EVAL", new BooleanReplayConvertor());
RedisStrictCommand<Boolean> EVAL_BOOLEAN_WITH_VALUES = new RedisStrictCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 4);
RedisStrictCommand<String> EVAL_STRING = new RedisStrictCommand<String>("EVAL", new StringReplayDecoder());
RedisStrictCommand<Integer> EVAL_INTEGER = new RedisStrictCommand<Integer>("EVAL", new IntegerReplayConvertor());
RedisStrictCommand<Long> EVAL_LONG = new RedisStrictCommand<Long>("EVAL");
@ -173,7 +175,7 @@ public interface RedisCommands {
RedisCommand<Boolean> HEXISTS = new RedisCommand<Boolean>("HEXISTS", new BooleanReplayConvertor(), 2, ValueType.MAP_KEY);
RedisStrictCommand<Integer> HLEN = new RedisStrictCommand<Integer>("HLEN", new IntegerReplayConvertor());
RedisCommand<Set<Object>> HKEYS = new RedisCommand<Set<Object>>("HKEYS", new ObjectSetReplayDecoder(), ValueType.MAP_KEY);
RedisCommand<String> HMSET = new RedisCommand<String>("HMSET", new StringReplayDecoder(), 2, ValueType.MAP);
RedisCommand<Void> HMSET = new RedisCommand<Void>("HMSET", new VoidReplayConvertor(), 2, ValueType.MAP);
RedisCommand<List<Object>> HMGET = new RedisCommand<List<Object>>("HMGET", new ObjectListReplayDecoder<Object>(), 2, ValueType.MAP_KEY, ValueType.MAP_VALUE);
RedisCommand<Object> HGET = new RedisCommand<Object>("HGET", 2, ValueType.MAP_KEY, ValueType.MAP_VALUE);
RedisCommand<Long> HDEL = new RedisStrictCommand<Long>("HDEL", 2, ValueType.MAP_KEY);

@ -37,6 +37,8 @@ public interface CommandAsyncExecutor {
<V> V get(Future<V> future);
<T, R> Future<R> evalScriptReadAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params);
<T, R> Future<R> writeAsync(Integer slot, Codec codec, RedisCommand<T> command, Object ... params);
<T, R> Future<R> readAsync(InetSocketAddress client, String key, Codec codec, RedisCommand<T> command, Object ... params);

@ -25,9 +25,8 @@ import io.netty.util.concurrent.Future;
/**
* Interface for using pipeline feature.
*
* All methods invocations via async objects
* which have gotten from this interface are batched
* to separate queue and could be executed later
* All method invocations on objects
* from this interface are batched to separate queue and could be executed later
* with <code>execute()</code> or <code>executeAsync()</code> methods.
*
*
@ -36,6 +35,33 @@ import io.netty.util.concurrent.Future;
*/
public interface RBatch {
/**
* Returns set-based cache instance by <code>name</code>.
* Uses map (value_hash, value) under the hood for minimal memory consumption.
* Supports value eviction with a given TTL value.
*
* <p>If eviction is not required then it's better to use regular map {@link #getSet(String, Codec)}.</p>
*
* @param name
* @param codec
* @return
*/
<V> RSetCacheAsync<V> getSetCache(String name);
/**
* Returns set-based cache instance by <code>name</code>
* using provided <code>codec</code> for values.
* Uses map (value_hash, value) under the hood for minimal memory consumption.
* Supports value eviction with a given TTL value.
*
* <p>If eviction is not required then it's better to use regular map {@link #getSet(String, Codec)}.</p>
*
* @param name
* @param codec
* @return
*/
<V> RSetCacheAsync<V> getSetCache(String name, Codec codec);
/**
* Returns map-based cache instance by <code>name</code>
* using provided <code>codec</code> for both cache keys and values.

@ -22,11 +22,11 @@ import java.util.concurrent.TimeUnit;
* {@link #put(Object, Object, long, TimeUnit)} or {@link #putIfAbsent(Object, Object, long, TimeUnit)}
* And therefore has an complex lua-scripts inside.</p>
*
* <p>Current redis implementation doesnt have eviction functionality.
* <p>Current redis implementation doesnt have map entry eviction functionality.
* Thus entries are checked for TTL expiration during any key/value/entry read operation.
* If key/value/entry expired then it doesn't returns and clean task runs asynchronous.
* Clean task deletes removes 100 expired entries at once.
* In addition there is {@link org.redisson.RedissonCacheEvictionScheduler}. This scheduler
* In addition there is {@link org.redisson.RedissonEvictionScheduler}. This scheduler
* deletes expired entries in time interval between 5 seconds to 2 hours.</p>
*
* <p>If eviction is not required then it's better to use {@link org.redisson.reactive.RedissonMapReactive}.</p>

@ -28,7 +28,7 @@ import io.netty.util.concurrent.Future;
* Thus entries are checked for TTL expiration during any key/value/entry read operation.
* If key/value/entry expired then it doesn't returns and clean task runs asynchronous.
* Clean task deletes removes 100 expired entries at once.
* In addition there is {@link org.redisson.RedissonCacheEvictionScheduler}. This scheduler
* In addition there is {@link org.redisson.RedissonEvictionScheduler}. This scheduler
* deletes expired entries in time interval between 5 seconds to 2 hours.</p>
*
* <p>If eviction is not required then it's better to use {@link org.redisson.reactive.RedissonMapReactive}.</p>

@ -0,0 +1,46 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.core;
import java.util.Set;
import java.util.concurrent.TimeUnit;
/**
* <p>Set-based cache with ability to set TTL for each entry via
* {@link #put(Object, Object, long, TimeUnit)} method.
* And therefore has an complex lua-scripts inside.
* Uses map (value_hash, value) to tie with expiration sorted set under the hood.
* </p>
*
* <p>Current Redis implementation doesn't have set entry eviction functionality.
* Thus values are checked for TTL expiration during any value read operation.
* If entry expired then it doesn't returns and clean task runs asynchronous.
* Clean task deletes removes 100 expired entries at once.
* In addition there is {@link org.redisson.RedissonEvictionScheduler}. This scheduler
* deletes expired entries in time interval between 5 seconds to 2 hours.</p>
*
* <p>If eviction is not required then it's better to use {@link org.redisson.reactive.RedissonSet}.</p>
*
* @author Nikita Koksharov
*
* @param <K> key
* @param <V> value
*/
public interface RSetCache<V> extends Set<V>, RExpirable, RSetCacheAsync<V> {
boolean add(V value, long ttl, TimeUnit unit);
}

@ -0,0 +1,33 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.core;
import java.util.concurrent.TimeUnit;
import io.netty.util.concurrent.Future;
/**
* Async set functions
*
* @author Nikita Koksharov
*
* @param <V> value
*/
public interface RSetCacheAsync<V> extends RCollectionAsync<V> {
Future<Boolean> addAsync(V value, long ttl, TimeUnit unit);
}

@ -173,7 +173,7 @@ public class RedissonBatchReactive implements RBatchReactive {
}
@Override
public Publisher<List<?>> executeReactive() {
public Publisher<List<?>> execute() {
return new NettyFuturePublisher<List<?>>(executorService.executeAsync());
}

@ -26,7 +26,7 @@ import java.util.concurrent.TimeUnit;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import org.redisson.RedissonCacheEvictionScheduler;
import org.redisson.RedissonEvictionScheduler;
import org.redisson.api.RMapCacheReactive;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec;
@ -51,14 +51,14 @@ import reactor.rx.action.support.DefaultSubscriber;
/**
* <p>Map-based cache with ability to set TTL for each entry via
* {@link #put(Object, Object, long, TimeUnit)} or {@link #putIfAbsent(Object, Object, long, TimeUnit)}
* {@link #put(Object, Object, long, TimeUnit)} or {@link #putIfAbsent(Object, Object, long, TimeUnit)} method.
* And therefore has an complex lua-scripts inside.</p>
*
* <p>Current redis implementation doesnt have eviction functionality.
* <p>Current redis implementation doesnt have map entry eviction functionality.
* Thus entries are checked for TTL expiration during any key/value/entry read operation.
* If key/value/entry expired then it doesn't returns and clean task runs asynchronous.
* Clean task deletes removes 100 expired entries at once.
* In addition there is {@link org.redisson.RedissonCacheEvictionScheduler}. This scheduler
* In addition there is {@link org.redisson.RedissonEvictionScheduler}. This scheduler
* deletes expired entries in time interval between 5 seconds to 2 hours.</p>
*
* <p>If eviction is not required then it's better to use {@link org.redisson.reactive.RedissonMapReactive}.</p>
@ -80,16 +80,14 @@ public class RedissonMapCacheReactive<K, V> extends RedissonMapReactive<K, V> im
private static final RedisCommand<Long> EVAL_FAST_REMOVE = new RedisCommand<Long>("EVAL", 5, ValueType.MAP_KEY);
private static final RedisCommand<Long> EVAL_REMOVE_EXPIRED = new RedisCommand<Long>("EVAL", 5);
private static final RedissonCacheEvictionScheduler SCHEDULER = new RedissonCacheEvictionScheduler();
public RedissonMapCacheReactive(CommandReactiveExecutor commandExecutor, String name) {
super(commandExecutor, name);
SCHEDULER.schedule(getName(), getTimeoutSetName(), commandExecutor);
RedissonEvictionScheduler.INSTANCE.schedule(getName(), getTimeoutSetName(), commandExecutor);
}
public RedissonMapCacheReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name) {
super(codec, commandExecutor, name);
SCHEDULER.schedule(getName(), getTimeoutSetName(), commandExecutor);
RedissonEvictionScheduler.INSTANCE.schedule(getName(), getTimeoutSetName(), commandExecutor);
}
@Override

@ -22,6 +22,7 @@ import org.redisson.codec.JsonJacksonCodec;
import org.redisson.codec.MsgPackJacksonCodec;
import org.redisson.core.Predicate;
import org.redisson.core.RMapCache;
import org.redisson.core.RSetCache;
import org.redisson.core.RMap;
import io.netty.util.concurrent.Future;
@ -169,27 +170,6 @@ public class RedissonMapCacheTest extends BaseTest {
Assert.assertEquals(expectedMap, filtered);
}
// @Test
// public void testFilterKeys() {
// RCache<Integer, Integer> map = redisson.getCache("filterKeys");
// map.put(1, 100);
// map.put(2, 200);
// map.put(3, 300);
// map.put(4, 400);
//
// Map<Integer, Integer> filtered = map.filterKeys(new Predicate<Integer>() {
// @Override
// public boolean apply(Integer input) {
// return input >= 2 && input <= 3;
// }
// });
//
// Map<Integer, Integer> expectedMap = new HashMap<Integer, Integer>();
// expectedMap.put(2, 200);
// expectedMap.put(3, 300);
// Assert.assertEquals(expectedMap, filtered);
// }
@Test
public void testExpiredIterator() throws InterruptedException {
RMapCache<String, String> cache = redisson.getMapCache("simple");
@ -632,6 +612,23 @@ public class RedissonMapCacheTest extends BaseTest {
Assert.assertEquals(testMap.hashCode(), map.hashCode());
}
@Test
public void testExpireOverwrite() throws InterruptedException, ExecutionException {
RMapCache<String, Integer> set = redisson.getMapCache("simple");
set.put("123", 3, 1, TimeUnit.SECONDS);
Thread.sleep(800);
set.put("123", 3, 1, TimeUnit.SECONDS);
Thread.sleep(800);
Assert.assertEquals(3, (int)set.get("123"));
Thread.sleep(200);
Assert.assertFalse(set.containsKey("123"));
}
@Test
public void testFastRemoveEmpty() throws Exception {
RMapCache<Integer, Integer> map = redisson.getMapCache("simple");

@ -0,0 +1,354 @@
package org.redisson;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;
import org.redisson.codec.MsgPackJacksonCodec;
import org.redisson.core.RSetCache;
public class RedissonSetCacheTest extends BaseTest {
public static class SimpleBean implements Serializable {
private Long lng;
public Long getLng() {
return lng;
}
public void setLng(Long lng) {
this.lng = lng;
}
}
@Test
public void testAddBean() throws InterruptedException, ExecutionException {
SimpleBean sb = new SimpleBean();
sb.setLng(1L);
RSetCache<SimpleBean> set = redisson.getSetCache("simple");
set.add(sb);
Assert.assertEquals(sb.getLng(), set.iterator().next().getLng());
}
@Test
public void testAddExpire() throws InterruptedException, ExecutionException {
RSetCache<String> set = redisson.getSetCache("simple");
set.add("123", 1, TimeUnit.SECONDS);
Assert.assertThat(set, Matchers.contains("123"));
Thread.sleep(1000);
Assert.assertFalse(set.contains("123"));
Thread.sleep(50);
Assert.assertEquals(0, set.size());
}
@Test
public void testExpireOverwrite() throws InterruptedException, ExecutionException {
RSetCache<String> set = redisson.getSetCache("simple");
set.add("123", 1, TimeUnit.SECONDS);
Thread.sleep(800);
set.add("123", 1, TimeUnit.SECONDS);
Thread.sleep(800);
Assert.assertTrue(set.contains("123"));
Thread.sleep(200);
Assert.assertFalse(set.contains("123"));
}
@Test
public void testRemove() throws InterruptedException, ExecutionException {
RSetCache<Integer> set = redisson.getSetCache("simple");
set.add(1, 1, TimeUnit.SECONDS);
set.add(3, 2, TimeUnit.SECONDS);
set.add(7, 3, TimeUnit.SECONDS);
Assert.assertTrue(set.remove(1));
Assert.assertFalse(set.contains(1));
Assert.assertThat(set, Matchers.containsInAnyOrder(3, 7));
Assert.assertFalse(set.remove(1));
Assert.assertThat(set, Matchers.containsInAnyOrder(3, 7));
Assert.assertTrue(set.remove(3));
Assert.assertFalse(set.contains(3));
Assert.assertThat(set, Matchers.contains(7));
Assert.assertEquals(1, set.size());
}
@Test
public void testIteratorRemove() throws InterruptedException {
RSetCache<String> set = redisson.getSetCache("list");
set.add("1");
set.add("4", 1, TimeUnit.SECONDS);
set.add("2");
set.add("5", 1, TimeUnit.SECONDS);
set.add("3");
Thread.sleep(1000);
for (Iterator<String> iterator = set.iterator(); iterator.hasNext();) {
String value = iterator.next();
if (value.equals("2")) {
iterator.remove();
}
}
Assert.assertThat(set, Matchers.containsInAnyOrder("1", "3"));
int iteration = 0;
for (Iterator<String> iterator = set.iterator(); iterator.hasNext();) {
iterator.next();
iterator.remove();
iteration++;
}
Assert.assertEquals(2, iteration);
Assert.assertFalse(set.contains("4"));
Assert.assertFalse(set.contains("5"));
Thread.sleep(50);
Assert.assertEquals(0, set.size());
Assert.assertTrue(set.isEmpty());
}
@Test
public void testIteratorSequence() {
RSetCache<Long> set = redisson.getSetCache("set");
for (int i = 0; i < 1000; i++) {
set.add(Long.valueOf(i));
}
Set<Long> setCopy = new HashSet<Long>();
for (int i = 0; i < 1000; i++) {
setCopy.add(Long.valueOf(i));
}
checkIterator(set, setCopy);
}
private void checkIterator(Set<Long> set, Set<Long> setCopy) {
for (Iterator<Long> iterator = set.iterator(); iterator.hasNext();) {
Long value = iterator.next();
if (!setCopy.remove(value)) {
Assert.fail();
}
}
Assert.assertEquals(0, setCopy.size());
}
@Test
public void testRetainAll() {
RSetCache<Integer> set = redisson.getSetCache("set");
for (int i = 0; i < 10000; i++) {
set.add(i);
set.add(i*10, 10, TimeUnit.SECONDS);
}
Assert.assertTrue(set.retainAll(Arrays.asList(1, 2)));
Assert.assertThat(set, Matchers.containsInAnyOrder(1, 2));
Assert.assertEquals(2, set.size());
}
@Test
public void testIteratorRemoveHighVolume() throws InterruptedException {
RSetCache<Integer> set = redisson.getSetCache("set");
for (int i = 1; i <= 5000; i++) {
set.add(i);
set.add(i*100000, 20, TimeUnit.SECONDS);
}
int cnt = 0;
Iterator<Integer> iterator = set.iterator();
while (iterator.hasNext()) {
Integer integer = iterator.next();
iterator.remove();
cnt++;
}
Assert.assertEquals(10000, cnt);
Assert.assertEquals(0, set.size());
}
@Test
public void testContainsAll() {
RSetCache<Integer> set = redisson.getSetCache("set");
for (int i = 0; i < 200; i++) {
set.add(i);
}
Assert.assertTrue(set.containsAll(Collections.emptyList()));
Assert.assertTrue(set.containsAll(Arrays.asList(30, 11)));
Assert.assertFalse(set.containsAll(Arrays.asList(30, 711, 11)));
}
@Test
public void testToArray() throws InterruptedException {
RSetCache<String> set = redisson.getSetCache("set");
set.add("1");
set.add("4");
set.add("2", 1, TimeUnit.SECONDS);
set.add("5");
set.add("3");
Thread.sleep(1000);
MatcherAssert.assertThat(Arrays.asList(set.toArray()), Matchers.<Object>containsInAnyOrder("1", "4", "5", "3"));
String[] strs = set.toArray(new String[0]);
MatcherAssert.assertThat(Arrays.asList(strs), Matchers.containsInAnyOrder("1", "4", "5", "3"));
}
@Test
public void testContains() throws InterruptedException {
RSetCache<TestObject> set = redisson.getSetCache("set");
set.add(new TestObject("1", "2"));
set.add(new TestObject("1", "2"));
set.add(new TestObject("2", "3"), 1, TimeUnit.SECONDS);
set.add(new TestObject("3", "4"));
set.add(new TestObject("5", "6"));
Thread.sleep(1000);
Assert.assertFalse(set.contains(new TestObject("2", "3")));
Assert.assertTrue(set.contains(new TestObject("1", "2")));
Assert.assertFalse(set.contains(new TestObject("1", "9")));
}
@Test
public void testDuplicates() {
RSetCache<TestObject> set = redisson.getSetCache("set");
set.add(new TestObject("1", "2"));
set.add(new TestObject("1", "2"));
set.add(new TestObject("2", "3"));
set.add(new TestObject("3", "4"));
set.add(new TestObject("5", "6"));
Assert.assertEquals(4, set.size());
}
@Test
public void testSize() {
RSetCache<Integer> set = redisson.getSetCache("set");
set.add(1);
set.add(2);
set.add(3);
set.add(3);
set.add(4);
set.add(5);
set.add(5);
Assert.assertEquals(5, set.size());
}
@Test
public void testRetainAllEmpty() {
RSetCache<Integer> set = redisson.getSetCache("set");
set.add(1);
set.add(2);
set.add(3);
set.add(4);
set.add(5);
Assert.assertTrue(set.retainAll(Collections.<Integer>emptyList()));
Assert.assertEquals(0, set.size());
}
@Test
public void testRetainAllNoModify() {
RSetCache<Integer> set = redisson.getSetCache("set");
set.add(1);
set.add(2);
Assert.assertFalse(set.retainAll(Arrays.asList(1, 2))); // nothing changed
Assert.assertThat(set, Matchers.containsInAnyOrder(1, 2));
}
@Test
public void testExpiredIterator() throws InterruptedException {
RSetCache<String> cache = redisson.getSetCache("simple");
cache.add("0");
cache.add("1", 1, TimeUnit.SECONDS);
cache.add("2", 3, TimeUnit.SECONDS);
cache.add("3", 4, TimeUnit.SECONDS);
cache.add("4", 1, TimeUnit.SECONDS);
Thread.sleep(1000);
MatcherAssert.assertThat(cache, Matchers.contains("0", "2", "3"));
}
@Test
public void testExpire() throws InterruptedException {
RSetCache<String> cache = redisson.getSetCache("simple");
cache.add("8", 1, TimeUnit.SECONDS);
cache.expire(100, TimeUnit.MILLISECONDS);
Thread.sleep(500);
Assert.assertEquals(0, cache.size());
}
@Test
public void testExpireAt() throws InterruptedException {
RSetCache<String> cache = redisson.getSetCache("simple");
cache.add("8", 1, TimeUnit.SECONDS);
cache.expireAt(System.currentTimeMillis() + 100);
Thread.sleep(500);
Assert.assertEquals(0, cache.size());
}
@Test
public void testClearExpire() throws InterruptedException {
RSetCache<String> cache = redisson.getSetCache("simple");
cache.add("8", 1, TimeUnit.SECONDS);
cache.expireAt(System.currentTimeMillis() + 100);
cache.clearExpire();
Thread.sleep(500);
Assert.assertEquals(1, cache.size());
}
@Test
public void testScheduler() throws InterruptedException {
RSetCache<String> cache = redisson.getSetCache("simple", new MsgPackJacksonCodec());
Assert.assertFalse(cache.contains("33"));
cache.add("33", 5, TimeUnit.SECONDS);
Thread.sleep(11000);
Assert.assertEquals(0, cache.size());
}
}
Loading…
Cancel
Save