Fixed - Optional class can't be used as a result object in RemoteService interface. #3282

pull/3417/head
Nikita Koksharov 4 years ago
parent d8343672a4
commit 85653aa802

@ -30,6 +30,7 @@ import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
@ -415,7 +416,15 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
RBlockingQueueAsync<RRemoteServiceResponse> queue = getBlockingQueue(responseName, codec);
try {
RFuture<Void> clientsFuture = queue.putAsync(result);
RRemoteServiceResponse response;
if (result instanceof RemoteServiceResponse
&& ((RemoteServiceResponse) result).getResult() instanceof Optional) {
Optional<?> o = (Optional<?>) ((RemoteServiceResponse) result).getResult();
response = new RemoteServiceResponse(result.getId(), o.orElse(null));
} else {
response = result;
}
RFuture<Void> clientsFuture = queue.putAsync(response);
queue.expireAsync(timeout, TimeUnit.MILLISECONDS);
clientsFuture.onComplete((res, exc) -> {
@ -423,13 +432,13 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
if (exc instanceof RedissonShutdownException) {
return;
}
log.error("Can't send response: " + result + " for request: " + request, exc);
log.error("Can't send response: " + response + " for request: " + request, exc);
}
resubscribe(remoteInterface, requestQueue, executor, method.getBean());
});
} catch (Exception ex) {
log.error("Can't send response: " + result + " for request: " + request, e);
log.error("Can't send response: " + result + " for request: " + request, ex);
}
} else {
resubscribe(remoteInterface, requestQueue, executor, method.getBean());

@ -18,6 +18,7 @@ package org.redisson.remote;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Optional;
import java.util.concurrent.ConcurrentMap;
import org.redisson.RedissonBucket;
@ -134,6 +135,12 @@ public class SyncRemoteProxy extends BaseRemoteProxy {
if (response.getError() != null) {
throw response.getError();
}
if (method.getReturnType().equals(Optional.class)) {
if (response.getResult() == null) {
return Optional.empty();
}
return Optional.of(response.getResult());
}
return response.getResult();
}

@ -7,6 +7,7 @@ import java.io.NotSerializableException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@ -28,7 +29,6 @@ import org.redisson.api.RemoteInvocationOptions;
import org.redisson.api.annotation.RRemoteAsync;
import org.redisson.api.annotation.RRemoteReactive;
import org.redisson.api.annotation.RRemoteRx;
import org.redisson.codec.FstCodec;
import org.redisson.codec.SerializationCodec;
import org.redisson.remote.RemoteServiceAckTimeoutException;
import org.redisson.remote.RemoteServiceTimeoutException;
@ -151,7 +151,9 @@ public class RedissonRemoteServiceTest extends BaseTest {
public interface RemoteInterface {
Optional<Integer> optionalResult(Integer value);
void cancelMethod() throws InterruptedException;
void voidMethod(String name, Long param);
@ -190,6 +192,14 @@ public class RedissonRemoteServiceTest extends BaseTest {
this.iterations = iterations;
}
@Override
public Optional<Integer> optionalResult(Integer value) {
if (value == null) {
return Optional.empty();
}
return Optional.of(value);
}
@Override
public void cancelMethod() throws InterruptedException {
while (true) {
@ -262,6 +272,19 @@ public class RedissonRemoteServiceTest extends BaseTest {
}
@Test
public void testOptional() {
RRemoteService remoteService = redisson.getRemoteService();
remoteService.register(RemoteInterface.class, new RemoteImpl());
RemoteInterface service = redisson.getRemoteService().get(RemoteInterface.class);
Optional<Integer> r1 = service.optionalResult(null);
assertThat(r1.isPresent()).isFalse();
Optional<Integer> r2 = service.optionalResult(2);
assertThat(r2.get()).isEqualTo(2);
remoteService.deregister(RemoteInterface.class);
}
@Test
public void testConcurrentInvocations() {
ExecutorService executorService = Executors.newFixedThreadPool(2);
@ -665,36 +688,6 @@ public class RedissonRemoteServiceTest extends BaseTest {
}
}
@Test
public void testInvocationWithFstCodec() {
RedissonClient server = Redisson.create(createConfig().setCodec(new FstCodec()));
RedissonClient client = Redisson.create(createConfig().setCodec(new FstCodec()));
try {
server.getRemoteService().register(RemoteInterface.class, new RemoteImpl());
RemoteInterface service = client.getRemoteService().get(RemoteInterface.class);
assertThat(service.resultMethod(21L)).as("Should be compatible with FstCodec").isEqualTo(42L);
try {
assertThat(service.doSomethingWithSerializablePojo(new SerializablePojo("test")).getStringField()).isEqualTo("test");
} catch (Exception e) {
Assert.fail("Should be compatible with FstCodec");
}
try {
assertThat(service.doSomethingWithPojo(new Pojo("test")).getStringField()).isEqualTo("test");
Assert.fail("FstCodec should not be able to serialize a not serializable class");
} catch (Exception e) {
assertThat(e).isInstanceOf(RuntimeException.class);
assertThat(e.getMessage()).contains("Pojo does not implement Serializable");
}
} finally {
client.shutdown();
server.shutdown();
}
}
@Test
public void testInvocationWithSerializationCodec() {
RedissonClient server = Redisson.create(createConfig().setCodec(new SerializationCodec()));

Loading…
Cancel
Save