From 0090b8796d5c4ba6b1eeea1c56a09a6e0eac4387 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Fri, 15 Feb 2019 15:10:08 +0300 Subject: [PATCH] refactoring --- .../data/connection/RedissonConnection.java | 18 ++++++++---------- .../data/connection/RedissonConnection.java | 15 ++++++++------- .../data/connection/RedissonConnection.java | 15 ++++++++------- .../data/connection/RedissonBaseReactive.java | 17 ++++++----------- .../data/connection/RedissonConnection.java | 15 ++++++++------- .../data/connection/RedissonBaseReactive.java | 15 ++++++--------- .../data/connection/RedissonConnection.java | 18 ++++++++---------- .../RedissonReactiveSubscription.java | 12 ++++++------ 8 files changed, 58 insertions(+), 67 deletions(-) diff --git a/redisson-spring-data/redisson-spring-data-16/src/main/java/org/redisson/spring/data/connection/RedissonConnection.java b/redisson-spring-data/redisson-spring-data-16/src/main/java/org/redisson/spring/data/connection/RedissonConnection.java index 18bd60401..717af7e57 100644 --- a/redisson-spring-data/redisson-spring-data-16/src/main/java/org/redisson/spring/data/connection/RedissonConnection.java +++ b/redisson-spring-data/redisson-spring-data-16/src/main/java/org/redisson/spring/data/connection/RedissonConnection.java @@ -34,6 +34,7 @@ import java.util.Properties; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiConsumer; import org.redisson.Redisson; import org.redisson.SlotCallback; @@ -88,9 +89,6 @@ import org.springframework.data.redis.core.types.RedisClientInfo; import org.springframework.util.Assert; import org.springframework.util.ReflectionUtils; -import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.FutureListener; - /** * Redisson connection * @@ -271,20 +269,20 @@ public class RedissonConnection extends AbstractRedisConnection { final AtomicReference failed = new AtomicReference(); final AtomicLong count = new AtomicLong(); final AtomicLong executed = new AtomicLong(range2key.size()); - FutureListener> listener = new FutureListener>() { + BiConsumer, Throwable> listener = new BiConsumer, Throwable>() { @Override - public void operationComplete(Future> future) throws Exception { - if (future.isSuccess()) { - List result = (List) future.get(); + public void accept(List r, Throwable u) { + if (u == null) { + List result = (List) r; for (Long res : result) { if (res != null) { count.addAndGet(res); } } } else { - failed.set(future.cause()); + failed.set(u); } - + checkExecution(result, failed, count, executed); } }; @@ -296,7 +294,7 @@ public class RedissonConnection extends AbstractRedisConnection { } RFuture> future = es.executeAsync(); - future.addListener(listener); + future.onComplete(listener); } return result; diff --git a/redisson-spring-data/redisson-spring-data-17/src/main/java/org/redisson/spring/data/connection/RedissonConnection.java b/redisson-spring-data/redisson-spring-data-17/src/main/java/org/redisson/spring/data/connection/RedissonConnection.java index 455c4643c..b8c7268b1 100644 --- a/redisson-spring-data/redisson-spring-data-17/src/main/java/org/redisson/spring/data/connection/RedissonConnection.java +++ b/redisson-spring-data/redisson-spring-data-17/src/main/java/org/redisson/spring/data/connection/RedissonConnection.java @@ -34,6 +34,7 @@ import java.util.Properties; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiConsumer; import org.redisson.Redisson; import org.redisson.SlotCallback; @@ -273,20 +274,20 @@ public class RedissonConnection extends AbstractRedisConnection { final AtomicReference failed = new AtomicReference(); final AtomicLong count = new AtomicLong(); final AtomicLong executed = new AtomicLong(range2key.size()); - FutureListener> listener = new FutureListener>() { + BiConsumer, Throwable> listener = new BiConsumer, Throwable>() { @Override - public void operationComplete(Future> future) throws Exception { - if (future.isSuccess()) { - List result = (List) future.get(); + public void accept(List r, Throwable u) { + if (u == null) { + List result = (List) r; for (Long res : result) { if (res != null) { count.addAndGet(res); } } } else { - failed.set(future.cause()); + failed.set(u); } - + checkExecution(result, failed, count, executed); } }; @@ -298,7 +299,7 @@ public class RedissonConnection extends AbstractRedisConnection { } RFuture> future = es.executeAsync(); - future.addListener(listener); + future.onComplete(listener); } return result; diff --git a/redisson-spring-data/redisson-spring-data-18/src/main/java/org/redisson/spring/data/connection/RedissonConnection.java b/redisson-spring-data/redisson-spring-data-18/src/main/java/org/redisson/spring/data/connection/RedissonConnection.java index 53fd6afb4..a7622bbab 100644 --- a/redisson-spring-data/redisson-spring-data-18/src/main/java/org/redisson/spring/data/connection/RedissonConnection.java +++ b/redisson-spring-data/redisson-spring-data-18/src/main/java/org/redisson/spring/data/connection/RedissonConnection.java @@ -35,6 +35,7 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiConsumer; import org.redisson.Redisson; import org.redisson.SlotCallback; @@ -282,20 +283,20 @@ public class RedissonConnection extends AbstractRedisConnection { final AtomicReference failed = new AtomicReference(); final AtomicLong count = new AtomicLong(); final AtomicLong executed = new AtomicLong(range2key.size()); - FutureListener> listener = new FutureListener>() { + BiConsumer, Throwable> listener = new BiConsumer, Throwable>() { @Override - public void operationComplete(Future> future) throws Exception { - if (future.isSuccess()) { - List result = (List) future.get(); + public void accept(List r, Throwable u) { + if (u == null) { + List result = (List) r; for (Long res : result) { if (res != null) { count.addAndGet(res); } } } else { - failed.set(future.cause()); + failed.set(u); } - + checkExecution(result, failed, count, executed); } }; @@ -307,7 +308,7 @@ public class RedissonConnection extends AbstractRedisConnection { } RFuture> future = es.executeAsync(); - future.addListener(listener); + future.onComplete(listener); } return result; diff --git a/redisson-spring-data/redisson-spring-data-20/src/main/java/org/redisson/spring/data/connection/RedissonBaseReactive.java b/redisson-spring-data/redisson-spring-data-20/src/main/java/org/redisson/spring/data/connection/RedissonBaseReactive.java index bebf83fe7..dbde90815 100644 --- a/redisson-spring-data/redisson-spring-data-20/src/main/java/org/redisson/spring/data/connection/RedissonBaseReactive.java +++ b/redisson-spring-data/redisson-spring-data-20/src/main/java/org/redisson/spring/data/connection/RedissonBaseReactive.java @@ -31,8 +31,6 @@ import org.redisson.reactive.CommandReactiveExecutor; import org.springframework.data.redis.RedisSystemException; import org.springframework.data.redis.connection.RedisClusterNode; -import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.FutureListener; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -59,16 +57,13 @@ abstract class RedissonBaseReactive { RFuture toStringFuture(RFuture f) { RPromise promise = new RedissonPromise<>(); - f.addListener(new FutureListener() { - @Override - public void operationComplete(Future future) throws Exception { - if (!future.isSuccess()) { - promise.tryFailure(future.cause()); - return; - } - - promise.trySuccess("OK"); + f.onComplete((res, e) -> { + if (e != null) { + promise.tryFailure(e); + return; } + + promise.trySuccess("OK"); }); return promise; } diff --git a/redisson-spring-data/redisson-spring-data-20/src/main/java/org/redisson/spring/data/connection/RedissonConnection.java b/redisson-spring-data/redisson-spring-data-20/src/main/java/org/redisson/spring/data/connection/RedissonConnection.java index deb700144..d7433d035 100644 --- a/redisson-spring-data/redisson-spring-data-20/src/main/java/org/redisson/spring/data/connection/RedissonConnection.java +++ b/redisson-spring-data/redisson-spring-data-20/src/main/java/org/redisson/spring/data/connection/RedissonConnection.java @@ -35,6 +35,7 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiConsumer; import org.redisson.Redisson; import org.redisson.SlotCallback; @@ -282,20 +283,20 @@ public class RedissonConnection extends AbstractRedisConnection { final AtomicReference failed = new AtomicReference(); final AtomicLong count = new AtomicLong(); final AtomicLong executed = new AtomicLong(range2key.size()); - FutureListener> listener = new FutureListener>() { + BiConsumer, Throwable> listener = new BiConsumer, Throwable>() { @Override - public void operationComplete(Future> future) throws Exception { - if (future.isSuccess()) { - List result = (List) future.get(); + public void accept(List r, Throwable u) { + if (u == null) { + List result = (List) r; for (Long res : result) { if (res != null) { count.addAndGet(res); } } } else { - failed.set(future.cause()); + failed.set(u); } - + checkExecution(result, failed, count, executed); } }; @@ -307,7 +308,7 @@ public class RedissonConnection extends AbstractRedisConnection { } RFuture> future = es.executeAsync(); - future.addListener(listener); + future.onComplete(listener); } return result; diff --git a/redisson-spring-data/redisson-spring-data-21/src/main/java/org/redisson/spring/data/connection/RedissonBaseReactive.java b/redisson-spring-data/redisson-spring-data-21/src/main/java/org/redisson/spring/data/connection/RedissonBaseReactive.java index bebf83fe7..e5a929188 100644 --- a/redisson-spring-data/redisson-spring-data-21/src/main/java/org/redisson/spring/data/connection/RedissonBaseReactive.java +++ b/redisson-spring-data/redisson-spring-data-21/src/main/java/org/redisson/spring/data/connection/RedissonBaseReactive.java @@ -59,16 +59,13 @@ abstract class RedissonBaseReactive { RFuture toStringFuture(RFuture f) { RPromise promise = new RedissonPromise<>(); - f.addListener(new FutureListener() { - @Override - public void operationComplete(Future future) throws Exception { - if (!future.isSuccess()) { - promise.tryFailure(future.cause()); - return; - } - - promise.trySuccess("OK"); + f.onComplete((res, e) -> { + if (e != null) { + promise.tryFailure(e); + return; } + + promise.trySuccess("OK"); }); return promise; } diff --git a/redisson-spring-data/redisson-spring-data-21/src/main/java/org/redisson/spring/data/connection/RedissonConnection.java b/redisson-spring-data/redisson-spring-data-21/src/main/java/org/redisson/spring/data/connection/RedissonConnection.java index 65aa72f01..463731a84 100644 --- a/redisson-spring-data/redisson-spring-data-21/src/main/java/org/redisson/spring/data/connection/RedissonConnection.java +++ b/redisson-spring-data/redisson-spring-data-21/src/main/java/org/redisson/spring/data/connection/RedissonConnection.java @@ -35,6 +35,7 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiConsumer; import org.redisson.Redisson; import org.redisson.SlotCallback; @@ -99,9 +100,6 @@ import org.springframework.data.redis.core.types.RedisClientInfo; import org.springframework.util.Assert; import org.springframework.util.ReflectionUtils; -import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.FutureListener; - /** * Redisson connection * @@ -282,20 +280,20 @@ public class RedissonConnection extends AbstractRedisConnection { final AtomicReference failed = new AtomicReference(); final AtomicLong count = new AtomicLong(); final AtomicLong executed = new AtomicLong(range2key.size()); - FutureListener> listener = new FutureListener>() { + BiConsumer, Throwable> listener = new BiConsumer, Throwable>() { @Override - public void operationComplete(Future> future) throws Exception { - if (future.isSuccess()) { - List result = (List) future.get(); + public void accept(List r, Throwable u) { + if (u == null) { + List result = (List) r; for (Long res : result) { if (res != null) { count.addAndGet(res); } } } else { - failed.set(future.cause()); + failed.set(u); } - + checkExecution(result, failed, count, executed); } }; @@ -307,7 +305,7 @@ public class RedissonConnection extends AbstractRedisConnection { } RFuture> future = es.executeAsync(); - future.addListener(listener); + future.onComplete(listener); } return result; diff --git a/redisson-spring-data/redisson-spring-data-21/src/main/java/org/redisson/spring/data/connection/RedissonReactiveSubscription.java b/redisson-spring-data/redisson-spring-data-21/src/main/java/org/redisson/spring/data/connection/RedissonReactiveSubscription.java index 0610f5e51..f31a7c1a1 100644 --- a/redisson-spring-data/redisson-spring-data-21/src/main/java/org/redisson/spring/data/connection/RedissonReactiveSubscription.java +++ b/redisson-spring-data/redisson-spring-data-21/src/main/java/org/redisson/spring/data/connection/RedissonReactiveSubscription.java @@ -63,8 +63,8 @@ public class RedissonReactiveSubscription implements ReactiveSubscription { CountableListener listener = new CountableListener(result, null, channels.length); for (ByteBuffer channel : channels) { RFuture f = subscribeService.subscribe(ByteArrayCodec.INSTANCE, toChannelName(channel)); - f.addListener(e -> RedissonReactiveSubscription.this.channels.put(channel, e.getNow())); - f.addListener(listener); + f.onComplete((res, e) -> RedissonReactiveSubscription.this.channels.put(channel, res)); + f.onComplete(listener); } return Mono.fromFuture(result); } @@ -79,8 +79,8 @@ public class RedissonReactiveSubscription implements ReactiveSubscription { CountableListener listener = new CountableListener(result, null, patterns.length); for (ByteBuffer channel : patterns) { RFuture f = subscribeService.psubscribe(toChannelName(channel), ByteArrayCodec.INSTANCE); - f.addListener(e -> RedissonReactiveSubscription.this.patterns.put(channel, e.getNow())); - f.addListener(listener); + f.onComplete((res, e) -> RedissonReactiveSubscription.this.patterns.put(channel, res)); + f.onComplete(listener); } return Mono.fromFuture(result); } @@ -96,7 +96,7 @@ public class RedissonReactiveSubscription implements ReactiveSubscription { CountableListener listener = new CountableListener(result, null, channels.length); for (ByteBuffer channel : channels) { RFuture f = subscribeService.unsubscribe(toChannelName(channel), PubSubType.UNSUBSCRIBE); - f.addListener(listener); + f.onComplete(listener); } return Mono.fromFuture(result); } @@ -112,7 +112,7 @@ public class RedissonReactiveSubscription implements ReactiveSubscription { CountableListener listener = new CountableListener(result, null, patterns.length); for (ByteBuffer channel : patterns) { RFuture f = subscribeService.unsubscribe(toChannelName(channel), PubSubType.PUNSUBSCRIBE); - f.addListener(listener); + f.onComplete(listener); } return Mono.fromFuture(result); }