refactoring

pull/1461/head
Nikita 7 years ago
parent 733f64d5af
commit e8a654979e

@ -16,8 +16,6 @@
package org.redisson.reactive; package org.redisson.reactive;
import java.util.AbstractMap; import java.util.AbstractMap;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;
@ -26,7 +24,6 @@ import org.reactivestreams.Subscription;
import org.redisson.client.RedisClient; import org.redisson.client.RedisClient;
import org.redisson.client.protocol.decoder.MapScanResult; import org.redisson.client.protocol.decoder.MapScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry; import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.misc.HashValue;
import reactor.rx.Stream; import reactor.rx.Stream;
import reactor.rx.subscription.ReactiveSubscription; import reactor.rx.subscription.ReactiveSubscription;
@ -54,8 +51,7 @@ public class RedissonMapReactiveIterator<K, V, M> {
public void subscribe(final Subscriber<? super M> t) { public void subscribe(final Subscriber<? super M> t) {
t.onSubscribe(new ReactiveSubscription<M>(this, t) { t.onSubscribe(new ReactiveSubscription<M>(this, t) {
private Map<HashValue, HashValue> firstValues; private long nextIterPos = 0;
private long iterPos = 0;
private RedisClient client; private RedisClient client;
private long currentIndex; private long currentIndex;
@ -66,17 +62,9 @@ public class RedissonMapReactiveIterator<K, V, M> {
nextValues(); nextValues();
} }
private Map<HashValue, HashValue> convert(Map<ScanObjectEntry, ScanObjectEntry> map) {
Map<HashValue, HashValue> result = new HashMap<HashValue, HashValue>(map.size());
for (Entry<ScanObjectEntry, ScanObjectEntry> entry : map.entrySet()) {
result.put(entry.getKey().getHash(), entry.getValue().getHash());
}
return result;
}
protected void nextValues() { protected void nextValues() {
final ReactiveSubscription<M> m = this; final ReactiveSubscription<M> m = this;
map.scanIteratorReactive(client, iterPos).subscribe(new Subscriber<MapScanResult<ScanObjectEntry, ScanObjectEntry>>() { map.scanIteratorReactive(client, nextIterPos).subscribe(new Subscriber<MapScanResult<ScanObjectEntry, ScanObjectEntry>>() {
@Override @Override
public void onSubscribe(Subscription s) { public void onSubscribe(Subscription s) {
@ -85,16 +73,15 @@ public class RedissonMapReactiveIterator<K, V, M> {
@Override @Override
public void onNext(MapScanResult<ScanObjectEntry, ScanObjectEntry> res) { public void onNext(MapScanResult<ScanObjectEntry, ScanObjectEntry> res) {
client = res.getRedisClient(); if (currentIndex == 0) {
if (iterPos == 0 && firstValues == null) { client = null;
firstValues = convert(res.getMap()); nextIterPos = 0;
} else if (convert(res.getMap()).equals(firstValues)) {
m.onComplete();
currentIndex = 0;
return; return;
} }
iterPos = res.getPos(); client = res.getRedisClient();
nextIterPos = res.getPos();
for (Entry<ScanObjectEntry, ScanObjectEntry> entry : res.getMap().entrySet()) { for (Entry<ScanObjectEntry, ScanObjectEntry> entry : res.getMap().entrySet()) {
M val = getValue(entry); M val = getValue(entry);
m.onNext(val); m.onNext(val);
@ -104,6 +91,11 @@ public class RedissonMapReactiveIterator<K, V, M> {
return; return;
} }
} }
if (res.getPos() == 0) {
currentIndex = 0;
m.onComplete();
}
} }
@Override @Override

@ -15,16 +15,12 @@
*/ */
package org.redisson.reactive; package org.redisson.reactive;
import java.util.ArrayList;
import java.util.List;
import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber; import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription; import org.reactivestreams.Subscription;
import org.redisson.client.RedisClient; import org.redisson.client.RedisClient;
import org.redisson.client.protocol.decoder.ListScanResult; import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry; import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.misc.HashValue;
import reactor.rx.Stream; import reactor.rx.Stream;
import reactor.rx.subscription.ReactiveSubscription; import reactor.rx.subscription.ReactiveSubscription;
@ -41,8 +37,6 @@ public abstract class SetReactiveIterator<V> extends Stream<V> {
public void subscribe(final Subscriber<? super V> t) { public void subscribe(final Subscriber<? super V> t) {
t.onSubscribe(new ReactiveSubscription<V>(this, t) { t.onSubscribe(new ReactiveSubscription<V>(this, t) {
private List<HashValue> firstValues;
private List<HashValue> lastValues;
private long nextIterPos; private long nextIterPos;
private RedisClient client; private RedisClient client;
@ -53,12 +47,6 @@ public abstract class SetReactiveIterator<V> extends Stream<V> {
nextValues(); nextValues();
} }
private void handle(List<ScanObjectEntry> vals) {
for (ScanObjectEntry val : vals) {
onNext((V)val.getObj());
}
}
protected void nextValues() { protected void nextValues() {
final ReactiveSubscription<V> m = this; final ReactiveSubscription<V> m = this;
scanIteratorReactive(client, nextIterPos).subscribe(new Subscriber<ListScanResult<ScanObjectEntry>>() { scanIteratorReactive(client, nextIterPos).subscribe(new Subscriber<ListScanResult<ScanObjectEntry>>() {
@ -72,54 +60,18 @@ public abstract class SetReactiveIterator<V> extends Stream<V> {
public void onNext(ListScanResult<ScanObjectEntry> res) { public void onNext(ListScanResult<ScanObjectEntry> res) {
if (finished) { if (finished) {
client = null; client = null;
firstValues = null;
lastValues = null;
nextIterPos = 0; nextIterPos = 0;
return; return;
} }
long prevIterPos = nextIterPos;
lastValues = convert(res.getValues());
client = res.getRedisClient(); client = res.getRedisClient();
nextIterPos = res.getPos();
if (nextIterPos == 0 && firstValues == null) { for (ScanObjectEntry val : res.getValues()) {
firstValues = lastValues; m.onNext((V)val.getObj());
lastValues = null;
if (firstValues.isEmpty()) {
client = null;
firstValues = null;
nextIterPos = 0;
prevIterPos = -1;
}
} else {
if (firstValues.isEmpty()) {
firstValues = lastValues;
lastValues = null;
if (firstValues.isEmpty()) {
if (res.getPos() == 0) {
finished = true;
m.onComplete();
return;
}
}
} else if (lastValues.removeAll(firstValues)) {
client = null;
firstValues = null;
lastValues = null;
nextIterPos = 0;
prevIterPos = -1;
finished = true;
m.onComplete();
return;
}
} }
handle(res.getValues()); if (res.getPos() == 0) {
nextIterPos = res.getPos();
if (prevIterPos == nextIterPos) {
finished = true; finished = true;
m.onComplete(); m.onComplete();
} }
@ -142,14 +94,6 @@ public abstract class SetReactiveIterator<V> extends Stream<V> {
}); });
} }
private List<HashValue> convert(List<ScanObjectEntry> list) {
List<HashValue> result = new ArrayList<HashValue>(list.size());
for (ScanObjectEntry entry : list) {
result.add(entry.getHash());
}
return result;
}
protected abstract Publisher<ListScanResult<ScanObjectEntry>> scanIteratorReactive(RedisClient client, long nextIterPos); protected abstract Publisher<ListScanResult<ScanObjectEntry>> scanIteratorReactive(RedisClient client, long nextIterPos);
} }

Loading…
Cancel
Save