diff --git a/redisson/src/main/java/org/redisson/Redisson.java b/redisson/src/main/java/org/redisson/Redisson.java index 541e359bd..ea86d7fdf 100755 --- a/redisson/src/main/java/org/redisson/Redisson.java +++ b/redisson/src/main/java/org/redisson/Redisson.java @@ -27,12 +27,10 @@ import org.redisson.connection.ConnectionManager; import org.redisson.connection.ServiceManager; import org.redisson.eviction.EvictionScheduler; import org.redisson.liveobject.core.RedissonObjectBuilder; -import org.redisson.misc.WrappedLock; import org.redisson.redisnode.RedissonClusterNodes; import org.redisson.redisnode.RedissonMasterSlaveNodes; import org.redisson.redisnode.RedissonSentinelMasterSlaveNodes; import org.redisson.redisnode.RedissonSingleNode; -import org.redisson.remote.ResponseEntry; import org.redisson.transaction.RedissonTransaction; import java.util.concurrent.ConcurrentHashMap; @@ -52,7 +50,6 @@ public final class Redisson implements RedissonClient { RedissonReference.warmUp(); } - private final QueueTransferService queueTransferService = new QueueTransferService(); private final EvictionScheduler evictionScheduler; private final WriteBehindService writeBehindService; private final ConnectionManager connectionManager; @@ -61,10 +58,6 @@ public final class Redisson implements RedissonClient { private final ConcurrentMap, Class> liveObjectClassCache = new ConcurrentHashMap<>(); private final Config config; - private final ConcurrentMap responses = new ConcurrentHashMap<>(); - - private final WrappedLock responsesLock = new WrappedLock(); - Redisson(Config config) { this.config = config; Config configCopy = new Config(config); @@ -137,7 +130,7 @@ public final class Redisson implements RedissonClient { @Override public RedissonRxClient rxJava() { - return new RedissonRx(connectionManager, evictionScheduler, writeBehindService, responses); + return new RedissonRx(connectionManager, evictionScheduler, writeBehindService); } /* @@ -160,7 +153,7 @@ public final class Redisson implements RedissonClient { @Override public RedissonReactiveClient reactive() { - return new RedissonReactive(connectionManager, evictionScheduler, writeBehindService, responses); + return new RedissonReactive(connectionManager, evictionScheduler, writeBehindService); } @Override @@ -447,7 +440,7 @@ public final class Redisson implements RedissonClient { @Override public RScheduledExecutorService getExecutorService(String name, Codec codec, ExecutorOptions options) { - return new RedissonExecutorService(codec, commandExecutor, this, name, queueTransferService, responses, responsesLock, options); + return new RedissonExecutorService(codec, commandExecutor, this, name, options); } @Override @@ -471,7 +464,7 @@ public final class Redisson implements RedissonClient { if (codec != connectionManager.getServiceManager().getCfg().getCodec()) { executorId = executorId + ":" + name; } - return new RedissonRemoteService(codec, name, commandExecutor, executorId, responses, responsesLock); + return new RedissonRemoteService(codec, name, commandExecutor, executorId); } @Override @@ -544,7 +537,7 @@ public final class Redisson implements RedissonClient { if (destinationQueue == null) { throw new NullPointerException(); } - return new RedissonDelayedQueue(queueTransferService, destinationQueue.getCodec(), commandExecutor, destinationQueue.getName()); + return new RedissonDelayedQueue(destinationQueue.getCodec(), commandExecutor, destinationQueue.getName()); } @Override diff --git a/redisson/src/main/java/org/redisson/RedissonDelayedQueue.java b/redisson/src/main/java/org/redisson/RedissonDelayedQueue.java index 58a2a7d5d..da00242ba 100644 --- a/redisson/src/main/java/org/redisson/RedissonDelayedQueue.java +++ b/redisson/src/main/java/org/redisson/RedissonDelayedQueue.java @@ -35,12 +35,11 @@ import java.util.concurrent.TimeUnit; */ public class RedissonDelayedQueue extends RedissonExpirable implements RDelayedQueue { - private final QueueTransferService queueTransferService; private final String channelName; private final String queueName; private final String timeoutSetName; - protected RedissonDelayedQueue(QueueTransferService queueTransferService, Codec codec, final CommandAsyncExecutor commandExecutor, String name) { + protected RedissonDelayedQueue(Codec codec, CommandAsyncExecutor commandExecutor, String name) { super(codec, commandExecutor, name); channelName = prefixName("redisson_delay_queue_channel", getRawName()); queueName = prefixName("redisson_delay_queue", getRawName()); @@ -75,10 +74,8 @@ public class RedissonDelayedQueue extends RedissonExpirable implements RDelay return RedissonTopic.createRaw(LongCodec.INSTANCE, commandExecutor, channelName); } }; - - queueTransferService.schedule(queueName, task); - - this.queueTransferService = queueTransferService; + + commandExecutor.getServiceManager().getQueueTransferService().schedule(queueName, task); } @Override @@ -524,7 +521,7 @@ public class RedissonDelayedQueue extends RedissonExpirable implements RDelay @Override public void destroy() { - queueTransferService.remove(queueName); + commandExecutor.getServiceManager().getQueueTransferService().remove(queueName); } } diff --git a/redisson/src/main/java/org/redisson/RedissonExecutorService.java b/redisson/src/main/java/org/redisson/RedissonExecutorService.java index c9699e4c6..9c6070377 100644 --- a/redisson/src/main/java/org/redisson/RedissonExecutorService.java +++ b/redisson/src/main/java/org/redisson/RedissonExecutorService.java @@ -26,8 +26,8 @@ import org.redisson.command.CommandAsyncExecutor; import org.redisson.executor.*; import org.redisson.executor.params.*; import org.redisson.misc.CompletableFutureWrapper; -import org.redisson.misc.WrappedLock; import org.redisson.misc.Injector; +import org.redisson.misc.WrappedLock; import org.redisson.remote.ResponseEntry; import org.redisson.remote.ResponseEntry.Result; import org.slf4j.Logger; @@ -93,7 +93,7 @@ public class RedissonExecutorService implements RScheduledExecutorService { private final String responseQueueName; private final QueueTransferService queueTransferService; private final String executorId; - private final ConcurrentMap responses; + private final Map responses; private final ReferenceQueue> referenceDueue = new ReferenceQueue<>(); private final Collection references = Collections.newSetFromMap(new ConcurrentHashMap<>()); @@ -101,17 +101,15 @@ public class RedissonExecutorService implements RScheduledExecutorService { private final IdGenerator idGenerator; public RedissonExecutorService(Codec codec, CommandAsyncExecutor commandExecutor, Redisson redisson, - String name, QueueTransferService queueTransferService, ConcurrentMap responses, - WrappedLock responsesLock, - ExecutorOptions options) { + String name, ExecutorOptions options) { super(); this.codec = codec; this.commandExecutor = commandExecutor; this.name = commandExecutor.getServiceManager().getConfig().getNameMapper().map(name); this.redisson = redisson; - this.queueTransferService = queueTransferService; - this.responses = responses; - this.responsesLock = responsesLock; + this.queueTransferService = commandExecutor.getServiceManager().getQueueTransferService(); + this.responses = commandExecutor.getServiceManager().getResponses(); + this.responsesLock = commandExecutor.getServiceManager().getResponsesLock(); if (codec == commandExecutor.getServiceManager().getCfg().getCodec()) { this.executorId = commandExecutor.getServiceManager().getId(); @@ -119,7 +117,7 @@ public class RedissonExecutorService implements RScheduledExecutorService { this.executorId = commandExecutor.getServiceManager().getId() + ":" + RemoteExecutorServiceAsync.class.getName() + ":" + name; } - remoteService = new RedissonExecutorRemoteService(codec, name, commandExecutor, executorId, responses, responsesLock); + remoteService = new RedissonExecutorRemoteService(codec, name, commandExecutor, executorId); requestQueueName = remoteService.getRequestQueueName(RemoteExecutorService.class); responseQueueName = remoteService.getResponseQueueName(executorId); String objectName = requestQueueName; @@ -146,7 +144,7 @@ public class RedissonExecutorService implements RScheduledExecutorService { remoteService.setTasksRetryIntervalName(tasksRetryIntervalName); remoteService.setTerminationTopicName(terminationTopic.getChannelNames().get(0)); - executorRemoteService = new TasksService(codec, name, commandExecutor, executorId, responses, responsesLock); + executorRemoteService = new TasksService(codec, name, commandExecutor, executorId); executorRemoteService.setTerminationTopicName(terminationTopic.getChannelNames().get(0)); executorRemoteService.setTasksCounterName(tasksCounterName); executorRemoteService.setStatusName(statusName); @@ -159,7 +157,7 @@ public class RedissonExecutorService implements RScheduledExecutorService { asyncService = executorRemoteService.get(RemoteExecutorServiceAsync.class, RESULT_OPTIONS); asyncServiceWithoutResult = executorRemoteService.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck().noResult()); - scheduledRemoteService = new ScheduledTasksService(codec, name, commandExecutor, executorId, responses, responsesLock); + scheduledRemoteService = new ScheduledTasksService(codec, name, commandExecutor, executorId); scheduledRemoteService.setTerminationTopicName(terminationTopic.getChannelNames().get(0)); scheduledRemoteService.setTasksCounterName(tasksCounterName); scheduledRemoteService.setStatusName(statusName); @@ -292,7 +290,7 @@ public class RedissonExecutorService implements RScheduledExecutorService { queueTransferService.schedule(getName(), task); TasksRunnerService service = - new TasksRunnerService(commandExecutor, redisson, codec, requestQueueName, responses, responsesLock); + new TasksRunnerService(commandExecutor, redisson, codec, requestQueueName); service.setStatusName(statusName); service.setTasksCounterName(tasksCounterName); service.setTasksName(tasksName); @@ -355,7 +353,7 @@ public class RedissonExecutorService implements RScheduledExecutorService { } private TasksBatchService createBatchService() { - TasksBatchService executorRemoteService = new TasksBatchService(codec, getName(), commandExecutor, executorId, responses, responsesLock); + TasksBatchService executorRemoteService = new TasksBatchService(codec, getName(), commandExecutor, executorId); executorRemoteService.setTasksExpirationTimeName(tasksExpirationTimeName); executorRemoteService.setTerminationTopicName(terminationTopic.getChannelNames().get(0)); executorRemoteService.setTasksCounterName(tasksCounterName); diff --git a/redisson/src/main/java/org/redisson/RedissonReactive.java b/redisson/src/main/java/org/redisson/RedissonReactive.java index 28dd57fa3..0dc445b74 100644 --- a/redisson/src/main/java/org/redisson/RedissonReactive.java +++ b/redisson/src/main/java/org/redisson/RedissonReactive.java @@ -23,15 +23,11 @@ import org.redisson.config.ConfigSupport; import org.redisson.connection.ConnectionManager; import org.redisson.eviction.EvictionScheduler; import org.redisson.liveobject.core.RedissonObjectBuilder; -import org.redisson.misc.WrappedLock; import org.redisson.reactive.*; -import org.redisson.remote.ResponseEntry; import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; /** * Main infrastructure class allows to get access @@ -46,8 +42,6 @@ public class RedissonReactive implements RedissonReactiveClient { protected final EvictionScheduler evictionScheduler; protected final CommandReactiveExecutor commandExecutor; protected final ConnectionManager connectionManager; - protected final ConcurrentMap responses; - private final WrappedLock responsesLock = new WrappedLock(); protected RedissonReactive(Config config) { Config configCopy = new Config(config); @@ -60,11 +54,10 @@ public class RedissonReactive implements RedissonReactiveClient { commandExecutor = new CommandReactiveService(connectionManager, objectBuilder); evictionScheduler = new EvictionScheduler(commandExecutor); writeBehindService = new WriteBehindService(commandExecutor); - responses = new ConcurrentHashMap<>(); } protected RedissonReactive(ConnectionManager connectionManager, EvictionScheduler evictionScheduler, - WriteBehindService writeBehindService, ConcurrentMap responses) { + WriteBehindService writeBehindService) { this.connectionManager = connectionManager; RedissonObjectBuilder objectBuilder = null; if (connectionManager.getServiceManager().getCfg().isReferenceEnabled()) { @@ -73,7 +66,6 @@ public class RedissonReactive implements RedissonReactiveClient { commandExecutor = new CommandReactiveService(connectionManager, objectBuilder); this.evictionScheduler = evictionScheduler; this.writeBehindService = writeBehindService; - this.responses = responses; } public EvictionScheduler getEvictionScheduler() { @@ -536,7 +528,7 @@ public class RedissonReactive implements RedissonReactiveClient { if (codec != connectionManager.getServiceManager().getCfg().getCodec()) { executorId = executorId + ":" + name; } - return new RedissonRemoteService(codec, name, commandExecutor, executorId, responses, responsesLock); + return new RedissonRemoteService(codec, name, commandExecutor, executorId); } @Override diff --git a/redisson/src/main/java/org/redisson/RedissonRemoteService.java b/redisson/src/main/java/org/redisson/RedissonRemoteService.java index 547ebeeaa..1d18af059 100644 --- a/redisson/src/main/java/org/redisson/RedissonRemoteService.java +++ b/redisson/src/main/java/org/redisson/RedissonRemoteService.java @@ -23,7 +23,6 @@ import org.redisson.client.protocol.RedisCommands; import org.redisson.command.CommandAsyncExecutor; import org.redisson.executor.RemotePromise; import org.redisson.misc.CompletableFutureWrapper; -import org.redisson.misc.WrappedLock; import org.redisson.remote.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -68,9 +67,8 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS private final Map, Entry> remoteMap = new ConcurrentHashMap<>(); - public RedissonRemoteService(Codec codec, String name, CommandAsyncExecutor commandExecutor, String executorId, - ConcurrentMap responses, WrappedLock locked) { - super(codec, name, commandExecutor, executorId, responses, locked); + public RedissonRemoteService(Codec codec, String name, CommandAsyncExecutor commandExecutor, String executorId) { + super(codec, name, commandExecutor, executorId); } public String getRequestTasksMapName(Class remoteInterface) { diff --git a/redisson/src/main/java/org/redisson/RedissonRx.java b/redisson/src/main/java/org/redisson/RedissonRx.java index f337ad6bc..bdd4e32e3 100644 --- a/redisson/src/main/java/org/redisson/RedissonRx.java +++ b/redisson/src/main/java/org/redisson/RedissonRx.java @@ -23,13 +23,9 @@ import org.redisson.config.ConfigSupport; import org.redisson.connection.ConnectionManager; import org.redisson.eviction.EvictionScheduler; import org.redisson.liveobject.core.RedissonObjectBuilder; -import org.redisson.misc.WrappedLock; -import org.redisson.remote.ResponseEntry; import org.redisson.rx.*; import java.util.Arrays; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; /** * Main infrastructure class allows to get access @@ -44,9 +40,7 @@ public class RedissonRx implements RedissonRxClient { protected final EvictionScheduler evictionScheduler; protected final CommandRxExecutor commandExecutor; protected final ConnectionManager connectionManager; - protected final ConcurrentMap responses; - private final WrappedLock responsesLock = new WrappedLock(); - + protected RedissonRx(Config config) { Config configCopy = new Config(config); @@ -58,11 +52,9 @@ public class RedissonRx implements RedissonRxClient { commandExecutor = new CommandRxService(connectionManager, objectBuilder); evictionScheduler = new EvictionScheduler(commandExecutor); writeBehindService = new WriteBehindService(commandExecutor); - responses = new ConcurrentHashMap<>(); } - protected RedissonRx(ConnectionManager connectionManager, EvictionScheduler evictionScheduler, - WriteBehindService writeBehindService, ConcurrentMap responses) { + protected RedissonRx(ConnectionManager connectionManager, EvictionScheduler evictionScheduler, WriteBehindService writeBehindService) { this.connectionManager = connectionManager; RedissonObjectBuilder objectBuilder = null; if (connectionManager.getServiceManager().getCfg().isReferenceEnabled()) { @@ -71,7 +63,6 @@ public class RedissonRx implements RedissonRxClient { commandExecutor = new CommandRxService(connectionManager, objectBuilder); this.evictionScheduler = evictionScheduler; this.writeBehindService = writeBehindService; - this.responses = responses; } public CommandRxExecutor getCommandExecutor() { @@ -515,7 +506,7 @@ public class RedissonRx implements RedissonRxClient { if (codec != connectionManager.getServiceManager().getCfg().getCodec()) { executorId = executorId + ":" + name; } - return new RedissonRemoteService(codec, name, commandExecutor, executorId, responses, responsesLock); + return new RedissonRemoteService(codec, name, commandExecutor, executorId); } @Override diff --git a/redisson/src/main/java/org/redisson/connection/ServiceManager.java b/redisson/src/main/java/org/redisson/connection/ServiceManager.java index d59602cc6..5b035bd2e 100644 --- a/redisson/src/main/java/org/redisson/connection/ServiceManager.java +++ b/redisson/src/main/java/org/redisson/connection/ServiceManager.java @@ -41,6 +41,7 @@ import io.netty.util.concurrent.Future; import io.netty.util.concurrent.*; import io.netty.util.internal.PlatformDependent; import org.redisson.ElementsSubscribeService; +import org.redisson.QueueTransferService; import org.redisson.Version; import org.redisson.api.NatMapper; import org.redisson.api.RFuture; @@ -56,6 +57,8 @@ import org.redisson.config.TransportMode; import org.redisson.misc.CompletableFutureWrapper; import org.redisson.misc.InfinitySemaphoreLatch; import org.redisson.misc.RedisURI; +import org.redisson.misc.WrappedLock; +import org.redisson.remote.ResponseEntry; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -137,6 +140,12 @@ public class ServiceManager { private static final Map SHA_CACHE = new LRUCacheMap<>(500, 0, 0); + private final Map responses = new ConcurrentHashMap<>(); + + private final WrappedLock responsesLock = new WrappedLock(); + + private final QueueTransferService queueTransferService = new QueueTransferService(); + public ServiceManager(Config cfg) { Version.logVersion(); @@ -570,4 +579,15 @@ public class ServiceManager { return RedisCommands.HRANDFIELD; } + public Map getResponses() { + return responses; + } + + public WrappedLock getResponsesLock() { + return responsesLock; + } + + public QueueTransferService getQueueTransferService() { + return queueTransferService; + } } diff --git a/redisson/src/main/java/org/redisson/executor/RedissonExecutorRemoteService.java b/redisson/src/main/java/org/redisson/executor/RedissonExecutorRemoteService.java index 2e66d659e..ee6015547 100644 --- a/redisson/src/main/java/org/redisson/executor/RedissonExecutorRemoteService.java +++ b/redisson/src/main/java/org/redisson/executor/RedissonExecutorRemoteService.java @@ -15,14 +15,16 @@ */ package org.redisson.executor; -import org.redisson.*; +import org.redisson.RedissonExecutorService; +import org.redisson.RedissonObject; +import org.redisson.RedissonRemoteService; +import org.redisson.RedissonShutdownException; import org.redisson.api.RFuture; import org.redisson.api.RMap; import org.redisson.api.executor.*; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommands; import org.redisson.command.CommandAsyncExecutor; -import org.redisson.misc.WrappedLock; import org.redisson.remote.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,7 +33,6 @@ import java.lang.reflect.InvocationTargetException; import java.util.Arrays; import java.util.List; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -57,9 +58,8 @@ public class RedissonExecutorRemoteService extends RedissonRemoteService { private List successListeners; public RedissonExecutorRemoteService(Codec codec, String name, - CommandAsyncExecutor commandExecutor, String executorId, - ConcurrentMap responses, WrappedLock locked) { - super(codec, name, commandExecutor, executorId, responses, locked); + CommandAsyncExecutor commandExecutor, String executorId) { + super(codec, name, commandExecutor, executorId); } @Override diff --git a/redisson/src/main/java/org/redisson/executor/ScheduledTasksService.java b/redisson/src/main/java/org/redisson/executor/ScheduledTasksService.java index dc29ffd5d..0c7548a5a 100644 --- a/redisson/src/main/java/org/redisson/executor/ScheduledTasksService.java +++ b/redisson/src/main/java/org/redisson/executor/ScheduledTasksService.java @@ -15,7 +15,6 @@ */ package org.redisson.executor; -import org.redisson.misc.WrappedLock; import org.redisson.RedissonExecutorService; import org.redisson.api.RFuture; import org.redisson.client.codec.Codec; @@ -25,11 +24,9 @@ import org.redisson.client.protocol.RedisCommands; import org.redisson.command.CommandAsyncExecutor; import org.redisson.executor.params.ScheduledParameters; import org.redisson.remote.RemoteServiceRequest; -import org.redisson.remote.ResponseEntry; import java.util.Arrays; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentMap; /** * @@ -40,9 +37,8 @@ public class ScheduledTasksService extends TasksService { private String requestId; - public ScheduledTasksService(Codec codec, String name, CommandAsyncExecutor commandExecutor, String redissonId, - ConcurrentMap responses, WrappedLock locked) { - super(codec, name, commandExecutor, redissonId, responses, locked); + public ScheduledTasksService(Codec codec, String name, CommandAsyncExecutor commandExecutor, String redissonId) { + super(codec, name, commandExecutor, redissonId); } public void setRequestId(String requestId) { diff --git a/redisson/src/main/java/org/redisson/executor/TasksBatchService.java b/redisson/src/main/java/org/redisson/executor/TasksBatchService.java index b6f370e4d..63cee8fcb 100644 --- a/redisson/src/main/java/org/redisson/executor/TasksBatchService.java +++ b/redisson/src/main/java/org/redisson/executor/TasksBatchService.java @@ -15,15 +15,12 @@ */ package org.redisson.executor; -import org.redisson.misc.WrappedLock; import org.redisson.api.RFuture; import org.redisson.client.codec.Codec; import org.redisson.command.CommandAsyncExecutor; import org.redisson.command.CommandBatchService; -import org.redisson.remote.ResponseEntry; import java.util.List; -import java.util.concurrent.ConcurrentMap; /** * @@ -34,9 +31,8 @@ public class TasksBatchService extends TasksService { private final CommandBatchService batchCommandService; - public TasksBatchService(Codec codec, String name, CommandAsyncExecutor commandExecutor, String executorId, - ConcurrentMap responses, WrappedLock locked) { - super(codec, name, commandExecutor, executorId, responses, locked); + public TasksBatchService(Codec codec, String name, CommandAsyncExecutor commandExecutor, String executorId) { + super(codec, name, commandExecutor, executorId); batchCommandService = new CommandBatchService(commandExecutor); } diff --git a/redisson/src/main/java/org/redisson/executor/TasksRunnerService.java b/redisson/src/main/java/org/redisson/executor/TasksRunnerService.java index 378b94c3d..05d232181 100644 --- a/redisson/src/main/java/org/redisson/executor/TasksRunnerService.java +++ b/redisson/src/main/java/org/redisson/executor/TasksRunnerService.java @@ -17,7 +17,6 @@ package org.redisson.executor; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; -import org.redisson.misc.WrappedLock; import org.redisson.RedissonExecutorService; import org.redisson.RedissonShutdownException; import org.redisson.api.RFuture; @@ -35,7 +34,6 @@ import org.redisson.executor.params.*; import org.redisson.misc.Hash; import org.redisson.misc.HashValue; import org.redisson.misc.Injector; -import org.redisson.remote.ResponseEntry; import java.io.ByteArrayInputStream; import java.io.ObjectInput; @@ -44,7 +42,6 @@ import java.util.Date; import java.util.Map; import java.util.TimeZone; import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -74,17 +71,11 @@ public class TasksRunnerService implements RemoteExecutorService { private String tasksExpirationTimeName; private TasksInjector tasksInjector; - private ConcurrentMap responses; - private WrappedLock locked; - - public TasksRunnerService(CommandAsyncExecutor commandExecutor, RedissonClient redisson, Codec codec, String name, - ConcurrentMap responses, WrappedLock locked) { + + public TasksRunnerService(CommandAsyncExecutor commandExecutor, RedissonClient redisson, Codec codec, String name) { this.commandExecutor = commandExecutor; this.name = name; this.redisson = redisson; - this.responses = responses; - this.locked = locked; - this.codec = codec; } @@ -161,7 +152,7 @@ public class TasksRunnerService implements RemoteExecutorService { * @return */ private RemoteExecutorServiceAsync asyncScheduledServiceAtFixed(String executorId, String requestId) { - ScheduledTasksService scheduledRemoteService = new ScheduledTasksService(codec, name, commandExecutor, executorId, responses, locked); + ScheduledTasksService scheduledRemoteService = new ScheduledTasksService(codec, name, commandExecutor, executorId); scheduledRemoteService.setTerminationTopicName(terminationTopicName); scheduledRemoteService.setTasksCounterName(tasksCounterName); scheduledRemoteService.setStatusName(statusName); diff --git a/redisson/src/main/java/org/redisson/executor/TasksService.java b/redisson/src/main/java/org/redisson/executor/TasksService.java index 0704fd8eb..f2514b083 100644 --- a/redisson/src/main/java/org/redisson/executor/TasksService.java +++ b/redisson/src/main/java/org/redisson/executor/TasksService.java @@ -15,7 +15,6 @@ */ package org.redisson.executor; -import org.redisson.misc.WrappedLock; import org.redisson.RedissonExecutorService; import org.redisson.api.RBlockingQueueAsync; import org.redisson.api.RFuture; @@ -31,7 +30,6 @@ import org.redisson.remote.*; import java.util.Arrays; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; /** @@ -51,9 +49,8 @@ public class TasksService extends BaseRemoteService { protected String tasksExpirationTimeName; protected long tasksRetryInterval; - public TasksService(Codec codec, String name, CommandAsyncExecutor commandExecutor, String executorId, - ConcurrentMap responses, WrappedLock locked) { - super(codec, name, commandExecutor, executorId, responses, locked); + public TasksService(Codec codec, String name, CommandAsyncExecutor commandExecutor, String executorId) { + super(codec, name, commandExecutor, executorId); } public void setTasksExpirationTimeName(String tasksExpirationTimeName) { diff --git a/redisson/src/main/java/org/redisson/remote/AsyncRemoteProxy.java b/redisson/src/main/java/org/redisson/remote/AsyncRemoteProxy.java index 881ec1e7e..99248c71e 100644 --- a/redisson/src/main/java/org/redisson/remote/AsyncRemoteProxy.java +++ b/redisson/src/main/java/org/redisson/remote/AsyncRemoteProxy.java @@ -15,7 +15,6 @@ */ package org.redisson.remote; -import org.redisson.misc.WrappedLock; import org.redisson.RedissonBucket; import org.redisson.RedissonList; import org.redisson.RedissonMap; @@ -40,7 +39,6 @@ import java.util.Arrays; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; /** @@ -53,9 +51,8 @@ public class AsyncRemoteProxy extends BaseRemoteProxy { protected final String cancelRequestMapName; public AsyncRemoteProxy(CommandAsyncExecutor commandExecutor, String name, String responseQueueName, - ConcurrentMap responses, Codec codec, String executorId, String cancelRequestMapName, - BaseRemoteService remoteService, WrappedLock locked) { - super(commandExecutor, name, responseQueueName, responses, codec, executorId, remoteService, locked); + Codec codec, String executorId, String cancelRequestMapName, BaseRemoteService remoteService) { + super(commandExecutor, name, responseQueueName, codec, executorId, remoteService); this.cancelRequestMapName = cancelRequestMapName; } diff --git a/redisson/src/main/java/org/redisson/remote/BaseRemoteProxy.java b/redisson/src/main/java/org/redisson/remote/BaseRemoteProxy.java index 96dd590e0..b3ab43c93 100644 --- a/redisson/src/main/java/org/redisson/remote/BaseRemoteProxy.java +++ b/redisson/src/main/java/org/redisson/remote/BaseRemoteProxy.java @@ -16,7 +16,6 @@ package org.redisson.remote; import io.netty.util.concurrent.ScheduledFuture; -import org.redisson.misc.WrappedLock; import org.redisson.RedissonBlockingQueue; import org.redisson.RedissonShutdownException; import org.redisson.api.RBlockingQueue; @@ -26,6 +25,7 @@ import org.redisson.client.codec.Codec; import org.redisson.client.codec.LongCodec; import org.redisson.client.protocol.RedisCommands; import org.redisson.command.CommandAsyncExecutor; +import org.redisson.misc.WrappedLock; import org.redisson.remote.ResponseEntry.Result; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,18 +54,18 @@ public abstract class BaseRemoteProxy { final String executorId; final BaseRemoteService remoteService; final WrappedLock locked; - + BaseRemoteProxy(CommandAsyncExecutor commandExecutor, String name, String responseQueueName, - Map responses, Codec codec, String executorId, BaseRemoteService remoteService, WrappedLock locked) { + Codec codec, String executorId, BaseRemoteService remoteService) { super(); this.commandExecutor = commandExecutor; this.name = name; this.responseQueueName = responseQueueName; - this.responses = responses; + this.responses = commandExecutor.getServiceManager().getResponses(); this.codec = codec; this.executorId = executorId; this.remoteService = remoteService; - this.locked = locked; + this.locked = commandExecutor.getServiceManager().getResponsesLock(); } private final Map, String> requestQueueNameCache = new ConcurrentHashMap<>(); diff --git a/redisson/src/main/java/org/redisson/remote/BaseRemoteService.java b/redisson/src/main/java/org/redisson/remote/BaseRemoteService.java index 7bc3fea5d..b8c164869 100644 --- a/redisson/src/main/java/org/redisson/remote/BaseRemoteService.java +++ b/redisson/src/main/java/org/redisson/remote/BaseRemoteService.java @@ -20,7 +20,6 @@ import io.netty.buffer.Unpooled; import io.netty.util.CharsetUtil; import io.netty.util.Timeout; import io.netty.util.TimerTask; -import org.redisson.misc.WrappedLock; import org.redisson.RedissonBlockingQueue; import org.redisson.RedissonMap; import org.redisson.api.RBlockingQueue; @@ -66,20 +65,15 @@ public abstract class BaseRemoteService { protected final String cancelRequestMapName; protected final String cancelResponseMapName; protected final String responseQueueName; - private final ConcurrentMap responses; - private final WrappedLock locked; - public BaseRemoteService(Codec codec, String name, CommandAsyncExecutor commandExecutor, String executorId, - ConcurrentMap responses, WrappedLock locked) { + public BaseRemoteService(Codec codec, String name, CommandAsyncExecutor commandExecutor, String executorId) { this.codec = codec; this.name = commandExecutor.getServiceManager().getConfig().getNameMapper().map(name); this.commandExecutor = commandExecutor; this.executorId = executorId; - this.responses = responses; this.cancelRequestMapName = "{" + name + ":remote" + "}:cancel-request"; this.cancelResponseMapName = "{" + name + ":remote" + "}:cancel-response"; this.responseQueueName = getResponseQueueName(executorId); - this.locked = locked; } public String getResponseQueueName(String executorId) { @@ -121,27 +115,27 @@ public abstract class BaseRemoteService { for (Annotation annotation : remoteInterface.getAnnotations()) { if (annotation.annotationType() == RRemoteAsync.class) { Class syncInterface = (Class) ((RRemoteAsync) annotation).value(); - AsyncRemoteProxy proxy = new AsyncRemoteProxy(commandExecutor, name, responseQueueName, responses, - codec, executorId, cancelRequestMapName, this, locked); + AsyncRemoteProxy proxy = new AsyncRemoteProxy(commandExecutor, name, responseQueueName, + codec, executorId, cancelRequestMapName, this); return proxy.create(remoteInterface, options, syncInterface); } if (annotation.annotationType() == RRemoteReactive.class) { Class syncInterface = (Class) ((RRemoteReactive) annotation).value(); - ReactiveRemoteProxy proxy = new ReactiveRemoteProxy(commandExecutor, name, responseQueueName, responses, - codec, executorId, cancelRequestMapName, this, locked); + ReactiveRemoteProxy proxy = new ReactiveRemoteProxy(commandExecutor, name, responseQueueName, + codec, executorId, cancelRequestMapName, this); return proxy.create(remoteInterface, options, syncInterface); } if (annotation.annotationType() == RRemoteRx.class) { Class syncInterface = (Class) ((RRemoteRx) annotation).value(); - RxRemoteProxy proxy = new RxRemoteProxy(commandExecutor, name, responseQueueName, responses, - codec, executorId, cancelRequestMapName, this, locked); + RxRemoteProxy proxy = new RxRemoteProxy(commandExecutor, name, responseQueueName, + codec, executorId, cancelRequestMapName, this); return proxy.create(remoteInterface, options, syncInterface); } } - SyncRemoteProxy proxy = new SyncRemoteProxy(commandExecutor, name, responseQueueName, responses, codec, executorId, this, locked); + SyncRemoteProxy proxy = new SyncRemoteProxy(commandExecutor, name, responseQueueName, codec, executorId, this); return proxy.create(remoteInterface, options); } diff --git a/redisson/src/main/java/org/redisson/remote/ReactiveRemoteProxy.java b/redisson/src/main/java/org/redisson/remote/ReactiveRemoteProxy.java index 728abfae8..e1d5d23f3 100644 --- a/redisson/src/main/java/org/redisson/remote/ReactiveRemoteProxy.java +++ b/redisson/src/main/java/org/redisson/remote/ReactiveRemoteProxy.java @@ -15,19 +15,16 @@ */ package org.redisson.remote; -import java.util.Arrays; -import java.util.List; -import java.util.concurrent.ConcurrentMap; - -import org.redisson.misc.WrappedLock; import org.redisson.client.codec.Codec; import org.redisson.command.CommandAsyncExecutor; import org.redisson.executor.RemotePromise; import org.redisson.misc.CompletableFutureWrapper; import org.redisson.reactive.CommandReactiveExecutor; - import reactor.core.publisher.Mono; +import java.util.Arrays; +import java.util.List; + /** * * @author Nikita Koksharov @@ -36,10 +33,8 @@ import reactor.core.publisher.Mono; public class ReactiveRemoteProxy extends AsyncRemoteProxy { public ReactiveRemoteProxy(CommandAsyncExecutor commandExecutor, String name, String responseQueueName, - ConcurrentMap responses, Codec codec, String executorId, - String cancelRequestMapName, BaseRemoteService remoteService, WrappedLock locked) { - super(commandExecutor, name, responseQueueName, responses, codec, executorId, cancelRequestMapName, - remoteService, locked); + Codec codec, String executorId, String cancelRequestMapName, BaseRemoteService remoteService) { + super(commandExecutor, name, responseQueueName, codec, executorId, cancelRequestMapName, remoteService); } @Override diff --git a/redisson/src/main/java/org/redisson/remote/RxRemoteProxy.java b/redisson/src/main/java/org/redisson/remote/RxRemoteProxy.java index 612c58d1c..12a687c9b 100644 --- a/redisson/src/main/java/org/redisson/remote/RxRemoteProxy.java +++ b/redisson/src/main/java/org/redisson/remote/RxRemoteProxy.java @@ -15,21 +15,18 @@ */ package org.redisson.remote; -import java.util.Arrays; -import java.util.List; -import java.util.concurrent.ConcurrentMap; - -import org.redisson.misc.WrappedLock; +import io.reactivex.rxjava3.core.Completable; +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.core.Maybe; +import io.reactivex.rxjava3.core.Single; import org.redisson.client.codec.Codec; import org.redisson.command.CommandAsyncExecutor; import org.redisson.executor.RemotePromise; import org.redisson.misc.CompletableFutureWrapper; import org.redisson.rx.CommandRxExecutor; -import io.reactivex.rxjava3.core.Completable; -import io.reactivex.rxjava3.core.Flowable; -import io.reactivex.rxjava3.core.Maybe; -import io.reactivex.rxjava3.core.Single; +import java.util.Arrays; +import java.util.List; /** * @@ -39,10 +36,8 @@ import io.reactivex.rxjava3.core.Single; public class RxRemoteProxy extends AsyncRemoteProxy { public RxRemoteProxy(CommandAsyncExecutor commandExecutor, String name, String responseQueueName, - ConcurrentMap responses, Codec codec, String executorId, - String cancelRequestMapName, BaseRemoteService remoteService, WrappedLock locked) { - super(commandExecutor, name, responseQueueName, responses, codec, executorId, cancelRequestMapName, - remoteService, locked); + Codec codec, String executorId, String cancelRequestMapName, BaseRemoteService remoteService) { + super(commandExecutor, name, responseQueueName, codec, executorId, cancelRequestMapName, remoteService); } @Override diff --git a/redisson/src/main/java/org/redisson/remote/SyncRemoteProxy.java b/redisson/src/main/java/org/redisson/remote/SyncRemoteProxy.java index f1ed67786..3273f6e47 100644 --- a/redisson/src/main/java/org/redisson/remote/SyncRemoteProxy.java +++ b/redisson/src/main/java/org/redisson/remote/SyncRemoteProxy.java @@ -15,7 +15,6 @@ */ package org.redisson.remote; -import org.redisson.misc.WrappedLock; import org.redisson.RedissonBucket; import org.redisson.api.RemoteInvocationOptions; import org.redisson.client.RedisException; @@ -37,8 +36,8 @@ import java.util.concurrent.*; public class SyncRemoteProxy extends BaseRemoteProxy { public SyncRemoteProxy(CommandAsyncExecutor commandExecutor, String name, String responseQueueName, - ConcurrentMap responses, Codec codec, String executorId, BaseRemoteService remoteService, WrappedLock locked) { - super(commandExecutor, name, responseQueueName, responses, codec, executorId, remoteService, locked); + Codec codec, String executorId, BaseRemoteService remoteService) { + super(commandExecutor, name, responseQueueName, codec, executorId, remoteService); } public T create(Class remoteInterface, RemoteInvocationOptions options) {