From 5a60e7e0cb99e6cfadb370dac3f4d81621716d33 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Mon, 28 Nov 2022 11:59:17 +0300 Subject: [PATCH] Fixed - command replies don't match if connection pool size < 10 and at least one command failed. #4693 --- .../org/redisson/client/RedisConnection.java | 6 +-- .../client/handler/CommandDecoder.java | 23 ++++++--- .../client/handler/CommandPubSubDecoder.java | 2 +- .../client/handler/CommandsQueuePubSub.java | 18 +++---- .../test/java/org/redisson/RedissonTest.java | 49 +++++++++++++++++++ 5 files changed, 77 insertions(+), 21 deletions(-) diff --git a/redisson/src/main/java/org/redisson/client/RedisConnection.java b/redisson/src/main/java/org/redisson/client/RedisConnection.java index 738ac6fa3..c8d7623ce 100644 --- a/redisson/src/main/java/org/redisson/client/RedisConnection.java +++ b/redisson/src/main/java/org/redisson/client/RedisConnection.java @@ -140,9 +140,9 @@ public class RedisConnection implements RedisCommands { } } - QueueCommand command = channel.attr(CommandsQueuePubSub.CURRENT_COMMAND).get(); - if (command instanceof CommandData) { - return (CommandData) command; + QueueCommandHolder holder = channel.attr(CommandsQueuePubSub.CURRENT_COMMAND).get(); + if (holder != null && holder.getCommand() instanceof CommandData) { + return (CommandData) holder.getCommand(); } return null; } diff --git a/redisson/src/main/java/org/redisson/client/handler/CommandDecoder.java b/redisson/src/main/java/org/redisson/client/handler/CommandDecoder.java index 06c9eadf8..e5f5e5574 100644 --- a/redisson/src/main/java/org/redisson/client/handler/CommandDecoder.java +++ b/redisson/src/main/java/org/redisson/client/handler/CommandDecoder.java @@ -69,18 +69,18 @@ public class CommandDecoder extends ReplayingDecoder { this.scheme = scheme; } - protected QueueCommand getCommand(ChannelHandlerContext ctx) { + protected QueueCommandHolder getCommand(ChannelHandlerContext ctx) { Queue queue = ctx.channel().attr(CommandsQueue.COMMANDS_QUEUE).get(); - QueueCommandHolder holder = queue.peek(); - if (holder != null) { - return holder.getCommand(); - } - return null; + return queue.peek(); } @Override protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { - QueueCommand data = getCommand(ctx); + QueueCommandHolder holder = getCommand(ctx); + QueueCommand data = null; + if (holder != null) { + data = holder.getCommand(); + } if (state() == null) { state(new State()); @@ -91,13 +91,20 @@ public class CommandDecoder extends ReplayingDecoder { int endIndex = skipCommand(in); try { - decode(ctx, in, data, 0); + decode(ctx, in, null, 0); } catch (Exception e) { in.readerIndex(endIndex); throw e; } } } else { + if (!holder.getChannelPromise().isSuccess()) { + sendNext(ctx.channel()); + // throw REPLAY error + in.indexOf(Integer.MAX_VALUE/2, Integer.MAX_VALUE, (byte) 0); + return; + } + int endIndex = 0; if (!(data instanceof CommandsData)) { endIndex = skipCommand(in); diff --git a/redisson/src/main/java/org/redisson/client/handler/CommandPubSubDecoder.java b/redisson/src/main/java/org/redisson/client/handler/CommandPubSubDecoder.java index e9e71339a..79b0cb94c 100644 --- a/redisson/src/main/java/org/redisson/client/handler/CommandPubSubDecoder.java +++ b/redisson/src/main/java/org/redisson/client/handler/CommandPubSubDecoder.java @@ -65,7 +65,7 @@ public class CommandPubSubDecoder extends CommandDecoder { } @Override - protected QueueCommand getCommand(ChannelHandlerContext ctx) { + protected QueueCommandHolder getCommand(ChannelHandlerContext ctx) { return ctx.channel().attr(CommandsQueuePubSub.CURRENT_COMMAND).get(); } diff --git a/redisson/src/main/java/org/redisson/client/handler/CommandsQueuePubSub.java b/redisson/src/main/java/org/redisson/client/handler/CommandsQueuePubSub.java index c53f1996a..13ecd0f1d 100644 --- a/redisson/src/main/java/org/redisson/client/handler/CommandsQueuePubSub.java +++ b/redisson/src/main/java/org/redisson/client/handler/CommandsQueuePubSub.java @@ -36,7 +36,7 @@ import java.util.concurrent.ConcurrentLinkedQueue; */ public class CommandsQueuePubSub extends ChannelDuplexHandler { - public static final AttributeKey CURRENT_COMMAND = AttributeKey.valueOf("promise"); + public static final AttributeKey CURRENT_COMMAND = AttributeKey.valueOf("promise"); private final Queue queue = new ConcurrentLinkedQueue<>(); @@ -47,8 +47,8 @@ public class CommandsQueuePubSub extends ChannelDuplexHandler { }; public void sendNextCommand(Channel channel) { - QueueCommand command = channel.attr(CommandsQueuePubSub.CURRENT_COMMAND).getAndSet(null); - if (command != null) { + QueueCommandHolder holder = channel.attr(CommandsQueuePubSub.CURRENT_COMMAND).getAndSet(null); + if (holder != null) { queue.poll(); } else { QueueCommandHolder c = queue.peek(); @@ -96,9 +96,9 @@ public class CommandsQueuePubSub extends ChannelDuplexHandler { } private void sendData(Channel ch) { - QueueCommandHolder command = queue.peek(); - if (command != null && command.trySend()) { - QueueCommand data = command.getCommand(); + QueueCommandHolder holder = queue.peek(); + if (holder != null && holder.trySend()) { + QueueCommand data = holder.getCommand(); List> pubSubOps = data.getPubSubOperations(); if (!pubSubOps.isEmpty()) { for (CommandData cd : pubSubOps) { @@ -107,11 +107,11 @@ public class CommandsQueuePubSub extends ChannelDuplexHandler { } } } else { - ch.attr(CURRENT_COMMAND).set(data); + ch.attr(CURRENT_COMMAND).set(holder); } - command.getChannelPromise().addListener(listener); - ch.writeAndFlush(data, command.getChannelPromise()); + holder.getChannelPromise().addListener(listener); + ch.writeAndFlush(data, holder.getChannelPromise()); } } diff --git a/redisson/src/test/java/org/redisson/RedissonTest.java b/redisson/src/test/java/org/redisson/RedissonTest.java index 6ba8386cd..2d5dafefe 100644 --- a/redisson/src/test/java/org/redisson/RedissonTest.java +++ b/redisson/src/test/java/org/redisson/RedissonTest.java @@ -87,6 +87,55 @@ public class RedissonTest extends BaseTest { inst.shutdown(); } + @Test + public void testResponseHandling2() throws InterruptedException { + Config config = new Config(); + config.useSingleServer() + .setTimeout(10) + .setRetryAttempts(0) + .setConnectionPoolSize(1) + .setConnectionMinimumIdleSize(1) + .setPingConnectionInterval(0) + .setAddress(RedisRunner.getDefaultRedisServerBindAddressAndPort()); + + RedissonClient redisson = Redisson.create(config); + + RBucket bucket1 = redisson.getBucket("name1"); + RBucket bucket2 = redisson.getBucket("name2"); + + bucket1.set("val1"); + bucket2.set("val2"); + + ExecutorService executor1 = Executors.newCachedThreadPool(); + ExecutorService executor2 = Executors.newCachedThreadPool(); + + AtomicBoolean hasError = new AtomicBoolean(); + for (int i = 0; i < 100000; i++) { + executor1.submit(() -> { + String get = bucket1.get(); + if (get.equals("val2")) { + hasError.set(true); + } + }); + + executor2.submit(() -> { + String get = bucket2.get(); + if (get.equals("val1")) { + hasError.set(true); + } + }); + } + + executor1.shutdown(); + assertThat(executor1.awaitTermination(5, TimeUnit.SECONDS)).isTrue(); + executor2.shutdown(); + assertThat(executor2.awaitTermination(5, TimeUnit.SECONDS)).isTrue(); + assertThat(hasError).isFalse(); + + redisson.shutdown(); + } + + @Test public void testResponseHandling() throws InterruptedException { List list = new ArrayList<>();