Slaves synchronization support during RBatch/RBatchReactive execution. #317

pull/1105/head
Nikita 7 years ago
parent a6d9a65549
commit 3e72d1fe71

@ -15,10 +15,10 @@
*/
package org.redisson;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.redisson.api.BatchResult;
import org.redisson.api.RAtomicDoubleAsync;
import org.redisson.api.RAtomicLongAsync;
import org.redisson.api.RBatch;
@ -64,6 +64,10 @@ public class RedissonBatch implements RBatch {
private int retryAttempts;
private long retryInterval;
private int syncSlaves;
private long syncTimeout;
private boolean skipResult;
protected RedissonBatch(UUID id, EvictionScheduler evictionScheduler, ConnectionManager connectionManager) {
this.executorService = new CommandBatchService(connectionManager);
this.evictionScheduler = evictionScheduler;
@ -230,6 +234,19 @@ public class RedissonBatch implements RBatch {
return new RedissonSetCache<V>(codec, evictionScheduler, executorService, name, null);
}
@Override
public RBatch syncSlaves(int slaves, long timeout, TimeUnit unit) {
this.syncSlaves = slaves;
this.syncTimeout = unit.toMillis(timeout);
return this;
}
@Override
public RBatch skipResult() {
this.skipResult = true;
return this;
}
@Override
public RBatch retryAttempts(int retryAttempts) {
this.retryAttempts = retryAttempts;
@ -249,25 +266,25 @@ public class RedissonBatch implements RBatch {
}
@Override
public List<?> execute() {
return executorService.execute(timeout, retryAttempts, retryInterval);
public BatchResult<?> execute() {
return executorService.execute(syncSlaves, syncTimeout, skipResult, timeout, retryAttempts, retryInterval);
}
@Override
public void executeSkipResult() {
executorService.executeSkipResult(timeout, retryAttempts, retryInterval);
executorService.execute(syncSlaves, syncTimeout, true, timeout, retryAttempts, retryInterval);
}
@Override
public RFuture<Void> executeSkipResultAsync() {
return executorService.executeSkipResultAsync(timeout, retryAttempts, retryInterval);
return executorService.executeAsync(syncSlaves, syncTimeout, true, timeout, retryAttempts, retryInterval);
}
@Override
public RFuture<List<?>> executeAsync() {
return executorService.executeAsync(timeout, retryAttempts, retryInterval);
public RFuture<BatchResult<?>> executeAsync() {
return executorService.executeAsync(syncSlaves, syncTimeout, skipResult, timeout, retryAttempts, retryInterval);
}
@Override
public <K, V> RMultimapAsync<K, V> getSetMultimap(String name) {
return new RedissonSetMultimap<K, V>(id, executorService, name);

@ -25,6 +25,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.redisson.api.BatchResult;
import org.redisson.api.RBatch;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RBlockingQueueAsync;
@ -295,7 +296,7 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
timeout = request.getOptions().getExecutionTimeoutInMillis();
}
RFuture<List<?>> clientsFuture = send(timeout, responseName,
RFuture<BatchResult<?>> clientsFuture = send(timeout, responseName,
responseHolder.get());
clientsFuture.addListener(new FutureListener<List<?>>() {
@Override
@ -324,7 +325,7 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
}
}
private <T extends RRemoteServiceResponse> RFuture<List<?>> send(long timeout, String responseName, T response) {
private <T extends RRemoteServiceResponse> RFuture<BatchResult<?>> send(long timeout, String responseName, T response) {
RBatch batch = redisson.createBatch();
RBlockingQueueAsync<T> queue = batch.getBlockingQueue(responseName, getCodec());
queue.putAsync(response);

@ -0,0 +1,242 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
/**
*
* @author Nikita Koksharov
*
*/
public class BatchResult<E> implements List<E> {
private final List<E> responses;
private final int syncedSlaves;
public BatchResult(List<E> responses, int syncedSlaves) {
super();
this.responses = responses;
this.syncedSlaves = syncedSlaves;
}
/**
* Returns list with result object for each command
*
* @return list
*/
public List<?> getResponses() {
return responses;
}
/**
* Returns synchronized slaves amount involved during batch execution
*
* @return slaves
*/
public int getSyncedSlaves() {
return syncedSlaves;
}
/**
* Use {@link #getResponses()}
*/
@Deprecated
public int size() {
return responses.size();
}
/**
* Use {@link #getResponses()}
*/
@Deprecated
public boolean isEmpty() {
return responses.isEmpty();
}
/**
* Use {@link #getResponses()}
*/
@Deprecated
public boolean contains(Object o) {
return responses.contains(o);
}
/**
* Use {@link #getResponses()}
*/
@Deprecated
public Iterator<E> iterator() {
return responses.iterator();
}
/**
* Use {@link #getResponses()}
*/
@Deprecated
public Object[] toArray() {
return responses.toArray();
}
/**
* Use {@link #getResponses()}
*/
@Deprecated
public <T> T[] toArray(T[] a) {
return responses.toArray(a);
}
/**
* Use {@link #getResponses()}
*/
@Deprecated
public boolean add(E e) {
return responses.add(e);
}
/**
* Use {@link #getResponses()}
*/
@Deprecated
public boolean remove(Object o) {
return responses.remove(o);
}
/**
* Use {@link #getResponses()}
*/
@Deprecated
public boolean containsAll(Collection<?> c) {
return responses.containsAll(c);
}
/**
* Use {@link #getResponses()}
*/
@Deprecated
public boolean addAll(Collection<? extends E> c) {
return responses.addAll(c);
}
/**
* Use {@link #getResponses()}
*/
@Deprecated
public boolean addAll(int index, Collection<? extends E> c) {
return responses.addAll(index, c);
}
/**
* Use {@link #getResponses()}
*/
@Deprecated
public boolean removeAll(Collection<?> c) {
return responses.removeAll(c);
}
/**
* Use {@link #getResponses()}
*/
@Deprecated
public boolean retainAll(Collection<?> c) {
return responses.retainAll(c);
}
/**
* Use {@link #getResponses()}
*/
@Deprecated
public void clear() {
responses.clear();
}
/**
* Use {@link #getResponses()}
*/
@Deprecated
public E get(int index) {
return responses.get(index);
}
/**
* Use {@link #getResponses()}
*/
@Deprecated
public E set(int index, E element) {
return responses.set(index, element);
}
/**
* Use {@link #getResponses()}
*/
@Deprecated
public void add(int index, E element) {
responses.add(index, element);
}
/**
* Use {@link #getResponses()}
*/
@Deprecated
public E remove(int index) {
return responses.remove(index);
}
/**
* Use {@link #getResponses()}
*/
@Deprecated
public int indexOf(Object o) {
return responses.indexOf(o);
}
/**
* Use {@link #getResponses()}
*/
@Deprecated
public int lastIndexOf(Object o) {
return responses.lastIndexOf(o);
}
/**
* Use {@link #getResponses()}
*/
@Deprecated
public ListIterator<E> listIterator() {
return responses.listIterator();
}
/**
* Use {@link #getResponses()}
*/
@Deprecated
public ListIterator<E> listIterator(int index) {
return responses.listIterator(index);
}
/**
* Use {@link #getResponses()}
*/
@Deprecated
public List<E> subList(int fromIndex, int toIndex) {
return responses.subList(fromIndex, toIndex);
}
}

@ -15,7 +15,6 @@
*/
package org.redisson.api;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.redisson.client.RedisException;
@ -387,7 +386,7 @@ public interface RBatch {
* @throws RedisException in case of any error
*
*/
List<?> execute() throws RedisException;
BatchResult<?> execute() throws RedisException;
/**
* Executes all operations accumulated during async methods invocations asynchronously.
@ -397,36 +396,41 @@ public interface RBatch {
*
* @return List with result object for each command
*/
RFuture<List<?>> executeAsync();
RFuture<BatchResult<?>> executeAsync();
/**
* Executes all operations accumulated during async methods invocations.
* Command replies are skipped such approach saves response bandwidth.
* <p>
* If cluster configuration used then operations are grouped by slot ids
* and may be executed on different servers. Thus command execution order could be changed.
* <p>
* NOTE: Redis 3.2+ required
*
* @throws RedisException in case of any error
*
* Use {@link #skipResult()}
*/
@Deprecated
void executeSkipResult();
/**
* Executes all operations accumulated during async methods invocations asynchronously,
* Command replies are skipped such approach saves response bandwidth.
* <p>
* If cluster configuration used then operations are grouped by slot ids
* and may be executed on different servers. Thus command execution order could be changed
* Use {@link #skipResult()}
*/
@Deprecated
RFuture<Void> executeSkipResultAsync();
/**
* Inform Redis not to send reply for this batch.
* Such approach saves response bandwidth.
* <p>
* NOTE: Redis 3.2+ required
*
* @return void
* @throws RedisException in case of any error
*
* @return self instance
*/
RFuture<Void> executeSkipResultAsync();
RBatch skipResult();
/**
* Synchronize write operations execution across defined amount of Redis slave nodes.
* <p>
* NOTE: Redis 3.0+ required
*
* @param slaves amount to sync
* @param timeout for sync operation
* @param unit value
* @return self instance
*/
RBatch syncSlaves(int slaves, long timeout, TimeUnit unit);
/**
* Defines timeout for Redis response.

@ -15,7 +15,7 @@
*/
package org.redisson.api;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.reactivestreams.Publisher;
import org.redisson.client.codec.Codec;
@ -249,6 +249,68 @@ public interface RBatchReactive {
*
* @return List with result object for each command
*/
Publisher<List<?>> execute();
Publisher<BatchResult<?>> execute();
/**
* Command replies are skipped such approach saves response bandwidth.
* <p>
* NOTE: Redis 3.2+ required
*
* @return self instance
*/
RBatchReactive skipResult();
/**
*
* <p>
* NOTE: Redis 3.0+ required
*
* @param slaves number to sync
* @param timeout for sync operation
* @param unit value
* @return self instance
*/
RBatchReactive syncSlaves(int slaves, long timeout, TimeUnit unit);
/**
* Defines timeout for Redis response.
* Starts to countdown when Redis command has been successfully sent.
* <p>
* <code>0</code> value means use <code>Config.setTimeout</code> value instead.
* <p>
* Default is <code>0</code>
*
* @param timeout value
* @param unit value
* @return self instance
*/
RBatchReactive timeout(long timeout, TimeUnit unit);
/**
* Defines time interval for another one attempt send Redis commands batch
* if it hasn't been sent already.
* <p>
* <code>0</code> value means use <code>Config.setRetryInterval</code> value instead.
* <p>
* Default is <code>0</code>
*
* @param retryInterval value
* @param unit value
* @return self instance
*/
RBatchReactive retryInterval(long retryInterval, TimeUnit unit);
/**
* Defines attempts amount to re-send Redis commands batch
* if it hasn't been sent already.
* <p>
* <code>0</code> value means use <code>Config.setRetryAttempts</code> value instead.
* <p>
* Default is <code>0</code>
*
* @param retryAttempts value
* @return self instance
*/
RBatchReactive retryAttempts(int retryAttempts);
}

@ -20,6 +20,7 @@ import java.util.concurrent.atomic.AtomicReference;
import org.redisson.client.RedisRedirectException;
import org.redisson.client.codec.Codec;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
/**
*
@ -33,6 +34,10 @@ public class BatchCommandData<T, R> extends CommandData<T, R> implements Compara
private final int index;
private final AtomicReference<RedisRedirectException> redirectError = new AtomicReference<RedisRedirectException>();
public BatchCommandData(RedisCommand<T> command, Object[] params, int index) {
this(new RedissonPromise<R>(), null, command, params, index);
}
public BatchCommandData(RPromise<R> promise, Codec codec, RedisCommand<T> command, Object[] params, int index) {
super(promise, codec, command, params);
this.index = index;

@ -93,6 +93,7 @@ public interface RedisCommands {
RedisStrictCommand<Boolean> SETBIT = new RedisStrictCommand<Boolean>("SETBIT", new BitSetReplayConvertor());
RedisStrictCommand<Void> BITOP = new RedisStrictCommand<Void>("BITOP", new VoidReplayConvertor());
RedisStrictCommand<Integer> WAIT = new RedisStrictCommand<Integer>("WAIT", new IntegerReplayConvertor());
RedisStrictCommand<Void> CLIENT_REPLY = new RedisStrictCommand<Void>("CLIENT", "REPLY", new VoidReplayConvertor());
RedisStrictCommand<Void> ASKING = new RedisStrictCommand<Void>("ASKING", new VoidReplayConvertor());
RedisStrictCommand<Void> READONLY = new RedisStrictCommand<Void>("READONLY", new VoidReplayConvertor());

@ -872,7 +872,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
}
}
private <T> T tryHandleReference(T o) {
protected <T> T tryHandleReference(T o) {
boolean hasConversion = false;
if (o instanceof List) {
List<Object> r = (List<Object>) o;

@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.redisson.RedissonReference;
import org.redisson.RedissonShutdownException;
import org.redisson.api.BatchResult;
import org.redisson.api.RFuture;
import org.redisson.client.RedisAskException;
import org.redisson.client.RedisConnection;
@ -47,6 +48,7 @@ import org.redisson.connection.NodeSource;
import org.redisson.connection.NodeSource.Redirect;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonObjectFactory;
import org.redisson.misc.RedissonPromise;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
@ -129,114 +131,114 @@ public class CommandBatchService extends CommandAsyncService {
entry.getCommands().add(commandData);
}
public List<?> execute() {
return get(executeAsync(0, 0, 0));
public BatchResult<?> execute() {
RFuture<BatchResult<?>> f = executeAsync(0, 0, false, 0, 0, 0);
return get(f);
}
public List<?> execute(long responseTimeout, int retryAttempts, long retryInterval) {
return get(executeAsync(responseTimeout, retryAttempts, retryInterval));
public BatchResult<?> execute(int syncSlaves, long syncTimeout, boolean noResult, long responseTimeout, int retryAttempts, long retryInterval) {
RFuture<BatchResult<?>> f = executeAsync(syncSlaves, syncTimeout, noResult, responseTimeout, retryAttempts, retryInterval);
return get(f);
}
public RFuture<Void> executeAsyncVoid() {
return executeAsyncVoid(false, 0, 0, 0);
final RedissonPromise<Void> promise = new RedissonPromise<Void>();
RFuture<BatchResult<?>> res = executeAsync(0, 0, false, 0, 0, 0);
res.addListener(new FutureListener<BatchResult<?>>() {
@Override
public void operationComplete(Future<BatchResult<?>> future) throws Exception {
if (future.isSuccess()) {
promise.trySuccess(null);
} else {
promise.tryFailure(future.cause());
}
}
});
return promise;
}
public RFuture<List<?>> executeAsync() {
return executeAsync(0, 0, false, 0, 0, 0);
}
protected RFuture<Void> executeAsyncVoid(boolean noResult, long responseTimeout, int retryAttempts, long retryInterval) {
public <R> RFuture<R> executeAsync(int syncSlaves, long syncTimeout, boolean skipResult, long responseTimeout, int retryAttempts, long retryInterval) {
if (executed) {
throw new IllegalStateException("Batch already executed!");
}
if (commands.isEmpty()) {
return connectionManager.newSucceededFuture(null);
return RedissonPromise.newSucceededFuture(null);
}
executed = true;
if (noResult) {
if (skipResult) {
for (Entry entry : commands.values()) {
RPromise<Object> s = connectionManager.newPromise();
BatchCommandData<?, ?> offCommand = new BatchCommandData(s, null, RedisCommands.CLIENT_REPLY, new Object[] { "OFF" }, index.incrementAndGet());
BatchCommandData<?, ?> offCommand = new BatchCommandData(RedisCommands.CLIENT_REPLY, new Object[] { "OFF" }, index.incrementAndGet());
entry.getCommands().addFirst(offCommand);
RPromise<Object> s1 = connectionManager.newPromise();
BatchCommandData<?, ?> onCommand = new BatchCommandData(s1, null, RedisCommands.CLIENT_REPLY, new Object[] { "ON" }, index.incrementAndGet());
BatchCommandData<?, ?> onCommand = new BatchCommandData(RedisCommands.CLIENT_REPLY, new Object[] { "ON" }, index.incrementAndGet());
entry.getCommands().add(onCommand);
}
}
executed = true;
RPromise<Void> voidPromise = connectionManager.newPromise();
voidPromise.addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
commands = null;
if (syncSlaves > 0) {
for (Entry entry : commands.values()) {
BatchCommandData<?, ?> waitCommand = new BatchCommandData(RedisCommands.WAIT, new Object[] { syncSlaves, syncTimeout }, index.incrementAndGet());
entry.getCommands().add(waitCommand);
}
});
AtomicInteger slots = new AtomicInteger(commands.size());
for (java.util.Map.Entry<MasterSlaveEntry, Entry> e : commands.entrySet()) {
execute(e.getValue(), new NodeSource(e.getKey()), voidPromise, slots, 0, true, responseTimeout, retryAttempts, retryInterval);
}
return voidPromise;
}
public void executeSkipResult(long timeout, int retryAttempts, long retryInterval) {
get(executeSkipResultAsync(timeout, retryAttempts, retryInterval));
}
public RFuture<Void> executeSkipResultAsync(long timeout, int retryAttempts, long retryInterval) {
return executeAsyncVoid(true, timeout, retryAttempts, retryInterval);
}
public RFuture<List<?>> executeAsync() {
return executeAsync(0, 0, 0);
}
public RFuture<List<?>> executeAsync(long responseTimeout, int retryAttempts, long retryInterval) {
if (executed) {
throw new IllegalStateException("Batch already executed!");
}
if (commands.isEmpty()) {
return connectionManager.newSucceededFuture(null);
}
executed = true;
RPromise<Void> voidPromise = connectionManager.newPromise();
final RPromise<List<?>> promise = connectionManager.newPromise();
voidPromise.addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
if (!future.isSuccess()) {
promise.tryFailure(future.cause());
RPromise<R> resultPromise;
RPromise<Void> voidPromise = new RedissonPromise<Void>();
if (skipResult) {
voidPromise.addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
commands = null;
return;
}
List<BatchCommandData> entries = new ArrayList<BatchCommandData>();
for (Entry e : commands.values()) {
entries.addAll(e.getCommands());
}
Collections.sort(entries);
List<Object> result = new ArrayList<Object>(entries.size());
for (BatchCommandData<?, ?> commandEntry : entries) {
Object entryResult = commandEntry.getPromise().getNow();
if (isRedissonReferenceSupportEnabled() && entryResult instanceof RedissonReference) {
result.add(redisson != null
? RedissonObjectFactory.<Object>fromReference(redisson, (RedissonReference) entryResult)
: RedissonObjectFactory.<Object>fromReference(redissonReactive, (RedissonReference) entryResult));
} else {
result.add(entryResult);
});
resultPromise = (RPromise<R>) voidPromise;
} else {
final RPromise<Object> promise = new RedissonPromise<Object>();
voidPromise.addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
if (!future.isSuccess()) {
promise.tryFailure(future.cause());
commands = null;
return;
}
List<BatchCommandData> entries = new ArrayList<BatchCommandData>();
for (Entry e : commands.values()) {
entries.addAll(e.getCommands());
}
Collections.sort(entries);
List<Object> responses = new ArrayList<Object>(entries.size());
int syncedSlaves = 0;
for (BatchCommandData<?, ?> commandEntry : entries) {
if (!isWaitCommand(commandEntry)) {
Object entryResult = commandEntry.getPromise().getNow();
entryResult = tryHandleReference(entryResult);
responses.add(entryResult);
} else {
syncedSlaves = (Integer) commandEntry.getPromise().getNow();
}
}
BatchResult<Object> result = new BatchResult<Object>(responses, syncedSlaves);
promise.trySuccess(result);
commands = null;
}
promise.trySuccess(result);
commands = null;
}
});
});
resultPromise = (RPromise<R>) promise;
}
AtomicInteger slots = new AtomicInteger(commands.size());
for (java.util.Map.Entry<MasterSlaveEntry, Entry> e : commands.entrySet()) {
execute(e.getValue(), new NodeSource(e.getKey()), voidPromise, slots, 0, false, responseTimeout, retryAttempts, retryInterval);
execute(e.getValue(), new NodeSource(e.getKey()), voidPromise, slots, 0, skipResult, responseTimeout, retryAttempts, retryInterval);
}
return promise;
return resultPromise;
}
private void execute(final Entry entry, final NodeSource source, final RPromise<Void> mainPromise, final AtomicInteger slots,
@ -252,7 +254,7 @@ public class CommandBatchService extends CommandAsyncService {
return;
}
final RPromise<Void> attemptPromise = connectionManager.newPromise();
final RPromise<Void> attemptPromise = new RedissonPromise<Void>();
final AsyncDetails details = new AsyncDetails();
@ -469,11 +471,11 @@ public class CommandBatchService extends CommandAsyncService {
List<CommandData<?, ?>> list = new ArrayList<CommandData<?, ?>>(entry.getCommands().size() + 1);
if (source.getRedirect() == Redirect.ASK) {
RPromise<Void> promise = connectionManager.newPromise();
RPromise<Void> promise = new RedissonPromise<Void>();
list.add(new CommandData<Void, Void>(promise, StringCodec.INSTANCE, RedisCommands.ASKING, new Object[] {}));
}
for (BatchCommandData<?, ?> c : entry.getCommands()) {
if (c.getPromise().isSuccess()) {
if (c.getPromise().isSuccess() && !isWaitCommand(c)) {
// skip successful commands
continue;
}
@ -493,4 +495,8 @@ public class CommandBatchService extends CommandAsyncService {
releaseConnection(source, connFuture, entry.isReadOnlyMode(), attemptPromise, details);
}
protected boolean isWaitCommand(BatchCommandData<?, ?> c) {
return c.getCommand().getName().equals(RedisCommands.WAIT.getName());
}
}

@ -15,12 +15,12 @@
*/
package org.redisson.command;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import org.redisson.api.BatchResult;
import org.redisson.api.RFuture;
import org.redisson.api.RedissonReactiveClient;
import org.redisson.client.codec.Codec;
@ -61,43 +61,7 @@ public class CommandReactiveBatchService extends CommandReactiveService {
batchService.async(readOnlyMode, nodeSource, codec, command, params, mainPromise, attempt);
}
public List<?> execute() {
return get(executeAsync(0, 0, 0));
}
public List<?> execute(long responseTimeout, int retryAttempts, long retryInterval) {
return get(executeAsync(responseTimeout, retryAttempts, retryInterval));
}
public RFuture<Void> executeAsyncVoid() {
return executeAsyncVoid(false, 0, 0, 0);
}
private RFuture<Void> executeAsyncVoid(boolean noResult, long responseTimeout, int retryAttempts, long retryInterval) {
for (Publisher<?> publisher : publishers) {
publisher.subscribe(new DefaultSubscriber<Object>() {
@Override
public void onSubscribe(Subscription s) {
s.request(1);
}
});
}
return batchService.executeAsyncVoid(noResult, responseTimeout, retryAttempts, retryInterval);
}
public void executeSkipResult(long timeout, int retryAttempts, long retryInterval) {
get(executeSkipResultAsync(timeout, retryAttempts, retryInterval));
}
public RFuture<Void> executeSkipResultAsync(long timeout, int retryAttempts, long retryInterval) {
return executeAsyncVoid(true, timeout, retryAttempts, retryInterval);
}
public RFuture<List<?>> executeAsync() {
return executeAsync(0, 0, 0);
}
public RFuture<List<?>> executeAsync(long responseTimeout, int retryAttempts, long retryInterval) {
public RFuture<BatchResult<?>> executeAsync(int syncSlaves, long syncTimeout, boolean skipResult, long responseTimeout, int retryAttempts, long retryInterval) {
for (Publisher<?> publisher : publishers) {
publisher.subscribe(new DefaultSubscriber<Object>() {
@Override
@ -107,7 +71,7 @@ public class CommandReactiveBatchService extends CommandReactiveService {
});
}
return batchService.executeAsync(responseTimeout, retryAttempts, retryInterval);
return batchService.executeAsync(syncSlaves, syncTimeout, skipResult, responseTimeout, retryAttempts, retryInterval);
}
@Override

@ -15,9 +15,10 @@
*/
package org.redisson.reactive;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.reactivestreams.Publisher;
import org.redisson.api.BatchResult;
import org.redisson.api.RAtomicLongReactive;
import org.redisson.api.RBatchReactive;
import org.redisson.api.RBitSetReactive;
@ -55,6 +56,14 @@ public class RedissonBatchReactive implements RBatchReactive {
private final EvictionScheduler evictionScheduler;
private final CommandReactiveBatchService executorService;
private long timeout;
private int retryAttempts;
private long retryInterval;
private int syncSlaves;
private long syncTimeout;
private boolean skipResult;
public RedissonBatchReactive(EvictionScheduler evictionScheduler, ConnectionManager connectionManager) {
this.evictionScheduler = evictionScheduler;
this.executorService = new CommandReactiveBatchService(connectionManager);
@ -206,14 +215,45 @@ public class RedissonBatchReactive implements RBatchReactive {
}
@Override
public Publisher<List<?>> execute() {
return new NettyFuturePublisher<List<?>>(new Supplier<RFuture<List<?>>>() {
public Publisher<BatchResult<?>> execute() {
return new NettyFuturePublisher<BatchResult<?>>(new Supplier<RFuture<BatchResult<?>>>() {
@Override
public RFuture<List<?>> get() {
return executorService.executeAsync();
public RFuture<BatchResult<?>> get() {
return executorService.executeAsync(syncSlaves, syncTimeout, skipResult, timeout, retryAttempts, retryInterval);
}
});
}
@Override
public RBatchReactive syncSlaves(int slaves, long timeout, TimeUnit unit) {
this.syncSlaves = slaves;
this.syncTimeout = unit.toMillis(timeout);
return this;
}
@Override
public RBatchReactive skipResult() {
this.skipResult = true;
return this;
}
@Override
public RBatchReactive retryAttempts(int retryAttempts) {
this.retryAttempts = retryAttempts;
return this;
}
@Override
public RBatchReactive retryInterval(long retryInterval, TimeUnit unit) {
this.retryInterval = unit.toMillis(retryInterval);
return this;
}
@Override
public RBatchReactive timeout(long timeout, TimeUnit unit) {
this.timeout = unit.toMillis(timeout);
return this;
}
public void enableRedissonReferenceSupport(RedissonReactiveClient redissonReactive) {
this.executorService.enableRedissonReferenceSupport(redissonReactive);

@ -2,6 +2,7 @@ package org.redisson;
import static org.assertj.core.api.Assertions.assertThat;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -14,14 +15,20 @@ import java.util.concurrent.atomic.AtomicLong;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test;
import org.redisson.ClusterRunner.ClusterProcesses;
import org.redisson.RedisRunner.FailedToStartRedisException;
import org.redisson.api.BatchResult;
import org.redisson.api.RBatch;
import org.redisson.api.RFuture;
import org.redisson.api.RListAsync;
import org.redisson.api.RMapAsync;
import org.redisson.api.RMapCacheAsync;
import org.redisson.api.RScript;
import org.redisson.api.RedissonClient;
import org.redisson.api.RScript.Mode;
import org.redisson.client.RedisException;
import org.redisson.client.codec.StringCodec;
import org.redisson.config.Config;
public class RedissonBatchTest extends BaseTest {
@ -42,6 +49,40 @@ public class RedissonBatchTest extends BaseTest {
List<?> t = batch.execute();
System.out.println(t);
}
@Test
public void testSyncSlaves() throws FailedToStartRedisException, IOException, InterruptedException {
RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner master2 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner master3 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave1 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave2 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave3 = new RedisRunner().randomPort().randomDir().nosave();
ClusterRunner clusterRunner = new ClusterRunner()
.addNode(master1, slave1)
.addNode(master2, slave2)
.addNode(master3, slave3);
ClusterProcesses process = clusterRunner.run();
Config config = new Config();
config.useClusterServers()
.addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort());
RedissonClient redisson = Redisson.create(config);
RBatch batch = redisson.createBatch();
for (int i = 0; i < 100; i++) {
RMapAsync<String, String> map = batch.getMap("test");
map.putAsync("" + i, "" + i);
}
batch.syncSlaves(1, 1, TimeUnit.SECONDS);
BatchResult<?> result = batch.execute();
assertThat(result.getSyncedSlaves()).isEqualTo(1);
process.shutdown();
}
@Test
public void testWriteTimeout() {
@ -62,7 +103,8 @@ public class RedissonBatchTest extends BaseTest {
batch.getBucket("A3").setAsync("001");
batch.getKeys().deleteAsync("A1");
batch.getKeys().deleteAsync("A2");
batch.executeSkipResult();
batch.skipResult();
batch.execute();
assertThat(redisson.getBucket("A1").isExists()).isFalse();
assertThat(redisson.getBucket("A3").isExists()).isTrue();
@ -76,7 +118,7 @@ public class RedissonBatchTest extends BaseTest {
batch.getBucket("A3").setAsync("001");
batch.getKeys().deleteAsync("A1");
batch.getKeys().deleteAsync("A2");
List result = batch.execute();
batch.execute();
}
@Test
@ -99,8 +141,8 @@ public class RedissonBatchTest extends BaseTest {
for (int i = 1; i < 540; i++) {
listAsync.addAsync(i);
}
List<?> res = b.execute();
Assert.assertEquals(539, res.size());
BatchResult<?> res = b.execute();
Assert.assertEquals(539, res.getResponses().size());
}
@Test
@ -113,8 +155,8 @@ public class RedissonBatchTest extends BaseTest {
batch.getAtomicLong("counter").incrementAndGetAsync();
batch.getAtomicLong("counter").incrementAndGetAsync();
}
List<?> res = batch.execute();
Assert.assertEquals(210*5, res.size());
BatchResult<?> res = batch.execute();
Assert.assertEquals(210*5, res.getResponses().size());
}
@Test(expected=RedisException.class)

Loading…
Cancel
Save