From 1819d3dd19a599a833b2e8073ae52a7d517cba79 Mon Sep 17 00:00:00 2001 From: Nikita Date: Fri, 23 Sep 2016 18:00:24 +0300 Subject: [PATCH] RBatch.executeSkipResult method added #415 --- .../main/java/org/redisson/RedissonBatch.java | 10 ++ .../main/java/org/redisson/api/RBatch.java | 25 +++++ .../client/handler/CommandDecoder.java | 2 +- .../client/protocol/CommandsData.java | 10 ++ .../client/protocol/RedisCommands.java | 1 + .../redisson/command/CommandBatchService.java | 99 +++++++++++-------- .../java/org/redisson/RedissonBatchTest.java | 15 +++ 7 files changed, 121 insertions(+), 41 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonBatch.java b/redisson/src/main/java/org/redisson/RedissonBatch.java index 83983556f..fc7f9c774 100644 --- a/redisson/src/main/java/org/redisson/RedissonBatch.java +++ b/redisson/src/main/java/org/redisson/RedissonBatch.java @@ -226,6 +226,16 @@ public class RedissonBatch implements RBatch { return executorService.execute(); } + @Override + public void executeSkipResult() { + executorService.executeSkipResult(); + } + + @Override + public RFuture executeSkipResultAsync() { + return executorService.executeSkipResultAsync(); + } + @Override public RFuture> executeAsync() { return executorService.executeAsync(); diff --git a/redisson/src/main/java/org/redisson/api/RBatch.java b/redisson/src/main/java/org/redisson/api/RBatch.java index 32868333c..3908330ef 100644 --- a/redisson/src/main/java/org/redisson/api/RBatch.java +++ b/redisson/src/main/java/org/redisson/api/RBatch.java @@ -398,4 +398,29 @@ public interface RBatch { */ RFuture> executeAsync(); + /** + * Executes all operations accumulated during async methods invocations, + * but skip command replies + * + * If cluster configuration used then operations are grouped by slot ids + * and may be executed on different servers. Thus command execution order could be changed + * + * @return List with result object for each command + * @throws RedisException in case of any error + * + */ + void executeSkipResult(); + + /** + * Executes all operations accumulated during async methods invocations asynchronously, + * but skip command replies + * + * If cluster configuration used then operations are grouped by slot ids + * and may be executed on different servers. Thus command execution order could be changed + * + * @return List with result object for each command + * @throws RedisException in case of any error + * + */ + RFuture executeSkipResultAsync(); } diff --git a/redisson/src/main/java/org/redisson/client/handler/CommandDecoder.java b/redisson/src/main/java/org/redisson/client/handler/CommandDecoder.java index 284d612ec..cf98f298c 100644 --- a/redisson/src/main/java/org/redisson/client/handler/CommandDecoder.java +++ b/redisson/src/main/java/org/redisson/client/handler/CommandDecoder.java @@ -199,7 +199,7 @@ public class CommandDecoder extends ReplayingDecoder { } } - if (i == commandBatch.getCommands().size()) { + if (commandBatch.isNoResult() || i == commandBatch.getCommands().size()) { RPromise promise = commandBatch.getPromise(); if (error != null) { if (!promise.tryFailure(error) && promise.cause() instanceof RedisTimeoutException) { diff --git a/redisson/src/main/java/org/redisson/client/protocol/CommandsData.java b/redisson/src/main/java/org/redisson/client/protocol/CommandsData.java index 06dce5992..375e5c9eb 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/CommandsData.java +++ b/redisson/src/main/java/org/redisson/client/protocol/CommandsData.java @@ -29,17 +29,27 @@ public class CommandsData implements QueueCommand { private final List> commands; private final RPromise promise; + private final boolean noResult; public CommandsData(RPromise promise, List> commands) { + this(promise, commands, false); + } + + public CommandsData(RPromise promise, List> commands, boolean noResult) { super(); this.promise = promise; this.commands = commands; + this.noResult = noResult; } public RPromise getPromise() { return promise; } + public boolean isNoResult() { + return noResult; + } + public List> getCommands() { return commands; } diff --git a/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java b/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java index 8ecdd8087..f82d510e9 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java +++ b/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java @@ -85,6 +85,7 @@ public interface RedisCommands { RedisStrictCommand SETBIT = new RedisStrictCommand("SETBIT", new BitSetReplayConvertor()); RedisStrictCommand BITOP = new RedisStrictCommand("BITOP", new VoidReplayConvertor()); + RedisStrictCommand CLIENT_REPLY = new RedisStrictCommand("CLIENT", "REPLY", new VoidReplayConvertor()); RedisStrictCommand ASKING = new RedisStrictCommand("ASKING", new VoidReplayConvertor()); RedisStrictCommand READONLY = new RedisStrictCommand("READONLY", new VoidReplayConvertor()); diff --git a/redisson/src/main/java/org/redisson/command/CommandBatchService.java b/redisson/src/main/java/org/redisson/command/CommandBatchService.java index e4ade39d8..0d3c848ba 100644 --- a/redisson/src/main/java/org/redisson/command/CommandBatchService.java +++ b/redisson/src/main/java/org/redisson/command/CommandBatchService.java @@ -16,14 +16,15 @@ package org.redisson.command; import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; +import java.util.Deque; import java.util.List; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import org.redisson.RedissonReference; import org.redisson.api.RFuture; import org.redisson.client.RedisAskException; import org.redisson.client.RedisConnection; @@ -44,6 +45,7 @@ import org.redisson.connection.MasterSlaveEntry; import org.redisson.connection.NodeSource; import org.redisson.connection.NodeSource.Redirect; import org.redisson.misc.RPromise; +import org.redisson.misc.RedissonObjectFactory; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; @@ -52,18 +54,16 @@ import io.netty.util.TimerTask; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; import io.netty.util.internal.PlatformDependent; -import org.redisson.RedissonReference; -import org.redisson.misc.RedissonObjectFactory; public class CommandBatchService extends CommandReactiveService { public static class Entry { - Collection> commands = new ConcurrentLinkedQueue>(); + Deque> commands = new LinkedBlockingDeque>(); volatile boolean readOnlyMode = true; - public Collection> getCommands() { + public Deque> getCommands() { return commands; } @@ -116,7 +116,9 @@ public class CommandBatchService extends CommandReactiveService { RedissonReference reference = redisson != null ? RedissonObjectFactory.toReference(redisson, params[i]) : RedissonObjectFactory.toReference(redissonReactive, params[i]); - params[i] = reference == null ? params[i] : reference; + if (reference != null) { + params[i] = reference; + } } } BatchCommandData commandData = new BatchCommandData(mainPromise, codec, command, params, index.incrementAndGet()); @@ -128,6 +130,10 @@ public class CommandBatchService extends CommandReactiveService { } public RFuture executeAsyncVoid() { + return executeAsyncVoid(false); + } + + private RFuture executeAsyncVoid(boolean noResult) { if (executed) { throw new IllegalStateException("Batch already executed!"); } @@ -135,6 +141,18 @@ public class CommandBatchService extends CommandReactiveService { if (commands.isEmpty()) { return connectionManager.newSucceededFuture(null); } + + if (noResult) { + for (Entry entry : commands.values()) { + RPromise s = connectionManager.newPromise(); + BatchCommandData commandData = new BatchCommandData(s, null, RedisCommands.CLIENT_REPLY, new Object[] { "OFF" }, index.incrementAndGet()); + entry.getCommands().addFirst(commandData); + RPromise s1 = connectionManager.newPromise(); + BatchCommandData commandData1 = new BatchCommandData(s1, null, RedisCommands.CLIENT_REPLY, new Object[] { "ON" }, index.incrementAndGet()); + entry.getCommands().add(commandData1); + } + } + executed = true; RPromise voidPromise = connectionManager.newPromise(); @@ -147,11 +165,19 @@ public class CommandBatchService extends CommandReactiveService { AtomicInteger slots = new AtomicInteger(commands.size()); for (java.util.Map.Entry e : commands.entrySet()) { - execute(e.getValue(), new NodeSource(e.getKey()), voidPromise, slots, 0); + execute(e.getValue(), new NodeSource(e.getKey()), voidPromise, slots, 0, true); } return voidPromise; } - + + public void executeSkipResult() { + get(executeSkipResultAsync()); + } + + public RFuture executeSkipResultAsync() { + return executeAsyncVoid(true); + } + public RFuture> executeAsync() { if (executed) { throw new IllegalStateException("Batch already executed!"); @@ -173,7 +199,7 @@ public class CommandBatchService extends CommandReactiveService { return; } - List entries = new ArrayList(); + List> entries = new ArrayList>(); for (Entry e : commands.values()) { entries.addAll(e.getCommands()); } @@ -196,12 +222,12 @@ public class CommandBatchService extends CommandReactiveService { AtomicInteger slots = new AtomicInteger(commands.size()); for (java.util.Map.Entry e : commands.entrySet()) { - execute(e.getValue(), new NodeSource(e.getKey()), voidPromise, slots, 0); + execute(e.getValue(), new NodeSource(e.getKey()), voidPromise, slots, 0, false); } return promise; } - public void execute(final Entry entry, final NodeSource source, final RPromise mainPromise, final AtomicInteger slots, final int attempt) { + private void execute(final Entry entry, final NodeSource source, final RPromise mainPromise, final AtomicInteger slots, final int attempt, final boolean noResult) { if (mainPromise.isCancelled()) { return; } @@ -257,23 +283,19 @@ public class CommandBatchService extends CommandReactiveService { } int count = attempt + 1; - execute(entry, source, mainPromise, slots, count); + execute(entry, source, mainPromise, slots, count, noResult); } }; Timeout timeout = connectionManager.newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS); details.setTimeout(timeout); - if (connectionFuture.isDone()) { - checkConnectionFuture(entry, source, mainPromise, attemptPromise, details, connectionFuture); - } else { - connectionFuture.addListener(new FutureListener() { - @Override - public void operationComplete(Future connFuture) throws Exception { - checkConnectionFuture(entry, source, mainPromise, attemptPromise, details, connectionFuture); - } - }); - } + connectionFuture.addListener(new FutureListener() { + @Override + public void operationComplete(Future connFuture) throws Exception { + checkConnectionFuture(entry, source, mainPromise, attemptPromise, details, connectionFuture, noResult); + } + }); attemptPromise.addListener(new FutureListener() { @Override @@ -286,18 +308,18 @@ public class CommandBatchService extends CommandReactiveService { if (future.cause() instanceof RedisMovedException) { RedisMovedException ex = (RedisMovedException)future.cause(); entry.clearErrors(); - execute(entry, new NodeSource(ex.getSlot(), ex.getAddr(), Redirect.MOVED), mainPromise, slots, attempt); + execute(entry, new NodeSource(ex.getSlot(), ex.getAddr(), Redirect.MOVED), mainPromise, slots, attempt, noResult); return; } if (future.cause() instanceof RedisAskException) { RedisAskException ex = (RedisAskException)future.cause(); entry.clearErrors(); - execute(entry, new NodeSource(ex.getSlot(), ex.getAddr(), Redirect.ASK), mainPromise, slots, attempt); + execute(entry, new NodeSource(ex.getSlot(), ex.getAddr(), Redirect.ASK), mainPromise, slots, attempt, noResult); return; } if (future.cause() instanceof RedisLoadingException) { entry.clearErrors(); - execute(entry, source, mainPromise, slots, attempt); + execute(entry, source, mainPromise, slots, attempt, noResult); return; } if (future.cause() instanceof RedisTryAgainException) { @@ -305,7 +327,7 @@ public class CommandBatchService extends CommandReactiveService { connectionManager.newTimeout(new TimerTask() { @Override public void run(Timeout timeout) throws Exception { - execute(entry, source, mainPromise, slots, attempt); + execute(entry, source, mainPromise, slots, attempt, noResult); } }, 1, TimeUnit.SECONDS); return; @@ -324,7 +346,7 @@ public class CommandBatchService extends CommandReactiveService { } private void checkWriteFuture(final RPromise attemptPromise, AsyncDetails details, - final RedisConnection connection, ChannelFuture future) { + final RedisConnection connection, ChannelFuture future, boolean noResult) { if (attemptPromise.isDone() || future.isCancelled()) { return; } @@ -347,7 +369,7 @@ public class CommandBatchService extends CommandReactiveService { private void checkConnectionFuture(final Entry entry, final NodeSource source, final RPromise mainPromise, final RPromise attemptPromise, final AsyncDetails details, - RFuture connFuture) { + RFuture connFuture, boolean noResult) { if (attemptPromise.isDone() || mainPromise.isCancelled() || connFuture.isCancelled()) { return; } @@ -372,19 +394,16 @@ public class CommandBatchService extends CommandReactiveService { } list.add(c); } - ChannelFuture future = connection.send(new CommandsData(attemptPromise, list)); + + ChannelFuture future = connection.send(new CommandsData(attemptPromise, list, noResult)); details.setWriteFuture(future); - if (details.getWriteFuture().isDone()) { - checkWriteFuture(attemptPromise, details, connection, details.getWriteFuture()); - } else { - details.getWriteFuture().addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - checkWriteFuture(attemptPromise, details, connection, future); - } - }); - } + details.getWriteFuture().addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + checkWriteFuture(attemptPromise, details, connection, future, noResult); + } + }); releaseConnection(source, connFuture, entry.isReadOnlyMode(), attemptPromise, details); } diff --git a/redisson/src/test/java/org/redisson/RedissonBatchTest.java b/redisson/src/test/java/org/redisson/RedissonBatchTest.java index 32ebce925..5875e72a0 100644 --- a/redisson/src/test/java/org/redisson/RedissonBatchTest.java +++ b/redisson/src/test/java/org/redisson/RedissonBatchTest.java @@ -9,6 +9,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import static org.assertj.core.api.Assertions.*; import org.junit.Assert; import org.junit.Test; import org.redisson.api.RBatch; @@ -39,6 +40,20 @@ public class RedissonBatchTest extends BaseTest { System.out.println(t); } + @Test + public void testSkipResult() { + RBatch batch = redisson.createBatch(); + batch.getBucket("A1").setAsync("001"); + batch.getBucket("A2").setAsync("001"); + batch.getBucket("A3").setAsync("001"); + batch.getKeys().deleteAsync("A1"); + batch.getKeys().deleteAsync("A2"); + batch.executeSkipResult(); + + assertThat(redisson.getBucket("A1").isExists()).isFalse(); + assertThat(redisson.getBucket("A3").isExists()).isTrue(); + } + @Test public void testBatchNPE() { RBatch batch = redisson.createBatch();