From 09be99e0ce6ea1f42e6ffe3903ed4cb3941b62c5 Mon Sep 17 00:00:00 2001 From: jackygurui Date: Wed, 7 Sep 2016 00:53:27 +0100 Subject: [PATCH] Redisson Reference support for batch, reactive and reactive batch --- .../src/main/java/org/redisson/Redisson.java | 12 ++- .../main/java/org/redisson/RedissonBatch.java | 5 +- .../java/org/redisson/RedissonReactive.java | 19 +++- .../java/org/redisson/RedissonReference.java | 37 ++++++-- .../org/redisson/api/RObjectReactive.java | 4 +- .../redisson/api/RedissonReactiveClient.java | 10 +- .../command/CommandAsyncExecutor.java | 12 ++- .../redisson/command/CommandAsyncService.java | 49 ++++++++-- .../redisson/command/CommandBatchService.java | 21 ++++- .../misc/RedissonObjectFactory.java | 94 +++++++++++++++---- .../reactive/RedissonBatchReactive.java | 6 ++ .../reactive/RedissonObjectReactive.java | 5 + .../RedissonReferenceReactiveTest.java | 53 +++++++++++ .../org/redisson/RedissonReferenceTest.java | 27 +++++- 14 files changed, 309 insertions(+), 45 deletions(-) create mode 100644 redisson/src/test/java/org/redisson/RedissonReferenceReactiveTest.java diff --git a/redisson/src/main/java/org/redisson/Redisson.java b/redisson/src/main/java/org/redisson/Redisson.java index 0a7216f3b..1e4c67c04 100755 --- a/redisson/src/main/java/org/redisson/Redisson.java +++ b/redisson/src/main/java/org/redisson/Redisson.java @@ -159,7 +159,11 @@ public class Redisson implements RedissonClient { * @return Redisson instance */ public static RedissonReactiveClient createReactive(Config config) { - return new RedissonReactive(config); + RedissonReactive react = new RedissonReactive(config); + if (config.isRedissonReferenceEnabled()) { + react.enableRedissonReferenceSupport(); + } + return react; } @Override @@ -489,7 +493,11 @@ public class Redisson implements RedissonClient { @Override public RBatch createBatch() { - return new RedissonBatch(evictionScheduler, connectionManager); + RedissonBatch batch = new RedissonBatch(evictionScheduler, connectionManager); + if (config.isRedissonReferenceEnabled()) { + batch.enableRedissonReferenceSupport(this); + } + return batch; } @Override diff --git a/redisson/src/main/java/org/redisson/RedissonBatch.java b/redisson/src/main/java/org/redisson/RedissonBatch.java index dbc6698bf..83983556f 100644 --- a/redisson/src/main/java/org/redisson/RedissonBatch.java +++ b/redisson/src/main/java/org/redisson/RedissonBatch.java @@ -56,7 +56,7 @@ public class RedissonBatch implements RBatch { private final EvictionScheduler evictionScheduler; private final CommandBatchService executorService; - public RedissonBatch(EvictionScheduler evictionScheduler, ConnectionManager connectionManager) { + protected RedissonBatch(EvictionScheduler evictionScheduler, ConnectionManager connectionManager) { this.executorService = new CommandBatchService(connectionManager); this.evictionScheduler = evictionScheduler; } @@ -281,5 +281,8 @@ public class RedissonBatch implements RBatch { return new RedissonListMultimapCache(evictionScheduler, codec, executorService, name); } + protected void enableRedissonReferenceSupport(Redisson redisson) { + this.executorService.enableRedissonReferenceSupport(redisson); + } } diff --git a/redisson/src/main/java/org/redisson/RedissonReactive.java b/redisson/src/main/java/org/redisson/RedissonReactive.java index bc9f39c2e..47176e68c 100644 --- a/redisson/src/main/java/org/redisson/RedissonReactive.java +++ b/redisson/src/main/java/org/redisson/RedissonReactive.java @@ -45,6 +45,7 @@ import org.redisson.api.RTopicReactive; import org.redisson.api.RedissonReactiveClient; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommands; +import org.redisson.codec.CodecProvider; import org.redisson.command.CommandReactiveService; import org.redisson.config.Config; import org.redisson.config.ConfigSupport; @@ -82,7 +83,8 @@ public class RedissonReactive implements RedissonReactiveClient { protected final CommandReactiveService commandExecutor; protected final ConnectionManager connectionManager; protected final Config config; - + protected final CodecProvider codecProvider; + protected RedissonReactive(Config config) { this.config = config; Config configCopy = new Config(config); @@ -90,6 +92,7 @@ public class RedissonReactive implements RedissonReactiveClient { connectionManager = ConfigSupport.createConnectionManager(configCopy); commandExecutor = new CommandReactiveService(connectionManager); evictionScheduler = new EvictionScheduler(commandExecutor); + codecProvider = config.getCodecProvider(); } @@ -259,7 +262,11 @@ public class RedissonReactive implements RedissonReactiveClient { @Override public RBatchReactive createBatch() { - return new RedissonBatchReactive(evictionScheduler, connectionManager); + RedissonBatchReactive batch = new RedissonBatchReactive(evictionScheduler, connectionManager); + if (config.isRedissonReferenceEnabled()) { + batch.enableRedissonReferenceSupport(this); + } + return batch; } @Override @@ -272,6 +279,11 @@ public class RedissonReactive implements RedissonReactiveClient { return config; } + @Override + public CodecProvider getCodecProvider() { + return codecProvider; + } + @Override public NodesGroup getNodesGroup() { return new RedisNodes(connectionManager); @@ -300,5 +312,8 @@ public class RedissonReactive implements RedissonReactiveClient { return connectionManager.isShuttingDown(); } + protected void enableRedissonReferenceSupport() { + this.commandExecutor.enableRedissonReferenceSupport(this); + } } diff --git a/redisson/src/main/java/org/redisson/RedissonReference.java b/redisson/src/main/java/org/redisson/RedissonReference.java index 17e5c4836..13df377aa 100644 --- a/redisson/src/main/java/org/redisson/RedissonReference.java +++ b/redisson/src/main/java/org/redisson/RedissonReference.java @@ -17,6 +17,7 @@ package org.redisson; import org.redisson.client.codec.Codec; import org.redisson.api.RObject; +import org.redisson.api.RObjectReactive; import org.redisson.api.annotation.REntity; /** @@ -37,12 +38,14 @@ public class RedissonReference { } public RedissonReference(Class type, String keyName, Codec codec) { - if (!type.isAnnotationPresent(REntity.class) && !RObject.class.isAssignableFrom(type)) { - throw new IllegalArgumentException("Class reference has to be a type of either RObject or RLiveObject"); + if (!type.isAnnotationPresent(REntity.class) && !RObject.class.isAssignableFrom(type) && !RObjectReactive.class.isAssignableFrom(type)) { + throw new IllegalArgumentException("Class reference has to be a type of either RObject or RLiveObject or RObjectReactive"); } - this.type = type.getName(); + this.type = RObjectReactive.class.isAssignableFrom(type) + ? type.getName().substring(0, type.getName().length() - "Reactive".length()).replaceFirst(".reactive", "") + : type.getName(); this.keyName = keyName; - this.codec = codec != null ? codec.getClass().getCanonicalName() : null; + this.codec = codec != null ? codec.getClass().getName() : null; } public boolean isDefaultCodec() { @@ -60,6 +63,17 @@ public class RedissonReference { return Class.forName(type); } + /** + * @return the type + * @throws java.lang.Exception - which could be: + * LinkageError - if the linkage fails + * ExceptionInInitializerError - if the initialization provoked by this method fails + * ClassNotFoundException - if the class cannot be located + */ + public Class getReactiveType() throws Exception { + return Class.forName(type.replaceFirst("org.redisson", "org.redisson.reactive") + "Reactive");//live object is not supported in reactive client + } + /** * @return type name in string */ @@ -67,14 +81,21 @@ public class RedissonReference { return type; } + /** + * @return type name in string + */ + public String getReactiveTypeName() { + return type + "Reactive"; + } + /** * @param type the type to set */ public void setType(Class type) { - if (!type.isAnnotationPresent(REntity.class) && !RObject.class.isAssignableFrom(type)) { - throw new IllegalArgumentException("Class reference has to be a type of either RObject or RLiveObject"); + if (!type.isAnnotationPresent(REntity.class) && (!RObject.class.isAssignableFrom(type) || !RObjectReactive.class.isAssignableFrom(type))) { + throw new IllegalArgumentException("Class reference has to be a type of either RObject or RLiveObject or RObjectReactive"); } - this.type = type.getCanonicalName(); + this.type = type.getName(); } /** @@ -115,7 +136,7 @@ public class RedissonReference { * @param codec the codec to set */ public void setCodecType(Class codec) { - this.codec = codec.getCanonicalName(); + this.codec = codec.getName(); } } diff --git a/redisson/src/main/java/org/redisson/api/RObjectReactive.java b/redisson/src/main/java/org/redisson/api/RObjectReactive.java index d11bfe0dc..87f0e0f3f 100644 --- a/redisson/src/main/java/org/redisson/api/RObjectReactive.java +++ b/redisson/src/main/java/org/redisson/api/RObjectReactive.java @@ -16,6 +16,7 @@ package org.redisson.api; import org.reactivestreams.Publisher; +import org.redisson.client.codec.Codec; /** * Base interface for all Redisson objects @@ -26,7 +27,8 @@ import org.reactivestreams.Publisher; public interface RObjectReactive { String getName(); - + + Codec getCodec(); /** * Transfer a object from a source Redis instance to a destination Redis instance * in mode diff --git a/redisson/src/main/java/org/redisson/api/RedissonReactiveClient.java b/redisson/src/main/java/org/redisson/api/RedissonReactiveClient.java index ae3e6964a..71a72a95f 100644 --- a/redisson/src/main/java/org/redisson/api/RedissonReactiveClient.java +++ b/redisson/src/main/java/org/redisson/api/RedissonReactiveClient.java @@ -18,6 +18,7 @@ package org.redisson.api; import java.util.List; import org.redisson.client.codec.Codec; +import org.redisson.codec.CodecProvider; import org.redisson.config.Config; /** @@ -356,7 +357,14 @@ public interface RedissonReactiveClient { * @return Config object */ Config getConfig(); - + + /** + * Returns the CodecProvider instance + * + * @return CodecProvider + */ + CodecProvider getCodecProvider(); + /** * Get Redis nodes group for server operations * diff --git a/redisson/src/main/java/org/redisson/command/CommandAsyncExecutor.java b/redisson/src/main/java/org/redisson/command/CommandAsyncExecutor.java index ca487f00a..49eca78d9 100644 --- a/redisson/src/main/java/org/redisson/command/CommandAsyncExecutor.java +++ b/redisson/src/main/java/org/redisson/command/CommandAsyncExecutor.java @@ -28,8 +28,8 @@ import org.redisson.client.protocol.RedisCommand; import org.redisson.connection.ConnectionManager; import org.redisson.connection.MasterSlaveEntry; -import io.netty.util.concurrent.Future; -import org.redisson.Redisson; +import org.redisson.api.RedissonClient; +import org.redisson.api.RedissonReactiveClient; /** * @@ -40,8 +40,12 @@ public interface CommandAsyncExecutor { ConnectionManager getConnectionManager(); - CommandAsyncExecutor enableRedissonReferenceSupport(Redisson redisson); - + CommandAsyncExecutor enableRedissonReferenceSupport(RedissonClient redisson); + + CommandAsyncExecutor enableRedissonReferenceSupport(RedissonReactiveClient redissonReactive); + + boolean isRedissonReferenceSupportEnabled(); + RedisException convertException(RFuture RFuture); boolean await(RFuture RFuture, long timeout, TimeUnit timeoutUnit) throws InterruptedException; diff --git a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java index 74651bb5e..5bba0a607 100644 --- a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java +++ b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java @@ -61,8 +61,11 @@ import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.Promise; import org.redisson.Redisson; +import org.redisson.RedissonReactive; import org.redisson.RedissonReference; import org.redisson.api.RObject; +import org.redisson.api.RedissonClient; +import org.redisson.api.RedissonReactiveClient; import org.redisson.liveobject.misc.RedissonObjectFactory; /** @@ -75,7 +78,8 @@ public class CommandAsyncService implements CommandAsyncExecutor { private static final Logger log = LoggerFactory.getLogger(CommandAsyncService.class); final ConnectionManager connectionManager; - private Redisson redisson; + protected RedissonClient redisson; + protected RedissonReactiveClient redissonReactive; public CommandAsyncService(ConnectionManager connectionManager) { this.connectionManager = connectionManager; @@ -87,13 +91,28 @@ public class CommandAsyncService implements CommandAsyncExecutor { } @Override - public CommandAsyncExecutor enableRedissonReferenceSupport(Redisson redisson) { + public CommandAsyncExecutor enableRedissonReferenceSupport(RedissonClient redisson) { if (redisson != null) { this.redisson = redisson; + this.redissonReactive = null; + } + return this; + } + + @Override + public CommandAsyncExecutor enableRedissonReferenceSupport(RedissonReactiveClient redissonReactive) { + if (redissonReactive != null) { + this.redissonReactive = redissonReactive; + this.redisson = null; } return this; } + @Override + public boolean isRedissonReferenceSupportEnabled() { + return redisson != null || redissonReactive != null; + } + @Override public V get(RFuture future) { final CountDownLatch l = new CountDownLatch(1); @@ -450,9 +469,11 @@ public class CommandAsyncService implements CommandAsyncExecutor { } final AsyncDetails details = AsyncDetails.acquire(); - if (redisson != null) { + if (isRedissonReferenceSupportEnabled()) { for (int i = 0; i < params.length; i++) { - RedissonReference reference = RedissonObjectFactory.toReference(redisson, params[i]); + RedissonReference reference = redisson != null + ? RedissonObjectFactory.toReference(redisson, params[i]) + : RedissonObjectFactory.toReference(redissonReactive, params[i]); params[i] = reference == null ? params[i] : reference; } } @@ -730,9 +751,25 @@ public class CommandAsyncService implements CommandAsyncExecutor { } ((RedisClientResult)res).setRedisClient(addr); } - if (redisson != null && res instanceof RedissonReference) { + + if (isRedissonReferenceSupportEnabled() && res instanceof List) { + List r = (List) res; + for (int i = 0; i < r.size(); i++) { + if (r.get(i) instanceof RedissonReference) { + try { + r.set(i ,(redisson != null + ? RedissonObjectFactory.fromReference(redisson, (RedissonReference) r.get(i)) + : RedissonObjectFactory.fromReference(redissonReactive, (RedissonReference) r.get(i)))); + } catch (Exception exception) {//skip and carry on to next one. + } + } + } + details.getMainPromise().trySuccess(res); + } else if (isRedissonReferenceSupportEnabled() && res instanceof RedissonReference) { try { - details.getMainPromise().trySuccess(RedissonObjectFactory.fromReference(redisson, (RedissonReference) res)); + details.getMainPromise().trySuccess(redisson != null + ? RedissonObjectFactory.fromReference(redisson, (RedissonReference) res) + : RedissonObjectFactory.fromReference(redissonReactive, (RedissonReference) res)); } catch (Exception exception) { details.getMainPromise().trySuccess(res);//fallback } diff --git a/redisson/src/main/java/org/redisson/command/CommandBatchService.java b/redisson/src/main/java/org/redisson/command/CommandBatchService.java index d693f3670..1a7226704 100644 --- a/redisson/src/main/java/org/redisson/command/CommandBatchService.java +++ b/redisson/src/main/java/org/redisson/command/CommandBatchService.java @@ -50,8 +50,9 @@ import io.netty.util.Timeout; import io.netty.util.TimerTask; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; -import io.netty.util.concurrent.Promise; import io.netty.util.internal.PlatformDependent; +import org.redisson.RedissonReference; +import org.redisson.liveobject.misc.RedissonObjectFactory; public class CommandBatchService extends CommandReactiveService { @@ -109,7 +110,14 @@ public class CommandBatchService extends CommandReactiveService { if (!readOnlyMode) { entry.setReadOnlyMode(false); } - + if (isRedissonReferenceSupportEnabled()) { + for (int i = 0; i < params.length; i++) { + RedissonReference reference = redisson != null + ? RedissonObjectFactory.toReference(redisson, params[i]) + : RedissonObjectFactory.toReference(redissonReactive, params[i]); + params[i] = reference == null ? params[i] : reference; + } + } BatchCommandData commandData = new BatchCommandData(mainPromise, codec, command, params, index.incrementAndGet()); entry.getCommands().add(commandData); } @@ -171,7 +179,14 @@ public class CommandBatchService extends CommandReactiveService { Collections.sort(entries); List result = new ArrayList(entries.size()); for (BatchCommandData commandEntry : entries) { - result.add(commandEntry.getPromise().getNow()); + Object entryResult = commandEntry.getPromise().getNow(); + if (isRedissonReferenceSupportEnabled() && entryResult instanceof RedissonReference) { + result.add(redisson != null + ? RedissonObjectFactory.fromReference(redisson, (RedissonReference) entryResult) + : RedissonObjectFactory.fromReference(redissonReactive, (RedissonReference) entryResult)); + } else { + result.add(commandEntry.getPromise().getNow()); + } } promise.setSuccess(result); commands = null; diff --git a/redisson/src/main/java/org/redisson/liveobject/misc/RedissonObjectFactory.java b/redisson/src/main/java/org/redisson/liveobject/misc/RedissonObjectFactory.java index 9d0342224..be04d086e 100644 --- a/redisson/src/main/java/org/redisson/liveobject/misc/RedissonObjectFactory.java +++ b/redisson/src/main/java/org/redisson/liveobject/misc/RedissonObjectFactory.java @@ -17,21 +17,20 @@ package org.redisson.liveobject.misc; import java.lang.reflect.Method; import java.util.Arrays; -import java.util.BitSet; import java.util.HashMap; import java.util.List; import org.redisson.RedissonReference; import org.redisson.client.codec.Codec; -import org.redisson.api.RBitSet; import org.redisson.api.RLiveObject; import org.redisson.api.RLiveObjectService; import org.redisson.api.RObject; +import org.redisson.api.RObjectReactive; import org.redisson.api.RedissonClient; +import org.redisson.api.RedissonReactiveClient; import org.redisson.api.annotation.REntity; import org.redisson.api.annotation.RId; import org.redisson.codec.CodecProvider; -import org.redisson.liveobject.provider.ResolverProvider; import org.redisson.liveobject.resolver.NamingScheme; /** @@ -41,6 +40,7 @@ import org.redisson.liveobject.resolver.NamingScheme; public class RedissonObjectFactory { private static final HashMap> builders = new HashMap>(); + private static final HashMap> reactiveBuilders = new HashMap>(); static { for (Method method : RedissonClient.class.getDeclaredMethods()) { @@ -52,7 +52,25 @@ public class RedissonObjectFactory { builders.put(cls, new HashMap()); } HashMap builder = builders.get(cls); - if (method.getParameterTypes().length == 2 + if (method.getParameterTypes().length == 2 //first param is name, second param is codec. + && Codec.class.isAssignableFrom(method.getParameterTypes()[1])) { + builder.put(Boolean.FALSE, method); + } else if (method.getParameterTypes().length == 1) { + builder.put(Boolean.TRUE, method); + } + } + } + + for (Method method : RedissonReactiveClient.class.getDeclaredMethods()) { + if (!method.getReturnType().equals(Void.TYPE) + && RObjectReactive.class.isAssignableFrom(method.getReturnType()) + && method.getName().startsWith("get")) { + Class cls = method.getReturnType(); + if (!reactiveBuilders.containsKey(cls)) { + reactiveBuilders.put(cls, new HashMap()); + } + HashMap builder = reactiveBuilders.get(cls); + if (method.getParameterTypes().length == 2 //first param is name, second param is codec. && Codec.class.isAssignableFrom(method.getParameterTypes()[1])) { builder.put(Boolean.FALSE, method); } else if (method.getParameterTypes().length == 1) { @@ -90,6 +108,30 @@ public class RedissonObjectFactory { } throw new ClassNotFoundException("No RObject is found to match class type of " + rr.getTypeName() + " with codec type of " + rr.getCodecName()); } + + public static T fromReference(RedissonReactiveClient redisson, RedissonReference rr) throws Exception { + return fromReference(redisson, rr, null); + } + + public static T fromReference(RedissonReactiveClient redisson, RedissonReference rr, Class expected) throws Exception { + Class type = rr.getReactiveType(); + CodecProvider codecProvider = redisson.getConfig().getCodecProvider(); + /** + * Live Object from reference in reactive client is not supported yet. + */ + if (type != null) { + List> interfaces = Arrays.asList(type.getInterfaces()); + for (Class iType : interfaces) { + if (reactiveBuilders.containsKey(iType)) {// user cache to speed up things a little. + Method builder = reactiveBuilders.get(iType).get(rr.isDefaultCodec()); + return (T) (rr.isDefaultCodec() + ? builder.invoke(redisson, rr.getKeyName()) + : builder.invoke(redisson, rr.getKeyName(), codecProvider.getCodec(rr.getCodecType()))); + } + } + } + throw new ClassNotFoundException("No RObjectReactive is found to match class type of " + rr.getReactiveTypeName()+ " with codec type of " + rr.getCodecName()); + } public static RedissonReference toReference(RedissonClient redisson, Object object) { if (object instanceof RObject) { @@ -117,20 +159,40 @@ public class RedissonObjectFactory { return null; } + public static RedissonReference toReference(RedissonReactiveClient redissonReactive, Object object) { + if (object instanceof RObjectReactive) { + RObjectReactive rObject = ((RObjectReactive) object); + redissonReactive.getCodecProvider().registerCodec((Class) rObject.getCodec().getClass(), (Class) rObject.getClass(), rObject.getName(), rObject.getCodec()); + return new RedissonReference(object.getClass(), ((RObjectReactive) object).getName(), ((RObjectReactive) object).getCodec()); + } + try { + if (object instanceof RLiveObject) { + Class rEntity = object.getClass().getSuperclass(); + REntity anno = rEntity.getAnnotation(REntity.class); + NamingScheme ns = anno.namingScheme() + .getDeclaredConstructor(Codec.class) + .newInstance(redissonReactive.getCodecProvider().getCodec(anno, (Class) rEntity)); + String name = Introspectior + .getFieldsWithAnnotation(rEntity, RId.class) + .getOnly().getName(); + Class type = rEntity.getDeclaredField(name).getType(); + return new RedissonReference(rEntity, + ns.getName(rEntity, type, name, ((RLiveObject) object).getLiveObjectId())); + } + } catch (Exception e) { + throw new IllegalArgumentException(e); + } + return null; + } + public static T createRObject(RedissonClient redisson, Class expectedType, String name, K codec) throws Exception { List> interfaces = Arrays.asList(expectedType.getInterfaces()); - for (Method method : RedissonClient.class.getDeclaredMethods()) { - if (method.getName().startsWith("get") - && method.getReturnType().isAssignableFrom(expectedType) - && interfaces.contains(method.getReturnType())) { - if ((codec == null || RBitSet.class.isAssignableFrom(method.getReturnType())) && method.getParameterTypes().length == 1) { - return (T) method.invoke(redisson, name); - } else if (codec != null - && method.getParameterTypes().length == 2 - && String.class.equals(method.getParameterTypes()[0]) - && Codec.class.equals(method.getParameterTypes()[1])) { - return (T) method.invoke(redisson, name, codec); - } + for (Class iType : interfaces) { + if (builders.containsKey(iType)) {// user cache to speed up things a little. + Method builder = builders.get(iType).get(codec != null); + return (T) (codec != null + ? builder.invoke(redisson, name) + : builder.invoke(redisson, name, codec)); } } throw new ClassNotFoundException("No RObject is found to match class type of " + (expectedType != null ? expectedType.getName() : "null") + " with codec type of " + (codec != null ? codec.getClass().getName() : "null")); diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonBatchReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonBatchReactive.java index 8696d6824..adfd96518 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonBatchReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonBatchReactive.java @@ -19,6 +19,7 @@ import java.util.List; import org.reactivestreams.Publisher; import org.redisson.EvictionScheduler; +import org.redisson.Redisson; import org.redisson.api.RAtomicLongReactive; import org.redisson.api.RBatchReactive; import org.redisson.api.RBitSetReactive; @@ -37,6 +38,7 @@ import org.redisson.api.RScriptReactive; import org.redisson.api.RSetCacheReactive; import org.redisson.api.RSetReactive; import org.redisson.api.RTopicReactive; +import org.redisson.api.RedissonReactiveClient; import org.redisson.client.codec.Codec; import org.redisson.command.CommandBatchService; import org.redisson.connection.ConnectionManager; @@ -201,4 +203,8 @@ public class RedissonBatchReactive implements RBatchReactive { return new NettyFuturePublisher>(executorService.executeAsync()); } + public void enableRedissonReferenceSupport(RedissonReactiveClient redissonReactive) { + this.executorService.enableRedissonReferenceSupport(redissonReactive); + } + } diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonObjectReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonObjectReactive.java index 6c4d70970..2519dbc2d 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonObjectReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonObjectReactive.java @@ -60,6 +60,11 @@ abstract class RedissonObjectReactive implements RObjectReactive { return name; } + @Override + public Codec getCodec() { + return codec; + } + @Override public Publisher rename(String newName) { return commandExecutor.writeReactive(getName(), RedisCommands.RENAME, getName(), newName); diff --git a/redisson/src/test/java/org/redisson/RedissonReferenceReactiveTest.java b/redisson/src/test/java/org/redisson/RedissonReferenceReactiveTest.java new file mode 100644 index 000000000..2d4e0c324 --- /dev/null +++ b/redisson/src/test/java/org/redisson/RedissonReferenceReactiveTest.java @@ -0,0 +1,53 @@ +package org.redisson; + +import java.util.List; +import static org.junit.Assert.*; +import org.junit.Test; +import org.redisson.api.RBatchReactive; +import org.redisson.api.RBucketReactive; +import org.redisson.reactive.RedissonBucketReactive; +import org.redisson.reactive.RedissonMapCacheReactive; + +/** + * + * @author Rui Gu (https://github.com/jackygurui) + */ +public class RedissonReferenceReactiveTest extends BaseReactiveTest { + + @Test + public void test() throws InterruptedException { + RBucketReactive b1 = redisson.getBucket("b1"); + RBucketReactive b2 = redisson.getBucket("b2"); + RBucketReactive b3 = redisson.getBucket("b3"); + sync(b2.set(b3)); + sync(b1.set(redisson.getBucket("b2"))); + assertTrue(sync(b1.get()).getClass().equals(RedissonBucketReactive.class)); + assertEquals("b3", ((RBucketReactive) sync(((RBucketReactive) sync(b1.get())).get())).getName()); + RBucketReactive b4 = redisson.getBucket("b4"); + sync(b4.set(redisson.getMapCache("testCache"))); + assertTrue(sync(b4.get()) instanceof RedissonMapCacheReactive); + sync(((RedissonMapCacheReactive) sync(b4.get())).fastPut(b1, b2)); + assertEquals("b2", ((RBucketReactive) sync(((RedissonMapCacheReactive) sync(b4.get())).get(b1))).getName()); + } + + @Test + public void testBatch() throws InterruptedException { + RBatchReactive batch = redisson.createBatch(); + RBucketReactive b1 = batch.getBucket("b1"); + RBucketReactive b2 = batch.getBucket("b2"); + RBucketReactive b3 = batch.getBucket("b3"); + b2.set(b3); + b1.set(b2); + b3.set(b1); + sync(batch.execute()); + + batch = redisson.createBatch(); + batch.getBucket("b1").get(); + batch.getBucket("b2").get(); + batch.getBucket("b3").get(); + List result = (List) sync(batch.execute()); + assertEquals("b2", result.get(0).getName()); + assertEquals("b3", result.get(1).getName()); + assertEquals("b1", result.get(2).getName()); + } +} diff --git a/redisson/src/test/java/org/redisson/RedissonReferenceTest.java b/redisson/src/test/java/org/redisson/RedissonReferenceTest.java index 2cc0fcb72..383ae0a77 100644 --- a/redisson/src/test/java/org/redisson/RedissonReferenceTest.java +++ b/redisson/src/test/java/org/redisson/RedissonReferenceTest.java @@ -1,9 +1,12 @@ package org.redisson; +import java.util.List; import java.util.concurrent.TimeUnit; import static org.junit.Assert.*; import org.junit.Test; +import org.redisson.api.RBatch; import org.redisson.api.RBucket; +import org.redisson.api.RBucketAsync; /** * @@ -12,7 +15,7 @@ import org.redisson.api.RBucket; public class RedissonReferenceTest extends BaseTest { @Test - public void test() { + public void testBasic() { RBucket b1 = redisson.getBucket("b1"); RBucket b2 = redisson.getBucket("b2"); RBucket b3 = redisson.getBucket("b3"); @@ -26,4 +29,26 @@ public class RedissonReferenceTest extends BaseTest { ((RedissonMapCache) b4.get()).fastPut(b1, b2, 1, TimeUnit.MINUTES); assertEquals("b2", ((RBucket)((RedissonMapCache) b4.get()).get(b1)).getName()); } + + @Test + public void testBatch() { + RBatch batch = redisson.createBatch(); + RBucketAsync b1 = batch.getBucket("b1"); + RBucketAsync b2 = batch.getBucket("b2"); + RBucketAsync b3 = batch.getBucket("b3"); + b2.setAsync(b3); + b1.setAsync(b2); + b3.setAsync(b1); + batch.execute(); + + batch = redisson.createBatch(); + batch.getBucket("b1").getAsync(); + batch.getBucket("b2").getAsync(); + batch.getBucket("b3").getAsync(); + List result = (List)batch.execute(); + assertEquals("b2", result.get(0).getName()); + assertEquals("b3", result.get(1).getName()); + assertEquals("b1", result.get(2).getName()); + } + }