CommandReactiveBatchService implemented as separate batchService for Reactive objects.

pull/970/head
Nikita
parent 0994aee746
commit cd4a848a42

@ -19,15 +19,11 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import org.redisson.RedissonReference;
import org.redisson.RedissonShutdownException;
import org.redisson.api.RFuture;
@ -51,7 +47,6 @@ import org.redisson.connection.NodeSource;
import org.redisson.connection.NodeSource.Redirect;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonObjectFactory;
import org.redisson.reactive.NettyFuturePublisher;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
@ -60,10 +55,13 @@ import io.netty.util.TimerTask;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.internal.PlatformDependent;
import reactor.fn.Supplier;
import reactor.rx.action.support.DefaultSubscriber;
public class CommandBatchService extends CommandReactiveService {
/**
*
* @author Nikita Koksharov
*
*/
public class CommandBatchService extends CommandAsyncService {
public static class Entry {
@ -93,7 +91,6 @@ public class CommandBatchService extends CommandReactiveService {
private final AtomicInteger index = new AtomicInteger();
private Queue<Publisher<?>> publishers = new ConcurrentLinkedQueue<Publisher<?>>();
private ConcurrentMap<MasterSlaveEntry, Entry> commands = PlatformDependent.newConcurrentHashMap();
private volatile boolean executed;
@ -102,13 +99,6 @@ public class CommandBatchService extends CommandReactiveService {
super(connectionManager);
}
@Override
public <R> Publisher<R> reactive(Supplier<RFuture<R>> supplier) {
NettyFuturePublisher<R> publisher = new NettyFuturePublisher<R>(supplier);
publishers.add(publisher);
return publisher;
}
@Override
protected <V, R> void async(boolean readOnlyMode, NodeSource nodeSource,
Codec codec, RedisCommand<V> command, Object[] params, RPromise<R> mainPromise, int attempt) {
@ -151,20 +141,11 @@ public class CommandBatchService extends CommandReactiveService {
return executeAsyncVoid(false, 0, 0, 0);
}
private RFuture<Void> executeAsyncVoid(boolean noResult, long responseTimeout, int retryAttempts, long retryInterval) {
protected RFuture<Void> executeAsyncVoid(boolean noResult, long responseTimeout, int retryAttempts, long retryInterval) {
if (executed) {
throw new IllegalStateException("Batch already executed!");
}
for (Publisher<?> publisher : publishers) {
publisher.subscribe(new DefaultSubscriber<Object>() {
@Override
public void onSubscribe(Subscription s) {
s.request(1);
}
});
}
if (commands.isEmpty()) {
return connectionManager.newSucceededFuture(null);
}
@ -214,15 +195,6 @@ public class CommandBatchService extends CommandReactiveService {
throw new IllegalStateException("Batch already executed!");
}
for (Publisher<?> publisher : publishers) {
publisher.subscribe(new DefaultSubscriber<Object>() {
@Override
public void onSubscribe(Subscription s) {
s.request(1);
}
});
}
if (commands.isEmpty()) {
return connectionManager.newSucceededFuture(null);
}
@ -267,7 +239,7 @@ public class CommandBatchService extends CommandReactiveService {
return promise;
}
private void execute(final Entry entry, final NodeSource source, final RPromise<Void> mainPromise, final AtomicInteger slots,
protected void execute(final Entry entry, final NodeSource source, final RPromise<Void> mainPromise, final AtomicInteger slots,
final int attempt, final boolean noResult, final long responseTimeout, final int retryAttempts, final long retryInterval) {
if (mainPromise.isCancelled()) {
return;

@ -0,0 +1,119 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.command;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import org.redisson.api.RFuture;
import org.redisson.api.RedissonReactiveClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.NodeSource;
import org.redisson.misc.RPromise;
import org.redisson.reactive.NettyFuturePublisher;
import reactor.fn.Supplier;
import reactor.rx.action.support.DefaultSubscriber;
/**
*
* @author Nikita Koksharov
*
*/
public class CommandReactiveBatchService extends CommandReactiveService {
private final CommandBatchService batchService;
private final Queue<Publisher<?>> publishers = new ConcurrentLinkedQueue<Publisher<?>>();
public CommandReactiveBatchService(ConnectionManager connectionManager) {
super(connectionManager);
batchService = new CommandBatchService(connectionManager);
}
@Override
public <R> Publisher<R> reactive(Supplier<RFuture<R>> supplier) {
NettyFuturePublisher<R> publisher = new NettyFuturePublisher<R>(supplier);
publishers.add(publisher);
return publisher;
}
@Override
protected <V, R> void async(boolean readOnlyMode, NodeSource nodeSource,
Codec codec, RedisCommand<V> command, Object[] params, RPromise<R> mainPromise, int attempt) {
batchService.async(readOnlyMode, nodeSource, codec, command, params, mainPromise, attempt);
}
public List<?> execute() {
return get(executeAsync(0, 0, 0));
}
public List<?> execute(long responseTimeout, int retryAttempts, long retryInterval) {
return get(executeAsync(responseTimeout, retryAttempts, retryInterval));
}
public RFuture<Void> executeAsyncVoid() {
return executeAsyncVoid(false, 0, 0, 0);
}
private RFuture<Void> executeAsyncVoid(boolean noResult, long responseTimeout, int retryAttempts, long retryInterval) {
for (Publisher<?> publisher : publishers) {
publisher.subscribe(new DefaultSubscriber<Object>() {
@Override
public void onSubscribe(Subscription s) {
s.request(1);
}
});
}
return batchService.executeAsyncVoid(noResult, responseTimeout, retryAttempts, retryInterval);
}
public void executeSkipResult(long timeout, int retryAttempts, long retryInterval) {
get(executeSkipResultAsync(timeout, retryAttempts, retryInterval));
}
public RFuture<Void> executeSkipResultAsync(long timeout, int retryAttempts, long retryInterval) {
return executeAsyncVoid(true, timeout, retryAttempts, retryInterval);
}
public RFuture<List<?>> executeAsync() {
return executeAsync(0, 0, 0);
}
public RFuture<List<?>> executeAsync(long responseTimeout, int retryAttempts, long retryInterval) {
for (Publisher<?> publisher : publishers) {
publisher.subscribe(new DefaultSubscriber<Object>() {
@Override
public void onSubscribe(Subscription s) {
s.request(1);
}
});
}
return batchService.executeAsync(responseTimeout, retryAttempts, retryInterval);
}
@Override
public CommandAsyncExecutor enableRedissonReferenceSupport(RedissonReactiveClient redissonReactive) {
batchService.enableRedissonReferenceSupport(redissonReactive);
return super.enableRedissonReferenceSupport(redissonReactive);
}
}

@ -39,7 +39,7 @@ import org.redisson.api.RSetReactive;
import org.redisson.api.RTopicReactive;
import org.redisson.api.RedissonReactiveClient;
import org.redisson.client.codec.Codec;
import org.redisson.command.CommandBatchService;
import org.redisson.command.CommandReactiveBatchService;
import org.redisson.connection.ConnectionManager;
import org.redisson.eviction.EvictionScheduler;
@ -53,11 +53,11 @@ import reactor.fn.Supplier;
public class RedissonBatchReactive implements RBatchReactive {
private final EvictionScheduler evictionScheduler;
private final CommandBatchService executorService;
private final CommandReactiveBatchService executorService;
public RedissonBatchReactive(EvictionScheduler evictionScheduler, ConnectionManager connectionManager) {
this.evictionScheduler = evictionScheduler;
this.executorService = new CommandBatchService(connectionManager);
this.executorService = new CommandReactiveBatchService(connectionManager);
}
@Override

Loading…
Cancel
Save