refactoring

pull/5937/head
Nikita Koksharov 8 months ago
parent f08dfd894d
commit e91cff56f0

@ -150,14 +150,15 @@ public class RedissonReliableTopic extends RedissonExpirable implements RReliabl
return new CompletableFutureWrapper<>(id);
}
renewExpiration();
RFuture<Void> addFuture = commandExecutor.evalWriteNoRetryAsync(getRawName(), StringCodec.INSTANCE, RedisCommands.EVAL_VOID,
"redis.call('zadd', KEYS[2], ARGV[3], ARGV[2]);" +
"redis.call('xgroup', 'create', KEYS[1], ARGV[2], ARGV[1], 'MKSTREAM'); ",
Arrays.asList(getRawName(), getTimeout()),
StreamMessageId.ALL, subscriberId, System.currentTimeMillis() + getServiceManager().getCfg().getReliableTopicWatchdogTimeout());
CompletionStage<String> f = addFuture.thenApply(r -> {
renewExpiration();
poll(subscriberId);
return id;
});

Loading…
Cancel
Save