RDequeAsync interface added.

pull/243/head
Nikita
parent 6afd7cd9e4
commit 27be4c6e0f

@ -16,13 +16,19 @@
package org.redisson;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.convertor.TrueReplayConvertor;
import org.redisson.client.protocol.convertor.VoidReplayConvertor;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.decoder.ListFirstObjectDecoder;
import org.redisson.core.RDeque;
import io.netty.util.concurrent.Future;
/**
* Distributed and concurrent implementation of {@link java.util.Queue}
*
@ -32,20 +38,37 @@ import org.redisson.core.RDeque;
*/
public class RedissonDeque<V> extends RedissonQueue<V> implements RDeque<V> {
private static final RedisCommand<Void> LPUSH_VOID = new RedisCommand<Void>("LPUSH", new VoidReplayConvertor());
private static final RedisCommand<Boolean> LPUSH_BOOLEAN = new RedisCommand<Boolean>("LPUSH", new TrueReplayConvertor());
private static final RedisCommand<Void> RPUSH_VOID = new RedisCommand<Void>("RPUSH", new VoidReplayConvertor(), 2, ValueType.OBJECTS);
private static final RedisCommand<Object> LRANGE_SINGLE = new RedisCommand<Object>("LRANGE", new ListFirstObjectDecoder());
protected RedissonDeque(ConnectionManager connectionManager, String name) {
super(connectionManager, name);
}
@Override
public void addFirst(V e) {
connectionManager.write(getName(), RedisCommands.LPUSH, getName(), e);
connectionManager.get(addFirstAsync(e));
}
@Override
public Future<Void> addFirstAsync(V e) {
return connectionManager.writeAsync(getName(), LPUSH_VOID, getName(), e);
}
@Override
public void addLast(V e) {
connectionManager.write(getName(), RedisCommands.RPUSH, getName(), e);
connectionManager.get(addLastAsync(e));
}
@Override
public Future<Void> addLastAsync(V e) {
return connectionManager.writeAsync(getName(), RPUSH_VOID, getName(), e);
}
@Override
public Iterator<V> descendingIterator() {
return new Iterator<V>() {
@ -82,38 +105,63 @@ public class RedissonDeque<V> extends RedissonQueue<V> implements RDeque<V> {
};
}
@Override
public Future<V> getLastAsync() {
return connectionManager.readAsync(getName(), LRANGE_SINGLE, getName(), -1, -1);
}
@Override
public V getLast() {
List<V> list = connectionManager.read(getName(), RedisCommands.LRANGE, getName(), -1, -1);
if (list.isEmpty()) {
V result = connectionManager.get(getLastAsync());
if (result == null) {
throw new NoSuchElementException();
}
return list.get(0);
return result;
}
@Override
public boolean offerFirst(V e) {
connectionManager.write(getName(), RedisCommands.LPUSH, getName(), e);
return true;
return connectionManager.get(offerFirstAsync(e));
}
@Override
public Future<Boolean> offerFirstAsync(V e) {
return connectionManager.writeAsync(getName(), LPUSH_BOOLEAN, getName(), e);
}
@Override
public Future<Boolean> offerLastAsync(V e) {
return offerAsync(e);
}
@Override
public boolean offerLast(V e) {
return offer(e);
return connectionManager.get(offerLastAsync(e));
}
@Override
public Future<V> peekFirstAsync() {
return getAsync(0);
}
@Override
public V peekFirst() {
return peek();
return connectionManager.get(peekFirstAsync());
}
@Override
public Future<V> peekLastAsync() {
return getLastAsync();
}
@Override
public V peekLast() {
List<V> list = connectionManager.read(getName(), RedisCommands.LRANGE, getName(), -1, -1);
if (list.isEmpty()) {
return null;
}
return list.get(0);
return connectionManager.get(getLastAsync());
}
@Override
public Future<V> pollFirstAsync() {
return pollAsync();
}
@Override
@ -121,9 +169,20 @@ public class RedissonDeque<V> extends RedissonQueue<V> implements RDeque<V> {
return poll();
}
@Override
public Future<V> pollLastAsync() {
return connectionManager.writeAsync(getName(), RedisCommands.RPOP, getName());
}
@Override
public V pollLast() {
return connectionManager.write(getName(), RedisCommands.RPOP, getName());
return connectionManager.get(pollLastAsync());
}
@Override
public Future<V> popAsync() {
return pollAsync();
}
@Override
@ -131,25 +190,50 @@ public class RedissonDeque<V> extends RedissonQueue<V> implements RDeque<V> {
return removeFirst();
}
@Override
public Future<Void> pushAsync(V e) {
return addFirstAsync(e);
}
@Override
public void push(V e) {
addFirst(e);
}
@Override
public Future<Boolean> removeFirstOccurrenceAsync(Object o) {
return removeAsync(o, 1);
}
@Override
public boolean removeFirstOccurrence(Object o) {
return remove(o, 1);
}
@Override
public Future<V> removeFirstAsync() {
return pollAsync();
}
@Override
public Future<V> removeLastAsync() {
return connectionManager.writeAsync(getName(), RedisCommands.RPOP, getName());
}
@Override
public V removeLast() {
V value = connectionManager.write(getName(), RedisCommands.RPOP, getName());
V value = connectionManager.get(removeLastAsync());
if (value == null) {
throw new NoSuchElementException();
}
return value;
}
@Override
public Future<Boolean> removeLastOccurrenceAsync(Object o) {
return removeAsync(o, -1);
}
@Override
public boolean removeLastOccurrence(Object o) {
return remove(o, -1);

@ -276,7 +276,7 @@ public class RedissonList<V> extends RedissonExpirable implements RList<V> {
return getValue(index);
}
private V getValue(int index) {
V getValue(int index) {
return connectionManager.get(getAsync(index));
}

@ -47,7 +47,7 @@ public class RedissonQueue<V> extends RedissonList<V> implements RQueue<V> {
}
public V getFirst() {
V value = connectionManager.read(getName(), RedisCommands.LINDEX, getName(), 0);
V value = getValue(0);
if (value == null) {
throw new NoSuchElementException();
}

@ -0,0 +1,45 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.connection.decoder;
import java.util.List;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.decoder.MultiDecoder;
import io.netty.buffer.ByteBuf;
public class ListFirstObjectDecoder implements MultiDecoder<Object> {
@Override
public Object decode(ByteBuf buf, State state) {
throw new UnsupportedOperationException();
}
@Override
public Object decode(List<Object> parts, State state) {
if (!parts.isEmpty()) {
return parts.get(0);
}
return null;
}
@Override
public boolean isApplicable(int paramNum, State state) {
return false;
}
}

@ -24,6 +24,7 @@ import java.util.Deque;
*
* @param <V> the type of elements held in this collection
*/
public interface RDeque<V> extends Deque<V>, RQueue<V> {
public interface RDeque<V> extends Deque<V>, RQueue<V>, RDequeAsync<V> {
}

@ -0,0 +1,59 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.core;
import io.netty.util.concurrent.Future;
/**
* {@link java.util.Deque} backed by Redis
*
* @author Nikita Koksharov
*
* @param <V> the type of elements held in this collection
*/
public interface RDequeAsync<V> extends RQueueAsync<V> {
Future<Boolean> removeLastOccurrenceAsync(Object o);
Future<V> removeLastAsync();
Future<V> removeFirstAsync();
Future<Boolean> removeFirstOccurrenceAsync(Object o);
Future<Void> pushAsync(V e);
Future<V> popAsync();
Future<V> pollLastAsync();
Future<V> pollFirstAsync();
Future<V> peekLastAsync();
Future<V> peekFirstAsync();
Future<Boolean> offerLastAsync(V e);
Future<V> getLastAsync();
Future<Void> addLastAsync(V e);
Future<Void> addFirstAsync(V e);
Future<Boolean> offerFirstAsync(V e);
}

@ -13,6 +13,66 @@ import org.redisson.core.RDeque;
public class RedissonDequeTest extends BaseTest {
@Test
public void testRemoveLastOccurrence() {
RDeque<Integer> queue1 = redisson.getDeque("deque1");
queue1.addFirst(3);
queue1.addFirst(1);
queue1.addFirst(2);
queue1.addFirst(3);
queue1.removeLastOccurrence(3);
MatcherAssert.assertThat(queue1, Matchers.containsInAnyOrder(3, 2, 1));
}
@Test
public void testRemoveFirstOccurrence() {
RDeque<Integer> queue1 = redisson.getDeque("deque1");
queue1.addFirst(3);
queue1.addFirst(1);
queue1.addFirst(2);
queue1.addFirst(3);
queue1.removeFirstOccurrence(3);
MatcherAssert.assertThat(queue1, Matchers.containsInAnyOrder(2, 1, 3));
}
@Test
public void testRemoveLast() {
RDeque<Integer> queue1 = redisson.getDeque("deque1");
queue1.addFirst(1);
queue1.addFirst(2);
queue1.addFirst(3);
Assert.assertEquals(1, (int)queue1.removeLast());
Assert.assertEquals(2, (int)queue1.removeLast());
Assert.assertEquals(3, (int)queue1.removeLast());
}
@Test
public void testRemoveFirst() {
RDeque<Integer> queue1 = redisson.getDeque("deque1");
queue1.addFirst(1);
queue1.addFirst(2);
queue1.addFirst(3);
Assert.assertEquals(3, (int)queue1.removeFirst());
Assert.assertEquals(2, (int)queue1.removeFirst());
Assert.assertEquals(1, (int)queue1.removeFirst());
}
@Test
public void testPeek() {
RDeque<Integer> queue1 = redisson.getDeque("deque1");
Assert.assertNull(queue1.peekFirst());
Assert.assertNull(queue1.peekLast());
queue1.addFirst(2);
Assert.assertEquals(2, (int)queue1.peekFirst());
Assert.assertEquals(2, (int)queue1.peekLast());
}
@Test
public void testPollLastAndOfferFirstTo() {
RDeque<Integer> queue1 = redisson.getDeque("deque1");
@ -28,7 +88,7 @@ public class RedissonDequeTest extends BaseTest {
queue1.pollLastAndOfferFirstTo(queue2);
MatcherAssert.assertThat(queue2, Matchers.contains(3, 4, 5, 6));
}
@Test
public void testAddFirstOrigin() {
Deque<Integer> queue = new ArrayDeque<Integer>();

Loading…
Cancel
Save