Fixed - Incorrect value of RLongAdder.sum() and DoubleAddder.sum() methods in case of multiple Adder instances for the same Redisson object #5742

pull/5708/head^2
Nikita Koksharov 11 months ago
parent 2b6f2fda79
commit 707f1505a9

@ -21,6 +21,7 @@ import org.redisson.api.RTopic;
import org.redisson.api.RedissonClient;
import org.redisson.client.codec.StringCodec;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.connection.AdderEntry;
import org.redisson.misc.CompletableFutureWrapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -55,14 +56,15 @@ public abstract class RedissonBaseAdder<T extends Number> extends RedissonExpira
this.redisson = redisson;
AtomicInteger usage = getServiceManager().getAddersUsage().computeIfAbsent(name, r -> new AtomicInteger());
usage.incrementAndGet();
AdderEntry entry = getServiceManager().getAddersUsage().computeIfAbsent(name, r -> new AdderEntry());
entry.getUsage().incrementAndGet();
listenerId = topic.addListener(String.class, (channel, msg) -> {
String[] parts = msg.split(":");
String id = parts[1];
RSemaphore semaphore = getSemaphore(id);
entry.getIds().add(id);
if (parts[0].equals(SUM_MSG)) {
RFuture<T> addAndGetFuture = addAndGetAsync(id);
addAndGetFuture.whenComplete((res, e) -> {
@ -71,22 +73,25 @@ public abstract class RedissonBaseAdder<T extends Number> extends RedissonExpira
return;
}
release(id, usage, semaphore);
release(id, entry);
});
}
if (parts[0].equals(CLEAR_MSG)) {
doReset();
release(id, usage, semaphore);
release(id, entry);
}
});
}
private void release(String id, AtomicInteger usage, RSemaphore semaphore) {
private void release(String id, AdderEntry entry) {
AtomicInteger counter = getServiceManager().getAddersCounter().computeIfAbsent(id, r -> new AtomicInteger());
if (counter.incrementAndGet() == usage.get()) {
if (counter.incrementAndGet() == entry.getUsage().get()
|| entry.getUsage().get() == 0) {
getServiceManager().getAddersCounter().remove(id);
entry.getIds().remove(id);
RSemaphore semaphore = getSemaphore(id);
semaphore.releaseAsync().whenComplete((r, ex) -> {
if (ex != null) {
log.error("Can't release semaphore", ex);
@ -173,9 +178,13 @@ public abstract class RedissonBaseAdder<T extends Number> extends RedissonExpira
public void destroy() {
topic.removeListener(listenerId);
AtomicInteger c = getServiceManager().getAddersUsage().getOrDefault(name, new AtomicInteger());
if (c.decrementAndGet() == 0) {
getServiceManager().getAddersUsage().remove(name, c);
AdderEntry entry = getServiceManager().getAddersUsage().get(name);
if (entry != null
&& entry.getUsage().decrementAndGet() == 0) {
for (String id : entry.getIds()) {
release(id, entry);
}
getServiceManager().getAddersUsage().remove(name, entry);
}
}

@ -0,0 +1,27 @@
package org.redisson.connection;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
/**
*
* @author Nikita Koksharov
*
*/
public class AdderEntry {
private final Set<String> ids = Collections.newSetFromMap(new ConcurrentHashMap<>());
private final AtomicInteger usage = new AtomicInteger();
public Set<String> getIds() {
return ids;
}
public AtomicInteger getUsage() {
return usage;
}
}

@ -653,9 +653,9 @@ public final class ServiceManager {
return codec;
}
private final Map<String, AtomicInteger> addersUsage = new ConcurrentHashMap<>();
private final Map<String, AdderEntry> addersUsage = new ConcurrentHashMap<>();
public Map<String, AtomicInteger> getAddersUsage() {
public Map<String, AdderEntry> getAddersUsage() {
return addersUsage;
}

@ -6,6 +6,20 @@ import org.redisson.api.RLongAdder;
public class RedissonLongAdderTest extends RedisDockerTest {
@Test
public void testSumStability() {
for (int i = 0; i < 100; i++) {
testSum();
}
}
@Test
public void testResetStability() {
for (int i = 0; i < 100; i++) {
testReset();
}
}
@Test
public void testSum() {
RLongAdder adder1 = redisson.getLongAdder("test1");

Loading…
Cancel
Save