RedissonListReactiveTest added. Few bugs fixed. #210

pull/337/head
Nikita 9 years ago
parent 0197be6c71
commit 4729762eaa

@ -22,7 +22,9 @@ import org.redisson.client.protocol.RedisCommand;
import org.redisson.connection.ConnectionManager;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import rx.Single;
import rx.SingleSubscriber;
/**
*
@ -31,6 +33,29 @@ import rx.Single;
*/
public class CommandReactiveService extends CommandAsyncService implements CommandReactiveExecutor {
static class ToObservableFuture<T> implements Single.OnSubscribe<T> {
private final Future<? extends T> that;
public ToObservableFuture(Future<? extends T> that) {
this.that = that;
}
@Override
public void call(final SingleSubscriber<? super T> subscriber) {
that.addListener(new FutureListener<T>() {
@Override
public void operationComplete(Future<T> future) throws Exception {
if (!future.isSuccess()) {
subscriber.onError(future.cause());
return;
}
subscriber.onSuccess(future.getNow());
}
});
}
}
public CommandReactiveService(ConnectionManager connectionManager) {
super(connectionManager);
}
@ -43,7 +68,7 @@ public class CommandReactiveService extends CommandAsyncService implements Comma
@Override
public <T, R> Single<R> writeObservable(String key, Codec codec, RedisCommand<T> command, Object ... params) {
Future<R> f = writeAsync(key, codec, command, params);
return Single.from(f);
return Single.create(new ToObservableFuture<R>(f));
}
@Override
@ -54,21 +79,21 @@ public class CommandReactiveService extends CommandAsyncService implements Comma
@Override
public <T, R> Single<R> readObservable(String key, Codec codec, RedisCommand<T> command, Object ... params) {
Future<R> f = readAsync(key, codec, command, params);
return Single.from(f);
return Single.create(new ToObservableFuture<R>(f));
}
@Override
public <T, R> Single<R> evalReadObservable(String key, Codec codec, RedisCommand<T> evalCommandType,
String script, List<Object> keys, Object... params) {
Future<R> f = evalReadAsync(key, codec, evalCommandType, script, keys, params);
return Single.from(f);
return Single.create(new ToObservableFuture<R>(f));
}
@Override
public <T, R> Single<R> evalWriteObservable(String key, Codec codec, RedisCommand<T> evalCommandType,
String script, List<Object> keys, Object... params) {
Future<R> f = evalWriteAsync(key, codec, evalCommandType, script, keys, params);
return Single.from(f);
return Single.create(new ToObservableFuture<R>(f));
}
}

@ -513,6 +513,7 @@ public class RedissonList<V> extends RedissonExpirable implements RList<V> {
};
}
// TODO use RedissonList with bounds
@Override
public List<V> subList(int fromIndex, int toIndex) {
int size = size();

@ -22,7 +22,6 @@ import static org.redisson.client.protocol.RedisCommands.LPOP;
import static org.redisson.client.protocol.RedisCommands.LPUSH;
import static org.redisson.client.protocol.RedisCommands.LREM_SINGLE;
import static org.redisson.client.protocol.RedisCommands.RPUSH;
import static org.redisson.client.protocol.RedisCommands.RPUSH_BOOLEAN;
import java.util.ArrayList;
import java.util.Collection;
@ -44,6 +43,7 @@ import rx.Observable.OnSubscribe;
import rx.Single;
import rx.SingleSubscriber;
import rx.Subscriber;
import rx.observables.ConnectableObservable;
import rx.subjects.PublishSubject;
/**
@ -70,7 +70,7 @@ public class RedissonListReactive<V> extends RedissonExpirableReactive implement
@Override
public Observable<V> descendingIterator() {
return iterator(0, false);
return iterator(-1, false);
}
@Override
@ -123,8 +123,8 @@ public class RedissonListReactive<V> extends RedissonExpirableReactive implement
}
@Override
public Single<Boolean> add(V e) {
return commandExecutor.writeObservable(getName(), codec, RPUSH_BOOLEAN, getName(), e);
public Single<Long> add(V e) {
return commandExecutor.writeObservable(getName(), codec, RPUSH, getName(), e);
}
@Override
@ -158,6 +158,8 @@ public class RedissonListReactive<V> extends RedissonExpirableReactive implement
}
final PublishSubject<Long> promise = newObservable();
ConnectableObservable<Long> r = promise.replay();
r.connect();
Single<Long> sizeObservable = size();
sizeObservable.subscribe(new SingleSubscriber<Long>() {
@Override
@ -187,7 +189,7 @@ public class RedissonListReactive<V> extends RedissonExpirableReactive implement
}
});
return promise.toSingle();
return r.toSingle();
}
@Override

@ -109,7 +109,7 @@ public interface RedisCommands {
RedisCommand<Object> LINDEX = new RedisCommand<Object>("LINDEX");
RedisCommand<Object> LINSERT = new RedisCommand<Object>("LINSERT", 3, ValueType.OBJECTS);
RedisStrictCommand<Integer> LLEN_INT = new RedisStrictCommand<Integer>("LLEN", new IntegerReplayConvertor());
RedisStrictCommand<Integer> LLEN = new RedisStrictCommand<Integer>("LLEN", new IntegerReplayConvertor());
RedisStrictCommand<Long> LLEN = new RedisStrictCommand<Long>("LLEN");
RedisStrictCommand<Boolean> LTRIM = new RedisStrictCommand<Boolean>("LTRIM", new BooleanReplayConvertor());
RedisStrictCommand<Boolean> EXPIRE = new RedisStrictCommand<Boolean>("EXPIRE", new BooleanReplayConvertor());

@ -36,7 +36,7 @@ public interface RCollectionReactive<V> extends RExpirableReactive {
Single<Long> size();
Single<Boolean> add(V e);
Single<Long> add(V e);
Single<Long> addAll(Collection<? extends V> c);

@ -27,6 +27,7 @@ import rx.Single;
*
* @param <V> the type of elements held in this collection
*/
// TODO add sublist support
public interface RListReactive<V> extends RCollectionReactive<V> {
Observable<V> descendingIterator();

@ -0,0 +1,551 @@
package org.redisson;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
import java.util.concurrent.CountDownLatch;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;
import org.redisson.client.RedisException;
import org.redisson.core.RListReactive;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import rx.Observable;
import rx.Single;
import rx.SingleSubscriber;
public class RedissonListReactiveTest extends BaseReactiveTest {
@Test
public void testAddByIndex() {
RListReactive<String> test2 = redisson.getList("test2");
sync(test2.add("foo"));
sync(test2.add(0, "bar"));
MatcherAssert.assertThat(test2.iterator().toBlocking().toIterable(), Matchers.contains("bar", "foo"));
}
@Test
public void testAddAllWithIndex() throws InterruptedException {
final RListReactive<Long> list = redisson.getList("list");
final CountDownLatch latch = new CountDownLatch(1);
list.addAll(Arrays.asList(1L, 2L, 3L)).subscribe(new SingleSubscriber<Long>() {
@Override
public void onSuccess(Long value) {
list.addAll(Arrays.asList(1L, 24L, 3L)).subscribe(new SingleSubscriber<Long>() {
@Override
public void onSuccess(Long value) {
latch.countDown();
}
@Override
public void onError(Throwable error) {
Assert.fail(error.getMessage());
}
});
}
@Override
public void onError(Throwable error) {
Assert.fail(error.getMessage());
}
});
latch.await();
Assert.assertThat(sync(list), Matchers.contains(1L, 2L, 3L, 1L, 24L, 3L));
}
@Test
public void testAdd() throws InterruptedException {
final RListReactive<Long> list = redisson.getList("list");
final CountDownLatch latch = new CountDownLatch(1);
list.add(1L).subscribe(new SingleSubscriber<Long>() {
@Override
public void onSuccess(Long value) {
list.add(2L).subscribe(new SingleSubscriber<Long>() {
@Override
public void onSuccess(Long value) {
latch.countDown();
}
@Override
public void onError(Throwable error) {
Assert.fail(error.getMessage());
}
});
}
@Override
public void onError(Throwable error) {
Assert.fail(error.getMessage());
}
});
latch.await();
Assert.assertThat(sync(list), Matchers.contains(1L, 2L));
}
private <V> Iterable<V> sync(RListReactive<V> list) {
return list.iterator().toBlocking().toIterable();
}
@Test
public void testLong() {
RListReactive<Long> list = redisson.getList("list");
sync(list.add(1L));
sync(list.add(2L));
Assert.assertThat(sync(list), Matchers.contains(1L, 2L));
}
@Test
public void testListIteratorIndex() {
RListReactive<Integer> list = redisson.getList("list2");
sync(list.add(1));
sync(list.add(2));
sync(list.add(3));
sync(list.add(4));
sync(list.add(5));
sync(list.add(0));
sync(list.add(7));
sync(list.add(8));
sync(list.add(0));
sync(list.add(10));
Iterator<Integer> iterator = list.iterator().toBlocking().getIterator();
Assert.assertTrue(1 == iterator.next());
Assert.assertTrue(2 == iterator.next());
Assert.assertTrue(3 == iterator.next());
Assert.assertTrue(4 == iterator.next());
Assert.assertTrue(5 == iterator.next());
Assert.assertTrue(0 == iterator.next());
Assert.assertTrue(7 == iterator.next());
Assert.assertTrue(8 == iterator.next());
Assert.assertTrue(0 == iterator.next());
Assert.assertTrue(10 == iterator.next());
Assert.assertFalse(iterator.hasNext());
}
@Test
public void testListIteratorPrevious() {
RListReactive<Integer> list = redisson.getList("list");
sync(list.add(1));
sync(list.add(2));
sync(list.add(3));
sync(list.add(4));
sync(list.add(5));
sync(list.add(0));
sync(list.add(7));
sync(list.add(8));
sync(list.add(0));
sync(list.add(10));
Iterator<Integer> iterator = list.descendingIterator().toBlocking().getIterator();
Assert.assertTrue(10 == iterator.next());
Assert.assertTrue(0 == iterator.next());
Assert.assertTrue(8 == iterator.next());
Assert.assertTrue(7 == iterator.next());
Assert.assertTrue(0 == iterator.next());
Assert.assertTrue(5 == iterator.next());
Assert.assertTrue(4 == iterator.next());
Assert.assertTrue(3 == iterator.next());
Assert.assertTrue(2 == iterator.next());
Assert.assertTrue(1 == iterator.next());
Assert.assertFalse(iterator.hasNext());
}
@Test
public void testLastIndexOfNone() {
RListReactive<Integer> list = redisson.getList("list");
sync(list.add(1));
sync(list.add(2));
sync(list.add(3));
sync(list.add(4));
sync(list.add(5));
Assert.assertEquals(-1, list.lastIndexOf(10).toBlocking().value().intValue());
}
@Test
public void testLastIndexOf2() {
RListReactive<Integer> list = redisson.getList("list");
sync(list.add(1));
sync(list.add(2));
sync(list.add(3));
sync(list.add(4));
sync(list.add(5));
sync(list.add(0));
sync(list.add(7));
sync(list.add(8));
sync(list.add(0));
sync(list.add(10));
int index = list.lastIndexOf(3).toBlocking().value();
Assert.assertEquals(2, index);
}
@Test
public void testLastIndexOf1() {
RListReactive<Integer> list = redisson.getList("list");
sync(list.add(1));
sync(list.add(2));
sync(list.add(3));
sync(list.add(4));
sync(list.add(5));
sync(list.add(3));
sync(list.add(7));
sync(list.add(8));
sync(list.add(0));
sync(list.add(10));
int index = list.lastIndexOf(3).toBlocking().value();
Assert.assertEquals(5, index);
}
@Test
public void testLastIndexOf() {
RListReactive<Integer> list = redisson.getList("list");
sync(list.add(1));
sync(list.add(2));
sync(list.add(3));
sync(list.add(4));
sync(list.add(5));
sync(list.add(3));
sync(list.add(7));
sync(list.add(8));
sync(list.add(3));
sync(list.add(10));
int index = list.lastIndexOf(3).toBlocking().value();
Assert.assertEquals(8, index);
}
@Test
public void testIndexOf() {
RListReactive<Integer> list = redisson.getList("list");
for (int i = 1; i < 200; i++) {
sync(list.add(i));
}
Assert.assertTrue(55 == list.indexOf(56).toBlocking().value());
Assert.assertTrue(99 == list.indexOf(100).toBlocking().value());
Assert.assertTrue(-1 == list.indexOf(200).toBlocking().value());
Assert.assertTrue(-1 == list.indexOf(0).toBlocking().value());
}
@Test
public void testRemove() {
RListReactive<Integer> list = redisson.getList("list");
sync(list.add(1));
sync(list.add(2));
sync(list.add(3));
sync(list.add(4));
sync(list.add(5));
Integer val = list.remove(0).toBlocking().value();
Assert.assertTrue(1 == val);
Assert.assertThat(sync(list), Matchers.contains(2, 3, 4, 5));
}
@Test
public void testSet() {
RListReactive<Integer> list = redisson.getList("list");
sync(list.add(1));
sync(list.add(2));
sync(list.add(3));
sync(list.add(4));
sync(list.add(5));
sync(list.set(4, 6));
Assert.assertThat(sync(list), Matchers.contains(1, 2, 3, 4, 6));
}
@Test(expected = RedisException.class)
public void testSetFail() throws InterruptedException {
RListReactive<Integer> list = redisson.getList("list");
sync(list.add(1));
sync(list.add(2));
sync(list.add(3));
sync(list.add(4));
sync(list.add(5));
sync(list.set(5, 6));
}
@Test
public void testRemoveAllEmpty() {
RListReactive<Integer> list = redisson.getList("list");
sync(list.add(1));
sync(list.add(2));
sync(list.add(3));
sync(list.add(4));
sync(list.add(5));
Assert.assertFalse(sync(list.removeAll(Collections.emptyList())));
Assert.assertFalse(Arrays.asList(1).removeAll(Collections.emptyList()));
}
@Test
public void testRemoveAll() {
RListReactive<Integer> list = redisson.getList("list");
sync(list.add(1));
sync(list.add(2));
sync(list.add(3));
sync(list.add(4));
sync(list.add(5));
Assert.assertFalse(sync(list.removeAll(Collections.emptyList())));
Assert.assertTrue(sync(list.removeAll(Arrays.asList(3, 2, 10, 6))));
Assert.assertThat(sync(list), Matchers.contains(1, 4, 5));
Assert.assertTrue(sync(list.removeAll(Arrays.asList(4))));
Assert.assertThat(sync(list), Matchers.contains(1, 5));
Assert.assertTrue(sync(list.removeAll(Arrays.asList(1, 5, 1, 5))));
Assert.assertEquals(0, sync(list.size()).longValue());
}
@Test
public void testRetainAll() {
RListReactive<Integer> list = redisson.getList("list");
sync(list.add(1));
sync(list.add(2));
sync(list.add(3));
sync(list.add(4));
sync(list.add(5));
Assert.assertTrue(sync(list.retainAll(Arrays.asList(3, 2, 10, 6))));
Assert.assertThat(sync(list), Matchers.contains(2, 3));
Assert.assertEquals(2, sync(list.size()).longValue());
}
@Test
public void testFastSet() {
RListReactive<Integer> list = redisson.getList("list");
sync(list.add(1));
sync(list.add(2));
sync(list.fastSet(0, 3));
Assert.assertEquals(3, (int)sync(list.get(0)));
}
@Test
public void testRetainAllEmpty() {
RListReactive<Integer> list = redisson.getList("list");
sync(list.add(1));
sync(list.add(2));
sync(list.add(3));
sync(list.add(4));
sync(list.add(5));
Assert.assertTrue(sync(list.retainAll(Collections.<Integer>emptyList())));
Assert.assertEquals(0, sync(list.size()).intValue());
}
@Test
public void testRetainAllNoModify() {
RListReactive<Integer> list = redisson.getList("list");
sync(list.add(1));
sync(list.add(2));
Assert.assertFalse(sync(list.retainAll(Arrays.asList(1, 2)))); // nothing changed
Assert.assertThat(sync(list), Matchers.contains(1, 2));
}
@Test
public void testAddAllIndex() {
RListReactive<Integer> list = redisson.getList("list");
sync(list.add(1));
sync(list.add(2));
sync(list.add(3));
sync(list.add(4));
sync(list.add(5));
sync(list.addAll(2, Arrays.asList(7, 8, 9)));
Assert.assertThat(sync(list), Matchers.contains(1, 2, 7, 8, 9, 3, 4, 5));
sync(list.addAll(sync(list.size())-1, Arrays.asList(9, 1, 9)));
Assert.assertThat(sync(list), Matchers.contains(1, 2, 7, 8, 9, 3, 4, 9, 1, 9, 5));
sync(list.addAll(sync(list.size()), Arrays.asList(0, 5)));
Assert.assertThat(sync(list), Matchers.contains(1, 2, 7, 8, 9, 3, 4, 9, 1, 9, 5, 0, 5));
sync(list.addAll(0, Arrays.asList(6, 7)));
Assert.assertThat(sync(list), Matchers.contains(6,7,1, 2, 7, 8, 9, 3, 4, 9, 1, 9, 5, 0, 5));
}
@Test
public void testAddAll() {
RListReactive<Integer> list = redisson.getList("list");
sync(list.add(1));
sync(list.add(2));
sync(list.add(3));
sync(list.add(4));
sync(list.add(5));
Assert.assertEquals(8, sync(list.addAll(Arrays.asList(7, 8, 9))).intValue());
Assert.assertEquals(11, sync(list.addAll(Arrays.asList(9, 1, 9))).intValue());
Assert.assertThat(sync(list), Matchers.contains(1, 2, 3, 4, 5, 7, 8, 9, 9, 1, 9));
}
@Test
public void testAddAllEmpty() {
RListReactive<Integer> list = redisson.getList("list");
Assert.assertEquals(0, sync(list.addAll(Collections.<Integer>emptyList())).intValue());
Assert.assertEquals(0, sync(list.size()).intValue());
}
@Test
public void testContainsAll() {
RListReactive<Integer> list = redisson.getList("list");
for (int i = 0; i < 200; i++) {
sync(list.add(i));
}
Assert.assertTrue(sync(list.containsAll(Arrays.asList(30, 11))));
Assert.assertFalse(sync(list.containsAll(Arrays.asList(30, 711, 11))));
Assert.assertTrue(sync(list.containsAll(Arrays.asList(30))));
}
@Test
public void testContainsAllEmpty() {
RListReactive<Integer> list = redisson.getList("list");
for (int i = 0; i < 200; i++) {
list.add(i);
}
Assert.assertTrue(sync(list.containsAll(Collections.emptyList())));
Assert.assertTrue(Arrays.asList(1).containsAll(Collections.emptyList()));
}
@Test
public void testIteratorSequence() {
RListReactive<String> list = redisson.getList("list2");
sync(list.add("1"));
sync(list.add("4"));
sync(list.add("2"));
sync(list.add("5"));
sync(list.add("3"));
checkIterator(list);
// to test "memory effect" absence
checkIterator(list);
}
private void checkIterator(RListReactive<String> list) {
int iteration = 0;
for (Iterator<String> iterator = list.iterator().toBlocking().getIterator(); iterator.hasNext();) {
String value = iterator.next();
String val = sync(list.get(iteration));
Assert.assertEquals(val, value);
iteration++;
}
Assert.assertEquals(sync(list.size()).intValue(), iteration);
}
@Test
public void testContains() {
RListReactive<String> list = redisson.getList("list");
sync(list.add("1"));
sync(list.add("4"));
sync(list.add("2"));
sync(list.add("5"));
sync(list.add("3"));
Assert.assertTrue(sync(list.contains("3")));
Assert.assertFalse(sync(list.contains("31")));
Assert.assertTrue(sync(list.contains("1")));
}
// @Test(expected = RedisException.class)
// public void testGetFail() {
// RListReactive<String> list = redisson.getList("list");
//
// sync(list.get(0));
// }
@Test
public void testAddGet() {
RListReactive<String> list = redisson.getList("list");
sync(list.add("1"));
sync(list.add("4"));
sync(list.add("2"));
sync(list.add("5"));
sync(list.add("3"));
String val1 = sync(list.get(0));
Assert.assertEquals("1", val1);
String val2 = sync(list.get(3));
Assert.assertEquals("5", val2);
}
@Test
public void testDuplicates() {
RListReactive<TestObject> list = redisson.getList("list");
sync(list.add(new TestObject("1", "2")));
sync(list.add(new TestObject("1", "2")));
sync(list.add(new TestObject("2", "3")));
sync(list.add(new TestObject("3", "4")));
sync(list.add(new TestObject("5", "6")));
Assert.assertEquals(5, sync(list.size()).intValue());
}
@Test
public void testSize() {
RListReactive<String> list = redisson.getList("list");
sync(list.add("1"));
sync(list.add("2"));
sync(list.add("3"));
sync(list.add("4"));
sync(list.add("5"));
sync(list.add("6"));
Assert.assertThat(sync(list), Matchers.contains("1", "2", "3", "4", "5", "6"));
sync(list.remove("2"));
Assert.assertThat(sync(list), Matchers.contains("1", "3", "4", "5", "6"));
sync(list.remove("4"));
Assert.assertThat(sync(list), Matchers.contains("1", "3", "5", "6"));
}
@Test
public void testCodec() {
RListReactive<Object> list = redisson.getList("list");
sync(list.add(1));
sync(list.add(2L));
sync(list.add("3"));
sync(list.add("e"));
Assert.assertThat(sync(list), Matchers.<Object>contains(1, 2L, "3", "e"));
}
}

@ -740,13 +740,6 @@ public class RedissonListTest extends BaseTest {
Assert.assertTrue(list.contains("1"));
}
@Test(expected = IndexOutOfBoundsException.class)
public void testGetFail2() {
List<String> list = redisson.getList("list");
list.get(0);
}
@Test(expected = IndexOutOfBoundsException.class)
public void testGetFail() {
List<String> list = redisson.getList("list");

Loading…
Cancel
Save