diff --git a/redisson-all/pom.xml b/redisson-all/pom.xml
index 146fb2ab4..351750987 100644
--- a/redisson-all/pom.xml
+++ b/redisson-all/pom.xml
@@ -87,7 +87,7 @@
io.nettynetty-transport-native-epolllinux-x86_64
- 4.1.13.Final
+ 4.1.16.Finalcom.esotericsoftware
diff --git a/redisson/pom.xml b/redisson/pom.xml
index d063e7389..ed6963917 100644
--- a/redisson/pom.xml
+++ b/redisson/pom.xml
@@ -34,33 +34,33 @@
io.nettynetty-transport-native-epoll
- 4.1.15.Final
+ 4.1.16.Finalprovidedio.nettynetty-common
- 4.1.15.Final
+ 4.1.16.Finalio.nettynetty-codec
- 4.1.15.Final
+ 4.1.16.Finalio.nettynetty-buffer
- 4.1.15.Final
+ 4.1.16.Finalio.nettynetty-transport
- 4.1.15.Final
+ 4.1.16.Finalio.nettynetty-handler
- 4.1.15.Final
+ 4.1.16.Final
@@ -138,7 +138,7 @@
org.springframeworkspring-web
- [3.1,4.3)
+ [3.1,)test
@@ -238,14 +238,14 @@
org.springframeworkspring-context
- [3.1,4.3)
+ [3.1,)providedtrueorg.springframeworkspring-context-support
- [3.1,4.3)
+ [3.1,)providedtrue
diff --git a/redisson/src/main/java/org/redisson/RedissonBatch.java b/redisson/src/main/java/org/redisson/RedissonBatch.java
index 702c878ab..1d4b62446 100644
--- a/redisson/src/main/java/org/redisson/RedissonBatch.java
+++ b/redisson/src/main/java/org/redisson/RedissonBatch.java
@@ -15,10 +15,10 @@
*/
package org.redisson;
-import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
+import org.redisson.api.BatchResult;
import org.redisson.api.RAtomicDoubleAsync;
import org.redisson.api.RAtomicLongAsync;
import org.redisson.api.RBatch;
@@ -64,6 +64,10 @@ public class RedissonBatch implements RBatch {
private int retryAttempts;
private long retryInterval;
+ private int syncSlaves;
+ private long syncTimeout;
+ private boolean skipResult;
+
protected RedissonBatch(UUID id, EvictionScheduler evictionScheduler, ConnectionManager connectionManager) {
this.executorService = new CommandBatchService(connectionManager);
this.evictionScheduler = evictionScheduler;
@@ -230,6 +234,19 @@ public class RedissonBatch implements RBatch {
return new RedissonSetCache(codec, evictionScheduler, executorService, name, null);
}
+ @Override
+ public RBatch syncSlaves(int slaves, long timeout, TimeUnit unit) {
+ this.syncSlaves = slaves;
+ this.syncTimeout = unit.toMillis(timeout);
+ return this;
+ }
+
+ @Override
+ public RBatch skipResult() {
+ this.skipResult = true;
+ return this;
+ }
+
@Override
public RBatch retryAttempts(int retryAttempts) {
this.retryAttempts = retryAttempts;
@@ -249,25 +266,25 @@ public class RedissonBatch implements RBatch {
}
@Override
- public List> execute() {
- return executorService.execute(timeout, retryAttempts, retryInterval);
+ public BatchResult> execute() {
+ return executorService.execute(syncSlaves, syncTimeout, skipResult, timeout, retryAttempts, retryInterval);
}
@Override
public void executeSkipResult() {
- executorService.executeSkipResult(timeout, retryAttempts, retryInterval);
+ executorService.execute(syncSlaves, syncTimeout, true, timeout, retryAttempts, retryInterval);
}
@Override
public RFuture executeSkipResultAsync() {
- return executorService.executeSkipResultAsync(timeout, retryAttempts, retryInterval);
+ return executorService.executeAsync(syncSlaves, syncTimeout, true, timeout, retryAttempts, retryInterval);
}
@Override
- public RFuture> executeAsync() {
- return executorService.executeAsync(timeout, retryAttempts, retryInterval);
+ public RFuture> executeAsync() {
+ return executorService.executeAsync(syncSlaves, syncTimeout, skipResult, timeout, retryAttempts, retryInterval);
}
-
+
@Override
public RMultimapAsync getSetMultimap(String name) {
return new RedissonSetMultimap(id, executorService, name);
diff --git a/redisson/src/main/java/org/redisson/RedissonRemoteService.java b/redisson/src/main/java/org/redisson/RedissonRemoteService.java
index 7148153f1..d23aa2af7 100644
--- a/redisson/src/main/java/org/redisson/RedissonRemoteService.java
+++ b/redisson/src/main/java/org/redisson/RedissonRemoteService.java
@@ -25,6 +25,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+import org.redisson.api.BatchResult;
import org.redisson.api.RBatch;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RBlockingQueueAsync;
@@ -295,7 +296,7 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
timeout = request.getOptions().getExecutionTimeoutInMillis();
}
- RFuture> clientsFuture = send(timeout, responseName,
+ RFuture> clientsFuture = send(timeout, responseName,
responseHolder.get());
clientsFuture.addListener(new FutureListener>() {
@Override
@@ -324,7 +325,7 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
}
}
- private RFuture> send(long timeout, String responseName, T response) {
+ private RFuture> send(long timeout, String responseName, T response) {
RBatch batch = redisson.createBatch();
RBlockingQueueAsync queue = batch.getBlockingQueue(responseName, getCodec());
queue.putAsync(response);
diff --git a/redisson/src/main/java/org/redisson/api/BatchResult.java b/redisson/src/main/java/org/redisson/api/BatchResult.java
new file mode 100644
index 000000000..65a477f00
--- /dev/null
+++ b/redisson/src/main/java/org/redisson/api/BatchResult.java
@@ -0,0 +1,242 @@
+/**
+ * Copyright 2016 Nikita Koksharov
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.redisson.api;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.ListIterator;
+
+/**
+ *
+ * @author Nikita Koksharov
+ *
+ */
+public class BatchResult implements List {
+
+ private final List responses;
+ private final int syncedSlaves;
+
+ public BatchResult(List responses, int syncedSlaves) {
+ super();
+ this.responses = responses;
+ this.syncedSlaves = syncedSlaves;
+ }
+
+ /**
+ * Returns list with result object for each command
+ *
+ * @return list
+ */
+ public List> getResponses() {
+ return responses;
+ }
+
+ /**
+ * Returns synchronized slaves amount involved during batch execution
+ *
+ * @return slaves
+ */
+ public int getSyncedSlaves() {
+ return syncedSlaves;
+ }
+
+ /**
+ * Use {@link #getResponses()}
+ */
+ @Deprecated
+ public int size() {
+ return responses.size();
+ }
+
+ /**
+ * Use {@link #getResponses()}
+ */
+ @Deprecated
+ public boolean isEmpty() {
+ return responses.isEmpty();
+ }
+
+ /**
+ * Use {@link #getResponses()}
+ */
+ @Deprecated
+ public boolean contains(Object o) {
+ return responses.contains(o);
+ }
+
+ /**
+ * Use {@link #getResponses()}
+ */
+ @Deprecated
+ public Iterator iterator() {
+ return responses.iterator();
+ }
+
+ /**
+ * Use {@link #getResponses()}
+ */
+ @Deprecated
+ public Object[] toArray() {
+ return responses.toArray();
+ }
+
+ /**
+ * Use {@link #getResponses()}
+ */
+ @Deprecated
+ public T[] toArray(T[] a) {
+ return responses.toArray(a);
+ }
+
+ /**
+ * Use {@link #getResponses()}
+ */
+ @Deprecated
+ public boolean add(E e) {
+ return responses.add(e);
+ }
+
+ /**
+ * Use {@link #getResponses()}
+ */
+ @Deprecated
+ public boolean remove(Object o) {
+ return responses.remove(o);
+ }
+
+ /**
+ * Use {@link #getResponses()}
+ */
+ @Deprecated
+ public boolean containsAll(Collection> c) {
+ return responses.containsAll(c);
+ }
+
+ /**
+ * Use {@link #getResponses()}
+ */
+ @Deprecated
+ public boolean addAll(Collection extends E> c) {
+ return responses.addAll(c);
+ }
+
+ /**
+ * Use {@link #getResponses()}
+ */
+ @Deprecated
+ public boolean addAll(int index, Collection extends E> c) {
+ return responses.addAll(index, c);
+ }
+
+ /**
+ * Use {@link #getResponses()}
+ */
+ @Deprecated
+ public boolean removeAll(Collection> c) {
+ return responses.removeAll(c);
+ }
+
+ /**
+ * Use {@link #getResponses()}
+ */
+ @Deprecated
+ public boolean retainAll(Collection> c) {
+ return responses.retainAll(c);
+ }
+
+ /**
+ * Use {@link #getResponses()}
+ */
+ @Deprecated
+ public void clear() {
+ responses.clear();
+ }
+
+ /**
+ * Use {@link #getResponses()}
+ */
+ @Deprecated
+ public E get(int index) {
+ return responses.get(index);
+ }
+
+ /**
+ * Use {@link #getResponses()}
+ */
+ @Deprecated
+ public E set(int index, E element) {
+ return responses.set(index, element);
+ }
+
+ /**
+ * Use {@link #getResponses()}
+ */
+ @Deprecated
+ public void add(int index, E element) {
+ responses.add(index, element);
+ }
+
+ /**
+ * Use {@link #getResponses()}
+ */
+ @Deprecated
+ public E remove(int index) {
+ return responses.remove(index);
+ }
+
+ /**
+ * Use {@link #getResponses()}
+ */
+ @Deprecated
+ public int indexOf(Object o) {
+ return responses.indexOf(o);
+ }
+
+ /**
+ * Use {@link #getResponses()}
+ */
+ @Deprecated
+ public int lastIndexOf(Object o) {
+ return responses.lastIndexOf(o);
+ }
+
+ /**
+ * Use {@link #getResponses()}
+ */
+ @Deprecated
+ public ListIterator listIterator() {
+ return responses.listIterator();
+ }
+
+ /**
+ * Use {@link #getResponses()}
+ */
+ @Deprecated
+ public ListIterator listIterator(int index) {
+ return responses.listIterator(index);
+ }
+
+ /**
+ * Use {@link #getResponses()}
+ */
+ @Deprecated
+ public List subList(int fromIndex, int toIndex) {
+ return responses.subList(fromIndex, toIndex);
+ }
+
+
+}
diff --git a/redisson/src/main/java/org/redisson/api/RBatch.java b/redisson/src/main/java/org/redisson/api/RBatch.java
index 0521a575b..883c1823a 100644
--- a/redisson/src/main/java/org/redisson/api/RBatch.java
+++ b/redisson/src/main/java/org/redisson/api/RBatch.java
@@ -15,7 +15,6 @@
*/
package org.redisson.api;
-import java.util.List;
import java.util.concurrent.TimeUnit;
import org.redisson.client.RedisException;
@@ -387,7 +386,7 @@ public interface RBatch {
* @throws RedisException in case of any error
*
*/
- List> execute() throws RedisException;
+ BatchResult> execute() throws RedisException;
/**
* Executes all operations accumulated during async methods invocations asynchronously.
@@ -397,36 +396,42 @@ public interface RBatch {
*
* @return List with result object for each command
*/
- RFuture> executeAsync();
+ RFuture> executeAsync();
/**
- * Executes all operations accumulated during async methods invocations.
- * Command replies are skipped such approach saves response bandwidth.
- *
- * If cluster configuration used then operations are grouped by slot ids
- * and may be executed on different servers. Thus command execution order could be changed.
- *
- * NOTE: Redis 3.2+ required
- *
- * @throws RedisException in case of any error
- *
+ * Use {@link #skipResult()}
*/
+ @Deprecated
void executeSkipResult();
/**
- * Executes all operations accumulated during async methods invocations asynchronously,
- * Command replies are skipped such approach saves response bandwidth.
- *
- * If cluster configuration used then operations are grouped by slot ids
- * and may be executed on different servers. Thus command execution order could be changed
+ * Use {@link #skipResult()}
+ */
+ @Deprecated
+ RFuture executeSkipResultAsync();
+
+ /**
+ * Inform Redis not to send reply for this batch.
+ * Such approach saves response bandwidth.
*
* NOTE: Redis 3.2+ required
*
- * @return void
- * @throws RedisException in case of any error
- *
+ * @return self instance
*/
- RFuture executeSkipResultAsync();
+ RBatch skipResult();
+
+ /**
+ * Synchronize write operations execution across defined amount
+ * of Redis slave nodes within defined timeout.
+ *
+ * NOTE: Redis 3.0+ required
+ *
+ * @param slaves amount to sync
+ * @param timeout for sync operation
+ * @param unit value
+ * @return self instance
+ */
+ RBatch syncSlaves(int slaves, long timeout, TimeUnit unit);
/**
* Defines timeout for Redis response.
@@ -443,7 +448,7 @@ public interface RBatch {
RBatch timeout(long timeout, TimeUnit unit);
/**
- * Defines time interval for another one attempt send Redis commands batch
+ * Defines time interval for each attempt to send Redis commands batch
* if it hasn't been sent already.
*
* 0 value means use Config.setRetryInterval value instead.
diff --git a/redisson/src/main/java/org/redisson/api/RBatchReactive.java b/redisson/src/main/java/org/redisson/api/RBatchReactive.java
index 89843ae47..4ab1a71d6 100644
--- a/redisson/src/main/java/org/redisson/api/RBatchReactive.java
+++ b/redisson/src/main/java/org/redisson/api/RBatchReactive.java
@@ -15,7 +15,7 @@
*/
package org.redisson.api;
-import java.util.List;
+import java.util.concurrent.TimeUnit;
import org.reactivestreams.Publisher;
import org.redisson.client.codec.Codec;
@@ -249,6 +249,68 @@ public interface RBatchReactive {
*
* @return List with result object for each command
*/
- Publisher> execute();
+ Publisher> execute();
+ /**
+ * Command replies are skipped such approach saves response bandwidth.
+ *
+ * NOTE: Redis 3.0+ required
+ *
+ * @param slaves number to sync
+ * @param timeout for sync operation
+ * @param unit value
+ * @return self instance
+ */
+ RBatchReactive syncSlaves(int slaves, long timeout, TimeUnit unit);
+
+ /**
+ * Defines timeout for Redis response.
+ * Starts to countdown when Redis command has been successfully sent.
+ *
+ * 0 value means use Config.setTimeout value instead.
+ *
+ * Default is 0
+ *
+ * @param timeout value
+ * @param unit value
+ * @return self instance
+ */
+ RBatchReactive timeout(long timeout, TimeUnit unit);
+
+ /**
+ * Defines time interval for another one attempt send Redis commands batch
+ * if it hasn't been sent already.
+ *
+ * 0 value means use Config.setRetryInterval value instead.
+ *
+ * Default is 0
+ *
+ * @param retryInterval value
+ * @param unit value
+ * @return self instance
+ */
+ RBatchReactive retryInterval(long retryInterval, TimeUnit unit);
+
+ /**
+ * Defines attempts amount to re-send Redis commands batch
+ * if it hasn't been sent already.
+ *
+ * 0 value means use Config.setRetryAttempts value instead.
+ *
+ * Default is 0
+ *
+ * @param retryAttempts value
+ * @return self instance
+ */
+ RBatchReactive retryAttempts(int retryAttempts);
+
}
diff --git a/redisson/src/main/java/org/redisson/client/protocol/BatchCommandData.java b/redisson/src/main/java/org/redisson/client/protocol/BatchCommandData.java
index 23f2ff17b..2e59fbaad 100644
--- a/redisson/src/main/java/org/redisson/client/protocol/BatchCommandData.java
+++ b/redisson/src/main/java/org/redisson/client/protocol/BatchCommandData.java
@@ -20,6 +20,7 @@ import java.util.concurrent.atomic.AtomicReference;
import org.redisson.client.RedisRedirectException;
import org.redisson.client.codec.Codec;
import org.redisson.misc.RPromise;
+import org.redisson.misc.RedissonPromise;
/**
*
@@ -33,6 +34,10 @@ public class BatchCommandData extends CommandData implements Compara
private final int index;
private final AtomicReference redirectError = new AtomicReference();
+ public BatchCommandData(RedisCommand command, Object[] params, int index) {
+ this(new RedissonPromise(), null, command, params, index);
+ }
+
public BatchCommandData(RPromise promise, Codec codec, RedisCommand command, Object[] params, int index) {
super(promise, codec, command, params);
this.index = index;
diff --git a/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java b/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java
index aa15d8c51..9f71d89d3 100644
--- a/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java
+++ b/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java
@@ -93,6 +93,7 @@ public interface RedisCommands {
RedisStrictCommand SETBIT = new RedisStrictCommand("SETBIT", new BitSetReplayConvertor());
RedisStrictCommand BITOP = new RedisStrictCommand("BITOP", new VoidReplayConvertor());
+ RedisStrictCommand WAIT = new RedisStrictCommand("WAIT", new IntegerReplayConvertor());
RedisStrictCommand CLIENT_REPLY = new RedisStrictCommand("CLIENT", "REPLY", new VoidReplayConvertor());
RedisStrictCommand ASKING = new RedisStrictCommand("ASKING", new VoidReplayConvertor());
RedisStrictCommand READONLY = new RedisStrictCommand("READONLY", new VoidReplayConvertor());
diff --git a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java
index cadcecefd..3a066f512 100644
--- a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java
+++ b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java
@@ -864,109 +864,149 @@ public class CommandAsyncService implements CommandAsyncExecutor {
}
private void handleReference(RPromise mainPromise, R res) {
- if (res instanceof List) {
- List