diff --git a/redisson/src/main/java/org/redisson/api/RPermitExpirableSemaphore.java b/redisson/src/main/java/org/redisson/api/RPermitExpirableSemaphore.java index 17654cec2..8d3482ce9 100644 --- a/redisson/src/main/java/org/redisson/api/RPermitExpirableSemaphore.java +++ b/redisson/src/main/java/org/redisson/api/RPermitExpirableSemaphore.java @@ -42,7 +42,7 @@ public interface RPermitExpirableSemaphore extends RExpirable, RPermitExpirableS /** * Acquires defined amount of permits. - * Waits if necessary until all permits became available. + * Waits if necessary until enough permits became available. * * @param permits the number of permits to acquire * @return permits ids @@ -64,7 +64,7 @@ public interface RPermitExpirableSemaphore extends RExpirable, RPermitExpirableS /** * Acquires defined amount of permits with defined leaseTime and returns ids. - * Waits if necessary until all permits became available. + * Waits if necessary until enough permits became available. * * @param permits the number of permits to acquire * @param leaseTime permit lease time @@ -87,7 +87,7 @@ public interface RPermitExpirableSemaphore extends RExpirable, RPermitExpirableS * Tries to acquire defined amount of currently available permits and returns ids. * * @param permits the number of permits to acquire - * @return permits ids if permits were acquired and empty list + * @return permits ids if permits were acquired and empty collection * otherwise * @throws IllegalArgumentException if permits is negative */ @@ -122,13 +122,13 @@ public interface RPermitExpirableSemaphore extends RExpirable, RPermitExpirableS /** * Tries to acquire defined amount of currently available permits * with defined leaseTime and return ids. - * Waits up to defined waitTime if necessary until permits became available. + * Waits up to defined waitTime if necessary until enough permits became available. * * @param permits the number of permits to acquire * @param waitTime the maximum time to wait * @param leaseTime permit lease time, use -1 to make it permanent * @param unit the time unit - * @return permits ids if permits were acquired and empty list + * @return permits ids if permits were acquired and empty collection * if the waiting time elapsed before permits were acquired * @throws InterruptedException if the current thread is interrupted * @throws IllegalArgumentException if permits is negative @@ -164,7 +164,7 @@ public interface RPermitExpirableSemaphore extends RExpirable, RPermitExpirableS void release(String permitId); /** - * Releases a permits by their ids. Increases the number of available permits. + * Releases permits by their ids. Increases the number of available permits. * * @param permitsIds - permits ids * @throws IllegalArgumentException if permitsIds is null or empty diff --git a/redisson/src/main/java/org/redisson/api/RPermitExpirableSemaphoreReactive.java b/redisson/src/main/java/org/redisson/api/RPermitExpirableSemaphoreReactive.java index dfa9491f9..94b2a6cce 100644 --- a/redisson/src/main/java/org/redisson/api/RPermitExpirableSemaphoreReactive.java +++ b/redisson/src/main/java/org/redisson/api/RPermitExpirableSemaphoreReactive.java @@ -15,8 +15,10 @@ */ package org.redisson.api; +import java.util.List; import java.util.concurrent.TimeUnit; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; /** @@ -39,6 +41,15 @@ public interface RPermitExpirableSemaphoreReactive extends RExpirableReactive { * @return permit id */ Mono acquire(); + + /** + * Acquires defined amount of permits. + * Waits if necessary until all permits became available. + * + * @param permits the number of permits to acquire + * @return permits ids + */ + Flux acquire(int permits); /** * Acquires a permit with defined leaseTime and return its id. @@ -49,6 +60,17 @@ public interface RPermitExpirableSemaphoreReactive extends RExpirableReactive { * @return permit id */ Mono acquire(long leaseTime, TimeUnit unit); + + /** + * Acquires defined amount of permits with defined leaseTime and returns ids. + * Waits if necessary until all permits became available. + * + * @param permits the number of permits to acquire + * @param leaseTime permits lease time + * @param unit time unit + * @return permits ids + */ + Flux acquire(int permits, long leaseTime, TimeUnit unit); /** * Tries to acquire currently available permit and return its id. @@ -58,6 +80,15 @@ public interface RPermitExpirableSemaphoreReactive extends RExpirableReactive { */ Mono tryAcquire(); + /** + * Tries to acquire defined amount of currently available permits and returns ids. + * + * @param permits the number of permits to acquire + * @return permits ids if permits were acquired and empty collection + * otherwise + */ + Flux tryAcquire(int permits); + /** * Tries to acquire currently available permit and return its id. * Waits up to defined waitTime if necessary until a permit became available. @@ -81,6 +112,20 @@ public interface RPermitExpirableSemaphoreReactive extends RExpirableReactive { * if the waiting time elapsed before a permit was acquired */ Mono tryAcquire(long waitTime, long leaseTime, TimeUnit unit); + + /** + * Tries to acquire defined amount of currently available permits + * with defined leaseTime and return their ids. + * Waits up to defined waitTime if necessary until enough permits became available. + * + * @param permits the number of permits to acquire + * @param waitTime the maximum time to wait + * @param leaseTime permits lease time, use -1 to make them permanent + * @param unit the time unit + * @return permits ids if permits were acquired and empty collection + * if the waiting time elapsed before permits were acquired + */ + Flux tryAcquire(int permits, long waitTime, long leaseTime, TimeUnit unit); /** * Tries to release permit by its id. @@ -91,6 +136,14 @@ public interface RPermitExpirableSemaphoreReactive extends RExpirableReactive { */ Mono tryRelease(String permitId); + /** + * Tries to release permits by their ids. + * + * @param permitsIds permits ids + * @return amount of released permits + */ + Mono tryRelease(List permitsIds); + /** * Releases a permit by its id. Increases the number of available permits. * Throws an exception if permit id doesn't exist or has already been released. @@ -99,6 +152,14 @@ public interface RPermitExpirableSemaphoreReactive extends RExpirableReactive { * @return void */ Mono release(String permitId); + + /** + * Releases a permits by their ids. Increases the number of available permits. + * + * @param permitsIds - permits ids + * @return amount of released permits + */ + Mono release(List permitsIds); /** * Returns amount of available permits. diff --git a/redisson/src/main/java/org/redisson/api/RPermitExpirableSemaphoreRx.java b/redisson/src/main/java/org/redisson/api/RPermitExpirableSemaphoreRx.java index 19d32951e..5821342bd 100644 --- a/redisson/src/main/java/org/redisson/api/RPermitExpirableSemaphoreRx.java +++ b/redisson/src/main/java/org/redisson/api/RPermitExpirableSemaphoreRx.java @@ -15,9 +15,11 @@ */ package org.redisson.api; +import java.util.List; import java.util.concurrent.TimeUnit; import io.reactivex.rxjava3.core.Completable; +import io.reactivex.rxjava3.core.Flowable; import io.reactivex.rxjava3.core.Maybe; import io.reactivex.rxjava3.core.Single; @@ -54,7 +56,29 @@ public interface RPermitExpirableSemaphoreRx extends RExpirableRx { * @return permit id */ Single acquire(); - + + /** + * Acquires defined amount of permits from this semaphore, blocking until enough permits are + * available, or the thread is {@linkplain Thread#interrupt interrupted}. + * + *

Acquires permits permits, if they are available and returns their ids, + * reducing the number of available permits by permits. + * + *

If not enough permits are available then the current thread becomes + * disabled for thread scheduling purposes and lies dormant until + * one of two things happens: + *

    + *
  • Some other thread invokes the {@link #release(String)} method for this + * semaphore and the current thread is next to be assigned a permit; or + *
  • Some other thread {@linkplain Thread#interrupt interrupts} + * the current thread. + *
+ * + * @param permits - the number of permits to acquire + * @return permits ids + */ + Flowable acquire(int permits); + /** * Acquires a permit with defined lease time from this semaphore, * blocking until one is available, @@ -78,7 +102,32 @@ public interface RPermitExpirableSemaphoreRx extends RExpirableRx { * @return permit id */ Single acquire(long leaseTime, TimeUnit unit); - + + /** + * Acquires defined amount of permits with defined lease time from this semaphore, + * blocking until enough permits are available, + * or the thread is {@linkplain Thread#interrupt interrupted}. + * + *

Acquires permits permits, if they are available and returns their ids, + * reducing the number of available permits by permits. + * + *

If not enough permits are available then the current thread becomes + * disabled for thread scheduling purposes and lies dormant until + * one of two things happens: + *

    + *
  • Some other thread invokes the {@link #release} method for this + * semaphore and the current thread is next to be assigned a permit; or + *
  • Some other thread {@linkplain Thread#interrupt interrupts} + * the current thread. + *
+ * + * @param permits - the number of permits to acquire + * @param leaseTime - permit lease time + * @param unit - time unit + * @return permits ids + */ + Flowable acquire(int permits, long leaseTime, TimeUnit unit); + /** * Acquires a permit only if one is available at the * time of invocation. @@ -95,6 +144,23 @@ public interface RPermitExpirableSemaphoreRx extends RExpirableRx { */ Maybe tryAcquire(); + /** + * Acquires defined amount of permits only if they are available at the + * time of invocation. + * + *

Acquires permits permits, if they are available and returns immediately, + * with the permits ids, + * reducing the number of available permits by permits. + * + *

If not enough permits are available then this method will return + * immediately with empty collection. + * + * @param permits - the number of permits to acquire + * @return permits ids if permit were acquired and empty collection + * otherwise + */ + Flowable tryAcquire(int permits); + /** * Acquires a permit from this semaphore, if one becomes available * within the given waiting time and the current thread has not @@ -163,6 +229,42 @@ public interface RPermitExpirableSemaphoreRx extends RExpirableRx { */ Maybe tryAcquire(long waitTime, long leaseTime, TimeUnit unit); + /** + * Acquires defined amount of permits with defined lease time from this semaphore, + * if enough permits become available + * within the given waiting time and the current thread has not + * been {@linkplain Thread#interrupt interrupted}. + * + *

Acquires permits permits, if they are available and returns immediately, + * with the permits ids, + * reducing the number of available permits by permits. + * + *

If not enough permits are available then the current thread becomes + * disabled for thread scheduling purposes and lies dormant until + * one of three things happens: + *

    + *
  • Some other thread invokes the {@link #release(String)} method for this + * semaphore and the current thread is next to be assigned a permit; or + *
  • Some other thread {@linkplain Thread#interrupt interrupts} + * the current thread; or + *
  • The specified waiting time elapses. + *
+ * + *

If permit are acquired then permits ids are returned. + * + *

If the specified waiting time elapses then the empty collection + * is returned. If the time is less than or equal to zero, the method + * will not wait at all. + * + * @param permits the number of permits to acquire + * @param waitTime the maximum time to wait for permits + * @param leaseTime permits lease time + * @param unit the time unit of the {@code timeout} argument + * @return permits ids if permit were acquired and empty collection + * if the waiting time elapsed before permits were acquired + */ + Flowable tryAcquire(int permits, long waitTime, long leaseTime, TimeUnit unit); + /** * Releases a permit by its id, returning it to the semaphore. * @@ -181,6 +283,23 @@ public interface RPermitExpirableSemaphoreRx extends RExpirableRx { */ Single tryRelease(String permitId); + /** + * Releases permits by their ids, returning them to the semaphore. + * + *

Releases permits permits, increasing the number of available permits + * by released amount. If any threads of Redisson client are trying to acquire a permit, + * then one is selected and given one of the permits that were just released. + * + *

There is no requirement that a thread that releases permits must + * have acquired that permit by calling {@link #acquire()}. + * Correct usage of a semaphore is established by programming convention + * in the application. + * + * @param permitsIds - permits ids + * @return amount of released permits + */ + Single tryRelease(List permitsIds); + /** * Releases a permit by its id, returning it to the semaphore. * @@ -200,6 +319,23 @@ public interface RPermitExpirableSemaphoreRx extends RExpirableRx { */ Completable release(String permitId); + /** + * Releases permits by their ids, returning them to the semaphore. + * + *

Releases permits permits, increasing the number of available permits + * by released amount. If any threads of Redisson client are trying to acquire a permit, + * then one is selected and given the permit that were just released. + * + *

There is no requirement that a thread that releases permits must + * have acquired that permit by calling {@link #acquire()}. + * Correct usage of a semaphore is established by programming convention + * in the application. + * + * @param permitsIds - permits ids + * @return amount of released permits + */ + Single release(List permitsIds); + /** * Returns the current number of available permits. *