diff --git a/redisson/src/main/java/org/redisson/RedissonSemaphore.java b/redisson/src/main/java/org/redisson/RedissonSemaphore.java index 281bc4be1..e89143f7b 100644 --- a/redisson/src/main/java/org/redisson/RedissonSemaphore.java +++ b/redisson/src/main/java/org/redisson/RedissonSemaphore.java @@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicReference; import org.redisson.api.RFuture; import org.redisson.api.RSemaphore; +import org.redisson.client.codec.IntegerCodec; import org.redisson.client.codec.LongCodec; import org.redisson.client.codec.StringCodec; import org.redisson.client.protocol.RedisCommands; @@ -441,26 +442,32 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore { Arrays.asList(getName(), getChannelName()), permits); } - @Override public int drainPermits() { - RFuture future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG, + return get(drainPermitsAsync()); + } + + @Override + public RFuture drainPermitsAsync() { + return commandExecutor.evalWriteAsync(getName(), IntegerCodec.INSTANCE, RedisCommands.EVAL_LONG, "local value = redis.call('get', KEYS[1]); " + "if (value == false or value == 0) then " + "return 0; " + "end; " + "redis.call('set', KEYS[1], 0); " + "return value;", - Collections.singletonList(getName())); - Long res = get(future); - return res.intValue(); + Collections.singletonList(getName())); } @Override public int availablePermits() { - RFuture future = commandExecutor.writeAsync(getName(), LongCodec.INSTANCE, RedisCommands.GET_LONG, getName()); - Long res = get(future); - return res.intValue(); + RFuture future = commandExecutor.writeAsync(getName(), LongCodec.INSTANCE, RedisCommands.GET_INTEGER, getName()); + return get(future); + } + + @Override + public RFuture availablePermitsAsync() { + return commandExecutor.writeAsync(getName(), LongCodec.INSTANCE, RedisCommands.GET_INTEGER, getName()); } @Override diff --git a/redisson/src/main/java/org/redisson/api/RSemaphoreAsync.java b/redisson/src/main/java/org/redisson/api/RSemaphoreAsync.java index ab045cf49..53f123df9 100644 --- a/redisson/src/main/java/org/redisson/api/RSemaphoreAsync.java +++ b/redisson/src/main/java/org/redisson/api/RSemaphoreAsync.java @@ -123,4 +123,18 @@ public interface RSemaphoreAsync extends RExpirableAsync { */ RFuture addPermitsAsync(int permits); + /** + * Returns amount of available permits. + * + * @return number of permits + */ + RFuture availablePermitsAsync(); + + /** + * Acquires and returns all permits that are immediately available. + * + * @return number of permits + */ + RFuture drainPermitsAsync(); + } diff --git a/redisson/src/main/java/org/redisson/api/RSemaphoreReactive.java b/redisson/src/main/java/org/redisson/api/RSemaphoreReactive.java index b559ad427..5111b142e 100644 --- a/redisson/src/main/java/org/redisson/api/RSemaphoreReactive.java +++ b/redisson/src/main/java/org/redisson/api/RSemaphoreReactive.java @@ -15,10 +15,10 @@ */ package org.redisson.api; -import java.util.concurrent.TimeUnit; - import reactor.core.publisher.Mono; +import java.util.concurrent.TimeUnit; + /** * Reactive interface of Redis based {@link java.util.concurrent.Semaphore}. *

@@ -125,5 +125,18 @@ public interface RSemaphoreReactive extends RExpirableReactive { */ Mono addPermits(int permits); - + /** + * Returns amount of available permits. + * + * @return number of permits + */ + Mono availablePermits(); + + /** + * Acquires and returns all permits that are immediately available. + * + * @return number of permits + */ + Mono drainPermits(); + } diff --git a/redisson/src/main/java/org/redisson/api/RSemaphoreRx.java b/redisson/src/main/java/org/redisson/api/RSemaphoreRx.java index c921bac7d..7f60e4820 100644 --- a/redisson/src/main/java/org/redisson/api/RSemaphoreRx.java +++ b/redisson/src/main/java/org/redisson/api/RSemaphoreRx.java @@ -126,4 +126,18 @@ public interface RSemaphoreRx extends RExpirableRx { */ Completable addPermits(int permits); + /** + * Returns amount of available permits. + * + * @return number of permits + */ + Single availablePermits(); + + /** + * Acquires and returns all permits that are immediately available. + * + * @return number of permits + */ + Single drainPermits(); + }