RedissonListReactive refactoring

pull/411/head
Nikita 9 years ago
parent 0bbc332e37
commit 5cca26e526

@ -56,11 +56,11 @@ public class RedissonList<V> extends RedissonExpirable implements RList<V> {
public static final RedisCommand<Boolean> EVAL_BOOLEAN_ARGS2 = new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 5, ValueType.OBJECTS);
protected RedissonList(CommandAsyncExecutor commandExecutor, String name) {
public RedissonList(CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
}
protected RedissonList(Codec codec, CommandAsyncExecutor commandExecutor, String name) {
public RedissonList(Codec codec, CommandAsyncExecutor commandExecutor, String name) {
super(codec, commandExecutor, name);
}

@ -35,9 +35,9 @@ public interface RListReactive<V> extends RCollectionReactive<V> {
Publisher<V> iterator(int startIndex);
Publisher<Integer> lastIndexOf(Object o);
Publisher<Long> lastIndexOf(Object o);
Publisher<Integer> indexOf(Object o);
Publisher<Long> indexOf(Object o);
Publisher<Long> add(long index, V element);
@ -49,6 +49,6 @@ public interface RListReactive<V> extends RCollectionReactive<V> {
Publisher<V> get(long index);
Publisher<V> remove(int index);
Publisher<V> remove(long index);
}

@ -30,14 +30,13 @@ import java.util.List;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.redisson.RedissonList;
import org.redisson.api.RListReactive;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.convertor.BooleanNumberReplayConvertor;
import org.redisson.client.protocol.convertor.Convertor;
import org.redisson.client.protocol.convertor.IntegerReplayConvertor;
import org.redisson.client.protocol.convertor.LongReplayConvertor;
import org.redisson.command.CommandReactiveExecutor;
@ -56,12 +55,16 @@ import reactor.rx.subscription.ReactiveSubscription;
*/
public class RedissonListReactive<V> extends RedissonExpirableReactive implements RListReactive<V> {
private final RedissonList<V> instance;
public RedissonListReactive(CommandReactiveExecutor commandExecutor, String name) {
super(commandExecutor, name);
instance = new RedissonList<V>(commandExecutor, name);
}
public RedissonListReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name) {
super(codec, commandExecutor, name);
instance = new RedissonList<V>(codec, commandExecutor, name);
}
@Override
@ -151,7 +154,7 @@ public class RedissonListReactive<V> extends RedissonExpirableReactive implement
@Override
public Publisher<Boolean> remove(Object o) {
return remove(o, 1);
return reactive(instance.removeAsync(o));
}
protected Publisher<Boolean> remove(Object o, int count) {
@ -160,17 +163,7 @@ public class RedissonListReactive<V> extends RedissonExpirableReactive implement
@Override
public Publisher<Boolean> containsAll(Collection<?> c) {
return commandExecutor.evalReadReactive(getName(), codec, RedisCommands.EVAL_BOOLEAN_WITH_VALUES,
"local items = redis.call('lrange', KEYS[1], 0, -1) " +
"for i=1, #items do " +
"for j = 0, table.getn(ARGV), 1 do " +
"if items[i] == ARGV[j] then " +
"table.remove(ARGV, j) " +
"end " +
"end " +
"end " +
"return table.getn(ARGV) == 0 and 1 or 0",
Collections.<Object>singletonList(getName()), c.toArray());
return reactive(instance.containsAllAsync(c));
}
@Override
@ -232,40 +225,12 @@ public class RedissonListReactive<V> extends RedissonExpirableReactive implement
@Override
public Publisher<Boolean> removeAll(Collection<?> c) {
return commandExecutor.evalWriteReactive(getName(), codec, RedisCommands.EVAL_BOOLEAN_WITH_VALUES,
"local v = 0 " +
"for i = 0, table.getn(ARGV), 1 do "
+ "if redis.call('lrem', KEYS[1], 0, ARGV[i]) == 1 "
+ "then v = 1 end "
+"end "
+ "return v ",
Collections.<Object>singletonList(getName()), c.toArray());
return reactive(instance.removeAllAsync(c));
}
@Override
public Publisher<Boolean> retainAll(Collection<?> c) {
return commandExecutor.evalWriteReactive(getName(), codec, RedisCommands.EVAL_BOOLEAN_WITH_VALUES,
"local changed = 0 " +
"local items = redis.call('lrange', KEYS[1], 0, -1) "
+ "local i = 1 "
+ "local s = table.getn(items) "
+ "while i <= s do "
+ "local element = items[i] "
+ "local isInAgrs = false "
+ "for j = 0, table.getn(ARGV), 1 do "
+ "if ARGV[j] == element then "
+ "isInAgrs = true "
+ "break "
+ "end "
+ "end "
+ "if isInAgrs == false then "
+ "redis.call('LREM', KEYS[1], 0, element) "
+ "changed = 1 "
+ "end "
+ "i = i + 1 "
+ "end "
+ "return changed ",
Collections.<Object>singletonList(getName()), c.toArray());
return reactive(instance.retainAllAsync(c));
}
@Override
@ -293,7 +258,7 @@ public class RedissonListReactive<V> extends RedissonExpirableReactive implement
}
@Override
public Publisher<V> remove(int index) {
public Publisher<V> remove(long index) {
if (index == 0) {
return commandExecutor.writeReactive(getName(), codec, LPOP, getName());
}
@ -309,7 +274,7 @@ public class RedissonListReactive<V> extends RedissonExpirableReactive implement
@Override
public Publisher<Boolean> contains(Object o) {
return indexOf(o, new BooleanNumberReplayConvertor(-1L));
return reactive(instance.containsAsync(o));
}
private <R> Publisher<R> indexOf(Object o, Convertor<R> convertor) {
@ -327,13 +292,13 @@ public class RedissonListReactive<V> extends RedissonExpirableReactive implement
}
@Override
public Publisher<Integer> indexOf(Object o) {
return indexOf(o, new IntegerReplayConvertor());
public Publisher<Long> indexOf(Object o) {
return indexOf(o, new LongReplayConvertor());
}
@Override
public Publisher<Integer> lastIndexOf(Object o) {
return commandExecutor.evalReadReactive(getName(), codec, new RedisCommand<Integer>("EVAL", new IntegerReplayConvertor(), 4),
public Publisher<Long> lastIndexOf(Object o) {
return commandExecutor.evalReadReactive(getName(), codec, new RedisCommand<Integer>("EVAL", 4),
"local key = KEYS[1] " +
"local obj = ARGV[1] " +
"local items = redis.call('lrange', key, 0, -1) " +

@ -229,7 +229,7 @@ public class RedissonListReactiveTest extends BaseReactiveTest {
sync(list.add(0));
sync(list.add(10));
int index = sync(list.lastIndexOf(3));
long index = sync(list.lastIndexOf(3));
Assert.assertEquals(2, index);
}
@ -247,7 +247,7 @@ public class RedissonListReactiveTest extends BaseReactiveTest {
sync(list.add(0));
sync(list.add(10));
int index = sync(list.lastIndexOf(3));
long index = sync(list.lastIndexOf(3));
Assert.assertEquals(5, index);
}
@ -265,7 +265,7 @@ public class RedissonListReactiveTest extends BaseReactiveTest {
sync(list.add(3));
sync(list.add(10));
int index = sync(list.lastIndexOf(3));
long index = sync(list.lastIndexOf(3));
Assert.assertEquals(8, index);
}

Loading…
Cancel
Save