Fixed - RLongAdder.sum() and RDoubleAdder.sum() methods return wrong result #3148

pull/3169/head
Nikita Koksharov 4 years ago
parent 4341acaa9a
commit 7711c615c8

@ -15,22 +15,23 @@
*/ */
package org.redisson; package org.redisson;
import java.util.concurrent.TimeUnit; import io.netty.buffer.ByteBufUtil;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import org.redisson.api.RFuture; import org.redisson.api.RFuture;
import org.redisson.api.RSemaphore; import org.redisson.api.RSemaphore;
import org.redisson.api.RTopic; import org.redisson.api.RTopic;
import org.redisson.api.RedissonClient; import org.redisson.api.RedissonClient;
import org.redisson.api.listener.MessageListener; import org.redisson.client.codec.StringCodec;
import org.redisson.client.codec.LongCodec;
import org.redisson.command.CommandAsyncExecutor; import org.redisson.command.CommandAsyncExecutor;
import org.redisson.misc.RPromise; import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise; import org.redisson.misc.RedissonPromise;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
/** /**
* *
* @author Nikita Koksharov * @author Nikita Koksharov
@ -41,9 +42,11 @@ public abstract class RedissonBaseAdder<T extends Number> extends RedissonExpira
private class ResetListener implements BiConsumer<Long, Throwable> { private class ResetListener implements BiConsumer<Long, Throwable> {
private final RPromise<Void> result; private final RPromise<Void> result;
private final RSemaphore semaphore;
ResetListener(RPromise<Void> result) { ResetListener(RSemaphore semaphore, RPromise<Void> result) {
this.result = result; this.result = result;
this.semaphore = semaphore;
} }
@Override @Override
@ -59,7 +62,14 @@ public abstract class RedissonBaseAdder<T extends Number> extends RedissonExpira
return; return;
} }
result.trySuccess(null); semaphore.deleteAsync().onComplete((re, exc) -> {
if (exc != null) {
result.tryFailure(exc);
return;
}
result.trySuccess(res);
});
}); });
} }
@ -71,9 +81,13 @@ public abstract class RedissonBaseAdder<T extends Number> extends RedissonExpira
private class SumListener implements BiConsumer<Long, Throwable> { private class SumListener implements BiConsumer<Long, Throwable> {
private final RPromise<T> result; private final RPromise<T> result;
private final RSemaphore semaphore;
private final String id;
SumListener(RPromise<T> result) { SumListener(String id, RSemaphore semaphore, RPromise<T> result) {
this.result = result; this.result = result;
this.semaphore = semaphore;
this.id = id;
} }
@Override @Override
@ -89,14 +103,21 @@ public abstract class RedissonBaseAdder<T extends Number> extends RedissonExpira
return; return;
} }
RFuture<T> valueFuture = getAndDeleteAsync(); RFuture<T> valueFuture = getAndDeleteAsync(id);
valueFuture.onComplete((res, ex) -> { valueFuture.onComplete((res, ex) -> {
if (ex != null) { if (ex != null) {
result.tryFailure(ex); result.tryFailure(ex);
return; return;
} }
result.trySuccess(res); semaphore.deleteAsync().onComplete((re, exc) -> {
if (exc != null) {
result.tryFailure(exc);
return;
}
result.trySuccess(res);
});
}); });
}); });
} }
@ -108,55 +129,58 @@ public abstract class RedissonBaseAdder<T extends Number> extends RedissonExpira
private static final Logger log = LoggerFactory.getLogger(RedissonBaseAdder.class); private static final Logger log = LoggerFactory.getLogger(RedissonBaseAdder.class);
private static final long CLEAR_MSG = 0; private static final String CLEAR_MSG = "0";
private static final long SUM_MSG = 1; private static final String SUM_MSG = "1";
private final RSemaphore semaphore; private final RedissonClient redisson;
private final RTopic topic; private final RTopic topic;
private final int listenerId; private final int listenerId;
public RedissonBaseAdder(CommandAsyncExecutor connectionManager, String name, RedissonClient redisson) { public RedissonBaseAdder(CommandAsyncExecutor connectionManager, String name, RedissonClient redisson) {
super(connectionManager, name); super(connectionManager, name);
topic = redisson.getTopic(suffixName(getName(), "topic"), LongCodec.INSTANCE); topic = redisson.getTopic(suffixName(getName(), "topic"), StringCodec.INSTANCE);
semaphore = redisson.getSemaphore(suffixName(getName(), "semaphore")); this.redisson = redisson;
listenerId = topic.addListener(Long.class, new MessageListener<Long>() { listenerId = topic.addListener(String.class, (channel, msg) -> {
String[] parts = msg.split(":");
@Override String id = parts[1];
public void onMessage(CharSequence channel, Long msg) {
if (msg == SUM_MSG) { RSemaphore semaphore = getSemaphore(id);
RFuture<T> addAndGetFuture = addAndGetAsync(); if (parts[0].equals(SUM_MSG)) {
addAndGetFuture.onComplete((res, e) -> { RFuture<T> addAndGetFuture = addAndGetAsync(id);
if (e != null) { addAndGetFuture.onComplete((res, e) -> {
log.error("Can't increase sum", e); if (e != null) {
return; log.error("Can't increase sum", e);
} return;
}
semaphore.releaseAsync().onComplete((r, ex) -> {
if (ex != null) { semaphore.releaseAsync().onComplete((r, ex) -> {
log.error("Can't release semaphore", ex); if (ex != null) {
return; log.error("Can't release semaphore", ex);
}
});
});
}
if (msg == CLEAR_MSG) {
doReset();
semaphore.releaseAsync().onComplete((res, e) -> {
if (e != null) {
log.error("Can't release semaphore", e);
} }
}); });
} });
} }
if (parts[0].equals(CLEAR_MSG)) {
doReset();
semaphore.releaseAsync().onComplete((res, e) -> {
if (e != null) {
log.error("Can't release semaphore", e);
}
});
}
}); });
} }
protected abstract void doReset(); protected abstract void doReset();
private String generateId() {
byte[] id = new byte[16];
ThreadLocalRandom.current().nextBytes(id);
return ByteBufUtil.hexDump(id);
}
public void reset() { public void reset() {
get(resetAsync()); get(resetAsync());
} }
@ -167,21 +191,33 @@ public abstract class RedissonBaseAdder<T extends Number> extends RedissonExpira
public RFuture<T> sumAsync() { public RFuture<T> sumAsync() {
RPromise<T> result = new RedissonPromise<T>(); RPromise<T> result = new RedissonPromise<T>();
RFuture<Long> future = topic.publishAsync(SUM_MSG); String id = generateId();
future.onComplete(new SumListener(result)); RFuture<Long> future = topic.publishAsync(SUM_MSG + ":" + id);
RSemaphore semaphore = getSemaphore(id);
future.onComplete(new SumListener(id, semaphore, result));
return result; return result;
} }
private RSemaphore getSemaphore(String id) {
return redisson.getSemaphore(suffixName(getName(), id + ":semaphore"));
}
protected String getCounterName(String id) {
return suffixName(getName(), id + ":counter");
}
public RFuture<T> sumAsync(long timeout, TimeUnit timeUnit) { public RFuture<T> sumAsync(long timeout, TimeUnit timeUnit) {
RPromise<T> result = new RedissonPromise<T>(); RPromise<T> result = new RedissonPromise<T>();
RFuture<Long> future = topic.publishAsync(SUM_MSG); String id = generateId();
future.onComplete(new SumListener(result) { RFuture<Long> future = topic.publishAsync(SUM_MSG + ":" + id);
RSemaphore semaphore = getSemaphore(id);
future.onComplete(new SumListener(id, semaphore, result) {
@Override @Override
protected RFuture<Void> acquireAsync(int value) { protected RFuture<Void> acquireAsync(int value) {
return tryAcquire(timeout, timeUnit, value); return tryAcquire(semaphore, timeout, timeUnit, value);
} }
}); });
@ -189,8 +225,8 @@ public abstract class RedissonBaseAdder<T extends Number> extends RedissonExpira
return result; return result;
} }
protected RFuture<Void> tryAcquire(long timeout, TimeUnit timeUnit, int value) { protected RFuture<Void> tryAcquire(RSemaphore semaphore, long timeout, TimeUnit timeUnit, int value) {
RPromise<Void> acquireResult = new RedissonPromise<Void>(); RPromise<Void> acquireResult = new RedissonPromise<>();
semaphore.tryAcquireAsync(value, timeout, timeUnit).onComplete((res, e) -> { semaphore.tryAcquireAsync(value, timeout, timeUnit).onComplete((res, e) -> {
if (e != null) { if (e != null) {
acquireResult.tryFailure(e); acquireResult.tryFailure(e);
@ -207,22 +243,26 @@ public abstract class RedissonBaseAdder<T extends Number> extends RedissonExpira
} }
public RFuture<Void> resetAsync() { public RFuture<Void> resetAsync() {
RPromise<Void> result = new RedissonPromise<Void>(); RPromise<Void> result = new RedissonPromise<>();
RFuture<Long> future = topic.publishAsync(CLEAR_MSG); String id = generateId();
future.onComplete(new ResetListener(result)); RFuture<Long> future = topic.publishAsync(CLEAR_MSG + ":" + id);
RSemaphore semaphore = getSemaphore(id);
future.onComplete(new ResetListener(semaphore, result));
return result; return result;
} }
public RFuture<Void> resetAsync(long timeout, TimeUnit timeUnit) { public RFuture<Void> resetAsync(long timeout, TimeUnit timeUnit) {
RPromise<Void> result = new RedissonPromise<Void>(); RPromise<Void> result = new RedissonPromise<>();
RFuture<Long> future = topic.publishAsync(CLEAR_MSG); String id = generateId();
future.onComplete(new ResetListener(result) { RFuture<Long> future = topic.publishAsync(CLEAR_MSG + ":" + id);
RSemaphore semaphore = getSemaphore(id);
future.onComplete(new ResetListener(semaphore, result) {
@Override @Override
protected RFuture<Void> acquireAsync(int value) { protected RFuture<Void> acquireAsync(int value) {
return tryAcquire(timeout, timeUnit, value); return tryAcquire(semaphore, timeout, timeUnit, value);
} }
}); });
@ -233,8 +273,8 @@ public abstract class RedissonBaseAdder<T extends Number> extends RedissonExpira
topic.removeListener(listenerId); topic.removeListener(listenerId);
} }
protected abstract RFuture<T> addAndGetAsync(); protected abstract RFuture<T> addAndGetAsync(String id);
protected abstract RFuture<T> getAndDeleteAsync(); protected abstract RFuture<T> getAndDeleteAsync(String id);
} }

@ -17,7 +17,6 @@ package org.redisson;
import java.util.concurrent.atomic.DoubleAdder; import java.util.concurrent.atomic.DoubleAdder;
import org.redisson.api.RAtomicDouble;
import org.redisson.api.RDoubleAdder; import org.redisson.api.RDoubleAdder;
import org.redisson.api.RFuture; import org.redisson.api.RFuture;
import org.redisson.api.RedissonClient; import org.redisson.api.RedissonClient;
@ -31,12 +30,12 @@ import org.redisson.command.CommandAsyncExecutor;
public class RedissonDoubleAdder extends RedissonBaseAdder<Double> implements RDoubleAdder { public class RedissonDoubleAdder extends RedissonBaseAdder<Double> implements RDoubleAdder {
private final DoubleAdder counter = new DoubleAdder(); private final DoubleAdder counter = new DoubleAdder();
private final RAtomicDouble atomicDouble; private final RedissonClient redisson;
public RedissonDoubleAdder(CommandAsyncExecutor connectionManager, String name, RedissonClient redisson) { public RedissonDoubleAdder(CommandAsyncExecutor connectionManager, String name, RedissonClient redisson) {
super(connectionManager, name, redisson); super(connectionManager, name, redisson);
atomicDouble = redisson.getAtomicDouble(getName()); this.redisson = redisson;
} }
@Override @Override
@ -45,13 +44,13 @@ public class RedissonDoubleAdder extends RedissonBaseAdder<Double> implements RD
} }
@Override @Override
protected RFuture<Double> addAndGetAsync() { protected RFuture<Double> addAndGetAsync(String id) {
return atomicDouble.getAndAddAsync(counter.sum()); return redisson.getAtomicDouble(getCounterName(id)).getAndAddAsync(counter.sum());
} }
@Override @Override
protected RFuture<Double> getAndDeleteAsync() { protected RFuture<Double> getAndDeleteAsync(String id) {
return atomicDouble.getAndDeleteAsync(); return redisson.getAtomicDouble(getCounterName(id)).getAndDeleteAsync();
} }
@Override @Override

@ -15,14 +15,13 @@
*/ */
package org.redisson; package org.redisson;
import java.util.concurrent.atomic.LongAdder;
import org.redisson.api.RAtomicLong;
import org.redisson.api.RFuture; import org.redisson.api.RFuture;
import org.redisson.api.RLongAdder; import org.redisson.api.RLongAdder;
import org.redisson.api.RedissonClient; import org.redisson.api.RedissonClient;
import org.redisson.command.CommandAsyncExecutor; import org.redisson.command.CommandAsyncExecutor;
import java.util.concurrent.atomic.LongAdder;
/** /**
* *
* @author Nikita Koksharov * @author Nikita Koksharov
@ -30,13 +29,13 @@ import org.redisson.command.CommandAsyncExecutor;
*/ */
public class RedissonLongAdder extends RedissonBaseAdder<Long> implements RLongAdder { public class RedissonLongAdder extends RedissonBaseAdder<Long> implements RLongAdder {
private final RAtomicLong atomicLong; private final RedissonClient redisson;
private final LongAdder counter = new LongAdder(); private final LongAdder counter = new LongAdder();
public RedissonLongAdder(CommandAsyncExecutor connectionManager, String name, RedissonClient redisson) { public RedissonLongAdder(CommandAsyncExecutor connectionManager, String name, RedissonClient redisson) {
super(connectionManager, name, redisson); super(connectionManager, name, redisson);
atomicLong = redisson.getAtomicLong(getName()); this.redisson = redisson;
} }
@Override @Override
@ -45,13 +44,13 @@ public class RedissonLongAdder extends RedissonBaseAdder<Long> implements RLongA
} }
@Override @Override
protected RFuture<Long> addAndGetAsync() { protected RFuture<Long> addAndGetAsync(String id) {
return atomicLong.getAndAddAsync(counter.sum()); return redisson.getAtomicLong(getCounterName(id)).getAndAddAsync(counter.sum());
} }
@Override @Override
protected RFuture<Long> getAndDeleteAsync() { protected RFuture<Long> getAndDeleteAsync(String id) {
return atomicLong.getAndDeleteAsync(); return redisson.getAtomicLong(getCounterName(id)).getAndDeleteAsync();
} }
@Override @Override

Loading…
Cancel
Save