Fixed - error thrown by RLiveObject running with AWS ElastiCache Serverless Valkey. #6466

pull/6471/head
mrniko 3 weeks ago
parent 07b8aaa6b9
commit 3de8ddcc56

@ -53,6 +53,7 @@ import org.redisson.liveobject.misc.Introspectior;
import org.redisson.liveobject.resolver.MapResolver; import org.redisson.liveobject.resolver.MapResolver;
import org.redisson.liveobject.resolver.NamingScheme; import org.redisson.liveobject.resolver.NamingScheme;
import org.redisson.liveobject.resolver.RIdResolver; import org.redisson.liveobject.resolver.RIdResolver;
import org.redisson.pubsub.PublishSubscribeService;
import java.lang.reflect.Constructor; import java.lang.reflect.Constructor;
import java.lang.reflect.Field; import java.lang.reflect.Field;
@ -80,7 +81,12 @@ public class RedissonLiveObjectService implements RLiveObjectService {
} }
private void addExpireListener(CommandAsyncExecutor commandExecutor) { private void addExpireListener(CommandAsyncExecutor commandExecutor) {
if (commandExecutor.getServiceManager().getLiveObjectLatch().compareAndSet(false, true)) { if (!commandExecutor.getServiceManager().getLiveObjectLatch().compareAndSet(false, true)) {
return;
}
PublishSubscribeService ss = commandExecutor.getConnectionManager().getSubscribeService();
if (ss.isPatternSupported()) {
String pp = "__keyspace@" + commandExecutor.getServiceManager().getConfig().getDatabase() + "__:redisson_live_object:*"; String pp = "__keyspace@" + commandExecutor.getServiceManager().getConfig().getDatabase() + "__:redisson_live_object:*";
String prefix = pp.replace(":redisson_live_object:*", ""); String prefix = pp.replace(":redisson_live_object:*", "");
RPatternTopic topic = new RedissonPatternTopic(StringCodec.INSTANCE, commandExecutor, pp); RPatternTopic topic = new RedissonPatternTopic(StringCodec.INSTANCE, commandExecutor, pp);
@ -88,20 +94,33 @@ public class RedissonLiveObjectService implements RLiveObjectService {
if (!msg.equals("expired")) { if (!msg.equals("expired")) {
return; return;
} }
onExpired(commandExecutor, channel, prefix);
String name = channel.toString().replace(prefix, ""); });
Class<?> entity = resolveEntity(name); } else {
if (entity == null) { String pp = "__keyevent@" + commandExecutor.getServiceManager().getConfig().getDatabase() + "__:expired";
RedissonTopic topic = RedissonTopic.createRaw(StringCodec.INSTANCE, commandExecutor, pp);
topic.addListenerAsync(String.class, (channel, msg) -> {
if (!msg.startsWith("redisson_live_object:")) {
return; return;
} }
NamingScheme scheme = commandExecutor.getObjectBuilder().getNamingScheme(entity); onExpired(commandExecutor, msg, "");
Object id = scheme.resolveId(name);
deleteExpired(id, entity);
}); });
} }
} }
private void onExpired(CommandAsyncExecutor commandExecutor, CharSequence channel, String prefix) {
String name = channel.toString().replace(prefix, "");
Class<?> entity = resolveEntity(name);
if (entity == null) {
return;
}
NamingScheme scheme = commandExecutor.getObjectBuilder().getNamingScheme(entity);
Object id = scheme.resolveId(name);
deleteExpired(id, entity);
}
private Class<?> resolveEntity(String name) { private Class<?> resolveEntity(String name) {
String className = name.substring(name.lastIndexOf(":")+1); String className = name.substring(name.lastIndexOf(":")+1);
try { try {

@ -756,6 +756,7 @@ public interface RedisCommands {
RedisStrictCommand<Long> SPUBLISH = new RedisStrictCommand<Long>("SPUBLISH"); RedisStrictCommand<Long> SPUBLISH = new RedisStrictCommand<Long>("SPUBLISH");
RedisCommand<Long> PUBSUB_NUMSUB = new RedisCommand<>("PUBSUB", "NUMSUB", new ListObjectDecoder<>(1)); RedisCommand<Long> PUBSUB_NUMSUB = new RedisCommand<>("PUBSUB", "NUMSUB", new ListObjectDecoder<>(1));
RedisCommand<Long> PUBSUB_NUMPAT = new RedisCommand<>("PUBSUB", "NUMPAT", new ListObjectDecoder<>(1));
RedisCommand<List<String>> PUBSUB_CHANNELS = new RedisStrictCommand<>("PUBSUB", "CHANNELS", new StringListReplayDecoder()); RedisCommand<List<String>> PUBSUB_CHANNELS = new RedisStrictCommand<>("PUBSUB", "CHANNELS", new StringListReplayDecoder());
RedisCommand<List<String>> PUBSUB_SHARDCHANNELS = new RedisStrictCommand<>("PUBSUB", "SHARDCHANNELS", new StringListReplayDecoder()); RedisCommand<List<String>> PUBSUB_SHARDCHANNELS = new RedisStrictCommand<>("PUBSUB", "SHARDCHANNELS", new StringListReplayDecoder());
RedisCommand<Long> PUBSUB_SHARDNUMSUB = new RedisCommand<>("PUBSUB", "SHARDNUMSUB", new ListObjectDecoder<>(1)); RedisCommand<Long> PUBSUB_SHARDNUMSUB = new RedisCommand<>("PUBSUB", "SHARDNUMSUB", new ListObjectDecoder<>(1));

@ -21,7 +21,6 @@ import org.redisson.api.NodeType;
import org.redisson.api.RFuture; import org.redisson.api.RFuture;
import org.redisson.client.*; import org.redisson.client.*;
import org.redisson.client.codec.StringCodec; import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.RedisStrictCommand; import org.redisson.client.protocol.RedisStrictCommand;
import org.redisson.client.protocol.decoder.ClusterNodesDecoder; import org.redisson.client.protocol.decoder.ClusterNodesDecoder;
import org.redisson.client.protocol.decoder.ObjectDecoder; import org.redisson.client.protocol.decoder.ObjectDecoder;
@ -92,7 +91,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
Throwable lastException = null; Throwable lastException = null;
List<String> failedMasters = new ArrayList<>(); List<String> failedMasters = new ArrayList<>();
boolean skipShardingDetection = false; boolean skipCommandsDetection = false;
for (String address : cfg.getNodeAddresses()) { for (String address : cfg.getNodeAddresses()) {
RedisURI addr = new RedisURI(address); RedisURI addr = new RedisURI(address);
CompletionStage<RedisConnection> connectionFuture = connectToNode(cfg, addr, addr.getHost()); CompletionStage<RedisConnection> connectionFuture = connectToNode(cfg, addr, addr.getHost());
@ -107,18 +106,10 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
clusterNodesCommand = new RedisStrictCommand<List<ClusterNodeInfo>>("CLUSTER", "NODES", clusterNodesCommand = new RedisStrictCommand<List<ClusterNodeInfo>>("CLUSTER", "NODES",
new ObjectDecoder(new ClusterNodesDecoder(addr.getScheme()))); new ObjectDecoder(new ClusterNodesDecoder(addr.getScheme())));
if (!skipShardingDetection) { if (!skipCommandsDetection) {
if (cfg.getShardedSubscriptionMode() == ShardedSubscriptionMode.AUTO) { subscribeService.checkShardingSupport(cfg.getShardedSubscriptionMode(), connection);
try { subscribeService.checkPatternSupport(connection);
connection.sync(RedisCommands.PUBSUB_SHARDNUMSUB); skipCommandsDetection = true;
subscribeService.setShardingSupported(true);
} catch (Exception e) {
// skip
}
} else if (cfg.getShardedSubscriptionMode() == ShardedSubscriptionMode.ON) {
subscribeService.setShardingSupported(true);
}
skipShardingDetection = true;
} }
List<ClusterNodeInfo> nodes = connection.sync(clusterNodesCommand); List<ClusterNodeInfo> nodes = connection.sync(clusterNodesCommand);

@ -29,6 +29,7 @@ import org.redisson.client.protocol.pubsub.PubSubType;
import org.redisson.command.CommandAsyncExecutor; import org.redisson.command.CommandAsyncExecutor;
import org.redisson.config.MasterSlaveServersConfig; import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.config.ReadMode; import org.redisson.config.ReadMode;
import org.redisson.config.ShardedSubscriptionMode;
import org.redisson.connection.ClientConnectionsEntry; import org.redisson.connection.ClientConnectionsEntry;
import org.redisson.connection.ConnectionManager; import org.redisson.connection.ConnectionManager;
import org.redisson.connection.MasterSlaveEntry; import org.redisson.connection.MasterSlaveEntry;
@ -123,6 +124,7 @@ public class PublishSubscribeService {
private final Set<PubSubConnectionEntry> trackedEntries = Collections.newSetFromMap(new ConcurrentHashMap<>()); private final Set<PubSubConnectionEntry> trackedEntries = Collections.newSetFromMap(new ConcurrentHashMap<>());
private boolean shardingSupported = false; private boolean shardingSupported = false;
private boolean patternSupported = true;
public PublishSubscribeService(ConnectionManager connectionManager) { public PublishSubscribeService(ConnectionManager connectionManager) {
super(); super();
@ -1039,13 +1041,41 @@ public class PublishSubscribeService {
return f; return f;
} }
public void checkPatternSupport(RedisConnection connection) {
try {
connection.sync(RedisCommands.PUBSUB_NUMPAT);
} catch (Exception e) {
setPatternSupported(false);
}
}
public void checkShardingSupport(ShardedSubscriptionMode mode, RedisConnection connection) {
if (mode == ShardedSubscriptionMode.AUTO) {
try {
connection.sync(RedisCommands.PUBSUB_SHARDNUMSUB);
setShardingSupported(true);
} catch (Exception e) {
// skip
}
} else if (mode == ShardedSubscriptionMode.ON) {
setShardingSupported(true);
}
}
public boolean isPatternSupported() {
return patternSupported;
}
public void setPatternSupported(boolean patternSupported) {
this.patternSupported = patternSupported;
}
public void setShardingSupported(boolean shardingSupported) { public void setShardingSupported(boolean shardingSupported) {
this.shardingSupported = shardingSupported; this.shardingSupported = shardingSupported;
} }
public boolean isShardingSupported() { public boolean isShardingSupported() {
return shardingSupported; return shardingSupported;
} }
public String getPublishCommand() { public String getPublishCommand() {
if (shardingSupported) { if (shardingSupported) {
return RedisCommands.SPUBLISH.getName(); return RedisCommands.SPUBLISH.getName();

@ -1795,7 +1795,7 @@ public class RedissonLiveObjectServiceTest extends RedisDockerTest {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
assertThat(redisson.getKeys().count()).isZero(); assertThat(redisson.getKeys().count()).isZero();
}, NOTIFY_KEYSPACE_EVENTS, "Kx"); }, NOTIFY_KEYSPACE_EVENTS, "KEA");
} }
@Test @Test

Loading…
Cancel
Save