diff --git a/redisson/src/main/java/org/redisson/RedissonRemoteService.java b/redisson/src/main/java/org/redisson/RedissonRemoteService.java index aa600d125..1f23dfea3 100644 --- a/redisson/src/main/java/org/redisson/RedissonRemoteService.java +++ b/redisson/src/main/java/org/redisson/RedissonRemoteService.java @@ -30,6 +30,7 @@ import org.slf4j.LoggerFactory; import java.util.Arrays; import java.util.Map; +import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; @@ -415,7 +416,15 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS RBlockingQueueAsync queue = getBlockingQueue(responseName, codec); try { - RFuture clientsFuture = queue.putAsync(result); + RRemoteServiceResponse response; + if (result instanceof RemoteServiceResponse + && ((RemoteServiceResponse) result).getResult() instanceof Optional) { + Optional o = (Optional) ((RemoteServiceResponse) result).getResult(); + response = new RemoteServiceResponse(result.getId(), o.orElse(null)); + } else { + response = result; + } + RFuture clientsFuture = queue.putAsync(response); queue.expireAsync(timeout, TimeUnit.MILLISECONDS); clientsFuture.onComplete((res, exc) -> { @@ -423,13 +432,13 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS if (exc instanceof RedissonShutdownException) { return; } - log.error("Can't send response: " + result + " for request: " + request, exc); + log.error("Can't send response: " + response + " for request: " + request, exc); } resubscribe(remoteInterface, requestQueue, executor, method.getBean()); }); } catch (Exception ex) { - log.error("Can't send response: " + result + " for request: " + request, e); + log.error("Can't send response: " + result + " for request: " + request, ex); } } else { resubscribe(remoteInterface, requestQueue, executor, method.getBean()); diff --git a/redisson/src/main/java/org/redisson/remote/SyncRemoteProxy.java b/redisson/src/main/java/org/redisson/remote/SyncRemoteProxy.java index 4b371253a..8fd47cf97 100644 --- a/redisson/src/main/java/org/redisson/remote/SyncRemoteProxy.java +++ b/redisson/src/main/java/org/redisson/remote/SyncRemoteProxy.java @@ -18,6 +18,7 @@ package org.redisson.remote; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.lang.reflect.Proxy; +import java.util.Optional; import java.util.concurrent.ConcurrentMap; import org.redisson.RedissonBucket; @@ -134,6 +135,12 @@ public class SyncRemoteProxy extends BaseRemoteProxy { if (response.getError() != null) { throw response.getError(); } + if (method.getReturnType().equals(Optional.class)) { + if (response.getResult() == null) { + return Optional.empty(); + } + return Optional.of(response.getResult()); + } return response.getResult(); } diff --git a/redisson/src/test/java/org/redisson/RedissonRemoteServiceTest.java b/redisson/src/test/java/org/redisson/RedissonRemoteServiceTest.java index 5c597edd9..4dabedbb6 100644 --- a/redisson/src/test/java/org/redisson/RedissonRemoteServiceTest.java +++ b/redisson/src/test/java/org/redisson/RedissonRemoteServiceTest.java @@ -7,6 +7,7 @@ import java.io.NotSerializableException; import java.io.Serializable; import java.util.ArrayList; import java.util.List; +import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -28,7 +29,6 @@ import org.redisson.api.RemoteInvocationOptions; import org.redisson.api.annotation.RRemoteAsync; import org.redisson.api.annotation.RRemoteReactive; import org.redisson.api.annotation.RRemoteRx; -import org.redisson.codec.FstCodec; import org.redisson.codec.SerializationCodec; import org.redisson.remote.RemoteServiceAckTimeoutException; import org.redisson.remote.RemoteServiceTimeoutException; @@ -151,7 +151,9 @@ public class RedissonRemoteServiceTest extends BaseTest { public interface RemoteInterface { - + + Optional optionalResult(Integer value); + void cancelMethod() throws InterruptedException; void voidMethod(String name, Long param); @@ -190,6 +192,14 @@ public class RedissonRemoteServiceTest extends BaseTest { this.iterations = iterations; } + @Override + public Optional optionalResult(Integer value) { + if (value == null) { + return Optional.empty(); + } + return Optional.of(value); + } + @Override public void cancelMethod() throws InterruptedException { while (true) { @@ -262,6 +272,19 @@ public class RedissonRemoteServiceTest extends BaseTest { } + @Test + public void testOptional() { + RRemoteService remoteService = redisson.getRemoteService(); + remoteService.register(RemoteInterface.class, new RemoteImpl()); + RemoteInterface service = redisson.getRemoteService().get(RemoteInterface.class); + + Optional r1 = service.optionalResult(null); + assertThat(r1.isPresent()).isFalse(); + Optional r2 = service.optionalResult(2); + assertThat(r2.get()).isEqualTo(2); + remoteService.deregister(RemoteInterface.class); + } + @Test public void testConcurrentInvocations() { ExecutorService executorService = Executors.newFixedThreadPool(2); @@ -665,36 +688,6 @@ public class RedissonRemoteServiceTest extends BaseTest { } } - @Test - public void testInvocationWithFstCodec() { - RedissonClient server = Redisson.create(createConfig().setCodec(new FstCodec())); - RedissonClient client = Redisson.create(createConfig().setCodec(new FstCodec())); - try { - server.getRemoteService().register(RemoteInterface.class, new RemoteImpl()); - - RemoteInterface service = client.getRemoteService().get(RemoteInterface.class); - - assertThat(service.resultMethod(21L)).as("Should be compatible with FstCodec").isEqualTo(42L); - - try { - assertThat(service.doSomethingWithSerializablePojo(new SerializablePojo("test")).getStringField()).isEqualTo("test"); - } catch (Exception e) { - Assert.fail("Should be compatible with FstCodec"); - } - - try { - assertThat(service.doSomethingWithPojo(new Pojo("test")).getStringField()).isEqualTo("test"); - Assert.fail("FstCodec should not be able to serialize a not serializable class"); - } catch (Exception e) { - assertThat(e).isInstanceOf(RuntimeException.class); - assertThat(e.getMessage()).contains("Pojo does not implement Serializable"); - } - } finally { - client.shutdown(); - server.shutdown(); - } - } - @Test public void testInvocationWithSerializationCodec() { RedissonClient server = Redisson.create(createConfig().setCodec(new SerializationCodec()));