diff --git a/redisson/src/main/java/org/redisson/RedissonBatch.java b/redisson/src/main/java/org/redisson/RedissonBatch.java index 496f814bc..bd87bec7d 100644 --- a/redisson/src/main/java/org/redisson/RedissonBatch.java +++ b/redisson/src/main/java/org/redisson/RedissonBatch.java @@ -61,7 +61,7 @@ public class RedissonBatch implements RBatch { private final BatchOptions options; public RedissonBatch(EvictionScheduler evictionScheduler, ConnectionManager connectionManager, BatchOptions options) { - this.executorService = new CommandBatchService(connectionManager); + this.executorService = new CommandBatchService(connectionManager, options); this.evictionScheduler = evictionScheduler; this.options = options; } @@ -264,12 +264,12 @@ public class RedissonBatch implements RBatch { @Override public BatchResult execute() { - return executorService.execute(options); + return executorService.execute(BatchOptions.defaults()); } @Override public RFuture> executeAsync() { - return executorService.executeAsync(options); + return executorService.executeAsync(BatchOptions.defaults()); } @Override diff --git a/redisson/src/main/java/org/redisson/RedissonScoredSortedSet.java b/redisson/src/main/java/org/redisson/RedissonScoredSortedSet.java index ef5044c91..1911a0311 100644 --- a/redisson/src/main/java/org/redisson/RedissonScoredSortedSet.java +++ b/redisson/src/main/java/org/redisson/RedissonScoredSortedSet.java @@ -37,6 +37,7 @@ import org.redisson.client.codec.Codec; import org.redisson.client.codec.DoubleCodec; import org.redisson.client.codec.LongCodec; import org.redisson.client.codec.ScanCodec; +import org.redisson.client.codec.StringCodec; import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.ScoredEntry; @@ -370,7 +371,7 @@ public class RedissonScoredSortedSet extends RedissonExpirable implements RSc @Override public RFuture containsAsync(Object o) { - return commandExecutor.readAsync(getName(), codec, RedisCommands.ZSCORE_CONTAINS, getName(), encode(o)); + return commandExecutor.readAsync(getName(), StringCodec.INSTANCE, RedisCommands.ZSCORE_CONTAINS, getName(), encode(o)); } @Override @@ -380,7 +381,7 @@ public class RedissonScoredSortedSet extends RedissonExpirable implements RSc @Override public RFuture getScoreAsync(V o) { - return commandExecutor.readAsync(getName(), codec, RedisCommands.ZSCORE, getName(), encode(o)); + return commandExecutor.readAsync(getName(), StringCodec.INSTANCE, RedisCommands.ZSCORE, getName(), encode(o)); } @Override diff --git a/redisson/src/main/java/org/redisson/api/BatchOptions.java b/redisson/src/main/java/org/redisson/api/BatchOptions.java index dfd15d5c3..509fb3857 100644 --- a/redisson/src/main/java/org/redisson/api/BatchOptions.java +++ b/redisson/src/main/java/org/redisson/api/BatchOptions.java @@ -18,13 +18,54 @@ package org.redisson.api; import java.util.concurrent.TimeUnit; /** - * Configuration for Batch. + * Configuration for Batch objecct. * * @author Nikita Koksharov * */ public class BatchOptions { + public enum ExecutionMode { + + /** + * Store batched invocations in Redis and execute them atomically as a single command. + *

+ * Please note, that in cluster mode all objects should be on the same cluster slot. + * https://github.com/antirez/redis/issues/3682 + * + */ + REDIS_READ_ATOMIC, + + /** + * Store batched invocations in Redis and execute them atomically as a single command. + *

+ * Please note, that in cluster mode all objects should be on the same cluster slot. + * https://github.com/antirez/redis/issues/3682 + * + */ + REDIS_WRITE_ATOMIC, + + /** + * Store batched invocations in memory on Redisson side and execute them on Redis. + *

+ * Default mode + * + */ + IN_MEMORY, + + /** + * Store batched invocations on Redisson side and executes them atomically on Redis as a single command. + *

+ * Please note, that in cluster mode all objects should be on the same cluster slot. + * https://github.com/antirez/redis/issues/3682 + * + */ + IN_MEMORY_ATOMIC, + + } + + private ExecutionMode executionMode = ExecutionMode.IN_MEMORY; + private long responseTimeout; private int retryAttempts; private long retryInterval; @@ -32,7 +73,6 @@ public class BatchOptions { private long syncTimeout; private int syncSlaves; private boolean skipResult; - private boolean atomic; private BatchOptions() { } @@ -122,20 +162,14 @@ public class BatchOptions { } /** - * Switches batch to atomic mode. Redis atomically executes all commands of this batch as a single command. - *

- * Please note, that in cluster mode all objects should be on the same cluster slot. - * https://github.com/antirez/redis/issues/3682 + * Use {@link #executionMode(ExecutionMode)} setting instead * - * @return self instance */ + @Deprecated public BatchOptions atomic() { - atomic = true; + executionMode = ExecutionMode.IN_MEMORY_ATOMIC; return this; } - public boolean isAtomic() { - return atomic; - } /** * Inform Redis not to send reply. This allows to save network traffic for commands with batch with big response. @@ -152,4 +186,27 @@ public class BatchOptions { return skipResult; } + /** + * Sets execution mode. + * + * @see ExecutionMode + * + * @param executionMode - batch execution mode + * @return self instance + */ + public BatchOptions executionMode(ExecutionMode executionMode) { + this.executionMode = executionMode; + return this; + } + public ExecutionMode getExecutionMode() { + return executionMode; + } + + @Override + public String toString() { + return "BatchOptions [queueStore=" + executionMode + "]"; + } + + + } diff --git a/redisson/src/main/java/org/redisson/cache/LocalCachedMapDisable.java b/redisson/src/main/java/org/redisson/cache/LocalCachedMapDisable.java index 22066ff39..a5611cacf 100644 --- a/redisson/src/main/java/org/redisson/cache/LocalCachedMapDisable.java +++ b/redisson/src/main/java/org/redisson/cache/LocalCachedMapDisable.java @@ -15,12 +15,14 @@ */ package org.redisson.cache; +import java.io.Serializable; + /** * * @author Nikita Koksharov * */ -public class LocalCachedMapDisable { +public class LocalCachedMapDisable implements Serializable { private byte[][] keyHashes; private long timeout; diff --git a/redisson/src/main/java/org/redisson/cache/LocalCachedMapDisabledKey.java b/redisson/src/main/java/org/redisson/cache/LocalCachedMapDisabledKey.java index 0abb1df9a..b3e5cf673 100644 --- a/redisson/src/main/java/org/redisson/cache/LocalCachedMapDisabledKey.java +++ b/redisson/src/main/java/org/redisson/cache/LocalCachedMapDisabledKey.java @@ -15,12 +15,14 @@ */ package org.redisson.cache; +import java.io.Serializable; + /** * * @author Nikita Koksharov * */ -public class LocalCachedMapDisabledKey { +public class LocalCachedMapDisabledKey implements Serializable { private String requestId; private long timeout; diff --git a/redisson/src/main/java/org/redisson/cache/LocalCachedMapEnable.java b/redisson/src/main/java/org/redisson/cache/LocalCachedMapEnable.java index b2bcbbdd9..031440968 100644 --- a/redisson/src/main/java/org/redisson/cache/LocalCachedMapEnable.java +++ b/redisson/src/main/java/org/redisson/cache/LocalCachedMapEnable.java @@ -15,12 +15,14 @@ */ package org.redisson.cache; +import java.io.Serializable; + /** * * @author Nikita Koksharov * */ -public class LocalCachedMapEnable { +public class LocalCachedMapEnable implements Serializable { private byte[][] keyHashes; private String requestId; diff --git a/redisson/src/main/java/org/redisson/client/handler/CommandDecoder.java b/redisson/src/main/java/org/redisson/client/handler/CommandDecoder.java index e2352e7d7..d41ad631d 100644 --- a/redisson/src/main/java/org/redisson/client/handler/CommandDecoder.java +++ b/redisson/src/main/java/org/redisson/client/handler/CommandDecoder.java @@ -198,6 +198,8 @@ public class CommandDecoder extends ReplayingDecoder { } } } + + ThreadLocal>> commandsData = new ThreadLocal>>(); private void decodeCommandBatch(ChannelHandlerContext ctx, ByteBuf in, QueueCommand data, CommandsData commandBatch) throws Exception { @@ -214,9 +216,22 @@ public class CommandDecoder extends ReplayingDecoder { || RedisCommands.EXEC.getName().equals(cmd.getName()) || RedisCommands.WAIT.getName().equals(cmd.getName())) { commandData = (CommandData) commandBatch.getCommands().get(i); + if (RedisCommands.EXEC.getName().equals(cmd.getName())) { + if (commandBatch.getAttachedCommands() != null) { + commandsData.set(commandBatch.getAttachedCommands()); + } else { + commandsData.set(commandBatch.getCommands()); + } + } } - decode(in, commandData, null, ctx.channel()); + try { + decode(in, commandData, null, ctx.channel()); + } finally { + if (commandData != null && RedisCommands.EXEC.getName().equals(commandData.getCommand().getName())) { + commandsData.remove(); + } + } if (commandData != null && RedisCommands.EXEC.getName().equals(commandData.getCommand().getName()) && commandData.getPromise().isSuccess()) { @@ -230,7 +245,7 @@ public class CommandDecoder extends ReplayingDecoder { } Object res = iter.next(); - handleResult((CommandData) command, null, res, false, ctx.channel()); + completeResponse((CommandData) command, res, ctx.channel()); } if (RedisCommands.MULTI.getName().equals(command.getCommand().getName())) { @@ -365,13 +380,33 @@ public class CommandDecoder extends ReplayingDecoder { } } + @SuppressWarnings("unchecked") private void decodeList(ByteBuf in, CommandData data, List parts, Channel channel, long size, List respParts) throws IOException { - for (int i = respParts.size(); i < size; i++) { - decode(in, data, respParts, channel); - if (state().isMakeCheckpoint()) { - checkpoint(); + if (parts == null && commandsData.get() != null) { + List> commands = commandsData.get(); + for (int i = respParts.size(); i < size; i++) { + int suffix = 0; + if (RedisCommands.MULTI.getName().equals(commands.get(0).getCommand().getName())) { + suffix = 1; + } + CommandData commandData = (CommandData) commands.get(i+suffix); + decode(in, commandData, respParts, channel); + if (commandData.getPromise().isDone() && !commandData.getPromise().isSuccess()) { + data.tryFailure(commandData.cause()); + } + + if (state().isMakeCheckpoint()) { + checkpoint(); + } + } + } else { + for (int i = respParts.size(); i < size; i++) { + decode(in, data, respParts, channel); + if (state().isMakeCheckpoint()) { + checkpoint(); + } } } @@ -402,9 +437,13 @@ public class CommandDecoder extends ReplayingDecoder { if (parts != null) { parts.add(result); } else { - if (data != null && !data.getPromise().trySuccess(result) && data.cause() instanceof RedisTimeoutException) { - log.warn("response has been skipped due to timeout! channel: {}, command: {}, result: {}", channel, LogHelper.toString(data), LogHelper.toString(result)); - } + completeResponse(data, result, channel); + } + } + + protected void completeResponse(CommandData data, Object result, Channel channel) { + if (data != null && !data.getPromise().trySuccess(result) && data.cause() instanceof RedisTimeoutException) { + log.warn("response has been skipped due to timeout! channel: {}, command: {}, result: {}", channel, LogHelper.toString(data), LogHelper.toString(result)); } } @@ -425,9 +464,11 @@ public class CommandDecoder extends ReplayingDecoder { Decoder decoder = data.getCommand().getReplayDecoder(); if (parts != null) { MultiDecoder multiDecoder = data.getCommand().getReplayMultiDecoder(); - Decoder mDecoder = multiDecoder.getDecoder(parts.size(), state()); - if (mDecoder != null) { - decoder = mDecoder; + if (multiDecoder != null) { + Decoder mDecoder = multiDecoder.getDecoder(parts.size(), state()); + if (mDecoder != null) { + decoder = mDecoder; + } } } if (decoder == null) { diff --git a/redisson/src/main/java/org/redisson/client/handler/CommandPubSubDecoder.java b/redisson/src/main/java/org/redisson/client/handler/CommandPubSubDecoder.java index 0f2253418..ce41802c6 100644 --- a/redisson/src/main/java/org/redisson/client/handler/CommandPubSubDecoder.java +++ b/redisson/src/main/java/org/redisson/client/handler/CommandPubSubDecoder.java @@ -43,6 +43,7 @@ import io.netty.util.internal.PlatformDependent; */ public class CommandPubSubDecoder extends CommandDecoder { + private static final List MESSAGES = Arrays.asList("subscribe", "psubscribe", "punsubscribe", "unsubscribe"); // It is not needed to use concurrent map because responses are coming consecutive private final Map entries = new HashMap(); private final Map> commands = PlatformDependent.newConcurrentHashMap(); @@ -159,7 +160,7 @@ public class CommandPubSubDecoder extends CommandDecoder { return null; } String command = parts.get(0).toString(); - if (Arrays.asList("subscribe", "psubscribe", "punsubscribe", "unsubscribe").contains(command)) { + if (MESSAGES.contains(command)) { String channelName = parts.get(1).toString(); PubSubKey key = new PubSubKey(channelName, command); CommandData commandData = commands.get(key); @@ -173,6 +174,8 @@ public class CommandPubSubDecoder extends CommandDecoder { } else if (command.equals("pmessage")) { String patternName = (String) parts.get(1); return entries.get(patternName).getDecoder(); + } else if (command.equals("pong")) { + return null; } } diff --git a/redisson/src/main/java/org/redisson/client/protocol/CommandsData.java b/redisson/src/main/java/org/redisson/client/protocol/CommandsData.java index 955e70e51..6f68cc14e 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/CommandsData.java +++ b/redisson/src/main/java/org/redisson/client/protocol/CommandsData.java @@ -28,20 +28,31 @@ import org.redisson.misc.RPromise; public class CommandsData implements QueueCommand { private final List> commands; + private final List> attachedCommands; private final RPromise promise; private final boolean skipResult; private final boolean atomic; public CommandsData(RPromise promise, List> commands) { - this(promise, commands, false, false); + this(promise, commands, null); } + public CommandsData(RPromise promise, List> commands, List> attachedCommands) { + this(promise, commands, attachedCommands, false, false); + } + + public CommandsData(RPromise promise, List> commands, boolean skipResult, boolean atomic) { + this(promise, commands, null, skipResult, atomic); + } + + public CommandsData(RPromise promise, List> commands, List> attachedCommands, boolean skipResult, boolean atomic) { super(); this.promise = promise; this.commands = commands; this.skipResult = skipResult; this.atomic = atomic; + this.attachedCommands = attachedCommands; } public RPromise getPromise() { @@ -56,6 +67,10 @@ public class CommandsData implements QueueCommand { return skipResult; } + public List> getAttachedCommands() { + return attachedCommands; + } + public List> getCommands() { return commands; } diff --git a/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java b/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java index dd5585e50..083407c32 100644 --- a/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java +++ b/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java @@ -408,8 +408,9 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { } private void checkSlaveNodesChange(Collection newPartitions) { + Set lastPartitions = getLastPartitions(); for (ClusterPartition newPart : newPartitions) { - for (ClusterPartition currentPart : getLastPartitions()) { + for (ClusterPartition currentPart : lastPartitions) { if (!newPart.getMasterAddress().equals(currentPart.getMasterAddress())) { continue; } @@ -479,10 +480,10 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { return addedSlaves; } - private Collection slots(Collection partitions) { - Set result = new HashSet(MAX_SLOT); + private int slotsAmount(Collection partitions) { + int result = 0; for (ClusterPartition clusterPartition : partitions) { - result.addAll(clusterPartition.getSlots()); + result += clusterPartition.getSlots().size(); } return result; } @@ -500,9 +501,10 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { private RFuture checkMasterNodesChange(ClusterServersConfig cfg, Collection newPartitions) { List newMasters = new ArrayList(); + Set lastPartitions = getLastPartitions(); for (final ClusterPartition newPart : newPartitions) { boolean masterFound = false; - for (ClusterPartition currentPart : getLastPartitions()) { + for (ClusterPartition currentPart : lastPartitions) { if (!newPart.getMasterAddress().equals(currentPart.getMasterAddress())) { continue; } @@ -567,13 +569,24 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { } private void checkSlotsChange(ClusterServersConfig cfg, Collection newPartitions) { - Collection newPartitionsSlots = slots(newPartitions); - if (newPartitionsSlots.size() == lastPartitions.size() && lastPartitions.size() == MAX_SLOT) { + int newSlotsAmount = slotsAmount(newPartitions); + if (newSlotsAmount == lastPartitions.size() && lastPartitions.size() == MAX_SLOT) { return; } - Set removedSlots = new HashSet(lastPartitions.keySet()); - removedSlots.removeAll(newPartitionsSlots); + Set removedSlots = new HashSet(); + for (Integer slot : lastPartitions.keySet()) { + boolean found = false; + for (ClusterPartition clusterPartition : newPartitions) { + if (clusterPartition.getSlots().contains(slot)) { + found = true; + break; + } + } + if (!found) { + removedSlots.add(slot); + } + } lastPartitions.keySet().removeAll(removedSlots); if (!removedSlots.isEmpty()) { log.info("{} slots found to remove", removedSlots.size()); @@ -587,9 +600,14 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { } } - - Set addedSlots = new HashSet(newPartitionsSlots); - addedSlots.removeAll(lastPartitions.keySet()); + Set addedSlots = new HashSet(); + for (ClusterPartition clusterPartition : newPartitions) { + for (Integer slot : clusterPartition.getSlots()) { + if (!lastPartitions.containsKey(slot)) { + addedSlots.add(slot); + } + } + } if (!addedSlots.isEmpty()) { log.info("{} slots found to add", addedSlots.size()); } @@ -611,8 +629,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { } private void checkSlotsMigration(Collection newPartitions) { - Set currentPartitions = getLastPartitions(); - for (ClusterPartition currentPartition : currentPartitions) { + for (ClusterPartition currentPartition : getLastPartitions()) { for (ClusterPartition newPartition : newPartitions) { if (!currentPartition.getNodeId().equals(newPartition.getNodeId()) // skip master change case @@ -750,7 +767,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { super.shutdown(); } - private HashSet getLastPartitions() { + private Set getLastPartitions() { return new HashSet(lastPartitions.values()); } diff --git a/redisson/src/main/java/org/redisson/codec/FstCodec.java b/redisson/src/main/java/org/redisson/codec/FstCodec.java index c3b448c90..1404c6ba3 100644 --- a/redisson/src/main/java/org/redisson/codec/FstCodec.java +++ b/redisson/src/main/java/org/redisson/codec/FstCodec.java @@ -90,6 +90,9 @@ public class FstCodec extends BaseCodec { } catch (IOException e) { out.release(); throw e; + } catch (Exception e) { + out.release(); + throw new IOException(e); } } }; diff --git a/redisson/src/main/java/org/redisson/codec/JsonJacksonCodec.java b/redisson/src/main/java/org/redisson/codec/JsonJacksonCodec.java index f8e5145f3..59228e686 100755 --- a/redisson/src/main/java/org/redisson/codec/JsonJacksonCodec.java +++ b/redisson/src/main/java/org/redisson/codec/JsonJacksonCodec.java @@ -79,6 +79,9 @@ public class JsonJacksonCodec extends BaseCodec { } catch (IOException e) { out.release(); throw e; + } catch (Exception e) { + out.release(); + throw new IOException(e); } } }; diff --git a/redisson/src/main/java/org/redisson/command/BatchPromise.java b/redisson/src/main/java/org/redisson/command/BatchPromise.java new file mode 100644 index 000000000..c51b7dfab --- /dev/null +++ b/redisson/src/main/java/org/redisson/command/BatchPromise.java @@ -0,0 +1,59 @@ +/** + * Copyright 2018 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.command; + +import java.util.concurrent.atomic.AtomicBoolean; + +import org.redisson.api.RFuture; +import org.redisson.misc.RPromise; +import org.redisson.misc.RedissonPromise; + +/** + * + * @author Nikita Koksharov + * + */ +public class BatchPromise extends RedissonPromise { + + private final AtomicBoolean executed; + private final RFuture sentPromise = new RedissonPromise(); + + public BatchPromise(AtomicBoolean executed) { + super(); + this.executed = executed; + } + + public RFuture getSentPromise() { + return sentPromise; + } + + @Override + public RPromise sync() throws InterruptedException { + if (executed.get()) { + return super.sync(); + } + return (RPromise) sentPromise.sync(); + } + + @Override + public RPromise syncUninterruptibly() { + if (executed.get()) { + return super.syncUninterruptibly(); + } + return (RPromise) sentPromise.syncUninterruptibly(); + } + +} diff --git a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java index 3a80e2af4..ed5a6623a 100644 --- a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java +++ b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java @@ -181,32 +181,36 @@ public class CommandAsyncService implements CommandAsyncExecutor { }); return l.await(timeout, timeoutUnit); } + + protected RPromise createPromise() { + return new RedissonPromise(); + } @Override public RFuture readAsync(RedisClient client, MasterSlaveEntry entry, Codec codec, RedisCommand command, Object... params) { - RPromise mainPromise = new RedissonPromise(); - async(true, new NodeSource(entry, client), codec, command, params, mainPromise, 0, false); + RPromise mainPromise = createPromise(); + async(true, new NodeSource(entry, client), codec, command, params, mainPromise, 0, false, null); return mainPromise; } @Override public RFuture readAsync(RedisClient client, String name, Codec codec, RedisCommand command, Object... params) { - RPromise mainPromise = new RedissonPromise(); + RPromise mainPromise = createPromise(); int slot = connectionManager.calcSlot(name); - async(true, new NodeSource(slot, client), codec, command, params, mainPromise, 0, false); + async(true, new NodeSource(slot, client), codec, command, params, mainPromise, 0, false, null); return mainPromise; } @Override public RFuture readAsync(RedisClient client, Codec codec, RedisCommand command, Object... params) { - RPromise mainPromise = new RedissonPromise(); - async(true, new NodeSource(client), codec, command, params, mainPromise, 0, false); + RPromise mainPromise = createPromise(); + async(true, new NodeSource(client), codec, command, params, mainPromise, 0, false, null); return mainPromise; } @Override public RFuture> readAllAsync(RedisCommand command, Object... params) { - final RPromise> mainPromise = new RedissonPromise>(); + final RPromise> mainPromise = createPromise(); final Collection nodes = connectionManager.getEntrySet(); final List results = new ArrayList(); final AtomicInteger counter = new AtomicInteger(nodes.size()); @@ -239,14 +243,14 @@ public class CommandAsyncService implements CommandAsyncExecutor { for (MasterSlaveEntry entry : nodes) { RPromise promise = new RedissonPromise(); promise.addListener(listener); - async(true, new NodeSource(entry), connectionManager.getCodec(), command, params, promise, 0, true); + async(true, new NodeSource(entry), connectionManager.getCodec(), command, params, promise, 0, true, null); } return mainPromise; } @Override public RFuture readRandomAsync(RedisCommand command, Object... params) { - final RPromise mainPromise = new RedissonPromise(); + final RPromise mainPromise = createPromise(); final List nodes = new ArrayList(connectionManager.getEntrySet()); Collections.shuffle(nodes); @@ -277,7 +281,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { }); MasterSlaveEntry entry = nodes.remove(0); - async(true, new NodeSource(entry), connectionManager.getCodec(), command, params, attemptPromise, 0, false); + async(true, new NodeSource(entry), connectionManager.getCodec(), command, params, attemptPromise, 0, false, null); } @Override @@ -328,7 +332,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { for (MasterSlaveEntry entry : nodes) { RPromise promise = new RedissonPromise(); promise.addListener(listener); - async(readOnlyMode, new NodeSource(entry), connectionManager.getCodec(), command, params, promise, 0, true); + async(readOnlyMode, new NodeSource(entry), connectionManager.getCodec(), command, params, promise, 0, true, null); } return mainPromise; } @@ -347,22 +351,22 @@ public class CommandAsyncService implements CommandAsyncExecutor { @Override public RFuture readAsync(String key, Codec codec, RedisCommand command, Object... params) { - RPromise mainPromise = new RedissonPromise(); + RPromise mainPromise = createPromise(); NodeSource source = getNodeSource(key); - async(true, source, codec, command, params, mainPromise, 0, false); + async(true, source, codec, command, params, mainPromise, 0, false, null); return mainPromise; } public RFuture readAsync(MasterSlaveEntry entry, Codec codec, RedisCommand command, Object... params) { - RPromise mainPromise = new RedissonPromise(); - async(true, new NodeSource(entry), codec, command, params, mainPromise, 0, false); + RPromise mainPromise = createPromise(); + async(true, new NodeSource(entry), codec, command, params, mainPromise, 0, false, null); return mainPromise; } @Override public RFuture writeAsync(MasterSlaveEntry entry, Codec codec, RedisCommand command, Object... params) { - RPromise mainPromise = new RedissonPromise(); - async(false, new NodeSource(entry), codec, command, params, mainPromise, 0, false); + RPromise mainPromise = createPromise(); + async(false, new NodeSource(entry), codec, command, params, mainPromise, 0, false, null); return mainPromise; } @@ -432,19 +436,19 @@ public class CommandAsyncService implements CommandAsyncExecutor { for (MasterSlaveEntry entry : entries) { RPromise promise = new RedissonPromise(); promise.addListener(listener); - async(readOnlyMode, new NodeSource(entry), connectionManager.getCodec(), command, args.toArray(), promise, 0, true); + async(readOnlyMode, new NodeSource(entry), connectionManager.getCodec(), command, args.toArray(), promise, 0, true, null); } return mainPromise; } private RFuture evalAsync(NodeSource nodeSource, boolean readOnlyMode, Codec codec, RedisCommand evalCommandType, String script, List keys, Object... params) { - RPromise mainPromise = new RedissonPromise(); + RPromise mainPromise = createPromise(); List args = new ArrayList(2 + keys.size() + params.length); args.add(script); args.add(keys.size()); args.addAll(keys); args.addAll(Arrays.asList(params)); - async(readOnlyMode, nodeSource, codec, evalCommandType, args.toArray(), mainPromise, 0, false); + async(readOnlyMode, nodeSource, codec, evalCommandType, args.toArray(), mainPromise, 0, false, null); return mainPromise; } @@ -455,15 +459,15 @@ public class CommandAsyncService implements CommandAsyncExecutor { @Override public RFuture writeAsync(String key, Codec codec, RedisCommand command, Object... params) { - RPromise mainPromise = new RedissonPromise(); + RPromise mainPromise = createPromise(); NodeSource source = getNodeSource(key); - async(false, source, codec, command, params, mainPromise, 0, false); + async(false, source, codec, command, params, mainPromise, 0, false, null); return mainPromise; } protected void async(final boolean readOnlyMode, final NodeSource source, final Codec codec, final RedisCommand command, final Object[] params, final RPromise mainPromise, final int attempt, - final boolean ignoreRedirect) { + final boolean ignoreRedirect, final RFuture connFuture) { if (mainPromise.isCancelled()) { free(params); return; @@ -492,12 +496,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { } } - final RFuture connectionFuture; - if (readOnlyMode) { - connectionFuture = connectionManager.connectionReadOp(source, command); - } else { - connectionFuture = connectionManager.connectionWriteOp(source, command); - } + final RFuture connectionFuture = getConnection(readOnlyMode, source, command); final RPromise attemptPromise = new RedissonPromise(); details.init(connectionFuture, attemptPromise, @@ -586,7 +585,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { count, details.getCommand(), Arrays.toString(details.getParams())); } details.removeMainPromiseListener(); - async(details.isReadOnlyMode(), details.getSource(), details.getCodec(), details.getCommand(), details.getParams(), details.getMainPromise(), count, ignoreRedirect); + async(details.isReadOnlyMode(), details.getSource(), details.getCodec(), details.getCommand(), details.getParams(), details.getMainPromise(), count, ignoreRedirect, connFuture); AsyncDetails.release(details); } @@ -615,22 +614,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { } final RedisConnection connection = connFuture.getNow(); - if (details.getSource().getRedirect() == Redirect.ASK) { - List> list = new ArrayList>(2); - RPromise promise = new RedissonPromise(); - list.add(new CommandData(promise, details.getCodec(), RedisCommands.ASKING, new Object[]{})); - list.add(new CommandData(details.getAttemptPromise(), details.getCodec(), details.getCommand(), details.getParams())); - RPromise main = new RedissonPromise(); - ChannelFuture future = connection.send(new CommandsData(main, list)); - details.setWriteFuture(future); - } else { - if (log.isDebugEnabled()) { - log.debug("acquired connection for command {} and params {} from slot {} using node {}... {}", - details.getCommand(), Arrays.toString(details.getParams()), details.getSource(), connection.getRedisClient().getAddr(), connection); - } - ChannelFuture future = connection.send(new CommandData(details.getAttemptPromise(), details.getCodec(), details.getCommand(), details.getParams())); - details.setWriteFuture(future); - } + sendCommand(details, connection); details.getWriteFuture().addListener(new ChannelFutureListener() { @Override @@ -651,6 +635,17 @@ public class CommandAsyncService implements CommandAsyncExecutor { }); } + protected RFuture getConnection(final boolean readOnlyMode, final NodeSource source, + final RedisCommand command) { + final RFuture connectionFuture; + if (readOnlyMode) { + connectionFuture = connectionManager.connectionReadOp(source, command); + } else { + connectionFuture = connectionManager.connectionWriteOp(source, command); + } + return connectionFuture; + } + protected void free(final Object[] params) { for (Object obj : params) { ReferenceCountUtil.safeRelease(obj); @@ -801,7 +796,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { }); } - private void checkAttemptFuture(final NodeSource source, final AsyncDetails details, + protected void checkAttemptFuture(final NodeSource source, final AsyncDetails details, Future future, final boolean ignoreRedirect) { details.getTimeout().cancel(); if (future.isCancelled()) { @@ -819,7 +814,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { } async(details.isReadOnlyMode(), new NodeSource(ex.getSlot(), ex.getUrl(), Redirect.MOVED), details.getCodec(), - details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt(), ignoreRedirect); + details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt(), ignoreRedirect, details.getConnectionFuture()); AsyncDetails.release(details); return; } @@ -827,14 +822,14 @@ public class CommandAsyncService implements CommandAsyncExecutor { if (future.cause() instanceof RedisAskException && !ignoreRedirect) { RedisAskException ex = (RedisAskException) future.cause(); async(details.isReadOnlyMode(), new NodeSource(ex.getSlot(), ex.getUrl(), Redirect.ASK), details.getCodec(), - details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt(), ignoreRedirect); + details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt(), ignoreRedirect, details.getConnectionFuture()); AsyncDetails.release(details); return; } if (future.cause() instanceof RedisLoadingException) { async(details.isReadOnlyMode(), source, details.getCodec(), - details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt(), ignoreRedirect); + details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt(), ignoreRedirect, details.getConnectionFuture()); AsyncDetails.release(details); return; } @@ -844,7 +839,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { @Override public void run(Timeout timeout) throws Exception { async(details.isReadOnlyMode(), source, details.getCodec(), - details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt(), ignoreRedirect); + details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt(), ignoreRedirect, details.getConnectionFuture()); } }, 1, TimeUnit.SECONDS); @@ -860,22 +855,30 @@ public class CommandAsyncService implements CommandAsyncExecutor { ((ScanResult) res).setRedisClient(details.getConnectionFuture().getNow().getRedisClient()); } - if (isRedissonReferenceSupportEnabled()) { - handleReference(details.getMainPromise(), res); - } else { - details.getMainPromise().trySuccess(res); - } + handleSuccess(details.getMainPromise(), details.getCommand(), res); } else { - details.getMainPromise().tryFailure(future.cause()); + handleError(details.getMainPromise(), future.cause()); } AsyncDetails.release(details); } catch (RuntimeException e) { - details.getMainPromise().tryFailure(e); + handleError(details.getMainPromise(), e); throw e; } } + protected void handleError(RPromise promise, Throwable cause) { + promise.tryFailure(cause); + } + + protected void handleSuccess(RPromise promise, RedisCommand command, R res) { + if (isRedissonReferenceSupportEnabled()) { + handleReference(promise, res); + } else { + promise.trySuccess(res); + } + } + private void handleReference(RPromise mainPromise, R res) { try { mainPromise.trySuccess(tryHandleReference(res)); @@ -1032,4 +1035,23 @@ public class CommandAsyncService implements CommandAsyncExecutor { return (R) res; } } + + protected void sendCommand(final AsyncDetails details, final RedisConnection connection) { + if (details.getSource().getRedirect() == Redirect.ASK) { + List> list = new ArrayList>(2); + RPromise promise = new RedissonPromise(); + list.add(new CommandData(promise, details.getCodec(), RedisCommands.ASKING, new Object[]{})); + list.add(new CommandData(details.getAttemptPromise(), details.getCodec(), details.getCommand(), details.getParams())); + RPromise main = new RedissonPromise(); + ChannelFuture future = connection.send(new CommandsData(main, list)); + details.setWriteFuture(future); + } else { + if (log.isDebugEnabled()) { + log.debug("acquired connection for command {} and params {} from slot {} using node {}... {}", + details.getCommand(), Arrays.toString(details.getParams()), details.getSource(), connection.getRedisClient().getAddr(), connection); + } + ChannelFuture future = connection.send(new CommandData(details.getAttemptPromise(), details.getCodec(), details.getCommand(), details.getParams())); + details.setWriteFuture(future); + } + } } diff --git a/redisson/src/main/java/org/redisson/command/CommandBatchService.java b/redisson/src/main/java/org/redisson/command/CommandBatchService.java index 732ca9e08..4199aaf37 100644 --- a/redisson/src/main/java/org/redisson/command/CommandBatchService.java +++ b/redisson/src/main/java/org/redisson/command/CommandBatchService.java @@ -16,18 +16,24 @@ package org.redisson.command; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.Deque; +import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import org.redisson.RedissonReference; import org.redisson.RedissonShutdownException; import org.redisson.api.BatchOptions; +import org.redisson.api.BatchOptions.ExecutionMode; import org.redisson.api.BatchResult; import org.redisson.api.RFuture; import org.redisson.client.RedisAskException; @@ -48,9 +54,11 @@ import org.redisson.connection.ConnectionManager; import org.redisson.connection.MasterSlaveEntry; import org.redisson.connection.NodeSource; import org.redisson.connection.NodeSource.Redirect; +import org.redisson.misc.CountableListener; import org.redisson.misc.RPromise; import org.redisson.misc.RedissonObjectFactory; import org.redisson.misc.RedissonPromise; +import org.redisson.pubsub.AsyncSemaphore; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; @@ -67,10 +75,32 @@ import io.netty.util.internal.PlatformDependent; */ public class CommandBatchService extends CommandAsyncService { + public static class ConnectionEntry { + + boolean firstCommand = true; + RFuture connectionFuture; + + public RFuture getConnectionFuture() { + return connectionFuture; + } + + public void setConnectionFuture(RFuture connectionFuture) { + this.connectionFuture = connectionFuture; + } + + public boolean isFirstCommand() { + return firstCommand; + } + + public void setFirstCommand(boolean firstCommand) { + this.firstCommand = firstCommand; + } + + } + public static class Entry { Deque> commands = new LinkedBlockingDeque>(); - volatile boolean readOnlyMode = true; public Deque> getCommands() { @@ -85,6 +115,7 @@ public class CommandBatchService extends CommandAsyncService { return readOnlyMode; } + public void clearErrors() { for (BatchCommandData commandEntry : commands) { commandEntry.clearError(); @@ -96,14 +127,22 @@ public class CommandBatchService extends CommandAsyncService { private final AtomicInteger index = new AtomicInteger(); private ConcurrentMap commands = PlatformDependent.newConcurrentHashMap(); + private ConcurrentMap connections = PlatformDependent.newConcurrentHashMap(); + + private BatchOptions options; private Map, List> nestedServices = PlatformDependent.newConcurrentHashMap(); - private volatile boolean executed; + private AtomicBoolean executed = new AtomicBoolean(); public CommandBatchService(ConnectionManager connectionManager) { super(connectionManager); } + + public CommandBatchService(ConnectionManager connectionManager, BatchOptions options) { + super(connectionManager); + this.options = options; + } public void add(RFuture future, List services) { nestedServices.put(future, services); @@ -111,32 +150,202 @@ public class CommandBatchService extends CommandAsyncService { @Override protected void async(boolean readOnlyMode, NodeSource nodeSource, - Codec codec, RedisCommand command, Object[] params, RPromise mainPromise, int attempt, boolean ignoreRedirect) { - if (executed) { + Codec codec, RedisCommand command, Object[] params, RPromise mainPromise, int attempt, boolean ignoreRedirect, RFuture connFuture) { + if (executed.get()) { throw new IllegalStateException("Batch already has been executed!"); } - Entry entry = commands.get(nodeSource.getEntry()); + if (nodeSource.getEntry() != null) { + Entry entry = commands.get(nodeSource.getEntry()); + if (entry == null) { + entry = new Entry(); + Entry oldEntry = commands.putIfAbsent(nodeSource.getEntry(), entry); + if (oldEntry != null) { + entry = oldEntry; + } + } + + if (!readOnlyMode) { + entry.setReadOnlyMode(false); + } + if (isRedissonReferenceSupportEnabled()) { + for (int i = 0; i < params.length; i++) { + RedissonReference reference = RedissonObjectFactory.toReference(connectionManager.getCfg(), params[i]); + if (reference != null) { + params[i] = reference; + } + } + } + BatchCommandData commandData = new BatchCommandData(mainPromise, codec, command, params, index.incrementAndGet()); + entry.getCommands().add(commandData); + } + + if (!isRedisBasedQueue()) { + return; + } + + if (!readOnlyMode && this.options.getExecutionMode() == ExecutionMode.REDIS_READ_ATOMIC) { + throw new IllegalStateException("Data modification commands can't be used with queueStore=REDIS_READ_ATOMIC"); + } + + super.async(readOnlyMode, nodeSource, codec, command, params, mainPromise, attempt, true, connFuture); + } + + AsyncSemaphore semaphore = new AsyncSemaphore(0); + + @Override + protected RPromise createPromise() { + if (isRedisBasedQueue()) { + return new BatchPromise(executed); + } + return super.createPromise(); + } + + @Override + protected void releaseConnection(NodeSource source, RFuture connectionFuture, + boolean isReadOnly, RPromise attemptPromise, AsyncDetails details) { + if (!isRedisBasedQueue() || RedisCommands.EXEC.getName().equals(details.getCommand().getName())) { + super.releaseConnection(source, connectionFuture, isReadOnly, attemptPromise, details); + } + } + + @Override + protected void handleSuccess(RPromise promise, RedisCommand command, R res) { + if (RedisCommands.EXEC.getName().equals(command.getName())) { + super.handleSuccess(promise, command, res); + return; + } + + if (isRedisBasedQueue()) { + BatchPromise batchPromise = (BatchPromise) promise; + RPromise sentPromise = (RPromise) batchPromise.getSentPromise(); + super.handleSuccess(sentPromise, command, res); + semaphore.release(); + } + } + + @Override + protected void handleError(RPromise promise, Throwable cause) { + if (isRedisBasedQueue() && promise instanceof BatchPromise) { + BatchPromise batchPromise = (BatchPromise) promise; + RPromise sentPromise = (RPromise) batchPromise.getSentPromise(); + super.handleError(sentPromise, cause); + semaphore.release(); + return; + } + + super.handleError(promise, cause); + } + + @Override + protected void sendCommand(AsyncDetails details, RedisConnection connection) { + if (!isRedisBasedQueue()) { + super.sendCommand(details, connection); + return; + } + + ConnectionEntry connectionEntry = connections.get(details.getSource().getEntry()); + + if (details.getSource().getRedirect() == Redirect.ASK) { + List> list = new ArrayList>(2); + RPromise promise = new RedissonPromise(); + list.add(new CommandData(promise, details.getCodec(), RedisCommands.ASKING, new Object[]{})); + if (connectionEntry.isFirstCommand()) { + list.add(new CommandData(promise, details.getCodec(), RedisCommands.MULTI, new Object[]{})); + connectionEntry.setFirstCommand(false); + } + list.add(new CommandData(details.getAttemptPromise(), details.getCodec(), details.getCommand(), details.getParams())); + RPromise main = new RedissonPromise(); + ChannelFuture future = connection.send(new CommandsData(main, list)); + details.setWriteFuture(future); + } else { + if (log.isDebugEnabled()) { + log.debug("acquired connection for command {} and params {} from slot {} using node {}... {}", + details.getCommand(), Arrays.toString(details.getParams()), details.getSource(), connection.getRedisClient().getAddr(), connection); + } + + if (connectionEntry.isFirstCommand()) { + List> list = new ArrayList>(2); + list.add(new CommandData(new RedissonPromise(), details.getCodec(), RedisCommands.MULTI, new Object[]{})); + list.add(new CommandData(details.getAttemptPromise(), details.getCodec(), details.getCommand(), details.getParams())); + RPromise main = new RedissonPromise(); + ChannelFuture future = connection.send(new CommandsData(main, list)); + connectionEntry.setFirstCommand(false); + details.setWriteFuture(future); + } else { + if (RedisCommands.EXEC.getName().equals(details.getCommand().getName())) { + Entry entry = commands.get(details.getSource().getEntry()); + + List> list = new LinkedList>(); + + if (options.isSkipResult()) { +// BatchCommandData offCommand = new BatchCommandData(RedisCommands.CLIENT_REPLY, new Object[] { "OFF" }, index.incrementAndGet()); +// entry.getCommands().addFirst(offCommand); +// list.add(offCommand); + list.add(new CommandData(new RedissonPromise(), details.getCodec(), RedisCommands.CLIENT_REPLY, new Object[]{ "OFF" })); + } + + list.add(new CommandData(details.getAttemptPromise(), details.getCodec(), details.getCommand(), details.getParams())); + + if (options.isSkipResult()) { +// BatchCommandData onCommand = new BatchCommandData(RedisCommands.CLIENT_REPLY, new Object[] { "ON" }, index.incrementAndGet()); +// entry.getCommands().add(onCommand); +// list.add(onCommand); + list.add(new CommandData(new RedissonPromise(), details.getCodec(), RedisCommands.CLIENT_REPLY, new Object[]{ "ON" })); + } + if (options.getSyncSlaves() > 0) { + BatchCommandData waitCommand = new BatchCommandData(RedisCommands.WAIT, + new Object[] { this.options.getSyncSlaves(), this.options.getSyncTimeout() }, index.incrementAndGet()); + list.add(waitCommand); + entry.getCommands().add(waitCommand); + } + + RPromise main = new RedissonPromise(); + ChannelFuture future = connection.send(new CommandsData(main, list, new ArrayList(entry.getCommands()))); + details.setWriteFuture(future); + } else { + ChannelFuture future = connection.send(new CommandData(details.getAttemptPromise(), details.getCodec(), details.getCommand(), details.getParams())); + details.setWriteFuture(future); + } + } + } + } + + @Override + protected RFuture getConnection(boolean readOnlyMode, NodeSource source, + RedisCommand command) { + if (!isRedisBasedQueue()) { + return super.getConnection(readOnlyMode, source, command); + } + + ConnectionEntry entry = connections.get(source.getEntry()); if (entry == null) { - entry = new Entry(); - Entry oldEntry = commands.putIfAbsent(nodeSource.getEntry(), entry); + entry = new ConnectionEntry(); + ConnectionEntry oldEntry = connections.putIfAbsent(source.getEntry(), entry); if (oldEntry != null) { entry = oldEntry; } } - if (!readOnlyMode) { - entry.setReadOnlyMode(false); + + if (entry.getConnectionFuture() != null) { + return entry.getConnectionFuture(); } - if (isRedissonReferenceSupportEnabled()) { - for (int i = 0; i < params.length; i++) { - RedissonReference reference = RedissonObjectFactory.toReference(connectionManager.getCfg(), params[i]); - if (reference != null) { - params[i] = reference; - } + + synchronized (this) { + if (entry.getConnectionFuture() != null) { + return entry.getConnectionFuture(); } + + RFuture connectionFuture; + if (this.options.getExecutionMode() == ExecutionMode.REDIS_WRITE_ATOMIC) { + connectionFuture = connectionManager.connectionWriteOp(source, command); + } else { + connectionFuture = connectionManager.connectionReadOp(source, command); + } + connectionFuture.syncUninterruptibly(); + entry.setConnectionFuture(connectionFuture); + return connectionFuture; } - BatchCommandData commandData = new BatchCommandData(mainPromise, codec, command, params, index.incrementAndGet()); - entry.getCommands().add(commandData); } public BatchResult execute() { @@ -170,16 +379,133 @@ public class CommandBatchService extends CommandAsyncService { } public RFuture executeAsync(BatchOptions options) { - if (executed) { + if (executed.get()) { throw new IllegalStateException("Batch already executed!"); } if (commands.isEmpty()) { return RedissonPromise.newSucceededFuture(null); } - executed = true; + + if (this.options == null) { + this.options = options; + } + + if (isRedisBasedQueue()) { + int permits = 0; + for (Entry entry : commands.values()) { + permits += entry.getCommands().size(); + }; + + final RPromise resultPromise = new RedissonPromise(); + semaphore.acquire(new Runnable() { + @Override + public void run() { + for (Entry entry : commands.values()) { + for (BatchCommandData command : entry.getCommands()) { + if (command.getPromise().isDone() && !command.getPromise().isSuccess()) { + resultPromise.tryFailure(command.getPromise().cause()); + break; + } + } + } + + if (resultPromise.isDone()) { + return; + } + + final RPromise>> mainPromise = new RedissonPromise>>(); + final Map> result = new ConcurrentHashMap>(); + final CountableListener>> listener = new CountableListener>>(mainPromise, result); + listener.setCounter(connections.size()); + for (final Map.Entry entry : commands.entrySet()) { + ConnectionEntry connection = connections.get(entry.getKey()); + final RPromise> execPromise = new RedissonPromise>(); + async(false, new NodeSource(entry.getKey()), connectionManager.getCodec(), RedisCommands.EXEC, + new Object[] {}, execPromise, 0, false, connection.getConnectionFuture()); + execPromise.addListener(new FutureListener>() { + @Override + public void operationComplete(Future> future) throws Exception { + if (!future.isSuccess()) { + mainPromise.tryFailure(future.cause()); + return; + } + + BatchCommandData lastCommand = (BatchCommandData) entry.getValue().getCommands().peekLast(); + result.put(entry.getKey(), future.getNow()); + if (RedisCommands.WAIT.getName().equals(lastCommand.getCommand().getName())) { + lastCommand.getPromise().addListener(new FutureListener() { + @Override + public void operationComplete(Future ft) throws Exception { + if (!ft.isSuccess()) { + mainPromise.tryFailure(ft.cause()); + return; + } + + execPromise.addListener(listener); + } + }); + } else { + execPromise.addListener(listener); + } + } + }); + } + + executed.set(true); + + mainPromise.addListener(new FutureListener>>() { + @Override + public void operationComplete(Future>> future) throws Exception { + if (!future.isSuccess()) { + resultPromise.tryFailure(future.cause()); + return; + } + + for (java.util.Map.Entry> entry : future.getNow().entrySet()) { + Entry commandEntry = commands.get(entry.getKey()); + Iterator resultIter = entry.getValue().iterator(); + for (BatchCommandData data : commandEntry.getCommands()) { + if (data.getCommand().getName().equals(RedisCommands.EXEC.getName())) { + break; + } + RPromise promise = (RPromise) data.getPromise(); + promise.trySuccess(resultIter.next()); + } + } + + List entries = new ArrayList(); + for (Entry e : commands.values()) { + entries.addAll(e.getCommands()); + } + Collections.sort(entries); + List responses = new ArrayList(entries.size()); + int syncedSlaves = 0; + for (BatchCommandData commandEntry : entries) { + if (isWaitCommand(commandEntry)) { + syncedSlaves += (Integer) commandEntry.getPromise().getNow(); + } else if (!commandEntry.getCommand().getName().equals(RedisCommands.MULTI.getName()) + && !commandEntry.getCommand().getName().equals(RedisCommands.EXEC.getName())) { + Object entryResult = commandEntry.getPromise().getNow(); + entryResult = tryHandleReference(entryResult); + responses.add(entryResult); + } + } + + BatchResult result = new BatchResult(responses, syncedSlaves); + resultPromise.trySuccess((R)result); + + commands = null; + } + }); + } + }, permits); + return resultPromise; + } - if (options.isAtomic()) { + executed.set(true); + + if (this.options.getExecutionMode() != ExecutionMode.IN_MEMORY) { for (Entry entry : commands.values()) { BatchCommandData multiCommand = new BatchCommandData(RedisCommands.MULTI, new Object[] {}, index.incrementAndGet()); entry.getCommands().addFirst(multiCommand); @@ -188,7 +514,7 @@ public class CommandBatchService extends CommandAsyncService { } } - if (options.isSkipResult()) { + if (this.options.isSkipResult()) { for (Entry entry : commands.values()) { BatchCommandData offCommand = new BatchCommandData(RedisCommands.CLIENT_REPLY, new Object[] { "OFF" }, index.incrementAndGet()); entry.getCommands().addFirst(offCommand); @@ -197,21 +523,21 @@ public class CommandBatchService extends CommandAsyncService { } } - if (options.getSyncSlaves() > 0) { + if (this.options.getSyncSlaves() > 0) { for (Entry entry : commands.values()) { BatchCommandData waitCommand = new BatchCommandData(RedisCommands.WAIT, - new Object[] { options.getSyncSlaves(), options.getSyncTimeout() }, index.incrementAndGet()); + new Object[] { this.options.getSyncSlaves(), this.options.getSyncTimeout() }, index.incrementAndGet()); entry.getCommands().add(waitCommand); } } RPromise resultPromise; final RPromise voidPromise = new RedissonPromise(); - if (options.isSkipResult()) { + if (this.options.isSkipResult()) { voidPromise.addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { - commands = null; +// commands = null; nestedServices.clear(); } }); @@ -273,11 +599,15 @@ public class CommandBatchService extends CommandAsyncService { } for (java.util.Map.Entry e : commands.entrySet()) { - execute(e.getValue(), new NodeSource(e.getKey()), voidPromise, slots, 0, options); + execute(e.getValue(), new NodeSource(e.getKey()), voidPromise, slots, 0, this.options); } return resultPromise; } + protected boolean isRedisBasedQueue() { + return options != null && (this.options.getExecutionMode() == ExecutionMode.REDIS_READ_ATOMIC || this.options.getExecutionMode() == ExecutionMode.REDIS_WRITE_ATOMIC); + } + private void execute(final Entry entry, final NodeSource source, final RPromise mainPromise, final AtomicInteger slots, final int attempt, final BatchOptions options) { if (mainPromise.isCancelled()) { @@ -335,7 +665,7 @@ public class CommandBatchService extends CommandAsyncService { if (connectionFuture.isSuccess()) { if (details.getWriteFuture() == null || !details.getWriteFuture().isDone()) { if (details.getAttempt() == attempts) { - if (details.getWriteFuture().cancel(false)) { + if (details.getWriteFuture() == null || details.getWriteFuture().cancel(false)) { if (details.getException() == null) { details.setException(new RedisTimeoutException("Unable to send batch after " + connectionManager.getConfig().getRetryAttempts() + " retry attempts")); } @@ -391,7 +721,8 @@ public class CommandBatchService extends CommandAsyncService { connectionFuture.addListener(new FutureListener() { @Override public void operationComplete(Future connFuture) throws Exception { - checkConnectionFuture(entry, source, mainPromise, attemptPromise, details, connectionFuture, options.isSkipResult(), options.getResponseTimeout(), attempts, options.isAtomic()); + checkConnectionFuture(entry, source, mainPromise, attemptPromise, details, connectionFuture, options.isSkipResult(), + options.getResponseTimeout(), attempts, options.getExecutionMode() != ExecutionMode.IN_MEMORY); } }); @@ -500,7 +831,7 @@ public class CommandBatchService extends CommandAsyncService { final RedisConnection connection = connFuture.getNow(); - List> list = new ArrayList>(entry.getCommands().size() + 1); + List> list = new LinkedList>(); if (source.getRedirect() == Redirect.ASK) { RPromise promise = new RedissonPromise(); list.add(new CommandData(promise, StringCodec.INSTANCE, RedisCommands.ASKING, new Object[] {})); diff --git a/redisson/src/main/java/org/redisson/command/CommandReactiveBatchService.java b/redisson/src/main/java/org/redisson/command/CommandReactiveBatchService.java index d403b1aa3..80fd30795 100644 --- a/redisson/src/main/java/org/redisson/command/CommandReactiveBatchService.java +++ b/redisson/src/main/java/org/redisson/command/CommandReactiveBatchService.java @@ -24,6 +24,7 @@ import org.redisson.api.BatchOptions; import org.redisson.api.BatchResult; import org.redisson.api.RFuture; import org.redisson.api.RedissonReactiveClient; +import org.redisson.client.RedisConnection; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommand; import org.redisson.connection.ConnectionManager; @@ -56,8 +57,8 @@ public class CommandReactiveBatchService extends CommandReactiveService { @Override protected void async(boolean readOnlyMode, NodeSource nodeSource, - Codec codec, RedisCommand command, Object[] params, RPromise mainPromise, int attempt, boolean ignoreRedirect) { - batchService.async(readOnlyMode, nodeSource, codec, command, params, mainPromise, attempt, ignoreRedirect); + Codec codec, RedisCommand command, Object[] params, RPromise mainPromise, int attempt, boolean ignoreRedirect, RFuture connFuture) { + batchService.async(readOnlyMode, nodeSource, codec, command, params, mainPromise, attempt, ignoreRedirect, connFuture); } public RFuture> executeAsync(BatchOptions options) { diff --git a/redisson/src/main/java/org/redisson/executor/ClassLoaderDelegator.java b/redisson/src/main/java/org/redisson/executor/ClassLoaderDelegator.java deleted file mode 100644 index 997ae463d..000000000 --- a/redisson/src/main/java/org/redisson/executor/ClassLoaderDelegator.java +++ /dev/null @@ -1,84 +0,0 @@ -/** - * Copyright 2018 Nikita Koksharov - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.redisson.executor; - -import java.io.IOException; -import java.io.InputStream; -import java.net.URL; -import java.util.Enumeration; - -/** - * - * @author Nikita Koksharov - * - */ -public class ClassLoaderDelegator extends ClassLoader { - - private final ThreadLocal threadLocalClassLoader = new ThreadLocal(); - - public void setCurrentClassLoader(ClassLoader classLoader) { - threadLocalClassLoader.set(classLoader); - } - - public int hashCode() { - return threadLocalClassLoader.get().hashCode(); - } - - public boolean equals(Object obj) { - return threadLocalClassLoader.get().equals(obj); - } - - public String toString() { - return threadLocalClassLoader.get().toString(); - } - - public Class loadClass(String name) throws ClassNotFoundException { - return threadLocalClassLoader.get().loadClass(name); - } - - public URL getResource(String name) { - return threadLocalClassLoader.get().getResource(name); - } - - public Enumeration getResources(String name) throws IOException { - return threadLocalClassLoader.get().getResources(name); - } - - public InputStream getResourceAsStream(String name) { - return threadLocalClassLoader.get().getResourceAsStream(name); - } - - public void setDefaultAssertionStatus(boolean enabled) { - threadLocalClassLoader.get().setDefaultAssertionStatus(enabled); - } - - public void setPackageAssertionStatus(String packageName, boolean enabled) { - threadLocalClassLoader.get().setPackageAssertionStatus(packageName, enabled); - } - - public void setClassAssertionStatus(String className, boolean enabled) { - threadLocalClassLoader.get().setClassAssertionStatus(className, enabled); - } - - public void clearAssertionStatus() { - threadLocalClassLoader.get().clearAssertionStatus(); - } - - public void clearCurrentClassLoader() { - threadLocalClassLoader.remove(); - } - -} diff --git a/redisson/src/main/java/org/redisson/executor/TasksRunnerService.java b/redisson/src/main/java/org/redisson/executor/TasksRunnerService.java index e53396e8b..772919cd4 100644 --- a/redisson/src/main/java/org/redisson/executor/TasksRunnerService.java +++ b/redisson/src/main/java/org/redisson/executor/TasksRunnerService.java @@ -53,10 +53,7 @@ import io.netty.util.concurrent.FutureListener; */ public class TasksRunnerService implements RemoteExecutorService { - private final ClassLoaderDelegator classLoader = new ClassLoaderDelegator(); - private final Codec codec; - private final ClassLoader codecClassLoader; private final String name; private final CommandExecutor commandExecutor; @@ -77,12 +74,7 @@ public class TasksRunnerService implements RemoteExecutorService { this.redisson = redisson; this.responses = responses; - try { - this.codecClassLoader = codec.getClassLoader(); - this.codec = codec.getClass().getConstructor(ClassLoader.class).newInstance(classLoader); - } catch (Exception e) { - throw new IllegalStateException("Unable to initialize codec with ClassLoader parameter", e); - } + this.codec = codec; } public void setTasksRetryIntervalName(String tasksRetryInterval) { @@ -184,11 +176,10 @@ public class TasksRunnerService implements RemoteExecutorService { try { buf.writeBytes(state); - RedissonClassLoader cl = new RedissonClassLoader(codecClassLoader); + RedissonClassLoader cl = new RedissonClassLoader(codec.getClassLoader()); cl.loadClass(className, classBody); - classLoader.setCurrentClassLoader(cl); - - Callable callable = decode(buf); + + Callable callable = decode(cl, buf); return callable.call(); } catch (RedissonShutdownException e) { return null; @@ -248,12 +239,17 @@ public class TasksRunnerService implements RemoteExecutorService { }); } - @SuppressWarnings("unchecked") - private T decode(ByteBuf buf) throws IOException { - T task = (T) codec.getValueDecoder().decode(buf, null); - Injector.inject(task, redisson); - return task; + private T decode(RedissonClassLoader cl, ByteBuf buf) throws IOException { + try { + Codec codec = this.codec.getClass().getConstructor(ClassLoader.class).newInstance(cl); + T task = (T) codec.getValueDecoder().decode(buf, null); + Injector.inject(task, redisson); + return task; + } catch (Exception e) { + throw new IllegalStateException("Unable to initialize codec with ClassLoader parameter", e); + } + } @Override @@ -266,11 +262,10 @@ public class TasksRunnerService implements RemoteExecutorService { try { buf.writeBytes(state); - RedissonClassLoader cl = new RedissonClassLoader(codecClassLoader); + RedissonClassLoader cl = new RedissonClassLoader(codec.getClassLoader()); cl.loadClass(className, classBody); - classLoader.setCurrentClassLoader(cl); - Runnable runnable = decode(buf); + Runnable runnable = decode(cl, buf); runnable.run(); } catch (RedissonShutdownException e) { // skip @@ -295,8 +290,6 @@ public class TasksRunnerService implements RemoteExecutorService { * @param requestId */ private void finish(String requestId) { - classLoader.clearCurrentClassLoader(); - commandExecutor.evalWriteAsync(name, StringCodec.INSTANCE, RedisCommands.EVAL_VOID, "local scheduled = redis.call('zscore', KEYS[5], ARGV[3]);" + "if scheduled == false then " diff --git a/redisson/src/main/java/org/redisson/misc/CountableListener.java b/redisson/src/main/java/org/redisson/misc/CountableListener.java index 81a4695d9..77c6c5c17 100644 --- a/redisson/src/main/java/org/redisson/misc/CountableListener.java +++ b/redisson/src/main/java/org/redisson/misc/CountableListener.java @@ -36,7 +36,7 @@ public class CountableListener implements FutureListener { } public CountableListener(RPromise result, T value) { - this(null, null, 0); + this(result, value, 0); } public CountableListener(RPromise result, T value, int count) { diff --git a/redisson/src/main/java/org/redisson/pubsub/AsyncSemaphore.java b/redisson/src/main/java/org/redisson/pubsub/AsyncSemaphore.java index 9994e01f7..6b68e00a5 100644 --- a/redisson/src/main/java/org/redisson/pubsub/AsyncSemaphore.java +++ b/redisson/src/main/java/org/redisson/pubsub/AsyncSemaphore.java @@ -28,8 +28,55 @@ import java.util.concurrent.TimeUnit; */ public class AsyncSemaphore { + private static class Entry { + + private Runnable runnable; + private int permits; + + public Entry(Runnable runnable, int permits) { + super(); + this.runnable = runnable; + this.permits = permits; + } + + public int getPermits() { + return permits; + } + + public Runnable getRunnable() { + return runnable; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((runnable == null) ? 0 : runnable.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + Entry other = (Entry) obj; + if (runnable == null) { + if (other.runnable != null) + return false; + } else if (!runnable.equals(other.runnable)) + return false; + return true; + } + + + } + private int counter; - private final Set listeners = new LinkedHashSet(); + private final Set listeners = new LinkedHashSet(); public AsyncSemaphore(int permits) { counter = permits; @@ -75,15 +122,18 @@ public class AsyncSemaphore { } public void acquire(Runnable listener) { + acquire(listener, 1); + } + + public void acquire(Runnable listener, int permits) { boolean run = false; synchronized (this) { - if (counter == 0) { - listeners.add(listener); + if (counter < permits) { + listeners.add(new Entry(listener, permits)); return; - } - if (counter > 0) { - counter--; + } else { + counter -= permits; run = true; } } @@ -95,7 +145,7 @@ public class AsyncSemaphore { public boolean remove(Runnable listener) { synchronized (this) { - return listeners.remove(listener); + return listeners.remove(new Entry(listener, 0)); } } @@ -104,19 +154,22 @@ public class AsyncSemaphore { } public void release() { - Runnable runnable = null; + Entry entryToAcquire = null; synchronized (this) { counter++; - Iterator iter = listeners.iterator(); + Iterator iter = listeners.iterator(); if (iter.hasNext()) { - runnable = iter.next(); - iter.remove(); + Entry entry = iter.next(); + if (entry.getPermits() >= counter) { + iter.remove(); + entryToAcquire = entry; + } } } - if (runnable != null) { - acquire(runnable); + if (entryToAcquire != null) { + acquire(entryToAcquire.getRunnable(), entryToAcquire.getPermits()); } } diff --git a/redisson/src/main/java/org/redisson/transaction/RedissonTransaction.java b/redisson/src/main/java/org/redisson/transaction/RedissonTransaction.java index 34b55a997..700e071df 100644 --- a/redisson/src/main/java/org/redisson/transaction/RedissonTransaction.java +++ b/redisson/src/main/java/org/redisson/transaction/RedissonTransaction.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -79,7 +80,7 @@ public class RedissonTransaction implements RTransaction { private final AtomicBoolean executed = new AtomicBoolean(); private final TransactionOptions options; - private List operations = new ArrayList(); + private List operations = new CopyOnWriteArrayList(); private Set localCaches = new HashSet(); private final long startTime = System.currentTimeMillis(); @@ -183,8 +184,9 @@ public class RedissonTransaction implements RTransaction { checkTimeout(); + BatchOptions batchOptions = createOptions(); - final CommandBatchService transactionExecutor = new CommandBatchService(commandExecutor.getConnectionManager()); + final CommandBatchService transactionExecutor = new CommandBatchService(commandExecutor.getConnectionManager(), batchOptions); for (TransactionalOperation transactionalOperation : operations) { transactionalOperation.commit(transactionExecutor); } @@ -208,21 +210,8 @@ public class RedissonTransaction implements RTransaction { result.tryFailure(e); return; } - - int syncSlaves = 0; - if (!commandExecutor.getConnectionManager().isClusterMode()) { - MasterSlaveEntry entry = commandExecutor.getConnectionManager().getEntrySet().iterator().next(); - syncSlaves = entry.getAvailableClients() - 1; - } - - BatchOptions batchOptions = BatchOptions.defaults() - .syncSlaves(syncSlaves, options.getSyncTimeout(), TimeUnit.MILLISECONDS) - .responseTimeout(options.getResponseTimeout(), TimeUnit.MILLISECONDS) - .retryAttempts(options.getRetryAttempts()) - .retryInterval(options.getRetryInterval(), TimeUnit.MILLISECONDS) - .atomic(); - - RFuture transactionFuture = transactionExecutor.executeAsync(batchOptions); + + RFuture> transactionFuture = transactionExecutor.executeAsync(); transactionFuture.addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { @@ -242,6 +231,22 @@ public class RedissonTransaction implements RTransaction { return result; } + private BatchOptions createOptions() { + int syncSlaves = 0; + if (!commandExecutor.getConnectionManager().isClusterMode()) { + MasterSlaveEntry entry = commandExecutor.getConnectionManager().getEntrySet().iterator().next(); + syncSlaves = entry.getAvailableClients() - 1; + } + + BatchOptions batchOptions = BatchOptions.defaults() + .syncSlaves(syncSlaves, options.getSyncTimeout(), TimeUnit.MILLISECONDS) + .responseTimeout(options.getResponseTimeout(), TimeUnit.MILLISECONDS) + .retryAttempts(options.getRetryAttempts()) + .retryInterval(options.getRetryInterval(), TimeUnit.MILLISECONDS) + .atomic(); + return batchOptions; + } + @Override public void commit() { commit(localCaches, operations); @@ -252,8 +257,9 @@ public class RedissonTransaction implements RTransaction { checkTimeout(); + BatchOptions batchOptions = createOptions(); - CommandBatchService transactionExecutor = new CommandBatchService(commandExecutor.getConnectionManager()); + CommandBatchService transactionExecutor = new CommandBatchService(commandExecutor.getConnectionManager(), batchOptions); for (TransactionalOperation transactionalOperation : operations) { transactionalOperation.commit(transactionExecutor); } @@ -268,21 +274,9 @@ public class RedissonTransaction implements RTransaction { throw e; } - int syncSlaves = 0; - if (!commandExecutor.getConnectionManager().isClusterMode()) { - MasterSlaveEntry entry = commandExecutor.getConnectionManager().getEntrySet().iterator().next(); - syncSlaves = entry.getAvailableClients() - 1; - } - try { - BatchOptions batchOptions = BatchOptions.defaults() - .syncSlaves(syncSlaves, options.getSyncTimeout(), TimeUnit.MILLISECONDS) - .responseTimeout(options.getResponseTimeout(), TimeUnit.MILLISECONDS) - .retryAttempts(options.getRetryAttempts()) - .retryInterval(options.getRetryInterval(), TimeUnit.MILLISECONDS) - .atomic(); - transactionExecutor.execute(batchOptions); + transactionExecutor.execute(); } catch (Exception e) { throw new TransactionException("Unable to execute transaction", e); } @@ -294,6 +288,7 @@ public class RedissonTransaction implements RTransaction { private void checkTimeout() { if (options.getTimeout() != -1 && System.currentTimeMillis() - startTime > options.getTimeout()) { + rollbackAsync(); throw new TransactionTimeoutException("Transaction was discarded due to timeout " + options.getTimeout() + " milliseconds"); } } @@ -582,7 +577,7 @@ public class RedissonTransaction implements RTransaction { } try { - executorService.execute(BatchOptions.defaults()); + executorService.execute(); } catch (Exception e) { throw new TransactionException("Unable to rollback transaction", e); } @@ -601,7 +596,7 @@ public class RedissonTransaction implements RTransaction { } final RPromise result = new RedissonPromise(); - RFuture future = executorService.executeAsync(BatchOptions.defaults()); + RFuture> future = executorService.executeAsync(); future.addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { diff --git a/redisson/src/test/java/org/redisson/RedissonBatchTest.java b/redisson/src/test/java/org/redisson/RedissonBatchTest.java index 4dd008f7c..be1530de8 100644 --- a/redisson/src/test/java/org/redisson/RedissonBatchTest.java +++ b/redisson/src/test/java/org/redisson/RedissonBatchTest.java @@ -3,6 +3,7 @@ package org.redisson; import static org.assertj.core.api.Assertions.assertThat; import java.io.IOException; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -14,10 +15,14 @@ import java.util.concurrent.atomic.AtomicLong; import org.junit.Assert; import org.junit.Assume; +import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import org.redisson.ClusterRunner.ClusterProcesses; import org.redisson.RedisRunner.FailedToStartRedisException; import org.redisson.api.BatchOptions; +import org.redisson.api.BatchOptions.ExecutionMode; import org.redisson.api.BatchResult; import org.redisson.api.RBatch; import org.redisson.api.RFuture; @@ -31,17 +36,40 @@ import org.redisson.client.RedisException; import org.redisson.client.codec.StringCodec; import org.redisson.config.Config; +@RunWith(Parameterized.class) public class RedissonBatchTest extends BaseTest { + @Parameterized.Parameters(name= "{index} - {0}") + public static Iterable data() { + return Arrays.asList(new Object[][] { + {BatchOptions.defaults().executionMode(ExecutionMode.IN_MEMORY)}, + {BatchOptions.defaults().executionMode(ExecutionMode.REDIS_WRITE_ATOMIC)} + }); + } + + @Parameterized.Parameter(0) + public BatchOptions batchOptions; + + @Before + public void before() throws IOException, InterruptedException { + super.before(); + if (batchOptions.getExecutionMode() == ExecutionMode.IN_MEMORY) { + batchOptions = BatchOptions.defaults().executionMode(ExecutionMode.IN_MEMORY); + } + if (batchOptions.getExecutionMode() == ExecutionMode.REDIS_WRITE_ATOMIC) { + batchOptions = BatchOptions.defaults().executionMode(ExecutionMode.REDIS_WRITE_ATOMIC); + } + } + // @Test public void testBatchRedirect() { - RBatch batch = redisson.createBatch(BatchOptions.defaults()); + RBatch batch = redisson.createBatch(batchOptions); for (int i = 0; i < 5; i++) { batch.getMap("" + i).fastPutAsync("" + i, i); } batch.execute(); - batch = redisson.createBatch(BatchOptions.defaults()); + batch = redisson.createBatch(batchOptions); for (int i = 0; i < 1; i++) { batch.getMap("" + i).sizeAsync(); batch.getMap("" + i).containsValueAsync("" + i); @@ -53,13 +81,13 @@ public class RedissonBatchTest extends BaseTest { @Test public void testBigRequestAtomic() { - BatchOptions options = BatchOptions.defaults() + batchOptions .atomic() .responseTimeout(15, TimeUnit.SECONDS) .retryInterval(1, TimeUnit.SECONDS) .retryAttempts(5); - RBatch batch = redisson.createBatch(options); + RBatch batch = redisson.createBatch(batchOptions); for (int i = 0; i < 100; i++) { batch.getBucket("" + i).setAsync(i); batch.getBucket("" + i).getAsync(); @@ -87,19 +115,22 @@ public class RedissonBatchTest extends BaseTest { Config config = new Config(); config.useClusterServers() + .setTimeout(1000000) + .setRetryInterval(1000000) .addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort()); RedissonClient redisson = Redisson.create(config); - BatchOptions options = BatchOptions.defaults() + batchOptions .syncSlaves(1, 1, TimeUnit.SECONDS); - RBatch batch = redisson.createBatch(options); + RBatch batch = redisson.createBatch(batchOptions); for (int i = 0; i < 100; i++) { RMapAsync map = batch.getMap("test"); map.putAsync("" + i, "" + i); } BatchResult result = batch.execute(); + assertThat(result.getResponses()).hasSize(100); assertThat(result.getSyncedSlaves()).isEqualTo(1); process.shutdown(); @@ -107,7 +138,7 @@ public class RedissonBatchTest extends BaseTest { @Test public void testWriteTimeout() { - RBatch batch = redisson.createBatch(BatchOptions.defaults()); + RBatch batch = redisson.createBatch(batchOptions); for (int i = 0; i < 200000; i++) { RMapCacheAsync map = batch.getMapCache("test"); map.putAsync("" + i, "" + i, 10, TimeUnit.SECONDS); @@ -119,10 +150,10 @@ public class RedissonBatchTest extends BaseTest { public void testSkipResult() { Assume.assumeTrue(RedisRunner.getDefaultRedisServerInstance().getRedisVersion().compareTo("3.2.0") > 0); - BatchOptions options = BatchOptions.defaults() + batchOptions .skipResult(); - RBatch batch = redisson.createBatch(options); + RBatch batch = redisson.createBatch(batchOptions); batch.getBucket("A1").setAsync("001"); batch.getBucket("A2").setAsync("001"); batch.getBucket("A3").setAsync("001"); @@ -136,7 +167,7 @@ public class RedissonBatchTest extends BaseTest { @Test public void testBatchNPE() { - RBatch batch = redisson.createBatch(BatchOptions.defaults()); + RBatch batch = redisson.createBatch(batchOptions); batch.getBucket("A1").setAsync("001"); batch.getBucket("A2").setAsync("001"); batch.getBucket("A3").setAsync("001"); @@ -147,10 +178,10 @@ public class RedissonBatchTest extends BaseTest { @Test public void testAtomic() { - BatchOptions options = BatchOptions.defaults() + batchOptions .atomic(); - RBatch batch = redisson.createBatch(options); + RBatch batch = redisson.createBatch(batchOptions); RFuture f1 = batch.getAtomicLong("A1").addAndGetAsync(1); RFuture f2 = batch.getAtomicLong("A2").addAndGetAsync(2); RFuture f3 = batch.getAtomicLong("A3").addAndGetAsync(3); @@ -183,14 +214,15 @@ public class RedissonBatchTest extends BaseTest { Config config = new Config(); config.useClusterServers() + .setTimeout(123000) .addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort()); RedissonClient redisson = Redisson.create(config); - BatchOptions options = BatchOptions.defaults() + batchOptions .atomic() .syncSlaves(1, 1, TimeUnit.SECONDS); - RBatch batch = redisson.createBatch(options); + RBatch batch = redisson.createBatch(batchOptions); for (int i = 0; i < 10; i++) { batch.getAtomicLong("{test}" + i).addAndGetAsync(i); } @@ -208,7 +240,7 @@ public class RedissonBatchTest extends BaseTest { @Test public void testDifferentCodecs() { - RBatch b = redisson.createBatch(BatchOptions.defaults()); + RBatch b = redisson.createBatch(batchOptions); b.getMap("test1").putAsync("1", "2"); b.getMap("test2", StringCodec.INSTANCE).putAsync("21", "3"); RFuture val1 = b.getMap("test1").getAsync("1"); @@ -219,9 +251,22 @@ public class RedissonBatchTest extends BaseTest { Assert.assertEquals("3", val2.getNow()); } + @Test + public void testDifferentCodecsAtomic() { + RBatch b = redisson.createBatch(batchOptions.atomic()); + b.getMap("test1").putAsync("1", "2"); + b.getMap("test2", StringCodec.INSTANCE).putAsync("21", "3"); + RFuture val1 = b.getMap("test1").getAsync("1"); + RFuture val2 = b.getMap("test2", StringCodec.INSTANCE).getAsync("21"); + b.execute(); + + Assert.assertEquals("2", val1.getNow()); + Assert.assertEquals("3", val2.getNow()); + } + @Test public void testBatchList() { - RBatch b = redisson.createBatch(BatchOptions.defaults()); + RBatch b = redisson.createBatch(batchOptions); RListAsync listAsync = b.getList("list"); for (int i = 1; i < 540; i++) { listAsync.addAsync(i); @@ -232,7 +277,7 @@ public class RedissonBatchTest extends BaseTest { @Test public void testBatchBigRequest() { - RBatch batch = redisson.createBatch(BatchOptions.defaults()); + RBatch batch = redisson.createBatch(batchOptions); for (int i = 0; i < 210; i++) { batch.getMap("test").fastPutAsync("1", "2"); batch.getMap("test").fastPutAsync("2", "3"); @@ -246,7 +291,7 @@ public class RedissonBatchTest extends BaseTest { @Test(expected=RedisException.class) public void testExceptionHandling() { - RBatch batch = redisson.createBatch(BatchOptions.defaults()); + RBatch batch = redisson.createBatch(batchOptions); batch.getMap("test").putAsync("1", "2"); batch.getScript().evalAsync(Mode.READ_WRITE, "wrong_code", RScript.ReturnType.VALUE); batch.execute(); @@ -254,7 +299,7 @@ public class RedissonBatchTest extends BaseTest { @Test(expected=IllegalStateException.class) public void testTwice() { - RBatch batch = redisson.createBatch(BatchOptions.defaults()); + RBatch batch = redisson.createBatch(batchOptions); batch.getMap("test").putAsync("1", "2"); batch.execute(); batch.execute(); @@ -263,14 +308,14 @@ public class RedissonBatchTest extends BaseTest { @Test public void testEmpty() { - RBatch batch = redisson.createBatch(BatchOptions.defaults()); + RBatch batch = redisson.createBatch(batchOptions); batch.execute(); } @Test public void testOrdering() throws InterruptedException { ExecutorService e = Executors.newFixedThreadPool(16); - final RBatch batch = redisson.createBatch(BatchOptions.defaults()); + final RBatch batch = redisson.createBatch(batchOptions); final AtomicLong index = new AtomicLong(-1); final List> futures = new CopyOnWriteArrayList<>(); for (int i = 0; i < 500; i++) { @@ -304,7 +349,7 @@ public class RedissonBatchTest extends BaseTest { @Test public void test() { - RBatch batch = redisson.createBatch(BatchOptions.defaults()); + RBatch batch = redisson.createBatch(batchOptions); batch.getMap("test").fastPutAsync("1", "2"); batch.getMap("test").fastPutAsync("2", "3"); batch.getMap("test").putAsync("2", "5"); diff --git a/redisson/src/test/java/org/redisson/transaction/RedissonBaseTransactionalMapTest.java b/redisson/src/test/java/org/redisson/transaction/RedissonBaseTransactionalMapTest.java index e3f71c352..65fe32140 100644 --- a/redisson/src/test/java/org/redisson/transaction/RedissonBaseTransactionalMapTest.java +++ b/redisson/src/test/java/org/redisson/transaction/RedissonBaseTransactionalMapTest.java @@ -22,7 +22,7 @@ public abstract class RedissonBaseTransactionalMapTest extends BaseTest { @Test public void testFastPut() throws InterruptedException { - ExecutorService executor = Executors.newFixedThreadPool(200); + ExecutorService executor = Executors.newFixedThreadPool(2000); for (int i = 0; i < 2000; i++) { executor.submit(() -> { for (int j = 0; j < 100; j++) {