SCAN commands result handling fixed. #463

pull/469/head
Nikita 9 years ago
parent 22a3dd2a09
commit 7c4deafc74

@ -0,0 +1,96 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson;
import java.net.InetSocketAddress;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import org.redisson.client.protocol.decoder.ListScanResult;
abstract class RedissonBaseIterator<V> implements Iterator<V> {
private List<V> firstValues;
private Iterator<V> iter;
private InetSocketAddress client;
private long nextIterPos;
private long startPos = -1;
private boolean currentElementRemoved;
private boolean removeExecuted;
private V value;
@Override
public boolean hasNext() {
if (iter == null || !iter.hasNext()) {
if (nextIterPos == -1) {
return false;
}
long prevIterPos;
do {
prevIterPos = nextIterPos;
ListScanResult<V> res = iterator(client, nextIterPos);
client = res.getRedisClient();
if (startPos == -1) {
startPos = res.getPos();
}
if (nextIterPos == 0 && firstValues == null) {
firstValues = res.getValues();
} else if (res.getValues().equals(firstValues) && res.getPos() == startPos) {
return false;
}
iter = res.getValues().iterator();
nextIterPos = res.getPos();
} while (!iter.hasNext() && nextIterPos != prevIterPos);
if (prevIterPos == nextIterPos && !removeExecuted) {
nextIterPos = -1;
}
}
return iter.hasNext();
}
abstract ListScanResult<V> iterator(InetSocketAddress client, long nextIterPos);
@Override
public V next() {
if (!hasNext()) {
throw new NoSuchElementException("No such element");
}
value = iter.next();
currentElementRemoved = false;
return value;
}
@Override
public void remove() {
if (currentElementRemoved) {
throw new IllegalStateException("Element been already deleted");
}
if (iter == null) {
throw new IllegalStateException();
}
iter.remove();
remove(value);
currentElementRemoved = true;
removeExecuted = true;
}
abstract void remove(V value);
}

@ -23,6 +23,7 @@ import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.NoSuchElementException; import java.util.NoSuchElementException;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.MapScanResult; import org.redisson.client.protocol.decoder.MapScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry; import org.redisson.client.protocol.decoder.ScanObjectEntry;
@ -32,10 +33,12 @@ abstract class RedissonBaseMapIterator<K, V, M> implements Iterator<M> {
private Map<ByteBuf, ByteBuf> firstValues; private Map<ByteBuf, ByteBuf> firstValues;
private Iterator<Map.Entry<ScanObjectEntry, ScanObjectEntry>> iter; private Iterator<Map.Entry<ScanObjectEntry, ScanObjectEntry>> iter;
protected long iterPos = 0; protected long nextIterPos;
protected long startPos = -1;
protected InetSocketAddress client; protected InetSocketAddress client;
private boolean finished; private boolean finished;
private boolean currentElementRemoved;
private boolean removeExecuted; private boolean removeExecuted;
protected Map.Entry<ScanObjectEntry, ScanObjectEntry> entry; protected Map.Entry<ScanObjectEntry, ScanObjectEntry> entry;
@ -44,10 +47,20 @@ abstract class RedissonBaseMapIterator<K, V, M> implements Iterator<M> {
if (finished) { if (finished) {
return false; return false;
} }
if (iter == null || !iter.hasNext()) { if (iter == null || !iter.hasNext()) {
if (nextIterPos == -1) {
return false;
}
long prevIterPos;
do {
prevIterPos = nextIterPos;
MapScanResult<ScanObjectEntry, ScanObjectEntry> res = iterator(); MapScanResult<ScanObjectEntry, ScanObjectEntry> res = iterator();
client = res.getRedisClient(); client = res.getRedisClient();
if (iterPos == 0 && firstValues == null) { if (startPos == -1) {
startPos = res.getPos();
}
if (nextIterPos == 0 && firstValues == null) {
firstValues = convert(res.getMap()); firstValues = convert(res.getMap());
} else { } else {
Map<ByteBuf, ByteBuf> newValues = convert(res.getMap()); Map<ByteBuf, ByteBuf> newValues = convert(res.getMap());
@ -61,9 +74,14 @@ abstract class RedissonBaseMapIterator<K, V, M> implements Iterator<M> {
free(newValues); free(newValues);
} }
iter = res.getMap().entrySet().iterator(); iter = res.getMap().entrySet().iterator();
iterPos = res.getPos(); nextIterPos = res.getPos();
} while (!iter.hasNext() && nextIterPos != prevIterPos);
if (prevIterPos == nextIterPos && !removeExecuted) {
nextIterPos = -1;
}
} }
return iter.hasNext(); return iter.hasNext();
} }
protected abstract MapScanResult<ScanObjectEntry, ScanObjectEntry> iterator(); protected abstract MapScanResult<ScanObjectEntry, ScanObjectEntry> iterator();
@ -90,7 +108,7 @@ abstract class RedissonBaseMapIterator<K, V, M> implements Iterator<M> {
} }
entry = iter.next(); entry = iter.next();
removeExecuted = false; currentElementRemoved = false;
return getValue(entry); return getValue(entry);
} }
@ -108,14 +126,16 @@ abstract class RedissonBaseMapIterator<K, V, M> implements Iterator<M> {
@Override @Override
public void remove() { public void remove() {
if (removeExecuted) { if (currentElementRemoved) {
throw new IllegalStateException("Element been already deleted"); throw new IllegalStateException("Element been already deleted");
} }
if (iter == null) {
throw new IllegalStateException();
}
// lazy init iterator
hasNext();
iter.remove(); iter.remove();
removeKey(); removeKey();
currentElementRemoved = true;
removeExecuted = true; removeExecuted = true;
} }

@ -15,6 +15,7 @@
*/ */
package org.redisson; package org.redisson;
import java.net.InetSocketAddress;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
@ -23,7 +24,6 @@ import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
@ -90,53 +90,16 @@ public class RedissonKeys implements RKeys {
} }
private Iterator<String> createKeysIterator(final int slot, final String pattern) { private Iterator<String> createKeysIterator(final int slot, final String pattern) {
return new Iterator<String>() { return new RedissonBaseIterator<String>() {
private List<String> firstValues;
private Iterator<String> iter;
private long iterPos;
private boolean removeExecuted;
private String value;
@Override
public boolean hasNext() {
if (iter == null || !iter.hasNext()) {
ListScanResult<String> res = scanIterator(slot, iterPos, pattern);
if (iterPos == 0 && firstValues == null) {
firstValues = res.getValues();
} else if (res.getValues().equals(firstValues)) {
return false;
}
iter = res.getValues().iterator();
iterPos = res.getPos();
}
return iter.hasNext();
}
@Override @Override
public String next() { ListScanResult<String> iterator(InetSocketAddress client, long nextIterPos) {
if (!hasNext()) { return RedissonKeys.this.scanIterator(slot, nextIterPos, pattern);
throw new NoSuchElementException("No such element");
}
value = iter.next();
removeExecuted = false;
return value;
} }
@Override @Override
public void remove() { void remove(String value) {
if (removeExecuted) { RedissonKeys.this.delete(value);
throw new IllegalStateException("Element been already deleted");
}
if (iter == null) {
throw new IllegalStateException();
}
iter.remove();
delete(value);
removeExecuted = true;
} }
}; };

@ -29,7 +29,7 @@ public class RedissonMapIterator<K, V, M> extends RedissonBaseMapIterator<K, V,
} }
protected MapScanResult<ScanObjectEntry, ScanObjectEntry> iterator() { protected MapScanResult<ScanObjectEntry, ScanObjectEntry> iterator() {
return map.scanIterator(client, iterPos); return map.scanIterator(client, nextIterPos);
} }
protected void removeKey() { protected void removeKey() {

@ -29,7 +29,7 @@ public class RedissonMultiMapKeysIterator<K, V, M> extends RedissonBaseMapIterat
} }
protected MapScanResult<ScanObjectEntry, ScanObjectEntry> iterator() { protected MapScanResult<ScanObjectEntry, ScanObjectEntry> iterator() {
return map.scanIterator(client, iterPos); return map.scanIterator(client, nextIterPos);
} }
protected void removeKey() { protected void removeKey() {

@ -25,7 +25,6 @@ import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.NoSuchElementException;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
import org.redisson.client.codec.ScoredCodec; import org.redisson.client.codec.ScoredCodec;
@ -245,52 +244,16 @@ public class RedissonScoredSortedSet<V> extends RedissonExpirable implements RSc
@Override @Override
public Iterator<V> iterator() { public Iterator<V> iterator() {
return new Iterator<V>() { return new RedissonBaseIterator<V>() {
private List<V> firstValues;
private Iterator<V> iter;
private InetSocketAddress client;
private long iterPos;
private boolean removeExecuted;
private V value;
@Override @Override
public boolean hasNext() { ListScanResult<V> iterator(InetSocketAddress client, long nextIterPos) {
if (iter == null || !iter.hasNext()) { return scanIterator(client, nextIterPos);
ListScanResult<V> res = scanIterator(client, iterPos);
client = res.getRedisClient();
if (iterPos == 0 && firstValues == null) {
firstValues = res.getValues();
} else if (res.getValues().equals(firstValues)) {
return false;
}
iter = res.getValues().iterator();
iterPos = res.getPos();
}
return iter.hasNext();
} }
@Override @Override
public V next() { void remove(V value) {
if (!hasNext()) {
throw new NoSuchElementException("No such element at index");
}
value = iter.next();
removeExecuted = false;
return value;
}
@Override
public void remove() {
if (removeExecuted) {
throw new IllegalStateException("Element been already deleted");
}
iter.remove();
RedissonScoredSortedSet.this.remove(value); RedissonScoredSortedSet.this.remove(value);
removeExecuted = true;
} }
}; };

@ -82,64 +82,16 @@ public class RedissonSet<V> extends RedissonExpirable implements RSet<V> {
@Override @Override
public Iterator<V> iterator() { public Iterator<V> iterator() {
return new Iterator<V>() { return new RedissonBaseIterator<V>() {
private List<V> firstValues;
private Iterator<V> iter;
private InetSocketAddress client;
private long nextIterPos;
private boolean currentElementRemoved;
private boolean removeExecuted;
private V value;
@Override
public boolean hasNext() {
if (iter == null || !iter.hasNext()) {
if (nextIterPos == -1) {
return false;
}
long prevIterPos = nextIterPos;
ListScanResult<V> res = scanIterator(client, nextIterPos);
client = res.getRedisClient();
if (nextIterPos == 0 && firstValues == null) {
firstValues = res.getValues();
} else if (res.getValues().equals(firstValues)) {
return false;
}
iter = res.getValues().iterator();
nextIterPos = res.getPos();
if (prevIterPos == nextIterPos && !removeExecuted) {
nextIterPos = -1;
}
}
return iter.hasNext();
}
@Override @Override
public V next() { ListScanResult<V> iterator(InetSocketAddress client, long nextIterPos) {
if (!hasNext()) { return scanIterator(client, nextIterPos);
throw new NoSuchElementException("No such element at index");
}
value = iter.next();
currentElementRemoved = false;
return value;
} }
@Override @Override
public void remove() { void remove(V value) {
if (currentElementRemoved) {
throw new IllegalStateException("Element been already deleted");
}
if (iter == null) {
throw new IllegalStateException();
}
iter.remove();
RedissonSet.this.remove(value); RedissonSet.this.remove(value);
currentElementRemoved = true;
removeExecuted = true;
} }
}; };

@ -161,64 +161,16 @@ public class RedissonSetCache<V> extends RedissonExpirable implements RSetCache<
@Override @Override
public Iterator<V> iterator() { public Iterator<V> iterator() {
return new Iterator<V>() { return new RedissonBaseIterator<V>() {
private List<V> firstValues;
private Iterator<V> iter;
private InetSocketAddress client;
private long nextIterPos;
private boolean currentElementRemoved;
private boolean removeExecuted;
private V value;
@Override
public boolean hasNext() {
if (iter == null || !iter.hasNext()) {
if (nextIterPos == -1) {
return false;
}
long prevIterPos = nextIterPos;
ListScanResult<V> res = scanIterator(client, nextIterPos);
client = res.getRedisClient();
if (nextIterPos == 0 && firstValues == null) {
firstValues = res.getValues();
} else if (res.getValues().equals(firstValues)) {
return false;
}
iter = res.getValues().iterator();
nextIterPos = res.getPos();
if (prevIterPos == nextIterPos && !removeExecuted) {
nextIterPos = -1;
}
}
return iter.hasNext();
}
@Override @Override
public V next() { ListScanResult<V> iterator(InetSocketAddress client, long nextIterPos) {
if (!hasNext()) { return scanIterator(client, nextIterPos);
throw new NoSuchElementException("No such element at index");
}
value = iter.next();
currentElementRemoved = false;
return value;
} }
@Override @Override
public void remove() { void remove(V value) {
if (currentElementRemoved) {
throw new IllegalStateException("Element been already deleted");
}
if (iter == null) {
throw new IllegalStateException();
}
iter.remove();
RedissonSetCache.this.remove(value); RedissonSetCache.this.remove(value);
currentElementRemoved = true;
removeExecuted = true;
} }
}; };

@ -22,7 +22,6 @@ import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.NoSuchElementException;
import java.util.Set; import java.util.Set;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
@ -128,64 +127,16 @@ public class RedissonSetMultimapValues<V> extends RedissonExpirable implements R
@Override @Override
public Iterator<V> iterator() { public Iterator<V> iterator() {
return new Iterator<V>() { return new RedissonBaseIterator<V>() {
private List<V> firstValues;
private Iterator<V> iter;
private InetSocketAddress client;
private long nextIterPos;
private boolean currentElementRemoved;
private boolean removeExecuted;
private V value;
@Override
public boolean hasNext() {
if (iter == null || !iter.hasNext()) {
if (nextIterPos == -1) {
return false;
}
long prevIterPos = nextIterPos;
ListScanResult<V> res = scanIterator(client, nextIterPos);
client = res.getRedisClient();
if (nextIterPos == 0 && firstValues == null) {
firstValues = res.getValues();
} else if (res.getValues().equals(firstValues)) {
return false;
}
iter = res.getValues().iterator();
nextIterPos = res.getPos();
if (prevIterPos == nextIterPos && !removeExecuted) {
nextIterPos = -1;
}
}
return iter.hasNext();
}
@Override @Override
public V next() { ListScanResult<V> iterator(InetSocketAddress client, long nextIterPos) {
if (!hasNext()) { return scanIterator(client, nextIterPos);
throw new NoSuchElementException("No such element at index");
}
value = iter.next();
currentElementRemoved = false;
return value;
} }
@Override @Override
public void remove() { void remove(V value) {
if (currentElementRemoved) {
throw new IllegalStateException("Element been already deleted");
}
if (iter == null) {
throw new IllegalStateException();
}
iter.remove();
RedissonSetMultimapValues.this.remove(value); RedissonSetMultimapValues.this.remove(value);
currentElementRemoved = true;
removeExecuted = true;
} }
}; };

@ -23,6 +23,7 @@ import org.redisson.client.codec.StringCodec;
import org.redisson.codec.JsonJacksonCodec; import org.redisson.codec.JsonJacksonCodec;
import org.redisson.core.Predicate; import org.redisson.core.Predicate;
import org.redisson.core.RMap; import org.redisson.core.RMap;
import org.redisson.core.RSet;
import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Future;
@ -257,6 +258,26 @@ public class RedissonMapTest extends BaseTest {
assertThat(val2).isEqualTo(4); assertThat(val2).isEqualTo(4);
} }
@Test
public void testIteratorRemoveHighVolume() throws InterruptedException {
RMap<Integer, Integer> map = redisson.getMap("simpleMap");
for (int i = 0; i < 10000; i++) {
map.put(i, i*10);
}
int cnt = 0;
Iterator<Integer> iterator = map.keySet().iterator();
while (iterator.hasNext()) {
Integer integer = iterator.next();
iterator.remove();
cnt++;
}
Assert.assertEquals(10000, cnt);
assertThat(map).isEmpty();
Assert.assertEquals(0, map.size());
}
@Test @Test
public void testIterator() { public void testIterator() {
RMap<Integer, Integer> rMap = redisson.getMap("123"); RMap<Integer, Integer> rMap = redisson.getMap("123");

Loading…
Cancel
Save