From efd17b9440a6dcebef4dedad1ce2d18e62925466 Mon Sep 17 00:00:00 2001 From: Nikita Date: Mon, 22 Jan 2018 12:22:08 +0300 Subject: [PATCH] Fixed - RKeys.countExists and touch return wrong result in cluster mode --- .../client/protocol/RedisCommands.java | 4 +- .../redisson/command/CommandAsyncService.java | 98 ++++++++++--------- .../redisson/command/CommandBatchService.java | 2 +- .../command/CommandReactiveBatchService.java | 4 +- 4 files changed, 58 insertions(+), 50 deletions(-) diff --git a/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java b/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java index c3eac37eb..b5de691a2 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java +++ b/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java @@ -294,9 +294,9 @@ public interface RedisCommands { RedisCommand SETNX = new RedisCommand("SETNX", new BooleanReplayConvertor()); RedisCommand PSETEX = new RedisCommand("PSETEX", new VoidReplayConvertor()); - RedisStrictCommand TOUCH_LONG = new RedisStrictCommand("TOUCH"); + RedisStrictCommand TOUCH_LONG = new RedisStrictCommand("TOUCH", new LongReplayConvertor()); RedisStrictCommand TOUCH = new RedisStrictCommand("TOUCH", new BooleanReplayConvertor()); - RedisStrictCommand EXISTS_LONG = new RedisStrictCommand("EXISTS"); + RedisStrictCommand EXISTS_LONG = new RedisStrictCommand("EXISTS", new LongReplayConvertor()); RedisStrictCommand EXISTS = new RedisStrictCommand("EXISTS", new BooleanReplayConvertor()); RedisStrictCommand NOT_EXISTS = new RedisStrictCommand("EXISTS", new BooleanNumberReplayConvertor(1L)); diff --git a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java index 750df9db9..9dc91c96b 100644 --- a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java +++ b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java @@ -42,6 +42,7 @@ import org.redisson.client.RedisConnection; import org.redisson.client.RedisException; import org.redisson.client.RedisLoadingException; import org.redisson.client.RedisMovedException; +import org.redisson.client.RedisRedirectException; import org.redisson.client.RedisTimeoutException; import org.redisson.client.RedisTryAgainException; import org.redisson.client.WriteRedisConnectionException; @@ -62,6 +63,7 @@ import org.redisson.connection.NodeSource.Redirect; import org.redisson.misc.LogHelper; import org.redisson.misc.RPromise; import org.redisson.misc.RedissonObjectFactory; +import org.redisson.misc.RedissonPromise; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -181,36 +183,36 @@ public class CommandAsyncService implements CommandAsyncExecutor { @Override public RFuture readAsync(RedisClient client, MasterSlaveEntry entry, Codec codec, RedisCommand command, Object... params) { - RPromise mainPromise = connectionManager.newPromise(); - async(true, new NodeSource(entry, client), codec, command, params, mainPromise, 0); + RPromise mainPromise = new RedissonPromise(); + async(true, new NodeSource(entry, client), codec, command, params, mainPromise, 0, false); return mainPromise; } @Override public RFuture readAsync(RedisClient client, String name, Codec codec, RedisCommand command, Object... params) { - RPromise mainPromise = connectionManager.newPromise(); + RPromise mainPromise = new RedissonPromise(); int slot = connectionManager.calcSlot(name); - async(true, new NodeSource(slot, client), codec, command, params, mainPromise, 0); + async(true, new NodeSource(slot, client), codec, command, params, mainPromise, 0, false); return mainPromise; } @Override public RFuture readAsync(RedisClient client, Codec codec, RedisCommand command, Object... params) { - RPromise mainPromise = connectionManager.newPromise(); - async(true, new NodeSource(client), codec, command, params, mainPromise, 0); + RPromise mainPromise = new RedissonPromise(); + async(true, new NodeSource(client), codec, command, params, mainPromise, 0, false); return mainPromise; } @Override public RFuture> readAllAsync(RedisCommand command, Object... params) { - final RPromise> mainPromise = connectionManager.newPromise(); + final RPromise> mainPromise = new RedissonPromise>(); final Collection nodes = connectionManager.getEntrySet(); final List results = new ArrayList(); final AtomicInteger counter = new AtomicInteger(nodes.size()); FutureListener listener = new FutureListener() { @Override public void operationComplete(Future future) throws Exception { - if (!future.isSuccess()) { + if (!future.isSuccess() && !(future.cause() instanceof RedisRedirectException)) { mainPromise.tryFailure(future.cause()); return; } @@ -234,16 +236,16 @@ public class CommandAsyncService implements CommandAsyncExecutor { }; for (MasterSlaveEntry entry : nodes) { - RPromise promise = connectionManager.newPromise(); + RPromise promise = new RedissonPromise(); promise.addListener(listener); - async(true, new NodeSource(entry), connectionManager.getCodec(), command, params, promise, 0); + async(true, new NodeSource(entry), connectionManager.getCodec(), command, params, promise, 0, true); } return mainPromise; } @Override public RFuture readRandomAsync(RedisCommand command, Object... params) { - final RPromise mainPromise = connectionManager.newPromise(); + final RPromise mainPromise = new RedissonPromise(); final List nodes = new ArrayList(connectionManager.getEntrySet()); Collections.shuffle(nodes); @@ -253,7 +255,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { private void retryReadRandomAsync(final RedisCommand command, final RPromise mainPromise, final List nodes, final Object... params) { - final RPromise attemptPromise = connectionManager.newPromise(); + final RPromise attemptPromise = new RedissonPromise(); attemptPromise.addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { @@ -274,7 +276,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { }); MasterSlaveEntry entry = nodes.remove(0); - async(true, new NodeSource(entry), connectionManager.getCodec(), command, params, attemptPromise, 0); + async(true, new NodeSource(entry), connectionManager.getCodec(), command, params, attemptPromise, 0, false); } @Override @@ -293,19 +295,24 @@ public class CommandAsyncService implements CommandAsyncExecutor { } private RFuture allAsync(boolean readOnlyMode, RedisCommand command, final SlotCallback callback, Object... params) { - final RPromise mainPromise = connectionManager.newPromise(); + final RPromise mainPromise = new RedissonPromise(); final Collection nodes = connectionManager.getEntrySet(); final AtomicInteger counter = new AtomicInteger(nodes.size()); FutureListener listener = new FutureListener() { @Override public void operationComplete(Future future) throws Exception { - if (!future.isSuccess()) { + if (!future.isSuccess() && !(future.cause() instanceof RedisRedirectException)) { mainPromise.tryFailure(future.cause()); return; } + + T result = future.getNow(); + if (future.cause() instanceof RedisRedirectException) { + result = command.getConvertor().convert(result); + } if (callback != null) { - callback.onSlotResult(future.getNow()); + callback.onSlotResult(result); } if (counter.decrementAndGet() == 0) { if (callback != null) { @@ -318,9 +325,9 @@ public class CommandAsyncService implements CommandAsyncExecutor { }; for (MasterSlaveEntry entry : nodes) { - RPromise promise = connectionManager.newPromise(); + RPromise promise = new RedissonPromise(); promise.addListener(listener); - async(readOnlyMode, new NodeSource(entry), connectionManager.getCodec(), command, params, promise, 0); + async(readOnlyMode, new NodeSource(entry), connectionManager.getCodec(), command, params, promise, 0, true); } return mainPromise; } @@ -339,22 +346,22 @@ public class CommandAsyncService implements CommandAsyncExecutor { @Override public RFuture readAsync(String key, Codec codec, RedisCommand command, Object... params) { - RPromise mainPromise = connectionManager.newPromise(); + RPromise mainPromise = new RedissonPromise(); NodeSource source = getNodeSource(key); - async(true, source, codec, command, params, mainPromise, 0); + async(true, source, codec, command, params, mainPromise, 0, false); return mainPromise; } public RFuture readAsync(MasterSlaveEntry entry, Codec codec, RedisCommand command, Object... params) { - RPromise mainPromise = connectionManager.newPromise(); - async(true, new NodeSource(entry), codec, command, params, mainPromise, 0); + RPromise mainPromise = new RedissonPromise(); + async(true, new NodeSource(entry), codec, command, params, mainPromise, 0, false); return mainPromise; } @Override public RFuture writeAsync(MasterSlaveEntry entry, Codec codec, RedisCommand command, Object... params) { - RPromise mainPromise = connectionManager.newPromise(); - async(false, new NodeSource(entry), codec, command, params, mainPromise, 0); + RPromise mainPromise = new RedissonPromise(); + async(false, new NodeSource(entry), codec, command, params, mainPromise, 0, false); return mainPromise; } @@ -396,14 +403,14 @@ public class CommandAsyncService implements CommandAsyncExecutor { } public RFuture evalAllAsync(boolean readOnlyMode, RedisCommand command, final SlotCallback callback, String script, List keys, Object... params) { - final RPromise mainPromise = connectionManager.newPromise(); + final RPromise mainPromise = new RedissonPromise(); final Collection entries = connectionManager.getEntrySet(); final AtomicInteger counter = new AtomicInteger(entries.size()); FutureListener listener = new FutureListener() { @Override public void operationComplete(Future future) throws Exception { - if (!future.isSuccess()) { + if (!future.isSuccess() && !(future.cause() instanceof RedisRedirectException)) { mainPromise.tryFailure(future.cause()); return; } @@ -422,21 +429,21 @@ public class CommandAsyncService implements CommandAsyncExecutor { args.addAll(keys); args.addAll(Arrays.asList(params)); for (MasterSlaveEntry entry : entries) { - RPromise promise = connectionManager.newPromise(); + RPromise promise = new RedissonPromise(); promise.addListener(listener); - async(readOnlyMode, new NodeSource(entry), connectionManager.getCodec(), command, args.toArray(), promise, 0); + async(readOnlyMode, new NodeSource(entry), connectionManager.getCodec(), command, args.toArray(), promise, 0, true); } return mainPromise; } private RFuture evalAsync(NodeSource nodeSource, boolean readOnlyMode, Codec codec, RedisCommand evalCommandType, String script, List keys, Object... params) { - RPromise mainPromise = connectionManager.newPromise(); + RPromise mainPromise = new RedissonPromise(); List args = new ArrayList(2 + keys.size() + params.length); args.add(script); args.add(keys.size()); args.addAll(keys); args.addAll(Arrays.asList(params)); - async(readOnlyMode, nodeSource, codec, evalCommandType, args.toArray(), mainPromise, 0); + async(readOnlyMode, nodeSource, codec, evalCommandType, args.toArray(), mainPromise, 0, false); return mainPromise; } @@ -447,14 +454,15 @@ public class CommandAsyncService implements CommandAsyncExecutor { @Override public RFuture writeAsync(String key, Codec codec, RedisCommand command, Object... params) { - RPromise mainPromise = connectionManager.newPromise(); + RPromise mainPromise = new RedissonPromise(); NodeSource source = getNodeSource(key); - async(false, source, codec, command, params, mainPromise, 0); + async(false, source, codec, command, params, mainPromise, 0, false); return mainPromise; } protected void async(final boolean readOnlyMode, final NodeSource source, final Codec codec, - final RedisCommand command, final Object[] params, final RPromise mainPromise, final int attempt) { + final RedisCommand command, final Object[] params, final RPromise mainPromise, final int attempt, + final boolean ignoreRedirect) { if (mainPromise.isCancelled()) { free(params); return; @@ -490,7 +498,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { connectionFuture = connectionManager.connectionWriteOp(source, command); } - final RPromise attemptPromise = connectionManager.newPromise(); + final RPromise attemptPromise = new RedissonPromise(); details.init(connectionFuture, attemptPromise, readOnlyMode, source, codec, command, params, mainPromise, attempt); @@ -566,7 +574,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { count, details.getCommand(), Arrays.toString(details.getParams())); } details.removeMainPromiseListener(); - async(details.isReadOnlyMode(), details.getSource(), details.getCodec(), details.getCommand(), details.getParams(), details.getMainPromise(), count); + async(details.isReadOnlyMode(), details.getSource(), details.getCodec(), details.getCommand(), details.getParams(), details.getMainPromise(), count, ignoreRedirect); AsyncDetails.release(details); } @@ -597,10 +605,10 @@ public class CommandAsyncService implements CommandAsyncExecutor { final RedisConnection connection = connFuture.getNow(); if (details.getSource().getRedirect() == Redirect.ASK) { List> list = new ArrayList>(2); - RPromise promise = connectionManager.newPromise(); + RPromise promise = new RedissonPromise(); list.add(new CommandData(promise, details.getCodec(), RedisCommands.ASKING, new Object[]{})); list.add(new CommandData(details.getAttemptPromise(), details.getCodec(), details.getCommand(), details.getParams())); - RPromise main = connectionManager.newPromise(); + RPromise main = new RedissonPromise(); ChannelFuture future = connection.send(new CommandsData(main, list)); details.setWriteFuture(future); } else { @@ -626,7 +634,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { attemptPromise.addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { - checkAttemptFuture(source, details, future); + checkAttemptFuture(source, details, future, ignoreRedirect); } }); } @@ -780,7 +788,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { } private void checkAttemptFuture(final NodeSource source, final AsyncDetails details, - Future future) { + Future future, boolean ignoreRedirect) { details.getTimeout().cancel(); if (future.isCancelled()) { return; @@ -788,25 +796,25 @@ public class CommandAsyncService implements CommandAsyncExecutor { details.removeMainPromiseListener(); - if (future.cause() instanceof RedisMovedException) { + if (future.cause() instanceof RedisMovedException && !ignoreRedirect) { RedisMovedException ex = (RedisMovedException) future.cause(); async(details.isReadOnlyMode(), new NodeSource(ex.getSlot(), ex.getUrl(), Redirect.MOVED), details.getCodec(), - details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt()); + details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt(), ignoreRedirect); AsyncDetails.release(details); return; } - if (future.cause() instanceof RedisAskException) { + if (future.cause() instanceof RedisAskException && !ignoreRedirect) { RedisAskException ex = (RedisAskException) future.cause(); async(details.isReadOnlyMode(), new NodeSource(ex.getSlot(), ex.getUrl(), Redirect.ASK), details.getCodec(), - details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt()); + details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt(), ignoreRedirect); AsyncDetails.release(details); return; } if (future.cause() instanceof RedisLoadingException) { async(details.isReadOnlyMode(), source, details.getCodec(), - details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt()); + details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt(), ignoreRedirect); AsyncDetails.release(details); return; } @@ -816,7 +824,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { @Override public void run(Timeout timeout) throws Exception { async(details.isReadOnlyMode(), source, details.getCodec(), - details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt()); + details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt(), ignoreRedirect); } }, 1, TimeUnit.SECONDS); diff --git a/redisson/src/main/java/org/redisson/command/CommandBatchService.java b/redisson/src/main/java/org/redisson/command/CommandBatchService.java index c94b6d6d2..22589d84c 100644 --- a/redisson/src/main/java/org/redisson/command/CommandBatchService.java +++ b/redisson/src/main/java/org/redisson/command/CommandBatchService.java @@ -111,7 +111,7 @@ public class CommandBatchService extends CommandAsyncService { @Override protected void async(boolean readOnlyMode, NodeSource nodeSource, - Codec codec, RedisCommand command, Object[] params, RPromise mainPromise, int attempt) { + Codec codec, RedisCommand command, Object[] params, RPromise mainPromise, int attempt, boolean ignoreRedirect) { if (executed) { throw new IllegalStateException("Batch already has been executed!"); } diff --git a/redisson/src/main/java/org/redisson/command/CommandReactiveBatchService.java b/redisson/src/main/java/org/redisson/command/CommandReactiveBatchService.java index 735ca3660..37ef6b7b9 100644 --- a/redisson/src/main/java/org/redisson/command/CommandReactiveBatchService.java +++ b/redisson/src/main/java/org/redisson/command/CommandReactiveBatchService.java @@ -57,8 +57,8 @@ public class CommandReactiveBatchService extends CommandReactiveService { @Override protected void async(boolean readOnlyMode, NodeSource nodeSource, - Codec codec, RedisCommand command, Object[] params, RPromise mainPromise, int attempt) { - batchService.async(readOnlyMode, nodeSource, codec, command, params, mainPromise, attempt); + Codec codec, RedisCommand command, Object[] params, RPromise mainPromise, int attempt, boolean ignoreRedirect) { + batchService.async(readOnlyMode, nodeSource, codec, command, params, mainPromise, attempt, ignoreRedirect); } public RFuture> executeAsync(int syncSlaves, long syncTimeout, boolean skipResult, long responseTimeout, int retryAttempts, long retryInterval, boolean atomic) {