Merge branch 'master' of github.com:redisson/redisson

pull/1071/head
Nikita 7 years ago
commit ad2b010574

@ -71,6 +71,9 @@ import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import java.util.AbstractMap;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
/**
*
@ -111,17 +114,17 @@ public class CommandAsyncService implements CommandAsyncExecutor {
}
return this;
}
@Override
public boolean isRedissonReferenceSupportEnabled() {
return redisson != null || redissonReactive != null;
}
@Override
public void syncSubscription(RFuture<?> future) {
MasterSlaveServersConfig config = connectionManager.getConfig();
try {
int timeout = config.getTimeout() + config.getRetryInterval()*config.getRetryAttempts();
int timeout = config.getTimeout() + config.getRetryInterval() * config.getRetryAttempts();
if (!future.await(timeout)) {
throw new RedisTimeoutException("Subscribe timeout: (" + timeout + "ms)");
}
@ -130,7 +133,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
}
future.syncUninterruptibly();
}
@Override
public <V> V get(RFuture<V> future) {
if (!future.isDone()) {
@ -141,7 +144,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
l.countDown();
}
});
boolean interrupted = false;
while (!future.isDone()) {
try {
@ -150,7 +153,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
interrupted = true;
}
}
if (interrupted) {
Thread.currentThread().interrupt();
}
@ -176,16 +179,16 @@ public class CommandAsyncService implements CommandAsyncExecutor {
});
return l.await(timeout, timeoutUnit);
}
@Override
public <T, R> RFuture<R> readAsync(InetSocketAddress client, MasterSlaveEntry entry, Codec codec, RedisCommand<T> command, Object ... params) {
public <T, R> RFuture<R> readAsync(InetSocketAddress client, MasterSlaveEntry entry, Codec codec, RedisCommand<T> command, Object... params) {
RPromise<R> mainPromise = connectionManager.newPromise();
async(true, new NodeSource(entry, client), codec, command, params, mainPromise, 0);
return mainPromise;
}
@Override
public <T, R> RFuture<R> readAsync(InetSocketAddress client, String key, Codec codec, RedisCommand<T> command, Object ... params) {
public <T, R> RFuture<R> readAsync(InetSocketAddress client, String key, Codec codec, RedisCommand<T> command, Object... params) {
RPromise<R> mainPromise = connectionManager.newPromise();
int slot = connectionManager.calcSlot(key);
async(true, new NodeSource(slot, client), codec, command, params, mainPromise, 0);
@ -193,7 +196,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
}
@Override
public <T, R> RFuture<Collection<R>> readAllAsync(RedisCommand<T> command, Object ... params) {
public <T, R> RFuture<Collection<R>> readAllAsync(RedisCommand<T> command, Object... params) {
final RPromise<Collection<R>> mainPromise = connectionManager.newPromise();
final Set<MasterSlaveEntry> nodes = connectionManager.getEntrySet();
final List<R> results = new ArrayList<R>();
@ -205,18 +208,18 @@ public class CommandAsyncService implements CommandAsyncExecutor {
mainPromise.tryFailure(future.cause());
return;
}
R result = future.getNow();
if (result instanceof Collection) {
synchronized (results) {
results.addAll((Collection)result);
results.addAll((Collection) result);
}
} else {
synchronized (results) {
results.add(result);
}
}
if (counter.decrementAndGet() == 0
&& !mainPromise.isDone()) {
mainPromise.trySuccess(results);
@ -233,7 +236,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
}
@Override
public <T, R> RFuture<R> readRandomAsync(RedisCommand<T> command, Object ... params) {
public <T, R> RFuture<R> readRandomAsync(RedisCommand<T> command, Object... params) {
final RPromise<R> mainPromise = connectionManager.newPromise();
final List<MasterSlaveEntry> nodes = new ArrayList<MasterSlaveEntry>(connectionManager.getEntrySet());
Collections.shuffle(nodes);
@ -269,21 +272,21 @@ public class CommandAsyncService implements CommandAsyncExecutor {
}
@Override
public <T> RFuture<Void> writeAllAsync(RedisCommand<T> command, Object ... params) {
public <T> RFuture<Void> writeAllAsync(RedisCommand<T> command, Object... params) {
return writeAllAsync(command, null, params);
}
@Override
public <R, T> RFuture<R> writeAllAsync(RedisCommand<T> command, SlotCallback<T, R> callback, Object ... params) {
public <R, T> RFuture<R> writeAllAsync(RedisCommand<T> command, SlotCallback<T, R> callback, Object... params) {
return allAsync(false, command, callback, params);
}
@Override
public <R, T> RFuture<R> readAllAsync(RedisCommand<T> command, SlotCallback<T, R> callback, Object ... params) {
public <R, T> RFuture<R> readAllAsync(RedisCommand<T> command, SlotCallback<T, R> callback, Object... params) {
return allAsync(true, command, callback, params);
}
private <T, R> RFuture<R> allAsync(boolean readOnlyMode, RedisCommand<T> command, final SlotCallback<T, R> callback, Object ... params) {
private <T, R> RFuture<R> allAsync(boolean readOnlyMode, RedisCommand<T> command, final SlotCallback<T, R> callback, Object... params) {
final RPromise<R> mainPromise = connectionManager.newPromise();
final Set<MasterSlaveEntry> nodes = connectionManager.getEntrySet();
final AtomicInteger counter = new AtomicInteger(nodes.size());
@ -317,9 +320,9 @@ public class CommandAsyncService implements CommandAsyncExecutor {
}
public <V> RedisException convertException(RFuture<V> future) {
return future.cause() instanceof RedisException ?
(RedisException) future.cause() :
new RedisException("Unexpected exception while processing command", future.cause());
return future.cause() instanceof RedisException
? (RedisException) future.cause()
: new RedisException("Unexpected exception while processing command", future.cause());
}
private NodeSource getNodeSource(String key) {
@ -329,88 +332,86 @@ public class CommandAsyncService implements CommandAsyncExecutor {
}
@Override
public <T, R> RFuture<R> readAsync(String key, Codec codec, RedisCommand<T> command, Object ... params) {
public <T, R> RFuture<R> readAsync(String key, Codec codec, RedisCommand<T> command, Object... params) {
RPromise<R> mainPromise = connectionManager.newPromise();
NodeSource source = getNodeSource(key);
async(true, source, codec, command, params, mainPromise, 0);
return mainPromise;
}
public <T, R> RFuture<R> readAsync(MasterSlaveEntry entry, Codec codec, RedisCommand<T> command, Object ... params) {
public <T, R> RFuture<R> readAsync(MasterSlaveEntry entry, Codec codec, RedisCommand<T> command, Object... params) {
RPromise<R> mainPromise = connectionManager.newPromise();
async(true, new NodeSource(entry), codec, command, params, mainPromise, 0);
return mainPromise;
}
public <T, R> RFuture<R> readAsync(Integer slot, Codec codec, RedisCommand<T> command, Object ... params) {
public <T, R> RFuture<R> readAsync(Integer slot, Codec codec, RedisCommand<T> command, Object... params) {
RPromise<R> mainPromise = connectionManager.newPromise();
async(true, new NodeSource(slot), codec, command, params, mainPromise, 0);
return mainPromise;
}
@Override
public <T, R> RFuture<R> writeAsync(MasterSlaveEntry entry, Codec codec, RedisCommand<T> command, Object ... params) {
public <T, R> RFuture<R> writeAsync(MasterSlaveEntry entry, Codec codec, RedisCommand<T> command, Object... params) {
RPromise<R> mainPromise = connectionManager.newPromise();
async(false, new NodeSource(entry), codec, command, params, mainPromise, 0);
return mainPromise;
}
@Override
public <T, R> RFuture<R> writeAsync(Integer slot, Codec codec, RedisCommand<T> command, Object ... params) {
public <T, R> RFuture<R> writeAsync(Integer slot, Codec codec, RedisCommand<T> command, Object... params) {
RPromise<R> mainPromise = connectionManager.newPromise();
async(false, new NodeSource(slot), codec, command, params, mainPromise, 0);
return mainPromise;
}
@Override
public <T, R> RFuture<R> readAsync(String key, RedisCommand<T> command, Object ... params) {
public <T, R> RFuture<R> readAsync(String key, RedisCommand<T> command, Object... params) {
return readAsync(key, connectionManager.getCodec(), command, params);
}
@Override
public <T, R> RFuture<R> evalReadAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) {
public <T, R> RFuture<R> evalReadAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params) {
NodeSource source = getNodeSource(key);
return evalAsync(source, true, codec, evalCommandType, script, keys, params);
}
@Override
public <T, R> RFuture<R> evalReadAsync(MasterSlaveEntry entry, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) {
public <T, R> RFuture<R> evalReadAsync(MasterSlaveEntry entry, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params) {
return evalAsync(new NodeSource(entry), true, codec, evalCommandType, script, keys, params);
}
@Override
public <T, R> RFuture<R> evalReadAsync(Integer slot, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) {
public <T, R> RFuture<R> evalReadAsync(Integer slot, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params) {
return evalAsync(new NodeSource(slot), true, codec, evalCommandType, script, keys, params);
}
@Override
public <T, R> RFuture<R> evalReadAsync(InetSocketAddress client, String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) {
public <T, R> RFuture<R> evalReadAsync(InetSocketAddress client, String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params) {
int slot = connectionManager.calcSlot(key);
return evalAsync(new NodeSource(slot, client), true, codec, evalCommandType, script, keys, params);
}
@Override
public <T, R> RFuture<R> evalWriteAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) {
public <T, R> RFuture<R> evalWriteAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params) {
NodeSource source = getNodeSource(key);
return evalAsync(source, false, codec, evalCommandType, script, keys, params);
}
public <T, R> RFuture<R> evalWriteAsync(MasterSlaveEntry entry, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) {
public <T, R> RFuture<R> evalWriteAsync(MasterSlaveEntry entry, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params) {
return evalAsync(new NodeSource(entry), false, codec, evalCommandType, script, keys, params);
}
public <T, R> RFuture<R> evalWriteAsync(Integer slot, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) {
public <T, R> RFuture<R> evalWriteAsync(Integer slot, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params) {
return evalAsync(new NodeSource(slot), false, codec, evalCommandType, script, keys, params);
}
@Override
public <T, R> RFuture<R> evalWriteAllAsync(RedisCommand<T> command, SlotCallback<T, R> callback, String script, List<Object> keys, Object ... params) {
public <T, R> RFuture<R> evalWriteAllAsync(RedisCommand<T> command, SlotCallback<T, R> callback, String script, List<Object> keys, Object... params) {
return evalAllAsync(false, command, callback, script, keys, params);
}
public <T, R> RFuture<R> evalAllAsync(boolean readOnlyMode, RedisCommand<T> command, final SlotCallback<T, R> callback, String script, List<Object> keys, Object ... params) {
public <T, R> RFuture<R> evalAllAsync(boolean readOnlyMode, RedisCommand<T> command, final SlotCallback<T, R> callback, String script, List<Object> keys, Object... params) {
final RPromise<R> mainPromise = connectionManager.newPromise();
final Set<MasterSlaveEntry> entries = connectionManager.getEntrySet();
final AtomicInteger counter = new AtomicInteger(entries.size());
@ -425,7 +426,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
callback.onSlotResult(future.getNow());
if (counter.decrementAndGet() == 0
&& !mainPromise.isDone()) {
&& !mainPromise.isDone()) {
mainPromise.trySuccess(callback.onFinish());
}
}
@ -444,7 +445,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
return mainPromise;
}
private <T, R> RFuture<R> evalAsync(NodeSource nodeSource, boolean readOnlyMode, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) {
private <T, R> RFuture<R> evalAsync(NodeSource nodeSource, boolean readOnlyMode, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params) {
RPromise<R> mainPromise = connectionManager.newPromise();
List<Object> args = new ArrayList<Object>(2 + keys.size() + params.length);
args.add(script);
@ -456,12 +457,12 @@ public class CommandAsyncService implements CommandAsyncExecutor {
}
@Override
public <T, R> RFuture<R> writeAsync(String key, RedisCommand<T> command, Object ... params) {
public <T, R> RFuture<R> writeAsync(String key, RedisCommand<T> command, Object... params) {
return writeAsync(key, connectionManager.getCodec(), command, params);
}
@Override
public <T, R> RFuture<R> writeAsync(String key, Codec codec, RedisCommand<T> command, Object ... params) {
public <T, R> RFuture<R> writeAsync(String key, Codec codec, RedisCommand<T> command, Object... params) {
RPromise<R> mainPromise = connectionManager.newPromise();
NodeSource source = getNodeSource(key);
async(false, source, codec, command, params, mainPromise, 0);
@ -469,7 +470,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
}
protected <V, R> void async(final boolean readOnlyMode, final NodeSource source, final Codec codec,
final RedisCommand<V> command, final Object[] params, final RPromise<R> mainPromise, final int attempt) {
final RedisCommand<V> command, final Object[] params, final RPromise<R> mainPromise, final int attempt) {
if (mainPromise.isCancelled()) {
free(params);
return;
@ -481,7 +482,6 @@ public class CommandAsyncService implements CommandAsyncExecutor {
return;
}
final AsyncDetails<V, R> details = AsyncDetails.acquire();
if (isRedissonReferenceSupportEnabled()) {
try {
@ -522,7 +522,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
}
}
};
final TimerTask retryTimerTask = new TimerTask() {
@Override
@ -550,7 +550,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
details.setTimeout(timeout);
return;
}
if (details.getWriteFuture().isDone() && details.getWriteFuture().isSuccess()) {
return;
}
@ -591,30 +591,30 @@ public class CommandAsyncService implements CommandAsyncExecutor {
Timeout timeout = connectionManager.newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);
details.setTimeout(timeout);
details.setupMainPromiseListener(mainPromiseListener);
connectionFuture.addListener(new FutureListener<RedisConnection>() {
@Override
public void operationComplete(Future<RedisConnection> connFuture) throws Exception {
if (connFuture.isCancelled()) {
return;
}
if (!connFuture.isSuccess()) {
connectionManager.getShutdownLatch().release();
details.setException(convertException(connectionFuture));
return;
}
if (details.getAttemptPromise().isDone() || details.getMainPromise().isDone()) {
releaseConnection(source, connectionFuture, details.isReadOnlyMode(), details.getAttemptPromise(), details);
return;
}
final RedisConnection connection = connFuture.getNow();
if (details.getSource().getRedirect() == Redirect.ASK) {
List<CommandData<?, ?>> list = new ArrayList<CommandData<?, ?>>(2);
RPromise<Void> promise = connectionManager.newPromise();
list.add(new CommandData<Void, Void>(promise, details.getCodec(), RedisCommands.ASKING, new Object[] {}));
list.add(new CommandData<Void, Void>(promise, details.getCodec(), RedisCommands.ASKING, new Object[]{}));
list.add(new CommandData<V, R>(details.getAttemptPromise(), details.getCodec(), details.getCommand(), details.getParams()));
RPromise<Void> main = connectionManager.newPromise();
ChannelFuture future = connection.send(new CommandsData(main, list));
@ -627,7 +627,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
ChannelFuture future = connection.send(new CommandData<V, R>(details.getAttemptPromise(), details.getCodec(), details.getCommand(), details.getParams()));
details.setWriteFuture(future);
}
details.getWriteFuture().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
@ -658,13 +658,13 @@ public class CommandAsyncService implements CommandAsyncExecutor {
ReferenceCountUtil.safeRelease(obj);
}
}
private <V, R> void checkWriteFuture(final AsyncDetails<V, R> details, final RedisConnection connection) {
ChannelFuture future = details.getWriteFuture();
if (future.isCancelled() || details.getAttemptPromise().isDone()) {
return;
}
if (!future.isSuccess()) {
log.trace("Can't write {} to {}", details.getCommand(), connection);
details.setException(new WriteRedisConnectionException(
@ -676,7 +676,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
}
details.getTimeout().cancel();
long timeoutTime = connectionManager.getConfig().getTimeout();
if (RedisCommands.BLOCKING_COMMANDS.contains(details.getCommand().getName())) {
Long popTimeout = Long.valueOf(details.getParams()[details.getParams().length - 1].toString());
@ -684,7 +684,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
if (popTimeout == 0) {
return;
}
timeoutTime += popTimeout*1000;
timeoutTime += popTimeout * 1000;
// add 1 second due to issue https://github.com/antirez/redis/issues/874
timeoutTime += 1000;
}
@ -710,7 +710,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
details.getMainPromise().tryFailure(new RedissonShutdownException("Redisson is shutdown"));
}
};
final Timeout scheduledFuture;
if (popTimeout != 0) {
// to handle cases when connection has been lost
@ -720,11 +720,11 @@ public class CommandAsyncService implements CommandAsyncExecutor {
public void run(Timeout timeout) throws Exception {
// re-connection hasn't been made
// and connection is still active
if (orignalChannel == connection.getChannel()
if (orignalChannel == connection.getChannel()
&& connection.isActive()) {
return;
}
if (details.getAttemptPromise().trySuccess(null)) {
connection.forceFastReconnectAsync();
}
@ -733,7 +733,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
} else {
scheduledFuture = null;
}
details.getMainPromise().addListener(new FutureListener<R>() {
@Override
public void operationComplete(Future<R> future) throws Exception {
@ -756,13 +756,13 @@ public class CommandAsyncService implements CommandAsyncExecutor {
});
return;
}
if (future.cause() instanceof RedissonShutdownException) {
details.getAttemptPromise().tryFailure(future.cause());
}
}
});
synchronized (listener) {
if (!details.getMainPromise().isDone()) {
connectionManager.getShutdownPromise().addListener(listener);
@ -771,14 +771,14 @@ public class CommandAsyncService implements CommandAsyncExecutor {
}
protected <V, R> void releaseConnection(final NodeSource source, final RFuture<RedisConnection> connectionFuture,
final boolean isReadOnly, RPromise<R> attemptPromise, final AsyncDetails<V, R> details) {
final boolean isReadOnly, RPromise<R> attemptPromise, final AsyncDetails<V, R> details) {
attemptPromise.addListener(new FutureListener<R>() {
@Override
public void operationComplete(Future<R> future) throws Exception {
if (!connectionFuture.isSuccess()) {
return;
}
RedisConnection connection = connectionFuture.getNow();
connectionManager.getShutdownLatch().release();
if (isReadOnly) {
@ -786,7 +786,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
} else {
connectionManager.releaseWrite(source, connection);
}
if (log.isDebugEnabled()) {
log.debug("connection released for command {} and params {} from slot {} using connection {}",
details.getCommand(), Arrays.toString(details.getParams()), details.getSource(), connection);
@ -805,7 +805,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
details.removeMainPromiseListener();
if (future.cause() instanceof RedisMovedException) {
RedisMovedException ex = (RedisMovedException)future.cause();
RedisMovedException ex = (RedisMovedException) future.cause();
async(details.isReadOnlyMode(), new NodeSource(ex.getSlot(), ex.getAddr(), Redirect.MOVED), details.getCodec(),
details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt());
AsyncDetails.release(details);
@ -813,7 +813,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
}
if (future.cause() instanceof RedisAskException) {
RedisAskException ex = (RedisAskException)future.cause();
RedisAskException ex = (RedisAskException) future.cause();
async(details.isReadOnlyMode(), new NodeSource(ex.getSlot(), ex.getAddr(), Redirect.ASK), details.getCodec(),
details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt());
AsyncDetails.release(details);
@ -826,14 +826,14 @@ public class CommandAsyncService implements CommandAsyncExecutor {
AsyncDetails.release(details);
return;
}
if (future.cause() instanceof RedisTryAgainException) {
connectionManager.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
async(details.isReadOnlyMode(), source, details.getCodec(),
details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt());
}
}, 1, TimeUnit.SECONDS);
AsyncDetails.release(details);
@ -841,7 +841,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
}
free(details);
if (future.isSuccess()) {
R res = future.getNow();
if (res instanceof RedisClientResult) {
@ -849,9 +849,9 @@ public class CommandAsyncService implements CommandAsyncExecutor {
if (addr == null) {
addr = details.getConnectionFuture().getNow().getRedisClient().getAddr();
}
((RedisClientResult)res).setRedisClient(addr);
((RedisClientResult) res).setRedisClient(addr);
}
if (isRedissonReferenceSupportEnabled()) {
handleReference(details.getMainPromise(), res);
} else {
@ -866,29 +866,60 @@ public class CommandAsyncService implements CommandAsyncExecutor {
private <R, V> void handleReference(RPromise<R> mainPromise, R res) {
if (res instanceof List) {
List<Object> r = (List<Object>)res;
List<Object> r = (List<Object>) res;
for (int i = 0; i < r.size(); i++) {
if (r.get(i) instanceof RedissonReference) {
try {
r.set(i, redisson != null
? RedissonObjectFactory.fromReference(redisson, (RedissonReference) r.get(i))
: RedissonObjectFactory.fromReference(redissonReactive, (RedissonReference) r.get(i)));
} catch (Exception exception) {//skip and carry on to next one.
}
r.set(i, fromReference(r.get(i)));
} else if (r.get(i) instanceof ScoredEntry && ((ScoredEntry) r.get(i)).getValue() instanceof RedissonReference) {
try {
ScoredEntry<?> se = ((ScoredEntry<?>) r.get(i));
se = new ScoredEntry(se.getScore(), redisson != null
? RedissonObjectFactory.<R>fromReference(redisson, (RedissonReference) se.getValue())
: RedissonObjectFactory.<R>fromReference(redissonReactive, (RedissonReference) se.getValue()));
r.set(i, se);
} catch (Exception exception) {//skip and carry on to next one.
}
ScoredEntry<?> se = ((ScoredEntry<?>) r.get(i));
se = new ScoredEntry(se.getScore(), fromReference(se.getValue()));
r.set(i, se);
}
}
mainPromise.trySuccess(res);
} else if (res instanceof Set) {
Set r = (Set) res;
LinkedHashSet converted = new LinkedHashSet();
for (Object o : r) {
if (o instanceof RedissonReference) {
converted.add(fromReference(o));
} else if (o instanceof ScoredEntry && ((ScoredEntry) o).getValue() instanceof RedissonReference) {
ScoredEntry<?> se = ((ScoredEntry<?>) o);
se = new ScoredEntry(se.getScore(), fromReference(se.getValue()));
converted.add(se);
} else if (o instanceof Map.Entry) {
Map.Entry old = (Map.Entry) o;
Object key = old.getKey();
if (key instanceof RedissonReference) {
key = fromReference(key);
}
Object value = old.getValue();
if (value instanceof RedissonReference) {
value = fromReference(value);
}
converted.add(new AbstractMap.SimpleEntry(key, value));
} else {
converted.add(o);
}
}
mainPromise.trySuccess((R) converted);
} else if (res instanceof Map) {
Map<Object, Object> map = (Map<Object, Object>) res;
LinkedHashMap<Object, Object> converted = new LinkedHashMap<Object, Object>();
for (Map.Entry<Object, Object> e : map.entrySet()) {
Object value = e.getValue();
if (e.getValue() instanceof RedissonReference) {
value = fromReference(e.getValue());
}
Object key = e.getKey();
if (e.getKey() instanceof RedissonReference) {
key = fromReference(e.getKey());
}
converted.put(key, value);
}
mainPromise.trySuccess((R) converted);
} else if (res instanceof ListScanResult) {
List<ScanObjectEntry> r = ((ListScanResult)res).getValues();
List<ScanObjectEntry> r = ((ListScanResult) res).getValues();
for (int i = 0; i < r.size(); i++) {
Object obj = r.get(i);
if (!(obj instanceof ScanObjectEntry)) {
@ -896,60 +927,42 @@ public class CommandAsyncService implements CommandAsyncExecutor {
}
ScanObjectEntry e = r.get(i);
if (e.getObj() instanceof RedissonReference) {
try {
r.set(i , new ScanObjectEntry(e.getBuf(), redisson != null
? RedissonObjectFactory.<R>fromReference(redisson, (RedissonReference) e.getObj())
: RedissonObjectFactory.<R>fromReference(redissonReactive, (RedissonReference) e.getObj())));
} catch (Exception exception) {//skip and carry on to next one.
}
r.set(i, new ScanObjectEntry(e.getBuf(), fromReference(e.getObj())));
} else if (e.getObj() instanceof ScoredEntry && ((ScoredEntry<?>) e.getObj()).getValue() instanceof RedissonReference) {
try {
ScoredEntry<?> se = ((ScoredEntry<?>) e.getObj());
se = new ScoredEntry(se.getScore(), redisson != null
? RedissonObjectFactory.<R>fromReference(redisson, (RedissonReference) se.getValue())
: RedissonObjectFactory.<R>fromReference(redissonReactive, (RedissonReference) se.getValue()));
r.set(i, new ScanObjectEntry(e.getBuf(), se));
} catch (Exception exception) {//skip and carry on to next one.
}
ScoredEntry<?> se = ((ScoredEntry<?>) e.getObj());
se = new ScoredEntry(se.getScore(), fromReference(se.getValue()));
r.set(i, new ScanObjectEntry(e.getBuf(), se));
}
}
mainPromise.trySuccess(res);
} else if (res instanceof MapScanResult) {
Map<ScanObjectEntry, ScanObjectEntry> map = ((MapScanResult)res).getMap();
HashMap<ScanObjectEntry, ScanObjectEntry> toAdd = null;
for (Map.Entry<ScanObjectEntry, ScanObjectEntry> e : map.entrySet()) {
MapScanResult scanResult = (MapScanResult) res;
Map<ScanObjectEntry, ScanObjectEntry> map = ((MapScanResult) res).getMap();
LinkedHashMap<ScanObjectEntry, ScanObjectEntry> converted = new LinkedHashMap<ScanObjectEntry, ScanObjectEntry>();
boolean hasConversion = false;
for (Map.Entry<ScanObjectEntry, ScanObjectEntry> e : map.entrySet()) {
ScanObjectEntry value = e.getValue();
if (e.getValue().getObj() instanceof RedissonReference) {
try {
e.setValue(new ScanObjectEntry(e.getValue().getBuf(), redisson != null
? RedissonObjectFactory.<R>fromReference(redisson, (RedissonReference) e.getValue().getObj())
: RedissonObjectFactory.<R>fromReference(redissonReactive, (RedissonReference) e.getValue().getObj())));
} catch (Exception exception) {//skip and carry on to next one.
}
value = new ScanObjectEntry(e.getValue().getBuf(), fromReference(e.getValue().getObj()));
hasConversion = true;
}
ScanObjectEntry key = e.getKey();
if (e.getKey().getObj() instanceof RedissonReference) {
if (toAdd == null) {
toAdd = new HashMap<ScanObjectEntry, ScanObjectEntry>();
}
toAdd.put(e.getKey(), e.getValue());
key = new ScanObjectEntry(e.getKey().getBuf(), fromReference(e.getKey().getObj()));
hasConversion = true;
}
converted.put(key, value);
}
if (toAdd != null) {
for (Map.Entry<ScanObjectEntry, ScanObjectEntry> e : toAdd.entrySet()) {
try {
map.put(new ScanObjectEntry(e.getValue().getBuf(), (redisson != null
? RedissonObjectFactory.<R>fromReference(redisson, (RedissonReference) e.getKey().getObj())
: RedissonObjectFactory.<R>fromReference(redissonReactive, (RedissonReference) e.getKey().getObj()))), map.remove(e.getKey()));
} catch (Exception exception) {//skip and carry on to next one.
}
}
if (hasConversion) {
MapScanResult<ScanObjectEntry, ScanObjectEntry> newScanResult = new MapScanResult<ScanObjectEntry, ScanObjectEntry>(scanResult.getPos(), converted);
newScanResult.setRedisClient(scanResult.getRedisClient());
mainPromise.trySuccess((R) newScanResult);
} else {
mainPromise.trySuccess((R) res);
}
mainPromise.trySuccess(res);
} else if (res instanceof RedissonReference) {
try {
mainPromise.trySuccess(redisson != null
? RedissonObjectFactory.<R>fromReference(redisson, (RedissonReference) res)
: RedissonObjectFactory.<R>fromReference(redissonReactive, (RedissonReference) res));
mainPromise.trySuccess(this.<R>fromReference(res));
} catch (Exception exception) {
mainPromise.trySuccess(res);//fallback
}
@ -958,4 +971,13 @@ public class CommandAsyncService implements CommandAsyncExecutor {
}
}
private <R> R fromReference(Object res) {
try {
return redisson != null
? RedissonObjectFactory.<R>fromReference(redisson, (RedissonReference) res)
: RedissonObjectFactory.<R>fromReference(redissonReactive, (RedissonReference) res);
} catch (Exception exception) {
return (R) res;
}
}
}

@ -5,16 +5,27 @@ import java.util.List;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.*;
import org.junit.Test;
import org.redisson.api.LocalCachedMapOptions;
import org.redisson.api.RBatch;
import org.redisson.api.RBatchReactive;
import org.redisson.api.RBucket;
import org.redisson.api.RBucketAsync;
import org.redisson.api.RBucketReactive;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RGeo;
import org.redisson.api.RList;
import org.redisson.api.RListMultimap;
import org.redisson.api.RLiveObject;
import org.redisson.api.RLiveObjectService;
import org.redisson.api.RLocalCachedMap;
import org.redisson.api.RMap;
import org.redisson.api.RMapCache;
import org.redisson.api.RPriorityQueue;
import org.redisson.api.RQueue;
import org.redisson.api.RScoredSortedSet;
import org.redisson.api.RSet;
import org.redisson.api.RSetCache;
import org.redisson.api.RSetMultimap;
import org.redisson.client.protocol.ScoredEntry;
/**
@ -22,7 +33,7 @@ import org.redisson.client.protocol.ScoredEntry;
* @author Rui Gu (https://github.com/jackygurui)
*/
public class RedissonReferenceTest extends BaseTest {
@Test
public void testBasic() {
RBucket<Object> b1 = redisson.getBucket("b1");
@ -36,10 +47,10 @@ public class RedissonReferenceTest extends BaseTest {
b4.set(redisson.getMapCache("testCache"));
assertTrue(b4.get() instanceof RedissonMapCache);
((RedissonMapCache) b4.get()).fastPut(b1, b2, 1, TimeUnit.MINUTES);
assertEquals("b2", ((RBucket)((RedissonMapCache) b4.get()).get(b1)).getName());
assertEquals("b2", ((RBucket) ((RedissonMapCache) b4.get()).get(b1)).getName());
RBucket<Object> b5 = redisson.getBucket("b5");
RLiveObjectService service = redisson.getLiveObjectService();
RedissonLiveObjectServiceTest.TestREntity rlo = new RedissonLiveObjectServiceTest.TestREntity("123");
rlo = service.persist(rlo);
rlo.setName("t1");
@ -48,9 +59,9 @@ public class RedissonReferenceTest extends BaseTest {
assertTrue(redisson.getBucket("b5").get() instanceof RLiveObject);
assertEquals("t1", ((RedissonLiveObjectServiceTest.TestREntity) redisson.getBucket("b5").get()).getName());
assertEquals("t2", ((RedissonLiveObjectServiceTest.TestREntity) redisson.getBucket("b5").get()).getValue());
}
@Test
public void testBatch() {
RBatch batch = redisson.createBatch();
@ -61,17 +72,17 @@ public class RedissonReferenceTest extends BaseTest {
b1.setAsync(b2);
b3.setAsync(b1);
batch.execute();
batch = redisson.createBatch();
batch.getBucket("b1").getAsync();
batch.getBucket("b2").getAsync();
batch.getBucket("b3").getAsync();
List<RBucket> result = (List<RBucket>)batch.execute();
List<RBucket> result = (List<RBucket>) batch.execute();
assertEquals("b2", result.get(0).getName());
assertEquals("b3", result.get(1).getName());
assertEquals("b1", result.get(2).getName());
}
@Test
public void testNormalToReactive() {
RBatch batch = redisson.createBatch();
@ -82,7 +93,7 @@ public class RedissonReferenceTest extends BaseTest {
b1.setAsync(b2);
b3.setAsync(b1);
batch.execute();
RBatchReactive b = Redisson.createReactive(redisson.getConfig()).createBatch();
b.getBucket("b1").get();
b.getBucket("b2").get();
@ -92,7 +103,7 @@ public class RedissonReferenceTest extends BaseTest {
assertEquals("b3", result.get(1).getName());
assertEquals("b1", result.get(2).getName());
}
@Test
public void testWithList() {
RSet<RBucket<String>> b1 = redisson.getSet("set");
@ -103,7 +114,7 @@ public class RedissonReferenceTest extends BaseTest {
assertEquals(b2.get(), b1.iterator().next().get());
assertEquals(2, redisson.getKeys().count());
}
@Test
public void testWithZSet() {
RScoredSortedSet<RBucket<String>> b1 = redisson.getScoredSortedSet("set");
@ -115,7 +126,97 @@ public class RedissonReferenceTest extends BaseTest {
Collection<ScoredEntry<RBucket<String>>> entryRange = b1.entryRange(0, 1);
assertEquals(b2.get(), entryRange.iterator().next().getValue().get());
}
@Test
public void testReadAll() throws InterruptedException {
RSetCache<RBucket<String>> b1 = redisson.getSetCache("set");
RBucket<String> b2 = redisson.getBucket("bucket");
b1.add(b2, 1, TimeUnit.MINUTES);
b2.set("test1");
assertEquals(b2.get(), b1.readAll().iterator().next().get());
assertEquals(2, redisson.getKeys().count());
RMapCache<String, RSetCache<RBucket<String>>> b3 = redisson.getMapCache("map");
b3.put("1", b1);
assertEquals(b2.get(), b3.readAllMap().get("1").iterator().next().get());
assertEquals(b2.get(), b3.readAllEntrySet().iterator().next().getValue().iterator().next().get());
assertEquals(b2.get(), b3.readAllValues().iterator().next().iterator().next().get());
RMapCache<RBucket<String>, RSetCache<RBucket<String>>> b4 = redisson.getMapCache("map1");
b4.put(b2, b1);
assertEquals(b2.get(), b4.readAllKeySet().iterator().next().get());
RPriorityQueue<RBucket<String>> q1 = redisson.getPriorityQueue("q1");
q1.add(b2);
assertEquals(b2.get(), q1.readAll().get(0).get());
RQueue<RBucket<String>> q2 = redisson.getQueue("q2");
q2.add(b2);
assertEquals(b2.get(), q2.readAll().get(0).get());
RDelayedQueue<RBucket<String>> q3 = redisson.getDelayedQueue(q2);
q3.offer(b2, 10, TimeUnit.MINUTES);
assertEquals(b2.get(), q3.readAll().get(0).get());
RList<RBucket<String>> l1 = redisson.getList("l1");
l1.add(b2);
assertEquals(b2.get(), l1.readAll().get(0).get());
RList<RBucket<String>> sl1 = l1.subList(0, 0);
assertEquals(b2.get(), sl1.readAll().get(0).get());
RLocalCachedMap<String, RBucket<String>> m1 = redisson.getLocalCachedMap("m1", LocalCachedMapOptions.defaults());
m1.put("1", b2);
assertEquals(b2.get(), m1.readAllMap().get("1").get());
assertEquals(b2.get(), m1.readAllEntrySet().iterator().next().getValue().get());
assertEquals(b2.get(), m1.readAllValues().iterator().next().get());
m1 = redisson.getLocalCachedMap("m1", LocalCachedMapOptions.defaults());
assertEquals(b2.get(), m1.readAllMap().get("1").get());
assertEquals(b2.get(), m1.readAllEntrySet().iterator().next().getValue().get());
assertEquals(b2.get(), m1.readAllValues().iterator().next().get());
RLocalCachedMap<RBucket<String>, RBucket<String>> m2 = redisson.getLocalCachedMap("m2", LocalCachedMapOptions.defaults());
m2.put(b2, b2);
assertEquals(b2.get(), m2.readAllKeySet().iterator().next().get());
m2 = redisson.getLocalCachedMap("m2", LocalCachedMapOptions.defaults());
assertEquals(b2.get(), m2.readAllKeySet().iterator().next().get());
RMap<String, RSetCache<RBucket<String>>> m3 = redisson.getMap("m3");
m3.put("1", b1);
assertEquals(b2.get(), m3.readAllMap().get("1").iterator().next().get());
assertEquals(b2.get(), m3.readAllEntrySet().iterator().next().getValue().iterator().next().get());
assertEquals(b2.get(), m3.readAllValues().iterator().next().iterator().next().get());
RMap<RBucket<String>, RSetCache<RBucket<String>>> m4 = redisson.getMap("m4");
m4.put(b2, b1);
assertEquals(b2.get(), m4.readAllKeySet().iterator().next().get());
//multimap
RGeo<RBucket<String>> g1 = redisson.getGeo("g1");
g1.add(13.361389, 38.115556, b2);
assertEquals(b2.get(), g1.readAll().iterator().next().get());
RScoredSortedSet<RBucket<String>> s1 = redisson.getScoredSortedSet("s1");
s1.add(0.0, b2);
assertEquals(b2.get(), s1.readAll().iterator().next().get());
RListMultimap<String, RBucket<String>> mm1 = redisson.getListMultimap("mm1");
mm1.put("1", b2);
assertEquals(b2.get(), mm1.get("1").readAll().get(0).get());
RListMultimap<RBucket<String>, RBucket<String>> mm2 = redisson.getListMultimap("mm2");
mm2.put(b2, b2);
assertEquals(b2.get(), mm2.get(b2).readAll().get(0).get());
RSetMultimap<String, RBucket<String>> mm3 = redisson.getSetMultimap("mm3");
mm3.put("1", b2);
assertEquals(b2.get(), mm3.get("1").readAll().iterator().next().get());
RSetMultimap<RBucket<String>, RBucket<String>> mm4 = redisson.getSetMultimap("mm4");
mm4.put(b2, b2);
assertEquals(b2.get(), mm4.get(b2).readAll().iterator().next().get());
}
@Test
public void testWithMap() {
RMap<RBucket<RMap>, RBucket<RMap>> map = redisson.getMap("set");

Loading…
Cancel
Save