refactoring

pull/5520/head
Nikita Koksharov 1 year ago
parent 00d758eb93
commit 4ae1d46fd7

@ -27,12 +27,10 @@ import org.redisson.connection.ConnectionManager;
import org.redisson.connection.ServiceManager; import org.redisson.connection.ServiceManager;
import org.redisson.eviction.EvictionScheduler; import org.redisson.eviction.EvictionScheduler;
import org.redisson.liveobject.core.RedissonObjectBuilder; import org.redisson.liveobject.core.RedissonObjectBuilder;
import org.redisson.misc.WrappedLock;
import org.redisson.redisnode.RedissonClusterNodes; import org.redisson.redisnode.RedissonClusterNodes;
import org.redisson.redisnode.RedissonMasterSlaveNodes; import org.redisson.redisnode.RedissonMasterSlaveNodes;
import org.redisson.redisnode.RedissonSentinelMasterSlaveNodes; import org.redisson.redisnode.RedissonSentinelMasterSlaveNodes;
import org.redisson.redisnode.RedissonSingleNode; import org.redisson.redisnode.RedissonSingleNode;
import org.redisson.remote.ResponseEntry;
import org.redisson.transaction.RedissonTransaction; import org.redisson.transaction.RedissonTransaction;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
@ -52,7 +50,6 @@ public final class Redisson implements RedissonClient {
RedissonReference.warmUp(); RedissonReference.warmUp();
} }
private final QueueTransferService queueTransferService = new QueueTransferService();
private final EvictionScheduler evictionScheduler; private final EvictionScheduler evictionScheduler;
private final WriteBehindService writeBehindService; private final WriteBehindService writeBehindService;
private final ConnectionManager connectionManager; private final ConnectionManager connectionManager;
@ -61,10 +58,6 @@ public final class Redisson implements RedissonClient {
private final ConcurrentMap<Class<?>, Class<?>> liveObjectClassCache = new ConcurrentHashMap<>(); private final ConcurrentMap<Class<?>, Class<?>> liveObjectClassCache = new ConcurrentHashMap<>();
private final Config config; private final Config config;
private final ConcurrentMap<String, ResponseEntry> responses = new ConcurrentHashMap<>();
private final WrappedLock responsesLock = new WrappedLock();
Redisson(Config config) { Redisson(Config config) {
this.config = config; this.config = config;
Config configCopy = new Config(config); Config configCopy = new Config(config);
@ -137,7 +130,7 @@ public final class Redisson implements RedissonClient {
@Override @Override
public RedissonRxClient rxJava() { public RedissonRxClient rxJava() {
return new RedissonRx(connectionManager, evictionScheduler, writeBehindService, responses); return new RedissonRx(connectionManager, evictionScheduler, writeBehindService);
} }
/* /*
@ -160,7 +153,7 @@ public final class Redisson implements RedissonClient {
@Override @Override
public RedissonReactiveClient reactive() { public RedissonReactiveClient reactive() {
return new RedissonReactive(connectionManager, evictionScheduler, writeBehindService, responses); return new RedissonReactive(connectionManager, evictionScheduler, writeBehindService);
} }
@Override @Override
@ -447,7 +440,7 @@ public final class Redisson implements RedissonClient {
@Override @Override
public RScheduledExecutorService getExecutorService(String name, Codec codec, ExecutorOptions options) { public RScheduledExecutorService getExecutorService(String name, Codec codec, ExecutorOptions options) {
return new RedissonExecutorService(codec, commandExecutor, this, name, queueTransferService, responses, responsesLock, options); return new RedissonExecutorService(codec, commandExecutor, this, name, options);
} }
@Override @Override
@ -471,7 +464,7 @@ public final class Redisson implements RedissonClient {
if (codec != connectionManager.getServiceManager().getCfg().getCodec()) { if (codec != connectionManager.getServiceManager().getCfg().getCodec()) {
executorId = executorId + ":" + name; executorId = executorId + ":" + name;
} }
return new RedissonRemoteService(codec, name, commandExecutor, executorId, responses, responsesLock); return new RedissonRemoteService(codec, name, commandExecutor, executorId);
} }
@Override @Override
@ -544,7 +537,7 @@ public final class Redisson implements RedissonClient {
if (destinationQueue == null) { if (destinationQueue == null) {
throw new NullPointerException(); throw new NullPointerException();
} }
return new RedissonDelayedQueue<V>(queueTransferService, destinationQueue.getCodec(), commandExecutor, destinationQueue.getName()); return new RedissonDelayedQueue<V>(destinationQueue.getCodec(), commandExecutor, destinationQueue.getName());
} }
@Override @Override

@ -35,12 +35,11 @@ import java.util.concurrent.TimeUnit;
*/ */
public class RedissonDelayedQueue<V> extends RedissonExpirable implements RDelayedQueue<V> { public class RedissonDelayedQueue<V> extends RedissonExpirable implements RDelayedQueue<V> {
private final QueueTransferService queueTransferService;
private final String channelName; private final String channelName;
private final String queueName; private final String queueName;
private final String timeoutSetName; private final String timeoutSetName;
protected RedissonDelayedQueue(QueueTransferService queueTransferService, Codec codec, final CommandAsyncExecutor commandExecutor, String name) { protected RedissonDelayedQueue(Codec codec, CommandAsyncExecutor commandExecutor, String name) {
super(codec, commandExecutor, name); super(codec, commandExecutor, name);
channelName = prefixName("redisson_delay_queue_channel", getRawName()); channelName = prefixName("redisson_delay_queue_channel", getRawName());
queueName = prefixName("redisson_delay_queue", getRawName()); queueName = prefixName("redisson_delay_queue", getRawName());
@ -75,10 +74,8 @@ public class RedissonDelayedQueue<V> extends RedissonExpirable implements RDelay
return RedissonTopic.createRaw(LongCodec.INSTANCE, commandExecutor, channelName); return RedissonTopic.createRaw(LongCodec.INSTANCE, commandExecutor, channelName);
} }
}; };
queueTransferService.schedule(queueName, task); commandExecutor.getServiceManager().getQueueTransferService().schedule(queueName, task);
this.queueTransferService = queueTransferService;
} }
@Override @Override
@ -524,7 +521,7 @@ public class RedissonDelayedQueue<V> extends RedissonExpirable implements RDelay
@Override @Override
public void destroy() { public void destroy() {
queueTransferService.remove(queueName); commandExecutor.getServiceManager().getQueueTransferService().remove(queueName);
} }
} }

@ -26,8 +26,8 @@ import org.redisson.command.CommandAsyncExecutor;
import org.redisson.executor.*; import org.redisson.executor.*;
import org.redisson.executor.params.*; import org.redisson.executor.params.*;
import org.redisson.misc.CompletableFutureWrapper; import org.redisson.misc.CompletableFutureWrapper;
import org.redisson.misc.WrappedLock;
import org.redisson.misc.Injector; import org.redisson.misc.Injector;
import org.redisson.misc.WrappedLock;
import org.redisson.remote.ResponseEntry; import org.redisson.remote.ResponseEntry;
import org.redisson.remote.ResponseEntry.Result; import org.redisson.remote.ResponseEntry.Result;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -93,7 +93,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
private final String responseQueueName; private final String responseQueueName;
private final QueueTransferService queueTransferService; private final QueueTransferService queueTransferService;
private final String executorId; private final String executorId;
private final ConcurrentMap<String, ResponseEntry> responses; private final Map<String, ResponseEntry> responses;
private final ReferenceQueue<RExecutorFuture<?>> referenceDueue = new ReferenceQueue<>(); private final ReferenceQueue<RExecutorFuture<?>> referenceDueue = new ReferenceQueue<>();
private final Collection<RedissonExecutorFutureReference> references = Collections.newSetFromMap(new ConcurrentHashMap<>()); private final Collection<RedissonExecutorFutureReference> references = Collections.newSetFromMap(new ConcurrentHashMap<>());
@ -101,17 +101,15 @@ public class RedissonExecutorService implements RScheduledExecutorService {
private final IdGenerator idGenerator; private final IdGenerator idGenerator;
public RedissonExecutorService(Codec codec, CommandAsyncExecutor commandExecutor, Redisson redisson, public RedissonExecutorService(Codec codec, CommandAsyncExecutor commandExecutor, Redisson redisson,
String name, QueueTransferService queueTransferService, ConcurrentMap<String, ResponseEntry> responses, String name, ExecutorOptions options) {
WrappedLock responsesLock,
ExecutorOptions options) {
super(); super();
this.codec = codec; this.codec = codec;
this.commandExecutor = commandExecutor; this.commandExecutor = commandExecutor;
this.name = commandExecutor.getServiceManager().getConfig().getNameMapper().map(name); this.name = commandExecutor.getServiceManager().getConfig().getNameMapper().map(name);
this.redisson = redisson; this.redisson = redisson;
this.queueTransferService = queueTransferService; this.queueTransferService = commandExecutor.getServiceManager().getQueueTransferService();
this.responses = responses; this.responses = commandExecutor.getServiceManager().getResponses();
this.responsesLock = responsesLock; this.responsesLock = commandExecutor.getServiceManager().getResponsesLock();
if (codec == commandExecutor.getServiceManager().getCfg().getCodec()) { if (codec == commandExecutor.getServiceManager().getCfg().getCodec()) {
this.executorId = commandExecutor.getServiceManager().getId(); this.executorId = commandExecutor.getServiceManager().getId();
@ -119,7 +117,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
this.executorId = commandExecutor.getServiceManager().getId() + ":" + RemoteExecutorServiceAsync.class.getName() + ":" + name; this.executorId = commandExecutor.getServiceManager().getId() + ":" + RemoteExecutorServiceAsync.class.getName() + ":" + name;
} }
remoteService = new RedissonExecutorRemoteService(codec, name, commandExecutor, executorId, responses, responsesLock); remoteService = new RedissonExecutorRemoteService(codec, name, commandExecutor, executorId);
requestQueueName = remoteService.getRequestQueueName(RemoteExecutorService.class); requestQueueName = remoteService.getRequestQueueName(RemoteExecutorService.class);
responseQueueName = remoteService.getResponseQueueName(executorId); responseQueueName = remoteService.getResponseQueueName(executorId);
String objectName = requestQueueName; String objectName = requestQueueName;
@ -146,7 +144,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
remoteService.setTasksRetryIntervalName(tasksRetryIntervalName); remoteService.setTasksRetryIntervalName(tasksRetryIntervalName);
remoteService.setTerminationTopicName(terminationTopic.getChannelNames().get(0)); remoteService.setTerminationTopicName(terminationTopic.getChannelNames().get(0));
executorRemoteService = new TasksService(codec, name, commandExecutor, executorId, responses, responsesLock); executorRemoteService = new TasksService(codec, name, commandExecutor, executorId);
executorRemoteService.setTerminationTopicName(terminationTopic.getChannelNames().get(0)); executorRemoteService.setTerminationTopicName(terminationTopic.getChannelNames().get(0));
executorRemoteService.setTasksCounterName(tasksCounterName); executorRemoteService.setTasksCounterName(tasksCounterName);
executorRemoteService.setStatusName(statusName); executorRemoteService.setStatusName(statusName);
@ -159,7 +157,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
asyncService = executorRemoteService.get(RemoteExecutorServiceAsync.class, RESULT_OPTIONS); asyncService = executorRemoteService.get(RemoteExecutorServiceAsync.class, RESULT_OPTIONS);
asyncServiceWithoutResult = executorRemoteService.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck().noResult()); asyncServiceWithoutResult = executorRemoteService.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck().noResult());
scheduledRemoteService = new ScheduledTasksService(codec, name, commandExecutor, executorId, responses, responsesLock); scheduledRemoteService = new ScheduledTasksService(codec, name, commandExecutor, executorId);
scheduledRemoteService.setTerminationTopicName(terminationTopic.getChannelNames().get(0)); scheduledRemoteService.setTerminationTopicName(terminationTopic.getChannelNames().get(0));
scheduledRemoteService.setTasksCounterName(tasksCounterName); scheduledRemoteService.setTasksCounterName(tasksCounterName);
scheduledRemoteService.setStatusName(statusName); scheduledRemoteService.setStatusName(statusName);
@ -292,7 +290,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
queueTransferService.schedule(getName(), task); queueTransferService.schedule(getName(), task);
TasksRunnerService service = TasksRunnerService service =
new TasksRunnerService(commandExecutor, redisson, codec, requestQueueName, responses, responsesLock); new TasksRunnerService(commandExecutor, redisson, codec, requestQueueName);
service.setStatusName(statusName); service.setStatusName(statusName);
service.setTasksCounterName(tasksCounterName); service.setTasksCounterName(tasksCounterName);
service.setTasksName(tasksName); service.setTasksName(tasksName);
@ -355,7 +353,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
} }
private TasksBatchService createBatchService() { private TasksBatchService createBatchService() {
TasksBatchService executorRemoteService = new TasksBatchService(codec, getName(), commandExecutor, executorId, responses, responsesLock); TasksBatchService executorRemoteService = new TasksBatchService(codec, getName(), commandExecutor, executorId);
executorRemoteService.setTasksExpirationTimeName(tasksExpirationTimeName); executorRemoteService.setTasksExpirationTimeName(tasksExpirationTimeName);
executorRemoteService.setTerminationTopicName(terminationTopic.getChannelNames().get(0)); executorRemoteService.setTerminationTopicName(terminationTopic.getChannelNames().get(0));
executorRemoteService.setTasksCounterName(tasksCounterName); executorRemoteService.setTasksCounterName(tasksCounterName);

@ -23,15 +23,11 @@ import org.redisson.config.ConfigSupport;
import org.redisson.connection.ConnectionManager; import org.redisson.connection.ConnectionManager;
import org.redisson.eviction.EvictionScheduler; import org.redisson.eviction.EvictionScheduler;
import org.redisson.liveobject.core.RedissonObjectBuilder; import org.redisson.liveobject.core.RedissonObjectBuilder;
import org.redisson.misc.WrappedLock;
import org.redisson.reactive.*; import org.redisson.reactive.*;
import org.redisson.remote.ResponseEntry;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/** /**
* Main infrastructure class allows to get access * Main infrastructure class allows to get access
@ -46,8 +42,6 @@ public class RedissonReactive implements RedissonReactiveClient {
protected final EvictionScheduler evictionScheduler; protected final EvictionScheduler evictionScheduler;
protected final CommandReactiveExecutor commandExecutor; protected final CommandReactiveExecutor commandExecutor;
protected final ConnectionManager connectionManager; protected final ConnectionManager connectionManager;
protected final ConcurrentMap<String, ResponseEntry> responses;
private final WrappedLock responsesLock = new WrappedLock();
protected RedissonReactive(Config config) { protected RedissonReactive(Config config) {
Config configCopy = new Config(config); Config configCopy = new Config(config);
@ -60,11 +54,10 @@ public class RedissonReactive implements RedissonReactiveClient {
commandExecutor = new CommandReactiveService(connectionManager, objectBuilder); commandExecutor = new CommandReactiveService(connectionManager, objectBuilder);
evictionScheduler = new EvictionScheduler(commandExecutor); evictionScheduler = new EvictionScheduler(commandExecutor);
writeBehindService = new WriteBehindService(commandExecutor); writeBehindService = new WriteBehindService(commandExecutor);
responses = new ConcurrentHashMap<>();
} }
protected RedissonReactive(ConnectionManager connectionManager, EvictionScheduler evictionScheduler, protected RedissonReactive(ConnectionManager connectionManager, EvictionScheduler evictionScheduler,
WriteBehindService writeBehindService, ConcurrentMap<String, ResponseEntry> responses) { WriteBehindService writeBehindService) {
this.connectionManager = connectionManager; this.connectionManager = connectionManager;
RedissonObjectBuilder objectBuilder = null; RedissonObjectBuilder objectBuilder = null;
if (connectionManager.getServiceManager().getCfg().isReferenceEnabled()) { if (connectionManager.getServiceManager().getCfg().isReferenceEnabled()) {
@ -73,7 +66,6 @@ public class RedissonReactive implements RedissonReactiveClient {
commandExecutor = new CommandReactiveService(connectionManager, objectBuilder); commandExecutor = new CommandReactiveService(connectionManager, objectBuilder);
this.evictionScheduler = evictionScheduler; this.evictionScheduler = evictionScheduler;
this.writeBehindService = writeBehindService; this.writeBehindService = writeBehindService;
this.responses = responses;
} }
public EvictionScheduler getEvictionScheduler() { public EvictionScheduler getEvictionScheduler() {
@ -536,7 +528,7 @@ public class RedissonReactive implements RedissonReactiveClient {
if (codec != connectionManager.getServiceManager().getCfg().getCodec()) { if (codec != connectionManager.getServiceManager().getCfg().getCodec()) {
executorId = executorId + ":" + name; executorId = executorId + ":" + name;
} }
return new RedissonRemoteService(codec, name, commandExecutor, executorId, responses, responsesLock); return new RedissonRemoteService(codec, name, commandExecutor, executorId);
} }
@Override @Override

@ -23,7 +23,6 @@ import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandAsyncExecutor; import org.redisson.command.CommandAsyncExecutor;
import org.redisson.executor.RemotePromise; import org.redisson.executor.RemotePromise;
import org.redisson.misc.CompletableFutureWrapper; import org.redisson.misc.CompletableFutureWrapper;
import org.redisson.misc.WrappedLock;
import org.redisson.remote.*; import org.redisson.remote.*;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -68,9 +67,8 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
private final Map<Class<?>, Entry> remoteMap = new ConcurrentHashMap<>(); private final Map<Class<?>, Entry> remoteMap = new ConcurrentHashMap<>();
public RedissonRemoteService(Codec codec, String name, CommandAsyncExecutor commandExecutor, String executorId, public RedissonRemoteService(Codec codec, String name, CommandAsyncExecutor commandExecutor, String executorId) {
ConcurrentMap<String, ResponseEntry> responses, WrappedLock locked) { super(codec, name, commandExecutor, executorId);
super(codec, name, commandExecutor, executorId, responses, locked);
} }
public String getRequestTasksMapName(Class<?> remoteInterface) { public String getRequestTasksMapName(Class<?> remoteInterface) {

@ -23,13 +23,9 @@ import org.redisson.config.ConfigSupport;
import org.redisson.connection.ConnectionManager; import org.redisson.connection.ConnectionManager;
import org.redisson.eviction.EvictionScheduler; import org.redisson.eviction.EvictionScheduler;
import org.redisson.liveobject.core.RedissonObjectBuilder; import org.redisson.liveobject.core.RedissonObjectBuilder;
import org.redisson.misc.WrappedLock;
import org.redisson.remote.ResponseEntry;
import org.redisson.rx.*; import org.redisson.rx.*;
import java.util.Arrays; import java.util.Arrays;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/** /**
* Main infrastructure class allows to get access * Main infrastructure class allows to get access
@ -44,9 +40,7 @@ public class RedissonRx implements RedissonRxClient {
protected final EvictionScheduler evictionScheduler; protected final EvictionScheduler evictionScheduler;
protected final CommandRxExecutor commandExecutor; protected final CommandRxExecutor commandExecutor;
protected final ConnectionManager connectionManager; protected final ConnectionManager connectionManager;
protected final ConcurrentMap<String, ResponseEntry> responses;
private final WrappedLock responsesLock = new WrappedLock();
protected RedissonRx(Config config) { protected RedissonRx(Config config) {
Config configCopy = new Config(config); Config configCopy = new Config(config);
@ -58,11 +52,9 @@ public class RedissonRx implements RedissonRxClient {
commandExecutor = new CommandRxService(connectionManager, objectBuilder); commandExecutor = new CommandRxService(connectionManager, objectBuilder);
evictionScheduler = new EvictionScheduler(commandExecutor); evictionScheduler = new EvictionScheduler(commandExecutor);
writeBehindService = new WriteBehindService(commandExecutor); writeBehindService = new WriteBehindService(commandExecutor);
responses = new ConcurrentHashMap<>();
} }
protected RedissonRx(ConnectionManager connectionManager, EvictionScheduler evictionScheduler, protected RedissonRx(ConnectionManager connectionManager, EvictionScheduler evictionScheduler, WriteBehindService writeBehindService) {
WriteBehindService writeBehindService, ConcurrentMap<String, ResponseEntry> responses) {
this.connectionManager = connectionManager; this.connectionManager = connectionManager;
RedissonObjectBuilder objectBuilder = null; RedissonObjectBuilder objectBuilder = null;
if (connectionManager.getServiceManager().getCfg().isReferenceEnabled()) { if (connectionManager.getServiceManager().getCfg().isReferenceEnabled()) {
@ -71,7 +63,6 @@ public class RedissonRx implements RedissonRxClient {
commandExecutor = new CommandRxService(connectionManager, objectBuilder); commandExecutor = new CommandRxService(connectionManager, objectBuilder);
this.evictionScheduler = evictionScheduler; this.evictionScheduler = evictionScheduler;
this.writeBehindService = writeBehindService; this.writeBehindService = writeBehindService;
this.responses = responses;
} }
public CommandRxExecutor getCommandExecutor() { public CommandRxExecutor getCommandExecutor() {
@ -515,7 +506,7 @@ public class RedissonRx implements RedissonRxClient {
if (codec != connectionManager.getServiceManager().getCfg().getCodec()) { if (codec != connectionManager.getServiceManager().getCfg().getCodec()) {
executorId = executorId + ":" + name; executorId = executorId + ":" + name;
} }
return new RedissonRemoteService(codec, name, commandExecutor, executorId, responses, responsesLock); return new RedissonRemoteService(codec, name, commandExecutor, executorId);
} }
@Override @Override

@ -41,6 +41,7 @@ import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.*; import io.netty.util.concurrent.*;
import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.PlatformDependent;
import org.redisson.ElementsSubscribeService; import org.redisson.ElementsSubscribeService;
import org.redisson.QueueTransferService;
import org.redisson.Version; import org.redisson.Version;
import org.redisson.api.NatMapper; import org.redisson.api.NatMapper;
import org.redisson.api.RFuture; import org.redisson.api.RFuture;
@ -56,6 +57,8 @@ import org.redisson.config.TransportMode;
import org.redisson.misc.CompletableFutureWrapper; import org.redisson.misc.CompletableFutureWrapper;
import org.redisson.misc.InfinitySemaphoreLatch; import org.redisson.misc.InfinitySemaphoreLatch;
import org.redisson.misc.RedisURI; import org.redisson.misc.RedisURI;
import org.redisson.misc.WrappedLock;
import org.redisson.remote.ResponseEntry;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -137,6 +140,12 @@ public class ServiceManager {
private static final Map<String, String> SHA_CACHE = new LRUCacheMap<>(500, 0, 0); private static final Map<String, String> SHA_CACHE = new LRUCacheMap<>(500, 0, 0);
private final Map<String, ResponseEntry> responses = new ConcurrentHashMap<>();
private final WrappedLock responsesLock = new WrappedLock();
private final QueueTransferService queueTransferService = new QueueTransferService();
public ServiceManager(Config cfg) { public ServiceManager(Config cfg) {
Version.logVersion(); Version.logVersion();
@ -570,4 +579,15 @@ public class ServiceManager {
return RedisCommands.HRANDFIELD; return RedisCommands.HRANDFIELD;
} }
public Map<String, ResponseEntry> getResponses() {
return responses;
}
public WrappedLock getResponsesLock() {
return responsesLock;
}
public QueueTransferService getQueueTransferService() {
return queueTransferService;
}
} }

@ -15,14 +15,16 @@
*/ */
package org.redisson.executor; package org.redisson.executor;
import org.redisson.*; import org.redisson.RedissonExecutorService;
import org.redisson.RedissonObject;
import org.redisson.RedissonRemoteService;
import org.redisson.RedissonShutdownException;
import org.redisson.api.RFuture; import org.redisson.api.RFuture;
import org.redisson.api.RMap; import org.redisson.api.RMap;
import org.redisson.api.executor.*; import org.redisson.api.executor.*;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandAsyncExecutor; import org.redisson.command.CommandAsyncExecutor;
import org.redisson.misc.WrappedLock;
import org.redisson.remote.*; import org.redisson.remote.*;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -31,7 +33,6 @@ import java.lang.reflect.InvocationTargetException;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -57,9 +58,8 @@ public class RedissonExecutorRemoteService extends RedissonRemoteService {
private List<TaskSuccessListener> successListeners; private List<TaskSuccessListener> successListeners;
public RedissonExecutorRemoteService(Codec codec, String name, public RedissonExecutorRemoteService(Codec codec, String name,
CommandAsyncExecutor commandExecutor, String executorId, CommandAsyncExecutor commandExecutor, String executorId) {
ConcurrentMap<String, ResponseEntry> responses, WrappedLock locked) { super(codec, name, commandExecutor, executorId);
super(codec, name, commandExecutor, executorId, responses, locked);
} }
@Override @Override

@ -15,7 +15,6 @@
*/ */
package org.redisson.executor; package org.redisson.executor;
import org.redisson.misc.WrappedLock;
import org.redisson.RedissonExecutorService; import org.redisson.RedissonExecutorService;
import org.redisson.api.RFuture; import org.redisson.api.RFuture;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
@ -25,11 +24,9 @@ import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandAsyncExecutor; import org.redisson.command.CommandAsyncExecutor;
import org.redisson.executor.params.ScheduledParameters; import org.redisson.executor.params.ScheduledParameters;
import org.redisson.remote.RemoteServiceRequest; import org.redisson.remote.RemoteServiceRequest;
import org.redisson.remote.ResponseEntry;
import java.util.Arrays; import java.util.Arrays;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentMap;
/** /**
* *
@ -40,9 +37,8 @@ public class ScheduledTasksService extends TasksService {
private String requestId; private String requestId;
public ScheduledTasksService(Codec codec, String name, CommandAsyncExecutor commandExecutor, String redissonId, public ScheduledTasksService(Codec codec, String name, CommandAsyncExecutor commandExecutor, String redissonId) {
ConcurrentMap<String, ResponseEntry> responses, WrappedLock locked) { super(codec, name, commandExecutor, redissonId);
super(codec, name, commandExecutor, redissonId, responses, locked);
} }
public void setRequestId(String requestId) { public void setRequestId(String requestId) {

@ -15,15 +15,12 @@
*/ */
package org.redisson.executor; package org.redisson.executor;
import org.redisson.misc.WrappedLock;
import org.redisson.api.RFuture; import org.redisson.api.RFuture;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
import org.redisson.command.CommandAsyncExecutor; import org.redisson.command.CommandAsyncExecutor;
import org.redisson.command.CommandBatchService; import org.redisson.command.CommandBatchService;
import org.redisson.remote.ResponseEntry;
import java.util.List; import java.util.List;
import java.util.concurrent.ConcurrentMap;
/** /**
* *
@ -34,9 +31,8 @@ public class TasksBatchService extends TasksService {
private final CommandBatchService batchCommandService; private final CommandBatchService batchCommandService;
public TasksBatchService(Codec codec, String name, CommandAsyncExecutor commandExecutor, String executorId, public TasksBatchService(Codec codec, String name, CommandAsyncExecutor commandExecutor, String executorId) {
ConcurrentMap<String, ResponseEntry> responses, WrappedLock locked) { super(codec, name, commandExecutor, executorId);
super(codec, name, commandExecutor, executorId, responses, locked);
batchCommandService = new CommandBatchService(commandExecutor); batchCommandService = new CommandBatchService(commandExecutor);
} }

@ -17,7 +17,6 @@ package org.redisson.executor;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
import org.redisson.misc.WrappedLock;
import org.redisson.RedissonExecutorService; import org.redisson.RedissonExecutorService;
import org.redisson.RedissonShutdownException; import org.redisson.RedissonShutdownException;
import org.redisson.api.RFuture; import org.redisson.api.RFuture;
@ -35,7 +34,6 @@ import org.redisson.executor.params.*;
import org.redisson.misc.Hash; import org.redisson.misc.Hash;
import org.redisson.misc.HashValue; import org.redisson.misc.HashValue;
import org.redisson.misc.Injector; import org.redisson.misc.Injector;
import org.redisson.remote.ResponseEntry;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.io.ObjectInput; import java.io.ObjectInput;
@ -44,7 +42,6 @@ import java.util.Date;
import java.util.Map; import java.util.Map;
import java.util.TimeZone; import java.util.TimeZone;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -74,17 +71,11 @@ public class TasksRunnerService implements RemoteExecutorService {
private String tasksExpirationTimeName; private String tasksExpirationTimeName;
private TasksInjector tasksInjector; private TasksInjector tasksInjector;
private ConcurrentMap<String, ResponseEntry> responses;
private WrappedLock locked; public TasksRunnerService(CommandAsyncExecutor commandExecutor, RedissonClient redisson, Codec codec, String name) {
public TasksRunnerService(CommandAsyncExecutor commandExecutor, RedissonClient redisson, Codec codec, String name,
ConcurrentMap<String, ResponseEntry> responses, WrappedLock locked) {
this.commandExecutor = commandExecutor; this.commandExecutor = commandExecutor;
this.name = name; this.name = name;
this.redisson = redisson; this.redisson = redisson;
this.responses = responses;
this.locked = locked;
this.codec = codec; this.codec = codec;
} }
@ -161,7 +152,7 @@ public class TasksRunnerService implements RemoteExecutorService {
* @return * @return
*/ */
private RemoteExecutorServiceAsync asyncScheduledServiceAtFixed(String executorId, String requestId) { private RemoteExecutorServiceAsync asyncScheduledServiceAtFixed(String executorId, String requestId) {
ScheduledTasksService scheduledRemoteService = new ScheduledTasksService(codec, name, commandExecutor, executorId, responses, locked); ScheduledTasksService scheduledRemoteService = new ScheduledTasksService(codec, name, commandExecutor, executorId);
scheduledRemoteService.setTerminationTopicName(terminationTopicName); scheduledRemoteService.setTerminationTopicName(terminationTopicName);
scheduledRemoteService.setTasksCounterName(tasksCounterName); scheduledRemoteService.setTasksCounterName(tasksCounterName);
scheduledRemoteService.setStatusName(statusName); scheduledRemoteService.setStatusName(statusName);

@ -15,7 +15,6 @@
*/ */
package org.redisson.executor; package org.redisson.executor;
import org.redisson.misc.WrappedLock;
import org.redisson.RedissonExecutorService; import org.redisson.RedissonExecutorService;
import org.redisson.api.RBlockingQueueAsync; import org.redisson.api.RBlockingQueueAsync;
import org.redisson.api.RFuture; import org.redisson.api.RFuture;
@ -31,7 +30,6 @@ import org.redisson.remote.*;
import java.util.Arrays; import java.util.Arrays;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
/** /**
@ -51,9 +49,8 @@ public class TasksService extends BaseRemoteService {
protected String tasksExpirationTimeName; protected String tasksExpirationTimeName;
protected long tasksRetryInterval; protected long tasksRetryInterval;
public TasksService(Codec codec, String name, CommandAsyncExecutor commandExecutor, String executorId, public TasksService(Codec codec, String name, CommandAsyncExecutor commandExecutor, String executorId) {
ConcurrentMap<String, ResponseEntry> responses, WrappedLock locked) { super(codec, name, commandExecutor, executorId);
super(codec, name, commandExecutor, executorId, responses, locked);
} }
public void setTasksExpirationTimeName(String tasksExpirationTimeName) { public void setTasksExpirationTimeName(String tasksExpirationTimeName) {

@ -15,7 +15,6 @@
*/ */
package org.redisson.remote; package org.redisson.remote;
import org.redisson.misc.WrappedLock;
import org.redisson.RedissonBucket; import org.redisson.RedissonBucket;
import org.redisson.RedissonList; import org.redisson.RedissonList;
import org.redisson.RedissonMap; import org.redisson.RedissonMap;
@ -40,7 +39,6 @@ import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage; import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
/** /**
@ -53,9 +51,8 @@ public class AsyncRemoteProxy extends BaseRemoteProxy {
protected final String cancelRequestMapName; protected final String cancelRequestMapName;
public AsyncRemoteProxy(CommandAsyncExecutor commandExecutor, String name, String responseQueueName, public AsyncRemoteProxy(CommandAsyncExecutor commandExecutor, String name, String responseQueueName,
ConcurrentMap<String, ResponseEntry> responses, Codec codec, String executorId, String cancelRequestMapName, Codec codec, String executorId, String cancelRequestMapName, BaseRemoteService remoteService) {
BaseRemoteService remoteService, WrappedLock locked) { super(commandExecutor, name, responseQueueName, codec, executorId, remoteService);
super(commandExecutor, name, responseQueueName, responses, codec, executorId, remoteService, locked);
this.cancelRequestMapName = cancelRequestMapName; this.cancelRequestMapName = cancelRequestMapName;
} }

@ -16,7 +16,6 @@
package org.redisson.remote; package org.redisson.remote;
import io.netty.util.concurrent.ScheduledFuture; import io.netty.util.concurrent.ScheduledFuture;
import org.redisson.misc.WrappedLock;
import org.redisson.RedissonBlockingQueue; import org.redisson.RedissonBlockingQueue;
import org.redisson.RedissonShutdownException; import org.redisson.RedissonShutdownException;
import org.redisson.api.RBlockingQueue; import org.redisson.api.RBlockingQueue;
@ -26,6 +25,7 @@ import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec; import org.redisson.client.codec.LongCodec;
import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandAsyncExecutor; import org.redisson.command.CommandAsyncExecutor;
import org.redisson.misc.WrappedLock;
import org.redisson.remote.ResponseEntry.Result; import org.redisson.remote.ResponseEntry.Result;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -54,18 +54,18 @@ public abstract class BaseRemoteProxy {
final String executorId; final String executorId;
final BaseRemoteService remoteService; final BaseRemoteService remoteService;
final WrappedLock locked; final WrappedLock locked;
BaseRemoteProxy(CommandAsyncExecutor commandExecutor, String name, String responseQueueName, BaseRemoteProxy(CommandAsyncExecutor commandExecutor, String name, String responseQueueName,
Map<String, ResponseEntry> responses, Codec codec, String executorId, BaseRemoteService remoteService, WrappedLock locked) { Codec codec, String executorId, BaseRemoteService remoteService) {
super(); super();
this.commandExecutor = commandExecutor; this.commandExecutor = commandExecutor;
this.name = name; this.name = name;
this.responseQueueName = responseQueueName; this.responseQueueName = responseQueueName;
this.responses = responses; this.responses = commandExecutor.getServiceManager().getResponses();
this.codec = codec; this.codec = codec;
this.executorId = executorId; this.executorId = executorId;
this.remoteService = remoteService; this.remoteService = remoteService;
this.locked = locked; this.locked = commandExecutor.getServiceManager().getResponsesLock();
} }
private final Map<Class<?>, String> requestQueueNameCache = new ConcurrentHashMap<>(); private final Map<Class<?>, String> requestQueueNameCache = new ConcurrentHashMap<>();

@ -20,7 +20,6 @@ import io.netty.buffer.Unpooled;
import io.netty.util.CharsetUtil; import io.netty.util.CharsetUtil;
import io.netty.util.Timeout; import io.netty.util.Timeout;
import io.netty.util.TimerTask; import io.netty.util.TimerTask;
import org.redisson.misc.WrappedLock;
import org.redisson.RedissonBlockingQueue; import org.redisson.RedissonBlockingQueue;
import org.redisson.RedissonMap; import org.redisson.RedissonMap;
import org.redisson.api.RBlockingQueue; import org.redisson.api.RBlockingQueue;
@ -66,20 +65,15 @@ public abstract class BaseRemoteService {
protected final String cancelRequestMapName; protected final String cancelRequestMapName;
protected final String cancelResponseMapName; protected final String cancelResponseMapName;
protected final String responseQueueName; protected final String responseQueueName;
private final ConcurrentMap<String, ResponseEntry> responses;
private final WrappedLock locked;
public BaseRemoteService(Codec codec, String name, CommandAsyncExecutor commandExecutor, String executorId, public BaseRemoteService(Codec codec, String name, CommandAsyncExecutor commandExecutor, String executorId) {
ConcurrentMap<String, ResponseEntry> responses, WrappedLock locked) {
this.codec = codec; this.codec = codec;
this.name = commandExecutor.getServiceManager().getConfig().getNameMapper().map(name); this.name = commandExecutor.getServiceManager().getConfig().getNameMapper().map(name);
this.commandExecutor = commandExecutor; this.commandExecutor = commandExecutor;
this.executorId = executorId; this.executorId = executorId;
this.responses = responses;
this.cancelRequestMapName = "{" + name + ":remote" + "}:cancel-request"; this.cancelRequestMapName = "{" + name + ":remote" + "}:cancel-request";
this.cancelResponseMapName = "{" + name + ":remote" + "}:cancel-response"; this.cancelResponseMapName = "{" + name + ":remote" + "}:cancel-response";
this.responseQueueName = getResponseQueueName(executorId); this.responseQueueName = getResponseQueueName(executorId);
this.locked = locked;
} }
public String getResponseQueueName(String executorId) { public String getResponseQueueName(String executorId) {
@ -121,27 +115,27 @@ public abstract class BaseRemoteService {
for (Annotation annotation : remoteInterface.getAnnotations()) { for (Annotation annotation : remoteInterface.getAnnotations()) {
if (annotation.annotationType() == RRemoteAsync.class) { if (annotation.annotationType() == RRemoteAsync.class) {
Class<T> syncInterface = (Class<T>) ((RRemoteAsync) annotation).value(); Class<T> syncInterface = (Class<T>) ((RRemoteAsync) annotation).value();
AsyncRemoteProxy proxy = new AsyncRemoteProxy(commandExecutor, name, responseQueueName, responses, AsyncRemoteProxy proxy = new AsyncRemoteProxy(commandExecutor, name, responseQueueName,
codec, executorId, cancelRequestMapName, this, locked); codec, executorId, cancelRequestMapName, this);
return proxy.create(remoteInterface, options, syncInterface); return proxy.create(remoteInterface, options, syncInterface);
} }
if (annotation.annotationType() == RRemoteReactive.class) { if (annotation.annotationType() == RRemoteReactive.class) {
Class<T> syncInterface = (Class<T>) ((RRemoteReactive) annotation).value(); Class<T> syncInterface = (Class<T>) ((RRemoteReactive) annotation).value();
ReactiveRemoteProxy proxy = new ReactiveRemoteProxy(commandExecutor, name, responseQueueName, responses, ReactiveRemoteProxy proxy = new ReactiveRemoteProxy(commandExecutor, name, responseQueueName,
codec, executorId, cancelRequestMapName, this, locked); codec, executorId, cancelRequestMapName, this);
return proxy.create(remoteInterface, options, syncInterface); return proxy.create(remoteInterface, options, syncInterface);
} }
if (annotation.annotationType() == RRemoteRx.class) { if (annotation.annotationType() == RRemoteRx.class) {
Class<T> syncInterface = (Class<T>) ((RRemoteRx) annotation).value(); Class<T> syncInterface = (Class<T>) ((RRemoteRx) annotation).value();
RxRemoteProxy proxy = new RxRemoteProxy(commandExecutor, name, responseQueueName, responses, RxRemoteProxy proxy = new RxRemoteProxy(commandExecutor, name, responseQueueName,
codec, executorId, cancelRequestMapName, this, locked); codec, executorId, cancelRequestMapName, this);
return proxy.create(remoteInterface, options, syncInterface); return proxy.create(remoteInterface, options, syncInterface);
} }
} }
SyncRemoteProxy proxy = new SyncRemoteProxy(commandExecutor, name, responseQueueName, responses, codec, executorId, this, locked); SyncRemoteProxy proxy = new SyncRemoteProxy(commandExecutor, name, responseQueueName, codec, executorId, this);
return proxy.create(remoteInterface, options); return proxy.create(remoteInterface, options);
} }

@ -15,19 +15,16 @@
*/ */
package org.redisson.remote; package org.redisson.remote;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import org.redisson.misc.WrappedLock;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
import org.redisson.command.CommandAsyncExecutor; import org.redisson.command.CommandAsyncExecutor;
import org.redisson.executor.RemotePromise; import org.redisson.executor.RemotePromise;
import org.redisson.misc.CompletableFutureWrapper; import org.redisson.misc.CompletableFutureWrapper;
import org.redisson.reactive.CommandReactiveExecutor; import org.redisson.reactive.CommandReactiveExecutor;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import java.util.Arrays;
import java.util.List;
/** /**
* *
* @author Nikita Koksharov * @author Nikita Koksharov
@ -36,10 +33,8 @@ import reactor.core.publisher.Mono;
public class ReactiveRemoteProxy extends AsyncRemoteProxy { public class ReactiveRemoteProxy extends AsyncRemoteProxy {
public ReactiveRemoteProxy(CommandAsyncExecutor commandExecutor, String name, String responseQueueName, public ReactiveRemoteProxy(CommandAsyncExecutor commandExecutor, String name, String responseQueueName,
ConcurrentMap<String, ResponseEntry> responses, Codec codec, String executorId, Codec codec, String executorId, String cancelRequestMapName, BaseRemoteService remoteService) {
String cancelRequestMapName, BaseRemoteService remoteService, WrappedLock locked) { super(commandExecutor, name, responseQueueName, codec, executorId, cancelRequestMapName, remoteService);
super(commandExecutor, name, responseQueueName, responses, codec, executorId, cancelRequestMapName,
remoteService, locked);
} }
@Override @Override

@ -15,21 +15,18 @@
*/ */
package org.redisson.remote; package org.redisson.remote;
import java.util.Arrays; import io.reactivex.rxjava3.core.Completable;
import java.util.List; import io.reactivex.rxjava3.core.Flowable;
import java.util.concurrent.ConcurrentMap; import io.reactivex.rxjava3.core.Maybe;
import io.reactivex.rxjava3.core.Single;
import org.redisson.misc.WrappedLock;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
import org.redisson.command.CommandAsyncExecutor; import org.redisson.command.CommandAsyncExecutor;
import org.redisson.executor.RemotePromise; import org.redisson.executor.RemotePromise;
import org.redisson.misc.CompletableFutureWrapper; import org.redisson.misc.CompletableFutureWrapper;
import org.redisson.rx.CommandRxExecutor; import org.redisson.rx.CommandRxExecutor;
import io.reactivex.rxjava3.core.Completable; import java.util.Arrays;
import io.reactivex.rxjava3.core.Flowable; import java.util.List;
import io.reactivex.rxjava3.core.Maybe;
import io.reactivex.rxjava3.core.Single;
/** /**
* *
@ -39,10 +36,8 @@ import io.reactivex.rxjava3.core.Single;
public class RxRemoteProxy extends AsyncRemoteProxy { public class RxRemoteProxy extends AsyncRemoteProxy {
public RxRemoteProxy(CommandAsyncExecutor commandExecutor, String name, String responseQueueName, public RxRemoteProxy(CommandAsyncExecutor commandExecutor, String name, String responseQueueName,
ConcurrentMap<String, ResponseEntry> responses, Codec codec, String executorId, Codec codec, String executorId, String cancelRequestMapName, BaseRemoteService remoteService) {
String cancelRequestMapName, BaseRemoteService remoteService, WrappedLock locked) { super(commandExecutor, name, responseQueueName, codec, executorId, cancelRequestMapName, remoteService);
super(commandExecutor, name, responseQueueName, responses, codec, executorId, cancelRequestMapName,
remoteService, locked);
} }
@Override @Override

@ -15,7 +15,6 @@
*/ */
package org.redisson.remote; package org.redisson.remote;
import org.redisson.misc.WrappedLock;
import org.redisson.RedissonBucket; import org.redisson.RedissonBucket;
import org.redisson.api.RemoteInvocationOptions; import org.redisson.api.RemoteInvocationOptions;
import org.redisson.client.RedisException; import org.redisson.client.RedisException;
@ -37,8 +36,8 @@ import java.util.concurrent.*;
public class SyncRemoteProxy extends BaseRemoteProxy { public class SyncRemoteProxy extends BaseRemoteProxy {
public SyncRemoteProxy(CommandAsyncExecutor commandExecutor, String name, String responseQueueName, public SyncRemoteProxy(CommandAsyncExecutor commandExecutor, String name, String responseQueueName,
ConcurrentMap<String, ResponseEntry> responses, Codec codec, String executorId, BaseRemoteService remoteService, WrappedLock locked) { Codec codec, String executorId, BaseRemoteService remoteService) {
super(commandExecutor, name, responseQueueName, responses, codec, executorId, remoteService, locked); super(commandExecutor, name, responseQueueName, codec, executorId, remoteService);
} }
public <T> T create(Class<T> remoteInterface, RemoteInvocationOptions options) { public <T> T create(Class<T> remoteInterface, RemoteInvocationOptions options) {

Loading…
Cancel
Save