From 0284a5e2bb974be2a672cdde47e667b8e6d463b0 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Mon, 1 Apr 2024 08:34:30 +0300 Subject: [PATCH] Fixed - Incorrect value of RLongAdder.sum() and DoubleAddder.sum() methods in case of multiple Adder instances for the same Redisson object #5742 --- .../java/org/redisson/RedissonBaseAdder.java | 44 ++++++++++++------- .../redisson/connection/ServiceManager.java | 11 +++++ .../org/redisson/RedissonLongAdderTest.java | 8 ++++ 3 files changed, 48 insertions(+), 15 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonBaseAdder.java b/redisson/src/main/java/org/redisson/RedissonBaseAdder.java index d10b80659..68201241e 100644 --- a/redisson/src/main/java/org/redisson/RedissonBaseAdder.java +++ b/redisson/src/main/java/org/redisson/RedissonBaseAdder.java @@ -25,10 +25,8 @@ import org.redisson.misc.CompletableFutureWrapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.concurrent.CompletionException; -import java.util.concurrent.CompletionStage; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; /** * @@ -56,6 +54,10 @@ public abstract class RedissonBaseAdder extends RedissonExpira } this.redisson = redisson; + + AtomicInteger usage = getServiceManager().getAddersUsage().computeIfAbsent(name, r -> new AtomicInteger()); + usage.incrementAndGet(); + listenerId = topic.addListener(String.class, (channel, msg) -> { String[] parts = msg.split(":"); String id = parts[1]; @@ -69,25 +71,30 @@ public abstract class RedissonBaseAdder extends RedissonExpira return; } - semaphore.releaseAsync().whenComplete((r, ex) -> { - if (ex != null) { - log.error("Can't release semaphore", ex); - } - }); + release(id, usage, semaphore); }); } if (parts[0].equals(CLEAR_MSG)) { doReset(); - semaphore.releaseAsync().whenComplete((res, e) -> { - if (e != null) { - log.error("Can't release semaphore", e); - } - }); + + release(id, usage, semaphore); } }); } + private void release(String id, AtomicInteger usage, RSemaphore semaphore) { + AtomicInteger counter = getServiceManager().getAddersCounter().computeIfAbsent(id, r -> new AtomicInteger()); + if (counter.incrementAndGet() == usage.get()) { + getServiceManager().getAddersCounter().remove(id); + semaphore.releaseAsync().whenComplete((r, ex) -> { + if (ex != null) { + log.error("Can't release semaphore", ex); + } + }); + } + } + protected abstract void doReset(); public void reset() { @@ -122,7 +129,9 @@ public abstract class RedissonBaseAdder extends RedissonExpira RSemaphore semaphore = getSemaphore(id); RFuture future = topic.publishAsync(SUM_MSG + ":" + id); - CompletionStage f = future.thenCompose(r -> tryAcquire(semaphore, timeout, timeUnit, r.intValue())) + CompletionStage f = future.thenCompose(r -> { + return tryAcquire(semaphore, timeout, timeUnit, r.intValue()); + }) .thenCompose(r -> getAndDeleteAsync(id)) .thenCompose(r -> semaphore.deleteAsync().thenApply(res -> r)); return new CompletableFutureWrapper<>(f); @@ -163,6 +172,11 @@ public abstract class RedissonBaseAdder extends RedissonExpira public void destroy() { topic.removeListener(listenerId); + + AtomicInteger c = getServiceManager().getAddersUsage().getOrDefault(name, new AtomicInteger()); + if (c.decrementAndGet() == 0) { + getServiceManager().getAddersUsage().remove(name, c); + } } protected abstract RFuture addAndGetAsync(String id); diff --git a/redisson/src/main/java/org/redisson/connection/ServiceManager.java b/redisson/src/main/java/org/redisson/connection/ServiceManager.java index a844f6f00..03c872d19 100644 --- a/redisson/src/main/java/org/redisson/connection/ServiceManager.java +++ b/redisson/src/main/java/org/redisson/connection/ServiceManager.java @@ -654,4 +654,15 @@ public final class ServiceManager { return codec; } + private final Map addersUsage = new ConcurrentHashMap<>(); + + public Map getAddersUsage() { + return addersUsage; + } + + private final Map addersCounter = new ConcurrentHashMap<>(); + + public Map getAddersCounter() { + return addersCounter; + } } diff --git a/redisson/src/test/java/org/redisson/RedissonLongAdderTest.java b/redisson/src/test/java/org/redisson/RedissonLongAdderTest.java index 7b4076a99..43c8c2ec1 100644 --- a/redisson/src/test/java/org/redisson/RedissonLongAdderTest.java +++ b/redisson/src/test/java/org/redisson/RedissonLongAdderTest.java @@ -19,6 +19,10 @@ public class RedissonLongAdderTest extends RedisDockerTest { Assertions.assertThat(adder1.sum()).isEqualTo(7); Assertions.assertThat(adder2.sum()).isEqualTo(7); Assertions.assertThat(adder3.sum()).isEqualTo(7); + + adder1.destroy(); + adder2.destroy(); + adder3.destroy(); } @Test @@ -36,6 +40,10 @@ public class RedissonLongAdderTest extends RedisDockerTest { Assertions.assertThat(adder1.sum()).isZero(); Assertions.assertThat(adder2.sum()).isZero(); Assertions.assertThat(adder3.sum()).isZero(); + + adder1.destroy(); + adder2.destroy(); + adder3.destroy(); } }