diff --git a/src/main/java/org/redisson/RedissonDeque.java b/src/main/java/org/redisson/RedissonDeque.java index 75cc4146d..04226c2ea 100644 --- a/src/main/java/org/redisson/RedissonDeque.java +++ b/src/main/java/org/redisson/RedissonDeque.java @@ -22,7 +22,6 @@ import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommand.ValueType; import org.redisson.client.protocol.RedisCommands; -import org.redisson.client.protocol.convertor.TrueReplayConvertor; import org.redisson.client.protocol.convertor.VoidReplayConvertor; import org.redisson.command.CommandAsyncExecutor; import org.redisson.connection.decoder.ListFirstObjectDecoder; @@ -39,8 +38,6 @@ import io.netty.util.concurrent.Future; */ public class RedissonDeque extends RedissonQueue implements RDeque { - private static final RedisCommand LPUSH_VOID = new RedisCommand("LPUSH", new VoidReplayConvertor()); - private static final RedisCommand LPUSH_BOOLEAN = new RedisCommand("LPUSH", new TrueReplayConvertor()); private static final RedisCommand RPUSH_VOID = new RedisCommand("RPUSH", new VoidReplayConvertor(), 2, ValueType.OBJECTS); private static final RedisCommand LRANGE_SINGLE = new RedisCommand("LRANGE", new ListFirstObjectDecoder()); @@ -60,7 +57,7 @@ public class RedissonDeque extends RedissonQueue implements RDeque { @Override public Future addFirstAsync(V e) { - return commandExecutor.writeAsync(getName(), codec, LPUSH_VOID, getName(), e); + return commandExecutor.writeAsync(getName(), codec, RedisCommands.LPUSH_VOID, getName(), e); } @Override @@ -131,7 +128,7 @@ public class RedissonDeque extends RedissonQueue implements RDeque { @Override public Future offerFirstAsync(V e) { - return commandExecutor.writeAsync(getName(), codec, LPUSH_BOOLEAN, getName(), e); + return commandExecutor.writeAsync(getName(), codec, RedisCommands.LPUSH_BOOLEAN, getName(), e); } @Override diff --git a/src/main/java/org/redisson/RedissonList.java b/src/main/java/org/redisson/RedissonList.java index c02dd35e5..afed4dd8c 100644 --- a/src/main/java/org/redisson/RedissonList.java +++ b/src/main/java/org/redisson/RedissonList.java @@ -15,7 +15,14 @@ */ package org.redisson; -import static org.redisson.client.protocol.RedisCommands.*; +import static org.redisson.client.protocol.RedisCommands.EVAL_OBJECT; +import static org.redisson.client.protocol.RedisCommands.LINDEX; +import static org.redisson.client.protocol.RedisCommands.LLEN_INT; +import static org.redisson.client.protocol.RedisCommands.LPOP; +import static org.redisson.client.protocol.RedisCommands.LPUSH_BOOLEAN; +import static org.redisson.client.protocol.RedisCommands.LRANGE; +import static org.redisson.client.protocol.RedisCommands.LREM_SINGLE; +import static org.redisson.client.protocol.RedisCommands.RPUSH_BOOLEAN; import java.util.ArrayList; import java.util.Collection; @@ -36,8 +43,6 @@ import org.redisson.command.CommandAsyncExecutor; import org.redisson.core.RList; import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.FutureListener; -import io.netty.util.concurrent.Promise; /** * Distributed and concurrent implementation of {@link java.util.List} @@ -155,75 +160,51 @@ public class RedissonList extends RedissonExpirable implements RList { @Override public Future addAllAsync(final Collection c) { - final Promise promise = newPromise(); if (c.isEmpty()) { - promise.setSuccess(false); - return promise; + return newSucceededFuture(false); } - Future sizeFuture = sizeAsync(); - sizeFuture.addListener(new FutureListener() { - @Override - public void operationComplete(Future future) throws Exception { - if (!future.isSuccess()) { - promise.setFailure(future.cause()); - return; - } - final int listSize = future.getNow(); - List args = new ArrayList(c.size() + 1); - args.add(getName()); - args.addAll(c); - Future res = commandExecutor.writeAsync(getName(), codec, RPUSH, args.toArray()); - res.addListener(new FutureListener() { - @Override - public void operationComplete(Future future) throws Exception { - if (future.isSuccess()) { - promise.setSuccess(listSize != future.getNow()); - } else { - promise.setFailure(future.cause()); - } - } - }); - } - }); - return promise; + List args = new ArrayList(c.size() + 1); + args.add(getName()); + args.addAll(c); + return commandExecutor.writeAsync(getName(), codec, RPUSH_BOOLEAN, args.toArray()); } - @Override - public boolean addAll(final int index, final Collection coll) { + public Future addAllAsync(int index, Collection coll) { + if (index < 0) { + throw new IndexOutOfBoundsException("index: " + index); + } + if (coll.isEmpty()) { - return false; + return newSucceededFuture(false); } + if (index == 0) { // prepend elements to list List elements = new ArrayList(coll); Collections.reverse(elements); elements.add(0, getName()); - Future f = commandExecutor.writeAsync(getName(), codec, LPUSH, elements.toArray()); - Long newSize = get(f); - return newSize != size(); + return commandExecutor.writeAsync(getName(), codec, LPUSH_BOOLEAN, elements.toArray()); } - checkPosition(index); - int size = size(); - if (index < size) { - // insert into middle of list - List args = new ArrayList(coll.size() + 1); - args.add(index); - args.addAll(coll); - Future f = commandExecutor.evalWriteAsync(getName(), codec, new RedisCommand("EVAL", new BooleanReplayConvertor(), 5), - "local ind = table.remove(ARGV, 1); " + // index is the first parameter - "local tail = redis.call('lrange', KEYS[1], ind, -1); " + - "redis.call('ltrim', KEYS[1], 0, ind - 1); " + - "for i, v in ipairs(ARGV) do redis.call('rpush', KEYS[1], v) end;" + - "for i, v in ipairs(tail) do redis.call('rpush', KEYS[1], v) end;" + - "return true", - Collections.singletonList(getName()), args.toArray()); - return get(f); - } else { - // append to list - return addAll(coll); - } + List args = new ArrayList(coll.size() + 1); + args.add(index); + args.addAll(coll); + return commandExecutor.evalWriteAsync(getName(), codec, new RedisCommand("EVAL", new BooleanReplayConvertor(), 5), + "local ind = table.remove(ARGV, 1); " + // index is the first parameter + "local size = redis.call('llen', KEYS[1]); " + + "assert(tonumber(ind) <= size, 'index: ' .. ind .. ' but current size: ' .. size); " + + "local tail = redis.call('lrange', KEYS[1], ind, -1); " + + "redis.call('ltrim', KEYS[1], 0, ind - 1); " + + "for i, v in ipairs(ARGV) do redis.call('rpush', KEYS[1], v) end;" + + "for i, v in ipairs(tail) do redis.call('rpush', KEYS[1], v) end;" + + "return true", + Collections.singletonList(getName()), args.toArray()); + } + + @Override + public boolean addAll(final int index, final Collection coll) { + return get(addAllAsync(index, coll)); } @Override @@ -305,16 +286,6 @@ public class RedissonList extends RedissonExpirable implements RList { return index >= 0 && index < size; } - private void checkPosition(int index) { - int size = size(); - if (!isPositionInRange(index, size)) - throw new IndexOutOfBoundsException("index: " + index + " but current size: "+ size); - } - - private boolean isPositionInRange(int index, int size) { - return index >= 0 && index <= size; - } - @Override public V set(int index, V element) { checkIndex(index); diff --git a/src/main/java/org/redisson/client/protocol/RedisCommands.java b/src/main/java/org/redisson/client/protocol/RedisCommands.java index 7278358d0..0f0a2c00b 100644 --- a/src/main/java/org/redisson/client/protocol/RedisCommands.java +++ b/src/main/java/org/redisson/client/protocol/RedisCommands.java @@ -133,6 +133,8 @@ public interface RedisCommands { RedisStrictCommand RPOP = new RedisStrictCommand("RPOP"); RedisStrictCommand LPUSH = new RedisStrictCommand("LPUSH", 2); + RedisStrictCommand LPUSH_BOOLEAN = new RedisStrictCommand("LPUSH", new TrueReplayConvertor(), 2); + RedisStrictCommand LPUSH_VOID = new RedisStrictCommand("LPUSH", new VoidReplayConvertor(), 2); RedisCommand> LRANGE = new RedisCommand>("LRANGE", new ObjectListReplayDecoder()); RedisCommand RPUSH = new RedisCommand("RPUSH", 2, ValueType.OBJECTS); RedisCommand RPUSH_BOOLEAN = new RedisCommand("RPUSH", new TrueReplayConvertor(), 2, ValueType.OBJECTS); diff --git a/src/main/java/org/redisson/core/RListAsync.java b/src/main/java/org/redisson/core/RListAsync.java index a781d4507..f9d7da761 100644 --- a/src/main/java/org/redisson/core/RListAsync.java +++ b/src/main/java/org/redisson/core/RListAsync.java @@ -15,6 +15,8 @@ */ package org.redisson.core; +import java.util.Collection; + import io.netty.util.concurrent.Future; /** @@ -26,6 +28,8 @@ import io.netty.util.concurrent.Future; */ public interface RListAsync extends RCollectionAsync { + Future addAllAsync(int index, Collection coll); + Future lastIndexOfAsync(Object o); Future indexOfAsync(Object o); diff --git a/src/main/java/org/redisson/reactive/RedissonDequeReactive.java b/src/main/java/org/redisson/reactive/RedissonDequeReactive.java index 3206264d3..aeeebb067 100644 --- a/src/main/java/org/redisson/reactive/RedissonDequeReactive.java +++ b/src/main/java/org/redisson/reactive/RedissonDequeReactive.java @@ -21,7 +21,6 @@ import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommand.ValueType; import org.redisson.client.protocol.RedisCommands; -import org.redisson.client.protocol.convertor.TrueReplayConvertor; import org.redisson.client.protocol.convertor.VoidReplayConvertor; import org.redisson.command.CommandReactiveExecutor; import org.redisson.connection.decoder.ListFirstObjectDecoder; @@ -35,8 +34,6 @@ import org.redisson.connection.decoder.ListFirstObjectDecoder; */ public class RedissonDequeReactive extends RedissonQueueReactive implements RDequeReactive { - private static final RedisCommand LPUSH_VOID = new RedisCommand("LPUSH", new VoidReplayConvertor()); - private static final RedisCommand LPUSH_BOOLEAN = new RedisCommand("LPUSH", new TrueReplayConvertor()); private static final RedisCommand RPUSH_VOID = new RedisCommand("RPUSH", new VoidReplayConvertor(), 2, ValueType.OBJECTS); private static final RedisCommand LRANGE_SINGLE = new RedisCommand("LRANGE", new ListFirstObjectDecoder()); @@ -50,7 +47,7 @@ public class RedissonDequeReactive extends RedissonQueueReactive implement @Override public Publisher addFirst(V e) { - return commandExecutor.writeReactive(getName(), codec, LPUSH_VOID, getName(), e); + return commandExecutor.writeReactive(getName(), codec, RedisCommands.LPUSH_VOID, getName(), e); } @Override @@ -65,7 +62,7 @@ public class RedissonDequeReactive extends RedissonQueueReactive implement @Override public Publisher offerFirst(V e) { - return commandExecutor.writeReactive(getName(), codec, LPUSH_BOOLEAN, getName(), e); + return commandExecutor.writeReactive(getName(), codec, RedisCommands.LPUSH_BOOLEAN, getName(), e); } @Override diff --git a/src/test/java/org/redisson/RedissonListReactiveTest.java b/src/test/java/org/redisson/RedissonListReactiveTest.java index 550a65d9d..49fd1ddab 100644 --- a/src/test/java/org/redisson/RedissonListReactiveTest.java +++ b/src/test/java/org/redisson/RedissonListReactiveTest.java @@ -1,10 +1,8 @@ package org.redisson; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.Iterator; -import java.util.List; import java.util.concurrent.CountDownLatch; import org.hamcrest.MatcherAssert; @@ -362,7 +360,7 @@ public class RedissonListReactiveTest extends BaseReactiveTest { } @Test(expected = RedisException.class) - public void testAddAllError() { + public void testAddAllIndexError() { RListReactive list = redisson.getList("list"); sync(list.addAll(2, Arrays.asList(7, 8, 9))); } @@ -388,7 +386,7 @@ public class RedissonListReactiveTest extends BaseReactiveTest { Assert.assertThat(sync(list), Matchers.contains(1, 2, 7, 8, 9, 3, 4, 9, 1, 9, 5, 0, 5)); - sync(list.addAll(0, Arrays.asList(6, 7))); + Assert.assertEquals(15, sync(list.addAll(0, Arrays.asList(6, 7))).intValue()); Assert.assertThat(sync(list), Matchers.contains(6,7,1, 2, 7, 8, 9, 3, 4, 9, 1, 9, 5, 0, 5)); } diff --git a/src/test/java/org/redisson/RedissonListTest.java b/src/test/java/org/redisson/RedissonListTest.java index 30f6db5b7..c88fce5bc 100644 --- a/src/test/java/org/redisson/RedissonListTest.java +++ b/src/test/java/org/redisson/RedissonListTest.java @@ -12,6 +12,8 @@ import org.hamcrest.MatcherAssert; import org.hamcrest.Matchers; import org.junit.Assert; import org.junit.Test; +import org.redisson.api.RListReactive; +import org.redisson.client.RedisException; import org.redisson.core.RList; import io.netty.util.concurrent.Future; @@ -554,6 +556,11 @@ public class RedissonListTest extends BaseTest { Assert.assertThat(list, Matchers.contains(1, 2)); } + @Test(expected = RedisException.class) + public void testAddAllIndexError() { + RList list = redisson.getList("list"); + list.addAll(2, Arrays.asList(7, 8, 9)); + } @Test public void testAddAllIndex() {