RedissonSet.iterator implemented with sscan. #2

pull/38/head
Nikita 11 years ago
parent 32c6d263f3
commit db82693980

@ -63,6 +63,8 @@ import com.lambdaworks.redis.output.MapValueListOutput;
import com.lambdaworks.redis.output.MapValueOutput;
import com.lambdaworks.redis.output.MultiOutput;
import com.lambdaworks.redis.output.NestedMultiOutput;
import com.lambdaworks.redis.output.ScanOutput;
import com.lambdaworks.redis.output.ScanResult;
import com.lambdaworks.redis.output.ScoredValueListOutput;
import com.lambdaworks.redis.output.StatusOutput;
import com.lambdaworks.redis.output.StringListOutput;
@ -1016,6 +1018,11 @@ public class RedisAsyncConnection<K, V> extends ChannelInboundHandlerAdapter {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).add("slaves").addKey(key);
return dispatch(SENTINEL, new ListMapOutput<K, V>(codec), args);
}
public Future<ScanResult<V>> sscan(K key, long startValue) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key).add(startValue);
return dispatch(SSCAN, new ScanOutput<K, V>(codec), args);
}
/**
* Wait until commands are complete or the connection timeout is reached.

@ -2,17 +2,18 @@
package com.lambdaworks.redis;
import com.lambdaworks.redis.protocol.Command;
import com.lambdaworks.redis.protocol.ConnectionWatchdog;
import static java.lang.Math.max;
import static java.util.concurrent.TimeUnit.SECONDS;
import io.netty.util.concurrent.Future;
import java.util.*;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import static com.lambdaworks.redis.protocol.CommandType.MULTI;
import static java.lang.Math.max;
import static java.util.concurrent.TimeUnit.SECONDS;
import com.lambdaworks.redis.output.ScanResult;
import com.lambdaworks.redis.protocol.ConnectionWatchdog;
/**
* A synchronous thread-safe connection to a redis server. Multiple threads may
@ -812,6 +813,10 @@ public class RedisConnection<K, V> {
public Long zunionstore(K destination, ZStoreArgs storeArgs, K... keys) {
return await(c.zunionstore(destination, storeArgs, keys));
}
public ScanResult<V> sscan(K key, long startValue) {
return await(c.sscan(key, startValue));
}
public RedisAsyncConnection<K, V> getAsync() {
return c;

@ -11,14 +11,6 @@ import java.util.Map;
import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.protocol.CommandOutput;
/**
* {@link Map} of keys and values output.
*
* @param <K> Key type.
* @param <V> Value type.
*
* @author Will Glozer
*/
public class ListMapOutput<K, V> extends CommandOutput<K, V, List<Map<K, V>>> {
private K key;
private int index = 0;

@ -0,0 +1,23 @@
package com.lambdaworks.redis.output;
import java.nio.ByteBuffer;
import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.protocol.CommandOutput;
public class ScanOutput<K, V> extends CommandOutput<K, V, ScanResult<V>> {
public ScanOutput(RedisCodec<K, V> codec) {
super(codec, new ScanResult<V>());
}
@Override
public void set(ByteBuffer bytes) {
if (output.getPos() == null) {
output.setPos(((Number) codec.decodeValue(bytes)).longValue());
} else {
output.addValue(codec.decodeValue(bytes));
}
}
}

@ -0,0 +1,25 @@
package com.lambdaworks.redis.output;
import java.util.ArrayList;
import java.util.List;
public class ScanResult<V> {
private Long pos;
private List<V> values = new ArrayList<V>();
public void setPos(Long pos) {
this.pos = pos;
}
public Long getPos() {
return pos;
}
public void addValue(V value) {
values.add(value);
}
public List<V> getValues() {
return values;
}
}

@ -73,7 +73,9 @@ public enum CommandType {
// HyperLogLog
PFADD, PFCOUNT, PFMERGE,
SENTINEL;
SENTINEL,
SSCAN;
public byte[] bytes;

@ -23,6 +23,7 @@ import org.redisson.connection.ConnectionManager;
import org.redisson.core.RSet;
import com.lambdaworks.redis.RedisConnection;
import com.lambdaworks.redis.output.ScanResult;
/**
* Distributed and concurrent implementation of {@link java.util.Set}
@ -62,49 +63,64 @@ public class RedissonSet<V> extends RedissonExpirable implements RSet<V> {
}
}
private ScanResult<V> scanIterator(long startPos) {
RedisConnection<Object, V> connection = connectionManager.connectionReadOp();
try {
return connection.sscan(getName(), startPos);
} finally {
connectionManager.releaseRead(connection);
}
}
@Override
public Iterator<V> iterator() {
RedisConnection<Object, Object> connection = connectionManager.connectionReadOp();
try {
// TODO use SSCAN in case of usage Redis 2.8
final Iterator<V> iter = (Iterator<V>) connection.smembers(getName()).iterator();
return new Iterator<V>() {
private boolean removeExecuted;
private V value;
@Override
public boolean hasNext() {
return iter.hasNext();
return new Iterator<V>() {
private Iterator<V> iter;
private Long iterPos;
private boolean removeExecuted;
private V value;
@Override
public boolean hasNext() {
if (iter == null) {
ScanResult<V> res = scanIterator(0);
iter = res.getValues().iterator();
iterPos = res.getPos();
} else if (!iter.hasNext() && iterPos != 0) {
ScanResult<V> res = scanIterator(iterPos);
iter = res.getValues().iterator();
iterPos = res.getPos();
}
return iter.hasNext();
}
@Override
public V next() {
if (!hasNext()) {
throw new NoSuchElementException("No such element at index");
}
value = iter.next();
removeExecuted = false;
return value;
@Override
public V next() {
if (!hasNext()) {
throw new NoSuchElementException("No such element at index");
}
@Override
public void remove() {
if (removeExecuted) {
throw new IllegalStateException("Element been already deleted");
}
value = iter.next();
removeExecuted = false;
return value;
}
iter.remove();
RedissonSet.this.remove(value);
removeExecuted = true;
@Override
public void remove() {
if (removeExecuted) {
throw new IllegalStateException("Element been already deleted");
}
};
} finally {
connectionManager.releaseRead(connection);
}
// lazy init iterator
hasNext();
iter.remove();
RedissonSet.this.remove(value);
removeExecuted = true;
}
};
}
@Override

@ -1,6 +1,11 @@
package org.redisson;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.Set;
import org.hamcrest.MatcherAssert;
@ -10,20 +15,74 @@ import org.junit.Test;
public class RedissonSetTest extends BaseTest {
@Test
public void testIteratorRemove() {
Set<String> list = redisson.getSet("list");
list.add("1");
list.add("4");
list.add("2");
list.add("5");
list.add("3");
for (Iterator<String> iterator = list.iterator(); iterator.hasNext();) {
String value = iterator.next();
if (value.equals("2")) {
iterator.remove();
}
}
Assert.assertThat(list, Matchers.containsInAnyOrder("1", "4", "5", "3"));
int iteration = 0;
for (Iterator<String> iterator = list.iterator(); iterator.hasNext();) {
iterator.next();
iterator.remove();
iteration++;
}
Assert.assertEquals(4, iteration);
Assert.assertEquals(0, list.size());
Assert.assertTrue(list.isEmpty());
}
@Test
public void testIteratorSequence() {
Set<Long> set = redisson.getSet("set");
for (int i = 0; i < 1000; i++) {
set.add(Long.valueOf(i));
}
Set<Long> setCopy = new HashSet<Long>();
for (int i = 0; i < 1000; i++) {
setCopy.add(Long.valueOf(i));
}
checkIterator(set, setCopy);
}
private void checkIterator(Set<Long> set, Set<Long> setCopy) {
for (Iterator<Long> iterator = set.iterator(); iterator.hasNext();) {
Long value = iterator.next();
if (!setCopy.remove(value)) {
Assert.fail();
}
}
Assert.assertEquals(0, setCopy.size());
}
@Test
public void testLong() {
Redisson redisson = Redisson.create();
Set<Long> set = redisson.getSet("set");
set.add(1L);
set.add(2L);
Assert.assertThat(set, Matchers.containsInAnyOrder(1L, 2L));
clear(set, redisson);
}
@Test
public void testRetainAll() {
Redisson redisson = Redisson.create();
Set<Integer> set = redisson.getSet("set");
for (int i = 0; i < 200; i++) {
set.add(i);
@ -31,13 +90,10 @@ public class RedissonSetTest extends BaseTest {
Assert.assertTrue(set.retainAll(Arrays.asList(1, 2)));
Assert.assertEquals(2, set.size());
clear(set, redisson);
}
@Test
public void testContainsAll() {
Redisson redisson = Redisson.create();
Set<Integer> set = redisson.getSet("set");
for (int i = 0; i < 200; i++) {
set.add(i);
@ -45,13 +101,10 @@ public class RedissonSetTest extends BaseTest {
Assert.assertTrue(set.containsAll(Arrays.asList(30, 11)));
Assert.assertFalse(set.containsAll(Arrays.asList(30, 711, 11)));
clear(set, redisson);
}
@Test
public void testToArray() {
Redisson redisson = Redisson.create();
Set<String> set = redisson.getSet("set");
set.add("1");
set.add("4");
@ -63,13 +116,10 @@ public class RedissonSetTest extends BaseTest {
String[] strs = set.toArray(new String[0]);
MatcherAssert.assertThat(Arrays.asList(strs), Matchers.containsInAnyOrder("1", "4", "2", "5", "3"));
clear(set, redisson);
}
@Test
public void testContains() {
Redisson redisson = Redisson.create();
Set<TestObject> set = redisson.getSet("set");
set.add(new TestObject("1", "2"));
@ -81,13 +131,10 @@ public class RedissonSetTest extends BaseTest {
Assert.assertTrue(set.contains(new TestObject("2", "3")));
Assert.assertTrue(set.contains(new TestObject("1", "2")));
Assert.assertFalse(set.contains(new TestObject("1", "9")));
clear(set, redisson);
}
@Test
public void testDuplicates() {
Redisson redisson = Redisson.create();
Set<TestObject> set = redisson.getSet("set");
set.add(new TestObject("1", "2"));
@ -97,13 +144,10 @@ public class RedissonSetTest extends BaseTest {
set.add(new TestObject("5", "6"));
Assert.assertEquals(4, set.size());
clear(set, redisson);
}
@Test
public void testSize() {
Redisson redisson = Redisson.create();
Set<Integer> set = redisson.getSet("set");
set.add(1);
set.add(2);
@ -114,8 +158,6 @@ public class RedissonSetTest extends BaseTest {
set.add(5);
Assert.assertEquals(5, set.size());
clear(set, redisson);
}
}

Loading…
Cancel
Save