Fixed - RemoteService sync invocations aren't thread safe. #1433

pull/1461/head
Nikita 7 years ago
parent 28b0c49acb
commit 6786a22b94

@ -65,7 +65,6 @@ import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.ScheduledFuture;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.ThreadLocalRandom;
/**
*
@ -568,7 +567,7 @@ public abstract class BaseRemoteService {
}
});
}
private <T> T sync(final Class<T> remoteInterface, final RemoteInvocationOptions options) {
// local copy of the options, to prevent mutation
final RemoteInvocationOptions optionsCopy = new RemoteInvocationOptions(options);
@ -595,20 +594,53 @@ public abstract class BaseRemoteService {
RemotePromise<Object> addPromise = new RemotePromise<Object>(requestId);
RemoteServiceRequest request = new RemoteServiceRequest(executorId, requestId.toString(), method.getName(), getMethodSignatures(method), args, optionsCopy,
System.currentTimeMillis());
addAsync(requestQueueName, request, addPromise).sync();
RBlockingQueue<RRemoteServiceResponse> responseQueue = null;
if (optionsCopy.isAckExpected() || optionsCopy.isResultExpected()) {
responseQueue = redisson.getBlockingQueue(responseQueueName, codec);
final RFuture<RemoteServiceAck> ackFuture;
if (optionsCopy.isAckExpected()) {
ackFuture = poll(optionsCopy.getAckTimeoutInMillis(), requestId, false);
} else {
ackFuture = null;
}
final RPromise<RRemoteServiceResponse> responseFuture;
if (optionsCopy.isResultExpected()) {
responseFuture = poll(optionsCopy.getExecutionTimeoutInMillis(), requestId, false);
} else {
responseFuture = null;
}
RFuture<Boolean> futureAdd = addAsync(requestQueueName, request, addPromise);
futureAdd.await();
if (!futureAdd.isSuccess()) {
if (responseFuture != null) {
responseFuture.cancel(false);
}
if (ackFuture != null) {
ackFuture.cancel(false);
}
throw futureAdd.cause();
}
if (!futureAdd.get()) {
if (responseFuture != null) {
responseFuture.cancel(false);
}
if (ackFuture != null) {
ackFuture.cancel(false);
}
throw new RedisException("Task hasn't been added");
}
// poll for the ack only if expected
if (optionsCopy.isAckExpected()) {
if (ackFuture != null) {
String ackName = getAckName(requestId);
RemoteServiceAck ack = (RemoteServiceAck) responseQueue.poll(optionsCopy.getAckTimeoutInMillis(),
TimeUnit.MILLISECONDS);
ackFuture.await();
RemoteServiceAck ack = ackFuture.getNow();
if (ack == null) {
ack = tryPollAckAgain(optionsCopy, responseQueue, ackName);
RFuture<RemoteServiceAck> ackFutureAttempt =
tryPollAckAgainAsync(optionsCopy, ackName, requestId);
ackFutureAttempt.await();
ack = ackFutureAttempt.getNow();
if (ack == null) {
throw new RemoteServiceAckTimeoutException("No ACK response after "
+ optionsCopy.getAckTimeoutInMillis() + "ms for request: " + request);
@ -618,9 +650,9 @@ public abstract class BaseRemoteService {
}
// poll for the response only if expected
if (optionsCopy.isResultExpected()) {
RemoteServiceResponse response = (RemoteServiceResponse) responseQueue
.poll(optionsCopy.getExecutionTimeoutInMillis(), TimeUnit.MILLISECONDS);
if (responseFuture != null) {
responseFuture.awaitUninterruptibly();
RemoteServiceResponse response = (RemoteServiceResponse) responseFuture.getNow();
if (response == null) {
throw new RemoteServiceTimeoutException("No response after "
+ optionsCopy.getExecutionTimeoutInMillis() + "ms for request: " + request);
@ -638,25 +670,6 @@ public abstract class BaseRemoteService {
return (T) Proxy.newProxyInstance(remoteInterface.getClassLoader(), new Class[] { remoteInterface }, handler);
}
private RemoteServiceAck tryPollAckAgain(RemoteInvocationOptions optionsCopy,
RBlockingQueue<? extends RRemoteServiceResponse> responseQueue, String ackName)
throws InterruptedException {
RFuture<Boolean> ackClientsFuture = commandExecutor.evalWriteAsync(ackName, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if redis.call('setnx', KEYS[1], 1) == 1 then "
+ "redis.call('pexpire', KEYS[1], ARGV[1]);"
+ "return 0;"
+ "end;"
+ "redis.call('del', KEYS[1]);"
+ "return 1;",
Arrays.<Object> asList(ackName), optionsCopy.getAckTimeoutInMillis());
ackClientsFuture.sync();
if (ackClientsFuture.getNow()) {
return (RemoteServiceAck) responseQueue.poll();
}
return null;
}
private RFuture<RemoteServiceAck> tryPollAckAgainAsync(final RemoteInvocationOptions optionsCopy,
String ackName, final RequestId requestId) {
final RPromise<RemoteServiceAck> promise = new RedissonPromise<RemoteServiceAck>();

@ -43,6 +43,7 @@ public class RemoteServiceResponse implements RRemoteServiceResponse, Serializab
this.id = id;
}
@Override
public String getId() {
return id;
}

@ -5,10 +5,14 @@ import static org.assertj.core.api.Assertions.assertThat;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@ -16,6 +20,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Assert;
import org.junit.Test;
import org.redisson.api.RFuture;
import org.redisson.api.RRemoteService;
import org.redisson.api.RedissonClient;
import org.redisson.api.RemoteInvocationOptions;
import org.redisson.api.annotation.RRemoteAsync;
@ -213,6 +218,38 @@ public class RedissonRemoteServiceTest extends BaseTest {
}
}
@Test
public void testConcurrentInvocations() {
ExecutorService executorService = Executors.newFixedThreadPool(2);
RRemoteService remoteService = redisson.getRemoteService();
remoteService.register(RemoteInterface.class, new RemoteImpl());
RemoteInterface service = redisson.getRemoteService().get(RemoteInterface.class);
List<Future<?>> futures = new ArrayList<>();
int iterations = 1000;
AtomicBoolean bool = new AtomicBoolean();
for (int i = 0; i < iterations; i++) {
futures.add(executorService.submit(() -> {
try {
if (ThreadLocalRandom.current().nextBoolean()) {
service.resultMethod(1L);
} else {
service.methodOverload();
}
} catch (Exception e) {
bool.set(true);
}
}));
}
while (!futures.stream().allMatch(Future::isDone)) {}
assertThat(bool.get()).isFalse();
remoteService.deregister(RemoteInterface.class);
}
@Test
public void testCancelAsync() throws InterruptedException {

Loading…
Cancel
Save