RDequeReactive added. #210

pull/337/head
Nikita 9 years ago
parent f376c74b2b
commit a1d3f2cf4b

@ -0,0 +1,127 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson;
import org.reactivestreams.Publisher;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.convertor.TrueReplayConvertor;
import org.redisson.client.protocol.convertor.VoidReplayConvertor;
import org.redisson.command.CommandReactiveExecutor;
import org.redisson.connection.decoder.ListFirstObjectDecoder;
import org.redisson.core.RDequeReactive;
/**
* Distributed and concurrent implementation of {@link java.util.Queue}
*
* @author Nikita Koksharov
*
* @param <V> the type of elements held in this collection
*/
public class RedissonDequeReactive<V> extends RedissonQueueReactive<V> implements RDequeReactive<V> {
private static final RedisCommand<Void> LPUSH_VOID = new RedisCommand<Void>("LPUSH", new VoidReplayConvertor());
private static final RedisCommand<Boolean> LPUSH_BOOLEAN = new RedisCommand<Boolean>("LPUSH", new TrueReplayConvertor());
private static final RedisCommand<Void> RPUSH_VOID = new RedisCommand<Void>("RPUSH", new VoidReplayConvertor(), 2, ValueType.OBJECTS);
private static final RedisCommand<Object> LRANGE_SINGLE = new RedisCommand<Object>("LRANGE", new ListFirstObjectDecoder());
protected RedissonDequeReactive(CommandReactiveExecutor commandExecutor, String name) {
super(commandExecutor, name);
}
public RedissonDequeReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name) {
super(codec, commandExecutor, name);
}
@Override
public Publisher<Void> addFirst(V e) {
return commandExecutor.writeObservable(getName(), codec, LPUSH_VOID, getName(), e);
}
@Override
public Publisher<Void> addLast(V e) {
return commandExecutor.writeObservable(getName(), codec, RPUSH_VOID, getName(), e);
}
@Override
public Publisher<V> getLast() {
return commandExecutor.readObservable(getName(), codec, LRANGE_SINGLE, getName(), -1, -1);
}
@Override
public Publisher<Boolean> offerFirst(V e) {
return commandExecutor.writeObservable(getName(), codec, LPUSH_BOOLEAN, getName(), e);
}
@Override
public Publisher<Long> offerLast(V e) {
return offer(e);
}
@Override
public Publisher<V> peekFirst() {
return get(0);
}
@Override
public Publisher<V> peekLast() {
return getLast();
}
@Override
public Publisher<V> pollFirst() {
return poll();
}
@Override
public Publisher<V> pollLast() {
return commandExecutor.writeObservable(getName(), codec, RedisCommands.RPOP, getName());
}
@Override
public Publisher<V> pop() {
return poll();
}
@Override
public Publisher<Void> push(V e) {
return addFirst(e);
}
@Override
public Publisher<Boolean> removeFirstOccurrence(Object o) {
return remove(o, 1);
}
@Override
public Publisher<V> removeFirst() {
return poll();
}
@Override
public Publisher<V> removeLast() {
return commandExecutor.writeObservable(getName(), codec, RedisCommands.RPOP, getName());
}
@Override
public Publisher<Boolean> removeLastOccurrence(Object o) {
return remove(o, -1);
}
}

@ -31,6 +31,7 @@ import org.redisson.connection.SentinelConnectionManager;
import org.redisson.connection.SingleConnectionManager;
import org.redisson.core.RBlockingQueueReactive;
import org.redisson.core.RBucketReactive;
import org.redisson.core.RDequeReactive;
import org.redisson.core.RHyperLogLogReactive;
import org.redisson.core.RLexSortedSetReactive;
import org.redisson.core.RListReactive;
@ -203,6 +204,16 @@ public class RedissonReactive implements RedissonReactiveClient {
return new RedissonBlockingQueueReactive<V>(codec, commandExecutor, name);
}
@Override
public <V> RDequeReactive<V> getDeque(String name) {
return new RedissonDequeReactive<V>(commandExecutor, name);
}
@Override
public <V> RDequeReactive<V> getDeque(String name, Codec codec) {
return new RedissonDequeReactive<V>(codec, commandExecutor, name);
}
public Config getConfig() {
return config;
}

@ -20,6 +20,7 @@ import java.util.List;
import org.redisson.client.codec.Codec;
import org.redisson.core.RBlockingQueueReactive;
import org.redisson.core.RBucketReactive;
import org.redisson.core.RDequeReactive;
import org.redisson.core.RHyperLogLogReactive;
import org.redisson.core.RLexSortedSet;
import org.redisson.core.RLexSortedSetReactive;
@ -154,16 +155,16 @@ public interface RedissonReactiveClient {
<V> RBlockingQueueReactive<V> getBlockingQueue(String name, Codec codec);
// /**
// * Returns deque instance by name.
// *
// * @param name of deque
// * @return
// */
// <V> RDeque<V> getDeque(String name);
//
// <V> RDeque<V> getDeque(String name, Codec codec);
//
/**
* Returns deque instance by name.
*
* @param name of deque
* @return
*/
<V> RDequeReactive<V> getDeque(String name);
<V> RDequeReactive<V> getDeque(String name, Codec codec);
// /**
// * Returns "atomic long" instance by name.
// *

@ -0,0 +1,61 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.core;
import org.reactivestreams.Publisher;
/**
* {@link java.util.Deque} backed by Redis
*
* @author Nikita Koksharov
*
* @param <V> the type of elements held in this collection
*/
public interface RDequeReactive<V> extends RQueueReactive<V> {
Publisher<V> descendingIterator();
Publisher<Boolean> removeLastOccurrence(Object o);
Publisher<V> removeLast();
Publisher<V> removeFirst();
Publisher<Boolean> removeFirstOccurrence(Object o);
Publisher<Void> push(V e);
Publisher<V> pop();
Publisher<V> pollLast();
Publisher<V> pollFirst();
Publisher<V> peekLast();
Publisher<V> peekFirst();
Publisher<Long> offerLast(V e);
Publisher<V> getLast();
Publisher<Void> addLast(V e);
Publisher<Void> addFirst(V e);
Publisher<Boolean> offerFirst(V e);
}

@ -32,8 +32,6 @@ public interface RQueueReactive<V> extends RCollectionReactive<V> {
Publisher<Long> offer(V e);
Publisher<V> pollLastAndOfferFirstTo(RQueue<V> queue);
Publisher<V> pollLastAndOfferFirstTo(String queueName);
}

@ -0,0 +1,138 @@
package org.redisson;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.Iterator;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;
import org.redisson.core.RDeque;
import org.redisson.core.RDequeReactive;
public class RedissonDequeReactiveTest extends BaseReactiveTest {
@Test
public void testRemoveLastOccurrence() {
RDequeReactive<Integer> queue1 = redisson.getDeque("deque1");
sync(queue1.addFirst(3));
sync(queue1.addFirst(1));
sync(queue1.addFirst(2));
sync(queue1.addFirst(3));
sync(queue1.removeLastOccurrence(3));
MatcherAssert.assertThat(sync(queue1), Matchers.containsInAnyOrder(3, 2, 1));
}
@Test
public void testRemoveFirstOccurrence() {
RDequeReactive<Integer> queue1 = redisson.getDeque("deque1");
sync(queue1.addFirst(3));
sync(queue1.addFirst(1));
sync(queue1.addFirst(2));
sync(queue1.addFirst(3));
sync(queue1.removeFirstOccurrence(3));
MatcherAssert.assertThat(sync(queue1), Matchers.containsInAnyOrder(2, 1, 3));
}
@Test
public void testRemoveLast() {
RDequeReactive<Integer> queue1 = redisson.getDeque("deque1");
sync(queue1.addFirst(1));
sync(queue1.addFirst(2));
sync(queue1.addFirst(3));
Assert.assertEquals(1, (int)sync(queue1.removeLast()));
Assert.assertEquals(2, (int)sync(queue1.removeLast()));
Assert.assertEquals(3, (int)sync(queue1.removeLast()));
}
@Test
public void testRemoveFirst() {
RDequeReactive<Integer> queue1 = redisson.getDeque("deque1");
sync(queue1.addFirst(1));
sync(queue1.addFirst(2));
sync(queue1.addFirst(3));
Assert.assertEquals(3, (int)sync(queue1.removeFirst()));
Assert.assertEquals(2, (int)sync(queue1.removeFirst()));
Assert.assertEquals(1, (int)sync(queue1.removeFirst()));
}
@Test
public void testPeek() {
RDequeReactive<Integer> queue1 = redisson.getDeque("deque1");
Assert.assertNull(sync(queue1.peekFirst()));
Assert.assertNull(sync(queue1.peekLast()));
sync(queue1.addFirst(2));
Assert.assertEquals(2, (int)sync(queue1.peekFirst()));
Assert.assertEquals(2, (int)sync(queue1.peekLast()));
}
@Test
public void testPollLastAndOfferFirstTo() {
RDequeReactive<Integer> queue1 = redisson.getDeque("deque1");
sync(queue1.addFirst(3));
sync(queue1.addFirst(2));
sync(queue1.addFirst(1));
RDequeReactive<Integer> queue2 = redisson.getDeque("deque2");
sync(queue2.addFirst(6));
sync(queue2.addFirst(5));
sync(queue2.addFirst(4));
sync(queue1.pollLastAndOfferFirstTo(queue2.getName()));
MatcherAssert.assertThat(sync(queue2), Matchers.contains(3, 4, 5, 6));
}
@Test
public void testAddFirst() {
RDequeReactive<Integer> queue = redisson.getDeque("deque");
sync(queue.addFirst(1));
sync(queue.addFirst(2));
sync(queue.addFirst(3));
MatcherAssert.assertThat(sync(queue), Matchers.contains(3, 2, 1));
}
@Test
public void testAddLast() {
RDequeReactive<Integer> queue = redisson.getDeque("deque");
sync(queue.addLast(1));
sync(queue.addLast(2));
sync(queue.addLast(3));
MatcherAssert.assertThat(sync(queue), Matchers.contains(1, 2, 3));
}
@Test
public void testOfferFirst() {
RDequeReactive<Integer> queue = redisson.getDeque("deque");
sync(queue.offerFirst(1));
sync(queue.offerFirst(2));
sync(queue.offerFirst(3));
MatcherAssert.assertThat(sync(queue), Matchers.contains(3, 2, 1));
}
@Test
public void testDescendingIterator() {
final RDequeReactive<Integer> queue = redisson.getDeque("deque");
sync(queue.addAll(Arrays.asList(1, 2, 3)));
MatcherAssert.assertThat(new Iterable<Integer>() {
@Override
public Iterator<Integer> iterator() {
return toIterator(queue.descendingIterator());
}
}, Matchers.contains(3, 2, 1));
}
}
Loading…
Cancel
Save