refactoring

pull/1705/head
Nikita 7 years ago
parent adc3b7e552
commit 30c9f9d59c

@ -57,6 +57,10 @@ public class CommandReactiveBatchService extends CommandReactiveService {
return publisher;
}
public <R> Publisher<R> superReactive(Supplier<RFuture<R>> supplier) {
return super.reactive(supplier);
}
@Override
protected <V, R> void async(boolean readOnlyMode, NodeSource nodeSource,
Codec codec, RedisCommand<V> command, Object[] params, RPromise<R> mainPromise, int attempt, boolean ignoreRedirect, RFuture<RedisConnection> connFuture) {

@ -33,6 +33,7 @@ public class CommandReactiveService extends CommandAsyncService implements Comma
super(connectionManager);
}
@Override
public <R> Publisher<R> reactive(Supplier<RFuture<R>> supplier) {
return new NettyFuturePublisher<R>(supplier);
}

@ -71,7 +71,6 @@ import org.redisson.api.RStreamReactive;
import org.redisson.api.RTopicReactive;
import org.redisson.api.RedissonReactiveClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.StringCodec;
import org.redisson.command.CommandReactiveBatchService;
import org.redisson.connection.ConnectionManager;
import org.redisson.eviction.EvictionScheduler;
@ -281,7 +280,7 @@ public class RedissonBatchReactive implements RBatchReactive {
@Override
public Publisher<BatchResult<?>> execute() {
return new NettyFuturePublisher<BatchResult<?>>(new Supplier<RFuture<BatchResult<?>>>() {
return executorService.superReactive(new Supplier<RFuture<BatchResult<?>>>() {
@Override
public RFuture<BatchResult<?>> get() {
return executorService.executeAsync(options);

@ -70,7 +70,7 @@ public class RedissonPatternTopicReactive<M> implements RPatternTopicReactive<M>
@Override
public Publisher<Integer> addListener(final PatternStatusListener listener) {
return new NettyFuturePublisher<Integer>(new Supplier<RFuture<Integer>>() {
return commandExecutor.reactive(new Supplier<RFuture<Integer>>() {
@Override
public RFuture<Integer> get() {
RPromise<Integer> promise = new RedissonPromise<Integer>();
@ -82,7 +82,7 @@ public class RedissonPatternTopicReactive<M> implements RPatternTopicReactive<M>
@Override
public Publisher<Integer> addListener(final PatternMessageListener<M> listener) {
return new NettyFuturePublisher<Integer>(new Supplier<RFuture<Integer>>() {
return commandExecutor.reactive(new Supplier<RFuture<Integer>>() {
@Override
public RFuture<Integer> get() {
RPromise<Integer> promise = new RedissonPromise<Integer>();

@ -118,7 +118,7 @@ public class RedissonTransactionReactive implements RTransactionReactive {
@Override
public Publisher<Void> commit() {
return new NettyFuturePublisher<Void>(new Supplier<RFuture<Void>>() {
return executorService.reactive(new Supplier<RFuture<Void>>() {
@Override
public RFuture<Void> get() {
return transaction.commitAsync();
@ -128,7 +128,7 @@ public class RedissonTransactionReactive implements RTransactionReactive {
@Override
public Publisher<Void> rollback() {
return new NettyFuturePublisher<Void>(new Supplier<RFuture<Void>>() {
return executorService.reactive(new Supplier<RFuture<Void>>() {
@Override
public RFuture<Void> get() {
return transaction.rollbackAsync();

Loading…
Cancel
Save