Merge branch 'master' into 3.0.0

pull/1303/head
Nikita 7 years ago
commit e4253374fd

@ -31,6 +31,7 @@ import java.util.concurrent.TimeUnit;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RFuture;
import org.redisson.api.RList;
import org.redisson.api.RMap;
import org.redisson.api.RedissonClient;
import org.redisson.api.RemoteInvocationOptions;
@ -58,7 +59,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.Future;
@ -196,24 +196,7 @@ public abstract class BaseRemoteService {
final String requestQueueName = getRequestQueueName(syncInterface);
final RFuture<RemoteServiceAck> ackFuture;
if (optionsCopy.isAckExpected()) {
ackFuture = poll(optionsCopy.getAckTimeoutInMillis(), requestId, false);
} else {
ackFuture = null;
}
final RPromise<RRemoteServiceResponse> responseFuture;
if (optionsCopy.isResultExpected()) {
responseFuture = poll(optionsCopy.getExecutionTimeoutInMillis(), requestId, false);
} else {
responseFuture = null;
}
RemoteServiceRequest request = new RemoteServiceRequest(executorId, requestId.toString(), method.getName(), getMethodSignatures(method), args,
optionsCopy, System.currentTimeMillis());
final Long ackTimeout = request.getOptions().getAckTimeoutInMillis();
final Long ackTimeout = optionsCopy.getAckTimeoutInMillis();
final RemotePromise<Object> result = new RemotePromise<Object>(requestId) {
@ -233,17 +216,20 @@ public abstract class BaseRemoteService {
RFuture<Boolean> future = commandExecutor.evalWriteAsync(responseQueueName, LongCodec.INSTANCE,
RedisCommands.EVAL_BOOLEAN,
"if redis.call('setnx', KEYS[1], 1) == 1 then "
+ "redis.call('pexpire', KEYS[1], ARGV[2]);"
+ "redis.call('lrem', KEYS[3], 1, ARGV[1]);"
+ "redis.call('pexpire', KEYS[1], ARGV[1]);"
// + "redis.call('lrem', KEYS[3], 1, ARGV[1]);"
// + "redis.call('pexpire', KEYS[2], ARGV[2]);"
+ "return 1;"
+ "end;"
+ "return 0;",
Arrays.<Object> asList(ackName, responseQueueName, requestQueueName),
requestId, ackTimeout);
Arrays.<Object> asList(ackName),
// Arrays.<Object> asList(ackName, responseQueueName, requestQueueName),
ackTimeout);
boolean ackNotSent = commandExecutor.get(future);
if (ackNotSent) {
RList<Object> list = redisson.getList(requestQueueName, LongCodec.INSTANCE);
list.remove(requestId.toString());
super.cancel(mayInterruptIfRunning);
return true;
}
@ -269,7 +255,7 @@ public abstract class BaseRemoteService {
return false;
}
cancelExecution(optionsCopy, mayInterruptIfRunning, this, responseFuture);
cancelExecution(optionsCopy, mayInterruptIfRunning, this);
try {
awaitUninterruptibly(60, TimeUnit.SECONDS);
@ -280,6 +266,23 @@ public abstract class BaseRemoteService {
}
};
RemoteServiceRequest request = new RemoteServiceRequest(executorId, requestId.toString(), method.getName(), getMethodSignatures(method), args,
optionsCopy, System.currentTimeMillis());
final RFuture<RemoteServiceAck> ackFuture;
if (optionsCopy.isAckExpected()) {
ackFuture = poll(optionsCopy.getAckTimeoutInMillis(), requestId, false);
} else {
ackFuture = null;
}
final RPromise<RRemoteServiceResponse> responseFuture;
if (optionsCopy.isResultExpected()) {
responseFuture = poll(optionsCopy.getExecutionTimeoutInMillis(), requestId, false);
} else {
responseFuture = null;
}
RFuture<Boolean> addFuture = addAsync(requestQueueName, request, result);
addFuture.addListener(new FutureListener<Boolean>() {
@ -589,12 +592,10 @@ public abstract class BaseRemoteService {
RequestId requestId = generateRequestId();
String requestQueueName = getRequestQueueName(remoteInterface);
RemotePromise<Object> addPromise = new RemotePromise<Object>(requestId);
RemoteServiceRequest request = new RemoteServiceRequest(executorId, requestId.toString(), method.getName(), getMethodSignatures(method), args, optionsCopy,
System.currentTimeMillis());
RemotePromise<Object> addPromise = new RemotePromise<Object>(requestId);
addAsync(requestQueueName, request, addPromise);
addPromise.getAddFuture().sync();
addAsync(requestQueueName, request, addPromise).sync();
RBlockingQueue<RRemoteServiceResponse> responseQueue = null;
if (optionsCopy.isAckExpected() || optionsCopy.isResultExpected()) {
@ -736,30 +737,13 @@ public abstract class BaseRemoteService {
return new RequestId(id);
}
protected RFuture<Boolean> addAsync(String requestQueueName, RemoteServiceRequest request,
RemotePromise<Object> result) {
RFuture<Boolean> future = commandExecutor.evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"redis.call('hset', KEYS[2], ARGV[1], ARGV[2]);"
+ "redis.call('rpush', KEYS[1], ARGV[1]); "
+ "return 1;",
Arrays.<Object>asList(requestQueueName, requestQueueName + ":tasks"),
request.getId(), encode(request));
result.setAddFuture(future);
return future;
}
protected abstract RFuture<Boolean> addAsync(String requestQueueName, RemoteServiceRequest request,
RemotePromise<Object> result);
protected RFuture<Boolean> removeAsync(String requestQueueName, RequestId taskId) {
return commandExecutor.evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"redis.call('lrem', KEYS[1], 1, ARGV[1]); "
+ "redis.call('hset', KEYS[2], ARGV[1]);"
+ "return 1;",
Arrays.<Object>asList(requestQueueName, requestQueueName + ":tasks"),
taskId.toString());
}
protected abstract RFuture<Boolean> removeAsync(String requestQueueName, RequestId taskId);
private void cancelExecution(RemoteInvocationOptions optionsCopy,
boolean mayInterruptIfRunning, RemotePromise<Object> remotePromise, RFuture<RRemoteServiceResponse> responseFuture) {
boolean mayInterruptIfRunning, RemotePromise<Object> remotePromise) {
RMap<String, RemoteServiceCancelRequest> canceledRequests = redisson.getMap(cancelRequestMapName, codec);
canceledRequests.putAsync(remotePromise.getRequestId().toString(), new RemoteServiceCancelRequest(mayInterruptIfRunning, false));
canceledRequests.expireAsync(60, TimeUnit.SECONDS);
@ -768,7 +752,7 @@ public abstract class BaseRemoteService {
if (!optionsCopy.isResultExpected()) {
RemoteInvocationOptions options = new RemoteInvocationOptions(optionsCopy);
options.expectResultWithin(60, TimeUnit.SECONDS);
responseFuture = poll(options.getExecutionTimeoutInMillis(), remotePromise.getRequestId(), false);
RFuture<RRemoteServiceResponse> responseFuture = poll(options.getExecutionTimeoutInMillis(), remotePromise.getRequestId(), false);
awaitResultAsync(options, remotePromise, responseFuture);
}
}

@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicReference;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RBlockingQueueAsync;
import org.redisson.api.RFuture;
import org.redisson.api.RList;
import org.redisson.api.RMap;
import org.redisson.api.RRemoteService;
import org.redisson.api.RedissonClient;
@ -37,6 +38,7 @@ import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.codec.CompositeCodec;
import org.redisson.command.CommandExecutor;
import org.redisson.executor.RemotePromise;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.redisson.remote.RRemoteServiceResponse;
@ -71,6 +73,30 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
public RedissonRemoteService(Codec codec, RedissonClient redisson, String name, CommandExecutor commandExecutor, String executorId, ConcurrentMap<String, ResponseEntry> responses) {
super(codec, redisson, name, commandExecutor, executorId, responses);
}
@Override
protected RFuture<Boolean> addAsync(String requestQueueName, RemoteServiceRequest request,
RemotePromise<Object> result) {
RFuture<Boolean> future = commandExecutor.evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"redis.call('hset', KEYS[2], ARGV[1], ARGV[2]);"
+ "redis.call('rpush', KEYS[1], ARGV[1]); "
+ "return 1;",
Arrays.<Object>asList(requestQueueName, requestQueueName + ":tasks"),
request.getId(), encode(request));
result.setAddFuture(future);
return future;
}
@Override
protected RFuture<Boolean> removeAsync(String requestQueueName, RequestId taskId) {
return commandExecutor.evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"redis.call('lrem', KEYS[1], 1, ARGV[1]); "
+ "redis.call('hset', KEYS[2], ARGV[1]);"
+ "return 1;",
Arrays.<Object>asList(requestQueueName, requestQueueName + ":tasks"),
taskId.toString());
}
@Override
public <T> void register(Class<T> remoteInterface, T object) {
@ -177,10 +203,11 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
}
final RemoteServiceRequest request = future.getNow();
long elapsedTime = System.currentTimeMillis() - request.getDate();
// check the ack only if expected
if (request.getOptions().isAckExpected() && System.currentTimeMillis() - request.getDate() > request
if (request.getOptions().isAckExpected() && elapsedTime > request
.getOptions().getAckTimeoutInMillis()) {
log.debug("request: {} has been skipped due to ackTimeout");
log.debug("request: {} has been skipped due to ackTimeout. Elapsed time: {}ms", request.getId(), elapsedTime);
// re-subscribe after a skipped ackTimeout
subscribe(remoteInterface, requestQueue, executor);
return;
@ -194,14 +221,16 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
RFuture<Boolean> ackClientsFuture = commandExecutor.evalWriteAsync(responseName,
LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if redis.call('setnx', KEYS[1], 1) == 1 then "
+ "redis.call('pexpire', KEYS[1], ARGV[2]);"
+ "redis.call('rpush', KEYS[2], ARGV[1]);"
+ "redis.call('pexpire', KEYS[1], ARGV[1]);"
// + "redis.call('rpush', KEYS[2], ARGV[1]);"
// + "redis.call('pexpire', KEYS[2], ARGV[2]);"
+ "return 1;"
+ "end;"
+ "return 0;",
Arrays.<Object>asList(ackName, responseName),
encode(new RemoteServiceAck(request.getId())), request.getOptions().getAckTimeoutInMillis());
Arrays.<Object>asList(ackName),
request.getOptions().getAckTimeoutInMillis());
// Arrays.<Object>asList(ackName, responseName),
// encode(new RemoteServiceAck(request.getId())), request.getOptions().getAckTimeoutInMillis());
ackClientsFuture.addListener(new FutureListener<Boolean>() {
@Override
@ -220,8 +249,32 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
subscribe(remoteInterface, requestQueue, executor);
return;
}
RList<Object> list = redisson.getList(responseName, codec);
RFuture<Boolean> addFuture = list.addAsync(new RemoteServiceAck(request.getId()));
addFuture.addListener(new FutureListener<Boolean>() {
executeMethod(remoteInterface, requestQueue, executor, request);
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
if (!future.isSuccess()) {
if (future.cause() instanceof RedissonShutdownException) {
return;
}
log.error("Can't send ack for request: " + request, future.cause());
// re-subscribe after a failed send (ack)
subscribe(remoteInterface, requestQueue, executor);
return;
}
if (!future.getNow()) {
subscribe(remoteInterface, requestQueue, executor);
return;
}
executeMethod(remoteInterface, requestQueue, executor, request);
}
});
}
});
} else {

Loading…
Cancel
Save