From 071276829b352d2a69726c67b7bde9f7a993607c Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Fri, 12 Apr 2024 09:43:10 +0300 Subject: [PATCH] Feature - RSemaphore.trySetPermits() method with ttl parameter added. --- .../java/org/redisson/RedissonSemaphore.java | 42 ++++++++++++++++--- .../java/org/redisson/api/RSemaphore.java | 12 +++++- .../org/redisson/api/RSemaphoreAsync.java | 11 ++++- .../org/redisson/api/RSemaphoreReactive.java | 11 ++++- .../java/org/redisson/api/RSemaphoreRx.java | 9 ++++ .../org/redisson/RedissonSemaphoreTest.java | 11 ++++- 6 files changed, 86 insertions(+), 10 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonSemaphore.java b/redisson/src/main/java/org/redisson/RedissonSemaphore.java index 84c00b87f..5a57080f3 100644 --- a/redisson/src/main/java/org/redisson/RedissonSemaphore.java +++ b/redisson/src/main/java/org/redisson/RedissonSemaphore.java @@ -506,13 +506,13 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore { @Override public RFuture trySetPermitsAsync(int permits) { RFuture future = commandExecutor.syncedEvalWithRetry(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, - "local value = redis.call('get', KEYS[1]); " + + "local value = redis.call('get', KEYS[1]); " + "if (value == false) then " - + "redis.call('set', KEYS[1], ARGV[1]); " - + "redis.call(ARGV[2], KEYS[2], ARGV[1]); " - + "return 1;" - + "end;" - + "return 0;", + + "redis.call('set', KEYS[1], ARGV[1]); " + + "redis.call(ARGV[2], KEYS[2], ARGV[1]); " + + "return 1;" + + "end;" + + "return 0;", Arrays.asList(getRawName(), getChannelName()), permits, getSubscribeService().getPublishCommand()); @@ -528,6 +528,36 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore { return future; } + @Override + public boolean trySetPermits(int permits, Duration timeToLive) { + return get(trySetPermitsAsync(permits, timeToLive)); + } + + @Override + public RFuture trySetPermitsAsync(int permits, Duration timeToLive) { + RFuture future = commandExecutor.syncedEvalWithRetry(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, + "local value = redis.call('get', KEYS[1]); " + + "if (value == false) then " + + "redis.call('set', KEYS[1], ARGV[1], 'px', ARGV[3]); " + + "redis.call(ARGV[2], KEYS[2], ARGV[1]); " + + "return 1;" + + "end;" + + "return 0;", + Arrays.asList(getRawName(), getChannelName()), + permits, getSubscribeService().getPublishCommand(), timeToLive.toMillis()); + + if (LOGGER.isDebugEnabled()) { + future.thenAccept(r -> { + if (r) { + LOGGER.debug("permits set, permits: {}, name: {}", permits, getName()); + } else { + LOGGER.debug("unable to set permits, permits: {}, name: {}", permits, getName()); + } + }); + } + return future; + } + @Override public void addPermits(int permits) { get(addPermitsAsync(permits)); diff --git a/redisson/src/main/java/org/redisson/api/RSemaphore.java b/redisson/src/main/java/org/redisson/api/RSemaphore.java index 253ea8535..110ce1c19 100644 --- a/redisson/src/main/java/org/redisson/api/RSemaphore.java +++ b/redisson/src/main/java/org/redisson/api/RSemaphore.java @@ -142,12 +142,22 @@ public interface RSemaphore extends RExpirable, RSemaphoreAsync { /** * Tries to set number of permits. * - * @param permits - number of permits + * @param permits number of permits * @return true if permits has been set successfully, * otherwise false if permits were already set. */ boolean trySetPermits(int permits); + /** + * Tries to set number of permits with defined time to live. + * + * @param timeToLive time to live + * @param permits number of permits + * @return true if permits has been set successfully, + * otherwise false if permits were already set. + */ + boolean trySetPermits(int permits, Duration timeToLive); + /** * Increases or decreases the number of available permits by defined value. * diff --git a/redisson/src/main/java/org/redisson/api/RSemaphoreAsync.java b/redisson/src/main/java/org/redisson/api/RSemaphoreAsync.java index a688962cb..f3916f969 100644 --- a/redisson/src/main/java/org/redisson/api/RSemaphoreAsync.java +++ b/redisson/src/main/java/org/redisson/api/RSemaphoreAsync.java @@ -83,11 +83,20 @@ public interface RSemaphoreAsync extends RExpirableAsync { /** * Tries to set number of permits. * - * @param permits - number of permits + * @param permits number of permits * @return true if permits has been set successfully, otherwise false. */ RFuture trySetPermitsAsync(int permits); + /** + * Tries to set number of permits with defined time to live. + * + * @param timeToLive time to live + * @param permits number of permits + * @return true if permits has been set successfully, otherwise false. + */ + RFuture trySetPermitsAsync(int permits, Duration timeToLive); + /** * Use {@link #tryAcquireAsync(Duration)} instead * diff --git a/redisson/src/main/java/org/redisson/api/RSemaphoreReactive.java b/redisson/src/main/java/org/redisson/api/RSemaphoreReactive.java index c7e0b1cee..f706f3339 100644 --- a/redisson/src/main/java/org/redisson/api/RSemaphoreReactive.java +++ b/redisson/src/main/java/org/redisson/api/RSemaphoreReactive.java @@ -85,11 +85,20 @@ public interface RSemaphoreReactive extends RExpirableReactive { /** * Tries to set number of permits. * - * @param permits - number of permits + * @param permits number of permits * @return true if permits has been set successfully, otherwise false. */ Mono trySetPermits(int permits); + /** + * Tries to set number of permits with defined time to live. + * + * @param timeToLive time to live + * @param permits number of permits + * @return true if permits has been set successfully, otherwise false. + */ + Mono trySetPermits(int permits, Duration timeToLive); + /** * Use {@link #tryAcquire(Duration)} instead * diff --git a/redisson/src/main/java/org/redisson/api/RSemaphoreRx.java b/redisson/src/main/java/org/redisson/api/RSemaphoreRx.java index 598467845..4ffd94c1c 100644 --- a/redisson/src/main/java/org/redisson/api/RSemaphoreRx.java +++ b/redisson/src/main/java/org/redisson/api/RSemaphoreRx.java @@ -91,6 +91,15 @@ public interface RSemaphoreRx extends RExpirableRx { */ Single trySetPermits(int permits); + /** + * Tries to set number of permits with defined time to live. + * + * @param timeToLive time to live + * @param permits number of permits + * @return true if permits has been set successfully, otherwise false. + */ + Single trySetPermits(int permits, Duration timeToLive); + /** * Use {@link #tryAcquire(Duration)} instead * diff --git a/redisson/src/test/java/org/redisson/RedissonSemaphoreTest.java b/redisson/src/test/java/org/redisson/RedissonSemaphoreTest.java index 2b53a5747..b21aad185 100644 --- a/redisson/src/test/java/org/redisson/RedissonSemaphoreTest.java +++ b/redisson/src/test/java/org/redisson/RedissonSemaphoreTest.java @@ -58,12 +58,21 @@ public class RedissonSemaphoreTest extends BaseConcurrentTest { } @Test - public void testTrySetPermits() { + public void testTrySetPermits() throws InterruptedException { RSemaphore s = redisson.getSemaphore("test"); assertThat(s.trySetPermits(10)).isTrue(); assertThat(s.availablePermits()).isEqualTo(10); assertThat(s.trySetPermits(15)).isFalse(); assertThat(s.availablePermits()).isEqualTo(10); + s.delete(); + + assertThat(s.isExists()).isFalse(); + assertThat(s.trySetPermits(1, Duration.ofSeconds(2))).isTrue(); + Thread.sleep(1000); + assertThat(s.availablePermits()).isEqualTo(1); + Thread.sleep(1000); + assertThat(s.availablePermits()).isZero(); + assertThat(s.isExists()).isFalse(); } @Test