Merge branch 'master' into 3.0.0

pull/1821/head
Nikita 7 years ago
commit 24f18ab7c4

@ -228,7 +228,7 @@ public class CommandDecoder extends ReplayingDecoder<State> {
} }
try { try {
decode(in, commandData, null, ctx.channel(), !commandBatch.isAtomic()); decode(in, commandData, null, ctx.channel(), commandBatch.isQueued());
} finally { } finally {
if (commandData != null && RedisCommands.EXEC.getName().equals(commandData.getCommand().getName())) { if (commandData != null && RedisCommands.EXEC.getName().equals(commandData.getCommand().getName())) {
commandsData.remove(); commandsData.remove();

@ -32,33 +32,39 @@ public class CommandsData implements QueueCommand {
private final RPromise<Void> promise; private final RPromise<Void> promise;
private final boolean skipResult; private final boolean skipResult;
private final boolean atomic; private final boolean atomic;
private final boolean queued;
public CommandsData(RPromise<Void> promise, List<CommandData<?, ?>> commands) { public CommandsData(RPromise<Void> promise, List<CommandData<?, ?>> commands, boolean queued) {
this(promise, commands, null); this(promise, commands, null, false, false, queued);
} }
public CommandsData(RPromise<Void> promise, List<CommandData<?, ?>> commands, List<CommandData<?, ?>> attachedCommands) { public CommandsData(RPromise<Void> promise, List<CommandData<?, ?>> commands, List<CommandData<?, ?>> attachedCommands) {
this(promise, commands, attachedCommands, false, false); this(promise, commands, attachedCommands, false, false, true);
} }
public CommandsData(RPromise<Void> promise, List<CommandData<?, ?>> commands, boolean skipResult, boolean atomic, boolean queued) {
public CommandsData(RPromise<Void> promise, List<CommandData<?, ?>> commands, boolean skipResult, boolean atomic) { this(promise, commands, null, skipResult, atomic, queued);
this(promise, commands, null, skipResult, atomic);
} }
public CommandsData(RPromise<Void> promise, List<CommandData<?, ?>> commands, List<CommandData<?, ?>> attachedCommands, boolean skipResult, boolean atomic) { public CommandsData(RPromise<Void> promise, List<CommandData<?, ?>> commands, List<CommandData<?, ?>> attachedCommands,
boolean skipResult, boolean atomic, boolean queued) {
super(); super();
this.promise = promise; this.promise = promise;
this.commands = commands; this.commands = commands;
this.skipResult = skipResult; this.skipResult = skipResult;
this.atomic = atomic; this.atomic = atomic;
this.attachedCommands = attachedCommands; this.attachedCommands = attachedCommands;
this.queued = queued;
} }
public RPromise<Void> getPromise() { public RPromise<Void> getPromise() {
return promise; return promise;
} }
public boolean isQueued() {
return queued;
}
public boolean isAtomic() { public boolean isAtomic() {
return atomic; return atomic;
} }

@ -1051,7 +1051,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
list.add(new CommandData<Void, Void>(promise, details.getCodec(), RedisCommands.ASKING, new Object[]{})); list.add(new CommandData<Void, Void>(promise, details.getCodec(), RedisCommands.ASKING, new Object[]{}));
list.add(new CommandData<V, R>(details.getAttemptPromise(), details.getCodec(), details.getCommand(), details.getParams())); list.add(new CommandData<V, R>(details.getAttemptPromise(), details.getCodec(), details.getCommand(), details.getParams()));
RPromise<Void> main = new RedissonPromise<Void>(); RPromise<Void> main = new RedissonPromise<Void>();
ChannelFuture future = connection.send(new CommandsData(main, list)); ChannelFuture future = connection.send(new CommandsData(main, list, false));
details.setWriteFuture(future); details.setWriteFuture(future);
} else { } else {
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {

@ -269,7 +269,7 @@ public class CommandBatchService extends CommandAsyncService {
} }
list.add(new CommandData<V, R>(details.getAttemptPromise(), details.getCodec(), details.getCommand(), details.getParams())); list.add(new CommandData<V, R>(details.getAttemptPromise(), details.getCodec(), details.getCommand(), details.getParams()));
RPromise<Void> main = new RedissonPromise<Void>(); RPromise<Void> main = new RedissonPromise<Void>();
ChannelFuture future = connection.send(new CommandsData(main, list)); ChannelFuture future = connection.send(new CommandsData(main, list, true));
details.setWriteFuture(future); details.setWriteFuture(future);
} else { } else {
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
@ -282,7 +282,7 @@ public class CommandBatchService extends CommandAsyncService {
list.add(new CommandData<Void, Void>(new RedissonPromise<Void>(), details.getCodec(), RedisCommands.MULTI, new Object[]{})); list.add(new CommandData<Void, Void>(new RedissonPromise<Void>(), details.getCodec(), RedisCommands.MULTI, new Object[]{}));
list.add(new CommandData<V, R>(details.getAttemptPromise(), details.getCodec(), details.getCommand(), details.getParams())); list.add(new CommandData<V, R>(details.getAttemptPromise(), details.getCodec(), details.getCommand(), details.getParams()));
RPromise<Void> main = new RedissonPromise<Void>(); RPromise<Void> main = new RedissonPromise<Void>();
ChannelFuture future = connection.send(new CommandsData(main, list)); ChannelFuture future = connection.send(new CommandsData(main, list, true));
connectionEntry.setFirstCommand(false); connectionEntry.setFirstCommand(false);
details.setWriteFuture(future); details.setWriteFuture(future);
} else { } else {
@ -733,7 +733,7 @@ public class CommandBatchService extends CommandAsyncService {
@Override @Override
public void operationComplete(Future<RedisConnection> connFuture) throws Exception { public void operationComplete(Future<RedisConnection> connFuture) throws Exception {
checkConnectionFuture(entry, source, mainPromise, attemptPromise, details, connectionFuture, options.isSkipResult(), checkConnectionFuture(entry, source, mainPromise, attemptPromise, details, connectionFuture, options.isSkipResult(),
options.getResponseTimeout(), attempts, options.getExecutionMode() != ExecutionMode.IN_MEMORY); options.getResponseTimeout(), attempts, options.getExecutionMode());
} }
}); });
@ -824,7 +824,7 @@ public class CommandBatchService extends CommandAsyncService {
private void checkConnectionFuture(final Entry entry, final NodeSource source, private void checkConnectionFuture(final Entry entry, final NodeSource source,
final RPromise<Void> mainPromise, final RPromise<Void> attemptPromise, final AsyncDetails details, final RPromise<Void> mainPromise, final RPromise<Void> attemptPromise, final AsyncDetails details,
RFuture<RedisConnection> connFuture, final boolean noResult, final long responseTimeout, final int attempts, boolean atomic) { RFuture<RedisConnection> connFuture, final boolean noResult, final long responseTimeout, final int attempts, ExecutionMode executionMode) {
if (connFuture.isCancelled()) { if (connFuture.isCancelled()) {
return; return;
} }
@ -841,6 +841,8 @@ public class CommandBatchService extends CommandAsyncService {
} }
final RedisConnection connection = connFuture.getNow(); final RedisConnection connection = connFuture.getNow();
boolean isAtomic = executionMode != ExecutionMode.IN_MEMORY;
boolean isQueued = executionMode == ExecutionMode.REDIS_READ_ATOMIC || executionMode == ExecutionMode.REDIS_WRITE_ATOMIC;
List<CommandData<?, ?>> list = new LinkedList<CommandData<?, ?>>(); List<CommandData<?, ?>> list = new LinkedList<CommandData<?, ?>>();
if (source.getRedirect() == Redirect.ASK) { if (source.getRedirect() == Redirect.ASK) {
@ -848,14 +850,14 @@ public class CommandBatchService extends CommandAsyncService {
list.add(new CommandData<Void, Void>(promise, StringCodec.INSTANCE, RedisCommands.ASKING, new Object[] {})); list.add(new CommandData<Void, Void>(promise, StringCodec.INSTANCE, RedisCommands.ASKING, new Object[] {}));
} }
for (BatchCommandData<?, ?> c : entry.getCommands()) { for (BatchCommandData<?, ?> c : entry.getCommands()) {
if (c.getPromise().isSuccess() && !isWaitCommand(c) && !atomic) { if (c.getPromise().isSuccess() && !isWaitCommand(c) && !isAtomic) {
// skip successful commands // skip successful commands
continue; continue;
} }
list.add(c); list.add(c);
} }
ChannelFuture future = connection.send(new CommandsData(attemptPromise, list, noResult, atomic)); ChannelFuture future = connection.send(new CommandsData(attemptPromise, list, noResult, isAtomic, isQueued));
details.setWriteFuture(future); details.setWriteFuture(future);
details.getWriteFuture().addListener(new ChannelFutureListener() { details.getWriteFuture().addListener(new ChannelFutureListener() {

@ -153,7 +153,7 @@ public class RedisClientTest {
commands.add(cmd4); commands.add(cmd4);
RPromise<Void> p = new RedissonPromise<Void>(); RPromise<Void> p = new RedissonPromise<Void>();
conn.send(new CommandsData(p, commands)); conn.send(new CommandsData(p, commands, false));
assertThat(cmd1.getPromise().get()).isEqualTo("PONG"); assertThat(cmd1.getPromise().get()).isEqualTo("PONG");
assertThat(cmd2.getPromise().get()).isEqualTo(1); assertThat(cmd2.getPromise().get()).isEqualTo(1);
@ -188,7 +188,7 @@ public class RedisClientTest {
} }
RPromise<Void> p = new RedissonPromise<Void>(); RPromise<Void> p = new RedissonPromise<Void>();
conn.send(new CommandsData(p, commands)); conn.send(new CommandsData(p, commands, false));
for (CommandData<?, ?> commandData : commands) { for (CommandData<?, ?> commandData : commands) {
commandData.getPromise().get(); commandData.getPromise().get();

Loading…
Cancel
Save