@ -19,19 +19,21 @@ import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.convertor.BooleanReplayConvertor;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.core.RSet;
import org.redisson.core.RSetReactive;
import reactor.rx.Stream;
import reactor.rx.subscription.ReactiveSubscription;
* Distributed and concurrent implementation of {@link java.util.Set}
@ -61,75 +63,9 @@ public class RedissonSetReactive<V> extends RedissonExpirableReactive implements
return commandExecutor.readObservable(getName(), codec, RedisCommands.SISMEMBER, getName(), o);
// private ListScanResult<V> scanIterator(InetSocketAddress client, long startPos) {
// Publisher<ListScanResult<V>> f = commandExecutor.readObservable(client, getName(), codec, RedisCommands.SSCAN, getName(), startPos);
// return get(f);
// }
// @Override
// public Iterator<V> iterator() {
// return new Iterator<V>() {
// private List<V> firstValues;
// private Iterator<V> iter;
// private InetSocketAddress client;
// private long nextIterPos;
// private boolean currentElementRemoved;
// private boolean removeExecuted;
// private V value;
// @Override
// public boolean hasNext() {
// if (iter == null || !iter.hasNext()) {
// if (nextIterPos == -1) {
// return false;
// }
// long prevIterPos = nextIterPos;
// ListScanResult<V> res = scanIterator(client, nextIterPos);
// client = res.getRedisClient();
// if (nextIterPos == 0 && firstValues == null) {
// firstValues = res.getValues();
// } else if (res.getValues().equals(firstValues)) {
// return false;
// }
// iter = res.getValues().iterator();
// nextIterPos = res.getPos();
// if (prevIterPos == nextIterPos && !removeExecuted) {
// nextIterPos = -1;
// }
// }
// return iter.hasNext();
// }
// @Override
// public V next() {
// if (!hasNext()) {
// throw new NoSuchElementException("No such element at index");
// }
// value = iter.next();
// currentElementRemoved = false;
// return value;
// }
// @Override
// public void remove() {
// if (currentElementRemoved) {
// throw new IllegalStateException("Element been already deleted");
// }
// if (iter == null) {
// throw new IllegalStateException();
// }
// iter.remove();
// RedissonSetReactive.this.remove(value);
// currentElementRemoved = true;
// removeExecuted = true;
// }
// };
// }
private Publisher<ListScanResult<V>> scanIteratorReactive(InetSocketAddress client, long startPos) {
return commandExecutor.readObservable(client, getName(), codec, RedisCommands.SSCAN, getName(), startPos);
public Publisher<Long> add(V e) {
@ -207,8 +143,82 @@ public class RedissonSetReactive<V> extends RedissonExpirableReactive implements
public Publisher<V> iterator() {
// TODO Auto-generated method stub
return null;
return new Stream<V>() {
public void subscribe(final Subscriber<? super V> t) {
t.onSubscribe(new ReactiveSubscription<V>(this, t) {
private List<V> firstValues;
private long nextIterPos;
private InetSocketAddress client;
private long currentIndex;
protected void onRequest(final long n) {
currentIndex = n;
protected void nextValues() {
final ReactiveSubscription<V> m = this;
scanIteratorReactive(client, nextIterPos).subscribe(new Subscriber<ListScanResult<V>>() {
public void onSubscribe(Subscription s) {
public void onNext(ListScanResult<V> res) {
client = res.getRedisClient();
long prevIterPos = nextIterPos;
if (nextIterPos == 0 && firstValues == null) {
firstValues = res.getValues();
} else if (res.getValues().equals(firstValues)) {
currentIndex = 0;
nextIterPos = res.getPos();
if (prevIterPos == nextIterPos) {
nextIterPos = -1;
for (V val : res.getValues()) {
if (currentIndex == 0) {
if (nextIterPos == -1) {
currentIndex = 0;
public void onError(Throwable error) {
public void onComplete() {
if (currentIndex == 0) {