refactoring

pull/578/head^2
Nikita 9 years ago
parent 984fda8295
commit 2874e75196

@ -137,6 +137,7 @@ public class RedissonBoundedBlockingQueue<V> extends RedissonQueue<V> implements
final Promise<V> result = new PromiseDelegator<V>(commandExecutor.getConnectionManager().<V>newPromise()) { final Promise<V> result = new PromiseDelegator<V>(commandExecutor.getConnectionManager().<V>newPromise()) {
@Override @Override
public boolean cancel(boolean mayInterruptIfRunning) { public boolean cancel(boolean mayInterruptIfRunning) {
super.cancel(mayInterruptIfRunning);
return takeFuture.cancel(mayInterruptIfRunning); return takeFuture.cancel(mayInterruptIfRunning);
}; };
}; };
@ -145,14 +146,14 @@ public class RedissonBoundedBlockingQueue<V> extends RedissonQueue<V> implements
@Override @Override
public void operationComplete(Future<V> future) throws Exception { public void operationComplete(Future<V> future) throws Exception {
if (!future.isSuccess()) { if (!future.isSuccess()) {
result.setFailure(future.cause()); result.tryFailure(future.cause());
return; return;
} }
createSemaphore(null).releaseAsync().addListener(new FutureListener<Void>() { createSemaphore(null).releaseAsync().addListener(new FutureListener<Void>() {
@Override @Override
public void operationComplete(Future<Void> future) throws Exception { public void operationComplete(Future<Void> future) throws Exception {
result.setSuccess(takeFuture.getNow()); result.trySuccess(takeFuture.getNow());
} }
}); });
} }

@ -273,7 +273,7 @@ public class RedissonRemoteService implements RRemoteService {
private <T> void invokeMethod(final Class<T> remoteInterface, private <T> void invokeMethod(final Class<T> remoteInterface,
final RBlockingQueue<RemoteServiceRequest> requestQueue, final RemoteServiceRequest request, final RBlockingQueue<RemoteServiceRequest> requestQueue, final RemoteServiceRequest request,
RemoteServiceMethod method, String responseName, final ExecutorService executor, RemoteServiceMethod method, String responseName, final ExecutorService executor,
Future<RemoteServiceCancelRequest> cancelRequestFuture, AtomicReference<RRemoteServiceResponse> responseHolder) { Future<RemoteServiceCancelRequest> cancelRequestFuture, final AtomicReference<RRemoteServiceResponse> responseHolder) {
try { try {
Object result = method.getMethod().invoke(method.getBean(), request.getArgs()); Object result = method.getMethod().invoke(method.getBean(), request.getArgs());
@ -454,7 +454,7 @@ public class RedissonRemoteService implements RRemoteService {
} }
if (optionsCopy.isAckExpected()) { if (optionsCopy.isAckExpected()) {
RBlockingQueue<RemoteServiceAck> responseQueue = redisson.getBlockingQueue(responseName, getCodec()); final RBlockingQueue<RemoteServiceAck> responseQueue = redisson.getBlockingQueue(responseName, getCodec());
Future<RemoteServiceAck> ackFuture = responseQueue.pollAsync(optionsCopy.getAckTimeoutInMillis(), TimeUnit.MILLISECONDS); Future<RemoteServiceAck> ackFuture = responseQueue.pollAsync(optionsCopy.getAckTimeoutInMillis(), TimeUnit.MILLISECONDS);
ackFuture.addListener(new FutureListener<RemoteServiceAck>() { ackFuture.addListener(new FutureListener<RemoteServiceAck>() {
@Override @Override

Loading…
Cancel
Save