Ability to cancel task in running state. #208

pull/578/head^2
Nikita 9 years ago
parent a2393b0417
commit f8d9d514a3

@ -30,7 +30,7 @@ import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
@ -106,7 +106,7 @@ public class RedissonExecutorService implements RExecutorService {
remoteService.setTasksCounterName(tasksCounter.getName());
remoteService.setStatusName(status.getName());
asyncService = remoteService.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck());
asyncService = remoteService.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck().expectResultWithin(Integer.MAX_VALUE * 2));
asyncServiceWithoutResult = remoteService.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck().noResult());
}
@ -116,7 +116,7 @@ public class RedissonExecutorService implements RExecutorService {
}
@Override
public void registerWorkers(int executors, Executor executor) {
public void registerWorkers(int executors, ExecutorService executor) {
RemoteExecutorServiceImpl service = new RemoteExecutorServiceImpl(commandExecutor, redisson, codec, requestQueueName);
service.setStatusName(status.getName());
service.setTasksCounterName(tasksCounter.getName());

@ -20,13 +20,17 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandExecutor;
import org.redisson.pubsub.SemaphorePubSub;
import io.netty.util.concurrent.Future;
/**
*
* @author Nikita Koksharov
*
*/
public class RedissonQueueSemaphore extends RedissonSemaphore {
private String queueName;

@ -23,7 +23,7 @@ import java.lang.reflect.Proxy;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@ -31,17 +31,18 @@ import org.redisson.api.RBatch;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RBlockingQueueAsync;
import org.redisson.api.RRemoteService;
import org.redisson.api.RScript;
import org.redisson.api.RScript.Mode;
import org.redisson.api.RemoteInvocationOptions;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandExecutor;
import org.redisson.executor.RemotePromise;
import org.redisson.remote.RRemoteAsync;
import org.redisson.remote.RRemoteServiceResponse;
import org.redisson.remote.RemoteServiceAck;
import org.redisson.remote.RemoteServiceAckTimeoutException;
import org.redisson.remote.RemoteServiceCancelRequest;
import org.redisson.remote.RemoteServiceCancelResponse;
import org.redisson.remote.RemoteServiceKey;
import org.redisson.remote.RemoteServiceMethod;
import org.redisson.remote.RemoteServiceRequest;
@ -65,25 +66,25 @@ import io.netty.util.internal.ThreadLocalRandom;
*/
public class RedissonRemoteService implements RRemoteService {
private static final Logger log = LoggerFactory.getLogger(RedissonRemoteService.class);
private static final Logger log = LoggerFactory.getLogger(RedissonRemoteService.class);
private final Map<RemoteServiceKey, RemoteServiceMethod> beans = PlatformDependent.newConcurrentHashMap();
protected final Codec codec;
protected final Redisson redisson;
protected final String name;
protected final CommandExecutor commandExecutor;
public RedissonRemoteService(Redisson redisson, CommandExecutor commandExecutor) {
this(redisson, "redisson_remote_service", commandExecutor);
this(redisson, "redisson_rs", commandExecutor);
}
public RedissonRemoteService(Redisson redisson, String name, CommandExecutor commandExecutor) {
this(null, redisson, name, commandExecutor);
}
public RedissonRemoteService(Codec codec, Redisson redisson, CommandExecutor commandExecutor) {
this(codec, redisson, "redisson_remote_service", commandExecutor);
this(codec, redisson, "redisson_rs", commandExecutor);
}
public RedissonRemoteService(Codec codec, Redisson redisson, String name, CommandExecutor commandExecutor) {
@ -97,13 +98,14 @@ public class RedissonRemoteService implements RRemoteService {
public <T> void register(Class<T> remoteInterface, T object) {
register(remoteInterface, object, 1);
}
@Override
public <T> void register(Class<T> remoteInterface, T object, int workersAmount) {
register(remoteInterface, object, workersAmount, null);
}
public <T> void register(Class<T> remoteInterface, T object, int workersAmount, Executor executor) {
@Override
public <T> void register(Class<T> remoteInterface, T object, int workersAmount, ExecutorService executor) {
if (workersAmount < 1) {
throw new IllegalArgumentException("executorsAmount can't be lower than 1");
}
@ -114,7 +116,7 @@ public class RedissonRemoteService implements RRemoteService {
return;
}
}
for (int i = 0; i < workersAmount; i++) {
String requestQueueName = getRequestQueueName(remoteInterface);
RBlockingQueue<RemoteServiceRequest> requestQueue = redisson.getBlockingQueue(requestQueueName, getCodec());
@ -122,6 +124,10 @@ public class RedissonRemoteService implements RRemoteService {
}
}
private String getCancelRequestQueueName(Class<?> remoteInterface, String requestId) {
return "{" + name + ":" + remoteInterface.getName() + "}:" + requestId + ":cancel";
}
private String getAckName(Class<?> remoteInterface, String requestId) {
return "{" + name + ":" + remoteInterface.getName() + "}:" + requestId + ":ack";
}
@ -133,7 +139,7 @@ public class RedissonRemoteService implements RRemoteService {
private String getRequestQueueName(Class<?> remoteInterface) {
return "{" + name + ":" + remoteInterface.getName() + "}";
}
private Codec getCodec() {
if (codec != null) {
return codec;
@ -149,7 +155,8 @@ public class RedissonRemoteService implements RRemoteService {
}
}
private <T> void subscribe(final Class<T> remoteInterface, final RBlockingQueue<RemoteServiceRequest> requestQueue, final Executor executor) {
private <T> void subscribe(final Class<T> remoteInterface, final RBlockingQueue<RemoteServiceRequest> requestQueue,
final ExecutorService executor) {
Future<RemoteServiceRequest> take = requestQueue.takeAsync();
take.addListener(new FutureListener<RemoteServiceRequest>() {
@Override
@ -163,35 +170,39 @@ public class RedissonRemoteService implements RRemoteService {
return;
}
// do not subscribe now, see https://github.com/mrniko/redisson/issues/493
// do not subscribe now, see
// https://github.com/mrniko/redisson/issues/493
// subscribe(remoteInterface, requestQueue);
final RemoteServiceRequest request = future.getNow();
// check the ack only if expected
if (request.getOptions().isAckExpected() && System.currentTimeMillis() - request.getDate() > request.getOptions().getAckTimeoutInMillis()) {
if (request.getOptions().isAckExpected() && System.currentTimeMillis() - request.getDate() > request
.getOptions().getAckTimeoutInMillis()) {
log.debug("request: {} has been skipped due to ackTimeout");
// re-subscribe after a skipped ackTimeout
subscribe(remoteInterface, requestQueue, executor);
return;
}
final RemoteServiceMethod method = beans.get(new RemoteServiceKey(remoteInterface, request.getMethodName()));
final RemoteServiceMethod method = beans
.get(new RemoteServiceKey(remoteInterface, request.getMethodName()));
final String responseName = getResponseQueueName(remoteInterface, request.getRequestId());
// send the ack only if expected
if (request.getOptions().isAckExpected()) {
String ackName = getAckName(remoteInterface, request.getRequestId());
Future<Boolean> ackClientsFuture = redisson.getScript().evalAsync(responseName, Mode.READ_WRITE, LongCodec.INSTANCE,
"if redis.call('setnx', KEYS[1], 1) == 1 then "
+ "redis.call('pexpire', KEYS[1], ARGV[2]);"
+ "redis.call('rpush', KEYS[2], ARGV[1]);"
+ "redis.call('pexpire', KEYS[2], ARGV[2]);"
+ "return 1;"
+ "end;"
+ "return 0;", RScript.ReturnType.BOOLEAN, Arrays.<Object>asList(ackName, responseName),
encode(new RemoteServiceAck()),
request.getOptions().getAckTimeoutInMillis());
Future<Boolean> ackClientsFuture = commandExecutor.evalWriteAsync(responseName,
LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if redis.call('setnx', KEYS[1], 1) == 1 then "
+ "redis.call('pexpire', KEYS[1], ARGV[2]);"
+ "redis.call('rpush', KEYS[2], ARGV[1]);"
+ "redis.call('pexpire', KEYS[2], ARGV[2]);"
+ "return 1;"
+ "end;"
+ "return 0;",
Arrays.<Object> asList(ackName, responseName),
encode(new RemoteServiceAck()), request.getOptions().getAckTimeoutInMillis());
ackClientsFuture.addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
@ -210,61 +221,90 @@ public class RedissonRemoteService implements RRemoteService {
return;
}
if (executor != null) {
executor.execute(new Runnable() {
@Override
public void run() {
invokeMethod(remoteInterface, requestQueue, request, method, responseName, executor);
}
});
} else {
invokeMethod(remoteInterface, requestQueue, request, method, responseName, executor);
}
runMethod(remoteInterface, requestQueue, executor, request, method, responseName);
}
});
} else {
if (executor != null) {
executor.execute(new Runnable() {
@Override
public void run() {
invokeMethod(remoteInterface, requestQueue, request, method, responseName, executor);
}
});
} else {
invokeMethod(remoteInterface, requestQueue, request, method, responseName, executor);
}
runMethod(remoteInterface, requestQueue, executor, request, method, responseName);
}
}
});
}
private <T> void invokeMethod(final Class<T> remoteInterface, final RBlockingQueue<RemoteServiceRequest> requestQueue,
final RemoteServiceRequest request, RemoteServiceMethod method, String responseName, final Executor executor) {
final AtomicReference<RemoteServiceResponse> responseHolder = new AtomicReference<RemoteServiceResponse>();
private <T> void runMethod(final Class<T> remoteInterface, final RBlockingQueue<RemoteServiceRequest> requestQueue,
final ExecutorService executor, final RemoteServiceRequest request, final RemoteServiceMethod method,
final String responseName) {
if (executor != null) {
RBlockingQueue<RemoteServiceCancelRequest> cancelRequestQueue =
redisson.getBlockingQueue(getCancelRequestQueueName(remoteInterface, request.getRequestId()), getCodec());
final Future<RemoteServiceCancelRequest> cancelRequestFuture = cancelRequestQueue.takeAsync();
final AtomicReference<RRemoteServiceResponse> responseHolder = new AtomicReference<RRemoteServiceResponse>();
final java.util.concurrent.Future<?> submitFuture = executor.submit(new Runnable() {
@Override
public void run() {
invokeMethod(remoteInterface, requestQueue, request, method, responseName, executor,
cancelRequestFuture, responseHolder);
}
});
cancelRequestFuture.addListener(new FutureListener<RemoteServiceCancelRequest>() {
@Override
public void operationComplete(Future<RemoteServiceCancelRequest> future) throws Exception {
if (!future.isSuccess()) {
return;
}
boolean res = submitFuture.cancel(future.getNow().isMayInterruptIfRunning());
if (res) {
responseHolder.compareAndSet(null, new RemoteServiceCancelResponse());
}
}
});
} else {
final AtomicReference<RRemoteServiceResponse> responseHolder = new AtomicReference<RRemoteServiceResponse>();
invokeMethod(remoteInterface, requestQueue, request, method, responseName, executor, null, responseHolder);
}
}
private <T> void invokeMethod(final Class<T> remoteInterface,
final RBlockingQueue<RemoteServiceRequest> requestQueue, final RemoteServiceRequest request,
RemoteServiceMethod method, String responseName, final ExecutorService executor,
Future<RemoteServiceCancelRequest> cancelRequestFuture, AtomicReference<RRemoteServiceResponse> responseHolder) {
try {
Object result = method.getMethod().invoke(method.getBean(), request.getArgs());
RemoteServiceResponse response = new RemoteServiceResponse(result);
responseHolder.set(response);
responseHolder.compareAndSet(null, response);
} catch (Exception e) {
RemoteServiceResponse response = new RemoteServiceResponse(e.getCause());
responseHolder.set(response);
responseHolder.compareAndSet(null, response);
log.error("Can't execute: " + request, e);
}
if (cancelRequestFuture != null) {
cancelRequestFuture.cancel(false);
}
// send the response only if expected
if (request.getOptions().isResultExpected()) {
Future<List<?>> clientsFuture = send(request.getOptions().getExecutionTimeoutInMillis(), responseName, responseHolder.get());
Future<List<?>> clientsFuture = send(request.getOptions().getExecutionTimeoutInMillis(), responseName,
responseHolder.get());
clientsFuture.addListener(new FutureListener<List<?>>() {
@Override
public void operationComplete(Future<List<?>> future) throws Exception {
if (!future.isSuccess()) {
log.error("Can't send response: " + responseHolder.get() + " for request: " + request, future.cause());
log.error("Can't send response: " + responseHolder.get() + " for request: " + request,
future.cause());
if (future.cause() instanceof RedissonShutdownException) {
return;
}
}
// re-subscribe anyways (fail or success) after the send (response)
// re-subscribe anyways (fail or success) after the send
// (response)
subscribe(remoteInterface, requestQueue, executor);
}
});
@ -281,15 +321,14 @@ public class RedissonRemoteService implements RRemoteService {
@Override
public <T> T get(Class<T> remoteInterface, long executionTimeout, TimeUnit executionTimeUnit) {
return get(remoteInterface, RemoteInvocationOptions.defaults()
.expectResultWithin(executionTimeout, executionTimeUnit));
return get(remoteInterface,
RemoteInvocationOptions.defaults().expectResultWithin(executionTimeout, executionTimeUnit));
}
@Override
public <T> T get(Class<T> remoteInterface, long executionTimeout, TimeUnit executionTimeUnit,
long ackTimeout, TimeUnit ackTimeUnit) {
return get(remoteInterface, RemoteInvocationOptions.defaults()
.expectAckWithin(ackTimeout, ackTimeUnit)
public <T> T get(Class<T> remoteInterface, long executionTimeout, TimeUnit executionTimeUnit, long ackTimeout,
TimeUnit ackTimeUnit) {
return get(remoteInterface, RemoteInvocationOptions.defaults().expectAckWithin(ackTimeout, ackTimeUnit)
.expectResultWithin(executionTimeout, executionTimeUnit));
}
@ -297,31 +336,34 @@ public class RedissonRemoteService implements RRemoteService {
public <T> T get(Class<T> remoteInterface, RemoteInvocationOptions options) {
for (Annotation annotation : remoteInterface.getAnnotations()) {
if (annotation.annotationType() == RRemoteAsync.class) {
Class<T> syncInterface = (Class<T>) ((RRemoteAsync)annotation).value();
Class<T> syncInterface = (Class<T>) ((RRemoteAsync) annotation).value();
for (Method m : remoteInterface.getMethods()) {
try {
syncInterface.getMethod(m.getName(), m.getParameterTypes());
} catch (NoSuchMethodException e) {
throw new IllegalArgumentException("Method '" + m.getName() + "' with params '" + Arrays.toString(m.getParameterTypes())
+ "' isn't defined in " + syncInterface);
throw new IllegalArgumentException("Method '" + m.getName() + "' with params '"
+ Arrays.toString(m.getParameterTypes()) + "' isn't defined in " + syncInterface);
} catch (SecurityException e) {
throw new IllegalArgumentException(e);
}
if (!m.getReturnType().getClass().isInstance(Future.class)) {
throw new IllegalArgumentException(m.getReturnType().getClass() + " isn't allowed as return type");
throw new IllegalArgumentException(
m.getReturnType().getClass() + " isn't allowed as return type");
}
}
return async(remoteInterface, options, syncInterface);
}
}
return sync(remoteInterface, options);
}
private <T> T async(final Class<T> remoteInterface, final RemoteInvocationOptions options, final Class<?> syncInterface) {
private <T> T async(final Class<T> remoteInterface, final RemoteInvocationOptions options,
final Class<?> syncInterface) {
// local copy of the options, to prevent mutation
final RemoteInvocationOptions optionsCopy = new RemoteInvocationOptions(options);
final String toString = getClass().getSimpleName() + "-" + remoteInterface.getSimpleName() + "-proxy-" + generateRequestId();
final String toString = getClass().getSimpleName() + "-" + remoteInterface.getSimpleName() + "-proxy-"
+ generateRequestId();
InvocationHandler handler = new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
@ -333,10 +375,8 @@ public class RedissonRemoteService implements RRemoteService {
return toString.hashCode();
}
if (!optionsCopy.isResultExpected()
&& !(method.getReturnType().equals(Void.class)
|| method.getReturnType().equals(Void.TYPE)
|| method.getReturnType().equals(Future.class))) {
if (!optionsCopy.isResultExpected() && !(method.getReturnType().equals(Void.class)
|| method.getReturnType().equals(Void.TYPE) || method.getReturnType().equals(Future.class))) {
throw new IllegalArgumentException("The noResult option only supports void return value");
}
@ -345,33 +385,64 @@ public class RedissonRemoteService implements RRemoteService {
final String requestQueueName = getRequestQueueName(syncInterface);
final String responseName = getResponseQueueName(syncInterface, requestId);
final String ackName = getAckName(syncInterface, requestId);
final RBlockingQueue<RemoteServiceRequest> requestQueue = redisson.getBlockingQueue(requestQueueName, getCodec());
final RemoteServiceRequest request = new RemoteServiceRequest(requestId,
method.getName(), args, optionsCopy, System.currentTimeMillis());
final RemotePromise<Object> result = new RemotePromise<Object>(ImmediateEventExecutor.INSTANCE.newPromise()) {
final RBlockingQueue<RemoteServiceRequest> requestQueue = redisson.getBlockingQueue(requestQueueName,
getCodec());
final RemoteServiceRequest request = new RemoteServiceRequest(requestId, method.getName(), args,
optionsCopy, System.currentTimeMillis());
final RemotePromise<Object> result = new RemotePromise<Object>(commandExecutor.getConnectionManager().newPromise()) {
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
if (isCancelled()) {
return true;
}
if (isDone()) {
return false;
}
if (optionsCopy.isAckExpected()) {
Future<Boolean> future = redisson.getScript().evalAsync(responseName, Mode.READ_WRITE, LongCodec.INSTANCE,
Future<Boolean> future = commandExecutor.evalWriteAsync(responseName, LongCodec.INSTANCE,
RedisCommands.EVAL_BOOLEAN,
"if redis.call('setnx', KEYS[1], 1) == 1 then "
+ "redis.call('pexpire', KEYS[1], ARGV[2]);"
+ "redis.call('lrem', KEYS[3], 1, ARGV[1]);"
+ "redis.call('pexpire', KEYS[2], ARGV[2]);"
+ "return 1;"
+ "end;"
+ "return 0;", RScript.ReturnType.BOOLEAN, Arrays.<Object>asList(ackName, responseName, requestQueueName),
encode(request),
request.getOptions().getAckTimeoutInMillis());
return commandExecutor.get(future);
+ "redis.call('pexpire', KEYS[1], ARGV[2]);"
+ "redis.call('lrem', KEYS[3], 1, ARGV[1]);"
+ "redis.call('pexpire', KEYS[2], ARGV[2]);"
+ "return 1;"
+ "end;"
+ "return 0;",
Arrays.<Object> asList(ackName, responseName, requestQueueName),
encode(request), request.getOptions().getAckTimeoutInMillis());
boolean ackNotSent = commandExecutor.get(future);
if (ackNotSent) {
return true;
}
return cancel(syncInterface, requestId, request, mayInterruptIfRunning);
}
return requestQueue.remove(request);
boolean removed = requestQueue.remove(request);
if (removed) {
return true;
}
return cancel(syncInterface, requestId, request, mayInterruptIfRunning);
}
private boolean cancel(Class<?> remoteInterface, String requestId, RemoteServiceRequest request,
boolean mayInterruptIfRunning) {
RBlockingQueueAsync<RemoteServiceCancelRequest> cancelRequestQueue = redisson.getBlockingQueue(getCancelRequestQueueName(remoteInterface, requestId), getCodec());
cancelRequestQueue.putAsync(new RemoteServiceCancelRequest(mayInterruptIfRunning));
cancelRequestQueue.expireAsync(60, TimeUnit.SECONDS);
awaitUninterruptibly();
return isCancelled();
}
};
Future<Boolean> addFuture = addAsync(requestQueue, request, result);
addFuture.addListener(new FutureListener<Boolean>() {
@ -381,16 +452,10 @@ public class RedissonRemoteService implements RRemoteService {
result.tryFailure(future.cause());
return;
}
final RBlockingQueue<? extends RRemoteServiceResponse> responseQueue;
if (optionsCopy.isAckExpected() || optionsCopy.isResultExpected()) {
responseQueue = redisson.getBlockingQueue(responseName, getCodec());
} else {
responseQueue = null;
}
if (optionsCopy.isAckExpected()) {
Future<RemoteServiceAck> ackFuture = (Future<RemoteServiceAck>) responseQueue.pollAsync(optionsCopy.getAckTimeoutInMillis(), TimeUnit.MILLISECONDS);
RBlockingQueue<RemoteServiceAck> responseQueue = redisson.getBlockingQueue(responseName, getCodec());
Future<RemoteServiceAck> ackFuture = responseQueue.pollAsync(optionsCopy.getAckTimeoutInMillis(), TimeUnit.MILLISECONDS);
ackFuture.addListener(new FutureListener<RemoteServiceAck>() {
@Override
public void operationComplete(Future<RemoteServiceAck> future) throws Exception {
@ -398,51 +463,54 @@ public class RedissonRemoteService implements RRemoteService {
result.tryFailure(future.cause());
return;
}
RemoteServiceAck ack = future.getNow();
if (ack == null) {
Future<RemoteServiceAck> ackFutureAttempt = tryPollAckAgainAsync(optionsCopy, responseQueue, ackName);
Future<RemoteServiceAck> ackFutureAttempt =
tryPollAckAgainAsync(optionsCopy, responseQueue, ackName);
ackFutureAttempt.addListener(new FutureListener<RemoteServiceAck>() {
@Override
public void operationComplete(Future<RemoteServiceAck> future) throws Exception {
public void operationComplete(Future<RemoteServiceAck> future)
throws Exception {
if (!future.isSuccess()) {
result.tryFailure(future.cause());
return;
}
if (future.getNow() == null) {
Exception ex = new RemoteServiceAckTimeoutException("No ACK response after " + optionsCopy.getAckTimeoutInMillis() + "ms for request: " + request);
Exception ex = new RemoteServiceAckTimeoutException(
"No ACK response after "
+ optionsCopy.getAckTimeoutInMillis()
+ "ms for request: " + request);
result.tryFailure(ex);
return;
}
invokeAsync(optionsCopy, result, request, responseQueue, ackName);
invokeAsync(optionsCopy, result, request, responseName, ackName);
}
});
} else {
invokeAsync(optionsCopy, result, request, responseQueue);
invokeAsync(optionsCopy, result, request, responseName);
}
}
});
} else {
invokeAsync(optionsCopy, result, request, responseQueue);
}
invokeAsync(optionsCopy, result, request, responseName);
}
}
});
return result;
}
};
return (T) Proxy.newProxyInstance(remoteInterface.getClassLoader(), new Class[]{remoteInterface}, handler);
return (T) Proxy.newProxyInstance(remoteInterface.getClassLoader(), new Class[] { remoteInterface }, handler);
}
private void invokeAsync(final RemoteInvocationOptions optionsCopy,
final Promise<Object> result, final RemoteServiceRequest request,
final RBlockingQueue<? extends RRemoteServiceResponse> responseQueue,
final String ackName) {
private void invokeAsync(final RemoteInvocationOptions optionsCopy, final RemotePromise<Object> result,
final RemoteServiceRequest request, final String responseName, final String ackName) {
Future<Boolean> deleteFuture = redisson.getBucket(ackName).deleteAsync();
deleteFuture.addListener(new FutureListener<Boolean>() {
@Override
@ -451,50 +519,57 @@ public class RedissonRemoteService implements RRemoteService {
result.tryFailure(future.cause());
return;
}
invokeAsync(optionsCopy, result, request, responseQueue);
invokeAsync(optionsCopy, result, request, responseName);
}
});
}
private void invokeAsync(final RemoteInvocationOptions optionsCopy, final Promise<Object> result,
final RemoteServiceRequest request,
final RBlockingQueue<? extends RRemoteServiceResponse> responseQueue) {
private void invokeAsync(final RemoteInvocationOptions optionsCopy, final RemotePromise<Object> result,
final RemoteServiceRequest request, final String responseName) {
// poll for the response only if expected
if (optionsCopy.isResultExpected()) {
Future<RemoteServiceResponse> responseFuture = (Future<RemoteServiceResponse>) responseQueue.pollAsync(optionsCopy.getExecutionTimeoutInMillis(), TimeUnit.MILLISECONDS);
responseFuture.addListener(new FutureListener<RemoteServiceResponse>() {
RBlockingQueue<RRemoteServiceResponse> responseQueue = redisson.getBlockingQueue(responseName, getCodec());
Future<RRemoteServiceResponse> responseFuture = responseQueue
.pollAsync(optionsCopy.getExecutionTimeoutInMillis(), TimeUnit.MILLISECONDS);
responseFuture.addListener(new FutureListener<RRemoteServiceResponse>() {
@Override
public void operationComplete(Future<RemoteServiceResponse> future)
throws Exception {
public void operationComplete(Future<RRemoteServiceResponse> future) throws Exception {
if (!future.isSuccess()) {
result.tryFailure(future.cause());
return;
}
if (future.getNow() == null) {
RemoteServiceTimeoutException e = new RemoteServiceTimeoutException("No response after " + optionsCopy.getExecutionTimeoutInMillis() + "ms for request: " + request);
RemoteServiceTimeoutException e = new RemoteServiceTimeoutException("No response after "
+ optionsCopy.getExecutionTimeoutInMillis() + "ms for request: " + request);
result.tryFailure(e);
return;
}
if (future.getNow().getError() != null) {
result.tryFailure(future.getNow().getError());
if (future.getNow() instanceof RemoteServiceCancelResponse) {
result.doCancel();
return;
}
result.trySuccess(future.getNow().getResult());
RemoteServiceResponse response = (RemoteServiceResponse) future.getNow();
if (response.getError() != null) {
result.tryFailure(response.getError());
return;
}
result.trySuccess(response.getResult());
}
});
}
}
private <T> T sync(final Class<T> remoteInterface, final RemoteInvocationOptions options) {
final String interfaceName = remoteInterface.getName();
// local copy of the options, to prevent mutation
final RemoteInvocationOptions optionsCopy = new RemoteInvocationOptions(options);
final String toString = getClass().getSimpleName() + "-" + remoteInterface.getSimpleName() + "-proxy-" + generateRequestId();
final String toString = getClass().getSimpleName() + "-" + remoteInterface.getSimpleName() + "-proxy-"
+ generateRequestId();
InvocationHandler handler = new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
@ -506,15 +581,17 @@ public class RedissonRemoteService implements RRemoteService {
return toString.hashCode();
}
if (!optionsCopy.isResultExpected() && !(method.getReturnType().equals(Void.class) || method.getReturnType().equals(Void.TYPE)))
if (!optionsCopy.isResultExpected()
&& !(method.getReturnType().equals(Void.class) || method.getReturnType().equals(Void.TYPE)))
throw new IllegalArgumentException("The noResult option only supports void return value");
String requestId = generateRequestId();
String requestQueueName = getRequestQueueName(remoteInterface);
RBlockingQueue<RemoteServiceRequest> requestQueue = redisson.getBlockingQueue(requestQueueName, getCodec());
RemoteServiceRequest request = new RemoteServiceRequest(requestId,
method.getName(), args, optionsCopy, System.currentTimeMillis());
RBlockingQueue<RemoteServiceRequest> requestQueue = redisson.getBlockingQueue(requestQueueName,
getCodec());
RemoteServiceRequest request = new RemoteServiceRequest(requestId, method.getName(), args, optionsCopy,
System.currentTimeMillis());
requestQueue.add(request);
RBlockingQueue<RRemoteServiceResponse> responseQueue = null;
@ -526,11 +603,13 @@ public class RedissonRemoteService implements RRemoteService {
// poll for the ack only if expected
if (optionsCopy.isAckExpected()) {
String ackName = getAckName(remoteInterface, requestId);
RemoteServiceAck ack = (RemoteServiceAck) responseQueue.poll(optionsCopy.getAckTimeoutInMillis(), TimeUnit.MILLISECONDS);
RemoteServiceAck ack = (RemoteServiceAck) responseQueue.poll(optionsCopy.getAckTimeoutInMillis(),
TimeUnit.MILLISECONDS);
if (ack == null) {
ack = tryPollAckAgain(optionsCopy, responseQueue, ackName);
if (ack == null) {
throw new RemoteServiceAckTimeoutException("No ACK response after " + optionsCopy.getAckTimeoutInMillis() + "ms for request: " + request);
throw new RemoteServiceAckTimeoutException("No ACK response after "
+ optionsCopy.getAckTimeoutInMillis() + "ms for request: " + request);
}
}
redisson.getBucket(ackName).delete();
@ -538,9 +617,11 @@ public class RedissonRemoteService implements RRemoteService {
// poll for the response only if expected
if (optionsCopy.isResultExpected()) {
RemoteServiceResponse response = (RemoteServiceResponse) responseQueue.poll(optionsCopy.getExecutionTimeoutInMillis(), TimeUnit.MILLISECONDS);
RemoteServiceResponse response = (RemoteServiceResponse) responseQueue
.poll(optionsCopy.getExecutionTimeoutInMillis(), TimeUnit.MILLISECONDS);
if (response == null) {
throw new RemoteServiceTimeoutException("No response after " + optionsCopy.getExecutionTimeoutInMillis() + "ms for request: " + request);
throw new RemoteServiceTimeoutException("No response1 after "
+ optionsCopy.getExecutionTimeoutInMillis() + "ms for request: " + request);
}
if (response.getError() != null) {
throw response.getError();
@ -552,39 +633,40 @@ public class RedissonRemoteService implements RRemoteService {
}
};
return (T) Proxy.newProxyInstance(remoteInterface.getClassLoader(), new Class[]{remoteInterface}, handler);
return (T) Proxy.newProxyInstance(remoteInterface.getClassLoader(), new Class[] { remoteInterface }, handler);
}
private RemoteServiceAck tryPollAckAgain(RemoteInvocationOptions optionsCopy,
RBlockingQueue<? extends RRemoteServiceResponse> responseQueue, String ackName) throws InterruptedException {
Future<Boolean> ackClientsFuture = redisson.getScript().evalAsync(ackName, Mode.READ_WRITE, LongCodec.INSTANCE,
"if redis.call('setnx', KEYS[1], 1) == 1 then "
+ "redis.call('pexpire', KEYS[1], ARGV[1]);"
+ "return 0;"
+ "end;"
+ "redis.call('del', KEYS[1]);"
+ "return 1;",
RScript.ReturnType.BOOLEAN, Arrays.<Object>asList(ackName), optionsCopy.getAckTimeoutInMillis());
RBlockingQueue<? extends RRemoteServiceResponse> responseQueue, String ackName)
throws InterruptedException {
Future<Boolean> ackClientsFuture = commandExecutor.evalWriteAsync(ackName, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if redis.call('setnx', KEYS[1], 1) == 1 then "
+ "redis.call('pexpire', KEYS[1], ARGV[1]);"
+ "return 0;"
+ "end;"
+ "redis.call('del', KEYS[1]);"
+ "return 1;",
Arrays.<Object> asList(ackName), optionsCopy.getAckTimeoutInMillis());
ackClientsFuture.sync();
if (ackClientsFuture.getNow()) {
return (RemoteServiceAck) responseQueue.poll();
}
return null;
}
private Future<RemoteServiceAck> tryPollAckAgainAsync(RemoteInvocationOptions optionsCopy,
final RBlockingQueue<? extends RRemoteServiceResponse> responseQueue, String ackName)
final RBlockingQueue<RemoteServiceAck> responseQueue, String ackName)
throws InterruptedException {
final Promise<RemoteServiceAck> promise = ImmediateEventExecutor.INSTANCE.newPromise();
Future<Boolean> ackClientsFuture = redisson.getScript().evalAsync(ackName, Mode.READ_WRITE, LongCodec.INSTANCE,
"if redis.call('setnx', KEYS[1], 1) == 1 then "
Future<Boolean> ackClientsFuture = commandExecutor.evalWriteAsync(ackName, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if redis.call('setnx', KEYS[1], 1) == 1 then "
+ "redis.call('pexpire', KEYS[1], ARGV[1]);"
+ "return 0;"
+ "end;"
+ "redis.call('del', KEYS[1]);"
+ "return 1;",
RScript.ReturnType.BOOLEAN, Arrays.<Object> asList(ackName), optionsCopy.getAckTimeoutInMillis());
+ "end;"
+ "redis.call('del', KEYS[1]);"
+ "return 1;",
Arrays.<Object> asList(ackName), optionsCopy.getAckTimeoutInMillis());
ackClientsFuture.addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
@ -592,9 +674,9 @@ public class RedissonRemoteService implements RRemoteService {
promise.setFailure(future.cause());
return;
}
if (future.getNow()) {
Future<RemoteServiceAck> pollFuture = (Future<RemoteServiceAck>) responseQueue.pollAsync();
Future<RemoteServiceAck> pollFuture = responseQueue.pollAsync();
pollFuture.addListener(new FutureListener<RemoteServiceAck>() {
@Override
public void operationComplete(Future<RemoteServiceAck> future) throws Exception {
@ -602,7 +684,7 @@ public class RedissonRemoteService implements RRemoteService {
promise.setFailure(future.cause());
return;
}
promise.setSuccess(future.getNow());
}
});
@ -629,9 +711,11 @@ public class RedissonRemoteService implements RRemoteService {
return batch.executeAsync();
}
protected Future<Boolean> addAsync(RBlockingQueue<RemoteServiceRequest> requestQueue, RemoteServiceRequest request, RemotePromise<Object> result) {
protected Future<Boolean> addAsync(RBlockingQueue<RemoteServiceRequest> requestQueue, RemoteServiceRequest request,
RemotePromise<Object> result) {
Future<Boolean> future = requestQueue.addAsync(request);
result.setAddFuture(future);
return future;
}
}

@ -51,6 +51,6 @@ public interface RExecutorService extends ExecutorService {
*
* @param workers - workers amount
*/
void registerWorkers(int workers, Executor executor);
void registerWorkers(int workers, ExecutorService executor);
}

@ -16,6 +16,7 @@
package org.redisson.api;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
/**
@ -83,7 +84,7 @@ public interface RRemoteService {
* @param object
* @param workersAmount
*/
<T> void register(Class<T> remoteInterface, T object, int workersAmount, Executor executor);
<T> void register(Class<T> remoteInterface, T object, int workersAmount, ExecutorService executor);
/**
* Get remote service object for remote invocations.

@ -82,7 +82,7 @@ public class RemoteInvocationOptions implements Serializable {
public static RemoteInvocationOptions defaults() {
return new RemoteInvocationOptions()
.expectAckWithin(1, TimeUnit.SECONDS)
.expectResultWithin(20, TimeUnit.SECONDS);
.expectResultWithin(30, TimeUnit.SECONDS);
}
public Long getAckTimeoutInMillis() {

@ -62,10 +62,6 @@ public class RedisConnection implements RedisCommands {
return (C) channel.attr(RedisConnection.CONNECTION).get();
}
public void removeCurrentCommand() {
channel.attr(CommandsQueue.CURRENT_COMMAND).remove();
}
public CommandData getCurrentCommand() {
QueueCommand command = channel.attr(CommandsQueue.CURRENT_COMMAND).get();
if (command instanceof CommandData) {

@ -15,6 +15,11 @@
*/
package org.redisson.client;
/**
*
* @author Nikita Koksharov
*
*/
public class RedisException extends RuntimeException {
private static final long serialVersionUID = 3389820652701696154L;

@ -160,7 +160,8 @@ public class ConnectionWatchdog extends ChannelInboundHandlerAdapter {
private void reattachBlockingQueue(RedisConnection connection, final CommandData<?, ?> commandData) {
if (commandData == null
|| !commandData.isBlockingCommand()) {
|| !commandData.isBlockingCommand()
|| commandData.getPromise().isDone()) {
return;
}

@ -89,16 +89,26 @@ public class CommandAsyncService implements CommandAsyncExecutor {
l.countDown();
}
});
try {
l.await();
} catch (InterruptedException e) {
boolean interrupted = false;
while (!future.isDone()) {
try {
l.await();
} catch (InterruptedException e) {
interrupted = true;
}
}
if (interrupted) {
Thread.currentThread().interrupt();
}
// commented out due to blocking issues up to 200 ms per minute for each thread
// future.awaitUninterruptibly();
if (future.isSuccess()) {
return future.getNow();
}
throw convertException(future);
}
@ -509,9 +519,9 @@ public class CommandAsyncService implements CommandAsyncExecutor {
details.getTimeout().cancel();
int timeoutTime = connectionManager.getConfig().getTimeout();
long timeoutTime = connectionManager.getConfig().getTimeout();
if (QueueCommand.TIMEOUTLESS_COMMANDS.contains(details.getCommand().getName())) {
Integer popTimeout = Integer.valueOf(details.getParams()[details.getParams().length - 1].toString());
Long popTimeout = Long.valueOf(details.getParams()[details.getParams().length - 1].toString());
handleBlockingOperations(details, connection, popTimeout);
if (popTimeout == 0) {
return;
@ -521,7 +531,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
timeoutTime += 1000;
}
final int timeoutAmount = timeoutTime;
final long timeoutAmount = timeoutTime;
TimerTask timeoutTask = new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
@ -535,7 +545,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
details.setTimeout(timeout);
}
private <R, V> void handleBlockingOperations(final AsyncDetails<V, R> details, final RedisConnection connection, Integer popTimeout) {
private <R, V> void handleBlockingOperations(final AsyncDetails<V, R> details, final RedisConnection connection, Long popTimeout) {
final FutureListener<Boolean> listener = new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
@ -551,7 +561,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
scheduledFuture = connectionManager.getGroup().schedule(new Runnable() {
@Override
public void run() {
// there is no re-connection was made
// re-connection wasn't made
// and connection is still active
if (orignalChannel == connection.getChannel()
&& connection.isActive()) {
@ -590,17 +600,6 @@ public class CommandAsyncService implements CommandAsyncExecutor {
}
});
details.getAttemptPromise().addListener(new FutureListener<R>() {
@Override
public void operationComplete(Future<R> future) throws Exception {
if (future.isCancelled()) {
// command should be removed due to
// ConnectionWatchdog blockingQueue reconnection logic
connection.removeCurrentCommand();
}
}
});
synchronized (listener) {
if (!details.getMainPromise().isDone()) {
connectionManager.getShutdownPromise().addListener(listener);
@ -717,7 +716,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
}
((RedisClientResult)res).setRedisClient(addr);
}
details.getMainPromise().setSuccess(res);
details.getMainPromise().trySuccess(res);
} else {
details.getMainPromise().tryFailure(future.cause());
}

@ -40,5 +40,9 @@ public class RemotePromise<T> extends PromiseDelegator<T> {
public Future<Boolean> getAddFuture() {
return addFuture;
}
public void doCancel() {
super.cancel(true);
}
}

@ -17,6 +17,11 @@ package org.redisson.remote;
import java.io.Serializable;
/**
*
* @author Nikita Koksharov
*
*/
public interface RRemoteServiceResponse extends Serializable {
}

@ -25,4 +25,6 @@ import java.io.Serializable;
*/
public class RemoteServiceAck implements RRemoteServiceResponse, Serializable {
private static final long serialVersionUID = -6332680404562746984L;
}

@ -0,0 +1,43 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.remote;
import java.io.Serializable;
/**
*
* @author Nikita Koksharov
*
*/
public class RemoteServiceCancelRequest implements Serializable {
private static final long serialVersionUID = -4800574267648904260L;
private boolean mayInterruptIfRunning;
public RemoteServiceCancelRequest() {
}
public RemoteServiceCancelRequest(boolean mayInterruptIfRunning) {
super();
this.mayInterruptIfRunning = mayInterruptIfRunning;
}
public boolean isMayInterruptIfRunning() {
return mayInterruptIfRunning;
}
}

@ -0,0 +1,29 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.remote;
import java.io.Serializable;
/**
*
* @author Nikita Koksharov
*
*/
public class RemoteServiceCancelResponse implements RRemoteServiceResponse, Serializable {
private static final long serialVersionUID = -4356901222132702182L;
}

@ -15,6 +15,11 @@
*/
package org.redisson.remote;
/**
*
* @author Nikita Koksharov
*
*/
public class RemoteServiceKey {
private final Class<?> serviceInterface;

@ -17,6 +17,11 @@ package org.redisson.remote;
import java.lang.reflect.Method;
/**
*
* @author Nikita Koksharov
*
*/
public class RemoteServiceMethod {
private final Object bean;

@ -20,8 +20,15 @@ import java.util.Arrays;
import org.redisson.api.RemoteInvocationOptions;
/**
*
* @author Nikita Koksharov
*
*/
public class RemoteServiceRequest implements Serializable {
private static final long serialVersionUID = -1711385312384040075L;
private String requestId;
private String methodName;
private Object[] args;

@ -17,8 +17,15 @@ package org.redisson.remote;
import java.io.Serializable;
/**
*
* @author Nikita Koksharov
*
*/
public class RemoteServiceResponse implements RRemoteServiceResponse, Serializable {
private static final long serialVersionUID = -1958922748139674253L;
private Object result;
private Throwable error;

@ -69,6 +69,8 @@ public class RedissonRemoteServiceTest extends BaseTest {
@RRemoteAsync(RemoteInterface.class)
public interface RemoteInterfaceAsync {
Future<Void> cancelMethod();
Future<Void> voidMethod(String name, Long param);
Future<Long> resultMethod(Long value);
@ -102,6 +104,8 @@ public class RedissonRemoteServiceTest extends BaseTest {
public interface RemoteInterface {
void cancelMethod() throws InterruptedException;
void voidMethod(String name, Long param);
Long resultMethod(Long value);
@ -120,6 +124,27 @@ public class RedissonRemoteServiceTest extends BaseTest {
public class RemoteImpl implements RemoteInterface {
private AtomicInteger iterations;
public RemoteImpl() {
}
public RemoteImpl(AtomicInteger iterations) {
super();
this.iterations = iterations;
}
@Override
public void cancelMethod() throws InterruptedException {
for (long i = 0; i < Long.MAX_VALUE; i++) {
iterations.incrementAndGet();
if (Thread.interrupted()) {
System.out.println("interrupted! " + i);
return;
}
}
}
@Override
public void voidMethod(String name, Long param) {
System.out.println(name + " " + param);
@ -160,6 +185,31 @@ public class RedissonRemoteServiceTest extends BaseTest {
}
}
@Test
public void testCancelAsync() throws InterruptedException {
RedissonClient r1 = createInstance();
AtomicInteger iterations = new AtomicInteger();
ExecutorService executor = Executors.newSingleThreadExecutor();
r1.getKeys().flushall();
r1.getRemoteSerivce().register(RemoteInterface.class, new RemoteImpl(iterations), 1, executor);
RedissonClient r2 = createInstance();
RemoteInterfaceAsync ri = r2.getRemoteSerivce().get(RemoteInterfaceAsync.class);
Future<Void> f = ri.cancelMethod();
Thread.sleep(500);
assertThat(f.cancel(true)).isTrue();
executor.shutdown();
r1.shutdown();
r2.shutdown();
assertThat(iterations.get()).isLessThan(Integer.MAX_VALUE / 2);
assertThat(executor.awaitTermination(1, TimeUnit.SECONDS)).isTrue();
}
@Test(expected = IllegalArgumentException.class)
public void testWrongMethodAsync() throws InterruptedException {
redisson.getRemoteSerivce().get(RemoteInterfaceWrongMethodAsync.class);

@ -59,7 +59,7 @@ public class RedissonExecutorServiceTest extends BaseTest {
String invokeResult = e.invokeAny(Arrays.asList(new CallableTask(), new CallableTask(), new CallableTask()));
assertThat(invokeResult).isEqualTo(CallableTask.RESULT);
String a = e.invokeAny(Arrays.asList(new CallableTask(), new CallableTask(), new CallableTask()), 1, TimeUnit.SECONDS);
String a = e.invokeAny(Arrays.asList(new CallableTask(), new CallableTask(), new CallableTask()), 5, TimeUnit.SECONDS);
assertThat(a).isEqualTo(CallableTask.RESULT);
List<CallableTask> invokeAllParams = Arrays.asList(new CallableTask(), new CallableTask(), new CallableTask());
@ -70,7 +70,7 @@ public class RedissonExecutorServiceTest extends BaseTest {
}
List<CallableTask> invokeAllParams1 = Arrays.asList(new CallableTask(), new CallableTask(), new CallableTask());
List<Future<String>> allResult1 = e.invokeAll(invokeAllParams1, 1, TimeUnit.SECONDS);
List<Future<String>> allResult1 = e.invokeAll(invokeAllParams1, 5, TimeUnit.SECONDS);
assertThat(allResult1).hasSize(invokeAllParams.size());
for (Future<String> future : allResult1) {
assertThat(future.get()).isEqualTo(CallableTask.RESULT);
@ -161,13 +161,13 @@ public class RedissonExecutorServiceTest extends BaseTest {
@Test
public void testResetShutdownState() throws InterruptedException, ExecutionException {
for (int i = 0; i < 100; i++) {
for (int i = 0; i < 10; i++) {
RExecutorService e = redisson.getExecutorService("test");
e.execute(new RunnableTask());
assertThat(e.isShutdown()).isFalse();
e.shutdown();
assertThat(e.isShutdown()).isTrue();
assertThat(e.awaitTermination(5, TimeUnit.SECONDS)).isTrue();
assertThat(e.awaitTermination(10, TimeUnit.SECONDS)).isTrue();
assertThat(e.isTerminated()).isTrue();
assertThat(e.delete()).isTrue();
assertThat(e.isShutdown()).isFalse();

Loading…
Cancel
Save