Added methods to acquire/release multiple permits in RedissonPermitExpirableSemaphore.

Added tests.

Signed-off-by: Sergey Kuznetsov <iksss.88@gmail.com>
Signed-off-by: Sergey Kuznetsov <sergey.kuznetsov@infobip.com>
pull/5247/head
Sergey Kuznetsov 2 years ago
parent 446c584e57
commit e2b28291de

@ -59,6 +59,11 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
return acquire(1, -1, TimeUnit.MILLISECONDS);
}
@Override
public String acquire(int permits) throws InterruptedException {
return acquire(permits, -1, TimeUnit.MILLISECONDS);
}
@Override
public String acquire(long leaseTime, TimeUnit timeUnit) throws InterruptedException {
return acquire(1, leaseTime, timeUnit);
@ -69,7 +74,8 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
return acquireAsync(1, leaseTime, timeUnit);
}
private String acquire(int permits, long ttl, TimeUnit timeUnit) throws InterruptedException {
@Override
public String acquire(int permits, long ttl, TimeUnit timeUnit) throws InterruptedException {
String permitId = tryAcquire(permits, ttl, timeUnit);
if (permitId != null && !permitId.startsWith(":")) {
return permitId;
@ -86,7 +92,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
if (!permitId.startsWith(":")) {
return permitId;
} else {
nearestTimeout = Long.valueOf(permitId.substring(1)) - System.currentTimeMillis();
nearestTimeout = Long.parseLong(permitId.substring(1)) - System.currentTimeMillis();
}
} else {
nearestTimeout = null;
@ -108,7 +114,13 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
return acquireAsync(1, -1, TimeUnit.MILLISECONDS);
}
private RFuture<String> acquireAsync(int permits, long ttl, TimeUnit timeUnit) {
@Override
public RFuture<String> acquireAsync(int permits) {
return acquireAsync(permits, -1, TimeUnit.MILLISECONDS);
}
@Override
public RFuture<String> acquireAsync(int permits, long ttl, TimeUnit timeUnit) {
long timeoutDate = calcTimeout(ttl, timeUnit);
RFuture<String> tryAcquireFuture = tryAcquireAsync(permits, timeoutDate);
CompletionStage<String> f = tryAcquireFuture.thenCompose(permitId -> {
@ -165,7 +177,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
}
return;
} else {
nearestTimeout = Long.valueOf(permitId.substring(1)) - System.currentTimeMillis();
nearestTimeout = Long.parseLong(permitId.substring(1)) - System.currentTimeMillis();
}
} else {
nearestTimeout = null;
@ -259,7 +271,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
unsubscribe(entry);
return CompletableFuture.completedFuture(permitId);
} else {
nearestTimeout = Long.valueOf(permitId.substring(1)) - System.currentTimeMillis();
nearestTimeout = Long.parseLong(permitId.substring(1)) - System.currentTimeMillis();
}
} else {
nearestTimeout = null;
@ -295,16 +307,21 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
@Override
public String tryAcquire() {
String res = tryAcquire(1, -1, TimeUnit.MILLISECONDS);
if (res != null && res.startsWith(":")) {
return null;
return tryAcquire(1, -1, TimeUnit.MILLISECONDS);
}
return res;
@Override
public String tryAcquire(int permits) {
return tryAcquire(permits, -1, TimeUnit.MILLISECONDS);
}
private String tryAcquire(int permits, long ttl, TimeUnit timeUnit) {
long timeoutDate = calcTimeout(ttl, timeUnit);
return get(tryAcquireAsync(permits, timeoutDate));
String res = get(tryAcquireAsync(permits, timeoutDate));
if (res != null && res.startsWith(":")) {
return null;
}
return res;
}
private long calcTimeout(long ttl, TimeUnit timeUnit) {
@ -316,7 +333,12 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
@Override
public RFuture<String> tryAcquireAsync() {
CompletableFuture<String> future = tryAcquireAsync(1, nonExpirableTimeout).toCompletableFuture();
return tryAcquireAsync(1);
}
@Override
public RFuture<String> tryAcquireAsync(int permits) {
CompletableFuture<String> future = tryAcquireAsync(permits, nonExpirableTimeout).toCompletableFuture();
CompletableFuture<String> f = future.thenApply(permitId -> {
if (permitId != null && !permitId.startsWith(":")) {
return permitId;
@ -381,6 +403,11 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
return tryAcquireAsync(1, waitTime, -1, unit);
}
@Override
public RFuture<String> tryAcquireAsync(int permits, long waitTime, TimeUnit unit) {
return tryAcquireAsync(permits, waitTime, -1, unit);
}
@Override
public String tryAcquire(long waitTime, long ttl, TimeUnit unit) throws InterruptedException {
return tryAcquire(1, waitTime, ttl, unit);
@ -391,7 +418,8 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
return tryAcquireAsync(1, waitTime, ttl, unit);
}
private String tryAcquire(int permits, long waitTime, long ttl, TimeUnit unit) throws InterruptedException {
@Override
public String tryAcquire(int permits, long waitTime, long ttl, TimeUnit unit) throws InterruptedException {
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
@ -428,7 +456,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
if (!permitId.startsWith(":")) {
return permitId;
} else {
nearestTimeout = Long.valueOf(permitId.substring(1)) - System.currentTimeMillis();
nearestTimeout = Long.parseLong(permitId.substring(1)) - System.currentTimeMillis();
}
} else {
nearestTimeout = null;
@ -460,7 +488,8 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
// return get(tryAcquireAsync(permits, waitTime, ttl, unit));
}
private RFuture<String> tryAcquireAsync(int permits, long waitTime, long ttl, TimeUnit timeUnit) {
@Override
public RFuture<String> tryAcquireAsync(int permits, long waitTime, long ttl, TimeUnit timeUnit) {
CompletableFuture<String> result = new CompletableFuture<>();
AtomicLong time = new AtomicLong(timeUnit.toMillis(waitTime));
long curr = System.currentTimeMillis();
@ -544,16 +573,34 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
get(releaseAsync(permitId));
}
@Override
public void release(String permitId, int permits) {
get(releaseAsync(permitId, permits));
}
@Override
public boolean tryRelease(String permitId) {
return get(tryReleaseAsync(permitId));
return get(tryReleaseAsync(permitId, 1));
}
@Override
public boolean tryRelease(String permitId, int permits) {
return get(tryReleaseAsync(permitId, permits));
}
@Override
public RFuture<Boolean> tryReleaseAsync(String permitId) {
return tryReleaseAsync(permitId, 1);
}
@Override
public RFuture<Boolean> tryReleaseAsync(String permitId, int permits) {
if (permitId == null) {
throw new IllegalArgumentException("permitId can't be null");
}
if (permits < 0) {
throw new IllegalArgumentException("Permits amount can't be negative");
}
byte[] id = ByteBufUtil.decodeHexDump(permitId);
return commandExecutor.syncedEvalWithRetry(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
@ -569,12 +616,12 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
"end;" +
"return 1;",
Arrays.asList(getRawName(), channelName, timeoutName),
id, 1, System.currentTimeMillis(), getSubscribeService().getPublishCommand());
id, permits, System.currentTimeMillis(), getSubscribeService().getPublishCommand());
}
@Override
public RFuture<Long> sizeInMemoryAsync() {
List<Object> keys = Arrays.<Object>asList(getRawName(), timeoutName);
List<Object> keys = Arrays.asList(getRawName(), timeoutName);
return super.sizeInMemoryAsync(keys);
}
@ -600,7 +647,12 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
@Override
public RFuture<Void> releaseAsync(String permitId) {
CompletionStage<Void> f = tryReleaseAsync(permitId).handle((res, e) -> {
return releaseAsync(permitId, 1);
}
@Override
public RFuture<Void> releaseAsync(String permitId, int permits) {
CompletionStage<Void> f = tryReleaseAsync(permitId, permits).handle((res, e) -> {
if (e != null) {
throw new CompletionException(e);
}
@ -642,7 +694,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
"end; " +
"local ret = redis.call('get', KEYS[1]); " +
"return ret == false and 0 or ret;",
Arrays.<Object>asList(getRawName(), timeoutName, channelName),
Arrays.asList(getRawName(), timeoutName, channelName),
System.currentTimeMillis(), getSubscribeService().getPublishCommand());
}
@ -666,7 +718,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
"return tonumber(available) " +
"end;" +
"return tonumber(available) + acquired;",
Arrays.<Object>asList(getRawName(), timeoutName, channelName),
Arrays.asList(getRawName(), timeoutName, channelName),
System.currentTimeMillis(), getSubscribeService().getPublishCommand());
}
@ -683,7 +735,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
"end; " +
"local acquired = redis.call('zcount', KEYS[2], 0, '+inf'); " +
"return acquired == false and 0 or acquired;",
Arrays.<Object>asList(getRawName(), timeoutName, channelName),
Arrays.asList(getRawName(), timeoutName, channelName),
System.currentTimeMillis(), getSubscribeService().getPublishCommand());
}
@ -713,7 +765,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
"end;" +
"redis.call('incrby', KEYS[1], tonumber(ARGV[1]) - maximum); " +
"redis.call(ARGV[2], KEYS[2], ARGV[1]);",
Arrays.<Object>asList(getRawName(), channelName, timeoutName),
Arrays.asList(getRawName(), channelName, timeoutName),
permits, getSubscribeService().getPublishCommand());
}
@ -727,7 +779,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
+ "return 1;"
+ "end;"
+ "return 0;",
Arrays.<Object>asList(getRawName(), channelName),
Arrays.asList(getRawName(), channelName),
permits, getSubscribeService().getPublishCommand());
}

@ -39,6 +39,17 @@ public interface RPermitExpirableSemaphore extends RExpirable, RPermitExpirableS
*/
String acquire() throws InterruptedException;
/**
* Acquires defined amount of <code>permits</code>.
* Waits if necessary until all permits became available.
*
* @param permits the number of permits to acquire
* @return permits id
* @throws InterruptedException if the current thread is interrupted
* @throws IllegalArgumentException if <code>permits</code> is negative
*/
String acquire(int permits) throws InterruptedException;
/**
* Acquires a permit with defined <code>leaseTime</code> and return its id.
* Waits if necessary until a permit became available.
@ -50,6 +61,19 @@ public interface RPermitExpirableSemaphore extends RExpirable, RPermitExpirableS
*/
String acquire(long leaseTime, TimeUnit unit) throws InterruptedException;
/**
* Acquires defined amount of <code>permits</code> with defined <code>leaseTime</code> and returns id.
* Waits if necessary until all permits became available.
*
* @param permits the number of permits to acquire
* @param leaseTime permit lease time
* @param unit time unit
* @return permits id
* @throws InterruptedException if the current thread is interrupted
* @throws IllegalArgumentException if <code>permits</code> is negative
*/
String acquire(int permits, long leaseTime, TimeUnit unit) throws InterruptedException;
/**
* Tries to acquire currently available permit and return its id.
*
@ -58,6 +82,16 @@ public interface RPermitExpirableSemaphore extends RExpirable, RPermitExpirableS
*/
String tryAcquire();
/**
* Tries to acquire defined amount of currently available <code>permits</code> and returns id.
*
* @param permits the number of permits to acquire
* @return permits id if permits were acquired and {@code null}
* otherwise
* @throws IllegalArgumentException if <code>permits</code> is negative
*/
String tryAcquire(int permits);
/**
* Tries to acquire currently available permit and return its id.
* Waits up to defined <code>waitTime</code> if necessary until a permit became available.
@ -84,23 +118,64 @@ public interface RPermitExpirableSemaphore extends RExpirable, RPermitExpirableS
*/
String tryAcquire(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException;
/**
* Tries to acquire defined amount of currently available <code>permits</code>
* with defined <code>leaseTime</code> and return id.
* Waits up to defined <code>waitTime</code> if necessary until a permit became available.
*
* @param permits the number of permits to acquire
* @param waitTime the maximum time to wait
* @param leaseTime permit lease time, use -1 to make it permanent
* @param unit the time unit
* @return permit id if a permit was acquired and <code>null</code>
* if the waiting time elapsed before a permit was acquired
* @throws InterruptedException if the current thread is interrupted
* @throws IllegalArgumentException if <code>permits</code> is negative
*/
String tryAcquire(int permits, long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException;
/**
* Tries to release permit by its id.
*
* @param permitId permit id
* @return <code>true</code> if a permit has been released and <code>false</code>
* otherwise
* @throws IllegalArgumentException if <code>permitId</code> is null
*/
boolean tryRelease(String permitId);
/**
* Tries to release defined amount of <code>permits</code> permits by id.
*
* @param permitId permit id
* @param permits the number of permits to release
* @return <code>true</code> if a permits has been released and <code>false</code>
* otherwise
* @throws IllegalArgumentException if <code>permitId</code> is null
* @throws IllegalArgumentException if <code>permits</code> is negative
*/
boolean tryRelease(String permitId, int permits);
/**
* Releases a permit by its id. Increases the number of available permits.
* Throws an exception if permit id doesn't exist or has already been released.
*
* @param permitId - permit id
* @throws IllegalArgumentException if <code>permitId</code> is null
*/
void release(String permitId);
/**
* Releases a permit by its id. Increases the number of available permits.
* Throws an exception if permit id doesn't exist or has already been released.
*
* @param permitId - permit id
* @param permits the number of permits to release
* @throws IllegalArgumentException if <code>permitId</code> is null
* @throws IllegalArgumentException if <code>permits</code> is negative
*/
void release(String permitId, int permits);
/**
* Returns number of available permits.
*

@ -38,6 +38,16 @@ public interface RPermitExpirableSemaphoreAsync extends RExpirableAsync {
*/
RFuture<String> acquireAsync();
/**
* Acquires defined amount of <code>permits</code>.
* Waits if necessary until all permits became available.
*
* @param permits the number of permits to acquire
* @return permits id
* @throws IllegalArgumentException if <code>permits</code> is negative
*/
RFuture<String> acquireAsync(int permits);
/**
* Acquires a permit with defined <code>leaseTime</code> and return its id.
* Waits if necessary until a permit became available.
@ -48,6 +58,18 @@ public interface RPermitExpirableSemaphoreAsync extends RExpirableAsync {
*/
RFuture<String> acquireAsync(long leaseTime, TimeUnit unit);
/**
* Acquires defined amount of <code>permits</code> and return its id.
* Waits if necessary until all permits became available.
*
* @param permits the number of permits to acquire
* @param leaseTime permit lease time
* @param unit time unit
* @return permits id
* @throws IllegalArgumentException if <code>permits</code> is negative
*/
RFuture<String> acquireAsync(int permits, long leaseTime, TimeUnit unit);
/**
* Tries to acquire currently available permit and return its id.
*
@ -56,6 +78,16 @@ public interface RPermitExpirableSemaphoreAsync extends RExpirableAsync {
*/
RFuture<String> tryAcquireAsync();
/**
* Tries to acquire defined amount of currently available <code>permits</code> and returns id.
*
* @param permits the number of permits to acquire
* @return permits id if a permit was acquired and {@code null}
* otherwise
* @throws IllegalArgumentException if <code>permits</code> is negative
*/
RFuture<String> tryAcquireAsync(int permits);
/**
* Tries to acquire currently available permit and return its id.
* Waits up to defined <code>waitTime</code> if necessary until a permit became available.
@ -67,6 +99,19 @@ public interface RPermitExpirableSemaphoreAsync extends RExpirableAsync {
*/
RFuture<String> tryAcquireAsync(long waitTime, TimeUnit unit);
/**
* Tries to acquire defined amount of currently available <code>permits</code> and returns id.
* Waits up to defined <code>waitTime</code> if necessary until a permit became available.
*
* @param permits the number of permits to acquire
* @param waitTime the maximum time to wait
* @param unit the time unit
* @return permits id if a permit was acquired and {@code null}
* if the waiting time elapsed before a permit was acquired
* @throws IllegalArgumentException if <code>permits</code> is negative
*/
RFuture<String> tryAcquireAsync(int permits, long waitTime, TimeUnit unit);
/**
* Tries to acquire currently available permit
* with defined <code>leaseTime</code> and return its id.
@ -80,6 +125,21 @@ public interface RPermitExpirableSemaphoreAsync extends RExpirableAsync {
*/
RFuture<String> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit);
/**
* Tries to acquire defined amount of currently available <code>permits</code>
* with defined <code>leaseTime</code> and returns id.
* Waits up to defined <code>waitTime</code> if necessary until a permit became available.
*
* @param permits the number of permits to acquire
* @param waitTime the maximum time to wait
* @param leaseTime permit lease time, use -1 to make it permanent
* @param unit the time unit
* @return permits id if a permit was acquired and <code>null</code>
* if the waiting time elapsed before a permit was acquired
* @throws IllegalArgumentException if <code>permits</code> is negative
*/
RFuture<String> tryAcquireAsync(int permits, long waitTime, long leaseTime, TimeUnit unit);
/**
* Tries to release permit by its id.
*
@ -89,6 +149,17 @@ public interface RPermitExpirableSemaphoreAsync extends RExpirableAsync {
*/
RFuture<Boolean> tryReleaseAsync(String permitId);
/**
* Tries to release defined amount of <code>permits</code> by id.
*
* @param permitId permit id
* @param permits the number of permits to acquire
* @return <code>true</code> if a permit has been released and <code>false</code>
* otherwise
* @throws IllegalArgumentException if <code>permits</code> is negative
*/
RFuture<Boolean> tryReleaseAsync(String permitId, int permits);
/**
* Releases a permit by its id. Increases the number of available permits.
* Throws an exception if permit id doesn't exist or has already been released.
@ -98,6 +169,19 @@ public interface RPermitExpirableSemaphoreAsync extends RExpirableAsync {
*/
RFuture<Void> releaseAsync(String permitId);
/**
* Releases defined amount of <code>permits</code> by id.
* Increases the number of available permits by <code>permits</code> .
* Throws an exception if permit id doesn't exist or has already been released,
* or amount of <code>permits</code> is negative.
*
* @param permitId - permit id
* @param permits the number of permits to release
* @return void
* @throws IllegalArgumentException if <code>permits</code> is negative
*/
RFuture<Void> releaseAsync(String permitId, int permits);
/**
* Returns number of available permits.
*

@ -12,6 +12,7 @@ import org.redisson.config.Config;
import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@ -325,6 +326,18 @@ public class RedissonPermitExpirableSemaphoreTest extends BaseConcurrentTest {
assertThat(s.availablePermits()).isEqualTo(0);
}
@Test
public void testBlockingAcquireMany() throws InterruptedException {
RPermitExpirableSemaphore s = redisson.getPermitExpirableSemaphore("test");
s.trySetPermits(10);
String permitId = s.acquire(6);
assertThat(s.availablePermits()).isEqualTo(4);
s.release(permitId, 6);
Assertions.assertThrows(RedisException.class, () -> s.release(permitId, 4));
assertThat(s.availablePermits()).isEqualTo(10);
}
@Test
public void testTryAcquire() throws InterruptedException {
RPermitExpirableSemaphore s = redisson.getPermitExpirableSemaphore("test");
@ -358,6 +371,19 @@ public class RedissonPermitExpirableSemaphoreTest extends BaseConcurrentTest {
assertThat(s.availablePermits()).isEqualTo(0);
}
@Test
public void testTryAcquireMany() {
RPermitExpirableSemaphore s = redisson.getPermitExpirableSemaphore("test");
s.trySetPermits(10);
String permitId = s.tryAcquire(4);
assertThat(permitId).hasSize(32);
String permitId2 = s.tryAcquire(5);
assertThat(permitId2).hasSize(32);
assertThat(s.availablePermits()).isEqualTo(1);
}
@Test
public void testReleaseWithoutPermits() {
Assertions.assertThrows(RedisException.class, () -> {
@ -440,4 +466,35 @@ public class RedissonPermitExpirableSemaphoreTest extends BaseConcurrentTest {
});
}
@Test
public void testAcquireAsyncMany() {
RPermitExpirableSemaphore semaphore = redisson.getPermitExpirableSemaphore("test");
semaphore.trySetPermits(10);
RFuture<String> permitId = semaphore.acquireAsync(6);
Awaitility.await().atMost(Duration.ofMillis(100)).pollDelay(Duration.ofMillis(10)).untilAsserted(() -> {
assertThat(permitId.isDone()).isTrue();
});
assertThat(semaphore.availablePermits()).isEqualTo(4);
}
@Test
public void testReleaseAsyncMany() throws InterruptedException, ExecutionException {
RPermitExpirableSemaphore semaphore = redisson.getPermitExpirableSemaphore("test");
semaphore.trySetPermits(10);
String permitId = semaphore.acquire(6);
assertThat(semaphore.availablePermits()).isEqualTo(4);
RFuture<Boolean> releaseResult = semaphore.tryReleaseAsync(permitId, 4);
Awaitility.await().atMost(Duration.ofMillis(100)).pollDelay(Duration.ofMillis(10)).untilAsserted(() -> {
assertThat(releaseResult.isDone()).isTrue();
});
assertThat(releaseResult.get()).isTrue();
assertThat(semaphore.availablePermits()).isEqualTo(8);
}
}

Loading…
Cancel
Save