diff --git a/redisson/src/main/java/org/redisson/RedissonBatch.java b/redisson/src/main/java/org/redisson/RedissonBatch.java index 702c878ab..1d4b62446 100644 --- a/redisson/src/main/java/org/redisson/RedissonBatch.java +++ b/redisson/src/main/java/org/redisson/RedissonBatch.java @@ -15,10 +15,10 @@ */ package org.redisson; -import java.util.List; import java.util.UUID; import java.util.concurrent.TimeUnit; +import org.redisson.api.BatchResult; import org.redisson.api.RAtomicDoubleAsync; import org.redisson.api.RAtomicLongAsync; import org.redisson.api.RBatch; @@ -64,6 +64,10 @@ public class RedissonBatch implements RBatch { private int retryAttempts; private long retryInterval; + private int syncSlaves; + private long syncTimeout; + private boolean skipResult; + protected RedissonBatch(UUID id, EvictionScheduler evictionScheduler, ConnectionManager connectionManager) { this.executorService = new CommandBatchService(connectionManager); this.evictionScheduler = evictionScheduler; @@ -230,6 +234,19 @@ public class RedissonBatch implements RBatch { return new RedissonSetCache(codec, evictionScheduler, executorService, name, null); } + @Override + public RBatch syncSlaves(int slaves, long timeout, TimeUnit unit) { + this.syncSlaves = slaves; + this.syncTimeout = unit.toMillis(timeout); + return this; + } + + @Override + public RBatch skipResult() { + this.skipResult = true; + return this; + } + @Override public RBatch retryAttempts(int retryAttempts) { this.retryAttempts = retryAttempts; @@ -249,25 +266,25 @@ public class RedissonBatch implements RBatch { } @Override - public List execute() { - return executorService.execute(timeout, retryAttempts, retryInterval); + public BatchResult execute() { + return executorService.execute(syncSlaves, syncTimeout, skipResult, timeout, retryAttempts, retryInterval); } @Override public void executeSkipResult() { - executorService.executeSkipResult(timeout, retryAttempts, retryInterval); + executorService.execute(syncSlaves, syncTimeout, true, timeout, retryAttempts, retryInterval); } @Override public RFuture executeSkipResultAsync() { - return executorService.executeSkipResultAsync(timeout, retryAttempts, retryInterval); + return executorService.executeAsync(syncSlaves, syncTimeout, true, timeout, retryAttempts, retryInterval); } @Override - public RFuture> executeAsync() { - return executorService.executeAsync(timeout, retryAttempts, retryInterval); + public RFuture> executeAsync() { + return executorService.executeAsync(syncSlaves, syncTimeout, skipResult, timeout, retryAttempts, retryInterval); } - + @Override public RMultimapAsync getSetMultimap(String name) { return new RedissonSetMultimap(id, executorService, name); diff --git a/redisson/src/main/java/org/redisson/RedissonRemoteService.java b/redisson/src/main/java/org/redisson/RedissonRemoteService.java index 7148153f1..d23aa2af7 100644 --- a/redisson/src/main/java/org/redisson/RedissonRemoteService.java +++ b/redisson/src/main/java/org/redisson/RedissonRemoteService.java @@ -25,6 +25,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import org.redisson.api.BatchResult; import org.redisson.api.RBatch; import org.redisson.api.RBlockingQueue; import org.redisson.api.RBlockingQueueAsync; @@ -295,7 +296,7 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS timeout = request.getOptions().getExecutionTimeoutInMillis(); } - RFuture> clientsFuture = send(timeout, responseName, + RFuture> clientsFuture = send(timeout, responseName, responseHolder.get()); clientsFuture.addListener(new FutureListener>() { @Override @@ -324,7 +325,7 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS } } - private RFuture> send(long timeout, String responseName, T response) { + private RFuture> send(long timeout, String responseName, T response) { RBatch batch = redisson.createBatch(); RBlockingQueueAsync queue = batch.getBlockingQueue(responseName, getCodec()); queue.putAsync(response); diff --git a/redisson/src/main/java/org/redisson/api/BatchResult.java b/redisson/src/main/java/org/redisson/api/BatchResult.java new file mode 100644 index 000000000..65a477f00 --- /dev/null +++ b/redisson/src/main/java/org/redisson/api/BatchResult.java @@ -0,0 +1,242 @@ +/** + * Copyright 2016 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.api; + +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.ListIterator; + +/** + * + * @author Nikita Koksharov + * + */ +public class BatchResult implements List { + + private final List responses; + private final int syncedSlaves; + + public BatchResult(List responses, int syncedSlaves) { + super(); + this.responses = responses; + this.syncedSlaves = syncedSlaves; + } + + /** + * Returns list with result object for each command + * + * @return list + */ + public List getResponses() { + return responses; + } + + /** + * Returns synchronized slaves amount involved during batch execution + * + * @return slaves + */ + public int getSyncedSlaves() { + return syncedSlaves; + } + + /** + * Use {@link #getResponses()} + */ + @Deprecated + public int size() { + return responses.size(); + } + + /** + * Use {@link #getResponses()} + */ + @Deprecated + public boolean isEmpty() { + return responses.isEmpty(); + } + + /** + * Use {@link #getResponses()} + */ + @Deprecated + public boolean contains(Object o) { + return responses.contains(o); + } + + /** + * Use {@link #getResponses()} + */ + @Deprecated + public Iterator iterator() { + return responses.iterator(); + } + + /** + * Use {@link #getResponses()} + */ + @Deprecated + public Object[] toArray() { + return responses.toArray(); + } + + /** + * Use {@link #getResponses()} + */ + @Deprecated + public T[] toArray(T[] a) { + return responses.toArray(a); + } + + /** + * Use {@link #getResponses()} + */ + @Deprecated + public boolean add(E e) { + return responses.add(e); + } + + /** + * Use {@link #getResponses()} + */ + @Deprecated + public boolean remove(Object o) { + return responses.remove(o); + } + + /** + * Use {@link #getResponses()} + */ + @Deprecated + public boolean containsAll(Collection c) { + return responses.containsAll(c); + } + + /** + * Use {@link #getResponses()} + */ + @Deprecated + public boolean addAll(Collection c) { + return responses.addAll(c); + } + + /** + * Use {@link #getResponses()} + */ + @Deprecated + public boolean addAll(int index, Collection c) { + return responses.addAll(index, c); + } + + /** + * Use {@link #getResponses()} + */ + @Deprecated + public boolean removeAll(Collection c) { + return responses.removeAll(c); + } + + /** + * Use {@link #getResponses()} + */ + @Deprecated + public boolean retainAll(Collection c) { + return responses.retainAll(c); + } + + /** + * Use {@link #getResponses()} + */ + @Deprecated + public void clear() { + responses.clear(); + } + + /** + * Use {@link #getResponses()} + */ + @Deprecated + public E get(int index) { + return responses.get(index); + } + + /** + * Use {@link #getResponses()} + */ + @Deprecated + public E set(int index, E element) { + return responses.set(index, element); + } + + /** + * Use {@link #getResponses()} + */ + @Deprecated + public void add(int index, E element) { + responses.add(index, element); + } + + /** + * Use {@link #getResponses()} + */ + @Deprecated + public E remove(int index) { + return responses.remove(index); + } + + /** + * Use {@link #getResponses()} + */ + @Deprecated + public int indexOf(Object o) { + return responses.indexOf(o); + } + + /** + * Use {@link #getResponses()} + */ + @Deprecated + public int lastIndexOf(Object o) { + return responses.lastIndexOf(o); + } + + /** + * Use {@link #getResponses()} + */ + @Deprecated + public ListIterator listIterator() { + return responses.listIterator(); + } + + /** + * Use {@link #getResponses()} + */ + @Deprecated + public ListIterator listIterator(int index) { + return responses.listIterator(index); + } + + /** + * Use {@link #getResponses()} + */ + @Deprecated + public List subList(int fromIndex, int toIndex) { + return responses.subList(fromIndex, toIndex); + } + + +} diff --git a/redisson/src/main/java/org/redisson/api/RBatch.java b/redisson/src/main/java/org/redisson/api/RBatch.java index 0521a575b..7a62d961d 100644 --- a/redisson/src/main/java/org/redisson/api/RBatch.java +++ b/redisson/src/main/java/org/redisson/api/RBatch.java @@ -15,7 +15,6 @@ */ package org.redisson.api; -import java.util.List; import java.util.concurrent.TimeUnit; import org.redisson.client.RedisException; @@ -387,7 +386,7 @@ public interface RBatch { * @throws RedisException in case of any error * */ - List execute() throws RedisException; + BatchResult execute() throws RedisException; /** * Executes all operations accumulated during async methods invocations asynchronously. @@ -397,36 +396,41 @@ public interface RBatch { * * @return List with result object for each command */ - RFuture> executeAsync(); + RFuture> executeAsync(); /** - * Executes all operations accumulated during async methods invocations. - * Command replies are skipped such approach saves response bandwidth. - *

- * If cluster configuration used then operations are grouped by slot ids - * and may be executed on different servers. Thus command execution order could be changed. - *

- * NOTE: Redis 3.2+ required - * - * @throws RedisException in case of any error - * + * Use {@link #skipResult()} */ + @Deprecated void executeSkipResult(); /** - * Executes all operations accumulated during async methods invocations asynchronously, - * Command replies are skipped such approach saves response bandwidth. - *

- * If cluster configuration used then operations are grouped by slot ids - * and may be executed on different servers. Thus command execution order could be changed + * Use {@link #skipResult()} + */ + @Deprecated + RFuture executeSkipResultAsync(); + + /** + * Inform Redis not to send reply for this batch. + * Such approach saves response bandwidth. *

* NOTE: Redis 3.2+ required * - * @return void - * @throws RedisException in case of any error - * + * @return self instance */ - RFuture executeSkipResultAsync(); + RBatch skipResult(); + + /** + * Synchronize write operations execution across defined amount of Redis slave nodes. + *

+ * NOTE: Redis 3.0+ required + * + * @param slaves amount to sync + * @param timeout for sync operation + * @param unit value + * @return self instance + */ + RBatch syncSlaves(int slaves, long timeout, TimeUnit unit); /** * Defines timeout for Redis response. diff --git a/redisson/src/main/java/org/redisson/api/RBatchReactive.java b/redisson/src/main/java/org/redisson/api/RBatchReactive.java index 89843ae47..4ab1a71d6 100644 --- a/redisson/src/main/java/org/redisson/api/RBatchReactive.java +++ b/redisson/src/main/java/org/redisson/api/RBatchReactive.java @@ -15,7 +15,7 @@ */ package org.redisson.api; -import java.util.List; +import java.util.concurrent.TimeUnit; import org.reactivestreams.Publisher; import org.redisson.client.codec.Codec; @@ -249,6 +249,68 @@ public interface RBatchReactive { * * @return List with result object for each command */ - Publisher> execute(); + Publisher> execute(); + /** + * Command replies are skipped such approach saves response bandwidth. + *

+ * NOTE: Redis 3.2+ required + * + * @return self instance + */ + RBatchReactive skipResult(); + + /** + * + *

+ * NOTE: Redis 3.0+ required + * + * @param slaves number to sync + * @param timeout for sync operation + * @param unit value + * @return self instance + */ + RBatchReactive syncSlaves(int slaves, long timeout, TimeUnit unit); + + /** + * Defines timeout for Redis response. + * Starts to countdown when Redis command has been successfully sent. + *

+ * 0 value means use Config.setTimeout value instead. + *

+ * Default is 0 + * + * @param timeout value + * @param unit value + * @return self instance + */ + RBatchReactive timeout(long timeout, TimeUnit unit); + + /** + * Defines time interval for another one attempt send Redis commands batch + * if it hasn't been sent already. + *

+ * 0 value means use Config.setRetryInterval value instead. + *

+ * Default is 0 + * + * @param retryInterval value + * @param unit value + * @return self instance + */ + RBatchReactive retryInterval(long retryInterval, TimeUnit unit); + + /** + * Defines attempts amount to re-send Redis commands batch + * if it hasn't been sent already. + *

+ * 0 value means use Config.setRetryAttempts value instead. + *

+ * Default is 0 + * + * @param retryAttempts value + * @return self instance + */ + RBatchReactive retryAttempts(int retryAttempts); + } diff --git a/redisson/src/main/java/org/redisson/client/protocol/BatchCommandData.java b/redisson/src/main/java/org/redisson/client/protocol/BatchCommandData.java index 23f2ff17b..2e59fbaad 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/BatchCommandData.java +++ b/redisson/src/main/java/org/redisson/client/protocol/BatchCommandData.java @@ -20,6 +20,7 @@ import java.util.concurrent.atomic.AtomicReference; import org.redisson.client.RedisRedirectException; import org.redisson.client.codec.Codec; import org.redisson.misc.RPromise; +import org.redisson.misc.RedissonPromise; /** * @@ -33,6 +34,10 @@ public class BatchCommandData extends CommandData implements Compara private final int index; private final AtomicReference redirectError = new AtomicReference(); + public BatchCommandData(RedisCommand command, Object[] params, int index) { + this(new RedissonPromise(), null, command, params, index); + } + public BatchCommandData(RPromise promise, Codec codec, RedisCommand command, Object[] params, int index) { super(promise, codec, command, params); this.index = index; diff --git a/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java b/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java index aa15d8c51..9f71d89d3 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java +++ b/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java @@ -93,6 +93,7 @@ public interface RedisCommands { RedisStrictCommand SETBIT = new RedisStrictCommand("SETBIT", new BitSetReplayConvertor()); RedisStrictCommand BITOP = new RedisStrictCommand("BITOP", new VoidReplayConvertor()); + RedisStrictCommand WAIT = new RedisStrictCommand("WAIT", new IntegerReplayConvertor()); RedisStrictCommand CLIENT_REPLY = new RedisStrictCommand("CLIENT", "REPLY", new VoidReplayConvertor()); RedisStrictCommand ASKING = new RedisStrictCommand("ASKING", new VoidReplayConvertor()); RedisStrictCommand READONLY = new RedisStrictCommand("READONLY", new VoidReplayConvertor()); diff --git a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java index e83832b21..3a066f512 100644 --- a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java +++ b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java @@ -872,7 +872,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { } } - private T tryHandleReference(T o) { + protected T tryHandleReference(T o) { boolean hasConversion = false; if (o instanceof List) { List r = (List) o; diff --git a/redisson/src/main/java/org/redisson/command/CommandBatchService.java b/redisson/src/main/java/org/redisson/command/CommandBatchService.java index cab87b5a9..ef89d87bc 100644 --- a/redisson/src/main/java/org/redisson/command/CommandBatchService.java +++ b/redisson/src/main/java/org/redisson/command/CommandBatchService.java @@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.redisson.RedissonReference; import org.redisson.RedissonShutdownException; +import org.redisson.api.BatchResult; import org.redisson.api.RFuture; import org.redisson.client.RedisAskException; import org.redisson.client.RedisConnection; @@ -47,6 +48,7 @@ import org.redisson.connection.NodeSource; import org.redisson.connection.NodeSource.Redirect; import org.redisson.misc.RPromise; import org.redisson.misc.RedissonObjectFactory; +import org.redisson.misc.RedissonPromise; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; @@ -129,114 +131,114 @@ public class CommandBatchService extends CommandAsyncService { entry.getCommands().add(commandData); } - public List execute() { - return get(executeAsync(0, 0, 0)); + public BatchResult execute() { + RFuture> f = executeAsync(0, 0, false, 0, 0, 0); + return get(f); } - public List execute(long responseTimeout, int retryAttempts, long retryInterval) { - return get(executeAsync(responseTimeout, retryAttempts, retryInterval)); + public BatchResult execute(int syncSlaves, long syncTimeout, boolean noResult, long responseTimeout, int retryAttempts, long retryInterval) { + RFuture> f = executeAsync(syncSlaves, syncTimeout, noResult, responseTimeout, retryAttempts, retryInterval); + return get(f); } public RFuture executeAsyncVoid() { - return executeAsyncVoid(false, 0, 0, 0); + final RedissonPromise promise = new RedissonPromise(); + RFuture> res = executeAsync(0, 0, false, 0, 0, 0); + res.addListener(new FutureListener>() { + @Override + public void operationComplete(Future> future) throws Exception { + if (future.isSuccess()) { + promise.trySuccess(null); + } else { + promise.tryFailure(future.cause()); + } + } + }); + return promise; + } + + public RFuture> executeAsync() { + return executeAsync(0, 0, false, 0, 0, 0); } - protected RFuture executeAsyncVoid(boolean noResult, long responseTimeout, int retryAttempts, long retryInterval) { + public RFuture executeAsync(int syncSlaves, long syncTimeout, boolean skipResult, long responseTimeout, int retryAttempts, long retryInterval) { if (executed) { throw new IllegalStateException("Batch already executed!"); } - + if (commands.isEmpty()) { - return connectionManager.newSucceededFuture(null); + return RedissonPromise.newSucceededFuture(null); } + executed = true; - if (noResult) { + if (skipResult) { for (Entry entry : commands.values()) { - RPromise s = connectionManager.newPromise(); - BatchCommandData offCommand = new BatchCommandData(s, null, RedisCommands.CLIENT_REPLY, new Object[] { "OFF" }, index.incrementAndGet()); + BatchCommandData offCommand = new BatchCommandData(RedisCommands.CLIENT_REPLY, new Object[] { "OFF" }, index.incrementAndGet()); entry.getCommands().addFirst(offCommand); - RPromise s1 = connectionManager.newPromise(); - BatchCommandData onCommand = new BatchCommandData(s1, null, RedisCommands.CLIENT_REPLY, new Object[] { "ON" }, index.incrementAndGet()); + BatchCommandData onCommand = new BatchCommandData(RedisCommands.CLIENT_REPLY, new Object[] { "ON" }, index.incrementAndGet()); entry.getCommands().add(onCommand); } } - executed = true; - - RPromise voidPromise = connectionManager.newPromise(); - voidPromise.addListener(new FutureListener() { - @Override - public void operationComplete(Future future) throws Exception { - commands = null; + if (syncSlaves > 0) { + for (Entry entry : commands.values()) { + BatchCommandData waitCommand = new BatchCommandData(RedisCommands.WAIT, new Object[] { syncSlaves, syncTimeout }, index.incrementAndGet()); + entry.getCommands().add(waitCommand); } - }); - - AtomicInteger slots = new AtomicInteger(commands.size()); - for (java.util.Map.Entry e : commands.entrySet()) { - execute(e.getValue(), new NodeSource(e.getKey()), voidPromise, slots, 0, true, responseTimeout, retryAttempts, retryInterval); - } - return voidPromise; - } - - public void executeSkipResult(long timeout, int retryAttempts, long retryInterval) { - get(executeSkipResultAsync(timeout, retryAttempts, retryInterval)); - } - - public RFuture executeSkipResultAsync(long timeout, int retryAttempts, long retryInterval) { - return executeAsyncVoid(true, timeout, retryAttempts, retryInterval); - } - - public RFuture> executeAsync() { - return executeAsync(0, 0, 0); - } - - public RFuture> executeAsync(long responseTimeout, int retryAttempts, long retryInterval) { - if (executed) { - throw new IllegalStateException("Batch already executed!"); } - if (commands.isEmpty()) { - return connectionManager.newSucceededFuture(null); - } - executed = true; - - RPromise voidPromise = connectionManager.newPromise(); - final RPromise> promise = connectionManager.newPromise(); - voidPromise.addListener(new FutureListener() { - @Override - public void operationComplete(Future future) throws Exception { - if (!future.isSuccess()) { - promise.tryFailure(future.cause()); + RPromise resultPromise; + RPromise voidPromise = new RedissonPromise(); + if (skipResult) { + voidPromise.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { commands = null; - return; - } - - List entries = new ArrayList(); - for (Entry e : commands.values()) { - entries.addAll(e.getCommands()); } - Collections.sort(entries); - List result = new ArrayList(entries.size()); - for (BatchCommandData commandEntry : entries) { - Object entryResult = commandEntry.getPromise().getNow(); - if (isRedissonReferenceSupportEnabled() && entryResult instanceof RedissonReference) { - result.add(redisson != null - ? RedissonObjectFactory.fromReference(redisson, (RedissonReference) entryResult) - : RedissonObjectFactory.fromReference(redissonReactive, (RedissonReference) entryResult)); - } else { - result.add(entryResult); + }); + resultPromise = (RPromise) voidPromise; + } else { + final RPromise promise = new RedissonPromise(); + voidPromise.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + promise.tryFailure(future.cause()); + commands = null; + return; + } + + List entries = new ArrayList(); + for (Entry e : commands.values()) { + entries.addAll(e.getCommands()); + } + Collections.sort(entries); + List responses = new ArrayList(entries.size()); + int syncedSlaves = 0; + for (BatchCommandData commandEntry : entries) { + if (!isWaitCommand(commandEntry)) { + Object entryResult = commandEntry.getPromise().getNow(); + entryResult = tryHandleReference(entryResult); + responses.add(entryResult); + } else { + syncedSlaves = (Integer) commandEntry.getPromise().getNow(); + } } + + BatchResult result = new BatchResult(responses, syncedSlaves); + promise.trySuccess(result); + + commands = null; } - promise.trySuccess(result); - commands = null; - } - }); + }); + resultPromise = (RPromise) promise; + } AtomicInteger slots = new AtomicInteger(commands.size()); for (java.util.Map.Entry e : commands.entrySet()) { - execute(e.getValue(), new NodeSource(e.getKey()), voidPromise, slots, 0, false, responseTimeout, retryAttempts, retryInterval); + execute(e.getValue(), new NodeSource(e.getKey()), voidPromise, slots, 0, skipResult, responseTimeout, retryAttempts, retryInterval); } - return promise; + return resultPromise; } private void execute(final Entry entry, final NodeSource source, final RPromise mainPromise, final AtomicInteger slots, @@ -252,7 +254,7 @@ public class CommandBatchService extends CommandAsyncService { return; } - final RPromise attemptPromise = connectionManager.newPromise(); + final RPromise attemptPromise = new RedissonPromise(); final AsyncDetails details = new AsyncDetails(); @@ -469,11 +471,11 @@ public class CommandBatchService extends CommandAsyncService { List> list = new ArrayList>(entry.getCommands().size() + 1); if (source.getRedirect() == Redirect.ASK) { - RPromise promise = connectionManager.newPromise(); + RPromise promise = new RedissonPromise(); list.add(new CommandData(promise, StringCodec.INSTANCE, RedisCommands.ASKING, new Object[] {})); } for (BatchCommandData c : entry.getCommands()) { - if (c.getPromise().isSuccess()) { + if (c.getPromise().isSuccess() && !isWaitCommand(c)) { // skip successful commands continue; } @@ -493,4 +495,8 @@ public class CommandBatchService extends CommandAsyncService { releaseConnection(source, connFuture, entry.isReadOnlyMode(), attemptPromise, details); } + protected boolean isWaitCommand(BatchCommandData c) { + return c.getCommand().getName().equals(RedisCommands.WAIT.getName()); + } + } diff --git a/redisson/src/main/java/org/redisson/command/CommandReactiveBatchService.java b/redisson/src/main/java/org/redisson/command/CommandReactiveBatchService.java index 88a668917..89daff0a1 100644 --- a/redisson/src/main/java/org/redisson/command/CommandReactiveBatchService.java +++ b/redisson/src/main/java/org/redisson/command/CommandReactiveBatchService.java @@ -15,12 +15,12 @@ */ package org.redisson.command; -import java.util.List; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import org.reactivestreams.Publisher; import org.reactivestreams.Subscription; +import org.redisson.api.BatchResult; import org.redisson.api.RFuture; import org.redisson.api.RedissonReactiveClient; import org.redisson.client.codec.Codec; @@ -61,43 +61,7 @@ public class CommandReactiveBatchService extends CommandReactiveService { batchService.async(readOnlyMode, nodeSource, codec, command, params, mainPromise, attempt); } - public List execute() { - return get(executeAsync(0, 0, 0)); - } - - public List execute(long responseTimeout, int retryAttempts, long retryInterval) { - return get(executeAsync(responseTimeout, retryAttempts, retryInterval)); - } - - public RFuture executeAsyncVoid() { - return executeAsyncVoid(false, 0, 0, 0); - } - - private RFuture executeAsyncVoid(boolean noResult, long responseTimeout, int retryAttempts, long retryInterval) { - for (Publisher publisher : publishers) { - publisher.subscribe(new DefaultSubscriber() { - @Override - public void onSubscribe(Subscription s) { - s.request(1); - } - }); - } - return batchService.executeAsyncVoid(noResult, responseTimeout, retryAttempts, retryInterval); - } - - public void executeSkipResult(long timeout, int retryAttempts, long retryInterval) { - get(executeSkipResultAsync(timeout, retryAttempts, retryInterval)); - } - - public RFuture executeSkipResultAsync(long timeout, int retryAttempts, long retryInterval) { - return executeAsyncVoid(true, timeout, retryAttempts, retryInterval); - } - - public RFuture> executeAsync() { - return executeAsync(0, 0, 0); - } - - public RFuture> executeAsync(long responseTimeout, int retryAttempts, long retryInterval) { + public RFuture> executeAsync(int syncSlaves, long syncTimeout, boolean skipResult, long responseTimeout, int retryAttempts, long retryInterval) { for (Publisher publisher : publishers) { publisher.subscribe(new DefaultSubscriber() { @Override @@ -107,7 +71,7 @@ public class CommandReactiveBatchService extends CommandReactiveService { }); } - return batchService.executeAsync(responseTimeout, retryAttempts, retryInterval); + return batchService.executeAsync(syncSlaves, syncTimeout, skipResult, responseTimeout, retryAttempts, retryInterval); } @Override diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonBatchReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonBatchReactive.java index 4d053ee60..4b0ad80ad 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonBatchReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonBatchReactive.java @@ -15,9 +15,10 @@ */ package org.redisson.reactive; -import java.util.List; +import java.util.concurrent.TimeUnit; import org.reactivestreams.Publisher; +import org.redisson.api.BatchResult; import org.redisson.api.RAtomicLongReactive; import org.redisson.api.RBatchReactive; import org.redisson.api.RBitSetReactive; @@ -55,6 +56,14 @@ public class RedissonBatchReactive implements RBatchReactive { private final EvictionScheduler evictionScheduler; private final CommandReactiveBatchService executorService; + private long timeout; + private int retryAttempts; + private long retryInterval; + + private int syncSlaves; + private long syncTimeout; + private boolean skipResult; + public RedissonBatchReactive(EvictionScheduler evictionScheduler, ConnectionManager connectionManager) { this.evictionScheduler = evictionScheduler; this.executorService = new CommandReactiveBatchService(connectionManager); @@ -206,14 +215,45 @@ public class RedissonBatchReactive implements RBatchReactive { } @Override - public Publisher> execute() { - return new NettyFuturePublisher>(new Supplier>>() { + public Publisher> execute() { + return new NettyFuturePublisher>(new Supplier>>() { @Override - public RFuture> get() { - return executorService.executeAsync(); + public RFuture> get() { + return executorService.executeAsync(syncSlaves, syncTimeout, skipResult, timeout, retryAttempts, retryInterval); } }); } + + @Override + public RBatchReactive syncSlaves(int slaves, long timeout, TimeUnit unit) { + this.syncSlaves = slaves; + this.syncTimeout = unit.toMillis(timeout); + return this; + } + + @Override + public RBatchReactive skipResult() { + this.skipResult = true; + return this; + } + + @Override + public RBatchReactive retryAttempts(int retryAttempts) { + this.retryAttempts = retryAttempts; + return this; + } + + @Override + public RBatchReactive retryInterval(long retryInterval, TimeUnit unit) { + this.retryInterval = unit.toMillis(retryInterval); + return this; + } + + @Override + public RBatchReactive timeout(long timeout, TimeUnit unit) { + this.timeout = unit.toMillis(timeout); + return this; + } public void enableRedissonReferenceSupport(RedissonReactiveClient redissonReactive) { this.executorService.enableRedissonReferenceSupport(redissonReactive); diff --git a/redisson/src/test/java/org/redisson/RedissonBatchTest.java b/redisson/src/test/java/org/redisson/RedissonBatchTest.java index 80f6950bf..9433491b6 100644 --- a/redisson/src/test/java/org/redisson/RedissonBatchTest.java +++ b/redisson/src/test/java/org/redisson/RedissonBatchTest.java @@ -2,6 +2,7 @@ package org.redisson; import static org.assertj.core.api.Assertions.assertThat; +import java.io.IOException; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -14,14 +15,20 @@ import java.util.concurrent.atomic.AtomicLong; import org.junit.Assert; import org.junit.Assume; import org.junit.Test; +import org.redisson.ClusterRunner.ClusterProcesses; +import org.redisson.RedisRunner.FailedToStartRedisException; +import org.redisson.api.BatchResult; import org.redisson.api.RBatch; import org.redisson.api.RFuture; import org.redisson.api.RListAsync; +import org.redisson.api.RMapAsync; import org.redisson.api.RMapCacheAsync; import org.redisson.api.RScript; +import org.redisson.api.RedissonClient; import org.redisson.api.RScript.Mode; import org.redisson.client.RedisException; import org.redisson.client.codec.StringCodec; +import org.redisson.config.Config; public class RedissonBatchTest extends BaseTest { @@ -42,6 +49,40 @@ public class RedissonBatchTest extends BaseTest { List t = batch.execute(); System.out.println(t); } + + @Test + public void testSyncSlaves() throws FailedToStartRedisException, IOException, InterruptedException { + RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner master2 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner master3 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner slave1 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner slave2 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner slave3 = new RedisRunner().randomPort().randomDir().nosave(); + + + ClusterRunner clusterRunner = new ClusterRunner() + .addNode(master1, slave1) + .addNode(master2, slave2) + .addNode(master3, slave3); + ClusterProcesses process = clusterRunner.run(); + + Config config = new Config(); + config.useClusterServers() + .addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort()); + RedissonClient redisson = Redisson.create(config); + + RBatch batch = redisson.createBatch(); + for (int i = 0; i < 100; i++) { + RMapAsync map = batch.getMap("test"); + map.putAsync("" + i, "" + i); + } + + batch.syncSlaves(1, 1, TimeUnit.SECONDS); + BatchResult result = batch.execute(); + assertThat(result.getSyncedSlaves()).isEqualTo(1); + + process.shutdown(); + } @Test public void testWriteTimeout() { @@ -62,7 +103,8 @@ public class RedissonBatchTest extends BaseTest { batch.getBucket("A3").setAsync("001"); batch.getKeys().deleteAsync("A1"); batch.getKeys().deleteAsync("A2"); - batch.executeSkipResult(); + batch.skipResult(); + batch.execute(); assertThat(redisson.getBucket("A1").isExists()).isFalse(); assertThat(redisson.getBucket("A3").isExists()).isTrue(); @@ -76,7 +118,7 @@ public class RedissonBatchTest extends BaseTest { batch.getBucket("A3").setAsync("001"); batch.getKeys().deleteAsync("A1"); batch.getKeys().deleteAsync("A2"); - List result = batch.execute(); + batch.execute(); } @Test @@ -99,8 +141,8 @@ public class RedissonBatchTest extends BaseTest { for (int i = 1; i < 540; i++) { listAsync.addAsync(i); } - List res = b.execute(); - Assert.assertEquals(539, res.size()); + BatchResult res = b.execute(); + Assert.assertEquals(539, res.getResponses().size()); } @Test @@ -113,8 +155,8 @@ public class RedissonBatchTest extends BaseTest { batch.getAtomicLong("counter").incrementAndGetAsync(); batch.getAtomicLong("counter").incrementAndGetAsync(); } - List res = batch.execute(); - Assert.assertEquals(210*5, res.size()); + BatchResult res = batch.execute(); + Assert.assertEquals(210*5, res.getResponses().size()); } @Test(expected=RedisException.class)