From 3de8ddcc56e1169a8419e07433e453729ad2b52c Mon Sep 17 00:00:00 2001 From: mrniko <1104661+mrniko@users.noreply.github.com> Date: Mon, 3 Mar 2025 13:38:28 +0300 Subject: [PATCH] Fixed - error thrown by RLiveObject running with AWS ElastiCache Serverless Valkey. #6466 --- .../redisson/RedissonLiveObjectService.java | 35 ++++++++++++++----- .../client/protocol/RedisCommands.java | 1 + .../connection/ClusterConnectionManager.java | 19 +++------- .../pubsub/PublishSubscribeService.java | 32 ++++++++++++++++- .../RedissonLiveObjectServiceTest.java | 2 +- 5 files changed, 65 insertions(+), 24 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonLiveObjectService.java b/redisson/src/main/java/org/redisson/RedissonLiveObjectService.java index f7367176b..888303498 100644 --- a/redisson/src/main/java/org/redisson/RedissonLiveObjectService.java +++ b/redisson/src/main/java/org/redisson/RedissonLiveObjectService.java @@ -53,6 +53,7 @@ import org.redisson.liveobject.misc.Introspectior; import org.redisson.liveobject.resolver.MapResolver; import org.redisson.liveobject.resolver.NamingScheme; import org.redisson.liveobject.resolver.RIdResolver; +import org.redisson.pubsub.PublishSubscribeService; import java.lang.reflect.Constructor; import java.lang.reflect.Field; @@ -80,7 +81,12 @@ public class RedissonLiveObjectService implements RLiveObjectService { } private void addExpireListener(CommandAsyncExecutor commandExecutor) { - if (commandExecutor.getServiceManager().getLiveObjectLatch().compareAndSet(false, true)) { + if (!commandExecutor.getServiceManager().getLiveObjectLatch().compareAndSet(false, true)) { + return; + } + + PublishSubscribeService ss = commandExecutor.getConnectionManager().getSubscribeService(); + if (ss.isPatternSupported()) { String pp = "__keyspace@" + commandExecutor.getServiceManager().getConfig().getDatabase() + "__:redisson_live_object:*"; String prefix = pp.replace(":redisson_live_object:*", ""); RPatternTopic topic = new RedissonPatternTopic(StringCodec.INSTANCE, commandExecutor, pp); @@ -88,20 +94,33 @@ public class RedissonLiveObjectService implements RLiveObjectService { if (!msg.equals("expired")) { return; } - - String name = channel.toString().replace(prefix, ""); - Class entity = resolveEntity(name); - if (entity == null) { + onExpired(commandExecutor, channel, prefix); + }); + } else { + String pp = "__keyevent@" + commandExecutor.getServiceManager().getConfig().getDatabase() + "__:expired"; + RedissonTopic topic = RedissonTopic.createRaw(StringCodec.INSTANCE, commandExecutor, pp); + topic.addListenerAsync(String.class, (channel, msg) -> { + if (!msg.startsWith("redisson_live_object:")) { return; } - NamingScheme scheme = commandExecutor.getObjectBuilder().getNamingScheme(entity); - Object id = scheme.resolveId(name); - deleteExpired(id, entity); + onExpired(commandExecutor, msg, ""); }); } } + private void onExpired(CommandAsyncExecutor commandExecutor, CharSequence channel, String prefix) { + String name = channel.toString().replace(prefix, ""); + Class entity = resolveEntity(name); + if (entity == null) { + return; + } + + NamingScheme scheme = commandExecutor.getObjectBuilder().getNamingScheme(entity); + Object id = scheme.resolveId(name); + deleteExpired(id, entity); + } + private Class resolveEntity(String name) { String className = name.substring(name.lastIndexOf(":")+1); try { diff --git a/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java b/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java index bf3620ba4..87a7976a3 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java +++ b/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java @@ -756,6 +756,7 @@ public interface RedisCommands { RedisStrictCommand SPUBLISH = new RedisStrictCommand("SPUBLISH"); RedisCommand PUBSUB_NUMSUB = new RedisCommand<>("PUBSUB", "NUMSUB", new ListObjectDecoder<>(1)); + RedisCommand PUBSUB_NUMPAT = new RedisCommand<>("PUBSUB", "NUMPAT", new ListObjectDecoder<>(1)); RedisCommand> PUBSUB_CHANNELS = new RedisStrictCommand<>("PUBSUB", "CHANNELS", new StringListReplayDecoder()); RedisCommand> PUBSUB_SHARDCHANNELS = new RedisStrictCommand<>("PUBSUB", "SHARDCHANNELS", new StringListReplayDecoder()); RedisCommand PUBSUB_SHARDNUMSUB = new RedisCommand<>("PUBSUB", "SHARDNUMSUB", new ListObjectDecoder<>(1)); diff --git a/redisson/src/main/java/org/redisson/connection/ClusterConnectionManager.java b/redisson/src/main/java/org/redisson/connection/ClusterConnectionManager.java index 8ec102088..4bc8c9074 100644 --- a/redisson/src/main/java/org/redisson/connection/ClusterConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/ClusterConnectionManager.java @@ -21,7 +21,6 @@ import org.redisson.api.NodeType; import org.redisson.api.RFuture; import org.redisson.client.*; import org.redisson.client.codec.StringCodec; -import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisStrictCommand; import org.redisson.client.protocol.decoder.ClusterNodesDecoder; import org.redisson.client.protocol.decoder.ObjectDecoder; @@ -92,7 +91,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { Throwable lastException = null; List failedMasters = new ArrayList<>(); - boolean skipShardingDetection = false; + boolean skipCommandsDetection = false; for (String address : cfg.getNodeAddresses()) { RedisURI addr = new RedisURI(address); CompletionStage connectionFuture = connectToNode(cfg, addr, addr.getHost()); @@ -107,18 +106,10 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { clusterNodesCommand = new RedisStrictCommand>("CLUSTER", "NODES", new ObjectDecoder(new ClusterNodesDecoder(addr.getScheme()))); - if (!skipShardingDetection) { - if (cfg.getShardedSubscriptionMode() == ShardedSubscriptionMode.AUTO) { - try { - connection.sync(RedisCommands.PUBSUB_SHARDNUMSUB); - subscribeService.setShardingSupported(true); - } catch (Exception e) { - // skip - } - } else if (cfg.getShardedSubscriptionMode() == ShardedSubscriptionMode.ON) { - subscribeService.setShardingSupported(true); - } - skipShardingDetection = true; + if (!skipCommandsDetection) { + subscribeService.checkShardingSupport(cfg.getShardedSubscriptionMode(), connection); + subscribeService.checkPatternSupport(connection); + skipCommandsDetection = true; } List nodes = connection.sync(clusterNodesCommand); diff --git a/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java b/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java index c4f40b623..78ca200a1 100644 --- a/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java +++ b/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java @@ -29,6 +29,7 @@ import org.redisson.client.protocol.pubsub.PubSubType; import org.redisson.command.CommandAsyncExecutor; import org.redisson.config.MasterSlaveServersConfig; import org.redisson.config.ReadMode; +import org.redisson.config.ShardedSubscriptionMode; import org.redisson.connection.ClientConnectionsEntry; import org.redisson.connection.ConnectionManager; import org.redisson.connection.MasterSlaveEntry; @@ -123,6 +124,7 @@ public class PublishSubscribeService { private final Set trackedEntries = Collections.newSetFromMap(new ConcurrentHashMap<>()); private boolean shardingSupported = false; + private boolean patternSupported = true; public PublishSubscribeService(ConnectionManager connectionManager) { super(); @@ -1039,13 +1041,41 @@ public class PublishSubscribeService { return f; } + public void checkPatternSupport(RedisConnection connection) { + try { + connection.sync(RedisCommands.PUBSUB_NUMPAT); + } catch (Exception e) { + setPatternSupported(false); + } + } + + public void checkShardingSupport(ShardedSubscriptionMode mode, RedisConnection connection) { + if (mode == ShardedSubscriptionMode.AUTO) { + try { + connection.sync(RedisCommands.PUBSUB_SHARDNUMSUB); + setShardingSupported(true); + } catch (Exception e) { + // skip + } + } else if (mode == ShardedSubscriptionMode.ON) { + setShardingSupported(true); + } + } + + public boolean isPatternSupported() { + return patternSupported; + } + public void setPatternSupported(boolean patternSupported) { + this.patternSupported = patternSupported; + } + public void setShardingSupported(boolean shardingSupported) { this.shardingSupported = shardingSupported; } - public boolean isShardingSupported() { return shardingSupported; } + public String getPublishCommand() { if (shardingSupported) { return RedisCommands.SPUBLISH.getName(); diff --git a/redisson/src/test/java/org/redisson/RedissonLiveObjectServiceTest.java b/redisson/src/test/java/org/redisson/RedissonLiveObjectServiceTest.java index 092ee5ba0..625b696c7 100644 --- a/redisson/src/test/java/org/redisson/RedissonLiveObjectServiceTest.java +++ b/redisson/src/test/java/org/redisson/RedissonLiveObjectServiceTest.java @@ -1795,7 +1795,7 @@ public class RedissonLiveObjectServiceTest extends RedisDockerTest { throw new RuntimeException(e); } assertThat(redisson.getKeys().count()).isZero(); - }, NOTIFY_KEYSPACE_EVENTS, "Kx"); + }, NOTIFY_KEYSPACE_EVENTS, "KEA"); } @Test