From 46e8e8846a260999b6706e331b40943de0ffb813 Mon Sep 17 00:00:00 2001 From: Nikita Date: Fri, 13 Jul 2018 17:14:56 +0300 Subject: [PATCH] refactoring --- .../client/handler/CommandDecoder.java | 2 +- .../client/protocol/CommandsData.java | 20 ++++++++++++------- .../redisson/command/CommandAsyncService.java | 2 +- .../redisson/command/CommandBatchService.java | 14 +++++++------ .../java/org/redisson/RedisClientTest.java | 4 ++-- 5 files changed, 25 insertions(+), 17 deletions(-) diff --git a/redisson/src/main/java/org/redisson/client/handler/CommandDecoder.java b/redisson/src/main/java/org/redisson/client/handler/CommandDecoder.java index 4b893c3df..dc6e83f71 100644 --- a/redisson/src/main/java/org/redisson/client/handler/CommandDecoder.java +++ b/redisson/src/main/java/org/redisson/client/handler/CommandDecoder.java @@ -228,7 +228,7 @@ public class CommandDecoder extends ReplayingDecoder { } try { - decode(in, commandData, null, ctx.channel(), !commandBatch.isAtomic()); + decode(in, commandData, null, ctx.channel(), commandBatch.isQueued()); } finally { if (commandData != null && RedisCommands.EXEC.getName().equals(commandData.getCommand().getName())) { commandsData.remove(); diff --git a/redisson/src/main/java/org/redisson/client/protocol/CommandsData.java b/redisson/src/main/java/org/redisson/client/protocol/CommandsData.java index 381edd7e4..172a89b07 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/CommandsData.java +++ b/redisson/src/main/java/org/redisson/client/protocol/CommandsData.java @@ -32,33 +32,39 @@ public class CommandsData implements QueueCommand { private final RPromise promise; private final boolean skipResult; private final boolean atomic; + private final boolean queued; - public CommandsData(RPromise promise, List> commands) { - this(promise, commands, null); + public CommandsData(RPromise promise, List> commands, boolean queued) { + this(promise, commands, null, false, false, queued); } public CommandsData(RPromise promise, List> commands, List> attachedCommands) { - this(promise, commands, attachedCommands, false, false); + this(promise, commands, attachedCommands, false, false, true); } - - public CommandsData(RPromise promise, List> commands, boolean skipResult, boolean atomic) { - this(promise, commands, null, skipResult, atomic); + public CommandsData(RPromise promise, List> commands, boolean skipResult, boolean atomic, boolean queued) { + this(promise, commands, null, skipResult, atomic, queued); } - public CommandsData(RPromise promise, List> commands, List> attachedCommands, boolean skipResult, boolean atomic) { + public CommandsData(RPromise promise, List> commands, List> attachedCommands, + boolean skipResult, boolean atomic, boolean queued) { super(); this.promise = promise; this.commands = commands; this.skipResult = skipResult; this.atomic = atomic; this.attachedCommands = attachedCommands; + this.queued = queued; } public RPromise getPromise() { return promise; } + public boolean isQueued() { + return queued; + } + public boolean isAtomic() { return atomic; } diff --git a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java index 778a157cb..5cf491ec4 100644 --- a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java +++ b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java @@ -1051,7 +1051,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { list.add(new CommandData(promise, details.getCodec(), RedisCommands.ASKING, new Object[]{})); list.add(new CommandData(details.getAttemptPromise(), details.getCodec(), details.getCommand(), details.getParams())); RPromise main = new RedissonPromise(); - ChannelFuture future = connection.send(new CommandsData(main, list)); + ChannelFuture future = connection.send(new CommandsData(main, list, false)); details.setWriteFuture(future); } else { if (log.isDebugEnabled()) { diff --git a/redisson/src/main/java/org/redisson/command/CommandBatchService.java b/redisson/src/main/java/org/redisson/command/CommandBatchService.java index 98928c826..8bbfdf195 100644 --- a/redisson/src/main/java/org/redisson/command/CommandBatchService.java +++ b/redisson/src/main/java/org/redisson/command/CommandBatchService.java @@ -269,7 +269,7 @@ public class CommandBatchService extends CommandAsyncService { } list.add(new CommandData(details.getAttemptPromise(), details.getCodec(), details.getCommand(), details.getParams())); RPromise main = new RedissonPromise(); - ChannelFuture future = connection.send(new CommandsData(main, list)); + ChannelFuture future = connection.send(new CommandsData(main, list, true)); details.setWriteFuture(future); } else { if (log.isDebugEnabled()) { @@ -282,7 +282,7 @@ public class CommandBatchService extends CommandAsyncService { list.add(new CommandData(new RedissonPromise(), details.getCodec(), RedisCommands.MULTI, new Object[]{})); list.add(new CommandData(details.getAttemptPromise(), details.getCodec(), details.getCommand(), details.getParams())); RPromise main = new RedissonPromise(); - ChannelFuture future = connection.send(new CommandsData(main, list)); + ChannelFuture future = connection.send(new CommandsData(main, list, true)); connectionEntry.setFirstCommand(false); details.setWriteFuture(future); } else { @@ -733,7 +733,7 @@ public class CommandBatchService extends CommandAsyncService { @Override public void operationComplete(Future connFuture) throws Exception { checkConnectionFuture(entry, source, mainPromise, attemptPromise, details, connectionFuture, options.isSkipResult(), - options.getResponseTimeout(), attempts, options.getExecutionMode() != ExecutionMode.IN_MEMORY); + options.getResponseTimeout(), attempts, options.getExecutionMode()); } }); @@ -824,7 +824,7 @@ public class CommandBatchService extends CommandAsyncService { private void checkConnectionFuture(final Entry entry, final NodeSource source, final RPromise mainPromise, final RPromise attemptPromise, final AsyncDetails details, - RFuture connFuture, final boolean noResult, final long responseTimeout, final int attempts, boolean atomic) { + RFuture connFuture, final boolean noResult, final long responseTimeout, final int attempts, ExecutionMode executionMode) { if (connFuture.isCancelled()) { return; } @@ -841,6 +841,8 @@ public class CommandBatchService extends CommandAsyncService { } final RedisConnection connection = connFuture.getNow(); + boolean isAtomic = executionMode != ExecutionMode.IN_MEMORY; + boolean isQueued = executionMode == ExecutionMode.REDIS_READ_ATOMIC || executionMode == ExecutionMode.REDIS_WRITE_ATOMIC; List> list = new LinkedList>(); if (source.getRedirect() == Redirect.ASK) { @@ -848,14 +850,14 @@ public class CommandBatchService extends CommandAsyncService { list.add(new CommandData(promise, StringCodec.INSTANCE, RedisCommands.ASKING, new Object[] {})); } for (BatchCommandData c : entry.getCommands()) { - if (c.getPromise().isSuccess() && !isWaitCommand(c) && !atomic) { + if (c.getPromise().isSuccess() && !isWaitCommand(c) && !isAtomic) { // skip successful commands continue; } list.add(c); } - ChannelFuture future = connection.send(new CommandsData(attemptPromise, list, noResult, atomic)); + ChannelFuture future = connection.send(new CommandsData(attemptPromise, list, noResult, isAtomic, isQueued)); details.setWriteFuture(future); details.getWriteFuture().addListener(new ChannelFutureListener() { diff --git a/redisson/src/test/java/org/redisson/RedisClientTest.java b/redisson/src/test/java/org/redisson/RedisClientTest.java index 8192f1a88..d40907477 100644 --- a/redisson/src/test/java/org/redisson/RedisClientTest.java +++ b/redisson/src/test/java/org/redisson/RedisClientTest.java @@ -148,7 +148,7 @@ public class RedisClientTest { commands.add(cmd4); RPromise p = new RedissonPromise(); - conn.send(new CommandsData(p, commands)); + conn.send(new CommandsData(p, commands, false)); assertThat(cmd1.getPromise().get()).isEqualTo("PONG"); assertThat(cmd2.getPromise().get()).isEqualTo(1); @@ -183,7 +183,7 @@ public class RedisClientTest { } RPromise p = new RedissonPromise(); - conn.send(new CommandsData(p, commands)); + conn.send(new CommandsData(p, commands, false)); for (CommandData commandData : commands) { commandData.getPromise().get();