BatchOptions object should be supplied during Batch creation. #1392

pull/1423/head
Nikita 7 years ago
parent dc6e80ebb9
commit 3fd1015f20

@ -18,6 +18,7 @@ package org.redisson;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import org.redisson.api.BatchOptions;
import org.redisson.api.ClusterNodesGroup;
import org.redisson.api.LocalCachedMapOptions;
import org.redisson.api.MapOptions;
@ -578,14 +579,19 @@ public class Redisson implements RedissonClient {
}
@Override
public RBatch createBatch() {
RedissonBatch batch = new RedissonBatch(evictionScheduler, connectionManager);
public RBatch createBatch(BatchOptions options) {
RedissonBatch batch = new RedissonBatch(evictionScheduler, connectionManager, options);
if (config.isReferenceEnabled()) {
batch.enableRedissonReferenceSupport(this);
}
return batch;
}
@Override
public RBatch createBatch() {
return createBatch(BatchOptions.defaults());
}
@Override
public RLiveObjectService getLiveObjectService() {
return new RedissonLiveObjectService(this, liveObjectClassCache);

@ -17,6 +17,7 @@ package org.redisson;
import java.util.concurrent.TimeUnit;
import org.redisson.api.BatchOptions;
import org.redisson.api.BatchResult;
import org.redisson.api.RAtomicDoubleAsync;
import org.redisson.api.RAtomicLongAsync;
@ -57,19 +58,12 @@ public class RedissonBatch implements RBatch {
private final EvictionScheduler evictionScheduler;
private final CommandBatchService executorService;
private final BatchOptions options;
private long timeout;
private int retryAttempts;
private long retryInterval;
private int syncSlaves;
private long syncTimeout;
private boolean skipResult;
private boolean atomic;
public RedissonBatch(EvictionScheduler evictionScheduler, ConnectionManager connectionManager) {
public RedissonBatch(EvictionScheduler evictionScheduler, ConnectionManager connectionManager, BatchOptions options) {
this.executorService = new CommandBatchService(connectionManager);
this.evictionScheduler = evictionScheduler;
this.options = options;
}
@Override
@ -234,59 +228,48 @@ public class RedissonBatch implements RBatch {
@Override
public RBatch syncSlaves(int slaves, long timeout, TimeUnit unit) {
this.syncSlaves = slaves;
this.syncTimeout = unit.toMillis(timeout);
options.syncSlaves(slaves, timeout, unit);
return this;
}
@Override
public RBatch atomic() {
this.atomic = true;
options.atomic();
return this;
}
@Override
public RBatch skipResult() {
this.skipResult = true;
options.skipResult();
return this;
}
@Override
public RBatch retryAttempts(int retryAttempts) {
this.retryAttempts = retryAttempts;
options.retryAttempts(retryAttempts);
return this;
}
@Override
public RBatch retryInterval(long retryInterval, TimeUnit unit) {
this.retryInterval = unit.toMillis(retryInterval);
options.retryInterval(retryInterval, unit);
return this;
}
@Override
public RBatch timeout(long timeout, TimeUnit unit) {
this.timeout = unit.toMillis(timeout);
options.responseTimeout(timeout, unit);
return this;
}
@Override
public BatchResult<?> execute() {
return executorService.execute(syncSlaves, syncTimeout, skipResult, timeout, retryAttempts, retryInterval, atomic);
}
@Override
public void executeSkipResult() {
executorService.execute(syncSlaves, syncTimeout, true, timeout, retryAttempts, retryInterval, atomic);
}
@Override
public RFuture<Void> executeSkipResultAsync() {
return executorService.executeAsync(syncSlaves, syncTimeout, true, timeout, retryAttempts, retryInterval, atomic);
return executorService.execute(options);
}
@Override
public RFuture<BatchResult<?>> executeAsync() {
return executorService.executeAsync(syncSlaves, syncTimeout, skipResult, timeout, retryAttempts, retryInterval, atomic);
return executorService.executeAsync(options);
}
@Override

@ -20,6 +20,7 @@ import java.util.Collection;
import java.util.List;
import java.util.UUID;
import org.redisson.api.BatchOptions;
import org.redisson.api.ClusterNode;
import org.redisson.api.MapOptions;
import org.redisson.api.Node;
@ -328,14 +329,19 @@ public class RedissonReactive implements RedissonReactiveClient {
}
@Override
public RBatchReactive createBatch() {
RedissonBatchReactive batch = new RedissonBatchReactive(evictionScheduler, connectionManager);
public RBatchReactive createBatch(BatchOptions options) {
RedissonBatchReactive batch = new RedissonBatchReactive(evictionScheduler, connectionManager, options);
if (config.isReferenceEnabled()) {
batch.enableRedissonReferenceSupport(this);
}
return batch;
}
@Override
public RBatchReactive createBatch() {
return createBatch(BatchOptions.defaults());
}
@Override
public RKeysReactive getKeys() {
return new RedissonKeysReactive(commandExecutor);

@ -0,0 +1,155 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api;
import java.util.concurrent.TimeUnit;
/**
* Configuration for Batch.
*
* @author Nikita Koksharov
*
*/
public class BatchOptions {
private long responseTimeout;
private int retryAttempts;
private long retryInterval;
private long syncTimeout;
private int syncSlaves;
private boolean skipResult;
private boolean atomic;
private BatchOptions() {
}
public static BatchOptions defaults() {
return new BatchOptions();
}
public long getResponseTimeout() {
return responseTimeout;
}
/**
* Defines timeout for Redis response.
* Starts to countdown when Redis command has been successfully sent.
* <p>
* Default is <code>3000 milliseconds</code>
*
* @param timeout value
* @param unit value
* @return self instance
*/
public BatchOptions responseTimeout(long timeout, TimeUnit unit) {
this.responseTimeout = unit.toMillis(timeout);
return this;
}
public int getRetryAttempts() {
return retryAttempts;
}
/**
* Defines attempts amount to send Redis commands batch
* if it hasn't been sent already.
* <p>
* Default is <code>3 attempts</code>
*
* @param retryAttempts value
* @return self instance
*/
public BatchOptions retryAttempts(int retryAttempts) {
this.retryAttempts = retryAttempts;
return this;
}
public long getRetryInterval() {
return retryInterval;
}
/**
* Defines time interval for each attempt to send Redis commands batch
* if it hasn't been sent already.
* <p>
* Default is <code>1500 milliseconds</code>
*
* @param retryInterval - time interval
* @param retryIntervalUnit - time interval unit
* @return self instance
*/
public BatchOptions retryInterval(long retryInterval, TimeUnit retryIntervalUnit) {
this.retryInterval = retryIntervalUnit.toMillis(retryInterval);
return this;
}
/**
* Synchronize write operations execution within defined timeout
* across specified amount of Redis slave nodes.
* <p>
* NOTE: Redis 3.0+ required
*
* @param slaves - synchronization timeout
* @param timeout - synchronization timeout
* @param unit - synchronization timeout time unit
* @return self instance
*/
public BatchOptions syncSlaves(int slaves, long timeout, TimeUnit unit) {
this.syncSlaves = slaves;
this.syncTimeout = unit.toMillis(timeout);
return this;
}
public long getSyncTimeout() {
return syncTimeout;
}
public int getSyncSlaves() {
return syncSlaves;
}
/**
* Atomically executes all batched commands as a single command.
* <p>
* Please note, that in cluster mode all objects should be on the same cluster slot.
* https://github.com/antirez/redis/issues/3682
*
* @return self instance
*/
public BatchOptions atomic() {
atomic = true;
return this;
}
public boolean isAtomic() {
return atomic;
}
/**
* Inform Redis not to send reply. It may save network traffic.
* <p>
* NOTE: Redis 3.2+ required
*
* @return self instance
*/
public BatchOptions skipResult() {
skipResult = true;
return this;
}
public boolean isSkipResult() {
return skipResult;
}
}

@ -357,6 +357,12 @@ public interface RBatch {
*/
RLexSortedSetAsync getLexSortedSet(String name);
/**
* Returns bitSet instance by name.
*
* @param name - name of object
* @return BitSet object
*/
RBitSetAsync getBitSet(String name);
/**
@ -397,89 +403,39 @@ public interface RBatch {
RFuture<BatchResult<?>> executeAsync();
/*
* Use {@link #skipResult()}
* Use BatchOptions#atomic
*/
@Deprecated
void executeSkipResult();
RBatch atomic();
/*
* Use {@link #skipResult()}
* Use BatchOptions#skipResult
*/
@Deprecated
RFuture<Void> executeSkipResultAsync();
/**
* Atomically executes all batched commands as a single command.
* <p>
* Please note, that in cluster mode all objects should be on the same cluster slot.
* https://github.com/antirez/redis/issues/3682
*
* @return
*/
RBatch atomic();
/**
* Inform Redis not to send reply for this batch.
* Such approach saves network traffic.
* <p>
* NOTE: Redis 3.2+ required
*
* @return self instance
*/
RBatch skipResult();
/**
* Synchronize write operations execution across defined amount
* of Redis slave nodes within defined timeout.
* <p>
* NOTE: Redis 3.0+ required
*
* @param slaves amount to sync
* @param timeout for sync operation
* @param unit value
* @return self instance
/*
* Use BatchOptions#syncSlaves
*/
@Deprecated
RBatch syncSlaves(int slaves, long timeout, TimeUnit unit);
/**
* Defines timeout for Redis response.
* Starts to countdown when Redis command has been successfully sent.
* <p>
* <code>0</code> value means use <code>Config.setTimeout</code> value instead.
* <p>
* Default is <code>0</code>
*
* @param timeout value
* @param unit value
* @return self instance
/*
* Use BatchOptions#responseTimeout
*/
@Deprecated
RBatch timeout(long timeout, TimeUnit unit);
/**
* Defines time interval for each attempt to send Redis commands batch
* if it hasn't been sent already.
* <p>
* <code>0</code> value means use <code>Config.setRetryInterval</code> value instead.
* <p>
* Default is <code>0</code>
*
* @param retryInterval value
* @param unit value
* @return self instance
/*
* Use BatchOptions#retryInterval
*/
@Deprecated
RBatch retryInterval(long retryInterval, TimeUnit unit);
/**
* Defines attempts amount to re-send Redis commands batch
* if it hasn't been sent already.
* <p>
* <code>0</code> value means use <code>Config.setRetryAttempts</code> value instead.
* <p>
* Default is <code>0</code>
*
* @param retryAttempts value
* @return self instance
/*
* Use BatchOptions#retryAttempts
*/
@Deprecated
RBatch retryAttempts(int retryAttempts);
}

@ -251,66 +251,40 @@ public interface RBatchReactive {
*/
Publisher<BatchResult<?>> execute();
/**
* Command replies are skipped such approach saves response bandwidth.
* <p>
* NOTE: Redis 3.2+ required
*
* @return self instance
/*
* Use BatchOptions#atomic
*/
@Deprecated
RBatchReactive atomic();
/*
* Use BatchOptions#skipResult
*/
@Deprecated
RBatchReactive skipResult();
/**
*
* <p>
* NOTE: Redis 3.0+ required
*
* @param slaves number to sync
* @param timeout for sync operation
* @param unit value
* @return self instance
/*
* Use BatchOptions#syncSlaves
*/
@Deprecated
RBatchReactive syncSlaves(int slaves, long timeout, TimeUnit unit);
/**
* Defines timeout for Redis response.
* Starts to countdown when Redis command has been successfully sent.
* <p>
* <code>0</code> value means use <code>Config.setTimeout</code> value instead.
* <p>
* Default is <code>0</code>
*
* @param timeout value
* @param unit value
* @return self instance
/*
* Use BatchOptions#responseTimeout
*/
@Deprecated
RBatchReactive timeout(long timeout, TimeUnit unit);
/**
* Defines time interval for another one attempt send Redis commands batch
* if it hasn't been sent already.
* <p>
* <code>0</code> value means use <code>Config.setRetryInterval</code> value instead.
* <p>
* Default is <code>0</code>
*
* @param retryInterval value
* @param unit value
* @return self instance
/*
* Use BatchOptions#retryInterval
*/
@Deprecated
RBatchReactive retryInterval(long retryInterval, TimeUnit unit);
/**
* Defines attempts amount to re-send Redis commands batch
* if it hasn't been sent already.
* <p>
* <code>0</code> value means use <code>Config.setRetryAttempts</code> value instead.
* <p>
* Default is <code>0</code>
*
* @param retryAttempts value
* @return self instance
/*
* Use BatchOptions#retryAttempts
*/
@Deprecated
RBatchReactive retryAttempts(int retryAttempts);
}

@ -902,7 +902,7 @@ public interface RedissonClient {
/**
* Creates transaction with <b>READ_COMMITTED</b> isolation level.
*
* @param options - transaction options
* @param options - transaction configuration
* @return Transaction object
*/
RTransaction createTransaction(TransactionOptions options);
@ -913,8 +913,16 @@ public interface RedissonClient {
* <p>
* See <a href="http://redis.io/topics/pipelining">http://redis.io/topics/pipelining</a>
*
* @param options - batch configuration
* @return Batch object
*/
RBatch createBatch(BatchOptions options);
/*
* Use #createBatch(BatchOptions)
*
*/
@Deprecated
RBatch createBatch();
/**

@ -507,8 +507,15 @@ public interface RedissonReactiveClient {
*
* See <a href="http://redis.io/topics/pipelining">http://redis.io/topics/pipelining</a>
*
* @param options - batch configuration
* @return Batch object
*/
RBatchReactive createBatch(BatchOptions options);
/*
* Use createBatch(BatchOptions)
*/
@Deprecated
RBatchReactive createBatch();
/**

@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.redisson.RedissonReference;
import org.redisson.RedissonShutdownException;
import org.redisson.api.BatchOptions;
import org.redisson.api.BatchResult;
import org.redisson.api.RFuture;
import org.redisson.client.RedisAskException;
@ -43,7 +44,6 @@ import org.redisson.client.protocol.CommandData;
import org.redisson.client.protocol.CommandsData;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.RedisStrictCommand;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.MasterSlaveEntry;
import org.redisson.connection.NodeSource;
@ -140,18 +140,18 @@ public class CommandBatchService extends CommandAsyncService {
}
public BatchResult<?> execute() {
RFuture<BatchResult<?>> f = executeAsync(0, 0, false, 0, 0, 0, false);
RFuture<BatchResult<?>> f = executeAsync(BatchOptions.defaults());
return get(f);
}
public BatchResult<?> execute(int syncSlaves, long syncTimeout, boolean noResult, long responseTimeout, int retryAttempts, long retryInterval, boolean atomic) {
RFuture<BatchResult<?>> f = executeAsync(syncSlaves, syncTimeout, noResult, responseTimeout, retryAttempts, retryInterval, atomic);
public BatchResult<?> execute(BatchOptions options) {
RFuture<BatchResult<?>> f = executeAsync(options);
return get(f);
}
public RFuture<Void> executeAsyncVoid() {
final RedissonPromise<Void> promise = new RedissonPromise<Void>();
RFuture<BatchResult<?>> res = executeAsync(0, 0, false, 0, 0, 0, false);
RFuture<BatchResult<?>> res = executeAsync(BatchOptions.defaults());
res.addListener(new FutureListener<BatchResult<?>>() {
@Override
public void operationComplete(Future<BatchResult<?>> future) throws Exception {
@ -166,10 +166,10 @@ public class CommandBatchService extends CommandAsyncService {
}
public RFuture<List<?>> executeAsync() {
return executeAsync(0, 0, false, 0, 0, 0, false);
return executeAsync(BatchOptions.defaults());
}
public <R> RFuture<R> executeAsync(int syncSlaves, long syncTimeout, boolean skipResult, long responseTimeout, int retryAttempts, long retryInterval, boolean atomic) {
public <R> RFuture<R> executeAsync(BatchOptions options) {
if (executed) {
throw new IllegalStateException("Batch already executed!");
}
@ -179,7 +179,7 @@ public class CommandBatchService extends CommandAsyncService {
}
executed = true;
if (atomic) {
if (options.isAtomic()) {
for (Entry entry : commands.values()) {
BatchCommandData<?, ?> multiCommand = new BatchCommandData(RedisCommands.MULTI, new Object[] {}, index.incrementAndGet());
entry.getCommands().addFirst(multiCommand);
@ -188,7 +188,7 @@ public class CommandBatchService extends CommandAsyncService {
}
}
if (skipResult) {
if (options.isSkipResult()) {
for (Entry entry : commands.values()) {
BatchCommandData<?, ?> offCommand = new BatchCommandData(RedisCommands.CLIENT_REPLY, new Object[] { "OFF" }, index.incrementAndGet());
entry.getCommands().addFirst(offCommand);
@ -197,16 +197,17 @@ public class CommandBatchService extends CommandAsyncService {
}
}
if (syncSlaves > 0) {
if (options.getSyncSlaves() > 0) {
for (Entry entry : commands.values()) {
BatchCommandData<?, ?> waitCommand = new BatchCommandData(RedisCommands.WAIT, new Object[] { syncSlaves, syncTimeout }, index.incrementAndGet());
BatchCommandData<?, ?> waitCommand = new BatchCommandData(RedisCommands.WAIT,
new Object[] { options.getSyncSlaves(), options.getSyncTimeout() }, index.incrementAndGet());
entry.getCommands().add(waitCommand);
}
}
RPromise<R> resultPromise;
final RPromise<Void> voidPromise = new RedissonPromise<Void>();
if (skipResult) {
if (options.isSkipResult()) {
voidPromise.addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
@ -272,13 +273,13 @@ public class CommandBatchService extends CommandAsyncService {
}
for (java.util.Map.Entry<MasterSlaveEntry, Entry> e : commands.entrySet()) {
execute(e.getValue(), new NodeSource(e.getKey()), voidPromise, slots, 0, skipResult, responseTimeout, retryAttempts, retryInterval, atomic);
execute(e.getValue(), new NodeSource(e.getKey()), voidPromise, slots, 0, options);
}
return resultPromise;
}
private void execute(final Entry entry, final NodeSource source, final RPromise<Void> mainPromise, final AtomicInteger slots,
final int attempt, final boolean noResult, final long responseTimeout, final int retryAttempts, final long retryInterval, final boolean atomic) {
final int attempt, final BatchOptions options) {
if (mainPromise.isCancelled()) {
free(entry);
return;
@ -302,8 +303,8 @@ public class CommandBatchService extends CommandAsyncService {
}
final int attempts;
if (retryAttempts > 0) {
attempts = retryAttempts;
if (options.getRetryAttempts() > 0) {
attempts = options.getRetryAttempts();
} else {
attempts = connectionManager.getConfig().getRetryAttempts();
}
@ -374,13 +375,13 @@ public class CommandBatchService extends CommandAsyncService {
int count = attempt + 1;
mainPromise.removeListener(mainPromiseListener);
execute(entry, source, mainPromise, slots, count, noResult, responseTimeout, retryAttempts, retryInterval, atomic);
execute(entry, source, mainPromise, slots, count, options);
}
};
long interval = connectionManager.getConfig().getRetryInterval();
if (retryInterval > 0) {
interval = retryInterval;
if (options.getRetryInterval() > 0) {
interval = options.getRetryInterval();
}
Timeout timeout = connectionManager.newTimeout(retryTimerTask, interval, TimeUnit.MILLISECONDS);
@ -390,7 +391,7 @@ public class CommandBatchService extends CommandAsyncService {
connectionFuture.addListener(new FutureListener<RedisConnection>() {
@Override
public void operationComplete(Future<RedisConnection> connFuture) throws Exception {
checkConnectionFuture(entry, source, mainPromise, attemptPromise, details, connectionFuture, noResult, responseTimeout, attempts, atomic);
checkConnectionFuture(entry, source, mainPromise, attemptPromise, details, connectionFuture, options.isSkipResult(), options.getResponseTimeout(), attempts, options.isAtomic());
}
});
@ -408,19 +409,19 @@ public class CommandBatchService extends CommandAsyncService {
RedisMovedException ex = (RedisMovedException)future.cause();
entry.clearErrors();
NodeSource nodeSource = new NodeSource(ex.getSlot(), ex.getUrl(), Redirect.MOVED);
execute(entry, nodeSource, mainPromise, slots, attempt, noResult, responseTimeout, retryAttempts, retryInterval, atomic);
execute(entry, nodeSource, mainPromise, slots, attempt, options);
return;
}
if (future.cause() instanceof RedisAskException) {
RedisAskException ex = (RedisAskException)future.cause();
entry.clearErrors();
NodeSource nodeSource = new NodeSource(ex.getSlot(), ex.getUrl(), Redirect.ASK);
execute(entry, nodeSource, mainPromise, slots, attempt, noResult, responseTimeout, retryAttempts, retryInterval, atomic);
execute(entry, nodeSource, mainPromise, slots, attempt, options);
return;
}
if (future.cause() instanceof RedisLoadingException) {
entry.clearErrors();
execute(entry, source, mainPromise, slots, attempt, noResult, responseTimeout, retryAttempts, retryInterval, atomic);
execute(entry, source, mainPromise, slots, attempt, options);
return;
}
if (future.cause() instanceof RedisTryAgainException) {
@ -428,7 +429,7 @@ public class CommandBatchService extends CommandAsyncService {
connectionManager.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
execute(entry, source, mainPromise, slots, attempt, noResult, responseTimeout, retryAttempts, retryInterval, atomic);
execute(entry, source, mainPromise, slots, attempt, options);
}
}, 1, TimeUnit.SECONDS);
return;

@ -20,6 +20,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import org.redisson.api.BatchOptions;
import org.redisson.api.BatchResult;
import org.redisson.api.RFuture;
import org.redisson.api.RedissonReactiveClient;
@ -61,7 +62,7 @@ public class CommandReactiveBatchService extends CommandReactiveService {
batchService.async(readOnlyMode, nodeSource, codec, command, params, mainPromise, attempt, ignoreRedirect);
}
public RFuture<BatchResult<?>> executeAsync(int syncSlaves, long syncTimeout, boolean skipResult, long responseTimeout, int retryAttempts, long retryInterval, boolean atomic) {
public RFuture<BatchResult<?>> executeAsync(BatchOptions options) {
for (Publisher<?> publisher : publishers) {
publisher.subscribe(new DefaultSubscriber<Object>() {
@Override
@ -71,7 +72,7 @@ public class CommandReactiveBatchService extends CommandReactiveService {
});
}
return batchService.executeAsync(syncSlaves, syncTimeout, skipResult, responseTimeout, retryAttempts, retryInterval, atomic);
return batchService.executeAsync(options);
}
@Override

@ -18,6 +18,7 @@ package org.redisson.reactive;
import java.util.concurrent.TimeUnit;
import org.reactivestreams.Publisher;
import org.redisson.api.BatchOptions;
import org.redisson.api.BatchResult;
import org.redisson.api.RAtomicLongReactive;
import org.redisson.api.RBatchReactive;
@ -55,19 +56,12 @@ public class RedissonBatchReactive implements RBatchReactive {
private final EvictionScheduler evictionScheduler;
private final CommandReactiveBatchService executorService;
private final BatchOptions options;
private long timeout;
private int retryAttempts;
private long retryInterval;
private int syncSlaves;
private long syncTimeout;
private boolean skipResult;
private boolean atomic;
public RedissonBatchReactive(EvictionScheduler evictionScheduler, ConnectionManager connectionManager) {
public RedissonBatchReactive(EvictionScheduler evictionScheduler, ConnectionManager connectionManager, BatchOptions options) {
this.evictionScheduler = evictionScheduler;
this.executorService = new CommandReactiveBatchService(connectionManager);
this.options = options;
}
@Override
@ -220,44 +214,43 @@ public class RedissonBatchReactive implements RBatchReactive {
return new NettyFuturePublisher<BatchResult<?>>(new Supplier<RFuture<BatchResult<?>>>() {
@Override
public RFuture<BatchResult<?>> get() {
return executorService.executeAsync(syncSlaves, syncTimeout, skipResult, timeout, retryAttempts, retryInterval, atomic);
return executorService.executeAsync(options);
}
});
}
public RBatchReactive atomic() {
this.atomic = true;
options.atomic();
return this;
}
@Override
public RBatchReactive syncSlaves(int slaves, long timeout, TimeUnit unit) {
this.syncSlaves = slaves;
this.syncTimeout = unit.toMillis(timeout);
options.syncSlaves(slaves, timeout, unit);
return this;
}
@Override
public RBatchReactive skipResult() {
this.skipResult = true;
options.skipResult();
return this;
}
@Override
public RBatchReactive retryAttempts(int retryAttempts) {
this.retryAttempts = retryAttempts;
options.retryAttempts(retryAttempts);
return this;
}
@Override
public RBatchReactive retryInterval(long retryInterval, TimeUnit unit) {
this.retryInterval = unit.toMillis(retryInterval);
options.retryInterval(retryInterval, unit);
return this;
}
@Override
public RBatchReactive timeout(long timeout, TimeUnit unit) {
this.timeout = unit.toMillis(timeout);
options.responseTimeout(timeout, unit);
return this;
}

@ -32,6 +32,7 @@ import org.redisson.RedissonBatch;
import org.redisson.RedissonLocalCachedMap;
import org.redisson.RedissonObject;
import org.redisson.RedissonTopic;
import org.redisson.api.BatchOptions;
import org.redisson.api.RBucket;
import org.redisson.api.RFuture;
import org.redisson.api.RLocalCachedMap;
@ -190,8 +191,14 @@ public class RedissonTransaction implements RTransaction {
}
try {
transactionExecutor.execute(syncSlaves, options.getSyncTimeout(), false,
options.getResponseTimeout(), options.getRetryAttempts(), options.getRetryInterval(), true);
BatchOptions batchOptions = BatchOptions.defaults()
.syncSlaves(syncSlaves, options.getSyncTimeout(), TimeUnit.MILLISECONDS)
.responseTimeout(options.getResponseTimeout(), TimeUnit.MILLISECONDS)
.retryAttempts(options.getRetryAttempts())
.retryInterval(options.getRetryInterval(), TimeUnit.MILLISECONDS)
.atomic();
transactionExecutor.execute(batchOptions);
} catch (Exception e) {
throw new TransactionException("Unable to execute transaction", e);
}
@ -212,7 +219,7 @@ public class RedissonTransaction implements RTransaction {
return;
}
RedissonBatch publishBatch = new RedissonBatch(null, commandExecutor.getConnectionManager());
RedissonBatch publishBatch = new RedissonBatch(null, commandExecutor.getConnectionManager(), BatchOptions.defaults());
for (Entry<TransactionalOperation, List<byte[]>> entry : hashes.entrySet()) {
String name = RedissonObject.suffixName(entry.getKey().getName(), RedissonLocalCachedMap.TOPIC_SUFFIX);
RTopicAsync<Object> topic = publishBatch.getTopic(name, LocalCachedMessageCodec.INSTANCE);
@ -234,7 +241,7 @@ public class RedissonTransaction implements RTransaction {
}
Map<TransactionalOperation, List<byte[]>> hashes = new HashMap<TransactionalOperation, List<byte[]>>(localCaches.size());
RedissonBatch batch = new RedissonBatch(null, commandExecutor.getConnectionManager());
RedissonBatch batch = new RedissonBatch(null, commandExecutor.getConnectionManager(), BatchOptions.defaults());
for (TransactionalOperation transactionalOperation : operations) {
if (localCaches.contains(transactionalOperation.getName())) {
MapOperation mapOperation = (MapOperation) transactionalOperation;
@ -281,7 +288,7 @@ public class RedissonTransaction implements RTransaction {
});
}
RedissonBatch publishBatch = new RedissonBatch(null, commandExecutor.getConnectionManager());
RedissonBatch publishBatch = new RedissonBatch(null, commandExecutor.getConnectionManager(), BatchOptions.defaults());
for (final Entry<TransactionalOperation, List<byte[]>> entry : hashes.entrySet()) {
String disabledKeysName = RedissonObject.suffixName(entry.getKey().getName(), RedissonLocalCachedMap.DISABLED_KEYS_SUFFIX);
RMultimapCacheAsync<LocalCachedMapDisabledKey, String> multimap = publishBatch.getListMultimapCache(disabledKeysName, entry.getKey().getCodec());
@ -342,7 +349,7 @@ public class RedissonTransaction implements RTransaction {
}
try {
executorService.execute(0, 0, false, 0, 0, 0, true);
executorService.execute(BatchOptions.defaults());
} catch (Exception e) {
throw new TransactionException("Unable to execute transaction", e);
}

@ -17,6 +17,7 @@ import org.junit.Assume;
import org.junit.Test;
import org.redisson.ClusterRunner.ClusterProcesses;
import org.redisson.RedisRunner.FailedToStartRedisException;
import org.redisson.api.BatchOptions;
import org.redisson.api.BatchResult;
import org.redisson.api.RBatch;
import org.redisson.api.RFuture;
@ -24,8 +25,8 @@ import org.redisson.api.RListAsync;
import org.redisson.api.RMapAsync;
import org.redisson.api.RMapCacheAsync;
import org.redisson.api.RScript;
import org.redisson.api.RedissonClient;
import org.redisson.api.RScript.Mode;
import org.redisson.api.RedissonClient;
import org.redisson.client.RedisException;
import org.redisson.client.codec.StringCodec;
import org.redisson.config.Config;
@ -34,13 +35,13 @@ public class RedissonBatchTest extends BaseTest {
// @Test
public void testBatchRedirect() {
RBatch batch = redisson.createBatch();
RBatch batch = redisson.createBatch(BatchOptions.defaults());
for (int i = 0; i < 5; i++) {
batch.getMap("" + i).fastPutAsync("" + i, i);
}
batch.execute();
batch = redisson.createBatch();
batch = redisson.createBatch(BatchOptions.defaults());
for (int i = 0; i < 1; i++) {
batch.getMap("" + i).sizeAsync();
batch.getMap("" + i).containsValueAsync("" + i);
@ -52,11 +53,13 @@ public class RedissonBatchTest extends BaseTest {
@Test
public void testBigRequestAtomic() {
RBatch batch = redisson.createBatch();
batch.atomic();
batch.timeout(15, TimeUnit.SECONDS);
batch.retryInterval(1, TimeUnit.SECONDS);
batch.retryAttempts(5);
BatchOptions options = BatchOptions.defaults()
.atomic()
.responseTimeout(15, TimeUnit.SECONDS)
.retryInterval(1, TimeUnit.SECONDS)
.retryAttempts(5);
RBatch batch = redisson.createBatch(options);
for (int i = 0; i < 100; i++) {
batch.getBucket("" + i).setAsync(i);
batch.getBucket("" + i).getAsync();
@ -87,13 +90,15 @@ public class RedissonBatchTest extends BaseTest {
.addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort());
RedissonClient redisson = Redisson.create(config);
RBatch batch = redisson.createBatch();
BatchOptions options = BatchOptions.defaults()
.syncSlaves(1, 1, TimeUnit.SECONDS);
RBatch batch = redisson.createBatch(options);
for (int i = 0; i < 100; i++) {
RMapAsync<String, String> map = batch.getMap("test");
map.putAsync("" + i, "" + i);
}
batch.syncSlaves(1, 1, TimeUnit.SECONDS);
BatchResult<?> result = batch.execute();
assertThat(result.getSyncedSlaves()).isEqualTo(1);
@ -102,7 +107,7 @@ public class RedissonBatchTest extends BaseTest {
@Test
public void testWriteTimeout() {
RBatch batch = redisson.createBatch();
RBatch batch = redisson.createBatch(BatchOptions.defaults());
for (int i = 0; i < 200000; i++) {
RMapCacheAsync<String, String> map = batch.getMapCache("test");
map.putAsync("" + i, "" + i, 10, TimeUnit.SECONDS);
@ -113,13 +118,16 @@ public class RedissonBatchTest extends BaseTest {
@Test
public void testSkipResult() {
Assume.assumeTrue(RedisRunner.getDefaultRedisServerInstance().getRedisVersion().compareTo("3.2.0") > 0);
RBatch batch = redisson.createBatch();
BatchOptions options = BatchOptions.defaults()
.skipResult();
RBatch batch = redisson.createBatch(options);
batch.getBucket("A1").setAsync("001");
batch.getBucket("A2").setAsync("001");
batch.getBucket("A3").setAsync("001");
batch.getKeys().deleteAsync("A1");
batch.getKeys().deleteAsync("A2");
batch.skipResult();
batch.execute();
assertThat(redisson.getBucket("A1").isExists()).isFalse();
@ -128,7 +136,7 @@ public class RedissonBatchTest extends BaseTest {
@Test
public void testBatchNPE() {
RBatch batch = redisson.createBatch();
RBatch batch = redisson.createBatch(BatchOptions.defaults());
batch.getBucket("A1").setAsync("001");
batch.getBucket("A2").setAsync("001");
batch.getBucket("A3").setAsync("001");
@ -139,8 +147,10 @@ public class RedissonBatchTest extends BaseTest {
@Test
public void testAtomic() {
RBatch batch = redisson.createBatch();
batch.atomic();
BatchOptions options = BatchOptions.defaults()
.atomic();
RBatch batch = redisson.createBatch(options);
RFuture<Long> f1 = batch.getAtomicLong("A1").addAndGetAsync(1);
RFuture<Long> f2 = batch.getAtomicLong("A2").addAndGetAsync(2);
RFuture<Long> f3 = batch.getAtomicLong("A3").addAndGetAsync(3);
@ -176,13 +186,15 @@ public class RedissonBatchTest extends BaseTest {
.addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort());
RedissonClient redisson = Redisson.create(config);
RBatch batch = redisson.createBatch();
BatchOptions options = BatchOptions.defaults()
.atomic()
.syncSlaves(1, 1, TimeUnit.SECONDS);
RBatch batch = redisson.createBatch(options);
for (int i = 0; i < 10; i++) {
batch.getAtomicLong("{test}" + i).addAndGetAsync(i);
}
batch.atomic();
batch.syncSlaves(1, 1, TimeUnit.SECONDS);
BatchResult<?> result = batch.execute();
assertThat(result.getSyncedSlaves()).isEqualTo(1);
int i = 0;
@ -196,7 +208,7 @@ public class RedissonBatchTest extends BaseTest {
@Test
public void testDifferentCodecs() {
RBatch b = redisson.createBatch();
RBatch b = redisson.createBatch(BatchOptions.defaults());
b.getMap("test1").putAsync("1", "2");
b.getMap("test2", StringCodec.INSTANCE).putAsync("21", "3");
RFuture<Object> val1 = b.getMap("test1").getAsync("1");
@ -209,7 +221,7 @@ public class RedissonBatchTest extends BaseTest {
@Test
public void testBatchList() {
RBatch b = redisson.createBatch();
RBatch b = redisson.createBatch(BatchOptions.defaults());
RListAsync<Integer> listAsync = b.getList("list");
for (int i = 1; i < 540; i++) {
listAsync.addAsync(i);
@ -220,7 +232,7 @@ public class RedissonBatchTest extends BaseTest {
@Test
public void testBatchBigRequest() {
RBatch batch = redisson.createBatch();
RBatch batch = redisson.createBatch(BatchOptions.defaults());
for (int i = 0; i < 210; i++) {
batch.getMap("test").fastPutAsync("1", "2");
batch.getMap("test").fastPutAsync("2", "3");
@ -234,7 +246,7 @@ public class RedissonBatchTest extends BaseTest {
@Test(expected=RedisException.class)
public void testExceptionHandling() {
RBatch batch = redisson.createBatch();
RBatch batch = redisson.createBatch(BatchOptions.defaults());
batch.getMap("test").putAsync("1", "2");
batch.getScript().evalAsync(Mode.READ_WRITE, "wrong_code", RScript.ReturnType.VALUE);
batch.execute();
@ -242,7 +254,7 @@ public class RedissonBatchTest extends BaseTest {
@Test(expected=IllegalStateException.class)
public void testTwice() {
RBatch batch = redisson.createBatch();
RBatch batch = redisson.createBatch(BatchOptions.defaults());
batch.getMap("test").putAsync("1", "2");
batch.execute();
batch.execute();
@ -251,14 +263,14 @@ public class RedissonBatchTest extends BaseTest {
@Test
public void testEmpty() {
RBatch batch = redisson.createBatch();
RBatch batch = redisson.createBatch(BatchOptions.defaults());
batch.execute();
}
@Test
public void testOrdering() throws InterruptedException {
ExecutorService e = Executors.newFixedThreadPool(16);
final RBatch batch = redisson.createBatch();
final RBatch batch = redisson.createBatch(BatchOptions.defaults());
final AtomicLong index = new AtomicLong(-1);
final List<RFuture<Long>> futures = new CopyOnWriteArrayList<>();
for (int i = 0; i < 500; i++) {
@ -292,7 +304,7 @@ public class RedissonBatchTest extends BaseTest {
@Test
public void test() {
RBatch batch = redisson.createBatch();
RBatch batch = redisson.createBatch(BatchOptions.defaults());
batch.getMap("test").fastPutAsync("1", "2");
batch.getMap("test").fastPutAsync("2", "3");
batch.getMap("test").putAsync("2", "5");

@ -3,6 +3,7 @@ package org.redisson;
import java.util.List;
import static org.junit.Assert.*;
import org.junit.Test;
import org.redisson.api.BatchOptions;
import org.redisson.api.RBatch;
import org.redisson.api.RBatchReactive;
import org.redisson.api.RBucket;
@ -35,7 +36,7 @@ public class RedissonReferenceReactiveTest extends BaseReactiveTest {
@Test
public void testBatch() throws InterruptedException {
RBatchReactive batch = redisson.createBatch();
RBatchReactive batch = redisson.createBatch(BatchOptions.defaults());
RBucketReactive<Object> b1 = batch.getBucket("b1");
RBucketReactive<Object> b2 = batch.getBucket("b2");
RBucketReactive<Object> b3 = batch.getBucket("b3");
@ -44,7 +45,7 @@ public class RedissonReferenceReactiveTest extends BaseReactiveTest {
b3.set(b1);
sync(batch.execute());
batch = redisson.createBatch();
batch = redisson.createBatch(BatchOptions.defaults());
batch.getBucket("b1").get();
batch.getBucket("b2").get();
batch.getBucket("b3").get();
@ -56,7 +57,7 @@ public class RedissonReferenceReactiveTest extends BaseReactiveTest {
@Test
public void testReactiveToNormal() throws InterruptedException {
RBatchReactive batch = redisson.createBatch();
RBatchReactive batch = redisson.createBatch(BatchOptions.defaults());
RBucketReactive<Object> b1 = batch.getBucket("b1");
RBucketReactive<Object> b2 = batch.getBucket("b2");
RBucketReactive<Object> b3 = batch.getBucket("b3");
@ -66,7 +67,7 @@ public class RedissonReferenceReactiveTest extends BaseReactiveTest {
sync(batch.execute());
RedissonClient lredisson = Redisson.create(redisson.getConfig());
RBatch b = lredisson.createBatch();
RBatch b = lredisson.createBatch(BatchOptions.defaults());
b.getBucket("b1").getAsync();
b.getBucket("b2").getAsync();
b.getBucket("b3").getAsync();

Loading…
Cancel
Save