diff --git a/src/main/java/org/redisson/RedissonExecutorService.java b/src/main/java/org/redisson/RedissonExecutorService.java index 29657106e..cba86a6ac 100644 --- a/src/main/java/org/redisson/RedissonExecutorService.java +++ b/src/main/java/org/redisson/RedissonExecutorService.java @@ -98,7 +98,7 @@ public class RedissonExecutorService implements RExecutorService { this.name = name; this.redisson = redisson; - String objectName = name + ":{"+ RemoteExecutorService.class.getName() + "}"; + String objectName = "{" + name + ":"+ RemoteExecutorService.class.getName() + "}"; tasksCounter = redisson.getAtomicLong(objectName + ":counter"); status = redisson.getBucket(objectName + ":status"); topic = redisson.getTopic(objectName + ":topic"); @@ -114,7 +114,7 @@ public class RedissonExecutorService implements RExecutorService { @Override public void registerExecutors(int executors) { - String objectName = name + ":{"+ RemoteExecutorService.class.getName() + "}"; + String objectName = "{" + name + ":"+ RemoteExecutorService.class.getName() + "}"; RemoteExecutorServiceImpl service = new RemoteExecutorServiceImpl(commandExecutor, redisson, codec, objectName); service.setStatusName(status.getName()); diff --git a/src/main/java/org/redisson/RedissonRemoteService.java b/src/main/java/org/redisson/RedissonRemoteService.java index 81f54cdac..1ee3158dd 100644 --- a/src/main/java/org/redisson/RedissonRemoteService.java +++ b/src/main/java/org/redisson/RedissonRemoteService.java @@ -111,11 +111,23 @@ public class RedissonRemoteService implements RRemoteService { } for (int i = 0; i < executorsAmount; i++) { - String requestQueueName = name + ":{" + remoteInterface.getName() + "}"; + String requestQueueName = getRequestQueueName(remoteInterface); RBlockingQueue requestQueue = redisson.getBlockingQueue(requestQueueName, getCodec()); subscribe(remoteInterface, requestQueue); } } + + private String getAckName(Class remoteInterface, String requestId) { + return "{" + name + ":" + remoteInterface.getName() + "}:" + requestId + ":ack"; + } + + private String getResponseQueueName(Class remoteInterface, String requestId) { + return "{" + name + ":" + remoteInterface.getName() + "}:" + requestId; + } + + private String getRequestQueueName(Class remoteInterface) { + return "{" + name + ":" + remoteInterface.getName() + "}"; + } private Codec getCodec() { if (codec != null) { @@ -159,7 +171,7 @@ public class RedissonRemoteService implements RRemoteService { } final RemoteServiceMethod method = beans.get(new RemoteServiceKey(remoteInterface, request.getMethodName())); - final String responseName = name + ":{" + remoteInterface.getName() + "}:" + request.getRequestId(); + final String responseName = getResponseQueueName(remoteInterface, request.getRequestId()); // send the ack only if expected if (request.getOptions().isAckExpected()) { @@ -306,8 +318,8 @@ public class RedissonRemoteService implements RRemoteService { final String requestId = generateRequestId(); - final String requestQueueName = name + ":{" + syncInterface.getName() + "}"; - final String responseName = name + ":{" + syncInterface.getName() + "}:" + requestId; + final String requestQueueName = getRequestQueueName(syncInterface); + final String responseName = getResponseQueueName(syncInterface, requestId); final String ackName = getAckName(syncInterface, requestId); final RBlockingQueue requestQueue = redisson.getBlockingQueue(requestQueueName, getCodec()); @@ -475,7 +487,7 @@ public class RedissonRemoteService implements RRemoteService { String requestId = generateRequestId(); - String requestQueueName = name + ":{" + interfaceName + "}"; + String requestQueueName = getRequestQueueName(remoteInterface); RBlockingQueue requestQueue = redisson.getBlockingQueue(requestQueueName, getCodec()); RemoteServiceRequest request = new RemoteServiceRequest(requestId, method.getName(), args, optionsCopy, System.currentTimeMillis()); @@ -483,7 +495,7 @@ public class RedissonRemoteService implements RRemoteService { RBlockingQueue responseQueue = null; if (optionsCopy.isAckExpected() || optionsCopy.isResultExpected()) { - String responseName = name + ":{" + interfaceName + "}:" + requestId; + String responseName = getResponseQueueName(remoteInterface, requestId); responseQueue = redisson.getBlockingQueue(responseName, getCodec()); } @@ -519,10 +531,6 @@ public class RedissonRemoteService implements RRemoteService { return (T) Proxy.newProxyInstance(remoteInterface.getClassLoader(), new Class[]{remoteInterface}, handler); } - private String getAckName(Class remoteInterface, String requestId) { - return name + ":{" + remoteInterface.getName() + "}:" + requestId + ":ack"; - } - private RemoteServiceAck tryPollAckAgain(RemoteInvocationOptions optionsCopy, RBlockingQueue responseQueue, String ackName) throws InterruptedException { Future ackClientsFuture = redisson.getScript().evalAsync(ackName, Mode.READ_WRITE, LongCodec.INSTANCE,