From 6786a22b943879306ba6d8816af6c67b76cc4185 Mon Sep 17 00:00:00 2001 From: Nikita Date: Fri, 11 May 2018 15:05:57 +0300 Subject: [PATCH] Fixed - RemoteService sync invocations aren't thread safe. #1433 --- .../java/org/redisson/BaseRemoteService.java | 79 +++++++++++-------- .../remote/RemoteServiceResponse.java | 1 + .../redisson/RedissonRemoteServiceTest.java | 37 +++++++++ 3 files changed, 84 insertions(+), 33 deletions(-) diff --git a/redisson/src/main/java/org/redisson/BaseRemoteService.java b/redisson/src/main/java/org/redisson/BaseRemoteService.java index 902a4b713..692e670e8 100644 --- a/redisson/src/main/java/org/redisson/BaseRemoteService.java +++ b/redisson/src/main/java/org/redisson/BaseRemoteService.java @@ -65,7 +65,6 @@ import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.ScheduledFuture; import io.netty.util.internal.PlatformDependent; -import io.netty.util.internal.ThreadLocalRandom; /** * @@ -568,7 +567,7 @@ public abstract class BaseRemoteService { } }); } - + private T sync(final Class remoteInterface, final RemoteInvocationOptions options) { // local copy of the options, to prevent mutation final RemoteInvocationOptions optionsCopy = new RemoteInvocationOptions(options); @@ -595,20 +594,53 @@ public abstract class BaseRemoteService { RemotePromise addPromise = new RemotePromise(requestId); RemoteServiceRequest request = new RemoteServiceRequest(executorId, requestId.toString(), method.getName(), getMethodSignatures(method), args, optionsCopy, System.currentTimeMillis()); - addAsync(requestQueueName, request, addPromise).sync(); - RBlockingQueue responseQueue = null; - if (optionsCopy.isAckExpected() || optionsCopy.isResultExpected()) { - responseQueue = redisson.getBlockingQueue(responseQueueName, codec); + final RFuture ackFuture; + if (optionsCopy.isAckExpected()) { + ackFuture = poll(optionsCopy.getAckTimeoutInMillis(), requestId, false); + } else { + ackFuture = null; } - + + final RPromise responseFuture; + if (optionsCopy.isResultExpected()) { + responseFuture = poll(optionsCopy.getExecutionTimeoutInMillis(), requestId, false); + } else { + responseFuture = null; + } + + RFuture futureAdd = addAsync(requestQueueName, request, addPromise); + futureAdd.await(); + if (!futureAdd.isSuccess()) { + if (responseFuture != null) { + responseFuture.cancel(false); + } + if (ackFuture != null) { + ackFuture.cancel(false); + } + throw futureAdd.cause(); + } + + if (!futureAdd.get()) { + if (responseFuture != null) { + responseFuture.cancel(false); + } + if (ackFuture != null) { + ackFuture.cancel(false); + } + throw new RedisException("Task hasn't been added"); + } + // poll for the ack only if expected - if (optionsCopy.isAckExpected()) { + if (ackFuture != null) { String ackName = getAckName(requestId); - RemoteServiceAck ack = (RemoteServiceAck) responseQueue.poll(optionsCopy.getAckTimeoutInMillis(), - TimeUnit.MILLISECONDS); + ackFuture.await(); + RemoteServiceAck ack = ackFuture.getNow(); if (ack == null) { - ack = tryPollAckAgain(optionsCopy, responseQueue, ackName); + RFuture ackFutureAttempt = + tryPollAckAgainAsync(optionsCopy, ackName, requestId); + ackFutureAttempt.await(); + ack = ackFutureAttempt.getNow(); if (ack == null) { throw new RemoteServiceAckTimeoutException("No ACK response after " + optionsCopy.getAckTimeoutInMillis() + "ms for request: " + request); @@ -618,9 +650,9 @@ public abstract class BaseRemoteService { } // poll for the response only if expected - if (optionsCopy.isResultExpected()) { - RemoteServiceResponse response = (RemoteServiceResponse) responseQueue - .poll(optionsCopy.getExecutionTimeoutInMillis(), TimeUnit.MILLISECONDS); + if (responseFuture != null) { + responseFuture.awaitUninterruptibly(); + RemoteServiceResponse response = (RemoteServiceResponse) responseFuture.getNow(); if (response == null) { throw new RemoteServiceTimeoutException("No response after " + optionsCopy.getExecutionTimeoutInMillis() + "ms for request: " + request); @@ -638,25 +670,6 @@ public abstract class BaseRemoteService { return (T) Proxy.newProxyInstance(remoteInterface.getClassLoader(), new Class[] { remoteInterface }, handler); } - private RemoteServiceAck tryPollAckAgain(RemoteInvocationOptions optionsCopy, - RBlockingQueue responseQueue, String ackName) - throws InterruptedException { - RFuture ackClientsFuture = commandExecutor.evalWriteAsync(ackName, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, - "if redis.call('setnx', KEYS[1], 1) == 1 then " - + "redis.call('pexpire', KEYS[1], ARGV[1]);" - + "return 0;" - + "end;" - + "redis.call('del', KEYS[1]);" - + "return 1;", - Arrays. asList(ackName), optionsCopy.getAckTimeoutInMillis()); - - ackClientsFuture.sync(); - if (ackClientsFuture.getNow()) { - return (RemoteServiceAck) responseQueue.poll(); - } - return null; - } - private RFuture tryPollAckAgainAsync(final RemoteInvocationOptions optionsCopy, String ackName, final RequestId requestId) { final RPromise promise = new RedissonPromise(); diff --git a/redisson/src/main/java/org/redisson/remote/RemoteServiceResponse.java b/redisson/src/main/java/org/redisson/remote/RemoteServiceResponse.java index 8a91b308f..c935ed303 100644 --- a/redisson/src/main/java/org/redisson/remote/RemoteServiceResponse.java +++ b/redisson/src/main/java/org/redisson/remote/RemoteServiceResponse.java @@ -43,6 +43,7 @@ public class RemoteServiceResponse implements RRemoteServiceResponse, Serializab this.id = id; } + @Override public String getId() { return id; } diff --git a/redisson/src/test/java/org/redisson/RedissonRemoteServiceTest.java b/redisson/src/test/java/org/redisson/RedissonRemoteServiceTest.java index 488383f3d..bfa1e4d42 100644 --- a/redisson/src/test/java/org/redisson/RedissonRemoteServiceTest.java +++ b/redisson/src/test/java/org/redisson/RedissonRemoteServiceTest.java @@ -5,10 +5,14 @@ import static org.assertj.core.api.Assertions.assertThat; import java.io.IOException; import java.io.NotSerializableException; import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -16,6 +20,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.junit.Assert; import org.junit.Test; import org.redisson.api.RFuture; +import org.redisson.api.RRemoteService; import org.redisson.api.RedissonClient; import org.redisson.api.RemoteInvocationOptions; import org.redisson.api.annotation.RRemoteAsync; @@ -213,6 +218,38 @@ public class RedissonRemoteServiceTest extends BaseTest { } } + + @Test + public void testConcurrentInvocations() { + ExecutorService executorService = Executors.newFixedThreadPool(2); + RRemoteService remoteService = redisson.getRemoteService(); + remoteService.register(RemoteInterface.class, new RemoteImpl()); + RemoteInterface service = redisson.getRemoteService().get(RemoteInterface.class); + + List> futures = new ArrayList<>(); + + int iterations = 1000; + AtomicBoolean bool = new AtomicBoolean(); + for (int i = 0; i < iterations; i++) { + futures.add(executorService.submit(() -> { + try { + if (ThreadLocalRandom.current().nextBoolean()) { + service.resultMethod(1L); + } else { + service.methodOverload(); + } + } catch (Exception e) { + bool.set(true); + } + })); + } + + while (!futures.stream().allMatch(Future::isDone)) {} + + assertThat(bool.get()).isFalse(); + remoteService.deregister(RemoteInterface.class); + } + @Test public void testCancelAsync() throws InterruptedException {