From 7ead223c93198f4cb0dd93c969631d25fd444eae Mon Sep 17 00:00:00 2001 From: Nikita Date: Wed, 17 Jan 2018 10:03:29 +0300 Subject: [PATCH] Fixed - RLocalCachedMap.putAll gets stuck. #1245 --- .../org/redisson/RedissonLocalCachedMap.java | 39 ++++++++++--------- .../redisson/RedissonLocalCachedMapTest.java | 22 ++++------- 2 files changed, 29 insertions(+), 32 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java b/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java index d92d87495..7496e028d 100644 --- a/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java +++ b/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java @@ -68,6 +68,7 @@ import org.redisson.command.CommandAsyncExecutor; import org.redisson.eviction.EvictionScheduler; import org.redisson.misc.Hash; import org.redisson.misc.RPromise; +import org.redisson.misc.RedissonPromise; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -76,7 +77,6 @@ import io.netty.buffer.Unpooled; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; import io.netty.util.internal.PlatformDependent; -import io.netty.util.internal.ThreadLocalRandom; /** * @@ -239,6 +239,7 @@ public class RedissonLocalCachedMap extends RedissonMap implements R @Override public void operationComplete(Future future) throws Exception { if (!future.isSuccess()) { + log.error("Can't check existance", future.cause()); return; } @@ -953,7 +954,7 @@ public class RedissonLocalCachedMap extends RedissonMap implements R } } - final RPromise> promise = newPromise(); + final RPromise> promise = new RedissonPromise>(); RFuture> future = super.getAllAsync(mapKeys); future.addListener(new FutureListener>() { @Override @@ -984,19 +985,16 @@ public class RedissonLocalCachedMap extends RedissonMap implements R } @Override - public RFuture putAllOperationAsync(final Map map) { + protected RFuture putAllOperationAsync(final Map map) { List params = new ArrayList(map.size()*3); params.add(invalidateEntryOnChange); params.add(map.size()*2); byte[][] hashes = new byte[map.size()][]; int i = 0; - int payloadSize = 0; for (java.util.Map.Entry t : map.entrySet()) { ByteBuf mapKey = encodeMapKey(t.getKey()); - payloadSize += mapKey.readableBytes(); ByteBuf mapValue = encodeMapValue(t.getValue()); - payloadSize += mapValue.readableBytes(); params.add(mapKey); params.add(mapValue); CacheKey cacheKey = toCacheKey(mapKey); @@ -1004,7 +1002,7 @@ public class RedissonLocalCachedMap extends RedissonMap implements R i++; } - ByteBuf msgEncoded; + ByteBuf msgEncoded = null; if (syncStrategy == SyncStrategy.UPDATE) { List entries = new ArrayList(); for (int j = 2; j < params.size(); j += 2) { @@ -1024,23 +1022,25 @@ public class RedissonLocalCachedMap extends RedissonMap implements R byte[] entryId = generateLogEntryId(hash); params.add(time); params.add(entryId); - payloadSize += entryId.length + 8; } } - params.add(msgEncoded); - payloadSize += msgEncoded.readableBytes(); - - log.debug("Payload size passed to putAll method: {}", payloadSize); + if (msgEncoded != null) { + params.add(msgEncoded); + } - final RPromise result = newPromise(); + final RPromise result = new RedissonPromise(); RFuture future = commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_VOID, - "redis.call('hmset', KEYS[1], unpack(ARGV, 3, tonumber(ARGV[2]) + 2));" + "for i=3, tonumber(ARGV[2]) + 2, 5000 do " + + "redis.call('hmset', KEYS[1], unpack(ARGV, i, math.min(i+4999, tonumber(ARGV[2]) + 2))); " + + "end; " + "if ARGV[1] == '1' then " + "redis.call('publish', KEYS[2], ARGV[#ARGV]); " + "end;" + "if ARGV[1] == '2' then " - + "redis.call('zadd', KEYS[3], unpack(ARGV, tonumber(ARGV[2]) + 2 + 1, #ARGV - 1));" + + "for i=tonumber(ARGV[2]) + 2 + 1, #ARGV - 1, 5000 do " + + "redis.call('hmset', KEYS[3], unpack(ARGV, i, math.min(i+4999, #ARGV - 1))); " + + "end; " + "redis.call('publish', KEYS[2], ARGV[#ARGV]); " + "end;", Arrays.asList(getName(), invalidationTopic.getChannelNames().get(0), getUpdatesLogName()), @@ -1050,6 +1050,7 @@ public class RedissonLocalCachedMap extends RedissonMap implements R @Override public void operationComplete(Future future) throws Exception { if (!future.isSuccess()) { + result.tryFailure(future.cause()); return; } @@ -1125,7 +1126,7 @@ public class RedissonLocalCachedMap extends RedissonMap implements R result.add((V) value.getValue()); } - final RPromise> promise = newPromise(); + final RPromise> promise = new RedissonPromise>(); RFuture> future = commandExecutor.evalReadAsync(getName(), codec, ALL_KEYS, "local entries = redis.call('hgetall', KEYS[1]); " + "local result = {};" @@ -1172,13 +1173,14 @@ public class RedissonLocalCachedMap extends RedissonMap implements R result.put((K)value.getKey(), (V)value.getValue()); } - final RPromise> promise = newPromise(); + final RPromise> promise = new RedissonPromise>(); RFuture> future = readAll(ALL_MAP, mapKeys, result); future.addListener(new FutureListener>() { @Override public void operationComplete(Future> future) throws Exception { if (!future.isSuccess()) { + promise.tryFailure(future.cause()); return; } @@ -1215,13 +1217,14 @@ public class RedissonLocalCachedMap extends RedissonMap implements R result.add(new AbstractMap.SimpleEntry((K)value.getKey(), (V)value.getValue())); } - final RPromise>> promise = newPromise(); + final RPromise>> promise = new RedissonPromise>>(); RFuture>> future = readAll(ALL_ENTRIES, mapKeys, result); future.addListener(new FutureListener>>() { @Override public void operationComplete(Future>> future) throws Exception { if (!future.isSuccess()) { + promise.tryFailure(future.cause()); return; } diff --git a/redisson/src/test/java/org/redisson/RedissonLocalCachedMapTest.java b/redisson/src/test/java/org/redisson/RedissonLocalCachedMapTest.java index b04ded83a..f5a9d848d 100644 --- a/redisson/src/test/java/org/redisson/RedissonLocalCachedMapTest.java +++ b/redisson/src/test/java/org/redisson/RedissonLocalCachedMapTest.java @@ -126,24 +126,18 @@ public class RedissonLocalCachedMapTest extends BaseMapTest { return redisson.getLocalCachedMap("test", options); } -// @Test - public void testBigData() throws InterruptedException { + @Test + public void testBigPutAll() throws InterruptedException { RLocalCachedMap m = redisson.getLocalCachedMap("testValuesWithNearCache2", - LocalCachedMapOptions.defaults().evictionPolicy(EvictionPolicy.LFU)); + LocalCachedMapOptions.defaults().evictionPolicy(EvictionPolicy.LFU).syncStrategy(SyncStrategy.INVALIDATE)); - for (int i = 0; i < 100; i++) { - for (int k = 0; k < 1000; k++) { - Map map = new HashMap<>(); - map.put("" + k * i, "" + k * i); - m.putAll(map); - } - System.out.println(i); + Map map = new HashMap<>(); + for (int k = 0; k < 10000; k++) { + map.put("" + k, "" + k); } + m.putAll(map); - System.out.println("done"); - - Thread.sleep(1000000); - + assertThat(m.size()).isEqualTo(10000); }