diff --git a/redisson/src/main/java/org/redisson/RedissonBaseAdder.java b/redisson/src/main/java/org/redisson/RedissonBaseAdder.java index 6732186da..d10b80659 100644 --- a/redisson/src/main/java/org/redisson/RedissonBaseAdder.java +++ b/redisson/src/main/java/org/redisson/RedissonBaseAdder.java @@ -48,8 +48,13 @@ public abstract class RedissonBaseAdder extends RedissonExpira public RedissonBaseAdder(CommandAsyncExecutor commandExecutor, String name, RedissonClient redisson) { super(commandExecutor, name); - - topic = RedissonTopic.createRaw(StringCodec.INSTANCE, commandExecutor, suffixName(getRawName(), "topic")); + + if (getSubscribeService().isShardingSupported()) { + topic = RedissonShardedTopic.createRaw(StringCodec.INSTANCE, commandExecutor, suffixName(getRawName(), "topic")); + } else { + topic = RedissonTopic.createRaw(StringCodec.INSTANCE, commandExecutor, suffixName(getRawName(), "topic")); + } + this.redisson = redisson; listenerId = topic.addListener(String.class, (channel, msg) -> { String[] parts = msg.split(":"); @@ -95,9 +100,9 @@ public abstract class RedissonBaseAdder extends RedissonExpira public RFuture sumAsync() { String id = getServiceManager().generateId(); - RFuture future = topic.publishAsync(SUM_MSG + ":" + id); RSemaphore semaphore = getSemaphore(id); + RFuture future = topic.publishAsync(SUM_MSG + ":" + id); CompletionStage f = future.thenCompose(r -> semaphore.acquireAsync(r.intValue())) .thenCompose(r -> getAndDeleteAsync(id)) .thenCompose(r -> semaphore.deleteAsync().thenApply(res -> r)); @@ -114,8 +119,9 @@ public abstract class RedissonBaseAdder extends RedissonExpira public RFuture sumAsync(long timeout, TimeUnit timeUnit) { String id = getServiceManager().generateId(); - RFuture future = topic.publishAsync(SUM_MSG + ":" + id); RSemaphore semaphore = getSemaphore(id); + + RFuture future = topic.publishAsync(SUM_MSG + ":" + id); CompletionStage f = future.thenCompose(r -> tryAcquire(semaphore, timeout, timeUnit, r.intValue())) .thenCompose(r -> getAndDeleteAsync(id)) .thenCompose(r -> semaphore.deleteAsync().thenApply(res -> r)); @@ -137,8 +143,9 @@ public abstract class RedissonBaseAdder extends RedissonExpira public RFuture resetAsync() { String id = getServiceManager().generateId(); - RFuture future = topic.publishAsync(CLEAR_MSG + ":" + id); RSemaphore semaphore = getSemaphore(id); + + RFuture future = topic.publishAsync(CLEAR_MSG + ":" + id); CompletionStage f = future.thenCompose(r -> semaphore.acquireAsync(r.intValue())) .thenCompose(r -> semaphore.deleteAsync().thenApply(res -> null)); return new CompletableFutureWrapper<>(f); @@ -146,8 +153,9 @@ public abstract class RedissonBaseAdder extends RedissonExpira public RFuture resetAsync(long timeout, TimeUnit timeUnit) { String id = getServiceManager().generateId(); - RFuture future = topic.publishAsync(CLEAR_MSG + ":" + id); RSemaphore semaphore = getSemaphore(id); + + RFuture future = topic.publishAsync(CLEAR_MSG + ":" + id); CompletionStage f = future.thenCompose(r -> tryAcquire(semaphore, timeout, timeUnit, r.intValue())) .thenCompose(r -> semaphore.deleteAsync().thenApply(res -> null)); return new CompletableFutureWrapper<>(f);