@ -17,12 +17,11 @@ package org.redisson;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.redisson.api.RBlockingQueue;
@ -65,10 +64,33 @@ import io.netty.util.internal.PlatformDependent;
public class RedissonRemoteService extends BaseRemoteService implements RRemoteService {
public static class Entry {
RFuture<String> future;
final AtomicInteger counter;
public Entry(int workers) {
counter = new AtomicInteger(workers);
public void setFuture(RFuture<String> future) {
this.future = future;
public RFuture<String> getFuture() {
return future;
public AtomicInteger getCounter() {
return counter;
private static final Logger log = LoggerFactory.getLogger(RedissonRemoteService.class);
private final Map<RemoteServiceKey, RemoteServiceMethod> beans = PlatformDependent.newConcurrentHashMap();
private final Map<Class<?>, Set<RFuture<String>>> futures = PlatformDependent.newConcurrentHashMap();
private final Map<Class<?>, Entry> remoteMap = PlatformDependent.newConcurrentHashMap();
public RedissonRemoteService(Codec codec, RedissonClient redisson, String name, CommandExecutor commandExecutor, String executorId, ConcurrentMap<String, ResponseEntry> responses) {
super(codec, redisson, name, commandExecutor, executorId, responses);
@ -110,20 +132,19 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
Set<RFuture<String>> removedFutures = futures.remove(remoteInterface);
if (removedFutures == null) {
for (RFuture<String> future : removedFutures) {
Entry entry = remoteMap.remove(remoteInterface);
if (entry != null && entry.getFuture() != null) {
public int getFreeWorkers(Class<?> remoteInterface) {
Set<RFuture<String>> futuresSet = futures.get(remoteInterface);
return futuresSet.size();
Entry entry = remoteMap.remove(remoteInterface);
if (entry == null) {
return 0;
return entry.getCounter().get();
@ -144,32 +165,28 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
Set<RFuture<String>> values = Collections.newSetFromMap(PlatformDependent.<RFuture<String>, Boolean>newConcurrentHashMap());
futures.put(remoteInterface, values);
remoteMap.put(remoteInterface, new Entry(workers));
String requestQueueName = getRequestQueueName(remoteInterface);
RBlockingQueue<String> requestQueue = redisson.getBlockingQueue(requestQueueName, StringCodec.INSTANCE);
for (int i = 0; i < workers; i++) {
subscribe(remoteInterface, requestQueue, executor);
subscribe(remoteInterface, requestQueue, executor);
private <T> void subscribe(final Class<T> remoteInterface, final RBlockingQueue<String> requestQueue,
final ExecutorService executor) {
Set<RFuture<String>> futuresSet = futures.get(remoteInterface);
if (futuresSet == null) {
final Entry entry = remoteMap.get(remoteInterface);
if (entry == null) {
final RFuture<String> take = requestQueue.takeAsync();
take.addListener(new FutureListener<String>() {
public void operationComplete(Future<String> future) throws Exception {
Set<RFuture<String>> futuresSet = futures.get(remoteInterface);
if (futuresSet == null) {
Entry entry = remoteMap.get(remoteInterface);
if (entry == null) {
if (!future.isSuccess()) {
if (future.cause() instanceof RedissonShutdownException) {
@ -185,6 +202,14 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
// https://github.com/mrniko/redisson/issues/493
// subscribe(remoteInterface, requestQueue);
if (entry.getCounter().get() == 0) {
if (entry.getCounter().decrementAndGet() > 0) {
subscribe(remoteInterface, requestQueue, executor);
final String requestId = future.getNow();
RMap<String, RemoteServiceRequest> tasks = redisson.getMap(requestQueue.getName() + ":tasks", new CompositeCodec(StringCodec.INSTANCE, codec, codec));
RFuture<RemoteServiceRequest> taskFuture = getTask(requestId, tasks);
@ -197,16 +222,18 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
log.error("Can't process the remote service request with id " + requestId, future.cause());
// re-subscribe after a failed takeAsync
subscribe(remoteInterface, requestQueue, executor);
resubscribe(remoteInterface, requestQueue, executor);
final RemoteServiceRequest request = future.getNow();
if (request == null) {
log.debug("Task can't be found for request: {}", requestId);
// re-subscribe after a skipped ackTimeout
subscribe(remoteInterface, requestQueue, executor);
resubscribe(remoteInterface, requestQueue, executor);
@ -215,8 +242,9 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
if (request.getOptions().isAckExpected() && elapsedTime > request
.getOptions().getAckTimeoutInMillis()) {
log.debug("request: {} has been skipped due to ackTimeout. Elapsed time: {}ms", request.getId(), elapsedTime);
// re-subscribe after a skipped ackTimeout
subscribe(remoteInterface, requestQueue, executor);
resubscribe(remoteInterface, requestQueue, executor);
@ -247,13 +275,14 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
log.error("Can't send ack for request: " + request, future.cause());
// re-subscribe after a failed send (ack)
subscribe(remoteInterface, requestQueue, executor);
resubscribe(remoteInterface, requestQueue, executor);
if (!future.getNow()) {
subscribe(remoteInterface, requestQueue, executor);
resubscribe(remoteInterface, requestQueue, executor);
@ -269,13 +298,14 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
log.error("Can't send ack for request: " + request, future.cause());
// re-subscribe after a failed send (ack)
subscribe(remoteInterface, requestQueue, executor);
resubscribe(remoteInterface, requestQueue, executor);
if (!future.getNow()) {
subscribe(remoteInterface, requestQueue, executor);
resubscribe(remoteInterface, requestQueue, executor);
@ -374,7 +404,7 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
public void operationComplete(Future<Void> future) throws Exception {
// interface has been deregistered
if (futures.get(remoteInterface) == null) {
if (!remoteMap.containsKey(remoteInterface)) {
@ -386,12 +416,17 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
// re-subscribe anyways (fail or success) after the send
// (response)
subscribe(remoteInterface, requestQueue, executor);
resubscribe(remoteInterface, requestQueue, executor);
} else {
resubscribe(remoteInterface, requestQueue, executor);
private <T> void resubscribe(Class<T> remoteInterface, RBlockingQueue<String> requestQueue,
ExecutorService executor) {
if (remoteMap.get(remoteInterface).getCounter().getAndIncrement() == 0) {
// re-subscribe anyways after the method invocation
subscribe(remoteInterface, requestQueue, executor);