diff --git a/redisson/src/main/java/org/redisson/RedissonPermitExpirableSemaphore.java b/redisson/src/main/java/org/redisson/RedissonPermitExpirableSemaphore.java index f2b36ccdb..dc3be595c 100644 --- a/redisson/src/main/java/org/redisson/RedissonPermitExpirableSemaphore.java +++ b/redisson/src/main/java/org/redisson/RedissonPermitExpirableSemaphore.java @@ -627,6 +627,16 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen public int availablePermits() { return get(availablePermitsAsync()); } + + @Override + public int totalPermits() { + return get(totalPermitsAsync()); + } + + @Override + public int claimedPermits() { + return get(claimedPermitsAsync()); + } @Override public RFuture availablePermitsAsync() { @@ -645,11 +655,75 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen Arrays.asList(getRawName(), timeoutName, getChannelName()), System.currentTimeMillis()); } + @Override + public RFuture totalPermitsAsync() { + return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_INTEGER, + "local expiredIds = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, -1); " + + "if #expiredIds > 0 then " + + "redis.call('zrem', KEYS[2], unpack(expiredIds)); " + + "local value = redis.call('incrby', KEYS[1], #expiredIds); " + + "if tonumber(value) > 0 then " + + "redis.call('publish', KEYS[3], value); " + + "end;" + + "return value; " + + "end; " + + "local available = redis.call('get', KEYS[1]); " + + "local claimed = redis.call('zcount', KEYS[2], 0, ARGV[1]); " + + "return available == false and 0 or (available + (claimed == false and 0 or claimed));", + Arrays.asList(getRawName(), timeoutName, getChannelName()), System.currentTimeMillis()); + } + + @Override + public RFuture claimedPermitsAsync() { + return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_INTEGER, + "local expiredIds = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, -1); " + + "if #expiredIds > 0 then " + + "redis.call('zrem', KEYS[2], unpack(expiredIds)); " + + "local value = redis.call('incrby', KEYS[1], #expiredIds); " + + "if tonumber(value) > 0 then " + + "redis.call('publish', KEYS[3], value); " + + "end;" + + "return value; " + + "end; " + + "local claimed = redis.call('zcount', KEYS[2], 0, ARGV[1]); " + + "return claimed == false and 0 or claimed;", + Arrays.asList(getRawName(), timeoutName, getChannelName()), System.currentTimeMillis()); + } + @Override public boolean trySetPermits(int permits) { return get(trySetPermitsAsync(permits)); } + @Override + public boolean trySetMaximumPermits(int permits) { + return get(trySetMaximumPermitsAsync(permits)); + } + + @Override + public RFuture trySetMaximumPermitsAsync(int permits) { + return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, + "local available = redis.call('get', KEYS[1]); " + + "if (available == false) then " + + "redis.call('set', KEYS[1], ARGV[1]); " + + "redis.call('publish', KEYS[2], ARGV[1]); " + + "return 1;" + + "end;" + "local claimed = redis.call('zcount', KEYS[2], 0, ARGV[1]); " + + "local maximum = (claimed == false and 0 or claimed) + tonumber(available); " + + "if (maximum == ARGV[1]) then " + + "return 0;" + + "end;" + "local delta = maximum - tonumber(ARGV[1]); " + + "if (delta == 0) then " + + "return 0;" + + "end;" + "redis.call('incrby', KEYS[1], tonumber(delta)); " + + "redis.call('publish', KEYS[2], permits); " + + "return 0;", + Arrays.asList(getRawName(), getChannelName()), permits); + } + @Override public RFuture trySetPermitsAsync(int permits) { return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, @@ -677,7 +751,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen + "end;" + "redis.call('set', KEYS[1], tonumber(value) + tonumber(ARGV[1])); " + "if tonumber(ARGV[1]) > 0 then " - + "redis.call('publish', KEYS[2], ARGV[1]); " + + "redis.call('publish', KEYS[2], tonumber(value) + tonumbers(ARGV[1])); " + "end;", Arrays.asList(getRawName(), getChannelName()), permits); } diff --git a/redisson/src/main/java/org/redisson/api/RPermitExpirableSemaphore.java b/redisson/src/main/java/org/redisson/api/RPermitExpirableSemaphore.java index f8bf1acb5..221f6dba1 100644 --- a/redisson/src/main/java/org/redisson/api/RPermitExpirableSemaphore.java +++ b/redisson/src/main/java/org/redisson/api/RPermitExpirableSemaphore.java @@ -102,12 +102,26 @@ public interface RPermitExpirableSemaphore extends RExpirable, RPermitExpirableS void release(String permitId); /** - * Returns amount of available permits. + * Returns number of available permits. * - * @return number of permits + * @return number of available permits */ int availablePermits(); + /** + * Returns the maximum number of permits. + * + * @return maximum number of permits + */ + int maximumPermits(); + + /** + * Returns the number of claimed permits. + * + * @return number of permits claimed + */ + int claimedPermits(); + /** * Tries to set number of permits. * @@ -115,7 +129,15 @@ public interface RPermitExpirableSemaphore extends RExpirable, RPermitExpirableS * @return true if permits has been set successfully, otherwise false. */ boolean trySetPermits(int permits); - + + /** + * Tries to set the maximum number of permits. + * + * @param permits - number of permits to use as the maximum + * @return true if permits has been set successfully, otherwise false. + */ + boolean trySetMaximumPermits(int permits); + /** * Increases or decreases the number of available permits by defined value. * diff --git a/redisson/src/main/java/org/redisson/api/RPermitExpirableSemaphoreAsync.java b/redisson/src/main/java/org/redisson/api/RPermitExpirableSemaphoreAsync.java index 02a8da148..a67fbe770 100644 --- a/redisson/src/main/java/org/redisson/api/RPermitExpirableSemaphoreAsync.java +++ b/redisson/src/main/java/org/redisson/api/RPermitExpirableSemaphoreAsync.java @@ -99,12 +99,26 @@ public interface RPermitExpirableSemaphoreAsync extends RExpirableAsync { RFuture releaseAsync(String permitId); /** - * Returns amount of available permits. + * Returns number of available permits. * * @return number of permits */ RFuture availablePermitsAsync(); + /** + * Returns the maximum number of permits. + * + * @return maximum number of permits + */ + RFuture maximumPermitsAsync(); + + /** + * Returns the number of claimed permits. + * + * @return number of permits claimed + */ + RFuture claimedPermitsAsync(); + /** * Tries to set number of permits. * @@ -113,6 +127,14 @@ public interface RPermitExpirableSemaphoreAsync extends RExpirableAsync { */ RFuture trySetPermitsAsync(int permits); + /** + * Tries to set the maximum number of permits. + * + * @param permits - number of permits to use as the maximum + * @return true if permits has been set successfully, otherwise false. + */ + RFuture trySetMaximumPermitsAsync(int permits); + /** * Increases or decreases the number of available permits by defined value. *