Fixed - RBatch with skipResult() option affects result of other commands (regression since 3.16.1) #3817

pull/3848/head
Nikita Koksharov 3 years ago
parent 382bce2ced
commit d5350a70b6

@ -15,11 +15,11 @@
*/ */
package org.redisson.client; package org.redisson.client;
import java.util.Queue; import io.netty.channel.Channel;
import java.util.concurrent.CountDownLatch; import io.netty.channel.ChannelFuture;
import java.util.concurrent.TimeUnit; import io.netty.channel.ChannelFutureListener;
import java.util.concurrent.atomic.AtomicInteger; import io.netty.util.AttributeKey;
import io.netty.util.Timeout;
import org.redisson.RedissonShutdownException; import org.redisson.RedissonShutdownException;
import org.redisson.api.RFuture; import org.redisson.api.RFuture;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
@ -30,11 +30,11 @@ import org.redisson.misc.LogHelper;
import org.redisson.misc.RPromise; import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise; import org.redisson.misc.RedissonPromise;
import io.netty.channel.Channel; import java.util.Deque;
import io.netty.channel.ChannelFuture; import java.util.Queue;
import io.netty.channel.ChannelFutureListener; import java.util.concurrent.CountDownLatch;
import io.netty.util.AttributeKey; import java.util.concurrent.TimeUnit;
import io.netty.util.Timeout; import java.util.concurrent.atomic.AtomicInteger;
/** /**
* *
@ -57,7 +57,7 @@ public class RedisConnection implements RedisCommands {
private Runnable disconnectedListener; private Runnable disconnectedListener;
private volatile boolean pooled; private volatile boolean pooled;
private AtomicInteger usage = new AtomicInteger(); private final AtomicInteger usage = new AtomicInteger();
public <C> RedisConnection(RedisClient redisClient, Channel channel, RPromise<C> connectionPromise) { public <C> RedisConnection(RedisClient redisClient, Channel channel, RPromise<C> connectionPromise) {
this.redisClient = redisClient; this.redisClient = redisClient;
@ -119,6 +119,19 @@ public class RedisConnection implements RedisCommands {
return (C) channel.attr(RedisConnection.CONNECTION).get(); return (C) channel.attr(RedisConnection.CONNECTION).get();
} }
public CommandData<?, ?> getLastCommand() {
Deque<QueueCommandHolder> queue = channel.attr(CommandsQueue.COMMANDS_QUEUE).get();
if (queue != null) {
QueueCommandHolder holder = queue.peekLast();
if (holder != null) {
if (holder.getCommand() instanceof CommandData) {
return (CommandData<?, ?>) holder.getCommand();
}
}
}
return null;
}
public CommandData<?, ?> getCurrentCommand() { public CommandData<?, ?> getCurrentCommand() {
Queue<QueueCommandHolder> queue = channel.attr(CommandsQueue.COMMANDS_QUEUE).get(); Queue<QueueCommandHolder> queue = channel.attr(CommandsQueue.COMMANDS_QUEUE).get();
if (queue != null) { if (queue != null) {
@ -320,7 +333,7 @@ public class RedisConnection implements RedisCommands {
@Override @Override
public String toString() { public String toString() {
return getClass().getSimpleName() + "@" + System.identityHashCode(this) + " [redisClient=" + redisClient + ", channel=" + channel + ", currentCommand=" + getCurrentCommand() + "]"; return getClass().getSimpleName() + "@" + System.identityHashCode(this) + " [redisClient=" + redisClient + ", channel=" + channel + ", currentCommand=" + getCurrentCommand() + ", usage=" + usage + "]";
} }
} }

@ -20,13 +20,15 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise; import io.netty.channel.ChannelPromise;
import io.netty.util.AttributeKey; import io.netty.util.AttributeKey;
import org.redisson.client.WriteRedisConnectionException; import org.redisson.client.WriteRedisConnectionException;
import org.redisson.client.protocol.*; import org.redisson.client.protocol.QueueCommand;
import org.redisson.client.protocol.QueueCommandHolder;
import org.redisson.misc.LogHelper; import org.redisson.misc.LogHelper;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.util.Deque;
import java.util.Iterator; import java.util.Iterator;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
/** /**
@ -37,13 +39,13 @@ import java.util.concurrent.atomic.AtomicBoolean;
*/ */
public class CommandsQueue extends ChannelDuplexHandler { public class CommandsQueue extends ChannelDuplexHandler {
public static final AttributeKey<Queue<QueueCommandHolder>> COMMANDS_QUEUE = AttributeKey.valueOf("COMMANDS_QUEUE"); public static final AttributeKey<Deque<QueueCommandHolder>> COMMANDS_QUEUE = AttributeKey.valueOf("COMMANDS_QUEUE");
@Override @Override
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception { public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception {
super.connect(ctx, remoteAddress, localAddress, promise); super.connect(ctx, remoteAddress, localAddress, promise);
ctx.channel().attr(COMMANDS_QUEUE).set(new ConcurrentLinkedQueue<>()); ctx.channel().attr(COMMANDS_QUEUE).set(new ConcurrentLinkedDeque<>());
} }
@Override @Override

@ -15,10 +15,9 @@
*/ */
package org.redisson.command; package org.redisson.command;
import java.util.ArrayList; import io.netty.channel.ChannelFuture;
import java.util.List; import io.netty.channel.ChannelFutureListener;
import java.util.concurrent.atomic.AtomicInteger; import io.netty.channel.ChannelPromise;
import org.redisson.api.BatchOptions; import org.redisson.api.BatchOptions;
import org.redisson.api.BatchOptions.ExecutionMode; import org.redisson.api.BatchOptions.ExecutionMode;
import org.redisson.api.RFuture; import org.redisson.api.RFuture;
@ -37,6 +36,10 @@ import org.redisson.misc.RedissonPromise;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
/** /**
* *
* @author Nikita Koksharov * @author Nikita Koksharov
@ -97,7 +100,7 @@ public class RedisCommonBatchExecutor extends RedisExecutor<Object, Void> {
List<CommandData<?, ?>> list = new ArrayList<>(entry.getCommands().size()); List<CommandData<?, ?>> list = new ArrayList<>(entry.getCommands().size());
if (source.getRedirect() == Redirect.ASK) { if (source.getRedirect() == Redirect.ASK) {
RPromise<Void> promise = new RedissonPromise<Void>(); RPromise<Void> promise = new RedissonPromise<Void>();
list.add(new CommandData<Void, Void>(promise, StringCodec.INSTANCE, RedisCommands.ASKING, new Object[] {})); list.add(new CommandData<>(promise, StringCodec.INSTANCE, RedisCommands.ASKING, new Object[] {}));
} }
for (CommandData<?, ?> c : entry.getCommands()) { for (CommandData<?, ?> c : entry.getCommands()) {
if ((c.getPromise().isCancelled() || c.getPromise().isSuccess()) if ((c.getPromise().isCancelled() || c.getPromise().isSuccess())
@ -115,7 +118,37 @@ public class RedisCommonBatchExecutor extends RedisExecutor<Object, Void> {
timeout.cancel(); timeout.cancel();
return; return;
} }
sendCommand(connection, attemptPromise, list);
}
private void sendCommand(RedisConnection connection, RPromise<Void> attemptPromise, List<CommandData<?, ?>> list) {
boolean isAtomic = options.getExecutionMode() != ExecutionMode.IN_MEMORY;
boolean isQueued = options.getExecutionMode() == ExecutionMode.REDIS_READ_ATOMIC
|| options.getExecutionMode() == ExecutionMode.REDIS_WRITE_ATOMIC;
CommandData<?, ?> lastCommand = connection.getLastCommand();
if (lastCommand != null && options.isSkipResult()) {
writeFuture = connection.getChannel().newPromise();
lastCommand.getPromise().onComplete((r, e) -> {
CommandData<?, ?> currentLastCommand = connection.getLastCommand();
if (lastCommand != currentLastCommand && currentLastCommand != null) {
sendCommand(connection, attemptPromise, list);
return;
}
ChannelFuture wf = connection.send(new CommandsData(attemptPromise, list, options.isSkipResult(), isAtomic, isQueued, options.getSyncSlaves() > 0));
wf.addListener((ChannelFutureListener) future -> {
if (future.isSuccess()) {
((ChannelPromise) writeFuture).trySuccess(future.getNow());
} else {
((ChannelPromise) writeFuture).tryFailure(future.cause());
}
});
});
return;
}
writeFuture = connection.send(new CommandsData(attemptPromise, list, options.isSkipResult(), isAtomic, isQueued, options.getSyncSlaves() > 0)); writeFuture = connection.send(new CommandsData(attemptPromise, list, options.isSkipResult(), isAtomic, isQueued, options.getSyncSlaves() > 0));
} }

@ -1,5 +1,6 @@
package org.redisson; package org.redisson;
import net.bytebuddy.utility.RandomString;
import org.assertj.core.api.Assertions; import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility; import org.awaitility.Awaitility;
import org.junit.jupiter.api.Assumptions; import org.junit.jupiter.api.Assumptions;
@ -14,6 +15,7 @@ import org.redisson.client.*;
import org.redisson.client.codec.StringCodec; import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisCommands;
import org.redisson.cluster.ClusterNodeInfo; import org.redisson.cluster.ClusterNodeInfo;
import org.redisson.codec.JsonJacksonCodec;
import org.redisson.config.Config; import org.redisson.config.Config;
import org.redisson.config.SubscriptionMode; import org.redisson.config.SubscriptionMode;
@ -240,7 +242,40 @@ public class RedissonBatchTest extends BaseTest {
} }
}); });
} }
@Test
public void testSkipResult() throws InterruptedException {
ExecutorService e = Executors.newFixedThreadPool(8);
Queue<RFuture<?>> futures = new ConcurrentLinkedQueue<>();
for (int i = 0; i < 8; i++) {
e.submit(() -> {
for (int j = 0; j < 3000; j++) {
try {
if (ThreadLocalRandom.current().nextBoolean()) {
RBatch b = redisson.createBatch(BatchOptions.defaults());
RBucketAsync<Object> bucket = b.getBucket(RandomString.make(10), new JsonJacksonCodec());
bucket.trySetAsync("test");
RFuture<BatchResult<?>> f = b.executeAsync();
futures.add(f);
} else {
RMap<Integer, Integer> map = redisson.getMap("test", new JsonJacksonCodec());
RFuture<Integer> f = map.addAndGetAsync(1, 2);
futures.add(f);
}
} catch (Exception ex) {
ex.printStackTrace();
}
}
});
}
e.shutdown();
assertThat(e.awaitTermination(10, TimeUnit.SECONDS)).isTrue();
for (RFuture<?> future : futures) {
future.syncUninterruptibly();
}
}
@ParameterizedTest @ParameterizedTest
@MethodSource("data") @MethodSource("data")
public void testConnectionLeakAfterError() throws InterruptedException { public void testConnectionLeakAfterError() throws InterruptedException {
@ -262,6 +297,9 @@ public class RedissonBatchTest extends BaseTest {
batch1.execute(); batch1.execute();
}); });
// time to reconnect broken connection
Thread.sleep(300);
redisson.getBucket("test3").set(4); redisson.getBucket("test3").set(4);
assertThat(redisson.getBucket("test3").get()).isEqualTo(4); assertThat(redisson.getBucket("test3").get()).isEqualTo(4);

Loading…
Cancel
Save