diff --git a/src/main/java/org/redisson/RedissonDequeReactive.java b/src/main/java/org/redisson/RedissonDequeReactive.java new file mode 100644 index 000000000..585c1e5f0 --- /dev/null +++ b/src/main/java/org/redisson/RedissonDequeReactive.java @@ -0,0 +1,127 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson; + +import org.reactivestreams.Publisher; +import org.redisson.client.codec.Codec; +import org.redisson.client.protocol.RedisCommand; +import org.redisson.client.protocol.RedisCommand.ValueType; +import org.redisson.client.protocol.RedisCommands; +import org.redisson.client.protocol.convertor.TrueReplayConvertor; +import org.redisson.client.protocol.convertor.VoidReplayConvertor; +import org.redisson.command.CommandReactiveExecutor; +import org.redisson.connection.decoder.ListFirstObjectDecoder; +import org.redisson.core.RDequeReactive; + +/** + * Distributed and concurrent implementation of {@link java.util.Queue} + * + * @author Nikita Koksharov + * + * @param the type of elements held in this collection + */ +public class RedissonDequeReactive extends RedissonQueueReactive implements RDequeReactive { + + private static final RedisCommand LPUSH_VOID = new RedisCommand("LPUSH", new VoidReplayConvertor()); + private static final RedisCommand LPUSH_BOOLEAN = new RedisCommand("LPUSH", new TrueReplayConvertor()); + private static final RedisCommand RPUSH_VOID = new RedisCommand("RPUSH", new VoidReplayConvertor(), 2, ValueType.OBJECTS); + private static final RedisCommand LRANGE_SINGLE = new RedisCommand("LRANGE", new ListFirstObjectDecoder()); + + + protected RedissonDequeReactive(CommandReactiveExecutor commandExecutor, String name) { + super(commandExecutor, name); + } + + public RedissonDequeReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name) { + super(codec, commandExecutor, name); + } + + @Override + public Publisher addFirst(V e) { + return commandExecutor.writeObservable(getName(), codec, LPUSH_VOID, getName(), e); + } + + @Override + public Publisher addLast(V e) { + return commandExecutor.writeObservable(getName(), codec, RPUSH_VOID, getName(), e); + } + + @Override + public Publisher getLast() { + return commandExecutor.readObservable(getName(), codec, LRANGE_SINGLE, getName(), -1, -1); + } + + @Override + public Publisher offerFirst(V e) { + return commandExecutor.writeObservable(getName(), codec, LPUSH_BOOLEAN, getName(), e); + } + + @Override + public Publisher offerLast(V e) { + return offer(e); + } + + @Override + public Publisher peekFirst() { + return get(0); + } + + @Override + public Publisher peekLast() { + return getLast(); + } + + @Override + public Publisher pollFirst() { + return poll(); + } + + @Override + public Publisher pollLast() { + return commandExecutor.writeObservable(getName(), codec, RedisCommands.RPOP, getName()); + } + + @Override + public Publisher pop() { + return poll(); + } + + @Override + public Publisher push(V e) { + return addFirst(e); + } + + @Override + public Publisher removeFirstOccurrence(Object o) { + return remove(o, 1); + } + + @Override + public Publisher removeFirst() { + return poll(); + } + + @Override + public Publisher removeLast() { + return commandExecutor.writeObservable(getName(), codec, RedisCommands.RPOP, getName()); + } + + @Override + public Publisher removeLastOccurrence(Object o) { + return remove(o, -1); + } + +} diff --git a/src/main/java/org/redisson/RedissonReactive.java b/src/main/java/org/redisson/RedissonReactive.java index a9668f73e..a959500e7 100644 --- a/src/main/java/org/redisson/RedissonReactive.java +++ b/src/main/java/org/redisson/RedissonReactive.java @@ -31,6 +31,7 @@ import org.redisson.connection.SentinelConnectionManager; import org.redisson.connection.SingleConnectionManager; import org.redisson.core.RBlockingQueueReactive; import org.redisson.core.RBucketReactive; +import org.redisson.core.RDequeReactive; import org.redisson.core.RHyperLogLogReactive; import org.redisson.core.RLexSortedSetReactive; import org.redisson.core.RListReactive; @@ -203,6 +204,16 @@ public class RedissonReactive implements RedissonReactiveClient { return new RedissonBlockingQueueReactive(codec, commandExecutor, name); } + @Override + public RDequeReactive getDeque(String name) { + return new RedissonDequeReactive(commandExecutor, name); + } + + @Override + public RDequeReactive getDeque(String name, Codec codec) { + return new RedissonDequeReactive(codec, commandExecutor, name); + } + public Config getConfig() { return config; } diff --git a/src/main/java/org/redisson/RedissonReactiveClient.java b/src/main/java/org/redisson/RedissonReactiveClient.java index 7bc1cf152..7d5474970 100644 --- a/src/main/java/org/redisson/RedissonReactiveClient.java +++ b/src/main/java/org/redisson/RedissonReactiveClient.java @@ -20,6 +20,7 @@ import java.util.List; import org.redisson.client.codec.Codec; import org.redisson.core.RBlockingQueueReactive; import org.redisson.core.RBucketReactive; +import org.redisson.core.RDequeReactive; import org.redisson.core.RHyperLogLogReactive; import org.redisson.core.RLexSortedSet; import org.redisson.core.RLexSortedSetReactive; @@ -154,16 +155,16 @@ public interface RedissonReactiveClient { RBlockingQueueReactive getBlockingQueue(String name, Codec codec); -// /** -// * Returns deque instance by name. -// * -// * @param name of deque -// * @return -// */ -// RDeque getDeque(String name); -// -// RDeque getDeque(String name, Codec codec); -// + /** + * Returns deque instance by name. + * + * @param name of deque + * @return + */ + RDequeReactive getDeque(String name); + + RDequeReactive getDeque(String name, Codec codec); + // /** // * Returns "atomic long" instance by name. // * diff --git a/src/main/java/org/redisson/core/RDequeReactive.java b/src/main/java/org/redisson/core/RDequeReactive.java new file mode 100644 index 000000000..de3af78cc --- /dev/null +++ b/src/main/java/org/redisson/core/RDequeReactive.java @@ -0,0 +1,61 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.core; + +import org.reactivestreams.Publisher; + +/** + * {@link java.util.Deque} backed by Redis + * + * @author Nikita Koksharov + * + * @param the type of elements held in this collection + */ +public interface RDequeReactive extends RQueueReactive { + + Publisher descendingIterator(); + + Publisher removeLastOccurrence(Object o); + + Publisher removeLast(); + + Publisher removeFirst(); + + Publisher removeFirstOccurrence(Object o); + + Publisher push(V e); + + Publisher pop(); + + Publisher pollLast(); + + Publisher pollFirst(); + + Publisher peekLast(); + + Publisher peekFirst(); + + Publisher offerLast(V e); + + Publisher getLast(); + + Publisher addLast(V e); + + Publisher addFirst(V e); + + Publisher offerFirst(V e); + +} diff --git a/src/main/java/org/redisson/core/RQueueReactive.java b/src/main/java/org/redisson/core/RQueueReactive.java index 976e96067..bdd8a8425 100644 --- a/src/main/java/org/redisson/core/RQueueReactive.java +++ b/src/main/java/org/redisson/core/RQueueReactive.java @@ -32,8 +32,6 @@ public interface RQueueReactive extends RCollectionReactive { Publisher offer(V e); - Publisher pollLastAndOfferFirstTo(RQueue queue); - Publisher pollLastAndOfferFirstTo(String queueName); } diff --git a/src/test/java/org/redisson/RedissonDequeReactiveTest.java b/src/test/java/org/redisson/RedissonDequeReactiveTest.java new file mode 100644 index 000000000..d4091d3a5 --- /dev/null +++ b/src/test/java/org/redisson/RedissonDequeReactiveTest.java @@ -0,0 +1,138 @@ +package org.redisson; + +import java.util.ArrayDeque; +import java.util.Arrays; +import java.util.Deque; +import java.util.Iterator; + +import org.hamcrest.MatcherAssert; +import org.hamcrest.Matchers; +import org.junit.Assert; +import org.junit.Test; +import org.redisson.core.RDeque; +import org.redisson.core.RDequeReactive; + +public class RedissonDequeReactiveTest extends BaseReactiveTest { + + @Test + public void testRemoveLastOccurrence() { + RDequeReactive queue1 = redisson.getDeque("deque1"); + sync(queue1.addFirst(3)); + sync(queue1.addFirst(1)); + sync(queue1.addFirst(2)); + sync(queue1.addFirst(3)); + + sync(queue1.removeLastOccurrence(3)); + + MatcherAssert.assertThat(sync(queue1), Matchers.containsInAnyOrder(3, 2, 1)); + } + + @Test + public void testRemoveFirstOccurrence() { + RDequeReactive queue1 = redisson.getDeque("deque1"); + sync(queue1.addFirst(3)); + sync(queue1.addFirst(1)); + sync(queue1.addFirst(2)); + sync(queue1.addFirst(3)); + + sync(queue1.removeFirstOccurrence(3)); + + MatcherAssert.assertThat(sync(queue1), Matchers.containsInAnyOrder(2, 1, 3)); + } + + @Test + public void testRemoveLast() { + RDequeReactive queue1 = redisson.getDeque("deque1"); + sync(queue1.addFirst(1)); + sync(queue1.addFirst(2)); + sync(queue1.addFirst(3)); + + Assert.assertEquals(1, (int)sync(queue1.removeLast())); + Assert.assertEquals(2, (int)sync(queue1.removeLast())); + Assert.assertEquals(3, (int)sync(queue1.removeLast())); + } + + @Test + public void testRemoveFirst() { + RDequeReactive queue1 = redisson.getDeque("deque1"); + sync(queue1.addFirst(1)); + sync(queue1.addFirst(2)); + sync(queue1.addFirst(3)); + + Assert.assertEquals(3, (int)sync(queue1.removeFirst())); + Assert.assertEquals(2, (int)sync(queue1.removeFirst())); + Assert.assertEquals(1, (int)sync(queue1.removeFirst())); + } + + @Test + public void testPeek() { + RDequeReactive queue1 = redisson.getDeque("deque1"); + Assert.assertNull(sync(queue1.peekFirst())); + Assert.assertNull(sync(queue1.peekLast())); + sync(queue1.addFirst(2)); + Assert.assertEquals(2, (int)sync(queue1.peekFirst())); + Assert.assertEquals(2, (int)sync(queue1.peekLast())); + } + + @Test + public void testPollLastAndOfferFirstTo() { + RDequeReactive queue1 = redisson.getDeque("deque1"); + sync(queue1.addFirst(3)); + sync(queue1.addFirst(2)); + sync(queue1.addFirst(1)); + + RDequeReactive queue2 = redisson.getDeque("deque2"); + sync(queue2.addFirst(6)); + sync(queue2.addFirst(5)); + sync(queue2.addFirst(4)); + + sync(queue1.pollLastAndOfferFirstTo(queue2.getName())); + MatcherAssert.assertThat(sync(queue2), Matchers.contains(3, 4, 5, 6)); + } + + @Test + public void testAddFirst() { + RDequeReactive queue = redisson.getDeque("deque"); + sync(queue.addFirst(1)); + sync(queue.addFirst(2)); + sync(queue.addFirst(3)); + + MatcherAssert.assertThat(sync(queue), Matchers.contains(3, 2, 1)); + } + + @Test + public void testAddLast() { + RDequeReactive queue = redisson.getDeque("deque"); + sync(queue.addLast(1)); + sync(queue.addLast(2)); + sync(queue.addLast(3)); + + MatcherAssert.assertThat(sync(queue), Matchers.contains(1, 2, 3)); + } + + @Test + public void testOfferFirst() { + RDequeReactive queue = redisson.getDeque("deque"); + sync(queue.offerFirst(1)); + sync(queue.offerFirst(2)); + sync(queue.offerFirst(3)); + + MatcherAssert.assertThat(sync(queue), Matchers.contains(3, 2, 1)); + } + + @Test + public void testDescendingIterator() { + final RDequeReactive queue = redisson.getDeque("deque"); + sync(queue.addAll(Arrays.asList(1, 2, 3))); + + MatcherAssert.assertThat(new Iterable() { + + @Override + public Iterator iterator() { + return toIterator(queue.descendingIterator()); + } + + }, Matchers.contains(3, 2, 1)); + } + +}