Code formatted according to checkstyle rules

pull/1923/head
Nikita Koksharov 6 years ago
parent 0090b8796d
commit 901a5a797b

@ -355,7 +355,7 @@
<execution> <execution>
<phase>verify</phase> <phase>verify</phase>
<goals> <goals>
<goal>checkstyle</goal> <goal>check</goal>
</goals> </goals>
</execution> </execution>
</executions> </executions>

@ -101,7 +101,7 @@ public class RedissonBlockingDeque<V> extends RedissonDeque<V> implements RBlock
* @see org.redisson.core.RBlockingQueue#pollFromAny(long, java.util.concurrent.TimeUnit, java.lang.String[]) * @see org.redisson.core.RBlockingQueue#pollFromAny(long, java.util.concurrent.TimeUnit, java.lang.String[])
*/ */
@Override @Override
public V pollFromAny(long timeout, TimeUnit unit, String ... queueNames) throws InterruptedException { public V pollFromAny(long timeout, TimeUnit unit, String... queueNames) throws InterruptedException {
return blockingQueue.pollFromAny(timeout, unit); return blockingQueue.pollFromAny(timeout, unit);
} }
@ -110,7 +110,7 @@ public class RedissonBlockingDeque<V> extends RedissonDeque<V> implements RBlock
* @see org.redisson.core.RBlockingQueueAsync#pollFromAnyAsync(long, java.util.concurrent.TimeUnit, java.lang.String[]) * @see org.redisson.core.RBlockingQueueAsync#pollFromAnyAsync(long, java.util.concurrent.TimeUnit, java.lang.String[])
*/ */
@Override @Override
public RFuture<V> pollFromAnyAsync(long timeout, TimeUnit unit, String ... queueNames) { public RFuture<V> pollFromAnyAsync(long timeout, TimeUnit unit, String... queueNames) {
return blockingQueue.pollFromAnyAsync(timeout, unit); return blockingQueue.pollFromAnyAsync(timeout, unit);
} }
@ -217,22 +217,22 @@ public class RedissonBlockingDeque<V> extends RedissonDeque<V> implements RBlock
} }
@Override @Override
public V pollFirstFromAny(long timeout, TimeUnit unit, String ... queueNames) throws InterruptedException { public V pollFirstFromAny(long timeout, TimeUnit unit, String... queueNames) throws InterruptedException {
return get(pollFirstFromAnyAsync(timeout, unit, queueNames)); return get(pollFirstFromAnyAsync(timeout, unit, queueNames));
} }
@Override @Override
public RFuture<V> pollFirstFromAnyAsync(long timeout, TimeUnit unit, String ... queueNames) { public RFuture<V> pollFirstFromAnyAsync(long timeout, TimeUnit unit, String... queueNames) {
return pollFromAnyAsync(timeout, unit, queueNames); return pollFromAnyAsync(timeout, unit, queueNames);
} }
@Override @Override
public V pollLastFromAny(long timeout, TimeUnit unit, String ... queueNames) throws InterruptedException { public V pollLastFromAny(long timeout, TimeUnit unit, String... queueNames) throws InterruptedException {
return get(pollLastFromAnyAsync(timeout, unit, queueNames)); return get(pollLastFromAnyAsync(timeout, unit, queueNames));
} }
@Override @Override
public RFuture<V> pollLastFromAnyAsync(long timeout, TimeUnit unit, String ... queueNames) { public RFuture<V> pollLastFromAnyAsync(long timeout, TimeUnit unit, String... queueNames) {
return commandExecutor.pollFromAnyAsync(getName(), codec, RedisCommands.BRPOP_VALUE, toSeconds(timeout, unit), queueNames); return commandExecutor.pollFromAnyAsync(getName(), codec, RedisCommands.BRPOP_VALUE, toSeconds(timeout, unit), queueNames);
} }

@ -99,7 +99,7 @@ public class RedissonBlockingQueue<V> extends RedissonQueue<V> implements RBlock
* @see org.redisson.core.RBlockingQueue#pollFromAny(long, java.util.concurrent.TimeUnit, java.lang.String[]) * @see org.redisson.core.RBlockingQueue#pollFromAny(long, java.util.concurrent.TimeUnit, java.lang.String[])
*/ */
@Override @Override
public V pollFromAny(long timeout, TimeUnit unit, String ... queueNames) throws InterruptedException { public V pollFromAny(long timeout, TimeUnit unit, String... queueNames) throws InterruptedException {
return get(pollFromAnyAsync(timeout, unit, queueNames)); return get(pollFromAnyAsync(timeout, unit, queueNames));
} }
@ -108,7 +108,7 @@ public class RedissonBlockingQueue<V> extends RedissonQueue<V> implements RBlock
* @see org.redisson.core.RBlockingQueueAsync#pollFromAnyAsync(long, java.util.concurrent.TimeUnit, java.lang.String[]) * @see org.redisson.core.RBlockingQueueAsync#pollFromAnyAsync(long, java.util.concurrent.TimeUnit, java.lang.String[])
*/ */
@Override @Override
public RFuture<V> pollFromAnyAsync(long timeout, TimeUnit unit, String ... queueNames) { public RFuture<V> pollFromAnyAsync(long timeout, TimeUnit unit, String... queueNames) {
return commandExecutor.pollFromAnyAsync(getName(), codec, RedisCommands.BLPOP_VALUE, toSeconds(timeout, unit), queueNames); return commandExecutor.pollFromAnyAsync(getName(), codec, RedisCommands.BLPOP_VALUE, toSeconds(timeout, unit), queueNames);
} }

@ -222,7 +222,7 @@ public class RedissonBoundedBlockingQueue<V> extends RedissonQueue<V> implements
* @see org.redisson.core.RBlockingQueue#pollFromAny(long, java.util.concurrent.TimeUnit, java.lang.String[]) * @see org.redisson.core.RBlockingQueue#pollFromAny(long, java.util.concurrent.TimeUnit, java.lang.String[])
*/ */
@Override @Override
public V pollFromAny(long timeout, TimeUnit unit, String ... queueNames) throws InterruptedException { public V pollFromAny(long timeout, TimeUnit unit, String... queueNames) throws InterruptedException {
return get(pollFromAnyAsync(timeout, unit, queueNames)); return get(pollFromAnyAsync(timeout, unit, queueNames));
} }
@ -231,7 +231,7 @@ public class RedissonBoundedBlockingQueue<V> extends RedissonQueue<V> implements
* @see org.redisson.core.RBlockingQueueAsync#pollFromAnyAsync(long, java.util.concurrent.TimeUnit, java.lang.String[]) * @see org.redisson.core.RBlockingQueueAsync#pollFromAnyAsync(long, java.util.concurrent.TimeUnit, java.lang.String[])
*/ */
@Override @Override
public RFuture<V> pollFromAnyAsync(long timeout, TimeUnit unit, String ... queueNames) { public RFuture<V> pollFromAnyAsync(long timeout, TimeUnit unit, String... queueNames) {
RFuture<V> takeFuture = commandExecutor.pollFromAnyAsync(getName(), codec, RedisCommands.BLPOP_VALUE, toSeconds(timeout, unit), queueNames); RFuture<V> takeFuture = commandExecutor.pollFromAnyAsync(getName(), codec, RedisCommands.BLPOP_VALUE, toSeconds(timeout, unit), queueNames);
return wrapTakeFuture(takeFuture); return wrapTakeFuture(takeFuture);
} }

@ -328,7 +328,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
} }
@Override @Override
public void execute(Runnable ...tasks) { public void execute(Runnable...tasks) {
if (tasks.length == 0) { if (tasks.length == 0) {
throw new NullPointerException("Tasks are not defined"); throw new NullPointerException("Tasks are not defined");
} }
@ -574,7 +574,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
} }
@Override @Override
public RExecutorBatchFuture submit(Callable<?> ...tasks) { public RExecutorBatchFuture submit(Callable<?>...tasks) {
if (tasks.length == 0) { if (tasks.length == 0) {
throw new NullPointerException("Tasks are not defined"); throw new NullPointerException("Tasks are not defined");
} }
@ -600,7 +600,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
} }
@Override @Override
public RExecutorBatchFuture submitAsync(Callable<?> ...tasks) { public RExecutorBatchFuture submitAsync(Callable<?>...tasks) {
if (tasks.length == 0) { if (tasks.length == 0) {
throw new NullPointerException("Tasks are not defined"); throw new NullPointerException("Tasks are not defined");
} }
@ -690,7 +690,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
} }
@Override @Override
public RExecutorBatchFuture submit(Runnable ...tasks) { public RExecutorBatchFuture submit(Runnable...tasks) {
if (tasks.length == 0) { if (tasks.length == 0) {
throw new NullPointerException("Tasks are not defined"); throw new NullPointerException("Tasks are not defined");
} }
@ -716,7 +716,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
} }
@Override @Override
public RExecutorBatchFuture submitAsync(Runnable ...tasks) { public RExecutorBatchFuture submitAsync(Runnable...tasks) {
if (tasks.length == 0) { if (tasks.length == 0) {
throw new NullPointerException("Tasks are not defined"); throw new NullPointerException("Tasks are not defined");
} }
@ -979,11 +979,6 @@ public class RedissonExecutorService implements RScheduledExecutorService {
private <T> RFuture<T> poll(List<RExecutorFuture<?>> futures, long timeout, TimeUnit timeUnit) throws InterruptedException { private <T> RFuture<T> poll(List<RExecutorFuture<?>> futures, long timeout, TimeUnit timeUnit) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1); CountDownLatch latch = new CountDownLatch(1);
AtomicReference<RFuture<T>> result = new AtomicReference<>(); AtomicReference<RFuture<T>> result = new AtomicReference<>();
BiConsumer<T, Throwable> listener = new BiConsumer<T, Throwable>() {
@Override
public void accept(T t, Throwable u) {
}
};
for (Future<?> future : futures) { for (Future<?> future : futures) {
RFuture<T> f = (RFuture<T>) future; RFuture<T> f = (RFuture<T>) future;
f.onComplete((r, e) -> { f.onComplete((r, e) -> {

@ -259,17 +259,17 @@ public class RedissonKeys implements RKeys {
} }
@Override @Override
public long delete(String ... keys) { public long delete(String... keys) {
return commandExecutor.get(deleteAsync(keys)); return commandExecutor.get(deleteAsync(keys));
} }
@Override @Override
public long delete(RObject ... objects) { public long delete(RObject... objects) {
return commandExecutor.get(deleteAsync(objects)); return commandExecutor.get(deleteAsync(objects));
} }
@Override @Override
public RFuture<Long> deleteAsync(RObject ... objects) { public RFuture<Long> deleteAsync(RObject... objects) {
List<String> keys = new ArrayList<String>(); List<String> keys = new ArrayList<String>();
for (RObject obj : objects) { for (RObject obj : objects) {
keys.add(obj.getName()); keys.add(obj.getName());
@ -279,21 +279,21 @@ public class RedissonKeys implements RKeys {
} }
@Override @Override
public long unlink(String ... keys) { public long unlink(String... keys) {
return commandExecutor.get(deleteAsync(keys)); return commandExecutor.get(deleteAsync(keys));
} }
@Override @Override
public RFuture<Long> unlinkAsync(String ... keys) { public RFuture<Long> unlinkAsync(String... keys) {
return executeAsync(RedisCommands.UNLINK, keys); return executeAsync(RedisCommands.UNLINK, keys);
} }
@Override @Override
public RFuture<Long> deleteAsync(String ... keys) { public RFuture<Long> deleteAsync(String... keys) {
return executeAsync(RedisCommands.DEL, keys); return executeAsync(RedisCommands.DEL, keys);
} }
private RFuture<Long> executeAsync(RedisStrictCommand<Long> command, String ... keys) { private RFuture<Long> executeAsync(RedisStrictCommand<Long> command, String... keys) {
if (!commandExecutor.getConnectionManager().isClusterMode()) { if (!commandExecutor.getConnectionManager().isClusterMode()) {
return commandExecutor.writeAsync(null, command, keys); return commandExecutor.writeAsync(null, command, keys);
} }

@ -308,11 +308,11 @@ public class RedissonList<V> extends RedissonExpirable implements RList<V> {
return commandExecutor.readAsync(getName(), codec, LINDEX, getName(), index); return commandExecutor.readAsync(getName(), codec, LINDEX, getName(), index);
} }
public List<V> get(int ...indexes) { public List<V> get(int...indexes) {
return get(getAsync(indexes)); return get(getAsync(indexes));
} }
public RFuture<List<V>> getAsync(int ...indexes) { public RFuture<List<V>> getAsync(int...indexes) {
List<Integer> params = new ArrayList<Integer>(); List<Integer> params = new ArrayList<Integer>();
for (Integer index : indexes) { for (Integer index : indexes) {
params.add(index); params.add(index);

@ -365,12 +365,12 @@ public class RedissonListMultimapValues<V> extends RedissonExpirable implements
} }
@Override @Override
public List<V> get(int ...indexes) { public List<V> get(int...indexes) {
return get(getAsync(indexes)); return get(getAsync(indexes));
} }
@Override @Override
public RFuture<List<V>> getAsync(int ...indexes) { public RFuture<List<V>> getAsync(int...indexes) {
List<Object> params = new ArrayList<Object>(); List<Object> params = new ArrayList<Object>();
params.add(System.currentTimeMillis()); params.add(System.currentTimeMillis());
params.add(encodeMapKey(key)); params.add(encodeMapKey(key));

@ -447,7 +447,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
} }
@Override @Override
protected RFuture<Long> fastRemoveOperationAsync(K ... keys) { protected RFuture<Long> fastRemoveOperationAsync(K... keys) {
if (invalidateEntryOnChange == 1) { if (invalidateEntryOnChange == 1) {
List<Object> params = new ArrayList<Object>(keys.length*2); List<Object> params = new ArrayList<Object>(keys.length*2);

@ -1028,7 +1028,7 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
} }
@Override @Override
public RFuture<Long> fastRemoveAsync(K ... keys) { public RFuture<Long> fastRemoveAsync(K... keys) {
if (keys == null) { if (keys == null) {
throw new NullPointerException(); throw new NullPointerException();
} }
@ -1112,7 +1112,7 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
} }
@Override @Override
public long fastRemove(K ... keys) { public long fastRemove(K... keys) {
return get(fastRemoveAsync(keys)); return get(fastRemoveAsync(keys));
} }

@ -1213,7 +1213,7 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
} }
@Override @Override
protected RFuture<Long> fastRemoveOperationAsync(K ... keys) { protected RFuture<Long> fastRemoveOperationAsync(K... keys) {
List<Object> params = new ArrayList<Object>(keys.length); List<Object> params = new ArrayList<Object>(keys.length);
for (K key : keys) { for (K key : keys) {
params.add(encodeMapKey(key)); params.add(encodeMapKey(key));

@ -195,12 +195,12 @@ public abstract class RedissonMultimap<K, V> extends RedissonExpirable implement
} }
@Override @Override
public long fastRemove(K ... keys) { public long fastRemove(K... keys) {
return get(fastRemoveAsync(keys)); return get(fastRemoveAsync(keys));
} }
@Override @Override
public RFuture<Long> fastRemoveAsync(K ... keys) { public RFuture<Long> fastRemoveAsync(K... keys) {
if (keys == null || keys.length == 0) { if (keys == null || keys.length == 0) {
return RedissonPromise.newSucceededFuture(0L); return RedissonPromise.newSucceededFuture(0L);
} }

@ -80,7 +80,7 @@ public class RedissonPriorityBlockingDeque<V> extends RedissonPriorityDeque<V> i
} }
@Override @Override
public V pollFromAny(long timeout, TimeUnit unit, String ... queueNames) throws InterruptedException { public V pollFromAny(long timeout, TimeUnit unit, String... queueNames) throws InterruptedException {
throw new UnsupportedOperationException("use poll method"); throw new UnsupportedOperationException("use poll method");
} }
@ -201,22 +201,22 @@ public class RedissonPriorityBlockingDeque<V> extends RedissonPriorityDeque<V> i
} }
@Override @Override
public V pollFirstFromAny(long timeout, TimeUnit unit, String ... queueNames) throws InterruptedException { public V pollFirstFromAny(long timeout, TimeUnit unit, String... queueNames) throws InterruptedException {
return get(pollFirstFromAnyAsync(timeout, unit, queueNames)); return get(pollFirstFromAnyAsync(timeout, unit, queueNames));
} }
@Override @Override
public RFuture<V> pollFirstFromAnyAsync(long timeout, TimeUnit unit, String ... queueNames) { public RFuture<V> pollFirstFromAnyAsync(long timeout, TimeUnit unit, String... queueNames) {
return pollFromAnyAsync(timeout, unit, queueNames); return pollFromAnyAsync(timeout, unit, queueNames);
} }
@Override @Override
public V pollLastFromAny(long timeout, TimeUnit unit, String ... queueNames) throws InterruptedException { public V pollLastFromAny(long timeout, TimeUnit unit, String... queueNames) throws InterruptedException {
return get(pollLastFromAnyAsync(timeout, unit, queueNames)); return get(pollLastFromAnyAsync(timeout, unit, queueNames));
} }
@Override @Override
public RFuture<V> pollLastFromAnyAsync(long timeout, TimeUnit unit, String ... queueNames) { public RFuture<V> pollLastFromAnyAsync(long timeout, TimeUnit unit, String... queueNames) {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }

@ -68,7 +68,7 @@ public class RedissonPriorityBlockingQueue<V> extends RedissonPriorityQueue<V> i
return result; return result;
} }
protected <T> void takeAsync(RPromise<V> result, long delay, long timeoutInMicro, RedisCommand<T> command, Object ... params) { protected <T> void takeAsync(RPromise<V> result, long delay, long timeoutInMicro, RedisCommand<T> command, Object... params) {
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
commandExecutor.getConnectionManager().getGroup().schedule(() -> { commandExecutor.getConnectionManager().getGroup().schedule(() -> {
RFuture<V> future = pollAsync(command, params); RFuture<V> future = pollAsync(command, params);
@ -123,7 +123,7 @@ public class RedissonPriorityBlockingQueue<V> extends RedissonPriorityQueue<V> i
} }
@Override @Override
public V pollFromAny(long timeout, TimeUnit unit, String ... queueNames) throws InterruptedException { public V pollFromAny(long timeout, TimeUnit unit, String... queueNames) throws InterruptedException {
throw new UnsupportedOperationException("use poll method"); throw new UnsupportedOperationException("use poll method");
} }

@ -288,7 +288,7 @@ public class RedissonPriorityQueue<V> extends RedissonList<V> implements RPriori
return pollAsync(RedisCommands.LPOP, getName()); return pollAsync(RedisCommands.LPOP, getName());
} }
protected <T> RFuture<V> pollAsync(RedisCommand<T> command, Object ... params) { protected <T> RFuture<V> pollAsync(RedisCommand<T> command, Object... params) {
long threadId = Thread.currentThread().getId(); long threadId = Thread.currentThread().getId();
RPromise<V> result = new RedissonPromise<V>(); RPromise<V> result = new RedissonPromise<V>();
lock.lockAsync(threadId).onComplete((r, exc) -> { lock.lockAsync(threadId).onComplete((r, exc) -> {

@ -150,22 +150,22 @@ public class RedissonScoredSortedSet<V> extends RedissonExpirable implements RSc
} }
@Override @Override
public V pollFirstFromAny(long timeout, TimeUnit unit, String ... queueNames) { public V pollFirstFromAny(long timeout, TimeUnit unit, String... queueNames) {
return get(pollFirstFromAnyAsync(timeout, unit, queueNames)); return get(pollFirstFromAnyAsync(timeout, unit, queueNames));
} }
@Override @Override
public RFuture<V> pollFirstFromAnyAsync(long timeout, TimeUnit unit, String ... queueNames) { public RFuture<V> pollFirstFromAnyAsync(long timeout, TimeUnit unit, String... queueNames) {
return commandExecutor.pollFromAnyAsync(getName(), codec, RedisCommands.BZPOPMIN_VALUE, toSeconds(timeout, unit), queueNames); return commandExecutor.pollFromAnyAsync(getName(), codec, RedisCommands.BZPOPMIN_VALUE, toSeconds(timeout, unit), queueNames);
} }
@Override @Override
public V pollLastFromAny(long timeout, TimeUnit unit, String ... queueNames) { public V pollLastFromAny(long timeout, TimeUnit unit, String... queueNames) {
return get(pollLastFromAnyAsync(timeout, unit, queueNames)); return get(pollLastFromAnyAsync(timeout, unit, queueNames));
} }
@Override @Override
public RFuture<V> pollLastFromAnyAsync(long timeout, TimeUnit unit, String ... queueNames) { public RFuture<V> pollLastFromAnyAsync(long timeout, TimeUnit unit, String... queueNames) {
return commandExecutor.pollFromAnyAsync(getName(), codec, RedisCommands.BZPOPMAX_VALUE, toSeconds(timeout, unit), queueNames); return commandExecutor.pollFromAnyAsync(getName(), codec, RedisCommands.BZPOPMAX_VALUE, toSeconds(timeout, unit), queueNames);
} }

@ -176,12 +176,12 @@ public class RedissonScript implements RScript {
} }
@Override @Override
public List<Boolean> scriptExists(String ... shaDigests) { public List<Boolean> scriptExists(String... shaDigests) {
return commandExecutor.get(scriptExistsAsync(shaDigests)); return commandExecutor.get(scriptExistsAsync(shaDigests));
} }
@Override @Override
public RFuture<List<Boolean>> scriptExistsAsync(final String ... shaDigests) { public RFuture<List<Boolean>> scriptExistsAsync(final String... shaDigests) {
return commandExecutor.writeAllAsync(RedisCommands.SCRIPT_EXISTS, new SlotCallback<List<Boolean>, List<Boolean>>() { return commandExecutor.writeAllAsync(RedisCommands.SCRIPT_EXISTS, new SlotCallback<List<Boolean>, List<Boolean>>() {
volatile List<Boolean> result = new ArrayList<Boolean>(shaDigests.length); volatile List<Boolean> result = new ArrayList<Boolean>(shaDigests.length);
@Override @Override
@ -201,11 +201,11 @@ public class RedissonScript implements RScript {
}, (Object[])shaDigests); }, (Object[])shaDigests);
} }
public List<Boolean> scriptExists(String key, String ... shaDigests) { public List<Boolean> scriptExists(String key, String... shaDigests) {
return commandExecutor.get(scriptExistsAsync(key, shaDigests)); return commandExecutor.get(scriptExistsAsync(key, shaDigests));
} }
public RFuture<List<Boolean>> scriptExistsAsync(String key, String ... shaDigests) { public RFuture<List<Boolean>> scriptExistsAsync(String key, String... shaDigests) {
return commandExecutor.writeAsync(key, RedisCommands.SCRIPT_EXISTS, shaDigests); return commandExecutor.writeAsync(key, RedisCommands.SCRIPT_EXISTS, shaDigests);
} }

@ -758,7 +758,7 @@ public class RedissonStream<K, V> extends RedissonExpirable implements RStream<K
} }
@Override @Override
public Map<StreamMessageId, Map<K, V>> read(int count, StreamMessageId ... ids) { public Map<StreamMessageId, Map<K, V>> read(int count, StreamMessageId... ids) {
return get(readAsync(count, ids)); return get(readAsync(count, ids));
} }

@ -23,7 +23,7 @@ import java.util.concurrent.TimeUnit;
* @author Nikita Koksharov * @author Nikita Koksharov
* *
*/ */
public class BatchOptions { public final class BatchOptions {
public enum ExecutionMode { public enum ExecutionMode {

@ -23,7 +23,7 @@ import java.util.concurrent.TimeUnit;
* @author Nikita Koksharov * @author Nikita Koksharov
* *
*/ */
public class ExecutorOptions { public final class ExecutorOptions {
private long taskRetryInterval = 5 * 60000; private long taskRetryInterval = 5 * 60000;

@ -28,7 +28,7 @@ import org.redisson.client.protocol.Time;
*/ */
public interface Node extends NodeAsync { public interface Node extends NodeAsync {
public enum InfoSection {ALL, DEFAULT, SERVER, CLIENTS, MEMORY, PERSISTENCE, STATS, REPLICATION, CPU, COMMANDSTATS, CLUSTER, KEYSPACE} enum InfoSection {ALL, DEFAULT, SERVER, CLIENTS, MEMORY, PERSISTENCE, STATS, REPLICATION, CPU, COMMANDSTATS, CLUSTER, KEYSPACE}
Map<String, String> info(InfoSection section); Map<String, String> info(InfoSection section);

@ -40,7 +40,7 @@ public interface RBlockingDeque<V> extends BlockingDeque<V>, RBlockingQueue<V>,
* specified waiting time elapses before an element is available * specified waiting time elapses before an element is available
* @throws InterruptedException if interrupted while waiting * @throws InterruptedException if interrupted while waiting
*/ */
V pollFirstFromAny(long timeout, TimeUnit unit, String ... queueNames) throws InterruptedException; V pollFirstFromAny(long timeout, TimeUnit unit, String... queueNames) throws InterruptedException;
/** /**
* Retrieves and removes first available tail element of <b>any</b> queue, * Retrieves and removes first available tail element of <b>any</b> queue,
@ -56,6 +56,6 @@ public interface RBlockingDeque<V> extends BlockingDeque<V>, RBlockingQueue<V>,
* specified waiting time elapses before an element is available * specified waiting time elapses before an element is available
* @throws InterruptedException if interrupted while waiting * @throws InterruptedException if interrupted while waiting
*/ */
V pollLastFromAny(long timeout, TimeUnit unit, String ... queueNames) throws InterruptedException; V pollLastFromAny(long timeout, TimeUnit unit, String... queueNames) throws InterruptedException;
} }

@ -39,7 +39,7 @@ public interface RBlockingDequeAsync<V> extends RDequeAsync<V>, RBlockingQueueAs
* @return the head of this queue, or {@code null} if the * @return the head of this queue, or {@code null} if the
* specified waiting time elapses before an element is available * specified waiting time elapses before an element is available
*/ */
RFuture<V> pollFirstFromAnyAsync(long timeout, TimeUnit unit, String ... queueNames); RFuture<V> pollFirstFromAnyAsync(long timeout, TimeUnit unit, String... queueNames);
/** /**
* Retrieves and removes first available tail element of <b>any</b> queue in async mode, * Retrieves and removes first available tail element of <b>any</b> queue in async mode,
@ -54,7 +54,7 @@ public interface RBlockingDequeAsync<V> extends RDequeAsync<V>, RBlockingQueueAs
* @return the head of this queue, or {@code null} if the * @return the head of this queue, or {@code null} if the
* specified waiting time elapses before an element is available * specified waiting time elapses before an element is available
*/ */
RFuture<V> pollLastFromAnyAsync(long timeout, TimeUnit unit, String ... queueNames); RFuture<V> pollLastFromAnyAsync(long timeout, TimeUnit unit, String... queueNames);
/** /**
* Adds value to the head of queue. * Adds value to the head of queue.

@ -41,7 +41,7 @@ public interface RBlockingDequeReactive<V> extends RDequeReactive<V>, RBlockingQ
* @return the head of this queue, or {@code null} if the * @return the head of this queue, or {@code null} if the
* specified waiting time elapses before an element is available * specified waiting time elapses before an element is available
*/ */
Mono<V> pollFirstFromAny(long timeout, TimeUnit unit, String ... queueNames); Mono<V> pollFirstFromAny(long timeout, TimeUnit unit, String... queueNames);
/** /**
* Retrieves and removes first available tail element of <b>any</b> queue in reactive mode, * Retrieves and removes first available tail element of <b>any</b> queue in reactive mode,
@ -56,7 +56,7 @@ public interface RBlockingDequeReactive<V> extends RDequeReactive<V>, RBlockingQ
* @return the head of this queue, or {@code null} if the * @return the head of this queue, or {@code null} if the
* specified waiting time elapses before an element is available * specified waiting time elapses before an element is available
*/ */
Mono<V> pollLastFromAny(long timeout, TimeUnit unit, String ... queueNames); Mono<V> pollLastFromAny(long timeout, TimeUnit unit, String... queueNames);
/** /**
* Adds value to the head of queue. * Adds value to the head of queue.

@ -40,7 +40,7 @@ public interface RBlockingDequeRx<V> extends RDequeRx<V>, RBlockingQueueRx<V> {
* @return the head of this queue, or {@code null} if the * @return the head of this queue, or {@code null} if the
* specified waiting time elapses before an element is available * specified waiting time elapses before an element is available
*/ */
Flowable<V> pollFirstFromAny(long timeout, TimeUnit unit, String ... queueNames); Flowable<V> pollFirstFromAny(long timeout, TimeUnit unit, String... queueNames);
/** /**
* Retrieves and removes first available tail element of <b>any</b> queue in reactive mode, * Retrieves and removes first available tail element of <b>any</b> queue in reactive mode,
@ -55,7 +55,7 @@ public interface RBlockingDequeRx<V> extends RDequeRx<V>, RBlockingQueueRx<V> {
* @return the head of this queue, or {@code null} if the * @return the head of this queue, or {@code null} if the
* specified waiting time elapses before an element is available * specified waiting time elapses before an element is available
*/ */
Flowable<V> pollLastFromAny(long timeout, TimeUnit unit, String ... queueNames); Flowable<V> pollLastFromAny(long timeout, TimeUnit unit, String... queueNames);
/** /**
* Adds value to the head of queue. * Adds value to the head of queue.

@ -40,7 +40,7 @@ public interface RBlockingQueue<V> extends BlockingQueue<V>, RQueue<V>, RBlockin
* specified waiting time elapses before an element is available * specified waiting time elapses before an element is available
* @throws InterruptedException if interrupted while waiting * @throws InterruptedException if interrupted while waiting
*/ */
V pollFromAny(long timeout, TimeUnit unit, String ... queueNames) throws InterruptedException; V pollFromAny(long timeout, TimeUnit unit, String... queueNames) throws InterruptedException;
/** /**
* Retrieves and removes last available tail element of <b>any</b> queue and adds it at the head of <code>queueName</code>, * Retrieves and removes last available tail element of <b>any</b> queue and adds it at the head of <code>queueName</code>,

@ -40,7 +40,7 @@ public interface RBlockingQueueAsync<V> extends RQueueAsync<V> {
* @return Future object with the head of this queue, or {@code null} if the * @return Future object with the head of this queue, or {@code null} if the
* specified waiting time elapses before an element is available * specified waiting time elapses before an element is available
*/ */
RFuture<V> pollFromAnyAsync(long timeout, TimeUnit unit, String ... queueNames); RFuture<V> pollFromAnyAsync(long timeout, TimeUnit unit, String... queueNames);
/** /**
* Removes at most the given number of available elements from * Removes at most the given number of available elements from

@ -42,7 +42,7 @@ public interface RBlockingQueueReactive<V> extends RQueueReactive<V> {
* @return Mono object with the head of this queue, or {@code null} if the * @return Mono object with the head of this queue, or {@code null} if the
* specified waiting time elapses before an element is available * specified waiting time elapses before an element is available
*/ */
Mono<V> pollFromAny(long timeout, TimeUnit unit, String ... queueNames); Mono<V> pollFromAny(long timeout, TimeUnit unit, String... queueNames);
/** /**
* Removes at most the given number of available elements from * Removes at most the given number of available elements from

@ -41,7 +41,7 @@ public interface RBlockingQueueRx<V> extends RQueueRx<V> {
* @return Flowable object with the head of this queue, or {@code null} if the * @return Flowable object with the head of this queue, or {@code null} if the
* specified waiting time elapses before an element is available * specified waiting time elapses before an element is available
*/ */
Flowable<V> pollFromAny(long timeout, TimeUnit unit, String ... queueNames); Flowable<V> pollFromAny(long timeout, TimeUnit unit, String... queueNames);
/** /**
* Removes at most the given number of available elements from * Removes at most the given number of available elements from

@ -33,7 +33,7 @@ public interface RBuckets extends RBucketsAsync {
* @param keys - keys * @param keys - keys
* @return Map with name of bucket as key and bucket as value * @return Map with name of bucket as key and bucket as value
*/ */
<V> Map<String, V> get(String ... keys); <V> Map<String, V> get(String... keys);
/** /**
* Try to save objects mapped by Redis key. * Try to save objects mapped by Redis key.

@ -33,7 +33,7 @@ public interface RBucketsAsync {
* @param keys - keys * @param keys - keys
* @return Map with name of bucket as key and bucket as value * @return Map with name of bucket as key and bucket as value
*/ */
<V> RFuture<Map<String, V>> getAsync(String ... keys); <V> RFuture<Map<String, V>> getAsync(String... keys);
/** /**
* Try to save objects mapped by Redis key. * Try to save objects mapped by Redis key.

@ -52,7 +52,7 @@ public interface RExecutorService extends ExecutorService, RExecutorServiceAsync
* @param tasks - tasks to execute * @param tasks - tasks to execute
* @return Future object * @return Future object
*/ */
RExecutorBatchFuture submit(Callable<?> ...tasks); RExecutorBatchFuture submit(Callable<?>...tasks);
/** /**
* Submits a Runnable task for execution and returns a Future * Submits a Runnable task for execution and returns a Future
@ -86,7 +86,7 @@ public interface RExecutorService extends ExecutorService, RExecutorServiceAsync
* @param tasks - tasks to execute * @param tasks - tasks to execute
* @return Future object * @return Future object
*/ */
RExecutorBatchFuture submit(Runnable ...tasks); RExecutorBatchFuture submit(Runnable...tasks);
/** /**
* Returns executor name * Returns executor name
@ -141,6 +141,6 @@ public interface RExecutorService extends ExecutorService, RExecutorServiceAsync
* *
* @param tasks - tasks to execute * @param tasks - tasks to execute
*/ */
void execute(Runnable ...tasks); void execute(Runnable...tasks);
} }

@ -48,7 +48,7 @@ public interface RExecutorServiceAsync {
* @param tasks - tasks to execute * @param tasks - tasks to execute
* @return Future object * @return Future object
*/ */
RExecutorBatchFuture submitAsync(Callable<?> ...tasks); RExecutorBatchFuture submitAsync(Callable<?>...tasks);
/** /**
* Submits task for execution asynchronously * Submits task for execution asynchronously
@ -65,6 +65,6 @@ public interface RExecutorServiceAsync {
* @param tasks - tasks to execute * @param tasks - tasks to execute
* @return Future object * @return Future object
*/ */
RExecutorBatchFuture submitAsync(Runnable ...tasks); RExecutorBatchFuture submitAsync(Runnable...tasks);
} }

@ -58,13 +58,13 @@ public interface RHyperLogLog<V> extends RExpirable, RHyperLogLogAsync<V> {
* @param otherLogNames - name of instances * @param otherLogNames - name of instances
* @return number * @return number
*/ */
long countWith(String ... otherLogNames); long countWith(String... otherLogNames);
/** /**
* Merges multiple instances into this instance. * Merges multiple instances into this instance.
* *
* @param otherLogNames - name of instances * @param otherLogNames - name of instances
*/ */
void mergeWith(String ... otherLogNames); void mergeWith(String... otherLogNames);
} }

@ -59,7 +59,7 @@ public interface RHyperLogLogAsync<V> extends RExpirableAsync {
* @param otherLogNames - name of instances * @param otherLogNames - name of instances
* @return number * @return number
*/ */
RFuture<Long> countWithAsync(String ... otherLogNames); RFuture<Long> countWithAsync(String... otherLogNames);
/** /**
* Merges multiple instances into this instance. * Merges multiple instances into this instance.
@ -67,6 +67,6 @@ public interface RHyperLogLogAsync<V> extends RExpirableAsync {
* @param otherLogNames - name of instances * @param otherLogNames - name of instances
* @return void * @return void
*/ */
RFuture<Void> mergeWithAsync(String ... otherLogNames); RFuture<Void> mergeWithAsync(String... otherLogNames);
} }

@ -61,7 +61,7 @@ public interface RHyperLogLogReactive<V> extends RExpirableReactive {
* @param otherLogNames - name of instances * @param otherLogNames - name of instances
* @return number * @return number
*/ */
Mono<Long> countWith(String ... otherLogNames); Mono<Long> countWith(String... otherLogNames);
/** /**
* Merges multiple instances into this instance. * Merges multiple instances into this instance.
@ -69,6 +69,6 @@ public interface RHyperLogLogReactive<V> extends RExpirableReactive {
* @param otherLogNames - name of instances * @param otherLogNames - name of instances
* @return void * @return void
*/ */
Mono<Void> mergeWith(String ... otherLogNames); Mono<Void> mergeWith(String... otherLogNames);
} }

@ -61,7 +61,7 @@ public interface RHyperLogLogRx<V> extends RExpirableRx {
* @param otherLogNames - name of instances * @param otherLogNames - name of instances
* @return number * @return number
*/ */
Flowable<Long> countWith(String ... otherLogNames); Flowable<Long> countWith(String... otherLogNames);
/** /**
* Merges multiple instances into this instance. * Merges multiple instances into this instance.
@ -69,6 +69,6 @@ public interface RHyperLogLogRx<V> extends RExpirableRx {
* @param otherLogNames - name of instances * @param otherLogNames - name of instances
* @return void * @return void
*/ */
Flowable<Void> mergeWith(String ... otherLogNames); Flowable<Void> mergeWith(String... otherLogNames);
} }

@ -290,7 +290,7 @@ public interface RKeys extends RKeysAsync {
* @param objects of Redisson * @param objects of Redisson
* @return number of removed keys * @return number of removed keys
*/ */
long delete(RObject ... objects); long delete(RObject... objects);
/** /**
* Delete multiple objects by name * Delete multiple objects by name
@ -298,7 +298,7 @@ public interface RKeys extends RKeysAsync {
* @param keys - object names * @param keys - object names
* @return number of removed keys * @return number of removed keys
*/ */
long delete(String ... keys); long delete(String... keys);
/** /**
* Delete multiple objects by name. * Delete multiple objects by name.
@ -309,7 +309,7 @@ public interface RKeys extends RKeysAsync {
* @param keys of objects * @param keys of objects
* @return number of removed keys * @return number of removed keys
*/ */
long unlink(String ... keys); long unlink(String... keys);
/** /**
* Returns the number of keys in the currently-selected database * Returns the number of keys in the currently-selected database

@ -185,7 +185,7 @@ public interface RKeysAsync {
* @param objects of Redisson * @param objects of Redisson
* @return number of removed keys * @return number of removed keys
*/ */
RFuture<Long> deleteAsync(RObject ... objects); RFuture<Long> deleteAsync(RObject... objects);
/** /**
* Delete multiple objects by name * Delete multiple objects by name
@ -193,7 +193,7 @@ public interface RKeysAsync {
* @param keys - object names * @param keys - object names
* @return number of removed keys * @return number of removed keys
*/ */
RFuture<Long> deleteAsync(String ... keys); RFuture<Long> deleteAsync(String... keys);
/** /**
* Delete multiple objects by name. * Delete multiple objects by name.
@ -204,7 +204,7 @@ public interface RKeysAsync {
* @param keys - object names * @param keys - object names
* @return number of removed keys * @return number of removed keys
*/ */
RFuture<Long> unlinkAsync(String ... keys); RFuture<Long> unlinkAsync(String... keys);
/** /**
* Returns the number of keys in the currently-selected database in async mode * Returns the number of keys in the currently-selected database in async mode

@ -254,7 +254,7 @@ public interface RKeysReactive {
* @param keys - object names * @param keys - object names
* @return deleted objects amount * @return deleted objects amount
*/ */
Mono<Long> delete(String ... keys); Mono<Long> delete(String... keys);
/** /**
* Delete multiple objects by name. * Delete multiple objects by name.
@ -265,7 +265,7 @@ public interface RKeysReactive {
* @param keys of objects * @param keys of objects
* @return number of removed keys * @return number of removed keys
*/ */
Mono<Long> unlink(String ... keys); Mono<Long> unlink(String... keys);
/** /**
* Returns the number of keys in the currently-selected database * Returns the number of keys in the currently-selected database

@ -253,7 +253,7 @@ public interface RKeysRx {
* @param keys - object names * @param keys - object names
* @return deleted objects amount * @return deleted objects amount
*/ */
Flowable<Long> delete(String ... keys); Flowable<Long> delete(String... keys);
/** /**
* Delete multiple objects by name. * Delete multiple objects by name.
@ -264,7 +264,7 @@ public interface RKeysRx {
* @param keys of objects * @param keys of objects
* @return number of removed keys * @return number of removed keys
*/ */
Flowable<Long> unlink(String ... keys); Flowable<Long> unlink(String... keys);
/** /**
* Returns the number of keys in the currently-selected database * Returns the number of keys in the currently-selected database

@ -35,7 +35,7 @@ public interface RList<V> extends List<V>, RExpirable, RListAsync<V>, RSortable<
* @param indexes of elements * @param indexes of elements
* @return list of elements * @return list of elements
*/ */
List<V> get(int ...indexes); List<V> get(int...indexes);
/** /**
* Returns <code>RMapReduce</code> object associated with this map * Returns <code>RMapReduce</code> object associated with this map

@ -34,7 +34,7 @@ public interface RListAsync<V> extends RCollectionAsync<V>, RSortableAsync<List<
* @param indexes of elements * @param indexes of elements
* @return elements * @return elements
*/ */
RFuture<List<V>> getAsync(int ...indexes); RFuture<List<V>> getAsync(int...indexes);
/** /**
* Add <code>element</code> after <code>elementToFind</code> * Add <code>element</code> after <code>elementToFind</code>

@ -37,7 +37,7 @@ public interface RListReactive<V> extends RCollectionReactive<V>, RSortableReact
* @param indexes of elements * @param indexes of elements
* @return elements * @return elements
*/ */
Mono<List<V>> get(int ...indexes); Mono<List<V>> get(int...indexes);
/** /**
* Add <code>element</code> after <code>elementToFind</code> * Add <code>element</code> after <code>elementToFind</code>

@ -36,7 +36,7 @@ public interface RListRx<V> extends RCollectionRx<V>, RSortableRx<List<V>> {
* @param indexes of elements * @param indexes of elements
* @return elements * @return elements
*/ */
Flowable<List<V>> get(int ...indexes); Flowable<List<V>> get(int...indexes);
/** /**
* Add <code>element</code> after <code>elementToFind</code> * Add <code>element</code> after <code>elementToFind</code>

@ -271,7 +271,7 @@ public interface RMap<K, V> extends ConcurrentMap<K, V>, RExpirable, RMapAsync<K
* @param keys - map keys * @param keys - map keys
* @return the number of keys that were removed from the hash, not including specified but non existing keys * @return the number of keys that were removed from the hash, not including specified but non existing keys
*/ */
long fastRemove(K ... keys); long fastRemove(K... keys);
/** /**
* Associates the specified <code>value</code> with the specified <code>key</code>. * Associates the specified <code>value</code> with the specified <code>key</code>.

@ -135,7 +135,7 @@ public interface RMapAsync<K, V> extends RExpirableAsync {
* @param keys - map keys * @param keys - map keys
* @return the number of keys that were removed from the hash, not including specified but non existing keys * @return the number of keys that were removed from the hash, not including specified but non existing keys
*/ */
RFuture<Long> fastRemoveAsync(K ... keys); RFuture<Long> fastRemoveAsync(K... keys);
/** /**
* Associates the specified <code>value</code> with the specified <code>key</code> * Associates the specified <code>value</code> with the specified <code>key</code>

@ -107,7 +107,7 @@ public interface RMapReactive<K, V> extends RExpirableReactive {
* @param keys - map keys * @param keys - map keys
* @return the number of keys that were removed from the hash, not including specified but non existing keys * @return the number of keys that were removed from the hash, not including specified but non existing keys
*/ */
Mono<Long> fastRemove(K ... keys); Mono<Long> fastRemove(K... keys);
/** /**
* Associates the specified <code>value</code> with the specified <code>key</code> * Associates the specified <code>value</code> with the specified <code>key</code>

@ -106,7 +106,7 @@ public interface RMapRx<K, V> extends RExpirableRx {
* @param keys - map keys * @param keys - map keys
* @return the number of keys that were removed from the hash, not including specified but non existing keys * @return the number of keys that were removed from the hash, not including specified but non existing keys
*/ */
Flowable<Long> fastRemove(K ... keys); Flowable<Long> fastRemove(K... keys);
/** /**
* Associates the specified <code>value</code> with the specified <code>key</code> * Associates the specified <code>value</code> with the specified <code>key</code>

@ -242,7 +242,7 @@ public interface RMultimap<K, V> extends RExpirable, RMultimapAsync<K, V> {
* @param keys - map keys * @param keys - map keys
* @return the number of keys that were removed from the hash, not including specified but non existing keys * @return the number of keys that were removed from the hash, not including specified but non existing keys
*/ */
long fastRemove(K ... keys); long fastRemove(K... keys);
/** /**
* Read all keys at once * Read all keys at once

@ -17,6 +17,7 @@ package org.redisson.api;
import java.util.Collection; import java.util.Collection;
import java.util.Set; import java.util.Set;
/** /**
* Base asynchronous MultiMap interface. A collection that maps multiple values per one key. * Base asynchronous MultiMap interface. A collection that maps multiple values per one key.
* *
@ -139,8 +140,7 @@ public interface RMultimapAsync<K, V> extends RExpirableAsync {
RFuture<Collection<V>> removeAllAsync(Object key); RFuture<Collection<V>> removeAllAsync(Object key);
RFuture<Collection<V>> getAllAsync(K key); RFuture<Collection<V>> getAllAsync(K key);
/** /**
* Returns the number of key-value pairs in this multimap. * Returns the number of key-value pairs in this multimap.
* *
@ -157,7 +157,7 @@ public interface RMultimapAsync<K, V> extends RExpirableAsync {
* @param keys - map keys * @param keys - map keys
* @return the number of keys that were removed from the hash, not including specified but non existing keys * @return the number of keys that were removed from the hash, not including specified but non existing keys
*/ */
RFuture<Long> fastRemoveAsync(K ... keys); RFuture<Long> fastRemoveAsync(K... keys);
/** /**
* Read all keys at once * Read all keys at once

@ -127,7 +127,7 @@ public interface RMultimapReactive<K, V> extends RExpirableReactive {
* @param keys - map keys * @param keys - map keys
* @return the number of keys that were removed from the hash, not including specified but non existing keys * @return the number of keys that were removed from the hash, not including specified but non existing keys
*/ */
Mono<Long> fastRemove(K ... keys); Mono<Long> fastRemove(K... keys);
/** /**
* Read all keys at once * Read all keys at once

@ -127,7 +127,7 @@ public interface RMultimapRx<K, V> extends RExpirableRx {
* @param keys - map keys * @param keys - map keys
* @return the number of keys that were removed from the hash, not including specified but non existing keys * @return the number of keys that were removed from the hash, not including specified but non existing keys
*/ */
Flowable<Long> fastRemove(K ... keys); Flowable<Long> fastRemove(K... keys);
/** /**
* Read all keys at once * Read all keys at once

@ -34,7 +34,7 @@ import org.redisson.client.protocol.ScoredEntry;
*/ */
public interface RScoredSortedSet<V> extends RScoredSortedSetAsync<V>, Iterable<V>, RExpirable, RSortable<Set<V>> { public interface RScoredSortedSet<V> extends RScoredSortedSetAsync<V>, Iterable<V>, RExpirable, RSortable<Set<V>> {
public enum Aggregate { enum Aggregate {
SUM, MAX, MIN SUM, MAX, MIN
@ -63,7 +63,7 @@ public interface RScoredSortedSet<V> extends RScoredSortedSetAsync<V>, Iterable<
* {@code timeout} parameter * {@code timeout} parameter
* @return the tail element, or {@code null} if all sorted sets are empty * @return the tail element, or {@code null} if all sorted sets are empty
*/ */
V pollLastFromAny(long timeout, TimeUnit unit, String ... queueNames); V pollLastFromAny(long timeout, TimeUnit unit, String... queueNames);
/** /**
* Removes and returns first available head element of <b>any</b> sorted set, * Removes and returns first available head element of <b>any</b> sorted set,
@ -79,7 +79,7 @@ public interface RScoredSortedSet<V> extends RScoredSortedSetAsync<V>, Iterable<
* {@code timeout} parameter * {@code timeout} parameter
* @return the head element, or {@code null} if all sorted sets are empty * @return the head element, or {@code null} if all sorted sets are empty
*/ */
V pollFirstFromAny(long timeout, TimeUnit unit, String ... queueNames); V pollFirstFromAny(long timeout, TimeUnit unit, String... queueNames);
/** /**
* Removes and returns the head element waiting if necessary for an element to become available. * Removes and returns the head element waiting if necessary for an element to become available.

@ -45,7 +45,7 @@ public interface RScoredSortedSetAsync<V> extends RExpirableAsync, RSortableAsyn
* {@code timeout} parameter * {@code timeout} parameter
* @return the tail element, or {@code null} if all sorted sets are empty * @return the tail element, or {@code null} if all sorted sets are empty
*/ */
RFuture<V> pollLastFromAnyAsync(long timeout, TimeUnit unit, String ... queueNames); RFuture<V> pollLastFromAnyAsync(long timeout, TimeUnit unit, String... queueNames);
/** /**
* Removes and returns first available head element of <b>any</b> sorted set, * Removes and returns first available head element of <b>any</b> sorted set,
@ -62,7 +62,7 @@ public interface RScoredSortedSetAsync<V> extends RExpirableAsync, RSortableAsyn
* @return the head element, or {@code null} if all sorted sets are empty * @return the head element, or {@code null} if all sorted sets are empty
* *
*/ */
RFuture<V> pollFirstFromAnyAsync(long timeout, TimeUnit unit, String ... queueNames); RFuture<V> pollFirstFromAnyAsync(long timeout, TimeUnit unit, String... queueNames);
/** /**
* Removes and returns the head element or {@code null} if this sorted set is empty. * Removes and returns the head element or {@code null} if this sorted set is empty.

@ -49,7 +49,7 @@ public interface RScoredSortedSetReactive<V> extends RExpirableReactive, RSortab
* {@code timeout} parameter * {@code timeout} parameter
* @return the tail element, or {@code null} if all sorted sets are empty * @return the tail element, or {@code null} if all sorted sets are empty
*/ */
Mono<V> pollLastFromAny(long timeout, TimeUnit unit, String ... queueNames); Mono<V> pollLastFromAny(long timeout, TimeUnit unit, String... queueNames);
/** /**
* Removes and returns first available head element of <b>any</b> sorted set, * Removes and returns first available head element of <b>any</b> sorted set,
@ -66,7 +66,7 @@ public interface RScoredSortedSetReactive<V> extends RExpirableReactive, RSortab
* @return the head element, or {@code null} if all sorted sets are empty * @return the head element, or {@code null} if all sorted sets are empty
* *
*/ */
Mono<V> pollFirstFromAny(long timeout, TimeUnit unit, String ... queueNames); Mono<V> pollFirstFromAny(long timeout, TimeUnit unit, String... queueNames);
/** /**
* Removes and returns the head element or {@code null} if this sorted set is empty. * Removes and returns the head element or {@code null} if this sorted set is empty.

@ -48,7 +48,7 @@ public interface RScoredSortedSetRx<V> extends RExpirableRx, RSortableRx<Set<V>>
* {@code timeout} parameter * {@code timeout} parameter
* @return the tail element, or {@code null} if all sorted sets are empty * @return the tail element, or {@code null} if all sorted sets are empty
*/ */
Flowable<V> pollLastFromAny(long timeout, TimeUnit unit, String ... queueNames); Flowable<V> pollLastFromAny(long timeout, TimeUnit unit, String... queueNames);
/** /**
* Removes and returns first available head element of <b>any</b> sorted set, * Removes and returns first available head element of <b>any</b> sorted set,
@ -65,7 +65,7 @@ public interface RScoredSortedSetRx<V> extends RExpirableRx, RSortableRx<Set<V>>
* @return the head element, or {@code null} if all sorted sets are empty * @return the head element, or {@code null} if all sorted sets are empty
* *
*/ */
Flowable<V> pollFirstFromAny(long timeout, TimeUnit unit, String ... queueNames); Flowable<V> pollFirstFromAny(long timeout, TimeUnit unit, String... queueNames);
/** /**
* Removes and returns the head element or {@code null} if this sorted set is empty. * Removes and returns the head element or {@code null} if this sorted set is empty.

@ -40,7 +40,7 @@ public interface RScript extends RScriptAsync {
MAPVALUE(RedisCommands.EVAL_MAP_VALUE), MAPVALUE(RedisCommands.EVAL_MAP_VALUE),
MAPVALUELIST(RedisCommands.EVAL_MAP_VALUE_LIST); MAPVALUELIST(RedisCommands.EVAL_MAP_VALUE_LIST);
RedisCommand<?> command; private final RedisCommand<?> command;
ReturnType(RedisCommand<?> command) { ReturnType(RedisCommand<?> command) {
this.command = command; this.command = command;
@ -166,7 +166,7 @@ public interface RScript extends RScriptAsync {
* @param shaDigests - collection of SHA-1 digests * @param shaDigests - collection of SHA-1 digests
* @return list of booleans corresponding to collection SHA-1 digests * @return list of booleans corresponding to collection SHA-1 digests
*/ */
List<Boolean> scriptExists(String ... shaDigests); List<Boolean> scriptExists(String... shaDigests);
/** /**
* Kills currently executed Lua script * Kills currently executed Lua script

@ -171,7 +171,7 @@ public interface RScriptAsync {
* @param shaDigests - collection of SHA-1 digests * @param shaDigests - collection of SHA-1 digests
* @return list of booleans corresponding to collection SHA-1 digests * @return list of booleans corresponding to collection SHA-1 digests
*/ */
RFuture<List<Boolean>> scriptExistsAsync(String ... shaDigests); RFuture<List<Boolean>> scriptExistsAsync(String... shaDigests);
/** /**
* Checks for presence Lua scripts in Redis script cache by SHA-1 digest. * Checks for presence Lua scripts in Redis script cache by SHA-1 digest.
@ -180,7 +180,7 @@ public interface RScriptAsync {
* @param shaDigests - collection of SHA-1 digests * @param shaDigests - collection of SHA-1 digests
* @return list of booleans corresponding to collection SHA-1 digests * @return list of booleans corresponding to collection SHA-1 digests
*/ */
RFuture<List<Boolean>> scriptExistsAsync(String key, String ... shaDigests); RFuture<List<Boolean>> scriptExistsAsync(String key, String... shaDigests);
/** /**
* Kills currently executed Lua script * Kills currently executed Lua script

@ -128,7 +128,7 @@ public interface RScriptReactive {
* @param shaDigests - collection of SHA-1 digests * @param shaDigests - collection of SHA-1 digests
* @return list of booleans corresponding to collection SHA-1 digests * @return list of booleans corresponding to collection SHA-1 digests
*/ */
Mono<List<Boolean>> scriptExists(String ... shaDigests); Mono<List<Boolean>> scriptExists(String... shaDigests);
/** /**
* Kills currently executed Lua script * Kills currently executed Lua script

@ -127,7 +127,7 @@ public interface RScriptRx {
* @param shaDigests - collection of SHA-1 digests * @param shaDigests - collection of SHA-1 digests
* @return list of booleans corresponding to collection SHA-1 digests * @return list of booleans corresponding to collection SHA-1 digests
*/ */
Flowable<List<Boolean>> scriptExists(String ... shaDigests); Flowable<List<Boolean>> scriptExists(String... shaDigests);
/** /**
* Kills currently executed Lua script * Kills currently executed Lua script

@ -138,16 +138,16 @@ public interface RSortable<V> extends RSortableAsync<V> {
<T> Collection<T> readSortAlpha(String byPattern, List<String> getPatterns, SortOrder order); <T> Collection<T> readSortAlpha(String byPattern, List<String> getPatterns, SortOrder order);
/** /**
* Read data in sorted view lexicographically * Read data in sorted view lexicographically
* *
* @param <T> object type * @param <T> object type
* @param byPattern that is used to generate the keys that are used for sorting * @param byPattern that is used to generate the keys that are used for sorting
* @param getPatterns that is used to load values by keys in sorted view * @param getPatterns that is used to load values by keys in sorted view
* @param order for sorted data * @param order for sorted data
* @param offset of sorted data * @param offset of sorted data
* @param count of sorted data * @param count of sorted data
* @return sorted collection lexicographically * @return sorted collection lexicographically
*/ */
<T> Collection<T> readSortAlpha(String byPattern, List<String> getPatterns, SortOrder order, int offset, int count); <T> Collection<T> readSortAlpha(String byPattern, List<String> getPatterns, SortOrder order, int offset, int count);
/** /**

@ -138,17 +138,17 @@ public interface RSortableAsync<V> {
<T> RFuture<Collection<T>> readSortAlphaAsync(String byPattern, List<String> getPatterns, SortOrder order); <T> RFuture<Collection<T>> readSortAlphaAsync(String byPattern, List<String> getPatterns, SortOrder order);
/** /**
* Read data in sorted view lexicographically * Read data in sorted view lexicographically
* *
* @param <T> object type * @param <T> object type
* @param byPattern that is used to generate the keys that are used for sorting * @param byPattern that is used to generate the keys that are used for sorting
* @param getPatterns that is used to load values by keys in sorted view * @param getPatterns that is used to load values by keys in sorted view
* @param order for sorted data * @param order for sorted data
* @param offset of sorted data * @param offset of sorted data
* @param count of sorted data * @param count of sorted data
* @return sorted collection lexicographically * @return sorted collection lexicographically
*/ */
<T> RFuture<Collection<T>> readSortAlphaAsync(String byPattern, List<String> getPatterns, SortOrder order, int offset, int count); <T> RFuture<Collection<T>> readSortAlphaAsync(String byPattern, List<String> getPatterns, SortOrder order, int offset, int count);
/** /**
* Sort data and store to <code>destName</code> list * Sort data and store to <code>destName</code> list

@ -132,7 +132,7 @@ public interface RStream<K, V> extends RStreamAsync<K, V>, RExpirable {
* @param ids - Stream Message IDs * @param ids - Stream Message IDs
* @return stream data mapped by Stream Message ID * @return stream data mapped by Stream Message ID
*/ */
Map<StreamMessageId, Map<K, V>> claim(String groupName, String consumerName, long idleTime, TimeUnit idleTimeUnit, StreamMessageId ... ids); Map<StreamMessageId, Map<K, V>> claim(String groupName, String consumerName, long idleTime, TimeUnit idleTimeUnit, StreamMessageId... ids);
/** /**
* Transfers ownership of pending messages by id to a new consumer * Transfers ownership of pending messages by id to a new consumer
@ -145,7 +145,7 @@ public interface RStream<K, V> extends RStreamAsync<K, V>, RExpirable {
* @param ids - Stream Message IDs * @param ids - Stream Message IDs
* @return list of Stream Message IDs * @return list of Stream Message IDs
*/ */
List<StreamMessageId> fastClaim(String groupName, String consumerName, long idleTime, TimeUnit idleTimeUnit, StreamMessageId ... ids); List<StreamMessageId> fastClaim(String groupName, String consumerName, long idleTime, TimeUnit idleTimeUnit, StreamMessageId... ids);
/** /**
* Read stream data from <code>groupName</code> by <code>consumerName</code> and specified collection of Stream Message IDs. * Read stream data from <code>groupName</code> by <code>consumerName</code> and specified collection of Stream Message IDs.
@ -155,7 +155,7 @@ public interface RStream<K, V> extends RStreamAsync<K, V>, RExpirable {
* @param ids - collection of Stream Message IDs * @param ids - collection of Stream Message IDs
* @return stream data mapped by Stream Message ID * @return stream data mapped by Stream Message ID
*/ */
Map<StreamMessageId, Map<K, V>> readGroup(String groupName, String consumerName, StreamMessageId ... ids); Map<StreamMessageId, Map<K, V>> readGroup(String groupName, String consumerName, StreamMessageId... ids);
/** /**
* Read stream data from <code>groupName</code> by <code>consumerName</code> and specified collection of Stream Message IDs. * Read stream data from <code>groupName</code> by <code>consumerName</code> and specified collection of Stream Message IDs.
@ -166,7 +166,7 @@ public interface RStream<K, V> extends RStreamAsync<K, V>, RExpirable {
* @param ids - collection of Stream Message IDs * @param ids - collection of Stream Message IDs
* @return stream data mapped by Stream Message ID * @return stream data mapped by Stream Message ID
*/ */
Map<StreamMessageId, Map<K, V>> readGroup(String groupName, String consumerName, int count, StreamMessageId ... ids); Map<StreamMessageId, Map<K, V>> readGroup(String groupName, String consumerName, int count, StreamMessageId... ids);
/** /**
* Read stream data from <code>groupName</code> by <code>consumerName</code> and specified collection of Stream Message IDs. * Read stream data from <code>groupName</code> by <code>consumerName</code> and specified collection of Stream Message IDs.
@ -179,7 +179,7 @@ public interface RStream<K, V> extends RStreamAsync<K, V>, RExpirable {
* @param ids - collection of Stream Message IDs * @param ids - collection of Stream Message IDs
* @return stream data mapped by Stream Message ID * @return stream data mapped by Stream Message ID
*/ */
Map<StreamMessageId, Map<K, V>> readGroup(String groupName, String consumerName, long timeout, TimeUnit unit, StreamMessageId ... ids); Map<StreamMessageId, Map<K, V>> readGroup(String groupName, String consumerName, long timeout, TimeUnit unit, StreamMessageId... ids);
/** /**
* Read stream data from <code>groupName</code> by <code>consumerName</code> and specified collection of Stream Message IDs. * Read stream data from <code>groupName</code> by <code>consumerName</code> and specified collection of Stream Message IDs.
@ -193,7 +193,7 @@ public interface RStream<K, V> extends RStreamAsync<K, V>, RExpirable {
* @param ids - collection of Stream Message IDs * @param ids - collection of Stream Message IDs
* @return stream data mapped by Stream Message ID * @return stream data mapped by Stream Message ID
*/ */
Map<StreamMessageId, Map<K, V>> readGroup(String groupName, String consumerName, int count, long timeout, TimeUnit unit, StreamMessageId ... ids); Map<StreamMessageId, Map<K, V>> readGroup(String groupName, String consumerName, int count, long timeout, TimeUnit unit, StreamMessageId... ids);
/** /**
* Read stream data from <code>groupName</code> by <code>consumerName</code>, starting by specified message ids for this and other streams. * Read stream data from <code>groupName</code> by <code>consumerName</code>, starting by specified message ids for this and other streams.
@ -470,7 +470,7 @@ public interface RStream<K, V> extends RStreamAsync<K, V>, RExpirable {
* @param ids - collection of Stream Message IDs * @param ids - collection of Stream Message IDs
* @return stream data mapped by Stream Message ID * @return stream data mapped by Stream Message ID
*/ */
Map<StreamMessageId, Map<K, V>> read(StreamMessageId ... ids); Map<StreamMessageId, Map<K, V>> read(StreamMessageId... ids);
/** /**
* Read stream data by specified collection of Stream Message IDs. * Read stream data by specified collection of Stream Message IDs.
@ -479,7 +479,7 @@ public interface RStream<K, V> extends RStreamAsync<K, V>, RExpirable {
* @param ids - collection of Stream Message IDs * @param ids - collection of Stream Message IDs
* @return stream data mapped by Stream Message ID * @return stream data mapped by Stream Message ID
*/ */
Map<StreamMessageId, Map<K, V>> read(int count, StreamMessageId ... ids); Map<StreamMessageId, Map<K, V>> read(int count, StreamMessageId... ids);
/** /**
* Read stream data by specified collection of Stream Message IDs. * Read stream data by specified collection of Stream Message IDs.
@ -490,7 +490,7 @@ public interface RStream<K, V> extends RStreamAsync<K, V>, RExpirable {
* @param ids - collection of Stream Message IDs * @param ids - collection of Stream Message IDs
* @return stream data mapped by Stream Message ID * @return stream data mapped by Stream Message ID
*/ */
Map<StreamMessageId, Map<K, V>> read(long timeout, TimeUnit unit, StreamMessageId ... ids); Map<StreamMessageId, Map<K, V>> read(long timeout, TimeUnit unit, StreamMessageId... ids);
/** /**
* Read stream data by specified collection of Stream Message IDs. * Read stream data by specified collection of Stream Message IDs.
@ -502,7 +502,7 @@ public interface RStream<K, V> extends RStreamAsync<K, V>, RExpirable {
* @param ids - collection of Stream Message IDs * @param ids - collection of Stream Message IDs
* @return stream data mapped by Stream Message ID * @return stream data mapped by Stream Message ID
*/ */
Map<StreamMessageId, Map<K, V>> read(int count, long timeout, TimeUnit unit, StreamMessageId ... ids); Map<StreamMessageId, Map<K, V>> read(int count, long timeout, TimeUnit unit, StreamMessageId... ids);
/** /**
* Read stream data by specified stream name including this stream. * Read stream data by specified stream name including this stream.

@ -136,7 +136,7 @@ public interface RStreamAsync<K, V> extends RExpirableAsync {
* @param ids - stream ids * @param ids - stream ids
* @return stream data mapped by Stream ID * @return stream data mapped by Stream ID
*/ */
RFuture<Map<StreamMessageId, Map<K, V>>> claimAsync(String groupName, String consumerName, long idleTime, TimeUnit idleTimeUnit, StreamMessageId ... ids); RFuture<Map<StreamMessageId, Map<K, V>>> claimAsync(String groupName, String consumerName, long idleTime, TimeUnit idleTimeUnit, StreamMessageId... ids);
/** /**
* Transfers ownership of pending messages by id to a new consumer * Transfers ownership of pending messages by id to a new consumer
@ -149,7 +149,7 @@ public interface RStreamAsync<K, V> extends RExpirableAsync {
* @param ids - Stream Message IDs * @param ids - Stream Message IDs
* @return list of Stream Message IDs * @return list of Stream Message IDs
*/ */
RFuture<List<StreamMessageId>> fastClaimAsync(String groupName, String consumerName, long idleTime, TimeUnit idleTimeUnit, StreamMessageId ... ids); RFuture<List<StreamMessageId>> fastClaimAsync(String groupName, String consumerName, long idleTime, TimeUnit idleTimeUnit, StreamMessageId... ids);
/** /**
* Read stream data from <code>groupName</code> by <code>consumerName</code> and specified collection of Stream IDs. * Read stream data from <code>groupName</code> by <code>consumerName</code> and specified collection of Stream IDs.
@ -159,7 +159,7 @@ public interface RStreamAsync<K, V> extends RExpirableAsync {
* @param ids - collection of Stream IDs * @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID * @return stream data mapped by Stream ID
*/ */
RFuture<Map<StreamMessageId, Map<K, V>>> readGroupAsync(String groupName, String consumerName, StreamMessageId ... ids); RFuture<Map<StreamMessageId, Map<K, V>>> readGroupAsync(String groupName, String consumerName, StreamMessageId... ids);
/** /**
* Read stream data from <code>groupName</code> by <code>consumerName</code> and specified collection of Stream IDs. * Read stream data from <code>groupName</code> by <code>consumerName</code> and specified collection of Stream IDs.
@ -170,7 +170,7 @@ public interface RStreamAsync<K, V> extends RExpirableAsync {
* @param ids - collection of Stream IDs * @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID * @return stream data mapped by Stream ID
*/ */
RFuture<Map<StreamMessageId, Map<K, V>>> readGroupAsync(String groupName, String consumerName, int count, StreamMessageId ... ids); RFuture<Map<StreamMessageId, Map<K, V>>> readGroupAsync(String groupName, String consumerName, int count, StreamMessageId... ids);
/** /**
* Read stream data from <code>groupName</code> by <code>consumerName</code> and specified collection of Stream IDs. * Read stream data from <code>groupName</code> by <code>consumerName</code> and specified collection of Stream IDs.
@ -183,7 +183,7 @@ public interface RStreamAsync<K, V> extends RExpirableAsync {
* @param ids - collection of Stream IDs * @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID * @return stream data mapped by Stream ID
*/ */
RFuture<Map<StreamMessageId, Map<K, V>>> readGroupAsync(String groupName, String consumerName, long timeout, TimeUnit unit, StreamMessageId ... ids); RFuture<Map<StreamMessageId, Map<K, V>>> readGroupAsync(String groupName, String consumerName, long timeout, TimeUnit unit, StreamMessageId... ids);
/** /**
* Read stream data from <code>groupName</code> by <code>consumerName</code> and specified collection of Stream IDs. * Read stream data from <code>groupName</code> by <code>consumerName</code> and specified collection of Stream IDs.
@ -197,7 +197,7 @@ public interface RStreamAsync<K, V> extends RExpirableAsync {
* @param ids - collection of Stream IDs * @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID * @return stream data mapped by Stream ID
*/ */
RFuture<Map<StreamMessageId, Map<K, V>>> readGroupAsync(String groupName, String consumerName, int count, long timeout, TimeUnit unit, StreamMessageId ... ids); RFuture<Map<StreamMessageId, Map<K, V>>> readGroupAsync(String groupName, String consumerName, int count, long timeout, TimeUnit unit, StreamMessageId... ids);
/** /**
* Read stream data from <code>groupName</code> by <code>consumerName</code>, starting by specified message ids for this and other streams. * Read stream data from <code>groupName</code> by <code>consumerName</code>, starting by specified message ids for this and other streams.
@ -208,7 +208,7 @@ public interface RStreamAsync<K, V> extends RExpirableAsync {
* @param nameToId - Stream Message ID mapped by stream name * @param nameToId - Stream Message ID mapped by stream name
* @return stream data mapped by key and Stream Message ID * @return stream data mapped by key and Stream Message ID
*/ */
RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readGroupAsync(String groupName, String consumerName,StreamMessageId id, Map<String, StreamMessageId> nameToId); RFuture<Map<String, Map<StreamMessageId, Map<K, V>>>> readGroupAsync(String groupName, String consumerName, StreamMessageId id, Map<String, StreamMessageId> nameToId);
/** /**
* Read stream data from <code>groupName</code> by <code>consumerName</code>, starting by specified message ids for this and other streams. * Read stream data from <code>groupName</code> by <code>consumerName</code>, starting by specified message ids for this and other streams.
@ -462,7 +462,7 @@ public interface RStreamAsync<K, V> extends RExpirableAsync {
* @param ids - collection of Stream IDs * @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID * @return stream data mapped by Stream ID
*/ */
RFuture<Map<StreamMessageId, Map<K, V>>> readAsync(StreamMessageId ... ids); RFuture<Map<StreamMessageId, Map<K, V>>> readAsync(StreamMessageId... ids);
/** /**
* Read stream data by specified collection of Stream IDs. * Read stream data by specified collection of Stream IDs.
@ -471,7 +471,7 @@ public interface RStreamAsync<K, V> extends RExpirableAsync {
* @param ids - collection of Stream IDs * @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID * @return stream data mapped by Stream ID
*/ */
RFuture<Map<StreamMessageId, Map<K, V>>> readAsync(int count, StreamMessageId ... ids); RFuture<Map<StreamMessageId, Map<K, V>>> readAsync(int count, StreamMessageId... ids);
/** /**
* Read stream data by specified collection of Stream IDs. * Read stream data by specified collection of Stream IDs.
@ -482,7 +482,7 @@ public interface RStreamAsync<K, V> extends RExpirableAsync {
* @param ids - collection of Stream IDs * @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID * @return stream data mapped by Stream ID
*/ */
RFuture<Map<StreamMessageId, Map<K, V>>> readAsync(long timeout, TimeUnit unit, StreamMessageId ... ids); RFuture<Map<StreamMessageId, Map<K, V>>> readAsync(long timeout, TimeUnit unit, StreamMessageId... ids);
/** /**
* Read stream data by specified collection of Stream IDs. * Read stream data by specified collection of Stream IDs.
@ -494,7 +494,7 @@ public interface RStreamAsync<K, V> extends RExpirableAsync {
* @param ids - collection of Stream IDs * @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID * @return stream data mapped by Stream ID
*/ */
RFuture<Map<StreamMessageId, Map<K, V>>> readAsync(int count, long timeout, TimeUnit unit, StreamMessageId ... ids); RFuture<Map<StreamMessageId, Map<K, V>>> readAsync(int count, long timeout, TimeUnit unit, StreamMessageId... ids);
/** /**
* Read stream data by specified stream name including this stream. * Read stream data by specified stream name including this stream.

@ -138,7 +138,7 @@ public interface RStreamReactive<K, V> extends RExpirableReactive {
* @param ids - stream ids * @param ids - stream ids
* @return stream data mapped by Stream ID * @return stream data mapped by Stream ID
*/ */
Mono<Map<StreamMessageId, Map<K, V>>> claim(String groupName, String consumerName, long idleTime, TimeUnit idleTimeUnit, StreamMessageId ... ids); Mono<Map<StreamMessageId, Map<K, V>>> claim(String groupName, String consumerName, long idleTime, TimeUnit idleTimeUnit, StreamMessageId... ids);
/** /**
* Read stream data from <code>groupName</code> by <code>consumerName</code> and specified collection of Stream IDs. * Read stream data from <code>groupName</code> by <code>consumerName</code> and specified collection of Stream IDs.
@ -148,7 +148,7 @@ public interface RStreamReactive<K, V> extends RExpirableReactive {
* @param ids - collection of Stream IDs * @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID * @return stream data mapped by Stream ID
*/ */
Mono<Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, StreamMessageId ... ids); Mono<Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, StreamMessageId... ids);
/** /**
* Read stream data from <code>groupName</code> by <code>consumerName</code> and specified collection of Stream IDs. * Read stream data from <code>groupName</code> by <code>consumerName</code> and specified collection of Stream IDs.
@ -159,7 +159,7 @@ public interface RStreamReactive<K, V> extends RExpirableReactive {
* @param ids - collection of Stream IDs * @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID * @return stream data mapped by Stream ID
*/ */
Mono<Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, int count, StreamMessageId ... ids); Mono<Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, int count, StreamMessageId... ids);
/** /**
* Read stream data from <code>groupName</code> by <code>consumerName</code> and specified collection of Stream IDs. * Read stream data from <code>groupName</code> by <code>consumerName</code> and specified collection of Stream IDs.
@ -172,7 +172,7 @@ public interface RStreamReactive<K, V> extends RExpirableReactive {
* @param ids - collection of Stream IDs * @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID * @return stream data mapped by Stream ID
*/ */
Mono<Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, long timeout, TimeUnit unit, StreamMessageId ... ids); Mono<Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, long timeout, TimeUnit unit, StreamMessageId... ids);
/** /**
* Read stream data from <code>groupName</code> by <code>consumerName</code> and specified collection of Stream IDs. * Read stream data from <code>groupName</code> by <code>consumerName</code> and specified collection of Stream IDs.
@ -186,7 +186,7 @@ public interface RStreamReactive<K, V> extends RExpirableReactive {
* @param ids - collection of Stream IDs * @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID * @return stream data mapped by Stream ID
*/ */
Mono<Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, int count, long timeout, TimeUnit unit, StreamMessageId ... ids); Mono<Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, int count, long timeout, TimeUnit unit, StreamMessageId... ids);
/** /**
* Read stream data from <code>groupName</code> by <code>consumerName</code>, starting by specified message ids for this and other streams. * Read stream data from <code>groupName</code> by <code>consumerName</code>, starting by specified message ids for this and other streams.
@ -197,7 +197,7 @@ public interface RStreamReactive<K, V> extends RExpirableReactive {
* @param nameToId - Stream Message ID mapped by stream name * @param nameToId - Stream Message ID mapped by stream name
* @return stream data mapped by key and Stream Message ID * @return stream data mapped by key and Stream Message ID
*/ */
Mono<Map<String, Map<StreamMessageId, Map<K, V>>>> readGroup(String groupName, String consumerName,StreamMessageId id, Map<String, StreamMessageId> nameToId); Mono<Map<String, Map<StreamMessageId, Map<K, V>>>> readGroup(String groupName, String consumerName, StreamMessageId id, Map<String, StreamMessageId> nameToId);
/** /**
* Read stream data from <code>groupName</code> by <code>consumerName</code>, starting by specified message ids for this and other streams. * Read stream data from <code>groupName</code> by <code>consumerName</code>, starting by specified message ids for this and other streams.
@ -451,7 +451,7 @@ public interface RStreamReactive<K, V> extends RExpirableReactive {
* @param ids - collection of Stream IDs * @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID * @return stream data mapped by Stream ID
*/ */
Mono<Map<StreamMessageId, Map<K, V>>> read(StreamMessageId ... ids); Mono<Map<StreamMessageId, Map<K, V>>> read(StreamMessageId... ids);
/** /**
* Read stream data by specified collection of Stream IDs. * Read stream data by specified collection of Stream IDs.
@ -460,7 +460,7 @@ public interface RStreamReactive<K, V> extends RExpirableReactive {
* @param ids - collection of Stream IDs * @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID * @return stream data mapped by Stream ID
*/ */
Mono<Map<StreamMessageId, Map<K, V>>> read(int count, StreamMessageId ... ids); Mono<Map<StreamMessageId, Map<K, V>>> read(int count, StreamMessageId... ids);
/** /**
* Read stream data by specified collection of Stream IDs. * Read stream data by specified collection of Stream IDs.
@ -471,7 +471,7 @@ public interface RStreamReactive<K, V> extends RExpirableReactive {
* @param ids - collection of Stream IDs * @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID * @return stream data mapped by Stream ID
*/ */
Mono<Map<StreamMessageId, Map<K, V>>> read(long timeout, TimeUnit unit, StreamMessageId ... ids); Mono<Map<StreamMessageId, Map<K, V>>> read(long timeout, TimeUnit unit, StreamMessageId... ids);
/** /**
* Read stream data by specified collection of Stream IDs. * Read stream data by specified collection of Stream IDs.
@ -483,7 +483,7 @@ public interface RStreamReactive<K, V> extends RExpirableReactive {
* @param ids - collection of Stream IDs * @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID * @return stream data mapped by Stream ID
*/ */
Mono<Map<StreamMessageId, Map<K, V>>> read(int count, long timeout, TimeUnit unit, StreamMessageId ... ids); Mono<Map<StreamMessageId, Map<K, V>>> read(int count, long timeout, TimeUnit unit, StreamMessageId... ids);
/** /**
* Read stream data by specified stream name including this stream. * Read stream data by specified stream name including this stream.

@ -138,7 +138,7 @@ public interface RStreamRx<K, V> extends RExpirableRx {
* @param ids - stream ids * @param ids - stream ids
* @return stream data mapped by Stream ID * @return stream data mapped by Stream ID
*/ */
Flowable<Map<StreamMessageId, Map<K, V>>> claim(String groupName, String consumerName, long idleTime, TimeUnit idleTimeUnit, StreamMessageId ... ids); Flowable<Map<StreamMessageId, Map<K, V>>> claim(String groupName, String consumerName, long idleTime, TimeUnit idleTimeUnit, StreamMessageId... ids);
/** /**
* Read stream data from <code>groupName</code> by <code>consumerName</code> and specified collection of Stream IDs. * Read stream data from <code>groupName</code> by <code>consumerName</code> and specified collection of Stream IDs.
@ -148,7 +148,7 @@ public interface RStreamRx<K, V> extends RExpirableRx {
* @param ids - collection of Stream IDs * @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID * @return stream data mapped by Stream ID
*/ */
Flowable<Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, StreamMessageId ... ids); Flowable<Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, StreamMessageId... ids);
/** /**
* Read stream data from <code>groupName</code> by <code>consumerName</code> and specified collection of Stream IDs. * Read stream data from <code>groupName</code> by <code>consumerName</code> and specified collection of Stream IDs.
@ -159,7 +159,7 @@ public interface RStreamRx<K, V> extends RExpirableRx {
* @param ids - collection of Stream IDs * @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID * @return stream data mapped by Stream ID
*/ */
Flowable<Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, int count, StreamMessageId ... ids); Flowable<Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, int count, StreamMessageId... ids);
/** /**
* Read stream data from <code>groupName</code> by <code>consumerName</code> and specified collection of Stream IDs. * Read stream data from <code>groupName</code> by <code>consumerName</code> and specified collection of Stream IDs.
@ -172,7 +172,7 @@ public interface RStreamRx<K, V> extends RExpirableRx {
* @param ids - collection of Stream IDs * @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID * @return stream data mapped by Stream ID
*/ */
Flowable<Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, long timeout, TimeUnit unit, StreamMessageId ... ids); Flowable<Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, long timeout, TimeUnit unit, StreamMessageId... ids);
/** /**
* Read stream data from <code>groupName</code> by <code>consumerName</code> and specified collection of Stream IDs. * Read stream data from <code>groupName</code> by <code>consumerName</code> and specified collection of Stream IDs.
@ -186,7 +186,7 @@ public interface RStreamRx<K, V> extends RExpirableRx {
* @param ids - collection of Stream IDs * @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID * @return stream data mapped by Stream ID
*/ */
Flowable<Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, int count, long timeout, TimeUnit unit, StreamMessageId ... ids); Flowable<Map<StreamMessageId, Map<K, V>>> readGroup(String groupName, String consumerName, int count, long timeout, TimeUnit unit, StreamMessageId... ids);
/** /**
* Read stream data from <code>groupName</code> by <code>consumerName</code>, starting by specified message ids for this and other streams. * Read stream data from <code>groupName</code> by <code>consumerName</code>, starting by specified message ids for this and other streams.
@ -197,7 +197,7 @@ public interface RStreamRx<K, V> extends RExpirableRx {
* @param nameToId - Stream Message ID mapped by stream name * @param nameToId - Stream Message ID mapped by stream name
* @return stream data mapped by key and Stream Message ID * @return stream data mapped by key and Stream Message ID
*/ */
Flowable<Map<String, Map<StreamMessageId, Map<K, V>>>> readGroup(String groupName, String consumerName,StreamMessageId id, Map<String, StreamMessageId> nameToId); Flowable<Map<String, Map<StreamMessageId, Map<K, V>>>> readGroup(String groupName, String consumerName, StreamMessageId id, Map<String, StreamMessageId> nameToId);
/** /**
* Read stream data from <code>groupName</code> by <code>consumerName</code>, starting by specified message ids for this and other streams. * Read stream data from <code>groupName</code> by <code>consumerName</code>, starting by specified message ids for this and other streams.
@ -451,7 +451,7 @@ public interface RStreamRx<K, V> extends RExpirableRx {
* @param ids - collection of Stream IDs * @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID * @return stream data mapped by Stream ID
*/ */
Flowable<Map<StreamMessageId, Map<K, V>>> read(StreamMessageId ... ids); Flowable<Map<StreamMessageId, Map<K, V>>> read(StreamMessageId... ids);
/** /**
* Read stream data by specified collection of Stream IDs. * Read stream data by specified collection of Stream IDs.
@ -460,7 +460,7 @@ public interface RStreamRx<K, V> extends RExpirableRx {
* @param ids - collection of Stream IDs * @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID * @return stream data mapped by Stream ID
*/ */
Flowable<Map<StreamMessageId, Map<K, V>>> read(int count, StreamMessageId ... ids); Flowable<Map<StreamMessageId, Map<K, V>>> read(int count, StreamMessageId... ids);
/** /**
* Read stream data by specified collection of Stream IDs. * Read stream data by specified collection of Stream IDs.
@ -471,7 +471,7 @@ public interface RStreamRx<K, V> extends RExpirableRx {
* @param ids - collection of Stream IDs * @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID * @return stream data mapped by Stream ID
*/ */
Flowable<Map<StreamMessageId, Map<K, V>>> read(long timeout, TimeUnit unit, StreamMessageId ... ids); Flowable<Map<StreamMessageId, Map<K, V>>> read(long timeout, TimeUnit unit, StreamMessageId... ids);
/** /**
* Read stream data by specified collection of Stream IDs. * Read stream data by specified collection of Stream IDs.
@ -483,7 +483,7 @@ public interface RStreamRx<K, V> extends RExpirableRx {
* @param ids - collection of Stream IDs * @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID * @return stream data mapped by Stream ID
*/ */
Flowable<Map<StreamMessageId, Map<K, V>>> read(int count, long timeout, TimeUnit unit, StreamMessageId ... ids); Flowable<Map<StreamMessageId, Map<K, V>>> read(int count, long timeout, TimeUnit unit, StreamMessageId... ids);
/** /**
* Read stream data by specified stream name including this stream. * Read stream data by specified stream name including this stream.

@ -23,7 +23,7 @@ import java.util.concurrent.TimeUnit;
* @author Nikita Koksharov * @author Nikita Koksharov
* *
*/ */
public class TransactionOptions { public final class TransactionOptions {
private long responseTimeout = 3000; private long responseTimeout = 3000;
private int retryAttempts = 3; private int retryAttempts = 3;

@ -36,7 +36,7 @@ import org.redisson.liveobject.resolver.NamingScheme;
@Target({ElementType.TYPE}) @Target({ElementType.TYPE})
public @interface REntity { public @interface REntity {
public enum TransformationMode { enum TransformationMode {
IMPLEMENTATION_BASED, IMPLEMENTATION_BASED,
@ -68,7 +68,7 @@ public @interface REntity {
*/ */
TransformationMode fieldTransformation() default TransformationMode.ANNOTATION_BASED; TransformationMode fieldTransformation() default TransformationMode.ANNOTATION_BASED;
static final class DEFAULT extends BaseCodec { final class DEFAULT extends BaseCodec {
@Override @Override
public Decoder<Object> getValueDecoder() { public Decoder<Object> getValueDecoder() {
return null; return null;

@ -15,8 +15,6 @@
*/ */
package org.redisson.api.annotation; package org.redisson.api.annotation;
import org.redisson.liveobject.resolver.NamingScheme;
import org.redisson.liveobject.resolver.DefaultNamingScheme;
import java.lang.annotation.ElementType; import java.lang.annotation.ElementType;
import java.lang.annotation.Retention; import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy; import java.lang.annotation.RetentionPolicy;
@ -26,7 +24,8 @@ import org.redisson.client.codec.BaseCodec;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.Decoder; import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.Encoder; import org.redisson.client.protocol.Encoder;
import org.redisson.codec.JsonJacksonCodec; import org.redisson.liveobject.resolver.DefaultNamingScheme;
import org.redisson.liveobject.resolver.NamingScheme;
/** /**
* By default <code>namingScheme</code> and/or <code>codec</code> parameters specified in {@link REntity} * By default <code>namingScheme</code> and/or <code>codec</code> parameters specified in {@link REntity}
@ -58,7 +57,7 @@ public @interface RObjectField{
*/ */
Class<? extends Codec> codec() default DEFAULT.class; Class<? extends Codec> codec() default DEFAULT.class;
static final class DEFAULT extends BaseCodec { final class DEFAULT extends BaseCodec {
@Override @Override
public Decoder<Object> getValueDecoder() { public Decoder<Object> getValueDecoder() {
return null; return null;

@ -161,7 +161,7 @@ public class RedisConnection implements RedisCommands {
} }
} }
public <T> T sync(RedisCommand<T> command, Object ... params) { public <T> T sync(RedisCommand<T> command, Object... params) {
return sync(null, command, params); return sync(null, command, params);
} }
@ -173,25 +173,25 @@ public class RedisConnection implements RedisCommands {
return channel.writeAndFlush(data); return channel.writeAndFlush(data);
} }
public <T, R> R sync(Codec encoder, RedisCommand<T> command, Object ... params) { public <T, R> R sync(Codec encoder, RedisCommand<T> command, Object... params) {
RPromise<R> promise = new RedissonPromise<R>(); RPromise<R> promise = new RedissonPromise<R>();
send(new CommandData<T, R>(promise, encoder, command, params)); send(new CommandData<T, R>(promise, encoder, command, params));
return await(promise); return await(promise);
} }
public <T, R> RFuture<R> async(RedisCommand<T> command, Object ... params) { public <T, R> RFuture<R> async(RedisCommand<T> command, Object... params) {
return async(null, command, params); return async(null, command, params);
} }
public <T, R> RFuture<R> async(long timeout, RedisCommand<T> command, Object ... params) { public <T, R> RFuture<R> async(long timeout, RedisCommand<T> command, Object... params) {
return async(null, command, params); return async(null, command, params);
} }
public <T, R> RFuture<R> async(Codec encoder, RedisCommand<T> command, Object ... params) { public <T, R> RFuture<R> async(Codec encoder, RedisCommand<T> command, Object... params) {
return async(-1, encoder, command, params); return async(-1, encoder, command, params);
} }
public <T, R> RFuture<R> async(long timeout, Codec encoder, final RedisCommand<T> command, final Object ... params) { public <T, R> RFuture<R> async(long timeout, Codec encoder, final RedisCommand<T> command, final Object... params) {
final RPromise<R> promise = new RedissonPromise<R>(); final RPromise<R> promise = new RedissonPromise<R>();
if (timeout == -1) { if (timeout == -1) {
timeout = redisClient.getCommandTimeout(); timeout = redisClient.getCommandTimeout();
@ -228,7 +228,7 @@ public class RedisConnection implements RedisCommands {
return promise; return promise;
} }
public <T, R> CommandData<T, R> create(Codec encoder, RedisCommand<T> command, Object ... params) { public <T, R> CommandData<T, R> create(Codec encoder, RedisCommand<T> command, Object... params) {
RPromise<R> promise = new RedissonPromise<R>(); RPromise<R> promise = new RedissonPromise<R>();
return new CommandData<T, R>(promise, encoder, command, params); return new CommandData<T, R>(promise, encoder, command, params);
} }

@ -85,21 +85,21 @@ public class RedisPubSubConnection extends RedisConnection {
} }
} }
public void subscribe(Codec codec, ChannelName ... channels) { public void subscribe(Codec codec, ChannelName... channels) {
for (ChannelName ch : channels) { for (ChannelName ch : channels) {
this.channels.put(ch, codec); this.channels.put(ch, codec);
} }
async(new PubSubMessageDecoder(codec.getValueDecoder()), RedisCommands.SUBSCRIBE, channels); async(new PubSubMessageDecoder(codec.getValueDecoder()), RedisCommands.SUBSCRIBE, channels);
} }
public void psubscribe(Codec codec, ChannelName ... channels) { public void psubscribe(Codec codec, ChannelName... channels) {
for (ChannelName ch : channels) { for (ChannelName ch : channels) {
patternChannels.put(ch, codec); patternChannels.put(ch, codec);
} }
async(new PubSubPatternMessageDecoder(codec.getValueDecoder()), RedisCommands.PSUBSCRIBE, channels); async(new PubSubPatternMessageDecoder(codec.getValueDecoder()), RedisCommands.PSUBSCRIBE, channels);
} }
public void unsubscribe(final ChannelName ... channels) { public void unsubscribe(final ChannelName... channels) {
synchronized (this) { synchronized (this) {
for (ChannelName ch : channels) { for (ChannelName ch : channels) {
this.channels.remove(ch); this.channels.remove(ch);
@ -145,7 +145,7 @@ public class RedisPubSubConnection extends RedisConnection {
} }
} }
public void punsubscribe(final ChannelName ... channels) { public void punsubscribe(final ChannelName... channels) {
synchronized (this) { synchronized (this) {
for (ChannelName ch : channels) { for (ChannelName ch : channels) {
patternChannels.remove(ch); patternChannels.remove(ch);
@ -166,7 +166,7 @@ public class RedisPubSubConnection extends RedisConnection {
}); });
} }
private <T, R> ChannelFuture async(MultiDecoder<Object> messageDecoder, RedisCommand<T> command, Object ... params) { private <T, R> ChannelFuture async(MultiDecoder<Object> messageDecoder, RedisCommand<T> command, Object... params) {
RPromise<R> promise = new RedissonPromise<R>(); RPromise<R> promise = new RedissonPromise<R>();
return channel.writeAndFlush(new CommandData<T, R>(promise, messageDecoder, null, command, params)); return channel.writeAndFlush(new CommandData<T, R>(promise, messageDecoder, null, command, params));
} }

@ -68,6 +68,7 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ReplayingDecoder; import io.netty.handler.codec.ReplayingDecoder;
import io.netty.util.CharsetUtil; import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.FastThreadLocal;
/** /**
* Redis protocol command decoder * Redis protocol command decoder
@ -112,14 +113,14 @@ public class CommandDecoder extends ReplayingDecoder<State> {
final ExecutorService executor; final ExecutorService executor;
private final boolean decodeInExecutor; private final boolean decodeInExecutor;
private final ThreadLocal<Status> decoderStatus = new ThreadLocal<Status>() { private final FastThreadLocal<Status> decoderStatus = new FastThreadLocal<Status>() {
@Override @Override
protected Status initialValue() { protected Status initialValue() {
return Status.NORMAL; return Status.NORMAL;
}; };
}; };
private final ThreadLocal<State> state = new ThreadLocal<State>(); private final FastThreadLocal<State> state = new FastThreadLocal<State>();
public CommandDecoder(ExecutorService executor, boolean decodeInExecutor) { public CommandDecoder(ExecutorService executor, boolean decodeInExecutor) {
this.decodeInExecutor = decodeInExecutor; this.decodeInExecutor = decodeInExecutor;

@ -112,13 +112,13 @@ public class ListMultiDecoder<T> implements MultiDecoder<Object> {
return ds; return ds;
} }
public ListMultiDecoder(MultiDecoder<?> ... decoders) { public ListMultiDecoder(MultiDecoder<?>... decoders) {
this.decoders = decoders; this.decoders = decoders;
} }
private Integer fixedIndex; private Integer fixedIndex;
public ListMultiDecoder(Integer fixedIndex, MultiDecoder<?> ... decoders) { public ListMultiDecoder(Integer fixedIndex, MultiDecoder<?>... decoders) {
this.fixedIndex = fixedIndex; this.fixedIndex = fixedIndex;
this.decoders = decoders; this.decoders = decoders;
} }

@ -529,7 +529,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
RFuture<Collection<RFuture<Void>>> future = addMasterEntry(newPart, cfg); RFuture<Collection<RFuture<Void>>> future = addMasterEntry(newPart, cfg);
future.onComplete((res, e) -> { future.onComplete((res, e) -> {
if (e == null) { if (e == null) {
futures.addAll(future.getNow()); futures.addAll(res);
} }
if (masters.decrementAndGet() == 0) { if (masters.decrementAndGet() == 0) {

@ -26,6 +26,7 @@ import org.redisson.client.protocol.Encoder;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufAllocator;
import io.netty.handler.codec.compression.Snappy; import io.netty.handler.codec.compression.Snappy;
import io.netty.util.concurrent.FastThreadLocal;
/** /**
* Snappy compression codec. * Snappy compression codec.
@ -39,13 +40,13 @@ import io.netty.handler.codec.compression.Snappy;
*/ */
public class SnappyCodec extends BaseCodec { public class SnappyCodec extends BaseCodec {
private static final ThreadLocal<Snappy> snappyDecoder = new ThreadLocal<Snappy>() { private static final FastThreadLocal<Snappy> snappyDecoder = new FastThreadLocal<Snappy>() {
protected Snappy initialValue() { protected Snappy initialValue() {
return new Snappy(); return new Snappy();
}; };
}; };
private static final ThreadLocal<Snappy> snappyEncoder = new ThreadLocal<Snappy>() { private static final FastThreadLocal<Snappy> snappyEncoder = new FastThreadLocal<Snappy>() {
protected Snappy initialValue() { protected Snappy initialValue() {
return new Snappy(); return new Snappy();
}; };

@ -59,60 +59,60 @@ public interface CommandAsyncExecutor {
<V> V get(RFuture<V> RFuture); <V> V get(RFuture<V> RFuture);
<T, R> RFuture<R> writeAsync(MasterSlaveEntry entry, Codec codec, RedisCommand<T> command, Object ... params); <T, R> RFuture<R> writeAsync(MasterSlaveEntry entry, Codec codec, RedisCommand<T> command, Object... params);
<T, R> RFuture<R> writeAsync(byte[] key, Codec codec, RedisCommand<T> command, Object... params); <T, R> RFuture<R> writeAsync(byte[] key, Codec codec, RedisCommand<T> command, Object... params);
<T, R> RFuture<R> readAsync(RedisClient client, MasterSlaveEntry entry, Codec codec, RedisCommand<T> command, Object ... params); <T, R> RFuture<R> readAsync(RedisClient client, MasterSlaveEntry entry, Codec codec, RedisCommand<T> command, Object... params);
<T, R> RFuture<R> readAsync(RedisClient client, String name, Codec codec, RedisCommand<T> command, Object ... params); <T, R> RFuture<R> readAsync(RedisClient client, String name, Codec codec, RedisCommand<T> command, Object... params);
<T, R> RFuture<R> readAsync(RedisClient client, byte[] key, Codec codec, RedisCommand<T> command, Object... params); <T, R> RFuture<R> readAsync(RedisClient client, byte[] key, Codec codec, RedisCommand<T> command, Object... params);
<T, R> RFuture<R> readAsync(RedisClient client, Codec codec, RedisCommand<T> command, Object ... params); <T, R> RFuture<R> readAsync(RedisClient client, Codec codec, RedisCommand<T> command, Object... params);
<T, R> RFuture<R> evalWriteAllAsync(RedisCommand<T> command, SlotCallback<T, R> callback, String script, List<Object> keys, Object ... params); <T, R> RFuture<R> evalWriteAllAsync(RedisCommand<T> command, SlotCallback<T, R> callback, String script, List<Object> keys, Object... params);
<R, T> RFuture<R> writeAllAsync(RedisCommand<T> command, SlotCallback<T, R> callback, Object ... params); <R, T> RFuture<R> writeAllAsync(RedisCommand<T> command, SlotCallback<T, R> callback, Object... params);
<T, R> RFuture<Collection<R>> readAllAsync(Codec codec, RedisCommand<T> command, Object... params); <T, R> RFuture<Collection<R>> readAllAsync(Codec codec, RedisCommand<T> command, Object... params);
<R, T> RFuture<R> readAllAsync(RedisCommand<T> command, SlotCallback<T, R> callback, Object ... params); <R, T> RFuture<R> readAllAsync(RedisCommand<T> command, SlotCallback<T, R> callback, Object... params);
<T, R> RFuture<Collection<R>> readAllAsync(Collection<R> results, Codec codec, RedisCommand<T> command, Object... params); <T, R> RFuture<Collection<R>> readAllAsync(Collection<R> results, Codec codec, RedisCommand<T> command, Object... params);
<T, R> RFuture<R> evalReadAsync(RedisClient client, String name, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params); <T, R> RFuture<R> evalReadAsync(RedisClient client, String name, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params);
<T, R> RFuture<R> evalReadAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params); <T, R> RFuture<R> evalReadAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params);
<T, R> RFuture<R> evalReadAsync(MasterSlaveEntry entry, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params); <T, R> RFuture<R> evalReadAsync(MasterSlaveEntry entry, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params);
<T, R> RFuture<R> evalWriteAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params); <T, R> RFuture<R> evalWriteAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params);
<T, R> RFuture<R> evalWriteAsync(MasterSlaveEntry entry, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params); <T, R> RFuture<R> evalWriteAsync(MasterSlaveEntry entry, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params);
<T, R> RFuture<R> readAsync(byte[] key, Codec codec, RedisCommand<T> command, Object... params); <T, R> RFuture<R> readAsync(byte[] key, Codec codec, RedisCommand<T> command, Object... params);
<T, R> RFuture<R> readAsync(String key, Codec codec, RedisCommand<T> command, Object ... params); <T, R> RFuture<R> readAsync(String key, Codec codec, RedisCommand<T> command, Object... params);
<T, R> RFuture<R> writeAsync(String key, Codec codec, RedisCommand<T> command, Object ... params); <T, R> RFuture<R> writeAsync(String key, Codec codec, RedisCommand<T> command, Object... params);
<T, R> RFuture<Collection<R>> readAllAsync(RedisCommand<T> command, Object ... params); <T, R> RFuture<Collection<R>> readAllAsync(RedisCommand<T> command, Object... params);
<R, T> RFuture<R> writeAllAsync(Codec codec, RedisCommand<T> command, SlotCallback<T, R> callback, Object... params); <R, T> RFuture<R> writeAllAsync(Codec codec, RedisCommand<T> command, SlotCallback<T, R> callback, Object... params);
<T> RFuture<Void> writeAllAsync(RedisCommand<T> command, Object ... params); <T> RFuture<Void> writeAllAsync(RedisCommand<T> command, Object... params);
<T, R> RFuture<R> writeAsync(String key, RedisCommand<T> command, Object ... params); <T, R> RFuture<R> writeAsync(String key, RedisCommand<T> command, Object... params);
<T, R> RFuture<R> readAsync(String key, RedisCommand<T> command, Object ... params); <T, R> RFuture<R> readAsync(String key, RedisCommand<T> command, Object... params);
<T, R> RFuture<R> readAsync(MasterSlaveEntry entry, Codec codec, RedisCommand<T> command, Object ... params); <T, R> RFuture<R> readAsync(MasterSlaveEntry entry, Codec codec, RedisCommand<T> command, Object... params);
<T, R> RFuture<R> readRandomAsync(Codec codec, RedisCommand<T> command, Object ... params); <T, R> RFuture<R> readRandomAsync(Codec codec, RedisCommand<T> command, Object... params);
<T, R> RFuture<R> readRandomAsync(MasterSlaveEntry entry, Codec codec, RedisCommand<T> command, Object... params); <T, R> RFuture<R> readRandomAsync(MasterSlaveEntry entry, Codec codec, RedisCommand<T> command, Object... params);
<V> RFuture<V> pollFromAnyAsync(String name, Codec codec, RedisCommand<Object> command, long secondsTimeout, String ... queueNames); <V> RFuture<V> pollFromAnyAsync(String name, Codec codec, RedisCommand<Object> command, long secondsTimeout, String... queueNames);
} }

@ -1253,7 +1253,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
return objectBuilder; return objectBuilder;
} }
public <V> RFuture<V> pollFromAnyAsync(String name, Codec codec, RedisCommand<Object> command, long secondsTimeout, String ... queueNames) { public <V> RFuture<V> pollFromAnyAsync(String name, Codec codec, RedisCommand<Object> command, long secondsTimeout, String... queueNames) {
if (connectionManager.isClusterMode() && queueNames.length > 0) { if (connectionManager.isClusterMode() && queueNames.length > 0) {
RPromise<V> result = new RedissonPromise<V>(); RPromise<V> result = new RedissonPromise<V>();
AtomicReference<Iterator<String>> ref = new AtomicReference<Iterator<String>>(); AtomicReference<Iterator<String>> ref = new AtomicReference<Iterator<String>>();

@ -31,17 +31,17 @@ public interface CommandSyncExecutor {
<V> V get(RFuture<V> future); <V> V get(RFuture<V> future);
<T, R> R read(String key, RedisCommand<T> command, Object ... params); <T, R> R read(String key, RedisCommand<T> command, Object... params);
<T, R> R read(String key, Codec codec, RedisCommand<T> command, Object ... params); <T, R> R read(String key, Codec codec, RedisCommand<T> command, Object... params);
<T, R> R evalRead(String key, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params); <T, R> R evalRead(String key, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params);
<T, R> R evalRead(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params); <T, R> R evalRead(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params);
<T, R> R evalWrite(String key, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params); <T, R> R evalWrite(String key, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params);
<T, R> R evalWrite(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params); <T, R> R evalWrite(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params);
ConnectionManager getConnectionManager(); ConnectionManager getConnectionManager();

@ -38,34 +38,34 @@ public class CommandSyncService extends CommandAsyncService implements CommandEx
} }
@Override @Override
public <T, R> R read(String key, RedisCommand<T> command, Object ... params) { public <T, R> R read(String key, RedisCommand<T> command, Object... params) {
return read(key, connectionManager.getCodec(), command, params); return read(key, connectionManager.getCodec(), command, params);
} }
@Override @Override
public <T, R> R read(String key, Codec codec, RedisCommand<T> command, Object ... params) { public <T, R> R read(String key, Codec codec, RedisCommand<T> command, Object... params) {
RFuture<R> res = readAsync(key, codec, command, params); RFuture<R> res = readAsync(key, codec, command, params);
return get(res); return get(res);
} }
@Override @Override
public <T, R> R evalRead(String key, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) { public <T, R> R evalRead(String key, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params) {
return evalRead(key, connectionManager.getCodec(), evalCommandType, script, keys, params); return evalRead(key, connectionManager.getCodec(), evalCommandType, script, keys, params);
} }
@Override @Override
public <T, R> R evalRead(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) { public <T, R> R evalRead(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params) {
RFuture<R> res = evalReadAsync(key, codec, evalCommandType, script, keys, params); RFuture<R> res = evalReadAsync(key, codec, evalCommandType, script, keys, params);
return get(res); return get(res);
} }
@Override @Override
public <T, R> R evalWrite(String key, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) { public <T, R> R evalWrite(String key, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params) {
return evalWrite(key, connectionManager.getCodec(), evalCommandType, script, keys, params); return evalWrite(key, connectionManager.getCodec(), evalCommandType, script, keys, params);
} }
@Override @Override
public <T, R> R evalWrite(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) { public <T, R> R evalWrite(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params) {
RFuture<R> res = evalWriteAsync(key, codec, evalCommandType, script, keys, params); RFuture<R> res = evalWriteAsync(key, codec, evalCommandType, script, keys, params);
return get(res); return get(res);
} }

@ -52,7 +52,7 @@ public class ClusterServersConfig extends BaseMasterSlaveServersConfig<ClusterSe
* @param addresses in <code>host:port</code> format * @param addresses in <code>host:port</code> format
* @return config * @return config
*/ */
public ClusterServersConfig addNodeAddress(String ... addresses) { public ClusterServersConfig addNodeAddress(String... addresses) {
for (String address : addresses) { for (String address : addresses) {
nodeAddresses.add(URIBuilder.create(address)); nodeAddresses.add(URIBuilder.create(address));
} }

@ -84,7 +84,7 @@ public class MasterSlaveServersConfig extends BaseMasterSlaveServersConfig<Maste
* @param addresses of Redis * @param addresses of Redis
* @return config * @return config
*/ */
public MasterSlaveServersConfig addSlaveAddress(String ... addresses) { public MasterSlaveServersConfig addSlaveAddress(String... addresses) {
for (String address : addresses) { for (String address : addresses) {
slaveAddresses.add(URIBuilder.create(address)); slaveAddresses.add(URIBuilder.create(address));
} }

@ -60,7 +60,7 @@ public class ReplicatedServersConfig extends BaseMasterSlaveServersConfig<Replic
* @param addresses in <code>host:port</code> format * @param addresses in <code>host:port</code> format
* @return config * @return config
*/ */
public ReplicatedServersConfig addNodeAddress(String ... addresses) { public ReplicatedServersConfig addNodeAddress(String... addresses) {
for (String address : addresses) { for (String address : addresses) {
nodeAddresses.add(URIBuilder.create(address)); nodeAddresses.add(URIBuilder.create(address));
} }

@ -72,7 +72,7 @@ public class SentinelServersConfig extends BaseMasterSlaveServersConfig<Sentinel
* @param addresses of Redis * @param addresses of Redis
* @return config * @return config
*/ */
public SentinelServersConfig addSentinelAddress(String ... addresses) { public SentinelServersConfig addSentinelAddress(String... addresses) {
for (String address : addresses) { for (String address : addresses) {
sentinelAddresses.add(URIBuilder.create(address)); sentinelAddresses.add(URIBuilder.create(address));
} }

@ -232,52 +232,40 @@ public class MasterSlaveEntry {
return; return;
} }
System.out.println("reattachBlockingQueue " + connection); RFuture<RedisConnection> newConnectionFuture = connectionWriteOp(commandData.getCommand());
try { newConnectionFuture.onComplete((newConnection, e) -> {
RFuture<RedisConnection> newConnectionFuture = connectionWriteOp(commandData.getCommand()); if (e != null) {
System.out.println("newConnectionFuture " + newConnectionFuture); log.error("Can't resubscribe blocking queue " + commandData, e);
newConnectionFuture.onComplete((newConnection, e) -> { return;
if (e != null) { }
log.error("Can't resubscribe blocking queue " + commandData, e);
return; AtomicBoolean skip = new AtomicBoolean();
BiConsumer<Object, Throwable> listener = new BiConsumer<Object, Throwable>() {
@Override
public void accept(Object t, Throwable u) {
if (skip.get()) {
return;
}
releaseWrite(newConnection);
} }
};
System.out.println("newConnectionFuture1 " + connection); commandData.getPromise().onComplete(listener);
AtomicBoolean skip = new AtomicBoolean(); if (commandData.getPromise().isDone()) {
BiConsumer<Object, Throwable> listener = new BiConsumer<Object, Throwable>() { return;
@Override }
public void accept(Object t, Throwable u) { ChannelFuture channelFuture = newConnection.send(commandData);
if (skip.get()) { channelFuture.addListener(new ChannelFutureListener() {
return; @Override
} public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
listener.accept(null, null);
skip.set(true);
releaseWrite(newConnection); releaseWrite(newConnection);
log.error("Can't resubscribe blocking queue {}", commandData);
} }
};
commandData.getPromise().onComplete(listener);
if (commandData.getPromise().isDone()) {
return;
} }
System.out.println("newConnectionFuture2 " + connection);
ChannelFuture channelFuture = newConnection.send(commandData);
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
listener.accept(null, null);
skip.set(true);
releaseWrite(newConnection);
log.error("Can't resubscribe blocking queue {}", commandData);
} else {
System.out.println("resubscribed " + connection);
}
}
});
}); });
});
} catch (Exception e2) {
e2.printStackTrace();
// TODO: handle exception
}
} }
public boolean hasSlave(RedisClient redisClient) { public boolean hasSlave(RedisClient redisClient) {

@ -378,7 +378,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
return value; return value;
} }
private <T, R> R write(String key, RedisCommand<T> command, Object ... params) { private <T, R> R write(String key, RedisCommand<T> command, Object... params) {
RFuture<R> future = commandExecutor.writeAsync(key, command, params); RFuture<R> future = commandExecutor.writeAsync(key, command, params);
try { try {
return get(future); return get(future);
@ -387,7 +387,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
} }
} }
private <T, R> R evalWrite(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) { private <T, R> R evalWrite(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params) {
RFuture<R> future = commandExecutor.evalWriteAsync(key, codec, evalCommandType, script, keys, params); RFuture<R> future = commandExecutor.evalWriteAsync(key, codec, evalCommandType, script, keys, params);
try { try {
return get(future); return get(future);
@ -396,7 +396,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
} }
} }
private <T, R> R evalRead(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) { private <T, R> R evalRead(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params) {
RFuture<R> future = commandExecutor.evalReadAsync(key, codec, evalCommandType, script, keys, params); RFuture<R> future = commandExecutor.evalReadAsync(key, codec, evalCommandType, script, keys, params);
try { try {
return get(future); return get(future);

@ -28,7 +28,7 @@ public class CompositeIterable<T> implements Iterable<T> {
this.iterablesList = iterables; this.iterablesList = iterables;
} }
public CompositeIterable(Iterable<T> ... iterables) { public CompositeIterable(Iterable<T>... iterables) {
this.iterables = iterables; this.iterables = iterables;
} }

Loading…
Cancel
Save