RedissonListReactive.addAll fixed

pull/337/head
Nikita 9 years ago
parent 914892f534
commit bcb307b3e7

@ -27,7 +27,6 @@ import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import org.reactivestreams.Processor;
import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber; import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription; import org.reactivestreams.Subscription;
@ -42,7 +41,6 @@ import org.redisson.client.protocol.convertor.IntegerReplayConvertor;
import org.redisson.client.protocol.convertor.LongReplayConvertor; import org.redisson.client.protocol.convertor.LongReplayConvertor;
import org.redisson.command.CommandReactiveExecutor; import org.redisson.command.CommandReactiveExecutor;
import reactor.core.reactivestreams.SubscriberBarrier;
import reactor.rx.Stream; import reactor.rx.Stream;
import reactor.rx.subscription.ReactiveSubscription; import reactor.rx.subscription.ReactiveSubscription;
@ -173,7 +171,7 @@ public class RedissonListReactive<V> extends RedissonExpirableReactive implement
} }
@Override @Override
public Publisher<Long> addAll(final Collection<? extends V> c) { public Publisher<Long> addAll(Collection<? extends V> c) {
if (c.isEmpty()) { if (c.isEmpty()) {
return size(); return size();
} }
@ -185,7 +183,11 @@ public class RedissonListReactive<V> extends RedissonExpirableReactive implement
} }
@Override @Override
public Publisher<Long> addAll(final long index, final Collection<? extends V> coll) { public Publisher<Long> addAll(long index, Collection<? extends V> coll) {
if (index < 0) {
throw new IndexOutOfBoundsException("index: " + index);
}
if (coll.isEmpty()) { if (coll.isEmpty()) {
return size(); return size();
} }
@ -198,41 +200,19 @@ public class RedissonListReactive<V> extends RedissonExpirableReactive implement
return commandExecutor.writeReactive(getName(), codec, RedisCommands.LPUSH, elements.toArray()); return commandExecutor.writeReactive(getName(), codec, RedisCommands.LPUSH, elements.toArray());
} }
final Processor<Long, Long> promise = newObservable(); List<Object> args = new ArrayList<Object>(coll.size() + 1);
args.add(index);
Publisher<Long> s = size(); args.addAll(coll);
s.subscribe(new SubscriberBarrier<Long, Long>(promise) { return commandExecutor.evalWriteReactive(getName(), codec, new RedisCommand<Long>("EVAL", new LongReplayConvertor(), 5),
@Override "local ind = table.remove(ARGV, 1); " + // index is the first parameter
public void doNext(Long size) { "local size = redis.call('llen', KEYS[1]); " +
if (!isPositionInRange(index, size)) { "assert(tonumber(ind) <= size, 'index: ' .. ind .. ' but current size: ' .. size); " +
IndexOutOfBoundsException e = new IndexOutOfBoundsException("index: " + index + " but current size: "+ size); "local tail = redis.call('lrange', KEYS[1], ind, -1); " +
promise.onError(e); "redis.call('ltrim', KEYS[1], 0, ind - 1); " +
return; "for i, v in ipairs(ARGV) do redis.call('rpush', KEYS[1], v) end;" +
} "for i, v in ipairs(tail) do redis.call('rpush', KEYS[1], v) end;" +
"return redis.call('llen', KEYS[1]);",
if (index >= size) { Collections.<Object>singletonList(getName()), args.toArray());
addAll(coll).subscribe(toSubscriber(promise));
return;
}
// insert into middle of list
List<Object> args = new ArrayList<Object>(coll.size() + 1);
args.add(index);
args.addAll(coll);
Publisher<Long> f = commandExecutor.evalWriteReactive(getName(), codec, new RedisCommand<Long>("EVAL", new LongReplayConvertor(), 5),
"local ind = table.remove(ARGV, 1); " + // index is the first parameter
"local tail = redis.call('lrange', KEYS[1], ind, -1); " +
"redis.call('ltrim', KEYS[1], 0, ind - 1); " +
"for i, v in ipairs(ARGV) do redis.call('rpush', KEYS[1], v) end;" +
"for i, v in ipairs(tail) do redis.call('rpush', KEYS[1], v) end;" +
"return redis.call('llen', KEYS[1]);",
Collections.<Object>singletonList(getName()), args.toArray());
f.subscribe(toSubscriber(promise));
}
});
return promise;
} }
@Override @Override

@ -60,10 +60,6 @@ abstract class RedissonObjectReactive implements RObjectReactive {
}; };
} }
protected <V> Processor<V, V> newObservable() {
return Broadcaster.create();
}
protected <V> Stream<V> newSucceededObservable(V result) { protected <V> Stream<V> newSucceededObservable(V result) {
return Streams.just(result); return Streams.just(result);
} }

@ -11,6 +11,7 @@ import org.redisson.api.RCollectionReactive;
import org.redisson.api.RScoredSortedSetReactive; import org.redisson.api.RScoredSortedSetReactive;
import org.redisson.api.RedissonReactiveClient; import org.redisson.api.RedissonReactiveClient;
import reactor.rx.Promise;
import reactor.rx.Streams; import reactor.rx.Streams;
public abstract class BaseReactiveTest { public abstract class BaseReactiveTest {
@ -40,7 +41,11 @@ public abstract class BaseReactiveTest {
} }
public <V> V sync(Publisher<V> ob) { public <V> V sync(Publisher<V> ob) {
List<V> t = Streams.create(ob).toList().poll(); Promise<List<V>> promise = Streams.create(ob).toList();
List<V> t = promise.poll();
if (promise.isError()) {
throw new RuntimeException(promise.reason());
}
if (t == null) { if (t == null) {
return null; return null;
} }
@ -64,7 +69,7 @@ public abstract class BaseReactiveTest {
@After @After
public void after() { public void after() {
redisson.flushdb(); sync(redisson.getKeys().flushdb());
} }
} }

@ -36,7 +36,7 @@ public class RedissonKeysReactiveTest extends BaseReactiveTest {
MatcherAssert.assertThat(sync(redisson.getKeys().randomKey()), Matchers.isOneOf("test1", "test2")); MatcherAssert.assertThat(sync(redisson.getKeys().randomKey()), Matchers.isOneOf("test1", "test2"));
sync(redisson.getKeys().delete("test1")); sync(redisson.getKeys().delete("test1"));
Assert.assertEquals(sync(redisson.getKeys().randomKey()), "test2"); Assert.assertEquals(sync(redisson.getKeys().randomKey()), "test2");
redisson.flushdb(); sync(redisson.getKeys().flushdb());
Assert.assertNull(sync(redisson.getKeys().randomKey())); Assert.assertNull(sync(redisson.getKeys().randomKey()));
} }

@ -1,8 +1,10 @@
package org.redisson; package org.redisson;
import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.Iterator; import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import org.hamcrest.MatcherAssert; import org.hamcrest.MatcherAssert;
@ -359,6 +361,12 @@ public class RedissonListReactiveTest extends BaseReactiveTest {
Assert.assertThat(sync(list), Matchers.contains(1, 2)); Assert.assertThat(sync(list), Matchers.contains(1, 2));
} }
@Test(expected = RedisException.class)
public void testAddAllError() {
RListReactive<Integer> list = redisson.getList("list");
sync(list.addAll(2, Arrays.asList(7, 8, 9)));
}
@Test @Test
public void testAddAllIndex() { public void testAddAllIndex() {
RListReactive<Integer> list = redisson.getList("list"); RListReactive<Integer> list = redisson.getList("list");
@ -368,7 +376,7 @@ public class RedissonListReactiveTest extends BaseReactiveTest {
sync(list.add(4)); sync(list.add(4));
sync(list.add(5)); sync(list.add(5));
sync(list.addAll(2, Arrays.asList(7, 8, 9))); Assert.assertEquals(8, sync(list.addAll(2, Arrays.asList(7, 8, 9))).longValue());
Assert.assertThat(sync(list), Matchers.contains(1, 2, 7, 8, 9, 3, 4, 5)); Assert.assertThat(sync(list), Matchers.contains(1, 2, 7, 8, 9, 3, 4, 5));

Loading…
Cancel
Save