Feature - LongAdder and DoubleAddder use sharded topic if possible.

pull/5757/head
Nikita Koksharov 12 months ago
parent 839b54b76d
commit 4e48be8413

@ -49,7 +49,12 @@ public abstract class RedissonBaseAdder<T extends Number> extends RedissonExpira
public RedissonBaseAdder(CommandAsyncExecutor commandExecutor, String name, RedissonClient redisson) {
super(commandExecutor, name);
if (getSubscribeService().isShardingSupported()) {
topic = RedissonShardedTopic.createRaw(StringCodec.INSTANCE, commandExecutor, suffixName(getRawName(), "topic"));
} else {
topic = RedissonTopic.createRaw(StringCodec.INSTANCE, commandExecutor, suffixName(getRawName(), "topic"));
}
this.redisson = redisson;
listenerId = topic.addListener(String.class, (channel, msg) -> {
String[] parts = msg.split(":");
@ -95,9 +100,9 @@ public abstract class RedissonBaseAdder<T extends Number> extends RedissonExpira
public RFuture<T> sumAsync() {
String id = getServiceManager().generateId();
RFuture<Long> future = topic.publishAsync(SUM_MSG + ":" + id);
RSemaphore semaphore = getSemaphore(id);
RFuture<Long> future = topic.publishAsync(SUM_MSG + ":" + id);
CompletionStage<T> f = future.thenCompose(r -> semaphore.acquireAsync(r.intValue()))
.thenCompose(r -> getAndDeleteAsync(id))
.thenCompose(r -> semaphore.deleteAsync().thenApply(res -> r));
@ -114,8 +119,9 @@ public abstract class RedissonBaseAdder<T extends Number> extends RedissonExpira
public RFuture<T> sumAsync(long timeout, TimeUnit timeUnit) {
String id = getServiceManager().generateId();
RFuture<Long> future = topic.publishAsync(SUM_MSG + ":" + id);
RSemaphore semaphore = getSemaphore(id);
RFuture<Long> future = topic.publishAsync(SUM_MSG + ":" + id);
CompletionStage<T> f = future.thenCompose(r -> tryAcquire(semaphore, timeout, timeUnit, r.intValue()))
.thenCompose(r -> getAndDeleteAsync(id))
.thenCompose(r -> semaphore.deleteAsync().thenApply(res -> r));
@ -137,8 +143,9 @@ public abstract class RedissonBaseAdder<T extends Number> extends RedissonExpira
public RFuture<Void> resetAsync() {
String id = getServiceManager().generateId();
RFuture<Long> future = topic.publishAsync(CLEAR_MSG + ":" + id);
RSemaphore semaphore = getSemaphore(id);
RFuture<Long> future = topic.publishAsync(CLEAR_MSG + ":" + id);
CompletionStage<Void> f = future.thenCompose(r -> semaphore.acquireAsync(r.intValue()))
.thenCompose(r -> semaphore.deleteAsync().thenApply(res -> null));
return new CompletableFutureWrapper<>(f);
@ -146,8 +153,9 @@ public abstract class RedissonBaseAdder<T extends Number> extends RedissonExpira
public RFuture<Void> resetAsync(long timeout, TimeUnit timeUnit) {
String id = getServiceManager().generateId();
RFuture<Long> future = topic.publishAsync(CLEAR_MSG + ":" + id);
RSemaphore semaphore = getSemaphore(id);
RFuture<Long> future = topic.publishAsync(CLEAR_MSG + ":" + id);
CompletionStage<Void> f = future.thenCompose(r -> tryAcquire(semaphore, timeout, timeUnit, r.intValue()))
.thenCompose(r -> semaphore.deleteAsync().thenApply(res -> null));
return new CompletableFutureWrapper<>(f);

Loading…
Cancel
Save