Feature - Add RPUSHX and LPUSHX commands support #2607

pull/2611/head
Nikita Koksharov 5 years ago
parent 3b9ddc6ecf
commit ddfcc61bf9

@ -49,6 +49,26 @@ public class RedissonDeque<V> extends RedissonQueue<V> implements RDeque<V> {
super(codec, commandExecutor, name, redisson);
}
@Override
public int addFirstIfExists(V... elements) {
return get(addFirstIfExistsAsync(elements));
}
@Override
public int addLastIfExists(V... elements) {
return get(addLastIfExistsAsync(elements));
}
@Override
public RFuture<Integer> addFirstIfExistsAsync(V... elements) {
return commandExecutor.writeAsync(getName(), codec, RedisCommands.LPUSHX, getName(), encode(elements));
}
@Override
public RFuture<Integer> addLastIfExistsAsync(V... elements) {
return commandExecutor.writeAsync(getName(), codec, RedisCommands.RPUSHX, getName(), encode(elements));
}
@Override
public void addFirst(V e) {
get(addFirstAsync(e));

@ -241,9 +241,17 @@ public abstract class RedissonObject implements RObject {
public Codec getCodec() {
return codec;
}
protected List<ByteBuf> encode(Object... values) {
List<ByteBuf> result = new ArrayList<>(values.length);
for (Object object : values) {
result.add(encode(object));
}
return result;
}
protected List<ByteBuf> encode(Collection<?> values) {
List<ByteBuf> result = new ArrayList<ByteBuf>(values.size());
List<ByteBuf> result = new ArrayList<>(values.size());
for (Object object : values) {
result.add(encode(object));
}

@ -48,6 +48,22 @@ public class RedissonPriorityDeque<V> extends RedissonPriorityQueue<V> implement
super(codec, commandExecutor, name, redisson);
}
public int addFirstIfExists(V... elements) {
throw new UnsupportedOperationException("use add or put method");
}
public int addLastIfExists(V... elements) {
throw new UnsupportedOperationException("use add or put method");
}
public RFuture<Integer> addFirstIfExistsAsync(V... elements) {
throw new UnsupportedOperationException("use add or put method");
}
public RFuture<Integer> addLastIfExistsAsync(V... elements) {
throw new UnsupportedOperationException("use add or put method");
}
public RFuture<Void> addFirstAsync(V e) {
throw new UnsupportedOperationException("use add or put method");
}
@ -55,7 +71,7 @@ public class RedissonPriorityDeque<V> extends RedissonPriorityQueue<V> implement
public RFuture<Void> addLastAsync(V e) {
throw new UnsupportedOperationException("use add or put method");
}
@Override
public void addFirst(V e) {
throw new UnsupportedOperationException("use add or put method");

@ -27,6 +27,22 @@ import java.util.List;
*/
public interface RDeque<V> extends Deque<V>, RQueue<V>, RDequeAsync<V> {
/**
* Adds element at the head of existing deque.
*
* @param elements - elements to add
* @return length of the list
*/
int addFirstIfExists(V... elements);
/**
* Adds element at the tail of existing deque.
*
* @param elements - elements to add
* @return length of the list
*/
int addLastIfExists(V... elements);
/**
* Retrieves and removes the tail elements of this queue.
* Elements amount limited by <code>limit</code> param.

@ -26,6 +26,22 @@ import java.util.List;
*/
public interface RDequeAsync<V> extends RQueueAsync<V> {
/**
* Adds element at the head of existing deque.
*
* @param elements - elements to add
* @return length of the list
*/
RFuture<Integer> addFirstIfExistsAsync(V... elements);
/**
* Adds element at the tail of existing deque.
*
* @param elements - elements to add
* @return length of the list
*/
RFuture<Integer> addLastIfExistsAsync(V... elements);
/**
* Removes last occurrence of element <code>o</code>
*

@ -27,6 +27,22 @@ import reactor.core.publisher.Mono;
*/
public interface RDequeReactive<V> extends RQueueReactive<V> {
/**
* Adds element at the head of existing deque.
*
* @param elements - elements to add
* @return length of the list
*/
Mono<Integer> addFirstIfExists(V... elements);
/**
* Adds element at the tail of existing deque.
*
* @param elements - elements to add
* @return length of the list
*/
Mono<Integer> addLastIfExists(V... elements);
Flux<V> descendingIterator();
/**

@ -29,6 +29,22 @@ import io.reactivex.Single;
*/
public interface RDequeRx<V> extends RQueueRx<V> {
/**
* Adds element at the head of existing deque.
*
* @param elements - elements to add
* @return length of the list
*/
Single<Integer> addFirstIfExists(V... elements);
/**
* Adds element at the tail of existing deque.
*
* @param elements - elements to add
* @return length of the list
*/
Single<Integer> addLastIfExists(V... elements);
Flowable<V> descendingIterator();
/**

@ -230,11 +230,13 @@ public interface RedisCommands {
RedisCommand<Object> RPOP = new RedisCommand<Object>("RPOP");
RedisCommand<Integer> LPUSH = new RedisCommand<Integer>("LPUSH", new IntegerReplayConvertor());
RedisCommand<Integer> LPUSHX = new RedisCommand<Integer>("LPUSHX", new IntegerReplayConvertor());
RedisCommand<Boolean> LPUSH_BOOLEAN = new RedisCommand<Boolean>("LPUSH", new TrueReplayConvertor());
RedisStrictCommand<Void> LPUSH_VOID = new RedisStrictCommand<Void>("LPUSH", new VoidReplayConvertor());
RedisCommand<List<Object>> LRANGE = new RedisCommand<List<Object>>("LRANGE", new ObjectListReplayDecoder<Object>());
RedisCommand<Set<Object>> LRANGE_SET = new RedisCommand<Set<Object>>("LRANGE", new ObjectSetReplayDecoder<Object>());
RedisCommand<Integer> RPUSH = new RedisCommand<Integer>("RPUSH", new IntegerReplayConvertor());
RedisCommand<Integer> RPUSHX = new RedisCommand<Integer>("RPUSHX", new IntegerReplayConvertor());
RedisCommand<Boolean> RPUSH_BOOLEAN = new RedisCommand<Boolean>("RPUSH", new TrueReplayConvertor());
RedisCommand<Void> RPUSH_VOID = new RedisCommand<Void>("RPUSH", new VoidReplayConvertor());

Loading…
Cancel
Save