From 0e0e1c1b3f4fe616177f4f0c5a7d83774aab76e7 Mon Sep 17 00:00:00 2001 From: Nikita Date: Fri, 17 Jul 2015 11:33:57 +0300 Subject: [PATCH] RListAsync interface added. fastSet/fastSetAsync methods added. #186 --- src/main/java/org/redisson/RedissonList.java | 190 ++++++++++-------- .../client/protocol/RedisCommand.java | 6 + .../client/protocol/RedisCommands.java | 6 +- .../convertor/IntegerReplayConvertor.java | 10 + .../java/org/redisson/core/RBucketAsync.java | 2 +- src/main/java/org/redisson/core/RList.java | 13 +- .../java/org/redisson/core/RListAsync.java | 58 ++++++ .../java/org/redisson/RedissonListTest.java | 35 ++++ 8 files changed, 228 insertions(+), 92 deletions(-) create mode 100644 src/main/java/org/redisson/client/protocol/convertor/IntegerReplayConvertor.java create mode 100644 src/main/java/org/redisson/core/RListAsync.java diff --git a/src/main/java/org/redisson/RedissonList.java b/src/main/java/org/redisson/RedissonList.java index b10168c67..59b0edfb6 100644 --- a/src/main/java/org/redisson/RedissonList.java +++ b/src/main/java/org/redisson/RedissonList.java @@ -24,7 +24,9 @@ import java.util.ListIterator; import java.util.NoSuchElementException; import org.redisson.client.protocol.RedisCommand; +import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.convertor.BooleanReplayConvertor; +import org.redisson.client.protocol.convertor.IntegerReplayConvertor; import static org.redisson.client.protocol.RedisCommands.*; import org.redisson.connection.ConnectionManager; @@ -43,16 +45,17 @@ import io.netty.util.concurrent.Promise; */ public class RedissonList extends RedissonExpirable implements RList { - private int batchSize = 50; - protected RedissonList(ConnectionManager connectionManager, String name) { super(connectionManager, name); } @Override public int size() { - Long size = connectionManager.read(getName(), LLEN, getName()); - return size.intValue(); + return connectionManager.get(sizeAsync()); + } + + public Future sizeAsync() { + return connectionManager.readAsync(getName(), LLEN, getName()); } @Override @@ -72,17 +75,22 @@ public class RedissonList extends RedissonExpirable implements RList { @Override public Object[] toArray() { - List list = readAllList(); + List list = readAll(); return list.toArray(); } - protected List readAllList() { - return connectionManager.read(getName(), LRANGE, getName(), 0, -1); + private List readAll() { + return connectionManager.get(readAllAsync()); + } + + @Override + public Future> readAllAsync() { + return connectionManager.readAsync(getName(), LRANGE, getName(), 0, -1); } @Override public T[] toArray(T[] a) { - List list = readAllList(); + List list = readAll(); return list.toArray(a); } @@ -101,31 +109,36 @@ public class RedissonList extends RedissonExpirable implements RList { return remove(o, 1); } + @Override + public Future removeAsync(Object o) { + return removeAsync(o, 1); + } + + protected Future removeAsync(Object o, int count) { + return connectionManager.writeAsync(getName(), LREM_SINGLE, getName(), count, o); + } + protected boolean remove(Object o, int count) { - return (Long)connectionManager.write(getName(), LREM, getName(), count, o) > 0; + return connectionManager.get(removeAsync(o, count)); } @Override - public boolean containsAll(Collection c) { - if (isEmpty() || c.isEmpty()) { - return false; - } - - Collection copy = new ArrayList(c); - int to = div(size(), batchSize); - for (int i = 0; i < to; i++) { - final int j = i; - List range = connectionManager.read(getName(), LRANGE, getName(), j*batchSize, j*batchSize + batchSize - 1); - for (Iterator iterator = copy.iterator(); iterator.hasNext();) { - Object obj = iterator.next(); - int index = range.indexOf(obj); - if (index != -1) { - iterator.remove(); - } - } - } - return copy.isEmpty(); + public Future containsAllAsync(Collection c) { + return connectionManager.evalReadAsync(getName(), new RedisCommand("EVAL", new BooleanReplayConvertor(), 4), + "local s = redis.call('llen', KEYS[1]);" + + "for i = 0, s, 1 do " + + "for j = 0, table.getn(ARGV), 1 do " + + "if ARGV[j] == redis.call('lindex', KEYS[1], i) " + + "then table.remove(ARGV, j) end " + + "end; " + +"end;" + + "return table.getn(ARGV) == 0; ", + Collections.singletonList(getName()), c.toArray()); + } + @Override + public boolean containsAll(Collection c) { + return connectionManager.get(containsAllAsync(c)); } @Override @@ -189,16 +202,6 @@ public class RedissonList extends RedissonExpirable implements RList { "for i, v in ipairs(tail) do redis.call('rpush', KEYS[1], v) end;" + "return true", Collections.singletonList(getName()), args.toArray()); - -// return "OK".equals(new RedissonScript(connectionManager).evalR( -// "local ind = table.remove(ARGV); " + // index is last parameter -// "local tail = redis.call('lrange', KEYS[1], ind, -1); " + -// "redis.call('ltrim', KEYS[1], 0, ind - 1); " + -// "for i, v in ipairs(ARGV) do redis.call('rpush', KEYS[1], v) end;" + -// "for i, v in ipairs(tail) do redis.call('rpush', KEYS[1], v) end;" + -// "return 'OK'", -// RScript.ReturnType.STATUS, -// Collections.singletonList(getName()), new ArrayList(coll), Collections.singletonList(index))); } else { // append to list return addAll(coll); @@ -206,34 +209,59 @@ public class RedissonList extends RedissonExpirable implements RList { } @Override - public boolean removeAll(Collection c) { + public Future removeAllAsync(Collection c) { if (c.isEmpty()) { - return false; + return connectionManager.getGroup().next().newSucceededFuture(false); } - boolean result = false; - for (Object object : c) { - boolean res = (Long)connectionManager.write(getName(), LREM, getName(), 0, object) > 0; - if (!result) { - result = res; - } - } - return result; + return connectionManager.evalWriteAsync(getName(), new RedisCommand("EVAL", new BooleanReplayConvertor(), 4), + "local v = true " + + "for i = 0, table.getn(ARGV), 1 do " + + "if redis.call('lrem', KEYS[1], 0, ARGV[i]) == 0 " + + "then v = false end " + +"end " + + "return v ", + Collections.singletonList(getName()), c.toArray()); + } + + @Override + public boolean removeAll(Collection c) { + return connectionManager.get(removeAllAsync(c)); } @Override public boolean retainAll(Collection c) { - boolean changed = false; - for (Iterator iterator = iterator(); iterator.hasNext();) { - V object = iterator.next(); - if (!c.contains(object)) { - iterator.remove(); - changed = true; - } - } - return changed; + return connectionManager.get(retainAllAsync(c)); + } + + @Override + public Future retainAllAsync(Collection c) { + return connectionManager.evalWriteAsync(getName(), new RedisCommand("EVAL", new BooleanReplayConvertor(), 4), + "local changed = false " + + "local s = redis.call('llen', KEYS[1]) " + + "local i = 0 " + + "while i < s do " + + "local element = redis.call('lindex', KEYS[1], i) " + + "local isInAgrs = false " + + "for j = 0, table.getn(ARGV), 1 do " + + "if ARGV[j] == element then " + + "isInAgrs = true " + + "break " + + "end " + + "end " + + "if isInAgrs == false then " + + "redis.call('LREM', KEYS[1], 0, element) " + + "i = i-1 " + + "s = s-1 " + + "changed = true " + + "end " + + "i = i + 1 " + + "end " + + "return changed ", + Collections.singletonList(getName()), c.toArray()); } + @Override public void clear() { delete(); @@ -274,12 +302,15 @@ public class RedissonList extends RedissonExpirable implements RList { return index >= 0 && index <= size; } - @Override public V set(int index, V element) { checkIndex(index); + return connectionManager.get(setAsync(index, element)); + } - return connectionManager.evalWrite(getName(), new RedisCommand("EVAL", 5), + @Override + public Future setAsync(int index, V element) { + return connectionManager.evalWriteAsync(getName(), new RedisCommand("EVAL", 5), "local v = redis.call('lindex', KEYS[1], ARGV[1]); " + "redis.call('lset', KEYS[1], ARGV[1], ARGV[2]); " + "return v", @@ -287,19 +318,19 @@ public class RedissonList extends RedissonExpirable implements RList { } @Override - public void add(int index, V element) { - addAll(index, Collections.singleton(element)); + public void fastSet(int index, V element) { + checkIndex(index); + connectionManager.get(fastSetAsync(index, element)); } - private int div(int p, int q) { - int div = p / q; - int rem = p - q * div; // equal to p % q - - if (rem == 0) { - return div; - } + @Override + public Future fastSetAsync(int index, V element) { + return connectionManager.writeAsync(getName(), RedisCommands.LSET, getName(), index, element); + } - return div + 1; + @Override + public void add(int index, V element) { + addAll(index, Collections.singleton(element)); } @Override @@ -321,29 +352,30 @@ public class RedissonList extends RedissonExpirable implements RList { @Override public int indexOf(Object o) { - if (isEmpty()) { - return -1; - } + return connectionManager.get(indexOfAsync(o)); + } - Long index = connectionManager.evalRead(getName(), new RedisCommand("EVAL", 4), + @Override + public Future indexOfAsync(Object o) { + return connectionManager.evalReadAsync(getName(), new RedisCommand("EVAL", new IntegerReplayConvertor(), 4), "local s = redis.call('llen', KEYS[1]);" + "for i = 0, s, 1 do if ARGV[1] == redis.call('lindex', KEYS[1], i) then return i end end;" + "return -1", Collections.singletonList(getName()), o); - return index.intValue(); } @Override public int lastIndexOf(Object o) { - if (isEmpty()) { - return -1; - } + return connectionManager.get(lastIndexOfAsync(o)); + } - return ((Long)connectionManager.evalRead(getName(), new RedisCommand("EVAL", 4), + @Override + public Future lastIndexOfAsync(Object o) { + return connectionManager.evalReadAsync(getName(), new RedisCommand("EVAL", new IntegerReplayConvertor(), 4), "local s = redis.call('llen', KEYS[1]);" + "for i = s, 0, -1 do if ARGV[1] == redis.call('lindex', KEYS[1], i) then return i end end;" + "return -1", - Collections.singletonList(getName()), o)).intValue(); + Collections.singletonList(getName()), o); } @Override diff --git a/src/main/java/org/redisson/client/protocol/RedisCommand.java b/src/main/java/org/redisson/client/protocol/RedisCommand.java index 980e2c720..0bd3cfd4a 100644 --- a/src/main/java/org/redisson/client/protocol/RedisCommand.java +++ b/src/main/java/org/redisson/client/protocol/RedisCommand.java @@ -37,6 +37,12 @@ public class RedisCommand { private Decoder replayDecoder; Convertor convertor = new EmptyConvertor(); + /** + * Copy command and change name + * + * @param command - source command + * @param name - new command name + */ public RedisCommand(RedisCommand command, String name) { this.outParamType = command.outParamType; this.inParamType = command.inParamType; diff --git a/src/main/java/org/redisson/client/protocol/RedisCommands.java b/src/main/java/org/redisson/client/protocol/RedisCommands.java index a19efccd8..bd81427e1 100644 --- a/src/main/java/org/redisson/client/protocol/RedisCommands.java +++ b/src/main/java/org/redisson/client/protocol/RedisCommands.java @@ -20,6 +20,7 @@ import java.util.Map; import org.redisson.client.protocol.RedisCommand.ValueType; import org.redisson.client.protocol.convertor.BooleanReplayConvertor; +import org.redisson.client.protocol.convertor.IntegerReplayConvertor; import org.redisson.client.protocol.convertor.VoidReplayConvertor; import org.redisson.client.protocol.decoder.KeyValueObjectDecoder; import org.redisson.client.protocol.decoder.ListScanResult; @@ -34,7 +35,6 @@ import org.redisson.client.protocol.decoder.StringListReplayDecoder; import org.redisson.client.protocol.decoder.StringMapReplayDecoder; import org.redisson.client.protocol.decoder.StringReplayDecoder; import org.redisson.client.protocol.pubsub.PubSubStatusDecoder; -import org.redisson.client.protocol.pubsub.PubSubStatusMessage; public interface RedisCommands { @@ -52,11 +52,13 @@ public interface RedisCommands { RedisCommand SISMEMBER = new RedisCommand("SISMEMBER", new BooleanReplayConvertor(), 2); RedisStrictCommand SCARD = new RedisStrictCommand("SCARD"); + RedisCommand LSET = new RedisCommand("LSET", new VoidReplayConvertor(), 3); RedisCommand LPOP = new RedisCommand("LPOP"); + RedisCommand LREM_SINGLE = new RedisCommand("LREM", new BooleanReplayConvertor(), 3); RedisCommand LREM = new RedisCommand("LREM", 3); RedisCommand LINDEX = new RedisCommand("LINDEX"); RedisCommand LINSERT = new RedisCommand("LINSERT", 3, ValueType.OBJECTS); - RedisStrictCommand LLEN = new RedisStrictCommand("LLEN"); + RedisStrictCommand LLEN = new RedisStrictCommand("LLEN", new IntegerReplayConvertor()); RedisStrictCommand LTRIM = new RedisStrictCommand("LTRIM", new BooleanReplayConvertor()); RedisStrictCommand EXPIRE = new RedisStrictCommand("EXPIRE", new BooleanReplayConvertor()); diff --git a/src/main/java/org/redisson/client/protocol/convertor/IntegerReplayConvertor.java b/src/main/java/org/redisson/client/protocol/convertor/IntegerReplayConvertor.java new file mode 100644 index 000000000..83293e8a8 --- /dev/null +++ b/src/main/java/org/redisson/client/protocol/convertor/IntegerReplayConvertor.java @@ -0,0 +1,10 @@ +package org.redisson.client.protocol.convertor; + +public class IntegerReplayConvertor extends SingleConvertor { + + @Override + public Integer convert(Object obj) { + return ((Long) obj).intValue(); + } + +} diff --git a/src/main/java/org/redisson/core/RBucketAsync.java b/src/main/java/org/redisson/core/RBucketAsync.java index 3ca4133f8..efb5c3ea9 100644 --- a/src/main/java/org/redisson/core/RBucketAsync.java +++ b/src/main/java/org/redisson/core/RBucketAsync.java @@ -20,7 +20,7 @@ import io.netty.util.concurrent.Future; import java.util.concurrent.TimeUnit; /** - * Any object holder + * Async object functions * * @author Nikita Koksharov * diff --git a/src/main/java/org/redisson/core/RList.java b/src/main/java/org/redisson/core/RList.java index b82925f96..ac8b5f83b 100644 --- a/src/main/java/org/redisson/core/RList.java +++ b/src/main/java/org/redisson/core/RList.java @@ -15,9 +15,6 @@ */ package org.redisson.core; -import io.netty.util.concurrent.Future; - -import java.util.Collection; import java.util.List; /** @@ -27,12 +24,8 @@ import java.util.List; * * @param the type of elements held in this collection */ -public interface RList extends List, RExpirable { +public interface RList extends List, RListAsync { + + void fastSet(int index, V element); - Future getAsync(int index); - - Future addAsync(V e); - - Future addAllAsync(Collection c); - } diff --git a/src/main/java/org/redisson/core/RListAsync.java b/src/main/java/org/redisson/core/RListAsync.java new file mode 100644 index 000000000..571ad20ef --- /dev/null +++ b/src/main/java/org/redisson/core/RListAsync.java @@ -0,0 +1,58 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.core; + +import io.netty.util.concurrent.Future; + +import java.util.Collection; +import java.util.List; + +/** + * Async list functions + * + * @author Nikita Koksharov + * + * @param the type of elements held in this collection + */ +public interface RListAsync extends RExpirableAsync { + + Future lastIndexOfAsync(Object o); + + Future indexOfAsync(Object o); + + Future fastSetAsync(int index, V element); + + Future setAsync(int index, V element); + + Future retainAllAsync(Collection c); + + Future removeAllAsync(Collection c); + + Future containsAllAsync(Collection c); + + Future removeAsync(Object o); + + Future> readAllAsync(); + + Future sizeAsync(); + + Future getAsync(int index); + + Future addAsync(V e); + + Future addAllAsync(Collection c); + +} diff --git a/src/test/java/org/redisson/RedissonListTest.java b/src/test/java/org/redisson/RedissonListTest.java index 2b04261c7..ce161811a 100644 --- a/src/test/java/org/redisson/RedissonListTest.java +++ b/src/test/java/org/redisson/RedissonListTest.java @@ -448,6 +448,19 @@ public class RedissonListTest extends BaseTest { } + @Test + public void testRemoveAllEmpty() { + List list = redisson.getList("list"); + list.add(1); + list.add(2); + list.add(3); + list.add(4); + list.add(5); + + Assert.assertFalse(list.removeAll(Collections.emptyList())); + Assert.assertFalse(Arrays.asList(1).removeAll(Collections.emptyList())); + } + @Test public void testRemoveAll() { List list = redisson.getList("list"); @@ -485,6 +498,16 @@ public class RedissonListTest extends BaseTest { Assert.assertEquals(2, list.size()); } + @Test + public void testFastSet() { + RList list = redisson.getList("list"); + list.add(1); + list.add(2); + + list.fastSet(0, 3); + Assert.assertEquals(3, (int)list.get(0)); + } + @Test public void testRetainAllEmpty() { List list = redisson.getList("list"); @@ -592,6 +615,18 @@ public class RedissonListTest extends BaseTest { Assert.assertTrue(list.containsAll(Arrays.asList(30, 11))); Assert.assertFalse(list.containsAll(Arrays.asList(30, 711, 11))); + Assert.assertTrue(list.containsAll(Arrays.asList(30))); + } + + @Test + public void testContainsAllEmpty() { + List list = redisson.getList("list"); + for (int i = 0; i < 200; i++) { + list.add(i); + } + + Assert.assertTrue(list.containsAll(Collections.emptyList())); + Assert.assertTrue(Arrays.asList(1).containsAll(Collections.emptyList())); } @Test