Future.cancel method handling for remote method call. #560

pull/574/merge
Nikita 9 years ago
parent a3e8af5995
commit 5a0b32f6ae

@ -393,22 +393,22 @@ public class Redisson implements RedissonClient {
}
public RRemoteService getRemoteSerivce() {
return new RedissonRemoteService(this);
return new RedissonRemoteService(this, commandExecutor);
}
@Override
public RRemoteService getRemoteSerivce(String name) {
return new RedissonRemoteService(this, name);
return new RedissonRemoteService(this, name, commandExecutor);
}
@Override
public RRemoteService getRemoteSerivce(Codec codec) {
return new RedissonRemoteService(codec, this);
return new RedissonRemoteService(codec, this, commandExecutor);
}
@Override
public RRemoteService getRemoteSerivce(String name, Codec codec) {
return new RedissonRemoteService(codec, this, name);
return new RedissonRemoteService(codec, this, name, commandExecutor);
}
@Override

@ -15,6 +15,7 @@
*/
package org.redisson;
import java.io.IOException;
import java.lang.annotation.Annotation;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
@ -27,6 +28,8 @@ import java.util.concurrent.atomic.AtomicReference;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec;
import org.redisson.command.CommandExecutor;
import org.redisson.misc.PromiseDelegator;
import org.redisson.core.RBatch;
import org.redisson.core.RBlockingQueue;
import org.redisson.core.RBlockingQueueAsync;
@ -68,23 +71,25 @@ public class RedissonRemoteService implements RRemoteService {
private final Codec codec;
private final Redisson redisson;
private final String name;
private final CommandExecutor commandExecutor;
public RedissonRemoteService(Redisson redisson) {
this(redisson, "redisson_remote_service");
public RedissonRemoteService(Redisson redisson, CommandExecutor commandExecutor) {
this(redisson, "redisson_remote_service", commandExecutor);
}
public RedissonRemoteService(Redisson redisson, String name) {
this(null, redisson, name);
public RedissonRemoteService(Redisson redisson, String name, CommandExecutor commandExecutor) {
this(null, redisson, name, commandExecutor);
}
public RedissonRemoteService(Codec codec, Redisson redisson) {
this(codec, redisson, "redisson_remote_service");
public RedissonRemoteService(Codec codec, Redisson redisson, CommandExecutor commandExecutor) {
this(codec, redisson, "redisson_remote_service", commandExecutor);
}
public RedissonRemoteService(Codec codec, Redisson redisson, String name) {
public RedissonRemoteService(Codec codec, Redisson redisson, String name, CommandExecutor commandExecutor) {
this.codec = codec;
this.redisson = redisson;
this.name = name;
this.commandExecutor = commandExecutor;
}
@Override
@ -118,6 +123,14 @@ public class RedissonRemoteService implements RRemoteService {
}
return redisson.getConfig().getCodec();
}
private byte[] encode(Object obj) {
try {
return getCodec().getValueEncoder().encode(obj);
} catch (IOException e) {
throw new IllegalArgumentException(e);
}
}
private <T> void subscribe(final Class<T> remoteInterface, final RBlockingQueue<RemoteServiceRequest> requestQueue) {
Future<RemoteServiceRequest> take = requestQueue.takeAsync();
@ -159,7 +172,7 @@ public class RedissonRemoteService implements RRemoteService {
+ "return 1;"
+ "end;"
+ "return 0;", RScript.ReturnType.BOOLEAN, Arrays.<Object>asList(ackName, responseName),
getCodec().getValueEncoder().encode(new RemoteServiceAck()),
encode(new RemoteServiceAck()),
request.getOptions().getAckTimeoutInMillis());
ackClientsFuture.addListener(new FutureListener<Boolean>() {
@ -290,25 +303,49 @@ public class RedissonRemoteService implements RRemoteService {
}
final String requestId = generateRequestId();
final Promise<Object> result = ImmediateEventExecutor.INSTANCE.newPromise();
String requestQueueName = name + ":{" + interfaceName + "}";
RBlockingQueue<RemoteServiceRequest> requestQueue = redisson.getBlockingQueue(requestQueueName, getCodec());
final String requestQueueName = name + ":{" + interfaceName + "}";
final String responseName = name + ":{" + interfaceName + "}:" + requestId;
final String ackName = getAckName(remoteInterface.getName(), requestId);
final RBlockingQueue<RemoteServiceRequest> requestQueue = redisson.getBlockingQueue(requestQueueName, getCodec());
final RemoteServiceRequest request = new RemoteServiceRequest(requestId,
method.getName(), args, optionsCopy, System.currentTimeMillis());
final Promise<Object> result = new PromiseDelegator<Object>(ImmediateEventExecutor.INSTANCE.newPromise()) {
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
if (optionsCopy.isAckExpected()) {
Future<Boolean> future = redisson.getScript().evalAsync(responseName, Mode.READ_WRITE, LongCodec.INSTANCE,
"if redis.call('setnx', KEYS[1], 1) == 1 then "
+ "redis.call('pexpire', KEYS[1], ARGV[2]);"
+ "redis.call('lrem', KEYS[3], 1, ARGV[1]);"
+ "redis.call('pexpire', KEYS[2], ARGV[2]);"
+ "return 1;"
+ "end;"
+ "return 0;", RScript.ReturnType.BOOLEAN, Arrays.<Object>asList(ackName, responseName, requestQueueName),
encode(request),
request.getOptions().getAckTimeoutInMillis());
return commandExecutor.get(future);
}
return requestQueue.remove(request);
}
};
Future<Boolean> addFuture = requestQueue.addAsync(request);
addFuture.addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
if (!future.isSuccess()) {
result.setFailure(future.cause());
result.tryFailure(future.cause());
return;
}
final RBlockingQueue<? extends RRemoteServiceResponse> responseQueue;
if (optionsCopy.isAckExpected() || optionsCopy.isResultExpected()) {
String responseName = name + ":{" + interfaceName + "}:" + requestId;
responseQueue = redisson.getBlockingQueue(responseName, getCodec());
} else {
responseQueue = null;
@ -320,26 +357,25 @@ public class RedissonRemoteService implements RRemoteService {
@Override
public void operationComplete(Future<RemoteServiceAck> future) throws Exception {
if (!future.isSuccess()) {
result.setFailure(future.cause());
result.tryFailure(future.cause());
return;
}
RemoteServiceAck ack = future.getNow();
if (ack == null) {
final String ackName = getAckName(remoteInterface.getName(), request.getRequestId());
Future<RemoteServiceAck> ackFutureAttempt = tryPollAckAgainAsync(optionsCopy, responseQueue, ackName);
ackFutureAttempt.addListener(new FutureListener<RemoteServiceAck>() {
@Override
public void operationComplete(Future<RemoteServiceAck> future) throws Exception {
if (!future.isSuccess()) {
result.setFailure(future.cause());
result.tryFailure(future.cause());
return;
}
if (future.getNow() == null) {
Exception ex = new RemoteServiceAckTimeoutException("No ACK response after " + optionsCopy.getAckTimeoutInMillis() + "ms for request: " + request);
result.setFailure(ex);
result.tryFailure(ex);
return;
}
@ -353,9 +389,7 @@ public class RedissonRemoteService implements RRemoteService {
});
} else {
if (optionsCopy.isResultExpected()) {
invokeAsync(optionsCopy, result, request, responseQueue);
}
invokeAsync(optionsCopy, result, request, responseQueue);
}
}
});
@ -376,7 +410,7 @@ public class RedissonRemoteService implements RRemoteService {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
if (!future.isSuccess()) {
result.setFailure(future.cause());
result.tryFailure(future.cause());
return;
}
@ -397,21 +431,22 @@ public class RedissonRemoteService implements RRemoteService {
public void operationComplete(Future<RemoteServiceResponse> future)
throws Exception {
if (!future.isSuccess()) {
result.setFailure(future.cause());
result.tryFailure(future.cause());
return;
}
if (future.getNow() == null) {
RemoteServiceTimeoutException e = new RemoteServiceTimeoutException("No response after " + optionsCopy.getExecutionTimeoutInMillis() + "ms for request: " + request);
result.setFailure(e);
result.tryFailure(e);
return;
}
if (future.getNow().getError() != null) {
result.setFailure(future.getNow().getError());
result.tryFailure(future.getNow().getError());
return;
}
result.setSuccess(future.getNow().getResult());
result.trySuccess(future.getNow().getResult());
}
});
}

@ -0,0 +1,140 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.misc;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.Promise;
public class PromiseDelegator<T> implements Promise<T> {
private final Promise<T> promise;
public PromiseDelegator(Promise<T> promise) {
super();
this.promise = promise;
}
public Promise<T> setSuccess(T result) {
return promise.setSuccess(result);
}
public boolean isSuccess() {
return promise.isSuccess();
}
public boolean trySuccess(T result) {
return promise.trySuccess(result);
}
public boolean isCancellable() {
return promise.isCancellable();
}
public Throwable cause() {
return promise.cause();
}
public Promise<T> setFailure(Throwable cause) {
return promise.setFailure(cause);
}
public boolean tryFailure(Throwable cause) {
return promise.tryFailure(cause);
}
public boolean setUncancellable() {
return promise.setUncancellable();
}
public Promise<T> addListener(GenericFutureListener<? extends Future<? super T>> listener) {
return promise.addListener(listener);
}
public Promise<T> addListeners(GenericFutureListener<? extends Future<? super T>>... listeners) {
return promise.addListeners(listeners);
}
public Promise<T> removeListener(GenericFutureListener<? extends Future<? super T>> listener) {
return promise.removeListener(listener);
}
public Promise<T> removeListeners(GenericFutureListener<? extends Future<? super T>>... listeners) {
return promise.removeListeners(listeners);
}
public Promise<T> await() throws InterruptedException {
return promise.await();
}
public Promise<T> awaitUninterruptibly() {
return promise.awaitUninterruptibly();
}
public Promise<T> sync() throws InterruptedException {
return promise.sync();
}
public Promise<T> syncUninterruptibly() {
return promise.syncUninterruptibly();
}
public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
return promise.await(timeout, unit);
}
public boolean isCancelled() {
return promise.isCancelled();
}
public boolean isDone() {
return promise.isDone();
}
public boolean await(long timeoutMillis) throws InterruptedException {
return promise.await(timeoutMillis);
}
public T get() throws InterruptedException, ExecutionException {
return promise.get();
}
public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {
return promise.awaitUninterruptibly(timeout, unit);
}
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return promise.get(timeout, unit);
}
public boolean awaitUninterruptibly(long timeoutMillis) {
return promise.awaitUninterruptibly(timeoutMillis);
}
public T getNow() {
return promise.getNow();
}
public boolean cancel(boolean mayInterruptIfRunning) {
return promise.cancel(mayInterruptIfRunning);
}
}
Loading…
Cancel
Save