Merge branch 'master' into 3.0.0

pull/1821/head
Nikita 7 years ago
commit c03febb578

@ -61,7 +61,7 @@ public class RedissonBatch implements RBatch {
private final BatchOptions options;
public RedissonBatch(EvictionScheduler evictionScheduler, ConnectionManager connectionManager, BatchOptions options) {
this.executorService = new CommandBatchService(connectionManager);
this.executorService = new CommandBatchService(connectionManager, options);
this.evictionScheduler = evictionScheduler;
this.options = options;
}
@ -264,12 +264,12 @@ public class RedissonBatch implements RBatch {
@Override
public BatchResult<?> execute() {
return executorService.execute(options);
return executorService.execute(BatchOptions.defaults());
}
@Override
public RFuture<BatchResult<?>> executeAsync() {
return executorService.executeAsync(options);
return executorService.executeAsync(BatchOptions.defaults());
}
@Override

@ -37,6 +37,7 @@ import org.redisson.client.codec.Codec;
import org.redisson.client.codec.DoubleCodec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.ScanCodec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.ScoredEntry;
@ -370,7 +371,7 @@ public class RedissonScoredSortedSet<V> extends RedissonExpirable implements RSc
@Override
public RFuture<Boolean> containsAsync(Object o) {
return commandExecutor.readAsync(getName(), codec, RedisCommands.ZSCORE_CONTAINS, getName(), encode(o));
return commandExecutor.readAsync(getName(), StringCodec.INSTANCE, RedisCommands.ZSCORE_CONTAINS, getName(), encode(o));
}
@Override
@ -380,7 +381,7 @@ public class RedissonScoredSortedSet<V> extends RedissonExpirable implements RSc
@Override
public RFuture<Double> getScoreAsync(V o) {
return commandExecutor.readAsync(getName(), codec, RedisCommands.ZSCORE, getName(), encode(o));
return commandExecutor.readAsync(getName(), StringCodec.INSTANCE, RedisCommands.ZSCORE, getName(), encode(o));
}
@Override

@ -18,13 +18,54 @@ package org.redisson.api;
import java.util.concurrent.TimeUnit;
/**
* Configuration for Batch.
* Configuration for Batch objecct.
*
* @author Nikita Koksharov
*
*/
public class BatchOptions {
public enum ExecutionMode {
/**
* Store batched invocations in Redis and execute them atomically as a single command.
* <p>
* Please note, that in cluster mode all objects should be on the same cluster slot.
* https://github.com/antirez/redis/issues/3682
*
*/
REDIS_READ_ATOMIC,
/**
* Store batched invocations in Redis and execute them atomically as a single command.
* <p>
* Please note, that in cluster mode all objects should be on the same cluster slot.
* https://github.com/antirez/redis/issues/3682
*
*/
REDIS_WRITE_ATOMIC,
/**
* Store batched invocations in memory on Redisson side and execute them on Redis.
* <p>
* Default mode
*
*/
IN_MEMORY,
/**
* Store batched invocations on Redisson side and executes them atomically on Redis as a single command.
* <p>
* Please note, that in cluster mode all objects should be on the same cluster slot.
* https://github.com/antirez/redis/issues/3682
*
*/
IN_MEMORY_ATOMIC,
}
private ExecutionMode executionMode = ExecutionMode.IN_MEMORY;
private long responseTimeout;
private int retryAttempts;
private long retryInterval;
@ -32,7 +73,6 @@ public class BatchOptions {
private long syncTimeout;
private int syncSlaves;
private boolean skipResult;
private boolean atomic;
private BatchOptions() {
}
@ -122,20 +162,14 @@ public class BatchOptions {
}
/**
* Switches batch to atomic mode. Redis atomically executes all commands of this batch as a single command.
* <p>
* Please note, that in cluster mode all objects should be on the same cluster slot.
* https://github.com/antirez/redis/issues/3682
* Use {@link #executionMode(ExecutionMode)} setting instead
*
* @return self instance
*/
@Deprecated
public BatchOptions atomic() {
atomic = true;
executionMode = ExecutionMode.IN_MEMORY_ATOMIC;
return this;
}
public boolean isAtomic() {
return atomic;
}
/**
* Inform Redis not to send reply. This allows to save network traffic for commands with batch with big response.
@ -152,4 +186,27 @@ public class BatchOptions {
return skipResult;
}
/**
* Sets execution mode.
*
* @see ExecutionMode
*
* @param executionMode - batch execution mode
* @return self instance
*/
public BatchOptions executionMode(ExecutionMode executionMode) {
this.executionMode = executionMode;
return this;
}
public ExecutionMode getExecutionMode() {
return executionMode;
}
@Override
public String toString() {
return "BatchOptions [queueStore=" + executionMode + "]";
}
}

@ -15,12 +15,14 @@
*/
package org.redisson.cache;
import java.io.Serializable;
/**
*
* @author Nikita Koksharov
*
*/
public class LocalCachedMapDisable {
public class LocalCachedMapDisable implements Serializable {
private byte[][] keyHashes;
private long timeout;

@ -15,12 +15,14 @@
*/
package org.redisson.cache;
import java.io.Serializable;
/**
*
* @author Nikita Koksharov
*
*/
public class LocalCachedMapDisabledKey {
public class LocalCachedMapDisabledKey implements Serializable {
private String requestId;
private long timeout;

@ -15,12 +15,14 @@
*/
package org.redisson.cache;
import java.io.Serializable;
/**
*
* @author Nikita Koksharov
*
*/
public class LocalCachedMapEnable {
public class LocalCachedMapEnable implements Serializable {
private byte[][] keyHashes;
private String requestId;

@ -199,6 +199,8 @@ public class CommandDecoder extends ReplayingDecoder<State> {
}
}
ThreadLocal<List<CommandData<?, ?>>> commandsData = new ThreadLocal<List<CommandData<?, ?>>>();
private void decodeCommandBatch(ChannelHandlerContext ctx, ByteBuf in, QueueCommand data,
CommandsData commandBatch) throws Exception {
int i = state().getBatchIndex();
@ -214,9 +216,22 @@ public class CommandDecoder extends ReplayingDecoder<State> {
|| RedisCommands.EXEC.getName().equals(cmd.getName())
|| RedisCommands.WAIT.getName().equals(cmd.getName())) {
commandData = (CommandData<Object, Object>) commandBatch.getCommands().get(i);
if (RedisCommands.EXEC.getName().equals(cmd.getName())) {
if (commandBatch.getAttachedCommands() != null) {
commandsData.set(commandBatch.getAttachedCommands());
} else {
commandsData.set(commandBatch.getCommands());
}
}
}
decode(in, commandData, null, ctx.channel());
try {
decode(in, commandData, null, ctx.channel());
} finally {
if (commandData != null && RedisCommands.EXEC.getName().equals(commandData.getCommand().getName())) {
commandsData.remove();
}
}
if (commandData != null && RedisCommands.EXEC.getName().equals(commandData.getCommand().getName())
&& commandData.getPromise().isSuccess()) {
@ -230,7 +245,7 @@ public class CommandDecoder extends ReplayingDecoder<State> {
}
Object res = iter.next();
handleResult((CommandData<Object, Object>) command, null, res, false, ctx.channel());
completeResponse((CommandData<Object, Object>) command, res, ctx.channel());
}
if (RedisCommands.MULTI.getName().equals(command.getCommand().getName())) {
@ -365,13 +380,33 @@ public class CommandDecoder extends ReplayingDecoder<State> {
}
}
@SuppressWarnings("unchecked")
private void decodeList(ByteBuf in, CommandData<Object, Object> data, List<Object> parts,
Channel channel, long size, List<Object> respParts)
throws IOException {
for (int i = respParts.size(); i < size; i++) {
decode(in, data, respParts, channel);
if (state().isMakeCheckpoint()) {
checkpoint();
if (parts == null && commandsData.get() != null) {
List<CommandData<?, ?>> commands = commandsData.get();
for (int i = respParts.size(); i < size; i++) {
int suffix = 0;
if (RedisCommands.MULTI.getName().equals(commands.get(0).getCommand().getName())) {
suffix = 1;
}
CommandData<Object, Object> commandData = (CommandData<Object, Object>) commands.get(i+suffix);
decode(in, commandData, respParts, channel);
if (commandData.getPromise().isDone() && !commandData.getPromise().isSuccess()) {
data.tryFailure(commandData.cause());
}
if (state().isMakeCheckpoint()) {
checkpoint();
}
}
} else {
for (int i = respParts.size(); i < size; i++) {
decode(in, data, respParts, channel);
if (state().isMakeCheckpoint()) {
checkpoint();
}
}
}
@ -402,9 +437,13 @@ public class CommandDecoder extends ReplayingDecoder<State> {
if (parts != null) {
parts.add(result);
} else {
if (data != null && !data.getPromise().trySuccess(result) && data.cause() instanceof RedisTimeoutException) {
log.warn("response has been skipped due to timeout! channel: {}, command: {}, result: {}", channel, LogHelper.toString(data), LogHelper.toString(result));
}
completeResponse(data, result, channel);
}
}
protected void completeResponse(CommandData<Object, Object> data, Object result, Channel channel) {
if (data != null && !data.getPromise().trySuccess(result) && data.cause() instanceof RedisTimeoutException) {
log.warn("response has been skipped due to timeout! channel: {}, command: {}, result: {}", channel, LogHelper.toString(data), LogHelper.toString(result));
}
}
@ -425,9 +464,11 @@ public class CommandDecoder extends ReplayingDecoder<State> {
Decoder<Object> decoder = data.getCommand().getReplayDecoder();
if (parts != null) {
MultiDecoder<Object> multiDecoder = data.getCommand().getReplayMultiDecoder();
Decoder<Object> mDecoder = multiDecoder.getDecoder(parts.size(), state());
if (mDecoder != null) {
decoder = mDecoder;
if (multiDecoder != null) {
Decoder<Object> mDecoder = multiDecoder.getDecoder(parts.size(), state());
if (mDecoder != null) {
decoder = mDecoder;
}
}
}
if (decoder == null) {

@ -43,6 +43,7 @@ import io.netty.util.internal.PlatformDependent;
*/
public class CommandPubSubDecoder extends CommandDecoder {
private static final List<String> MESSAGES = Arrays.asList("subscribe", "psubscribe", "punsubscribe", "unsubscribe");
// It is not needed to use concurrent map because responses are coming consecutive
private final Map<String, PubSubEntry> entries = new HashMap<String, PubSubEntry>();
private final Map<PubSubKey, CommandData<Object, Object>> commands = PlatformDependent.newConcurrentHashMap();
@ -159,7 +160,7 @@ public class CommandPubSubDecoder extends CommandDecoder {
return null;
}
String command = parts.get(0).toString();
if (Arrays.asList("subscribe", "psubscribe", "punsubscribe", "unsubscribe").contains(command)) {
if (MESSAGES.contains(command)) {
String channelName = parts.get(1).toString();
PubSubKey key = new PubSubKey(channelName, command);
CommandData<Object, Object> commandData = commands.get(key);
@ -173,6 +174,8 @@ public class CommandPubSubDecoder extends CommandDecoder {
} else if (command.equals("pmessage")) {
String patternName = (String) parts.get(1);
return entries.get(patternName).getDecoder();
} else if (command.equals("pong")) {
return null;
}
}

@ -28,20 +28,31 @@ import org.redisson.misc.RPromise;
public class CommandsData implements QueueCommand {
private final List<CommandData<?, ?>> commands;
private final List<CommandData<?, ?>> attachedCommands;
private final RPromise<Void> promise;
private final boolean skipResult;
private final boolean atomic;
public CommandsData(RPromise<Void> promise, List<CommandData<?, ?>> commands) {
this(promise, commands, false, false);
this(promise, commands, null);
}
public CommandsData(RPromise<Void> promise, List<CommandData<?, ?>> commands, List<CommandData<?, ?>> attachedCommands) {
this(promise, commands, attachedCommands, false, false);
}
public CommandsData(RPromise<Void> promise, List<CommandData<?, ?>> commands, boolean skipResult, boolean atomic) {
this(promise, commands, null, skipResult, atomic);
}
public CommandsData(RPromise<Void> promise, List<CommandData<?, ?>> commands, List<CommandData<?, ?>> attachedCommands, boolean skipResult, boolean atomic) {
super();
this.promise = promise;
this.commands = commands;
this.skipResult = skipResult;
this.atomic = atomic;
this.attachedCommands = attachedCommands;
}
public RPromise<Void> getPromise() {
@ -56,6 +67,10 @@ public class CommandsData implements QueueCommand {
return skipResult;
}
public List<CommandData<?, ?>> getAttachedCommands() {
return attachedCommands;
}
public List<CommandData<?, ?>> getCommands() {
return commands;
}

@ -408,8 +408,9 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
}
private void checkSlaveNodesChange(Collection<ClusterPartition> newPartitions) {
Set<ClusterPartition> lastPartitions = getLastPartitions();
for (ClusterPartition newPart : newPartitions) {
for (ClusterPartition currentPart : getLastPartitions()) {
for (ClusterPartition currentPart : lastPartitions) {
if (!newPart.getMasterAddress().equals(currentPart.getMasterAddress())) {
continue;
}
@ -479,10 +480,10 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
return addedSlaves;
}
private Collection<Integer> slots(Collection<ClusterPartition> partitions) {
Set<Integer> result = new HashSet<Integer>(MAX_SLOT);
private int slotsAmount(Collection<ClusterPartition> partitions) {
int result = 0;
for (ClusterPartition clusterPartition : partitions) {
result.addAll(clusterPartition.getSlots());
result += clusterPartition.getSlots().size();
}
return result;
}
@ -500,9 +501,10 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
private RFuture<Void> checkMasterNodesChange(ClusterServersConfig cfg, Collection<ClusterPartition> newPartitions) {
List<ClusterPartition> newMasters = new ArrayList<ClusterPartition>();
Set<ClusterPartition> lastPartitions = getLastPartitions();
for (final ClusterPartition newPart : newPartitions) {
boolean masterFound = false;
for (ClusterPartition currentPart : getLastPartitions()) {
for (ClusterPartition currentPart : lastPartitions) {
if (!newPart.getMasterAddress().equals(currentPart.getMasterAddress())) {
continue;
}
@ -567,13 +569,24 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
}
private void checkSlotsChange(ClusterServersConfig cfg, Collection<ClusterPartition> newPartitions) {
Collection<Integer> newPartitionsSlots = slots(newPartitions);
if (newPartitionsSlots.size() == lastPartitions.size() && lastPartitions.size() == MAX_SLOT) {
int newSlotsAmount = slotsAmount(newPartitions);
if (newSlotsAmount == lastPartitions.size() && lastPartitions.size() == MAX_SLOT) {
return;
}
Set<Integer> removedSlots = new HashSet<Integer>(lastPartitions.keySet());
removedSlots.removeAll(newPartitionsSlots);
Set<Integer> removedSlots = new HashSet<Integer>();
for (Integer slot : lastPartitions.keySet()) {
boolean found = false;
for (ClusterPartition clusterPartition : newPartitions) {
if (clusterPartition.getSlots().contains(slot)) {
found = true;
break;
}
}
if (!found) {
removedSlots.add(slot);
}
}
lastPartitions.keySet().removeAll(removedSlots);
if (!removedSlots.isEmpty()) {
log.info("{} slots found to remove", removedSlots.size());
@ -587,9 +600,14 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
}
}
Set<Integer> addedSlots = new HashSet<Integer>(newPartitionsSlots);
addedSlots.removeAll(lastPartitions.keySet());
Set<Integer> addedSlots = new HashSet<Integer>();
for (ClusterPartition clusterPartition : newPartitions) {
for (Integer slot : clusterPartition.getSlots()) {
if (!lastPartitions.containsKey(slot)) {
addedSlots.add(slot);
}
}
}
if (!addedSlots.isEmpty()) {
log.info("{} slots found to add", addedSlots.size());
}
@ -611,8 +629,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
}
private void checkSlotsMigration(Collection<ClusterPartition> newPartitions) {
Set<ClusterPartition> currentPartitions = getLastPartitions();
for (ClusterPartition currentPartition : currentPartitions) {
for (ClusterPartition currentPartition : getLastPartitions()) {
for (ClusterPartition newPartition : newPartitions) {
if (!currentPartition.getNodeId().equals(newPartition.getNodeId())
// skip master change case
@ -750,7 +767,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
super.shutdown();
}
private HashSet<ClusterPartition> getLastPartitions() {
private Set<ClusterPartition> getLastPartitions() {
return new HashSet<ClusterPartition>(lastPartitions.values());
}

@ -90,6 +90,9 @@ public class FstCodec extends BaseCodec {
} catch (IOException e) {
out.release();
throw e;
} catch (Exception e) {
out.release();
throw new IOException(e);
}
}
};

@ -79,6 +79,9 @@ public class JsonJacksonCodec extends BaseCodec {
} catch (IOException e) {
out.release();
throw e;
} catch (Exception e) {
out.release();
throw new IOException(e);
}
}
};

@ -0,0 +1,59 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.command;
import java.util.concurrent.atomic.AtomicBoolean;
import org.redisson.api.RFuture;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
/**
*
* @author Nikita Koksharov
*
*/
public class BatchPromise<T> extends RedissonPromise<T> {
private final AtomicBoolean executed;
private final RFuture<Void> sentPromise = new RedissonPromise<Void>();
public BatchPromise(AtomicBoolean executed) {
super();
this.executed = executed;
}
public RFuture<Void> getSentPromise() {
return sentPromise;
}
@Override
public RPromise<T> sync() throws InterruptedException {
if (executed.get()) {
return super.sync();
}
return (RPromise<T>) sentPromise.sync();
}
@Override
public RPromise<T> syncUninterruptibly() {
if (executed.get()) {
return super.syncUninterruptibly();
}
return (RPromise<T>) sentPromise.syncUninterruptibly();
}
}

@ -182,31 +182,35 @@ public class CommandAsyncService implements CommandAsyncExecutor {
return l.await(timeout, timeoutUnit);
}
protected <R> RPromise<R> createPromise() {
return new RedissonPromise<R>();
}
@Override
public <T, R> RFuture<R> readAsync(RedisClient client, MasterSlaveEntry entry, Codec codec, RedisCommand<T> command, Object... params) {
RPromise<R> mainPromise = new RedissonPromise<R>();
async(true, new NodeSource(entry, client), codec, command, params, mainPromise, 0, false);
RPromise<R> mainPromise = createPromise();
async(true, new NodeSource(entry, client), codec, command, params, mainPromise, 0, false, null);
return mainPromise;
}
@Override
public <T, R> RFuture<R> readAsync(RedisClient client, String name, Codec codec, RedisCommand<T> command, Object... params) {
RPromise<R> mainPromise = new RedissonPromise<R>();
RPromise<R> mainPromise = createPromise();
int slot = connectionManager.calcSlot(name);
async(true, new NodeSource(slot, client), codec, command, params, mainPromise, 0, false);
async(true, new NodeSource(slot, client), codec, command, params, mainPromise, 0, false, null);
return mainPromise;
}
@Override
public <T, R> RFuture<R> readAsync(RedisClient client, Codec codec, RedisCommand<T> command, Object... params) {
RPromise<R> mainPromise = new RedissonPromise<R>();
async(true, new NodeSource(client), codec, command, params, mainPromise, 0, false);
RPromise<R> mainPromise = createPromise();
async(true, new NodeSource(client), codec, command, params, mainPromise, 0, false, null);
return mainPromise;
}
@Override
public <T, R> RFuture<Collection<R>> readAllAsync(RedisCommand<T> command, Object... params) {
final RPromise<Collection<R>> mainPromise = new RedissonPromise<Collection<R>>();
final RPromise<Collection<R>> mainPromise = createPromise();
final Collection<MasterSlaveEntry> nodes = connectionManager.getEntrySet();
final List<R> results = new ArrayList<R>();
final AtomicInteger counter = new AtomicInteger(nodes.size());
@ -239,14 +243,14 @@ public class CommandAsyncService implements CommandAsyncExecutor {
for (MasterSlaveEntry entry : nodes) {
RPromise<R> promise = new RedissonPromise<R>();
promise.addListener(listener);
async(true, new NodeSource(entry), connectionManager.getCodec(), command, params, promise, 0, true);
async(true, new NodeSource(entry), connectionManager.getCodec(), command, params, promise, 0, true, null);
}
return mainPromise;
}
@Override
public <T, R> RFuture<R> readRandomAsync(RedisCommand<T> command, Object... params) {
final RPromise<R> mainPromise = new RedissonPromise<R>();
final RPromise<R> mainPromise = createPromise();
final List<MasterSlaveEntry> nodes = new ArrayList<MasterSlaveEntry>(connectionManager.getEntrySet());
Collections.shuffle(nodes);
@ -277,7 +281,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
});
MasterSlaveEntry entry = nodes.remove(0);
async(true, new NodeSource(entry), connectionManager.getCodec(), command, params, attemptPromise, 0, false);
async(true, new NodeSource(entry), connectionManager.getCodec(), command, params, attemptPromise, 0, false, null);
}
@Override
@ -328,7 +332,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
for (MasterSlaveEntry entry : nodes) {
RPromise<T> promise = new RedissonPromise<T>();
promise.addListener(listener);
async(readOnlyMode, new NodeSource(entry), connectionManager.getCodec(), command, params, promise, 0, true);
async(readOnlyMode, new NodeSource(entry), connectionManager.getCodec(), command, params, promise, 0, true, null);
}
return mainPromise;
}
@ -347,22 +351,22 @@ public class CommandAsyncService implements CommandAsyncExecutor {
@Override
public <T, R> RFuture<R> readAsync(String key, Codec codec, RedisCommand<T> command, Object... params) {
RPromise<R> mainPromise = new RedissonPromise<R>();
RPromise<R> mainPromise = createPromise();
NodeSource source = getNodeSource(key);
async(true, source, codec, command, params, mainPromise, 0, false);
async(true, source, codec, command, params, mainPromise, 0, false, null);
return mainPromise;
}
public <T, R> RFuture<R> readAsync(MasterSlaveEntry entry, Codec codec, RedisCommand<T> command, Object... params) {
RPromise<R> mainPromise = new RedissonPromise<R>();
async(true, new NodeSource(entry), codec, command, params, mainPromise, 0, false);
RPromise<R> mainPromise = createPromise();
async(true, new NodeSource(entry), codec, command, params, mainPromise, 0, false, null);
return mainPromise;
}
@Override
public <T, R> RFuture<R> writeAsync(MasterSlaveEntry entry, Codec codec, RedisCommand<T> command, Object... params) {
RPromise<R> mainPromise = new RedissonPromise<R>();
async(false, new NodeSource(entry), codec, command, params, mainPromise, 0, false);
RPromise<R> mainPromise = createPromise();
async(false, new NodeSource(entry), codec, command, params, mainPromise, 0, false, null);
return mainPromise;
}
@ -432,19 +436,19 @@ public class CommandAsyncService implements CommandAsyncExecutor {
for (MasterSlaveEntry entry : entries) {
RPromise<T> promise = new RedissonPromise<T>();
promise.addListener(listener);
async(readOnlyMode, new NodeSource(entry), connectionManager.getCodec(), command, args.toArray(), promise, 0, true);
async(readOnlyMode, new NodeSource(entry), connectionManager.getCodec(), command, args.toArray(), promise, 0, true, null);
}
return mainPromise;
}
private <T, R> RFuture<R> evalAsync(NodeSource nodeSource, boolean readOnlyMode, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params) {
RPromise<R> mainPromise = new RedissonPromise<R>();
RPromise<R> mainPromise = createPromise();
List<Object> args = new ArrayList<Object>(2 + keys.size() + params.length);
args.add(script);
args.add(keys.size());
args.addAll(keys);
args.addAll(Arrays.asList(params));
async(readOnlyMode, nodeSource, codec, evalCommandType, args.toArray(), mainPromise, 0, false);
async(readOnlyMode, nodeSource, codec, evalCommandType, args.toArray(), mainPromise, 0, false, null);
return mainPromise;
}
@ -455,15 +459,15 @@ public class CommandAsyncService implements CommandAsyncExecutor {
@Override
public <T, R> RFuture<R> writeAsync(String key, Codec codec, RedisCommand<T> command, Object... params) {
RPromise<R> mainPromise = new RedissonPromise<R>();
RPromise<R> mainPromise = createPromise();
NodeSource source = getNodeSource(key);
async(false, source, codec, command, params, mainPromise, 0, false);
async(false, source, codec, command, params, mainPromise, 0, false, null);
return mainPromise;
}
protected <V, R> void async(final boolean readOnlyMode, final NodeSource source, final Codec codec,
final RedisCommand<V> command, final Object[] params, final RPromise<R> mainPromise, final int attempt,
final boolean ignoreRedirect) {
final boolean ignoreRedirect, final RFuture<RedisConnection> connFuture) {
if (mainPromise.isCancelled()) {
free(params);
return;
@ -492,12 +496,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
}
}
final RFuture<RedisConnection> connectionFuture;
if (readOnlyMode) {
connectionFuture = connectionManager.connectionReadOp(source, command);
} else {
connectionFuture = connectionManager.connectionWriteOp(source, command);
}
final RFuture<RedisConnection> connectionFuture = getConnection(readOnlyMode, source, command);
final RPromise<R> attemptPromise = new RedissonPromise<R>();
details.init(connectionFuture, attemptPromise,
@ -586,7 +585,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
count, details.getCommand(), Arrays.toString(details.getParams()));
}
details.removeMainPromiseListener();
async(details.isReadOnlyMode(), details.getSource(), details.getCodec(), details.getCommand(), details.getParams(), details.getMainPromise(), count, ignoreRedirect);
async(details.isReadOnlyMode(), details.getSource(), details.getCodec(), details.getCommand(), details.getParams(), details.getMainPromise(), count, ignoreRedirect, connFuture);
AsyncDetails.release(details);
}
@ -615,22 +614,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
}
final RedisConnection connection = connFuture.getNow();
if (details.getSource().getRedirect() == Redirect.ASK) {
List<CommandData<?, ?>> list = new ArrayList<CommandData<?, ?>>(2);
RPromise<Void> promise = new RedissonPromise<Void>();
list.add(new CommandData<Void, Void>(promise, details.getCodec(), RedisCommands.ASKING, new Object[]{}));
list.add(new CommandData<V, R>(details.getAttemptPromise(), details.getCodec(), details.getCommand(), details.getParams()));
RPromise<Void> main = new RedissonPromise<Void>();
ChannelFuture future = connection.send(new CommandsData(main, list));
details.setWriteFuture(future);
} else {
if (log.isDebugEnabled()) {
log.debug("acquired connection for command {} and params {} from slot {} using node {}... {}",
details.getCommand(), Arrays.toString(details.getParams()), details.getSource(), connection.getRedisClient().getAddr(), connection);
}
ChannelFuture future = connection.send(new CommandData<V, R>(details.getAttemptPromise(), details.getCodec(), details.getCommand(), details.getParams()));
details.setWriteFuture(future);
}
sendCommand(details, connection);
details.getWriteFuture().addListener(new ChannelFutureListener() {
@Override
@ -651,6 +635,17 @@ public class CommandAsyncService implements CommandAsyncExecutor {
});
}
protected <V> RFuture<RedisConnection> getConnection(final boolean readOnlyMode, final NodeSource source,
final RedisCommand<V> command) {
final RFuture<RedisConnection> connectionFuture;
if (readOnlyMode) {
connectionFuture = connectionManager.connectionReadOp(source, command);
} else {
connectionFuture = connectionManager.connectionWriteOp(source, command);
}
return connectionFuture;
}
protected void free(final Object[] params) {
for (Object obj : params) {
ReferenceCountUtil.safeRelease(obj);
@ -801,7 +796,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
});
}
private <R, V> void checkAttemptFuture(final NodeSource source, final AsyncDetails<V, R> details,
protected <R, V> void checkAttemptFuture(final NodeSource source, final AsyncDetails<V, R> details,
Future<R> future, final boolean ignoreRedirect) {
details.getTimeout().cancel();
if (future.isCancelled()) {
@ -819,7 +814,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
}
async(details.isReadOnlyMode(), new NodeSource(ex.getSlot(), ex.getUrl(), Redirect.MOVED), details.getCodec(),
details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt(), ignoreRedirect);
details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt(), ignoreRedirect, details.getConnectionFuture());
AsyncDetails.release(details);
return;
}
@ -827,14 +822,14 @@ public class CommandAsyncService implements CommandAsyncExecutor {
if (future.cause() instanceof RedisAskException && !ignoreRedirect) {
RedisAskException ex = (RedisAskException) future.cause();
async(details.isReadOnlyMode(), new NodeSource(ex.getSlot(), ex.getUrl(), Redirect.ASK), details.getCodec(),
details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt(), ignoreRedirect);
details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt(), ignoreRedirect, details.getConnectionFuture());
AsyncDetails.release(details);
return;
}
if (future.cause() instanceof RedisLoadingException) {
async(details.isReadOnlyMode(), source, details.getCodec(),
details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt(), ignoreRedirect);
details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt(), ignoreRedirect, details.getConnectionFuture());
AsyncDetails.release(details);
return;
}
@ -844,7 +839,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
@Override
public void run(Timeout timeout) throws Exception {
async(details.isReadOnlyMode(), source, details.getCodec(),
details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt(), ignoreRedirect);
details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt(), ignoreRedirect, details.getConnectionFuture());
}
}, 1, TimeUnit.SECONDS);
@ -860,22 +855,30 @@ public class CommandAsyncService implements CommandAsyncExecutor {
((ScanResult) res).setRedisClient(details.getConnectionFuture().getNow().getRedisClient());
}
if (isRedissonReferenceSupportEnabled()) {
handleReference(details.getMainPromise(), res);
} else {
details.getMainPromise().trySuccess(res);
}
handleSuccess(details.getMainPromise(), details.getCommand(), res);
} else {
details.getMainPromise().tryFailure(future.cause());
handleError(details.getMainPromise(), future.cause());
}
AsyncDetails.release(details);
} catch (RuntimeException e) {
details.getMainPromise().tryFailure(e);
handleError(details.getMainPromise(), e);
throw e;
}
}
protected <R> void handleError(RPromise<R> promise, Throwable cause) {
promise.tryFailure(cause);
}
protected <R> void handleSuccess(RPromise<R> promise, RedisCommand<?> command, R res) {
if (isRedissonReferenceSupportEnabled()) {
handleReference(promise, res);
} else {
promise.trySuccess(res);
}
}
private <R, V> void handleReference(RPromise<R> mainPromise, R res) {
try {
mainPromise.trySuccess(tryHandleReference(res));
@ -1032,4 +1035,23 @@ public class CommandAsyncService implements CommandAsyncExecutor {
return (R) res;
}
}
protected <R, V> void sendCommand(final AsyncDetails<V, R> details, final RedisConnection connection) {
if (details.getSource().getRedirect() == Redirect.ASK) {
List<CommandData<?, ?>> list = new ArrayList<CommandData<?, ?>>(2);
RPromise<Void> promise = new RedissonPromise<Void>();
list.add(new CommandData<Void, Void>(promise, details.getCodec(), RedisCommands.ASKING, new Object[]{}));
list.add(new CommandData<V, R>(details.getAttemptPromise(), details.getCodec(), details.getCommand(), details.getParams()));
RPromise<Void> main = new RedissonPromise<Void>();
ChannelFuture future = connection.send(new CommandsData(main, list));
details.setWriteFuture(future);
} else {
if (log.isDebugEnabled()) {
log.debug("acquired connection for command {} and params {} from slot {} using node {}... {}",
details.getCommand(), Arrays.toString(details.getParams()), details.getSource(), connection.getRedisClient().getAddr(), connection);
}
ChannelFuture future = connection.send(new CommandData<V, R>(details.getAttemptPromise(), details.getCodec(), details.getCommand(), details.getParams()));
details.setWriteFuture(future);
}
}
}

@ -16,18 +16,24 @@
package org.redisson.command;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.redisson.RedissonReference;
import org.redisson.RedissonShutdownException;
import org.redisson.api.BatchOptions;
import org.redisson.api.BatchOptions.ExecutionMode;
import org.redisson.api.BatchResult;
import org.redisson.api.RFuture;
import org.redisson.client.RedisAskException;
@ -48,9 +54,11 @@ import org.redisson.connection.ConnectionManager;
import org.redisson.connection.MasterSlaveEntry;
import org.redisson.connection.NodeSource;
import org.redisson.connection.NodeSource.Redirect;
import org.redisson.misc.CountableListener;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonObjectFactory;
import org.redisson.misc.RedissonPromise;
import org.redisson.pubsub.AsyncSemaphore;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
@ -67,10 +75,32 @@ import io.netty.util.internal.PlatformDependent;
*/
public class CommandBatchService extends CommandAsyncService {
public static class ConnectionEntry {
boolean firstCommand = true;
RFuture<RedisConnection> connectionFuture;
public RFuture<RedisConnection> getConnectionFuture() {
return connectionFuture;
}
public void setConnectionFuture(RFuture<RedisConnection> connectionFuture) {
this.connectionFuture = connectionFuture;
}
public boolean isFirstCommand() {
return firstCommand;
}
public void setFirstCommand(boolean firstCommand) {
this.firstCommand = firstCommand;
}
}
public static class Entry {
Deque<BatchCommandData<?, ?>> commands = new LinkedBlockingDeque<BatchCommandData<?,?>>();
volatile boolean readOnlyMode = true;
public Deque<BatchCommandData<?, ?>> getCommands() {
@ -85,6 +115,7 @@ public class CommandBatchService extends CommandAsyncService {
return readOnlyMode;
}
public void clearErrors() {
for (BatchCommandData<?, ?> commandEntry : commands) {
commandEntry.clearError();
@ -96,47 +127,225 @@ public class CommandBatchService extends CommandAsyncService {
private final AtomicInteger index = new AtomicInteger();
private ConcurrentMap<MasterSlaveEntry, Entry> commands = PlatformDependent.newConcurrentHashMap();
private ConcurrentMap<MasterSlaveEntry, ConnectionEntry> connections = PlatformDependent.newConcurrentHashMap();
private BatchOptions options;
private Map<RFuture<?>, List<CommandBatchService>> nestedServices = PlatformDependent.newConcurrentHashMap();
private volatile boolean executed;
private AtomicBoolean executed = new AtomicBoolean();
public CommandBatchService(ConnectionManager connectionManager) {
super(connectionManager);
}
public CommandBatchService(ConnectionManager connectionManager, BatchOptions options) {
super(connectionManager);
this.options = options;
}
public void add(RFuture<?> future, List<CommandBatchService> services) {
nestedServices.put(future, services);
}
@Override
protected <V, R> void async(boolean readOnlyMode, NodeSource nodeSource,
Codec codec, RedisCommand<V> command, Object[] params, RPromise<R> mainPromise, int attempt, boolean ignoreRedirect) {
if (executed) {
Codec codec, RedisCommand<V> command, Object[] params, RPromise<R> mainPromise, int attempt, boolean ignoreRedirect, RFuture<RedisConnection> connFuture) {
if (executed.get()) {
throw new IllegalStateException("Batch already has been executed!");
}
Entry entry = commands.get(nodeSource.getEntry());
if (nodeSource.getEntry() != null) {
Entry entry = commands.get(nodeSource.getEntry());
if (entry == null) {
entry = new Entry();
Entry oldEntry = commands.putIfAbsent(nodeSource.getEntry(), entry);
if (oldEntry != null) {
entry = oldEntry;
}
}
if (!readOnlyMode) {
entry.setReadOnlyMode(false);
}
if (isRedissonReferenceSupportEnabled()) {
for (int i = 0; i < params.length; i++) {
RedissonReference reference = RedissonObjectFactory.toReference(connectionManager.getCfg(), params[i]);
if (reference != null) {
params[i] = reference;
}
}
}
BatchCommandData<V, R> commandData = new BatchCommandData<V, R>(mainPromise, codec, command, params, index.incrementAndGet());
entry.getCommands().add(commandData);
}
if (!isRedisBasedQueue()) {
return;
}
if (!readOnlyMode && this.options.getExecutionMode() == ExecutionMode.REDIS_READ_ATOMIC) {
throw new IllegalStateException("Data modification commands can't be used with queueStore=REDIS_READ_ATOMIC");
}
super.async(readOnlyMode, nodeSource, codec, command, params, mainPromise, attempt, true, connFuture);
}
AsyncSemaphore semaphore = new AsyncSemaphore(0);
@Override
protected <R> RPromise<R> createPromise() {
if (isRedisBasedQueue()) {
return new BatchPromise<R>(executed);
}
return super.createPromise();
}
@Override
protected <V, R> void releaseConnection(NodeSource source, RFuture<RedisConnection> connectionFuture,
boolean isReadOnly, RPromise<R> attemptPromise, AsyncDetails<V, R> details) {
if (!isRedisBasedQueue() || RedisCommands.EXEC.getName().equals(details.getCommand().getName())) {
super.releaseConnection(source, connectionFuture, isReadOnly, attemptPromise, details);
}
}
@Override
protected <R> void handleSuccess(RPromise<R> promise, RedisCommand<?> command, R res) {
if (RedisCommands.EXEC.getName().equals(command.getName())) {
super.handleSuccess(promise, command, res);
return;
}
if (isRedisBasedQueue()) {
BatchPromise<R> batchPromise = (BatchPromise<R>) promise;
RPromise<R> sentPromise = (RPromise<R>) batchPromise.getSentPromise();
super.handleSuccess(sentPromise, command, res);
semaphore.release();
}
}
@Override
protected <R> void handleError(RPromise<R> promise, Throwable cause) {
if (isRedisBasedQueue() && promise instanceof BatchPromise) {
BatchPromise<R> batchPromise = (BatchPromise<R>) promise;
RPromise<R> sentPromise = (RPromise<R>) batchPromise.getSentPromise();
super.handleError(sentPromise, cause);
semaphore.release();
return;
}
super.handleError(promise, cause);
}
@Override
protected <R, V> void sendCommand(AsyncDetails<V, R> details, RedisConnection connection) {
if (!isRedisBasedQueue()) {
super.sendCommand(details, connection);
return;
}
ConnectionEntry connectionEntry = connections.get(details.getSource().getEntry());
if (details.getSource().getRedirect() == Redirect.ASK) {
List<CommandData<?, ?>> list = new ArrayList<CommandData<?, ?>>(2);
RPromise<Void> promise = new RedissonPromise<Void>();
list.add(new CommandData<Void, Void>(promise, details.getCodec(), RedisCommands.ASKING, new Object[]{}));
if (connectionEntry.isFirstCommand()) {
list.add(new CommandData<Void, Void>(promise, details.getCodec(), RedisCommands.MULTI, new Object[]{}));
connectionEntry.setFirstCommand(false);
}
list.add(new CommandData<V, R>(details.getAttemptPromise(), details.getCodec(), details.getCommand(), details.getParams()));
RPromise<Void> main = new RedissonPromise<Void>();
ChannelFuture future = connection.send(new CommandsData(main, list));
details.setWriteFuture(future);
} else {
if (log.isDebugEnabled()) {
log.debug("acquired connection for command {} and params {} from slot {} using node {}... {}",
details.getCommand(), Arrays.toString(details.getParams()), details.getSource(), connection.getRedisClient().getAddr(), connection);
}
if (connectionEntry.isFirstCommand()) {
List<CommandData<?, ?>> list = new ArrayList<CommandData<?, ?>>(2);
list.add(new CommandData<Void, Void>(new RedissonPromise<Void>(), details.getCodec(), RedisCommands.MULTI, new Object[]{}));
list.add(new CommandData<V, R>(details.getAttemptPromise(), details.getCodec(), details.getCommand(), details.getParams()));
RPromise<Void> main = new RedissonPromise<Void>();
ChannelFuture future = connection.send(new CommandsData(main, list));
connectionEntry.setFirstCommand(false);
details.setWriteFuture(future);
} else {
if (RedisCommands.EXEC.getName().equals(details.getCommand().getName())) {
Entry entry = commands.get(details.getSource().getEntry());
List<CommandData<?, ?>> list = new LinkedList<CommandData<?, ?>>();
if (options.isSkipResult()) {
// BatchCommandData<?, ?> offCommand = new BatchCommandData(RedisCommands.CLIENT_REPLY, new Object[] { "OFF" }, index.incrementAndGet());
// entry.getCommands().addFirst(offCommand);
// list.add(offCommand);
list.add(new CommandData<Void, Void>(new RedissonPromise<Void>(), details.getCodec(), RedisCommands.CLIENT_REPLY, new Object[]{ "OFF" }));
}
list.add(new CommandData<V, R>(details.getAttemptPromise(), details.getCodec(), details.getCommand(), details.getParams()));
if (options.isSkipResult()) {
// BatchCommandData<?, ?> onCommand = new BatchCommandData(RedisCommands.CLIENT_REPLY, new Object[] { "ON" }, index.incrementAndGet());
// entry.getCommands().add(onCommand);
// list.add(onCommand);
list.add(new CommandData<Void, Void>(new RedissonPromise<Void>(), details.getCodec(), RedisCommands.CLIENT_REPLY, new Object[]{ "ON" }));
}
if (options.getSyncSlaves() > 0) {
BatchCommandData<?, ?> waitCommand = new BatchCommandData(RedisCommands.WAIT,
new Object[] { this.options.getSyncSlaves(), this.options.getSyncTimeout() }, index.incrementAndGet());
list.add(waitCommand);
entry.getCommands().add(waitCommand);
}
RPromise<Void> main = new RedissonPromise<Void>();
ChannelFuture future = connection.send(new CommandsData(main, list, new ArrayList(entry.getCommands())));
details.setWriteFuture(future);
} else {
ChannelFuture future = connection.send(new CommandData<V, R>(details.getAttemptPromise(), details.getCodec(), details.getCommand(), details.getParams()));
details.setWriteFuture(future);
}
}
}
}
@Override
protected <V> RFuture<RedisConnection> getConnection(boolean readOnlyMode, NodeSource source,
RedisCommand<V> command) {
if (!isRedisBasedQueue()) {
return super.getConnection(readOnlyMode, source, command);
}
ConnectionEntry entry = connections.get(source.getEntry());
if (entry == null) {
entry = new Entry();
Entry oldEntry = commands.putIfAbsent(nodeSource.getEntry(), entry);
entry = new ConnectionEntry();
ConnectionEntry oldEntry = connections.putIfAbsent(source.getEntry(), entry);
if (oldEntry != null) {
entry = oldEntry;
}
}
if (!readOnlyMode) {
entry.setReadOnlyMode(false);
if (entry.getConnectionFuture() != null) {
return entry.getConnectionFuture();
}
if (isRedissonReferenceSupportEnabled()) {
for (int i = 0; i < params.length; i++) {
RedissonReference reference = RedissonObjectFactory.toReference(connectionManager.getCfg(), params[i]);
if (reference != null) {
params[i] = reference;
}
synchronized (this) {
if (entry.getConnectionFuture() != null) {
return entry.getConnectionFuture();
}
RFuture<RedisConnection> connectionFuture;
if (this.options.getExecutionMode() == ExecutionMode.REDIS_WRITE_ATOMIC) {
connectionFuture = connectionManager.connectionWriteOp(source, command);
} else {
connectionFuture = connectionManager.connectionReadOp(source, command);
}
connectionFuture.syncUninterruptibly();
entry.setConnectionFuture(connectionFuture);
return connectionFuture;
}
BatchCommandData<V, R> commandData = new BatchCommandData<V, R>(mainPromise, codec, command, params, index.incrementAndGet());
entry.getCommands().add(commandData);
}
public BatchResult<?> execute() {
@ -170,16 +379,133 @@ public class CommandBatchService extends CommandAsyncService {
}
public <R> RFuture<R> executeAsync(BatchOptions options) {
if (executed) {
if (executed.get()) {
throw new IllegalStateException("Batch already executed!");
}
if (commands.isEmpty()) {
return RedissonPromise.newSucceededFuture(null);
}
executed = true;
if (options.isAtomic()) {
if (this.options == null) {
this.options = options;
}
if (isRedisBasedQueue()) {
int permits = 0;
for (Entry entry : commands.values()) {
permits += entry.getCommands().size();
};
final RPromise<R> resultPromise = new RedissonPromise<R>();
semaphore.acquire(new Runnable() {
@Override
public void run() {
for (Entry entry : commands.values()) {
for (BatchCommandData<?, ?> command : entry.getCommands()) {
if (command.getPromise().isDone() && !command.getPromise().isSuccess()) {
resultPromise.tryFailure(command.getPromise().cause());
break;
}
}
}
if (resultPromise.isDone()) {
return;
}
final RPromise<Map<MasterSlaveEntry, List<Object>>> mainPromise = new RedissonPromise<Map<MasterSlaveEntry, List<Object>>>();
final Map<MasterSlaveEntry, List<Object>> result = new ConcurrentHashMap<MasterSlaveEntry, List<Object>>();
final CountableListener<Map<MasterSlaveEntry, List<Object>>> listener = new CountableListener<Map<MasterSlaveEntry, List<Object>>>(mainPromise, result);
listener.setCounter(connections.size());
for (final Map.Entry<MasterSlaveEntry, Entry> entry : commands.entrySet()) {
ConnectionEntry connection = connections.get(entry.getKey());
final RPromise<List<Object>> execPromise = new RedissonPromise<List<Object>>();
async(false, new NodeSource(entry.getKey()), connectionManager.getCodec(), RedisCommands.EXEC,
new Object[] {}, execPromise, 0, false, connection.getConnectionFuture());
execPromise.addListener(new FutureListener<List<Object>>() {
@Override
public void operationComplete(Future<List<Object>> future) throws Exception {
if (!future.isSuccess()) {
mainPromise.tryFailure(future.cause());
return;
}
BatchCommandData<?, Integer> lastCommand = (BatchCommandData<?, Integer>) entry.getValue().getCommands().peekLast();
result.put(entry.getKey(), future.getNow());
if (RedisCommands.WAIT.getName().equals(lastCommand.getCommand().getName())) {
lastCommand.getPromise().addListener(new FutureListener<Integer>() {
@Override
public void operationComplete(Future<Integer> ft) throws Exception {
if (!ft.isSuccess()) {
mainPromise.tryFailure(ft.cause());
return;
}
execPromise.addListener(listener);
}
});
} else {
execPromise.addListener(listener);
}
}
});
}
executed.set(true);
mainPromise.addListener(new FutureListener<Map<MasterSlaveEntry, List<Object>>>() {
@Override
public void operationComplete(Future<Map<MasterSlaveEntry, List<Object>>> future) throws Exception {
if (!future.isSuccess()) {
resultPromise.tryFailure(future.cause());
return;
}
for (java.util.Map.Entry<MasterSlaveEntry, List<Object>> entry : future.getNow().entrySet()) {
Entry commandEntry = commands.get(entry.getKey());
Iterator<Object> resultIter = entry.getValue().iterator();
for (BatchCommandData<?, ?> data : commandEntry.getCommands()) {
if (data.getCommand().getName().equals(RedisCommands.EXEC.getName())) {
break;
}
RPromise<Object> promise = (RPromise<Object>) data.getPromise();
promise.trySuccess(resultIter.next());
}
}
List<BatchCommandData> entries = new ArrayList<BatchCommandData>();
for (Entry e : commands.values()) {
entries.addAll(e.getCommands());
}
Collections.sort(entries);
List<Object> responses = new ArrayList<Object>(entries.size());
int syncedSlaves = 0;
for (BatchCommandData<?, ?> commandEntry : entries) {
if (isWaitCommand(commandEntry)) {
syncedSlaves += (Integer) commandEntry.getPromise().getNow();
} else if (!commandEntry.getCommand().getName().equals(RedisCommands.MULTI.getName())
&& !commandEntry.getCommand().getName().equals(RedisCommands.EXEC.getName())) {
Object entryResult = commandEntry.getPromise().getNow();
entryResult = tryHandleReference(entryResult);
responses.add(entryResult);
}
}
BatchResult<Object> result = new BatchResult<Object>(responses, syncedSlaves);
resultPromise.trySuccess((R)result);
commands = null;
}
});
}
}, permits);
return resultPromise;
}
executed.set(true);
if (this.options.getExecutionMode() != ExecutionMode.IN_MEMORY) {
for (Entry entry : commands.values()) {
BatchCommandData<?, ?> multiCommand = new BatchCommandData(RedisCommands.MULTI, new Object[] {}, index.incrementAndGet());
entry.getCommands().addFirst(multiCommand);
@ -188,7 +514,7 @@ public class CommandBatchService extends CommandAsyncService {
}
}
if (options.isSkipResult()) {
if (this.options.isSkipResult()) {
for (Entry entry : commands.values()) {
BatchCommandData<?, ?> offCommand = new BatchCommandData(RedisCommands.CLIENT_REPLY, new Object[] { "OFF" }, index.incrementAndGet());
entry.getCommands().addFirst(offCommand);
@ -197,21 +523,21 @@ public class CommandBatchService extends CommandAsyncService {
}
}
if (options.getSyncSlaves() > 0) {
if (this.options.getSyncSlaves() > 0) {
for (Entry entry : commands.values()) {
BatchCommandData<?, ?> waitCommand = new BatchCommandData(RedisCommands.WAIT,
new Object[] { options.getSyncSlaves(), options.getSyncTimeout() }, index.incrementAndGet());
new Object[] { this.options.getSyncSlaves(), this.options.getSyncTimeout() }, index.incrementAndGet());
entry.getCommands().add(waitCommand);
}
}
RPromise<R> resultPromise;
final RPromise<Void> voidPromise = new RedissonPromise<Void>();
if (options.isSkipResult()) {
if (this.options.isSkipResult()) {
voidPromise.addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
commands = null;
// commands = null;
nestedServices.clear();
}
});
@ -273,11 +599,15 @@ public class CommandBatchService extends CommandAsyncService {
}
for (java.util.Map.Entry<MasterSlaveEntry, Entry> e : commands.entrySet()) {
execute(e.getValue(), new NodeSource(e.getKey()), voidPromise, slots, 0, options);
execute(e.getValue(), new NodeSource(e.getKey()), voidPromise, slots, 0, this.options);
}
return resultPromise;
}
protected boolean isRedisBasedQueue() {
return options != null && (this.options.getExecutionMode() == ExecutionMode.REDIS_READ_ATOMIC || this.options.getExecutionMode() == ExecutionMode.REDIS_WRITE_ATOMIC);
}
private void execute(final Entry entry, final NodeSource source, final RPromise<Void> mainPromise, final AtomicInteger slots,
final int attempt, final BatchOptions options) {
if (mainPromise.isCancelled()) {
@ -335,7 +665,7 @@ public class CommandBatchService extends CommandAsyncService {
if (connectionFuture.isSuccess()) {
if (details.getWriteFuture() == null || !details.getWriteFuture().isDone()) {
if (details.getAttempt() == attempts) {
if (details.getWriteFuture().cancel(false)) {
if (details.getWriteFuture() == null || details.getWriteFuture().cancel(false)) {
if (details.getException() == null) {
details.setException(new RedisTimeoutException("Unable to send batch after " + connectionManager.getConfig().getRetryAttempts() + " retry attempts"));
}
@ -391,7 +721,8 @@ public class CommandBatchService extends CommandAsyncService {
connectionFuture.addListener(new FutureListener<RedisConnection>() {
@Override
public void operationComplete(Future<RedisConnection> connFuture) throws Exception {
checkConnectionFuture(entry, source, mainPromise, attemptPromise, details, connectionFuture, options.isSkipResult(), options.getResponseTimeout(), attempts, options.isAtomic());
checkConnectionFuture(entry, source, mainPromise, attemptPromise, details, connectionFuture, options.isSkipResult(),
options.getResponseTimeout(), attempts, options.getExecutionMode() != ExecutionMode.IN_MEMORY);
}
});
@ -500,7 +831,7 @@ public class CommandBatchService extends CommandAsyncService {
final RedisConnection connection = connFuture.getNow();
List<CommandData<?, ?>> list = new ArrayList<CommandData<?, ?>>(entry.getCommands().size() + 1);
List<CommandData<?, ?>> list = new LinkedList<CommandData<?, ?>>();
if (source.getRedirect() == Redirect.ASK) {
RPromise<Void> promise = new RedissonPromise<Void>();
list.add(new CommandData<Void, Void>(promise, StringCodec.INSTANCE, RedisCommands.ASKING, new Object[] {}));

@ -24,6 +24,7 @@ import org.redisson.api.BatchOptions;
import org.redisson.api.BatchResult;
import org.redisson.api.RFuture;
import org.redisson.api.RedissonReactiveClient;
import org.redisson.client.RedisConnection;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.connection.ConnectionManager;
@ -56,8 +57,8 @@ public class CommandReactiveBatchService extends CommandReactiveService {
@Override
protected <V, R> void async(boolean readOnlyMode, NodeSource nodeSource,
Codec codec, RedisCommand<V> command, Object[] params, RPromise<R> mainPromise, int attempt, boolean ignoreRedirect) {
batchService.async(readOnlyMode, nodeSource, codec, command, params, mainPromise, attempt, ignoreRedirect);
Codec codec, RedisCommand<V> command, Object[] params, RPromise<R> mainPromise, int attempt, boolean ignoreRedirect, RFuture<RedisConnection> connFuture) {
batchService.async(readOnlyMode, nodeSource, codec, command, params, mainPromise, attempt, ignoreRedirect, connFuture);
}
public RFuture<BatchResult<?>> executeAsync(BatchOptions options) {

@ -1,84 +0,0 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.executor;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.util.Enumeration;
/**
*
* @author Nikita Koksharov
*
*/
public class ClassLoaderDelegator extends ClassLoader {
private final ThreadLocal<ClassLoader> threadLocalClassLoader = new ThreadLocal<ClassLoader>();
public void setCurrentClassLoader(ClassLoader classLoader) {
threadLocalClassLoader.set(classLoader);
}
public int hashCode() {
return threadLocalClassLoader.get().hashCode();
}
public boolean equals(Object obj) {
return threadLocalClassLoader.get().equals(obj);
}
public String toString() {
return threadLocalClassLoader.get().toString();
}
public Class<?> loadClass(String name) throws ClassNotFoundException {
return threadLocalClassLoader.get().loadClass(name);
}
public URL getResource(String name) {
return threadLocalClassLoader.get().getResource(name);
}
public Enumeration<URL> getResources(String name) throws IOException {
return threadLocalClassLoader.get().getResources(name);
}
public InputStream getResourceAsStream(String name) {
return threadLocalClassLoader.get().getResourceAsStream(name);
}
public void setDefaultAssertionStatus(boolean enabled) {
threadLocalClassLoader.get().setDefaultAssertionStatus(enabled);
}
public void setPackageAssertionStatus(String packageName, boolean enabled) {
threadLocalClassLoader.get().setPackageAssertionStatus(packageName, enabled);
}
public void setClassAssertionStatus(String className, boolean enabled) {
threadLocalClassLoader.get().setClassAssertionStatus(className, enabled);
}
public void clearAssertionStatus() {
threadLocalClassLoader.get().clearAssertionStatus();
}
public void clearCurrentClassLoader() {
threadLocalClassLoader.remove();
}
}

@ -53,10 +53,7 @@ import io.netty.util.concurrent.FutureListener;
*/
public class TasksRunnerService implements RemoteExecutorService {
private final ClassLoaderDelegator classLoader = new ClassLoaderDelegator();
private final Codec codec;
private final ClassLoader codecClassLoader;
private final String name;
private final CommandExecutor commandExecutor;
@ -77,12 +74,7 @@ public class TasksRunnerService implements RemoteExecutorService {
this.redisson = redisson;
this.responses = responses;
try {
this.codecClassLoader = codec.getClassLoader();
this.codec = codec.getClass().getConstructor(ClassLoader.class).newInstance(classLoader);
} catch (Exception e) {
throw new IllegalStateException("Unable to initialize codec with ClassLoader parameter", e);
}
this.codec = codec;
}
public void setTasksRetryIntervalName(String tasksRetryInterval) {
@ -184,11 +176,10 @@ public class TasksRunnerService implements RemoteExecutorService {
try {
buf.writeBytes(state);
RedissonClassLoader cl = new RedissonClassLoader(codecClassLoader);
RedissonClassLoader cl = new RedissonClassLoader(codec.getClassLoader());
cl.loadClass(className, classBody);
classLoader.setCurrentClassLoader(cl);
Callable<?> callable = decode(buf);
Callable<?> callable = decode(cl, buf);
return callable.call();
} catch (RedissonShutdownException e) {
return null;
@ -248,12 +239,17 @@ public class TasksRunnerService implements RemoteExecutorService {
});
}
@SuppressWarnings("unchecked")
private <T> T decode(ByteBuf buf) throws IOException {
T task = (T) codec.getValueDecoder().decode(buf, null);
Injector.inject(task, redisson);
return task;
private <T> T decode(RedissonClassLoader cl, ByteBuf buf) throws IOException {
try {
Codec codec = this.codec.getClass().getConstructor(ClassLoader.class).newInstance(cl);
T task = (T) codec.getValueDecoder().decode(buf, null);
Injector.inject(task, redisson);
return task;
} catch (Exception e) {
throw new IllegalStateException("Unable to initialize codec with ClassLoader parameter", e);
}
}
@Override
@ -266,11 +262,10 @@ public class TasksRunnerService implements RemoteExecutorService {
try {
buf.writeBytes(state);
RedissonClassLoader cl = new RedissonClassLoader(codecClassLoader);
RedissonClassLoader cl = new RedissonClassLoader(codec.getClassLoader());
cl.loadClass(className, classBody);
classLoader.setCurrentClassLoader(cl);
Runnable runnable = decode(buf);
Runnable runnable = decode(cl, buf);
runnable.run();
} catch (RedissonShutdownException e) {
// skip
@ -295,8 +290,6 @@ public class TasksRunnerService implements RemoteExecutorService {
* @param requestId
*/
private void finish(String requestId) {
classLoader.clearCurrentClassLoader();
commandExecutor.evalWriteAsync(name, StringCodec.INSTANCE, RedisCommands.EVAL_VOID,
"local scheduled = redis.call('zscore', KEYS[5], ARGV[3]);"
+ "if scheduled == false then "

@ -36,7 +36,7 @@ public class CountableListener<T> implements FutureListener<Object> {
}
public CountableListener(RPromise<T> result, T value) {
this(null, null, 0);
this(result, value, 0);
}
public CountableListener(RPromise<T> result, T value, int count) {

@ -28,8 +28,55 @@ import java.util.concurrent.TimeUnit;
*/
public class AsyncSemaphore {
private static class Entry {
private Runnable runnable;
private int permits;
public Entry(Runnable runnable, int permits) {
super();
this.runnable = runnable;
this.permits = permits;
}
public int getPermits() {
return permits;
}
public Runnable getRunnable() {
return runnable;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((runnable == null) ? 0 : runnable.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
Entry other = (Entry) obj;
if (runnable == null) {
if (other.runnable != null)
return false;
} else if (!runnable.equals(other.runnable))
return false;
return true;
}
}
private int counter;
private final Set<Runnable> listeners = new LinkedHashSet<Runnable>();
private final Set<Entry> listeners = new LinkedHashSet<Entry>();
public AsyncSemaphore(int permits) {
counter = permits;
@ -75,15 +122,18 @@ public class AsyncSemaphore {
}
public void acquire(Runnable listener) {
acquire(listener, 1);
}
public void acquire(Runnable listener, int permits) {
boolean run = false;
synchronized (this) {
if (counter == 0) {
listeners.add(listener);
if (counter < permits) {
listeners.add(new Entry(listener, permits));
return;
}
if (counter > 0) {
counter--;
} else {
counter -= permits;
run = true;
}
}
@ -95,7 +145,7 @@ public class AsyncSemaphore {
public boolean remove(Runnable listener) {
synchronized (this) {
return listeners.remove(listener);
return listeners.remove(new Entry(listener, 0));
}
}
@ -104,19 +154,22 @@ public class AsyncSemaphore {
}
public void release() {
Runnable runnable = null;
Entry entryToAcquire = null;
synchronized (this) {
counter++;
Iterator<Runnable> iter = listeners.iterator();
Iterator<Entry> iter = listeners.iterator();
if (iter.hasNext()) {
runnable = iter.next();
iter.remove();
Entry entry = iter.next();
if (entry.getPermits() >= counter) {
iter.remove();
entryToAcquire = entry;
}
}
}
if (runnable != null) {
acquire(runnable);
if (entryToAcquire != null) {
acquire(entryToAcquire.getRunnable(), entryToAcquire.getPermits());
}
}

@ -23,6 +23,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@ -79,7 +80,7 @@ public class RedissonTransaction implements RTransaction {
private final AtomicBoolean executed = new AtomicBoolean();
private final TransactionOptions options;
private List<TransactionalOperation> operations = new ArrayList<TransactionalOperation>();
private List<TransactionalOperation> operations = new CopyOnWriteArrayList<TransactionalOperation>();
private Set<String> localCaches = new HashSet<String>();
private final long startTime = System.currentTimeMillis();
@ -183,8 +184,9 @@ public class RedissonTransaction implements RTransaction {
checkTimeout();
BatchOptions batchOptions = createOptions();
final CommandBatchService transactionExecutor = new CommandBatchService(commandExecutor.getConnectionManager());
final CommandBatchService transactionExecutor = new CommandBatchService(commandExecutor.getConnectionManager(), batchOptions);
for (TransactionalOperation transactionalOperation : operations) {
transactionalOperation.commit(transactionExecutor);
}
@ -209,20 +211,7 @@ public class RedissonTransaction implements RTransaction {
return;
}
int syncSlaves = 0;
if (!commandExecutor.getConnectionManager().isClusterMode()) {
MasterSlaveEntry entry = commandExecutor.getConnectionManager().getEntrySet().iterator().next();
syncSlaves = entry.getAvailableClients() - 1;
}
BatchOptions batchOptions = BatchOptions.defaults()
.syncSlaves(syncSlaves, options.getSyncTimeout(), TimeUnit.MILLISECONDS)
.responseTimeout(options.getResponseTimeout(), TimeUnit.MILLISECONDS)
.retryAttempts(options.getRetryAttempts())
.retryInterval(options.getRetryInterval(), TimeUnit.MILLISECONDS)
.atomic();
RFuture<Object> transactionFuture = transactionExecutor.executeAsync(batchOptions);
RFuture<List<?>> transactionFuture = transactionExecutor.executeAsync();
transactionFuture.addListener(new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
@ -242,6 +231,22 @@ public class RedissonTransaction implements RTransaction {
return result;
}
private BatchOptions createOptions() {
int syncSlaves = 0;
if (!commandExecutor.getConnectionManager().isClusterMode()) {
MasterSlaveEntry entry = commandExecutor.getConnectionManager().getEntrySet().iterator().next();
syncSlaves = entry.getAvailableClients() - 1;
}
BatchOptions batchOptions = BatchOptions.defaults()
.syncSlaves(syncSlaves, options.getSyncTimeout(), TimeUnit.MILLISECONDS)
.responseTimeout(options.getResponseTimeout(), TimeUnit.MILLISECONDS)
.retryAttempts(options.getRetryAttempts())
.retryInterval(options.getRetryInterval(), TimeUnit.MILLISECONDS)
.atomic();
return batchOptions;
}
@Override
public void commit() {
commit(localCaches, operations);
@ -252,8 +257,9 @@ public class RedissonTransaction implements RTransaction {
checkTimeout();
BatchOptions batchOptions = createOptions();
CommandBatchService transactionExecutor = new CommandBatchService(commandExecutor.getConnectionManager());
CommandBatchService transactionExecutor = new CommandBatchService(commandExecutor.getConnectionManager(), batchOptions);
for (TransactionalOperation transactionalOperation : operations) {
transactionalOperation.commit(transactionExecutor);
}
@ -268,21 +274,9 @@ public class RedissonTransaction implements RTransaction {
throw e;
}
int syncSlaves = 0;
if (!commandExecutor.getConnectionManager().isClusterMode()) {
MasterSlaveEntry entry = commandExecutor.getConnectionManager().getEntrySet().iterator().next();
syncSlaves = entry.getAvailableClients() - 1;
}
try {
BatchOptions batchOptions = BatchOptions.defaults()
.syncSlaves(syncSlaves, options.getSyncTimeout(), TimeUnit.MILLISECONDS)
.responseTimeout(options.getResponseTimeout(), TimeUnit.MILLISECONDS)
.retryAttempts(options.getRetryAttempts())
.retryInterval(options.getRetryInterval(), TimeUnit.MILLISECONDS)
.atomic();
transactionExecutor.execute(batchOptions);
transactionExecutor.execute();
} catch (Exception e) {
throw new TransactionException("Unable to execute transaction", e);
}
@ -294,6 +288,7 @@ public class RedissonTransaction implements RTransaction {
private void checkTimeout() {
if (options.getTimeout() != -1 && System.currentTimeMillis() - startTime > options.getTimeout()) {
rollbackAsync();
throw new TransactionTimeoutException("Transaction was discarded due to timeout " + options.getTimeout() + " milliseconds");
}
}
@ -582,7 +577,7 @@ public class RedissonTransaction implements RTransaction {
}
try {
executorService.execute(BatchOptions.defaults());
executorService.execute();
} catch (Exception e) {
throw new TransactionException("Unable to rollback transaction", e);
}
@ -601,7 +596,7 @@ public class RedissonTransaction implements RTransaction {
}
final RPromise<Void> result = new RedissonPromise<Void>();
RFuture<Object> future = executorService.executeAsync(BatchOptions.defaults());
RFuture<List<?>> future = executorService.executeAsync();
future.addListener(new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {

@ -3,6 +3,7 @@ package org.redisson;
import static org.assertj.core.api.Assertions.assertThat;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -14,10 +15,14 @@ import java.util.concurrent.atomic.AtomicLong;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.redisson.ClusterRunner.ClusterProcesses;
import org.redisson.RedisRunner.FailedToStartRedisException;
import org.redisson.api.BatchOptions;
import org.redisson.api.BatchOptions.ExecutionMode;
import org.redisson.api.BatchResult;
import org.redisson.api.RBatch;
import org.redisson.api.RFuture;
@ -31,17 +36,40 @@ import org.redisson.client.RedisException;
import org.redisson.client.codec.StringCodec;
import org.redisson.config.Config;
@RunWith(Parameterized.class)
public class RedissonBatchTest extends BaseTest {
@Parameterized.Parameters(name= "{index} - {0}")
public static Iterable<Object[]> data() {
return Arrays.asList(new Object[][] {
{BatchOptions.defaults().executionMode(ExecutionMode.IN_MEMORY)},
{BatchOptions.defaults().executionMode(ExecutionMode.REDIS_WRITE_ATOMIC)}
});
}
@Parameterized.Parameter(0)
public BatchOptions batchOptions;
@Before
public void before() throws IOException, InterruptedException {
super.before();
if (batchOptions.getExecutionMode() == ExecutionMode.IN_MEMORY) {
batchOptions = BatchOptions.defaults().executionMode(ExecutionMode.IN_MEMORY);
}
if (batchOptions.getExecutionMode() == ExecutionMode.REDIS_WRITE_ATOMIC) {
batchOptions = BatchOptions.defaults().executionMode(ExecutionMode.REDIS_WRITE_ATOMIC);
}
}
// @Test
public void testBatchRedirect() {
RBatch batch = redisson.createBatch(BatchOptions.defaults());
RBatch batch = redisson.createBatch(batchOptions);
for (int i = 0; i < 5; i++) {
batch.getMap("" + i).fastPutAsync("" + i, i);
}
batch.execute();
batch = redisson.createBatch(BatchOptions.defaults());
batch = redisson.createBatch(batchOptions);
for (int i = 0; i < 1; i++) {
batch.getMap("" + i).sizeAsync();
batch.getMap("" + i).containsValueAsync("" + i);
@ -53,13 +81,13 @@ public class RedissonBatchTest extends BaseTest {
@Test
public void testBigRequestAtomic() {
BatchOptions options = BatchOptions.defaults()
batchOptions
.atomic()
.responseTimeout(15, TimeUnit.SECONDS)
.retryInterval(1, TimeUnit.SECONDS)
.retryAttempts(5);
RBatch batch = redisson.createBatch(options);
RBatch batch = redisson.createBatch(batchOptions);
for (int i = 0; i < 100; i++) {
batch.getBucket("" + i).setAsync(i);
batch.getBucket("" + i).getAsync();
@ -87,19 +115,22 @@ public class RedissonBatchTest extends BaseTest {
Config config = new Config();
config.useClusterServers()
.setTimeout(1000000)
.setRetryInterval(1000000)
.addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort());
RedissonClient redisson = Redisson.create(config);
BatchOptions options = BatchOptions.defaults()
batchOptions
.syncSlaves(1, 1, TimeUnit.SECONDS);
RBatch batch = redisson.createBatch(options);
RBatch batch = redisson.createBatch(batchOptions);
for (int i = 0; i < 100; i++) {
RMapAsync<String, String> map = batch.getMap("test");
map.putAsync("" + i, "" + i);
}
BatchResult<?> result = batch.execute();
assertThat(result.getResponses()).hasSize(100);
assertThat(result.getSyncedSlaves()).isEqualTo(1);
process.shutdown();
@ -107,7 +138,7 @@ public class RedissonBatchTest extends BaseTest {
@Test
public void testWriteTimeout() {
RBatch batch = redisson.createBatch(BatchOptions.defaults());
RBatch batch = redisson.createBatch(batchOptions);
for (int i = 0; i < 200000; i++) {
RMapCacheAsync<String, String> map = batch.getMapCache("test");
map.putAsync("" + i, "" + i, 10, TimeUnit.SECONDS);
@ -119,10 +150,10 @@ public class RedissonBatchTest extends BaseTest {
public void testSkipResult() {
Assume.assumeTrue(RedisRunner.getDefaultRedisServerInstance().getRedisVersion().compareTo("3.2.0") > 0);
BatchOptions options = BatchOptions.defaults()
batchOptions
.skipResult();
RBatch batch = redisson.createBatch(options);
RBatch batch = redisson.createBatch(batchOptions);
batch.getBucket("A1").setAsync("001");
batch.getBucket("A2").setAsync("001");
batch.getBucket("A3").setAsync("001");
@ -136,7 +167,7 @@ public class RedissonBatchTest extends BaseTest {
@Test
public void testBatchNPE() {
RBatch batch = redisson.createBatch(BatchOptions.defaults());
RBatch batch = redisson.createBatch(batchOptions);
batch.getBucket("A1").setAsync("001");
batch.getBucket("A2").setAsync("001");
batch.getBucket("A3").setAsync("001");
@ -147,10 +178,10 @@ public class RedissonBatchTest extends BaseTest {
@Test
public void testAtomic() {
BatchOptions options = BatchOptions.defaults()
batchOptions
.atomic();
RBatch batch = redisson.createBatch(options);
RBatch batch = redisson.createBatch(batchOptions);
RFuture<Long> f1 = batch.getAtomicLong("A1").addAndGetAsync(1);
RFuture<Long> f2 = batch.getAtomicLong("A2").addAndGetAsync(2);
RFuture<Long> f3 = batch.getAtomicLong("A3").addAndGetAsync(3);
@ -183,14 +214,15 @@ public class RedissonBatchTest extends BaseTest {
Config config = new Config();
config.useClusterServers()
.setTimeout(123000)
.addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort());
RedissonClient redisson = Redisson.create(config);
BatchOptions options = BatchOptions.defaults()
batchOptions
.atomic()
.syncSlaves(1, 1, TimeUnit.SECONDS);
RBatch batch = redisson.createBatch(options);
RBatch batch = redisson.createBatch(batchOptions);
for (int i = 0; i < 10; i++) {
batch.getAtomicLong("{test}" + i).addAndGetAsync(i);
}
@ -208,7 +240,20 @@ public class RedissonBatchTest extends BaseTest {
@Test
public void testDifferentCodecs() {
RBatch b = redisson.createBatch(BatchOptions.defaults());
RBatch b = redisson.createBatch(batchOptions);
b.getMap("test1").putAsync("1", "2");
b.getMap("test2", StringCodec.INSTANCE).putAsync("21", "3");
RFuture<Object> val1 = b.getMap("test1").getAsync("1");
RFuture<Object> val2 = b.getMap("test2", StringCodec.INSTANCE).getAsync("21");
b.execute();
Assert.assertEquals("2", val1.getNow());
Assert.assertEquals("3", val2.getNow());
}
@Test
public void testDifferentCodecsAtomic() {
RBatch b = redisson.createBatch(batchOptions.atomic());
b.getMap("test1").putAsync("1", "2");
b.getMap("test2", StringCodec.INSTANCE).putAsync("21", "3");
RFuture<Object> val1 = b.getMap("test1").getAsync("1");
@ -221,7 +266,7 @@ public class RedissonBatchTest extends BaseTest {
@Test
public void testBatchList() {
RBatch b = redisson.createBatch(BatchOptions.defaults());
RBatch b = redisson.createBatch(batchOptions);
RListAsync<Integer> listAsync = b.getList("list");
for (int i = 1; i < 540; i++) {
listAsync.addAsync(i);
@ -232,7 +277,7 @@ public class RedissonBatchTest extends BaseTest {
@Test
public void testBatchBigRequest() {
RBatch batch = redisson.createBatch(BatchOptions.defaults());
RBatch batch = redisson.createBatch(batchOptions);
for (int i = 0; i < 210; i++) {
batch.getMap("test").fastPutAsync("1", "2");
batch.getMap("test").fastPutAsync("2", "3");
@ -246,7 +291,7 @@ public class RedissonBatchTest extends BaseTest {
@Test(expected=RedisException.class)
public void testExceptionHandling() {
RBatch batch = redisson.createBatch(BatchOptions.defaults());
RBatch batch = redisson.createBatch(batchOptions);
batch.getMap("test").putAsync("1", "2");
batch.getScript().evalAsync(Mode.READ_WRITE, "wrong_code", RScript.ReturnType.VALUE);
batch.execute();
@ -254,7 +299,7 @@ public class RedissonBatchTest extends BaseTest {
@Test(expected=IllegalStateException.class)
public void testTwice() {
RBatch batch = redisson.createBatch(BatchOptions.defaults());
RBatch batch = redisson.createBatch(batchOptions);
batch.getMap("test").putAsync("1", "2");
batch.execute();
batch.execute();
@ -263,14 +308,14 @@ public class RedissonBatchTest extends BaseTest {
@Test
public void testEmpty() {
RBatch batch = redisson.createBatch(BatchOptions.defaults());
RBatch batch = redisson.createBatch(batchOptions);
batch.execute();
}
@Test
public void testOrdering() throws InterruptedException {
ExecutorService e = Executors.newFixedThreadPool(16);
final RBatch batch = redisson.createBatch(BatchOptions.defaults());
final RBatch batch = redisson.createBatch(batchOptions);
final AtomicLong index = new AtomicLong(-1);
final List<RFuture<Long>> futures = new CopyOnWriteArrayList<>();
for (int i = 0; i < 500; i++) {
@ -304,7 +349,7 @@ public class RedissonBatchTest extends BaseTest {
@Test
public void test() {
RBatch batch = redisson.createBatch(BatchOptions.defaults());
RBatch batch = redisson.createBatch(batchOptions);
batch.getMap("test").fastPutAsync("1", "2");
batch.getMap("test").fastPutAsync("2", "3");
batch.getMap("test").putAsync("2", "5");

@ -22,7 +22,7 @@ public abstract class RedissonBaseTransactionalMapTest extends BaseTest {
@Test
public void testFastPut() throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(200);
ExecutorService executor = Executors.newFixedThreadPool(2000);
for (int i = 0; i < 2000; i++) {
executor.submit(() -> {
for (int j = 0; j < 100; j++) {

Loading…
Cancel
Save