Fixed - command replies don't match if connection pool size < 10 and at least one command failed. #4693

pull/4536/merge
Nikita Koksharov 2 years ago
parent efbc904dd8
commit 5a60e7e0cb

@ -140,9 +140,9 @@ public class RedisConnection implements RedisCommands {
}
}
QueueCommand command = channel.attr(CommandsQueuePubSub.CURRENT_COMMAND).get();
if (command instanceof CommandData) {
return (CommandData<?, ?>) command;
QueueCommandHolder holder = channel.attr(CommandsQueuePubSub.CURRENT_COMMAND).get();
if (holder != null && holder.getCommand() instanceof CommandData) {
return (CommandData<?, ?>) holder.getCommand();
}
return null;
}

@ -69,18 +69,18 @@ public class CommandDecoder extends ReplayingDecoder<State> {
this.scheme = scheme;
}
protected QueueCommand getCommand(ChannelHandlerContext ctx) {
protected QueueCommandHolder getCommand(ChannelHandlerContext ctx) {
Queue<QueueCommandHolder> queue = ctx.channel().attr(CommandsQueue.COMMANDS_QUEUE).get();
QueueCommandHolder holder = queue.peek();
if (holder != null) {
return holder.getCommand();
}
return null;
return queue.peek();
}
@Override
protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
QueueCommand data = getCommand(ctx);
QueueCommandHolder holder = getCommand(ctx);
QueueCommand data = null;
if (holder != null) {
data = holder.getCommand();
}
if (state() == null) {
state(new State());
@ -91,13 +91,20 @@ public class CommandDecoder extends ReplayingDecoder<State> {
int endIndex = skipCommand(in);
try {
decode(ctx, in, data, 0);
decode(ctx, in, null, 0);
} catch (Exception e) {
in.readerIndex(endIndex);
throw e;
}
}
} else {
if (!holder.getChannelPromise().isSuccess()) {
sendNext(ctx.channel());
// throw REPLAY error
in.indexOf(Integer.MAX_VALUE/2, Integer.MAX_VALUE, (byte) 0);
return;
}
int endIndex = 0;
if (!(data instanceof CommandsData)) {
endIndex = skipCommand(in);

@ -65,7 +65,7 @@ public class CommandPubSubDecoder extends CommandDecoder {
}
@Override
protected QueueCommand getCommand(ChannelHandlerContext ctx) {
protected QueueCommandHolder getCommand(ChannelHandlerContext ctx) {
return ctx.channel().attr(CommandsQueuePubSub.CURRENT_COMMAND).get();
}

@ -36,7 +36,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
*/
public class CommandsQueuePubSub extends ChannelDuplexHandler {
public static final AttributeKey<QueueCommand> CURRENT_COMMAND = AttributeKey.valueOf("promise");
public static final AttributeKey<QueueCommandHolder> CURRENT_COMMAND = AttributeKey.valueOf("promise");
private final Queue<QueueCommandHolder> queue = new ConcurrentLinkedQueue<>();
@ -47,8 +47,8 @@ public class CommandsQueuePubSub extends ChannelDuplexHandler {
};
public void sendNextCommand(Channel channel) {
QueueCommand command = channel.attr(CommandsQueuePubSub.CURRENT_COMMAND).getAndSet(null);
if (command != null) {
QueueCommandHolder holder = channel.attr(CommandsQueuePubSub.CURRENT_COMMAND).getAndSet(null);
if (holder != null) {
queue.poll();
} else {
QueueCommandHolder c = queue.peek();
@ -96,9 +96,9 @@ public class CommandsQueuePubSub extends ChannelDuplexHandler {
}
private void sendData(Channel ch) {
QueueCommandHolder command = queue.peek();
if (command != null && command.trySend()) {
QueueCommand data = command.getCommand();
QueueCommandHolder holder = queue.peek();
if (holder != null && holder.trySend()) {
QueueCommand data = holder.getCommand();
List<CommandData<Object, Object>> pubSubOps = data.getPubSubOperations();
if (!pubSubOps.isEmpty()) {
for (CommandData<Object, Object> cd : pubSubOps) {
@ -107,11 +107,11 @@ public class CommandsQueuePubSub extends ChannelDuplexHandler {
}
}
} else {
ch.attr(CURRENT_COMMAND).set(data);
ch.attr(CURRENT_COMMAND).set(holder);
}
command.getChannelPromise().addListener(listener);
ch.writeAndFlush(data, command.getChannelPromise());
holder.getChannelPromise().addListener(listener);
ch.writeAndFlush(data, holder.getChannelPromise());
}
}

@ -87,6 +87,55 @@ public class RedissonTest extends BaseTest {
inst.shutdown();
}
@Test
public void testResponseHandling2() throws InterruptedException {
Config config = new Config();
config.useSingleServer()
.setTimeout(10)
.setRetryAttempts(0)
.setConnectionPoolSize(1)
.setConnectionMinimumIdleSize(1)
.setPingConnectionInterval(0)
.setAddress(RedisRunner.getDefaultRedisServerBindAddressAndPort());
RedissonClient redisson = Redisson.create(config);
RBucket<String> bucket1 = redisson.getBucket("name1");
RBucket<String> bucket2 = redisson.getBucket("name2");
bucket1.set("val1");
bucket2.set("val2");
ExecutorService executor1 = Executors.newCachedThreadPool();
ExecutorService executor2 = Executors.newCachedThreadPool();
AtomicBoolean hasError = new AtomicBoolean();
for (int i = 0; i < 100000; i++) {
executor1.submit(() -> {
String get = bucket1.get();
if (get.equals("val2")) {
hasError.set(true);
}
});
executor2.submit(() -> {
String get = bucket2.get();
if (get.equals("val1")) {
hasError.set(true);
}
});
}
executor1.shutdown();
assertThat(executor1.awaitTermination(5, TimeUnit.SECONDS)).isTrue();
executor2.shutdown();
assertThat(executor2.awaitTermination(5, TimeUnit.SECONDS)).isTrue();
assertThat(hasError).isFalse();
redisson.shutdown();
}
@Test
public void testResponseHandling() throws InterruptedException {
List<Integer> list = new ArrayList<>();

Loading…
Cancel
Save