|
|
|
@ -15,17 +15,13 @@
|
|
|
|
|
*/
|
|
|
|
|
package org.redisson;
|
|
|
|
|
|
|
|
|
|
import java.util.Arrays;
|
|
|
|
|
|
|
|
|
|
import org.redisson.api.RFuture;
|
|
|
|
|
import org.redisson.api.RSemaphore;
|
|
|
|
|
import org.redisson.api.RTopic;
|
|
|
|
|
import org.redisson.api.RedissonClient;
|
|
|
|
|
import org.redisson.api.listener.MessageListener;
|
|
|
|
|
import org.redisson.client.codec.LongCodec;
|
|
|
|
|
import org.redisson.client.protocol.RedisCommands;
|
|
|
|
|
import org.redisson.command.CommandAsyncExecutor;
|
|
|
|
|
import org.redisson.misc.LongAdder;
|
|
|
|
|
import org.redisson.misc.RPromise;
|
|
|
|
|
import org.redisson.misc.RedissonPromise;
|
|
|
|
|
import org.slf4j.Logger;
|
|
|
|
@ -109,20 +105,17 @@ public abstract class RedissonBaseAdder<T extends Number> extends RedissonExpira
|
|
|
|
|
public RFuture<T> sumAsync() {
|
|
|
|
|
final RPromise<T> result = new RedissonPromise<T>();
|
|
|
|
|
|
|
|
|
|
RFuture<Integer> future = commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_INTEGER,
|
|
|
|
|
"redis.call('del', KEYS[1]); "
|
|
|
|
|
+ "return redis.call('publish', KEYS[2], ARGV[1]); ",
|
|
|
|
|
Arrays.<Object>asList(getName(), topic.getChannelNames().get(0)), SUM_MSG);
|
|
|
|
|
future.addListener(new FutureListener<Integer>() {
|
|
|
|
|
RFuture<Long> future = topic.publishAsync(SUM_MSG);
|
|
|
|
|
future.addListener(new FutureListener<Long>() {
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public void operationComplete(Future<Integer> future) throws Exception {
|
|
|
|
|
public void operationComplete(Future<Long> future) throws Exception {
|
|
|
|
|
if (!future.isSuccess()) {
|
|
|
|
|
result.tryFailure(future.cause());
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
semaphore.acquireAsync(future.getNow()).addListener(new FutureListener<Void>() {
|
|
|
|
|
|
|
|
|
|
semaphore.acquireAsync(future.getNow().intValue()).addListener(new FutureListener<Void>() {
|
|
|
|
|
@Override
|
|
|
|
|
public void operationComplete(Future<Void> future) throws Exception {
|
|
|
|
|
if (!future.isSuccess()) {
|
|
|
|
@ -130,7 +123,7 @@ public abstract class RedissonBaseAdder<T extends Number> extends RedissonExpira
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
RFuture<T> valueFuture = getAsync();
|
|
|
|
|
RFuture<T> valueFuture = getAndDeleteAsync();
|
|
|
|
|
valueFuture.addListener(new FutureListener<T>() {
|
|
|
|
|
@Override
|
|
|
|
|
public void operationComplete(Future<T> future) throws Exception {
|
|
|
|
@ -153,21 +146,22 @@ public abstract class RedissonBaseAdder<T extends Number> extends RedissonExpira
|
|
|
|
|
public RFuture<Void> resetAsync() {
|
|
|
|
|
final RPromise<Void> result = new RedissonPromise<Void>();
|
|
|
|
|
|
|
|
|
|
RFuture<Integer> future = commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_INTEGER,
|
|
|
|
|
"redis.call('del', KEYS[1]); "
|
|
|
|
|
+ "return redis.call('publish', KEYS[2], ARGV[1]); ",
|
|
|
|
|
Arrays.<Object>asList(getName(), topic.getChannelNames().get(0)), CLEAR_MSG);
|
|
|
|
|
|
|
|
|
|
future.addListener(new FutureListener<Integer>() {
|
|
|
|
|
RFuture<Long> future = topic.publishAsync(CLEAR_MSG);
|
|
|
|
|
future.addListener(new FutureListener<Long>() {
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public void operationComplete(Future<Integer> future) throws Exception {
|
|
|
|
|
public void operationComplete(Future<Long> future) throws Exception {
|
|
|
|
|
if (!future.isSuccess()) {
|
|
|
|
|
result.tryFailure(future.cause());
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
semaphore.acquireAsync(future.getNow()).addListener(new FutureListener<Void>() {
|
|
|
|
|
int value = 0;
|
|
|
|
|
if (future.getNow() != null) {
|
|
|
|
|
value = future.getNow().intValue();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
semaphore.acquireAsync(value).addListener(new FutureListener<Void>() {
|
|
|
|
|
@Override
|
|
|
|
|
public void operationComplete(Future<Void> future) throws Exception {
|
|
|
|
|
if (!future.isSuccess()) {
|
|
|
|
@ -190,6 +184,6 @@ public abstract class RedissonBaseAdder<T extends Number> extends RedissonExpira
|
|
|
|
|
|
|
|
|
|
protected abstract RFuture<T> addAndGetAsync();
|
|
|
|
|
|
|
|
|
|
protected abstract RFuture<T> getAsync();
|
|
|
|
|
protected abstract RFuture<T> getAndDeleteAsync();
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|