Fixed - Incorrect value of RLongAdder.sum() and DoubleAddder.sum() methods in case of multiple Adder instances for the same Redisson object #5742

pull/5757/head
Nikita Koksharov 12 months ago
parent 4e48be8413
commit 0284a5e2bb

@ -25,10 +25,8 @@ import org.redisson.misc.CompletableFutureWrapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
*
@ -56,6 +54,10 @@ public abstract class RedissonBaseAdder<T extends Number> extends RedissonExpira
}
this.redisson = redisson;
AtomicInteger usage = getServiceManager().getAddersUsage().computeIfAbsent(name, r -> new AtomicInteger());
usage.incrementAndGet();
listenerId = topic.addListener(String.class, (channel, msg) -> {
String[] parts = msg.split(":");
String id = parts[1];
@ -69,25 +71,30 @@ public abstract class RedissonBaseAdder<T extends Number> extends RedissonExpira
return;
}
semaphore.releaseAsync().whenComplete((r, ex) -> {
if (ex != null) {
log.error("Can't release semaphore", ex);
}
});
release(id, usage, semaphore);
});
}
if (parts[0].equals(CLEAR_MSG)) {
doReset();
semaphore.releaseAsync().whenComplete((res, e) -> {
if (e != null) {
log.error("Can't release semaphore", e);
}
});
release(id, usage, semaphore);
}
});
}
private void release(String id, AtomicInteger usage, RSemaphore semaphore) {
AtomicInteger counter = getServiceManager().getAddersCounter().computeIfAbsent(id, r -> new AtomicInteger());
if (counter.incrementAndGet() == usage.get()) {
getServiceManager().getAddersCounter().remove(id);
semaphore.releaseAsync().whenComplete((r, ex) -> {
if (ex != null) {
log.error("Can't release semaphore", ex);
}
});
}
}
protected abstract void doReset();
public void reset() {
@ -122,7 +129,9 @@ public abstract class RedissonBaseAdder<T extends Number> extends RedissonExpira
RSemaphore semaphore = getSemaphore(id);
RFuture<Long> future = topic.publishAsync(SUM_MSG + ":" + id);
CompletionStage<T> f = future.thenCompose(r -> tryAcquire(semaphore, timeout, timeUnit, r.intValue()))
CompletionStage<T> f = future.thenCompose(r -> {
return tryAcquire(semaphore, timeout, timeUnit, r.intValue());
})
.thenCompose(r -> getAndDeleteAsync(id))
.thenCompose(r -> semaphore.deleteAsync().thenApply(res -> r));
return new CompletableFutureWrapper<>(f);
@ -163,6 +172,11 @@ public abstract class RedissonBaseAdder<T extends Number> extends RedissonExpira
public void destroy() {
topic.removeListener(listenerId);
AtomicInteger c = getServiceManager().getAddersUsage().getOrDefault(name, new AtomicInteger());
if (c.decrementAndGet() == 0) {
getServiceManager().getAddersUsage().remove(name, c);
}
}
protected abstract RFuture<T> addAndGetAsync(String id);

@ -654,4 +654,15 @@ public final class ServiceManager {
return codec;
}
private final Map<String, AtomicInteger> addersUsage = new ConcurrentHashMap<>();
public Map<String, AtomicInteger> getAddersUsage() {
return addersUsage;
}
private final Map<String, AtomicInteger> addersCounter = new ConcurrentHashMap<>();
public Map<String, AtomicInteger> getAddersCounter() {
return addersCounter;
}
}

@ -19,6 +19,10 @@ public class RedissonLongAdderTest extends RedisDockerTest {
Assertions.assertThat(adder1.sum()).isEqualTo(7);
Assertions.assertThat(adder2.sum()).isEqualTo(7);
Assertions.assertThat(adder3.sum()).isEqualTo(7);
adder1.destroy();
adder2.destroy();
adder3.destroy();
}
@Test
@ -36,6 +40,10 @@ public class RedissonLongAdderTest extends RedisDockerTest {
Assertions.assertThat(adder1.sum()).isZero();
Assertions.assertThat(adder2.sum()).isZero();
Assertions.assertThat(adder3.sum()).isZero();
adder1.destroy();
adder2.destroy();
adder3.destroy();
}
}

Loading…
Cancel
Save