RedissonSortedSet.iterator optimization. zscan support. #2

pull/38/head
Nikita 11 years ago
parent 3c917d81ec
commit 3c22b60640

@ -1024,6 +1024,11 @@ public class RedisAsyncConnection<K, V> extends ChannelInboundHandlerAdapter {
return dispatch(SSCAN, new ScanOutput<K, V>(codec), args);
}
public Future<ScanResult<V>> zscan(K key, long startValue) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec).addKey(key).add(startValue);
return dispatch(ZSCAN, new ScanOutput<K, V>(codec), args);
}
/**
* Wait until commands are complete or the connection timeout is reached.
*

@ -817,6 +817,10 @@ public class RedisConnection<K, V> {
public ScanResult<V> sscan(K key, long startValue) {
return await(c.sscan(key, startValue));
}
public ScanResult<V> zscan(K key, long startValue) {
return await(c.zscan(key, startValue));
}
public RedisAsyncConnection<K, V> getAsync() {
return c;

@ -75,7 +75,7 @@ public enum CommandType {
SENTINEL,
SSCAN;
SSCAN, ZSCAN;
public byte[] bytes;

@ -243,16 +243,45 @@ public class RedissonSortedSet<V> extends RedissonObject implements RSortedSet<V
public Iterator<V> iterator(final double startScore, final double endScore) {
return new Iterator<V>() {
private double currentScore = startScore;
private boolean readNext = true;
private Double currentScore;
private boolean removeExecuted;
private V value;
@Override
public boolean hasNext() {
if (!readNext) {
return true;
}
RedisConnection<Object, V> connection = connectionManager.connectionReadOp();
try {
Long remains = connection.zcount(getName(), currentScore, endScore);
return remains > 0;
Double score = startScore;
if (currentScore != null) {
score = currentScore;
}
// get next two values
List<ScoredValue<V>> values = connection.zrangebyscoreWithScores(getName(), score, endScore, 0, 2);
if (values.isEmpty()) {
return false;
}
int index = 0;
if (currentScore != null) {
ScoredValue<V> val = values.get(0);
if (currentScore.equals(val.score)) {
if (values.size() > 1) {
index = 1;
} else {
return false;
}
}
}
ScoredValue<V> val = values.get(index);
value = val.value;
currentScore = val.score;
readNext = false;
return true;
} finally {
connectionManager.releaseRead(connection);
}
@ -260,42 +289,14 @@ public class RedissonSortedSet<V> extends RedissonObject implements RSortedSet<V
@Override
public V next() {
if (!hasNext()) {
throw new NoSuchElementException("No such element at index " + currentScore);
}
removeExecuted = false;
RedisConnection<Object, V> connection = connectionManager.connectionReadOp();
try {
double lastScore = getScoreAtIndex(-1, connection);
double scoreDiff = lastScore - currentScore;
Long remains = connection.zcount(getName(), currentScore, lastScore);
if (remains == 0) {
throw new NoSuchElementException("No more elements!");
if (value == null) {
if (!hasNext()) {
throw new NoSuchElementException("Exhausted iterator");
}
// TODO don't load whole set in memory
// if (remains < 50) {
List<ScoredValue<V>> values = connection.zrangebyscoreWithScores(getName(), currentScore, Double.MAX_VALUE);
if (values.isEmpty()) {
throw new NoSuchElementException("No more elements!");
}
ScoredValue<V> val = values.get(0);
value = val.value;
// System.out.println("value: " + value + " currentScore: " + currentScore);
if (values.size() > 1) {
ScoredValue<V> nextVal = values.get(1);
currentScore = nextVal.score;
} else {
currentScore = endScore;
}
return value;
// }
} finally {
connectionManager.releaseRead(connection);
}
removeExecuted = false;
readNext = true;
return value;
}
@Override

@ -2,6 +2,8 @@ package org.redisson;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
@ -14,6 +16,63 @@ import org.redisson.core.RSortedSet;
public class RedissonSortedSetTest extends BaseTest {
@Test
public void testIteratorRemove() {
RSortedSet<String> list = redisson.getSortedSet("list");
list.add("1");
list.add("4");
list.add("2");
list.add("5");
list.add("3");
for (Iterator<String> iterator = list.iterator(); iterator.hasNext();) {
String value = iterator.next();
if (value.equals("2")) {
iterator.remove();
}
}
Assert.assertThat(list, Matchers.containsInAnyOrder("1", "4", "5", "3"));
int iteration = 0;
for (Iterator<String> iterator = list.iterator(); iterator.hasNext();) {
iterator.next();
iterator.remove();
iteration++;
}
Assert.assertEquals(4, iteration);
Assert.assertEquals(0, list.size());
Assert.assertTrue(list.isEmpty());
}
@Test
public void testIteratorSequence() {
Set<Integer> set = redisson.getSortedSet("set");
for (int i = 0; i < 1000; i++) {
set.add(Integer.valueOf(i));
}
Set<Integer> setCopy = new HashSet<Integer>();
for (int i = 0; i < 1000; i++) {
setCopy.add(Integer.valueOf(i));
}
checkIterator(set, setCopy);
}
private void checkIterator(Set<Integer> set, Set<Integer> setCopy) {
for (Iterator<Integer> iterator = set.iterator(); iterator.hasNext();) {
Integer value = iterator.next();
if (!setCopy.remove(value)) {
Assert.fail();
}
}
Assert.assertEquals(0, setCopy.size());
}
@Test
public void testTrySetComparator() {
RSortedSet<Integer> set = redisson.getSortedSet("set");
@ -38,8 +97,6 @@ public class RedissonSortedSetTest extends BaseTest {
set.add(1);
set.add(2);
MatcherAssert.assertThat(set, Matchers.contains(1, 2, 3));
}

Loading…
Cancel
Save