diff --git a/src/main/java/org/redisson/Redisson.java b/src/main/java/org/redisson/Redisson.java index 925341f40..b7bb53ac9 100755 --- a/src/main/java/org/redisson/Redisson.java +++ b/src/main/java/org/redisson/Redisson.java @@ -39,6 +39,7 @@ import org.redisson.core.RBatch; import org.redisson.core.RBitSet; import org.redisson.core.RBlockingQueue; import org.redisson.core.RBucket; +import org.redisson.core.RCache; import org.redisson.core.RCountDownLatch; import org.redisson.core.RDeque; import org.redisson.core.RHyperLogLog; @@ -188,6 +189,11 @@ public class Redisson implements RedissonClient { return new RedissonMap(commandExecutor, name); } + @Override + public RCache getCache(String name) { + return new RedissonCache(commandExecutor, name); + } + @Override public RMap getMap(String name, Codec codec) { return new RedissonMap(codec, commandExecutor, name); diff --git a/src/main/java/org/redisson/RedissonCache.java b/src/main/java/org/redisson/RedissonCache.java new file mode 100644 index 000000000..0232c83ee --- /dev/null +++ b/src/main/java/org/redisson/RedissonCache.java @@ -0,0 +1,354 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson; + +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import org.redisson.client.codec.Codec; +import org.redisson.client.protocol.RedisCommand; +import org.redisson.client.protocol.RedisCommand.ValueType; +import org.redisson.client.protocol.RedisCommands; +import org.redisson.client.protocol.convertor.BooleanReplayConvertor; +import org.redisson.client.protocol.convertor.LongReplayConvertor; +import org.redisson.client.protocol.decoder.MapScanResult; +import org.redisson.client.protocol.decoder.ObjectListReplayDecoder; +import org.redisson.client.protocol.decoder.ObjectMapReplayDecoder; +import org.redisson.client.protocol.decoder.ObjectSetReplayDecoder; +import org.redisson.command.CommandAsyncExecutor; +import org.redisson.connection.decoder.MapGetAllDecoder; +import org.redisson.core.RCache; + +import io.netty.util.concurrent.Future; + +/** + * Distributed and concurrent implementation of {@link java.util.concurrent.ConcurrentMap} + * and {@link java.util.Map} + * + * @author Nikita Koksharov + * + * @param key + * @param value + */ +// TODO override expire methods +public class RedissonCache extends RedissonMap implements RCache { + + private static final RedisCommand EVAL_REMOVE = new RedisCommand("EVAL", 4, ValueType.MAP_KEY, ValueType.MAP_VALUE); + private static final RedisCommand EVAL_REPLACE = new RedisCommand("EVAL", 4, ValueType.MAP, ValueType.MAP_VALUE); + private static final RedisCommand EVAL_REMOVE_VALUE = new RedisCommand("EVAL", new LongReplayConvertor(), 5, ValueType.MAP); + private static final RedisCommand EVAL_PUT = EVAL_REPLACE; + private static final RedisCommand EVAL_PUT_TTL = new RedisCommand("EVAL", 6, ValueType.MAP, ValueType.MAP_VALUE); + private static final RedisCommand EVAL_GET_TTL = new RedisCommand("EVAL", 6, ValueType.MAP_KEY, ValueType.MAP_VALUE); + + private static final RedisCommand EVAL_CONTAINS = new RedisCommand("EVAL", new BooleanReplayConvertor(), 6); + private static final RedisCommand> EVAL_HKEYS = new RedisCommand>("EVAL", new ObjectSetReplayDecoder(), ValueType.MAP_KEY); + private static final RedisCommand> EVAL_HVALS = new RedisCommand>("EVAL", new ObjectListReplayDecoder(), ValueType.MAP_VALUE); + private static final RedisCommand> EVAL_HGETALL = new RedisCommand>("EVAL", new ObjectMapReplayDecoder(), ValueType.MAP); + private static final RedisCommand EVAL_FAST_REMOVE = new RedisCommand("EVAL", 2); + + protected RedissonCache(CommandAsyncExecutor commandExecutor, String name) { + super(commandExecutor, name); + } + + public RedissonCache(Codec codec, CommandAsyncExecutor commandExecutor, String name) { + super(codec, commandExecutor, name); + } + + @Override + public Future sizeAsync() { + return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_INTEGER, + "local expiredKeys = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1]); " + + "if table.getn(expiredKeys) > 0 then " + + "expiredKeys = unpack(expiredKeys); " + + "redis.call('zrem', KEYS[2], expiredKeys); " + + "redis.call('hdel', KEYS[1], expiredKeys); " + + "end; " + + "return redis.call('hlen', KEYS[1]);", + Arrays.asList(getName(), getTimeoutSetName()), System.currentTimeMillis()); + } + + @Override + public Future containsKeyAsync(Object key) { + return commandExecutor.evalWriteAsync(getName(), codec, EVAL_CONTAINS, + "local v = redis.call('hexists', KEYS[1], ARGV[2]); " + + "if v == 1 then " + + "local expireDate = redis.call('zscore', KEYS[2], ARGV[2]); " + + "if expireDate ~= false and expireDate <= ARGV[1] then " + + "redis.call('zrem', KEYS[2], ARGV[2]); " + + "redis.call('hdel', KEYS[1], ARGV[2]); " + + "return false;" + + "end;" + + "return true;" + + "end;" + + "return false;", + Arrays.asList(getName(), getTimeoutSetName()), System.currentTimeMillis(), key); + } + + @Override + public Future containsValueAsync(Object value) { + return commandExecutor.evalWriteAsync(getName(), codec, new RedisCommand("EVAL", new BooleanReplayConvertor(), 6), + "local s = redis.call('hgetall', KEYS[1]);" + + "for i, v in ipairs(s) do " + + "if ARGV[2] == v and i % 2 == 0 then " + + "local key = s[i-1];" + + "local expireDate = redis.call('zscore', KEYS[2], key); " + + "if expireDate ~= false and expireDate <= ARGV[1] then " + + "redis.call('zrem', KEYS[2], key); " + + "redis.call('hdel', KEYS[1], key); " + + "return false;" + + "end;" + + "return true " + + "end " + + "end;" + + "return false", + Arrays.asList(getName(), getTimeoutSetName()), System.currentTimeMillis(), value); + } + + @Override + public Future> getAllAsync(Set keys) { + if (keys.isEmpty()) { + return newSucceededFuture(Collections.emptyMap()); + } + + List args = new ArrayList(keys.size() + 2); + args.add(System.currentTimeMillis()); + args.addAll(keys); + return commandExecutor.evalWriteAsync(getName(), codec, new RedisCommand>("EVAL", new MapGetAllDecoder(args), 6, ValueType.MAP_KEY, ValueType.MAP_VALUE), + "local maxDate = table.remove(ARGV, 1); " + // index is the first parameter + "for i, key in ipairs(ARGV) do " + + "local expireDate = redis.call('zscore', KEYS[2], key); " + + "if expireDate ~= false and expireDate <= maxDate then " + + "print ('delete ' .. key)" + + "redis.call('zrem', KEYS[2], key); " + + "redis.call('hdel', KEYS[1], key); " + + "end;" + + "end;" + + "return redis.call('hmget', KEYS[1], unpack(ARGV));", + Arrays.asList(getName(), getTimeoutSetName()), args.toArray()); + } + + @Override + public Future> keySetAsync() { + return commandExecutor.evalWriteAsync(getName(), codec, EVAL_HKEYS, + "local expiredKeys = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1]); " + + "if table.getn(expiredKeys) > 0 then " + + "expiredKeys = unpack(expiredKeys); " + + "redis.call('zrem', KEYS[2], expiredKeys); " + + "redis.call('hdel', KEYS[1], expiredKeys); " + + "end; " + + "return redis.call('hkeys', KEYS[1]);", + Arrays.asList(getName(), getTimeoutSetName()), System.currentTimeMillis()); + } + + @Override + public Future> valuesAsync() { + return commandExecutor.evalWriteAsync(getName(), codec, EVAL_HVALS, + "local expiredKeys = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1]); " + + "if table.getn(expiredKeys) > 0 then " + + "expiredKeys = unpack(expiredKeys); " + + "redis.call('zrem', KEYS[2], expiredKeys); " + + "redis.call('hdel', KEYS[1], expiredKeys); " + + "end; " + + "return redis.call('hvals', KEYS[1]);", + Arrays.asList(getName(), getTimeoutSetName()), System.currentTimeMillis()); + } + + @Override + public Set> entrySet() { + Future> f = commandExecutor.evalWriteAsync(getName(), codec, EVAL_HGETALL, + "local expiredKeys = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1]); " + + "if table.getn(expiredKeys) > 0 then " + + "expiredKeys = unpack(expiredKeys); " + + "redis.call('zrem', KEYS[2], expiredKeys); " + + "redis.call('hdel', KEYS[1], expiredKeys); " + + "end; " + + "return redis.call('hgetall', KEYS[1]);", + Arrays.asList(getName(), getTimeoutSetName()), System.currentTimeMillis()); + + Map map = get(f); + return map.entrySet(); + } + + public V putIfAbsent(K key, V value, long ttl, TimeUnit unit) { + return get(putIfAbsent(key, value, ttl, unit)); + } + + public Future putIfAbsentAsync(K key, V value, long ttl, TimeUnit unit) { + long timeoutDate = System.currentTimeMillis() + unit.toMillis(ttl); + + return commandExecutor.evalWriteAsync(getName(), codec, EVAL_PUT, + "if redis.call('hexists', KEYS[1], ARGV[1]) == 0 then " + + "redis.call('zadd', KEYS[2], ARGV[1], ARGV[2]); " + + "redis.call('hset', KEYS[1], ARGV[1], ARGV[2]); " + + "return nil " + + "else " + + "return redis.call('hget', KEYS[1], ARGV[1]) " + + "end", + Arrays.asList(getName(), getTimeoutSetName()), timeoutDate, key, value); + } + + @Override + public Future removeAsync(Object key, Object value) { + return commandExecutor.evalWriteAsync(getName(), codec, EVAL_REMOVE_VALUE, + "if redis.call('hget', KEYS[1], ARGV[1]) == ARGV[2] then " + + "redis.call('zrem', KEYS[2], ARGV[1]); " + + "return redis.call('hdel', KEYS[1], ARGV[1]); " + + "else " + + "return 0 " + + "end", + Arrays.asList(getName(), getTimeoutSetName()), key, value); + } + + public Future getAsync(K key) { + return commandExecutor.evalWriteAsync(getName(), codec, EVAL_GET_TTL, + "local v = redis.call('hget', KEYS[1], ARGV[2]); " + + "local expireDate = redis.call('zscore', KEYS[2], ARGV[2]); " + + "if expireDate ~= false and expireDate <= ARGV[1] then " + + "redis.call('zrem', KEYS[2], ARGV[2]); " + + "redis.call('hdel', KEYS[1], ARGV[2]); " + + "return nil;" + + "end;" + + "return v;", + Arrays.asList(getName(), getTimeoutSetName()), System.currentTimeMillis(), key); + } + + public V put(K key, V value, long ttl, TimeUnit unit) { + return get(putAsync(key, value, ttl, unit)); + } + + public Future putAsync(K key, V value, long ttl, TimeUnit unit) { + long timeoutDate = System.currentTimeMillis() + unit.toMillis(ttl); + + return commandExecutor.evalWriteAsync(getName(), codec, EVAL_PUT_TTL, + "local v = redis.call('hget', KEYS[1], ARGV[2]); " + + "redis.call('zadd', KEYS[2], ARGV[1], ARGV[2]); " + + "redis.call('hset', KEYS[1], ARGV[2], ARGV[3]); " + + "return v", + Arrays.asList(getName(), getTimeoutSetName()), timeoutDate, key, value); + } + + String getTimeoutSetName() { + return "redisson__timeout__set__{" + getName() + "}"; + } + + + @Override + public Future removeAsync(K key) { + return commandExecutor.evalWriteAsync(getName(), codec, EVAL_REMOVE, + "local v = redis.call('hget', KEYS[1], ARGV[1]); " + + "redis.call('zrem', KEYS[2], ARGV[1]); " + + "redis.call('hdel', KEYS[1], ARGV[1]); " + + "return v", + Arrays.asList(getName(), getTimeoutSetName()), key); + } + + @Override + public Future fastRemoveAsync(K ... keys) { + if (keys == null || keys.length == 0) { + return newSucceededFuture(0L); + } + + List args = new ArrayList(keys.length); + args.addAll(Arrays.asList(keys)); + return commandExecutor.evalWriteAsync(getName(), codec, EVAL_FAST_REMOVE, + "redis.call('zrem', KEYS[2], unpack(ARGV)); " + + "return redis.call('hdel', KEYS[1], unpack(ARGV)); ", + Arrays.asList(getName(), getTimeoutSetName()), args.toArray()); + } + + MapScanResult scanIterator(InetSocketAddress client, long startPos) { + Future> f = commandExecutor.readAsync(client, getName(), codec, RedisCommands.HSCAN, getName(), startPos); + return get(f); + } + +// @Override +// public Iterator> entryIterator() { +// return new RedissonMapIterator>(this); +// } +// +// @Override +// public Iterator valueIterator() { +// return new RedissonMapIterator(this) { +// @Override +// V getValue(java.util.Map.Entry entry) { +// return entry.getValue(); +// } +// }; +// } +// +// @Override +// public Iterator keyIterator() { +// return new RedissonMapIterator(this) { +// @Override +// K getValue(java.util.Map.Entry entry) { +// return entry.getKey(); +// } +// }; +// } + + + @Override + public boolean equals(Object o) { + if (o == this) + return true; + + if (!(o instanceof Map)) + return false; + Map m = (Map) o; + if (m.size() != size()) + return false; + + try { + Iterator> i = entrySet().iterator(); + while (i.hasNext()) { + Entry e = i.next(); + K key = e.getKey(); + V value = e.getValue(); + if (value == null) { + if (!(m.get(key)==null && m.containsKey(key))) + return false; + } else { + if (!value.equals(m.get(key))) + return false; + } + } + } catch (ClassCastException unused) { + return false; + } catch (NullPointerException unused) { + return false; + } + + return true; + } + + @Override + public int hashCode() { + int h = 0; + Iterator> i = entrySet().iterator(); + while (i.hasNext()) + h += i.next().hashCode(); + return h; + } + +} diff --git a/src/main/java/org/redisson/RedissonClient.java b/src/main/java/org/redisson/RedissonClient.java index ea77bce36..632c2aa9c 100755 --- a/src/main/java/org/redisson/RedissonClient.java +++ b/src/main/java/org/redisson/RedissonClient.java @@ -26,6 +26,7 @@ import org.redisson.core.RBatch; import org.redisson.core.RBitSet; import org.redisson.core.RBlockingQueue; import org.redisson.core.RBucket; +import org.redisson.core.RCache; import org.redisson.core.RCountDownLatch; import org.redisson.core.RDeque; import org.redisson.core.RHyperLogLog; @@ -51,6 +52,8 @@ import org.redisson.core.RTopic; */ public interface RedissonClient { + RCache getCache(String name); + /** * Returns object holder instance by name. * diff --git a/src/main/java/org/redisson/client/protocol/RedisCommand.java b/src/main/java/org/redisson/client/protocol/RedisCommand.java index 0ea20b256..6fac24fc0 100644 --- a/src/main/java/org/redisson/client/protocol/RedisCommand.java +++ b/src/main/java/org/redisson/client/protocol/RedisCommand.java @@ -87,41 +87,41 @@ public class RedisCommand { this(name, subName, null, null, objectParamIndex); } - public RedisCommand(String name, int encodeParamIndex) { - this(name, null, null, null, encodeParamIndex); + public RedisCommand(String name, int inParamIndex) { + this(name, null, null, null, inParamIndex); } - public RedisCommand(String name, int encodeParamIndex, ValueType inParamType, ValueType outParamType) { - this(name, null, null, null, encodeParamIndex); + public RedisCommand(String name, int inParamIndex, ValueType inParamType, ValueType outParamType) { + this(name, null, null, null, inParamIndex); this.inParamType = Arrays.asList(inParamType); this.outParamType = outParamType; } - public RedisCommand(String name, int encodeParamIndex, List inParamType, ValueType outParamType) { - this(name, null, null, null, encodeParamIndex); + public RedisCommand(String name, int inParamIndex, List inParamType, ValueType outParamType) { + this(name, null, null, null, inParamIndex); this.inParamType = inParamType; this.outParamType = outParamType; } - public RedisCommand(String name, Decoder reponseDecoder, int encodeParamIndex, List inParamType, ValueType outParamType) { - this(name, null, null, reponseDecoder, encodeParamIndex); + public RedisCommand(String name, Decoder reponseDecoder, int inParamIndex, List inParamType, ValueType outParamType) { + this(name, null, null, reponseDecoder, inParamIndex); this.inParamType = inParamType; this.outParamType = outParamType; } - public RedisCommand(String name, Decoder reponseDecoder, int encodeParamIndex, List inParamType) { - this(name, null, null, reponseDecoder, encodeParamIndex); + public RedisCommand(String name, Decoder reponseDecoder, int inParamIndex, List inParamType) { + this(name, null, null, reponseDecoder, inParamIndex); this.inParamType = inParamType; } - public RedisCommand(String name, Convertor convertor, int encodeParamIndex, ValueType inParamType) { - this(name, null, null, null, encodeParamIndex); + public RedisCommand(String name, Convertor convertor, int inParamIndex, ValueType inParamType) { + this(name, null, null, null, inParamIndex); this.convertor = convertor; this.inParamType = Arrays.asList(inParamType); } - public RedisCommand(String name, Convertor convertor, int encodeParamIndex, List inParamTypes) { - this(name, null, null, null, encodeParamIndex); + public RedisCommand(String name, Convertor convertor, int inParamIndex, List inParamTypes) { + this(name, null, null, null, inParamIndex); this.convertor = convertor; this.inParamType = inParamTypes; } @@ -130,8 +130,8 @@ public class RedisCommand { this(name, convertor, -1); } - public RedisCommand(String name, Convertor convertor, int encodeParamIndex) { - this(name, null, null, null, encodeParamIndex); + public RedisCommand(String name, Convertor convertor, int inParamIndex) { + this(name, null, null, null, inParamIndex); this.convertor = convertor; } @@ -183,13 +183,13 @@ public class RedisCommand { this(name, subName, replayMultiDecoder, null, objectParamIndex); } - public RedisCommand(String name, String subName, MultiDecoder replayMultiDecoder, Decoder reponseDecoder, int objectParamIndex) { + public RedisCommand(String name, String subName, MultiDecoder replayMultiDecoder, Decoder reponseDecoder, int inParamIndex) { super(); this.name = name; this.subName = subName; this.replayMultiDecoder = replayMultiDecoder; this.replayDecoder = reponseDecoder; - this.inParamIndex = objectParamIndex; + this.inParamIndex = inParamIndex; } public String getSubName() { diff --git a/src/main/java/org/redisson/core/RCache.java b/src/main/java/org/redisson/core/RCache.java new file mode 100644 index 000000000..300c08a58 --- /dev/null +++ b/src/main/java/org/redisson/core/RCache.java @@ -0,0 +1,35 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.core; + +import java.util.concurrent.TimeUnit; + +/** + * Distributed and concurrent implementation of {@link java.util.concurrent.ConcurrentMap} + * and {@link java.util.Map} + * + * @author Nikita Koksharov + * + * @param key + * @param value + */ +public interface RCache extends RMap, RCacheAsync { + + V putIfAbsent(K key, V value, long ttl, TimeUnit unit); + + V put(K key, V value, long ttl, TimeUnit unit); + +} diff --git a/src/main/java/org/redisson/core/RCacheAsync.java b/src/main/java/org/redisson/core/RCacheAsync.java new file mode 100644 index 000000000..2ef8d2dfe --- /dev/null +++ b/src/main/java/org/redisson/core/RCacheAsync.java @@ -0,0 +1,36 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.core; + +import java.util.concurrent.TimeUnit; + +import io.netty.util.concurrent.Future; + +/** + * Async map functions + * + * @author Nikita Koksharov + * + * @param key + * @param value + */ +public interface RCacheAsync extends RMapAsync { + + Future putIfAbsentAsync(K key, V value, long ttl, TimeUnit unit); + + Future putAsync(K key, V value, long ttl, TimeUnit unit); + +}