diff --git a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java index b66627e5b..784d6a278 100644 --- a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java +++ b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java @@ -71,6 +71,9 @@ import io.netty.util.Timeout; import io.netty.util.TimerTask; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; +import java.util.AbstractMap; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; /** * @@ -111,17 +114,17 @@ public class CommandAsyncService implements CommandAsyncExecutor { } return this; } - + @Override public boolean isRedissonReferenceSupportEnabled() { return redisson != null || redissonReactive != null; } - + @Override public void syncSubscription(RFuture future) { MasterSlaveServersConfig config = connectionManager.getConfig(); try { - int timeout = config.getTimeout() + config.getRetryInterval()*config.getRetryAttempts(); + int timeout = config.getTimeout() + config.getRetryInterval() * config.getRetryAttempts(); if (!future.await(timeout)) { throw new RedisTimeoutException("Subscribe timeout: (" + timeout + "ms)"); } @@ -130,7 +133,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { } future.syncUninterruptibly(); } - + @Override public V get(RFuture future) { if (!future.isDone()) { @@ -141,7 +144,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { l.countDown(); } }); - + boolean interrupted = false; while (!future.isDone()) { try { @@ -150,7 +153,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { interrupted = true; } } - + if (interrupted) { Thread.currentThread().interrupt(); } @@ -176,16 +179,16 @@ public class CommandAsyncService implements CommandAsyncExecutor { }); return l.await(timeout, timeoutUnit); } - + @Override - public RFuture readAsync(InetSocketAddress client, MasterSlaveEntry entry, Codec codec, RedisCommand command, Object ... params) { + public RFuture readAsync(InetSocketAddress client, MasterSlaveEntry entry, Codec codec, RedisCommand command, Object... params) { RPromise mainPromise = connectionManager.newPromise(); async(true, new NodeSource(entry, client), codec, command, params, mainPromise, 0); return mainPromise; } - + @Override - public RFuture readAsync(InetSocketAddress client, String key, Codec codec, RedisCommand command, Object ... params) { + public RFuture readAsync(InetSocketAddress client, String key, Codec codec, RedisCommand command, Object... params) { RPromise mainPromise = connectionManager.newPromise(); int slot = connectionManager.calcSlot(key); async(true, new NodeSource(slot, client), codec, command, params, mainPromise, 0); @@ -193,7 +196,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { } @Override - public RFuture> readAllAsync(RedisCommand command, Object ... params) { + public RFuture> readAllAsync(RedisCommand command, Object... params) { final RPromise> mainPromise = connectionManager.newPromise(); final Set nodes = connectionManager.getEntrySet(); final List results = new ArrayList(); @@ -205,18 +208,18 @@ public class CommandAsyncService implements CommandAsyncExecutor { mainPromise.tryFailure(future.cause()); return; } - + R result = future.getNow(); if (result instanceof Collection) { synchronized (results) { - results.addAll((Collection)result); + results.addAll((Collection) result); } } else { synchronized (results) { results.add(result); } } - + if (counter.decrementAndGet() == 0 && !mainPromise.isDone()) { mainPromise.trySuccess(results); @@ -233,7 +236,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { } @Override - public RFuture readRandomAsync(RedisCommand command, Object ... params) { + public RFuture readRandomAsync(RedisCommand command, Object... params) { final RPromise mainPromise = connectionManager.newPromise(); final List nodes = new ArrayList(connectionManager.getEntrySet()); Collections.shuffle(nodes); @@ -269,21 +272,21 @@ public class CommandAsyncService implements CommandAsyncExecutor { } @Override - public RFuture writeAllAsync(RedisCommand command, Object ... params) { + public RFuture writeAllAsync(RedisCommand command, Object... params) { return writeAllAsync(command, null, params); } @Override - public RFuture writeAllAsync(RedisCommand command, SlotCallback callback, Object ... params) { + public RFuture writeAllAsync(RedisCommand command, SlotCallback callback, Object... params) { return allAsync(false, command, callback, params); } @Override - public RFuture readAllAsync(RedisCommand command, SlotCallback callback, Object ... params) { + public RFuture readAllAsync(RedisCommand command, SlotCallback callback, Object... params) { return allAsync(true, command, callback, params); } - private RFuture allAsync(boolean readOnlyMode, RedisCommand command, final SlotCallback callback, Object ... params) { + private RFuture allAsync(boolean readOnlyMode, RedisCommand command, final SlotCallback callback, Object... params) { final RPromise mainPromise = connectionManager.newPromise(); final Set nodes = connectionManager.getEntrySet(); final AtomicInteger counter = new AtomicInteger(nodes.size()); @@ -317,9 +320,9 @@ public class CommandAsyncService implements CommandAsyncExecutor { } public RedisException convertException(RFuture future) { - return future.cause() instanceof RedisException ? - (RedisException) future.cause() : - new RedisException("Unexpected exception while processing command", future.cause()); + return future.cause() instanceof RedisException + ? (RedisException) future.cause() + : new RedisException("Unexpected exception while processing command", future.cause()); } private NodeSource getNodeSource(String key) { @@ -329,88 +332,86 @@ public class CommandAsyncService implements CommandAsyncExecutor { } @Override - public RFuture readAsync(String key, Codec codec, RedisCommand command, Object ... params) { + public RFuture readAsync(String key, Codec codec, RedisCommand command, Object... params) { RPromise mainPromise = connectionManager.newPromise(); NodeSource source = getNodeSource(key); async(true, source, codec, command, params, mainPromise, 0); return mainPromise; } - public RFuture readAsync(MasterSlaveEntry entry, Codec codec, RedisCommand command, Object ... params) { + public RFuture readAsync(MasterSlaveEntry entry, Codec codec, RedisCommand command, Object... params) { RPromise mainPromise = connectionManager.newPromise(); async(true, new NodeSource(entry), codec, command, params, mainPromise, 0); return mainPromise; } - - public RFuture readAsync(Integer slot, Codec codec, RedisCommand command, Object ... params) { + + public RFuture readAsync(Integer slot, Codec codec, RedisCommand command, Object... params) { RPromise mainPromise = connectionManager.newPromise(); async(true, new NodeSource(slot), codec, command, params, mainPromise, 0); return mainPromise; } @Override - public RFuture writeAsync(MasterSlaveEntry entry, Codec codec, RedisCommand command, Object ... params) { + public RFuture writeAsync(MasterSlaveEntry entry, Codec codec, RedisCommand command, Object... params) { RPromise mainPromise = connectionManager.newPromise(); async(false, new NodeSource(entry), codec, command, params, mainPromise, 0); return mainPromise; } - @Override - public RFuture writeAsync(Integer slot, Codec codec, RedisCommand command, Object ... params) { + public RFuture writeAsync(Integer slot, Codec codec, RedisCommand command, Object... params) { RPromise mainPromise = connectionManager.newPromise(); async(false, new NodeSource(slot), codec, command, params, mainPromise, 0); return mainPromise; } @Override - public RFuture readAsync(String key, RedisCommand command, Object ... params) { + public RFuture readAsync(String key, RedisCommand command, Object... params) { return readAsync(key, connectionManager.getCodec(), command, params); } @Override - public RFuture evalReadAsync(String key, Codec codec, RedisCommand evalCommandType, String script, List keys, Object ... params) { + public RFuture evalReadAsync(String key, Codec codec, RedisCommand evalCommandType, String script, List keys, Object... params) { NodeSource source = getNodeSource(key); return evalAsync(source, true, codec, evalCommandType, script, keys, params); } @Override - public RFuture evalReadAsync(MasterSlaveEntry entry, Codec codec, RedisCommand evalCommandType, String script, List keys, Object ... params) { + public RFuture evalReadAsync(MasterSlaveEntry entry, Codec codec, RedisCommand evalCommandType, String script, List keys, Object... params) { return evalAsync(new NodeSource(entry), true, codec, evalCommandType, script, keys, params); } - + @Override - public RFuture evalReadAsync(Integer slot, Codec codec, RedisCommand evalCommandType, String script, List keys, Object ... params) { + public RFuture evalReadAsync(Integer slot, Codec codec, RedisCommand evalCommandType, String script, List keys, Object... params) { return evalAsync(new NodeSource(slot), true, codec, evalCommandType, script, keys, params); } @Override - public RFuture evalReadAsync(InetSocketAddress client, String key, Codec codec, RedisCommand evalCommandType, String script, List keys, Object ... params) { + public RFuture evalReadAsync(InetSocketAddress client, String key, Codec codec, RedisCommand evalCommandType, String script, List keys, Object... params) { int slot = connectionManager.calcSlot(key); return evalAsync(new NodeSource(slot, client), true, codec, evalCommandType, script, keys, params); } @Override - public RFuture evalWriteAsync(String key, Codec codec, RedisCommand evalCommandType, String script, List keys, Object ... params) { + public RFuture evalWriteAsync(String key, Codec codec, RedisCommand evalCommandType, String script, List keys, Object... params) { NodeSource source = getNodeSource(key); return evalAsync(source, false, codec, evalCommandType, script, keys, params); } - public RFuture evalWriteAsync(MasterSlaveEntry entry, Codec codec, RedisCommand evalCommandType, String script, List keys, Object ... params) { + public RFuture evalWriteAsync(MasterSlaveEntry entry, Codec codec, RedisCommand evalCommandType, String script, List keys, Object... params) { return evalAsync(new NodeSource(entry), false, codec, evalCommandType, script, keys, params); } - - public RFuture evalWriteAsync(Integer slot, Codec codec, RedisCommand evalCommandType, String script, List keys, Object ... params) { + + public RFuture evalWriteAsync(Integer slot, Codec codec, RedisCommand evalCommandType, String script, List keys, Object... params) { return evalAsync(new NodeSource(slot), false, codec, evalCommandType, script, keys, params); } - @Override - public RFuture evalWriteAllAsync(RedisCommand command, SlotCallback callback, String script, List keys, Object ... params) { + public RFuture evalWriteAllAsync(RedisCommand command, SlotCallback callback, String script, List keys, Object... params) { return evalAllAsync(false, command, callback, script, keys, params); } - public RFuture evalAllAsync(boolean readOnlyMode, RedisCommand command, final SlotCallback callback, String script, List keys, Object ... params) { + public RFuture evalAllAsync(boolean readOnlyMode, RedisCommand command, final SlotCallback callback, String script, List keys, Object... params) { final RPromise mainPromise = connectionManager.newPromise(); final Set entries = connectionManager.getEntrySet(); final AtomicInteger counter = new AtomicInteger(entries.size()); @@ -425,7 +426,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { callback.onSlotResult(future.getNow()); if (counter.decrementAndGet() == 0 - && !mainPromise.isDone()) { + && !mainPromise.isDone()) { mainPromise.trySuccess(callback.onFinish()); } } @@ -444,7 +445,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { return mainPromise; } - private RFuture evalAsync(NodeSource nodeSource, boolean readOnlyMode, Codec codec, RedisCommand evalCommandType, String script, List keys, Object ... params) { + private RFuture evalAsync(NodeSource nodeSource, boolean readOnlyMode, Codec codec, RedisCommand evalCommandType, String script, List keys, Object... params) { RPromise mainPromise = connectionManager.newPromise(); List args = new ArrayList(2 + keys.size() + params.length); args.add(script); @@ -456,12 +457,12 @@ public class CommandAsyncService implements CommandAsyncExecutor { } @Override - public RFuture writeAsync(String key, RedisCommand command, Object ... params) { + public RFuture writeAsync(String key, RedisCommand command, Object... params) { return writeAsync(key, connectionManager.getCodec(), command, params); } @Override - public RFuture writeAsync(String key, Codec codec, RedisCommand command, Object ... params) { + public RFuture writeAsync(String key, Codec codec, RedisCommand command, Object... params) { RPromise mainPromise = connectionManager.newPromise(); NodeSource source = getNodeSource(key); async(false, source, codec, command, params, mainPromise, 0); @@ -469,7 +470,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { } protected void async(final boolean readOnlyMode, final NodeSource source, final Codec codec, - final RedisCommand command, final Object[] params, final RPromise mainPromise, final int attempt) { + final RedisCommand command, final Object[] params, final RPromise mainPromise, final int attempt) { if (mainPromise.isCancelled()) { free(params); return; @@ -481,7 +482,6 @@ public class CommandAsyncService implements CommandAsyncExecutor { return; } - final AsyncDetails details = AsyncDetails.acquire(); if (isRedissonReferenceSupportEnabled()) { try { @@ -522,7 +522,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { } } }; - + final TimerTask retryTimerTask = new TimerTask() { @Override @@ -550,7 +550,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { details.setTimeout(timeout); return; } - + if (details.getWriteFuture().isDone() && details.getWriteFuture().isSuccess()) { return; } @@ -591,30 +591,30 @@ public class CommandAsyncService implements CommandAsyncExecutor { Timeout timeout = connectionManager.newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS); details.setTimeout(timeout); details.setupMainPromiseListener(mainPromiseListener); - + connectionFuture.addListener(new FutureListener() { @Override public void operationComplete(Future connFuture) throws Exception { if (connFuture.isCancelled()) { return; } - + if (!connFuture.isSuccess()) { connectionManager.getShutdownLatch().release(); details.setException(convertException(connectionFuture)); return; } - + if (details.getAttemptPromise().isDone() || details.getMainPromise().isDone()) { releaseConnection(source, connectionFuture, details.isReadOnlyMode(), details.getAttemptPromise(), details); return; } - + final RedisConnection connection = connFuture.getNow(); if (details.getSource().getRedirect() == Redirect.ASK) { List> list = new ArrayList>(2); RPromise promise = connectionManager.newPromise(); - list.add(new CommandData(promise, details.getCodec(), RedisCommands.ASKING, new Object[] {})); + list.add(new CommandData(promise, details.getCodec(), RedisCommands.ASKING, new Object[]{})); list.add(new CommandData(details.getAttemptPromise(), details.getCodec(), details.getCommand(), details.getParams())); RPromise main = connectionManager.newPromise(); ChannelFuture future = connection.send(new CommandsData(main, list)); @@ -627,7 +627,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { ChannelFuture future = connection.send(new CommandData(details.getAttemptPromise(), details.getCodec(), details.getCommand(), details.getParams())); details.setWriteFuture(future); } - + details.getWriteFuture().addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { @@ -658,13 +658,13 @@ public class CommandAsyncService implements CommandAsyncExecutor { ReferenceCountUtil.safeRelease(obj); } } - + private void checkWriteFuture(final AsyncDetails details, final RedisConnection connection) { ChannelFuture future = details.getWriteFuture(); if (future.isCancelled() || details.getAttemptPromise().isDone()) { return; } - + if (!future.isSuccess()) { log.trace("Can't write {} to {}", details.getCommand(), connection); details.setException(new WriteRedisConnectionException( @@ -676,7 +676,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { } details.getTimeout().cancel(); - + long timeoutTime = connectionManager.getConfig().getTimeout(); if (RedisCommands.BLOCKING_COMMANDS.contains(details.getCommand().getName())) { Long popTimeout = Long.valueOf(details.getParams()[details.getParams().length - 1].toString()); @@ -684,7 +684,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { if (popTimeout == 0) { return; } - timeoutTime += popTimeout*1000; + timeoutTime += popTimeout * 1000; // add 1 second due to issue https://github.com/antirez/redis/issues/874 timeoutTime += 1000; } @@ -710,7 +710,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { details.getMainPromise().tryFailure(new RedissonShutdownException("Redisson is shutdown")); } }; - + final Timeout scheduledFuture; if (popTimeout != 0) { // to handle cases when connection has been lost @@ -720,11 +720,11 @@ public class CommandAsyncService implements CommandAsyncExecutor { public void run(Timeout timeout) throws Exception { // re-connection hasn't been made // and connection is still active - if (orignalChannel == connection.getChannel() + if (orignalChannel == connection.getChannel() && connection.isActive()) { return; } - + if (details.getAttemptPromise().trySuccess(null)) { connection.forceFastReconnectAsync(); } @@ -733,7 +733,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { } else { scheduledFuture = null; } - + details.getMainPromise().addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { @@ -756,13 +756,13 @@ public class CommandAsyncService implements CommandAsyncExecutor { }); return; } - + if (future.cause() instanceof RedissonShutdownException) { details.getAttemptPromise().tryFailure(future.cause()); } } }); - + synchronized (listener) { if (!details.getMainPromise().isDone()) { connectionManager.getShutdownPromise().addListener(listener); @@ -771,14 +771,14 @@ public class CommandAsyncService implements CommandAsyncExecutor { } protected void releaseConnection(final NodeSource source, final RFuture connectionFuture, - final boolean isReadOnly, RPromise attemptPromise, final AsyncDetails details) { + final boolean isReadOnly, RPromise attemptPromise, final AsyncDetails details) { attemptPromise.addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { if (!connectionFuture.isSuccess()) { return; } - + RedisConnection connection = connectionFuture.getNow(); connectionManager.getShutdownLatch().release(); if (isReadOnly) { @@ -786,7 +786,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { } else { connectionManager.releaseWrite(source, connection); } - + if (log.isDebugEnabled()) { log.debug("connection released for command {} and params {} from slot {} using connection {}", details.getCommand(), Arrays.toString(details.getParams()), details.getSource(), connection); @@ -805,7 +805,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { details.removeMainPromiseListener(); if (future.cause() instanceof RedisMovedException) { - RedisMovedException ex = (RedisMovedException)future.cause(); + RedisMovedException ex = (RedisMovedException) future.cause(); async(details.isReadOnlyMode(), new NodeSource(ex.getSlot(), ex.getAddr(), Redirect.MOVED), details.getCodec(), details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt()); AsyncDetails.release(details); @@ -813,7 +813,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { } if (future.cause() instanceof RedisAskException) { - RedisAskException ex = (RedisAskException)future.cause(); + RedisAskException ex = (RedisAskException) future.cause(); async(details.isReadOnlyMode(), new NodeSource(ex.getSlot(), ex.getAddr(), Redirect.ASK), details.getCodec(), details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt()); AsyncDetails.release(details); @@ -826,14 +826,14 @@ public class CommandAsyncService implements CommandAsyncExecutor { AsyncDetails.release(details); return; } - + if (future.cause() instanceof RedisTryAgainException) { connectionManager.newTimeout(new TimerTask() { @Override public void run(Timeout timeout) throws Exception { async(details.isReadOnlyMode(), source, details.getCodec(), details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt()); - + } }, 1, TimeUnit.SECONDS); AsyncDetails.release(details); @@ -841,7 +841,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { } free(details); - + if (future.isSuccess()) { R res = future.getNow(); if (res instanceof RedisClientResult) { @@ -849,9 +849,9 @@ public class CommandAsyncService implements CommandAsyncExecutor { if (addr == null) { addr = details.getConnectionFuture().getNow().getRedisClient().getAddr(); } - ((RedisClientResult)res).setRedisClient(addr); + ((RedisClientResult) res).setRedisClient(addr); } - + if (isRedissonReferenceSupportEnabled()) { handleReference(details.getMainPromise(), res); } else { @@ -866,29 +866,60 @@ public class CommandAsyncService implements CommandAsyncExecutor { private void handleReference(RPromise mainPromise, R res) { if (res instanceof List) { - List r = (List)res; + List r = (List) res; for (int i = 0; i < r.size(); i++) { if (r.get(i) instanceof RedissonReference) { - try { - r.set(i, redisson != null - ? RedissonObjectFactory.fromReference(redisson, (RedissonReference) r.get(i)) - : RedissonObjectFactory.fromReference(redissonReactive, (RedissonReference) r.get(i))); - } catch (Exception exception) {//skip and carry on to next one. - } + r.set(i, fromReference(r.get(i))); } else if (r.get(i) instanceof ScoredEntry && ((ScoredEntry) r.get(i)).getValue() instanceof RedissonReference) { - try { - ScoredEntry se = ((ScoredEntry) r.get(i)); - se = new ScoredEntry(se.getScore(), redisson != null - ? RedissonObjectFactory.fromReference(redisson, (RedissonReference) se.getValue()) - : RedissonObjectFactory.fromReference(redissonReactive, (RedissonReference) se.getValue())); - r.set(i, se); - } catch (Exception exception) {//skip and carry on to next one. - } + ScoredEntry se = ((ScoredEntry) r.get(i)); + se = new ScoredEntry(se.getScore(), fromReference(se.getValue())); + r.set(i, se); } } mainPromise.trySuccess(res); + } else if (res instanceof Set) { + Set r = (Set) res; + LinkedHashSet converted = new LinkedHashSet(); + for (Object o : r) { + if (o instanceof RedissonReference) { + converted.add(fromReference(o)); + } else if (o instanceof ScoredEntry && ((ScoredEntry) o).getValue() instanceof RedissonReference) { + ScoredEntry se = ((ScoredEntry) o); + se = new ScoredEntry(se.getScore(), fromReference(se.getValue())); + converted.add(se); + } else if (o instanceof Map.Entry) { + Map.Entry old = (Map.Entry) o; + Object key = old.getKey(); + if (key instanceof RedissonReference) { + key = fromReference(key); + } + Object value = old.getValue(); + if (value instanceof RedissonReference) { + value = fromReference(value); + } + converted.add(new AbstractMap.SimpleEntry(key, value)); + } else { + converted.add(o); + } + } + mainPromise.trySuccess((R) converted); + } else if (res instanceof Map) { + Map map = (Map) res; + LinkedHashMap converted = new LinkedHashMap(); + for (Map.Entry e : map.entrySet()) { + Object value = e.getValue(); + if (e.getValue() instanceof RedissonReference) { + value = fromReference(e.getValue()); + } + Object key = e.getKey(); + if (e.getKey() instanceof RedissonReference) { + key = fromReference(e.getKey()); + } + converted.put(key, value); + } + mainPromise.trySuccess((R) converted); } else if (res instanceof ListScanResult) { - List r = ((ListScanResult)res).getValues(); + List r = ((ListScanResult) res).getValues(); for (int i = 0; i < r.size(); i++) { Object obj = r.get(i); if (!(obj instanceof ScanObjectEntry)) { @@ -896,60 +927,42 @@ public class CommandAsyncService implements CommandAsyncExecutor { } ScanObjectEntry e = r.get(i); if (e.getObj() instanceof RedissonReference) { - try { - r.set(i , new ScanObjectEntry(e.getBuf(), redisson != null - ? RedissonObjectFactory.fromReference(redisson, (RedissonReference) e.getObj()) - : RedissonObjectFactory.fromReference(redissonReactive, (RedissonReference) e.getObj()))); - } catch (Exception exception) {//skip and carry on to next one. - } + r.set(i, new ScanObjectEntry(e.getBuf(), fromReference(e.getObj()))); } else if (e.getObj() instanceof ScoredEntry && ((ScoredEntry) e.getObj()).getValue() instanceof RedissonReference) { - try { - ScoredEntry se = ((ScoredEntry) e.getObj()); - se = new ScoredEntry(se.getScore(), redisson != null - ? RedissonObjectFactory.fromReference(redisson, (RedissonReference) se.getValue()) - : RedissonObjectFactory.fromReference(redissonReactive, (RedissonReference) se.getValue())); - - r.set(i, new ScanObjectEntry(e.getBuf(), se)); - } catch (Exception exception) {//skip and carry on to next one. - } + ScoredEntry se = ((ScoredEntry) e.getObj()); + se = new ScoredEntry(se.getScore(), fromReference(se.getValue())); + r.set(i, new ScanObjectEntry(e.getBuf(), se)); } } mainPromise.trySuccess(res); } else if (res instanceof MapScanResult) { - Map map = ((MapScanResult)res).getMap(); - HashMap toAdd = null; - for (Map.Entry e : map.entrySet()) { + MapScanResult scanResult = (MapScanResult) res; + Map map = ((MapScanResult) res).getMap(); + LinkedHashMap converted = new LinkedHashMap(); + boolean hasConversion = false; + for (Map.Entry e : map.entrySet()) { + ScanObjectEntry value = e.getValue(); if (e.getValue().getObj() instanceof RedissonReference) { - try { - e.setValue(new ScanObjectEntry(e.getValue().getBuf(), redisson != null - ? RedissonObjectFactory.fromReference(redisson, (RedissonReference) e.getValue().getObj()) - : RedissonObjectFactory.fromReference(redissonReactive, (RedissonReference) e.getValue().getObj()))); - } catch (Exception exception) {//skip and carry on to next one. - } + value = new ScanObjectEntry(e.getValue().getBuf(), fromReference(e.getValue().getObj())); + hasConversion = true; } + ScanObjectEntry key = e.getKey(); if (e.getKey().getObj() instanceof RedissonReference) { - if (toAdd == null) { - toAdd = new HashMap(); - } - toAdd.put(e.getKey(), e.getValue()); + key = new ScanObjectEntry(e.getKey().getBuf(), fromReference(e.getKey().getObj())); + hasConversion = true; } + converted.put(key, value); } - if (toAdd != null) { - for (Map.Entry e : toAdd.entrySet()) { - try { - map.put(new ScanObjectEntry(e.getValue().getBuf(), (redisson != null - ? RedissonObjectFactory.fromReference(redisson, (RedissonReference) e.getKey().getObj()) - : RedissonObjectFactory.fromReference(redissonReactive, (RedissonReference) e.getKey().getObj()))), map.remove(e.getKey())); - } catch (Exception exception) {//skip and carry on to next one. - } - } + if (hasConversion) { + MapScanResult newScanResult = new MapScanResult(scanResult.getPos(), converted); + newScanResult.setRedisClient(scanResult.getRedisClient()); + mainPromise.trySuccess((R) newScanResult); + } else { + mainPromise.trySuccess((R) res); } - mainPromise.trySuccess(res); } else if (res instanceof RedissonReference) { try { - mainPromise.trySuccess(redisson != null - ? RedissonObjectFactory.fromReference(redisson, (RedissonReference) res) - : RedissonObjectFactory.fromReference(redissonReactive, (RedissonReference) res)); + mainPromise.trySuccess(this.fromReference(res)); } catch (Exception exception) { mainPromise.trySuccess(res);//fallback } @@ -958,4 +971,13 @@ public class CommandAsyncService implements CommandAsyncExecutor { } } + private R fromReference(Object res) { + try { + return redisson != null + ? RedissonObjectFactory.fromReference(redisson, (RedissonReference) res) + : RedissonObjectFactory.fromReference(redissonReactive, (RedissonReference) res); + } catch (Exception exception) { + return (R) res; + } + } } diff --git a/redisson/src/test/java/org/redisson/RedissonReferenceTest.java b/redisson/src/test/java/org/redisson/RedissonReferenceTest.java index 922cec6fe..dc5397c20 100644 --- a/redisson/src/test/java/org/redisson/RedissonReferenceTest.java +++ b/redisson/src/test/java/org/redisson/RedissonReferenceTest.java @@ -5,16 +5,27 @@ import java.util.List; import java.util.concurrent.TimeUnit; import static org.junit.Assert.*; import org.junit.Test; +import org.redisson.api.LocalCachedMapOptions; import org.redisson.api.RBatch; import org.redisson.api.RBatchReactive; import org.redisson.api.RBucket; import org.redisson.api.RBucketAsync; import org.redisson.api.RBucketReactive; +import org.redisson.api.RDelayedQueue; +import org.redisson.api.RGeo; +import org.redisson.api.RList; +import org.redisson.api.RListMultimap; import org.redisson.api.RLiveObject; import org.redisson.api.RLiveObjectService; +import org.redisson.api.RLocalCachedMap; import org.redisson.api.RMap; +import org.redisson.api.RMapCache; +import org.redisson.api.RPriorityQueue; +import org.redisson.api.RQueue; import org.redisson.api.RScoredSortedSet; import org.redisson.api.RSet; +import org.redisson.api.RSetCache; +import org.redisson.api.RSetMultimap; import org.redisson.client.protocol.ScoredEntry; /** @@ -22,7 +33,7 @@ import org.redisson.client.protocol.ScoredEntry; * @author Rui Gu (https://github.com/jackygurui) */ public class RedissonReferenceTest extends BaseTest { - + @Test public void testBasic() { RBucket b1 = redisson.getBucket("b1"); @@ -36,10 +47,10 @@ public class RedissonReferenceTest extends BaseTest { b4.set(redisson.getMapCache("testCache")); assertTrue(b4.get() instanceof RedissonMapCache); ((RedissonMapCache) b4.get()).fastPut(b1, b2, 1, TimeUnit.MINUTES); - assertEquals("b2", ((RBucket)((RedissonMapCache) b4.get()).get(b1)).getName()); + assertEquals("b2", ((RBucket) ((RedissonMapCache) b4.get()).get(b1)).getName()); RBucket b5 = redisson.getBucket("b5"); RLiveObjectService service = redisson.getLiveObjectService(); - + RedissonLiveObjectServiceTest.TestREntity rlo = new RedissonLiveObjectServiceTest.TestREntity("123"); rlo = service.persist(rlo); rlo.setName("t1"); @@ -48,9 +59,9 @@ public class RedissonReferenceTest extends BaseTest { assertTrue(redisson.getBucket("b5").get() instanceof RLiveObject); assertEquals("t1", ((RedissonLiveObjectServiceTest.TestREntity) redisson.getBucket("b5").get()).getName()); assertEquals("t2", ((RedissonLiveObjectServiceTest.TestREntity) redisson.getBucket("b5").get()).getValue()); - + } - + @Test public void testBatch() { RBatch batch = redisson.createBatch(); @@ -61,17 +72,17 @@ public class RedissonReferenceTest extends BaseTest { b1.setAsync(b2); b3.setAsync(b1); batch.execute(); - + batch = redisson.createBatch(); batch.getBucket("b1").getAsync(); batch.getBucket("b2").getAsync(); batch.getBucket("b3").getAsync(); - List result = (List)batch.execute(); + List result = (List) batch.execute(); assertEquals("b2", result.get(0).getName()); assertEquals("b3", result.get(1).getName()); assertEquals("b1", result.get(2).getName()); } - + @Test public void testNormalToReactive() { RBatch batch = redisson.createBatch(); @@ -82,7 +93,7 @@ public class RedissonReferenceTest extends BaseTest { b1.setAsync(b2); b3.setAsync(b1); batch.execute(); - + RBatchReactive b = Redisson.createReactive(redisson.getConfig()).createBatch(); b.getBucket("b1").get(); b.getBucket("b2").get(); @@ -92,7 +103,7 @@ public class RedissonReferenceTest extends BaseTest { assertEquals("b3", result.get(1).getName()); assertEquals("b1", result.get(2).getName()); } - + @Test public void testWithList() { RSet> b1 = redisson.getSet("set"); @@ -103,7 +114,7 @@ public class RedissonReferenceTest extends BaseTest { assertEquals(b2.get(), b1.iterator().next().get()); assertEquals(2, redisson.getKeys().count()); } - + @Test public void testWithZSet() { RScoredSortedSet> b1 = redisson.getScoredSortedSet("set"); @@ -115,7 +126,97 @@ public class RedissonReferenceTest extends BaseTest { Collection>> entryRange = b1.entryRange(0, 1); assertEquals(b2.get(), entryRange.iterator().next().getValue().get()); } - + + @Test + public void testReadAll() throws InterruptedException { + RSetCache> b1 = redisson.getSetCache("set"); + RBucket b2 = redisson.getBucket("bucket"); + b1.add(b2, 1, TimeUnit.MINUTES); + b2.set("test1"); + assertEquals(b2.get(), b1.readAll().iterator().next().get()); + assertEquals(2, redisson.getKeys().count()); + + RMapCache>> b3 = redisson.getMapCache("map"); + b3.put("1", b1); + assertEquals(b2.get(), b3.readAllMap().get("1").iterator().next().get()); + assertEquals(b2.get(), b3.readAllEntrySet().iterator().next().getValue().iterator().next().get()); + assertEquals(b2.get(), b3.readAllValues().iterator().next().iterator().next().get()); + + RMapCache, RSetCache>> b4 = redisson.getMapCache("map1"); + b4.put(b2, b1); + assertEquals(b2.get(), b4.readAllKeySet().iterator().next().get()); + + RPriorityQueue> q1 = redisson.getPriorityQueue("q1"); + q1.add(b2); + assertEquals(b2.get(), q1.readAll().get(0).get()); + + RQueue> q2 = redisson.getQueue("q2"); + q2.add(b2); + assertEquals(b2.get(), q2.readAll().get(0).get()); + + RDelayedQueue> q3 = redisson.getDelayedQueue(q2); + q3.offer(b2, 10, TimeUnit.MINUTES); + assertEquals(b2.get(), q3.readAll().get(0).get()); + + RList> l1 = redisson.getList("l1"); + l1.add(b2); + assertEquals(b2.get(), l1.readAll().get(0).get()); + RList> sl1 = l1.subList(0, 0); + assertEquals(b2.get(), sl1.readAll().get(0).get()); + + RLocalCachedMap> m1 = redisson.getLocalCachedMap("m1", LocalCachedMapOptions.defaults()); + m1.put("1", b2); + assertEquals(b2.get(), m1.readAllMap().get("1").get()); + assertEquals(b2.get(), m1.readAllEntrySet().iterator().next().getValue().get()); + assertEquals(b2.get(), m1.readAllValues().iterator().next().get()); + m1 = redisson.getLocalCachedMap("m1", LocalCachedMapOptions.defaults()); + assertEquals(b2.get(), m1.readAllMap().get("1").get()); + assertEquals(b2.get(), m1.readAllEntrySet().iterator().next().getValue().get()); + assertEquals(b2.get(), m1.readAllValues().iterator().next().get()); + + RLocalCachedMap, RBucket> m2 = redisson.getLocalCachedMap("m2", LocalCachedMapOptions.defaults()); + m2.put(b2, b2); + assertEquals(b2.get(), m2.readAllKeySet().iterator().next().get()); + m2 = redisson.getLocalCachedMap("m2", LocalCachedMapOptions.defaults()); + assertEquals(b2.get(), m2.readAllKeySet().iterator().next().get()); + + RMap>> m3 = redisson.getMap("m3"); + m3.put("1", b1); + assertEquals(b2.get(), m3.readAllMap().get("1").iterator().next().get()); + assertEquals(b2.get(), m3.readAllEntrySet().iterator().next().getValue().iterator().next().get()); + assertEquals(b2.get(), m3.readAllValues().iterator().next().iterator().next().get()); + + RMap, RSetCache>> m4 = redisson.getMap("m4"); + m4.put(b2, b1); + assertEquals(b2.get(), m4.readAllKeySet().iterator().next().get()); + + //multimap + RGeo> g1 = redisson.getGeo("g1"); + g1.add(13.361389, 38.115556, b2); + assertEquals(b2.get(), g1.readAll().iterator().next().get()); + + RScoredSortedSet> s1 = redisson.getScoredSortedSet("s1"); + s1.add(0.0, b2); + assertEquals(b2.get(), s1.readAll().iterator().next().get()); + + RListMultimap> mm1 = redisson.getListMultimap("mm1"); + mm1.put("1", b2); + assertEquals(b2.get(), mm1.get("1").readAll().get(0).get()); + + RListMultimap, RBucket> mm2 = redisson.getListMultimap("mm2"); + mm2.put(b2, b2); + assertEquals(b2.get(), mm2.get(b2).readAll().get(0).get()); + + + RSetMultimap> mm3 = redisson.getSetMultimap("mm3"); + mm3.put("1", b2); + assertEquals(b2.get(), mm3.get("1").readAll().iterator().next().get()); + + RSetMultimap, RBucket> mm4 = redisson.getSetMultimap("mm4"); + mm4.put(b2, b2); + assertEquals(b2.get(), mm4.get(b2).readAll().iterator().next().get()); + } + @Test public void testWithMap() { RMap, RBucket> map = redisson.getMap("set");