diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonMapReactiveIterator.java b/redisson/src/main/java/org/redisson/reactive/RedissonMapReactiveIterator.java index 213c10aa8..6aa4571c7 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonMapReactiveIterator.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonMapReactiveIterator.java @@ -16,8 +16,6 @@ package org.redisson.reactive; import java.util.AbstractMap; -import java.util.HashMap; -import java.util.Map; import java.util.Map.Entry; import org.reactivestreams.Publisher; @@ -26,7 +24,6 @@ import org.reactivestreams.Subscription; import org.redisson.client.RedisClient; import org.redisson.client.protocol.decoder.MapScanResult; import org.redisson.client.protocol.decoder.ScanObjectEntry; -import org.redisson.misc.HashValue; import reactor.rx.Stream; import reactor.rx.subscription.ReactiveSubscription; @@ -54,8 +51,7 @@ public class RedissonMapReactiveIterator { public void subscribe(final Subscriber t) { t.onSubscribe(new ReactiveSubscription(this, t) { - private Map firstValues; - private long iterPos = 0; + private long nextIterPos = 0; private RedisClient client; private long currentIndex; @@ -66,17 +62,9 @@ public class RedissonMapReactiveIterator { nextValues(); } - private Map convert(Map map) { - Map result = new HashMap(map.size()); - for (Entry entry : map.entrySet()) { - result.put(entry.getKey().getHash(), entry.getValue().getHash()); - } - return result; - } - protected void nextValues() { final ReactiveSubscription m = this; - map.scanIteratorReactive(client, iterPos).subscribe(new Subscriber>() { + map.scanIteratorReactive(client, nextIterPos).subscribe(new Subscriber>() { @Override public void onSubscribe(Subscription s) { @@ -85,16 +73,15 @@ public class RedissonMapReactiveIterator { @Override public void onNext(MapScanResult res) { - client = res.getRedisClient(); - if (iterPos == 0 && firstValues == null) { - firstValues = convert(res.getMap()); - } else if (convert(res.getMap()).equals(firstValues)) { - m.onComplete(); - currentIndex = 0; + if (currentIndex == 0) { + client = null; + nextIterPos = 0; return; } - iterPos = res.getPos(); + client = res.getRedisClient(); + nextIterPos = res.getPos(); + for (Entry entry : res.getMap().entrySet()) { M val = getValue(entry); m.onNext(val); @@ -104,6 +91,11 @@ public class RedissonMapReactiveIterator { return; } } + + if (res.getPos() == 0) { + currentIndex = 0; + m.onComplete(); + } } @Override diff --git a/redisson/src/main/java/org/redisson/reactive/SetReactiveIterator.java b/redisson/src/main/java/org/redisson/reactive/SetReactiveIterator.java index c32406f60..219ed111f 100644 --- a/redisson/src/main/java/org/redisson/reactive/SetReactiveIterator.java +++ b/redisson/src/main/java/org/redisson/reactive/SetReactiveIterator.java @@ -15,16 +15,12 @@ */ package org.redisson.reactive; -import java.util.ArrayList; -import java.util.List; - import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import org.redisson.client.RedisClient; import org.redisson.client.protocol.decoder.ListScanResult; import org.redisson.client.protocol.decoder.ScanObjectEntry; -import org.redisson.misc.HashValue; import reactor.rx.Stream; import reactor.rx.subscription.ReactiveSubscription; @@ -41,8 +37,6 @@ public abstract class SetReactiveIterator extends Stream { public void subscribe(final Subscriber t) { t.onSubscribe(new ReactiveSubscription(this, t) { - private List firstValues; - private List lastValues; private long nextIterPos; private RedisClient client; @@ -53,12 +47,6 @@ public abstract class SetReactiveIterator extends Stream { nextValues(); } - private void handle(List vals) { - for (ScanObjectEntry val : vals) { - onNext((V)val.getObj()); - } - } - protected void nextValues() { final ReactiveSubscription m = this; scanIteratorReactive(client, nextIterPos).subscribe(new Subscriber>() { @@ -72,54 +60,18 @@ public abstract class SetReactiveIterator extends Stream { public void onNext(ListScanResult res) { if (finished) { client = null; - firstValues = null; - lastValues = null; nextIterPos = 0; return; } - long prevIterPos = nextIterPos; - - lastValues = convert(res.getValues()); client = res.getRedisClient(); + nextIterPos = res.getPos(); - if (nextIterPos == 0 && firstValues == null) { - firstValues = lastValues; - lastValues = null; - if (firstValues.isEmpty()) { - client = null; - firstValues = null; - nextIterPos = 0; - prevIterPos = -1; - } - } else { - if (firstValues.isEmpty()) { - firstValues = lastValues; - lastValues = null; - if (firstValues.isEmpty()) { - if (res.getPos() == 0) { - finished = true; - m.onComplete(); - return; - } - } - } else if (lastValues.removeAll(firstValues)) { - client = null; - firstValues = null; - lastValues = null; - nextIterPos = 0; - prevIterPos = -1; - finished = true; - m.onComplete(); - return; - } + for (ScanObjectEntry val : res.getValues()) { + m.onNext((V)val.getObj()); } - handle(res.getValues()); - - nextIterPos = res.getPos(); - - if (prevIterPos == nextIterPos) { + if (res.getPos() == 0) { finished = true; m.onComplete(); } @@ -142,14 +94,6 @@ public abstract class SetReactiveIterator extends Stream { }); } - private List convert(List list) { - List result = new ArrayList(list.size()); - for (ScanObjectEntry entry : list) { - result.add(entry.getHash()); - } - return result; - } - protected abstract Publisher> scanIteratorReactive(RedisClient client, long nextIterPos); }