diff --git a/redisson/src/main/java/org/redisson/BaseRemoteService.java b/redisson/src/main/java/org/redisson/BaseRemoteService.java index 2c4a4f483..75c2e10fa 100644 --- a/redisson/src/main/java/org/redisson/BaseRemoteService.java +++ b/redisson/src/main/java/org/redisson/BaseRemoteService.java @@ -23,6 +23,7 @@ import java.lang.reflect.Proxy; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.Map; import java.util.concurrent.CancellationException; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; @@ -60,6 +61,7 @@ import io.netty.util.TimerTask; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.ScheduledFuture; +import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.ThreadLocalRandom; /** @@ -71,12 +73,18 @@ public abstract class BaseRemoteService { private static final Logger log = LoggerFactory.getLogger(BaseRemoteService.class); + private final Map, String> requestQueueNameCache = PlatformDependent.newConcurrentHashMap(); + private final Map> methodSignaturesCache = PlatformDependent.newConcurrentHashMap(); + protected final Codec codec; protected final RedissonClient redisson; protected final String name; protected final CommandAsyncExecutor commandExecutor; protected final String executorId; + protected final String cancelRequestMapName; + protected final String cancelResponseMapName; + protected final String responseQueueName; private final ConcurrentMap responses; public BaseRemoteService(Codec codec, RedissonClient redisson, String name, CommandAsyncExecutor commandExecutor, String executorId, ConcurrentMap responses) { @@ -86,30 +94,26 @@ public abstract class BaseRemoteService { this.commandExecutor = commandExecutor; this.executorId = executorId; this.responses = responses; + this.cancelRequestMapName = "{" + name + ":remote" + "}:cancel-request"; + this.cancelResponseMapName = "{" + name + ":remote" + "}:cancel-response"; + this.responseQueueName = getResponseQueueName(executorId); } - protected String getCancelRequestMapName(Class remoteInterface) { - // return "{" + name + ":" + remoteInterface.getName() + "}:cancel-request"; - return "{" + name + ":remote" + "}:cancel-request"; + protected String getResponseQueueName(String executorId) { + return "{remote_response}:" + executorId; } - protected String getCancelResponseMapName(Class remoteInterface) { - // return "{" + name + ":" + remoteInterface.getName() + "}:cancel-response"; - return "{" + name + ":remote" + "}:cancel-response"; - } - - protected String getAckName(Class remoteInterface, String requestId) { - // return "{" + name + ":" + remoteInterface.getName() + "}:" + requestId + ":ack"; + protected String getAckName(String requestId) { return "{" + name + ":remote" + "}:" + requestId + ":ack"; } - protected String getResponseQueueName(Class remoteInterface, String executorId) { - // return "{" + name + ":" + remoteInterface.getName() + "}:" + executorId; - return "{remote_response}:" + executorId; - } - public String getRequestQueueName(Class remoteInterface) { - return "{" + name + ":" + remoteInterface.getName() + "}"; + String str = requestQueueNameCache.get(remoteInterface); + if (str == null) { + str = "{" + name + ":" + remoteInterface.getName() + "}"; + requestQueueNameCache.put(remoteInterface, str); + } + return str; } protected ByteBuf encode(Object obj) { @@ -164,17 +168,17 @@ public abstract class BaseRemoteService { final Class syncInterface) { // local copy of the options, to prevent mutation final RemoteInvocationOptions optionsCopy = new RemoteInvocationOptions(options); - final String toString = getClass().getSimpleName() + "-" + remoteInterface.getSimpleName() + "-proxy-" - + generateRequestId(); InvocationHandler handler = new InvocationHandler() { @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { + final String requestId = generateRequestId(); + if (method.getName().equals("toString")) { - return toString; + return getClass().getSimpleName() + "-" + remoteInterface.getSimpleName() + "-proxy-" + requestId; } else if (method.getName().equals("equals")) { return proxy == args[0]; } else if (method.getName().equals("hashCode")) { - return toString.hashCode(); + return (getClass().getSimpleName() + "-" + remoteInterface.getSimpleName() + "-proxy-" + requestId).hashCode(); } if (!optionsCopy.isResultExpected() && !(method.getReturnType().equals(Void.class) @@ -182,13 +186,9 @@ public abstract class BaseRemoteService { throw new IllegalArgumentException("The noResult option only supports void return value"); } - final String requestId = generateRequestId(); - final String requestQueueName = getRequestQueueName(syncInterface); - final String responseName = getResponseQueueName(syncInterface, executorId); - final String ackName = getAckName(syncInterface, requestId); - final RBlockingQueue requestQueue = redisson.getBlockingQueue(requestQueueName, codec); + RBlockingQueue requestQueue = redisson.getBlockingQueue(requestQueueName, codec); final RemoteServiceRequest request = new RemoteServiceRequest(executorId, requestId, method.getName(), getMethodSignatures(method), args, optionsCopy, System.currentTimeMillis()); @@ -204,8 +204,10 @@ public abstract class BaseRemoteService { return false; } + if (optionsCopy.isAckExpected()) { - RFuture future = commandExecutor.evalWriteAsync(responseName, LongCodec.INSTANCE, + String ackName = getAckName(requestId); + RFuture future = commandExecutor.evalWriteAsync(responseQueueName, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if redis.call('setnx', KEYS[1], 1) == 1 then " + "redis.call('pexpire', KEYS[1], ARGV[2]);" @@ -214,7 +216,7 @@ public abstract class BaseRemoteService { + "return 1;" + "end;" + "return 0;", - Arrays. asList(ackName, responseName, requestQueueName), + Arrays. asList(ackName, responseQueueName, requestQueueName), encode(request), request.getOptions().getAckTimeoutInMillis()); boolean ackNotSent = commandExecutor.get(future); @@ -223,19 +225,20 @@ public abstract class BaseRemoteService { return true; } - return cancel(syncInterface, requestId, request, mayInterruptIfRunning); + return cancel(requestId, request, mayInterruptIfRunning); } + RBlockingQueue requestQueue = redisson.getBlockingQueue(requestQueueName, codec); boolean removed = commandExecutor.get(removeAsync(requestQueue, request)); if (removed) { super.cancel(mayInterruptIfRunning); return true; } - return cancel(syncInterface, requestId, request, mayInterruptIfRunning); + return cancel(requestId, request, mayInterruptIfRunning); } - private boolean cancel(Class remoteInterface, String requestId, RemoteServiceRequest request, + private boolean cancel(String requestId, RemoteServiceRequest request, boolean mayInterruptIfRunning) { if (isCancelled()) { return true; @@ -245,8 +248,7 @@ public abstract class BaseRemoteService { return false; } - String canceRequestName = getCancelRequestMapName(remoteInterface); - cancelExecution(optionsCopy, responseName, request, mayInterruptIfRunning, canceRequestName, this); + cancelExecution(optionsCopy, request, mayInterruptIfRunning, this); try { awaitUninterruptibly(60, TimeUnit.SECONDS); @@ -268,7 +270,7 @@ public abstract class BaseRemoteService { } if (optionsCopy.isAckExpected()) { - RPromise ackFuture = poll(optionsCopy.getAckTimeoutInMillis(), request.getId(), responseName); + RPromise ackFuture = poll(optionsCopy.getAckTimeoutInMillis(), requestId); ackFuture.addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { @@ -279,8 +281,9 @@ public abstract class BaseRemoteService { RemoteServiceAck ack = future.getNow(); if (ack == null) { + final String ackName = getAckName(requestId); RFuture ackFutureAttempt = - tryPollAckAgainAsync(optionsCopy, ackName, request.getId(), responseName); + tryPollAckAgainAsync(optionsCopy, ackName, request.getId()); ackFutureAttempt.addListener(new FutureListener() { @Override @@ -300,17 +303,17 @@ public abstract class BaseRemoteService { return; } - awaitResultAsync(optionsCopy, result, request, responseName, ackName); + awaitResultAsync(optionsCopy, result, request, ackName); } }); } else { - awaitResultAsync(optionsCopy, result, request, responseName); + awaitResultAsync(optionsCopy, result, request); } } }); } else { - awaitResultAsync(optionsCopy, result, request, responseName); + awaitResultAsync(optionsCopy, result, request); } } }); @@ -323,7 +326,7 @@ public abstract class BaseRemoteService { } private void awaitResultAsync(final RemoteInvocationOptions optionsCopy, final RemotePromise result, - final RemoteServiceRequest request, final String responseName, final String ackName) { + final RemoteServiceRequest request, final String ackName) { RFuture deleteFuture = redisson.getBucket(ackName).deleteAsync(); deleteFuture.addListener(new FutureListener() { @Override @@ -333,19 +336,20 @@ public abstract class BaseRemoteService { return; } - awaitResultAsync(optionsCopy, result, request, responseName); + awaitResultAsync(optionsCopy, result, request); } }); } protected void awaitResultAsync(final RemoteInvocationOptions optionsCopy, final RemotePromise result, - final RemoteServiceRequest request, final String responseName) { + final RemoteServiceRequest request) { // poll for the response only if expected if (!optionsCopy.isResultExpected()) { return; } - RPromise responseFuture = poll(optionsCopy.getExecutionTimeoutInMillis(), request.getId(), responseName); + String requestId = request.getId(); + RPromise responseFuture = poll(optionsCopy.getExecutionTimeoutInMillis(), requestId); responseFuture.addListener(new FutureListener() { @@ -358,7 +362,7 @@ public abstract class BaseRemoteService { if (future.getNow() == null) { RemoteServiceTimeoutException e = new RemoteServiceTimeoutException("No response after " - + optionsCopy.getExecutionTimeoutInMillis() + "ms for request: " + request); + + optionsCopy.getExecutionTimeoutInMillis() + "ms for request: " + requestId); result.tryFailure(e); return; } @@ -380,16 +384,16 @@ public abstract class BaseRemoteService { } private RPromise poll(final long timeout, - final String requestId, final String responseName) { + String requestId) { final RPromise responseFuture = new RedissonPromise(); final Key key = new Key(requestId); ResponseEntry entry; synchronized (responses) { - entry = responses.get(responseName); + entry = responses.get(responseQueueName); if (entry == null) { entry = new ResponseEntry(); - ResponseEntry oldEntry = responses.putIfAbsent(responseName, entry); + ResponseEntry oldEntry = responses.putIfAbsent(responseQueueName, entry); if (oldEntry != null) { entry = oldEntry; } @@ -403,7 +407,7 @@ public abstract class BaseRemoteService { @Override public void run() { synchronized (responses) { - ResponseEntry entry = responses.get(responseName); + ResponseEntry entry = responses.get(responseQueueName); if (entry == null) { return; } @@ -413,7 +417,7 @@ public abstract class BaseRemoteService { entry.getTimeouts().remove(key); entry.getResponses().remove(key, responseFuture); if (entry.getResponses().isEmpty()) { - responses.remove(responseName, entry); + responses.remove(responseQueueName, entry); } } } @@ -421,30 +425,30 @@ public abstract class BaseRemoteService { }, timeout, TimeUnit.MILLISECONDS); entry.getTimeouts().put(key, future); - pollTasks(entry, responseName); + pollTasks(entry); return responseFuture; } - private void pollTasks(final ResponseEntry entry, final String responseName) { + private void pollTasks(final ResponseEntry entry) { if (!entry.getStarted().compareAndSet(false, true)) { return; } - final RBlockingQueue responseQueue = redisson.getBlockingQueue(responseName, codec); + RBlockingQueue responseQueue = redisson.getBlockingQueue(responseQueueName, codec); RFuture future = responseQueue.takeAsync(); future.addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { if (!future.isSuccess()) { - log.error("Can't get response from " + responseName, future.cause()); + log.error("Can't get response from " + responseQueueName, future.cause()); return; } RRemoteServiceResponse response = future.getNow(); RPromise promise; synchronized (responses) { - ResponseEntry entry = responses.get(responseName); + ResponseEntry entry = responses.get(responseQueueName); if (entry == null) { return; } @@ -456,8 +460,9 @@ public abstract class BaseRemoteService { timeoutFuture.cancel(false); if (entryResponses.isEmpty()) { - responses.remove(responseName, entry); + responses.remove(responseQueueName, entry); } else { + RBlockingQueue responseQueue = redisson.getBlockingQueue(responseQueueName, codec); responseQueue.takeAsync().addListener(this); } } @@ -500,13 +505,12 @@ public abstract class BaseRemoteService { RBlockingQueue responseQueue = null; if (optionsCopy.isAckExpected() || optionsCopy.isResultExpected()) { - String responseName = getResponseQueueName(remoteInterface, executorId); - responseQueue = redisson.getBlockingQueue(responseName, codec); + responseQueue = redisson.getBlockingQueue(responseQueueName, codec); } // poll for the ack only if expected if (optionsCopy.isAckExpected()) { - String ackName = getAckName(remoteInterface, requestId); + String ackName = getAckName(requestId); RemoteServiceAck ack = (RemoteServiceAck) responseQueue.poll(optionsCopy.getAckTimeoutInMillis(), TimeUnit.MILLISECONDS); if (ack == null) { @@ -560,8 +564,7 @@ public abstract class BaseRemoteService { } private RFuture tryPollAckAgainAsync(final RemoteInvocationOptions optionsCopy, - String ackName, final String requestId, final String responseName) - throws InterruptedException { + String ackName, final String requestId) { final RPromise promise = new RedissonPromise(); RFuture ackClientsFuture = commandExecutor.evalWriteAsync(ackName, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if redis.call('setnx', KEYS[1], 1) == 1 then " @@ -580,7 +583,7 @@ public abstract class BaseRemoteService { } if (future.getNow()) { - RPromise ackFuture = poll(optionsCopy.getAckTimeoutInMillis(), requestId, responseName); + RPromise ackFuture = poll(optionsCopy.getAckTimeoutInMillis(), requestId); ackFuture.addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { @@ -651,9 +654,9 @@ public abstract class BaseRemoteService { return requestQueue.removeAsync(request); } - private void cancelExecution(RemoteInvocationOptions optionsCopy, String responseName, - RemoteServiceRequest request, boolean mayInterruptIfRunning, String canceRequestName, RemotePromise remotePromise) { - RMap canceledRequests = redisson.getMap(canceRequestName, codec); + private void cancelExecution(RemoteInvocationOptions optionsCopy, + RemoteServiceRequest request, boolean mayInterruptIfRunning, RemotePromise remotePromise) { + RMap canceledRequests = redisson.getMap(cancelRequestMapName, codec); canceledRequests.putAsync(request.getId(), new RemoteServiceCancelRequest(mayInterruptIfRunning, false)); canceledRequests.expireAsync(60, TimeUnit.SECONDS); @@ -661,15 +664,23 @@ public abstract class BaseRemoteService { if (!optionsCopy.isResultExpected()) { RemoteInvocationOptions options = new RemoteInvocationOptions(optionsCopy); options.expectResultWithin(60, TimeUnit.SECONDS); - awaitResultAsync(options, remotePromise, request, responseName); + awaitResultAsync(options, remotePromise, request); } } protected List getMethodSignatures(Method method) { - List list = new ArrayList(method.getParameterTypes().length); - for (Class t : method.getParameterTypes()) { - list.add(t.getName()); + List result = methodSignaturesCache.get(method); + if (result == null) { + result = new ArrayList(method.getParameterTypes().length); + for (Class t : method.getParameterTypes()) { + result.add(t.getName()); + } + List oldList = methodSignaturesCache.putIfAbsent(method, result); + if (oldList != null) { + result = oldList; + } } - return list; + + return result; } } diff --git a/redisson/src/main/java/org/redisson/RedissonRemoteService.java b/redisson/src/main/java/org/redisson/RedissonRemoteService.java index ff7fa397e..1145e1eed 100644 --- a/redisson/src/main/java/org/redisson/RedissonRemoteService.java +++ b/redisson/src/main/java/org/redisson/RedissonRemoteService.java @@ -68,7 +68,7 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS public RedissonRemoteService(Codec codec, RedissonClient redisson, String name, CommandExecutor commandExecutor, String executorId, ConcurrentMap responses) { super(codec, redisson, name, commandExecutor, executorId, responses); } - + @Override public void register(Class remoteInterface, T object) { register(remoteInterface, object, 1); @@ -166,11 +166,11 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS return; } - final String responseName = getResponseQueueName(remoteInterface, request.getExecutorId()); + final String responseName = getResponseQueueName(request.getExecutorId()); // send the ack only if expected if (request.getOptions().isAckExpected()) { - String ackName = getAckName(remoteInterface, request.getId()); + String ackName = getAckName(request.getId()); RFuture ackClientsFuture = commandExecutor.evalWriteAsync(responseName, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if redis.call('setnx', KEYS[1], 1) == 1 then " @@ -215,13 +215,13 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS private void executeMethod(final Class remoteInterface, final RBlockingQueue requestQueue, final ExecutorService executor, final RemoteServiceRequest request) { final RemoteServiceMethod method = beans.get(new RemoteServiceKey(remoteInterface, request.getMethodName(), request.getSignatures())); - final String responseName = getResponseQueueName(remoteInterface, request.getExecutorId()); + final String responseName = getResponseQueueName(request.getExecutorId()); final AtomicReference responseHolder = new AtomicReference(); final RPromise cancelRequestFuture = new RedissonPromise(); - scheduleCheck(getCancelRequestMapName(remoteInterface), request.getId(), cancelRequestFuture); + scheduleCheck(cancelRequestMapName, request.getId(), cancelRequestFuture); final java.util.concurrent.Future submitFuture = executor.submit(new Runnable() { @Override @@ -247,8 +247,7 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS // could be removed not from future object if (future.getNow().isSendResponse()) { - String cancelResponseName = getCancelResponseMapName(remoteInterface); - RMap map = redisson.getMap(cancelResponseName, codec); + RMap map = redisson.getMap(cancelResponseMapName, codec); map.putAsync(request.getId(), response); map.expireAsync(60, TimeUnit.SECONDS); } diff --git a/redisson/src/main/java/org/redisson/executor/ScheduledTasksService.java b/redisson/src/main/java/org/redisson/executor/ScheduledTasksService.java index 12b93dc51..e0e476357 100644 --- a/redisson/src/main/java/org/redisson/executor/ScheduledTasksService.java +++ b/redisson/src/main/java/org/redisson/executor/ScheduledTasksService.java @@ -115,7 +115,7 @@ public class ScheduledTasksService extends TasksService { @Override protected void awaitResultAsync(final RemoteInvocationOptions optionsCopy, final RemotePromise result, - final RemoteServiceRequest request, final String responseName) { + final RemoteServiceRequest request) { if (!optionsCopy.isResultExpected()) { return; } @@ -129,11 +129,11 @@ public class ScheduledTasksService extends TasksService { commandExecutor.getConnectionManager().getGroup().schedule(new Runnable() { @Override public void run() { - ScheduledTasksService.super.awaitResultAsync(optionsCopy, result, request, responseName); + ScheduledTasksService.super.awaitResultAsync(optionsCopy, result, request); } }, (long)(delay - delay*0.10), TimeUnit.MILLISECONDS); } else { - super.awaitResultAsync(optionsCopy, result, request, responseName); + super.awaitResultAsync(optionsCopy, result, request); } } diff --git a/redisson/src/main/java/org/redisson/executor/TasksService.java b/redisson/src/main/java/org/redisson/executor/TasksService.java index 0c3e83d33..5dd665b1e 100644 --- a/redisson/src/main/java/org/redisson/executor/TasksService.java +++ b/redisson/src/main/java/org/redisson/executor/TasksService.java @@ -140,7 +140,6 @@ public class TasksService extends BaseRemoteService { public RFuture cancelExecutionAsync(final String requestId) { final Class syncInterface = RemoteExecutorService.class; - String requestQueueName = getRequestQueueName(syncInterface); if (!redisson.getMap(tasksName, LongCodec.INSTANCE).containsKey(requestId)) { return RedissonPromise.newSucceededFuture(false); @@ -148,6 +147,7 @@ public class TasksService extends BaseRemoteService { final RPromise result = new RedissonPromise(); + String requestQueueName = getRequestQueueName(syncInterface); RBlockingQueue requestQueue = redisson.getBlockingQueue(requestQueueName, codec); RemoteServiceRequest request = new RemoteServiceRequest(requestId); @@ -163,13 +163,10 @@ public class TasksService extends BaseRemoteService { if (future.getNow()) { result.trySuccess(true); } else { - String cancelRequestMapName = getCancelRequestMapName(syncInterface); - RMap canceledRequests = redisson.getMap(cancelRequestMapName, codec); canceledRequests.putAsync(requestId, new RemoteServiceCancelRequest(true, true)); canceledRequests.expireAsync(60, TimeUnit.SECONDS); - String cancelResponseMapName = getCancelResponseMapName(syncInterface); final RPromise response = new RedissonPromise(); scheduleCheck(cancelResponseMapName, requestId, response); response.addListener(new FutureListener() { diff --git a/redisson/src/main/java/org/redisson/remote/RemoteServiceRequest.java b/redisson/src/main/java/org/redisson/remote/RemoteServiceRequest.java index 981d019b1..1a1df17b0 100644 --- a/redisson/src/main/java/org/redisson/remote/RemoteServiceRequest.java +++ b/redisson/src/main/java/org/redisson/remote/RemoteServiceRequest.java @@ -46,11 +46,6 @@ public class RemoteServiceRequest implements Serializable { this.id = id; } - public RemoteServiceRequest(String executorId, String id) { - this.id = id; - this.executorId = executorId; - } - public RemoteServiceRequest(String executorId, String id, String methodName, List signatures, Object[] args, RemoteInvocationOptions options, long date) { super(); this.id = id;