diff --git a/redisson/src/main/java/org/redisson/Redisson.java b/redisson/src/main/java/org/redisson/Redisson.java index c8410d6e2..3189d2eca 100755 --- a/redisson/src/main/java/org/redisson/Redisson.java +++ b/redisson/src/main/java/org/redisson/Redisson.java @@ -253,17 +253,17 @@ public class Redisson implements RedissonClient { @Override public RLocalCachedMap getLocalCachedMap(String name, LocalCachedMapOptions options) { - return new RedissonLocalCachedMap(id, connectionManager.getCommandExecutor(), name, options, evictionScheduler, this); + return new RedissonLocalCachedMap(connectionManager.getCommandExecutor(), name, options, evictionScheduler, this); } @Override public RLocalCachedMap getLocalCachedMap(String name, Codec codec, LocalCachedMapOptions options) { - return new RedissonLocalCachedMap(id, codec, connectionManager.getCommandExecutor(), name, options, evictionScheduler, this); + return new RedissonLocalCachedMap(codec, connectionManager.getCommandExecutor(), name, options, evictionScheduler, this); } @Override public RMap getMap(String name) { - return new RedissonMap(id, connectionManager.getCommandExecutor(), name, this); + return new RedissonMap(connectionManager.getCommandExecutor(), name, this); } @Override @@ -308,17 +308,17 @@ public class Redisson implements RedissonClient { @Override public RMapCache getMapCache(String name) { - return new RedissonMapCache(id, evictionScheduler, connectionManager.getCommandExecutor(), name, this); + return new RedissonMapCache(evictionScheduler, connectionManager.getCommandExecutor(), name, this); } @Override public RMapCache getMapCache(String name, Codec codec) { - return new RedissonMapCache(id, codec, evictionScheduler, connectionManager.getCommandExecutor(), name, this); + return new RedissonMapCache(codec, evictionScheduler, connectionManager.getCommandExecutor(), name, this); } @Override public RMap getMap(String name, Codec codec) { - return new RedissonMap(id, codec, connectionManager.getCommandExecutor(), name, this); + return new RedissonMap(codec, connectionManager.getCommandExecutor(), name, this); } @Override diff --git a/redisson/src/main/java/org/redisson/RedissonBatch.java b/redisson/src/main/java/org/redisson/RedissonBatch.java index b25b07f50..3e7cc2509 100644 --- a/redisson/src/main/java/org/redisson/RedissonBatch.java +++ b/redisson/src/main/java/org/redisson/RedissonBatch.java @@ -102,12 +102,12 @@ public class RedissonBatch implements RBatch { @Override public RMapAsync getMap(String name) { - return new RedissonMap(id, executorService, name, null); + return new RedissonMap(executorService, name, null); } @Override public RMapAsync getMap(String name, Codec codec) { - return new RedissonMap(id, codec, executorService, name, null); + return new RedissonMap(codec, executorService, name, null); } @Override @@ -202,12 +202,12 @@ public class RedissonBatch implements RBatch { @Override public RMapCacheAsync getMapCache(String name, Codec codec) { - return new RedissonMapCache(id, codec, evictionScheduler, executorService, name, null); + return new RedissonMapCache(codec, evictionScheduler, executorService, name, null); } @Override public RMapCacheAsync getMapCache(String name) { - return new RedissonMapCache(id, evictionScheduler, executorService, name, null); + return new RedissonMapCache(evictionScheduler, executorService, name, null); } @Override diff --git a/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java b/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java index efb981356..2d018a3c4 100644 --- a/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java +++ b/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java @@ -208,17 +208,17 @@ public class RedissonLocalCachedMap extends RedissonMap implements R private int invalidationStatusListenerId; private volatile long lastInvalidate; - protected RedissonLocalCachedMap(UUID id, CommandAsyncExecutor commandExecutor, String name, LocalCachedMapOptions options, EvictionScheduler evictionScheduler, RedissonClient redisson) { - super(id, commandExecutor, name, redisson); - init(id, name, options, redisson, evictionScheduler); + protected RedissonLocalCachedMap(CommandAsyncExecutor commandExecutor, String name, LocalCachedMapOptions options, EvictionScheduler evictionScheduler, RedissonClient redisson) { + super(commandExecutor, name, redisson); + init(name, options, redisson, evictionScheduler); } - protected RedissonLocalCachedMap(UUID id, Codec codec, CommandAsyncExecutor connectionManager, String name, LocalCachedMapOptions options, EvictionScheduler evictionScheduler, RedissonClient redisson) { - super(id, codec, connectionManager, name, redisson); - init(id, name, options, redisson, evictionScheduler); + protected RedissonLocalCachedMap(Codec codec, CommandAsyncExecutor connectionManager, String name, LocalCachedMapOptions options, EvictionScheduler evictionScheduler, RedissonClient redisson) { + super(codec, connectionManager, name, redisson); + init(name, options, redisson, evictionScheduler); } - private void init(UUID id, String name, LocalCachedMapOptions options, RedissonClient redisson, EvictionScheduler evictionScheduler) { + private void init(String name, LocalCachedMapOptions options, RedissonClient redisson, EvictionScheduler evictionScheduler) { instanceId = generateId(); if (options.getInvalidationPolicy() == InvalidationPolicy.ON_CHANGE diff --git a/redisson/src/main/java/org/redisson/RedissonMap.java b/redisson/src/main/java/org/redisson/RedissonMap.java index 5eb7552a7..a1a71d9e5 100644 --- a/redisson/src/main/java/org/redisson/RedissonMap.java +++ b/redisson/src/main/java/org/redisson/RedissonMap.java @@ -28,7 +28,6 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.UUID; import org.redisson.api.RFuture; import org.redisson.api.RLock; @@ -47,7 +46,6 @@ import org.redisson.client.protocol.convertor.NumberConvertor; import org.redisson.client.protocol.decoder.MapScanResult; import org.redisson.client.protocol.decoder.ScanObjectEntry; import org.redisson.command.CommandAsyncExecutor; -import org.redisson.command.CommandExecutor; import org.redisson.connection.decoder.MapGetAllDecoder; import org.redisson.mapreduce.RedissonMapReduce; import org.redisson.misc.Hash; @@ -69,18 +67,15 @@ public class RedissonMap extends RedissonExpirable implements RMap { static final RedisCommand EVAL_REMOVE_VALUE = new RedisCommand("EVAL", new BooleanReplayConvertor(), 4, ValueType.MAP); static final RedisCommand EVAL_PUT = EVAL_REPLACE; - private final UUID id; - private final RedissonClient redisson; + final RedissonClient redisson; - protected RedissonMap(UUID id, CommandAsyncExecutor commandExecutor, String name, RedissonClient redisson) { + protected RedissonMap(CommandAsyncExecutor commandExecutor, String name, RedissonClient redisson) { super(commandExecutor, name); - this.id = id; this.redisson = redisson; } - public RedissonMap(UUID id, Codec codec, CommandAsyncExecutor commandExecutor, String name, RedissonClient redisson) { + public RedissonMap(Codec codec, CommandAsyncExecutor commandExecutor, String name, RedissonClient redisson) { super(codec, commandExecutor, name); - this.id = id; this.redisson = redisson; } @@ -92,13 +87,13 @@ public class RedissonMap extends RedissonExpirable implements RMap { @Override public RLock getLock(K key) { String lockName = getLockName(key); - return new RedissonLock((CommandExecutor)commandExecutor, lockName, id); + return redisson.getLock(lockName); } @Override public RReadWriteLock getReadWriteLock(K key) { String lockName = getLockName(key); - return new RedissonReadWriteLock((CommandExecutor)commandExecutor, lockName, id); + return redisson.getReadWriteLock(lockName); } private String getLockName(Object key) { diff --git a/redisson/src/main/java/org/redisson/RedissonMapCache.java b/redisson/src/main/java/org/redisson/RedissonMapCache.java index b7df19c49..c60f449bb 100644 --- a/redisson/src/main/java/org/redisson/RedissonMapCache.java +++ b/redisson/src/main/java/org/redisson/RedissonMapCache.java @@ -15,6 +15,8 @@ */ package org.redisson; +import java.io.IOException; +import java.math.BigDecimal; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Arrays; @@ -23,19 +25,27 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.UUID; import java.util.concurrent.TimeUnit; import org.redisson.api.RFuture; import org.redisson.api.RMapCache; +import org.redisson.api.RTopic; import org.redisson.api.RedissonClient; +import org.redisson.api.listener.MessageListener; +import org.redisson.api.map.event.EntryCreatedListener; +import org.redisson.api.map.event.EntryEvent; +import org.redisson.api.map.event.EntryExpiredListener; +import org.redisson.api.map.event.EntryRemovedListener; +import org.redisson.api.map.event.EntryUpdatedListener; +import org.redisson.api.map.event.MapEntryListener; import org.redisson.client.codec.Codec; import org.redisson.client.codec.LongCodec; import org.redisson.client.codec.MapScanCodec; +import org.redisson.client.codec.StringCodec; import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommand.ValueType; import org.redisson.client.protocol.RedisCommands; -import org.redisson.client.protocol.convertor.BooleanReplayConvertor; +import org.redisson.client.protocol.convertor.NumberConvertor; import org.redisson.client.protocol.convertor.VoidReplayConvertor; import org.redisson.client.protocol.decoder.ListMultiDecoder; import org.redisson.client.protocol.decoder.LongMultiDecoder; @@ -45,16 +55,13 @@ import org.redisson.client.protocol.decoder.MapScanResult; import org.redisson.client.protocol.decoder.ObjectListDecoder; import org.redisson.client.protocol.decoder.ObjectMapDecoder; import org.redisson.client.protocol.decoder.ScanObjectEntry; +import org.redisson.codec.MapCacheEventCodec; import org.redisson.command.CommandAsyncExecutor; import org.redisson.connection.decoder.MapGetAllDecoder; import org.redisson.eviction.EvictionScheduler; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; -import java.io.IOException; -import java.math.BigDecimal; -import org.redisson.client.codec.StringCodec; -import org.redisson.client.protocol.convertor.NumberConvertor; /** *

Map-based cache with ability to set TTL for each entry via @@ -77,45 +84,39 @@ import org.redisson.client.protocol.convertor.NumberConvertor; */ public class RedissonMapCache extends RedissonMap implements RMapCache { - static final RedisCommand EVAL_PUT_IF_ABSENT = new RedisCommand("EVAL", new BooleanReplayConvertor(), 7, ValueType.MAP); - static final RedisCommand EVAL_HSET = new RedisCommand("EVAL", new BooleanReplayConvertor(), 4, ValueType.MAP); - static final RedisCommand EVAL_REPLACE = new RedisCommand("EVAL", 6, ValueType.MAP, ValueType.MAP_VALUE); - static final RedisCommand EVAL_REPLACE_VALUE = new RedisCommand("EVAL", new BooleanReplayConvertor(), 7, Arrays.asList(ValueType.MAP_KEY, ValueType.MAP_VALUE, ValueType.MAP_VALUE)); + static final RedisCommand EVAL_REPLACE = new RedisCommand("EVAL", 7, ValueType.MAP, ValueType.MAP_VALUE); static final RedisCommand EVAL_HMSET = new RedisCommand("EVAL", new VoidReplayConvertor(), 4, ValueType.MAP); - private static final RedisCommand EVAL_REMOVE = new RedisCommand("EVAL", 4, ValueType.MAP_KEY, ValueType.MAP_VALUE); - private static final RedisCommand EVAL_REMOVE_VALUE = new RedisCommand("EVAL", new BooleanReplayConvertor(), 5, ValueType.MAP); - private static final RedisCommand EVAL_PUT_TTL = new RedisCommand("EVAL", 9, ValueType.MAP, ValueType.MAP_VALUE); - private static final RedisCommand EVAL_PUT_TTL_IF_ABSENT = new RedisCommand("EVAL", 10, ValueType.MAP, ValueType.MAP_VALUE); - private static final RedisCommand EVAL_FAST_PUT_TTL = new RedisCommand("EVAL", new BooleanReplayConvertor(), 9, ValueType.MAP, ValueType.MAP_VALUE); - private static final RedisCommand EVAL_FAST_PUT_TTL_IF_ABSENT = new RedisCommand("EVAL", new BooleanReplayConvertor(), 10, ValueType.MAP, ValueType.MAP_VALUE); + private static final RedisCommand EVAL_REMOVE = new RedisCommand("EVAL", 7, ValueType.MAP_KEY, ValueType.MAP_VALUE); + private static final RedisCommand EVAL_PUT_TTL = new RedisCommand("EVAL", 12, ValueType.MAP, ValueType.MAP_VALUE); + private static final RedisCommand EVAL_PUT_TTL_IF_ABSENT = new RedisCommand("EVAL", 11, ValueType.MAP, ValueType.MAP_VALUE); private static final RedisCommand EVAL_GET_TTL = new RedisCommand("EVAL", 7, ValueType.MAP_KEY, ValueType.MAP_VALUE); - private static final RedisCommand EVAL_CONTAINS_KEY = new RedisCommand("EVAL", new BooleanReplayConvertor(), 7, ValueType.MAP_KEY); - static final RedisCommand EVAL_CONTAINS_VALUE = new RedisCommand("EVAL", new BooleanReplayConvertor(), 7, ValueType.MAP_VALUE); - static final RedisCommand EVAL_FAST_REMOVE = new RedisCommand("EVAL", 5, ValueType.MAP_KEY); + static final RedisCommand EVAL_FAST_REMOVE = new RedisCommand("EVAL", 7, ValueType.MAP_KEY); + static final RedisCommand EVAL_PUT = new RedisCommand("EVAL", 6, ValueType.MAP, ValueType.MAP_VALUE); + static final RedisCommand EVAL_PUT_IF_ABSENT = new RedisCommand("EVAL", 5, ValueType.MAP, ValueType.MAP_VALUE); - RedissonMapCache(UUID id, CommandAsyncExecutor commandExecutor, String name, RedissonClient redisson) { - super(id, commandExecutor, name, redisson); + RedissonMapCache(CommandAsyncExecutor commandExecutor, String name, RedissonClient redisson) { + super(commandExecutor, name, redisson); } - RedissonMapCache(UUID id, Codec codec, CommandAsyncExecutor commandExecutor, String name, RedissonClient redisson) { - super(id, codec, commandExecutor, name, redisson); + RedissonMapCache(Codec codec, CommandAsyncExecutor commandExecutor, String name, RedissonClient redisson) { + super(codec, commandExecutor, name, redisson); } - public RedissonMapCache(UUID id, EvictionScheduler evictionScheduler, CommandAsyncExecutor commandExecutor, String name, RedissonClient redisson) { - super(id, commandExecutor, name, redisson); - evictionScheduler.schedule(getName(), getTimeoutSetName(), getIdleSetName()); + public RedissonMapCache(EvictionScheduler evictionScheduler, CommandAsyncExecutor commandExecutor, String name, RedissonClient redisson) { + super(commandExecutor, name, redisson); + evictionScheduler.schedule(getName(), getTimeoutSetName(), getIdleSetName(), getExpiredChannelName()); } - public RedissonMapCache(UUID id, Codec codec, EvictionScheduler evictionScheduler, CommandAsyncExecutor commandExecutor, String name, RedissonClient redisson) { - super(id, codec, commandExecutor, name, redisson); - evictionScheduler.schedule(getName(), getTimeoutSetName(), getIdleSetName()); + public RedissonMapCache(Codec codec, EvictionScheduler evictionScheduler, CommandAsyncExecutor commandExecutor, String name, RedissonClient redisson) { + super(codec, commandExecutor, name, redisson); + evictionScheduler.schedule(getName(), getTimeoutSetName(), getIdleSetName(), getExpiredChannelName()); } @Override public RFuture containsKeyAsync(Object key) { checkKey(key); - return commandExecutor.evalWriteAsync(getName(key), codec, EVAL_CONTAINS_KEY, + return commandExecutor.evalWriteAsync(getName(key), codec, RedisCommands.EVAL_BOOLEAN, "local value = redis.call('hget', KEYS[1], ARGV[2]); " + "local expireDate = 92233720368547758; " + "if value ~= false then " + @@ -141,14 +142,15 @@ public class RedissonMapCache extends RedissonMap implements RMapCac + "return 1;" + "end;" + "return 0; ", - Arrays.asList(getName(key), getTimeoutSetNameByKey(key), getIdleSetNameByKey(key)), System.currentTimeMillis(), key); + Arrays.asList(getName(key), getTimeoutSetNameByKey(key), getIdleSetNameByKey(key)), + System.currentTimeMillis(), encodeMapKey(key)); } @Override public RFuture containsValueAsync(Object value) { checkValue(value); - return commandExecutor.evalWriteAsync(getName(), codec, EVAL_CONTAINS_VALUE, + return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_BOOLEAN, "local s = redis.call('hgetall', KEYS[1]); " + "for i, v in ipairs(s) do " + "if i % 2 == 0 then " @@ -179,7 +181,7 @@ public class RedissonMapCache extends RedissonMap implements RMapCac + "end; " + "end;" + "return 0;", - Arrays.asList(getName(), getTimeoutSetName(), getIdleSetName()), System.currentTimeMillis(), value); + Arrays.asList(getName(), getTimeoutSetName(), getIdleSetName()), System.currentTimeMillis(), encodeMapValue(value)); } @Override @@ -287,23 +289,21 @@ public class RedissonMapCache extends RedissonMap implements RMapCac + "if value == false then " + "insertable = true; " + "else " - + "if insertable == false then " - + "local t, val = struct.unpack('dLc0', value); " - + "local expireDate = 92233720368547758; " - + "local expireDateScore = redis.call('zscore', KEYS[2], ARGV[5]); " - + "if expireDateScore ~= false then " - + "expireDate = tonumber(expireDateScore) " - + "end; " - + "if t ~= 0 then " - + "local expireIdle = redis.call('zscore', KEYS[3], ARGV[5]); " - + "if expireIdle ~= false then " - + "expireDate = math.min(expireDate, tonumber(expireIdle)) " - + "end; " - + "end; " - + "if expireDate <= tonumber(ARGV[1]) then " - + "insertable = true; " - + "end; " - + "end; " + + "local t, val = struct.unpack('dLc0', value); " + + "local expireDate = 92233720368547758; " + + "local expireDateScore = redis.call('zscore', KEYS[2], ARGV[5]); " + + "if expireDateScore ~= false then " + + "expireDate = tonumber(expireDateScore) " + + "end; " + + "if t ~= 0 then " + + "local expireIdle = redis.call('zscore', KEYS[3], ARGV[5]); " + + "if expireIdle ~= false then " + + "expireDate = math.min(expireDate, tonumber(expireIdle)) " + + "end; " + + "end; " + + "if expireDate <= tonumber(ARGV[1]) then " + + "insertable = true; " + + "end; " + "end; " + "if insertable == true then " @@ -322,16 +322,20 @@ public class RedissonMapCache extends RedissonMap implements RMapCac + "end; " // value - + "local val = struct.pack('dLc0', ARGV[4], string.len(ARGV[6]), ARGV[6]); " + + "local val = struct.pack('dLc0', tonumber(ARGV[4]), string.len(ARGV[6]), ARGV[6]); " + "redis.call('hset', KEYS[1], ARGV[5], val); " + + "local msg = struct.pack('Lc0Lc0', string.len(ARGV[5]), ARGV[5], string.len(ARGV[6]), ARGV[6]); " + + "redis.call('publish', KEYS[4], msg); " + + "return nil;" + "else " + "local t, val = struct.unpack('dLc0', value); " + "redis.call('zadd', KEYS[3], t + ARGV[1], ARGV[5]); " + "return val;" + "end; ", - Arrays.asList(getName(key), getTimeoutSetNameByKey(key), getIdleSetNameByKey(key)), System.currentTimeMillis(), ttlTimeout, maxIdleTimeout, maxIdleDelta, key, value); + Arrays.asList(getName(key), getTimeoutSetNameByKey(key), getIdleSetNameByKey(key), getCreatedChannelName()), + System.currentTimeMillis(), ttlTimeout, maxIdleTimeout, maxIdleDelta, key, value); } @Override @@ -339,7 +343,7 @@ public class RedissonMapCache extends RedissonMap implements RMapCac checkKey(key); checkValue(value); - return commandExecutor.evalWriteAsync(getName(key), codec, EVAL_REMOVE_VALUE, + return commandExecutor.evalWriteAsync(getName(key), codec, RedisCommands.EVAL_BOOLEAN, "local value = redis.call('hget', KEYS[1], ARGV[1]); " + "if value == false then " + "return 0; " @@ -348,11 +352,15 @@ public class RedissonMapCache extends RedissonMap implements RMapCac + "if val == ARGV[2] then " + "redis.call('zrem', KEYS[2], ARGV[1]); " + "redis.call('zrem', KEYS[3], ARGV[1]); " - + "return redis.call('hdel', KEYS[1], ARGV[1]); " + + "redis.call('hdel', KEYS[1], ARGV[1]); " + + "local msg = struct.pack('Lc0Lc0', string.len(ARGV[1]), ARGV[1], string.len(val), val); " + + "redis.call('publish', KEYS[4], msg); " + + "return 1; " + "else " - + "return 0 " + + "return 0; " + "end", - Arrays.asList(getName(key), getTimeoutSetNameByKey(key), getIdleSetNameByKey(key)), key, value); + Arrays.asList(getName(key), getTimeoutSetNameByKey(key), getIdleSetNameByKey(key), getRemovedChannelName()), + encodeMapKey(key), encodeMapValue(value)); } @Override @@ -403,11 +411,16 @@ public class RedissonMapCache extends RedissonMap implements RMapCac + "local value = struct.pack('dLc0', 0, string.len(ARGV[2]), ARGV[2]); " + "redis.call('hset', KEYS[1], ARGV[1], value); " + "if v == false then " + + "local msg = struct.pack('Lc0Lc0', string.len(ARGV[1]), ARGV[1], string.len(ARGV[2]), ARGV[2]); " + + "redis.call('publish', KEYS[2], msg); " + "return nil; " + "end; " + "local t, val = struct.unpack('dLc0', v); " + + "local msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[1]), ARGV[1], string.len(ARGV[2]), ARGV[2], string.len(val), val); " + + "redis.call('publish', KEYS[3], msg); " + "return val; ", - Collections.singletonList(getName(key)), key, value); + Arrays.asList(getName(key), getCreatedChannelName(), getUpdatedChannelName()), + key, value); } @Override @@ -415,9 +428,11 @@ public class RedissonMapCache extends RedissonMap implements RMapCac checkKey(key); checkValue(value); - return commandExecutor.evalWriteAsync(getName(key), codec, EVAL_PUT, + return commandExecutor.evalWriteAsync(getName(key), codec, EVAL_PUT_IF_ABSENT, "local value = struct.pack('dLc0', 0, string.len(ARGV[2]), ARGV[2]); " + "if redis.call('hsetnx', KEYS[1], ARGV[1], value) == 1 then " + + "local msg = struct.pack('Lc0Lc0', string.len(ARGV[1]), ARGV[1], string.len(ARGV[2]), ARGV[2]); " + + "redis.call('publish', KEYS[2], msg); " + "return nil;" + "else " + "local v = redis.call('hget', KEYS[1], ARGV[1]); " @@ -427,7 +442,8 @@ public class RedissonMapCache extends RedissonMap implements RMapCac + "local t, val = struct.unpack('dLc0', v); " + "return val; " + "end", - Collections.singletonList(getName(key)), key, value); + Arrays.asList(getName(key), getCreatedChannelName()), + key, value); } @Override @@ -473,14 +489,22 @@ public class RedissonMapCache extends RedissonMap implements RMapCac + "end; " + "end; " + "end; " + + "local newValue = tonumber(ARGV[3]); " - + "if expireDate >= tonumber(ARGV[1]) then " + + "if value ~= false and expireDate > tonumber(ARGV[1]) then " + "newValue = tonumber(val) + newValue; " + + + "local msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[2]), ARGV[2], string.len(newValue), newValue, string.len(val), val); " + + "redis.call('publish', KEYS[5], msg); " + + "else " + + "local msg = struct.pack('Lc0Lc0', string.len(ARGV[2]), ARGV[2], string.len(ARGV[3]), ARGV[3]); " + + "redis.call('publish', KEYS[4], msg); " + "end; " + "local newValuePack = struct.pack('dLc0', t + tonumber(ARGV[1]), string.len(newValue), newValue); " + "redis.call('hset', KEYS[1], ARGV[2], newValuePack); " + "return tostring(newValue); ", - Arrays.asList(getName(key), getTimeoutSetNameByKey(key), getIdleSetNameByKey(key)), System.currentTimeMillis(), keyState, valueState); + Arrays.asList(getName(key), getTimeoutSetNameByKey(key), getIdleSetNameByKey(key), getCreatedChannelName(), getUpdatedChannelName()), + System.currentTimeMillis(), keyState, valueState); } @Override @@ -533,20 +557,53 @@ public class RedissonMapCache extends RedissonMap implements RMapCac maxIdleTimeout = System.currentTimeMillis() + maxIdleDelta; } - return commandExecutor.evalWriteAsync(getName(key), codec, EVAL_FAST_PUT_TTL, - "if tonumber(ARGV[1]) > 0 then " - + "redis.call('zadd', KEYS[2], ARGV[1], ARGV[4]); " + return commandExecutor.evalWriteAsync(getName(key), codec, RedisCommands.EVAL_BOOLEAN, + "local insertable = false; " + + "local value = redis.call('hget', KEYS[1], ARGV[5]); " + + "local t, val;" + + "if value == false then " + + "insertable = true; " + + "else " + + "t, val = struct.unpack('dLc0', value); " + + "local expireDate = 92233720368547758; " + + "local expireDateScore = redis.call('zscore', KEYS[2], ARGV[5]); " + + "if expireDateScore ~= false then " + + "expireDate = tonumber(expireDateScore) " + + "end; " + + "if t ~= 0 then " + + "local expireIdle = redis.call('zscore', KEYS[3], ARGV[5]); " + + "if expireIdle ~= false then " + + "expireDate = math.min(expireDate, tonumber(expireIdle)) " + + "end; " + + "end; " + + "if expireDate <= tonumber(ARGV[1]) then " + + "insertable = true; " + + "end; " + + "end; " + + + "if tonumber(ARGV[2]) > 0 then " + + "redis.call('zadd', KEYS[2], ARGV[2], ARGV[5]); " + "else " - + "redis.call('zrem', KEYS[2], ARGV[4]); " + + "redis.call('zrem', KEYS[2], ARGV[5]); " + "end; " - + "if tonumber(ARGV[2]) > 0 then " - + "redis.call('zadd', KEYS[3], ARGV[2], ARGV[4]); " + + "if tonumber(ARGV[3]) > 0 then " + + "redis.call('zadd', KEYS[3], ARGV[3], ARGV[5]); " + "else " - + "redis.call('zrem', KEYS[3], ARGV[4]); " + + "redis.call('zrem', KEYS[3], ARGV[5]); " + "end; " - + "local value = struct.pack('dLc0', ARGV[3], string.len(ARGV[5]), ARGV[5]); " + - "return redis.call('hset', KEYS[1], ARGV[4], value); ", - Arrays.asList(getName(key), getTimeoutSetNameByKey(key), getIdleSetNameByKey(key)), ttlTimeout, maxIdleTimeout, maxIdleDelta, key, value); + + "local value = struct.pack('dLc0', ARGV[4], string.len(ARGV[6]), ARGV[6]); " + + "redis.call('hset', KEYS[1], ARGV[5], value); " + + "if insertable == true then " + + "local msg = struct.pack('Lc0Lc0', string.len(ARGV[5]), ARGV[5], string.len(ARGV[6]), ARGV[6]); " + + "redis.call('publish', KEYS[4], msg); " + + "return 1;" + + "else " + + "local msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[5]), ARGV[5], string.len(ARGV[6]), ARGV[6], string.len(val), val); " + + "redis.call('publish', KEYS[5], msg); " + + "return 0;" + + "end;", + Arrays.asList(getName(key), getTimeoutSetNameByKey(key), getIdleSetNameByKey(key), getCreatedChannelName(), getUpdatedChannelName()), + System.currentTimeMillis(), ttlTimeout, maxIdleTimeout, maxIdleDelta, encodeMapKey(key), encodeMapValue(value)); } @Override @@ -595,25 +652,56 @@ public class RedissonMapCache extends RedissonMap implements RMapCac } return commandExecutor.evalWriteAsync(getName(key), codec, EVAL_PUT_TTL, - "local v = redis.call('hget', KEYS[1], ARGV[4]); " - + "if tonumber(ARGV[1]) > 0 then " - + "redis.call('zadd', KEYS[2], ARGV[1], ARGV[4]); " + "local insertable = false; " + + "local v = redis.call('hget', KEYS[1], ARGV[5]); " + + "if v == false then " + + "insertable = true; " + + "else " + + "local t, val = struct.unpack('dLc0', v); " + + "local expireDate = 92233720368547758; " + + "local expireDateScore = redis.call('zscore', KEYS[2], ARGV[5]); " + + "if expireDateScore ~= false then " + + "expireDate = tonumber(expireDateScore) " + + "end; " + + "if t ~= 0 then " + + "local expireIdle = redis.call('zscore', KEYS[3], ARGV[5]); " + + "if expireIdle ~= false then " + + "expireDate = math.min(expireDate, tonumber(expireIdle)) " + + "end; " + + "end; " + + "if expireDate <= tonumber(ARGV[1]) then " + + "insertable = true; " + + "end; " + + "end; " + + + "if tonumber(ARGV[2]) > 0 then " + + "redis.call('zadd', KEYS[2], ARGV[2], ARGV[5]); " + "else " - + "redis.call('zrem', KEYS[2], ARGV[4]); " + + "redis.call('zrem', KEYS[2], ARGV[5]); " + "end; " - + "if tonumber(ARGV[2]) > 0 then " - + "redis.call('zadd', KEYS[3], ARGV[2], ARGV[4]); " + + "if tonumber(ARGV[3]) > 0 then " + + "redis.call('zadd', KEYS[3], ARGV[3], ARGV[5]); " + "else " - + "redis.call('zrem', KEYS[3], ARGV[4]); " + + "redis.call('zrem', KEYS[3], ARGV[5]); " + "end; " - + "local value = struct.pack('dLc0', ARGV[3], string.len(ARGV[5]), ARGV[5]); " - + "redis.call('hset', KEYS[1], ARGV[4], value); " - + "if v == false then " + + + "local value = struct.pack('dLc0', ARGV[4], string.len(ARGV[6]), ARGV[6]); " + + "redis.call('hset', KEYS[1], ARGV[5], value); " + + + "if insertable == true then " + + "local msg = struct.pack('Lc0Lc0', string.len(ARGV[5]), ARGV[5], string.len(ARGV[6]), ARGV[6]); " + + "redis.call('publish', KEYS[4], msg); " + "return nil;" + "end; " + + "local t, val = struct.unpack('dLc0', v); " + + + "local msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[5]), ARGV[5], string.len(ARGV[6]), ARGV[6], string.len(val), val); " + + "redis.call('publish', KEYS[5], msg); " + + "return val", - Arrays.asList(getName(key), getTimeoutSetNameByKey(key), getIdleSetNameByKey(key)), ttlTimeout, maxIdleTimeout, maxIdleDelta, key, value); + Arrays.asList(getName(key), getTimeoutSetNameByKey(key), getIdleSetNameByKey(key), getCreatedChannelName(), getUpdatedChannelName()), + System.currentTimeMillis(), ttlTimeout, maxIdleTimeout, maxIdleDelta, key, value); } String getTimeoutSetNameByKey(Object key) { @@ -640,6 +728,38 @@ public class RedissonMapCache extends RedissonMap implements RMapCac return prefixName("redisson__idle__set", getName()); } + String getCreatedChannelName(String name) { + return prefixName("redisson_map_cache_created", name); + } + + String getCreatedChannelName() { + return prefixName("redisson_map_cache_created", getName()); + } + + String getUpdatedChannelName(String name) { + return prefixName("redisson_map_cache_updated", name); + } + + String getUpdatedChannelName() { + return prefixName("redisson_map_cache_updated", getName()); + } + + String getExpiredChannelName(String name) { + return prefixName("redisson_map_cache_expired", name); + } + + String getExpiredChannelName() { + return prefixName("redisson_map_cache_expired", getName()); + } + + String getRemovedChannelName(String name) { + return prefixName("redisson_map_cache_removed", name); + } + + String getRemovedChannelName() { + return prefixName("redisson_map_cache_removed", getName()); + } + @Override public RFuture removeAsync(K key) { @@ -652,10 +772,13 @@ public class RedissonMapCache extends RedissonMap implements RMapCac + "redis.call('hdel', KEYS[1], ARGV[1]); " + "if v ~= false then " + "local t, val = struct.unpack('dLc0', v); " + + "local msg = struct.pack('Lc0Lc0', string.len(ARGV[1]), ARGV[1], string.len(val), val); " + + "redis.call('publish', KEYS[4], msg); " + "return val; " + "end; " + "return v", - Arrays.asList(getName(key), getTimeoutSetNameByKey(key), getIdleSetNameByKey(key)), key); + Arrays.asList(getName(key), getTimeoutSetNameByKey(key), getIdleSetNameByKey(key), getRemovedChannelName()), + key); } @Override @@ -666,9 +789,18 @@ public class RedissonMapCache extends RedissonMap implements RMapCac return commandExecutor.evalWriteAsync(getName(), codec, EVAL_FAST_REMOVE, "redis.call('zrem', KEYS[3], unpack(ARGV)); " + - "redis.call('zrem', KEYS[2], unpack(ARGV)); " + + "redis.call('zrem', KEYS[2], unpack(ARGV)); " + + "for i, key in ipairs(ARGV) do " + + "local v = redis.call('hget', KEYS[1], key); " + + "if v ~= false then " + + "local t, val = struct.unpack('dLc0', v); " + + "local msg = struct.pack('Lc0Lc0', string.len(key), key, string.len(val), val); " + + "redis.call('publish', KEYS[4], msg); " + + "end;" + + "end;" + "return redis.call('hdel', KEYS[1], unpack(ARGV)); ", - Arrays.asList(getName(), getTimeoutSetName(), getIdleSetName()), keys); + Arrays.asList(getName(), getTimeoutSetName(), getIdleSetName(), getRemovedChannelName()), + keys); } @Override @@ -762,10 +894,44 @@ public class RedissonMapCache extends RedissonMap implements RMapCac checkKey(key); checkValue(value); - return commandExecutor.evalWriteAsync(getName(key), codec, EVAL_HSET, - "local val = struct.pack('dLc0', 0, string.len(ARGV[2]), ARGV[2]); " - + "return redis.call('hset', KEYS[1], ARGV[1], val); ", - Collections.singletonList(getName(key)), key, value); + return commandExecutor.evalWriteAsync(getName(key), codec, RedisCommands.EVAL_BOOLEAN, + "local insertable = false; " + + "local v = redis.call('hget', KEYS[1], ARGV[2]); " + + "if v == false then " + + "insertable = true; " + + "else " + + "local t, val = struct.unpack('dLc0', v); " + + "local expireDate = 92233720368547758; " + + "local expireDateScore = redis.call('zscore', KEYS[2], ARGV[2]); " + + "if expireDateScore ~= false then " + + "expireDate = tonumber(expireDateScore) " + + "end; " + + "if t ~= 0 then " + + "local expireIdle = redis.call('zscore', KEYS[3], ARGV[2]); " + + "if expireIdle ~= false then " + + "expireDate = math.min(expireDate, tonumber(expireIdle)) " + + "end; " + + "end; " + + "if expireDate <= tonumber(ARGV[1]) then " + + "insertable = true; " + + "end; " + + "end; " + + + "local val = struct.pack('dLc0', 0, string.len(ARGV[3]), ARGV[3]); " + + "redis.call('hset', KEYS[1], ARGV[2], val); " + + + "if insertable == true then " + + "local msg = struct.pack('Lc0Lc0', string.len(ARGV[2]), ARGV[2], string.len(ARGV[3]), ARGV[3]); " + + "redis.call('publish', KEYS[4], msg); " + + "return 1;" + + "else " + + "local t, val = struct.unpack('dLc0', v); " + + "local msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[2]), ARGV[2], string.len(ARGV[3]), ARGV[3], string.len(val), val); " + + "redis.call('publish', KEYS[5], msg); " + + "return 0;" + + "end;", + Arrays.asList(getName(key), getTimeoutSetNameByKey(key), getIdleSetNameByKey(key), getCreatedChannelName(), getUpdatedChannelName()), + System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(value)); } @Override @@ -773,11 +939,13 @@ public class RedissonMapCache extends RedissonMap implements RMapCac checkKey(key); checkValue(value); - return commandExecutor.evalWriteAsync(getName(key), codec, EVAL_PUT_IF_ABSENT, + return commandExecutor.evalWriteAsync(getName(key), codec, RedisCommands.EVAL_BOOLEAN, "local value = redis.call('hget', KEYS[1], ARGV[2]); " + "if value == false then " + "local val = struct.pack('dLc0', 0, string.len(ARGV[3]), ARGV[3]); " + "redis.call('hset', KEYS[1], ARGV[2], val); " + + "local msg = struct.pack('Lc0Lc0', string.len(ARGV[2]), ARGV[2], string.len(ARGV[3]), ARGV[3]); " + + "redis.call('publish', KEYS[4], msg); " + "return 1; " + "end; " + "local t, val = struct.unpack('dLc0', value); " @@ -805,8 +973,12 @@ public class RedissonMapCache extends RedissonMap implements RMapCac + "redis.call('zrem', KEYS[3], ARGV[2]); " + "local val = struct.pack('dLc0', 0, string.len(ARGV[3]), ARGV[3]); " + "redis.call('hset', KEYS[1], ARGV[2], val); " + + + "local msg = struct.pack('Lc0Lc0', string.len(ARGV[2]), ARGV[2], string.len(ARGV[3]), ARGV[3]); " + + "redis.call('publish', KEYS[4], msg); " + "return 1; ", - Arrays.asList(getName(key), getTimeoutSetNameByKey(key), getIdleSetNameByKey(key)), System.currentTimeMillis(), key, value); + Arrays.asList(getName(key), getTimeoutSetNameByKey(key), getIdleSetNameByKey(key), getCreatedChannelName()), + System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(value)); } @Override @@ -854,7 +1026,7 @@ public class RedissonMapCache extends RedissonMap implements RMapCac maxIdleTimeout = System.currentTimeMillis() + maxIdleDelta; } - return commandExecutor.evalWriteAsync(getName(key), codec, EVAL_FAST_PUT_TTL_IF_ABSENT, + return commandExecutor.evalWriteAsync(getName(key), codec, RedisCommands.EVAL_BOOLEAN, "local insertable = false; " + "local value = redis.call('hget', KEYS[1], ARGV[5]); " + "if value == false then " @@ -898,11 +1070,15 @@ public class RedissonMapCache extends RedissonMap implements RMapCac + "local val = struct.pack('dLc0', ARGV[4], string.len(ARGV[6]), ARGV[6]); " + "redis.call('hset', KEYS[1], ARGV[5], val); " + + "local msg = struct.pack('Lc0Lc0', string.len(ARGV[5]), ARGV[5], string.len(ARGV[6]), ARGV[6]); " + + "redis.call('publish', KEYS[4], msg); " + + "return 1; " + "else " + "return 0; " + "end; ", - Arrays.asList(getName(key), getTimeoutSetNameByKey(key), getIdleSetNameByKey(key)), System.currentTimeMillis(), ttlTimeout, maxIdleTimeout, maxIdleDelta, key, value); + Arrays.asList(getName(key), getTimeoutSetNameByKey(key), getIdleSetNameByKey(key), getCreatedChannelName()), + System.currentTimeMillis(), ttlTimeout, maxIdleTimeout, maxIdleDelta, encodeMapKey(key), encodeMapValue(value)); } @Override @@ -915,7 +1091,7 @@ public class RedissonMapCache extends RedissonMap implements RMapCac throw new NullPointerException("map new value can't be null"); } - return commandExecutor.evalWriteAsync(getName(key), codec, EVAL_REPLACE_VALUE, + return commandExecutor.evalWriteAsync(getName(key), codec, RedisCommands.EVAL_BOOLEAN, "local v = redis.call('hget', KEYS[1], ARGV[2]); " + "if v == false then " + "return 0;" @@ -939,12 +1115,16 @@ public class RedissonMapCache extends RedissonMap implements RMapCac + "end; " + "end; " + "if expireDate > tonumber(ARGV[1]) and val == ARGV[3] then " + + "local msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[2]), ARGV[2], string.len(ARGV[4]), ARGV[4], string.len(ARGV[3]), ARGV[3]); " + + "redis.call('publish', KEYS[4], msg); " + + "local value = struct.pack('dLc0', t, string.len(ARGV[4]), ARGV[4]); " + "redis.call('hset', KEYS[1], ARGV[2], value); " + "return 1; " + "end; " + "return 0; ", - Arrays.asList(getName(key), getTimeoutSetNameByKey(key), getIdleSetNameByKey(key)), System.currentTimeMillis(), key, oldValue, newValue); + Arrays.asList(getName(key), getTimeoutSetNameByKey(key), getIdleSetNameByKey(key), getUpdatedChannelName()), + System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(oldValue), encodeMapValue(newValue)); } @Override @@ -961,11 +1141,16 @@ public class RedissonMapCache extends RedissonMap implements RMapCac + "end; " + "local value = struct.pack('dLc0', t, string.len(ARGV[3]), ARGV[3]); " + "redis.call('hset', KEYS[1], ARGV[2], value); " + + + "local msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[2]), ARGV[2], string.len(ARGV[3]), ARGV[3], string.len(val), val); " + + "redis.call('publish', KEYS[3], msg); " + + "return val; " + "else " + "return nil; " + "end", - Arrays.asList(getName(key), getTimeoutSetNameByKey(key)), System.currentTimeMillis(), key, value); + Arrays.asList(getName(key), getTimeoutSetNameByKey(key), getUpdatedChannelName()), + System.currentTimeMillis(), key, value); } @Override @@ -992,10 +1177,85 @@ public class RedissonMapCache extends RedissonMap implements RMapCac + "if i % 2 == 0 then " + "local val = struct.pack('dLc0', 0, string.len(value), value); " + "ARGV[i] = val; " + + "local key = ARGV[i-1];" + + + "local msg = struct.pack('Lc0Lc0', string.len(key), key, string.len(value), value); " + + "redis.call('publish', KEYS[2], msg); " + "end;" + "end;" + "return redis.call('hmset', KEYS[1], unpack(ARGV)); ", - Collections.singletonList(getName()), params.toArray()); + Arrays.asList(getName(), getCreatedChannelName()), params.toArray()); + } + + @Override + public int addListener(final MapEntryListener listener) { + if (listener == null) { + throw new NullPointerException(); + } + + if (listener instanceof EntryRemovedListener) { + RTopic> topic = redisson.getTopic(getRemovedChannelName(), new MapCacheEventCodec(codec)); + return topic.addListener(new MessageListener>() { + @Override + public void onMessage(String channel, List msg) { + System.out.println("channel: " + channel); + System.out.println("msg: " + msg); + + EntryEvent event = new EntryEvent(RedissonMapCache.this, EntryEvent.Type.REMOVED, (K)msg.get(0), (V)msg.get(1), null); + ((EntryRemovedListener) listener).onRemoved(event); + } + }); + } + + if (listener instanceof EntryCreatedListener) { + RTopic> topic = redisson.getTopic(getCreatedChannelName(), new MapCacheEventCodec(codec)); + return topic.addListener(new MessageListener>() { + @Override + public void onMessage(String channel, List msg) { + EntryEvent event = new EntryEvent(RedissonMapCache.this, EntryEvent.Type.CREATED, (K)msg.get(0), (V)msg.get(1), null); + ((EntryCreatedListener) listener).onCreated(event); + } + }); + } + + if (listener instanceof EntryUpdatedListener) { + RTopic> topic = redisson.getTopic(getUpdatedChannelName(), new MapCacheEventCodec(codec)); + return topic.addListener(new MessageListener>() { + @Override + public void onMessage(String channel, List msg) { + EntryEvent event = new EntryEvent(RedissonMapCache.this, EntryEvent.Type.UPDATED, (K)msg.get(0), (V)msg.get(1), (V)msg.get(2)); + ((EntryUpdatedListener) listener).onUpdated(event); + } + }); + } + + if (listener instanceof EntryExpiredListener) { + RTopic> topic = redisson.getTopic(getExpiredChannelName(), new MapCacheEventCodec(codec)); + return topic.addListener(new MessageListener>() { + @Override + public void onMessage(String channel, List msg) { + EntryEvent event = new EntryEvent(RedissonMapCache.this, EntryEvent.Type.EXPIRED, (K)msg.get(0), (V)msg.get(1), null); + ((EntryExpiredListener) listener).onExpired(event); + } + }); + } + + throw new IllegalArgumentException("Wrong listener type " + listener.getClass()); + } + + @Override + public void removeListener(int listenerId) { + RTopic> removedTopic = redisson.getTopic(getRemovedChannelName(), new MapCacheEventCodec(codec)); + removedTopic.removeListener(listenerId); + + RTopic> createdTopic = redisson.getTopic(getCreatedChannelName(), new MapCacheEventCodec(codec)); + createdTopic.removeListener(listenerId); + + RTopic> updatedTopic = redisson.getTopic(getUpdatedChannelName(), new MapCacheEventCodec(codec)); + updatedTopic.removeListener(listenerId); + + RTopic> expiredTopic = redisson.getTopic(getExpiredChannelName(), new MapCacheEventCodec(codec)); + expiredTopic.removeListener(listenerId); } @Override diff --git a/redisson/src/main/java/org/redisson/api/RMapCache.java b/redisson/src/main/java/org/redisson/api/RMapCache.java index c59be9beb..9365f74b2 100644 --- a/redisson/src/main/java/org/redisson/api/RMapCache.java +++ b/redisson/src/main/java/org/redisson/api/RMapCache.java @@ -17,6 +17,8 @@ package org.redisson.api; import java.util.concurrent.TimeUnit; +import org.redisson.api.map.event.MapEntryListener; + /** *

Map-based cache with ability to set TTL for each entry via * {@link #put(Object, Object, long, TimeUnit)} or {@link #putIfAbsent(Object, Object, long, TimeUnit)} @@ -130,6 +132,7 @@ public interface RMapCache extends RMap, RMapCacheAsync { * @param ttl - time to live for key\value entry. * If 0 then stores infinitely. * @param ttlUnit - time unit + * * @return true if key is a new key in the hash and value was set. * false if key already exists in the hash and the value was updated. */ @@ -177,6 +180,7 @@ public interface RMapCache extends RMap, RMapCacheAsync { * @param ttl - time to live for key\value entry. * If 0 then stores infinitely. * @param ttlUnit - time unit + * * @return true if key is a new key in the hash and value was set. * false if key already exists in the hash */ @@ -218,4 +222,24 @@ public interface RMapCache extends RMap, RMapCacheAsync { @Override int size(); + /** + * Adds map entry listener + * + * @see org.redisson.api.map.event.EntryCreatedListener + * @see org.redisson.api.map.event.EntryUpdatedListener + * @see org.redisson.api.map.event.EntryRemovedListener + * @see org.redisson.api.map.event.EntryExpiredListener + * + * @param listener - entry listener + * @return listener id + */ + int addListener(MapEntryListener listener); + + /** + * Removes map entry listener + * + * @param listenerId - listener id + */ + void removeListener(int listenerId); + } diff --git a/redisson/src/main/java/org/redisson/api/RMapCacheAsync.java b/redisson/src/main/java/org/redisson/api/RMapCacheAsync.java index 8eb0a8698..0e7188cc6 100644 --- a/redisson/src/main/java/org/redisson/api/RMapCacheAsync.java +++ b/redisson/src/main/java/org/redisson/api/RMapCacheAsync.java @@ -134,7 +134,9 @@ public interface RMapCacheAsync extends RMapAsync { * @param ttl - time to live for key\value entry. * If 0 then stores infinitely. * @param unit - time unit - * @return true if value has been set successfully + * + * @return true if key is a new key in the hash and value was set. + * false if key already exists in the hash and the value was updated. */ RFuture fastPutAsync(K key, V value, long ttl, TimeUnit unit); @@ -160,7 +162,8 @@ public interface RMapCacheAsync extends RMapAsync { * if maxIdleTime and ttl params are equal to 0 * then entry stores infinitely. - * @return true if value has been set successfully + * @return true if key is a new key in the hash and value was set. + * false if key already exists in the hash and the value was updated. */ RFuture fastPutAsync(K key, V value, long ttl, TimeUnit ttlUnit, long maxIdleTime, TimeUnit maxIdleUnit); @@ -186,7 +189,8 @@ public interface RMapCacheAsync extends RMapAsync { * if maxIdleTime and ttl params are equal to 0 * then entry stores infinitely. * - * @return previous associated value + * @return true if key is a new key in the hash and value was set. + * false if key already exists in the hash */ RFuture fastPutIfAbsentAsync(K key, V value, long ttl, TimeUnit ttlUnit, long maxIdleTime, TimeUnit maxIdleUnit); diff --git a/redisson/src/main/java/org/redisson/api/map/event/EntryCreatedListener.java b/redisson/src/main/java/org/redisson/api/map/event/EntryCreatedListener.java new file mode 100644 index 000000000..460e426ec --- /dev/null +++ b/redisson/src/main/java/org/redisson/api/map/event/EntryCreatedListener.java @@ -0,0 +1,29 @@ +/** + * Copyright 2016 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.api.map.event; + +/** + * + * @author Nikita Koksharov + * + * @param key type + * @param value type + */ +public interface EntryCreatedListener extends MapEntryListener { + + void onCreated(EntryEvent event); + +} diff --git a/redisson/src/main/java/org/redisson/api/map/event/EntryEvent.java b/redisson/src/main/java/org/redisson/api/map/event/EntryEvent.java new file mode 100644 index 000000000..0fadd3bce --- /dev/null +++ b/redisson/src/main/java/org/redisson/api/map/event/EntryEvent.java @@ -0,0 +1,66 @@ +/** + * Copyright 2016 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.api.map.event; + +import org.redisson.api.RMapCache; + +/** + * + * @author Nikita Koksharov + * + * @param key type + * @param value type + */ +public class EntryEvent { + + public enum Type {CREATED, UPDATED, REMOVED, EXPIRED} + + private RMapCache source; + private Type type; + private K key; + private V value; + private V oldValue; + + public EntryEvent(RMapCache source, Type type, K key, V value, V oldValue) { + super(); + this.source = source; + this.type = type; + this.key = key; + this.value = value; + this.oldValue = oldValue; + } + + public RMapCache getSource() { + return source; + } + + public Type getType() { + return type; + } + + public K getKey() { + return key; + } + + public V getOldValue() { + return oldValue; + } + + public V getValue() { + return value; + } + +} diff --git a/redisson/src/main/java/org/redisson/api/map/event/EntryExpiredListener.java b/redisson/src/main/java/org/redisson/api/map/event/EntryExpiredListener.java new file mode 100644 index 000000000..d79c9ab63 --- /dev/null +++ b/redisson/src/main/java/org/redisson/api/map/event/EntryExpiredListener.java @@ -0,0 +1,29 @@ +/** + * Copyright 2016 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.api.map.event; + +/** + * + * @author Nikita Koksharov + * + * @param key type + * @param value type + */ +public interface EntryExpiredListener extends MapEntryListener { + + void onExpired(EntryEvent event); + +} diff --git a/redisson/src/main/java/org/redisson/api/map/event/EntryRemovedListener.java b/redisson/src/main/java/org/redisson/api/map/event/EntryRemovedListener.java new file mode 100644 index 000000000..32be8ed24 --- /dev/null +++ b/redisson/src/main/java/org/redisson/api/map/event/EntryRemovedListener.java @@ -0,0 +1,29 @@ +/** + * Copyright 2016 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.api.map.event; + +/** + * + * @author Nikita Koksharov + * + * @param key type + * @param value type + */ +public interface EntryRemovedListener extends MapEntryListener { + + void onRemoved(EntryEvent event); + +} diff --git a/redisson/src/main/java/org/redisson/api/map/event/EntryUpdatedListener.java b/redisson/src/main/java/org/redisson/api/map/event/EntryUpdatedListener.java new file mode 100644 index 000000000..897d2d82d --- /dev/null +++ b/redisson/src/main/java/org/redisson/api/map/event/EntryUpdatedListener.java @@ -0,0 +1,29 @@ +/** + * Copyright 2016 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.api.map.event; + +/** + * + * @author Nikita Koksharov + * + * @param key type + * @param value type + */ +public interface EntryUpdatedListener extends MapEntryListener { + + void onUpdated(EntryEvent event); + +} diff --git a/redisson/src/main/java/org/redisson/api/map/event/MapEntryListener.java b/redisson/src/main/java/org/redisson/api/map/event/MapEntryListener.java new file mode 100644 index 000000000..4da2fe803 --- /dev/null +++ b/redisson/src/main/java/org/redisson/api/map/event/MapEntryListener.java @@ -0,0 +1,27 @@ +/** + * Copyright 2016 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.api.map.event; + +import java.util.EventListener; + +/** + * + * @author Nikita Koksharov + * + */ +public interface MapEntryListener extends EventListener { + +} diff --git a/redisson/src/main/java/org/redisson/codec/CustomObjectInputStream.java b/redisson/src/main/java/org/redisson/codec/CustomObjectInputStream.java index 3947c1088..1329e513d 100644 --- a/redisson/src/main/java/org/redisson/codec/CustomObjectInputStream.java +++ b/redisson/src/main/java/org/redisson/codec/CustomObjectInputStream.java @@ -20,6 +20,11 @@ import java.io.InputStream; import java.io.ObjectInputStream; import java.io.ObjectStreamClass; +/** + * + * @author Nikita Koksharov + * + */ public class CustomObjectInputStream extends ObjectInputStream { private ClassLoader classLoader; diff --git a/redisson/src/main/java/org/redisson/codec/MapCacheEventCodec.java b/redisson/src/main/java/org/redisson/codec/MapCacheEventCodec.java new file mode 100644 index 000000000..0cad09901 --- /dev/null +++ b/redisson/src/main/java/org/redisson/codec/MapCacheEventCodec.java @@ -0,0 +1,106 @@ +/** + * Copyright 2016 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.codec; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.redisson.client.codec.Codec; +import org.redisson.client.handler.State; +import org.redisson.client.protocol.Decoder; +import org.redisson.client.protocol.Encoder; + +import io.netty.buffer.ByteBuf; +import io.netty.util.internal.PlatformDependent; + +/** + * + * @author Nikita Koksharov + * + */ +public class MapCacheEventCodec implements Codec { + + private final Codec codec; + + private final Decoder decoder = new Decoder() { + @Override + public Object decode(ByteBuf buf, State state) throws IOException { + List result = new ArrayList(3); + + Object key = MapCacheEventCodec.this.decode(buf, state, codec.getMapKeyDecoder()); + result.add(key); + + Object value = MapCacheEventCodec.this.decode(buf, state, codec.getMapValueDecoder()); + result.add(value); + + if (buf.isReadable()) { + Object oldValue = MapCacheEventCodec.this.decode(buf, state, codec.getMapValueDecoder()); + result.add(oldValue); + } + + return result; + } + }; + + public MapCacheEventCodec(Codec codec) { + super(); + this.codec = codec; + } + + @Override + public Decoder getMapValueDecoder() { + throw new UnsupportedOperationException(); + } + + @Override + public Encoder getMapValueEncoder() { + throw new UnsupportedOperationException(); + } + + @Override + public Decoder getMapKeyDecoder() { + throw new UnsupportedOperationException(); + } + + @Override + public Encoder getMapKeyEncoder() { + throw new UnsupportedOperationException(); + } + + @Override + public Decoder getValueDecoder() { + return decoder; + } + + @Override + public Encoder getValueEncoder() { + throw new UnsupportedOperationException(); + } + + private Object decode(ByteBuf buf, State state, Decoder decoder) throws IOException { + int keyLen; + if (PlatformDependent.isWindows()) { + keyLen = buf.readIntLE(); + } else { + keyLen = (int) buf.readLongLE(); + } + ByteBuf keyBuf = buf.readSlice(keyLen); + Object key = decoder.decode(keyBuf, state); + return key; + } + +} diff --git a/redisson/src/main/java/org/redisson/eviction/EvictionScheduler.java b/redisson/src/main/java/org/redisson/eviction/EvictionScheduler.java index 6f229ff90..cb86f0760 100644 --- a/redisson/src/main/java/org/redisson/eviction/EvictionScheduler.java +++ b/redisson/src/main/java/org/redisson/eviction/EvictionScheduler.java @@ -63,8 +63,8 @@ public class EvictionScheduler { } } - public void schedule(String name, String timeoutSetName, String maxIdleSetName) { - EvictionTask task = new MapCacheEvictionTask(name, timeoutSetName, maxIdleSetName, executor); + public void schedule(String name, String timeoutSetName, String maxIdleSetName, String expiredChannelName) { + EvictionTask task = new MapCacheEvictionTask(name, timeoutSetName, maxIdleSetName, expiredChannelName, executor); EvictionTask prevTask = tasks.putIfAbsent(name, task); if (prevTask == null) { task.schedule(); diff --git a/redisson/src/main/java/org/redisson/eviction/EvictionTask.java b/redisson/src/main/java/org/redisson/eviction/EvictionTask.java index 346b1036c..8f3daccac 100644 --- a/redisson/src/main/java/org/redisson/eviction/EvictionTask.java +++ b/redisson/src/main/java/org/redisson/eviction/EvictionTask.java @@ -34,7 +34,7 @@ abstract class EvictionTask implements Runnable { final Deque sizeHistory = new LinkedList(); final int minDelay = 1; - final int maxDelay = 2*60*60; + final int maxDelay = 30*60; final int keysLimit = 300; int delay = 10; diff --git a/redisson/src/main/java/org/redisson/eviction/MapCacheEvictionTask.java b/redisson/src/main/java/org/redisson/eviction/MapCacheEvictionTask.java index 9c6e0341f..f40f43354 100644 --- a/redisson/src/main/java/org/redisson/eviction/MapCacheEvictionTask.java +++ b/redisson/src/main/java/org/redisson/eviction/MapCacheEvictionTask.java @@ -32,31 +32,49 @@ public class MapCacheEvictionTask extends EvictionTask { private final String name; private final String timeoutSetName; private final String maxIdleSetName; + private final String expiredChannelName; - public MapCacheEvictionTask(String name, String timeoutSetName, String maxIdleSetName, CommandAsyncExecutor executor) { + public MapCacheEvictionTask(String name, String timeoutSetName, String maxIdleSetName, String expiredChannelName, CommandAsyncExecutor executor) { super(executor); this.name = name; this.timeoutSetName = timeoutSetName; this.maxIdleSetName = maxIdleSetName; + this.expiredChannelName = expiredChannelName; } @Override RFuture execute() { return executor.evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_INTEGER, "local expiredKeys1 = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]); " + + "for i, key in ipairs(expiredKeys1) do " + + "local v = redis.call('hget', KEYS[1], key); " + + "if v ~= false then " + + "local t, val = struct.unpack('dLc0', v); " + + "local msg = struct.pack('Lc0Lc0', string.len(key), key, string.len(val), val); " + + "redis.call('publish', KEYS[4], msg); " + + "end;" + + "end;" + "if #expiredKeys1 > 0 then " + "redis.call('zrem', KEYS[3], unpack(expiredKeys1)); " + "redis.call('zrem', KEYS[2], unpack(expiredKeys1)); " + "redis.call('hdel', KEYS[1], unpack(expiredKeys1)); " + "end; " + "local expiredKeys2 = redis.call('zrangebyscore', KEYS[3], 0, ARGV[1], 'limit', 0, ARGV[2]); " + + "for i, key in ipairs(expiredKeys2) do " + + "local v = redis.call('hget', KEYS[1], key); " + + "if v ~= false then " + + "local t, val = struct.unpack('dLc0', v); " + + "local msg = struct.pack('Lc0Lc0', string.len(key), key, string.len(val), val); " + + "redis.call('publish', KEYS[4], msg); " + + "end;" + + "end;" + "if #expiredKeys2 > 0 then " + "redis.call('zrem', KEYS[3], unpack(expiredKeys2)); " + "redis.call('zrem', KEYS[2], unpack(expiredKeys2)); " + "redis.call('hdel', KEYS[1], unpack(expiredKeys2)); " + "end; " + "return #expiredKeys1 + #expiredKeys2;", - Arrays.asList(name, timeoutSetName, maxIdleSetName), System.currentTimeMillis(), keysLimit); + Arrays.asList(name, timeoutSetName, maxIdleSetName, expiredChannelName), System.currentTimeMillis(), keysLimit); } } diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonMapCacheReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonMapCacheReactive.java index 94f0cd1c4..ad756c24d 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonMapCacheReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonMapCacheReactive.java @@ -70,12 +70,12 @@ public class RedissonMapCacheReactive extends RedissonExpirableReactive im public RedissonMapCacheReactive(UUID id, EvictionScheduler evictionScheduler, CommandReactiveExecutor commandExecutor, String name) { super(commandExecutor, name); - this.mapCache = new RedissonMapCache(id, evictionScheduler, commandExecutor, name, null); + this.mapCache = new RedissonMapCache(evictionScheduler, commandExecutor, name, null); } public RedissonMapCacheReactive(UUID id, EvictionScheduler evictionScheduler, Codec codec, CommandReactiveExecutor commandExecutor, String name) { super(codec, commandExecutor, name); - this.mapCache = new RedissonMapCache(id, codec, evictionScheduler, commandExecutor, name, null); + this.mapCache = new RedissonMapCache(codec, evictionScheduler, commandExecutor, name, null); } @Override diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonMapReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonMapReactive.java index e291fcab4..360a39493 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonMapReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonMapReactive.java @@ -50,12 +50,12 @@ public class RedissonMapReactive extends RedissonExpirableReactive impleme public RedissonMapReactive(CommandReactiveExecutor commandExecutor, String name) { super(commandExecutor, name); - instance = new RedissonMap(null, codec, commandExecutor, name, null); + instance = new RedissonMap(codec, commandExecutor, name, null); } public RedissonMapReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name) { super(codec, commandExecutor, name); - instance = new RedissonMap(null, codec, commandExecutor, name, null); + instance = new RedissonMap(codec, commandExecutor, name, null); } @Override diff --git a/redisson/src/test/java/org/redisson/RedissonMapCacheTest.java b/redisson/src/test/java/org/redisson/RedissonMapCacheTest.java index 541b893f8..1f687a65e 100644 --- a/redisson/src/test/java/org/redisson/RedissonMapCacheTest.java +++ b/redisson/src/test/java/org/redisson/RedissonMapCacheTest.java @@ -14,17 +14,26 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import org.junit.Assert; import org.junit.Test; import org.redisson.api.RFuture; import org.redisson.api.RMap; import org.redisson.api.RMapCache; -import org.redisson.client.codec.LongCodec; +import org.redisson.api.map.event.EntryCreatedListener; +import org.redisson.api.map.event.EntryEvent; +import org.redisson.api.map.event.EntryExpiredListener; +import org.redisson.api.map.event.EntryRemovedListener; +import org.redisson.api.map.event.EntryUpdatedListener; import org.redisson.client.codec.StringCodec; import org.redisson.codec.JsonJacksonCodec; import org.redisson.codec.MsgPackJacksonCodec; +import com.jayway.awaitility.Awaitility; +import com.jayway.awaitility.Duration; + public class RedissonMapCacheTest extends BaseTest { public static class SimpleKey implements Serializable { @@ -671,11 +680,164 @@ public class RedissonMapCacheTest extends BaseTest { } + @Test + public void testCreatedListener() { + RMapCache map = redisson.getMapCache("simple"); + + checkCreatedListener(map, 1, 2, () -> map.put(1, 2)); + checkCreatedListener(map, 10, 2, () -> map.put(10, 2, 2, TimeUnit.SECONDS)); + checkCreatedListener(map, 2, 5, () -> map.fastPut(2, 5)); + checkCreatedListener(map, 13, 2, () -> map.fastPut(13, 2, 2, TimeUnit.SECONDS)); + checkCreatedListener(map, 3, 2, () -> map.putIfAbsent(3, 2)); + checkCreatedListener(map, 14, 2, () -> map.putIfAbsent(14, 2, 2, TimeUnit.SECONDS)); + checkCreatedListener(map, 4, 1, () -> map.fastPutIfAbsent(4, 1)); + checkCreatedListener(map, 15, 2, () -> map.fastPutIfAbsent(15, 2, 2, TimeUnit.SECONDS)); + checkCreatedListener(map, 5, 0, () -> map.addAndGet(5, 0)); + } + + private void checkCreatedListener(RMapCache map, Integer key, Integer value, Runnable runnable) { + AtomicBoolean ref = new AtomicBoolean(); + int createListener1 = map.addListener(new EntryCreatedListener() { + + @Override + public void onCreated(EntryEvent event) { + assertThat(event.getKey()).isEqualTo(key); + assertThat(event.getValue()).isEqualTo(value); + + if (!ref.compareAndSet(false, true)) { + Assert.fail(); + } + } + + }); + runnable.run(); + + Awaitility.await().atMost(Duration.ONE_SECOND).untilTrue(ref); + map.removeListener(createListener1); + } + + @Test + public void testUpdatedListener() { + RMapCache map = redisson.getMapCache("simple"); + + map.put(1, 1); + checkUpdatedListener(map, 1, 3, 1, () -> map.put(1, 3)); + + map.put(10, 1); + checkUpdatedListener(map, 10, 2, 1, () -> map.put(10, 2, 2, TimeUnit.SECONDS)); + + map.put(2, 1); + checkUpdatedListener(map, 2, 5, 1, () -> map.fastPut(2, 5)); + + map.put(13, 1); + checkUpdatedListener(map, 13, 2, 1, () -> map.fastPut(13, 2, 2, TimeUnit.SECONDS)); + + map.put(14, 1); + checkUpdatedListener(map, 14, 2, 1, () -> map.replace(14, 2)); + checkUpdatedListener(map, 14, 3, 2, () -> map.replace(14, 2, 3)); + + map.put(5, 1); + checkUpdatedListener(map, 5, 4, 1, () -> map.addAndGet(5, 3)); + + } + + @Test + public void testExpiredListener() { + RMapCache map = redisson.getMapCache("simple"); + + checkExpiredListener(map, 10, 2, () -> map.put(10, 2, 2, TimeUnit.SECONDS)); + checkExpiredListener(map, 13, 2, () -> map.fastPut(13, 2, 2, TimeUnit.SECONDS)); + checkExpiredListener(map, 14, 2, () -> map.putIfAbsent(14, 2, 2, TimeUnit.SECONDS)); + checkExpiredListener(map, 15, 2, () -> map.fastPutIfAbsent(15, 2, 2, TimeUnit.SECONDS)); + } + + private void checkExpiredListener(RMapCache map, Integer key, Integer value, Runnable runnable) { + AtomicBoolean ref = new AtomicBoolean(); + int createListener1 = map.addListener(new EntryExpiredListener() { + + @Override + public void onExpired(EntryEvent event) { + assertThat(event.getKey()).isEqualTo(key); + assertThat(event.getValue()).isEqualTo(value); + + if (!ref.compareAndSet(false, true)) { + Assert.fail(); + } + } + + }); + runnable.run(); + + Awaitility.await().atMost(Duration.ONE_MINUTE).untilTrue(ref); + map.removeListener(createListener1); + } + + + private void checkUpdatedListener(RMapCache map, Integer key, Integer value, Integer oldValue, Runnable runnable) { + AtomicBoolean ref = new AtomicBoolean(); + int createListener1 = map.addListener(new EntryUpdatedListener() { + + @Override + public void onUpdated(EntryEvent event) { + assertThat(event.getKey()).isEqualTo(key); + assertThat(event.getValue()).isEqualTo(value); + assertThat(event.getOldValue()).isEqualTo(oldValue); + + if (!ref.compareAndSet(false, true)) { + Assert.fail(); + } + } + + }); + runnable.run(); + + Awaitility.await().atMost(Duration.ONE_SECOND).untilTrue(ref); + map.removeListener(createListener1); + } + + @Test + public void testRemovedListener() { + RMapCache map = redisson.getMapCache("simple"); + + map.put(1, 1); + checkRemovedListener(map, 1, 1, () -> map.remove(1, 1)); + + map.put(10, 1); + checkRemovedListener(map, 10, 1, () -> map.remove(10)); + + map.put(2, 1); + checkRemovedListener(map, 2, 1, () -> map.fastRemove(2)); + } + + private void checkRemovedListener(RMapCache map, Integer key, Integer value, Runnable runnable) { + AtomicBoolean ref = new AtomicBoolean(); + int createListener1 = map.addListener(new EntryRemovedListener() { + + @Override + public void onRemoved(EntryEvent event) { + assertThat(event.getKey()).isEqualTo(key); + assertThat(event.getValue()).isEqualTo(value); + + if (!ref.compareAndSet(false, true)) { + Assert.fail(); + } + } + + }); + runnable.run(); + + Awaitility.await().atMost(Duration.ONE_SECOND).untilTrue(ref); + map.removeListener(createListener1); + } + + @Test public void testFastPut() throws Exception { RMapCache map = redisson.getMapCache("simple"); Assert.assertTrue(map.fastPut(1, 2)); + assertThat(map.get(1)).isEqualTo(2); Assert.assertFalse(map.fastPut(1, 3)); + assertThat(map.get(1)).isEqualTo(3); Assert.assertEquals(1, map.size()); }