RList.addAllAsync added

pull/337/head
Nikita 9 years ago
parent bcb307b3e7
commit f4424a39e0

@ -22,7 +22,6 @@ import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommand.ValueType; import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.convertor.TrueReplayConvertor;
import org.redisson.client.protocol.convertor.VoidReplayConvertor; import org.redisson.client.protocol.convertor.VoidReplayConvertor;
import org.redisson.command.CommandAsyncExecutor; import org.redisson.command.CommandAsyncExecutor;
import org.redisson.connection.decoder.ListFirstObjectDecoder; import org.redisson.connection.decoder.ListFirstObjectDecoder;
@ -39,8 +38,6 @@ import io.netty.util.concurrent.Future;
*/ */
public class RedissonDeque<V> extends RedissonQueue<V> implements RDeque<V> { public class RedissonDeque<V> extends RedissonQueue<V> implements RDeque<V> {
private static final RedisCommand<Void> LPUSH_VOID = new RedisCommand<Void>("LPUSH", new VoidReplayConvertor());
private static final RedisCommand<Boolean> LPUSH_BOOLEAN = new RedisCommand<Boolean>("LPUSH", new TrueReplayConvertor());
private static final RedisCommand<Void> RPUSH_VOID = new RedisCommand<Void>("RPUSH", new VoidReplayConvertor(), 2, ValueType.OBJECTS); private static final RedisCommand<Void> RPUSH_VOID = new RedisCommand<Void>("RPUSH", new VoidReplayConvertor(), 2, ValueType.OBJECTS);
private static final RedisCommand<Object> LRANGE_SINGLE = new RedisCommand<Object>("LRANGE", new ListFirstObjectDecoder()); private static final RedisCommand<Object> LRANGE_SINGLE = new RedisCommand<Object>("LRANGE", new ListFirstObjectDecoder());
@ -60,7 +57,7 @@ public class RedissonDeque<V> extends RedissonQueue<V> implements RDeque<V> {
@Override @Override
public Future<Void> addFirstAsync(V e) { public Future<Void> addFirstAsync(V e) {
return commandExecutor.writeAsync(getName(), codec, LPUSH_VOID, getName(), e); return commandExecutor.writeAsync(getName(), codec, RedisCommands.LPUSH_VOID, getName(), e);
} }
@Override @Override
@ -131,7 +128,7 @@ public class RedissonDeque<V> extends RedissonQueue<V> implements RDeque<V> {
@Override @Override
public Future<Boolean> offerFirstAsync(V e) { public Future<Boolean> offerFirstAsync(V e) {
return commandExecutor.writeAsync(getName(), codec, LPUSH_BOOLEAN, getName(), e); return commandExecutor.writeAsync(getName(), codec, RedisCommands.LPUSH_BOOLEAN, getName(), e);
} }
@Override @Override

@ -15,7 +15,14 @@
*/ */
package org.redisson; package org.redisson;
import static org.redisson.client.protocol.RedisCommands.*; import static org.redisson.client.protocol.RedisCommands.EVAL_OBJECT;
import static org.redisson.client.protocol.RedisCommands.LINDEX;
import static org.redisson.client.protocol.RedisCommands.LLEN_INT;
import static org.redisson.client.protocol.RedisCommands.LPOP;
import static org.redisson.client.protocol.RedisCommands.LPUSH_BOOLEAN;
import static org.redisson.client.protocol.RedisCommands.LRANGE;
import static org.redisson.client.protocol.RedisCommands.LREM_SINGLE;
import static org.redisson.client.protocol.RedisCommands.RPUSH_BOOLEAN;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
@ -36,8 +43,6 @@ import org.redisson.command.CommandAsyncExecutor;
import org.redisson.core.RList; import org.redisson.core.RList;
import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;
/** /**
* Distributed and concurrent implementation of {@link java.util.List} * Distributed and concurrent implementation of {@link java.util.List}
@ -155,75 +160,51 @@ public class RedissonList<V> extends RedissonExpirable implements RList<V> {
@Override @Override
public Future<Boolean> addAllAsync(final Collection<? extends V> c) { public Future<Boolean> addAllAsync(final Collection<? extends V> c) {
final Promise<Boolean> promise = newPromise();
if (c.isEmpty()) { if (c.isEmpty()) {
promise.setSuccess(false); return newSucceededFuture(false);
return promise;
} }
Future<Integer> sizeFuture = sizeAsync();
sizeFuture.addListener(new FutureListener<Integer>() {
@Override
public void operationComplete(Future<Integer> future) throws Exception {
if (!future.isSuccess()) {
promise.setFailure(future.cause());
return;
}
final int listSize = future.getNow(); List<Object> args = new ArrayList<Object>(c.size() + 1);
List<Object> args = new ArrayList<Object>(c.size() + 1); args.add(getName());
args.add(getName()); args.addAll(c);
args.addAll(c); return commandExecutor.writeAsync(getName(), codec, RPUSH_BOOLEAN, args.toArray());
Future<Long> res = commandExecutor.writeAsync(getName(), codec, RPUSH, args.toArray());
res.addListener(new FutureListener<Long>() {
@Override
public void operationComplete(Future<Long> future) throws Exception {
if (future.isSuccess()) {
promise.setSuccess(listSize != future.getNow());
} else {
promise.setFailure(future.cause());
}
}
});
}
});
return promise;
} }
@Override public Future<Boolean> addAllAsync(int index, Collection<? extends V> coll) {
public boolean addAll(final int index, final Collection<? extends V> coll) { if (index < 0) {
throw new IndexOutOfBoundsException("index: " + index);
}
if (coll.isEmpty()) { if (coll.isEmpty()) {
return false; return newSucceededFuture(false);
} }
if (index == 0) { // prepend elements to list if (index == 0) { // prepend elements to list
List<Object> elements = new ArrayList<Object>(coll); List<Object> elements = new ArrayList<Object>(coll);
Collections.reverse(elements); Collections.reverse(elements);
elements.add(0, getName()); elements.add(0, getName());
Future<Long> f = commandExecutor.writeAsync(getName(), codec, LPUSH, elements.toArray()); return commandExecutor.writeAsync(getName(), codec, LPUSH_BOOLEAN, elements.toArray());
Long newSize = get(f);
return newSize != size();
} }
checkPosition(index); List<Object> args = new ArrayList<Object>(coll.size() + 1);
int size = size(); args.add(index);
if (index < size) { args.addAll(coll);
// insert into middle of list return commandExecutor.evalWriteAsync(getName(), codec, new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 5),
List<Object> args = new ArrayList<Object>(coll.size() + 1); "local ind = table.remove(ARGV, 1); " + // index is the first parameter
args.add(index); "local size = redis.call('llen', KEYS[1]); " +
args.addAll(coll); "assert(tonumber(ind) <= size, 'index: ' .. ind .. ' but current size: ' .. size); " +
Future<Boolean> f = commandExecutor.evalWriteAsync(getName(), codec, new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 5), "local tail = redis.call('lrange', KEYS[1], ind, -1); " +
"local ind = table.remove(ARGV, 1); " + // index is the first parameter "redis.call('ltrim', KEYS[1], 0, ind - 1); " +
"local tail = redis.call('lrange', KEYS[1], ind, -1); " + "for i, v in ipairs(ARGV) do redis.call('rpush', KEYS[1], v) end;" +
"redis.call('ltrim', KEYS[1], 0, ind - 1); " + "for i, v in ipairs(tail) do redis.call('rpush', KEYS[1], v) end;" +
"for i, v in ipairs(ARGV) do redis.call('rpush', KEYS[1], v) end;" + "return true",
"for i, v in ipairs(tail) do redis.call('rpush', KEYS[1], v) end;" + Collections.<Object>singletonList(getName()), args.toArray());
"return true", }
Collections.<Object>singletonList(getName()), args.toArray());
return get(f); @Override
} else { public boolean addAll(final int index, final Collection<? extends V> coll) {
// append to list return get(addAllAsync(index, coll));
return addAll(coll);
}
} }
@Override @Override
@ -305,16 +286,6 @@ public class RedissonList<V> extends RedissonExpirable implements RList<V> {
return index >= 0 && index < size; return index >= 0 && index < size;
} }
private void checkPosition(int index) {
int size = size();
if (!isPositionInRange(index, size))
throw new IndexOutOfBoundsException("index: " + index + " but current size: "+ size);
}
private boolean isPositionInRange(int index, int size) {
return index >= 0 && index <= size;
}
@Override @Override
public V set(int index, V element) { public V set(int index, V element) {
checkIndex(index); checkIndex(index);

@ -133,6 +133,8 @@ public interface RedisCommands {
RedisStrictCommand<Long> RPOP = new RedisStrictCommand<Long>("RPOP"); RedisStrictCommand<Long> RPOP = new RedisStrictCommand<Long>("RPOP");
RedisStrictCommand<Long> LPUSH = new RedisStrictCommand<Long>("LPUSH", 2); RedisStrictCommand<Long> LPUSH = new RedisStrictCommand<Long>("LPUSH", 2);
RedisStrictCommand<Boolean> LPUSH_BOOLEAN = new RedisStrictCommand<Boolean>("LPUSH", new TrueReplayConvertor(), 2);
RedisStrictCommand<Void> LPUSH_VOID = new RedisStrictCommand<Void>("LPUSH", new VoidReplayConvertor(), 2);
RedisCommand<List<Object>> LRANGE = new RedisCommand<List<Object>>("LRANGE", new ObjectListReplayDecoder<Object>()); RedisCommand<List<Object>> LRANGE = new RedisCommand<List<Object>>("LRANGE", new ObjectListReplayDecoder<Object>());
RedisCommand<Long> RPUSH = new RedisCommand<Long>("RPUSH", 2, ValueType.OBJECTS); RedisCommand<Long> RPUSH = new RedisCommand<Long>("RPUSH", 2, ValueType.OBJECTS);
RedisCommand<Boolean> RPUSH_BOOLEAN = new RedisCommand<Boolean>("RPUSH", new TrueReplayConvertor(), 2, ValueType.OBJECTS); RedisCommand<Boolean> RPUSH_BOOLEAN = new RedisCommand<Boolean>("RPUSH", new TrueReplayConvertor(), 2, ValueType.OBJECTS);

@ -15,6 +15,8 @@
*/ */
package org.redisson.core; package org.redisson.core;
import java.util.Collection;
import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Future;
/** /**
@ -26,6 +28,8 @@ import io.netty.util.concurrent.Future;
*/ */
public interface RListAsync<V> extends RCollectionAsync<V> { public interface RListAsync<V> extends RCollectionAsync<V> {
Future<Boolean> addAllAsync(int index, Collection<? extends V> coll);
Future<Integer> lastIndexOfAsync(Object o); Future<Integer> lastIndexOfAsync(Object o);
Future<Integer> indexOfAsync(Object o); Future<Integer> indexOfAsync(Object o);

@ -21,7 +21,6 @@ import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommand.ValueType; import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.convertor.TrueReplayConvertor;
import org.redisson.client.protocol.convertor.VoidReplayConvertor; import org.redisson.client.protocol.convertor.VoidReplayConvertor;
import org.redisson.command.CommandReactiveExecutor; import org.redisson.command.CommandReactiveExecutor;
import org.redisson.connection.decoder.ListFirstObjectDecoder; import org.redisson.connection.decoder.ListFirstObjectDecoder;
@ -35,8 +34,6 @@ import org.redisson.connection.decoder.ListFirstObjectDecoder;
*/ */
public class RedissonDequeReactive<V> extends RedissonQueueReactive<V> implements RDequeReactive<V> { public class RedissonDequeReactive<V> extends RedissonQueueReactive<V> implements RDequeReactive<V> {
private static final RedisCommand<Void> LPUSH_VOID = new RedisCommand<Void>("LPUSH", new VoidReplayConvertor());
private static final RedisCommand<Boolean> LPUSH_BOOLEAN = new RedisCommand<Boolean>("LPUSH", new TrueReplayConvertor());
private static final RedisCommand<Void> RPUSH_VOID = new RedisCommand<Void>("RPUSH", new VoidReplayConvertor(), 2, ValueType.OBJECTS); private static final RedisCommand<Void> RPUSH_VOID = new RedisCommand<Void>("RPUSH", new VoidReplayConvertor(), 2, ValueType.OBJECTS);
private static final RedisCommand<Object> LRANGE_SINGLE = new RedisCommand<Object>("LRANGE", new ListFirstObjectDecoder()); private static final RedisCommand<Object> LRANGE_SINGLE = new RedisCommand<Object>("LRANGE", new ListFirstObjectDecoder());
@ -50,7 +47,7 @@ public class RedissonDequeReactive<V> extends RedissonQueueReactive<V> implement
@Override @Override
public Publisher<Void> addFirst(V e) { public Publisher<Void> addFirst(V e) {
return commandExecutor.writeReactive(getName(), codec, LPUSH_VOID, getName(), e); return commandExecutor.writeReactive(getName(), codec, RedisCommands.LPUSH_VOID, getName(), e);
} }
@Override @Override
@ -65,7 +62,7 @@ public class RedissonDequeReactive<V> extends RedissonQueueReactive<V> implement
@Override @Override
public Publisher<Boolean> offerFirst(V e) { public Publisher<Boolean> offerFirst(V e) {
return commandExecutor.writeReactive(getName(), codec, LPUSH_BOOLEAN, getName(), e); return commandExecutor.writeReactive(getName(), codec, RedisCommands.LPUSH_BOOLEAN, getName(), e);
} }
@Override @Override

@ -1,10 +1,8 @@
package org.redisson; package org.redisson;
import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.Iterator; import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import org.hamcrest.MatcherAssert; import org.hamcrest.MatcherAssert;
@ -362,7 +360,7 @@ public class RedissonListReactiveTest extends BaseReactiveTest {
} }
@Test(expected = RedisException.class) @Test(expected = RedisException.class)
public void testAddAllError() { public void testAddAllIndexError() {
RListReactive<Integer> list = redisson.getList("list"); RListReactive<Integer> list = redisson.getList("list");
sync(list.addAll(2, Arrays.asList(7, 8, 9))); sync(list.addAll(2, Arrays.asList(7, 8, 9)));
} }
@ -388,7 +386,7 @@ public class RedissonListReactiveTest extends BaseReactiveTest {
Assert.assertThat(sync(list), Matchers.contains(1, 2, 7, 8, 9, 3, 4, 9, 1, 9, 5, 0, 5)); Assert.assertThat(sync(list), Matchers.contains(1, 2, 7, 8, 9, 3, 4, 9, 1, 9, 5, 0, 5));
sync(list.addAll(0, Arrays.asList(6, 7))); Assert.assertEquals(15, sync(list.addAll(0, Arrays.asList(6, 7))).intValue());
Assert.assertThat(sync(list), Matchers.contains(6,7,1, 2, 7, 8, 9, 3, 4, 9, 1, 9, 5, 0, 5)); Assert.assertThat(sync(list), Matchers.contains(6,7,1, 2, 7, 8, 9, 3, 4, 9, 1, 9, 5, 0, 5));
} }

@ -12,6 +12,8 @@ import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers; import org.hamcrest.Matchers;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import org.redisson.api.RListReactive;
import org.redisson.client.RedisException;
import org.redisson.core.RList; import org.redisson.core.RList;
import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Future;
@ -554,6 +556,11 @@ public class RedissonListTest extends BaseTest {
Assert.assertThat(list, Matchers.contains(1, 2)); Assert.assertThat(list, Matchers.contains(1, 2));
} }
@Test(expected = RedisException.class)
public void testAddAllIndexError() {
RList<Integer> list = redisson.getList("list");
list.addAll(2, Arrays.asList(7, 8, 9));
}
@Test @Test
public void testAddAllIndex() { public void testAddAllIndex() {

Loading…
Cancel
Save