From b26d52c5abdadeff8d03b9f52dd9ba396e6a220f Mon Sep 17 00:00:00 2001 From: Nikita Date: Wed, 24 Jan 2018 11:28:08 +0300 Subject: [PATCH] Reset and Sum methods with timeout support added to both RDoubleAdder and RLongAdder objects --- .../java/org/redisson/RedissonBaseAdder.java | 179 +++++++++++++----- .../java/org/redisson/api/RDoubleAdder.java | 36 +++- .../java/org/redisson/api/RLongAdder.java | 25 +++ 3 files changed, 183 insertions(+), 57 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonBaseAdder.java b/redisson/src/main/java/org/redisson/RedissonBaseAdder.java index 91eba4573..f92c355bb 100644 --- a/redisson/src/main/java/org/redisson/RedissonBaseAdder.java +++ b/redisson/src/main/java/org/redisson/RedissonBaseAdder.java @@ -15,6 +15,9 @@ */ package org.redisson; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + import org.redisson.api.RFuture; import org.redisson.api.RSemaphore; import org.redisson.api.RTopic; @@ -37,6 +40,83 @@ import io.netty.util.concurrent.FutureListener; */ public abstract class RedissonBaseAdder extends RedissonExpirable { + private class ResetListener implements FutureListener { + + private final RPromise result; + + private ResetListener(RPromise result) { + this.result = result; + } + + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + result.tryFailure(future.cause()); + return; + } + + acquireAsync(future.getNow().intValue()).addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + result.tryFailure(future.cause()); + return; + } + + result.trySuccess(null); + } + }); + } + + protected RFuture acquireAsync(int value) { + return semaphore.acquireAsync(value); + } + } + + private class SumListener implements FutureListener { + + private final RPromise result; + + private SumListener(RPromise result) { + this.result = result; + } + + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + result.tryFailure(future.cause()); + return; + } + + acquireAsync(future.getNow().intValue()).addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + result.tryFailure(future.cause()); + return; + } + + RFuture valueFuture = getAndDeleteAsync(); + valueFuture.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + result.tryFailure(future.cause()); + return; + } + + result.trySuccess(future.getNow()); + } + }); + } + }); + } + + protected RFuture acquireAsync(int value) { + return semaphore.acquireAsync(value); + } + } + private static final Logger log = LoggerFactory.getLogger(RedissonBaseAdder.class); private static final long CLEAR_MSG = 0; @@ -102,76 +182,71 @@ public abstract class RedissonBaseAdder extends RedissonExpira get(resetAsync()); } + public void reset(long timeout, TimeUnit timeUnit) { + get(resetAsync(timeout, timeUnit)); + } + public RFuture sumAsync() { final RPromise result = new RedissonPromise(); RFuture future = topic.publishAsync(SUM_MSG); - future.addListener(new FutureListener() { - + future.addListener(new SumListener(result)); + + return result; + } + + public RFuture sumAsync(final long timeout, final TimeUnit timeUnit) { + RPromise result = new RedissonPromise(); + + RFuture future = topic.publishAsync(SUM_MSG); + future.addListener(new SumListener(result) { @Override - public void operationComplete(Future future) throws Exception { - if (!future.isSuccess()) { - result.tryFailure(future.cause()); - return; - } - - semaphore.acquireAsync(future.getNow().intValue()).addListener(new FutureListener() { - @Override - public void operationComplete(Future future) throws Exception { - if (!future.isSuccess()) { - result.tryFailure(future.cause()); - return; - } - - RFuture valueFuture = getAndDeleteAsync(); - valueFuture.addListener(new FutureListener() { - @Override - public void operationComplete(Future future) throws Exception { - if (!future.isSuccess()) { - result.tryFailure(future.cause()); - return; - } - - result.trySuccess(future.getNow()); - } - }); - } - }); + protected RFuture acquireAsync(int value) { + return tryAcquire(timeout, timeUnit, value); } + }); return result; } - public RFuture resetAsync() { - final RPromise result = new RedissonPromise(); - - RFuture future = topic.publishAsync(CLEAR_MSG); - future.addListener(new FutureListener() { - + protected RFuture tryAcquire(long timeout, TimeUnit timeUnit, int value) { + final RPromise acquireResult = new RedissonPromise(); + semaphore.tryAcquireAsync(value, timeout, timeUnit).addListener(new FutureListener() { @Override - public void operationComplete(Future future) throws Exception { + public void operationComplete(Future future) throws Exception { if (!future.isSuccess()) { - result.tryFailure(future.cause()); + acquireResult.tryFailure(future.cause()); return; } - int value = 0; - if (future.getNow() != null) { - value = future.getNow().intValue(); + if (future.getNow()) { + acquireResult.trySuccess(null); + } else { + acquireResult.tryFailure(new TimeoutException()); } - - semaphore.acquireAsync(value).addListener(new FutureListener() { - @Override - public void operationComplete(Future future) throws Exception { - if (!future.isSuccess()) { - result.tryFailure(future.cause()); - return; - } + } + }); + return acquireResult; + } - result.trySuccess(null); - } - }); + public RFuture resetAsync() { + final RPromise result = new RedissonPromise(); + + RFuture future = topic.publishAsync(CLEAR_MSG); + future.addListener(new ResetListener(result)); + + return result; + } + + public RFuture resetAsync(final long timeout, final TimeUnit timeUnit) { + final RPromise result = new RedissonPromise(); + + RFuture future = topic.publishAsync(CLEAR_MSG); + future.addListener(new ResetListener(result) { + @Override + protected RFuture acquireAsync(int value) { + return tryAcquire(timeout, timeUnit, value); } }); diff --git a/redisson/src/main/java/org/redisson/api/RDoubleAdder.java b/redisson/src/main/java/org/redisson/api/RDoubleAdder.java index fc2935b8d..966fdd7c3 100644 --- a/redisson/src/main/java/org/redisson/api/RDoubleAdder.java +++ b/redisson/src/main/java/org/redisson/api/RDoubleAdder.java @@ -15,8 +15,10 @@ */ package org.redisson.api; +import java.util.concurrent.TimeUnit; + /** - * Distributed implementation of {@link java.util.concurrent.atomic.LongAdder} + * Distributed implementation of {@link java.util.concurrent.atomic.DoubleAdder} *

* Internal state maintained on client side. * @@ -43,28 +45,52 @@ public interface RDoubleAdder extends RExpirable, RDestroyable { void decrement(); /** - * Accumulates sum across all RLongAdder instances + * Accumulates sum across all RDoubleAdder instances * * @return accumulated sum */ double sum(); /** - * Resets value across all RLongAdder instances + * Resets value across all RDoubleAdder instances */ void reset(); /** - * Accumulates sum across all RLongAdder instances + * Accumulates sum across all RDoubleAdder instances * * @return accumulated sum */ RFuture sumAsync(); + + /** + * Accumulates sum across all RDoubleAdder instances + * within defined timeout. + * + * @param timeout for accumulation + * @param timeUnit for timeout + * + * @return accumulated sum + */ + RFuture sumAsync(long timeout, TimeUnit timeUnit); + /** - * Resets value across all RLongAdder instances + * Resets value across all RDoubleAdder instances * * @return void */ RFuture resetAsync(); + + /** + * Resets value across all RDoubleAdder instances + * within defined timeout. + * + * @param timeout for reset + * @param timeUnit for timeout + * + * @return void + */ + RFuture resetAsync(long timeout, TimeUnit timeUnit); + } diff --git a/redisson/src/main/java/org/redisson/api/RLongAdder.java b/redisson/src/main/java/org/redisson/api/RLongAdder.java index 4cda3e185..ee6f8aa37 100644 --- a/redisson/src/main/java/org/redisson/api/RLongAdder.java +++ b/redisson/src/main/java/org/redisson/api/RLongAdder.java @@ -15,6 +15,8 @@ */ package org.redisson.api; +import java.util.concurrent.TimeUnit; + /** * Distributed implementation of {@link java.util.concurrent.atomic.LongAdder} *

@@ -61,10 +63,33 @@ public interface RLongAdder extends RExpirable, RDestroyable { */ RFuture sumAsync(); + /** + * Accumulates sum across all RLongAdder instances + * within defined timeout. + * + * @param timeout for accumulation + * @param timeUnit for timeout + * + * @return accumulated sum + */ + RFuture sumAsync(long timeout, TimeUnit timeUnit); + /** * Resets value across all RLongAdder instances * * @return void */ RFuture resetAsync(); + + /** + * Resets value across all RLongAdder instances + * within defined timeout. + * + * @param timeout for reset + * @param timeUnit for timeout + * + * @return void + */ + RFuture resetAsync(long timeout, TimeUnit timeUnit); + }