Merge branch 'master' into 3.0.0

# Conflicts:
#	redisson/src/main/java/org/redisson/command/CommandReactiveBatchService.java
#	redisson/src/main/java/org/redisson/reactive/RedissonBatchReactive.java
pull/1303/head
Nikita 7 years ago
commit aafae001c6

@ -87,7 +87,7 @@
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-epoll</artifactId>
<classifier>linux-x86_64</classifier>
<version>4.1.13.Final</version>
<version>4.1.16.Final</version>
</dependency>
<dependency>
<groupId>com.esotericsoftware</groupId>

@ -34,33 +34,33 @@
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-epoll</artifactId>
<version>4.1.15.Final</version>
<version>4.1.16.Final</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-common</artifactId>
<version>4.1.15.Final</version>
<version>4.1.16.Final</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec</artifactId>
<version>4.1.15.Final</version>
<version>4.1.16.Final</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-buffer</artifactId>
<version>4.1.15.Final</version>
<version>4.1.16.Final</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport</artifactId>
<version>4.1.15.Final</version>
<version>4.1.16.Final</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-handler</artifactId>
<version>4.1.15.Final</version>
<version>4.1.16.Final</version>
</dependency>
<dependency>
@ -138,7 +138,7 @@
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-web</artifactId>
<version>[3.1,4.3)</version>
<version>[3.1,)</version>
<scope>test</scope>
</dependency>
@ -238,14 +238,14 @@
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>[3.1,4.3)</version>
<version>[3.1,)</version>
<scope>provided</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context-support</artifactId>
<version>[3.1,4.3)</version>
<version>[3.1,)</version>
<scope>provided</scope>
<optional>true</optional>
</dependency>

@ -15,10 +15,10 @@
*/
package org.redisson;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.redisson.api.BatchResult;
import org.redisson.api.RAtomicDoubleAsync;
import org.redisson.api.RAtomicLongAsync;
import org.redisson.api.RBatch;
@ -64,6 +64,10 @@ public class RedissonBatch implements RBatch {
private int retryAttempts;
private long retryInterval;
private int syncSlaves;
private long syncTimeout;
private boolean skipResult;
protected RedissonBatch(UUID id, EvictionScheduler evictionScheduler, ConnectionManager connectionManager) {
this.executorService = new CommandBatchService(connectionManager);
this.evictionScheduler = evictionScheduler;
@ -230,6 +234,19 @@ public class RedissonBatch implements RBatch {
return new RedissonSetCache<V>(codec, evictionScheduler, executorService, name, null);
}
@Override
public RBatch syncSlaves(int slaves, long timeout, TimeUnit unit) {
this.syncSlaves = slaves;
this.syncTimeout = unit.toMillis(timeout);
return this;
}
@Override
public RBatch skipResult() {
this.skipResult = true;
return this;
}
@Override
public RBatch retryAttempts(int retryAttempts) {
this.retryAttempts = retryAttempts;
@ -249,23 +266,23 @@ public class RedissonBatch implements RBatch {
}
@Override
public List<?> execute() {
return executorService.execute(timeout, retryAttempts, retryInterval);
public BatchResult<?> execute() {
return executorService.execute(syncSlaves, syncTimeout, skipResult, timeout, retryAttempts, retryInterval);
}
@Override
public void executeSkipResult() {
executorService.executeSkipResult(timeout, retryAttempts, retryInterval);
executorService.execute(syncSlaves, syncTimeout, true, timeout, retryAttempts, retryInterval);
}
@Override
public RFuture<Void> executeSkipResultAsync() {
return executorService.executeSkipResultAsync(timeout, retryAttempts, retryInterval);
return executorService.executeAsync(syncSlaves, syncTimeout, true, timeout, retryAttempts, retryInterval);
}
@Override
public RFuture<List<?>> executeAsync() {
return executorService.executeAsync(timeout, retryAttempts, retryInterval);
public RFuture<BatchResult<?>> executeAsync() {
return executorService.executeAsync(syncSlaves, syncTimeout, skipResult, timeout, retryAttempts, retryInterval);
}
@Override

@ -25,6 +25,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.redisson.api.BatchResult;
import org.redisson.api.RBatch;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RBlockingQueueAsync;
@ -295,7 +296,7 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
timeout = request.getOptions().getExecutionTimeoutInMillis();
}
RFuture<List<?>> clientsFuture = send(timeout, responseName,
RFuture<BatchResult<?>> clientsFuture = send(timeout, responseName,
responseHolder.get());
clientsFuture.addListener(new FutureListener<List<?>>() {
@Override
@ -324,7 +325,7 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
}
}
private <T extends RRemoteServiceResponse> RFuture<List<?>> send(long timeout, String responseName, T response) {
private <T extends RRemoteServiceResponse> RFuture<BatchResult<?>> send(long timeout, String responseName, T response) {
RBatch batch = redisson.createBatch();
RBlockingQueueAsync<T> queue = batch.getBlockingQueue(responseName, getCodec());
queue.putAsync(response);

@ -0,0 +1,242 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
/**
*
* @author Nikita Koksharov
*
*/
public class BatchResult<E> implements List<E> {
private final List<E> responses;
private final int syncedSlaves;
public BatchResult(List<E> responses, int syncedSlaves) {
super();
this.responses = responses;
this.syncedSlaves = syncedSlaves;
}
/**
* Returns list with result object for each command
*
* @return list
*/
public List<?> getResponses() {
return responses;
}
/**
* Returns synchronized slaves amount involved during batch execution
*
* @return slaves
*/
public int getSyncedSlaves() {
return syncedSlaves;
}
/**
* Use {@link #getResponses()}
*/
@Deprecated
public int size() {
return responses.size();
}
/**
* Use {@link #getResponses()}
*/
@Deprecated
public boolean isEmpty() {
return responses.isEmpty();
}
/**
* Use {@link #getResponses()}
*/
@Deprecated
public boolean contains(Object o) {
return responses.contains(o);
}
/**
* Use {@link #getResponses()}
*/
@Deprecated
public Iterator<E> iterator() {
return responses.iterator();
}
/**
* Use {@link #getResponses()}
*/
@Deprecated
public Object[] toArray() {
return responses.toArray();
}
/**
* Use {@link #getResponses()}
*/
@Deprecated
public <T> T[] toArray(T[] a) {
return responses.toArray(a);
}
/**
* Use {@link #getResponses()}
*/
@Deprecated
public boolean add(E e) {
return responses.add(e);
}
/**
* Use {@link #getResponses()}
*/
@Deprecated
public boolean remove(Object o) {
return responses.remove(o);
}
/**
* Use {@link #getResponses()}
*/
@Deprecated
public boolean containsAll(Collection<?> c) {
return responses.containsAll(c);
}
/**
* Use {@link #getResponses()}
*/
@Deprecated
public boolean addAll(Collection<? extends E> c) {
return responses.addAll(c);
}
/**
* Use {@link #getResponses()}
*/
@Deprecated
public boolean addAll(int index, Collection<? extends E> c) {
return responses.addAll(index, c);
}
/**
* Use {@link #getResponses()}
*/
@Deprecated
public boolean removeAll(Collection<?> c) {
return responses.removeAll(c);
}
/**
* Use {@link #getResponses()}
*/
@Deprecated
public boolean retainAll(Collection<?> c) {
return responses.retainAll(c);
}
/**
* Use {@link #getResponses()}
*/
@Deprecated
public void clear() {
responses.clear();
}
/**
* Use {@link #getResponses()}
*/
@Deprecated
public E get(int index) {
return responses.get(index);
}
/**
* Use {@link #getResponses()}
*/
@Deprecated
public E set(int index, E element) {
return responses.set(index, element);
}
/**
* Use {@link #getResponses()}
*/
@Deprecated
public void add(int index, E element) {
responses.add(index, element);
}
/**
* Use {@link #getResponses()}
*/
@Deprecated
public E remove(int index) {
return responses.remove(index);
}
/**
* Use {@link #getResponses()}
*/
@Deprecated
public int indexOf(Object o) {
return responses.indexOf(o);
}
/**
* Use {@link #getResponses()}
*/
@Deprecated
public int lastIndexOf(Object o) {
return responses.lastIndexOf(o);
}
/**
* Use {@link #getResponses()}
*/
@Deprecated
public ListIterator<E> listIterator() {
return responses.listIterator();
}
/**
* Use {@link #getResponses()}
*/
@Deprecated
public ListIterator<E> listIterator(int index) {
return responses.listIterator(index);
}
/**
* Use {@link #getResponses()}
*/
@Deprecated
public List<E> subList(int fromIndex, int toIndex) {
return responses.subList(fromIndex, toIndex);
}
}

@ -15,7 +15,6 @@
*/
package org.redisson.api;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.redisson.client.RedisException;
@ -387,7 +386,7 @@ public interface RBatch {
* @throws RedisException in case of any error
*
*/
List<?> execute() throws RedisException;
BatchResult<?> execute() throws RedisException;
/**
* Executes all operations accumulated during async methods invocations asynchronously.
@ -397,36 +396,42 @@ public interface RBatch {
*
* @return List with result object for each command
*/
RFuture<List<?>> executeAsync();
RFuture<BatchResult<?>> executeAsync();
/**
* Executes all operations accumulated during async methods invocations.
* Command replies are skipped such approach saves response bandwidth.
* <p>
* If cluster configuration used then operations are grouped by slot ids
* and may be executed on different servers. Thus command execution order could be changed.
* <p>
* NOTE: Redis 3.2+ required
*
* @throws RedisException in case of any error
*
* Use {@link #skipResult()}
*/
@Deprecated
void executeSkipResult();
/**
* Executes all operations accumulated during async methods invocations asynchronously,
* Command replies are skipped such approach saves response bandwidth.
* <p>
* If cluster configuration used then operations are grouped by slot ids
* and may be executed on different servers. Thus command execution order could be changed
* Use {@link #skipResult()}
*/
@Deprecated
RFuture<Void> executeSkipResultAsync();
/**
* Inform Redis not to send reply for this batch.
* Such approach saves response bandwidth.
* <p>
* NOTE: Redis 3.2+ required
*
* @return void
* @throws RedisException in case of any error
* @return self instance
*/
RBatch skipResult();
/**
* Synchronize write operations execution across defined amount
* of Redis slave nodes within defined timeout.
* <p>
* NOTE: Redis 3.0+ required
*
* @param slaves amount to sync
* @param timeout for sync operation
* @param unit value
* @return self instance
*/
RFuture<Void> executeSkipResultAsync();
RBatch syncSlaves(int slaves, long timeout, TimeUnit unit);
/**
* Defines timeout for Redis response.
@ -443,7 +448,7 @@ public interface RBatch {
RBatch timeout(long timeout, TimeUnit unit);
/**
* Defines time interval for another one attempt send Redis commands batch
* Defines time interval for each attempt to send Redis commands batch
* if it hasn't been sent already.
* <p>
* <code>0</code> value means use <code>Config.setRetryInterval</code> value instead.

@ -15,7 +15,7 @@
*/
package org.redisson.api;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.reactivestreams.Publisher;
import org.redisson.client.codec.Codec;
@ -249,6 +249,68 @@ public interface RBatchReactive {
*
* @return List with result object for each command
*/
Publisher<List<?>> execute();
Publisher<BatchResult<?>> execute();
/**
* Command replies are skipped such approach saves response bandwidth.
* <p>
* NOTE: Redis 3.2+ required
*
* @return self instance
*/
RBatchReactive skipResult();
/**
*
* <p>
* NOTE: Redis 3.0+ required
*
* @param slaves number to sync
* @param timeout for sync operation
* @param unit value
* @return self instance
*/
RBatchReactive syncSlaves(int slaves, long timeout, TimeUnit unit);
/**
* Defines timeout for Redis response.
* Starts to countdown when Redis command has been successfully sent.
* <p>
* <code>0</code> value means use <code>Config.setTimeout</code> value instead.
* <p>
* Default is <code>0</code>
*
* @param timeout value
* @param unit value
* @return self instance
*/
RBatchReactive timeout(long timeout, TimeUnit unit);
/**
* Defines time interval for another one attempt send Redis commands batch
* if it hasn't been sent already.
* <p>
* <code>0</code> value means use <code>Config.setRetryInterval</code> value instead.
* <p>
* Default is <code>0</code>
*
* @param retryInterval value
* @param unit value
* @return self instance
*/
RBatchReactive retryInterval(long retryInterval, TimeUnit unit);
/**
* Defines attempts amount to re-send Redis commands batch
* if it hasn't been sent already.
* <p>
* <code>0</code> value means use <code>Config.setRetryAttempts</code> value instead.
* <p>
* Default is <code>0</code>
*
* @param retryAttempts value
* @return self instance
*/
RBatchReactive retryAttempts(int retryAttempts);
}

@ -20,6 +20,7 @@ import java.util.concurrent.atomic.AtomicReference;
import org.redisson.client.RedisRedirectException;
import org.redisson.client.codec.Codec;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
/**
*
@ -33,6 +34,10 @@ public class BatchCommandData<T, R> extends CommandData<T, R> implements Compara
private final int index;
private final AtomicReference<RedisRedirectException> redirectError = new AtomicReference<RedisRedirectException>();
public BatchCommandData(RedisCommand<T> command, Object[] params, int index) {
this(new RedissonPromise<R>(), null, command, params, index);
}
public BatchCommandData(RPromise<R> promise, Codec codec, RedisCommand<T> command, Object[] params, int index) {
super(promise, codec, command, params);
this.index = index;

@ -93,6 +93,7 @@ public interface RedisCommands {
RedisStrictCommand<Boolean> SETBIT = new RedisStrictCommand<Boolean>("SETBIT", new BitSetReplayConvertor());
RedisStrictCommand<Void> BITOP = new RedisStrictCommand<Void>("BITOP", new VoidReplayConvertor());
RedisStrictCommand<Integer> WAIT = new RedisStrictCommand<Integer>("WAIT", new IntegerReplayConvertor());
RedisStrictCommand<Void> CLIENT_REPLY = new RedisStrictCommand<Void>("CLIENT", "REPLY", new VoidReplayConvertor());
RedisStrictCommand<Void> ASKING = new RedisStrictCommand<Void>("ASKING", new VoidReplayConvertor());
RedisStrictCommand<Void> READONLY = new RedisStrictCommand<Void>("READONLY", new VoidReplayConvertor());

@ -864,109 +864,149 @@ public class CommandAsyncService implements CommandAsyncExecutor {
}
private <R, V> void handleReference(RPromise<R> mainPromise, R res) {
if (res instanceof List) {
List<Object> r = (List<Object>) res;
try {
mainPromise.trySuccess(tryHandleReference(res));
} catch (Exception e) {
//fall back and let other part of the code handle the type conversion.
mainPromise.trySuccess(res);
}
}
protected <T> T tryHandleReference(T o) {
boolean hasConversion = false;
if (o instanceof List) {
List<Object> r = (List<Object>) o;
for (int i = 0; i < r.size(); i++) {
if (r.get(i) instanceof RedissonReference) {
r.set(i, fromReference(r.get(i)));
} else if (r.get(i) instanceof ScoredEntry && ((ScoredEntry) r.get(i)).getValue() instanceof RedissonReference) {
ScoredEntry<?> se = ((ScoredEntry<?>) r.get(i));
se = new ScoredEntry(se.getScore(), fromReference(se.getValue()));
r.set(i, se);
Object ref = tryHandleReference0(r.get(i));
if (ref != r.get(i)) {
r.set(i, ref);
}
}
mainPromise.trySuccess(res);
} else if (res instanceof Set) {
Set r = (Set) res;
LinkedHashSet converted = new LinkedHashSet();
for (Object o : r) {
if (o instanceof RedissonReference) {
converted.add(fromReference(o));
} else if (o instanceof ScoredEntry && ((ScoredEntry) o).getValue() instanceof RedissonReference) {
ScoredEntry<?> se = ((ScoredEntry<?>) o);
se = new ScoredEntry(se.getScore(), fromReference(se.getValue()));
converted.add(se);
} else if (o instanceof Map.Entry) {
Map.Entry old = (Map.Entry) o;
Object key = old.getKey();
if (key instanceof RedissonReference) {
key = fromReference(key);
}
Object value = old.getValue();
if (value instanceof RedissonReference) {
value = fromReference(value);
}
converted.add(new AbstractMap.SimpleEntry(key, value));
return o;
} else if (o instanceof Set) {
Set set, r = (Set) o;
boolean useNewSet = o instanceof LinkedHashSet;
try {
set = (Set) o.getClass().getConstructor().newInstance();
} catch (Exception exception) {
set = new LinkedHashSet();
}
for (Object i : r) {
Object ref = tryHandleReference0(i);
//Not testing for ref changes because r.add(ref) below needs to
//fail on the first iteration to be able to perform fall back
//if failure happens.
//
//Assuming the failure reason is systematic such as put method
//is not supported or implemented, and not an occasional issue
//like only one element fails.
if (useNewSet) {
set.add(ref);
} else {
converted.add(o);
try {
r.add(ref);
set.add(i);
} catch (Exception e) {
//r is not supporting add operation, like
//LinkedHashMap$LinkedEntrySet and others.
//fall back to use a new set.
useNewSet = true;
set.add(ref);
}
}
hasConversion |= ref != i;
}
mainPromise.trySuccess((R) converted);
} else if (res instanceof Map) {
Map<Object, Object> map = (Map<Object, Object>) res;
LinkedHashMap<Object, Object> converted = new LinkedHashMap<Object, Object>();
for (Map.Entry<Object, Object> e : map.entrySet()) {
Object value = e.getValue();
if (e.getValue() instanceof RedissonReference) {
value = fromReference(e.getValue());
}
Object key = e.getKey();
if (e.getKey() instanceof RedissonReference) {
key = fromReference(e.getKey());
}
converted.put(key, value);
if (!hasConversion) {
return o;
} else if (useNewSet) {
return (T) set;
} else if (!set.isEmpty()) {
r.removeAll(set);
}
mainPromise.trySuccess((R) converted);
} else if (res instanceof ListScanResult) {
List<ScanObjectEntry> r = ((ListScanResult) res).getValues();
for (int i = 0; i < r.size(); i++) {
Object obj = r.get(i);
if (!(obj instanceof ScanObjectEntry)) {
break;
}
ScanObjectEntry e = r.get(i);
if (e.getObj() instanceof RedissonReference) {
r.set(i, new ScanObjectEntry(e.getBuf(), fromReference(e.getObj())));
} else if (e.getObj() instanceof ScoredEntry && ((ScoredEntry<?>) e.getObj()).getValue() instanceof RedissonReference) {
ScoredEntry<?> se = ((ScoredEntry<?>) e.getObj());
se = new ScoredEntry(se.getScore(), fromReference(se.getValue()));
r.set(i, new ScanObjectEntry(e.getBuf(), se));
return o;
} else if (o instanceof Map) {
Map<Object, Object> map, r = (Map<Object, Object>) o;
boolean useNewMap = o instanceof LinkedHashMap;
try {
map = (Map) o.getClass().getConstructor().newInstance();
} catch (Exception e) {
map = new LinkedHashMap();
}
for (Map.Entry<Object, Object> e : r.entrySet()) {
Map.Entry<Object, Object> ref = tryHandleReference0(e);
//Not testing for ref changes because r.put(ref.getKey(), ref.getValue())
//below needs to fail on the first iteration to be able to
//perform fall back if failure happens.
//
//Assuming the failure reason is systematic such as put method
//is not supported or implemented, and not an occasional issue
//like only one element fails.
if (useNewMap) {
map.put(ref.getKey(), ref.getValue());
} else {
try {
r.put(ref.getKey(), ref.getValue());
if (e.getKey() != ref.getKey()) {
map.put(e.getKey(), e.getValue());
}
} catch (Exception ex) {
//r is not supporting put operation. fall back to use
//a new map.
useNewMap = true;
map.put(ref.getKey(), ref.getValue());
}
}
hasConversion |= ref != e;
}
mainPromise.trySuccess(res);
} else if (res instanceof MapScanResult) {
MapScanResult scanResult = (MapScanResult) res;
Map<ScanObjectEntry, ScanObjectEntry> map = ((MapScanResult) res).getMap();
LinkedHashMap<ScanObjectEntry, ScanObjectEntry> converted = new LinkedHashMap<ScanObjectEntry, ScanObjectEntry>();
boolean hasConversion = false;
for (Map.Entry<ScanObjectEntry, ScanObjectEntry> e : map.entrySet()) {
ScanObjectEntry value = e.getValue();
if (e.getValue().getObj() instanceof RedissonReference) {
value = new ScanObjectEntry(e.getValue().getBuf(), fromReference(e.getValue().getObj()));
hasConversion = true;
}
ScanObjectEntry key = e.getKey();
if (e.getKey().getObj() instanceof RedissonReference) {
key = new ScanObjectEntry(e.getKey().getBuf(), fromReference(e.getKey().getObj()));
hasConversion = true;
}
converted.put(key, value);
if (!hasConversion) {
return o;
} else if (useNewMap) {
return (T) map;
} else if (!map.isEmpty()) {
r.keySet().removeAll(map.keySet());
}
if (hasConversion) {
MapScanResult<ScanObjectEntry, ScanObjectEntry> newScanResult = new MapScanResult<ScanObjectEntry, ScanObjectEntry>(scanResult.getPos(), converted);
return o;
} else if (o instanceof ListScanResult) {
tryHandleReference(((ListScanResult) o).getValues());
return o;
} else if (o instanceof MapScanResult) {
MapScanResult scanResult = (MapScanResult) o;
Map oldMap = ((MapScanResult) o).getMap();
Map map = tryHandleReference(oldMap);
if (map != oldMap) {
MapScanResult<ScanObjectEntry, ScanObjectEntry> newScanResult
= new MapScanResult<ScanObjectEntry, ScanObjectEntry>(scanResult.getPos(), map);
newScanResult.setRedisClient(scanResult.getRedisClient());
mainPromise.trySuccess((R) newScanResult);
return (T) newScanResult;
} else {
mainPromise.trySuccess((R) res);
}
} else if (res instanceof RedissonReference) {
try {
mainPromise.trySuccess(this.<R>fromReference(res));
} catch (Exception exception) {
mainPromise.trySuccess(res);//fallback
return o;
}
} else {
mainPromise.trySuccess(res);
return tryHandleReference0(o);
}
}
private <T> T tryHandleReference0(T o) {
if (o instanceof RedissonReference) {
return fromReference(o);
} else if (o instanceof ScoredEntry && ((ScoredEntry) o).getValue() instanceof RedissonReference) {
ScoredEntry<?> se = ((ScoredEntry<?>) o);
return (T) new ScoredEntry(se.getScore(), fromReference(se.getValue()));
} else if (o instanceof ScanObjectEntry) {
ScanObjectEntry keyScan = (ScanObjectEntry) o;
Object obj = tryHandleReference0(keyScan.getObj());
return obj != keyScan.getObj() ? (T) new ScanObjectEntry(keyScan.getBuf(), obj) : o;
} else if (o instanceof Map.Entry) {
Map.Entry old = (Map.Entry) o;
Object key = tryHandleReference0(old.getKey());
Object value = tryHandleReference0(old.getValue());
return value != old.getValue() || key != old.getKey()
? (T) new AbstractMap.SimpleEntry(key, value)
: o;
} else {
return o;
}
}

@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.redisson.RedissonReference;
import org.redisson.RedissonShutdownException;
import org.redisson.api.BatchResult;
import org.redisson.api.RFuture;
import org.redisson.client.RedisAskException;
import org.redisson.client.RedisConnection;
@ -47,6 +48,7 @@ import org.redisson.connection.NodeSource;
import org.redisson.connection.NodeSource.Redirect;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonObjectFactory;
import org.redisson.misc.RedissonPromise;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
@ -129,114 +131,114 @@ public class CommandBatchService extends CommandAsyncService {
entry.getCommands().add(commandData);
}
public List<?> execute() {
return get(executeAsync(0, 0, 0));
public BatchResult<?> execute() {
RFuture<BatchResult<?>> f = executeAsync(0, 0, false, 0, 0, 0);
return get(f);
}
public List<?> execute(long responseTimeout, int retryAttempts, long retryInterval) {
return get(executeAsync(responseTimeout, retryAttempts, retryInterval));
public BatchResult<?> execute(int syncSlaves, long syncTimeout, boolean noResult, long responseTimeout, int retryAttempts, long retryInterval) {
RFuture<BatchResult<?>> f = executeAsync(syncSlaves, syncTimeout, noResult, responseTimeout, retryAttempts, retryInterval);
return get(f);
}
public RFuture<Void> executeAsyncVoid() {
return executeAsyncVoid(false, 0, 0, 0);
final RedissonPromise<Void> promise = new RedissonPromise<Void>();
RFuture<BatchResult<?>> res = executeAsync(0, 0, false, 0, 0, 0);
res.addListener(new FutureListener<BatchResult<?>>() {
@Override
public void operationComplete(Future<BatchResult<?>> future) throws Exception {
if (future.isSuccess()) {
promise.trySuccess(null);
} else {
promise.tryFailure(future.cause());
}
}
});
return promise;
}
public RFuture<List<?>> executeAsync() {
return executeAsync(0, 0, false, 0, 0, 0);
}
protected RFuture<Void> executeAsyncVoid(boolean noResult, long responseTimeout, int retryAttempts, long retryInterval) {
public <R> RFuture<R> executeAsync(int syncSlaves, long syncTimeout, boolean skipResult, long responseTimeout, int retryAttempts, long retryInterval) {
if (executed) {
throw new IllegalStateException("Batch already executed!");
}
if (commands.isEmpty()) {
return connectionManager.newSucceededFuture(null);
return RedissonPromise.newSucceededFuture(null);
}
executed = true;
if (noResult) {
if (skipResult) {
for (Entry entry : commands.values()) {
RPromise<Object> s = connectionManager.newPromise();
BatchCommandData<?, ?> offCommand = new BatchCommandData(s, null, RedisCommands.CLIENT_REPLY, new Object[] { "OFF" }, index.incrementAndGet());
BatchCommandData<?, ?> offCommand = new BatchCommandData(RedisCommands.CLIENT_REPLY, new Object[] { "OFF" }, index.incrementAndGet());
entry.getCommands().addFirst(offCommand);
RPromise<Object> s1 = connectionManager.newPromise();
BatchCommandData<?, ?> onCommand = new BatchCommandData(s1, null, RedisCommands.CLIENT_REPLY, new Object[] { "ON" }, index.incrementAndGet());
BatchCommandData<?, ?> onCommand = new BatchCommandData(RedisCommands.CLIENT_REPLY, new Object[] { "ON" }, index.incrementAndGet());
entry.getCommands().add(onCommand);
}
}
executed = true;
RPromise<Void> voidPromise = connectionManager.newPromise();
voidPromise.addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
commands = null;
if (syncSlaves > 0) {
for (Entry entry : commands.values()) {
BatchCommandData<?, ?> waitCommand = new BatchCommandData(RedisCommands.WAIT, new Object[] { syncSlaves, syncTimeout }, index.incrementAndGet());
entry.getCommands().add(waitCommand);
}
});
AtomicInteger slots = new AtomicInteger(commands.size());
for (java.util.Map.Entry<MasterSlaveEntry, Entry> e : commands.entrySet()) {
execute(e.getValue(), new NodeSource(e.getKey()), voidPromise, slots, 0, true, responseTimeout, retryAttempts, retryInterval);
}
return voidPromise;
}
public void executeSkipResult(long timeout, int retryAttempts, long retryInterval) {
get(executeSkipResultAsync(timeout, retryAttempts, retryInterval));
}
public RFuture<Void> executeSkipResultAsync(long timeout, int retryAttempts, long retryInterval) {
return executeAsyncVoid(true, timeout, retryAttempts, retryInterval);
}
public RFuture<List<?>> executeAsync() {
return executeAsync(0, 0, 0);
}
public RFuture<List<?>> executeAsync(long responseTimeout, int retryAttempts, long retryInterval) {
if (executed) {
throw new IllegalStateException("Batch already executed!");
}
if (commands.isEmpty()) {
return connectionManager.newSucceededFuture(null);
}
executed = true;
RPromise<Void> voidPromise = connectionManager.newPromise();
final RPromise<List<?>> promise = connectionManager.newPromise();
voidPromise.addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
if (!future.isSuccess()) {
promise.tryFailure(future.cause());
RPromise<R> resultPromise;
RPromise<Void> voidPromise = new RedissonPromise<Void>();
if (skipResult) {
voidPromise.addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
commands = null;
return;
}
});
resultPromise = (RPromise<R>) voidPromise;
} else {
final RPromise<Object> promise = new RedissonPromise<Object>();
voidPromise.addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
if (!future.isSuccess()) {
promise.tryFailure(future.cause());
commands = null;
return;
}
List<BatchCommandData> entries = new ArrayList<BatchCommandData>();
for (Entry e : commands.values()) {
entries.addAll(e.getCommands());
}
Collections.sort(entries);
List<Object> result = new ArrayList<Object>(entries.size());
for (BatchCommandData<?, ?> commandEntry : entries) {
Object entryResult = commandEntry.getPromise().getNow();
if (isRedissonReferenceSupportEnabled() && entryResult instanceof RedissonReference) {
result.add(redisson != null
? RedissonObjectFactory.<Object>fromReference(redisson, (RedissonReference) entryResult)
: RedissonObjectFactory.<Object>fromReference(redissonReactive, (RedissonReference) entryResult));
} else {
result.add(entryResult);
List<BatchCommandData> entries = new ArrayList<BatchCommandData>();
for (Entry e : commands.values()) {
entries.addAll(e.getCommands());
}
Collections.sort(entries);
List<Object> responses = new ArrayList<Object>(entries.size());
int syncedSlaves = 0;
for (BatchCommandData<?, ?> commandEntry : entries) {
if (!isWaitCommand(commandEntry)) {
Object entryResult = commandEntry.getPromise().getNow();
entryResult = tryHandleReference(entryResult);
responses.add(entryResult);
} else {
syncedSlaves = (Integer) commandEntry.getPromise().getNow();
}
}
BatchResult<Object> result = new BatchResult<Object>(responses, syncedSlaves);
promise.trySuccess(result);
commands = null;
}
promise.trySuccess(result);
commands = null;
}
});
});
resultPromise = (RPromise<R>) promise;
}
AtomicInteger slots = new AtomicInteger(commands.size());
for (java.util.Map.Entry<MasterSlaveEntry, Entry> e : commands.entrySet()) {
execute(e.getValue(), new NodeSource(e.getKey()), voidPromise, slots, 0, false, responseTimeout, retryAttempts, retryInterval);
execute(e.getValue(), new NodeSource(e.getKey()), voidPromise, slots, 0, skipResult, responseTimeout, retryAttempts, retryInterval);
}
return promise;
return resultPromise;
}
private void execute(final Entry entry, final NodeSource source, final RPromise<Void> mainPromise, final AtomicInteger slots,
@ -252,7 +254,7 @@ public class CommandBatchService extends CommandAsyncService {
return;
}
final RPromise<Void> attemptPromise = connectionManager.newPromise();
final RPromise<Void> attemptPromise = new RedissonPromise<Void>();
final AsyncDetails details = new AsyncDetails();
@ -469,11 +471,11 @@ public class CommandBatchService extends CommandAsyncService {
List<CommandData<?, ?>> list = new ArrayList<CommandData<?, ?>>(entry.getCommands().size() + 1);
if (source.getRedirect() == Redirect.ASK) {
RPromise<Void> promise = connectionManager.newPromise();
RPromise<Void> promise = new RedissonPromise<Void>();
list.add(new CommandData<Void, Void>(promise, StringCodec.INSTANCE, RedisCommands.ASKING, new Object[] {}));
}
for (BatchCommandData<?, ?> c : entry.getCommands()) {
if (c.getPromise().isSuccess()) {
if (c.getPromise().isSuccess() && !isWaitCommand(c)) {
// skip successful commands
continue;
}
@ -493,4 +495,8 @@ public class CommandBatchService extends CommandAsyncService {
releaseConnection(source, connFuture, entry.isReadOnlyMode(), attemptPromise, details);
}
protected boolean isWaitCommand(BatchCommandData<?, ?> c) {
return c.getCommand().getName().equals(RedisCommands.WAIT.getName());
}
}

@ -15,14 +15,12 @@
*/
package org.redisson.command;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.redisson.api.BatchResult;
import org.redisson.api.RFuture;
import org.redisson.api.RedissonReactiveClient;
import org.redisson.client.codec.Codec;
@ -32,7 +30,6 @@ import org.redisson.connection.NodeSource;
import org.redisson.misc.RPromise;
import reactor.core.publisher.Flux;
import reactor.core.publisher.MonoProcessor;
/**
*
@ -62,81 +59,12 @@ public class CommandReactiveBatchService extends CommandReactiveService {
batchService.async(readOnlyMode, nodeSource, codec, command, params, mainPromise, attempt);
}
public List<?> execute() {
return get(executeAsync(0, 0, 0));
}
public List<?> execute(long responseTimeout, int retryAttempts, long retryInterval) {
return get(executeAsync(responseTimeout, retryAttempts, retryInterval));
}
public RFuture<Void> executeAsyncVoid() {
return executeAsyncVoid(false, 0, 0, 0);
}
private RFuture<Void> executeAsyncVoid(boolean noResult, long responseTimeout, int retryAttempts, long retryInterval) {
for (Publisher<?> publisher : publishers) {
Flux.from(publisher).subscribe();
// publisher.subscribe(new Subscriber<Object>() {
//
// @Override
// public void onSubscribe(Subscription s) {
// s.request(1);
// }
//
// @Override
// public void onError(Throwable t) {
// }
//
// @Override
// public void onComplete() {
// }
//
// @Override
// public void onNext(Object t) {
// }
// });
}
return batchService.executeAsyncVoid(noResult, responseTimeout, retryAttempts, retryInterval);
}
public void executeSkipResult(long timeout, int retryAttempts, long retryInterval) {
get(executeSkipResultAsync(timeout, retryAttempts, retryInterval));
}
public RFuture<Void> executeSkipResultAsync(long timeout, int retryAttempts, long retryInterval) {
return executeAsyncVoid(true, timeout, retryAttempts, retryInterval);
}
public RFuture<List<?>> executeAsync() {
return executeAsync(0, 0, 0);
}
public RFuture<List<?>> executeAsync(long responseTimeout, int retryAttempts, long retryInterval) {
public RFuture<BatchResult<?>> executeAsync(int syncSlaves, long syncTimeout, boolean skipResult, long responseTimeout, int retryAttempts, long retryInterval) {
for (Publisher<?> publisher : publishers) {
Flux.from(publisher).subscribe();
// publisher.subscribe(new Subscriber<Object>() {
//
// @Override
// public void onSubscribe(Subscription s) {
// s.request(1);
// }
//
// @Override
// public void onError(Throwable t) {
// }
//
// @Override
// public void onComplete() {
// }
//
// @Override
// public void onNext(Object t) {
// }
// });
}
return batchService.executeAsync(responseTimeout, retryAttempts, retryInterval);
return batchService.executeAsync(syncSlaves, syncTimeout, skipResult, responseTimeout, retryAttempts, retryInterval);
}
@Override

@ -15,10 +15,11 @@
*/
package org.redisson.reactive;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import org.redisson.api.BatchResult;
import org.redisson.api.RAtomicLongReactive;
import org.redisson.api.RBatchReactive;
import org.redisson.api.RBitSetReactive;
@ -56,6 +57,14 @@ public class RedissonBatchReactive implements RBatchReactive {
private final CommandReactiveBatchService executorService;
private final CommandReactiveService commandExecutor;
private long timeout;
private int retryAttempts;
private long retryInterval;
private int syncSlaves;
private long syncTimeout;
private boolean skipResult;
public RedissonBatchReactive(EvictionScheduler evictionScheduler, ConnectionManager connectionManager, CommandReactiveService commandExecutor) {
this.evictionScheduler = evictionScheduler;
this.executorService = new CommandReactiveBatchService(connectionManager);
@ -208,15 +217,46 @@ public class RedissonBatchReactive implements RBatchReactive {
}
@Override
public Publisher<List<?>> execute() {
return commandExecutor.reactive(new Supplier<RFuture<List<?>>>() {
public Publisher<BatchResult<?>> execute() {
return commandExecutor.reactive(new Supplier<RFuture<BatchResult<?>>>() {
@Override
public RFuture<List<?>> get() {
return executorService.executeAsync();
public RFuture<BatchResult<?>> get() {
return executorService.executeAsync(syncSlaves, syncTimeout, skipResult, timeout, retryAttempts, retryInterval);
}
});
}
@Override
public RBatchReactive syncSlaves(int slaves, long timeout, TimeUnit unit) {
this.syncSlaves = slaves;
this.syncTimeout = unit.toMillis(timeout);
return this;
}
@Override
public RBatchReactive skipResult() {
this.skipResult = true;
return this;
}
@Override
public RBatchReactive retryAttempts(int retryAttempts) {
this.retryAttempts = retryAttempts;
return this;
}
@Override
public RBatchReactive retryInterval(long retryInterval, TimeUnit unit) {
this.retryInterval = unit.toMillis(retryInterval);
return this;
}
@Override
public RBatchReactive timeout(long timeout, TimeUnit unit) {
this.timeout = unit.toMillis(timeout);
return this;
}
public void enableRedissonReferenceSupport(RedissonReactiveClient redissonReactive) {
this.executorService.enableRedissonReferenceSupport(redissonReactive);
}

@ -2,6 +2,7 @@ package org.redisson;
import static org.assertj.core.api.Assertions.assertThat;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -14,14 +15,20 @@ import java.util.concurrent.atomic.AtomicLong;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test;
import org.redisson.ClusterRunner.ClusterProcesses;
import org.redisson.RedisRunner.FailedToStartRedisException;
import org.redisson.api.BatchResult;
import org.redisson.api.RBatch;
import org.redisson.api.RFuture;
import org.redisson.api.RListAsync;
import org.redisson.api.RMapAsync;
import org.redisson.api.RMapCacheAsync;
import org.redisson.api.RScript;
import org.redisson.api.RedissonClient;
import org.redisson.api.RScript.Mode;
import org.redisson.client.RedisException;
import org.redisson.client.codec.StringCodec;
import org.redisson.config.Config;
public class RedissonBatchTest extends BaseTest {
@ -43,6 +50,40 @@ public class RedissonBatchTest extends BaseTest {
System.out.println(t);
}
@Test
public void testSyncSlaves() throws FailedToStartRedisException, IOException, InterruptedException {
RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner master2 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner master3 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave1 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave2 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave3 = new RedisRunner().randomPort().randomDir().nosave();
ClusterRunner clusterRunner = new ClusterRunner()
.addNode(master1, slave1)
.addNode(master2, slave2)
.addNode(master3, slave3);
ClusterProcesses process = clusterRunner.run();
Config config = new Config();
config.useClusterServers()
.addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort());
RedissonClient redisson = Redisson.create(config);
RBatch batch = redisson.createBatch();
for (int i = 0; i < 100; i++) {
RMapAsync<String, String> map = batch.getMap("test");
map.putAsync("" + i, "" + i);
}
batch.syncSlaves(1, 1, TimeUnit.SECONDS);
BatchResult<?> result = batch.execute();
assertThat(result.getSyncedSlaves()).isEqualTo(1);
process.shutdown();
}
@Test
public void testWriteTimeout() {
RBatch batch = redisson.createBatch();
@ -62,7 +103,8 @@ public class RedissonBatchTest extends BaseTest {
batch.getBucket("A3").setAsync("001");
batch.getKeys().deleteAsync("A1");
batch.getKeys().deleteAsync("A2");
batch.executeSkipResult();
batch.skipResult();
batch.execute();
assertThat(redisson.getBucket("A1").isExists()).isFalse();
assertThat(redisson.getBucket("A3").isExists()).isTrue();
@ -76,7 +118,7 @@ public class RedissonBatchTest extends BaseTest {
batch.getBucket("A3").setAsync("001");
batch.getKeys().deleteAsync("A1");
batch.getKeys().deleteAsync("A2");
List result = batch.execute();
batch.execute();
}
@Test
@ -99,8 +141,8 @@ public class RedissonBatchTest extends BaseTest {
for (int i = 1; i < 540; i++) {
listAsync.addAsync(i);
}
List<?> res = b.execute();
Assert.assertEquals(539, res.size());
BatchResult<?> res = b.execute();
Assert.assertEquals(539, res.getResponses().size());
}
@Test
@ -113,8 +155,8 @@ public class RedissonBatchTest extends BaseTest {
batch.getAtomicLong("counter").incrementAndGetAsync();
batch.getAtomicLong("counter").incrementAndGetAsync();
}
List<?> res = batch.execute();
Assert.assertEquals(210*5, res.size());
BatchResult<?> res = batch.execute();
Assert.assertEquals(210*5, res.getResponses().size());
}
@Test(expected=RedisException.class)

@ -130,12 +130,12 @@ public class RedissonExecutorServiceTest extends BaseTest {
RExecutorFuture<?> future = executor.submit(new ScheduledLongRunnableTask("executed1"));
Thread.sleep(2000);
cancel(future);
assertThat(redisson.<Integer>getBucket("executed1").get()).isBetween(1000, Integer.MAX_VALUE);
assertThat(redisson.<Long>getBucket("executed1").get()).isBetween(1000L, Long.MAX_VALUE);
RExecutorFuture<?> futureAsync = executor.submitAsync(new ScheduledLongRunnableTask("executed2"));
Thread.sleep(2000);
assertThat(executor.cancelTask(futureAsync.getTaskId())).isTrue();
assertThat(redisson.<Integer>getBucket("executed2").get()).isBetween(1000, Integer.MAX_VALUE);
assertThat(redisson.<Long>getBucket("executed2").get()).isBetween(1000L, Long.MAX_VALUE);
}
@Test

@ -19,7 +19,7 @@ public class ScheduledLongRunnableTask implements Runnable {
@Override
public void run() {
for (int i = 0; i < Long.MAX_VALUE; i++) {
for (long i = 0; i < Long.MAX_VALUE; i++) {
if (Thread.currentThread().isInterrupted()) {
System.out.println("interrupted " + i);
redisson.getBucket(objectName).set(i);

Loading…
Cancel
Save