From e91cff56f0741970b06c935ccffca12b3886a555 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov <nkoksharov@redisson.pro> Date: Fri, 7 Jun 2024 09:16:59 +0300 Subject: [PATCH] refactoring --- .../src/main/java/org/redisson/RedissonReliableTopic.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonReliableTopic.java b/redisson/src/main/java/org/redisson/RedissonReliableTopic.java index 213f66f5d..c1eb85ae8 100644 --- a/redisson/src/main/java/org/redisson/RedissonReliableTopic.java +++ b/redisson/src/main/java/org/redisson/RedissonReliableTopic.java @@ -150,14 +150,15 @@ public class RedissonReliableTopic extends RedissonExpirable implements RReliabl return new CompletableFutureWrapper<>(id); } - renewExpiration(); - RFuture<Void> addFuture = commandExecutor.evalWriteNoRetryAsync(getRawName(), StringCodec.INSTANCE, RedisCommands.EVAL_VOID, "redis.call('zadd', KEYS[2], ARGV[3], ARGV[2]);" + "redis.call('xgroup', 'create', KEYS[1], ARGV[2], ARGV[1], 'MKSTREAM'); ", Arrays.asList(getRawName(), getTimeout()), StreamMessageId.ALL, subscriberId, System.currentTimeMillis() + getServiceManager().getCfg().getReliableTopicWatchdogTimeout()); + CompletionStage<String> f = addFuture.thenApply(r -> { + renewExpiration(); + poll(subscriberId); return id; });