From 11602d914fbff136d1c37bdd16b7c5c35ea9d34e Mon Sep 17 00:00:00 2001 From: Nikita Date: Wed, 14 Sep 2016 15:08:33 +0300 Subject: [PATCH 1/6] refactoring --- .../java/org/redisson/RedissonReference.java | 8 +- .../redisson/command/CommandAsyncService.java | 121 +++++++++--------- .../redisson/misc/RedissonObjectFactory.java | 13 +- 3 files changed, 77 insertions(+), 65 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonReference.java b/redisson/src/main/java/org/redisson/RedissonReference.java index 45a9bcd96..83f2b276b 100644 --- a/redisson/src/main/java/org/redisson/RedissonReference.java +++ b/redisson/src/main/java/org/redisson/RedissonReference.java @@ -86,10 +86,6 @@ public class RedissonReference { this.codec = codec != null ? codec.getClass().getName() : null; } - public boolean isDefaultCodec() { - return codec == null; - } - /** * @return the type * @throws java.lang.Exception - which could be: @@ -152,6 +148,10 @@ public class RedissonReference { public void setKeyName(String keyName) { this.keyName = keyName; } + + public String getCodec() { + return codec; + } /** * @return the codec diff --git a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java index 46276a37c..2433fd0b6 100644 --- a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java +++ b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java @@ -768,73 +768,80 @@ public class CommandAsyncService implements CommandAsyncExecutor { ((RedisClientResult)res).setRedisClient(addr); } - if (isRedissonReferenceSupportEnabled() && (res instanceof List || res instanceof ListScanResult)) { - List r = res instanceof ListScanResult ? ((ListScanResult)res).getValues() : (List) res; - for (int i = 0; i < r.size(); i++) { - if (r.get(i) instanceof RedissonReference) { - try { - r.set(i ,(redisson != null - ? RedissonObjectFactory.fromReference(redisson, (RedissonReference) r.get(i)) - : RedissonObjectFactory.fromReference(redissonReactive, (RedissonReference) r.get(i)))); - } catch (Exception exception) {//skip and carry on to next one. - } - } else if (r.get(i) instanceof ScoredEntry && ((ScoredEntry) r.get(i)).getValue() instanceof RedissonReference) { - try { - ScoredEntry se = ((ScoredEntry) r.get(i)); - r.set(i ,new ScoredEntry(se.getScore(), redisson != null - ? RedissonObjectFactory.fromReference(redisson, (RedissonReference) se.getValue()) - : RedissonObjectFactory.fromReference(redissonReactive, (RedissonReference) se.getValue()))); - } catch (Exception exception) {//skip and carry on to next one. - } - } - } + if (isRedissonReferenceSupportEnabled()) { + handleReference(details.getMainPromise(), res); + } else { details.getMainPromise().trySuccess(res); - } else if (isRedissonReferenceSupportEnabled() && (res instanceof MapScanResult)) { - Map map = ((MapScanResult)res).getMap(); - HashMap toAdd = null; - for (Map.Entry e : (Set>) map.entrySet()) { - if (e.getValue().getObj() instanceof RedissonReference) { - try { - e.setValue(new ScanObjectEntry(e.getValue().getBuf(), redisson != null - ? RedissonObjectFactory.fromReference(redisson, (RedissonReference) e.getValue().getObj()) - : RedissonObjectFactory.fromReference(redissonReactive, (RedissonReference) e.getValue().getObj()))); - } catch (Exception exception) {//skip and carry on to next one. - } + } + } else { + details.getMainPromise().tryFailure(future.cause()); + } + AsyncDetails.release(details); + } + + private void handleReference(RPromise mainPromise, R res) { + if (res instanceof List || res instanceof ListScanResult) { + List r = res instanceof ListScanResult ? ((ListScanResult)res).getValues() : (List) res; + for (int i = 0; i < r.size(); i++) { + if (r.get(i) instanceof RedissonReference) { + try { + r.set(i ,(redisson != null + ? RedissonObjectFactory.fromReference(redisson, (RedissonReference) r.get(i)) + : RedissonObjectFactory.fromReference(redissonReactive, (RedissonReference) r.get(i)))); + } catch (Exception exception) {//skip and carry on to next one. } - if (e.getKey().getObj() instanceof RedissonReference) { - if (toAdd == null) { - toAdd = new HashMap(); - } - toAdd.put(e.getKey(), e.getValue()); + } else if (r.get(i) instanceof ScoredEntry && ((ScoredEntry) r.get(i)).getValue() instanceof RedissonReference) { + try { + ScoredEntry se = ((ScoredEntry) r.get(i)); + r.set(i ,new ScoredEntry(se.getScore(), redisson != null + ? RedissonObjectFactory.fromReference(redisson, (RedissonReference) se.getValue()) + : RedissonObjectFactory.fromReference(redissonReactive, (RedissonReference) se.getValue()))); + } catch (Exception exception) {//skip and carry on to next one. } } - if (toAdd != null) { - for (Map.Entry e : (Set>) toAdd.entrySet()) { - try { - map.put(new ScanObjectEntry(e.getValue().getBuf(), (redisson != null - ? RedissonObjectFactory.fromReference(redisson, (RedissonReference) e.getKey().getObj()) - : RedissonObjectFactory.fromReference(redissonReactive, (RedissonReference) e.getKey().getObj()))), map.remove(e.getKey())); - } catch (Exception exception) {//skip and carry on to next one. - } + } + mainPromise.trySuccess(res); + } else if (res instanceof MapScanResult) { + Map map = ((MapScanResult)res).getMap(); + HashMap toAdd = null; + for (Map.Entry e : map.entrySet()) { + if (e.getValue().getObj() instanceof RedissonReference) { + try { + e.setValue(new ScanObjectEntry(e.getValue().getBuf(), redisson != null + ? RedissonObjectFactory.fromReference(redisson, (RedissonReference) e.getValue().getObj()) + : RedissonObjectFactory.fromReference(redissonReactive, (RedissonReference) e.getValue().getObj()))); + } catch (Exception exception) {//skip and carry on to next one. } } - details.getMainPromise().trySuccess(res); - } else if (isRedissonReferenceSupportEnabled() && (res instanceof MapScanResult)) { - } else if (isRedissonReferenceSupportEnabled() && res instanceof RedissonReference) { - try { - details.getMainPromise().trySuccess(redisson != null - ? RedissonObjectFactory.fromReference(redisson, (RedissonReference) res) - : RedissonObjectFactory.fromReference(redissonReactive, (RedissonReference) res)); - } catch (Exception exception) { - details.getMainPromise().trySuccess(res);//fallback + if (e.getKey().getObj() instanceof RedissonReference) { + if (toAdd == null) { + toAdd = new HashMap(); + } + toAdd.put(e.getKey(), e.getValue()); } - } else { - details.getMainPromise().trySuccess(res); + } + if (toAdd != null) { + for (Map.Entry e : toAdd.entrySet()) { + try { + map.put(new ScanObjectEntry(e.getValue().getBuf(), (redisson != null + ? RedissonObjectFactory.fromReference(redisson, (RedissonReference) e.getKey().getObj()) + : RedissonObjectFactory.fromReference(redissonReactive, (RedissonReference) e.getKey().getObj()))), map.remove(e.getKey())); + } catch (Exception exception) {//skip and carry on to next one. + } + } + } + mainPromise.trySuccess(res); + } else if (res instanceof RedissonReference) { + try { + mainPromise.trySuccess(redisson != null + ? RedissonObjectFactory.fromReference(redisson, (RedissonReference) res) + : RedissonObjectFactory.fromReference(redissonReactive, (RedissonReference) res)); + } catch (Exception exception) { + mainPromise.trySuccess(res);//fallback } } else { - details.getMainPromise().tryFailure(future.cause()); + mainPromise.trySuccess(res); } - AsyncDetails.release(details); } } diff --git a/redisson/src/main/java/org/redisson/misc/RedissonObjectFactory.java b/redisson/src/main/java/org/redisson/misc/RedissonObjectFactory.java index cf9ddbc9a..d68181a71 100644 --- a/redisson/src/main/java/org/redisson/misc/RedissonObjectFactory.java +++ b/redisson/src/main/java/org/redisson/misc/RedissonObjectFactory.java @@ -118,8 +118,8 @@ public class RedissonObjectFactory { List> interfaces = Arrays.asList(type.getInterfaces()); for (Class iType : interfaces) { if (builders.containsKey(iType)) {// user cache to speed up things a little. - Method builder = builders.get(iType).get(rr.isDefaultCodec()); - return (T) (rr.isDefaultCodec() + Method builder = builders.get(iType).get(isDefaultCodec(rr)); + return (T) (isDefaultCodec(rr) ? builder.invoke(redisson, rr.getKeyName()) : builder.invoke(redisson, rr.getKeyName(), codecProvider.getCodec(rr.getCodecType()))); } @@ -128,6 +128,11 @@ public class RedissonObjectFactory { throw new ClassNotFoundException("No RObject is found to match class type of " + rr.getTypeName() + " with codec type of " + rr.getCodecName()); } + public static boolean isDefaultCodec(RedissonReference rr) { + return rr.getCodec() == null; + } + + public static T fromReference(RedissonReactiveClient redisson, RedissonReference rr) throws Exception { return fromReference(redisson, rr, null); } @@ -142,8 +147,8 @@ public class RedissonObjectFactory { List> interfaces = Arrays.asList(type.getInterfaces()); for (Class iType : interfaces) { if (builders.containsKey(iType)) {// user cache to speed up things a little. - Method builder = builders.get(iType).get(rr.isDefaultCodec()); - return (T) (rr.isDefaultCodec() + Method builder = builders.get(iType).get(isDefaultCodec(rr)); + return (T) (isDefaultCodec(rr) ? builder.invoke(redisson, rr.getKeyName()) : builder.invoke(redisson, rr.getKeyName(), codecProvider.getCodec(rr.getCodecType()))); } From bec6ee4d4dd1b805220965c67847180254d5a908 Mon Sep 17 00:00:00 2001 From: Nikita Date: Wed, 14 Sep 2016 15:17:55 +0300 Subject: [PATCH 2/6] failed connection should be closed --- .../connection/pool/ConnectionPool.java | 21 +++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/redisson/src/main/java/org/redisson/connection/pool/ConnectionPool.java b/redisson/src/main/java/org/redisson/connection/pool/ConnectionPool.java index 14065be65..6ac025101 100644 --- a/redisson/src/main/java/org/redisson/connection/pool/ConnectionPool.java +++ b/redisson/src/main/java/org/redisson/connection/pool/ConnectionPool.java @@ -41,6 +41,13 @@ import io.netty.util.TimerTask; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; +/** + * Base connection pool class + * + * @author Nikita Koksharov + * + * @param - connection type + */ abstract class ConnectionPool { private final Logger log = LoggerFactory.getLogger(getClass()); @@ -211,8 +218,6 @@ abstract class ConnectionPool { @Override public void operationComplete(Future future) throws Exception { if (!future.isSuccess()) { - releaseConnection(entry); - promiseFailure(entry, promise, future.cause()); return; } @@ -223,13 +228,13 @@ abstract class ConnectionPool { return; } - promiseSuccessful(entry, promise, conn); + connectedSuccessful(entry, promise, conn); } }); return promise; } - private void promiseSuccessful(ClientConnectionsEntry entry, RPromise promise, T conn) { + private void connectedSuccessful(ClientConnectionsEntry entry, RPromise promise, T conn) { entry.resetFailedAttempts(); if (!promise.trySuccess(conn)) { releaseConnection(entry, conn); @@ -247,15 +252,20 @@ abstract class ConnectionPool { checkForReconnect(entry); } + releaseConnection(entry); + promise.tryFailure(cause); } private void promiseFailure(ClientConnectionsEntry entry, RPromise promise, T conn) { int attempts = entry.incFailedAttempts(); if (attempts == config.getFailedAttempts()) { + conn.closeAsync(); checkForReconnect(entry); } else if (attempts < config.getFailedAttempts()) { releaseConnection(entry, conn); + } else { + conn.closeAsync(); } releaseConnection(entry); @@ -267,9 +277,12 @@ abstract class ConnectionPool { private RFuture promiseFailure(ClientConnectionsEntry entry, T conn) { int attempts = entry.incFailedAttempts(); if (attempts == config.getFailedAttempts()) { + conn.closeAsync(); checkForReconnect(entry); } else if (attempts < config.getFailedAttempts()) { releaseConnection(entry, conn); + } else { + conn.closeAsync(); } releaseConnection(entry); From 93dd797b1c7af1a82352ffb1b58e2ec54191337d Mon Sep 17 00:00:00 2001 From: Nikita Date: Wed, 14 Sep 2016 15:18:08 +0300 Subject: [PATCH 3/6] comments added --- .../org/redisson/connection/pool/MasterConnectionPool.java | 6 ++++++ .../org/redisson/connection/pool/PubSubConnectionPool.java | 6 ++++++ .../connection/pool/SinglePubSubConnectionPool.java | 6 ++++++ .../org/redisson/connection/pool/SlaveConnectionPool.java | 6 ++++++ 4 files changed, 24 insertions(+) diff --git a/redisson/src/main/java/org/redisson/connection/pool/MasterConnectionPool.java b/redisson/src/main/java/org/redisson/connection/pool/MasterConnectionPool.java index 1661ef4dc..aa1dde795 100644 --- a/redisson/src/main/java/org/redisson/connection/pool/MasterConnectionPool.java +++ b/redisson/src/main/java/org/redisson/connection/pool/MasterConnectionPool.java @@ -22,6 +22,12 @@ import org.redisson.connection.MasterSlaveEntry; import org.redisson.connection.ClientConnectionsEntry; +/** + * Connection pool for master node + * + * @author Nikita Koksharov + * + */ public class MasterConnectionPool extends ConnectionPool { public MasterConnectionPool(MasterSlaveServersConfig config, ConnectionManager connectionManager, MasterSlaveEntry masterSlaveEntry) { diff --git a/redisson/src/main/java/org/redisson/connection/pool/PubSubConnectionPool.java b/redisson/src/main/java/org/redisson/connection/pool/PubSubConnectionPool.java index b0d7de32d..82ede0d4c 100644 --- a/redisson/src/main/java/org/redisson/connection/pool/PubSubConnectionPool.java +++ b/redisson/src/main/java/org/redisson/connection/pool/PubSubConnectionPool.java @@ -22,6 +22,12 @@ import org.redisson.connection.ClientConnectionsEntry; import org.redisson.connection.ConnectionManager; import org.redisson.connection.MasterSlaveEntry; +/** + * Connection pool for Publish / Subscribe + * + * @author Nikita Koksharov + * + */ public class PubSubConnectionPool extends ConnectionPool { public PubSubConnectionPool(MasterSlaveServersConfig config, ConnectionManager connectionManager, MasterSlaveEntry masterSlaveEntry) { diff --git a/redisson/src/main/java/org/redisson/connection/pool/SinglePubSubConnectionPool.java b/redisson/src/main/java/org/redisson/connection/pool/SinglePubSubConnectionPool.java index f9b40bf3f..08741a488 100644 --- a/redisson/src/main/java/org/redisson/connection/pool/SinglePubSubConnectionPool.java +++ b/redisson/src/main/java/org/redisson/connection/pool/SinglePubSubConnectionPool.java @@ -20,6 +20,12 @@ import org.redisson.connection.ClientConnectionsEntry; import org.redisson.connection.ConnectionManager; import org.redisson.connection.MasterSlaveEntry; +/** + * Connection pool for Publish/Subscribe used with single node + * + * @author Nikita Koksharov + * + */ public class SinglePubSubConnectionPool extends PubSubConnectionPool { public SinglePubSubConnectionPool(MasterSlaveServersConfig config, ConnectionManager connectionManager, diff --git a/redisson/src/main/java/org/redisson/connection/pool/SlaveConnectionPool.java b/redisson/src/main/java/org/redisson/connection/pool/SlaveConnectionPool.java index c066c3f92..11bf20f97 100644 --- a/redisson/src/main/java/org/redisson/connection/pool/SlaveConnectionPool.java +++ b/redisson/src/main/java/org/redisson/connection/pool/SlaveConnectionPool.java @@ -21,6 +21,12 @@ import org.redisson.connection.ClientConnectionsEntry; import org.redisson.connection.ConnectionManager; import org.redisson.connection.MasterSlaveEntry; +/** + * Connection pool for slave node + * + * @author Nikita Koksharov + * + */ public class SlaveConnectionPool extends ConnectionPool { public SlaveConnectionPool(MasterSlaveServersConfig config, ConnectionManager connectionManager, From 4051564bd1c3dafbdf6e71c35eac6bfa3c134dd8 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Wed, 14 Sep 2016 15:35:46 +0300 Subject: [PATCH 4/6] Update README.md --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 516053294..e687c238f 100644 --- a/README.md +++ b/README.md @@ -65,7 +65,7 @@ Articles [Java Remote Method Invocation with Redisson](https://dzone.com/articles/java-remote-method-invocation-with-redisson) [Java Multimaps With Redis](https://dzone.com/articles/multimaps-with-redis) -Easy to use. Quick start +Quick start =============================== #### Maven From a8a9527eb8139191d553a9c511d5afd2d09a5229 Mon Sep 17 00:00:00 2001 From: Nikita Date: Wed, 14 Sep 2016 16:40:34 +0300 Subject: [PATCH 5/6] Config parsing fixed --- .../main/java/org/redisson/codec/DefaultCodecProvider.java | 2 +- .../src/main/java/org/redisson/config/ConfigSupport.java | 4 ++-- .../liveobject/provider/DefaultResolverProvider.java | 5 +++-- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/redisson/src/main/java/org/redisson/codec/DefaultCodecProvider.java b/redisson/src/main/java/org/redisson/codec/DefaultCodecProvider.java index 1c3ecf775..1c4dcd6c6 100644 --- a/redisson/src/main/java/org/redisson/codec/DefaultCodecProvider.java +++ b/redisson/src/main/java/org/redisson/codec/DefaultCodecProvider.java @@ -28,7 +28,7 @@ import org.redisson.api.annotation.RObjectField; */ public class DefaultCodecProvider implements CodecProvider { - public final ConcurrentMap, Codec> codecCache = PlatformDependent.newConcurrentHashMap(); + public transient final ConcurrentMap, Codec> codecCache = PlatformDependent.newConcurrentHashMap(); @Override public T getCodec(Class codecClass) { diff --git a/redisson/src/main/java/org/redisson/config/ConfigSupport.java b/redisson/src/main/java/org/redisson/config/ConfigSupport.java index a5685d2a9..2198c94a0 100644 --- a/redisson/src/main/java/org/redisson/config/ConfigSupport.java +++ b/redisson/src/main/java/org/redisson/config/ConfigSupport.java @@ -203,8 +203,8 @@ public class ConfigSupport { mapper.addMixIn(MasterSlaveServersConfig.class, MasterSlaveServersConfigMixIn.class); mapper.addMixIn(SingleServerConfig.class, SingleSeverConfigMixIn.class); mapper.addMixIn(Config.class, ConfigMixIn.class); - mapper.addMixIn(CodecProvider.class, ConfigMixIn.class); - mapper.addMixIn(ResolverProvider.class, ConfigMixIn.class); + mapper.addMixIn(CodecProvider.class, ClassMixIn.class); + mapper.addMixIn(ResolverProvider.class, ClassMixIn.class); mapper.addMixIn(Codec.class, ClassMixIn.class); mapper.addMixIn(RedissonNodeInitializer.class, ClassMixIn.class); mapper.addMixIn(LoadBalancer.class, ClassMixIn.class); diff --git a/redisson/src/main/java/org/redisson/liveobject/provider/DefaultResolverProvider.java b/redisson/src/main/java/org/redisson/liveobject/provider/DefaultResolverProvider.java index 12cec345f..cda30f7db 100644 --- a/redisson/src/main/java/org/redisson/liveobject/provider/DefaultResolverProvider.java +++ b/redisson/src/main/java/org/redisson/liveobject/provider/DefaultResolverProvider.java @@ -25,10 +25,11 @@ import org.redisson.liveobject.resolver.Resolver; * @author Rui Gu (https://github.com/jackygurui) */ public class DefaultResolverProvider implements ResolverProvider { - public final ConcurrentMap, Resolver> providerCache = PlatformDependent.newConcurrentHashMap(); + + public transient final ConcurrentMap, Resolver> providerCache = PlatformDependent.newConcurrentHashMap(); @Override - public Resolver getResolver(Class cls, Class resolverClass, Annotation anno) { + public Resolver getResolver(Class cls, Class resolverClass, Annotation anno) { if (!providerCache.containsKey(resolverClass)) { try { providerCache.putIfAbsent(resolverClass, resolverClass.newInstance()); From ccaebb29fbb2ebffd7d008b3edfa79d9db95085b Mon Sep 17 00:00:00 2001 From: Nikita Date: Wed, 14 Sep 2016 17:05:04 +0300 Subject: [PATCH 6/6] LocalCachedMap should implement RMap interface. #592 --- .../org/redisson/RedissonLocalCachedMap.java | 526 +++++++++++++++--- .../main/java/org/redisson/RedissonMap.java | 7 +- .../java/org/redisson/RedissonMapCache.java | 8 +- .../org/redisson/api/RLocalCachedMap.java | 29 +- .../main/java/org/redisson/api/RMapAsync.java | 2 +- .../java/org/redisson/api/RMapReactive.java | 2 +- .../reactive/RedissonMapCacheReactive.java | 5 +- .../reactive/RedissonMapReactive.java | 3 +- .../redisson/RedissonLocalCachedMapTest.java | 272 ++++++++- .../RedissonMapCacheReactiveTest.java | 8 +- .../org/redisson/RedissonMapReactiveTest.java | 16 +- 11 files changed, 723 insertions(+), 155 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java b/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java index 8888af165..6073aecf5 100644 --- a/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java +++ b/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java @@ -15,13 +15,17 @@ */ package org.redisson; +import java.math.BigDecimal; import java.util.AbstractCollection; import java.util.AbstractMap; import java.util.AbstractSet; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.NoSuchElementException; import java.util.Set; @@ -30,31 +34,36 @@ import org.redisson.api.LocalCachedMapOptions; import org.redisson.api.LocalCachedMapOptions.EvictionPolicy; import org.redisson.api.RFuture; import org.redisson.api.RLocalCachedMap; -import org.redisson.api.RMap; import org.redisson.api.RTopic; import org.redisson.api.RedissonClient; import org.redisson.api.listener.MessageListener; import org.redisson.client.codec.Codec; import org.redisson.client.codec.LongCodec; +import org.redisson.client.codec.StringCodec; import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommand.ValueType; import org.redisson.client.protocol.RedisCommands; +import org.redisson.client.protocol.convertor.NumberConvertor; +import org.redisson.client.protocol.decoder.ObjectMapEntryReplayDecoder; +import org.redisson.client.protocol.decoder.ObjectSetReplayDecoder; import org.redisson.command.CommandAsyncExecutor; import org.redisson.misc.Cache; import org.redisson.misc.Hash; import org.redisson.misc.LFUCacheMap; import org.redisson.misc.LRUCacheMap; import org.redisson.misc.NoneCacheMap; +import org.redisson.misc.RPromise; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; +import io.netty.util.internal.ThreadLocalRandom; /** * * @author Nikita Koksharov * */ -public class RedissonLocalCachedMap extends RedissonExpirable implements RLocalCachedMap { +public class RedissonLocalCachedMap extends RedissonMap implements RLocalCachedMap { public static class LocalCachedMapClear { @@ -62,16 +71,22 @@ public class RedissonLocalCachedMap extends RedissonExpirable implements R public static class LocalCachedMapInvalidate { + private byte[] excludedId; private byte[] keyHash; public LocalCachedMapInvalidate() { } - public LocalCachedMapInvalidate(byte[] keyHash) { + public LocalCachedMapInvalidate(byte[] excludedId, byte[] keyHash) { super(); this.keyHash = keyHash; + this.excludedId = excludedId; } - + + public byte[] getExcludedId() { + return excludedId; + } + public byte[] getKeyHash() { return keyHash; } @@ -162,12 +177,14 @@ public class RedissonLocalCachedMap extends RedissonExpirable implements R } } - + + private static final RedisCommand> ALL_KEYS = new RedisCommand>("EVAL", new ObjectSetReplayDecoder(), ValueType.MAP_KEY); + private static final RedisCommand>> ALL_ENTRIES = new RedisCommand>>("EVAL", new ObjectMapEntryReplayDecoder(), ValueType.MAP); private static final RedisCommand EVAL_PUT = new RedisCommand("EVAL", -1, ValueType.OBJECT, ValueType.MAP_VALUE); private static final RedisCommand EVAL_REMOVE = new RedisCommand("EVAL", -1, ValueType.OBJECT, ValueType.MAP_VALUE); + private byte[] id; private RTopic invalidationTopic; - private RMap map; private Cache cache; private int invalidateEntryOnChange; private int invalidationListenerId; @@ -183,7 +200,7 @@ public class RedissonLocalCachedMap extends RedissonExpirable implements R } private void init(RedissonClient redisson, String name, LocalCachedMapOptions options) { - map = redisson.getMap(name); + id = generateId(); if (options.isInvalidateEntryOnChange()) { invalidateEntryOnChange = 1; @@ -207,34 +224,17 @@ public class RedissonLocalCachedMap extends RedissonExpirable implements R cache.clear(); } if (msg instanceof LocalCachedMapInvalidate) { - CacheKey key = new CacheKey(((LocalCachedMapInvalidate)msg).getKeyHash()); - cache.remove(key); + LocalCachedMapInvalidate invalidateMsg = (LocalCachedMapInvalidate)msg; + if (!Arrays.equals(invalidateMsg.getExcludedId(), id)) { + CacheKey key = new CacheKey(invalidateMsg.getKeyHash()); + cache.remove(key); + } } } }); } } - @Override - public int size() { - return get(sizeAsync()); - } - - @Override - public RFuture sizeAsync() { - return map.sizeAsync(); - } - - @Override - public boolean isEmpty() { - return map.isEmpty(); - } - - @Override - public boolean containsKey(Object key) { - return get(containsKeyAsync(key)); - } - private CacheKey toCacheKey(Object key) { byte[] encoded = encodeMapKey(key); return toCacheKey(encoded); @@ -248,30 +248,20 @@ public class RedissonLocalCachedMap extends RedissonExpirable implements R public RFuture containsKeyAsync(Object key) { CacheKey cacheKey = toCacheKey(key); if (!cache.containsKey(cacheKey)) { - return map.containsKeyAsync(key); + return super.containsKeyAsync(key); } return newSucceededFuture(true); } - @Override - public boolean containsValue(Object value) { - return get(containsValueAsync(value)); - } - @Override public RFuture containsValueAsync(Object value) { CacheValue cacheValue = new CacheValue(null, value); if (!cache.containsValue(cacheValue)) { - return map.containsValueAsync(value); + return super.containsValueAsync(value); } return newSucceededFuture(true); } - @Override - public V get(Object key) { - return get(getAsync(key)); - } - @Override public RFuture getAsync(final Object key) { if (key == null) { @@ -284,7 +274,7 @@ public class RedissonLocalCachedMap extends RedissonExpirable implements R return newSucceededFuture((V)cacheValue.getValue()); } - RFuture future = map.getAsync((K)key); + RFuture future = super.getAsync((K)key); future.addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { @@ -300,13 +290,14 @@ public class RedissonLocalCachedMap extends RedissonExpirable implements R }); return future; } - - - @Override - public V put(K key, V value) { - return get(putAsync(key, value)); - } + protected byte[] generateId() { + byte[] id = new byte[16]; + // TODO JDK UPGRADE replace to native ThreadLocalRandom + ThreadLocalRandom.current().nextBytes(id); + return id; + } + @Override public RFuture putAsync(K key, V value) { if (key == null) { @@ -318,7 +309,7 @@ public class RedissonLocalCachedMap extends RedissonExpirable implements R byte[] mapKey = encodeMapKey(key); CacheKey cacheKey = toCacheKey(mapKey); - byte[] msg = encode(new LocalCachedMapInvalidate(cacheKey.getKeyHash())); + byte[] msg = encode(new LocalCachedMapInvalidate(id, cacheKey.getKeyHash())); CacheValue cacheValue = new CacheValue(key, value); cache.put(cacheKey, cacheValue); return commandExecutor.evalWriteAsync(getName(), codec, EVAL_PUT, @@ -331,11 +322,6 @@ public class RedissonLocalCachedMap extends RedissonExpirable implements R mapKey, encodeMapValue(value), msg, invalidateEntryOnChange); } - @Override - public boolean fastPut(K key, V value) { - return get(fastPutAsync(key, value)); - } - @Override public RFuture fastPutAsync(K key, V value) { if (key == null) { @@ -348,7 +334,7 @@ public class RedissonLocalCachedMap extends RedissonExpirable implements R byte[] encodedKey = encodeMapKey(key); byte[] encodedValue = encodeMapKey(value); CacheKey cacheKey = toCacheKey(encodedKey); - byte[] msg = encode(new LocalCachedMapInvalidate(cacheKey.getKeyHash())); + byte[] msg = encode(new LocalCachedMapInvalidate(id, cacheKey.getKeyHash())); CacheValue cacheValue = new CacheValue(key, value); cache.put(cacheKey, cacheValue); return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_BOOLEAN, @@ -370,11 +356,6 @@ public class RedissonLocalCachedMap extends RedissonExpirable implements R } } - @Override - public V remove(Object key) { - return get(removeAsync((K)key)); - } - @Override public RFuture removeAsync(K key) { if (key == null) { @@ -383,7 +364,7 @@ public class RedissonLocalCachedMap extends RedissonExpirable implements R byte[] keyEncoded = encodeMapKey(key); CacheKey cacheKey = toCacheKey(keyEncoded); - byte[] msgEncoded = encode(new LocalCachedMapInvalidate(cacheKey.getKeyHash())); + byte[] msgEncoded = encode(new LocalCachedMapInvalidate(id, cacheKey.getKeyHash())); cache.remove(cacheKey); return commandExecutor.evalWriteAsync(getName(), codec, EVAL_REMOVE, "local v = redis.call('hget', KEYS[1], ARGV[1]); " @@ -396,30 +377,40 @@ public class RedissonLocalCachedMap extends RedissonExpirable implements R } @Override - public boolean fastRemove(Object key) { - return get(fastRemoveAsync((K)key)); - } - - @Override - public RFuture fastRemoveAsync(K key) { - if (key == null) { + public RFuture fastRemoveAsync(K ... keys) { + if (keys == null) { throw new NullPointerException(); } - byte[] keyEncoded = encodeMapKey(key); - CacheKey cacheKey = toCacheKey(keyEncoded); - byte[] msgEncoded = encode(new LocalCachedMapInvalidate(cacheKey.getKeyHash())); - cache.remove(cacheKey); - return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_BOOLEAN, - "if redis.call('hdel', KEYS[1], ARGV[1]) == 1 then " - + "if ARGV[3] == '1' then " - + "redis.call('publish', KEYS[2], ARGV[2]); " - + "end; " - + "return 1;" + List params = new ArrayList(); + params.add(invalidateEntryOnChange); + for (K k : keys) { + byte[] keyEncoded = encodeMapKey(k); + params.add(keyEncoded); + + CacheKey cacheKey = toCacheKey(keyEncoded); + cache.remove(cacheKey); + if (invalidateEntryOnChange == 1) { + byte[] msgEncoded = encode(new LocalCachedMapInvalidate(id, cacheKey.getKeyHash())); + params.add(msgEncoded); + } else { + params.add(null); + } + } + + return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_LONG, + "local counter = 0; " + + "for j = 2, #ARGV, 2 do " + + "if redis.call('hdel', KEYS[1], ARGV[j]) == 1 then " + + "if ARGV[1] == '1' then " + + "redis.call('publish', KEYS[2], ARGV[j+1]); " + + "end; " + + "counter = counter + 1;" + + "end;" + "end;" - + "return 0;", + + "return counter;", Arrays.asList(getName(), invalidationTopic.getChannelNames().get(0)), - keyEncoded, msgEncoded, invalidateEntryOnChange); + params.toArray()); } @@ -432,17 +423,15 @@ public class RedissonLocalCachedMap extends RedissonExpirable implements R cacheMap.put(cacheKey, cacheValue); } cache.putAll(cacheMap); - map.putAll(m); - for (CacheKey cacheKey : cacheMap.keySet()) { - invalidationTopic.publish(new LocalCachedMapInvalidate(cacheKey.getKeyHash())); + super.putAll(m); + + if (invalidateEntryOnChange == 1) { + for (CacheKey cacheKey : cacheMap.keySet()) { + invalidationTopic.publish(new LocalCachedMapInvalidate(id, cacheKey.getKeyHash())); + } } } - @Override - public void clear() { - delete(); - } - @Override public RFuture deleteAsync() { cache.clear(); @@ -523,7 +512,7 @@ public class RedissonLocalCachedMap extends RedissonExpirable implements R @Override public Iterator iterator() { - return new CompositeIterable(cacheKeySetIterator(), map.keySet().iterator()) { + return new CompositeIterable(cacheKeySetIterator(), RedissonLocalCachedMap.super.keySet().iterator()) { @Override boolean isCacheContains(Object object) { @@ -600,7 +589,7 @@ public class RedissonLocalCachedMap extends RedissonExpirable implements R final class EntrySet extends AbstractSet> { public final Iterator> iterator() { - return new CompositeIterable>(cacheEntrySetIterator(), map.entrySet().iterator()) { + return new CompositeIterable>(cacheEntrySetIterator(), RedissonLocalCachedMap.super.entrySet().iterator()) { @Override boolean isCacheContains(Map.Entry entry) { @@ -625,7 +614,7 @@ public class RedissonLocalCachedMap extends RedissonExpirable implements R Map.Entry e = (Map.Entry) o; Object key = e.getKey(); Object value = e.getValue(); - return RedissonLocalCachedMap.this.map.remove(key, value); + return RedissonLocalCachedMap.super.remove(key, value); } return false; } @@ -696,5 +685,360 @@ public class RedissonLocalCachedMap extends RedissonExpirable implements R } } + + @Override + public RFuture> getAllAsync(Set keys) { + final Map result = new HashMap(); + Set mapKeys = new HashSet(keys); + for (Iterator iterator = mapKeys.iterator(); iterator.hasNext();) { + K key = iterator.next(); + CacheValue value = cache.get(key); + if (value != null) { + result.put(key, (V)value.getValue()); + iterator.remove(); + } + } + + final RPromise> promise = newPromise(); + RFuture> future = super.getAllAsync(mapKeys); + future.addListener(new FutureListener>() { + @Override + public void operationComplete(Future> future) throws Exception { + if (!future.isSuccess()) { + promise.tryFailure(future.cause()); + return; + } + + Map map = future.getNow(); + result.putAll(map); + + cacheMap(map); + + promise.trySuccess(result); + } + + }); + return promise; + } + + private void cacheMap(Map map) { + for (java.util.Map.Entry entry : map.entrySet()) { + byte[] mapKey = encodeMapKey(entry.getKey()); + CacheKey cacheKey = toCacheKey(mapKey); + CacheValue cacheValue = new CacheValue(entry.getKey(), entry.getValue()); + cache.put(cacheKey, cacheValue); + } + } + + @Override + public RFuture putAllAsync(final Map map) { + if (map.isEmpty()) { + return newSucceededFuture(null); + } + + List params = new ArrayList(map.size()*3); + List msgs = new ArrayList(map.size()); + params.add(invalidateEntryOnChange); + params.add(map.size()*2); + for (java.util.Map.Entry t : map.entrySet()) { + byte[] mapKey = encodeMapKey(t.getKey()); + byte[] mapValue = encodeMapValue(t.getValue()); + params.add(mapKey); + params.add(mapValue); + CacheKey cacheKey = toCacheKey(mapKey); + byte[] msgEncoded = encode(new LocalCachedMapInvalidate(id, cacheKey.getKeyHash())); + msgs.add(msgEncoded); + } + params.addAll(msgs); + + RFuture future = commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_VOID, + "redis.call('hmset', KEYS[1], unpack(ARGV, 3, tonumber(ARGV[2]) + 2));" + + "if ARGV[1] == '1' then " + + "for i = tonumber(ARGV[2]) + 3, #ARGV, 1 do " + + "redis.call('publish', KEYS[2], ARGV[i]); " + + "end; " + + "end;", + Arrays.asList(getName(), invalidationTopic.getChannelNames().get(0)), params.toArray()); + + future.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + return; + } + + cacheMap(map); + } + }); + return future; + } + + @Override + public RFuture addAndGetAsync(final K key, Number value) { + final byte[] keyState = encodeMapKey(key); + CacheKey cacheKey = toCacheKey(keyState); + byte[] msg = encode(new LocalCachedMapInvalidate(id, cacheKey.getKeyHash())); + + RFuture future = commandExecutor.evalWriteAsync(getName(), StringCodec.INSTANCE, new RedisCommand("EVAL", new NumberConvertor(value.getClass())), + "local result = redis.call('HINCRBYFLOAT', KEYS[1], ARGV[1], ARGV[2]); " + + "if ARGV[3] == '1' then " + + "redis.call('publish', KEYS[2], ARGV[4]); " + + "end; " + + "return result; ", + Arrays.asList(getName(), invalidationTopic.getChannelNames().get(0)), + keyState, new BigDecimal(value.toString()).toPlainString(), invalidateEntryOnChange, msg); + + future.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + return; + } + + V value = future.getNow(); + if (value != null) { + CacheKey cacheKey = toCacheKey(keyState); + cache.put(cacheKey, new CacheValue(key, value)); + } + } + }); + return future; + } + + @Override + public RFuture fastPutIfAbsentAsync(final K key, final V value) { + RFuture future = super.fastPutIfAbsentAsync(key, value); + future.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + return; + } + + if (future.getNow()) { + CacheKey cacheKey = toCacheKey(key); + cache.put(cacheKey, new CacheValue(key, value)); + } + } + }); + return future; + } + + @Override + public RFuture> readAllValuesAsync() { + final List result = new ArrayList(); + final List mapKeys = new ArrayList(); + for (CacheValue value : cache.values()) { + mapKeys.add(encodeMapKey(value.getKey())); + result.add((V) value.getValue()); + } + + final RPromise> promise = newPromise(); + RFuture> future = commandExecutor.evalReadAsync(getName(), codec, ALL_KEYS, + "local entries = redis.call('hgetall', KEYS[1]); " + + "local result = {};" + + "for j = 1, #entries, 2 do " + + "local founded = false;" + + "for i = 1, #ARGV, 1 do " + + "if ARGV[i] == entries[j] then " + + "founded = true;" + + "end;" + + "end; " + + "if founded == false then " + + "table.insert(result, entries[j+1]);" + + "end;" + + "end; " + + "return result; ", + Arrays.asList(getName()), + mapKeys.toArray()); + + future.addListener(new FutureListener>() { + + @Override + public void operationComplete(Future> future) throws Exception { + if (!future.isSuccess()) { + promise.tryFailure(future.cause()); + return; + } + + result.addAll(future.get()); + promise.trySuccess(result); + } + }); + + return promise; + } + + @Override + public RFuture>> readAllEntrySetAsync() { + final Set> result = new HashSet>(); + List mapKeys = new ArrayList(); + for (CacheValue value : cache.values()) { + mapKeys.add(encodeMapKey(value.getKey())); + result.add(new AbstractMap.SimpleEntry((K)value.getKey(), (V)value.getValue())); + } + + final RPromise>> promise = newPromise(); + RFuture>> future = commandExecutor.evalReadAsync(getName(), codec, ALL_ENTRIES, + "local entries = redis.call('hgetall', KEYS[1]); " + + "local result = {};" + + "for j = 1, #entries, 2 do " + + "local founded = false;" + + "for i = 1, #ARGV, 1 do " + + "if ARGV[i] == entries[j] then " + + "founded = true;" + + "end;" + + "end; " + + "if founded == false then " + + "table.insert(result, entries[j]);" + + "table.insert(result, entries[j+1]);" + + "end;" + + "end; " + + "return result; ", + Arrays.asList(getName()), + mapKeys.toArray()); + + future.addListener(new FutureListener>>() { + @Override + public void operationComplete(Future>> future) throws Exception { + if (!future.isSuccess()) { + return; + } + + result.addAll(future.getNow()); + promise.trySuccess(result); + } + }); + + return promise; + } + + @Override + public RFuture replaceAsync(final K key, final V value) { + final byte[] keyState = encodeMapKey(key); + byte[] valueState = encodeMapValue(value); + final CacheKey cacheKey = toCacheKey(keyState); + byte[] msg = encode(new LocalCachedMapInvalidate(id, cacheKey.getKeyHash())); + + RFuture future = commandExecutor.evalWriteAsync(getName(key), codec, RedisCommands.EVAL_MAP_VALUE, + "if redis.call('hexists', KEYS[1], ARGV[1]) == 1 then " + + "local v = redis.call('hget', KEYS[1], ARGV[1]); " + + "redis.call('hset', KEYS[1], ARGV[1], ARGV[2]); " + + "if ARGV[3] == '1' then " + + "redis.call('publish', KEYS[2], ARGV[4]); " + + "end; " + + "return v; " + + "else " + + "return nil; " + + "end", + Arrays.asList(getName(key), invalidationTopic.getChannelNames().get(0)), + keyState, valueState, invalidateEntryOnChange, msg); + + future.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + return; + } + + if (future.getNow() != null) { + CacheKey cacheKey = toCacheKey(key); + cache.put(cacheKey, new CacheValue(key, value)); + } + } + }); + + return future; + } + + @Override + public RFuture replaceAsync(final K key, V oldValue, final V newValue) { + final byte[] keyState = encodeMapKey(key); + byte[] oldValueState = encodeMapValue(oldValue); + byte[] newValueState = encodeMapValue(newValue); + final CacheKey cacheKey = toCacheKey(keyState); + byte[] msg = encode(new LocalCachedMapInvalidate(id, cacheKey.getKeyHash())); + + RFuture future = commandExecutor.evalWriteAsync(getName(key), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, + "if redis.call('hget', KEYS[1], ARGV[1]) == ARGV[2] then " + + "redis.call('hset', KEYS[1], ARGV[1], ARGV[3]); " + + "if ARGV[4] == '1' then " + + "redis.call('publish', KEYS[2], ARGV[5]); " + + "end; " + + "return 1; " + + "else " + + "return 0; " + + "end", + Arrays.asList(getName(key), invalidationTopic.getChannelNames().get(0)), + keyState, oldValueState, newValueState, invalidateEntryOnChange, msg); + + future.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + return; + } + + if (future.getNow()) { + cache.put(cacheKey, new CacheValue(key, newValue)); + } + } + }); + + return future; + } + + @Override + public RFuture removeAsync(Object key, Object value) { + final byte[] keyState = encodeMapKey(key); + byte[] valueState = encodeMapValue(value); + final CacheKey cacheKey = toCacheKey(keyState); + byte[] msg = encode(new LocalCachedMapInvalidate(id, cacheKey.getKeyHash())); + + RFuture future = commandExecutor.evalWriteAsync(getName(key), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, + "if redis.call('hget', KEYS[1], ARGV[1]) == ARGV[2] then " + + "if ARGV[3] == '1' then " + + "redis.call('publish', KEYS[2], ARGV[4]); " + + "end; " + + "return redis.call('hdel', KEYS[1], ARGV[1]) " + + "else " + + "return 0 " + + "end", + Arrays.asList(getName(key), invalidationTopic.getChannelNames().get(0)), + keyState, valueState, invalidateEntryOnChange, msg); + + future.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + return; + } + + if (future.getNow()) { + cache.remove(cacheKey); + } + } + }); + return future; + } + + @Override + public RFuture putIfAbsentAsync(final K key, final V value) { + RFuture future = super.putIfAbsentAsync(key, value); + future.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + return; + } + + if (future.getNow() == null) { + CacheKey cacheKey = toCacheKey(key); + cache.put(cacheKey, new CacheValue(key, value)); + } + } + }); + return future; + } } diff --git a/redisson/src/main/java/org/redisson/RedissonMap.java b/redisson/src/main/java/org/redisson/RedissonMap.java index d10bef7e6..33a0c4032 100644 --- a/redisson/src/main/java/org/redisson/RedissonMap.java +++ b/redisson/src/main/java/org/redisson/RedissonMap.java @@ -38,7 +38,6 @@ import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommand.ValueType; import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.convertor.BooleanReplayConvertor; -import org.redisson.client.protocol.convertor.LongReplayConvertor; import org.redisson.client.protocol.convertor.NumberConvertor; import org.redisson.client.protocol.decoder.MapScanResult; import org.redisson.client.protocol.decoder.ScanObjectEntry; @@ -59,7 +58,7 @@ public class RedissonMap extends RedissonExpirable implements RMap { static final RedisCommand EVAL_REMOVE = new RedisCommand("EVAL", 4, ValueType.MAP_KEY, ValueType.MAP_VALUE); static final RedisCommand EVAL_REPLACE = new RedisCommand("EVAL", 4, ValueType.MAP, ValueType.MAP_VALUE); static final RedisCommand EVAL_REPLACE_VALUE = new RedisCommand("EVAL", new BooleanReplayConvertor(), 4, Arrays.asList(ValueType.MAP_KEY, ValueType.MAP_VALUE, ValueType.MAP_VALUE)); - static final RedisCommand EVAL_REMOVE_VALUE = new RedisCommand("EVAL", new LongReplayConvertor(), 4, ValueType.MAP); + static final RedisCommand EVAL_REMOVE_VALUE = new RedisCommand("EVAL", new BooleanReplayConvertor(), 4, ValueType.MAP); static final RedisCommand EVAL_PUT = EVAL_REPLACE; protected RedissonMap(CommandAsyncExecutor commandExecutor, String name) { @@ -244,11 +243,11 @@ public class RedissonMap extends RedissonExpirable implements RMap { @Override public boolean remove(Object key, Object value) { - return get(removeAsync(key, value)) == 1; + return get(removeAsync(key, value)); } @Override - public RFuture removeAsync(Object key, Object value) { + public RFuture removeAsync(Object key, Object value) { return commandExecutor.evalWriteAsync(getName(key), codec, EVAL_REMOVE_VALUE, "if redis.call('hget', KEYS[1], ARGV[1]) == ARGV[2] then " + "return redis.call('hdel', KEYS[1], ARGV[1]) " diff --git a/redisson/src/main/java/org/redisson/RedissonMapCache.java b/redisson/src/main/java/org/redisson/RedissonMapCache.java index fa9cffba0..4f4c172f7 100644 --- a/redisson/src/main/java/org/redisson/RedissonMapCache.java +++ b/redisson/src/main/java/org/redisson/RedissonMapCache.java @@ -34,7 +34,6 @@ import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommand.ValueType; import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.convertor.BooleanReplayConvertor; -import org.redisson.client.protocol.convertor.LongReplayConvertor; import org.redisson.client.protocol.convertor.VoidReplayConvertor; import org.redisson.client.protocol.decoder.ListMultiDecoder; import org.redisson.client.protocol.decoder.LongMultiDecoder; @@ -42,9 +41,7 @@ import org.redisson.client.protocol.decoder.MapCacheScanResult; import org.redisson.client.protocol.decoder.MapCacheScanResultReplayDecoder; import org.redisson.client.protocol.decoder.MapScanResult; import org.redisson.client.protocol.decoder.ObjectListDecoder; -import org.redisson.client.protocol.decoder.ObjectListReplayDecoder; import org.redisson.client.protocol.decoder.ObjectMapDecoder; -import org.redisson.client.protocol.decoder.ObjectMapReplayDecoder; import org.redisson.client.protocol.decoder.ScanObjectEntry; import org.redisson.command.CommandAsyncExecutor; import org.redisson.connection.decoder.MapGetAllDecoder; @@ -77,9 +74,8 @@ public class RedissonMapCache extends RedissonMap implements RMapCac static final RedisCommand EVAL_REPLACE = new RedisCommand("EVAL", 6, ValueType.MAP, ValueType.MAP_VALUE); static final RedisCommand EVAL_REPLACE_VALUE = new RedisCommand("EVAL", new BooleanReplayConvertor(), 7, Arrays.asList(ValueType.MAP_KEY, ValueType.MAP_VALUE, ValueType.MAP_VALUE)); private static final RedisCommand EVAL_HMSET = new RedisCommand("EVAL", new VoidReplayConvertor(), 4, ValueType.MAP); - private static final RedisCommand> EVAL_HSCAN = new RedisCommand>("EVAL", new ListMultiDecoder(new LongMultiDecoder(), new ObjectMapReplayDecoder(), new ObjectListReplayDecoder()), ValueType.MAP); private static final RedisCommand EVAL_REMOVE = new RedisCommand("EVAL", 4, ValueType.MAP_KEY, ValueType.MAP_VALUE); - private static final RedisCommand EVAL_REMOVE_VALUE = new RedisCommand("EVAL", new LongReplayConvertor(), 5, ValueType.MAP); + private static final RedisCommand EVAL_REMOVE_VALUE = new RedisCommand("EVAL", new BooleanReplayConvertor(), 5, ValueType.MAP); private static final RedisCommand EVAL_PUT_TTL = new RedisCommand("EVAL", 9, ValueType.MAP, ValueType.MAP_VALUE); private static final RedisCommand EVAL_FAST_PUT_TTL = new RedisCommand("EVAL", new BooleanReplayConvertor(), 9, ValueType.MAP, ValueType.MAP_VALUE); private static final RedisCommand EVAL_GET_TTL = new RedisCommand("EVAL", 7, ValueType.MAP_KEY, ValueType.MAP_VALUE); @@ -283,7 +279,7 @@ public class RedissonMapCache extends RedissonMap implements RMapCac } @Override - public RFuture removeAsync(Object key, Object value) { + public RFuture removeAsync(Object key, Object value) { return commandExecutor.evalWriteAsync(getName(), codec, EVAL_REMOVE_VALUE, "local value = redis.call('hget', KEYS[1], ARGV[1]); " + "if value == false then " diff --git a/redisson/src/main/java/org/redisson/api/RLocalCachedMap.java b/redisson/src/main/java/org/redisson/api/RLocalCachedMap.java index 8a99bb3b3..d0ee6b7da 100644 --- a/redisson/src/main/java/org/redisson/api/RLocalCachedMap.java +++ b/redisson/src/main/java/org/redisson/api/RLocalCachedMap.java @@ -15,8 +15,6 @@ */ package org.redisson.api; -import java.util.Map; - /** * Map object with entry cache support. *

@@ -28,31 +26,6 @@ import java.util.Map; * @param * @param */ -public interface RLocalCachedMap extends Map, RExpirable, RLocalCachedMapAsync, RDestroyable { +public interface RLocalCachedMap extends RMap, RExpirable, RDestroyable { - /** - * Associates the specified value with the specified key. - *

- * Works faster than RLocalCachedMap.put but not returning - * the previous value associated with key - * - * @param key - * @param value - * @return true if key is a new key in the hash and value was set. - * false if key already exists in the hash and the value was updated. - */ - boolean fastPut(K key, V value); - - /** - * Removes key from map - *

- * Works faster than RLocalCachedMap.remove but not returning - * the value associated with key - * - * @param key - * @return true if key has been deleted. - * false if key doesn't exist. - */ - boolean fastRemove(Object key); - } diff --git a/redisson/src/main/java/org/redisson/api/RMapAsync.java b/redisson/src/main/java/org/redisson/api/RMapAsync.java index 4e54df9ca..d43edfd22 100644 --- a/redisson/src/main/java/org/redisson/api/RMapAsync.java +++ b/redisson/src/main/java/org/redisson/api/RMapAsync.java @@ -100,7 +100,7 @@ public interface RMapAsync extends RExpirableAsync { RFuture replaceAsync(K key, V oldValue, V newValue); - RFuture removeAsync(Object key, Object value); + RFuture removeAsync(Object key, Object value); RFuture putIfAbsentAsync(K key, V value); diff --git a/redisson/src/main/java/org/redisson/api/RMapReactive.java b/redisson/src/main/java/org/redisson/api/RMapReactive.java index df1b2e85c..a2795374d 100644 --- a/redisson/src/main/java/org/redisson/api/RMapReactive.java +++ b/redisson/src/main/java/org/redisson/api/RMapReactive.java @@ -77,7 +77,7 @@ public interface RMapReactive extends RExpirableReactive { Publisher replace(K key, V oldValue, V newValue); - Publisher remove(Object key, Object value); + Publisher remove(Object key, Object value); Publisher putIfAbsent(K key, V value); diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonMapCacheReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonMapCacheReactive.java index c7cd7e829..dc7599072 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonMapCacheReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonMapCacheReactive.java @@ -36,7 +36,6 @@ import org.redisson.client.protocol.RedisCommand.ValueType; import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.convertor.BooleanReplayConvertor; import org.redisson.client.protocol.convertor.Convertor; -import org.redisson.client.protocol.convertor.LongReplayConvertor; import org.redisson.client.protocol.decoder.MapScanResult; import org.redisson.client.protocol.decoder.MapScanResultReplayDecoder; import org.redisson.client.protocol.decoder.NestedMultiDecoder; @@ -74,7 +73,7 @@ public class RedissonMapCacheReactive extends RedissonMapReactive im private static final RedisCommand> EVAL_HSCAN = new RedisCommand>("EVAL", new NestedMultiDecoder(new ObjectMapReplayDecoder(), new MapScanResultReplayDecoder()), ValueType.MAP); private static final RedisCommand EVAL_REMOVE = new RedisCommand("EVAL", 4, ValueType.MAP_KEY, ValueType.MAP_VALUE); - private static final RedisCommand EVAL_REMOVE_VALUE = new RedisCommand("EVAL", new LongReplayConvertor(), 5, ValueType.MAP); + private static final RedisCommand EVAL_REMOVE_VALUE = new RedisCommand("EVAL", new BooleanReplayConvertor(), 5, ValueType.MAP); private static final RedisCommand EVAL_PUT_TTL = new RedisCommand("EVAL", 6, ValueType.MAP, ValueType.MAP_VALUE); private static final RedisCommand> EVAL_GET_TTL = new RedisCommand>("EVAL", new TTLMapValueReplayDecoder(), 5, ValueType.MAP_KEY, ValueType.MAP_VALUE); private static final RedisCommand> EVAL_CONTAINS_KEY = new RedisCommand>("EVAL", new ObjectListReplayDecoder(), 5, ValueType.MAP_KEY); @@ -225,7 +224,7 @@ public class RedissonMapCacheReactive extends RedissonMapReactive im } @Override - public Publisher remove(Object key, Object value) { + public Publisher remove(Object key, Object value) { return commandExecutor.evalWriteReactive(getName(), codec, EVAL_REMOVE_VALUE, "if redis.call('hget', KEYS[1], ARGV[1]) == ARGV[2] then " + "redis.call('zrem', KEYS[2], ARGV[1]); " diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonMapReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonMapReactive.java index 4c882f5ec..918e330dc 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonMapReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonMapReactive.java @@ -78,6 +78,7 @@ public class RedissonMapReactive extends RedissonExpirableReactive impleme return reactive(instance.getAllAsync(keys)); } + @Override public Publisher putAll(Map map) { return reactive(instance.putAllAsync(map)); } @@ -88,7 +89,7 @@ public class RedissonMapReactive extends RedissonExpirableReactive impleme } @Override - public Publisher remove(Object key, Object value) { + public Publisher remove(Object key, Object value) { return reactive(instance.removeAsync(key, value)); } diff --git a/redisson/src/test/java/org/redisson/RedissonLocalCachedMapTest.java b/redisson/src/test/java/org/redisson/RedissonLocalCachedMapTest.java index 7430cf875..d98933d99 100644 --- a/redisson/src/test/java/org/redisson/RedissonLocalCachedMapTest.java +++ b/redisson/src/test/java/org/redisson/RedissonLocalCachedMapTest.java @@ -2,6 +2,9 @@ package org.redisson; import static org.assertj.core.api.Assertions.assertThat; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; import java.util.Map; import java.util.concurrent.ExecutionException; @@ -9,9 +12,12 @@ import org.junit.Assert; import org.junit.Test; import org.redisson.RedissonLocalCachedMap.CacheKey; import org.redisson.RedissonLocalCachedMap.CacheValue; +import org.redisson.RedissonMapTest.SimpleKey; +import org.redisson.RedissonMapTest.SimpleValue; import org.redisson.api.LocalCachedMapOptions; import org.redisson.api.LocalCachedMapOptions.EvictionPolicy; import org.redisson.api.RLocalCachedMap; +import org.redisson.api.RMap; import org.redisson.misc.Cache; import mockit.Deencapsulation; @@ -62,8 +68,8 @@ public class RedissonLocalCachedMapTest extends BaseTest { map2.put("2", 4); Thread.sleep(50); - assertThat(cache1.size()).isEqualTo(0); - assertThat(cache2.size()).isEqualTo(0); + assertThat(cache1.size()).isEqualTo(1); + assertThat(cache2.size()).isEqualTo(1); } @Test @@ -197,18 +203,267 @@ public class RedissonLocalCachedMapTest extends BaseTest { @Test public void testPut() { RLocalCachedMap map = redisson.getLocalCachedMap("test", LocalCachedMapOptions.defaults()); + Cache cache = Deencapsulation.getField(map, "cache"); map.put("12", 1); map.put("14", 2); map.put("15", 3); - - Deencapsulation.setField(map, "map", null); - + + assertThat(cache).containsValues(new CacheValue("12", 1), new CacheValue("12", 2), new CacheValue("15", 3)); assertThat(map.get("12")).isEqualTo(1); assertThat(map.get("14")).isEqualTo(2); assertThat(map.get("15")).isEqualTo(3); + + RLocalCachedMap map1 = redisson.getLocalCachedMap("test", LocalCachedMapOptions.defaults()); + + assertThat(map1.get("12")).isEqualTo(1); + assertThat(map1.get("14")).isEqualTo(2); + assertThat(map1.get("15")).isEqualTo(3); + } + + @Test + public void testGetAll() { + RMap map = redisson.getLocalCachedMap("getAll", LocalCachedMapOptions.defaults()); + Cache cache = Deencapsulation.getField(map, "cache"); + map.put("1", 100); + map.put("2", 200); + map.put("3", 300); + map.put("4", 400); + + assertThat(cache.size()).isEqualTo(4); + Map filtered = map.getAll(new HashSet(Arrays.asList("2", "3", "5"))); + + Map expectedMap = new HashMap(); + expectedMap.put("2", 200); + expectedMap.put("3", 300); + assertThat(filtered).isEqualTo(expectedMap); + + RMap map1 = redisson.getLocalCachedMap("getAll", LocalCachedMapOptions.defaults()); + + Map filtered1 = map1.getAll(new HashSet(Arrays.asList("2", "3", "5"))); + + assertThat(filtered1).isEqualTo(expectedMap); + } + + @Test + public void testPutAll() { + Map map = redisson.getLocalCachedMap("simple", LocalCachedMapOptions.defaults()); + Map map1 = redisson.getLocalCachedMap("simple", LocalCachedMapOptions.defaults()); + Cache cache = Deencapsulation.getField(map, "cache"); + Cache cache1 = Deencapsulation.getField(map1, "cache"); + map.put(1, "1"); + map.put(2, "2"); + map.put(3, "3"); + + Map joinMap = new HashMap(); + joinMap.put(4, "4"); + joinMap.put(5, "5"); + joinMap.put(6, "6"); + map.putAll(joinMap); + + assertThat(cache.size()).isEqualTo(6); + assertThat(cache1.size()).isEqualTo(0); + assertThat(map.keySet()).containsOnly(1, 2, 3, 4, 5, 6); + + map1.putAll(joinMap); + + assertThat(cache.size()).isEqualTo(3); + assertThat(cache1.size()).isEqualTo(3); + } + + @Test + public void testAddAndGet() throws InterruptedException { + RMap map = redisson.getLocalCachedMap("getAll", LocalCachedMapOptions.defaults()); + Cache cache = Deencapsulation.getField(map, "cache"); + map.put(1, 100); + + Integer res = map.addAndGet(1, 12); + assertThat(cache.size()).isEqualTo(1); + assertThat(res).isEqualTo(112); + res = map.get(1); + assertThat(res).isEqualTo(112); + + RMap map2 = redisson.getLocalCachedMap("getAll2", LocalCachedMapOptions.defaults()); + map2.put(1, new Double(100.2)); + + Double res2 = map2.addAndGet(1, new Double(12.1)); + assertThat(res2).isEqualTo(112.3); + res2 = map2.get(1); + assertThat(res2).isEqualTo(112.3); + + RMap mapStr = redisson.getLocalCachedMap("mapStr", LocalCachedMapOptions.defaults()); + assertThat(mapStr.put("1", 100)).isNull(); + + assertThat(mapStr.addAndGet("1", 12)).isEqualTo(112); + assertThat(mapStr.get("1")).isEqualTo(112); + assertThat(cache.size()).isEqualTo(1); + } + + @Test + public void testFastPutIfAbsent() throws Exception { + RMap map = redisson.getLocalCachedMap("simple", LocalCachedMapOptions.defaults()); + Cache cache = Deencapsulation.getField(map, "cache"); + + SimpleKey key = new SimpleKey("1"); + SimpleValue value = new SimpleValue("2"); + map.put(key, value); + assertThat(map.fastPutIfAbsent(key, new SimpleValue("3"))).isFalse(); + assertThat(cache.size()).isEqualTo(1); + assertThat(map.get(key)).isEqualTo(value); + + SimpleKey key1 = new SimpleKey("2"); + SimpleValue value1 = new SimpleValue("4"); + assertThat(map.fastPutIfAbsent(key1, value1)).isTrue(); + assertThat(cache.size()).isEqualTo(2); + assertThat(map.get(key1)).isEqualTo(value1); + } + + @Test + public void testReadAllEntrySet() { + RMap map = redisson.getLocalCachedMap("simple12", LocalCachedMapOptions.defaults()); + Cache cache = Deencapsulation.getField(map, "cache"); + map.put(new SimpleKey("1"), new SimpleValue("2")); + map.put(new SimpleKey("33"), new SimpleValue("44")); + map.put(new SimpleKey("5"), new SimpleValue("6")); + + assertThat(map.readAllEntrySet().size()).isEqualTo(3); + assertThat(cache.size()).isEqualTo(3); + Map testMap = new HashMap<>(map); + assertThat(map.readAllEntrySet()).containsOnlyElementsOf(testMap.entrySet()); + + RMap map2 = redisson.getLocalCachedMap("simple12", LocalCachedMapOptions.defaults()); + assertThat(map2.readAllEntrySet()).containsOnlyElementsOf(testMap.entrySet()); + } + + @Test + public void testPutIfAbsent() throws Exception { + RMap map = redisson.getLocalCachedMap("simple12", LocalCachedMapOptions.defaults()); + Cache cache = Deencapsulation.getField(map, "cache"); + + SimpleKey key = new SimpleKey("1"); + SimpleValue value = new SimpleValue("2"); + map.put(key, value); + Assert.assertEquals(value, map.putIfAbsent(key, new SimpleValue("3"))); + Assert.assertEquals(value, map.get(key)); + + SimpleKey key1 = new SimpleKey("2"); + SimpleValue value1 = new SimpleValue("4"); + Assert.assertNull(map.putIfAbsent(key1, value1)); + Assert.assertEquals(value1, map.get(key1)); + assertThat(cache.size()).isEqualTo(2); + } + + @Test + public void testRemoveValue() { + RMap map = redisson.getLocalCachedMap("simple12", LocalCachedMapOptions.defaults()); + Cache cache = Deencapsulation.getField(map, "cache"); + map.put(new SimpleKey("1"), new SimpleValue("2")); + + boolean res = map.remove(new SimpleKey("1"), new SimpleValue("2")); + Assert.assertTrue(res); + + SimpleValue val1 = map.get(new SimpleKey("1")); + Assert.assertNull(val1); + + Assert.assertEquals(0, map.size()); + assertThat(cache.size()).isEqualTo(0); + } + + @Test + public void testRemoveValueFail() { + RMap map = redisson.getLocalCachedMap("simple12", LocalCachedMapOptions.defaults()); + Cache cache = Deencapsulation.getField(map, "cache"); + map.put(new SimpleKey("1"), new SimpleValue("2")); + + boolean res = map.remove(new SimpleKey("2"), new SimpleValue("1")); + Assert.assertFalse(res); + + boolean res1 = map.remove(new SimpleKey("1"), new SimpleValue("3")); + Assert.assertFalse(res1); + + SimpleValue val1 = map.get(new SimpleKey("1")); + Assert.assertEquals("2", val1.getValue()); + assertThat(cache.size()).isEqualTo(1); } + @Test + public void testReplaceOldValueFail() { + RMap map = redisson.getLocalCachedMap("simple", LocalCachedMapOptions.defaults()); + Cache cache = Deencapsulation.getField(map, "cache"); + map.put(new SimpleKey("1"), new SimpleValue("2")); + + boolean res = map.replace(new SimpleKey("1"), new SimpleValue("43"), new SimpleValue("31")); + Assert.assertFalse(res); + + SimpleValue val1 = map.get(new SimpleKey("1")); + Assert.assertEquals("2", val1.getValue()); + assertThat(cache.size()).isEqualTo(1); + } + + @Test + public void testReplaceOldValueSuccess() { + RMap map = redisson.getLocalCachedMap("simple", LocalCachedMapOptions.defaults()); + Cache cache = Deencapsulation.getField(map, "cache"); + map.put(new SimpleKey("1"), new SimpleValue("2")); + + boolean res = map.replace(new SimpleKey("1"), new SimpleValue("2"), new SimpleValue("3")); + Assert.assertTrue(res); + + boolean res1 = map.replace(new SimpleKey("1"), new SimpleValue("2"), new SimpleValue("3")); + Assert.assertFalse(res1); + + SimpleValue val1 = map.get(new SimpleKey("1")); + Assert.assertEquals("3", val1.getValue()); + assertThat(cache.size()).isEqualTo(1); + } + + @Test + public void testReplaceValue() { + RMap map = redisson.getLocalCachedMap("simple", LocalCachedMapOptions.defaults()); + Cache cache = Deencapsulation.getField(map, "cache"); + map.put(new SimpleKey("1"), new SimpleValue("2")); + + SimpleValue res = map.replace(new SimpleKey("1"), new SimpleValue("3")); + Assert.assertEquals("2", res.getValue()); + assertThat(cache.size()).isEqualTo(1); + + SimpleValue val1 = map.get(new SimpleKey("1")); + Assert.assertEquals("3", val1.getValue()); + } + + @Test + public void testReadAllValues() { + RMap map = redisson.getLocalCachedMap("simple", LocalCachedMapOptions.defaults()); + Cache cache = Deencapsulation.getField(map, "cache"); + + map.put(new SimpleKey("1"), new SimpleValue("2")); + map.put(new SimpleKey("33"), new SimpleValue("44")); + map.put(new SimpleKey("5"), new SimpleValue("6")); + assertThat(cache.size()).isEqualTo(3); + + assertThat(map.readAllValues().size()).isEqualTo(3); + Map testMap = new HashMap<>(map); + assertThat(map.readAllValues()).containsOnlyElementsOf(testMap.values()); + + RMap map2 = redisson.getLocalCachedMap("simple", LocalCachedMapOptions.defaults()); + assertThat(map2.readAllValues()).containsOnlyElementsOf(testMap.values()); + } + + + @Test + public void testFastRemoveAsync() throws InterruptedException, ExecutionException { + RMap map = redisson.getLocalCachedMap("simple", LocalCachedMapOptions.defaults()); + Cache cache = Deencapsulation.getField(map, "cache"); + map.put(1, 3); + map.put(3, 5); + map.put(4, 6); + map.put(7, 8); + + assertThat(map.fastRemoveAsync(1, 3, 7).get()).isEqualTo(3); + assertThat(cache.size()).isEqualTo(1); + assertThat(map.size()).isEqualTo(1); + } + @Test public void testRemove() { RLocalCachedMap map = redisson.getLocalCachedMap("test", LocalCachedMapOptions.defaults()); @@ -225,13 +480,14 @@ public class RedissonLocalCachedMapTest extends BaseTest { } @Test - public void testFastRemoveAsync() throws InterruptedException, ExecutionException { + public void testFastRemove() throws InterruptedException, ExecutionException { RLocalCachedMap map = redisson.getLocalCachedMap("test", LocalCachedMapOptions.defaults()); map.put(1, 3); + map.put(2, 4); map.put(7, 8); - assertThat(map.fastRemoveAsync(1).get()).isTrue(); - assertThat(map.fastRemoveAsync(2).get()).isFalse(); + assertThat(map.fastRemove(1, 2)).isEqualTo(2); + assertThat(map.fastRemove(2)).isEqualTo(0); assertThat(map.size()).isEqualTo(1); } diff --git a/redisson/src/test/java/org/redisson/RedissonMapCacheReactiveTest.java b/redisson/src/test/java/org/redisson/RedissonMapCacheReactiveTest.java index 91b1d32f7..cea260f87 100644 --- a/redisson/src/test/java/org/redisson/RedissonMapCacheReactiveTest.java +++ b/redisson/src/test/java/org/redisson/RedissonMapCacheReactiveTest.java @@ -278,8 +278,8 @@ public class RedissonMapCacheReactiveTest extends BaseReactiveTest { RMapCacheReactive map = redisson.getMapCache("simple"); sync(map.put(new SimpleKey("1"), new SimpleValue("2"), 1, TimeUnit.SECONDS)); - long res = sync(map.remove(new SimpleKey("1"), new SimpleValue("2"))); - Assert.assertEquals(1, res); + boolean res = sync(map.remove(new SimpleKey("1"), new SimpleValue("2"))); + Assert.assertTrue(res); SimpleValue val1 = sync(map.get(new SimpleKey("1"))); Assert.assertNull(val1); @@ -373,9 +373,9 @@ public class RedissonMapCacheReactiveTest extends BaseReactiveTest { @Test public void testEmptyRemove() { RMapCacheReactive map = redisson.getMapCache("simple"); - Assert.assertEquals(0, sync(map.remove(1, 3)).longValue()); + assertThat(sync(map.remove(1, 3))).isEqualTo(0); sync(map.put(4, 5)); - Assert.assertEquals(1, sync(map.remove(4, 5)).longValue()); + assertThat(sync(map.remove(4, 5))).isEqualTo(1); } @Test diff --git a/redisson/src/test/java/org/redisson/RedissonMapReactiveTest.java b/redisson/src/test/java/org/redisson/RedissonMapReactiveTest.java index 2b3de1dcc..5128055b5 100644 --- a/redisson/src/test/java/org/redisson/RedissonMapReactiveTest.java +++ b/redisson/src/test/java/org/redisson/RedissonMapReactiveTest.java @@ -307,8 +307,8 @@ public class RedissonMapReactiveTest extends BaseReactiveTest { RMapReactive map = redisson.getMap("simple"); sync(map.put(new SimpleKey("1"), new SimpleValue("2"))); - long size = sync(map.remove(new SimpleKey("1"), new SimpleValue("2"))); - Assert.assertEquals(1, size); + boolean size = sync(map.remove(new SimpleKey("1"), new SimpleValue("2"))); + Assert.assertTrue(size); SimpleValue val1 = sync(map.get(new SimpleKey("1"))); Assert.assertNull(val1); @@ -321,11 +321,11 @@ public class RedissonMapReactiveTest extends BaseReactiveTest { RMapReactive map = redisson.getMap("simple"); sync(map.put(new SimpleKey("1"), new SimpleValue("2"))); - long size = sync(map.remove(new SimpleKey("2"), new SimpleValue("1"))); - Assert.assertEquals(0, size); + boolean removed = sync(map.remove(new SimpleKey("2"), new SimpleValue("1"))); + Assert.assertFalse(removed); - long size2 = sync(map.remove(new SimpleKey("1"), new SimpleValue("3"))); - Assert.assertEquals(0, size2); + boolean size2 = sync(map.remove(new SimpleKey("1"), new SimpleValue("3"))); + Assert.assertFalse(size2); SimpleValue val1 = sync(map.get(new SimpleKey("1"))); Assert.assertEquals("2", val1.getValue()); @@ -444,9 +444,9 @@ public class RedissonMapReactiveTest extends BaseReactiveTest { @Test public void testEmptyRemove() { RMapReactive map = redisson.getMap("simple"); - Assert.assertEquals(0, sync(map.remove(1, 3)).intValue()); + assertThat(sync(map.remove(1, 3))).isFalse(); sync(map.put(4, 5)); - Assert.assertEquals(1, sync(map.remove(4, 5)).intValue()); + assertThat(sync(map.remove(4, 5))).isTrue(); } @Test