|
|
|
@ -15,6 +15,9 @@
|
|
|
|
|
*/
|
|
|
|
|
package org.redisson;
|
|
|
|
|
|
|
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
|
import java.util.concurrent.TimeoutException;
|
|
|
|
|
|
|
|
|
|
import org.redisson.api.RFuture;
|
|
|
|
|
import org.redisson.api.RSemaphore;
|
|
|
|
|
import org.redisson.api.RTopic;
|
|
|
|
@ -37,6 +40,83 @@ import io.netty.util.concurrent.FutureListener;
|
|
|
|
|
*/
|
|
|
|
|
public abstract class RedissonBaseAdder<T extends Number> extends RedissonExpirable {
|
|
|
|
|
|
|
|
|
|
private class ResetListener implements FutureListener<Long> {
|
|
|
|
|
|
|
|
|
|
private final RPromise<Void> result;
|
|
|
|
|
|
|
|
|
|
private ResetListener(RPromise<Void> result) {
|
|
|
|
|
this.result = result;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public void operationComplete(Future<Long> future) throws Exception {
|
|
|
|
|
if (!future.isSuccess()) {
|
|
|
|
|
result.tryFailure(future.cause());
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
acquireAsync(future.getNow().intValue()).addListener(new FutureListener<Void>() {
|
|
|
|
|
@Override
|
|
|
|
|
public void operationComplete(Future<Void> future) throws Exception {
|
|
|
|
|
if (!future.isSuccess()) {
|
|
|
|
|
result.tryFailure(future.cause());
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
result.trySuccess(null);
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
protected RFuture<Void> acquireAsync(int value) {
|
|
|
|
|
return semaphore.acquireAsync(value);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private class SumListener implements FutureListener<Long> {
|
|
|
|
|
|
|
|
|
|
private final RPromise<T> result;
|
|
|
|
|
|
|
|
|
|
private SumListener(RPromise<T> result) {
|
|
|
|
|
this.result = result;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public void operationComplete(Future<Long> future) throws Exception {
|
|
|
|
|
if (!future.isSuccess()) {
|
|
|
|
|
result.tryFailure(future.cause());
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
acquireAsync(future.getNow().intValue()).addListener(new FutureListener<Void>() {
|
|
|
|
|
@Override
|
|
|
|
|
public void operationComplete(Future<Void> future) throws Exception {
|
|
|
|
|
if (!future.isSuccess()) {
|
|
|
|
|
result.tryFailure(future.cause());
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
RFuture<T> valueFuture = getAndDeleteAsync();
|
|
|
|
|
valueFuture.addListener(new FutureListener<T>() {
|
|
|
|
|
@Override
|
|
|
|
|
public void operationComplete(Future<T> future) throws Exception {
|
|
|
|
|
if (!future.isSuccess()) {
|
|
|
|
|
result.tryFailure(future.cause());
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
result.trySuccess(future.getNow());
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
protected RFuture<Void> acquireAsync(int value) {
|
|
|
|
|
return semaphore.acquireAsync(value);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private static final Logger log = LoggerFactory.getLogger(RedissonBaseAdder.class);
|
|
|
|
|
|
|
|
|
|
private static final long CLEAR_MSG = 0;
|
|
|
|
@ -102,76 +182,71 @@ public abstract class RedissonBaseAdder<T extends Number> extends RedissonExpira
|
|
|
|
|
get(resetAsync());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public void reset(long timeout, TimeUnit timeUnit) {
|
|
|
|
|
get(resetAsync(timeout, timeUnit));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public RFuture<T> sumAsync() {
|
|
|
|
|
final RPromise<T> result = new RedissonPromise<T>();
|
|
|
|
|
|
|
|
|
|
RFuture<Long> future = topic.publishAsync(SUM_MSG);
|
|
|
|
|
future.addListener(new FutureListener<Long>() {
|
|
|
|
|
|
|
|
|
|
future.addListener(new SumListener(result));
|
|
|
|
|
|
|
|
|
|
return result;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public RFuture<T> sumAsync(final long timeout, final TimeUnit timeUnit) {
|
|
|
|
|
RPromise<T> result = new RedissonPromise<T>();
|
|
|
|
|
|
|
|
|
|
RFuture<Long> future = topic.publishAsync(SUM_MSG);
|
|
|
|
|
future.addListener(new SumListener(result) {
|
|
|
|
|
@Override
|
|
|
|
|
public void operationComplete(Future<Long> future) throws Exception {
|
|
|
|
|
if (!future.isSuccess()) {
|
|
|
|
|
result.tryFailure(future.cause());
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
semaphore.acquireAsync(future.getNow().intValue()).addListener(new FutureListener<Void>() {
|
|
|
|
|
@Override
|
|
|
|
|
public void operationComplete(Future<Void> future) throws Exception {
|
|
|
|
|
if (!future.isSuccess()) {
|
|
|
|
|
result.tryFailure(future.cause());
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
RFuture<T> valueFuture = getAndDeleteAsync();
|
|
|
|
|
valueFuture.addListener(new FutureListener<T>() {
|
|
|
|
|
@Override
|
|
|
|
|
public void operationComplete(Future<T> future) throws Exception {
|
|
|
|
|
if (!future.isSuccess()) {
|
|
|
|
|
result.tryFailure(future.cause());
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
result.trySuccess(future.getNow());
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
protected RFuture<Void> acquireAsync(int value) {
|
|
|
|
|
return tryAcquire(timeout, timeUnit, value);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
return result;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public RFuture<Void> resetAsync() {
|
|
|
|
|
final RPromise<Void> result = new RedissonPromise<Void>();
|
|
|
|
|
|
|
|
|
|
RFuture<Long> future = topic.publishAsync(CLEAR_MSG);
|
|
|
|
|
future.addListener(new FutureListener<Long>() {
|
|
|
|
|
|
|
|
|
|
protected RFuture<Void> tryAcquire(long timeout, TimeUnit timeUnit, int value) {
|
|
|
|
|
final RPromise<Void> acquireResult = new RedissonPromise<Void>();
|
|
|
|
|
semaphore.tryAcquireAsync(value, timeout, timeUnit).addListener(new FutureListener<Boolean>() {
|
|
|
|
|
@Override
|
|
|
|
|
public void operationComplete(Future<Long> future) throws Exception {
|
|
|
|
|
public void operationComplete(Future<Boolean> future) throws Exception {
|
|
|
|
|
if (!future.isSuccess()) {
|
|
|
|
|
result.tryFailure(future.cause());
|
|
|
|
|
acquireResult.tryFailure(future.cause());
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int value = 0;
|
|
|
|
|
if (future.getNow() != null) {
|
|
|
|
|
value = future.getNow().intValue();
|
|
|
|
|
if (future.getNow()) {
|
|
|
|
|
acquireResult.trySuccess(null);
|
|
|
|
|
} else {
|
|
|
|
|
acquireResult.tryFailure(new TimeoutException());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
semaphore.acquireAsync(value).addListener(new FutureListener<Void>() {
|
|
|
|
|
@Override
|
|
|
|
|
public void operationComplete(Future<Void> future) throws Exception {
|
|
|
|
|
if (!future.isSuccess()) {
|
|
|
|
|
result.tryFailure(future.cause());
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
return acquireResult;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
result.trySuccess(null);
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
public RFuture<Void> resetAsync() {
|
|
|
|
|
final RPromise<Void> result = new RedissonPromise<Void>();
|
|
|
|
|
|
|
|
|
|
RFuture<Long> future = topic.publishAsync(CLEAR_MSG);
|
|
|
|
|
future.addListener(new ResetListener(result));
|
|
|
|
|
|
|
|
|
|
return result;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public RFuture<Void> resetAsync(final long timeout, final TimeUnit timeUnit) {
|
|
|
|
|
final RPromise<Void> result = new RedissonPromise<Void>();
|
|
|
|
|
|
|
|
|
|
RFuture<Long> future = topic.publishAsync(CLEAR_MSG);
|
|
|
|
|
future.addListener(new ResetListener(result) {
|
|
|
|
|
@Override
|
|
|
|
|
protected RFuture<Void> acquireAsync(int value) {
|
|
|
|
|
return tryAcquire(timeout, timeUnit, value);
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|