Merge branch 'mrniko/master'
commit
93847fd92f
@ -0,0 +1,232 @@
|
||||
/**
|
||||
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.redisson;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.redisson.client.codec.Codec;
|
||||
import org.redisson.client.protocol.RedisCommands;
|
||||
import org.redisson.command.CommandAsyncExecutor;
|
||||
import org.redisson.core.RListMultimapCache;
|
||||
|
||||
import io.netty.util.concurrent.Future;
|
||||
|
||||
/**
|
||||
* @author Nikita Koksharov
|
||||
*
|
||||
* @param <K> key
|
||||
* @param <V> value
|
||||
*/
|
||||
public class RedissonListMultimapCache<K, V> extends RedissonListMultimap<K, V> implements RListMultimapCache<K, V> {
|
||||
|
||||
private final RedissonMultimapCache<K> baseCache;
|
||||
|
||||
RedissonListMultimapCache(EvictionScheduler evictionScheduler, CommandAsyncExecutor connectionManager, String name) {
|
||||
super(connectionManager, name);
|
||||
evictionScheduler.scheduleCleanMultimap(name, getTimeoutSetName());
|
||||
baseCache = new RedissonMultimapCache<K>(connectionManager, name, codec, getTimeoutSetName());
|
||||
}
|
||||
|
||||
RedissonListMultimapCache(EvictionScheduler evictionScheduler, Codec codec, CommandAsyncExecutor connectionManager, String name) {
|
||||
super(codec, connectionManager, name);
|
||||
evictionScheduler.scheduleCleanMultimap(name, getTimeoutSetName());
|
||||
baseCache = new RedissonMultimapCache<K>(connectionManager, name, codec, getTimeoutSetName());
|
||||
}
|
||||
|
||||
public Future<Boolean> containsKeyAsync(Object key) {
|
||||
try {
|
||||
byte[] keyState = codec.getMapKeyEncoder().encode(key);
|
||||
String keyHash = hash(keyState);
|
||||
|
||||
String valuesName = getValuesName(keyHash);
|
||||
|
||||
return commandExecutor.evalReadAsync(getName(), codec, RedisCommands.EVAL_BOOLEAN,
|
||||
"local value = redis.call('hget', KEYS[1], ARGV[2]); " +
|
||||
"if value ~= false then " +
|
||||
"local expireDate = 92233720368547758; " +
|
||||
"local expireDateScore = redis.call('zscore', KEYS[2], ARGV[2]); "
|
||||
+ "if expireDateScore ~= false then "
|
||||
+ "expireDate = tonumber(expireDateScore) "
|
||||
+ "end; "
|
||||
+ "if expireDate <= tonumber(ARGV[1]) then "
|
||||
+ "return 0;"
|
||||
+ "end; "
|
||||
+ "return redis.call('llen', ARGV[3]) > 0 and 1 or 0;" +
|
||||
"end;" +
|
||||
"return 0; ",
|
||||
Arrays.<Object>asList(getName(), getTimeoutSetName()), System.currentTimeMillis(), keyState, valuesName);
|
||||
|
||||
} catch (IOException e) {
|
||||
throw new IllegalArgumentException(e);
|
||||
}
|
||||
}
|
||||
|
||||
String getTimeoutSetName() {
|
||||
return "redisson_list_multimap_ttl{" + getName() + "}";
|
||||
}
|
||||
|
||||
|
||||
public Future<Boolean> containsValueAsync(Object value) {
|
||||
try {
|
||||
byte[] valueState = codec.getMapValueEncoder().encode(value);
|
||||
|
||||
return commandExecutor.evalReadAsync(getName(), codec, RedisCommands.EVAL_BOOLEAN,
|
||||
"local keys = redis.call('hgetall', KEYS[1]); " +
|
||||
"for i, v in ipairs(keys) do " +
|
||||
"if i % 2 == 0 then " +
|
||||
"local expireDate = 92233720368547758; " +
|
||||
"local expireDateScore = redis.call('zscore', KEYS[2], keys[i-1]); "
|
||||
+ "if expireDateScore ~= false then "
|
||||
+ "expireDate = tonumber(expireDateScore) "
|
||||
+ "end; "
|
||||
+ "if expireDate > tonumber(ARGV[2]) then " +
|
||||
"local name = '{' .. KEYS[1] .. '}:' .. v; " +
|
||||
|
||||
"local items = redis.call('lrange', name, 0, -1) " +
|
||||
"for i=1,#items do " +
|
||||
"if items[i] == ARGV[1] then " +
|
||||
"return 1; " +
|
||||
"end; " +
|
||||
"end; " +
|
||||
|
||||
"end; " +
|
||||
"end;" +
|
||||
"end; " +
|
||||
"return 0; ",
|
||||
Arrays.<Object>asList(getName(), getTimeoutSetName()), valueState, System.currentTimeMillis());
|
||||
} catch (IOException e) {
|
||||
throw new IllegalArgumentException(e);
|
||||
}
|
||||
}
|
||||
|
||||
public Future<Boolean> containsEntryAsync(Object key, Object value) {
|
||||
try {
|
||||
byte[] keyState = codec.getMapKeyEncoder().encode(key);
|
||||
String keyHash = hash(keyState);
|
||||
byte[] valueState = codec.getMapValueEncoder().encode(value);
|
||||
|
||||
String valuesName = getValuesName(keyHash);
|
||||
return commandExecutor.evalReadAsync(getName(), codec, RedisCommands.EVAL_BOOLEAN,
|
||||
"local expireDate = 92233720368547758; " +
|
||||
"local expireDateScore = redis.call('zscore', KEYS[2], ARGV[2]); "
|
||||
+ "if expireDateScore ~= false then "
|
||||
+ "expireDate = tonumber(expireDateScore) "
|
||||
+ "end; "
|
||||
+ "if expireDate > tonumber(ARGV[1]) then " +
|
||||
"local items = redis.call('lrange', KEYS[1], 0, -1); " +
|
||||
"for i=0, #items do " +
|
||||
"if items[i] == ARGV[3] then " +
|
||||
"return 1; " +
|
||||
"end; " +
|
||||
"end; " +
|
||||
"end; " +
|
||||
"return 0; ",
|
||||
Arrays.<Object>asList(valuesName, getTimeoutSetName()), System.currentTimeMillis(), keyState, valueState);
|
||||
|
||||
} catch (IOException e) {
|
||||
throw new IllegalArgumentException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<V> get(K key) {
|
||||
try {
|
||||
byte[] keyState = codec.getMapKeyEncoder().encode(key);
|
||||
String keyHash = hash(keyState);
|
||||
String valuesName = getValuesName(keyHash);
|
||||
|
||||
return new RedissonListMultimapValues<V>(codec, commandExecutor, valuesName, getTimeoutSetName(), key);
|
||||
} catch (Exception e) {
|
||||
throw new IllegalArgumentException(e);
|
||||
}
|
||||
}
|
||||
|
||||
public Future<Collection<V>> getAllAsync(K key) {
|
||||
try {
|
||||
byte[] keyState = codec.getMapKeyEncoder().encode(key);
|
||||
String keyHash = hash(keyState);
|
||||
String valuesName = getValuesName(keyHash);
|
||||
|
||||
return commandExecutor.evalReadAsync(getName(), codec, RedisCommands.EVAL_LIST,
|
||||
"local expireDate = 92233720368547758; " +
|
||||
"local expireDateScore = redis.call('zscore', KEYS[2], ARGV[2]); "
|
||||
+ "if expireDateScore ~= false then "
|
||||
+ "expireDate = tonumber(expireDateScore) "
|
||||
+ "end; "
|
||||
+ "if expireDate > tonumber(ARGV[1]) then " +
|
||||
"return redis.call('lrange', KEYS[1], 0, -1); " +
|
||||
"end; " +
|
||||
"return {}; ",
|
||||
Arrays.<Object>asList(valuesName, getTimeoutSetName()), System.currentTimeMillis(), keyState);
|
||||
} catch (IOException e) {
|
||||
throw new IllegalArgumentException(e);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public Future<Collection<V>> removeAllAsync(Object key) {
|
||||
try {
|
||||
byte[] keyState = codec.getMapKeyEncoder().encode(key);
|
||||
String keyHash = hash(keyState);
|
||||
|
||||
String valuesName = getValuesName(keyHash);
|
||||
return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_SET,
|
||||
"redis.call('hdel', KEYS[1], ARGV[1]); " +
|
||||
"local members = redis.call('lrange', KEYS[2], 0, -1); " +
|
||||
"redis.call('del', KEYS[2]); " +
|
||||
"redis.call('zrem', KEYS[3], ARGV[1]); " +
|
||||
"return members; ",
|
||||
Arrays.<Object>asList(getName(), valuesName, getTimeoutSetName()), keyState);
|
||||
} catch (IOException e) {
|
||||
throw new IllegalArgumentException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean expireKey(K key, long timeToLive, TimeUnit timeUnit) {
|
||||
return get(expireKeyAsync(key, timeToLive, timeUnit));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Future<Boolean> expireKeyAsync(K key, long timeToLive, TimeUnit timeUnit) {
|
||||
return baseCache.expireKeyAsync(key, timeToLive, timeUnit);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Future<Boolean> deleteAsync() {
|
||||
return baseCache.deleteAsync();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Future<Boolean> expireAsync(long timeToLive, TimeUnit timeUnit) {
|
||||
return baseCache.expireAsync(timeToLive, timeUnit);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Future<Boolean> expireAtAsync(long timestamp) {
|
||||
return baseCache.expireAtAsync(timestamp);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Future<Boolean> clearExpireAsync() {
|
||||
return baseCache.clearExpireAsync();
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,688 @@
|
||||
/**
|
||||
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.redisson;
|
||||
|
||||
import static org.redisson.client.protocol.RedisCommands.EVAL_OBJECT;
|
||||
import static org.redisson.client.protocol.RedisCommands.LPOP;
|
||||
import static org.redisson.client.protocol.RedisCommands.LPUSH_BOOLEAN;
|
||||
import static org.redisson.client.protocol.RedisCommands.RPUSH_BOOLEAN;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.ListIterator;
|
||||
import java.util.NoSuchElementException;
|
||||
import java.util.Set;
|
||||
|
||||
import org.redisson.client.codec.Codec;
|
||||
import org.redisson.client.protocol.RedisCommand;
|
||||
import org.redisson.client.protocol.RedisCommand.ValueType;
|
||||
import org.redisson.client.protocol.RedisCommands;
|
||||
import org.redisson.client.protocol.convertor.BooleanNumberReplayConvertor;
|
||||
import org.redisson.client.protocol.convertor.BooleanReplayConvertor;
|
||||
import org.redisson.client.protocol.convertor.Convertor;
|
||||
import org.redisson.client.protocol.convertor.IntegerReplayConvertor;
|
||||
import org.redisson.client.protocol.decoder.ObjectSetReplayDecoder;
|
||||
import org.redisson.command.CommandAsyncExecutor;
|
||||
import org.redisson.core.RList;
|
||||
|
||||
import io.netty.util.concurrent.Future;
|
||||
|
||||
/**
|
||||
* List based Multimap Cache values holder
|
||||
*
|
||||
* @author Nikita Koksharov
|
||||
*
|
||||
* @param <V> the type of elements held in this collection
|
||||
*/
|
||||
public class RedissonListMultimapValues<V> extends RedissonExpirable implements RList<V> {
|
||||
|
||||
private static final RedisCommand<Integer> LAST_INDEX = new RedisCommand<Integer>("EVAL", new IntegerReplayConvertor(), 4, Arrays.asList(ValueType.MAP_KEY, ValueType.MAP_VALUE));
|
||||
private static final RedisCommand<Integer> EVAL_SIZE = new RedisCommand<Integer>("EVAL", new IntegerReplayConvertor(), 6, ValueType.MAP_KEY);
|
||||
private static final RedisCommand<Integer> EVAL_GET = new RedisCommand<Integer>("EVAL", 7, ValueType.MAP_KEY);
|
||||
private static final RedisCommand<Set<Object>> EVAL_READALL = new RedisCommand<Set<Object>>("EVAL", new ObjectSetReplayDecoder(), 6, ValueType.MAP_KEY);
|
||||
private static final RedisCommand<Boolean> EVAL_CONTAINS_VALUE = new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 7, Arrays.asList(ValueType.MAP_KEY, ValueType.MAP_VALUE));
|
||||
private static final RedisCommand<Boolean> EVAL_CONTAINS_ALL_WITH_VALUES = new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 7, ValueType.OBJECTS);
|
||||
|
||||
|
||||
public static final RedisCommand<Boolean> EVAL_BOOLEAN_ARGS2 = new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 5, ValueType.OBJECTS);
|
||||
|
||||
private final Object key;
|
||||
private final String timeoutSetName;
|
||||
|
||||
public RedissonListMultimapValues(Codec codec, CommandAsyncExecutor commandExecutor, String name, String timeoutSetName, Object key) {
|
||||
super(codec, commandExecutor, name);
|
||||
this.timeoutSetName = timeoutSetName;
|
||||
this.key = key;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int size() {
|
||||
return get(sizeAsync());
|
||||
}
|
||||
|
||||
public Future<Integer> sizeAsync() {
|
||||
return commandExecutor.evalReadAsync(getName(), codec, EVAL_SIZE,
|
||||
"local expireDate = 92233720368547758; " +
|
||||
"local expireDateScore = redis.call('zscore', KEYS[1], ARGV[2]); "
|
||||
+ "if expireDateScore ~= false then "
|
||||
+ "expireDate = tonumber(expireDateScore) "
|
||||
+ "end; "
|
||||
+ "if expireDate <= tonumber(ARGV[1]) then "
|
||||
+ "return 0;"
|
||||
+ "end; "
|
||||
+ "return redis.call('llen', KEYS[2]);",
|
||||
Arrays.<Object>asList(timeoutSetName, getName()), System.currentTimeMillis(), key);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isEmpty() {
|
||||
return size() == 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean contains(Object o) {
|
||||
return get(containsAsync(o));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Iterator<V> iterator() {
|
||||
return listIterator();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object[] toArray() {
|
||||
List<V> list = readAll();
|
||||
return list.toArray();
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<V> readAll() {
|
||||
return get(readAllAsync());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Future<List<V>> readAllAsync() {
|
||||
return commandExecutor.evalReadAsync(getName(), codec, EVAL_READALL,
|
||||
"local expireDate = 92233720368547758; " +
|
||||
"local expireDateScore = redis.call('zscore', KEYS[1], ARGV[2]); "
|
||||
+ "if expireDateScore ~= false then "
|
||||
+ "expireDate = tonumber(expireDateScore) "
|
||||
+ "end; "
|
||||
+ "if expireDate <= tonumber(ARGV[1]) then "
|
||||
+ "return {};"
|
||||
+ "end; "
|
||||
+ "return redis.call('lrange', KEYS[2], 0, -1);",
|
||||
Arrays.<Object>asList(timeoutSetName, getName()), System.currentTimeMillis(), key);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> T[] toArray(T[] a) {
|
||||
List<V> list = readAll();
|
||||
return list.toArray(a);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean add(V e) {
|
||||
return get(addAsync(e));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Future<Boolean> addAsync(V e) {
|
||||
return commandExecutor.writeAsync(getName(), codec, RPUSH_BOOLEAN, getName(), e);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean remove(Object o) {
|
||||
return get(removeAsync(o));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Future<Boolean> removeAsync(Object o) {
|
||||
return removeAsync(o, 1);
|
||||
}
|
||||
|
||||
protected Future<Boolean> removeAsync(Object o, int count) {
|
||||
return commandExecutor.evalWriteAsync(getName(), codec, EVAL_CONTAINS_VALUE,
|
||||
"local expireDate = 92233720368547758; " +
|
||||
"local expireDateScore = redis.call('zscore', KEYS[1], ARGV[3]); "
|
||||
+ "if expireDateScore ~= false then "
|
||||
+ "expireDate = tonumber(expireDateScore) "
|
||||
+ "end; "
|
||||
+ "if expireDate <= tonumber(ARGV[1]) then "
|
||||
+ "return 0;"
|
||||
+ "end; "
|
||||
+ "return redis.call('lrem', KEYS[2], ARGV[2], ARGV[4]) > 0 and 1 or 0;",
|
||||
Arrays.<Object>asList(timeoutSetName, getName()), System.currentTimeMillis(), count, key, o);
|
||||
}
|
||||
|
||||
protected boolean remove(Object o, int count) {
|
||||
return get(removeAsync(o, count));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Future<Boolean> containsAllAsync(Collection<?> c) {
|
||||
List<Object> args = new ArrayList<Object>(c.size() + 2);
|
||||
try {
|
||||
byte[] keyState = codec.getMapKeyEncoder().encode(key);
|
||||
args.add(System.currentTimeMillis());
|
||||
args.add(keyState);
|
||||
args.addAll(c);
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
return commandExecutor.evalReadAsync(getName(), codec, EVAL_CONTAINS_ALL_WITH_VALUES,
|
||||
"local expireDate = 92233720368547758; " +
|
||||
"local expireDateScore = redis.call('zscore', KEYS[1], ARGV[2]); "
|
||||
+ "if expireDateScore ~= false then "
|
||||
+ "expireDate = tonumber(expireDateScore) "
|
||||
+ "end; "
|
||||
+ "if expireDate <= tonumber(ARGV[1]) then "
|
||||
+ "return 0;"
|
||||
+ "end; " +
|
||||
"local items = redis.call('lrange', KEYS[2], 0, -1);" +
|
||||
"for i = 0, #items, 1 do " +
|
||||
"for j = 2, table.getn(ARGV), 1 do "
|
||||
+ "if ARGV[j] == items[i] "
|
||||
+ "then table.remove(ARGV, j) end "
|
||||
+ "end; "
|
||||
+ "end;"
|
||||
+ "return table.getn(ARGV) == 2 and 1 or 0; ",
|
||||
Arrays.<Object>asList(timeoutSetName, getName()), args.toArray());
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean containsAll(Collection<?> c) {
|
||||
return get(containsAllAsync(c));
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean addAll(Collection<? extends V> c) {
|
||||
return get(addAllAsync(c));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Future<Boolean> addAllAsync(final Collection<? extends V> c) {
|
||||
if (c.isEmpty()) {
|
||||
return newSucceededFuture(false);
|
||||
}
|
||||
|
||||
List<Object> args = new ArrayList<Object>(c.size() + 1);
|
||||
args.add(getName());
|
||||
args.addAll(c);
|
||||
return commandExecutor.writeAsync(getName(), codec, RPUSH_BOOLEAN, args.toArray());
|
||||
}
|
||||
|
||||
public Future<Boolean> addAllAsync(int index, Collection<? extends V> coll) {
|
||||
if (index < 0) {
|
||||
throw new IndexOutOfBoundsException("index: " + index);
|
||||
}
|
||||
|
||||
if (coll.isEmpty()) {
|
||||
return newSucceededFuture(false);
|
||||
}
|
||||
|
||||
if (index == 0) { // prepend elements to list
|
||||
List<Object> elements = new ArrayList<Object>(coll);
|
||||
Collections.reverse(elements);
|
||||
elements.add(0, getName());
|
||||
|
||||
return commandExecutor.writeAsync(getName(), codec, LPUSH_BOOLEAN, elements.toArray());
|
||||
}
|
||||
|
||||
List<Object> args = new ArrayList<Object>(coll.size() + 1);
|
||||
args.add(index);
|
||||
args.addAll(coll);
|
||||
return commandExecutor.evalWriteAsync(getName(), codec, EVAL_BOOLEAN_ARGS2,
|
||||
"local ind = table.remove(ARGV, 1); " + // index is the first parameter
|
||||
"local size = redis.call('llen', KEYS[1]); " +
|
||||
"assert(tonumber(ind) <= size, 'index: ' .. ind .. ' but current size: ' .. size); " +
|
||||
"local tail = redis.call('lrange', KEYS[1], ind, -1); " +
|
||||
"redis.call('ltrim', KEYS[1], 0, ind - 1); " +
|
||||
"for i=1, #ARGV, 5000 do "
|
||||
+ "redis.call('rpush', KEYS[1], unpack(ARGV, i, math.min(i+4999, #ARGV))); "
|
||||
+ "end " +
|
||||
"if #tail > 0 then " +
|
||||
"for i=1, #tail, 5000 do "
|
||||
+ "redis.call('rpush', KEYS[1], unpack(tail, i, math.min(i+4999, #tail))); "
|
||||
+ "end "
|
||||
+ "end;" +
|
||||
"return 1;",
|
||||
Collections.<Object>singletonList(getName()), args.toArray());
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean addAll(int index, Collection<? extends V> coll) {
|
||||
return get(addAllAsync(index, coll));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Future<Boolean> removeAllAsync(Collection<?> c) {
|
||||
List<Object> args = new ArrayList<Object>(c.size() + 2);
|
||||
try {
|
||||
byte[] keyState = codec.getMapKeyEncoder().encode(key);
|
||||
args.add(System.currentTimeMillis());
|
||||
args.add(keyState);
|
||||
args.addAll(c);
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
return commandExecutor.evalWriteAsync(getName(), codec, EVAL_CONTAINS_ALL_WITH_VALUES,
|
||||
"local expireDate = 92233720368547758; " +
|
||||
"local expireDateScore = redis.call('zscore', KEYS[1], ARGV[2]); "
|
||||
+ "if expireDateScore ~= false then "
|
||||
+ "expireDate = tonumber(expireDateScore) "
|
||||
+ "end; "
|
||||
+ "if expireDate <= tonumber(ARGV[1]) then "
|
||||
+ "return 0;"
|
||||
+ "end; " +
|
||||
|
||||
"local v = 0 " +
|
||||
"for i = 2, #ARGV, 1 do "
|
||||
+ "if redis.call('lrem', KEYS[2], 0, ARGV[i]) == 1 "
|
||||
+ "then v = 1 end "
|
||||
+"end "
|
||||
+ "return v ",
|
||||
Arrays.<Object>asList(timeoutSetName, getName()), args.toArray());
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean removeAll(Collection<?> c) {
|
||||
return get(removeAllAsync(c));
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean retainAll(Collection<?> c) {
|
||||
return get(retainAllAsync(c));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Future<Boolean> retainAllAsync(Collection<?> c) {
|
||||
List<Object> args = new ArrayList<Object>(c.size() + 2);
|
||||
try {
|
||||
byte[] keyState = codec.getMapKeyEncoder().encode(key);
|
||||
args.add(System.currentTimeMillis());
|
||||
args.add(keyState);
|
||||
args.addAll(c);
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
return commandExecutor.evalWriteAsync(getName(), codec, EVAL_CONTAINS_ALL_WITH_VALUES,
|
||||
"local expireDate = 92233720368547758; " +
|
||||
"local expireDateScore = redis.call('zscore', KEYS[1], ARGV[2]); "
|
||||
+ "if expireDateScore ~= false then "
|
||||
+ "expireDate = tonumber(expireDateScore) "
|
||||
+ "end; "
|
||||
+ "if expireDate <= tonumber(ARGV[1]) then "
|
||||
+ "return 0;"
|
||||
+ "end; " +
|
||||
|
||||
"local changed = 0; " +
|
||||
"local s = redis.call('lrange', KEYS[2], 0, -1); "
|
||||
+ "local i = 0; "
|
||||
+ "while i <= #s do "
|
||||
+ "local element = s[i]; "
|
||||
+ "local isInAgrs = false; "
|
||||
+ "for j = 2, #ARGV, 1 do "
|
||||
+ "if ARGV[j] == element then "
|
||||
+ "isInAgrs = true; "
|
||||
+ "break; "
|
||||
+ "end; "
|
||||
+ "end; "
|
||||
+ "if isInAgrs == false then "
|
||||
+ "redis.call('lrem', KEYS[2], 0, element); "
|
||||
+ "changed = 1; "
|
||||
+ "end; "
|
||||
+ "i = i + 1; "
|
||||
+ "end; "
|
||||
+ "return changed; ",
|
||||
Arrays.<Object>asList(timeoutSetName, getName()), args.toArray());
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void clear() {
|
||||
delete();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Future<V> getAsync(int index) {
|
||||
return commandExecutor.evalReadAsync(getName(), codec, EVAL_GET,
|
||||
"local expireDate = 92233720368547758; " +
|
||||
"local expireDateScore = redis.call('zscore', KEYS[1], ARGV[3]); "
|
||||
+ "if expireDateScore ~= false then "
|
||||
+ "expireDate = tonumber(expireDateScore); "
|
||||
+ "end; "
|
||||
+ "if expireDate <= tonumber(ARGV[1]) then "
|
||||
+ "return nil;"
|
||||
+ "end; "
|
||||
+ "return redis.call('lindex', KEYS[2], ARGV[2]);",
|
||||
Arrays.<Object>asList(timeoutSetName, getName()), System.currentTimeMillis(), index, key);
|
||||
}
|
||||
|
||||
@Override
|
||||
public V get(int index) {
|
||||
checkIndex(index);
|
||||
return getValue(index);
|
||||
}
|
||||
|
||||
V getValue(int index) {
|
||||
return get(getAsync(index));
|
||||
}
|
||||
|
||||
private void checkIndex(int index) {
|
||||
int size = size();
|
||||
if (!isInRange(index, size))
|
||||
throw new IndexOutOfBoundsException("index: " + index + " but current size: "+ size);
|
||||
}
|
||||
|
||||
private boolean isInRange(int index, int size) {
|
||||
return index >= 0 && index < size;
|
||||
}
|
||||
|
||||
@Override
|
||||
public V set(int index, V element) {
|
||||
checkIndex(index);
|
||||
return get(setAsync(index, element));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Future<V> setAsync(int index, V element) {
|
||||
return commandExecutor.evalWriteAsync(getName(), codec, new RedisCommand<Object>("EVAL", 5),
|
||||
"local v = redis.call('lindex', KEYS[1], ARGV[1]); " +
|
||||
"redis.call('lset', KEYS[1], ARGV[1], ARGV[2]); " +
|
||||
"return v",
|
||||
Collections.<Object>singletonList(getName()), index, element);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void fastSet(int index, V element) {
|
||||
get(fastSetAsync(index, element));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Future<Void> fastSetAsync(int index, V element) {
|
||||
return commandExecutor.writeAsync(getName(), codec, RedisCommands.LSET, getName(), index, element);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void add(int index, V element) {
|
||||
addAll(index, Collections.singleton(element));
|
||||
}
|
||||
|
||||
@Override
|
||||
public V remove(int index) {
|
||||
checkIndex(index);
|
||||
|
||||
if (index == 0) {
|
||||
Future<V> f = commandExecutor.writeAsync(getName(), codec, LPOP, getName());
|
||||
return get(f);
|
||||
}
|
||||
|
||||
Future<V> f = commandExecutor.evalWriteAsync(getName(), codec, EVAL_OBJECT,
|
||||
"local v = redis.call('lindex', KEYS[1], ARGV[1]); " +
|
||||
"local tail = redis.call('lrange', KEYS[1], ARGV[1] + 1, -1);" +
|
||||
"redis.call('ltrim', KEYS[1], 0, ARGV[1] - 1);" +
|
||||
"if #tail > 0 then " +
|
||||
"for i=1, #tail, 5000 do "
|
||||
+ "redis.call('rpush', KEYS[1], unpack(tail, i, math.min(i+4999, #tail))); "
|
||||
+ "end "
|
||||
+ "end;" +
|
||||
"return v",
|
||||
Collections.<Object>singletonList(getName()), index);
|
||||
return get(f);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int indexOf(Object o) {
|
||||
return get(indexOfAsync(o));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Future<Boolean> containsAsync(Object o) {
|
||||
return indexOfAsync(o, new BooleanNumberReplayConvertor(-1L));
|
||||
}
|
||||
|
||||
private <R> Future<R> indexOfAsync(Object o, Convertor<R> convertor) {
|
||||
return commandExecutor.evalReadAsync(getName(), codec, new RedisCommand<R>("EVAL", convertor, 6, Arrays.asList(ValueType.MAP_KEY, ValueType.MAP_VALUE)),
|
||||
"local expireDate = 92233720368547758; " +
|
||||
"local expireDateScore = redis.call('zscore', KEYS[1], ARGV[2]); "
|
||||
+ "if expireDateScore ~= false then "
|
||||
+ "expireDate = tonumber(expireDateScore); "
|
||||
+ "end; "
|
||||
+ "if expireDate <= tonumber(ARGV[1]) then "
|
||||
+ "return -1;"
|
||||
+ "end; " +
|
||||
|
||||
"local items = redis.call('lrange', KEYS[2], 0, -1); " +
|
||||
"for i=1,#items do " +
|
||||
"if items[i] == ARGV[3] then " +
|
||||
"return i - 1; " +
|
||||
"end; " +
|
||||
"end; " +
|
||||
"return -1;",
|
||||
Arrays.<Object>asList(timeoutSetName, getName()), System.currentTimeMillis(), key, o);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Future<Integer> indexOfAsync(Object o) {
|
||||
return indexOfAsync(o, new IntegerReplayConvertor());
|
||||
}
|
||||
|
||||
@Override
|
||||
public int lastIndexOf(Object o) {
|
||||
return get(lastIndexOfAsync(o));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Future<Integer> lastIndexOfAsync(Object o) {
|
||||
return commandExecutor.evalReadAsync(getName(), codec, LAST_INDEX,
|
||||
"local expireDate = 92233720368547758; " +
|
||||
"local expireDateScore = redis.call('zscore', KEYS[1], ARGV[2]); "
|
||||
+ "if expireDateScore ~= false then "
|
||||
+ "expireDate = tonumber(expireDateScore); "
|
||||
+ "end; "
|
||||
+ "if expireDate <= tonumber(ARGV[1]) then "
|
||||
+ "return -1;"
|
||||
+ "end; " +
|
||||
|
||||
"local items = redis.call('lrange', KEYS[1], 0, -1) " +
|
||||
"for i = #items, 0, -1 do " +
|
||||
"if items[i] == ARGV[1] then " +
|
||||
"return i - 1 " +
|
||||
"end " +
|
||||
"end " +
|
||||
"return -1",
|
||||
Arrays.<Object>asList(timeoutSetName, getName()), System.currentTimeMillis(), key, o);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void trim(int fromIndex, int toIndex) {
|
||||
get(trimAsync(fromIndex, toIndex));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Future<Void> trimAsync(int fromIndex, int toIndex) {
|
||||
return commandExecutor.writeAsync(getName(), codec, RedisCommands.LTRIM, getName(), fromIndex, toIndex);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ListIterator<V> listIterator() {
|
||||
return listIterator(0);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ListIterator<V> listIterator(final int ind) {
|
||||
return new ListIterator<V>() {
|
||||
|
||||
private V prevCurrentValue;
|
||||
private V nextCurrentValue;
|
||||
private V currentValueHasRead;
|
||||
private int currentIndex = ind - 1;
|
||||
private boolean hasBeenModified = true;
|
||||
|
||||
@Override
|
||||
public boolean hasNext() {
|
||||
V val = RedissonListMultimapValues.this.getValue(currentIndex+1);
|
||||
if (val != null) {
|
||||
nextCurrentValue = val;
|
||||
}
|
||||
return val != null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public V next() {
|
||||
if (nextCurrentValue == null && !hasNext()) {
|
||||
throw new NoSuchElementException("No such element at index " + currentIndex);
|
||||
}
|
||||
currentIndex++;
|
||||
currentValueHasRead = nextCurrentValue;
|
||||
nextCurrentValue = null;
|
||||
hasBeenModified = false;
|
||||
return currentValueHasRead;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void remove() {
|
||||
if (currentValueHasRead == null) {
|
||||
throw new IllegalStateException("Neither next nor previous have been called");
|
||||
}
|
||||
if (hasBeenModified) {
|
||||
throw new IllegalStateException("Element been already deleted");
|
||||
}
|
||||
RedissonListMultimapValues.this.remove(currentIndex);
|
||||
currentIndex--;
|
||||
hasBeenModified = true;
|
||||
currentValueHasRead = null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasPrevious() {
|
||||
if (currentIndex < 0) {
|
||||
return false;
|
||||
}
|
||||
V val = RedissonListMultimapValues.this.getValue(currentIndex);
|
||||
if (val != null) {
|
||||
prevCurrentValue = val;
|
||||
}
|
||||
return val != null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public V previous() {
|
||||
if (prevCurrentValue == null && !hasPrevious()) {
|
||||
throw new NoSuchElementException("No such element at index " + currentIndex);
|
||||
}
|
||||
currentIndex--;
|
||||
hasBeenModified = false;
|
||||
currentValueHasRead = prevCurrentValue;
|
||||
prevCurrentValue = null;
|
||||
return currentValueHasRead;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int nextIndex() {
|
||||
return currentIndex + 1;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int previousIndex() {
|
||||
return currentIndex;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void set(V e) {
|
||||
if (hasBeenModified) {
|
||||
throw new IllegalStateException();
|
||||
}
|
||||
|
||||
RedissonListMultimapValues.this.fastSet(currentIndex, e);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void add(V e) {
|
||||
RedissonListMultimapValues.this.add(currentIndex+1, e);
|
||||
currentIndex++;
|
||||
hasBeenModified = true;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public RList<V> subList(int fromIndex, int toIndex) {
|
||||
int size = size();
|
||||
if (fromIndex < 0 || toIndex > size) {
|
||||
throw new IndexOutOfBoundsException("fromIndex: " + fromIndex + " toIndex: " + toIndex + " size: " + size);
|
||||
}
|
||||
if (fromIndex > toIndex) {
|
||||
throw new IllegalArgumentException("fromIndex: " + fromIndex + " toIndex: " + toIndex);
|
||||
}
|
||||
|
||||
return new RedissonSubList<V>(codec, commandExecutor, getName(), fromIndex, toIndex);
|
||||
}
|
||||
|
||||
public String toString() {
|
||||
Iterator<V> it = iterator();
|
||||
if (! it.hasNext())
|
||||
return "[]";
|
||||
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append('[');
|
||||
for (;;) {
|
||||
V e = it.next();
|
||||
sb.append(e == this ? "(this Collection)" : e);
|
||||
if (! it.hasNext())
|
||||
return sb.append(']').toString();
|
||||
sb.append(',').append(' ');
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (o == this)
|
||||
return true;
|
||||
if (!(o instanceof List))
|
||||
return false;
|
||||
|
||||
Iterator<V> e1 = iterator();
|
||||
Iterator<?> e2 = ((List<?>) o).iterator();
|
||||
while (e1.hasNext() && e2.hasNext()) {
|
||||
V o1 = e1.next();
|
||||
Object o2 = e2.next();
|
||||
if (!(o1==null ? o2==null : o1.equals(o2)))
|
||||
return false;
|
||||
}
|
||||
return !(e1.hasNext() || e2.hasNext());
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
int hashCode = 1;
|
||||
for (V e : this) {
|
||||
hashCode = 31*hashCode + (e==null ? 0 : e.hashCode());
|
||||
}
|
||||
return hashCode;
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,129 @@
|
||||
/**
|
||||
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.redisson;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.redisson.client.codec.Codec;
|
||||
import org.redisson.client.codec.LongCodec;
|
||||
import org.redisson.client.protocol.RedisCommand;
|
||||
import org.redisson.client.protocol.RedisCommands;
|
||||
import org.redisson.client.protocol.RedisCommand.ValueType;
|
||||
import org.redisson.client.protocol.convertor.BooleanReplayConvertor;
|
||||
import org.redisson.command.CommandAsyncExecutor;
|
||||
|
||||
import io.netty.util.concurrent.Future;
|
||||
|
||||
public class RedissonMultimapCache<K> {
|
||||
|
||||
private static final RedisCommand<Boolean> EVAL_EXPIRE_KEY = new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 6, ValueType.MAP_KEY);
|
||||
|
||||
private final CommandAsyncExecutor commandExecutor;
|
||||
private final String name;
|
||||
private final Codec codec;
|
||||
private final String timeoutSetName;
|
||||
|
||||
public RedissonMultimapCache(CommandAsyncExecutor commandExecutor, String name, Codec codec, String timeoutSetName) {
|
||||
this.commandExecutor = commandExecutor;
|
||||
this.name = name;
|
||||
this.codec = codec;
|
||||
this.timeoutSetName = timeoutSetName;
|
||||
}
|
||||
|
||||
public Future<Boolean> expireKeyAsync(K key, long timeToLive, TimeUnit timeUnit) {
|
||||
long ttlTimeout = System.currentTimeMillis() + timeUnit.toMillis(timeToLive);
|
||||
|
||||
return commandExecutor.evalWriteAsync(name, codec, EVAL_EXPIRE_KEY,
|
||||
"if redis.call('hexists', KEYS[1], ARGV[2]) == 1 then "
|
||||
+ "if tonumber(ARGV[1]) > 0 then "
|
||||
+ "redis.call('zadd', KEYS[2], ARGV[1], ARGV[2]); " +
|
||||
"else " +
|
||||
"redis.call('zrem', KEYS[2], ARGV[2]); "
|
||||
+ "end; "
|
||||
+ "return 1; "
|
||||
+ "else "
|
||||
+ "return 0; "
|
||||
+ "end",
|
||||
Arrays.<Object>asList(name, timeoutSetName), ttlTimeout, key);
|
||||
}
|
||||
|
||||
public Future<Boolean> deleteAsync() {
|
||||
return commandExecutor.evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN_AMOUNT,
|
||||
"local entries = redis.call('hgetall', KEYS[1]); " +
|
||||
"local keys = {KEYS[1], KEYS[2]}; " +
|
||||
"for i, v in ipairs(entries) do " +
|
||||
"if i % 2 == 0 then " +
|
||||
"local name = '{' .. KEYS[1] .. '}:' .. v; " +
|
||||
"table.insert(keys, name); " +
|
||||
"end;" +
|
||||
"end; " +
|
||||
|
||||
"local n = 0 "
|
||||
+ "for i=1, #keys,5000 do "
|
||||
+ "n = n + redis.call('del', unpack(keys, i, math.min(i+4999, table.getn(keys)))) "
|
||||
+ "end; "
|
||||
+ "return n;",
|
||||
Arrays.<Object>asList(name, timeoutSetName));
|
||||
}
|
||||
|
||||
public Future<Boolean> expireAsync(long timeToLive, TimeUnit timeUnit) {
|
||||
return commandExecutor.evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
|
||||
"redis.call('zadd', KEYS[2], 92233720368547758, 'redisson__expiretag'); " +
|
||||
"local entries = redis.call('hgetall', KEYS[1]); " +
|
||||
"for i, v in ipairs(entries) do " +
|
||||
"if i % 2 == 0 then " +
|
||||
"local name = '{' .. KEYS[1] .. '}:' .. v; " +
|
||||
"redis.call('pexpire', name, ARGV[1]); " +
|
||||
"end;" +
|
||||
"end; " +
|
||||
"redis.call('pexpire', KEYS[2], ARGV[1]); " +
|
||||
"return redis.call('pexpire', KEYS[1], ARGV[1]); ",
|
||||
Arrays.<Object>asList(name, timeoutSetName), timeUnit.toMillis(timeToLive));
|
||||
}
|
||||
|
||||
public Future<Boolean> expireAtAsync(long timestamp) {
|
||||
return commandExecutor.evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
|
||||
"redis.call('zadd', KEYS[2], 92233720368547758, 'redisson__expiretag');" +
|
||||
"local entries = redis.call('hgetall', KEYS[1]); " +
|
||||
"for i, v in ipairs(entries) do " +
|
||||
"if i % 2 == 0 then " +
|
||||
"local name = '{' .. KEYS[1] .. '}:' .. v; " +
|
||||
"redis.call('pexpireat', name, ARGV[1]); " +
|
||||
"end;" +
|
||||
"end; " +
|
||||
"redis.call('pexpireat', KEYS[2], ARGV[1]); " +
|
||||
"return redis.call('pexpireat', KEYS[1], ARGV[1]); ",
|
||||
Arrays.<Object>asList(name, timeoutSetName), timestamp);
|
||||
}
|
||||
|
||||
public Future<Boolean> clearExpireAsync() {
|
||||
return commandExecutor.evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
|
||||
"redis.call('zrem', KEYS[2], 'redisson__expiretag'); " +
|
||||
"local entries = redis.call('hgetall', KEYS[1]); " +
|
||||
"for i, v in ipairs(entries) do " +
|
||||
"if i % 2 == 0 then " +
|
||||
"local name = '{' .. KEYS[1] .. '}:' .. v; " +
|
||||
"redis.call('persist', name); " +
|
||||
"end;" +
|
||||
"end; " +
|
||||
"redis.call('persist', KEYS[2]); " +
|
||||
"return redis.call('persist', KEYS[1]); ",
|
||||
Arrays.<Object>asList(name, timeoutSetName));
|
||||
}
|
||||
|
||||
|
||||
}
|
@ -0,0 +1,165 @@
|
||||
/**
|
||||
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.redisson;
|
||||
|
||||
import java.lang.reflect.InvocationHandler;
|
||||
import java.lang.reflect.Method;
|
||||
import java.lang.reflect.Proxy;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import org.redisson.client.RedisException;
|
||||
import org.redisson.client.RedisTimeoutException;
|
||||
import org.redisson.core.MessageListener;
|
||||
import org.redisson.core.RBlockingQueue;
|
||||
import org.redisson.core.RRemoteService;
|
||||
import org.redisson.core.RTopic;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import io.netty.buffer.ByteBufUtil;
|
||||
import io.netty.util.concurrent.Future;
|
||||
import io.netty.util.concurrent.FutureListener;
|
||||
import io.netty.util.internal.PlatformDependent;
|
||||
import io.netty.util.internal.ThreadLocalRandom;
|
||||
|
||||
public class RedissonRemoteService implements RRemoteService {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(RedissonRemoteService.class);
|
||||
|
||||
private final Map<RemoteServiceKey, RemoteServiceMethod> beans = PlatformDependent.newConcurrentHashMap();
|
||||
|
||||
private final Redisson redisson;
|
||||
|
||||
public RedissonRemoteService(Redisson redisson) {
|
||||
this.redisson = redisson;
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> void register(Class<T> remoteInterface, T object) {
|
||||
register(remoteInterface, object, 1);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> void register(Class<T> remoteInterface, T object, int executorsAmount) {
|
||||
if (executorsAmount < 1) {
|
||||
throw new IllegalArgumentException("executorsAmount can't be lower than 1");
|
||||
}
|
||||
for (Method method : remoteInterface.getMethods()) {
|
||||
RemoteServiceMethod value = new RemoteServiceMethod(method, object);
|
||||
RemoteServiceKey key = new RemoteServiceKey(remoteInterface, method.getName());
|
||||
if (beans.put(key, value) != null) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
for (int i = 0; i < executorsAmount; i++) {
|
||||
String requestQueueName = "redisson_remote_service:{" + remoteInterface.getName() + "}";
|
||||
RBlockingQueue<RemoteServiceRequest> requestQueue = redisson.getBlockingQueue(requestQueueName);
|
||||
subscribe(remoteInterface, requestQueue);
|
||||
}
|
||||
}
|
||||
|
||||
private <T> void subscribe(final Class<T> remoteInterface, final RBlockingQueue<RemoteServiceRequest> requestQueue) {
|
||||
Future<RemoteServiceRequest> take = requestQueue.takeAsync();
|
||||
take.addListener(new FutureListener<RemoteServiceRequest>() {
|
||||
@Override
|
||||
public void operationComplete(Future<RemoteServiceRequest> future) throws Exception {
|
||||
if (!future.isSuccess()) {
|
||||
return;
|
||||
}
|
||||
|
||||
RemoteServiceRequest request = future.getNow();
|
||||
RemoteServiceMethod method = beans.get(new RemoteServiceKey(remoteInterface, request.getMethodName()));
|
||||
String responseName = "redisson_remote_service:{" + remoteInterface.getName() + "}:" + request.getRequestId();
|
||||
RTopic<RemoteServiceResponse> topic = redisson.getTopic(responseName);
|
||||
RemoteServiceResponse response;
|
||||
try {
|
||||
Object result = method.getMethod().invoke(method.getBean(), request.getArgs());
|
||||
response = new RemoteServiceResponse(result);
|
||||
} catch (Exception e) {
|
||||
response = new RemoteServiceResponse(e.getCause());
|
||||
log.error("Can't execute: " + request, e);
|
||||
}
|
||||
|
||||
long clients = topic.publish(response);
|
||||
if (clients == 0) {
|
||||
log.error("None of clients has not received a response: {} for request: {}", response, request);
|
||||
}
|
||||
|
||||
subscribe(remoteInterface, requestQueue);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> T get(Class<T> remoteInterface) {
|
||||
return get(remoteInterface, -1, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> T get(final Class<T> remoteInterface, final int timeout, final TimeUnit timeUnit) {
|
||||
InvocationHandler handler = new InvocationHandler() {
|
||||
@Override
|
||||
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
|
||||
String requestId = generateRequestId();
|
||||
|
||||
String requestQueueName = "redisson_remote_service:{" + remoteInterface.getName() + "}";
|
||||
RBlockingQueue<RemoteServiceRequest> requestQueue = redisson.getBlockingQueue(requestQueueName);
|
||||
RemoteServiceRequest request = new RemoteServiceRequest(requestId, method.getName(), args);
|
||||
requestQueue.add(request);
|
||||
|
||||
String responseName = "redisson_remote_service:{" + remoteInterface.getName() + "}:" + requestId;
|
||||
final RTopic<RemoteServiceResponse> topic = redisson.getTopic(responseName);
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
final AtomicReference<RemoteServiceResponse> response = new AtomicReference<RemoteServiceResponse>();
|
||||
int listenerId = topic.addListener(new MessageListener<RemoteServiceResponse>() {
|
||||
@Override
|
||||
public void onMessage(String channel, RemoteServiceResponse msg) {
|
||||
response.set(msg);
|
||||
latch.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
if (timeout == -1) {
|
||||
latch.await();
|
||||
} else {
|
||||
if (!latch.await(timeout, timeUnit)) {
|
||||
topic.removeListener(listenerId);
|
||||
throw new RedisTimeoutException("No response after " + timeUnit.toMillis(timeout) + "ms for request: " + request);
|
||||
}
|
||||
}
|
||||
topic.removeListener(listenerId);
|
||||
RemoteServiceResponse msg = response.get();
|
||||
if (msg.getError() != null) {
|
||||
throw msg.getError();
|
||||
}
|
||||
return msg.getResult();
|
||||
}
|
||||
};
|
||||
return (T) Proxy.newProxyInstance(remoteInterface.getClassLoader(), new Class[] {remoteInterface}, handler);
|
||||
}
|
||||
|
||||
private String generateRequestId() {
|
||||
byte[] id = new byte[16];
|
||||
// TODO JDK UPGRADE replace to native ThreadLocalRandom
|
||||
ThreadLocalRandom.current().nextBytes(id);
|
||||
return ByteBufUtil.hexDump(id);
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,224 @@
|
||||
/**
|
||||
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.redisson;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.redisson.client.codec.Codec;
|
||||
import org.redisson.client.protocol.RedisCommands;
|
||||
import org.redisson.command.CommandAsyncExecutor;
|
||||
import org.redisson.core.RSetMultimapCache;
|
||||
|
||||
import io.netty.util.concurrent.Future;
|
||||
|
||||
/**
|
||||
* @author Nikita Koksharov
|
||||
*
|
||||
* @param <K> key
|
||||
* @param <V> value
|
||||
*/
|
||||
public class RedissonSetMultimapCache<K, V> extends RedissonSetMultimap<K, V> implements RSetMultimapCache<K, V> {
|
||||
|
||||
private final RedissonMultimapCache<K> baseCache;
|
||||
|
||||
RedissonSetMultimapCache(EvictionScheduler evictionScheduler, CommandAsyncExecutor connectionManager, String name) {
|
||||
super(connectionManager, name);
|
||||
evictionScheduler.scheduleCleanMultimap(name, getTimeoutSetName());
|
||||
baseCache = new RedissonMultimapCache<K>(connectionManager, name, codec, getTimeoutSetName());
|
||||
}
|
||||
|
||||
RedissonSetMultimapCache(EvictionScheduler evictionScheduler, Codec codec, CommandAsyncExecutor connectionManager, String name) {
|
||||
super(codec, connectionManager, name);
|
||||
evictionScheduler.scheduleCleanMultimap(name, getTimeoutSetName());
|
||||
baseCache = new RedissonMultimapCache<K>(connectionManager, name, codec, getTimeoutSetName());
|
||||
}
|
||||
|
||||
public Future<Boolean> containsKeyAsync(Object key) {
|
||||
try {
|
||||
byte[] keyState = codec.getMapKeyEncoder().encode(key);
|
||||
String keyHash = hash(keyState);
|
||||
|
||||
String setName = getValuesName(keyHash);
|
||||
|
||||
return commandExecutor.evalReadAsync(getName(), codec, RedisCommands.EVAL_BOOLEAN,
|
||||
"local value = redis.call('hget', KEYS[1], ARGV[2]); " +
|
||||
"if value ~= false then " +
|
||||
"local expireDate = 92233720368547758; " +
|
||||
"local expireDateScore = redis.call('zscore', KEYS[2], ARGV[2]); "
|
||||
+ "if expireDateScore ~= false then "
|
||||
+ "expireDate = tonumber(expireDateScore) "
|
||||
+ "end; "
|
||||
+ "if expireDate <= tonumber(ARGV[1]) then "
|
||||
+ "return 0;"
|
||||
+ "end; "
|
||||
+ "return redis.call('scard', ARGV[3]) > 0 and 1 or 0;" +
|
||||
"end;" +
|
||||
"return 0; ",
|
||||
Arrays.<Object>asList(getName(), getTimeoutSetName()), System.currentTimeMillis(), keyState, setName);
|
||||
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
String getTimeoutSetName() {
|
||||
return "redisson_set_multimap_ttl{" + getName() + "}";
|
||||
}
|
||||
|
||||
|
||||
public Future<Boolean> containsValueAsync(Object value) {
|
||||
try {
|
||||
byte[] valueState = codec.getMapValueEncoder().encode(value);
|
||||
|
||||
return commandExecutor.evalReadAsync(getName(), codec, RedisCommands.EVAL_BOOLEAN,
|
||||
"local keys = redis.call('hgetall', KEYS[1]); " +
|
||||
"for i, v in ipairs(keys) do " +
|
||||
"if i % 2 == 0 then " +
|
||||
"local expireDate = 92233720368547758; " +
|
||||
"local expireDateScore = redis.call('zscore', KEYS[2], keys[i-1]); "
|
||||
+ "if expireDateScore ~= false then "
|
||||
+ "expireDate = tonumber(expireDateScore) "
|
||||
+ "end; "
|
||||
+ "if expireDate > tonumber(ARGV[2]) then " +
|
||||
"local name = '{' .. KEYS[1] .. '}:' .. v; " +
|
||||
"if redis.call('sismember', name, ARGV[1]) == 1 then "
|
||||
+ "return 1; " +
|
||||
"end;" +
|
||||
"end; " +
|
||||
"end;" +
|
||||
"end; " +
|
||||
"return 0; ",
|
||||
Arrays.<Object>asList(getName(), getTimeoutSetName()), valueState, System.currentTimeMillis());
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
public Future<Boolean> containsEntryAsync(Object key, Object value) {
|
||||
try {
|
||||
byte[] keyState = codec.getMapKeyEncoder().encode(key);
|
||||
String keyHash = hash(keyState);
|
||||
byte[] valueState = codec.getMapValueEncoder().encode(value);
|
||||
|
||||
String setName = getValuesName(keyHash);
|
||||
return commandExecutor.evalReadAsync(getName(), codec, RedisCommands.EVAL_BOOLEAN,
|
||||
"local expireDate = 92233720368547758; " +
|
||||
"local expireDateScore = redis.call('zscore', KEYS[2], ARGV[2]); "
|
||||
+ "if expireDateScore ~= false then "
|
||||
+ "expireDate = tonumber(expireDateScore) "
|
||||
+ "end; "
|
||||
+ "if expireDate > tonumber(ARGV[1]) then " +
|
||||
"if redis.call('sismember', KEYS[1], ARGV[3]) == 1 then "
|
||||
+ "return 1; " +
|
||||
"end;" +
|
||||
"end; " +
|
||||
"return 0; ",
|
||||
Arrays.<Object>asList(setName, getTimeoutSetName()), System.currentTimeMillis(), keyState, valueState);
|
||||
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<V> get(K key) {
|
||||
try {
|
||||
byte[] keyState = codec.getMapKeyEncoder().encode(key);
|
||||
String keyHash = hash(keyState);
|
||||
String setName = getValuesName(keyHash);
|
||||
|
||||
return new RedissonSetMultimapValues<V>(codec, commandExecutor, setName, getTimeoutSetName(), key);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
public Future<Collection<V>> getAllAsync(K key) {
|
||||
try {
|
||||
byte[] keyState = codec.getMapKeyEncoder().encode(key);
|
||||
String keyHash = hash(keyState);
|
||||
String setName = getValuesName(keyHash);
|
||||
|
||||
return commandExecutor.evalReadAsync(getName(), codec, RedisCommands.EVAL_SET,
|
||||
"local expireDate = 92233720368547758; " +
|
||||
"local expireDateScore = redis.call('zscore', KEYS[2], ARGV[2]); "
|
||||
+ "if expireDateScore ~= false then "
|
||||
+ "expireDate = tonumber(expireDateScore) "
|
||||
+ "end; "
|
||||
+ "if expireDate > tonumber(ARGV[1]) then " +
|
||||
"return redis.call('smembers', KEYS[1]); " +
|
||||
"end; " +
|
||||
"return {}; ",
|
||||
Arrays.<Object>asList(setName, getTimeoutSetName()), System.currentTimeMillis(), keyState);
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public Future<Collection<V>> removeAllAsync(Object key) {
|
||||
try {
|
||||
byte[] keyState = codec.getMapKeyEncoder().encode(key);
|
||||
String keyHash = hash(keyState);
|
||||
|
||||
String setName = getValuesName(keyHash);
|
||||
return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_SET,
|
||||
"redis.call('hdel', KEYS[1], ARGV[1]); " +
|
||||
"local members = redis.call('smembers', KEYS[2]); " +
|
||||
"redis.call('del', KEYS[2]); " +
|
||||
"redis.call('zrem', KEYS[3], ARGV[1]); " +
|
||||
"return members; ",
|
||||
Arrays.<Object>asList(getName(), setName, getTimeoutSetName()), keyState);
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean expireKey(K key, long timeToLive, TimeUnit timeUnit) {
|
||||
return get(expireKeyAsync(key, timeToLive, timeUnit));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Future<Boolean> expireKeyAsync(K key, long timeToLive, TimeUnit timeUnit) {
|
||||
return baseCache.expireKeyAsync(key, timeToLive, timeUnit);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Future<Boolean> deleteAsync() {
|
||||
return baseCache.deleteAsync();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Future<Boolean> expireAsync(long timeToLive, TimeUnit timeUnit) {
|
||||
return baseCache.expireAsync(timeToLive, timeUnit);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Future<Boolean> expireAtAsync(long timestamp) {
|
||||
return baseCache.expireAtAsync(timestamp);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Future<Boolean> clearExpireAsync() {
|
||||
return baseCache.clearExpireAsync();
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,446 @@
|
||||
/**
|
||||
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.redisson;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.NoSuchElementException;
|
||||
import java.util.Set;
|
||||
|
||||
import org.redisson.client.codec.Codec;
|
||||
import org.redisson.client.protocol.RedisCommand;
|
||||
import org.redisson.client.protocol.RedisCommand.ValueType;
|
||||
import org.redisson.client.protocol.RedisCommands;
|
||||
import org.redisson.client.protocol.convertor.BooleanReplayConvertor;
|
||||
import org.redisson.client.protocol.convertor.IntegerReplayConvertor;
|
||||
import org.redisson.client.protocol.decoder.ListScanResult;
|
||||
import org.redisson.client.protocol.decoder.ListScanResultReplayDecoder;
|
||||
import org.redisson.client.protocol.decoder.NestedMultiDecoder;
|
||||
import org.redisson.client.protocol.decoder.ObjectListReplayDecoder;
|
||||
import org.redisson.client.protocol.decoder.ObjectSetReplayDecoder;
|
||||
import org.redisson.command.CommandAsyncExecutor;
|
||||
import org.redisson.core.RSet;
|
||||
|
||||
import io.netty.util.concurrent.Future;
|
||||
|
||||
/**
|
||||
* Set based Multimap Cache values holder
|
||||
*
|
||||
* @author Nikita Koksharov
|
||||
*
|
||||
* @param <V> value
|
||||
*/
|
||||
public class RedissonSetMultimapValues<V> extends RedissonExpirable implements RSet<V> {
|
||||
|
||||
private static final RedisCommand<ListScanResult<Object>> EVAL_SSCAN = new RedisCommand<ListScanResult<Object>>("EVAL", new NestedMultiDecoder(new ObjectListReplayDecoder<Object>(), new ListScanResultReplayDecoder()), 7, ValueType.MAP_KEY, ValueType.OBJECT);
|
||||
private static final RedisCommand<Integer> EVAL_SIZE = new RedisCommand<Integer>("EVAL", new IntegerReplayConvertor(), 6, ValueType.MAP_KEY);
|
||||
private static final RedisCommand<Set<Object>> EVAL_READALL = new RedisCommand<Set<Object>>("EVAL", new ObjectSetReplayDecoder(), 6, ValueType.MAP_KEY);
|
||||
private static final RedisCommand<Boolean> EVAL_CONTAINS_VALUE = new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 6, Arrays.asList(ValueType.MAP_KEY, ValueType.MAP_VALUE));
|
||||
private static final RedisCommand<Boolean> EVAL_CONTAINS_ALL_WITH_VALUES = new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 7, ValueType.OBJECTS);
|
||||
|
||||
private final Object key;
|
||||
private final String timeoutSetName;
|
||||
|
||||
public RedissonSetMultimapValues(Codec codec, CommandAsyncExecutor commandExecutor, String name, String timeoutSetName, Object key) {
|
||||
super(codec, commandExecutor, name);
|
||||
this.timeoutSetName = timeoutSetName;
|
||||
this.key = key;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int size() {
|
||||
return get(sizeAsync());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Future<Integer> sizeAsync() {
|
||||
return commandExecutor.evalReadAsync(getName(), codec, EVAL_SIZE,
|
||||
"local expireDate = 92233720368547758; " +
|
||||
"local expireDateScore = redis.call('zscore', KEYS[1], ARGV[2]); "
|
||||
+ "if expireDateScore ~= false then "
|
||||
+ "expireDate = tonumber(expireDateScore) "
|
||||
+ "end; "
|
||||
+ "if expireDate <= tonumber(ARGV[1]) then "
|
||||
+ "return 0;"
|
||||
+ "end; "
|
||||
+ "return redis.call('scard', KEYS[2]);",
|
||||
Arrays.<Object>asList(timeoutSetName, getName()), System.currentTimeMillis(), key);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isEmpty() {
|
||||
return size() == 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean contains(Object o) {
|
||||
return get(containsAsync(o));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Future<Boolean> containsAsync(Object o) {
|
||||
return commandExecutor.evalReadAsync(getName(), codec, EVAL_CONTAINS_VALUE,
|
||||
"local expireDate = 92233720368547758; " +
|
||||
"local expireDateScore = redis.call('zscore', KEYS[1], ARGV[2]); "
|
||||
+ "if expireDateScore ~= false then "
|
||||
+ "expireDate = tonumber(expireDateScore) "
|
||||
+ "end; "
|
||||
+ "if expireDate <= tonumber(ARGV[1]) then "
|
||||
+ "return 0;"
|
||||
+ "end; "
|
||||
+ "return redis.call('sismember', KEYS[2], ARGV[3]);",
|
||||
Arrays.<Object>asList(timeoutSetName, getName()), System.currentTimeMillis(), key, o);
|
||||
}
|
||||
|
||||
private ListScanResult<V> scanIterator(InetSocketAddress client, long startPos) {
|
||||
Future<ListScanResult<V>> f = commandExecutor.evalReadAsync(client, getName(), codec, EVAL_SSCAN,
|
||||
"local expireDate = 92233720368547758; " +
|
||||
"local expireDateScore = redis.call('zscore', KEYS[1], ARGV[3]); "
|
||||
+ "if expireDateScore ~= false then "
|
||||
+ "expireDate = tonumber(expireDateScore) "
|
||||
+ "end; "
|
||||
+ "if expireDate <= tonumber(ARGV[1]) then "
|
||||
+ "return {0, {}};"
|
||||
+ "end;"
|
||||
|
||||
+ "return redis.call('sscan', KEYS[2], ARGV[2]);",
|
||||
Arrays.<Object>asList(timeoutSetName, getName()), System.currentTimeMillis(), startPos, key);
|
||||
return get(f);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Iterator<V> iterator() {
|
||||
return new Iterator<V>() {
|
||||
|
||||
private List<V> firstValues;
|
||||
private Iterator<V> iter;
|
||||
private InetSocketAddress client;
|
||||
private long nextIterPos;
|
||||
|
||||
private boolean currentElementRemoved;
|
||||
private boolean removeExecuted;
|
||||
private V value;
|
||||
|
||||
@Override
|
||||
public boolean hasNext() {
|
||||
if (iter == null || !iter.hasNext()) {
|
||||
if (nextIterPos == -1) {
|
||||
return false;
|
||||
}
|
||||
long prevIterPos = nextIterPos;
|
||||
ListScanResult<V> res = scanIterator(client, nextIterPos);
|
||||
client = res.getRedisClient();
|
||||
if (nextIterPos == 0 && firstValues == null) {
|
||||
firstValues = res.getValues();
|
||||
} else if (res.getValues().equals(firstValues)) {
|
||||
return false;
|
||||
}
|
||||
iter = res.getValues().iterator();
|
||||
nextIterPos = res.getPos();
|
||||
if (prevIterPos == nextIterPos && !removeExecuted) {
|
||||
nextIterPos = -1;
|
||||
}
|
||||
}
|
||||
return iter.hasNext();
|
||||
}
|
||||
|
||||
@Override
|
||||
public V next() {
|
||||
if (!hasNext()) {
|
||||
throw new NoSuchElementException("No such element at index");
|
||||
}
|
||||
|
||||
value = iter.next();
|
||||
currentElementRemoved = false;
|
||||
return value;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void remove() {
|
||||
if (currentElementRemoved) {
|
||||
throw new IllegalStateException("Element been already deleted");
|
||||
}
|
||||
if (iter == null) {
|
||||
throw new IllegalStateException();
|
||||
}
|
||||
|
||||
iter.remove();
|
||||
RedissonSetMultimapValues.this.remove(value);
|
||||
currentElementRemoved = true;
|
||||
removeExecuted = true;
|
||||
}
|
||||
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public Future<Set<V>> readAllAsync() {
|
||||
return commandExecutor.evalReadAsync(getName(), codec, EVAL_READALL,
|
||||
"local expireDate = 92233720368547758; " +
|
||||
"local expireDateScore = redis.call('zscore', KEYS[1], ARGV[2]); "
|
||||
+ "if expireDateScore ~= false then "
|
||||
+ "expireDate = tonumber(expireDateScore) "
|
||||
+ "end; "
|
||||
+ "if expireDate <= tonumber(ARGV[1]) then "
|
||||
+ "return {};"
|
||||
+ "end; "
|
||||
+ "return redis.call('smembers', KEYS[2]);",
|
||||
Arrays.<Object>asList(timeoutSetName, getName()), System.currentTimeMillis(), key);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<V> readAll() {
|
||||
return get(readAllAsync());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object[] toArray() {
|
||||
Set<Object> res = (Set<Object>) get(readAllAsync());
|
||||
return res.toArray();
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> T[] toArray(T[] a) {
|
||||
Set<Object> res = (Set<Object>) get(readAllAsync());
|
||||
return res.toArray(a);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean add(V e) {
|
||||
return get(addAsync(e));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Future<Boolean> addAsync(V e) {
|
||||
return commandExecutor.writeAsync(getName(), codec, RedisCommands.SADD_SINGLE, getName(), e);
|
||||
}
|
||||
|
||||
@Override
|
||||
public V removeRandom() {
|
||||
return get(removeRandomAsync());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Future<V> removeRandomAsync() {
|
||||
return commandExecutor.writeAsync(getName(), codec, RedisCommands.SPOP_SINGLE, getName());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Future<Boolean> removeAsync(Object o) {
|
||||
return commandExecutor.evalWriteAsync(getName(), codec, EVAL_CONTAINS_VALUE,
|
||||
"local expireDate = 92233720368547758; " +
|
||||
"local expireDateScore = redis.call('zscore', KEYS[1], ARGV[2]); "
|
||||
+ "if expireDateScore ~= false then "
|
||||
+ "expireDate = tonumber(expireDateScore) "
|
||||
+ "end; "
|
||||
+ "if expireDate <= tonumber(ARGV[1]) then "
|
||||
+ "return 0;"
|
||||
+ "end; "
|
||||
+ "return redis.call('srem', KEYS[2], ARGV[3]) > 0 and 1 or 0;",
|
||||
Arrays.<Object>asList(timeoutSetName, getName()), System.currentTimeMillis(), key, o);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean remove(Object value) {
|
||||
return get(removeAsync((V)value));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Future<Boolean> moveAsync(String destination, V member) {
|
||||
return commandExecutor.writeAsync(getName(), codec, RedisCommands.SMOVE, getName(), destination, member);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean move(String destination, V member) {
|
||||
return get(moveAsync(destination, member));
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean containsAll(Collection<?> c) {
|
||||
return get(containsAllAsync(c));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Future<Boolean> containsAllAsync(Collection<?> c) {
|
||||
List<Object> args = new ArrayList<Object>(c.size() + 2);
|
||||
try {
|
||||
byte[] keyState = codec.getMapKeyEncoder().encode(key);
|
||||
args.add(System.currentTimeMillis());
|
||||
args.add(keyState);
|
||||
args.addAll(c);
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
return commandExecutor.evalReadAsync(getName(), codec, EVAL_CONTAINS_ALL_WITH_VALUES,
|
||||
"local expireDate = 92233720368547758; " +
|
||||
"local expireDateScore = redis.call('zscore', KEYS[1], ARGV[2]); "
|
||||
+ "if expireDateScore ~= false then "
|
||||
+ "expireDate = tonumber(expireDateScore) "
|
||||
+ "end; "
|
||||
+ "if expireDate <= tonumber(ARGV[1]) then "
|
||||
+ "return 0;"
|
||||
+ "end; " +
|
||||
"local s = redis.call('smembers', KEYS[2]);" +
|
||||
"for i = 0, table.getn(s), 1 do " +
|
||||
"for j = 2, table.getn(ARGV), 1 do "
|
||||
+ "if ARGV[j] == s[i] "
|
||||
+ "then table.remove(ARGV, j) end "
|
||||
+ "end; "
|
||||
+ "end;"
|
||||
+ "return table.getn(ARGV) == 2 and 1 or 0; ",
|
||||
Arrays.<Object>asList(timeoutSetName, getName()), args.toArray());
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean addAll(Collection<? extends V> c) {
|
||||
if (c.isEmpty()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return get(addAllAsync(c));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Future<Boolean> addAllAsync(Collection<? extends V> c) {
|
||||
List<Object> args = new ArrayList<Object>(c.size() + 1);
|
||||
args.add(getName());
|
||||
args.addAll(c);
|
||||
return commandExecutor.writeAsync(getName(), codec, RedisCommands.SADD_BOOL, args.toArray());
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean retainAll(Collection<?> c) {
|
||||
return get(retainAllAsync(c));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Future<Boolean> retainAllAsync(Collection<?> c) {
|
||||
List<Object> args = new ArrayList<Object>(c.size() + 2);
|
||||
try {
|
||||
byte[] keyState = codec.getMapKeyEncoder().encode(key);
|
||||
args.add(System.currentTimeMillis());
|
||||
args.add(keyState);
|
||||
args.addAll(c);
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
return commandExecutor.evalWriteAsync(getName(), codec, EVAL_CONTAINS_ALL_WITH_VALUES,
|
||||
"local expireDate = 92233720368547758; " +
|
||||
"local expireDateScore = redis.call('zscore', KEYS[1], ARGV[2]); "
|
||||
+ "if expireDateScore ~= false then "
|
||||
+ "expireDate = tonumber(expireDateScore) "
|
||||
+ "end; "
|
||||
+ "if expireDate <= tonumber(ARGV[1]) then "
|
||||
+ "return 0;"
|
||||
+ "end; " +
|
||||
|
||||
"local changed = 0 " +
|
||||
"local s = redis.call('smembers', KEYS[2]) "
|
||||
+ "local i = 0 "
|
||||
+ "while i <= table.getn(s) do "
|
||||
+ "local element = s[i] "
|
||||
+ "local isInAgrs = false "
|
||||
+ "for j = 2, table.getn(ARGV), 1 do "
|
||||
+ "if ARGV[j] == element then "
|
||||
+ "isInAgrs = true "
|
||||
+ "break "
|
||||
+ "end "
|
||||
+ "end "
|
||||
+ "if isInAgrs == false then "
|
||||
+ "redis.call('SREM', KEYS[2], element) "
|
||||
+ "changed = 1 "
|
||||
+ "end "
|
||||
+ "i = i + 1 "
|
||||
+ "end "
|
||||
+ "return changed ",
|
||||
Arrays.<Object>asList(timeoutSetName, getName()), args.toArray());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Future<Boolean> removeAllAsync(Collection<?> c) {
|
||||
List<Object> args = new ArrayList<Object>(c.size() + 2);
|
||||
try {
|
||||
byte[] keyState = codec.getMapKeyEncoder().encode(key);
|
||||
args.add(System.currentTimeMillis());
|
||||
args.add(keyState);
|
||||
args.addAll(c);
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
return commandExecutor.evalWriteAsync(getName(), codec, EVAL_CONTAINS_ALL_WITH_VALUES,
|
||||
"local expireDate = 92233720368547758; " +
|
||||
"local expireDateScore = redis.call('zscore', KEYS[1], ARGV[2]); "
|
||||
+ "if expireDateScore ~= false then "
|
||||
+ "expireDate = tonumber(expireDateScore) "
|
||||
+ "end; "
|
||||
+ "if expireDate <= tonumber(ARGV[1]) then "
|
||||
+ "return 0;"
|
||||
+ "end; " +
|
||||
|
||||
"local v = 0 " +
|
||||
"for i = 2, table.getn(ARGV), 1 do "
|
||||
+ "if redis.call('srem', KEYS[2], ARGV[i]) == 1 "
|
||||
+ "then v = 1 end "
|
||||
+"end "
|
||||
+ "return v ",
|
||||
Arrays.<Object>asList(timeoutSetName, getName()), args.toArray());
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean removeAll(Collection<?> c) {
|
||||
return get(removeAllAsync(c));
|
||||
}
|
||||
|
||||
@Override
|
||||
public int union(String... names) {
|
||||
return get(unionAsync(names));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Future<Integer> unionAsync(String... names) {
|
||||
List<Object> args = new ArrayList<Object>(names.length + 1);
|
||||
args.add(getName());
|
||||
args.addAll(Arrays.asList(names));
|
||||
return commandExecutor.writeAsync(getName(), codec, RedisCommands.SUNIONSTORE_INT, args.toArray());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<V> readUnion(String... names) {
|
||||
return get(readUnionAsync(names));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Future<Set<V>> readUnionAsync(String... names) {
|
||||
List<Object> args = new ArrayList<Object>(names.length + 1);
|
||||
args.add(getName());
|
||||
args.addAll(Arrays.asList(names));
|
||||
return commandExecutor.writeAsync(getName(), codec, RedisCommands.SUNION, args.toArray());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clear() {
|
||||
delete();
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,68 @@
|
||||
/**
|
||||
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.redisson;
|
||||
|
||||
public class RemoteServiceKey {
|
||||
|
||||
private final Class<?> serviceInterface;
|
||||
private final String methodName;
|
||||
|
||||
public RemoteServiceKey(Class<?> serviceInterface, String methodName) {
|
||||
super();
|
||||
this.serviceInterface = serviceInterface;
|
||||
this.methodName = methodName;
|
||||
}
|
||||
|
||||
public String getMethodName() {
|
||||
return methodName;
|
||||
}
|
||||
|
||||
public Class<?> getServiceInterface() {
|
||||
return serviceInterface;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
final int prime = 31;
|
||||
int result = 1;
|
||||
result = prime * result + ((methodName == null) ? 0 : methodName.hashCode());
|
||||
result = prime * result + ((serviceInterface == null) ? 0 : serviceInterface.getName().hashCode());
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (this == obj)
|
||||
return true;
|
||||
if (obj == null)
|
||||
return false;
|
||||
if (getClass() != obj.getClass())
|
||||
return false;
|
||||
RemoteServiceKey other = (RemoteServiceKey) obj;
|
||||
if (methodName == null) {
|
||||
if (other.methodName != null)
|
||||
return false;
|
||||
} else if (!methodName.equals(other.methodName))
|
||||
return false;
|
||||
if (serviceInterface == null) {
|
||||
if (other.serviceInterface != null)
|
||||
return false;
|
||||
} else if (!serviceInterface.equals(other.serviceInterface))
|
||||
return false;
|
||||
return true;
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,39 @@
|
||||
/**
|
||||
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.redisson;
|
||||
|
||||
import java.lang.reflect.Method;
|
||||
|
||||
public class RemoteServiceMethod {
|
||||
|
||||
private final Object bean;
|
||||
private final Method method;
|
||||
|
||||
public RemoteServiceMethod(Method method, Object bean) {
|
||||
super();
|
||||
this.method = method;
|
||||
this.bean = bean;
|
||||
}
|
||||
|
||||
public Object getBean() {
|
||||
return bean;
|
||||
}
|
||||
|
||||
public Method getMethod() {
|
||||
return method;
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,54 @@
|
||||
/**
|
||||
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.redisson;
|
||||
|
||||
import java.util.Arrays;
|
||||
|
||||
public class RemoteServiceRequest {
|
||||
|
||||
private String requestId;
|
||||
private String methodName;
|
||||
private Object[] args;
|
||||
|
||||
public RemoteServiceRequest() {
|
||||
}
|
||||
|
||||
public RemoteServiceRequest(String requestId, String methodName, Object[] args) {
|
||||
super();
|
||||
this.requestId = requestId;
|
||||
this.methodName = methodName;
|
||||
this.args = args;
|
||||
}
|
||||
|
||||
public String getRequestId() {
|
||||
return requestId;
|
||||
}
|
||||
|
||||
public Object[] getArgs() {
|
||||
return args;
|
||||
}
|
||||
|
||||
public String getMethodName() {
|
||||
return methodName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "RemoteServiceRequest[requestId=" + requestId + ", methodName=" + methodName + ", args="
|
||||
+ Arrays.toString(args) + "]";
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,47 @@
|
||||
/**
|
||||
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.redisson;
|
||||
|
||||
public class RemoteServiceResponse {
|
||||
|
||||
private Object result;
|
||||
private Throwable error;
|
||||
|
||||
public RemoteServiceResponse() {
|
||||
}
|
||||
|
||||
public RemoteServiceResponse(Object result) {
|
||||
this.result = result;
|
||||
}
|
||||
|
||||
public RemoteServiceResponse(Throwable error) {
|
||||
this.error = error;
|
||||
}
|
||||
|
||||
public Throwable getError() {
|
||||
return error;
|
||||
}
|
||||
|
||||
public Object getResult() {
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "RemoteServiceResponse [result=" + result + ", error=" + error + "]";
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,20 @@
|
||||
/**
|
||||
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.redisson.core;
|
||||
|
||||
public interface RListMultimapCache<K, V> extends RListMultimap<K, V>, RMultimapCache<K, V> {
|
||||
|
||||
}
|
@ -0,0 +1,33 @@
|
||||
/**
|
||||
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.redisson.core;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public interface RMultimapCache<K, V> extends RMultimap<K, V>, RMultimapCacheAsync<K, V> {
|
||||
|
||||
/**
|
||||
* Set a timeout for key. After the timeout has expired,
|
||||
* the key and its values will automatically be deleted.
|
||||
*
|
||||
* @param key
|
||||
* @param timeToLive - timeout before key will be deleted
|
||||
* @param timeUnit - timeout time unit
|
||||
* @return <code>true</code> if key exists and the timeout was set and <code>false</code> if key not exists
|
||||
*/
|
||||
boolean expireKey(K key, long timeToLive, TimeUnit timeUnit);
|
||||
|
||||
}
|
@ -0,0 +1,35 @@
|
||||
/**
|
||||
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.redisson.core;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import io.netty.util.concurrent.Future;
|
||||
|
||||
public interface RMultimapCacheAsync<K, V> extends RMultimapAsync<K, V> {
|
||||
|
||||
/**
|
||||
* Set a timeout for key in async mode. After the timeout has expired,
|
||||
* the key and its values will automatically be deleted.
|
||||
*
|
||||
* @param key
|
||||
* @param timeToLive - timeout before key will be deleted
|
||||
* @param timeUnit - timeout time unit
|
||||
* @return <code>true</code> if key exists and the timeout was set and <code>false</code> if key not exists
|
||||
*/
|
||||
Future<Boolean> expireKeyAsync(K key, long timeToLive, TimeUnit timeUnit);
|
||||
|
||||
}
|
@ -0,0 +1,58 @@
|
||||
/**
|
||||
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.redisson.core;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public interface RRemoteService {
|
||||
|
||||
/**
|
||||
* Register remote service with single executor
|
||||
*
|
||||
* @param remoteInterface
|
||||
* @param object
|
||||
*/
|
||||
<T> void register(Class<T> remoteInterface, T object);
|
||||
|
||||
/**
|
||||
* Register remote service with custom executors amount
|
||||
*
|
||||
* @param remoteInterface
|
||||
* @param object
|
||||
* @param executorsAmount
|
||||
*/
|
||||
<T> void register(Class<T> remoteInterface, T object, int executorsAmount);
|
||||
|
||||
/**
|
||||
* Get remote service object for remote invocations
|
||||
*
|
||||
* @param remoteInterface
|
||||
* @return
|
||||
*/
|
||||
<T> T get(Class<T> remoteInterface);
|
||||
|
||||
/**
|
||||
* Get remote service object for remote invocations
|
||||
* with specified invocation timeout
|
||||
*
|
||||
* @param remoteInterface
|
||||
* @param timeout - invocation timeout
|
||||
* @param timeUnit
|
||||
* @return
|
||||
*/
|
||||
<T> T get(Class<T> remoteInterface, int timeout, TimeUnit timeUnit);
|
||||
|
||||
}
|
@ -0,0 +1,20 @@
|
||||
/**
|
||||
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.redisson.core;
|
||||
|
||||
public interface RSetMultimapCache<K, V> extends RSetMultimap<K, V>, RMultimapCache<K, V> {
|
||||
|
||||
}
|
@ -1,22 +1,635 @@
|
||||
package org.redisson;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStreamReader;
|
||||
import java.net.Inet4Address;
|
||||
import java.net.URL;
|
||||
import java.util.Arrays;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.Optional;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public class RedisRunner {
|
||||
|
||||
private static final String redisFolder = "C:\\Devel\\projects\\redis\\Redis-x64-3.0.500\\";
|
||||
public enum REDIS_OPTIONS {
|
||||
|
||||
public static Process runRedis(String configPath) throws IOException, InterruptedException {
|
||||
BINARY_PATH,
|
||||
DAEMONIZE,
|
||||
PIDFILE,
|
||||
PORT,
|
||||
TCP_BACKLOG,
|
||||
BIND(true),
|
||||
UNIXSOCKET,
|
||||
UNIXSOCKETPERM,
|
||||
TIMEOUT,
|
||||
TCP_KEEPALIVE,
|
||||
LOGLEVEL,
|
||||
LOGFILE,
|
||||
SYSLOG_ENABLED,
|
||||
SYSLOG_IDENT,
|
||||
SYSLOG_FACILITY,
|
||||
DATABASES,
|
||||
SAVE(true),
|
||||
STOP_WRITES_ON_BGSAVE_ERROR,
|
||||
RDBCOMPRESSION,
|
||||
RDBCHECKSUM,
|
||||
DBFILENAME,
|
||||
DIR,
|
||||
SLAVEOF,
|
||||
MASTERAUTH,
|
||||
SLAVE_SERVE_STALE_DATA,
|
||||
SLAVE_READ_ONLY,
|
||||
REPL_DISKLESS_SYNC,
|
||||
REPL_DISKLESS_SYNC_DELAY,
|
||||
REPL_PING_SLAVE_PERIOD,
|
||||
REPL_TIMEOUT,
|
||||
REPL_DISABLE_TCP_NODELAY,
|
||||
REPL_BACKLOG_SIZE,
|
||||
REPL_BACKLOG_TTL,
|
||||
SLAVE_PRIORITY,
|
||||
MIN_SLAVES_TO_WRITE,
|
||||
MIN_SLAVES_MAX_LAG,
|
||||
REQUREPASS,
|
||||
RENAME_COMMAND(true),
|
||||
MAXCLIENTS,
|
||||
MAXMEMORY,
|
||||
MAXMEMORY_POLICY,
|
||||
MAXMEMORY_SAMPLE,
|
||||
APPENDONLY,
|
||||
APPENDFILENAME,
|
||||
APPENDFSYNC,
|
||||
NO_APPENDFSYNC_ON_REWRITE,
|
||||
AUTO_AOF_REWRITE_PERCENTAGE,
|
||||
AUTO_AOF_REWRITE_MIN_SIZE,
|
||||
AOF_LOAD_TRUNCATED,
|
||||
LUA_TIME_LIMIT,
|
||||
CLUSTER_ENABLED,
|
||||
CLUSTER_CONFIG_FILE,
|
||||
CLUSTER_NODE_TIMEOUT,
|
||||
CLUSTER_SLAVE_VALIDITY_FACTOR,
|
||||
CLUSTER_MIGRATION_BARRIER,
|
||||
CLUSTER_REQUIRE_FULL_COVERAGE,
|
||||
SLOWLOG_LOG_SLOWER_THAN,
|
||||
SLOWLOG_MAX_LEN,
|
||||
LATENCY_MONITOR_THRESHOLD,
|
||||
NOFITY_KEYSPACE_EVENTS,
|
||||
HASH_MAX_ZIPLIST_ENTRIES,
|
||||
HASH_MAX_ZIPLIST_VALUE,
|
||||
LIST_MAX_ZIPLIST_ENTRIES,
|
||||
LIST_MAX_ZIPLIST_VALUE,
|
||||
SET_MAX_INTSET_ENTRIES,
|
||||
ZSET_MAX_ZIPLIST_ENTRIES,
|
||||
ZSET_MAX_ZIPLIST_VALUE,
|
||||
HLL_SPARSE_MAX_BYTES,
|
||||
ACTIVEREHASHING,
|
||||
CLIENT_OUTPUT_BUFFER_LIMIT$NORMAL,
|
||||
CLIENT_OUTPUT_BUFFER_LIMIT$SLAVE,
|
||||
CLIENT_OUTPUT_BUFFER_LIMIT$PUBSUB,
|
||||
HZ,
|
||||
AOF_REWRITE_INCREMENTAL_FSYNC;
|
||||
|
||||
private final boolean allowMutiple;
|
||||
|
||||
private REDIS_OPTIONS() {
|
||||
this.allowMutiple = false;
|
||||
}
|
||||
|
||||
private REDIS_OPTIONS(boolean allowMutiple) {
|
||||
this.allowMutiple = allowMutiple;
|
||||
}
|
||||
|
||||
public boolean isAllowMultiple() {
|
||||
return allowMutiple;
|
||||
}
|
||||
}
|
||||
|
||||
public enum LOGLEVEL_OPTIONS {
|
||||
|
||||
DEBUG,
|
||||
VERBOSE,
|
||||
NOTICE,
|
||||
WARNING
|
||||
}
|
||||
|
||||
public enum SYSLOG_FACILITY_OPTIONS {
|
||||
|
||||
USER,
|
||||
LOCAL0,
|
||||
LOCAL1,
|
||||
LOCAL2,
|
||||
LOCAL3,
|
||||
LOCAL4,
|
||||
LOCAL5,
|
||||
LOCAL6,
|
||||
LOCAL7
|
||||
}
|
||||
|
||||
public enum MAX_MEMORY_POLICY_OPTIONS {
|
||||
|
||||
VOLATILE_LRU,
|
||||
ALLKEYS_LRU,
|
||||
VOLATILE_RANDOM,
|
||||
ALLKEYS_RANDOM,
|
||||
VOLATILE_TTL,
|
||||
NOEVICTION
|
||||
}
|
||||
|
||||
public enum APPEND_FSYNC_MODE_OPTIONS {
|
||||
|
||||
ALWAYS,
|
||||
EVERYSEC,
|
||||
NO
|
||||
}
|
||||
|
||||
public enum KEYSPACE_EVENTS_OPTIONS {
|
||||
|
||||
K,
|
||||
E,
|
||||
g,
|
||||
$,
|
||||
l,
|
||||
s,
|
||||
h,
|
||||
z,
|
||||
x,
|
||||
e,
|
||||
A
|
||||
}
|
||||
|
||||
private static final String redisBinary;
|
||||
|
||||
private final LinkedHashMap<REDIS_OPTIONS, String> options = new LinkedHashMap<>();
|
||||
|
||||
static {
|
||||
redisBinary = Optional.ofNullable(System.getProperty("redisBinary"))
|
||||
.orElse("C:\\Devel\\projects\\redis\\Redis-x64-3.0.500\\redis-server.exe");
|
||||
}
|
||||
|
||||
{
|
||||
this.options.put(REDIS_OPTIONS.BINARY_PATH, redisBinary);
|
||||
}
|
||||
|
||||
/**
|
||||
* To change the <b>redisBinary</b> system property for running the test,
|
||||
* use <i>argLine</i> option from surefire plugin:
|
||||
*
|
||||
* $ mvn -DargLine="-DredisBinary=`which redis-server`" -Punit-test clean \
|
||||
* verify
|
||||
*
|
||||
* @param configPath
|
||||
* @return Process running redis instance
|
||||
* @throws IOException
|
||||
* @throws InterruptedException
|
||||
* @see
|
||||
* <a href="http://maven.apache.org/surefire/maven-surefire-plugin/test-mojo.html#argLine">
|
||||
* http://maven.apache.org/surefire/maven-surefire-plugin/test-mojo.html#argLine</a>
|
||||
*/
|
||||
public static RedisProcess runRedisWithConfigFile(String configPath) throws IOException, InterruptedException {
|
||||
URL resource = RedisRunner.class.getResource(configPath);
|
||||
return runWithOptions(redisBinary, resource.getFile());
|
||||
}
|
||||
|
||||
ProcessBuilder master = new ProcessBuilder(redisFolder + "redis-server.exe", resource.getFile().substring(1));
|
||||
master.directory(new File(redisFolder));
|
||||
private static RedisProcess runWithOptions(String... options) throws IOException, InterruptedException {
|
||||
List<String> launchOptions = Arrays.stream(options)
|
||||
.map(x -> Arrays.asList(x.split(" "))).flatMap(x -> x.stream())
|
||||
.collect(Collectors.toList());
|
||||
System.out.println("REDIS LAUNCH OPTIONS: " + Arrays.toString(launchOptions.toArray()));
|
||||
ProcessBuilder master = new ProcessBuilder(launchOptions)
|
||||
.redirectErrorStream(true)
|
||||
.directory(new File(redisBinary).getParentFile());
|
||||
Process p = master.start();
|
||||
new Thread(() -> {
|
||||
BufferedReader reader = new BufferedReader(new InputStreamReader(p.getInputStream()));
|
||||
String line;
|
||||
try {
|
||||
while (p.isAlive() && (line = reader.readLine()) != null) {
|
||||
System.out.println("REDIS PROCESS: " + line);
|
||||
}
|
||||
} catch (IOException ex) {
|
||||
System.out.println("Exception: " + ex.getLocalizedMessage());
|
||||
}
|
||||
}).start();
|
||||
Thread.sleep(1000);
|
||||
return p;
|
||||
return new RedisProcess(p);
|
||||
}
|
||||
|
||||
public RedisProcess run() throws IOException, InterruptedException {
|
||||
return runWithOptions(options.values().toArray(new String[0]));
|
||||
}
|
||||
|
||||
private void addConfigOption(REDIS_OPTIONS option, Object... args) {
|
||||
StringBuilder sb = new StringBuilder("--")
|
||||
.append(option.toString()
|
||||
.replaceAll("_", "-")
|
||||
.replaceAll("\\$", " ")
|
||||
.toLowerCase())
|
||||
.append(" ")
|
||||
.append(Arrays.stream(args).map(Object::toString)
|
||||
.collect(Collectors.joining(" ")));
|
||||
this.options.put(option,
|
||||
option.isAllowMultiple()
|
||||
? sb.insert(0, this.options.getOrDefault(option, "")).toString()
|
||||
: sb.toString());
|
||||
}
|
||||
|
||||
private String convertBoolean(boolean b) {
|
||||
return b ? "yes" : "no";
|
||||
}
|
||||
|
||||
public RedisRunner daemonize(boolean daemonize) {
|
||||
addConfigOption(REDIS_OPTIONS.DAEMONIZE, convertBoolean(daemonize));
|
||||
return this;
|
||||
}
|
||||
|
||||
public RedisRunner pidfile(String pidfile) {
|
||||
addConfigOption(REDIS_OPTIONS.PIDFILE, pidfile);
|
||||
return this;
|
||||
}
|
||||
|
||||
public RedisRunner port(int port) {
|
||||
addConfigOption(REDIS_OPTIONS.PORT, port);
|
||||
return this;
|
||||
}
|
||||
|
||||
public RedisRunner tcpBacklog(long tcpBacklog) {
|
||||
addConfigOption(REDIS_OPTIONS.TCP_BACKLOG, tcpBacklog);
|
||||
return this;
|
||||
}
|
||||
|
||||
public RedisRunner bind(String bind) {
|
||||
addConfigOption(REDIS_OPTIONS.BIND, bind);
|
||||
return this;
|
||||
}
|
||||
|
||||
public RedisRunner unixsocket(String unixsocket) {
|
||||
addConfigOption(REDIS_OPTIONS.UNIXSOCKET, unixsocket);
|
||||
return this;
|
||||
}
|
||||
|
||||
public RedisRunner unixsocketperm(int unixsocketperm) {
|
||||
addConfigOption(REDIS_OPTIONS.UNIXSOCKETPERM, unixsocketperm);
|
||||
return this;
|
||||
}
|
||||
|
||||
public RedisRunner timeout(long timeout) {
|
||||
addConfigOption(REDIS_OPTIONS.TIMEOUT, timeout);
|
||||
return this;
|
||||
}
|
||||
|
||||
public RedisRunner tcpKeepalive(long tcpKeepalive) {
|
||||
addConfigOption(REDIS_OPTIONS.TCP_KEEPALIVE, tcpKeepalive);
|
||||
return this;
|
||||
}
|
||||
|
||||
public RedisRunner loglevel(LOGLEVEL_OPTIONS loglevel) {
|
||||
addConfigOption(REDIS_OPTIONS.LOGLEVEL, loglevel.toString());
|
||||
return this;
|
||||
}
|
||||
|
||||
public RedisRunner logfile(String logfile) {
|
||||
addConfigOption(REDIS_OPTIONS.LOGLEVEL, logfile);
|
||||
return this;
|
||||
}
|
||||
|
||||
public RedisRunner syslogEnabled(boolean syslogEnabled) {
|
||||
addConfigOption(REDIS_OPTIONS.SYSLOG_ENABLED, convertBoolean(syslogEnabled));
|
||||
return this;
|
||||
}
|
||||
|
||||
public RedisRunner syslogIdent(String syslogIdent) {
|
||||
addConfigOption(REDIS_OPTIONS.SYSLOG_IDENT, syslogIdent);
|
||||
return this;
|
||||
}
|
||||
|
||||
public RedisRunner syslogFacility(SYSLOG_FACILITY_OPTIONS syslogFacility) {
|
||||
addConfigOption(REDIS_OPTIONS.SYSLOG_IDENT, syslogFacility.toString());
|
||||
return this;
|
||||
}
|
||||
|
||||
public RedisRunner databases(int databases) {
|
||||
addConfigOption(REDIS_OPTIONS.DATABASES, databases);
|
||||
return this;
|
||||
}
|
||||
|
||||
public RedisRunner save(long seconds, long changes) {
|
||||
addConfigOption(REDIS_OPTIONS.SAVE, seconds, changes);
|
||||
return this;
|
||||
}
|
||||
|
||||
public RedisRunner stopWritesOnBgsaveError(boolean stopWritesOnBgsaveError) {
|
||||
addConfigOption(REDIS_OPTIONS.STOP_WRITES_ON_BGSAVE_ERROR, convertBoolean(stopWritesOnBgsaveError));
|
||||
return this;
|
||||
}
|
||||
|
||||
public RedisRunner rdbcompression(boolean rdbcompression) {
|
||||
addConfigOption(REDIS_OPTIONS.RDBCOMPRESSION, convertBoolean(rdbcompression));
|
||||
return this;
|
||||
}
|
||||
|
||||
public RedisRunner rdbchecksum(boolean rdbchecksum) {
|
||||
addConfigOption(REDIS_OPTIONS.RDBCHECKSUM, convertBoolean(rdbchecksum));
|
||||
return this;
|
||||
}
|
||||
|
||||
public RedisRunner dbfilename(String dbfilename) {
|
||||
addConfigOption(REDIS_OPTIONS.DBFILENAME, dbfilename);
|
||||
return this;
|
||||
}
|
||||
|
||||
public RedisRunner dir(String dir) {
|
||||
addConfigOption(REDIS_OPTIONS.DIR, dir);
|
||||
return this;
|
||||
}
|
||||
|
||||
public RedisRunner slaveof(Inet4Address masterip, int port) {
|
||||
addConfigOption(REDIS_OPTIONS.SLAVEOF, masterip.getHostAddress(), port);
|
||||
return this;
|
||||
}
|
||||
|
||||
public RedisRunner masterauth(String masterauth) {
|
||||
addConfigOption(REDIS_OPTIONS.MASTERAUTH, masterauth);
|
||||
return this;
|
||||
}
|
||||
|
||||
public RedisRunner slaveServeStaleData(boolean slaveServeStaleData) {
|
||||
addConfigOption(REDIS_OPTIONS.SLAVE_SERVE_STALE_DATA, convertBoolean(slaveServeStaleData));
|
||||
return this;
|
||||
}
|
||||
|
||||
public RedisRunner slaveReadOnly(boolean slaveReadOnly) {
|
||||
addConfigOption(REDIS_OPTIONS.SLAVE_READ_ONLY, convertBoolean(slaveReadOnly));
|
||||
return this;
|
||||
}
|
||||
|
||||
public RedisRunner replDisklessSync(boolean replDisklessSync) {
|
||||
addConfigOption(REDIS_OPTIONS.REPL_DISKLESS_SYNC, convertBoolean(replDisklessSync));
|
||||
return this;
|
||||
}
|
||||
|
||||
public RedisRunner replDisklessSyncDelay(long replDisklessSyncDelay) {
|
||||
addConfigOption(REDIS_OPTIONS.REPL_DISKLESS_SYNC_DELAY, replDisklessSyncDelay);
|
||||
return this;
|
||||
}
|
||||
|
||||
public RedisRunner replPingSlavePeriod(long replPingSlavePeriod) {
|
||||
addConfigOption(REDIS_OPTIONS.REPL_PING_SLAVE_PERIOD, replPingSlavePeriod);
|
||||
return this;
|
||||
}
|
||||
|
||||
public RedisRunner replTimeout(long replTimeout) {
|
||||
addConfigOption(REDIS_OPTIONS.REPL_TIMEOUT, replTimeout);
|
||||
return this;
|
||||
}
|
||||
|
||||
public RedisRunner replDisableTcpNodelay(boolean replDisableTcpNodelay) {
|
||||
addConfigOption(REDIS_OPTIONS.REPL_DISABLE_TCP_NODELAY, convertBoolean(replDisableTcpNodelay));
|
||||
return this;
|
||||
}
|
||||
|
||||
public RedisRunner replBacklogSize(String replBacklogSize) {
|
||||
addConfigOption(REDIS_OPTIONS.REPL_BACKLOG_SIZE, replBacklogSize);
|
||||
return this;
|
||||
}
|
||||
|
||||
public RedisRunner replBacklogTtl(long replBacklogTtl) {
|
||||
addConfigOption(REDIS_OPTIONS.REPL_BACKLOG_TTL, replBacklogTtl);
|
||||
return this;
|
||||
}
|
||||
|
||||
public RedisRunner slavePriority(long slavePriority) {
|
||||
addConfigOption(REDIS_OPTIONS.SLAVE_PRIORITY, slavePriority);
|
||||
return this;
|
||||
}
|
||||
|
||||
public RedisRunner minSlaveToWrite(long minSlaveToWrite) {
|
||||
addConfigOption(REDIS_OPTIONS.MIN_SLAVES_TO_WRITE, minSlaveToWrite);
|
||||
return this;
|
||||
}
|
||||
|
||||
public RedisRunner minSlaveMaxLag(long minSlaveMaxLag) {
|
||||
addConfigOption(REDIS_OPTIONS.MIN_SLAVES_MAX_LAG, minSlaveMaxLag);
|
||||
return this;
|
||||
}
|
||||
|
||||
public RedisRunner requirepass(String requirepass) {
|
||||
addConfigOption(REDIS_OPTIONS.REQUREPASS, requirepass);
|
||||
return this;
|
||||
}
|
||||
|
||||
public RedisRunner renameCommand(String renameCommand) {
|
||||
addConfigOption(REDIS_OPTIONS.RENAME_COMMAND, renameCommand);
|
||||
return this;
|
||||
}
|
||||
|
||||
public RedisRunner maxclients(long maxclients) {
|
||||
addConfigOption(REDIS_OPTIONS.MAXCLIENTS, maxclients);
|
||||
return this;
|
||||
}
|
||||
|
||||
public RedisRunner maxmemory(String maxmemory) {
|
||||
addConfigOption(REDIS_OPTIONS.MAXMEMORY, maxmemory);
|
||||
return this;
|
||||
}
|
||||
|
||||
public RedisRunner maxmemoryPolicy(MAX_MEMORY_POLICY_OPTIONS maxmemoryPolicy) {
|
||||
addConfigOption(REDIS_OPTIONS.MAXMEMORY, maxmemoryPolicy.toString());
|
||||
return this;
|
||||
}
|
||||
|
||||
public RedisRunner maxmemorySamples(long maxmemorySamples) {
|
||||
addConfigOption(REDIS_OPTIONS.MAXMEMORY, maxmemorySamples);
|
||||
return this;
|
||||
}
|
||||
|
||||
public RedisRunner appendonly(boolean appendonly) {
|
||||
addConfigOption(REDIS_OPTIONS.APPENDONLY, convertBoolean(appendonly));
|
||||
return this;
|
||||
}
|
||||
|
||||
public RedisRunner appendfilename(String appendfilename) {
|
||||
addConfigOption(REDIS_OPTIONS.APPENDFILENAME, appendfilename);
|
||||
return this;
|
||||
}
|
||||
|
||||
public RedisRunner appendfsync(APPEND_FSYNC_MODE_OPTIONS appendfsync) {
|
||||
addConfigOption(REDIS_OPTIONS.APPENDFSYNC, appendfsync.toString());
|
||||
return this;
|
||||
}
|
||||
|
||||
public RedisRunner noAppendfsyncOnRewrite(boolean noAppendfsyncOnRewrite) {
|
||||
addConfigOption(REDIS_OPTIONS.NO_APPENDFSYNC_ON_REWRITE, convertBoolean(noAppendfsyncOnRewrite));
|
||||
return this;
|
||||
}
|
||||
|
||||
public RedisRunner autoAofRewritePercentage(int autoAofRewritePercentage) {
|
||||
addConfigOption(REDIS_OPTIONS.AUTO_AOF_REWRITE_PERCENTAGE, autoAofRewritePercentage);
|
||||
return this;
|
||||
}
|
||||
|
||||
public RedisRunner autoAofRewriteMinSize(String autoAofRewriteMinSize) {
|
||||
addConfigOption(REDIS_OPTIONS.AUTO_AOF_REWRITE_MIN_SIZE, autoAofRewriteMinSize);
|
||||
return this;
|
||||
}
|
||||
|
||||
public RedisRunner aofLoadTruncated(boolean aofLoadTruncated) {
|
||||
addConfigOption(REDIS_OPTIONS.AOF_LOAD_TRUNCATED, convertBoolean(aofLoadTruncated));
|
||||
return this;
|
||||
}
|
||||
|
||||
public RedisRunner luaTimeLimit(long luaTimeLimit) {
|
||||
addConfigOption(REDIS_OPTIONS.AOF_LOAD_TRUNCATED, luaTimeLimit);
|
||||
return this;
|
||||
}
|
||||
|
||||
public RedisRunner clusterEnabled(boolean clusterEnabled) {
|
||||
addConfigOption(REDIS_OPTIONS.CLUSTER_ENABLED, convertBoolean(clusterEnabled));
|
||||
return this;
|
||||
}
|
||||
|
||||
public RedisRunner clusterConfigFile(String clusterConfigFile) {
|
||||
addConfigOption(REDIS_OPTIONS.CLUSTER_CONFIG_FILE, clusterConfigFile);
|
||||
return this;
|
||||
}
|
||||
|
||||
public RedisRunner clusterNodeTimeout(long clusterNodeTimeout) {
|
||||
addConfigOption(REDIS_OPTIONS.CLUSTER_NODE_TIMEOUT, clusterNodeTimeout);
|
||||
return this;
|
||||
}
|
||||
|
||||
public RedisRunner clusterSlaveValidityFactor(long clusterSlaveValidityFactor) {
|
||||
addConfigOption(REDIS_OPTIONS.CLUSTER_SLAVE_VALIDITY_FACTOR, clusterSlaveValidityFactor);
|
||||
return this;
|
||||
}
|
||||
|
||||
public RedisRunner clusterMigrationBarrier(long clusterMigrationBarrier) {
|
||||
addConfigOption(REDIS_OPTIONS.CLUSTER_MIGRATION_BARRIER, clusterMigrationBarrier);
|
||||
return this;
|
||||
}
|
||||
|
||||
public RedisRunner clusterRequireFullCoverage(boolean clusterRequireFullCoverage) {
|
||||
addConfigOption(REDIS_OPTIONS.CLUSTER_REQUIRE_FULL_COVERAGE, convertBoolean(clusterRequireFullCoverage));
|
||||
return this;
|
||||
}
|
||||
|
||||
public RedisRunner slowlogLogSlowerThan(long slowlogLogSlowerThan) {
|
||||
addConfigOption(REDIS_OPTIONS.SLOWLOG_LOG_SLOWER_THAN, slowlogLogSlowerThan);
|
||||
return this;
|
||||
}
|
||||
|
||||
public RedisRunner slowlogMaxLen(long slowlogMaxLen) {
|
||||
addConfigOption(REDIS_OPTIONS.SLOWLOG_MAX_LEN, slowlogMaxLen);
|
||||
return this;
|
||||
}
|
||||
|
||||
public RedisRunner latencyMonitorThreshold(long latencyMonitorThreshold) {
|
||||
addConfigOption(REDIS_OPTIONS.LATENCY_MONITOR_THRESHOLD, latencyMonitorThreshold);
|
||||
return this;
|
||||
}
|
||||
|
||||
public RedisRunner notifyKeyspaceEvents(KEYSPACE_EVENTS_OPTIONS notifyKeyspaceEvents) {
|
||||
String existing = this.options.getOrDefault(REDIS_OPTIONS.CLUSTER_CONFIG_FILE, "");
|
||||
addConfigOption(REDIS_OPTIONS.CLUSTER_CONFIG_FILE,
|
||||
existing.contains(notifyKeyspaceEvents.toString())
|
||||
? existing
|
||||
: (existing + notifyKeyspaceEvents.toString()));
|
||||
return this;
|
||||
}
|
||||
|
||||
public RedisRunner hashMaxZiplistEntries(long hashMaxZiplistEntries) {
|
||||
addConfigOption(REDIS_OPTIONS.HASH_MAX_ZIPLIST_ENTRIES, hashMaxZiplistEntries);
|
||||
return this;
|
||||
}
|
||||
|
||||
public RedisRunner hashMaxZiplistValue(long hashMaxZiplistValue) {
|
||||
addConfigOption(REDIS_OPTIONS.HASH_MAX_ZIPLIST_VALUE, hashMaxZiplistValue);
|
||||
return this;
|
||||
}
|
||||
|
||||
public RedisRunner listMaxZiplistEntries(long listMaxZiplistEntries) {
|
||||
addConfigOption(REDIS_OPTIONS.LIST_MAX_ZIPLIST_ENTRIES, listMaxZiplistEntries);
|
||||
return this;
|
||||
}
|
||||
|
||||
public RedisRunner listMaxZiplistValue(long listMaxZiplistValue) {
|
||||
addConfigOption(REDIS_OPTIONS.LIST_MAX_ZIPLIST_VALUE, listMaxZiplistValue);
|
||||
return this;
|
||||
}
|
||||
|
||||
public RedisRunner setMaxIntsetEntries(long setMaxIntsetEntries) {
|
||||
addConfigOption(REDIS_OPTIONS.SET_MAX_INTSET_ENTRIES, setMaxIntsetEntries);
|
||||
return this;
|
||||
}
|
||||
|
||||
public RedisRunner zsetMaxZiplistEntries(long zsetMaxZiplistEntries) {
|
||||
addConfigOption(REDIS_OPTIONS.ZSET_MAX_ZIPLIST_ENTRIES, zsetMaxZiplistEntries);
|
||||
return this;
|
||||
}
|
||||
|
||||
public RedisRunner zsetMaxZiplistValue(long zsetMaxZiplistValue) {
|
||||
addConfigOption(REDIS_OPTIONS.ZSET_MAX_ZIPLIST_VALUE, zsetMaxZiplistValue);
|
||||
return this;
|
||||
}
|
||||
|
||||
public RedisRunner hllSparseMaxBytes(long hllSparseMaxBytes) {
|
||||
addConfigOption(REDIS_OPTIONS.HLL_SPARSE_MAX_BYTES, hllSparseMaxBytes);
|
||||
return this;
|
||||
}
|
||||
|
||||
public RedisRunner activerehashing(boolean activerehashing) {
|
||||
addConfigOption(REDIS_OPTIONS.ACTIVEREHASHING, convertBoolean(activerehashing));
|
||||
return this;
|
||||
}
|
||||
|
||||
public RedisRunner clientOutputBufferLimit$Normal(String hardLimit, String softLimit, long softSeconds) {
|
||||
addConfigOption(REDIS_OPTIONS.CLIENT_OUTPUT_BUFFER_LIMIT$NORMAL, hardLimit, softLimit, softSeconds);
|
||||
return this;
|
||||
}
|
||||
|
||||
public RedisRunner clientOutputBufferLimit$Slave(String hardLimit, String softLimit, long softSeconds) {
|
||||
addConfigOption(REDIS_OPTIONS.CLIENT_OUTPUT_BUFFER_LIMIT$SLAVE, hardLimit, softLimit, softSeconds);
|
||||
return this;
|
||||
}
|
||||
|
||||
public RedisRunner clientOutputBufferLimit$Pubsub(String hardLimit, String softLimit, long softSeconds) {
|
||||
addConfigOption(REDIS_OPTIONS.CLIENT_OUTPUT_BUFFER_LIMIT$PUBSUB, hardLimit, softLimit, softSeconds);
|
||||
return this;
|
||||
}
|
||||
|
||||
public RedisRunner hz(int hz) {
|
||||
addConfigOption(REDIS_OPTIONS.HZ, hz);
|
||||
return this;
|
||||
}
|
||||
|
||||
public RedisRunner aofRewriteIncrementalFsync(boolean aofRewriteIncrementalFsync) {
|
||||
addConfigOption(REDIS_OPTIONS.AOF_REWRITE_INCREMENTAL_FSYNC, convertBoolean(aofRewriteIncrementalFsync));
|
||||
return this;
|
||||
}
|
||||
|
||||
public static final class RedisProcess {
|
||||
|
||||
private final Process redisProcess;
|
||||
|
||||
private RedisProcess(Process redisProcess) {
|
||||
this.redisProcess = redisProcess;
|
||||
}
|
||||
|
||||
public int stop() throws InterruptedException {
|
||||
redisProcess.destroy();
|
||||
int exitCode = redisProcess.waitFor();
|
||||
return exitCode == 1 && isWindows() ? 0 : exitCode;
|
||||
}
|
||||
|
||||
public Process getRedisProcess() {
|
||||
return redisProcess;
|
||||
}
|
||||
|
||||
private boolean isWindows() {
|
||||
return System.getProperty("os.name", "generic").toLowerCase(Locale.ENGLISH).contains("win");
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -0,0 +1,139 @@
|
||||
package org.redisson;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.junit.Test;
|
||||
import org.redisson.core.RMultimapCache;
|
||||
|
||||
public class RedissonListMultimapCacheTest extends BaseTest {
|
||||
|
||||
@Test
|
||||
public void testContains() {
|
||||
RMultimapCache<String, String> multimap = redisson.getListMultimapCache("test");
|
||||
multimap.put("1", "1");
|
||||
multimap.put("1", "2");
|
||||
multimap.put("1", "3");
|
||||
|
||||
assertThat(multimap.containsKey("1")).isTrue();
|
||||
assertThat(multimap.containsKey("2")).isFalse();
|
||||
|
||||
assertThat(multimap.containsValue("1")).isTrue();
|
||||
assertThat(multimap.containsValue("3")).isTrue();
|
||||
assertThat(multimap.containsValue("4")).isFalse();
|
||||
|
||||
assertThat(multimap.containsEntry("1", "1")).isTrue();
|
||||
assertThat(multimap.containsEntry("1", "3")).isTrue();
|
||||
assertThat(multimap.containsEntry("1", "4")).isFalse();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testContainsExpired() throws InterruptedException {
|
||||
RMultimapCache<String, String> multimap = redisson.getListMultimapCache("test");
|
||||
multimap.put("1", "1");
|
||||
multimap.put("1", "2");
|
||||
multimap.put("1", "3");
|
||||
multimap.expireKey("1", 1, TimeUnit.SECONDS);
|
||||
|
||||
Thread.sleep(1000);
|
||||
|
||||
assertThat(multimap.containsKey("1")).isFalse();
|
||||
assertThat(multimap.containsKey("2")).isFalse();
|
||||
|
||||
assertThat(multimap.containsValue("1")).isFalse();
|
||||
assertThat(multimap.containsValue("3")).isFalse();
|
||||
assertThat(multimap.containsValue("4")).isFalse();
|
||||
|
||||
assertThat(multimap.containsEntry("1", "1")).isFalse();
|
||||
assertThat(multimap.containsEntry("1", "3")).isFalse();
|
||||
assertThat(multimap.containsEntry("1", "4")).isFalse();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetAll() throws InterruptedException {
|
||||
RMultimapCache<String, String> multimap = redisson.getListMultimapCache("test");
|
||||
multimap.put("1", "1");
|
||||
multimap.put("1", "2");
|
||||
multimap.put("1", "3");
|
||||
|
||||
assertThat(multimap.getAll("1")).containsOnlyOnce("1", "2", "3");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetAllExpired() throws InterruptedException {
|
||||
RMultimapCache<String, String> multimap = redisson.getListMultimapCache("test");
|
||||
multimap.put("1", "1");
|
||||
multimap.put("1", "2");
|
||||
multimap.put("1", "3");
|
||||
multimap.expireKey("1", 1, TimeUnit.SECONDS);
|
||||
|
||||
Thread.sleep(1000);
|
||||
|
||||
assertThat(multimap.getAll("1")).isEmpty();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testValues() throws InterruptedException {
|
||||
RMultimapCache<String, String> multimap = redisson.getListMultimapCache("test");
|
||||
multimap.put("1", "1");
|
||||
multimap.put("1", "2");
|
||||
multimap.put("1", "3");
|
||||
multimap.put("1", "3");
|
||||
|
||||
assertThat(multimap.get("1").size()).isEqualTo(4);
|
||||
assertThat(multimap.get("1")).containsExactly("1", "2", "3", "3");
|
||||
assertThat(multimap.get("1").remove("3")).isTrue();
|
||||
assertThat(multimap.get("1").remove("3")).isTrue();
|
||||
assertThat(multimap.get("1").contains("3")).isFalse();
|
||||
assertThat(multimap.get("1").contains("2")).isTrue();
|
||||
assertThat(multimap.get("1").containsAll(Arrays.asList("1"))).isTrue();
|
||||
assertThat(multimap.get("1").containsAll(Arrays.asList("1", "2"))).isTrue();
|
||||
assertThat(multimap.get("1").retainAll(Arrays.asList("1"))).isTrue();
|
||||
assertThat(multimap.get("1").removeAll(Arrays.asList("1"))).isTrue();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testValuesExpired() throws InterruptedException {
|
||||
RMultimapCache<String, String> multimap = redisson.getListMultimapCache("test");
|
||||
multimap.put("1", "1");
|
||||
multimap.put("1", "2");
|
||||
multimap.put("1", "3");
|
||||
multimap.expireKey("1", 1, TimeUnit.SECONDS);
|
||||
|
||||
Thread.sleep(1000);
|
||||
|
||||
assertThat(multimap.get("1").size()).isZero();
|
||||
assertThat(multimap.get("1")).contains();
|
||||
assertThat(multimap.get("1").remove("3")).isFalse();
|
||||
assertThat(multimap.get("1").contains("3")).isFalse();
|
||||
assertThat(multimap.get("1").retainAll(Arrays.asList("1"))).isFalse();
|
||||
assertThat(multimap.get("1").containsAll(Arrays.asList("1"))).isFalse();
|
||||
assertThat(multimap.get("1").removeAll(Arrays.asList("1"))).isFalse();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testScheduler() throws InterruptedException {
|
||||
RMultimapCache<String, String> cache = redisson.getListMultimapCache("simple33");
|
||||
assertThat(cache.put("1", "1")).isTrue();
|
||||
assertThat(cache.put("1", "2")).isTrue();
|
||||
assertThat(cache.put("1", "3")).isTrue();
|
||||
assertThat(cache.put("2", "1")).isTrue();
|
||||
assertThat(cache.put("2", "2")).isTrue();
|
||||
assertThat(cache.put("2", "3")).isTrue();
|
||||
|
||||
assertThat(cache.expireKey("1", 2, TimeUnit.SECONDS)).isTrue();
|
||||
assertThat(cache.expireKey("2", 3, TimeUnit.SECONDS)).isTrue();
|
||||
assertThat(cache.expireKey("3", 3, TimeUnit.SECONDS)).isFalse();
|
||||
|
||||
assertThat(cache.size()).isEqualTo(6);
|
||||
|
||||
Thread.sleep(10000);
|
||||
|
||||
assertThat(cache.size()).isZero();
|
||||
|
||||
}
|
||||
|
||||
|
||||
}
|
@ -0,0 +1,108 @@
|
||||
package org.redisson;
|
||||
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import org.redisson.client.RedisTimeoutException;
|
||||
|
||||
import static org.assertj.core.api.Assertions.*;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class RedissonRemoteServiceTest extends BaseTest {
|
||||
|
||||
public interface RemoteInterface {
|
||||
|
||||
void voidMethod(String name, Long param);
|
||||
|
||||
Long resultMethod(Long value);
|
||||
|
||||
void errorMethod() throws IOException;
|
||||
|
||||
void errorMethodWithCause();
|
||||
|
||||
void timeoutMethod() throws InterruptedException;
|
||||
|
||||
}
|
||||
|
||||
public class RemoteImpl implements RemoteInterface {
|
||||
|
||||
@Override
|
||||
public void voidMethod(String name, Long param) {
|
||||
System.out.println(name + " " + param);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Long resultMethod(Long value) {
|
||||
return value*2;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void errorMethod() throws IOException {
|
||||
throw new IOException("Checking error throw");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void errorMethodWithCause() {
|
||||
try {
|
||||
int s = 2 / 0;
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException("Checking error throw", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void timeoutMethod() throws InterruptedException {
|
||||
Thread.sleep(2000);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
@Test(expected = RedisTimeoutException.class)
|
||||
public void testTimeout() throws InterruptedException {
|
||||
RedissonClient r1 = Redisson.create();
|
||||
r1.getRemoteSerivce().register(RemoteInterface.class, new RemoteImpl());
|
||||
|
||||
RedissonClient r2 = Redisson.create();
|
||||
RemoteInterface ri = r2.getRemoteSerivce().get(RemoteInterface.class, 1, TimeUnit.SECONDS);
|
||||
|
||||
try {
|
||||
ri.timeoutMethod();
|
||||
} finally {
|
||||
r1.shutdown();
|
||||
r2.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInvocations() {
|
||||
RedissonClient r1 = Redisson.create();
|
||||
r1.getRemoteSerivce().register(RemoteInterface.class, new RemoteImpl());
|
||||
|
||||
RedissonClient r2 = Redisson.create();
|
||||
RemoteInterface ri = r2.getRemoteSerivce().get(RemoteInterface.class);
|
||||
|
||||
ri.voidMethod("someName", 100L);
|
||||
assertThat(ri.resultMethod(100L)).isEqualTo(200);
|
||||
|
||||
try {
|
||||
ri.errorMethod();
|
||||
Assert.fail();
|
||||
} catch (IOException e) {
|
||||
assertThat(e.getMessage()).isEqualTo("Checking error throw");
|
||||
}
|
||||
|
||||
try {
|
||||
ri.errorMethodWithCause();
|
||||
Assert.fail();
|
||||
} catch (Exception e) {
|
||||
assertThat(e.getCause()).isInstanceOf(ArithmeticException.class);
|
||||
assertThat(e.getCause().getMessage()).isEqualTo("/ by zero");
|
||||
}
|
||||
|
||||
r1.shutdown();
|
||||
r2.shutdown();
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,190 @@
|
||||
package org.redisson;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.junit.Test;
|
||||
import org.redisson.core.RMultimapCache;
|
||||
import org.redisson.core.RSetMultimap;
|
||||
|
||||
public class RedissonSetMultimapCacheTest extends BaseTest {
|
||||
|
||||
@Test
|
||||
public void testContains() {
|
||||
RMultimapCache<String, String> multimap = redisson.getSetMultimapCache("test");
|
||||
multimap.put("1", "1");
|
||||
multimap.put("1", "2");
|
||||
multimap.put("1", "3");
|
||||
|
||||
assertThat(multimap.containsKey("1")).isTrue();
|
||||
assertThat(multimap.containsKey("2")).isFalse();
|
||||
|
||||
assertThat(multimap.containsValue("1")).isTrue();
|
||||
assertThat(multimap.containsValue("3")).isTrue();
|
||||
assertThat(multimap.containsValue("4")).isFalse();
|
||||
|
||||
assertThat(multimap.containsEntry("1", "1")).isTrue();
|
||||
assertThat(multimap.containsEntry("1", "3")).isTrue();
|
||||
assertThat(multimap.containsEntry("1", "4")).isFalse();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testContainsExpired() throws InterruptedException {
|
||||
RMultimapCache<String, String> multimap = redisson.getSetMultimapCache("test");
|
||||
multimap.put("1", "1");
|
||||
multimap.put("1", "2");
|
||||
multimap.put("1", "3");
|
||||
multimap.expireKey("1", 1, TimeUnit.SECONDS);
|
||||
|
||||
Thread.sleep(1000);
|
||||
|
||||
assertThat(multimap.containsKey("1")).isFalse();
|
||||
assertThat(multimap.containsKey("2")).isFalse();
|
||||
|
||||
assertThat(multimap.containsValue("1")).isFalse();
|
||||
assertThat(multimap.containsValue("3")).isFalse();
|
||||
assertThat(multimap.containsValue("4")).isFalse();
|
||||
|
||||
assertThat(multimap.containsEntry("1", "1")).isFalse();
|
||||
assertThat(multimap.containsEntry("1", "3")).isFalse();
|
||||
assertThat(multimap.containsEntry("1", "4")).isFalse();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetAll() throws InterruptedException {
|
||||
RMultimapCache<String, String> multimap = redisson.getSetMultimapCache("test");
|
||||
multimap.put("1", "1");
|
||||
multimap.put("1", "2");
|
||||
multimap.put("1", "3");
|
||||
|
||||
assertThat(multimap.getAll("1")).containsOnlyOnce("1", "2", "3");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetAllExpired() throws InterruptedException {
|
||||
RMultimapCache<String, String> multimap = redisson.getSetMultimapCache("test");
|
||||
multimap.put("1", "1");
|
||||
multimap.put("1", "2");
|
||||
multimap.put("1", "3");
|
||||
multimap.expireKey("1", 1, TimeUnit.SECONDS);
|
||||
|
||||
Thread.sleep(1000);
|
||||
|
||||
assertThat(multimap.getAll("1")).isEmpty();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testValues() throws InterruptedException {
|
||||
RMultimapCache<String, String> multimap = redisson.getSetMultimapCache("test");
|
||||
multimap.put("1", "1");
|
||||
multimap.put("1", "2");
|
||||
multimap.put("1", "3");
|
||||
multimap.put("1", "3");
|
||||
|
||||
assertThat(multimap.get("1").size()).isEqualTo(3);
|
||||
assertThat(multimap.get("1")).containsOnlyOnce("1", "2", "3");
|
||||
assertThat(multimap.get("1").remove("3")).isTrue();
|
||||
assertThat(multimap.get("1").contains("3")).isFalse();
|
||||
assertThat(multimap.get("1").contains("2")).isTrue();
|
||||
assertThat(multimap.get("1").containsAll(Arrays.asList("1"))).isTrue();
|
||||
assertThat(multimap.get("1").containsAll(Arrays.asList("1", "2"))).isTrue();
|
||||
assertThat(multimap.get("1").retainAll(Arrays.asList("1"))).isTrue();
|
||||
assertThat(multimap.get("1").removeAll(Arrays.asList("1"))).isTrue();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testValuesExpired() throws InterruptedException {
|
||||
RMultimapCache<String, String> multimap = redisson.getSetMultimapCache("test");
|
||||
multimap.put("1", "1");
|
||||
multimap.put("1", "2");
|
||||
multimap.put("1", "3");
|
||||
multimap.expireKey("1", 1, TimeUnit.SECONDS);
|
||||
|
||||
Thread.sleep(1000);
|
||||
|
||||
assertThat(multimap.get("1").size()).isZero();
|
||||
assertThat(multimap.get("1")).contains();
|
||||
assertThat(multimap.get("1").remove("3")).isFalse();
|
||||
assertThat(multimap.get("1").contains("3")).isFalse();
|
||||
assertThat(multimap.get("1").retainAll(Arrays.asList("1"))).isFalse();
|
||||
assertThat(multimap.get("1").containsAll(Arrays.asList("1"))).isFalse();
|
||||
assertThat(multimap.get("1").removeAll(Arrays.asList("1"))).isFalse();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testScheduler() throws InterruptedException {
|
||||
RMultimapCache<String, String> cache = redisson.getSetMultimapCache("simple33");
|
||||
assertThat(cache.put("1", "1")).isTrue();
|
||||
assertThat(cache.put("1", "2")).isTrue();
|
||||
assertThat(cache.put("1", "3")).isTrue();
|
||||
assertThat(cache.put("2", "1")).isTrue();
|
||||
assertThat(cache.put("2", "2")).isTrue();
|
||||
assertThat(cache.put("2", "3")).isTrue();
|
||||
|
||||
assertThat(cache.expireKey("1", 2, TimeUnit.SECONDS)).isTrue();
|
||||
assertThat(cache.expireKey("2", 3, TimeUnit.SECONDS)).isTrue();
|
||||
assertThat(cache.expireKey("3", 3, TimeUnit.SECONDS)).isFalse();
|
||||
|
||||
assertThat(cache.size()).isEqualTo(6);
|
||||
|
||||
Thread.sleep(10000);
|
||||
|
||||
assertThat(cache.size()).isZero();
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExpire() throws InterruptedException {
|
||||
RSetMultimap<String, String> map = redisson.getSetMultimapCache("simple");
|
||||
map.put("1", "2");
|
||||
map.put("2", "3");
|
||||
|
||||
map.expire(100, TimeUnit.MILLISECONDS);
|
||||
|
||||
Thread.sleep(500);
|
||||
|
||||
assertThat(map.size()).isZero();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExpireAt() throws InterruptedException {
|
||||
RSetMultimap<String, String> map = redisson.getSetMultimapCache("simple");
|
||||
map.put("1", "2");
|
||||
map.put("2", "3");
|
||||
|
||||
map.expireAt(System.currentTimeMillis() + 100);
|
||||
|
||||
Thread.sleep(500);
|
||||
|
||||
assertThat(map.size()).isZero();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testClearExpire() throws InterruptedException {
|
||||
RSetMultimap<String, String> map = redisson.getSetMultimapCache("simple");
|
||||
map.put("1", "2");
|
||||
map.put("2", "3");
|
||||
|
||||
map.expireAt(System.currentTimeMillis() + 100);
|
||||
|
||||
map.clearExpire();
|
||||
|
||||
Thread.sleep(500);
|
||||
|
||||
assertThat(map.size()).isEqualTo(2);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDelete() {
|
||||
RSetMultimap<String, String> map = redisson.getSetMultimapCache("simple");
|
||||
map.put("1", "2");
|
||||
map.put("2", "3");
|
||||
assertThat(map.delete()).isTrue();
|
||||
|
||||
RSetMultimap<String, String> map2 = redisson.getSetMultimapCache("simple1");
|
||||
assertThat(map2.delete()).isFalse();
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue