diff --git a/redisson/src/main/java/org/redisson/RedissonRemoteService.java b/redisson/src/main/java/org/redisson/RedissonRemoteService.java index 391a470ec..36afbdd1e 100644 --- a/redisson/src/main/java/org/redisson/RedissonRemoteService.java +++ b/redisson/src/main/java/org/redisson/RedissonRemoteService.java @@ -17,12 +17,11 @@ package org.redisson; import java.lang.reflect.Method; import java.util.Arrays; -import java.util.Collections; import java.util.Map; -import java.util.Set; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import org.redisson.api.RBlockingQueue; @@ -65,10 +64,33 @@ import io.netty.util.internal.PlatformDependent; */ public class RedissonRemoteService extends BaseRemoteService implements RRemoteService { + public static class Entry { + + RFuture future; + final AtomicInteger counter; + + public Entry(int workers) { + counter = new AtomicInteger(workers); + } + + public void setFuture(RFuture future) { + this.future = future; + } + + public RFuture getFuture() { + return future; + } + + public AtomicInteger getCounter() { + return counter; + } + + } + private static final Logger log = LoggerFactory.getLogger(RedissonRemoteService.class); private final Map beans = PlatformDependent.newConcurrentHashMap(); - private final Map, Set>> futures = PlatformDependent.newConcurrentHashMap(); + private final Map, Entry> remoteMap = PlatformDependent.newConcurrentHashMap(); public RedissonRemoteService(Codec codec, RedissonClient redisson, String name, CommandExecutor commandExecutor, String executorId, ConcurrentMap responses) { super(codec, redisson, name, commandExecutor, executorId, responses); @@ -110,20 +132,19 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS beans.remove(key); } - Set> removedFutures = futures.remove(remoteInterface); - if (removedFutures == null) { - return; - } - - for (RFuture future : removedFutures) { - future.cancel(false); + Entry entry = remoteMap.remove(remoteInterface); + if (entry != null && entry.getFuture() != null) { + entry.getFuture().cancel(false); } } @Override public int getFreeWorkers(Class remoteInterface) { - Set> futuresSet = futures.get(remoteInterface); - return futuresSet.size(); + Entry entry = remoteMap.remove(remoteInterface); + if (entry == null) { + return 0; + } + return entry.getCounter().get(); } @Override @@ -144,32 +165,28 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS } } - Set> values = Collections.newSetFromMap(PlatformDependent., Boolean>newConcurrentHashMap()); - futures.put(remoteInterface, values); + remoteMap.put(remoteInterface, new Entry(workers)); String requestQueueName = getRequestQueueName(remoteInterface); RBlockingQueue requestQueue = redisson.getBlockingQueue(requestQueueName, StringCodec.INSTANCE); - for (int i = 0; i < workers; i++) { - subscribe(remoteInterface, requestQueue, executor); - } + subscribe(remoteInterface, requestQueue, executor); } private void subscribe(final Class remoteInterface, final RBlockingQueue requestQueue, final ExecutorService executor) { - Set> futuresSet = futures.get(remoteInterface); - if (futuresSet == null) { + final Entry entry = remoteMap.get(remoteInterface); + if (entry == null) { return; } final RFuture take = requestQueue.takeAsync(); - futuresSet.add(take); + entry.setFuture(take); take.addListener(new FutureListener() { @Override - public void operationComplete(Future future) throws Exception { - Set> futuresSet = futures.get(remoteInterface); - if (futuresSet == null) { + public void operationComplete(Future future) throws Exception { + Entry entry = remoteMap.get(remoteInterface); + if (entry == null) { return; } - futuresSet.remove(take); if (!future.isSuccess()) { if (future.cause() instanceof RedissonShutdownException) { @@ -184,6 +201,14 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS // do not subscribe now, see // https://github.com/mrniko/redisson/issues/493 // subscribe(remoteInterface, requestQueue); + + if (entry.getCounter().get() == 0) { + return; + } + + if (entry.getCounter().decrementAndGet() > 0) { + subscribe(remoteInterface, requestQueue, executor); + } final String requestId = future.getNow(); RMap tasks = redisson.getMap(requestQueue.getName() + ":tasks", new CompositeCodec(StringCodec.INSTANCE, codec, codec)); @@ -197,16 +222,18 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS return; } log.error("Can't process the remote service request with id " + requestId, future.cause()); + // re-subscribe after a failed takeAsync - subscribe(remoteInterface, requestQueue, executor); + resubscribe(remoteInterface, requestQueue, executor); return; } final RemoteServiceRequest request = future.getNow(); if (request == null) { log.debug("Task can't be found for request: {}", requestId); + // re-subscribe after a skipped ackTimeout - subscribe(remoteInterface, requestQueue, executor); + resubscribe(remoteInterface, requestQueue, executor); return; } @@ -215,8 +242,9 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS if (request.getOptions().isAckExpected() && elapsedTime > request .getOptions().getAckTimeoutInMillis()) { log.debug("request: {} has been skipped due to ackTimeout. Elapsed time: {}ms", request.getId(), elapsedTime); + // re-subscribe after a skipped ackTimeout - subscribe(remoteInterface, requestQueue, executor); + resubscribe(remoteInterface, requestQueue, executor); return; } @@ -247,13 +275,14 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS return; } log.error("Can't send ack for request: " + request, future.cause()); + // re-subscribe after a failed send (ack) - subscribe(remoteInterface, requestQueue, executor); + resubscribe(remoteInterface, requestQueue, executor); return; } if (!future.getNow()) { - subscribe(remoteInterface, requestQueue, executor); + resubscribe(remoteInterface, requestQueue, executor); return; } @@ -269,13 +298,14 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS return; } log.error("Can't send ack for request: " + request, future.cause()); + // re-subscribe after a failed send (ack) - subscribe(remoteInterface, requestQueue, executor); + resubscribe(remoteInterface, requestQueue, executor); return; } if (!future.getNow()) { - subscribe(remoteInterface, requestQueue, executor); + resubscribe(remoteInterface, requestQueue, executor); return; } @@ -374,7 +404,7 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS @Override public void operationComplete(Future future) throws Exception { // interface has been deregistered - if (futures.get(remoteInterface) == null) { + if (!remoteMap.containsKey(remoteInterface)) { return; } @@ -386,12 +416,17 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS future.cause()); } - // re-subscribe anyways (fail or success) after the send - // (response) - subscribe(remoteInterface, requestQueue, executor); + resubscribe(remoteInterface, requestQueue, executor); } }); } else { + resubscribe(remoteInterface, requestQueue, executor); + } + } + + private void resubscribe(Class remoteInterface, RBlockingQueue requestQueue, + ExecutorService executor) { + if (remoteMap.get(remoteInterface).getCounter().getAndIncrement() == 0) { // re-subscribe anyways after the method invocation subscribe(remoteInterface, requestQueue, executor); } diff --git a/redisson/src/test/java/org/redisson/executor/RedissonExecutorServiceTest.java b/redisson/src/test/java/org/redisson/executor/RedissonExecutorServiceTest.java index 61f685dcf..fbe9d254b 100644 --- a/redisson/src/test/java/org/redisson/executor/RedissonExecutorServiceTest.java +++ b/redisson/src/test/java/org/redisson/executor/RedissonExecutorServiceTest.java @@ -299,7 +299,7 @@ public class RedissonExecutorServiceTest extends BaseTest { e.execute(new RunnableTask()); } e.shutdown(); - assertThat(e.awaitTermination(1000, TimeUnit.MILLISECONDS)).isTrue(); + assertThat(e.awaitTermination(1500, TimeUnit.MILLISECONDS)).isTrue(); } @Test