diff --git a/src/main/java/org/redisson/reactive/RedissonListReactive.java b/src/main/java/org/redisson/reactive/RedissonListReactive.java index 236d50729..afe5d5fa7 100644 --- a/src/main/java/org/redisson/reactive/RedissonListReactive.java +++ b/src/main/java/org/redisson/reactive/RedissonListReactive.java @@ -27,7 +27,6 @@ import java.util.Collection; import java.util.Collections; import java.util.List; -import org.reactivestreams.Processor; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; @@ -42,7 +41,6 @@ import org.redisson.client.protocol.convertor.IntegerReplayConvertor; import org.redisson.client.protocol.convertor.LongReplayConvertor; import org.redisson.command.CommandReactiveExecutor; -import reactor.core.reactivestreams.SubscriberBarrier; import reactor.rx.Stream; import reactor.rx.subscription.ReactiveSubscription; @@ -173,7 +171,7 @@ public class RedissonListReactive extends RedissonExpirableReactive implement } @Override - public Publisher addAll(final Collection c) { + public Publisher addAll(Collection c) { if (c.isEmpty()) { return size(); } @@ -185,7 +183,11 @@ public class RedissonListReactive extends RedissonExpirableReactive implement } @Override - public Publisher addAll(final long index, final Collection coll) { + public Publisher addAll(long index, Collection coll) { + if (index < 0) { + throw new IndexOutOfBoundsException("index: " + index); + } + if (coll.isEmpty()) { return size(); } @@ -198,41 +200,19 @@ public class RedissonListReactive extends RedissonExpirableReactive implement return commandExecutor.writeReactive(getName(), codec, RedisCommands.LPUSH, elements.toArray()); } - final Processor promise = newObservable(); - - Publisher s = size(); - s.subscribe(new SubscriberBarrier(promise) { - @Override - public void doNext(Long size) { - if (!isPositionInRange(index, size)) { - IndexOutOfBoundsException e = new IndexOutOfBoundsException("index: " + index + " but current size: "+ size); - promise.onError(e); - return; - } - - if (index >= size) { - addAll(coll).subscribe(toSubscriber(promise)); - return; - } - - // insert into middle of list - - List args = new ArrayList(coll.size() + 1); - args.add(index); - args.addAll(coll); - Publisher f = commandExecutor.evalWriteReactive(getName(), codec, new RedisCommand("EVAL", new LongReplayConvertor(), 5), - "local ind = table.remove(ARGV, 1); " + // index is the first parameter - "local tail = redis.call('lrange', KEYS[1], ind, -1); " + - "redis.call('ltrim', KEYS[1], 0, ind - 1); " + - "for i, v in ipairs(ARGV) do redis.call('rpush', KEYS[1], v) end;" + - "for i, v in ipairs(tail) do redis.call('rpush', KEYS[1], v) end;" + - "return redis.call('llen', KEYS[1]);", - Collections.singletonList(getName()), args.toArray()); - f.subscribe(toSubscriber(promise)); - } - - }); - return promise; + List args = new ArrayList(coll.size() + 1); + args.add(index); + args.addAll(coll); + return commandExecutor.evalWriteReactive(getName(), codec, new RedisCommand("EVAL", new LongReplayConvertor(), 5), + "local ind = table.remove(ARGV, 1); " + // index is the first parameter + "local size = redis.call('llen', KEYS[1]); " + + "assert(tonumber(ind) <= size, 'index: ' .. ind .. ' but current size: ' .. size); " + + "local tail = redis.call('lrange', KEYS[1], ind, -1); " + + "redis.call('ltrim', KEYS[1], 0, ind - 1); " + + "for i, v in ipairs(ARGV) do redis.call('rpush', KEYS[1], v) end;" + + "for i, v in ipairs(tail) do redis.call('rpush', KEYS[1], v) end;" + + "return redis.call('llen', KEYS[1]);", + Collections.singletonList(getName()), args.toArray()); } @Override diff --git a/src/main/java/org/redisson/reactive/RedissonObjectReactive.java b/src/main/java/org/redisson/reactive/RedissonObjectReactive.java index 81c251806..9d0b4f345 100644 --- a/src/main/java/org/redisson/reactive/RedissonObjectReactive.java +++ b/src/main/java/org/redisson/reactive/RedissonObjectReactive.java @@ -60,10 +60,6 @@ abstract class RedissonObjectReactive implements RObjectReactive { }; } - protected Processor newObservable() { - return Broadcaster.create(); - } - protected Stream newSucceededObservable(V result) { return Streams.just(result); } diff --git a/src/test/java/org/redisson/BaseReactiveTest.java b/src/test/java/org/redisson/BaseReactiveTest.java index 9bebcda0d..f991a5f07 100644 --- a/src/test/java/org/redisson/BaseReactiveTest.java +++ b/src/test/java/org/redisson/BaseReactiveTest.java @@ -11,6 +11,7 @@ import org.redisson.api.RCollectionReactive; import org.redisson.api.RScoredSortedSetReactive; import org.redisson.api.RedissonReactiveClient; +import reactor.rx.Promise; import reactor.rx.Streams; public abstract class BaseReactiveTest { @@ -40,7 +41,11 @@ public abstract class BaseReactiveTest { } public V sync(Publisher ob) { - List t = Streams.create(ob).toList().poll(); + Promise> promise = Streams.create(ob).toList(); + List t = promise.poll(); + if (promise.isError()) { + throw new RuntimeException(promise.reason()); + } if (t == null) { return null; } @@ -64,7 +69,7 @@ public abstract class BaseReactiveTest { @After public void after() { - redisson.flushdb(); + sync(redisson.getKeys().flushdb()); } } diff --git a/src/test/java/org/redisson/RedissonKeysReactiveTest.java b/src/test/java/org/redisson/RedissonKeysReactiveTest.java index e2bff39b6..8ec975b58 100644 --- a/src/test/java/org/redisson/RedissonKeysReactiveTest.java +++ b/src/test/java/org/redisson/RedissonKeysReactiveTest.java @@ -36,7 +36,7 @@ public class RedissonKeysReactiveTest extends BaseReactiveTest { MatcherAssert.assertThat(sync(redisson.getKeys().randomKey()), Matchers.isOneOf("test1", "test2")); sync(redisson.getKeys().delete("test1")); Assert.assertEquals(sync(redisson.getKeys().randomKey()), "test2"); - redisson.flushdb(); + sync(redisson.getKeys().flushdb()); Assert.assertNull(sync(redisson.getKeys().randomKey())); } diff --git a/src/test/java/org/redisson/RedissonListReactiveTest.java b/src/test/java/org/redisson/RedissonListReactiveTest.java index 210b43780..550a65d9d 100644 --- a/src/test/java/org/redisson/RedissonListReactiveTest.java +++ b/src/test/java/org/redisson/RedissonListReactiveTest.java @@ -1,8 +1,10 @@ package org.redisson; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.Iterator; +import java.util.List; import java.util.concurrent.CountDownLatch; import org.hamcrest.MatcherAssert; @@ -359,6 +361,12 @@ public class RedissonListReactiveTest extends BaseReactiveTest { Assert.assertThat(sync(list), Matchers.contains(1, 2)); } + @Test(expected = RedisException.class) + public void testAddAllError() { + RListReactive list = redisson.getList("list"); + sync(list.addAll(2, Arrays.asList(7, 8, 9))); + } + @Test public void testAddAllIndex() { RListReactive list = redisson.getList("list"); @@ -368,7 +376,7 @@ public class RedissonListReactiveTest extends BaseReactiveTest { sync(list.add(4)); sync(list.add(5)); - sync(list.addAll(2, Arrays.asList(7, 8, 9))); + Assert.assertEquals(8, sync(list.addAll(2, Arrays.asList(7, 8, 9))).longValue()); Assert.assertThat(sync(list), Matchers.contains(1, 2, 7, 8, 9, 3, 4, 5));