LocalCachedMap.putAll optimization

pull/794/head
Nikita 8 years ago
parent 204a594725
commit 9b5b7f5ac4

@ -53,7 +53,8 @@ import org.redisson.client.protocol.convertor.NumberConvertor;
import org.redisson.client.protocol.decoder.ObjectMapEntryReplayDecoder;
import org.redisson.client.protocol.decoder.ObjectSetReplayDecoder;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.misc.*;
import org.redisson.misc.Hash;
import org.redisson.misc.RPromise;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
@ -73,14 +74,14 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
public static class LocalCachedMapInvalidate implements Serializable {
private byte[] excludedId;
private byte[] keyHash;
private List<byte[]> keyHashes;
public LocalCachedMapInvalidate() {
}
public LocalCachedMapInvalidate(byte[] excludedId, byte[] keyHash) {
public LocalCachedMapInvalidate(byte[] excludedId, byte[]... keyHash) {
super();
this.keyHash = keyHash;
this.keyHashes = Arrays.asList(keyHash);
this.excludedId = excludedId;
}
@ -88,8 +89,8 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
return excludedId;
}
public byte[] getKeyHash() {
return keyHash;
public Collection<byte[]> getKeyHashes() {
return keyHashes;
}
}
@ -230,8 +231,10 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
if (msg instanceof LocalCachedMapInvalidate) {
LocalCachedMapInvalidate invalidateMsg = (LocalCachedMapInvalidate)msg;
if (!Arrays.equals(invalidateMsg.getExcludedId(), instanceId)) {
CacheKey key = new CacheKey(invalidateMsg.getKeyHash());
cache.remove(key);
for (byte[] keyHash : invalidateMsg.getKeyHashes()) {
CacheKey key = new CacheKey(keyHash);
cache.remove(key);
}
}
}
}
@ -725,27 +728,30 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
}
List<Object> params = new ArrayList<Object>(map.size()*3);
List<Object> msgs = new ArrayList<Object>(map.size());
params.add(invalidateEntryOnChange);
params.add(map.size()*2);
byte[][] hashes = new byte[map.size()][];
int i = 0;
for (java.util.Map.Entry<? extends K, ? extends V> t : map.entrySet()) {
byte[] mapKey = encodeMapKey(t.getKey());
byte[] mapValue = encodeMapValue(t.getValue());
params.add(mapKey);
params.add(mapValue);
CacheKey cacheKey = toCacheKey(mapKey);
byte[] msgEncoded = encode(new LocalCachedMapInvalidate(instanceId, cacheKey.getKeyHash()));
msgs.add(msgEncoded);
hashes[i] = cacheKey.getKeyHash();
i++;
}
params.addAll(msgs);
byte[] msgEncoded = encode(new LocalCachedMapInvalidate(instanceId, hashes));
params.add(msgEncoded);
final RPromise<Void> result = newPromise();
RFuture<Void> future = commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_VOID,
"redis.call('hmset', KEYS[1], unpack(ARGV, 3, tonumber(ARGV[2]) + 2));"
+ "if ARGV[1] == '1' then "
+ "for i = tonumber(ARGV[2]) + 3, #ARGV, 1 do "
+ "redis.call('publish', KEYS[2], ARGV[i]); "
+ "end; "
// + "for i = tonumber(ARGV[2]) + 3, #ARGV, 1 do "
+ "redis.call('publish', KEYS[2], ARGV[#ARGV]); "
// + "end; "
+ "end;",
Arrays.<Object>asList(getName(), invalidationTopic.getChannelNames().get(0)), params.toArray());

Loading…
Cancel
Save