diff --git a/src/main/java/org/redisson/Redisson.java b/src/main/java/org/redisson/Redisson.java index b7bb53ac9..5706ecbf6 100755 --- a/src/main/java/org/redisson/Redisson.java +++ b/src/main/java/org/redisson/Redisson.java @@ -194,6 +194,11 @@ public class Redisson implements RedissonClient { return new RedissonCache(commandExecutor, name); } + @Override + public RCache getCache(String name, Codec codec) { + return new RedissonCache(codec, commandExecutor, name); + } + @Override public RMap getMap(String name, Codec codec) { return new RedissonMap(codec, commandExecutor, name); diff --git a/src/main/java/org/redisson/RedissonCache.java b/src/main/java/org/redisson/RedissonCache.java index aaf4b2682..5106d0a69 100644 --- a/src/main/java/org/redisson/RedissonCache.java +++ b/src/main/java/org/redisson/RedissonCache.java @@ -18,7 +18,6 @@ package org.redisson; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; import java.util.Collections; import java.util.Iterator; import java.util.List; @@ -27,20 +26,25 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import org.redisson.client.codec.Codec; +import org.redisson.client.codec.LongCodec; import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommand.ValueType; import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.convertor.BooleanReplayConvertor; +import org.redisson.client.protocol.convertor.Convertor; import org.redisson.client.protocol.convertor.LongReplayConvertor; import org.redisson.client.protocol.decoder.MapScanResult; import org.redisson.client.protocol.decoder.ObjectListReplayDecoder; import org.redisson.client.protocol.decoder.ObjectMapReplayDecoder; import org.redisson.client.protocol.decoder.ObjectSetReplayDecoder; +import org.redisson.client.protocol.decoder.TTLMapValueReplayDecoder; import org.redisson.command.CommandAsyncExecutor; import org.redisson.connection.decoder.MapGetAllDecoder; import org.redisson.core.RCache; import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.FutureListener; +import io.netty.util.concurrent.Promise; /** * Distributed and concurrent implementation of {@link java.util.concurrent.ConcurrentMap} @@ -59,50 +63,60 @@ public class RedissonCache extends RedissonMap implements RCache EVAL_REMOVE_VALUE = new RedisCommand("EVAL", new LongReplayConvertor(), 5, ValueType.MAP); private static final RedisCommand EVAL_PUT = EVAL_REPLACE; private static final RedisCommand EVAL_PUT_TTL = new RedisCommand("EVAL", 6, ValueType.MAP, ValueType.MAP_VALUE); - private static final RedisCommand EVAL_GET_TTL = new RedisCommand("EVAL", 6, ValueType.MAP_KEY, ValueType.MAP_VALUE); + private static final RedisCommand> EVAL_GET_TTL = new RedisCommand>("EVAL", new TTLMapValueReplayDecoder(), 5, ValueType.MAP_KEY, ValueType.MAP_VALUE); + private static final RedisCommand> EVAL_CONTAINS = new RedisCommand>("EVAL", new ObjectListReplayDecoder(), 5); - private static final RedisCommand EVAL_CONTAINS = new RedisCommand("EVAL", new BooleanReplayConvertor(), 6); - private static final RedisCommand> EVAL_HKEYS = new RedisCommand>("EVAL", new ObjectSetReplayDecoder(), ValueType.MAP_KEY); - private static final RedisCommand> EVAL_HVALS = new RedisCommand>("EVAL", new ObjectListReplayDecoder(), ValueType.MAP_VALUE); private static final RedisCommand> EVAL_HGETALL = new RedisCommand>("EVAL", new ObjectMapReplayDecoder(), ValueType.MAP); - private static final RedisCommand EVAL_FAST_REMOVE = new RedisCommand("EVAL", 2); + private static final RedisCommand EVAL_FAST_REMOVE = new RedisCommand("EVAL", 2, ValueType.MAP_KEY); + + private static final RedisCommand EVAL_REMOVE_EXPIRED = new RedisCommand("EVAL", 5); protected RedissonCache(CommandAsyncExecutor commandExecutor, String name) { super(commandExecutor, name); } - public RedissonCache(Codec codec, CommandAsyncExecutor commandExecutor, String name) { + public RedissonCache(Codec codec, final CommandAsyncExecutor commandExecutor, String name) { super(codec, commandExecutor, name); + +// commandExecutor.getConnectionManager().getGroup().scheduleWithFixedDelay(new Runnable() { +// @Override +// public void run() { +// commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_INTEGER, +// "local expiredKeys = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, 1000); " +// + "if table.getn(expiredKeys) > 0 then " +// + "expiredKeys = unpack(expiredKeys); " +// + "redis.call('zrem', KEYS[2], expiredKeys); " +// + "redis.call('hdel', KEYS[1], expiredKeys); " +// + "end; ", +// Arrays.asList(getName(), getTimeoutSetName()), System.currentTimeMillis()); +// } +// }, initialDelay, delay, unit) } @Override public Future sizeAsync() { - return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_INTEGER, - "local expiredKeys = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1]); " - + "if table.getn(expiredKeys) > 0 then " - + "expiredKeys = unpack(expiredKeys); " - + "redis.call('zrem', KEYS[2], expiredKeys); " - + "redis.call('hdel', KEYS[1], expiredKeys); " - + "end; " - + "return redis.call('hlen', KEYS[1]);", - Arrays.asList(getName(), getTimeoutSetName()), System.currentTimeMillis()); + return commandExecutor.readAsync(getName(), codec, RedisCommands.HLEN, getName()); } @Override public Future containsKeyAsync(Object key) { - return commandExecutor.evalWriteAsync(getName(), codec, EVAL_CONTAINS, - "local v = redis.call('hexists', KEYS[1], ARGV[2]); " - + "if v == 1 then " - + "local expireDate = redis.call('zscore', KEYS[2], ARGV[2]); " - + "if expireDate ~= false and expireDate <= ARGV[1] then " - + "redis.call('zrem', KEYS[2], ARGV[2]); " - + "redis.call('hdel', KEYS[1], ARGV[2]); " - + "return false;" - + "end;" - + "return true;" - + "end;" - + "return false;", - Arrays.asList(getName(), getTimeoutSetName()), System.currentTimeMillis(), key); + Promise result = newPromise(); + + Future> future = commandExecutor.evalReadAsync(getName(), codec, EVAL_CONTAINS, + "local value = redis.call('hexists', KEYS[1], ARGV[1]); " + + "local expireDate = 92233720368547758; " + + "if value == 1 then " + + "expireDate = redis.call('zscore', KEYS[2], ARGV[1]); " + + "if expireDate ~= false then " + + "expireDate = tonumber(expireDate) " + + "end; " + + "end;" + + "return {expireDate, value}; ", + Arrays.asList(getName(), getTimeoutSetName()), key); + + addExpireListener(result, future, new BooleanReplayConvertor(), false); + + return result; } @Override @@ -153,50 +167,8 @@ public class RedissonCache extends RedissonMap implements RCacheasList(getName(), getTimeoutSetName()), args.toArray()); } - @Override - public Future> keySetAsync() { - return commandExecutor.evalWriteAsync(getName(), codec, EVAL_HKEYS, - "local expiredKeys = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1]); " - + "if table.getn(expiredKeys) > 0 then " - + "expiredKeys = unpack(expiredKeys); " - + "redis.call('zrem', KEYS[2], expiredKeys); " - + "redis.call('hdel', KEYS[1], expiredKeys); " - + "end; " - + "return redis.call('hkeys', KEYS[1]);", - Arrays.asList(getName(), getTimeoutSetName()), System.currentTimeMillis()); - } - - @Override - public Future> valuesAsync() { - return commandExecutor.evalWriteAsync(getName(), codec, EVAL_HVALS, - "local expiredKeys = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1]); " - + "if table.getn(expiredKeys) > 0 then " - + "expiredKeys = unpack(expiredKeys); " - + "redis.call('zrem', KEYS[2], expiredKeys); " - + "redis.call('hdel', KEYS[1], expiredKeys); " - + "end; " - + "return redis.call('hvals', KEYS[1]);", - Arrays.asList(getName(), getTimeoutSetName()), System.currentTimeMillis()); - } - - @Override - public Set> entrySet() { - Future> f = commandExecutor.evalWriteAsync(getName(), codec, EVAL_HGETALL, - "local expiredKeys = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1]); " - + "if table.getn(expiredKeys) > 0 then " - + "expiredKeys = unpack(expiredKeys); " - + "redis.call('zrem', KEYS[2], expiredKeys); " - + "redis.call('hdel', KEYS[1], expiredKeys); " - + "end; " - + "return redis.call('hgetall', KEYS[1]);", - Arrays.asList(getName(), getTimeoutSetName()), System.currentTimeMillis()); - - Map map = get(f); - return map.entrySet(); - } - public V putIfAbsent(K key, V value, long ttl, TimeUnit unit) { - return get(putIfAbsent(key, value, ttl, unit)); + return get(putIfAbsentAsync(key, value, ttl, unit)); } public Future putIfAbsentAsync(K key, V value, long ttl, TimeUnit unit) { @@ -229,16 +201,58 @@ public class RedissonCache extends RedissonMap implements RCache getAsync(K key) { - return commandExecutor.evalWriteAsync(getName(), codec, EVAL_GET_TTL, - "local v = redis.call('hget', KEYS[1], ARGV[2]); " - + "local expireDate = redis.call('zscore', KEYS[2], ARGV[2]); " - + "if expireDate ~= false and expireDate <= ARGV[1] then " - + "redis.call('zrem', KEYS[2], ARGV[2]); " - + "redis.call('hdel', KEYS[1], ARGV[2]); " - + "return nil;" - + "end;" - + "return v;", - Arrays.asList(getName(), getTimeoutSetName()), System.currentTimeMillis(), key); + Promise result = newPromise(); + + Future> future = commandExecutor.evalReadAsync(getName(), codec, EVAL_GET_TTL, + "local value = redis.call('hget', KEYS[1], ARGV[1]); " + + "local expireDate = redis.call('zscore', KEYS[2], ARGV[1]); " + + "if expireDate == false then " + + "expireDate = 92233720368547758; " + + "end; " + + "return {expireDate, value}; ", + Arrays.asList(getName(), getTimeoutSetName()), key); + + addExpireListener(result, future, null, null); + + return result; + } + + private void addExpireListener(final Promise result, Future> future, final Convertor convertor, final T nullValue) { + future.addListener(new FutureListener>() { + @Override + public void operationComplete(Future> future) throws Exception { + if (!future.isSuccess()) { + result.setFailure(future.cause()); + return; + } + + List res = future.getNow(); + Long expireDate = (Long) res.get(0); + long currentDate = System.currentTimeMillis(); + if (expireDate <= currentDate) { + result.setSuccess(nullValue); + expireMap(currentDate); + return; + } + + if (convertor != null) { + result.setSuccess((T) convertor.convert(res.get(1))); + } else { + result.setSuccess((T) res.get(1)); + } + } + }); + } + + private void expireMap(long currentDate) { + commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, EVAL_REMOVE_EXPIRED, + "local expiredKeys = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1]); " + + "if table.getn(expiredKeys) > 0 then " + + "expiredKeys = unpack(expiredKeys); " + + "redis.call('zrem', KEYS[2], expiredKeys); " + + "redis.call('hdel', KEYS[1], expiredKeys); " + + "end;", + Arrays.asList(getName(), getTimeoutSetName()), currentDate); } public V put(K key, V value, long ttl, TimeUnit unit) { diff --git a/src/main/java/org/redisson/RedissonClient.java b/src/main/java/org/redisson/RedissonClient.java index 632c2aa9c..a2c018752 100755 --- a/src/main/java/org/redisson/RedissonClient.java +++ b/src/main/java/org/redisson/RedissonClient.java @@ -52,6 +52,8 @@ import org.redisson.core.RTopic; */ public interface RedissonClient { + RCache getCache(String name, Codec codec); + RCache getCache(String name); /** diff --git a/src/main/java/org/redisson/client/protocol/RedisCommand.java b/src/main/java/org/redisson/client/protocol/RedisCommand.java index 6fac24fc0..15a1203e1 100644 --- a/src/main/java/org/redisson/client/protocol/RedisCommand.java +++ b/src/main/java/org/redisson/client/protocol/RedisCommand.java @@ -165,22 +165,25 @@ public class RedisCommand { } public RedisCommand(String name, MultiDecoder replayMultiDecoder) { - this(name, replayMultiDecoder, -1); + this(name, replayMultiDecoder, null, -1); } public RedisCommand(String name, MultiDecoder replayMultiDecoder, Convertor convertor) { - this(name, replayMultiDecoder, -1); - this.convertor = convertor; + this(name, replayMultiDecoder, convertor, -1); } + public RedisCommand(String name, MultiDecoder replayMultiDecoder, Convertor convertor, int inParamIndex) { + this(name, replayMultiDecoder, inParamIndex); + this.convertor = convertor; + } - public RedisCommand(String name, MultiDecoder replayMultiDecoder, int objectParamIndex) { - this(name, null, replayMultiDecoder, null, objectParamIndex); + public RedisCommand(String name, MultiDecoder replayMultiDecoder, int inParamIndex) { + this(name, null, replayMultiDecoder, null, inParamIndex); } public RedisCommand(String name, String subName, MultiDecoder replayMultiDecoder, - int objectParamIndex) { - this(name, subName, replayMultiDecoder, null, objectParamIndex); + int inParamIndex) { + this(name, subName, replayMultiDecoder, null, inParamIndex); } public RedisCommand(String name, String subName, MultiDecoder replayMultiDecoder, Decoder reponseDecoder, int inParamIndex) { diff --git a/src/main/java/org/redisson/client/protocol/decoder/TTLMapValueReplayDecoder.java b/src/main/java/org/redisson/client/protocol/decoder/TTLMapValueReplayDecoder.java new file mode 100644 index 000000000..68f862633 --- /dev/null +++ b/src/main/java/org/redisson/client/protocol/decoder/TTLMapValueReplayDecoder.java @@ -0,0 +1,42 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.client.protocol.decoder; + +import java.util.List; + +import org.redisson.client.handler.State; + +import io.netty.buffer.ByteBuf; +import io.netty.util.CharsetUtil; + +public class TTLMapValueReplayDecoder implements MultiDecoder> { + + @Override + public Object decode(ByteBuf buf, State state) { + return Long.valueOf(buf.toString(CharsetUtil.UTF_8)); + } + + @Override + public List decode(List parts, State state) { + return (List) parts; + } + + @Override + public boolean isApplicable(int paramNum, State state) { + return paramNum == 0; + } + +}