From dc30a14b3e927efed3751a3a68a8ca6c95bd375b Mon Sep 17 00:00:00 2001 From: Nikita Date: Mon, 11 Sep 2017 16:37:25 +0300 Subject: [PATCH 1/7] ButeBuf.release method invocation was missed in some RedissonLocalCachedMap methods --- .../src/main/java/org/redisson/RedissonLocalCachedMap.java | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java b/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java index a533e9483..61f3bf0ba 100644 --- a/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java +++ b/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java @@ -964,8 +964,7 @@ public class RedissonLocalCachedMap extends RedissonMap implements R private void cacheMap(Map map) { for (java.util.Map.Entry entry : map.entrySet()) { - ByteBuf mapKey = encodeMapKey(entry.getKey()); - CacheKey cacheKey = toCacheKey(mapKey); + CacheKey cacheKey = toCacheKey(entry.getKey()); CacheValue cacheValue = new CacheValue(entry.getKey(), entry.getValue()); cache.put(cacheKey, cacheValue); } @@ -1258,9 +1257,6 @@ public class RedissonLocalCachedMap extends RedissonMap implements R @Override public RFuture replaceAsync(final K key, final V value) { - final ByteBuf keyState = encodeMapKey(key); - final CacheKey cacheKey = toCacheKey(keyState); - RFuture future = super.replaceAsync(key, value); future.addListener(new FutureListener() { @Override @@ -1270,6 +1266,7 @@ public class RedissonLocalCachedMap extends RedissonMap implements R } if (future.getNow() != null) { + CacheKey cacheKey = toCacheKey(key); cache.put(cacheKey, new CacheValue(key, value)); } } From 8021dc923f2fa84d4423fb3c77f2750fc0d33ba7 Mon Sep 17 00:00:00 2001 From: Nikita Date: Mon, 11 Sep 2017 16:38:14 +0300 Subject: [PATCH 2/7] writeFuture should be canceled on last attempt. --- .../java/org/redisson/command/CommandAsyncService.java | 7 +++++++ .../java/org/redisson/command/CommandBatchService.java | 8 ++++++++ 2 files changed, 15 insertions(+) diff --git a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java index f093cca7d..6ae3c22e4 100644 --- a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java +++ b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java @@ -538,6 +538,13 @@ public class CommandAsyncService implements CommandAsyncExecutor { if (details.getConnectionFuture().isSuccess()) { if (details.getWriteFuture() == null || !details.getWriteFuture().isDone()) { if (details.getAttempt() == connectionManager.getConfig().getRetryAttempts()) { + if (details.getWriteFuture().cancel(false)) { + free(details); + if (details.getException() == null) { + details.setException(new RedisTimeoutException("Unable to send command: " + command + " with params: " + LogHelper.toString(details.getParams() + " after " + connectionManager.getConfig().getRetryAttempts() + " retry attempts"))); + } + details.getAttemptPromise().tryFailure(details.getException()); + } return; } details.incAttempt(); diff --git a/redisson/src/main/java/org/redisson/command/CommandBatchService.java b/redisson/src/main/java/org/redisson/command/CommandBatchService.java index d0c58d35c..46753f21f 100644 --- a/redisson/src/main/java/org/redisson/command/CommandBatchService.java +++ b/redisson/src/main/java/org/redisson/command/CommandBatchService.java @@ -45,6 +45,7 @@ import org.redisson.connection.ConnectionManager; import org.redisson.connection.MasterSlaveEntry; import org.redisson.connection.NodeSource; import org.redisson.connection.NodeSource.Redirect; +import org.redisson.misc.LogHelper; import org.redisson.misc.RPromise; import org.redisson.misc.RedissonObjectFactory; @@ -296,6 +297,13 @@ public class CommandBatchService extends CommandAsyncService { if (connectionFuture.isSuccess()) { if (details.getWriteFuture() == null || !details.getWriteFuture().isDone()) { if (details.getAttempt() == attempts) { + if (details.getWriteFuture().cancel(false)) { + free(entry); + if (details.getException() == null) { + details.setException(new RedisTimeoutException("Unable to send batch after " + connectionManager.getConfig().getRetryAttempts() + " retry attempts")); + } + details.getAttemptPromise().tryFailure(details.getException()); + } return; } details.incAttempt(); From deea34a82f384c7a938ef0947a08bb8f2ada5cd9 Mon Sep 17 00:00:00 2001 From: Nikita Date: Mon, 11 Sep 2017 17:29:34 +0300 Subject: [PATCH 3/7] unnecessary ByteBuf.release invocation during last attempt to write execution --- .../main/java/org/redisson/command/CommandAsyncService.java | 4 +--- .../main/java/org/redisson/command/CommandBatchService.java | 1 - 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java index 6ae3c22e4..e364e86f3 100644 --- a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java +++ b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java @@ -28,7 +28,6 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import io.netty.util.ReferenceCountUtil; import org.redisson.RedisClientResult; import org.redisson.RedissonReference; import org.redisson.RedissonShutdownException; @@ -64,10 +63,10 @@ import org.redisson.misc.RedissonObjectFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; +import io.netty.util.ReferenceCountUtil; import io.netty.util.Timeout; import io.netty.util.TimerTask; import io.netty.util.concurrent.Future; @@ -539,7 +538,6 @@ public class CommandAsyncService implements CommandAsyncExecutor { if (details.getWriteFuture() == null || !details.getWriteFuture().isDone()) { if (details.getAttempt() == connectionManager.getConfig().getRetryAttempts()) { if (details.getWriteFuture().cancel(false)) { - free(details); if (details.getException() == null) { details.setException(new RedisTimeoutException("Unable to send command: " + command + " with params: " + LogHelper.toString(details.getParams() + " after " + connectionManager.getConfig().getRetryAttempts() + " retry attempts"))); } diff --git a/redisson/src/main/java/org/redisson/command/CommandBatchService.java b/redisson/src/main/java/org/redisson/command/CommandBatchService.java index 46753f21f..f2199f6c7 100644 --- a/redisson/src/main/java/org/redisson/command/CommandBatchService.java +++ b/redisson/src/main/java/org/redisson/command/CommandBatchService.java @@ -298,7 +298,6 @@ public class CommandBatchService extends CommandAsyncService { if (details.getWriteFuture() == null || !details.getWriteFuture().isDone()) { if (details.getAttempt() == attempts) { if (details.getWriteFuture().cancel(false)) { - free(entry); if (details.getException() == null) { details.setException(new RedisTimeoutException("Unable to send batch after " + connectionManager.getConfig().getRetryAttempts() + " retry attempts")); } From 727c4523cb2240d6db4805606df930ee17399892 Mon Sep 17 00:00:00 2001 From: Nikita Date: Mon, 11 Sep 2017 17:42:38 +0300 Subject: [PATCH 4/7] RedissonMapCache bug fixes #1027 --- .../java/org/redisson/RedissonMapCache.java | 102 ++++++++++++++---- .../redisson/eviction/EvictionScheduler.java | 4 +- .../eviction/MapCacheEvictionTask.java | 10 +- 3 files changed, 91 insertions(+), 25 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonMapCache.java b/redisson/src/main/java/org/redisson/RedissonMapCache.java index 0cd87a6b9..6ac3a9faa 100644 --- a/redisson/src/main/java/org/redisson/RedissonMapCache.java +++ b/redisson/src/main/java/org/redisson/RedissonMapCache.java @@ -86,13 +86,13 @@ public class RedissonMapCache extends RedissonMap implements RMapCac public RedissonMapCache(EvictionScheduler evictionScheduler, CommandAsyncExecutor commandExecutor, String name, RedissonClient redisson, MapOptions options) { super(commandExecutor, name, redisson, options); - evictionScheduler.schedule(getName(), getTimeoutSetName(), getIdleSetName(), getExpiredChannelName()); + evictionScheduler.schedule(getName(), getTimeoutSetName(), getIdleSetName(), getExpiredChannelName(), getLastAccessTimeSetName()); } public RedissonMapCache(Codec codec, EvictionScheduler evictionScheduler, CommandAsyncExecutor commandExecutor, String name, RedissonClient redisson, MapOptions options) { super(codec, commandExecutor, name, redisson, options); - evictionScheduler.schedule(getName(), getTimeoutSetName(), getIdleSetName(), getExpiredChannelName()); + evictionScheduler.schedule(getName(), getTimeoutSetName(), getIdleSetName(), getExpiredChannelName(), getLastAccessTimeSetName()); } @Override @@ -152,7 +152,7 @@ public class RedissonMapCache extends RedissonMap implements RMapCac " return 1;" + "end;" + "return 0; ", - Arrays.asList(getName(key), getTimeoutSetNameByKey(key), getIdleSetNameByKey(key), getLastAccessTimeSetNameByKey(key), getOptionsName()), + Arrays.asList(getName(key), getTimeoutSetNameByKey(key), getIdleSetNameByKey(key), getLastAccessTimeSetNameByKey(key), getOptionsName(key)), System.currentTimeMillis(), encodeMapKey(key)); } @@ -486,7 +486,7 @@ public class RedissonMapCache extends RedissonMap implements RMapCac @Override protected RFuture putOperationAsync(K key, V value) { return commandExecutor.evalWriteAsync(getName(key), codec, RedisCommands.EVAL_MAP_VALUE, - "local v = redis.call('hget', KEYS[1], ARGV[2]);" + + "local v = redis.call('hget', KEYS[1], ARGV[2]);" + "local exists = false;" + "if v ~= false then" + " local t, val = struct.unpack('dLc0', v);" + @@ -1083,17 +1083,14 @@ public class RedissonMapCache extends RedissonMap implements RMapCac + "redis.call('zrem', KEYS[2], ARGV[2]); " + "redis.call('zrem', KEYS[3], ARGV[2]); " - + "local maxSize = tonumber(redis.call('hget', KEYS[6], 'max-size'));" - + "if maxSize ~= nil and maxSize ~= 0 then" - + " redis.call('zrem', KEYS[5], ARGV[2]); " - + "end;" + + "redis.call('zrem', KEYS[5], ARGV[2]); " + "redis.call('hdel', KEYS[1], ARGV[2]); " + "local msg = struct.pack('Lc0Lc0', string.len(ARGV[2]), ARGV[2], string.len(val), val); " + "redis.call('publish', KEYS[4], msg); " + "return val; ", Arrays.asList(getName(key), getTimeoutSetNameByKey(key), getIdleSetNameByKey(key), getRemovedChannelNameByKey(key), - getLastAccessTimeSetNameByKey(key), getOptionsName(key)), + getLastAccessTimeSetNameByKey(key)), System.currentTimeMillis(), encodeMapKey(key)); } @@ -1601,7 +1598,8 @@ public class RedissonMapCache extends RedissonMap implements RMapCac @Override public RFuture putAllOperationAsync(Map map) { - List params = new ArrayList(map.size()*2); + List params = new ArrayList(map.size()*2 + 1); + params.add(System.currentTimeMillis()); for (java.util.Map.Entry t : map.entrySet()) { if (t.getKey() == null) { throw new NullPointerException("map key can't be null"); @@ -1615,17 +1613,72 @@ public class RedissonMapCache extends RedissonMap implements RMapCac } return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_VOID, + "local currentTime = tonumber(table.remove(ARGV, 1)); " + // index is the first parameter + "local maxSize = tonumber(redis.call('hget', KEYS[8], 'max-size'));" + "for i, value in ipairs(ARGV) do " - + "if i % 2 == 0 then " - + "local val = struct.pack('dLc0', 0, string.len(value), value); " - + "local key = ARGV[i-1];" - + "redis.call('hmset', KEYS[1], key, val); " - - + "local msg = struct.pack('Lc0Lc0', string.len(key), key, string.len(value), value); " - + "redis.call('publish', KEYS[2], msg); " + + "if i % 2 == 0 then " + + "local key = ARGV[i-1];" + + + "local v = redis.call('hget', KEYS[1], key);" + + "local exists = false;" + + "if v ~= false then" + + " local t, val = struct.unpack('dLc0', v);" + + " local expireDate = 92233720368547758;" + + " local expireDateScore = redis.call('zscore', KEYS[2], key);" + + " if expireDateScore ~= false then" + + " expireDate = tonumber(expireDateScore)" + + " end;" + + " if t ~= 0 then" + + " local expireIdle = redis.call('zscore', KEYS[3], key);" + + " if expireIdle ~= false then" + + " expireDate = math.min(expireDate, tonumber(expireIdle))" + + " end;" + + " end;" + + " if expireDate > tonumber(currentTime) then" + + " exists = true;" + + " end;" + + "end;" + + "" + + "local newvalue = struct.pack('dLc0', 0, string.len(value), value);" + + "redis.call('hset', KEYS[1], key, newvalue);" + + + "local lastAccessTimeSetName = KEYS[6];" + + "if exists == false then" + + " if maxSize ~= nil and maxSize ~= 0 then" + + " redis.call('zadd', lastAccessTimeSetName, currentTime, key);" + + " local cacheSize = tonumber(redis.call('hlen', KEYS[1]));" + + " if cacheSize >= maxSize then" + + " local lruItems = redis.call('zrange', lastAccessTimeSetName, 0, cacheSize - maxSize);" + + " for index, lruItem in ipairs(lruItems) do" + + " if lruItem then" + + " local lruItemValue = redis.call('hget', KEYS[1], lruItem);" + + " redis.call('hdel', KEYS[1], lruItem);" + + " redis.call('zrem', KEYS[2], lruItem);" + + " redis.call('zrem', KEYS[3], lruItem);" + + " redis.call('zrem', lastAccessTimeSetName, lruItem);" + + " local removedChannelName = KEYS[7];" + + " local msg = struct.pack('Lc0Lc0', string.len(lruItem), lruItem, string.len(lruItemValue), lruItemValue);" + + " redis.call('publish', removedChannelName, msg);" + + " end;" + + " end" + + " end;" + + " end;" + + " local msg = struct.pack('Lc0Lc0', string.len(key), key, string.len(value), value);" + + " redis.call('publish', KEYS[4], msg);" + + "else " + + "local t, val = struct.unpack('dLc0', v);" + + "local msg = struct.pack('Lc0Lc0Lc0', string.len(key), key, string.len(value), value, string.len(val), val);" + + "redis.call('publish', KEYS[5], msg);" + + + " if maxSize ~= nil and maxSize ~= 0 then " + + " redis.call('zadd', lastAccessTimeSetName, currentTime, key);" + + " end;" + + "end;" + "end;" + "end;", - Arrays.asList(getName(), getCreatedChannelName()), params.toArray()); + Arrays.asList(getName(), getTimeoutSetName(), getIdleSetName(), getCreatedChannelName(), + getUpdatedChannelName(), getLastAccessTimeSetName(), getRemovedChannelName(), getOptionsName()), + params.toArray()); } @Override @@ -1723,7 +1776,7 @@ public class RedissonMapCache extends RedissonMap implements RMapCac @Override public RFuture expireAtAsync(long timestamp) { return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, - "local maxSize = tonumber(redis.call('hget', KEYS[5], 'max-size')); " + + "local maxSize = tonumber(redis.call('hget', KEYS[5], 'max-size')); " + "if maxSize ~= nil and maxSize ~= 0 then " + " redis.call('pexpire', KEYS[5], ARGV[1]); " + " redis.call('zadd', KEYS[4], 92233720368547758, 'redisson__expiretag'); " + @@ -1741,12 +1794,19 @@ public class RedissonMapCache extends RedissonMap implements RMapCac @Override public RFuture clearExpireAsync() { return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, - "redis.call('zrem', KEYS[2], 'redisson__expiretag'); " + + "local maxSize = tonumber(redis.call('hget', KEYS[5], 'max-size')); " + + "if maxSize ~= nil and maxSize ~= 0 then " + + " redis.call('persist', KEYS[5]); " + + " redis.call('zrem', KEYS[4], 92233720368547758, 'redisson__expiretag'); " + + " redis.call('persist', KEYS[4]); " + + "end; " + + + "redis.call('zrem', KEYS[2], 'redisson__expiretag'); " + "redis.call('persist', KEYS[2]); " + "redis.call('zrem', KEYS[3], 'redisson__expiretag'); " + "redis.call('persist', KEYS[3]); " + "return redis.call('persist', KEYS[1]); ", - Arrays.asList(getName(), getTimeoutSetName(), getIdleSetName())); + Arrays.asList(getName(), getTimeoutSetName(), getIdleSetName(), getLastAccessTimeSetName(), getOptionsName())); } @Override diff --git a/redisson/src/main/java/org/redisson/eviction/EvictionScheduler.java b/redisson/src/main/java/org/redisson/eviction/EvictionScheduler.java index cb86f0760..347338264 100644 --- a/redisson/src/main/java/org/redisson/eviction/EvictionScheduler.java +++ b/redisson/src/main/java/org/redisson/eviction/EvictionScheduler.java @@ -63,8 +63,8 @@ public class EvictionScheduler { } } - public void schedule(String name, String timeoutSetName, String maxIdleSetName, String expiredChannelName) { - EvictionTask task = new MapCacheEvictionTask(name, timeoutSetName, maxIdleSetName, expiredChannelName, executor); + public void schedule(String name, String timeoutSetName, String maxIdleSetName, String expiredChannelName, String lastAccessTimeSetName) { + EvictionTask task = new MapCacheEvictionTask(name, timeoutSetName, maxIdleSetName, expiredChannelName, lastAccessTimeSetName, executor); EvictionTask prevTask = tasks.putIfAbsent(name, task); if (prevTask == null) { task.schedule(); diff --git a/redisson/src/main/java/org/redisson/eviction/MapCacheEvictionTask.java b/redisson/src/main/java/org/redisson/eviction/MapCacheEvictionTask.java index f40f43354..3aebe254d 100644 --- a/redisson/src/main/java/org/redisson/eviction/MapCacheEvictionTask.java +++ b/redisson/src/main/java/org/redisson/eviction/MapCacheEvictionTask.java @@ -33,13 +33,16 @@ public class MapCacheEvictionTask extends EvictionTask { private final String timeoutSetName; private final String maxIdleSetName; private final String expiredChannelName; + private final String lastAccessTimeSetName; - public MapCacheEvictionTask(String name, String timeoutSetName, String maxIdleSetName, String expiredChannelName, CommandAsyncExecutor executor) { + public MapCacheEvictionTask(String name, String timeoutSetName, String maxIdleSetName, + String expiredChannelName, String lastAccessTimeSetName, CommandAsyncExecutor executor) { super(executor); this.name = name; this.timeoutSetName = timeoutSetName; this.maxIdleSetName = maxIdleSetName; this.expiredChannelName = expiredChannelName; + this.lastAccessTimeSetName = lastAccessTimeSetName; } @Override @@ -55,6 +58,7 @@ public class MapCacheEvictionTask extends EvictionTask { + "end;" + "end;" + "if #expiredKeys1 > 0 then " + + "redis.call('zrem', KEYS[4], unpack(expiredKeys1)); " + "redis.call('zrem', KEYS[3], unpack(expiredKeys1)); " + "redis.call('zrem', KEYS[2], unpack(expiredKeys1)); " + "redis.call('hdel', KEYS[1], unpack(expiredKeys1)); " @@ -69,12 +73,14 @@ public class MapCacheEvictionTask extends EvictionTask { + "end;" + "end;" + "if #expiredKeys2 > 0 then " + + "redis.call('zrem', KEYS[4], unpack(expiredKeys2)); " + "redis.call('zrem', KEYS[3], unpack(expiredKeys2)); " + "redis.call('zrem', KEYS[2], unpack(expiredKeys2)); " + "redis.call('hdel', KEYS[1], unpack(expiredKeys2)); " + "end; " + "return #expiredKeys1 + #expiredKeys2;", - Arrays.asList(name, timeoutSetName, maxIdleSetName, expiredChannelName), System.currentTimeMillis(), keysLimit); + Arrays.asList(name, timeoutSetName, maxIdleSetName, expiredChannelName, lastAccessTimeSetName), + System.currentTimeMillis(), keysLimit); } } From 1ab3cb30522c04dbd4ff44cddd98d2849dae2cd4 Mon Sep 17 00:00:00 2001 From: Nikita Date: Mon, 11 Sep 2017 17:43:13 +0300 Subject: [PATCH 5/7] Command logging fixed. #1035 --- .../java/org/redisson/client/handler/CommandDecoder.java | 6 +++--- redisson/src/main/java/org/redisson/misc/LogHelper.java | 5 +++++ 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/redisson/src/main/java/org/redisson/client/handler/CommandDecoder.java b/redisson/src/main/java/org/redisson/client/handler/CommandDecoder.java index e8349bdc2..4adecf9e9 100644 --- a/redisson/src/main/java/org/redisson/client/handler/CommandDecoder.java +++ b/redisson/src/main/java/org/redisson/client/handler/CommandDecoder.java @@ -240,9 +240,9 @@ public class CommandDecoder extends ReplayingDecoder { + ". channel: " + channel + " data: " + data)); } else { if (data != null) { - data.tryFailure(new RedisException(error + ". channel: " + channel + " command: " + data)); + data.tryFailure(new RedisException(error + ". channel: " + channel + " command: " + LogHelper.toString(data))); } else { - log.error("Error: {} channel: {} data: {}", error, channel, data); + log.error("Error: {} channel: {} data: {}", error, channel, LogHelper.toString(data)); } } } finally { @@ -320,7 +320,7 @@ public class CommandDecoder extends ReplayingDecoder { parts.add(result); } else { if (data != null && !data.getPromise().trySuccess(result) && data.cause() instanceof RedisTimeoutException) { - log.warn("response has been skipped due to timeout! channel: {}, command: {}, result: {}", channel, data, result); + log.warn("response has been skipped due to timeout! channel: {}, command: {}, result: {}", channel, LogHelper.toString(data), LogHelper.toString(result)); } } } diff --git a/redisson/src/main/java/org/redisson/misc/LogHelper.java b/redisson/src/main/java/org/redisson/misc/LogHelper.java index fe86507af..857f58787 100644 --- a/redisson/src/main/java/org/redisson/misc/LogHelper.java +++ b/redisson/src/main/java/org/redisson/misc/LogHelper.java @@ -18,6 +18,8 @@ package org.redisson.misc; import java.lang.reflect.Array; import java.util.Collection; +import org.redisson.client.protocol.CommandData; + /** * @author Philipp Marx */ @@ -38,6 +40,9 @@ public class LogHelper { return toArrayString(object); } else if (object instanceof Collection) { return toCollectionString((Collection) object); + } else if (object instanceof CommandData) { + CommandData cd = (CommandData)object; + return cd.getCommand() + ", params: " + LogHelper.toString(cd.getParams()); } else { return String.valueOf(object); } From 097eb32cd6dffdce4af99c60e9f276df23c828b1 Mon Sep 17 00:00:00 2001 From: Nikita Date: Tue, 12 Sep 2017 11:02:42 +0300 Subject: [PATCH 6/7] RMapCache.setMaxSize method added. #1027 --- .../main/java/org/redisson/RedissonMap.java | 2 +- .../java/org/redisson/RedissonMapCache.java | 48 +++++++++++-------- .../java/org/redisson/api/RKeysAsync.java | 2 +- .../main/java/org/redisson/api/RMapCache.java | 11 ++++- .../java/org/redisson/api/RMapCacheAsync.java | 12 ++++- .../client/protocol/RedisCommands.java | 5 +- 6 files changed, 54 insertions(+), 26 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonMap.java b/redisson/src/main/java/org/redisson/RedissonMap.java index 7c4f7fd22..b8b52fa70 100644 --- a/redisson/src/main/java/org/redisson/RedissonMap.java +++ b/redisson/src/main/java/org/redisson/RedissonMap.java @@ -849,7 +849,7 @@ public class RedissonMap extends RedissonExpirable implements RMap { } protected RFuture fastPutOperationAsync(K key, V value) { - return commandExecutor.writeAsync(getName(key), codec, RedisCommands.HSET, getName(key), key, value); + return commandExecutor.writeAsync(getName(key), codec, RedisCommands.HSET, getName(key), encodeMapKey(key), encodeMapValue(value)); } @Override diff --git a/redisson/src/main/java/org/redisson/RedissonMapCache.java b/redisson/src/main/java/org/redisson/RedissonMapCache.java index 6ac3a9faa..09752207a 100644 --- a/redisson/src/main/java/org/redisson/RedissonMapCache.java +++ b/redisson/src/main/java/org/redisson/RedissonMapCache.java @@ -106,16 +106,24 @@ public class RedissonMapCache extends RedissonMap implements RMapCac throw new IllegalArgumentException("maxSize should be greater than zero"); } - return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, - "local value = redis.call('hget', KEYS[1], 'max-size'); " + - "if (value == false) then " - + "redis.call('hset', KEYS[1], 'max-size', ARGV[1]); " - + "return 1;" - + "end;" - + "return 0;", - Arrays.asList(getOptionsName()), maxSize); + return commandExecutor.writeAsync(getName(), codec, RedisCommands.HSETNX, getOptionsName(), "max-size", maxSize); + } + + @Override + public void setMaxSize(int permits) { + get(setMaxSizeAsync(permits)); } + @Override + public RFuture setMaxSizeAsync(int maxSize) { + if (maxSize <= 0) { + throw new IllegalArgumentException("maxSize should be greater than zero"); + } + + return commandExecutor.writeAsync(getName(), LongCodec.INSTANCE, RedisCommands.HSET_VOID, getOptionsName(), "max-size", maxSize); + } + + @Override public RFuture containsKeyAsync(Object key) { checkKey(key); @@ -515,8 +523,8 @@ public class RedissonMapCache extends RedissonMap implements RMapCac " if maxSize ~= nil and maxSize ~= 0 then" + " redis.call('zadd', lastAccessTimeSetName, currentTime, ARGV[2]);" + " local cacheSize = tonumber(redis.call('hlen', KEYS[1]));" + - " if cacheSize >= maxSize then" + - " local lruItems = redis.call('zrange', lastAccessTimeSetName, 0, cacheSize - maxSize);" + + " if cacheSize > maxSize then" + + " local lruItems = redis.call('zrange', lastAccessTimeSetName, 0, cacheSize - maxSize - 1);" + " for index, lruItem in ipairs(lruItems) do" + " if lruItem then" + " local lruItemValue = redis.call('hget', KEYS[1], lruItem);" + @@ -584,8 +592,8 @@ public class RedissonMapCache extends RedissonMap implements RMapCac + "if maxSize ~= nil and maxSize ~= 0 then " + " redis.call('zadd', lastAccessTimeSetName, currentTime, ARGV[2]); " + " local cacheSize = tonumber(redis.call('hlen', KEYS[1])); " + - " if cacheSize >= maxSize then " + - " local lruItems = redis.call('zrange', lastAccessTimeSetName, 0, cacheSize - maxSize); " + + " if cacheSize > maxSize then " + + " local lruItems = redis.call('zrange', lastAccessTimeSetName, 0, cacheSize - maxSize - 1); " + " for index, lruItem in ipairs(lruItems) do " + " if lruItem then " + " local lruItemValue = redis.call('hget', KEYS[1], lruItem); " + @@ -662,8 +670,8 @@ public class RedissonMapCache extends RedissonMap implements RMapCac " local lastAccessTimeSetName = KEYS[6]; " + " redis.call('zadd', lastAccessTimeSetName, currentTime, ARGV[2]); " + " local cacheSize = tonumber(redis.call('hlen', KEYS[1])); " + - " if cacheSize >= maxSize then " + - " local lruItems = redis.call('zrange', lastAccessTimeSetName, 0, cacheSize - maxSize); " + + " if cacheSize > maxSize then " + + " local lruItems = redis.call('zrange', lastAccessTimeSetName, 0, cacheSize - maxSize - 1); " + " for index, lruItem in ipairs(lruItems) do " + " if lruItem then " + " local lruItemValue = redis.call('hget', KEYS[1], lruItem); " + @@ -1290,8 +1298,8 @@ public class RedissonMapCache extends RedissonMap implements RMapCac " local lastAccessTimeSetName = KEYS[6]; " + " redis.call('zadd', lastAccessTimeSetName, currentTime, ARGV[2]); " + " local cacheSize = tonumber(redis.call('hlen', KEYS[1])); " + - " if cacheSize >= maxSize then " + - " local lruItems = redis.call('zrange', lastAccessTimeSetName, 0, cacheSize - maxSize); " + + " if cacheSize > maxSize then " + + " local lruItems = redis.call('zrange', lastAccessTimeSetName, 0, cacheSize - maxSize - 1); " + " for index, lruItem in ipairs(lruItems) do " + " if lruItem then " + " local lruItemValue = redis.call('hget', KEYS[1], lruItem); " + @@ -1340,8 +1348,8 @@ public class RedissonMapCache extends RedissonMap implements RMapCac "if maxSize ~= nil and maxSize ~= 0 then " + " redis.call('zadd', lastAccessTimeSetName, currentTime, ARGV[2]); " + " local cacheSize = tonumber(redis.call('hlen', KEYS[1])); " + - " if cacheSize >= maxSize then " + - " local lruItems = redis.call('zrange', lastAccessTimeSetName, 0, cacheSize - maxSize); " + + " if cacheSize > maxSize then " + + " local lruItems = redis.call('zrange', lastAccessTimeSetName, 0, cacheSize - maxSize - 1); " + " for index, lruItem in ipairs(lruItems) do " + " if lruItem then " + " local lruItemValue = redis.call('hget', KEYS[1], lruItem); " + @@ -1647,8 +1655,8 @@ public class RedissonMapCache extends RedissonMap implements RMapCac " if maxSize ~= nil and maxSize ~= 0 then" + " redis.call('zadd', lastAccessTimeSetName, currentTime, key);" + " local cacheSize = tonumber(redis.call('hlen', KEYS[1]));" + - " if cacheSize >= maxSize then" + - " local lruItems = redis.call('zrange', lastAccessTimeSetName, 0, cacheSize - maxSize);" + + " if cacheSize > maxSize then" + + " local lruItems = redis.call('zrange', lastAccessTimeSetName, 0, cacheSize - maxSize - 1);" + " for index, lruItem in ipairs(lruItems) do" + " if lruItem then" + " local lruItemValue = redis.call('hget', KEYS[1], lruItem);" + diff --git a/redisson/src/main/java/org/redisson/api/RKeysAsync.java b/redisson/src/main/java/org/redisson/api/RKeysAsync.java index 65343be43..d22b8ab74 100644 --- a/redisson/src/main/java/org/redisson/api/RKeysAsync.java +++ b/redisson/src/main/java/org/redisson/api/RKeysAsync.java @@ -194,7 +194,7 @@ public interface RKeysAsync { *

* Requires Redis 4.0+ * - * @param keys + * @param keys - object names * @return number of removed keys */ RFuture unlinkAsync(String ... keys); diff --git a/redisson/src/main/java/org/redisson/api/RMapCache.java b/redisson/src/main/java/org/redisson/api/RMapCache.java index f84c05016..9286d9a38 100644 --- a/redisson/src/main/java/org/redisson/api/RMapCache.java +++ b/redisson/src/main/java/org/redisson/api/RMapCache.java @@ -40,7 +40,16 @@ import org.redisson.api.map.event.MapEntryListener; public interface RMapCache extends RMap, RMapCacheAsync { /** - * Tries to set max size of the map. + * Sets max size of the map. + * Superfluous elements are evicted using LRU algorithm. + * + * @param maxSize - max size + */ + void setMaxSize(int maxSize); + + /** + * Tries to set max size of the map. + * Superfluous elements are evicted using LRU algorithm. * * @param maxSize - max size * @return true if max size has been successfully set, otherwise false. diff --git a/redisson/src/main/java/org/redisson/api/RMapCacheAsync.java b/redisson/src/main/java/org/redisson/api/RMapCacheAsync.java index cbbc74781..cc4a716bf 100644 --- a/redisson/src/main/java/org/redisson/api/RMapCacheAsync.java +++ b/redisson/src/main/java/org/redisson/api/RMapCacheAsync.java @@ -38,7 +38,17 @@ import java.util.concurrent.TimeUnit; public interface RMapCacheAsync extends RMapAsync { /** - * Tries to set max size of the map. + * Sets max size of the map. + * Superfluous elements are evicted using LRU algorithm. + * + * @param maxSize - max size + * @return void + */ + RFuture setMaxSizeAsync(int maxSize); + + /** + * Tries to set max size of the map. + * Superfluous elements are evicted using LRU algorithm. * * @param maxSize - max size * @return true if max size has been successfully set, otherwise false. diff --git a/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java b/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java index 6e50d598b..56fd6bb35 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java +++ b/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java @@ -254,8 +254,9 @@ public interface RedisCommands { RedisStrictCommand MSET = new RedisStrictCommand("MSET", new VoidReplayConvertor()); RedisStrictCommand MSETNX = new RedisStrictCommand("MSETNX", new BooleanReplayConvertor()); - RedisCommand HSETNX = new RedisCommand("HSETNX", new BooleanReplayConvertor()); - RedisCommand HSET = new RedisCommand("HSET", new BooleanReplayConvertor(), 2, ValueType.MAP); + RedisStrictCommand HSETNX = new RedisStrictCommand("HSETNX", new BooleanReplayConvertor()); + RedisStrictCommand HSET = new RedisStrictCommand("HSET", new BooleanReplayConvertor()); + RedisStrictCommand HSET_VOID = new RedisStrictCommand("HSET", new VoidReplayConvertor()); RedisCommand> HSCAN = new RedisCommand>("HSCAN", new NestedMultiDecoder(new ObjectMapReplayDecoder(), new MapScanResultReplayDecoder()), ValueType.MAP); RedisCommand> HGETALL = new RedisCommand>("HGETALL", new ObjectMapReplayDecoder(), ValueType.MAP); RedisCommand>> HGETALL_ENTRY = new RedisCommand>>("HGETALL", new ObjectMapEntryReplayDecoder(), ValueType.MAP); From cf1a1905948bc9d42ddb882b2ea924708311c359 Mon Sep 17 00:00:00 2001 From: Nikita Date: Tue, 12 Sep 2017 12:14:52 +0300 Subject: [PATCH 7/7] RMapCacheTest fixed --- .../org/redisson/RedissonMapCacheTest.java | 116 +++++++++++++++++- 1 file changed, 111 insertions(+), 5 deletions(-) diff --git a/redisson/src/test/java/org/redisson/RedissonMapCacheTest.java b/redisson/src/test/java/org/redisson/RedissonMapCacheTest.java index 7d659983f..5c3fc3191 100644 --- a/redisson/src/test/java/org/redisson/RedissonMapCacheTest.java +++ b/redisson/src/test/java/org/redisson/RedissonMapCacheTest.java @@ -16,6 +16,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import org.junit.Assert; @@ -218,16 +219,17 @@ public class RedissonMapCacheTest extends BaseMapTest { @Test public void testMaxSize() { - final int maxSize = 2; + final AtomicInteger maxSize = new AtomicInteger(2); Map store = new LinkedHashMap() { @Override protected boolean removeEldestEntry(Entry eldest) { - return size() > maxSize; + return size() > maxSize.get(); } }; MapOptions options = MapOptions.defaults().writer(createMapWriter(store)); RMapCache map = redisson.getMapCache("test", options); - map.trySetMaxSize(maxSize); + assertThat(map.trySetMaxSize(maxSize.get())).isTrue(); + assertThat(map.trySetMaxSize(1)).isFalse(); assertThat(map.fastPutIfAbsent("01", "00")).isTrue(); assertThat(map.fastPutIfAbsent("02", "00")).isTrue(); @@ -237,7 +239,7 @@ public class RedissonMapCacheTest extends BaseMapTest { assertThat(map.fastPut("2", "22", 10, TimeUnit.SECONDS)).isTrue(); assertThat(map.fastPut("3", "33", 10, TimeUnit.SECONDS)).isTrue(); - assertThat(map.size()).isEqualTo(maxSize); + assertThat(map.size()).isEqualTo(maxSize.get()); Map expected = new HashMap<>(); expected.put("2", "22"); @@ -257,7 +259,111 @@ public class RedissonMapCacheTest extends BaseMapTest { assertThat(map.remove("2", "22")).isTrue(); assertThat(map.remove("0")).isNull(); assertThat(map.remove("3")).isEqualTo("33"); - } + + maxSize.set(6); + map.setMaxSize(maxSize.get()); + assertThat(map.fastPut("01", "00")).isTrue(); + assertThat(map.fastPut("02", "00")).isTrue(); + assertThat(map.fastPut("03", "00")).isTrue(); + assertThat(map.fastPut("04", "00")).isTrue(); + assertThat(map.fastPut("05", "00")).isTrue(); + assertThat(map.fastPut("06", "00")).isTrue(); + assertThat(map.fastPut("07", "00")).isTrue(); + + assertThat(map.size()).isEqualTo(maxSize.get()); + assertThat(map.keySet()).containsExactly("02", "03", "04", "05", "06", "07"); + + map.put("08", "00"); + map.put("09", "00"); + map.put("10", "00"); + map.put("11", "00"); + map.put("12", "00"); + map.put("13", "00"); + map.put("14", "00"); + + assertThat(map.size()).isEqualTo(maxSize.get()); + assertThat(map.keySet()).containsExactly("09", "10", "11", "12", "13", "14"); + + map.putIfAbsent("15", "00", 1, TimeUnit.SECONDS); + map.putIfAbsent("16", "00", 1, TimeUnit.SECONDS); + map.putIfAbsent("17", "00", 1, TimeUnit.SECONDS); + map.putIfAbsent("18", "00", 1, TimeUnit.SECONDS); + map.putIfAbsent("19", "00", 1, TimeUnit.SECONDS); + map.putIfAbsent("20", "00", 1, TimeUnit.SECONDS); + map.putIfAbsent("21", "00", 1, TimeUnit.SECONDS); + + assertThat(map.size()).isEqualTo(maxSize.get()); + assertThat(map.keySet()).containsExactly("16", "17", "18", "19", "20", "21"); + + map.putIfAbsent("22", "00"); + map.putIfAbsent("23", "00"); + map.putIfAbsent("24", "00"); + map.putIfAbsent("25", "00"); + map.putIfAbsent("26", "00"); + map.putIfAbsent("27", "00"); + map.putIfAbsent("28", "00"); + + assertThat(map.size()).isEqualTo(maxSize.get()); + assertThat(map.keySet()).containsExactly("23", "24", "25", "26", "27", "28"); + + map.fastPut("29", "00", 1, TimeUnit.SECONDS); + map.fastPut("30", "00", 1, TimeUnit.SECONDS); + map.fastPut("31", "00", 1, TimeUnit.SECONDS); + map.fastPut("32", "00", 1, TimeUnit.SECONDS); + map.fastPut("33", "00", 1, TimeUnit.SECONDS); + map.fastPut("34", "00", 1, TimeUnit.SECONDS); + map.fastPut("35", "00", 1, TimeUnit.SECONDS); + + assertThat(map.size()).isEqualTo(maxSize.get()); + assertThat(map.keySet()).containsExactly("30", "31", "32", "33", "34", "35"); + + map.put("36", "00", 1, TimeUnit.SECONDS); + map.put("37", "00", 1, TimeUnit.SECONDS); + map.put("38", "00", 1, TimeUnit.SECONDS); + map.put("39", "00", 1, TimeUnit.SECONDS); + map.put("40", "00", 1, TimeUnit.SECONDS); + map.put("41", "00", 1, TimeUnit.SECONDS); + map.put("42", "00", 1, TimeUnit.SECONDS); + + assertThat(map.size()).isEqualTo(maxSize.get()); + assertThat(map.keySet()).containsExactly("37", "38", "39", "40", "41", "42"); + + map.fastPutIfAbsent("43", "00"); + map.fastPutIfAbsent("44", "00"); + map.fastPutIfAbsent("45", "00"); + map.fastPutIfAbsent("46", "00"); + map.fastPutIfAbsent("47", "00"); + map.fastPutIfAbsent("48", "00"); + map.fastPutIfAbsent("49", "00"); + + assertThat(map.size()).isEqualTo(maxSize.get()); + assertThat(map.keySet()).containsExactly("44", "45", "46", "47", "48", "49"); + + map.fastPutIfAbsent("50", "00", 1, TimeUnit.SECONDS); + map.fastPutIfAbsent("51", "00", 1, TimeUnit.SECONDS); + map.fastPutIfAbsent("52", "00", 1, TimeUnit.SECONDS); + map.fastPutIfAbsent("53", "00", 1, TimeUnit.SECONDS); + map.fastPutIfAbsent("54", "00", 1, TimeUnit.SECONDS); + map.fastPutIfAbsent("55", "00", 1, TimeUnit.SECONDS); + map.fastPutIfAbsent("56", "00", 1, TimeUnit.SECONDS); + + assertThat(map.size()).isEqualTo(maxSize.get()); + assertThat(map.keySet()).containsExactly("51", "52", "53", "54", "55", "56"); + + Map newMap = new LinkedHashMap<>(); + newMap.put("57", "00"); + newMap.put("58", "00"); + newMap.put("59", "00"); + newMap.put("60", "00"); + newMap.put("61", "00"); + newMap.put("62", "00"); + newMap.put("63", "00"); + map.putAll(newMap); + + assertThat(map.size()).isEqualTo(maxSize.get()); + assertThat(map.keySet()).containsExactly("58", "59", "60", "61", "62", "63"); + + } @Test public void testOrdering() {