RBlockingQueueAsync.drainToAsync methods added. #212

pull/243/head
Nikita 10 years ago
parent 0142095c82
commit 7b6a51f243

@ -104,14 +104,15 @@ public class RedissonBlockingQueue<V> extends RedissonQueue<V> implements RBlock
@Override
public int drainTo(Collection<? super V> c) {
return get(drainToAsync(c));
}
@Override
public Future<Integer> drainToAsync(Collection<? super V> c) {
if (c == null) {
throw new NullPointerException();
}
return get(drainToAsync(c));
}
private Future<Integer> drainToAsync(Collection<? super V> c) {
return commandExecutor.evalWriteAsync(getName(), new RedisCommand<Object>("EVAL", new ListDrainToDecoder(c)),
"local vals = redis.call('lrange', KEYS[1], 0, -1); " +
"redis.call('ltrim', KEYS[1], -1, 0); " +
@ -119,18 +120,19 @@ public class RedissonBlockingQueue<V> extends RedissonQueue<V> implements RBlock
}
@Override
public int drainTo(Collection<? super V> c, final int maxElements) {
public int drainTo(Collection<? super V> c, int maxElements) {
if (maxElements <= 0) {
return 0;
}
if (c == null) {
throw new NullPointerException();
}
return get(drainToAsync(c, maxElements));
}
private Future<Integer> drainToAsync(Collection<? super V> c, final int maxElements) {
@Override
public Future<Integer> drainToAsync(Collection<? super V> c, int maxElements) {
if (c == null) {
throw new NullPointerException();
}
return commandExecutor.evalWriteAsync(getName(), new RedisCommand<Object>("EVAL", new ListDrainToDecoder(c)),
"local elemNum = math.min(ARGV[1], redis.call('llen', KEYS[1])) - 1;" +
"local vals = redis.call('lrange', KEYS[1], 0, elemNum); " +

@ -15,6 +15,7 @@
*/
package org.redisson.core;
import java.util.Collection;
import java.util.concurrent.*;
import io.netty.util.concurrent.Future;
@ -27,6 +28,10 @@ import io.netty.util.concurrent.Future;
*/
public interface RBlockingQueueAsync<V> extends RQueueAsync<V>, RExpirableAsync {
Future<Integer> drainToAsync(Collection<? super V> c, int maxElements);
Future<Integer> drainToAsync(Collection<? super V> c);
Future<V> pollLastAndOfferFirstToAsync(String queueName, long timeout, TimeUnit unit);
Future<V> pollAsync(long timeout, TimeUnit unit);

Loading…
Cancel
Save