refactoring

pull/4061/head
Nikita Koksharov 3 years ago
parent cc956fcb53
commit 336967c799

@ -93,7 +93,7 @@ public class BaseRegion implements TransactionalDataRegion, GeneralDataRegion {
fallbackMode = true;
connectionManager.newTimeout(t -> {
RFuture<Boolean> future = mapCache.isExistsAsync();
future.onComplete((r, ex) -> {
future.whenComplete((r, ex) -> {
if (ex == null) {
fallbackMode = false;
} else {

@ -94,7 +94,7 @@ public class BaseRegion implements TransactionalDataRegion, GeneralDataRegion {
fallbackMode = true;
connectionManager.newTimeout(t -> {
RFuture<Boolean> future = mapCache.isExistsAsync();
future.onComplete((r, ex) -> {
future.whenComplete((r, ex) -> {
if (ex == null) {
fallbackMode = false;
} else {

@ -94,7 +94,7 @@ public class BaseRegion implements TransactionalDataRegion, GeneralDataRegion {
fallbackMode = true;
connectionManager.newTimeout(t -> {
RFuture<Boolean> future = mapCache.isExistsAsync();
future.onComplete((r, ex) -> {
future.whenComplete((r, ex) -> {
if (ex == null) {
fallbackMode = false;
} else {

@ -85,7 +85,7 @@ public class RedissonStorage implements DomainDataStorageAccess {
fallbackMode = true;
connectionManager.newTimeout(t -> {
RFuture<Boolean> future = mapCache.isExistsAsync();
future.onComplete((r, ex) -> {
future.whenComplete((r, ex) -> {
if (ex == null) {
fallbackMode = false;
} else {

@ -39,7 +39,7 @@ public class ElementsStream {
return;
}
futureRef.set(future);
future.onComplete((res, e) -> {
future.whenComplete((res, e) -> {
if (e != null) {
emitter.error(e);
return;

@ -54,7 +54,7 @@ public abstract class IteratorConsumer<V> implements LongConsumer {
}
protected void nextValues(FluxSink<V> emitter) {
scanIterator(client, nextIterPos).onComplete((res, e) -> {
scanIterator(client, nextIterPos).whenComplete((res, e) -> {
if (e != null) {
emitter.error(e);
return;

@ -53,7 +53,7 @@ public abstract class PublisherAdder<V> {
@Override
protected void hookOnNext(V o) {
values.getAndIncrement();
add(o).onComplete((res, e) -> {
add(o).whenComplete((res, e) -> {
if (e != null) {
promise.completeExceptionally(e);
return;

@ -88,7 +88,7 @@ public class RedissonListReactive<V> {
}
private void onRequest(boolean forward, FluxSink<V> emitter, long n) {
instance.getAsync(currentIndex).onComplete((value, e) -> {
instance.getAsync(currentIndex).whenComplete((value, e) -> {
if (e != null) {
emitter.error(e);
return;

@ -36,7 +36,7 @@ public class RedissonReliableTopicReactive {
}
public <M> Flux<M> getMessages(Class<M> type) {
return Flux.<M>create(emitter -> {
return Flux.create(emitter -> {
emitter.onRequest(n -> {
AtomicLong counter = new AtomicLong(n);
AtomicReference<String> idRef = new AtomicReference<>();
@ -47,7 +47,7 @@ public class RedissonReliableTopicReactive {
emitter.complete();
}
});
t.onComplete((id, e) -> {
t.whenComplete((id, e) -> {
if (e != null) {
emitter.error(e);
return;

@ -37,7 +37,7 @@ public class RedissonTopicReactive {
}
public <M> Flux<M> getMessages(Class<M> type) {
return Flux.<M>create(emitter -> {
return Flux.create(emitter -> {
emitter.onRequest(n -> {
AtomicLong counter = new AtomicLong(n);
RFuture<Integer> t = topic.addListenerAsync(type, new MessageListener<M>() {
@ -50,7 +50,7 @@ public class RedissonTopicReactive {
}
}
});
t.onComplete((id, e) -> {
t.whenComplete((id, e) -> {
if (e != null) {
emitter.error(e);
return;

@ -57,7 +57,7 @@ public class RedissonTransferQueueReactive<V> {
}
protected void onRequest(boolean forward, FluxSink<V> emitter, long n) {
queue.getValueAsync(currentIndex).onComplete((value, e) -> {
queue.getValueAsync(currentIndex).whenComplete((value, e) -> {
if (e != null) {
emitter.error(e);
return;

Loading…
Cancel
Save