From f2d373be2a99b441715eef819f27541484cf2ac1 Mon Sep 17 00:00:00 2001 From: Tobias Wichtrey Date: Fri, 21 Sep 2018 16:25:11 +0200 Subject: [PATCH 1/2] add putAll with ttl --- .../java/org/redisson/RedissonMapCache.java | 125 ++++++++++++++++++ .../main/java/org/redisson/api/RMapCache.java | 29 ++++ 2 files changed, 154 insertions(+) diff --git a/redisson/src/main/java/org/redisson/RedissonMapCache.java b/redisson/src/main/java/org/redisson/RedissonMapCache.java index e046d7342..71e7dc913 100644 --- a/redisson/src/main/java/org/redisson/RedissonMapCache.java +++ b/redisson/src/main/java/org/redisson/RedissonMapCache.java @@ -55,6 +55,7 @@ import org.redisson.codec.MapCacheEventCodec; import org.redisson.command.CommandAsyncExecutor; import org.redisson.connection.decoder.MapGetAllDecoder; import org.redisson.eviction.EvictionScheduler; +import org.redisson.misc.RedissonPromise; import io.netty.buffer.ByteBuf; import io.netty.util.concurrent.Future; @@ -619,6 +620,31 @@ public class RedissonMapCache extends RedissonMap implements RMapCac getLastAccessTimeSetNameByKey(key), getRemovedChannelNameByKey(key), getOptionsName(key)), System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(value)); } + + @Override + public void putAll(Map map, long ttl, TimeUnit ttlUnit) { + get(putAllAsync(map, ttl, ttlUnit)); + } + + @Override + public RFuture putAllAsync(Map map, long ttl, TimeUnit ttlUnit) { + if (map.isEmpty()) { + return RedissonPromise.newSucceededFuture(null); + } + + RFuture future = putAllOperationAsync(map, ttl, ttlUnit); + if (hasNoWriter()) { + return future; + } + + MapWriterTask listener = new MapWriterTask() { + @Override + public void execute() { + options.getWriter().writeAll((Map) map); + } + }; + return mapWriterFuture(future, listener); + } @Override public V addAndGet(K key, Number value) { @@ -1771,6 +1797,105 @@ public class RedissonMapCache extends RedissonMap implements RMapCac getUpdatedChannelName(), getLastAccessTimeSetName(), getRemovedChannelName(), getOptionsName()), params.toArray()); } + + private RFuture putAllOperationAsync(Map map, long ttl, TimeUnit ttlUnit) { + List params = new ArrayList(map.size()*2 + 2); + params.add(System.currentTimeMillis()); + long ttlTimeout = 0; + if (ttl > 0) { + ttlTimeout = System.currentTimeMillis() + ttlUnit.toMillis(ttl); + } + params.add(ttlTimeout); + for (java.util.Map.Entry t : map.entrySet()) { + if (t.getKey() == null) { + throw new NullPointerException("map key can't be null"); + } + if (t.getValue() == null) { + throw new NullPointerException("map value can't be null"); + } + + params.add(encodeMapKey(t.getKey())); + params.add(encodeMapValue(t.getValue())); + } + + return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_VOID, + "local currentTime = tonumber(table.remove(ARGV, 1)); " + // index is the first parameter + "local ttl = table.remove(ARGV, 1); " + // ttl is the second parameter + "local ttlNumber = tonumber(ttl); " + + "local maxSize = tonumber(redis.call('hget', KEYS[8], 'max-size'));" + + "for i, value in ipairs(ARGV) do " + + "if i % 2 == 0 then " + + "local key = ARGV[i-1];" + + + "local v = redis.call('hget', KEYS[1], key);" + + "local exists = false;" + + "if v ~= false then" + + " local t, val = struct.unpack('dLc0', v);" + + " local expireDate = 92233720368547758;" + + " local expireDateScore = redis.call('zscore', KEYS[2], key);" + + " if expireDateScore ~= false then" + + " expireDate = tonumber(expireDateScore)" + + " end;" + + " if t ~= 0 then" + + " local expireIdle = redis.call('zscore', KEYS[3], key);" + + " if expireIdle ~= false then" + + " expireDate = math.min(expireDate, tonumber(expireIdle))" + + " end;" + + " end;" + + " if expireDate > tonumber(currentTime) then" + + " exists = true;" + + " end;" + + "end;" + + "" + + "if ttlNumber > 0 then " + + " redis.call('zadd', KEYS[2], ttl, key); " + + "else " + + " redis.call('zrem', KEYS[2], key); " + + "end; " + + "" + + "local newvalue = struct.pack('dLc0', 0, string.len(value), value);" + + "redis.call('hset', KEYS[1], key, newvalue);" + + + "local lastAccessTimeSetName = KEYS[6];" + + "if exists == false then" + + " if maxSize ~= nil and maxSize ~= 0 then" + + " redis.call('zadd', lastAccessTimeSetName, currentTime, key);" + + " local cacheSize = tonumber(redis.call('hlen', KEYS[1]));" + + " if cacheSize > maxSize then" + + " local lruItems = redis.call('zrange', lastAccessTimeSetName, 0, cacheSize - maxSize - 1);" + + " for index, lruItem in ipairs(lruItems) do" + + " if lruItem then" + + " local lruItemValue = redis.call('hget', KEYS[1], lruItem);" + + " redis.call('hdel', KEYS[1], lruItem);" + + " redis.call('zrem', KEYS[2], lruItem);" + + " redis.call('zrem', KEYS[3], lruItem);" + + " redis.call('zrem', lastAccessTimeSetName, lruItem);" + + " if lruItemValue ~= false then " + + " local removedChannelName = KEYS[7];" + + " local msg = struct.pack('Lc0Lc0', string.len(lruItem), lruItem, string.len(lruItemValue), lruItemValue);" + + " redis.call('publish', removedChannelName, msg);" + + "end; " + + " end;" + + " end" + + " end;" + + " end;" + + " local msg = struct.pack('Lc0Lc0', string.len(key), key, string.len(value), value);" + + " redis.call('publish', KEYS[4], msg);" + + "else " + + "local t, val = struct.unpack('dLc0', v);" + + "local msg = struct.pack('Lc0Lc0Lc0', string.len(key), key, string.len(value), value, string.len(val), val);" + + "redis.call('publish', KEYS[5], msg);" + + + " if maxSize ~= nil and maxSize ~= 0 then " + + " redis.call('zadd', lastAccessTimeSetName, currentTime, key);" + + " end;" + + "end;" + + "end;" + + "end;", + Arrays.asList(getName(), getTimeoutSetName(), getIdleSetName(), getCreatedChannelName(), + getUpdatedChannelName(), getLastAccessTimeSetName(), getRemovedChannelName(), getOptionsName()), + params.toArray()); + } private Boolean isWindows; diff --git a/redisson/src/main/java/org/redisson/api/RMapCache.java b/redisson/src/main/java/org/redisson/api/RMapCache.java index ddabab8ff..cc649aaa5 100644 --- a/redisson/src/main/java/org/redisson/api/RMapCache.java +++ b/redisson/src/main/java/org/redisson/api/RMapCache.java @@ -15,8 +15,10 @@ */ package org.redisson.api; +import java.util.Map; import java.util.concurrent.TimeUnit; +import org.redisson.api.map.MapWriter; import org.redisson.api.map.event.MapEntryListener; /** @@ -231,6 +233,33 @@ public interface RMapCache extends RMap, RMapCacheAsync, RDest * false if key already exists in the hash. */ boolean fastPutIfAbsent(K key, V value, long ttl, TimeUnit ttlUnit, long maxIdleTime, TimeUnit maxIdleUnit); + + /** + * Associates the specified value with the specified key + * in batch. + *

+ * If {@link MapWriter} is defined then new map entries will be stored in write-through mode. + * + * @param map - mappings to be stored in this map + * @param ttl - time to live for all key\value entries. + * If 0 then stores infinitely. + * @param ttlUnit - time unit + */ + void putAll(java.util.Map map, long ttl, TimeUnit ttlUnit); + + /** + * Associates the specified value with the specified key + * in batch. + *

+ * If {@link MapWriter} is defined then new map entries are stored in write-through mode. + * + * @param map - mappings to be stored in this map + * @param ttl - time to live for all key\value entries. + * If 0 then stores infinitely. + * @param ttlUnit - time unit + * @return void + */ + RFuture putAllAsync(Map map, long ttl, TimeUnit ttlUnit); /** * Returns the number of entries in cache. From c339adf552613e6e257c06648483d33ee551c7e1 Mon Sep 17 00:00:00 2001 From: Tobias Wichtrey Date: Mon, 8 Oct 2018 11:11:12 +0200 Subject: [PATCH 2/2] add test --- .../org/redisson/RedissonMapCacheTest.java | 32 +++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/redisson/src/test/java/org/redisson/RedissonMapCacheTest.java b/redisson/src/test/java/org/redisson/RedissonMapCacheTest.java index c086dec21..189fb3ba1 100644 --- a/redisson/src/test/java/org/redisson/RedissonMapCacheTest.java +++ b/redisson/src/test/java/org/redisson/RedissonMapCacheTest.java @@ -649,6 +649,38 @@ public class RedissonMapCacheTest extends BaseMapTest { map.destroy(); } + @Test + public void testPutAllGetTTL() throws InterruptedException { + RMapCache map = redisson.getMapCache("simple06"); + Assert.assertNull(map.get(new SimpleKey("33"))); + Assert.assertNull(map.get(new SimpleKey("55"))); + + Map entries = new HashMap<>(); + entries.put(new SimpleKey("33"), new SimpleValue("44")); + entries.put(new SimpleKey("55"), new SimpleValue("66")); + map.putAll(entries, 2, TimeUnit.SECONDS); + + SimpleValue val1 = map.get(new SimpleKey("33")); + Assert.assertEquals("44", val1.getValue()); + SimpleValue val2 = map.get(new SimpleKey("55")); + Assert.assertEquals("66", val2.getValue()); + + Thread.sleep(1000); + + Assert.assertEquals(2, map.size()); + SimpleValue val3 = map.get(new SimpleKey("33")); + Assert.assertEquals("44", val3.getValue()); + SimpleValue val4 = map.get(new SimpleKey("55")); + Assert.assertEquals("66", val4.getValue()); + Assert.assertEquals(2, map.size()); + + Thread.sleep(1000); + + Assert.assertNull(map.get(new SimpleKey("33"))); + Assert.assertNull(map.get(new SimpleKey("55"))); + map.destroy(); + } + @Test public void testPutIfAbsentTTL() throws Exception { RMapCache map = redisson.getMapCache("simple");