Feature - Add support for Reactive and RxJava2 interfaces for RemoteService object #2040

pull/2053/head
Nikita Koksharov 6 years ago
parent 747d073c1b
commit 042290e9bd

@ -480,7 +480,7 @@ public class Redisson implements RedissonClient {
} else {
executorId = connectionManager.getId() + ":" + name;
}
return new RedissonRemoteService(codec, this, name, connectionManager.getCommandExecutor(), executorId, responses);
return new RedissonRemoteService(codec, name, connectionManager.getCommandExecutor(), executorId, responses);
}
@Override

@ -166,7 +166,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
this.executorId = connectionManager.getId().toString() + ":" + RemoteExecutorServiceAsync.class.getName() + ":" + name;
}
remoteService = new RedissonExecutorRemoteService(codec, redisson, name, connectionManager.getCommandExecutor(), executorId, responses);
remoteService = new RedissonExecutorRemoteService(codec, name, connectionManager.getCommandExecutor(), executorId, responses);
requestQueueName = ((RedissonRemoteService) remoteService).getRequestQueueName(RemoteExecutorService.class);
responseQueueName = ((RedissonRemoteService) remoteService).getResponseQueueName(executorId);
String objectName = requestQueueName;
@ -185,7 +185,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
workersTopic = redisson.getTopic(workersChannelName);
executorRemoteService = new TasksService(codec, redisson, name, commandExecutor, executorId, responses);
executorRemoteService = new TasksService(codec, name, commandExecutor, executorId, responses);
executorRemoteService.setTerminationTopicName(terminationTopic.getChannelNames().get(0));
executorRemoteService.setTasksCounterName(tasksCounterName);
executorRemoteService.setStatusName(statusName);
@ -197,7 +197,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
asyncService = executorRemoteService.get(RemoteExecutorServiceAsync.class, RESULT_OPTIONS);
asyncServiceWithoutResult = executorRemoteService.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck().noResult());
scheduledRemoteService = new ScheduledTasksService(codec, redisson, name, commandExecutor, executorId, responses);
scheduledRemoteService = new ScheduledTasksService(codec, name, commandExecutor, executorId, responses);
scheduledRemoteService.setTerminationTopicName(terminationTopic.getChannelNames().get(0));
scheduledRemoteService.setTasksCounterName(tasksCounterName);
scheduledRemoteService.setStatusName(statusName);
@ -350,7 +350,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
}
private TasksBatchService createBatchService() {
TasksBatchService executorRemoteService = new TasksBatchService(codec, redisson, name, commandExecutor, executorId, responses);
TasksBatchService executorRemoteService = new TasksBatchService(codec, name, commandExecutor, executorId, responses);
executorRemoteService.setTerminationTopicName(terminationTopic.getChannelNames().get(0));
executorRemoteService.setTasksCounterName(tasksCounterName);
executorRemoteService.setStatusName(statusName);

@ -17,6 +17,8 @@ package org.redisson;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.redisson.api.BatchOptions;
import org.redisson.api.ClusterNode;
@ -48,6 +50,7 @@ import org.redisson.api.RPermitExpirableSemaphoreReactive;
import org.redisson.api.RQueueReactive;
import org.redisson.api.RRateLimiterReactive;
import org.redisson.api.RReadWriteLockReactive;
import org.redisson.api.RRemoteService;
import org.redisson.api.RScoredSortedSetReactive;
import org.redisson.api.RScriptReactive;
import org.redisson.api.RSemaphoreReactive;
@ -83,6 +86,7 @@ import org.redisson.reactive.RedissonSetMultimapReactive;
import org.redisson.reactive.RedissonSetReactive;
import org.redisson.reactive.RedissonTopicReactive;
import org.redisson.reactive.RedissonTransactionReactive;
import org.redisson.remote.ResponseEntry;
/**
* Main infrastructure class allows to get access
@ -98,6 +102,8 @@ public class RedissonReactive implements RedissonReactiveClient {
protected final CommandReactiveService commandExecutor;
protected final ConnectionManager connectionManager;
protected final Config config;
protected final ConcurrentMap<String, ResponseEntry> responses = new ConcurrentHashMap<>();
protected RedissonReactive(Config config) {
this.config = config;
@ -400,6 +406,32 @@ public class RedissonReactive implements RedissonReactiveClient {
public RAtomicDoubleReactive getAtomicDouble(String name) {
return ReactiveProxyBuilder.create(commandExecutor, new RedissonAtomicDouble(commandExecutor, name), RAtomicDoubleReactive.class);
}
@Override
public RRemoteService getRemoteService() {
return getRemoteService("redisson_rs", connectionManager.getCodec());
}
@Override
public RRemoteService getRemoteService(String name) {
return getRemoteService(name, connectionManager.getCodec());
}
@Override
public RRemoteService getRemoteService(Codec codec) {
return getRemoteService("redisson_rs", codec);
}
@Override
public RRemoteService getRemoteService(String name, Codec codec) {
String executorId;
if (codec == connectionManager.getCodec()) {
executorId = connectionManager.getId().toString();
} else {
executorId = connectionManager.getId() + ":" + name;
}
return new RedissonRemoteService(codec, name, commandExecutor, executorId, responses);
}
@Override
public RBitSetReactive getBitSet(String name) {

@ -31,13 +31,11 @@ import org.redisson.api.RFuture;
import org.redisson.api.RList;
import org.redisson.api.RMap;
import org.redisson.api.RRemoteService;
import org.redisson.api.RedissonClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.codec.CompositeCodec;
import org.redisson.command.CommandExecutor;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.executor.RemotePromise;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
@ -90,8 +88,8 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
private final Map<RemoteServiceKey, RemoteServiceMethod> beans = new ConcurrentHashMap<>();
private final Map<Class<?>, Entry> remoteMap = new ConcurrentHashMap<>();
public RedissonRemoteService(Codec codec, RedissonClient redisson, String name, CommandExecutor commandExecutor, String executorId, ConcurrentMap<String, ResponseEntry> responses) {
super(codec, redisson, name, commandExecutor, executorId, responses);
public RedissonRemoteService(Codec codec, String name, CommandAsyncExecutor commandExecutor, String executorId, ConcurrentMap<String, ResponseEntry> responses) {
super(codec, name, commandExecutor, executorId, responses);
}
@Override
@ -150,6 +148,10 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
register(remoteInterface, object, workers, commandExecutor.getConnectionManager().getExecutor());
}
private <V> RBlockingQueue<V> getBlockingQueue(String name, Codec codec) {
return new RedissonBlockingQueue<V>(codec, commandExecutor, name, null);
}
@Override
public <T> void register(Class<T> remoteInterface, T object, int workers, ExecutorService executor) {
if (workers < 1) {
@ -166,10 +168,10 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
remoteMap.put(remoteInterface, new Entry(workers));
String requestQueueName = getRequestQueueName(remoteInterface);
RBlockingQueue<String> requestQueue = redisson.getBlockingQueue(requestQueueName, StringCodec.INSTANCE);
RBlockingQueue<String> requestQueue = getBlockingQueue(requestQueueName, StringCodec.INSTANCE);
subscribe(remoteInterface, requestQueue, executor);
}
private <T> void subscribe(Class<T> remoteInterface, RBlockingQueue<String> requestQueue,
ExecutorService executor) {
Entry entry = remoteMap.get(remoteInterface);
@ -185,8 +187,7 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
}
if (e != null) {
if (e instanceof RedissonShutdownException
|| redisson.isShuttingDown()) {
if (e instanceof RedissonShutdownException) {
return;
}
log.error("Can't process the remote service request.", e);
@ -207,7 +208,7 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
subscribe(remoteInterface, requestQueue, executor);
}
RMap<String, RemoteServiceRequest> tasks = redisson.getMap(requestQueue.getName() + ":tasks", new CompositeCodec(StringCodec.INSTANCE, codec, codec));
RMap<String, RemoteServiceRequest> tasks = getMap(requestQueue.getName() + ":tasks");
RFuture<RemoteServiceRequest> taskFuture = getTask(requestId, tasks);
taskFuture.onComplete((request, exc) -> {
if (exc != null) {
@ -277,7 +278,7 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
}
RList<Object> list = redisson.getList(responseName, codec);
RList<Object> list = new RedissonList<>(codec, commandExecutor, responseName, null);
RFuture<Boolean> addFuture = list.addAsync(new RemoteServiceAck(request.getId()));
addFuture.onComplete((res, exce) -> {
if (exce != null) {
@ -336,7 +337,7 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
// could be removed not from future object
if (r.isSendResponse()) {
RMap<String, RemoteServiceCancelResponse> map = redisson.getMap(cancelResponseMapName, new CompositeCodec(StringCodec.INSTANCE, codec, codec));
RMap<String, RemoteServiceCancelResponse> map = getMap(cancelResponseMapName);
map.fastPutAsync(request.getId(), response);
map.expireAsync(60, TimeUnit.SECONDS);
}
@ -371,7 +372,7 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
timeout = request.getOptions().getExecutionTimeoutInMillis();
}
RBlockingQueueAsync<RRemoteServiceResponse> queue = redisson.getBlockingQueue(responseName, codec);
RBlockingQueueAsync<RRemoteServiceResponse> queue = getBlockingQueue(responseName, codec);
RFuture<Void> clientsFuture = queue.putAsync(responseHolder.get());
queue.expireAsync(timeout, TimeUnit.MILLISECONDS);

@ -15,6 +15,9 @@
*/
package org.redisson;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.redisson.api.BatchOptions;
import org.redisson.api.ClusterNode;
import org.redisson.api.MapOptions;
@ -43,6 +46,7 @@ import org.redisson.api.RPermitExpirableSemaphoreRx;
import org.redisson.api.RQueueRx;
import org.redisson.api.RRateLimiterRx;
import org.redisson.api.RReadWriteLockRx;
import org.redisson.api.RRemoteService;
import org.redisson.api.RScoredSortedSetRx;
import org.redisson.api.RScriptRx;
import org.redisson.api.RSemaphoreRx;
@ -61,6 +65,7 @@ import org.redisson.config.Config;
import org.redisson.config.ConfigSupport;
import org.redisson.connection.ConnectionManager;
import org.redisson.eviction.EvictionScheduler;
import org.redisson.remote.ResponseEntry;
import org.redisson.rx.CommandRxExecutor;
import org.redisson.rx.CommandRxService;
import org.redisson.rx.RedissonBatchRx;
@ -96,6 +101,8 @@ public class RedissonRx implements RedissonRxClient {
protected final ConnectionManager connectionManager;
protected final Config config;
protected final ConcurrentMap<String, ResponseEntry> responses = new ConcurrentHashMap<>();
protected RedissonRx(Config config) {
this.config = config;
Config configCopy = new Config(config);
@ -376,6 +383,32 @@ public class RedissonRx implements RedissonRxClient {
public RAtomicDoubleRx getAtomicDouble(String name) {
return RxProxyBuilder.create(commandExecutor, new RedissonAtomicDouble(commandExecutor, name), RAtomicDoubleRx.class);
}
@Override
public RRemoteService getRemoteService() {
return getRemoteService("redisson_rs", connectionManager.getCodec());
}
@Override
public RRemoteService getRemoteService(String name) {
return getRemoteService(name, connectionManager.getCodec());
}
@Override
public RRemoteService getRemoteService(Codec codec) {
return getRemoteService("redisson_rs", codec);
}
@Override
public RRemoteService getRemoteService(String name, Codec codec) {
String executorId;
if (codec == connectionManager.getCodec()) {
executorId = connectionManager.getId().toString();
} else {
executorId = connectionManager.getId() + ":" + name;
}
return new RedissonRemoteService(codec, name, commandExecutor, executorId, responses);
}
@Override
public RBitSetRx getBitSet(String name) {

@ -581,6 +581,40 @@ public interface RedissonReactiveClient {
*/
RAtomicDoubleReactive getAtomicDouble(String name);
/**
* Returns object for remote operations prefixed with the default name (redisson_remote_service)
*
* @return RemoteService object
*/
RRemoteService getRemoteService();
/**
* Returns object for remote operations prefixed with the default name (redisson_remote_service)
* and uses provided codec for method arguments and result.
*
* @param codec - codec for response and request
* @return RemoteService object
*/
RRemoteService getRemoteService(Codec codec);
/**
* Returns object for remote operations prefixed with the specified name
*
* @param name - the name used as the Redis key prefix for the services
* @return RemoteService object
*/
RRemoteService getRemoteService(String name);
/**
* Returns object for remote operations prefixed with the specified name
* and uses provided codec for method arguments and result.
*
* @param name - the name used as the Redis key prefix for the services
* @param codec - codec for response and request
* @return RemoteService object
*/
RRemoteService getRemoteService(String name, Codec codec);
/**
* Returns bitSet instance by name.
*

@ -569,6 +569,40 @@ public interface RedissonRxClient {
*/
RAtomicDoubleRx getAtomicDouble(String name);
/**
* Returns object for remote operations prefixed with the default name (redisson_remote_service)
*
* @return RemoteService object
*/
RRemoteService getRemoteService();
/**
* Returns object for remote operations prefixed with the default name (redisson_remote_service)
* and uses provided codec for method arguments and result.
*
* @param codec - codec for response and request
* @return RemoteService object
*/
RRemoteService getRemoteService(Codec codec);
/**
* Returns object for remote operations prefixed with the specified name
*
* @param name - the name used as the Redis key prefix for the services
* @return RemoteService object
*/
RRemoteService getRemoteService(String name);
/**
* Returns object for remote operations prefixed with the specified name
* and uses provided codec for method arguments and result.
*
* @param name - the name used as the Redis key prefix for the services
* @param codec - codec for response and request
* @return RemoteService object
*/
RRemoteService getRemoteService(String name, Codec codec);
/**
* Returns bitSet instance by name.
*

@ -0,0 +1,49 @@
/**
* Copyright (c) 2013-2019 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api.annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* Annotation used to mark interface as Reactive
* client interface for remote service interface.
* <p>
* All method signatures must match with remote service interface,
* but return type must be <code>reactor.core.publisher.Mono</code>.
* <p>
* It's not necessary to add all methods from remote service.
* Add only those which are needed.
*
* @see reactor.core.publisher.Mono
*
* @author Nikita Koksharov
*
*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface RRemoteReactive {
/**
* Remote interface class used to register
*
* @return class used to register
*/
Class<?> value();
}

@ -0,0 +1,56 @@
/**
* Copyright (c) 2013-2019 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api.annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* Annotation used to mark interface as RxJava2
* client interface for remote service interface.
* <p>
* All method signatures must match with remote service interface,
* but return type must be one of the following:
* <ul>
* <li>io.reactivex.Completable</li>
* <li>io.reactivex.Single</li>
* <li>io.reactivex.Maybe</li>
* </ul>
* <p>
* It's not necessary to add all methods from remote service.
* Add only those which are needed.
*
* @see io.reactivex.Completable
* @see io.reactivex.Single
* @see io.reactivex.Maybe
*
* @author Nikita Koksharov
*
*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface RRemoteRx {
/**
* Remote interface class used to register
*
* @return class used to register
*/
Class<?> value();
}

@ -20,9 +20,8 @@ import java.util.concurrent.ConcurrentMap;
import org.redisson.RedissonRemoteService;
import org.redisson.api.RFuture;
import org.redisson.api.RMap;
import org.redisson.api.RedissonClient;
import org.redisson.client.codec.Codec;
import org.redisson.command.CommandExecutor;
import org.redisson.command.CommandAsyncService;
import org.redisson.remote.RemoteServiceRequest;
import org.redisson.remote.ResponseEntry;
@ -33,9 +32,9 @@ import org.redisson.remote.ResponseEntry;
*/
public class RedissonExecutorRemoteService extends RedissonRemoteService {
public RedissonExecutorRemoteService(Codec codec, RedissonClient redisson, String name,
CommandExecutor commandExecutor, String executorId, ConcurrentMap<String, ResponseEntry> responses) {
super(codec, redisson, name, commandExecutor, executorId, responses);
public RedissonExecutorRemoteService(Codec codec, String name,
CommandAsyncService commandExecutor, String executorId, ConcurrentMap<String, ResponseEntry> responses) {
super(codec, name, commandExecutor, executorId, responses);
}
@Override

@ -21,7 +21,6 @@ import java.util.concurrent.ThreadLocalRandom;
import org.redisson.RedissonExecutorService;
import org.redisson.api.RFuture;
import org.redisson.api.RedissonClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.StringCodec;
@ -41,8 +40,8 @@ public class ScheduledTasksService extends TasksService {
private RequestId requestId;
public ScheduledTasksService(Codec codec, RedissonClient redisson, String name, CommandExecutor commandExecutor, String redissonId, ConcurrentMap<String, ResponseEntry> responses) {
super(codec, redisson, name, commandExecutor, redissonId, responses);
public ScheduledTasksService(Codec codec, String name, CommandExecutor commandExecutor, String redissonId, ConcurrentMap<String, ResponseEntry> responses) {
super(codec, name, commandExecutor, redissonId, responses);
}
public void setRequestId(RequestId requestId) {

@ -19,7 +19,6 @@ import java.util.List;
import java.util.concurrent.ConcurrentMap;
import org.redisson.api.RFuture;
import org.redisson.api.RedissonClient;
import org.redisson.client.codec.Codec;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.command.CommandBatchService;
@ -35,8 +34,8 @@ public class TasksBatchService extends TasksService {
private CommandBatchService batchCommandService;
public TasksBatchService(Codec codec, RedissonClient redisson, String name, CommandExecutor commandExecutor, String executorId, ConcurrentMap<String, ResponseEntry> responses) {
super(codec, redisson, name, commandExecutor, executorId, responses);
public TasksBatchService(Codec codec, String name, CommandExecutor commandExecutor, String executorId, ConcurrentMap<String, ResponseEntry> responses) {
super(codec, name, commandExecutor, executorId, responses);
batchCommandService = new CommandBatchService(commandExecutor.getConnectionManager());
}

@ -161,7 +161,7 @@ public class TasksRunnerService implements RemoteExecutorService {
* @return
*/
private RemoteExecutorServiceAsync asyncScheduledServiceAtFixed(String executorId, String requestId) {
ScheduledTasksService scheduledRemoteService = new ScheduledTasksService(codec, redisson, name, commandExecutor, executorId, responses);
ScheduledTasksService scheduledRemoteService = new ScheduledTasksService(codec, name, commandExecutor, executorId, responses);
scheduledRemoteService.setTerminationTopicName(terminationTopicName);
scheduledRemoteService.setTasksCounterName(tasksCounterName);
scheduledRemoteService.setStatusName(statusName);

@ -22,12 +22,10 @@ import java.util.concurrent.TimeUnit;
import org.redisson.RedissonExecutorService;
import org.redisson.api.RFuture;
import org.redisson.api.RMap;
import org.redisson.api.RedissonClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.codec.CompositeCodec;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.executor.params.TaskParameters;
import org.redisson.misc.RPromise;
@ -55,8 +53,8 @@ public class TasksService extends BaseRemoteService {
protected String tasksRetryIntervalName;
protected long tasksRetryInterval;
public TasksService(Codec codec, RedissonClient redisson, String name, CommandAsyncExecutor commandExecutor, String executorId, ConcurrentMap<String, ResponseEntry> responses) {
super(codec, redisson, name, commandExecutor, executorId, responses);
public TasksService(Codec codec, String name, CommandAsyncExecutor commandExecutor, String executorId, ConcurrentMap<String, ResponseEntry> responses) {
super(codec, name, commandExecutor, executorId, responses);
}
public void setTasksRetryIntervalName(String tasksRetryIntervalName) {
@ -194,7 +192,7 @@ public class TasksService extends BaseRemoteService {
return;
}
RMap<String, RemoteServiceCancelRequest> canceledRequests = redisson.getMap(cancelRequestMapName, new CompositeCodec(StringCodec.INSTANCE, codec, codec));
RMap<String, RemoteServiceCancelRequest> canceledRequests = getMap(cancelRequestMapName);
canceledRequests.putAsync(requestId.toString(), new RemoteServiceCancelRequest(true, true));
canceledRequests.expireAsync(60, TimeUnit.SECONDS);

@ -19,14 +19,17 @@ import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import org.redisson.RedissonBucket;
import org.redisson.RedissonList;
import org.redisson.RedissonMap;
import org.redisson.api.RFuture;
import org.redisson.api.RList;
import org.redisson.api.RMap;
import org.redisson.api.RedissonClient;
import org.redisson.api.RemoteInvocationOptions;
import org.redisson.client.RedisException;
import org.redisson.client.codec.Codec;
@ -48,13 +51,40 @@ public class AsyncRemoteProxy extends BaseRemoteProxy {
protected final String cancelRequestMapName;
public AsyncRemoteProxy(CommandAsyncExecutor commandExecutor, String name, String responseQueueName,
ConcurrentMap<String, ResponseEntry> responses, RedissonClient redisson, Codec codec, String executorId, String cancelRequestMapName, BaseRemoteService remoteService) {
super(commandExecutor, name, responseQueueName, responses, redisson, codec, executorId, remoteService);
ConcurrentMap<String, ResponseEntry> responses, Codec codec, String executorId, String cancelRequestMapName, BaseRemoteService remoteService) {
super(commandExecutor, name, responseQueueName, responses, codec, executorId, remoteService);
this.cancelRequestMapName = cancelRequestMapName;
}
protected List<Class<?>> permittedClasses() {
return Arrays.asList(RFuture.class);
}
public <T> T create(Class<T> remoteInterface, RemoteInvocationOptions options,
Class<?> syncInterface) {
for (Method m : remoteInterface.getMethods()) {
try {
syncInterface.getMethod(m.getName(), m.getParameterTypes());
} catch (NoSuchMethodException e) {
throw new IllegalArgumentException("Method '" + m.getName() + "' with params '"
+ Arrays.toString(m.getParameterTypes()) + "' isn't defined in " + syncInterface);
} catch (SecurityException e) {
throw new IllegalArgumentException(e);
}
boolean permitted = false;
for (Class<?> clazz : permittedClasses()) {
if (clazz.isAssignableFrom(m.getReturnType())) {
permitted = true;
break;
}
}
if (!permitted) {
throw new IllegalArgumentException(
m.getReturnType().getClass() + " isn't allowed as return type");
}
}
// local copy of the options, to prevent mutation
RemoteInvocationOptions optionsCopy = new RemoteInvocationOptions(options);
InvocationHandler handler = new InvocationHandler() {
@ -165,16 +195,20 @@ public class AsyncRemoteProxy extends BaseRemoteProxy {
}
});
return result;
return convertResult(result, method.getReturnType());
}
};
return (T) Proxy.newProxyInstance(remoteInterface.getClassLoader(), new Class[] { remoteInterface }, handler);
}
protected Object convertResult(RemotePromise<Object> result, Class<?> returnType) {
return result;
}
private void awaitResultAsync(RemoteInvocationOptions optionsCopy, RemotePromise<Object> result,
String ackName, RFuture<RRemoteServiceResponse> responseFuture) {
RFuture<Boolean> deleteFuture = redisson.getBucket(ackName).deleteAsync();
RFuture<Boolean> deleteFuture = new RedissonBucket<>(commandExecutor, ackName).deleteAsync();
deleteFuture.onComplete((res, e) -> {
if (e != null) {
result.tryFailure(e);
@ -252,7 +286,7 @@ public class AsyncRemoteProxy extends BaseRemoteProxy {
boolean ackNotSent = commandExecutor.get(future);
if (ackNotSent) {
RList<Object> list = redisson.getList(requestQueueName, LongCodec.INSTANCE);
RList<Object> list = new RedissonList<>(LongCodec.INSTANCE, commandExecutor, requestQueueName, null);
list.remove(requestId.toString());
super.cancel(mayInterruptIfRunning);
return true;
@ -294,7 +328,7 @@ public class AsyncRemoteProxy extends BaseRemoteProxy {
private void cancelExecution(RemoteInvocationOptions optionsCopy,
boolean mayInterruptIfRunning, RemotePromise<Object> remotePromise) {
RMap<String, RemoteServiceCancelRequest> canceledRequests = redisson.getMap(cancelRequestMapName, new CompositeCodec(StringCodec.INSTANCE, codec, codec));
RMap<String, RemoteServiceCancelRequest> canceledRequests = new RedissonMap<>(new CompositeCodec(StringCodec.INSTANCE, codec, codec), commandExecutor, cancelRequestMapName, null, null, null);
canceledRequests.fastPutAsync(remotePromise.getRequestId().toString(), new RemoteServiceCancelRequest(mayInterruptIfRunning, false));
canceledRequests.expireAsync(60, TimeUnit.SECONDS);

@ -25,9 +25,9 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import org.redisson.RedissonBlockingQueue;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RFuture;
import org.redisson.api.RedissonClient;
import org.redisson.api.RemoteInvocationOptions;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec;
@ -54,19 +54,17 @@ public abstract class BaseRemoteProxy {
private final String name;
final String responseQueueName;
private final ConcurrentMap<String, ResponseEntry> responses;
final RedissonClient redisson;
final Codec codec;
final String executorId;
final BaseRemoteService remoteService;
BaseRemoteProxy(CommandAsyncExecutor commandExecutor, String name, String responseQueueName,
ConcurrentMap<String, ResponseEntry> responses, RedissonClient redisson, Codec codec, String executorId, BaseRemoteService remoteService) {
ConcurrentMap<String, ResponseEntry> responses, Codec codec, String executorId, BaseRemoteService remoteService) {
super();
this.commandExecutor = commandExecutor;
this.name = name;
this.responseQueueName = responseQueueName;
this.responses = responses;
this.redisson = redisson;
this.codec = codec;
this.executorId = executorId;
this.remoteService = remoteService;
@ -203,12 +201,16 @@ public abstract class BaseRemoteProxy {
return responseFuture;
}
private <V> RBlockingQueue<V> getBlockingQueue(String name, Codec codec) {
return new RedissonBlockingQueue<V>(codec, commandExecutor, name, null);
}
private void pollResponse(ResponseEntry ent) {
if (!ent.getStarted().compareAndSet(false, true)) {
return;
}
RBlockingQueue<RRemoteServiceResponse> queue = redisson.getBlockingQueue(responseQueueName, codec);
RBlockingQueue<RRemoteServiceResponse> queue = getBlockingQueue(responseQueueName, codec);
RFuture<RRemoteServiceResponse> future = queue.takeAsync();
future.onComplete(createResponseListener());
}
@ -230,7 +232,7 @@ public abstract class BaseRemoteProxy {
RequestId key = new RequestId(response.getId());
List<Result> list = entry.getResponses().get(key);
if (list == null) {
RBlockingQueue<RRemoteServiceResponse> responseQueue = redisson.getBlockingQueue(responseQueueName, codec);
RBlockingQueue<RRemoteServiceResponse> responseQueue = getBlockingQueue(responseQueueName, codec);
responseQueue.takeAsync().onComplete(createResponseListener());
return;
}
@ -246,7 +248,7 @@ public abstract class BaseRemoteProxy {
if (entry.getResponses().isEmpty()) {
responses.remove(responseQueueName, entry);
} else {
RBlockingQueue<RRemoteServiceResponse> responseQueue = redisson.getBlockingQueue(responseQueueName, codec);
RBlockingQueue<RRemoteServiceResponse> responseQueue = getBlockingQueue(responseQueueName, codec);
responseQueue.takeAsync().onComplete(createResponseListener());
}
}

@ -26,11 +26,13 @@ import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.redisson.RedissonMap;
import org.redisson.api.RFuture;
import org.redisson.api.RMap;
import org.redisson.api.RedissonClient;
import org.redisson.api.RemoteInvocationOptions;
import org.redisson.api.annotation.RRemoteAsync;
import org.redisson.api.annotation.RRemoteReactive;
import org.redisson.api.annotation.RRemoteRx;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.StringCodec;
import org.redisson.codec.CompositeCodec;
@ -56,7 +58,6 @@ public abstract class BaseRemoteService {
private final ConcurrentMap<Method, long[]> methodSignaturesCache = new ConcurrentHashMap<>();
protected final Codec codec;
protected final RedissonClient redisson;
protected final String name;
protected final CommandAsyncExecutor commandExecutor;
protected final String executorId;
@ -66,9 +67,8 @@ public abstract class BaseRemoteService {
protected final String responseQueueName;
private final ConcurrentMap<String, ResponseEntry> responses;
public BaseRemoteService(Codec codec, RedissonClient redisson, String name, CommandAsyncExecutor commandExecutor, String executorId, ConcurrentMap<String, ResponseEntry> responses) {
public BaseRemoteService(Codec codec, String name, CommandAsyncExecutor commandExecutor, String executorId, ConcurrentMap<String, ResponseEntry> responses) {
this.codec = codec;
this.redisson = redisson;
this.name = name;
this.commandExecutor = commandExecutor;
this.executorId = executorId;
@ -127,27 +127,24 @@ public abstract class BaseRemoteService {
for (Annotation annotation : remoteInterface.getAnnotations()) {
if (annotation.annotationType() == RRemoteAsync.class) {
Class<T> syncInterface = (Class<T>) ((RRemoteAsync) annotation).value();
for (Method m : remoteInterface.getMethods()) {
try {
syncInterface.getMethod(m.getName(), m.getParameterTypes());
} catch (NoSuchMethodException e) {
throw new IllegalArgumentException("Method '" + m.getName() + "' with params '"
+ Arrays.toString(m.getParameterTypes()) + "' isn't defined in " + syncInterface);
} catch (SecurityException e) {
throw new IllegalArgumentException(e);
}
if (!m.getReturnType().getClass().isInstance(RFuture.class)) {
throw new IllegalArgumentException(
m.getReturnType().getClass() + " isn't allowed as return type");
}
}
AsyncRemoteProxy proxy = new AsyncRemoteProxy(commandExecutor, name, responseQueueName, responses, redisson, codec, executorId, cancelRequestMapName, this);
AsyncRemoteProxy proxy = new AsyncRemoteProxy(commandExecutor, name, responseQueueName, responses, codec, executorId, cancelRequestMapName, this);
return proxy.create(remoteInterface, options, syncInterface);
}
if (annotation.annotationType() == RRemoteReactive.class) {
Class<T> syncInterface = (Class<T>) ((RRemoteReactive) annotation).value();
ReactiveRemoteProxy proxy = new ReactiveRemoteProxy(commandExecutor, name, responseQueueName, responses, codec, executorId, cancelRequestMapName, this);
return proxy.create(remoteInterface, options, syncInterface);
}
if (annotation.annotationType() == RRemoteRx.class) {
Class<T> syncInterface = (Class<T>) ((RRemoteRx) annotation).value();
RxRemoteProxy proxy = new RxRemoteProxy(commandExecutor, name, responseQueueName, responses, codec, executorId, cancelRequestMapName, this);
return proxy.create(remoteInterface, options, syncInterface);
}
}
SyncRemoteProxy proxy = new SyncRemoteProxy(commandExecutor, name, responseQueueName, responses, redisson, codec, executorId, this);
SyncRemoteProxy proxy = new SyncRemoteProxy(commandExecutor, name, responseQueueName, responses, codec, executorId, this);
return proxy.create(remoteInterface, options);
}
@ -155,6 +152,10 @@ public abstract class BaseRemoteService {
return executionTimeoutInMillis;
}
protected <K, V> RMap<K, V> getMap(String name) {
return new RedissonMap<>(new CompositeCodec(StringCodec.INSTANCE, codec, codec), commandExecutor, name, null, null, null);
}
protected <T> void scheduleCheck(String mapName, RequestId requestId, RPromise<T> cancelRequest) {
commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
@ -163,7 +164,7 @@ public abstract class BaseRemoteService {
return;
}
RMap<String, T> canceledRequests = redisson.getMap(mapName, new CompositeCodec(StringCodec.INSTANCE, codec, codec));
RMap<String, T> canceledRequests = getMap(mapName);
RFuture<T> future = canceledRequests.removeAsync(requestId.toString());
future.onComplete((request, ex) -> {
if (cancelRequest.isDone()) {

@ -0,0 +1,53 @@
/**
* Copyright (c) 2013-2019 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.remote;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import org.redisson.client.codec.Codec;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.executor.RemotePromise;
import org.redisson.reactive.CommandReactiveExecutor;
import reactor.core.publisher.Mono;
/**
*
* @author Nikita Koksharov
*
*/
public class ReactiveRemoteProxy extends AsyncRemoteProxy {
public ReactiveRemoteProxy(CommandAsyncExecutor commandExecutor, String name, String responseQueueName,
ConcurrentMap<String, ResponseEntry> responses, Codec codec, String executorId,
String cancelRequestMapName, BaseRemoteService remoteService) {
super(commandExecutor, name, responseQueueName, responses, codec, executorId, cancelRequestMapName,
remoteService);
}
@Override
protected List<Class<?>> permittedClasses() {
return Arrays.asList(Mono.class);
}
@Override
protected Object convertResult(RemotePromise<Object> result, Class<?> returnType) {
return ((CommandReactiveExecutor) commandExecutor).reactive(() -> result);
}
}

@ -0,0 +1,64 @@
/**
* Copyright (c) 2013-2019 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.remote;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import org.redisson.client.codec.Codec;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.executor.RemotePromise;
import org.redisson.rx.CommandRxExecutor;
import io.reactivex.Completable;
import io.reactivex.Flowable;
import io.reactivex.Maybe;
import io.reactivex.Single;
/**
*
* @author Nikita Koksharov
*
*/
public class RxRemoteProxy extends AsyncRemoteProxy {
public RxRemoteProxy(CommandAsyncExecutor commandExecutor, String name, String responseQueueName,
ConcurrentMap<String, ResponseEntry> responses, Codec codec, String executorId,
String cancelRequestMapName, BaseRemoteService remoteService) {
super(commandExecutor, name, responseQueueName, responses, codec, executorId, cancelRequestMapName,
remoteService);
}
@Override
protected List<Class<?>> permittedClasses() {
return Arrays.asList(Completable.class, Single.class, Maybe.class);
}
@Override
protected Object convertResult(RemotePromise<Object> result, Class<?> returnType) {
Flowable<Object> flowable = ((CommandRxExecutor) commandExecutor).flowable(() -> result);
if (returnType == Completable.class) {
return flowable.ignoreElements();
}
if (returnType == Single.class) {
return flowable.singleOrError();
}
return flowable.singleElement();
}
}

@ -20,8 +20,8 @@ import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.concurrent.ConcurrentMap;
import org.redisson.RedissonBucket;
import org.redisson.api.RFuture;
import org.redisson.api.RedissonClient;
import org.redisson.api.RemoteInvocationOptions;
import org.redisson.client.RedisException;
import org.redisson.client.codec.Codec;
@ -37,8 +37,8 @@ import org.redisson.misc.RPromise;
public class SyncRemoteProxy extends BaseRemoteProxy {
public SyncRemoteProxy(CommandAsyncExecutor commandExecutor, String name, String responseQueueName,
ConcurrentMap<String, ResponseEntry> responses, RedissonClient redisson, Codec codec, String executorId, BaseRemoteService remoteService) {
super(commandExecutor, name, responseQueueName, responses, redisson, codec, executorId, remoteService);
ConcurrentMap<String, ResponseEntry> responses, Codec codec, String executorId, BaseRemoteService remoteService) {
super(commandExecutor, name, responseQueueName, responses, codec, executorId, remoteService);
}
public <T> T create(Class<T> remoteInterface, RemoteInvocationOptions options) {
@ -120,7 +120,7 @@ public class SyncRemoteProxy extends BaseRemoteProxy {
+ optionsCopy.getAckTimeoutInMillis() + "ms for request: " + request);
}
}
redisson.getBucket(ackName).delete();
new RedissonBucket<>(commandExecutor, ackName).delete();
}
// poll for the response only if expected

@ -22,13 +22,21 @@ import org.junit.Test;
import org.redisson.api.RFuture;
import org.redisson.api.RRemoteService;
import org.redisson.api.RedissonClient;
import org.redisson.api.RedissonReactiveClient;
import org.redisson.api.RedissonRxClient;
import org.redisson.api.RemoteInvocationOptions;
import org.redisson.api.annotation.RRemoteAsync;
import org.redisson.api.annotation.RRemoteReactive;
import org.redisson.api.annotation.RRemoteRx;
import org.redisson.codec.FstCodec;
import org.redisson.codec.SerializationCodec;
import org.redisson.remote.RemoteServiceAckTimeoutException;
import org.redisson.remote.RemoteServiceTimeoutException;
import io.reactivex.Completable;
import io.reactivex.Single;
import reactor.core.publisher.Mono;
public class RedissonRemoteServiceTest extends BaseTest {
public static class Pojo {
@ -87,6 +95,40 @@ public class RedissonRemoteServiceTest extends BaseTest {
RFuture<Void> timeoutMethod();
}
@RRemoteReactive(RemoteInterface.class)
public interface RemoteInterfaceReactive {
Mono<Void> cancelMethod();
Mono<Void> voidMethod(String name, Long param);
Mono<Long> resultMethod(Long value);
Mono<Void> errorMethod();
Mono<Void> errorMethodWithCause();
Mono<Void> timeoutMethod();
}
@RRemoteRx(RemoteInterface.class)
public interface RemoteInterfaceRx {
Completable cancelMethod();
Completable voidMethod(String name, Long param);
Single<Long> resultMethod(Long value);
Completable errorMethod();
Completable errorMethodWithCause();
Completable timeoutMethod();
}
@RRemoteAsync(RemoteInterface.class)
public interface RemoteInterfaceWrongMethodAsync {
@ -272,7 +314,7 @@ public class RedissonRemoteServiceTest extends BaseTest {
assertThat(iterations.get()).isLessThan(Integer.MAX_VALUE / 2);
assertThat(executor.awaitTermination(1, TimeUnit.SECONDS)).isTrue();
assertThat(executor.awaitTermination(2, TimeUnit.SECONDS)).isTrue();
}
@ -303,6 +345,40 @@ public class RedissonRemoteServiceTest extends BaseTest {
r1.shutdown();
r2.shutdown();
}
@Test
public void testReactive() throws InterruptedException {
RedissonReactiveClient r1 = Redisson.createReactive(createConfig());
r1.getRemoteService().register(RemoteInterface.class, new RemoteImpl());
RedissonReactiveClient r2 = Redisson.createReactive(createConfig());
RemoteInterfaceReactive ri = r2.getRemoteService().get(RemoteInterfaceReactive.class);
Mono<Void> f = ri.voidMethod("someName", 100L);
f.block();
Mono<Long> resFuture = ri.resultMethod(100L);
assertThat(resFuture.block()).isEqualTo(200);
r1.shutdown();
r2.shutdown();
}
@Test
public void testRx() throws InterruptedException {
RedissonRxClient r1 = Redisson.createRx(createConfig());
r1.getRemoteService().register(RemoteInterface.class, new RemoteImpl());
RedissonRxClient r2 = Redisson.createRx(createConfig());
RemoteInterfaceRx ri = r2.getRemoteService().get(RemoteInterfaceRx.class);
Completable f = ri.voidMethod("someName", 100L);
f.blockingGet();
Single<Long> resFuture = ri.resultMethod(100L);
assertThat(resFuture.blockingGet()).isEqualTo(200);
r1.shutdown();
r2.shutdown();
}
@Test
public void testExecutorAsync() throws InterruptedException {

Loading…
Cancel
Save