diff --git a/redisson/src/main/java/org/redisson/RedissonBaseAdder.java b/redisson/src/main/java/org/redisson/RedissonBaseAdder.java index 91eba4573..f92c355bb 100644 --- a/redisson/src/main/java/org/redisson/RedissonBaseAdder.java +++ b/redisson/src/main/java/org/redisson/RedissonBaseAdder.java @@ -15,6 +15,9 @@ */ package org.redisson; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + import org.redisson.api.RFuture; import org.redisson.api.RSemaphore; import org.redisson.api.RTopic; @@ -37,6 +40,83 @@ import io.netty.util.concurrent.FutureListener; */ public abstract class RedissonBaseAdder extends RedissonExpirable { + private class ResetListener implements FutureListener { + + private final RPromise result; + + private ResetListener(RPromise result) { + this.result = result; + } + + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + result.tryFailure(future.cause()); + return; + } + + acquireAsync(future.getNow().intValue()).addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + result.tryFailure(future.cause()); + return; + } + + result.trySuccess(null); + } + }); + } + + protected RFuture acquireAsync(int value) { + return semaphore.acquireAsync(value); + } + } + + private class SumListener implements FutureListener { + + private final RPromise result; + + private SumListener(RPromise result) { + this.result = result; + } + + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + result.tryFailure(future.cause()); + return; + } + + acquireAsync(future.getNow().intValue()).addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + result.tryFailure(future.cause()); + return; + } + + RFuture valueFuture = getAndDeleteAsync(); + valueFuture.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + result.tryFailure(future.cause()); + return; + } + + result.trySuccess(future.getNow()); + } + }); + } + }); + } + + protected RFuture acquireAsync(int value) { + return semaphore.acquireAsync(value); + } + } + private static final Logger log = LoggerFactory.getLogger(RedissonBaseAdder.class); private static final long CLEAR_MSG = 0; @@ -102,76 +182,71 @@ public abstract class RedissonBaseAdder extends RedissonExpira get(resetAsync()); } + public void reset(long timeout, TimeUnit timeUnit) { + get(resetAsync(timeout, timeUnit)); + } + public RFuture sumAsync() { final RPromise result = new RedissonPromise(); RFuture future = topic.publishAsync(SUM_MSG); - future.addListener(new FutureListener() { - + future.addListener(new SumListener(result)); + + return result; + } + + public RFuture sumAsync(final long timeout, final TimeUnit timeUnit) { + RPromise result = new RedissonPromise(); + + RFuture future = topic.publishAsync(SUM_MSG); + future.addListener(new SumListener(result) { @Override - public void operationComplete(Future future) throws Exception { - if (!future.isSuccess()) { - result.tryFailure(future.cause()); - return; - } - - semaphore.acquireAsync(future.getNow().intValue()).addListener(new FutureListener() { - @Override - public void operationComplete(Future future) throws Exception { - if (!future.isSuccess()) { - result.tryFailure(future.cause()); - return; - } - - RFuture valueFuture = getAndDeleteAsync(); - valueFuture.addListener(new FutureListener() { - @Override - public void operationComplete(Future future) throws Exception { - if (!future.isSuccess()) { - result.tryFailure(future.cause()); - return; - } - - result.trySuccess(future.getNow()); - } - }); - } - }); + protected RFuture acquireAsync(int value) { + return tryAcquire(timeout, timeUnit, value); } + }); return result; } - public RFuture resetAsync() { - final RPromise result = new RedissonPromise(); - - RFuture future = topic.publishAsync(CLEAR_MSG); - future.addListener(new FutureListener() { - + protected RFuture tryAcquire(long timeout, TimeUnit timeUnit, int value) { + final RPromise acquireResult = new RedissonPromise(); + semaphore.tryAcquireAsync(value, timeout, timeUnit).addListener(new FutureListener() { @Override - public void operationComplete(Future future) throws Exception { + public void operationComplete(Future future) throws Exception { if (!future.isSuccess()) { - result.tryFailure(future.cause()); + acquireResult.tryFailure(future.cause()); return; } - int value = 0; - if (future.getNow() != null) { - value = future.getNow().intValue(); + if (future.getNow()) { + acquireResult.trySuccess(null); + } else { + acquireResult.tryFailure(new TimeoutException()); } - - semaphore.acquireAsync(value).addListener(new FutureListener() { - @Override - public void operationComplete(Future future) throws Exception { - if (!future.isSuccess()) { - result.tryFailure(future.cause()); - return; - } + } + }); + return acquireResult; + } - result.trySuccess(null); - } - }); + public RFuture resetAsync() { + final RPromise result = new RedissonPromise(); + + RFuture future = topic.publishAsync(CLEAR_MSG); + future.addListener(new ResetListener(result)); + + return result; + } + + public RFuture resetAsync(final long timeout, final TimeUnit timeUnit) { + final RPromise result = new RedissonPromise(); + + RFuture future = topic.publishAsync(CLEAR_MSG); + future.addListener(new ResetListener(result) { + @Override + protected RFuture acquireAsync(int value) { + return tryAcquire(timeout, timeUnit, value); } }); diff --git a/redisson/src/main/java/org/redisson/api/RDoubleAdder.java b/redisson/src/main/java/org/redisson/api/RDoubleAdder.java index fc2935b8d..966fdd7c3 100644 --- a/redisson/src/main/java/org/redisson/api/RDoubleAdder.java +++ b/redisson/src/main/java/org/redisson/api/RDoubleAdder.java @@ -15,8 +15,10 @@ */ package org.redisson.api; +import java.util.concurrent.TimeUnit; + /** - * Distributed implementation of {@link java.util.concurrent.atomic.LongAdder} + * Distributed implementation of {@link java.util.concurrent.atomic.DoubleAdder} *

* Internal state maintained on client side. * @@ -43,28 +45,52 @@ public interface RDoubleAdder extends RExpirable, RDestroyable { void decrement(); /** - * Accumulates sum across all RLongAdder instances + * Accumulates sum across all RDoubleAdder instances * * @return accumulated sum */ double sum(); /** - * Resets value across all RLongAdder instances + * Resets value across all RDoubleAdder instances */ void reset(); /** - * Accumulates sum across all RLongAdder instances + * Accumulates sum across all RDoubleAdder instances * * @return accumulated sum */ RFuture sumAsync(); + + /** + * Accumulates sum across all RDoubleAdder instances + * within defined timeout. + * + * @param timeout for accumulation + * @param timeUnit for timeout + * + * @return accumulated sum + */ + RFuture sumAsync(long timeout, TimeUnit timeUnit); + /** - * Resets value across all RLongAdder instances + * Resets value across all RDoubleAdder instances * * @return void */ RFuture resetAsync(); + + /** + * Resets value across all RDoubleAdder instances + * within defined timeout. + * + * @param timeout for reset + * @param timeUnit for timeout + * + * @return void + */ + RFuture resetAsync(long timeout, TimeUnit timeUnit); + } diff --git a/redisson/src/main/java/org/redisson/api/RLongAdder.java b/redisson/src/main/java/org/redisson/api/RLongAdder.java index 4cda3e185..ee6f8aa37 100644 --- a/redisson/src/main/java/org/redisson/api/RLongAdder.java +++ b/redisson/src/main/java/org/redisson/api/RLongAdder.java @@ -15,6 +15,8 @@ */ package org.redisson.api; +import java.util.concurrent.TimeUnit; + /** * Distributed implementation of {@link java.util.concurrent.atomic.LongAdder} *

@@ -61,10 +63,33 @@ public interface RLongAdder extends RExpirable, RDestroyable { */ RFuture sumAsync(); + /** + * Accumulates sum across all RLongAdder instances + * within defined timeout. + * + * @param timeout for accumulation + * @param timeUnit for timeout + * + * @return accumulated sum + */ + RFuture sumAsync(long timeout, TimeUnit timeUnit); + /** * Resets value across all RLongAdder instances * * @return void */ RFuture resetAsync(); + + /** + * Resets value across all RLongAdder instances + * within defined timeout. + * + * @param timeout for reset + * @param timeUnit for timeout + * + * @return void + */ + RFuture resetAsync(long timeout, TimeUnit timeUnit); + } diff --git a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java index 9dc91c96b..73d0c3547 100644 --- a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java +++ b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java @@ -294,7 +294,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { return allAsync(true, command, callback, params); } - private RFuture allAsync(boolean readOnlyMode, RedisCommand command, final SlotCallback callback, Object... params) { + private RFuture allAsync(boolean readOnlyMode, final RedisCommand command, final SlotCallback callback, Object... params) { final RPromise mainPromise = new RedissonPromise(); final Collection nodes = connectionManager.getEntrySet(); final AtomicInteger counter = new AtomicInteger(nodes.size()); @@ -788,7 +788,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { } private void checkAttemptFuture(final NodeSource source, final AsyncDetails details, - Future future, boolean ignoreRedirect) { + Future future, final boolean ignoreRedirect) { details.getTimeout().cancel(); if (future.isCancelled()) { return; diff --git a/redisson/src/main/java/org/redisson/spring/cache/RedissonCache.java b/redisson/src/main/java/org/redisson/spring/cache/RedissonCache.java index 5ae73e626..49a9d0282 100644 --- a/redisson/src/main/java/org/redisson/spring/cache/RedissonCache.java +++ b/redisson/src/main/java/org/redisson/spring/cache/RedissonCache.java @@ -16,6 +16,7 @@ package org.redisson.spring.cache; import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -164,15 +165,16 @@ public class RedissonCache implements Cache { if (value == null) { try { value = toStoreValue(valueLoader.call()); - } catch (Exception ex) { + } catch (Throwable ex) { + RuntimeException exception; try { Class c = Class.forName("org.springframework.cache.Cache$ValueRetrievalException"); Constructor constructor = c.getConstructor(Object.class, Callable.class, Throwable.class); - RuntimeException exception = (RuntimeException) constructor.newInstance(key, valueLoader, ex.getCause()); - throw exception; + exception = (RuntimeException) constructor.newInstance(key, valueLoader, ex); } catch (Exception e) { throw new IllegalStateException(e); } + throw exception; } put(key, value); }