RListAsync interface added. fastSet/fastSetAsync methods added. #186

pull/243/head
Nikita 10 years ago
parent ed33902085
commit 0e0e1c1b3f

@ -24,7 +24,9 @@ import java.util.ListIterator;
import java.util.NoSuchElementException; import java.util.NoSuchElementException;
import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.convertor.BooleanReplayConvertor; import org.redisson.client.protocol.convertor.BooleanReplayConvertor;
import org.redisson.client.protocol.convertor.IntegerReplayConvertor;
import static org.redisson.client.protocol.RedisCommands.*; import static org.redisson.client.protocol.RedisCommands.*;
import org.redisson.connection.ConnectionManager; import org.redisson.connection.ConnectionManager;
@ -43,16 +45,17 @@ import io.netty.util.concurrent.Promise;
*/ */
public class RedissonList<V> extends RedissonExpirable implements RList<V> { public class RedissonList<V> extends RedissonExpirable implements RList<V> {
private int batchSize = 50;
protected RedissonList(ConnectionManager connectionManager, String name) { protected RedissonList(ConnectionManager connectionManager, String name) {
super(connectionManager, name); super(connectionManager, name);
} }
@Override @Override
public int size() { public int size() {
Long size = connectionManager.read(getName(), LLEN, getName()); return connectionManager.get(sizeAsync());
return size.intValue(); }
public Future<Integer> sizeAsync() {
return connectionManager.readAsync(getName(), LLEN, getName());
} }
@Override @Override
@ -72,17 +75,22 @@ public class RedissonList<V> extends RedissonExpirable implements RList<V> {
@Override @Override
public Object[] toArray() { public Object[] toArray() {
List<V> list = readAllList(); List<V> list = readAll();
return list.toArray(); return list.toArray();
} }
protected List<V> readAllList() { private List<V> readAll() {
return connectionManager.read(getName(), LRANGE, getName(), 0, -1); return connectionManager.get(readAllAsync());
}
@Override
public Future<List<V>> readAllAsync() {
return connectionManager.readAsync(getName(), LRANGE, getName(), 0, -1);
} }
@Override @Override
public <T> T[] toArray(T[] a) { public <T> T[] toArray(T[] a) {
List<V> list = readAllList(); List<V> list = readAll();
return list.toArray(a); return list.toArray(a);
} }
@ -101,31 +109,36 @@ public class RedissonList<V> extends RedissonExpirable implements RList<V> {
return remove(o, 1); return remove(o, 1);
} }
@Override
public Future<Boolean> removeAsync(Object o) {
return removeAsync(o, 1);
}
protected Future<Boolean> removeAsync(Object o, int count) {
return connectionManager.writeAsync(getName(), LREM_SINGLE, getName(), count, o);
}
protected boolean remove(Object o, int count) { protected boolean remove(Object o, int count) {
return (Long)connectionManager.write(getName(), LREM, getName(), count, o) > 0; return connectionManager.get(removeAsync(o, count));
} }
@Override @Override
public boolean containsAll(Collection<?> c) { public Future<Boolean> containsAllAsync(Collection<?> c) {
if (isEmpty() || c.isEmpty()) { return connectionManager.evalReadAsync(getName(), new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 4),
return false; "local s = redis.call('llen', KEYS[1]);" +
} "for i = 0, s, 1 do " +
"for j = 0, table.getn(ARGV), 1 do "
Collection<Object> copy = new ArrayList<Object>(c); + "if ARGV[j] == redis.call('lindex', KEYS[1], i) "
int to = div(size(), batchSize); + "then table.remove(ARGV, j) end "
for (int i = 0; i < to; i++) { + "end; "
final int j = i; +"end;"
List<V> range = connectionManager.read(getName(), LRANGE, getName(), j*batchSize, j*batchSize + batchSize - 1); + "return table.getn(ARGV) == 0; ",
for (Iterator<Object> iterator = copy.iterator(); iterator.hasNext();) { Collections.<Object>singletonList(getName()), c.toArray());
Object obj = iterator.next(); }
int index = range.indexOf(obj);
if (index != -1) {
iterator.remove();
}
}
}
return copy.isEmpty();
@Override
public boolean containsAll(Collection<?> c) {
return connectionManager.get(containsAllAsync(c));
} }
@Override @Override
@ -189,16 +202,6 @@ public class RedissonList<V> extends RedissonExpirable implements RList<V> {
"for i, v in ipairs(tail) do redis.call('rpush', KEYS[1], v) end;" + "for i, v in ipairs(tail) do redis.call('rpush', KEYS[1], v) end;" +
"return true", "return true",
Collections.<Object>singletonList(getName()), args.toArray()); Collections.<Object>singletonList(getName()), args.toArray());
// return "OK".equals(new RedissonScript(connectionManager).evalR(
// "local ind = table.remove(ARGV); " + // index is last parameter
// "local tail = redis.call('lrange', KEYS[1], ind, -1); " +
// "redis.call('ltrim', KEYS[1], 0, ind - 1); " +
// "for i, v in ipairs(ARGV) do redis.call('rpush', KEYS[1], v) end;" +
// "for i, v in ipairs(tail) do redis.call('rpush', KEYS[1], v) end;" +
// "return 'OK'",
// RScript.ReturnType.STATUS,
// Collections.<Object>singletonList(getName()), new ArrayList<Object>(coll), Collections.singletonList(index)));
} else { } else {
// append to list // append to list
return addAll(coll); return addAll(coll);
@ -206,34 +209,59 @@ public class RedissonList<V> extends RedissonExpirable implements RList<V> {
} }
@Override @Override
public boolean removeAll(Collection<?> c) { public Future<Boolean> removeAllAsync(Collection<?> c) {
if (c.isEmpty()) { if (c.isEmpty()) {
return false; return connectionManager.getGroup().next().newSucceededFuture(false);
} }
boolean result = false; return connectionManager.evalWriteAsync(getName(), new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 4),
for (Object object : c) { "local v = true " +
boolean res = (Long)connectionManager.write(getName(), LREM, getName(), 0, object) > 0; "for i = 0, table.getn(ARGV), 1 do "
if (!result) { + "if redis.call('lrem', KEYS[1], 0, ARGV[i]) == 0 "
result = res; + "then v = false end "
} +"end "
} + "return v ",
return result; Collections.<Object>singletonList(getName()), c.toArray());
}
@Override
public boolean removeAll(Collection<?> c) {
return connectionManager.get(removeAllAsync(c));
} }
@Override @Override
public boolean retainAll(Collection<?> c) { public boolean retainAll(Collection<?> c) {
boolean changed = false; return connectionManager.get(retainAllAsync(c));
for (Iterator<V> iterator = iterator(); iterator.hasNext();) { }
V object = iterator.next();
if (!c.contains(object)) { @Override
iterator.remove(); public Future<Boolean> retainAllAsync(Collection<?> c) {
changed = true; return connectionManager.evalWriteAsync(getName(), new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 4),
} "local changed = false " +
} "local s = redis.call('llen', KEYS[1]) "
return changed; + "local i = 0 "
+ "while i < s do "
+ "local element = redis.call('lindex', KEYS[1], i) "
+ "local isInAgrs = false "
+ "for j = 0, table.getn(ARGV), 1 do "
+ "if ARGV[j] == element then "
+ "isInAgrs = true "
+ "break "
+ "end "
+ "end "
+ "if isInAgrs == false then "
+ "redis.call('LREM', KEYS[1], 0, element) "
+ "i = i-1 "
+ "s = s-1 "
+ "changed = true "
+ "end "
+ "i = i + 1 "
+ "end "
+ "return changed ",
Collections.<Object>singletonList(getName()), c.toArray());
} }
@Override @Override
public void clear() { public void clear() {
delete(); delete();
@ -274,12 +302,15 @@ public class RedissonList<V> extends RedissonExpirable implements RList<V> {
return index >= 0 && index <= size; return index >= 0 && index <= size;
} }
@Override @Override
public V set(int index, V element) { public V set(int index, V element) {
checkIndex(index); checkIndex(index);
return connectionManager.get(setAsync(index, element));
}
return connectionManager.evalWrite(getName(), new RedisCommand<Object>("EVAL", 5), @Override
public Future<V> setAsync(int index, V element) {
return connectionManager.evalWriteAsync(getName(), new RedisCommand<Object>("EVAL", 5),
"local v = redis.call('lindex', KEYS[1], ARGV[1]); " + "local v = redis.call('lindex', KEYS[1], ARGV[1]); " +
"redis.call('lset', KEYS[1], ARGV[1], ARGV[2]); " + "redis.call('lset', KEYS[1], ARGV[1], ARGV[2]); " +
"return v", "return v",
@ -287,19 +318,19 @@ public class RedissonList<V> extends RedissonExpirable implements RList<V> {
} }
@Override @Override
public void add(int index, V element) { public void fastSet(int index, V element) {
addAll(index, Collections.singleton(element)); checkIndex(index);
connectionManager.get(fastSetAsync(index, element));
} }
private int div(int p, int q) { @Override
int div = p / q; public Future<Void> fastSetAsync(int index, V element) {
int rem = p - q * div; // equal to p % q return connectionManager.writeAsync(getName(), RedisCommands.LSET, getName(), index, element);
}
if (rem == 0) {
return div;
}
return div + 1; @Override
public void add(int index, V element) {
addAll(index, Collections.singleton(element));
} }
@Override @Override
@ -321,29 +352,30 @@ public class RedissonList<V> extends RedissonExpirable implements RList<V> {
@Override @Override
public int indexOf(Object o) { public int indexOf(Object o) {
if (isEmpty()) { return connectionManager.get(indexOfAsync(o));
return -1; }
}
Long index = connectionManager.evalRead(getName(), new RedisCommand<Long>("EVAL", 4), @Override
public Future<Integer> indexOfAsync(Object o) {
return connectionManager.evalReadAsync(getName(), new RedisCommand<Integer>("EVAL", new IntegerReplayConvertor(), 4),
"local s = redis.call('llen', KEYS[1]);" + "local s = redis.call('llen', KEYS[1]);" +
"for i = 0, s, 1 do if ARGV[1] == redis.call('lindex', KEYS[1], i) then return i end end;" + "for i = 0, s, 1 do if ARGV[1] == redis.call('lindex', KEYS[1], i) then return i end end;" +
"return -1", "return -1",
Collections.<Object>singletonList(getName()), o); Collections.<Object>singletonList(getName()), o);
return index.intValue();
} }
@Override @Override
public int lastIndexOf(Object o) { public int lastIndexOf(Object o) {
if (isEmpty()) { return connectionManager.get(lastIndexOfAsync(o));
return -1; }
}
return ((Long)connectionManager.evalRead(getName(), new RedisCommand<Long>("EVAL", 4), @Override
public Future<Integer> lastIndexOfAsync(Object o) {
return connectionManager.evalReadAsync(getName(), new RedisCommand<Integer>("EVAL", new IntegerReplayConvertor(), 4),
"local s = redis.call('llen', KEYS[1]);" + "local s = redis.call('llen', KEYS[1]);" +
"for i = s, 0, -1 do if ARGV[1] == redis.call('lindex', KEYS[1], i) then return i end end;" + "for i = s, 0, -1 do if ARGV[1] == redis.call('lindex', KEYS[1], i) then return i end end;" +
"return -1", "return -1",
Collections.<Object>singletonList(getName()), o)).intValue(); Collections.<Object>singletonList(getName()), o);
} }
@Override @Override

@ -37,6 +37,12 @@ public class RedisCommand<R> {
private Decoder<R> replayDecoder; private Decoder<R> replayDecoder;
Convertor<R> convertor = new EmptyConvertor<R>(); Convertor<R> convertor = new EmptyConvertor<R>();
/**
* Copy command and change name
*
* @param command - source command
* @param name - new command name
*/
public RedisCommand(RedisCommand<R> command, String name) { public RedisCommand(RedisCommand<R> command, String name) {
this.outParamType = command.outParamType; this.outParamType = command.outParamType;
this.inParamType = command.inParamType; this.inParamType = command.inParamType;

@ -20,6 +20,7 @@ import java.util.Map;
import org.redisson.client.protocol.RedisCommand.ValueType; import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.convertor.BooleanReplayConvertor; import org.redisson.client.protocol.convertor.BooleanReplayConvertor;
import org.redisson.client.protocol.convertor.IntegerReplayConvertor;
import org.redisson.client.protocol.convertor.VoidReplayConvertor; import org.redisson.client.protocol.convertor.VoidReplayConvertor;
import org.redisson.client.protocol.decoder.KeyValueObjectDecoder; import org.redisson.client.protocol.decoder.KeyValueObjectDecoder;
import org.redisson.client.protocol.decoder.ListScanResult; import org.redisson.client.protocol.decoder.ListScanResult;
@ -34,7 +35,6 @@ import org.redisson.client.protocol.decoder.StringListReplayDecoder;
import org.redisson.client.protocol.decoder.StringMapReplayDecoder; import org.redisson.client.protocol.decoder.StringMapReplayDecoder;
import org.redisson.client.protocol.decoder.StringReplayDecoder; import org.redisson.client.protocol.decoder.StringReplayDecoder;
import org.redisson.client.protocol.pubsub.PubSubStatusDecoder; import org.redisson.client.protocol.pubsub.PubSubStatusDecoder;
import org.redisson.client.protocol.pubsub.PubSubStatusMessage;
public interface RedisCommands { public interface RedisCommands {
@ -52,11 +52,13 @@ public interface RedisCommands {
RedisCommand<Boolean> SISMEMBER = new RedisCommand<Boolean>("SISMEMBER", new BooleanReplayConvertor(), 2); RedisCommand<Boolean> SISMEMBER = new RedisCommand<Boolean>("SISMEMBER", new BooleanReplayConvertor(), 2);
RedisStrictCommand<Long> SCARD = new RedisStrictCommand<Long>("SCARD"); RedisStrictCommand<Long> SCARD = new RedisStrictCommand<Long>("SCARD");
RedisCommand<Void> LSET = new RedisCommand<Void>("LSET", new VoidReplayConvertor(), 3);
RedisCommand<Object> LPOP = new RedisCommand<Object>("LPOP"); RedisCommand<Object> LPOP = new RedisCommand<Object>("LPOP");
RedisCommand<Boolean> LREM_SINGLE = new RedisCommand<Boolean>("LREM", new BooleanReplayConvertor(), 3);
RedisCommand<Long> LREM = new RedisCommand<Long>("LREM", 3); RedisCommand<Long> LREM = new RedisCommand<Long>("LREM", 3);
RedisCommand<Object> LINDEX = new RedisCommand<Object>("LINDEX"); RedisCommand<Object> LINDEX = new RedisCommand<Object>("LINDEX");
RedisCommand<Object> LINSERT = new RedisCommand<Object>("LINSERT", 3, ValueType.OBJECTS); RedisCommand<Object> LINSERT = new RedisCommand<Object>("LINSERT", 3, ValueType.OBJECTS);
RedisStrictCommand<Long> LLEN = new RedisStrictCommand<Long>("LLEN"); RedisStrictCommand<Integer> LLEN = new RedisStrictCommand<Integer>("LLEN", new IntegerReplayConvertor());
RedisStrictCommand<Boolean> LTRIM = new RedisStrictCommand<Boolean>("LTRIM", new BooleanReplayConvertor()); RedisStrictCommand<Boolean> LTRIM = new RedisStrictCommand<Boolean>("LTRIM", new BooleanReplayConvertor());
RedisStrictCommand<Boolean> EXPIRE = new RedisStrictCommand<Boolean>("EXPIRE", new BooleanReplayConvertor()); RedisStrictCommand<Boolean> EXPIRE = new RedisStrictCommand<Boolean>("EXPIRE", new BooleanReplayConvertor());

@ -0,0 +1,10 @@
package org.redisson.client.protocol.convertor;
public class IntegerReplayConvertor extends SingleConvertor<Integer> {
@Override
public Integer convert(Object obj) {
return ((Long) obj).intValue();
}
}

@ -20,7 +20,7 @@ import io.netty.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
/** /**
* Any object holder * Async object functions
* *
* @author Nikita Koksharov * @author Nikita Koksharov
* *

@ -15,9 +15,6 @@
*/ */
package org.redisson.core; package org.redisson.core;
import io.netty.util.concurrent.Future;
import java.util.Collection;
import java.util.List; import java.util.List;
/** /**
@ -27,12 +24,8 @@ import java.util.List;
* *
* @param <V> the type of elements held in this collection * @param <V> the type of elements held in this collection
*/ */
public interface RList<V> extends List<V>, RExpirable { public interface RList<V> extends List<V>, RListAsync<V> {
void fastSet(int index, V element);
Future<V> getAsync(int index);
Future<Boolean> addAsync(V e);
Future<Boolean> addAllAsync(Collection<? extends V> c);
} }

@ -0,0 +1,58 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.core;
import io.netty.util.concurrent.Future;
import java.util.Collection;
import java.util.List;
/**
* Async list functions
*
* @author Nikita Koksharov
*
* @param <V> the type of elements held in this collection
*/
public interface RListAsync<V> extends RExpirableAsync {
Future<Integer> lastIndexOfAsync(Object o);
Future<Integer> indexOfAsync(Object o);
Future<Void> fastSetAsync(int index, V element);
Future<V> setAsync(int index, V element);
Future<Boolean> retainAllAsync(Collection<?> c);
Future<Boolean> removeAllAsync(Collection<?> c);
Future<Boolean> containsAllAsync(Collection<?> c);
Future<Boolean> removeAsync(Object o);
Future<List<V>> readAllAsync();
Future<Integer> sizeAsync();
Future<V> getAsync(int index);
Future<Boolean> addAsync(V e);
Future<Boolean> addAllAsync(Collection<? extends V> c);
}

@ -448,6 +448,19 @@ public class RedissonListTest extends BaseTest {
} }
@Test
public void testRemoveAllEmpty() {
List<Integer> list = redisson.getList("list");
list.add(1);
list.add(2);
list.add(3);
list.add(4);
list.add(5);
Assert.assertFalse(list.removeAll(Collections.emptyList()));
Assert.assertFalse(Arrays.asList(1).removeAll(Collections.emptyList()));
}
@Test @Test
public void testRemoveAll() { public void testRemoveAll() {
List<Integer> list = redisson.getList("list"); List<Integer> list = redisson.getList("list");
@ -485,6 +498,16 @@ public class RedissonListTest extends BaseTest {
Assert.assertEquals(2, list.size()); Assert.assertEquals(2, list.size());
} }
@Test
public void testFastSet() {
RList<Integer> list = redisson.getList("list");
list.add(1);
list.add(2);
list.fastSet(0, 3);
Assert.assertEquals(3, (int)list.get(0));
}
@Test @Test
public void testRetainAllEmpty() { public void testRetainAllEmpty() {
List<Integer> list = redisson.getList("list"); List<Integer> list = redisson.getList("list");
@ -592,6 +615,18 @@ public class RedissonListTest extends BaseTest {
Assert.assertTrue(list.containsAll(Arrays.asList(30, 11))); Assert.assertTrue(list.containsAll(Arrays.asList(30, 11)));
Assert.assertFalse(list.containsAll(Arrays.asList(30, 711, 11))); Assert.assertFalse(list.containsAll(Arrays.asList(30, 711, 11)));
Assert.assertTrue(list.containsAll(Arrays.asList(30)));
}
@Test
public void testContainsAllEmpty() {
List<Integer> list = redisson.getList("list");
for (int i = 0; i < 200; i++) {
list.add(i);
}
Assert.assertTrue(list.containsAll(Collections.emptyList()));
Assert.assertTrue(Arrays.asList(1).containsAll(Collections.emptyList()));
} }
@Test @Test

Loading…
Cancel
Save