refactoring

pull/1923/head
Nikita Koksharov 6 years ago
parent aa6f9fada9
commit 0090b8796d

@ -34,6 +34,7 @@ import java.util.Properties;
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import org.redisson.Redisson; import org.redisson.Redisson;
import org.redisson.SlotCallback; import org.redisson.SlotCallback;
@ -88,9 +89,6 @@ import org.springframework.data.redis.core.types.RedisClientInfo;
import org.springframework.util.Assert; import org.springframework.util.Assert;
import org.springframework.util.ReflectionUtils; import org.springframework.util.ReflectionUtils;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
/** /**
* Redisson connection * Redisson connection
* *
@ -271,20 +269,20 @@ public class RedissonConnection extends AbstractRedisConnection {
final AtomicReference<Throwable> failed = new AtomicReference<Throwable>(); final AtomicReference<Throwable> failed = new AtomicReference<Throwable>();
final AtomicLong count = new AtomicLong(); final AtomicLong count = new AtomicLong();
final AtomicLong executed = new AtomicLong(range2key.size()); final AtomicLong executed = new AtomicLong(range2key.size());
FutureListener<List<?>> listener = new FutureListener<List<?>>() { BiConsumer<List<?>, Throwable> listener = new BiConsumer<List<?>, Throwable>() {
@Override @Override
public void operationComplete(Future<List<?>> future) throws Exception { public void accept(List<?> r, Throwable u) {
if (future.isSuccess()) { if (u == null) {
List<Long> result = (List<Long>) future.get(); List<Long> result = (List<Long>) r;
for (Long res : result) { for (Long res : result) {
if (res != null) { if (res != null) {
count.addAndGet(res); count.addAndGet(res);
} }
} }
} else { } else {
failed.set(future.cause()); failed.set(u);
} }
checkExecution(result, failed, count, executed); checkExecution(result, failed, count, executed);
} }
}; };
@ -296,7 +294,7 @@ public class RedissonConnection extends AbstractRedisConnection {
} }
RFuture<List<?>> future = es.executeAsync(); RFuture<List<?>> future = es.executeAsync();
future.addListener(listener); future.onComplete(listener);
} }
return result; return result;

@ -34,6 +34,7 @@ import java.util.Properties;
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import org.redisson.Redisson; import org.redisson.Redisson;
import org.redisson.SlotCallback; import org.redisson.SlotCallback;
@ -273,20 +274,20 @@ public class RedissonConnection extends AbstractRedisConnection {
final AtomicReference<Throwable> failed = new AtomicReference<Throwable>(); final AtomicReference<Throwable> failed = new AtomicReference<Throwable>();
final AtomicLong count = new AtomicLong(); final AtomicLong count = new AtomicLong();
final AtomicLong executed = new AtomicLong(range2key.size()); final AtomicLong executed = new AtomicLong(range2key.size());
FutureListener<List<?>> listener = new FutureListener<List<?>>() { BiConsumer<List<?>, Throwable> listener = new BiConsumer<List<?>, Throwable>() {
@Override @Override
public void operationComplete(Future<List<?>> future) throws Exception { public void accept(List<?> r, Throwable u) {
if (future.isSuccess()) { if (u == null) {
List<Long> result = (List<Long>) future.get(); List<Long> result = (List<Long>) r;
for (Long res : result) { for (Long res : result) {
if (res != null) { if (res != null) {
count.addAndGet(res); count.addAndGet(res);
} }
} }
} else { } else {
failed.set(future.cause()); failed.set(u);
} }
checkExecution(result, failed, count, executed); checkExecution(result, failed, count, executed);
} }
}; };
@ -298,7 +299,7 @@ public class RedissonConnection extends AbstractRedisConnection {
} }
RFuture<List<?>> future = es.executeAsync(); RFuture<List<?>> future = es.executeAsync();
future.addListener(listener); future.onComplete(listener);
} }
return result; return result;

@ -35,6 +35,7 @@ import java.util.Set;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import org.redisson.Redisson; import org.redisson.Redisson;
import org.redisson.SlotCallback; import org.redisson.SlotCallback;
@ -282,20 +283,20 @@ public class RedissonConnection extends AbstractRedisConnection {
final AtomicReference<Throwable> failed = new AtomicReference<Throwable>(); final AtomicReference<Throwable> failed = new AtomicReference<Throwable>();
final AtomicLong count = new AtomicLong(); final AtomicLong count = new AtomicLong();
final AtomicLong executed = new AtomicLong(range2key.size()); final AtomicLong executed = new AtomicLong(range2key.size());
FutureListener<List<?>> listener = new FutureListener<List<?>>() { BiConsumer<List<?>, Throwable> listener = new BiConsumer<List<?>, Throwable>() {
@Override @Override
public void operationComplete(Future<List<?>> future) throws Exception { public void accept(List<?> r, Throwable u) {
if (future.isSuccess()) { if (u == null) {
List<Long> result = (List<Long>) future.get(); List<Long> result = (List<Long>) r;
for (Long res : result) { for (Long res : result) {
if (res != null) { if (res != null) {
count.addAndGet(res); count.addAndGet(res);
} }
} }
} else { } else {
failed.set(future.cause()); failed.set(u);
} }
checkExecution(result, failed, count, executed); checkExecution(result, failed, count, executed);
} }
}; };
@ -307,7 +308,7 @@ public class RedissonConnection extends AbstractRedisConnection {
} }
RFuture<List<?>> future = es.executeAsync(); RFuture<List<?>> future = es.executeAsync();
future.addListener(listener); future.onComplete(listener);
} }
return result; return result;

@ -31,8 +31,6 @@ import org.redisson.reactive.CommandReactiveExecutor;
import org.springframework.data.redis.RedisSystemException; import org.springframework.data.redis.RedisSystemException;
import org.springframework.data.redis.connection.RedisClusterNode; import org.springframework.data.redis.connection.RedisClusterNode;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
@ -59,16 +57,13 @@ abstract class RedissonBaseReactive {
RFuture<String> toStringFuture(RFuture<Void> f) { RFuture<String> toStringFuture(RFuture<Void> f) {
RPromise<String> promise = new RedissonPromise<>(); RPromise<String> promise = new RedissonPromise<>();
f.addListener(new FutureListener<Void>() { f.onComplete((res, e) -> {
@Override if (e != null) {
public void operationComplete(Future<Void> future) throws Exception { promise.tryFailure(e);
if (!future.isSuccess()) { return;
promise.tryFailure(future.cause());
return;
}
promise.trySuccess("OK");
} }
promise.trySuccess("OK");
}); });
return promise; return promise;
} }

@ -35,6 +35,7 @@ import java.util.Set;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import org.redisson.Redisson; import org.redisson.Redisson;
import org.redisson.SlotCallback; import org.redisson.SlotCallback;
@ -282,20 +283,20 @@ public class RedissonConnection extends AbstractRedisConnection {
final AtomicReference<Throwable> failed = new AtomicReference<Throwable>(); final AtomicReference<Throwable> failed = new AtomicReference<Throwable>();
final AtomicLong count = new AtomicLong(); final AtomicLong count = new AtomicLong();
final AtomicLong executed = new AtomicLong(range2key.size()); final AtomicLong executed = new AtomicLong(range2key.size());
FutureListener<List<?>> listener = new FutureListener<List<?>>() { BiConsumer<List<?>, Throwable> listener = new BiConsumer<List<?>, Throwable>() {
@Override @Override
public void operationComplete(Future<List<?>> future) throws Exception { public void accept(List<?> r, Throwable u) {
if (future.isSuccess()) { if (u == null) {
List<Long> result = (List<Long>) future.get(); List<Long> result = (List<Long>) r;
for (Long res : result) { for (Long res : result) {
if (res != null) { if (res != null) {
count.addAndGet(res); count.addAndGet(res);
} }
} }
} else { } else {
failed.set(future.cause()); failed.set(u);
} }
checkExecution(result, failed, count, executed); checkExecution(result, failed, count, executed);
} }
}; };
@ -307,7 +308,7 @@ public class RedissonConnection extends AbstractRedisConnection {
} }
RFuture<List<?>> future = es.executeAsync(); RFuture<List<?>> future = es.executeAsync();
future.addListener(listener); future.onComplete(listener);
} }
return result; return result;

@ -59,16 +59,13 @@ abstract class RedissonBaseReactive {
RFuture<String> toStringFuture(RFuture<Void> f) { RFuture<String> toStringFuture(RFuture<Void> f) {
RPromise<String> promise = new RedissonPromise<>(); RPromise<String> promise = new RedissonPromise<>();
f.addListener(new FutureListener<Void>() { f.onComplete((res, e) -> {
@Override if (e != null) {
public void operationComplete(Future<Void> future) throws Exception { promise.tryFailure(e);
if (!future.isSuccess()) { return;
promise.tryFailure(future.cause());
return;
}
promise.trySuccess("OK");
} }
promise.trySuccess("OK");
}); });
return promise; return promise;
} }

@ -35,6 +35,7 @@ import java.util.Set;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import org.redisson.Redisson; import org.redisson.Redisson;
import org.redisson.SlotCallback; import org.redisson.SlotCallback;
@ -99,9 +100,6 @@ import org.springframework.data.redis.core.types.RedisClientInfo;
import org.springframework.util.Assert; import org.springframework.util.Assert;
import org.springframework.util.ReflectionUtils; import org.springframework.util.ReflectionUtils;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
/** /**
* Redisson connection * Redisson connection
* *
@ -282,20 +280,20 @@ public class RedissonConnection extends AbstractRedisConnection {
final AtomicReference<Throwable> failed = new AtomicReference<Throwable>(); final AtomicReference<Throwable> failed = new AtomicReference<Throwable>();
final AtomicLong count = new AtomicLong(); final AtomicLong count = new AtomicLong();
final AtomicLong executed = new AtomicLong(range2key.size()); final AtomicLong executed = new AtomicLong(range2key.size());
FutureListener<List<?>> listener = new FutureListener<List<?>>() { BiConsumer<List<?>, Throwable> listener = new BiConsumer<List<?>, Throwable>() {
@Override @Override
public void operationComplete(Future<List<?>> future) throws Exception { public void accept(List<?> r, Throwable u) {
if (future.isSuccess()) { if (u == null) {
List<Long> result = (List<Long>) future.get(); List<Long> result = (List<Long>) r;
for (Long res : result) { for (Long res : result) {
if (res != null) { if (res != null) {
count.addAndGet(res); count.addAndGet(res);
} }
} }
} else { } else {
failed.set(future.cause()); failed.set(u);
} }
checkExecution(result, failed, count, executed); checkExecution(result, failed, count, executed);
} }
}; };
@ -307,7 +305,7 @@ public class RedissonConnection extends AbstractRedisConnection {
} }
RFuture<List<?>> future = es.executeAsync(); RFuture<List<?>> future = es.executeAsync();
future.addListener(listener); future.onComplete(listener);
} }
return result; return result;

@ -63,8 +63,8 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
CountableListener<Void> listener = new CountableListener<Void>(result, null, channels.length); CountableListener<Void> listener = new CountableListener<Void>(result, null, channels.length);
for (ByteBuffer channel : channels) { for (ByteBuffer channel : channels) {
RFuture<PubSubConnectionEntry> f = subscribeService.subscribe(ByteArrayCodec.INSTANCE, toChannelName(channel)); RFuture<PubSubConnectionEntry> f = subscribeService.subscribe(ByteArrayCodec.INSTANCE, toChannelName(channel));
f.addListener(e -> RedissonReactiveSubscription.this.channels.put(channel, e.getNow())); f.onComplete((res, e) -> RedissonReactiveSubscription.this.channels.put(channel, res));
f.addListener(listener); f.onComplete(listener);
} }
return Mono.fromFuture(result); return Mono.fromFuture(result);
} }
@ -79,8 +79,8 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
CountableListener<Void> listener = new CountableListener<Void>(result, null, patterns.length); CountableListener<Void> listener = new CountableListener<Void>(result, null, patterns.length);
for (ByteBuffer channel : patterns) { for (ByteBuffer channel : patterns) {
RFuture<PubSubConnectionEntry> f = subscribeService.psubscribe(toChannelName(channel), ByteArrayCodec.INSTANCE); RFuture<PubSubConnectionEntry> f = subscribeService.psubscribe(toChannelName(channel), ByteArrayCodec.INSTANCE);
f.addListener(e -> RedissonReactiveSubscription.this.patterns.put(channel, e.getNow())); f.onComplete((res, e) -> RedissonReactiveSubscription.this.patterns.put(channel, res));
f.addListener(listener); f.onComplete(listener);
} }
return Mono.fromFuture(result); return Mono.fromFuture(result);
} }
@ -96,7 +96,7 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
CountableListener<Void> listener = new CountableListener<Void>(result, null, channels.length); CountableListener<Void> listener = new CountableListener<Void>(result, null, channels.length);
for (ByteBuffer channel : channels) { for (ByteBuffer channel : channels) {
RFuture<Codec> f = subscribeService.unsubscribe(toChannelName(channel), PubSubType.UNSUBSCRIBE); RFuture<Codec> f = subscribeService.unsubscribe(toChannelName(channel), PubSubType.UNSUBSCRIBE);
f.addListener(listener); f.onComplete(listener);
} }
return Mono.fromFuture(result); return Mono.fromFuture(result);
} }
@ -112,7 +112,7 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
CountableListener<Void> listener = new CountableListener<Void>(result, null, patterns.length); CountableListener<Void> listener = new CountableListener<Void>(result, null, patterns.length);
for (ByteBuffer channel : patterns) { for (ByteBuffer channel : patterns) {
RFuture<Codec> f = subscribeService.unsubscribe(toChannelName(channel), PubSubType.PUNSUBSCRIBE); RFuture<Codec> f = subscribeService.unsubscribe(toChannelName(channel), PubSubType.PUNSUBSCRIBE);
f.addListener(listener); f.onComplete(listener);
} }
return Mono.fromFuture(result); return Mono.fromFuture(result);
} }

Loading…
Cancel
Save