refactoring

pull/1249/head
Nikita 7 years ago
parent fc6ac4bba2
commit 5b2d85ca5b

@ -25,11 +25,6 @@ import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.decoder.ListFirstObjectDecoder;
import org.redisson.command.CommandExecutor;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
/**
* Distributed and concurrent implementation of {@link java.util.Queue}
@ -142,42 +137,7 @@ public class RedissonPriorityDeque<V> extends RedissonPriorityQueue<V> implement
}
public RFuture<V> pollLastAsync() {
final long threadId = Thread.currentThread().getId();
final RPromise<V> result = new RedissonPromise<V>();
lock.lockAsync(threadId).addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
if (!future.isSuccess()) {
result.tryFailure(future.cause());
return;
}
RFuture<V> f = commandExecutor.writeAsync(getName(), codec, RedisCommands.RPOP, getName());
f.addListener(new FutureListener<V>() {
@Override
public void operationComplete(Future<V> future) throws Exception {
if (!future.isSuccess()) {
result.tryFailure(future.cause());
return;
}
final V value = future.getNow();
lock.unlockAsync(threadId).addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
if (!future.isSuccess()) {
result.tryFailure(future.cause());
return;
}
result.trySuccess(value);
}
});
}
});
}
});
return result;
return pollAsync(RedisCommands.RPOP, getName());
}
@Override

Loading…
Cancel
Save