RCache.getAll optimization. #195

pull/337/head
Nikita 9 years ago
parent e4cf8189c3
commit 3226186311

@ -38,6 +38,7 @@ import org.redisson.client.protocol.decoder.ObjectListReplayDecoder;
import org.redisson.client.protocol.decoder.ObjectMapReplayDecoder;
import org.redisson.client.protocol.decoder.TTLMapValueReplayDecoder;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.connection.decoder.CacheGetAllDecoder;
import org.redisson.connection.decoder.MapGetAllDecoder;
import org.redisson.core.RCache;
@ -154,20 +155,46 @@ public class RedissonCache<K, V> extends RedissonMap<K, V> implements RCache<K,
List<Object> args = new ArrayList<Object>(keys.size() + 2);
args.add(System.currentTimeMillis());
args.addAll(keys);
return commandExecutor.evalWriteAsync(getName(), codec, new RedisCommand<Map<Object, Object>>("EVAL", new MapGetAllDecoder(args), 6, ValueType.MAP_KEY, ValueType.MAP_VALUE),
"local expireSize = redis.call('zcard', KEYS[2])" +
"local maxDate = table.remove(ARGV, 1); " + // index is the first parameter
final Promise<Map<K, V>> result = newPromise();
Future<List<Object>> future = commandExecutor.evalReadAsync(getName(), codec, new RedisCommand<List<Object>>("EVAL", new CacheGetAllDecoder(args), 6, ValueType.MAP_KEY, ValueType.MAP_VALUE),
"local expireSize = redis.call('zcard', KEYS[2]); " +
"local maxDate = table.remove(ARGV, 1); " // index is the first parameter
+ "local minExpireDate = 92233720368547758;" +
"if expireSize > 0 then "
+ "for i, key in ipairs(ARGV) do "
+ "for i, key in pairs(ARGV) do "
+ "local expireDate = redis.call('zscore', KEYS[2], key); "
+ "if expireDate ~= false and expireDate <= maxDate then "
+ "redis.call('zrem', KEYS[2], key); "
+ "redis.call('hdel', KEYS[1], key); "
+ "minExpireDate = math.min(tonumber(expireDate), minExpireDate); "
+ "print ('expired ' .. key) "
+ "ARGV[i] = ARGV[i] .. '__redisson__skip' "
+ "end;"
+ "end;"
+ "end;" +
"return redis.call('hmget', KEYS[1], unpack(ARGV));",
+ "end; " +
"return {minExpireDate, unpack(redis.call('hmget', KEYS[1], unpack(ARGV)))};",
Arrays.<Object>asList(getName(), getTimeoutSetName()), args.toArray());
future.addListener(new FutureListener<List<Object>>() {
@Override
public void operationComplete(Future<List<Object>> future) throws Exception {
if (!future.isSuccess()) {
result.setFailure(future.cause());
return;
}
List<Object> res = future.getNow();
Long expireDate = (Long) res.get(0);
long currentDate = System.currentTimeMillis();
if (expireDate <= currentDate) {
expireMap(currentDate);
}
result.setSuccess((Map<K, V>) res.get(1));
}
});
return result;
}
public V putIfAbsent(K key, V value, long ttl, TimeUnit unit) {
@ -250,10 +277,9 @@ public class RedissonCache<K, V> extends RedissonMap<K, V> implements RCache<K,
private void expireMap(long currentDate) {
commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, EVAL_REMOVE_EXPIRED,
"local expiredKeys = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1]); "
+ "if table.getn(expiredKeys) > 0 then "
+ "expiredKeys = unpack(expiredKeys); "
+ "redis.call('zrem', KEYS[2], expiredKeys); "
+ "redis.call('hdel', KEYS[1], expiredKeys); "
+ "if #expiredKeys > 0 then "
+ "local s = redis.call('zrem', KEYS[2], unpack(expiredKeys)); "
+ "redis.call('hdel', KEYS[1], unpack(expiredKeys)); "
+ "end;",
Arrays.<Object>asList(getName(), getTimeoutSetName()), currentDate);
}

@ -0,0 +1,64 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.connection.decoder;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.decoder.MultiDecoder;
import io.netty.buffer.ByteBuf;
import io.netty.util.CharsetUtil;
public class CacheGetAllDecoder implements MultiDecoder<List<Object>> {
private final List<Object> args;
public CacheGetAllDecoder(List<Object> args) {
this.args = args;
}
@Override
public Object decode(ByteBuf buf, State state) throws IOException {
return Long.valueOf(buf.toString(CharsetUtil.UTF_8));
}
@Override
public boolean isApplicable(int paramNum, State state) {
return paramNum == 0;
}
@Override
public List<Object> decode(List<Object> parts, State state) {
List<Object> result = new ArrayList<Object>();
Map<Object, Object> map = new HashMap<Object, Object>(parts.size());
for (int index = 0; index < args.size()-1; index++) {
Object value = parts.get(index+1);
if (value == null) {
continue;
}
map.put(args.get(index+1), value);
}
result.add(parts.get(0));
result.add(map);
return result;
}
}
Loading…
Cancel
Save