Review fixes:

- add new methods to RPermitExpirableSemaphoreReactive
- add new methods to RPermitExpirableSemaphoreRx

Signed-off-by: Sergey Kuznetsov <sergey.kuznetsov@infobip.com>
pull/5247/head
Sergey Kuznetsov 1 year ago
parent d513705bc6
commit e4f40c0534

@ -42,7 +42,7 @@ public interface RPermitExpirableSemaphore extends RExpirable, RPermitExpirableS
/**
* Acquires defined amount of <code>permits</code>.
* Waits if necessary until all permits became available.
* Waits if necessary until enough permits became available.
*
* @param permits the number of permits to acquire
* @return permits ids
@ -64,7 +64,7 @@ public interface RPermitExpirableSemaphore extends RExpirable, RPermitExpirableS
/**
* Acquires defined amount of <code>permits</code> with defined <code>leaseTime</code> and returns ids.
* Waits if necessary until all permits became available.
* Waits if necessary until enough permits became available.
*
* @param permits the number of permits to acquire
* @param leaseTime permit lease time
@ -87,7 +87,7 @@ public interface RPermitExpirableSemaphore extends RExpirable, RPermitExpirableS
* Tries to acquire defined amount of currently available <code>permits</code> and returns ids.
*
* @param permits the number of permits to acquire
* @return permits ids if permits were acquired and empty list
* @return permits ids if permits were acquired and empty collection
* otherwise
* @throws IllegalArgumentException if <code>permits</code> is negative
*/
@ -122,13 +122,13 @@ public interface RPermitExpirableSemaphore extends RExpirable, RPermitExpirableS
/**
* Tries to acquire defined amount of currently available <code>permits</code>
* with defined <code>leaseTime</code> and return ids.
* Waits up to defined <code>waitTime</code> if necessary until permits became available.
* Waits up to defined <code>waitTime</code> if necessary until enough permits became available.
*
* @param permits the number of permits to acquire
* @param waitTime the maximum time to wait
* @param leaseTime permit lease time, use -1 to make it permanent
* @param unit the time unit
* @return permits ids if permits were acquired and empty list
* @return permits ids if permits were acquired and empty collection
* if the waiting time elapsed before permits were acquired
* @throws InterruptedException if the current thread is interrupted
* @throws IllegalArgumentException if <code>permits</code> is negative
@ -164,7 +164,7 @@ public interface RPermitExpirableSemaphore extends RExpirable, RPermitExpirableS
void release(String permitId);
/**
* Releases a permits by their ids. Increases the number of available permits.
* Releases permits by their ids. Increases the number of available permits.
*
* @param permitsIds - permits ids
* @throws IllegalArgumentException if <code>permitsIds</code> is null or empty

@ -15,8 +15,10 @@
*/
package org.redisson.api;
import java.util.List;
import java.util.concurrent.TimeUnit;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
@ -39,6 +41,15 @@ public interface RPermitExpirableSemaphoreReactive extends RExpirableReactive {
* @return permit id
*/
Mono<String> acquire();
/**
* Acquires defined amount of <code>permits</code>.
* Waits if necessary until all permits became available.
*
* @param permits the number of permits to acquire
* @return permits ids
*/
Flux<String> acquire(int permits);
/**
* Acquires a permit with defined <code>leaseTime</code> and return its id.
@ -49,6 +60,17 @@ public interface RPermitExpirableSemaphoreReactive extends RExpirableReactive {
* @return permit id
*/
Mono<String> acquire(long leaseTime, TimeUnit unit);
/**
* Acquires defined amount of <code>permits</code> with defined <code>leaseTime</code> and returns ids.
* Waits if necessary until all permits became available.
*
* @param permits the number of permits to acquire
* @param leaseTime permits lease time
* @param unit time unit
* @return permits ids
*/
Flux<String> acquire(int permits, long leaseTime, TimeUnit unit);
/**
* Tries to acquire currently available permit and return its id.
@ -58,6 +80,15 @@ public interface RPermitExpirableSemaphoreReactive extends RExpirableReactive {
*/
Mono<String> tryAcquire();
/**
* Tries to acquire defined amount of currently available <code>permits</code> and returns ids.
*
* @param permits the number of permits to acquire
* @return permits ids if permits were acquired and empty collection
* otherwise
*/
Flux<String> tryAcquire(int permits);
/**
* Tries to acquire currently available permit and return its id.
* Waits up to defined <code>waitTime</code> if necessary until a permit became available.
@ -81,6 +112,20 @@ public interface RPermitExpirableSemaphoreReactive extends RExpirableReactive {
* if the waiting time elapsed before a permit was acquired
*/
Mono<String> tryAcquire(long waitTime, long leaseTime, TimeUnit unit);
/**
* Tries to acquire defined amount of currently available <code>permits</code>
* with defined <code>leaseTime</code> and return their ids.
* Waits up to defined <code>waitTime</code> if necessary until enough permits became available.
*
* @param permits the number of permits to acquire
* @param waitTime the maximum time to wait
* @param leaseTime permits lease time, use -1 to make them permanent
* @param unit the time unit
* @return permits ids if permits were acquired and empty collection
* if the waiting time elapsed before permits were acquired
*/
Flux<String> tryAcquire(int permits, long waitTime, long leaseTime, TimeUnit unit);
/**
* Tries to release permit by its id.
@ -91,6 +136,14 @@ public interface RPermitExpirableSemaphoreReactive extends RExpirableReactive {
*/
Mono<Boolean> tryRelease(String permitId);
/**
* Tries to release permits by their ids.
*
* @param permitsIds permits ids
* @return amount of released permits
*/
Mono<Integer> tryRelease(List<String> permitsIds);
/**
* Releases a permit by its id. Increases the number of available permits.
* Throws an exception if permit id doesn't exist or has already been released.
@ -99,6 +152,14 @@ public interface RPermitExpirableSemaphoreReactive extends RExpirableReactive {
* @return void
*/
Mono<Void> release(String permitId);
/**
* Releases a permits by their ids. Increases the number of available permits.
*
* @param permitsIds - permits ids
* @return amount of released permits
*/
Mono<Integer> release(List<String> permitsIds);
/**
* Returns amount of available permits.

@ -15,9 +15,11 @@
*/
package org.redisson.api;
import java.util.List;
import java.util.concurrent.TimeUnit;
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Maybe;
import io.reactivex.rxjava3.core.Single;
@ -54,7 +56,29 @@ public interface RPermitExpirableSemaphoreRx extends RExpirableRx {
* @return permit id
*/
Single<String> acquire();
/**
* Acquires defined amount of <code>permits</code> from this semaphore, blocking until enough permits are
* available, or the thread is {@linkplain Thread#interrupt interrupted}.
*
* <p>Acquires <code>permits</code> permits, if they are available and returns their ids,
* reducing the number of available permits by <code>permits</code>.
*
* <p>If not enough permits are available then the current thread becomes
* disabled for thread scheduling purposes and lies dormant until
* one of two things happens:
* <ul>
* <li>Some other thread invokes the {@link #release(String)} method for this
* semaphore and the current thread is next to be assigned a permit; or
* <li>Some other thread {@linkplain Thread#interrupt interrupts}
* the current thread.
* </ul>
*
* @param permits - the number of permits to acquire
* @return permits ids
*/
Flowable<String> acquire(int permits);
/**
* Acquires a permit with defined lease time from this semaphore,
* blocking until one is available,
@ -78,7 +102,32 @@ public interface RPermitExpirableSemaphoreRx extends RExpirableRx {
* @return permit id
*/
Single<String> acquire(long leaseTime, TimeUnit unit);
/**
* Acquires defined amount of <code>permits</code> with defined lease time from this semaphore,
* blocking until enough permits are available,
* or the thread is {@linkplain Thread#interrupt interrupted}.
*
* <p>Acquires <code>permits</code> permits, if they are available and returns their ids,
* reducing the number of available permits by <code>permits</code>.
*
* <p>If not enough permits are available then the current thread becomes
* disabled for thread scheduling purposes and lies dormant until
* one of two things happens:
* <ul>
* <li>Some other thread invokes the {@link #release} method for this
* semaphore and the current thread is next to be assigned a permit; or
* <li>Some other thread {@linkplain Thread#interrupt interrupts}
* the current thread.
* </ul>
*
* @param permits - the number of permits to acquire
* @param leaseTime - permit lease time
* @param unit - time unit
* @return permits ids
*/
Flowable<String> acquire(int permits, long leaseTime, TimeUnit unit);
/**
* Acquires a permit only if one is available at the
* time of invocation.
@ -95,6 +144,23 @@ public interface RPermitExpirableSemaphoreRx extends RExpirableRx {
*/
Maybe<String> tryAcquire();
/**
* Acquires defined amount of <code>permits</code> only if they are available at the
* time of invocation.
*
* <p>Acquires <code>permits</code> permits, if they are available and returns immediately,
* with the permits ids,
* reducing the number of available permits by <code>permits</code>.
*
* <p>If not enough permits are available then this method will return
* immediately with empty collection.
*
* @param permits - the number of permits to acquire
* @return permits ids if permit were acquired and empty collection
* otherwise
*/
Flowable<String> tryAcquire(int permits);
/**
* Acquires a permit from this semaphore, if one becomes available
* within the given waiting time and the current thread has not
@ -163,6 +229,42 @@ public interface RPermitExpirableSemaphoreRx extends RExpirableRx {
*/
Maybe<String> tryAcquire(long waitTime, long leaseTime, TimeUnit unit);
/**
* Acquires defined amount of <code>permits</code> with defined lease time from this semaphore,
* if enough permits become available
* within the given waiting time and the current thread has not
* been {@linkplain Thread#interrupt interrupted}.
*
* <p>Acquires <code>permits</code> permits, if they are available and returns immediately,
* with the permits ids,
* reducing the number of available permits by <code>permits</code>.
*
* <p>If not enough permits are available then the current thread becomes
* disabled for thread scheduling purposes and lies dormant until
* one of three things happens:
* <ul>
* <li>Some other thread invokes the {@link #release(String)} method for this
* semaphore and the current thread is next to be assigned a permit; or
* <li>Some other thread {@linkplain Thread#interrupt interrupts}
* the current thread; or
* <li>The specified waiting time elapses.
* </ul>
*
* <p>If permit are acquired then permits ids are returned.
*
* <p>If the specified waiting time elapses then the empty collection
* is returned. If the time is less than or equal to zero, the method
* will not wait at all.
*
* @param permits the number of permits to acquire
* @param waitTime the maximum time to wait for permits
* @param leaseTime permits lease time
* @param unit the time unit of the {@code timeout} argument
* @return permits ids if permit were acquired and empty collection
* if the waiting time elapsed before permits were acquired
*/
Flowable<String> tryAcquire(int permits, long waitTime, long leaseTime, TimeUnit unit);
/**
* Releases a permit by its id, returning it to the semaphore.
*
@ -181,6 +283,23 @@ public interface RPermitExpirableSemaphoreRx extends RExpirableRx {
*/
Single<Boolean> tryRelease(String permitId);
/**
* Releases permits by their ids, returning them to the semaphore.
*
* <p>Releases <code>permits</code> permits, increasing the number of available permits
* by released amount. If any threads of Redisson client are trying to acquire a permit,
* then one is selected and given one of the permits that were just released.
*
* <p>There is no requirement that a thread that releases permits must
* have acquired that permit by calling {@link #acquire()}.
* Correct usage of a semaphore is established by programming convention
* in the application.
*
* @param permitsIds - permits ids
* @return amount of released permits
*/
Single<Integer> tryRelease(List<String> permitsIds);
/**
* Releases a permit by its id, returning it to the semaphore.
*
@ -200,6 +319,23 @@ public interface RPermitExpirableSemaphoreRx extends RExpirableRx {
*/
Completable release(String permitId);
/**
* Releases permits by their ids, returning them to the semaphore.
*
* <p>Releases <code>permits</code> permits, increasing the number of available permits
* by released amount. If any threads of Redisson client are trying to acquire a permit,
* then one is selected and given the permit that were just released.
*
* <p>There is no requirement that a thread that releases permits must
* have acquired that permit by calling {@link #acquire()}.
* Correct usage of a semaphore is established by programming convention
* in the application.
*
* @param permitsIds - permits ids
* @return amount of released permits
*/
Single<Integer> release(List<String> permitsIds);
/**
* Returns the current number of available permits.
*

Loading…
Cancel
Save