diff --git a/src/main/java/org/redisson/Redisson.java b/src/main/java/org/redisson/Redisson.java index f02e14c0d..060192064 100755 --- a/src/main/java/org/redisson/Redisson.java +++ b/src/main/java/org/redisson/Redisson.java @@ -57,6 +57,7 @@ import org.redisson.core.RListMultimap; import org.redisson.core.RLock; import org.redisson.core.RMap; import org.redisson.core.RMapCache; +import org.redisson.core.RMultimapCache; import org.redisson.core.RPatternTopic; import org.redisson.core.RQueue; import org.redisson.core.RReadWriteLock; @@ -66,6 +67,7 @@ import org.redisson.core.RSemaphore; import org.redisson.core.RSet; import org.redisson.core.RSetCache; import org.redisson.core.RSetMultimap; +import org.redisson.core.RSetMultimapCache; import org.redisson.core.RSortedSet; import org.redisson.core.RTopic; @@ -265,6 +267,14 @@ public class Redisson implements RedissonClient { public RSetMultimap getSetMultimap(String name) { return new RedissonSetMultimap(commandExecutor, name); } + + public RSetMultimapCache getSetMultimapCache(String name) { + return new RedissonSetMultimapCache(commandExecutor, name); + } + + public RSetMultimapCache getSetMultimapCache(String name, Codec codec) { + return new RedissonSetMultimapCache(codec, commandExecutor, name); + } @Override public RSetMultimap getSetMultimap(String name, Codec codec) { diff --git a/src/main/java/org/redisson/RedissonClient.java b/src/main/java/org/redisson/RedissonClient.java index 66fe5b49f..0c1502d4a 100755 --- a/src/main/java/org/redisson/RedissonClient.java +++ b/src/main/java/org/redisson/RedissonClient.java @@ -31,7 +31,6 @@ import org.redisson.core.RBlockingDeque; import org.redisson.core.RBlockingQueue; import org.redisson.core.RBloomFilter; import org.redisson.core.RBucket; -import org.redisson.core.RMapCache; import org.redisson.core.RCountDownLatch; import org.redisson.core.RDeque; import org.redisson.core.RHyperLogLog; @@ -41,6 +40,7 @@ import org.redisson.core.RList; import org.redisson.core.RListMultimap; import org.redisson.core.RLock; import org.redisson.core.RMap; +import org.redisson.core.RMapCache; import org.redisson.core.RPatternTopic; import org.redisson.core.RQueue; import org.redisson.core.RReadWriteLock; @@ -50,6 +50,7 @@ import org.redisson.core.RSemaphore; import org.redisson.core.RSet; import org.redisson.core.RSetCache; import org.redisson.core.RSetMultimap; +import org.redisson.core.RSetMultimapCache; import org.redisson.core.RSortedSet; import org.redisson.core.RTopic; @@ -218,7 +219,7 @@ public interface RedissonClient { RList getList(String name, Codec codec); /** - * Returns List based MultiMap instance by name. + * Returns List based Multimap instance by name. * * @param name * @return @@ -226,7 +227,7 @@ public interface RedissonClient { RListMultimap getListMultimap(String name); /** - * Returns List based MultiMap instance by name + * Returns List based Multimap instance by name * using provided codec for both map keys and values. * * @param name @@ -254,15 +255,15 @@ public interface RedissonClient { RMap getMap(String name, Codec codec); /** - * Returns Set based MultiMap instance by name. + * Returns Set based Multimap instance by name. * * @param name * @return */ RSetMultimap getSetMultimap(String name); - + /** - * Returns Set based MultiMap instance by name + * Returns Set based Multimap instance by name * using provided codec for both map keys and values. * * @param name @@ -271,6 +272,29 @@ public interface RedissonClient { */ RSetMultimap getSetMultimap(String name, Codec codec); + /** + * Returns Set based Multimap instance by name. + * Supports key-entry eviction with a given TTL value. + * + *

If eviction is not required then it's better to use regular map {@link #getSetMultimap(String)}.

+ * + * @param name + * @return + */ + RSetMultimapCache getSetMultimapCache(String name); + + /** + * Returns Set based Multimap instance by name + * using provided codec for both map keys and values. + * Supports key-entry eviction with a given TTL value. + * + *

If eviction is not required then it's better to use regular map {@link #getSetMultimap(String, Codec)}.

+ * + * @param name + * @return + */ + RSetMultimapCache getSetMultimapCache(String name, Codec codec); + /** * Returns semaphore instance by name * diff --git a/src/main/java/org/redisson/RedissonSetMultimapCache.java b/src/main/java/org/redisson/RedissonSetMultimapCache.java new file mode 100644 index 000000000..927411875 --- /dev/null +++ b/src/main/java/org/redisson/RedissonSetMultimapCache.java @@ -0,0 +1,216 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import org.redisson.client.codec.Codec; +import org.redisson.client.protocol.RedisCommand; +import org.redisson.client.protocol.RedisCommand.ValueType; +import org.redisson.client.protocol.RedisCommands; +import org.redisson.client.protocol.convertor.BooleanReplayConvertor; +import org.redisson.command.CommandAsyncExecutor; +import org.redisson.core.RSetMultimapCache; + +import io.netty.util.concurrent.Future; + +/** + * @author Nikita Koksharov + * + * @param key + * @param value + */ +public class RedissonSetMultimapCache extends RedissonSetMultimap implements RSetMultimapCache { + + private static final RedisCommand EVAL_EXPIRE_KEY = new RedisCommand("EVAL", new BooleanReplayConvertor(), 6, ValueType.MAP_KEY); + + RedissonSetMultimapCache(CommandAsyncExecutor connectionManager, String name) { + super(connectionManager, name); + } + + RedissonSetMultimapCache(Codec codec, CommandAsyncExecutor connectionManager, String name) { + super(codec, connectionManager, name); + } + + public Future containsKeyAsync(Object key) { + try { + byte[] keyState = codec.getMapKeyEncoder().encode(key); + String keyHash = hash(keyState); + + String setName = getValuesName(keyHash); + + return commandExecutor.evalReadAsync(getName(), codec, RedisCommands.EVAL_BOOLEAN, + "local value = redis.call('hget', KEYS[1], ARGV[2]); " + + "if value ~= false then " + + "local expireDate = 92233720368547758; " + + "local expireDateScore = redis.call('zscore', KEYS[2], ARGV[2]); " + + "if expireDateScore ~= false then " + + "expireDate = tonumber(expireDateScore) " + + "end; " + + "if expireDate <= tonumber(ARGV[1]) then " + + "return 0;" + + "end; " + + "return redis.call('scard', ARGV[3]) > 0 and 1 or 0;" + + "end;" + + "return 0; ", + Arrays.asList(getName(), getTimeoutSetName()), System.currentTimeMillis(), keyState, setName); + + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + String getTimeoutSetName() { + return "redisson_set_multimap_ttl{" + getName() + "}"; + } + + + public Future containsValueAsync(Object value) { + try { + byte[] valueState = codec.getMapValueEncoder().encode(value); + + return commandExecutor.evalReadAsync(getName(), codec, RedisCommands.EVAL_BOOLEAN, + "local keys = redis.call('hgetall', KEYS[1]); " + + "for i, v in ipairs(keys) do " + + "if i % 2 == 0 then " + + "local expireDate = 92233720368547758; " + + "local expireDateScore = redis.call('zscore', KEYS[2], keys[i-1]); " + + "if expireDateScore ~= false then " + + "expireDate = tonumber(expireDateScore) " + + "end; " + + "if expireDate > tonumber(ARGV[2]) then " + + "local name = '{' .. KEYS[1] .. '}:' .. v; " + + "if redis.call('sismember', name, ARGV[1]) == 1 then " + + "return 1; " + + "end;" + + "end; " + + "end;" + + "end; " + + "return 0; ", + Arrays.asList(getName(), getTimeoutSetName()), valueState, System.currentTimeMillis()); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + public Future containsEntryAsync(Object key, Object value) { + try { + byte[] keyState = codec.getMapKeyEncoder().encode(key); + String keyHash = hash(keyState); + byte[] valueState = codec.getMapValueEncoder().encode(value); + + String setName = getValuesName(keyHash); + return commandExecutor.evalReadAsync(getName(), codec, RedisCommands.EVAL_BOOLEAN, + "local expireDate = 92233720368547758; " + + "local expireDateScore = redis.call('zscore', KEYS[2], ARGV[2]); " + + "if expireDateScore ~= false then " + + "expireDate = tonumber(expireDateScore) " + + "end; " + + "if expireDate > tonumber(ARGV[1]) then " + + "if redis.call('sismember', KEYS[1], ARGV[3]) == 1 then " + + "return 1; " + + "end;" + + "end; " + + "return 0; ", + Arrays.asList(setName, getTimeoutSetName()), System.currentTimeMillis(), keyState, valueState); + + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public Set get(K key) { + try { + byte[] keyState = codec.getMapKeyEncoder().encode(key); + String keyHash = hash(keyState); + String setName = getValuesName(keyHash); + + return new RedissonSetMultimapValues(codec, commandExecutor, setName, getTimeoutSetName(), key); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public Future> getAllAsync(K key) { + try { + byte[] keyState = codec.getMapKeyEncoder().encode(key); + String keyHash = hash(keyState); + String setName = getValuesName(keyHash); + + return commandExecutor.evalReadAsync(getName(), codec, RedisCommands.EVAL_SET, + "local expireDate = 92233720368547758; " + + "local expireDateScore = redis.call('zscore', KEYS[2], ARGV[2]); " + + "if expireDateScore ~= false then " + + "expireDate = tonumber(expireDateScore) " + + "end; " + + "if expireDate > tonumber(ARGV[1]) then " + + "return redis.call('smembers', KEYS[1]); " + + "end; " + + "return {}; ", + Arrays.asList(setName, getTimeoutSetName()), System.currentTimeMillis(), keyState); + } catch (IOException e) { + throw new RuntimeException(e); + } + + } + + public Future> removeAllAsync(Object key) { + try { + byte[] keyState = codec.getMapKeyEncoder().encode(key); + String keyHash = hash(keyState); + + String setName = getValuesName(keyHash); + return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_SET, + "redis.call('hdel', KEYS[1], ARGV[1]); " + + "local members = redis.call('smembers', KEYS[2]); " + + "redis.call('del', KEYS[2]); " + + "redis.call('zrem', KEYS[3], ARGV[1]); " + + "return members; ", + Arrays.asList(getName(), setName, getTimeoutSetName()), keyState); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public boolean expireKey(K key, long timeToLive, TimeUnit timeUnit) { + return get(expireKeyAsync(key, timeToLive, timeUnit)); + } + + @Override + public Future expireKeyAsync(K key, long timeToLive, TimeUnit timeUnit) { + long ttlTimeout = System.currentTimeMillis() + timeUnit.toMillis(timeToLive); + + return commandExecutor.evalWriteAsync(getName(), codec, EVAL_EXPIRE_KEY, + "if redis.call('hexists', KEYS[1], ARGV[2]) == 1 then " + + "if tonumber(ARGV[1]) > 0 then " + + "redis.call('zadd', KEYS[2], ARGV[1], ARGV[2]); " + + "else " + + "redis.call('zrem', KEYS[2], ARGV[2]); " + + "end; " + + "return 1; " + + "else " + + "return 0; " + + "end", + Arrays.asList(getName(), getTimeoutSetName()), ttlTimeout, key); + } + +} diff --git a/src/main/java/org/redisson/RedissonSetMultimapValues.java b/src/main/java/org/redisson/RedissonSetMultimapValues.java new file mode 100644 index 000000000..be9fab67c --- /dev/null +++ b/src/main/java/org/redisson/RedissonSetMultimapValues.java @@ -0,0 +1,446 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Set; + +import org.redisson.client.codec.Codec; +import org.redisson.client.protocol.RedisCommand; +import org.redisson.client.protocol.RedisCommand.ValueType; +import org.redisson.client.protocol.RedisCommands; +import org.redisson.client.protocol.convertor.BooleanReplayConvertor; +import org.redisson.client.protocol.convertor.IntegerReplayConvertor; +import org.redisson.client.protocol.decoder.ListScanResult; +import org.redisson.client.protocol.decoder.ListScanResultReplayDecoder; +import org.redisson.client.protocol.decoder.NestedMultiDecoder; +import org.redisson.client.protocol.decoder.ObjectListReplayDecoder; +import org.redisson.client.protocol.decoder.ObjectSetReplayDecoder; +import org.redisson.command.CommandAsyncExecutor; +import org.redisson.core.RSet; + +import io.netty.util.concurrent.Future; + +/** + * Distributed and concurrent implementation of {@link java.util.Set} + * + * @author Nikita Koksharov + * + * @param value + */ +public class RedissonSetMultimapValues extends RedissonExpirable implements RSet { + + private static final RedisCommand> EVAL_SSCAN = new RedisCommand>("EVAL", new NestedMultiDecoder(new ObjectListReplayDecoder(), new ListScanResultReplayDecoder()), 7, ValueType.MAP_KEY, ValueType.OBJECT); + private static final RedisCommand EVAL_SIZE = new RedisCommand("EVAL", new IntegerReplayConvertor(), 6, ValueType.MAP_KEY); + private static final RedisCommand> EVAL_READALL = new RedisCommand>("EVAL", new ObjectSetReplayDecoder(), 6, ValueType.MAP_KEY); + private static final RedisCommand EVAL_CONTAINS_VALUE = new RedisCommand("EVAL", new BooleanReplayConvertor(), 6, Arrays.asList(ValueType.MAP_KEY, ValueType.MAP_VALUE)); + private static final RedisCommand EVAL_CONTAINS_ALL_WITH_VALUES = new RedisCommand("EVAL", new BooleanReplayConvertor(), 7, ValueType.OBJECTS); + + private final Object key; + private final String timeoutSetName; + + public RedissonSetMultimapValues(Codec codec, CommandAsyncExecutor commandExecutor, String name, String timeoutSetName, Object key) { + super(codec, commandExecutor, name); + this.timeoutSetName = timeoutSetName; + this.key = key; + } + + @Override + public int size() { + return get(sizeAsync()); + } + + @Override + public Future sizeAsync() { + return commandExecutor.evalReadAsync(getName(), codec, EVAL_SIZE, + "local expireDate = 92233720368547758; " + + "local expireDateScore = redis.call('zscore', KEYS[1], ARGV[2]); " + + "if expireDateScore ~= false then " + + "expireDate = tonumber(expireDateScore) " + + "end; " + + "if expireDate <= tonumber(ARGV[1]) then " + + "return 0;" + + "end; " + + "return redis.call('scard', KEYS[2]);", + Arrays.asList(timeoutSetName, getName()), System.currentTimeMillis(), key); + } + + @Override + public boolean isEmpty() { + return size() == 0; + } + + @Override + public boolean contains(Object o) { + return get(containsAsync(o)); + } + + @Override + public Future containsAsync(Object o) { + return commandExecutor.evalReadAsync(getName(), codec, EVAL_CONTAINS_VALUE, + "local expireDate = 92233720368547758; " + + "local expireDateScore = redis.call('zscore', KEYS[1], ARGV[2]); " + + "if expireDateScore ~= false then " + + "expireDate = tonumber(expireDateScore) " + + "end; " + + "if expireDate <= tonumber(ARGV[1]) then " + + "return 0;" + + "end; " + + "return redis.call('sismember', KEYS[2], ARGV[3]);", + Arrays.asList(timeoutSetName, getName()), System.currentTimeMillis(), key, o); + } + + private ListScanResult scanIterator(InetSocketAddress client, long startPos) { + Future> f = commandExecutor.evalReadAsync(client, getName(), codec, EVAL_SSCAN, + "local expireDate = 92233720368547758; " + + "local expireDateScore = redis.call('zscore', KEYS[1], ARGV[3]); " + + "if expireDateScore ~= false then " + + "expireDate = tonumber(expireDateScore) " + + "end; " + + "if expireDate <= tonumber(ARGV[1]) then " + + "return {0, {}};" + + "end;" + + + "return redis.call('sscan', KEYS[2], ARGV[2]);", + Arrays.asList(timeoutSetName, getName()), System.currentTimeMillis(), startPos, key); + return get(f); + } + + @Override + public Iterator iterator() { + return new Iterator() { + + private List firstValues; + private Iterator iter; + private InetSocketAddress client; + private long nextIterPos; + + private boolean currentElementRemoved; + private boolean removeExecuted; + private V value; + + @Override + public boolean hasNext() { + if (iter == null || !iter.hasNext()) { + if (nextIterPos == -1) { + return false; + } + long prevIterPos = nextIterPos; + ListScanResult res = scanIterator(client, nextIterPos); + client = res.getRedisClient(); + if (nextIterPos == 0 && firstValues == null) { + firstValues = res.getValues(); + } else if (res.getValues().equals(firstValues)) { + return false; + } + iter = res.getValues().iterator(); + nextIterPos = res.getPos(); + if (prevIterPos == nextIterPos && !removeExecuted) { + nextIterPos = -1; + } + } + return iter.hasNext(); + } + + @Override + public V next() { + if (!hasNext()) { + throw new NoSuchElementException("No such element at index"); + } + + value = iter.next(); + currentElementRemoved = false; + return value; + } + + @Override + public void remove() { + if (currentElementRemoved) { + throw new IllegalStateException("Element been already deleted"); + } + if (iter == null) { + throw new IllegalStateException(); + } + + iter.remove(); + RedissonSetMultimapValues.this.remove(value); + currentElementRemoved = true; + removeExecuted = true; + } + + }; + } + + @Override + public Future> readAllAsync() { + return commandExecutor.evalReadAsync(getName(), codec, EVAL_READALL, + "local expireDate = 92233720368547758; " + + "local expireDateScore = redis.call('zscore', KEYS[1], ARGV[2]); " + + "if expireDateScore ~= false then " + + "expireDate = tonumber(expireDateScore) " + + "end; " + + "if expireDate <= tonumber(ARGV[1]) then " + + "return {};" + + "end; " + + "return redis.call('smembers', KEYS[2]);", + Arrays.asList(timeoutSetName, getName()), System.currentTimeMillis(), key); + } + + @Override + public Set readAll() { + return get(readAllAsync()); + } + + @Override + public Object[] toArray() { + Set res = (Set) get(readAllAsync()); + return res.toArray(); + } + + @Override + public T[] toArray(T[] a) { + Set res = (Set) get(readAllAsync()); + return res.toArray(a); + } + + @Override + public boolean add(V e) { + return get(addAsync(e)); + } + + @Override + public Future addAsync(V e) { + return commandExecutor.writeAsync(getName(), codec, RedisCommands.SADD_SINGLE, getName(), e); + } + + @Override + public V removeRandom() { + return get(removeRandomAsync()); + } + + @Override + public Future removeRandomAsync() { + return commandExecutor.writeAsync(getName(), codec, RedisCommands.SPOP_SINGLE, getName()); + } + + @Override + public Future removeAsync(Object o) { + return commandExecutor.evalWriteAsync(getName(), codec, EVAL_CONTAINS_VALUE, + "local expireDate = 92233720368547758; " + + "local expireDateScore = redis.call('zscore', KEYS[1], ARGV[2]); " + + "if expireDateScore ~= false then " + + "expireDate = tonumber(expireDateScore) " + + "end; " + + "if expireDate <= tonumber(ARGV[1]) then " + + "return 0;" + + "end; " + + "return redis.call('srem', KEYS[2], ARGV[3]) > 0 and 1 or 0;", + Arrays.asList(timeoutSetName, getName()), System.currentTimeMillis(), key, o); + } + + @Override + public boolean remove(Object value) { + return get(removeAsync((V)value)); + } + + @Override + public Future moveAsync(String destination, V member) { + return commandExecutor.writeAsync(getName(), codec, RedisCommands.SMOVE, getName(), destination, member); + } + + @Override + public boolean move(String destination, V member) { + return get(moveAsync(destination, member)); + } + + @Override + public boolean containsAll(Collection c) { + return get(containsAllAsync(c)); + } + + @Override + public Future containsAllAsync(Collection c) { + List args = new ArrayList(c.size() + 2); + try { + byte[] keyState = codec.getMapKeyEncoder().encode(key); + args.add(System.currentTimeMillis()); + args.add(keyState); + args.addAll(c); + } catch (IOException e) { + throw new RuntimeException(e); + } + + return commandExecutor.evalReadAsync(getName(), codec, EVAL_CONTAINS_ALL_WITH_VALUES, + "local expireDate = 92233720368547758; " + + "local expireDateScore = redis.call('zscore', KEYS[1], ARGV[2]); " + + "if expireDateScore ~= false then " + + "expireDate = tonumber(expireDateScore) " + + "end; " + + "if expireDate <= tonumber(ARGV[1]) then " + + "return 0;" + + "end; " + + "local s = redis.call('smembers', KEYS[2]);" + + "for i = 0, table.getn(s), 1 do " + + "for j = 2, table.getn(ARGV), 1 do " + + "if ARGV[j] == s[i] " + + "then table.remove(ARGV, j) end " + + "end; " + + "end;" + + "return table.getn(ARGV) == 2 and 1 or 0; ", + Arrays.asList(timeoutSetName, getName()), args.toArray()); + } + + @Override + public boolean addAll(Collection c) { + if (c.isEmpty()) { + return false; + } + + return get(addAllAsync(c)); + } + + @Override + public Future addAllAsync(Collection c) { + List args = new ArrayList(c.size() + 1); + args.add(getName()); + args.addAll(c); + return commandExecutor.writeAsync(getName(), codec, RedisCommands.SADD_BOOL, args.toArray()); + } + + @Override + public boolean retainAll(Collection c) { + return get(retainAllAsync(c)); + } + + @Override + public Future retainAllAsync(Collection c) { + List args = new ArrayList(c.size() + 2); + try { + byte[] keyState = codec.getMapKeyEncoder().encode(key); + args.add(System.currentTimeMillis()); + args.add(keyState); + args.addAll(c); + } catch (IOException e) { + throw new RuntimeException(e); + } + + return commandExecutor.evalWriteAsync(getName(), codec, EVAL_CONTAINS_ALL_WITH_VALUES, + "local expireDate = 92233720368547758; " + + "local expireDateScore = redis.call('zscore', KEYS[1], ARGV[2]); " + + "if expireDateScore ~= false then " + + "expireDate = tonumber(expireDateScore) " + + "end; " + + "if expireDate <= tonumber(ARGV[1]) then " + + "return 0;" + + "end; " + + + "local changed = 0 " + + "local s = redis.call('smembers', KEYS[2]) " + + "local i = 0 " + + "while i <= table.getn(s) do " + + "local element = s[i] " + + "local isInAgrs = false " + + "for j = 2, table.getn(ARGV), 1 do " + + "if ARGV[j] == element then " + + "isInAgrs = true " + + "break " + + "end " + + "end " + + "if isInAgrs == false then " + + "redis.call('SREM', KEYS[2], element) " + + "changed = 1 " + + "end " + + "i = i + 1 " + + "end " + + "return changed ", + Arrays.asList(timeoutSetName, getName()), args.toArray()); + } + + @Override + public Future removeAllAsync(Collection c) { + List args = new ArrayList(c.size() + 2); + try { + byte[] keyState = codec.getMapKeyEncoder().encode(key); + args.add(System.currentTimeMillis()); + args.add(keyState); + args.addAll(c); + } catch (IOException e) { + throw new RuntimeException(e); + } + + return commandExecutor.evalWriteAsync(getName(), codec, EVAL_CONTAINS_ALL_WITH_VALUES, + "local expireDate = 92233720368547758; " + + "local expireDateScore = redis.call('zscore', KEYS[1], ARGV[2]); " + + "if expireDateScore ~= false then " + + "expireDate = tonumber(expireDateScore) " + + "end; " + + "if expireDate <= tonumber(ARGV[1]) then " + + "return 0;" + + "end; " + + + "local v = 0 " + + "for i = 2, table.getn(ARGV), 1 do " + + "if redis.call('srem', KEYS[2], ARGV[i]) == 1 " + + "then v = 1 end " + +"end " + + "return v ", + Arrays.asList(timeoutSetName, getName()), args.toArray()); + } + + @Override + public boolean removeAll(Collection c) { + return get(removeAllAsync(c)); + } + + @Override + public int union(String... names) { + return get(unionAsync(names)); + } + + @Override + public Future unionAsync(String... names) { + List args = new ArrayList(names.length + 1); + args.add(getName()); + args.addAll(Arrays.asList(names)); + return commandExecutor.writeAsync(getName(), codec, RedisCommands.SUNIONSTORE_INT, args.toArray()); + } + + @Override + public Set readUnion(String... names) { + return get(readUnionAsync(names)); + } + + @Override + public Future> readUnionAsync(String... names) { + List args = new ArrayList(names.length + 1); + args.add(getName()); + args.addAll(Arrays.asList(names)); + return commandExecutor.writeAsync(getName(), codec, RedisCommands.SUNION, args.toArray()); + } + + @Override + public void clear() { + delete(); + } + +} diff --git a/src/main/java/org/redisson/core/RMultimapCache.java b/src/main/java/org/redisson/core/RMultimapCache.java new file mode 100644 index 000000000..e1438d775 --- /dev/null +++ b/src/main/java/org/redisson/core/RMultimapCache.java @@ -0,0 +1,24 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.core; + +import java.util.concurrent.TimeUnit; + +public interface RMultimapCache extends RMultimap, RMultimapCacheAsync { + + boolean expireKey(K key, long timeToLive, TimeUnit timeUnit); + +} diff --git a/src/main/java/org/redisson/core/RMultimapCacheAsync.java b/src/main/java/org/redisson/core/RMultimapCacheAsync.java new file mode 100644 index 000000000..f95054215 --- /dev/null +++ b/src/main/java/org/redisson/core/RMultimapCacheAsync.java @@ -0,0 +1,26 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.core; + +import java.util.concurrent.TimeUnit; + +import io.netty.util.concurrent.Future; + +public interface RMultimapCacheAsync extends RMultimapAsync { + + Future expireKeyAsync(K key, long timeToLive, TimeUnit timeUnit); + +} diff --git a/src/main/java/org/redisson/core/RSetMultimapCache.java b/src/main/java/org/redisson/core/RSetMultimapCache.java new file mode 100644 index 000000000..1029b2c3f --- /dev/null +++ b/src/main/java/org/redisson/core/RSetMultimapCache.java @@ -0,0 +1,20 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.core; + +public interface RSetMultimapCache extends RSetMultimap, RMultimapCache { + +} diff --git a/src/test/java/org/redisson/RedissonSetMultimapCacheTest.java b/src/test/java/org/redisson/RedissonSetMultimapCacheTest.java new file mode 100644 index 000000000..66415c84a --- /dev/null +++ b/src/test/java/org/redisson/RedissonSetMultimapCacheTest.java @@ -0,0 +1,115 @@ +package org.redisson; + +import static org.assertj.core.api.Assertions.*; + +import java.util.Arrays; +import java.util.concurrent.TimeUnit; + +import org.junit.Test; +import org.redisson.core.RMultimapCache; + +public class RedissonSetMultimapCacheTest extends BaseTest { + + @Test + public void testContains() { + RMultimapCache multimap = redisson.getSetMultimapCache("test"); + multimap.put("1", "1"); + multimap.put("1", "2"); + multimap.put("1", "3"); + + assertThat(multimap.containsKey("1")).isTrue(); + assertThat(multimap.containsKey("2")).isFalse(); + + assertThat(multimap.containsValue("1")).isTrue(); + assertThat(multimap.containsValue("3")).isTrue(); + assertThat(multimap.containsValue("4")).isFalse(); + + assertThat(multimap.containsEntry("1", "1")).isTrue(); + assertThat(multimap.containsEntry("1", "3")).isTrue(); + assertThat(multimap.containsEntry("1", "4")).isFalse(); + } + + @Test + public void testContainsExpired() throws InterruptedException { + RMultimapCache multimap = redisson.getSetMultimapCache("test"); + multimap.put("1", "1"); + multimap.put("1", "2"); + multimap.put("1", "3"); + multimap.expireKey("1", 1, TimeUnit.SECONDS); + + Thread.sleep(1000); + + assertThat(multimap.containsKey("1")).isFalse(); + assertThat(multimap.containsKey("2")).isFalse(); + + assertThat(multimap.containsValue("1")).isFalse(); + assertThat(multimap.containsValue("3")).isFalse(); + assertThat(multimap.containsValue("4")).isFalse(); + + assertThat(multimap.containsEntry("1", "1")).isFalse(); + assertThat(multimap.containsEntry("1", "3")).isFalse(); + assertThat(multimap.containsEntry("1", "4")).isFalse(); + } + + @Test + public void testGetAll() throws InterruptedException { + RMultimapCache multimap = redisson.getSetMultimapCache("test"); + multimap.put("1", "1"); + multimap.put("1", "2"); + multimap.put("1", "3"); + + assertThat(multimap.getAll("1")).containsOnlyOnce("1", "2", "3"); + } + + @Test + public void testGetAllExpired() throws InterruptedException { + RMultimapCache multimap = redisson.getSetMultimapCache("test"); + multimap.put("1", "1"); + multimap.put("1", "2"); + multimap.put("1", "3"); + multimap.expireKey("1", 1, TimeUnit.SECONDS); + + Thread.sleep(1000); + + assertThat(multimap.getAll("1")).isEmpty(); + } + + @Test + public void testValues() throws InterruptedException { + RMultimapCache multimap = redisson.getSetMultimapCache("test"); + multimap.put("1", "1"); + multimap.put("1", "2"); + multimap.put("1", "3"); + multimap.put("1", "3"); + + assertThat(multimap.get("1").size()).isEqualTo(3); + assertThat(multimap.get("1")).containsOnlyOnce("1", "2", "3"); + assertThat(multimap.get("1").remove("3")).isTrue(); + assertThat(multimap.get("1").contains("3")).isFalse(); + assertThat(multimap.get("1").contains("2")).isTrue(); + assertThat(multimap.get("1").containsAll(Arrays.asList("1"))).isTrue(); + assertThat(multimap.get("1").containsAll(Arrays.asList("1", "2"))).isTrue(); + assertThat(multimap.get("1").retainAll(Arrays.asList("1"))).isTrue(); + assertThat(multimap.get("1").removeAll(Arrays.asList("1"))).isTrue(); + } + + @Test + public void testValuesExpired() throws InterruptedException { + RMultimapCache multimap = redisson.getSetMultimapCache("test"); + multimap.put("1", "1"); + multimap.put("1", "2"); + multimap.put("1", "3"); + multimap.expireKey("1", 1, TimeUnit.SECONDS); + + Thread.sleep(1000); + + assertThat(multimap.get("1").size()).isZero(); + assertThat(multimap.get("1")).contains(); + assertThat(multimap.get("1").remove("3")).isFalse(); + assertThat(multimap.get("1").contains("3")).isFalse(); + assertThat(multimap.get("1").retainAll(Arrays.asList("1"))).isFalse(); + assertThat(multimap.get("1").containsAll(Arrays.asList("1"))).isFalse(); + assertThat(multimap.get("1").removeAll(Arrays.asList("1"))).isFalse(); + } + +}