diff --git a/redisson/src/main/java/org/redisson/BaseRemoteService.java b/redisson/src/main/java/org/redisson/BaseRemoteService.java index 74ff0f9c1..b81fad744 100644 --- a/redisson/src/main/java/org/redisson/BaseRemoteService.java +++ b/redisson/src/main/java/org/redisson/BaseRemoteService.java @@ -23,7 +23,6 @@ import java.lang.reflect.Proxy; import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import java.util.Map; import java.util.concurrent.CancellationException; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; @@ -59,6 +58,7 @@ import io.netty.util.Timeout; import io.netty.util.TimerTask; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; +import io.netty.util.concurrent.ScheduledFuture; import io.netty.util.internal.ThreadLocalRandom; /** @@ -381,26 +381,45 @@ public abstract class BaseRemoteService { private RPromise poll(final long timeout, final String requestId, final String responseName) { final RPromise responseFuture = new RedissonPromise(); - ResponseEntry entry = responses.get(responseName); - if (entry == null) { - entry = new ResponseEntry(); - ResponseEntry oldEntry = responses.putIfAbsent(responseName, entry); - if (oldEntry != null) { - entry = oldEntry; + + ResponseEntry entry; + synchronized (responses) { + entry = responses.get(responseName); + if (entry == null) { + entry = new ResponseEntry(); + ResponseEntry oldEntry = responses.putIfAbsent(responseName, entry); + if (oldEntry != null) { + entry = oldEntry; + } } + + final ConcurrentMap> entryResponses = entry.getResponses(); + entryResponses.put(requestId, responseFuture); } - - final ConcurrentMap> responses = entry.getResponses(); - - commandExecutor.getConnectionManager().getGroup().schedule(new Runnable() { + + ScheduledFuture future = commandExecutor.getConnectionManager().getGroup().schedule(new Runnable() { @Override public void run() { - if (!responseFuture.isDone() && responses.remove(requestId, responseFuture)) { - responseFuture.trySuccess(null); + synchronized (responses) { + ResponseEntry entry = responses.get(responseName); + if (entry == null) { + return; + } + + ConcurrentMap> entryResponses = entry.getResponses(); + RemoteServiceTimeoutException ex = new RemoteServiceTimeoutException("No response after " + timeout + "ms"); + if (responseFuture.tryFailure(ex)) { + entry.getTimeouts().remove(requestId); + entryResponses.remove(requestId, responseFuture); + if (entry.getResponses().isEmpty()) { + responses.remove(responseName, entry); + } + } } } }, timeout, TimeUnit.MILLISECONDS); - responses.put(requestId, responseFuture); + entry.getTimeouts().put(requestId, future); + pollTasks(entry, responseName); return responseFuture; } @@ -422,19 +441,29 @@ public abstract class BaseRemoteService { } RRemoteServiceResponse response = future.getNow(); - RPromise promise = (RPromise) entry.getResponses().remove(response.getId()); + RPromise promise; + synchronized (responses) { + ResponseEntry entry = responses.get(responseName); + if (entry == null) { + return; + } + + ConcurrentMap> entryResponses = entry.getResponses(); + promise = (RPromise) entryResponses.remove(response.getId()); + java.util.concurrent.ScheduledFuture timeoutFuture = entry.getTimeouts().remove(response.getId()); + timeoutFuture.cancel(false); + + if (entryResponses.isEmpty()) { + responses.remove(responseName, entry); + } else { + responseQueue.takeAsync().addListener(this); + } + } + if (promise != null) { promise.trySuccess(response); } - if (!entry.getResponses().isEmpty()) { - responseQueue.takeAsync().addListener(this); - } else { - entry.getStarted().set(false); - if (!entry.getResponses().isEmpty()) { - pollTasks(entry, responseName); - } - } } }); } @@ -493,7 +522,7 @@ public abstract class BaseRemoteService { RemoteServiceResponse response = (RemoteServiceResponse) responseQueue .poll(optionsCopy.getExecutionTimeoutInMillis(), TimeUnit.MILLISECONDS); if (response == null) { - throw new RemoteServiceTimeoutException("No response1 after " + throw new RemoteServiceTimeoutException("No response after " + optionsCopy.getExecutionTimeoutInMillis() + "ms for request: " + request); } if (response.getError() != null) { diff --git a/redisson/src/main/java/org/redisson/remote/ResponseEntry.java b/redisson/src/main/java/org/redisson/remote/ResponseEntry.java index f973152f7..52d3e6dda 100644 --- a/redisson/src/main/java/org/redisson/remote/ResponseEntry.java +++ b/redisson/src/main/java/org/redisson/remote/ResponseEntry.java @@ -16,6 +16,7 @@ package org.redisson.remote; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.atomic.AtomicBoolean; import org.redisson.misc.RPromise; @@ -30,8 +31,14 @@ import io.netty.util.internal.PlatformDependent; public class ResponseEntry { private final ConcurrentMap> responses = PlatformDependent.newConcurrentHashMap(); + private final ConcurrentMap> timeouts = PlatformDependent.newConcurrentHashMap(); private final AtomicBoolean started = new AtomicBoolean(); + + public ConcurrentMap> getTimeouts() { + return timeouts; + } + public ConcurrentMap> getResponses() { return responses; }