Fixed - Incorrect handling "unknown command" response for RTopic operations. #5128

pull/5139/head
Nikita Koksharov 2 years ago
parent 9d1c35d343
commit 86fdde634f

@ -111,16 +111,38 @@ public class CommandPubSubDecoder extends CommandDecoder {
@Override
protected void onError(Channel channel, String error) {
if (error.contains("unknown command") && error.contains("SSUBSCRIBE")) {
commands.keySet().stream()
.filter(v -> v.getOperation().equalsIgnoreCase(RedisCommands.SSUBSCRIBE.getName()))
.forEach(v -> {
CommandData<Object, Object> dd = commands.get(v);
dd.getPromise().completeExceptionally(new RedisException(error));
});
} else {
super.onError(channel, error);
if (error.contains("unknown command")) {
Set<String> cmds = new HashSet<>(RedisCommands.PUBSUB_COMMANDS);
cmds.remove(RedisCommands.SUBSCRIBE.getName());
cmds.remove(RedisCommands.UNSUBSCRIBE.getName());
String cmd = null;
for (String value : cmds) {
if (error.contains(value)) {
cmd = value;
break;
}
}
if (cmd == null) {
if (error.contains(RedisCommands.UNSUBSCRIBE.getName())) {
cmd = RedisCommands.UNSUBSCRIBE.getName();
} else if (error.contains(RedisCommands.SUBSCRIBE.getName())) {
cmd = RedisCommands.SUBSCRIBE.getName();
}
}
if (cmd != null) {
String c = cmd;
commands.keySet().stream()
.filter(v -> v.getOperation().equalsIgnoreCase(c))
.forEach(v -> {
CommandData<Object, Object> dd = commands.get(v);
dd.getPromise().completeExceptionally(new RedisException(error));
});
}
return;
}
super.onError(channel, error);
}
@Override

@ -114,6 +114,8 @@ public class PublishSubscribeService {
private boolean shardingSupported = false;
private final Map<PubSubType, PubSubType> subscribe2unsubscribe = new HashMap<>();
public PublishSubscribeService(ConnectionManager connectionManager) {
super();
this.connectionManager = connectionManager;
@ -121,6 +123,10 @@ public class PublishSubscribeService {
for (int i = 0; i < locks.length; i++) {
locks[i] = new AsyncSemaphore(1);
}
subscribe2unsubscribe.put(PubSubType.SUBSCRIBE, PubSubType.UNSUBSCRIBE);
subscribe2unsubscribe.put(PubSubType.SSUBSCRIBE, PubSubType.SUNSUBSCRIBE);
subscribe2unsubscribe.put(PubSubType.PSUBSCRIBE, PubSubType.PUNSUBSCRIBE);
}
public LockPubSub getLockPubSub() {
@ -379,7 +385,8 @@ public class PublishSubscribeService {
CompletableFuture<PubSubConnectionEntry> pp = new CompletableFuture<>();
pp.whenComplete((r, e) -> {
if (e != null) {
CompletableFuture<Codec> f = unsubscribe(channelName, type);
PubSubType unsubscribeType = subscribe2unsubscribe.get(type);
CompletableFuture<Codec> f = unsubscribe(channelName, unsubscribeType);
f.whenComplete((rr, ee) -> {
promise.completeExceptionally(e);
});
@ -483,7 +490,8 @@ public class PublishSubscribeService {
CompletableFuture<PubSubConnectionEntry> pp = new CompletableFuture<>();
pp.whenComplete((r, e) -> {
if (e != null) {
CompletableFuture<Codec> f = unsubscribe(channelName, type);
PubSubType unsubscribeType = subscribe2unsubscribe.get(type);
CompletableFuture<Codec> f = unsubscribe(channelName, unsubscribeType);
f.whenComplete((rr, ee) -> {
promise.completeExceptionally(e);
});

@ -1,10 +1,13 @@
package org.redisson;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.redisson.api.NatMapper;
import org.redisson.api.RShardedTopic;
import org.redisson.api.RTopic;
import org.redisson.api.RedissonClient;
import org.redisson.client.RedisException;
import org.redisson.config.Config;
import org.redisson.connection.balancer.RandomLoadBalancer;
import org.redisson.misc.RedisURI;
@ -13,6 +16,7 @@ import org.testcontainers.containers.startupcheck.MinimumDurationRunningStartupC
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
@ -28,6 +32,27 @@ public class RedissonShardedTopicTest {
.withStartupCheckStrategy(
new MinimumDurationRunningStartupCheckStrategy(Duration.ofSeconds(6))
);
@Test
public void testInvalidCommand() throws IOException, InterruptedException {
RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner.RedisProcess p = master1.run();
Config config = new Config();
config.useSingleServer().setAddress(p.getRedisServerAddressAndPort());
RedissonClient redisson = Redisson.create(config);
RShardedTopic t = redisson.getShardedTopic("ttt");
RedisException e = Assertions.assertThrows(RedisException.class, () -> {
t.addListener(String.class, (channel, msg) -> {
});
});
assertThat(e.getMessage()).contains("ERR unknown command `SSUBSCRIBE`");
redisson.shutdown();
p.stop();
}
@Test
public void testClusterSharding() {
Config config = new Config();

Loading…
Cancel
Save