RSetMultimapCache added. #428

pull/456/head
Nikita 9 years ago
parent 7c53c1f730
commit dbc0149da2

@ -57,6 +57,7 @@ import org.redisson.core.RListMultimap;
import org.redisson.core.RLock;
import org.redisson.core.RMap;
import org.redisson.core.RMapCache;
import org.redisson.core.RMultimapCache;
import org.redisson.core.RPatternTopic;
import org.redisson.core.RQueue;
import org.redisson.core.RReadWriteLock;
@ -66,6 +67,7 @@ import org.redisson.core.RSemaphore;
import org.redisson.core.RSet;
import org.redisson.core.RSetCache;
import org.redisson.core.RSetMultimap;
import org.redisson.core.RSetMultimapCache;
import org.redisson.core.RSortedSet;
import org.redisson.core.RTopic;
@ -265,6 +267,14 @@ public class Redisson implements RedissonClient {
public <K, V> RSetMultimap<K, V> getSetMultimap(String name) {
return new RedissonSetMultimap<K, V>(commandExecutor, name);
}
public <K, V> RSetMultimapCache<K, V> getSetMultimapCache(String name) {
return new RedissonSetMultimapCache<K, V>(commandExecutor, name);
}
public <K, V> RSetMultimapCache<K, V> getSetMultimapCache(String name, Codec codec) {
return new RedissonSetMultimapCache<K, V>(codec, commandExecutor, name);
}
@Override
public <K, V> RSetMultimap<K, V> getSetMultimap(String name, Codec codec) {

@ -31,7 +31,6 @@ import org.redisson.core.RBlockingDeque;
import org.redisson.core.RBlockingQueue;
import org.redisson.core.RBloomFilter;
import org.redisson.core.RBucket;
import org.redisson.core.RMapCache;
import org.redisson.core.RCountDownLatch;
import org.redisson.core.RDeque;
import org.redisson.core.RHyperLogLog;
@ -41,6 +40,7 @@ import org.redisson.core.RList;
import org.redisson.core.RListMultimap;
import org.redisson.core.RLock;
import org.redisson.core.RMap;
import org.redisson.core.RMapCache;
import org.redisson.core.RPatternTopic;
import org.redisson.core.RQueue;
import org.redisson.core.RReadWriteLock;
@ -50,6 +50,7 @@ import org.redisson.core.RSemaphore;
import org.redisson.core.RSet;
import org.redisson.core.RSetCache;
import org.redisson.core.RSetMultimap;
import org.redisson.core.RSetMultimapCache;
import org.redisson.core.RSortedSet;
import org.redisson.core.RTopic;
@ -218,7 +219,7 @@ public interface RedissonClient {
<V> RList<V> getList(String name, Codec codec);
/**
* Returns List based MultiMap instance by name.
* Returns List based Multimap instance by name.
*
* @param name
* @return
@ -226,7 +227,7 @@ public interface RedissonClient {
<K, V> RListMultimap<K, V> getListMultimap(String name);
/**
* Returns List based MultiMap instance by name
* Returns List based Multimap instance by name
* using provided codec for both map keys and values.
*
* @param name
@ -254,15 +255,15 @@ public interface RedissonClient {
<K, V> RMap<K, V> getMap(String name, Codec codec);
/**
* Returns Set based MultiMap instance by name.
* Returns Set based Multimap instance by name.
*
* @param name
* @return
*/
<K, V> RSetMultimap<K, V> getSetMultimap(String name);
/**
* Returns Set based MultiMap instance by name
* Returns Set based Multimap instance by name
* using provided codec for both map keys and values.
*
* @param name
@ -271,6 +272,29 @@ public interface RedissonClient {
*/
<K, V> RSetMultimap<K, V> getSetMultimap(String name, Codec codec);
/**
* Returns Set based Multimap instance by name.
* Supports key-entry eviction with a given TTL value.
*
* <p>If eviction is not required then it's better to use regular map {@link #getSetMultimap(String)}.</p>
*
* @param name
* @return
*/
<K, V> RSetMultimapCache<K, V> getSetMultimapCache(String name);
/**
* Returns Set based Multimap instance by name
* using provided codec for both map keys and values.
* Supports key-entry eviction with a given TTL value.
*
* <p>If eviction is not required then it's better to use regular map {@link #getSetMultimap(String, Codec)}.</p>
*
* @param name
* @return
*/
<K, V> RSetMultimapCache<K, V> getSetMultimapCache(String name, Codec codec);
/**
* Returns semaphore instance by name
*

@ -0,0 +1,216 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.convertor.BooleanReplayConvertor;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.core.RSetMultimapCache;
import io.netty.util.concurrent.Future;
/**
* @author Nikita Koksharov
*
* @param <K> key
* @param <V> value
*/
public class RedissonSetMultimapCache<K, V> extends RedissonSetMultimap<K, V> implements RSetMultimapCache<K, V> {
private static final RedisCommand<Boolean> EVAL_EXPIRE_KEY = new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 6, ValueType.MAP_KEY);
RedissonSetMultimapCache(CommandAsyncExecutor connectionManager, String name) {
super(connectionManager, name);
}
RedissonSetMultimapCache(Codec codec, CommandAsyncExecutor connectionManager, String name) {
super(codec, connectionManager, name);
}
public Future<Boolean> containsKeyAsync(Object key) {
try {
byte[] keyState = codec.getMapKeyEncoder().encode(key);
String keyHash = hash(keyState);
String setName = getValuesName(keyHash);
return commandExecutor.evalReadAsync(getName(), codec, RedisCommands.EVAL_BOOLEAN,
"local value = redis.call('hget', KEYS[1], ARGV[2]); " +
"if value ~= false then " +
"local expireDate = 92233720368547758; " +
"local expireDateScore = redis.call('zscore', KEYS[2], ARGV[2]); "
+ "if expireDateScore ~= false then "
+ "expireDate = tonumber(expireDateScore) "
+ "end; "
+ "if expireDate <= tonumber(ARGV[1]) then "
+ "return 0;"
+ "end; "
+ "return redis.call('scard', ARGV[3]) > 0 and 1 or 0;" +
"end;" +
"return 0; ",
Arrays.<Object>asList(getName(), getTimeoutSetName()), System.currentTimeMillis(), keyState, setName);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
String getTimeoutSetName() {
return "redisson_set_multimap_ttl{" + getName() + "}";
}
public Future<Boolean> containsValueAsync(Object value) {
try {
byte[] valueState = codec.getMapValueEncoder().encode(value);
return commandExecutor.evalReadAsync(getName(), codec, RedisCommands.EVAL_BOOLEAN,
"local keys = redis.call('hgetall', KEYS[1]); " +
"for i, v in ipairs(keys) do " +
"if i % 2 == 0 then " +
"local expireDate = 92233720368547758; " +
"local expireDateScore = redis.call('zscore', KEYS[2], keys[i-1]); "
+ "if expireDateScore ~= false then "
+ "expireDate = tonumber(expireDateScore) "
+ "end; "
+ "if expireDate > tonumber(ARGV[2]) then " +
"local name = '{' .. KEYS[1] .. '}:' .. v; " +
"if redis.call('sismember', name, ARGV[1]) == 1 then "
+ "return 1; " +
"end;" +
"end; " +
"end;" +
"end; " +
"return 0; ",
Arrays.<Object>asList(getName(), getTimeoutSetName()), valueState, System.currentTimeMillis());
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public Future<Boolean> containsEntryAsync(Object key, Object value) {
try {
byte[] keyState = codec.getMapKeyEncoder().encode(key);
String keyHash = hash(keyState);
byte[] valueState = codec.getMapValueEncoder().encode(value);
String setName = getValuesName(keyHash);
return commandExecutor.evalReadAsync(getName(), codec, RedisCommands.EVAL_BOOLEAN,
"local expireDate = 92233720368547758; " +
"local expireDateScore = redis.call('zscore', KEYS[2], ARGV[2]); "
+ "if expireDateScore ~= false then "
+ "expireDate = tonumber(expireDateScore) "
+ "end; "
+ "if expireDate > tonumber(ARGV[1]) then " +
"if redis.call('sismember', KEYS[1], ARGV[3]) == 1 then "
+ "return 1; " +
"end;" +
"end; " +
"return 0; ",
Arrays.<Object>asList(setName, getTimeoutSetName()), System.currentTimeMillis(), keyState, valueState);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public Set<V> get(K key) {
try {
byte[] keyState = codec.getMapKeyEncoder().encode(key);
String keyHash = hash(keyState);
String setName = getValuesName(keyHash);
return new RedissonSetMultimapValues<V>(codec, commandExecutor, setName, getTimeoutSetName(), key);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public Future<Collection<V>> getAllAsync(K key) {
try {
byte[] keyState = codec.getMapKeyEncoder().encode(key);
String keyHash = hash(keyState);
String setName = getValuesName(keyHash);
return commandExecutor.evalReadAsync(getName(), codec, RedisCommands.EVAL_SET,
"local expireDate = 92233720368547758; " +
"local expireDateScore = redis.call('zscore', KEYS[2], ARGV[2]); "
+ "if expireDateScore ~= false then "
+ "expireDate = tonumber(expireDateScore) "
+ "end; "
+ "if expireDate > tonumber(ARGV[1]) then " +
"return redis.call('smembers', KEYS[1]); " +
"end; " +
"return {}; ",
Arrays.<Object>asList(setName, getTimeoutSetName()), System.currentTimeMillis(), keyState);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public Future<Collection<V>> removeAllAsync(Object key) {
try {
byte[] keyState = codec.getMapKeyEncoder().encode(key);
String keyHash = hash(keyState);
String setName = getValuesName(keyHash);
return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_SET,
"redis.call('hdel', KEYS[1], ARGV[1]); " +
"local members = redis.call('smembers', KEYS[2]); " +
"redis.call('del', KEYS[2]); " +
"redis.call('zrem', KEYS[3], ARGV[1]); " +
"return members; ",
Arrays.<Object>asList(getName(), setName, getTimeoutSetName()), keyState);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public boolean expireKey(K key, long timeToLive, TimeUnit timeUnit) {
return get(expireKeyAsync(key, timeToLive, timeUnit));
}
@Override
public Future<Boolean> expireKeyAsync(K key, long timeToLive, TimeUnit timeUnit) {
long ttlTimeout = System.currentTimeMillis() + timeUnit.toMillis(timeToLive);
return commandExecutor.evalWriteAsync(getName(), codec, EVAL_EXPIRE_KEY,
"if redis.call('hexists', KEYS[1], ARGV[2]) == 1 then "
+ "if tonumber(ARGV[1]) > 0 then "
+ "redis.call('zadd', KEYS[2], ARGV[1], ARGV[2]); " +
"else " +
"redis.call('zrem', KEYS[2], ARGV[2]); "
+ "end; "
+ "return 1; "
+ "else "
+ "return 0; "
+ "end",
Arrays.<Object>asList(getName(), getTimeoutSetName()), ttlTimeout, key);
}
}

@ -0,0 +1,446 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Set;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.convertor.BooleanReplayConvertor;
import org.redisson.client.protocol.convertor.IntegerReplayConvertor;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.ListScanResultReplayDecoder;
import org.redisson.client.protocol.decoder.NestedMultiDecoder;
import org.redisson.client.protocol.decoder.ObjectListReplayDecoder;
import org.redisson.client.protocol.decoder.ObjectSetReplayDecoder;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.core.RSet;
import io.netty.util.concurrent.Future;
/**
* Distributed and concurrent implementation of {@link java.util.Set}
*
* @author Nikita Koksharov
*
* @param <V> value
*/
public class RedissonSetMultimapValues<V> extends RedissonExpirable implements RSet<V> {
private static final RedisCommand<ListScanResult<Object>> EVAL_SSCAN = new RedisCommand<ListScanResult<Object>>("EVAL", new NestedMultiDecoder(new ObjectListReplayDecoder<Object>(), new ListScanResultReplayDecoder()), 7, ValueType.MAP_KEY, ValueType.OBJECT);
private static final RedisCommand<Integer> EVAL_SIZE = new RedisCommand<Integer>("EVAL", new IntegerReplayConvertor(), 6, ValueType.MAP_KEY);
private static final RedisCommand<Set<Object>> EVAL_READALL = new RedisCommand<Set<Object>>("EVAL", new ObjectSetReplayDecoder(), 6, ValueType.MAP_KEY);
private static final RedisCommand<Boolean> EVAL_CONTAINS_VALUE = new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 6, Arrays.asList(ValueType.MAP_KEY, ValueType.MAP_VALUE));
private static final RedisCommand<Boolean> EVAL_CONTAINS_ALL_WITH_VALUES = new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 7, ValueType.OBJECTS);
private final Object key;
private final String timeoutSetName;
public RedissonSetMultimapValues(Codec codec, CommandAsyncExecutor commandExecutor, String name, String timeoutSetName, Object key) {
super(codec, commandExecutor, name);
this.timeoutSetName = timeoutSetName;
this.key = key;
}
@Override
public int size() {
return get(sizeAsync());
}
@Override
public Future<Integer> sizeAsync() {
return commandExecutor.evalReadAsync(getName(), codec, EVAL_SIZE,
"local expireDate = 92233720368547758; " +
"local expireDateScore = redis.call('zscore', KEYS[1], ARGV[2]); "
+ "if expireDateScore ~= false then "
+ "expireDate = tonumber(expireDateScore) "
+ "end; "
+ "if expireDate <= tonumber(ARGV[1]) then "
+ "return 0;"
+ "end; "
+ "return redis.call('scard', KEYS[2]);",
Arrays.<Object>asList(timeoutSetName, getName()), System.currentTimeMillis(), key);
}
@Override
public boolean isEmpty() {
return size() == 0;
}
@Override
public boolean contains(Object o) {
return get(containsAsync(o));
}
@Override
public Future<Boolean> containsAsync(Object o) {
return commandExecutor.evalReadAsync(getName(), codec, EVAL_CONTAINS_VALUE,
"local expireDate = 92233720368547758; " +
"local expireDateScore = redis.call('zscore', KEYS[1], ARGV[2]); "
+ "if expireDateScore ~= false then "
+ "expireDate = tonumber(expireDateScore) "
+ "end; "
+ "if expireDate <= tonumber(ARGV[1]) then "
+ "return 0;"
+ "end; "
+ "return redis.call('sismember', KEYS[2], ARGV[3]);",
Arrays.<Object>asList(timeoutSetName, getName()), System.currentTimeMillis(), key, o);
}
private ListScanResult<V> scanIterator(InetSocketAddress client, long startPos) {
Future<ListScanResult<V>> f = commandExecutor.evalReadAsync(client, getName(), codec, EVAL_SSCAN,
"local expireDate = 92233720368547758; " +
"local expireDateScore = redis.call('zscore', KEYS[1], ARGV[3]); "
+ "if expireDateScore ~= false then "
+ "expireDate = tonumber(expireDateScore) "
+ "end; "
+ "if expireDate <= tonumber(ARGV[1]) then "
+ "return {0, {}};"
+ "end;"
+ "return redis.call('sscan', KEYS[2], ARGV[2]);",
Arrays.<Object>asList(timeoutSetName, getName()), System.currentTimeMillis(), startPos, key);
return get(f);
}
@Override
public Iterator<V> iterator() {
return new Iterator<V>() {
private List<V> firstValues;
private Iterator<V> iter;
private InetSocketAddress client;
private long nextIterPos;
private boolean currentElementRemoved;
private boolean removeExecuted;
private V value;
@Override
public boolean hasNext() {
if (iter == null || !iter.hasNext()) {
if (nextIterPos == -1) {
return false;
}
long prevIterPos = nextIterPos;
ListScanResult<V> res = scanIterator(client, nextIterPos);
client = res.getRedisClient();
if (nextIterPos == 0 && firstValues == null) {
firstValues = res.getValues();
} else if (res.getValues().equals(firstValues)) {
return false;
}
iter = res.getValues().iterator();
nextIterPos = res.getPos();
if (prevIterPos == nextIterPos && !removeExecuted) {
nextIterPos = -1;
}
}
return iter.hasNext();
}
@Override
public V next() {
if (!hasNext()) {
throw new NoSuchElementException("No such element at index");
}
value = iter.next();
currentElementRemoved = false;
return value;
}
@Override
public void remove() {
if (currentElementRemoved) {
throw new IllegalStateException("Element been already deleted");
}
if (iter == null) {
throw new IllegalStateException();
}
iter.remove();
RedissonSetMultimapValues.this.remove(value);
currentElementRemoved = true;
removeExecuted = true;
}
};
}
@Override
public Future<Set<V>> readAllAsync() {
return commandExecutor.evalReadAsync(getName(), codec, EVAL_READALL,
"local expireDate = 92233720368547758; " +
"local expireDateScore = redis.call('zscore', KEYS[1], ARGV[2]); "
+ "if expireDateScore ~= false then "
+ "expireDate = tonumber(expireDateScore) "
+ "end; "
+ "if expireDate <= tonumber(ARGV[1]) then "
+ "return {};"
+ "end; "
+ "return redis.call('smembers', KEYS[2]);",
Arrays.<Object>asList(timeoutSetName, getName()), System.currentTimeMillis(), key);
}
@Override
public Set<V> readAll() {
return get(readAllAsync());
}
@Override
public Object[] toArray() {
Set<Object> res = (Set<Object>) get(readAllAsync());
return res.toArray();
}
@Override
public <T> T[] toArray(T[] a) {
Set<Object> res = (Set<Object>) get(readAllAsync());
return res.toArray(a);
}
@Override
public boolean add(V e) {
return get(addAsync(e));
}
@Override
public Future<Boolean> addAsync(V e) {
return commandExecutor.writeAsync(getName(), codec, RedisCommands.SADD_SINGLE, getName(), e);
}
@Override
public V removeRandom() {
return get(removeRandomAsync());
}
@Override
public Future<V> removeRandomAsync() {
return commandExecutor.writeAsync(getName(), codec, RedisCommands.SPOP_SINGLE, getName());
}
@Override
public Future<Boolean> removeAsync(Object o) {
return commandExecutor.evalWriteAsync(getName(), codec, EVAL_CONTAINS_VALUE,
"local expireDate = 92233720368547758; " +
"local expireDateScore = redis.call('zscore', KEYS[1], ARGV[2]); "
+ "if expireDateScore ~= false then "
+ "expireDate = tonumber(expireDateScore) "
+ "end; "
+ "if expireDate <= tonumber(ARGV[1]) then "
+ "return 0;"
+ "end; "
+ "return redis.call('srem', KEYS[2], ARGV[3]) > 0 and 1 or 0;",
Arrays.<Object>asList(timeoutSetName, getName()), System.currentTimeMillis(), key, o);
}
@Override
public boolean remove(Object value) {
return get(removeAsync((V)value));
}
@Override
public Future<Boolean> moveAsync(String destination, V member) {
return commandExecutor.writeAsync(getName(), codec, RedisCommands.SMOVE, getName(), destination, member);
}
@Override
public boolean move(String destination, V member) {
return get(moveAsync(destination, member));
}
@Override
public boolean containsAll(Collection<?> c) {
return get(containsAllAsync(c));
}
@Override
public Future<Boolean> containsAllAsync(Collection<?> c) {
List<Object> args = new ArrayList<Object>(c.size() + 2);
try {
byte[] keyState = codec.getMapKeyEncoder().encode(key);
args.add(System.currentTimeMillis());
args.add(keyState);
args.addAll(c);
} catch (IOException e) {
throw new RuntimeException(e);
}
return commandExecutor.evalReadAsync(getName(), codec, EVAL_CONTAINS_ALL_WITH_VALUES,
"local expireDate = 92233720368547758; " +
"local expireDateScore = redis.call('zscore', KEYS[1], ARGV[2]); "
+ "if expireDateScore ~= false then "
+ "expireDate = tonumber(expireDateScore) "
+ "end; "
+ "if expireDate <= tonumber(ARGV[1]) then "
+ "return 0;"
+ "end; " +
"local s = redis.call('smembers', KEYS[2]);" +
"for i = 0, table.getn(s), 1 do " +
"for j = 2, table.getn(ARGV), 1 do "
+ "if ARGV[j] == s[i] "
+ "then table.remove(ARGV, j) end "
+ "end; "
+ "end;"
+ "return table.getn(ARGV) == 2 and 1 or 0; ",
Arrays.<Object>asList(timeoutSetName, getName()), args.toArray());
}
@Override
public boolean addAll(Collection<? extends V> c) {
if (c.isEmpty()) {
return false;
}
return get(addAllAsync(c));
}
@Override
public Future<Boolean> addAllAsync(Collection<? extends V> c) {
List<Object> args = new ArrayList<Object>(c.size() + 1);
args.add(getName());
args.addAll(c);
return commandExecutor.writeAsync(getName(), codec, RedisCommands.SADD_BOOL, args.toArray());
}
@Override
public boolean retainAll(Collection<?> c) {
return get(retainAllAsync(c));
}
@Override
public Future<Boolean> retainAllAsync(Collection<?> c) {
List<Object> args = new ArrayList<Object>(c.size() + 2);
try {
byte[] keyState = codec.getMapKeyEncoder().encode(key);
args.add(System.currentTimeMillis());
args.add(keyState);
args.addAll(c);
} catch (IOException e) {
throw new RuntimeException(e);
}
return commandExecutor.evalWriteAsync(getName(), codec, EVAL_CONTAINS_ALL_WITH_VALUES,
"local expireDate = 92233720368547758; " +
"local expireDateScore = redis.call('zscore', KEYS[1], ARGV[2]); "
+ "if expireDateScore ~= false then "
+ "expireDate = tonumber(expireDateScore) "
+ "end; "
+ "if expireDate <= tonumber(ARGV[1]) then "
+ "return 0;"
+ "end; " +
"local changed = 0 " +
"local s = redis.call('smembers', KEYS[2]) "
+ "local i = 0 "
+ "while i <= table.getn(s) do "
+ "local element = s[i] "
+ "local isInAgrs = false "
+ "for j = 2, table.getn(ARGV), 1 do "
+ "if ARGV[j] == element then "
+ "isInAgrs = true "
+ "break "
+ "end "
+ "end "
+ "if isInAgrs == false then "
+ "redis.call('SREM', KEYS[2], element) "
+ "changed = 1 "
+ "end "
+ "i = i + 1 "
+ "end "
+ "return changed ",
Arrays.<Object>asList(timeoutSetName, getName()), args.toArray());
}
@Override
public Future<Boolean> removeAllAsync(Collection<?> c) {
List<Object> args = new ArrayList<Object>(c.size() + 2);
try {
byte[] keyState = codec.getMapKeyEncoder().encode(key);
args.add(System.currentTimeMillis());
args.add(keyState);
args.addAll(c);
} catch (IOException e) {
throw new RuntimeException(e);
}
return commandExecutor.evalWriteAsync(getName(), codec, EVAL_CONTAINS_ALL_WITH_VALUES,
"local expireDate = 92233720368547758; " +
"local expireDateScore = redis.call('zscore', KEYS[1], ARGV[2]); "
+ "if expireDateScore ~= false then "
+ "expireDate = tonumber(expireDateScore) "
+ "end; "
+ "if expireDate <= tonumber(ARGV[1]) then "
+ "return 0;"
+ "end; " +
"local v = 0 " +
"for i = 2, table.getn(ARGV), 1 do "
+ "if redis.call('srem', KEYS[2], ARGV[i]) == 1 "
+ "then v = 1 end "
+"end "
+ "return v ",
Arrays.<Object>asList(timeoutSetName, getName()), args.toArray());
}
@Override
public boolean removeAll(Collection<?> c) {
return get(removeAllAsync(c));
}
@Override
public int union(String... names) {
return get(unionAsync(names));
}
@Override
public Future<Integer> unionAsync(String... names) {
List<Object> args = new ArrayList<Object>(names.length + 1);
args.add(getName());
args.addAll(Arrays.asList(names));
return commandExecutor.writeAsync(getName(), codec, RedisCommands.SUNIONSTORE_INT, args.toArray());
}
@Override
public Set<V> readUnion(String... names) {
return get(readUnionAsync(names));
}
@Override
public Future<Set<V>> readUnionAsync(String... names) {
List<Object> args = new ArrayList<Object>(names.length + 1);
args.add(getName());
args.addAll(Arrays.asList(names));
return commandExecutor.writeAsync(getName(), codec, RedisCommands.SUNION, args.toArray());
}
@Override
public void clear() {
delete();
}
}

@ -0,0 +1,24 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.core;
import java.util.concurrent.TimeUnit;
public interface RMultimapCache<K, V> extends RMultimap<K, V>, RMultimapCacheAsync<K, V> {
boolean expireKey(K key, long timeToLive, TimeUnit timeUnit);
}

@ -0,0 +1,26 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.core;
import java.util.concurrent.TimeUnit;
import io.netty.util.concurrent.Future;
public interface RMultimapCacheAsync<K, V> extends RMultimapAsync<K, V> {
Future<Boolean> expireKeyAsync(K key, long timeToLive, TimeUnit timeUnit);
}

@ -0,0 +1,20 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.core;
public interface RSetMultimapCache<K, V> extends RSetMultimap<K, V>, RMultimapCache<K, V> {
}

@ -0,0 +1,115 @@
package org.redisson;
import static org.assertj.core.api.Assertions.*;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
import org.redisson.core.RMultimapCache;
public class RedissonSetMultimapCacheTest extends BaseTest {
@Test
public void testContains() {
RMultimapCache<String, String> multimap = redisson.getSetMultimapCache("test");
multimap.put("1", "1");
multimap.put("1", "2");
multimap.put("1", "3");
assertThat(multimap.containsKey("1")).isTrue();
assertThat(multimap.containsKey("2")).isFalse();
assertThat(multimap.containsValue("1")).isTrue();
assertThat(multimap.containsValue("3")).isTrue();
assertThat(multimap.containsValue("4")).isFalse();
assertThat(multimap.containsEntry("1", "1")).isTrue();
assertThat(multimap.containsEntry("1", "3")).isTrue();
assertThat(multimap.containsEntry("1", "4")).isFalse();
}
@Test
public void testContainsExpired() throws InterruptedException {
RMultimapCache<String, String> multimap = redisson.getSetMultimapCache("test");
multimap.put("1", "1");
multimap.put("1", "2");
multimap.put("1", "3");
multimap.expireKey("1", 1, TimeUnit.SECONDS);
Thread.sleep(1000);
assertThat(multimap.containsKey("1")).isFalse();
assertThat(multimap.containsKey("2")).isFalse();
assertThat(multimap.containsValue("1")).isFalse();
assertThat(multimap.containsValue("3")).isFalse();
assertThat(multimap.containsValue("4")).isFalse();
assertThat(multimap.containsEntry("1", "1")).isFalse();
assertThat(multimap.containsEntry("1", "3")).isFalse();
assertThat(multimap.containsEntry("1", "4")).isFalse();
}
@Test
public void testGetAll() throws InterruptedException {
RMultimapCache<String, String> multimap = redisson.getSetMultimapCache("test");
multimap.put("1", "1");
multimap.put("1", "2");
multimap.put("1", "3");
assertThat(multimap.getAll("1")).containsOnlyOnce("1", "2", "3");
}
@Test
public void testGetAllExpired() throws InterruptedException {
RMultimapCache<String, String> multimap = redisson.getSetMultimapCache("test");
multimap.put("1", "1");
multimap.put("1", "2");
multimap.put("1", "3");
multimap.expireKey("1", 1, TimeUnit.SECONDS);
Thread.sleep(1000);
assertThat(multimap.getAll("1")).isEmpty();
}
@Test
public void testValues() throws InterruptedException {
RMultimapCache<String, String> multimap = redisson.getSetMultimapCache("test");
multimap.put("1", "1");
multimap.put("1", "2");
multimap.put("1", "3");
multimap.put("1", "3");
assertThat(multimap.get("1").size()).isEqualTo(3);
assertThat(multimap.get("1")).containsOnlyOnce("1", "2", "3");
assertThat(multimap.get("1").remove("3")).isTrue();
assertThat(multimap.get("1").contains("3")).isFalse();
assertThat(multimap.get("1").contains("2")).isTrue();
assertThat(multimap.get("1").containsAll(Arrays.asList("1"))).isTrue();
assertThat(multimap.get("1").containsAll(Arrays.asList("1", "2"))).isTrue();
assertThat(multimap.get("1").retainAll(Arrays.asList("1"))).isTrue();
assertThat(multimap.get("1").removeAll(Arrays.asList("1"))).isTrue();
}
@Test
public void testValuesExpired() throws InterruptedException {
RMultimapCache<String, String> multimap = redisson.getSetMultimapCache("test");
multimap.put("1", "1");
multimap.put("1", "2");
multimap.put("1", "3");
multimap.expireKey("1", 1, TimeUnit.SECONDS);
Thread.sleep(1000);
assertThat(multimap.get("1").size()).isZero();
assertThat(multimap.get("1")).contains();
assertThat(multimap.get("1").remove("3")).isFalse();
assertThat(multimap.get("1").contains("3")).isFalse();
assertThat(multimap.get("1").retainAll(Arrays.asList("1"))).isFalse();
assertThat(multimap.get("1").containsAll(Arrays.asList("1"))).isFalse();
assertThat(multimap.get("1").removeAll(Arrays.asList("1"))).isFalse();
}
}
Loading…
Cancel
Save