Merge pull request #5247 from ikss/multi_permit_redisson_expirable_semaphore

Feature: multiple permit RedissonPermitExpirableSemaphore
pull/5004/merge
Nikita Koksharov 2 years ago committed by GitHub
commit 4cf340488d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -27,8 +27,7 @@ import org.redisson.command.CommandAsyncExecutor;
import org.redisson.misc.CompletableFutureWrapper;
import org.redisson.pubsub.SemaphorePubSub;
import java.util.Arrays;
import java.util.List;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@ -56,23 +55,25 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
@Override
public String acquire() throws InterruptedException {
return acquire(1, -1, TimeUnit.MILLISECONDS);
return acquire(-1, TimeUnit.MILLISECONDS);
}
@Override
public List<String> acquire(int permits) throws InterruptedException {
return acquire(permits, -1, TimeUnit.MILLISECONDS);
}
@Override
public String acquire(long leaseTime, TimeUnit timeUnit) throws InterruptedException {
return acquire(1, leaseTime, timeUnit);
List<String> ids = acquire(1, leaseTime, timeUnit);
return getFirstOrNull(ids);
}
@Override
public RFuture<String> acquireAsync(long leaseTime, TimeUnit timeUnit) {
return acquireAsync(1, leaseTime, timeUnit);
}
private String acquire(int permits, long ttl, TimeUnit timeUnit) throws InterruptedException {
String permitId = tryAcquire(permits, ttl, timeUnit);
if (permitId != null && !permitId.startsWith(":")) {
return permitId;
public List<String> acquire(int permits, long leaseTime, TimeUnit timeUnit) throws InterruptedException {
List<String> ids = tryAcquire(permits, leaseTime, timeUnit);
if (!ids.isEmpty() && !hasOnlyNearestTimeout(ids)) {
return ids;
}
CompletableFuture<RedissonLockEntry> future = subscribe();
@ -81,15 +82,13 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
try {
while (true) {
Long nearestTimeout;
permitId = tryAcquire(permits, ttl, timeUnit);
if (permitId != null) {
if (!permitId.startsWith(":")) {
return permitId;
} else {
nearestTimeout = Long.valueOf(permitId.substring(1)) - System.currentTimeMillis();
}
} else {
ids = tryAcquire(permits, leaseTime, timeUnit);
if (ids.isEmpty()) {
nearestTimeout = null;
} else if (hasOnlyNearestTimeout(ids)) {
nearestTimeout = Long.parseLong(ids.get(0).substring(1)) - System.currentTimeMillis();
} else {
return ids;
}
if (nearestTimeout != null) {
@ -101,32 +100,47 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
} finally {
unsubscribe(entry);
}
// return get(acquireAsync(permits, ttl, timeUnit));
// return get(acquireAsync(permits, leaseTime, timeUnit));
}
@Override
public RFuture<String> acquireAsync() {
return acquireAsync(1, -1, TimeUnit.MILLISECONDS);
CompletionStage<String> future = acquireAsync(1)
.thenApply(RedissonPermitExpirableSemaphore::getFirstOrNull);
return new CompletableFutureWrapper<>(future);
}
@Override
public RFuture<List<String>> acquireAsync(int permits) {
return acquireAsync(permits, -1, TimeUnit.MILLISECONDS);
}
private RFuture<String> acquireAsync(int permits, long ttl, TimeUnit timeUnit) {
long timeoutDate = calcTimeout(ttl, timeUnit);
RFuture<String> tryAcquireFuture = tryAcquireAsync(permits, timeoutDate);
CompletionStage<String> f = tryAcquireFuture.thenCompose(permitId -> {
if (permitId != null && !permitId.startsWith(":")) {
return CompletableFuture.completedFuture(permitId);
@Override
public RFuture<String> acquireAsync(long leaseTime, TimeUnit timeUnit) {
CompletionStage<String> future = acquireAsync(1, leaseTime, timeUnit)
.thenApply(RedissonPermitExpirableSemaphore::getFirstOrNull);
return new CompletableFutureWrapper<>(future);
}
@Override
public RFuture<List<String>> acquireAsync(int permits, long leaseTime, TimeUnit timeUnit) {
long timeoutDate = calcTimeout(leaseTime, timeUnit);
RFuture<List<String>> tryAcquireFuture = tryAcquireAsync(permits, timeoutDate);
CompletionStage<List<String>> f = tryAcquireFuture.thenCompose(ids -> {
if (!ids.isEmpty() && !hasOnlyNearestTimeout(ids)) {
return CompletableFuture.completedFuture(ids);
}
CompletableFuture<RedissonLockEntry> subscribeFuture = subscribe();
semaphorePubSub.timeout(subscribeFuture);
return subscribeFuture.thenCompose(res -> {
return acquireAsync(permits, res, ttl, timeUnit);
});
return subscribeFuture.thenCompose(res -> acquireAsync(permits, res, leaseTime, timeUnit));
});
f.whenComplete((r, e) -> {
if (f.toCompletableFuture().isCancelled()) {
tryAcquireFuture.whenComplete((permitId, ex) -> {
if (permitId != null && !permitId.startsWith(":")) {
releaseAsync(permitId);
tryAcquireFuture.whenComplete((ids, ex) -> {
if (!ids.isEmpty() && !hasOnlyNearestTimeout(ids)) {
releaseAsync(ids);
}
});
}
@ -134,7 +148,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
return new CompletableFutureWrapper<>(f);
}
private void tryAcquireAsync(AtomicLong time, int permits, RedissonLockEntry entry, CompletableFuture<String> result, long ttl, TimeUnit timeUnit) {
private void tryAcquireAsync(AtomicLong time, int permits, RedissonLockEntry entry, CompletableFuture<List<String>> result, long leaseTime, TimeUnit timeUnit) {
if (result.isDone()) {
unsubscribe(entry);
return;
@ -146,10 +160,10 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
return;
}
long timeoutDate = calcTimeout(ttl, timeUnit);
long timeoutDate = calcTimeout(leaseTime, timeUnit);
long curr = System.currentTimeMillis();
RFuture<String> tryAcquireFuture = tryAcquireAsync(permits, timeoutDate);
tryAcquireFuture.whenComplete((permitId, e) -> {
RFuture<List<String>> tryAcquireFuture = tryAcquireAsync(permits, timeoutDate);
tryAcquireFuture.whenComplete((ids, e) -> {
if (e != null) {
unsubscribe(entry);
result.completeExceptionally(e);
@ -157,18 +171,16 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
}
Long nearestTimeout;
if (permitId != null) {
if (!permitId.startsWith(":")) {
unsubscribe(entry);
if (!result.complete(permitId)) {
releaseAsync(permitId);
}
return;
} else {
nearestTimeout = Long.valueOf(permitId.substring(1)) - System.currentTimeMillis();
}
} else {
if (ids.isEmpty()) {
nearestTimeout = null;
} else if (hasOnlyNearestTimeout(ids)) {
nearestTimeout = Long.parseLong(ids.get(0).substring(1)) - System.currentTimeMillis();
} else {
unsubscribe(entry);
if (!result.complete(ids)) {
releaseAsync(ids);
}
return;
}
long el = System.currentTimeMillis() - curr;
@ -183,7 +195,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
// waiting for message
long current = System.currentTimeMillis();
if (entry.getLatch().tryAcquire()) {
tryAcquireAsync(time, permits, entry, result, ttl, timeUnit);
tryAcquireAsync(time, permits, entry, result, leaseTime, timeUnit);
} else {
AtomicReference<Timeout> waitTimeoutFutureRef = new AtomicReference<>();
@ -199,7 +211,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
long elapsed = System.currentTimeMillis() - current;
time.addAndGet(-elapsed);
tryAcquireAsync(time, permits, entry, result, ttl, timeUnit);
tryAcquireAsync(time, permits, entry, result, leaseTime, timeUnit);
}
}, nearestTimeout, TimeUnit.MILLISECONDS);
} else {
@ -219,7 +231,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
long elapsed = System.currentTimeMillis() - current;
time.addAndGet(-elapsed);
tryAcquireAsync(time, permits, entry, result, ttl, timeUnit);
tryAcquireAsync(time, permits, entry, result, leaseTime, timeUnit);
};
entry.addListener(listener);
@ -235,7 +247,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
long elapsed = System.currentTimeMillis() - current;
time.addAndGet(-elapsed);
tryAcquireAsync(time, permits, entry, result, ttl, timeUnit);
tryAcquireAsync(time, permits, entry, result, leaseTime, timeUnit);
}
}
}, t, TimeUnit.MILLISECONDS);
@ -245,35 +257,33 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
}
private CompletableFuture<String> acquireAsync(int permits, RedissonLockEntry entry, long ttl, TimeUnit timeUnit) {
long timeoutDate = calcTimeout(ttl, timeUnit);
CompletableFuture<String> tryAcquireFuture = tryAcquireAsync(permits, timeoutDate).toCompletableFuture();
private CompletableFuture<List<String>> acquireAsync(int permits, RedissonLockEntry entry, long leaseTime, TimeUnit timeUnit) {
long timeoutDate = calcTimeout(leaseTime, timeUnit);
CompletableFuture<List<String>> tryAcquireFuture = tryAcquireAsync(permits, timeoutDate).toCompletableFuture();
return tryAcquireFuture.whenComplete((p, e) -> {
if (e != null) {
unsubscribe(entry);
}
}).thenCompose(permitId -> {
}).thenCompose(ids -> {
Long nearestTimeout;
if (permitId != null) {
if (!permitId.startsWith(":")) {
unsubscribe(entry);
return CompletableFuture.completedFuture(permitId);
} else {
nearestTimeout = Long.valueOf(permitId.substring(1)) - System.currentTimeMillis();
}
} else {
if (ids.isEmpty()) {
nearestTimeout = null;
} else if (hasOnlyNearestTimeout(ids)) {
nearestTimeout = Long.parseLong(ids.get(0).substring(1)) - System.currentTimeMillis();
} else {
unsubscribe(entry);
return CompletableFuture.completedFuture(ids);
}
if (entry.getLatch().tryAcquire(permits)) {
return acquireAsync(permits, entry, ttl, timeUnit);
return acquireAsync(permits, entry, leaseTime, timeUnit);
}
CompletableFuture<String> res = new CompletableFuture<>();
CompletableFuture<List<String>> res = new CompletableFuture<>();
Timeout scheduledFuture;
if (nearestTimeout != null) {
scheduledFuture = getServiceManager().newTimeout(timeout -> {
CompletableFuture<String> r = acquireAsync(permits, entry, ttl, timeUnit);
CompletableFuture<List<String>> r = acquireAsync(permits, entry, leaseTime, timeUnit);
commandExecutor.transfer(r, res);
}, nearestTimeout, TimeUnit.MILLISECONDS);
} else {
@ -285,7 +295,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
entry.getLatch().release();
return;
}
CompletableFuture<String> r = acquireAsync(permits, entry, ttl, timeUnit);
CompletableFuture<List<String>> r = acquireAsync(permits, entry, leaseTime, timeUnit);
commandExecutor.transfer(r, res);
};
entry.addListener(listener);
@ -295,114 +305,158 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
@Override
public String tryAcquire() {
String res = tryAcquire(1, -1, TimeUnit.MILLISECONDS);
if (res != null && res.startsWith(":")) {
return null;
List<String> ids = tryAcquire(1);
return getFirstOrNull(ids);
}
@Override
public List<String> tryAcquire(int permits) {
List<String> ids = tryAcquire(permits, -1, TimeUnit.MILLISECONDS);
if (hasOnlyNearestTimeout(ids)) {
return Collections.emptyList();
}
return res;
return ids;
}
private String tryAcquire(int permits, long ttl, TimeUnit timeUnit) {
long timeoutDate = calcTimeout(ttl, timeUnit);
private List<String> tryAcquire(int permits, long leaseTime, TimeUnit timeUnit) {
long timeoutDate = calcTimeout(leaseTime, timeUnit);
return get(tryAcquireAsync(permits, timeoutDate));
}
private long calcTimeout(long ttl, TimeUnit timeUnit) {
if (ttl != -1) {
return System.currentTimeMillis() + timeUnit.toMillis(ttl);
private long calcTimeout(long leaseTime, TimeUnit timeUnit) {
if (leaseTime != -1) {
return System.currentTimeMillis() + timeUnit.toMillis(leaseTime);
}
return nonExpirableTimeout;
}
@Override
public RFuture<String> tryAcquireAsync() {
CompletableFuture<String> future = tryAcquireAsync(1, nonExpirableTimeout).toCompletableFuture();
CompletableFuture<String> f = future.thenApply(permitId -> {
if (permitId != null && !permitId.startsWith(":")) {
return permitId;
}
return null;
});
CompletionStage<String> future = tryAcquireAsync(1)
.thenApply(RedissonPermitExpirableSemaphore::getFirstOrNull);
return new CompletableFutureWrapper<>(future);
}
future.whenComplete((permitId, e) -> {
if (f.isCancelled() && permitId != null && !permitId.startsWith(":")) {
releaseAsync(permitId);
@Override
public RFuture<List<String>> tryAcquireAsync(int permits) {
CompletableFuture<List<String>> future = tryAcquireAsync(permits, nonExpirableTimeout).toCompletableFuture()
.thenApply(ids -> {
if (hasOnlyNearestTimeout(ids)) {
return null;
}
return ids;
});
future.whenComplete((ids, e) -> {
if (future.isCancelled() && !ids.isEmpty() && !hasOnlyNearestTimeout(ids)) {
releaseAsync(ids);
}
});
return new CompletableFutureWrapper<>(f);
return new CompletableFutureWrapper<>(future);
}
private RFuture<String> tryAcquireAsync(int permits, long timeoutDate) {
private RFuture<List<String>> tryAcquireAsync(int permits, long timeoutDate) {
if (permits < 0) {
throw new IllegalArgumentException("Permits amount can't be negative");
}
List<String> ids = new ArrayList<>(permits);
for (int i = 0; i < permits; i++) {
ids.add(getServiceManager().generateId());
}
byte[] id = getServiceManager().generateIdArray();
return getServiceManager().execute(() -> {
RFuture<String> future = tryAcquireAsync(id, permits, timeoutDate);
return commandExecutor.handleNoSync(future, () -> releaseAsync(ByteBufUtil.hexDump(id)));
RFuture<List<String>> future = tryAcquireAsync(ids, timeoutDate);
return commandExecutor.handleNoSync(future, () -> releaseAsync(ids));
});
}
private RFuture<String> tryAcquireAsync(byte[] id, int permits, long timeoutDate) {
return commandExecutor.syncedEval(getRawName(), ByteArrayCodec.INSTANCE, RedisCommands.EVAL_PERMIT_DATA,
"local expiredIds = redis.call('zrangebyscore', KEYS[2], 0, ARGV[4], 'limit', 0, ARGV[1]); " +
private RFuture<List<String>> tryAcquireAsync(List<String> ids, long timeoutDate) {
List<Object> params = new ArrayList<>();
params.add(ids.size());
params.add(timeoutDate);
params.add(System.currentTimeMillis());
params.add(nonExpirableTimeout);
params.add(getSubscribeService().getPublishCommand());
for (String permitId: ids) {
params.add(ByteBufUtil.decodeHexDump(permitId));
}
CompletionStage<List<String>> future = commandExecutor.syncedEval(getRawName(), ByteArrayCodec.INSTANCE, RedisCommands.EVAL_STRING,
"local expiredIds = redis.call('zrangebyscore', KEYS[2], 0, ARGV[3], 'limit', 0, ARGV[1]); " +
"if #expiredIds > 0 then " +
"redis.call('zrem', KEYS[2], unpack(expiredIds)); " +
"local value = redis.call('incrby', KEYS[1], #expiredIds); " +
"if tonumber(value) > 0 then " +
"redis.call(ARGV[6], KEYS[3], value); " +
"redis.call(ARGV[5], KEYS[3], value); " +
"end;" +
"end; " +
"local value = redis.call('get', KEYS[1]); " +
"if (value ~= false and tonumber(value) >= tonumber(ARGV[1])) then " +
"redis.call('decrby', KEYS[1], ARGV[1]); " +
"redis.call('zadd', KEYS[2], ARGV[2], ARGV[3]); " +
"for i = 6, #ARGV, 1 do " +
"redis.call('zadd', KEYS[2], ARGV[2], ARGV[i]); " +
"end; " +
"local ttl = redis.call('pttl', KEYS[1]); " +
"if ttl > 0 then " +
"redis.call('pexpire', KEYS[2], ttl); " +
"end; " +
"return ARGV[3]; " +
"return 'OK'; " +
"end; " +
"local v = redis.call('zrange', KEYS[2], 0, 0, 'WITHSCORES'); " +
"if v[1] ~= nil and v[2] ~= ARGV[5] then " +
"if v[1] ~= nil and v[2] ~= ARGV[4] then " +
"return ':' .. tostring(v[2]); " +
"end " +
"return nil;",
Arrays.asList(getRawName(), timeoutName, channelName),
permits, timeoutDate, id, System.currentTimeMillis(), nonExpirableTimeout, getSubscribeService().getPublishCommand());
params.toArray()
).thenApply(result -> {
if (result == null) {
return Collections.emptyList();
}
if (result.equals("OK")) {
return ids;
}
return Collections.singletonList(result);
});
return new CompletableFutureWrapper<>(future);
}
@Override
public RFuture<String> tryAcquireAsync(long waitTime, TimeUnit unit) {
return tryAcquireAsync(1, waitTime, -1, unit);
CompletionStage<String> future = tryAcquireAsync(1, waitTime, -1, unit)
.thenApply(RedissonPermitExpirableSemaphore::getFirstOrNull);
return new CompletableFutureWrapper<>(future);
}
@Override
public String tryAcquire(long waitTime, long ttl, TimeUnit unit) throws InterruptedException {
return tryAcquire(1, waitTime, ttl, unit);
public String tryAcquire(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
List<String> ids = tryAcquire(1, waitTime, leaseTime, unit);
return getFirstOrNull(ids);
}
@Override
public RFuture<String> tryAcquireAsync(long waitTime, long ttl, TimeUnit unit) {
return tryAcquireAsync(1, waitTime, ttl, unit);
public RFuture<String> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit) {
CompletionStage<String> future = tryAcquireAsync(1, waitTime, leaseTime, unit)
.thenApply(RedissonPermitExpirableSemaphore::getFirstOrNull);
return new CompletableFutureWrapper<>(future);
}
private String tryAcquire(int permits, long waitTime, long ttl, TimeUnit unit) throws InterruptedException {
@Override
public List<String> tryAcquire(int permits, long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
String permitId = tryAcquire(permits, ttl, unit);
if (permitId != null && !permitId.startsWith(":")) {
return permitId;
List<String> ids = tryAcquire(permits, leaseTime, unit);
if (!ids.isEmpty() && !hasOnlyNearestTimeout(ids)) {
return ids;
}
time -= System.currentTimeMillis() - current;
if (time <= 0) {
return null;
return Collections.emptyList();
}
current = System.currentTimeMillis();
@ -411,32 +465,30 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
try {
entry = future.get(time, TimeUnit.MILLISECONDS);
} catch (ExecutionException | TimeoutException e) {
return null;
return Collections.emptyList();
}
try {
time -= System.currentTimeMillis() - current;
if (time <= 0) {
return null;
return Collections.emptyList();
}
while (true) {
current = System.currentTimeMillis();
Long nearestTimeout;
permitId = tryAcquire(permits, ttl, unit);
if (permitId != null) {
if (!permitId.startsWith(":")) {
return permitId;
} else {
nearestTimeout = Long.valueOf(permitId.substring(1)) - System.currentTimeMillis();
}
} else {
ids = tryAcquire(permits, leaseTime, unit);
if (ids.isEmpty()) {
nearestTimeout = null;
} else if (hasOnlyNearestTimeout(ids)) {
nearestTimeout = Long.parseLong(ids.get(0).substring(1)) - System.currentTimeMillis();
} else {
return ids;
}
time -= System.currentTimeMillis() - current;
if (time <= 0) {
return null;
return Collections.emptyList();
}
// waiting for message
@ -451,30 +503,30 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
long elapsed = System.currentTimeMillis() - current;
time -= elapsed;
if (time <= 0) {
return null;
return Collections.emptyList();
}
}
} finally {
unsubscribe(entry);
}
// return get(tryAcquireAsync(permits, waitTime, ttl, unit));
// return get(tryAcquireAsync(permits, waitTime, leaseTime, unit));
}
private RFuture<String> tryAcquireAsync(int permits, long waitTime, long ttl, TimeUnit timeUnit) {
CompletableFuture<String> result = new CompletableFuture<>();
@Override
public RFuture<List<String>> tryAcquireAsync(int permits, long waitTime, long leaseTime, TimeUnit timeUnit) {
CompletableFuture<List<String>> result = new CompletableFuture<>();
AtomicLong time = new AtomicLong(timeUnit.toMillis(waitTime));
long curr = System.currentTimeMillis();
long timeoutDate = calcTimeout(ttl, timeUnit);
RFuture<String> tryAcquireFuture = tryAcquireAsync(permits, timeoutDate);
tryAcquireFuture.whenComplete((permitId, e) -> {
long timeoutDate = calcTimeout(leaseTime, timeUnit);
tryAcquireAsync(permits, timeoutDate).whenComplete((ids, e) -> {
if (e != null) {
result.completeExceptionally(e);
return;
}
if (permitId != null && !permitId.startsWith(":")) {
if (!result.complete(permitId)) {
releaseAsync(permitId);
if (!ids.isEmpty() && !hasOnlyNearestTimeout(ids)) {
if (!result.complete(ids)) {
releaseAsync(ids);
}
return;
}
@ -503,7 +555,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
long elapsed = System.currentTimeMillis() - current;
time.addAndGet(-elapsed);
tryAcquireAsync(time, permits, r, result, ttl, timeUnit);
tryAcquireAsync(time, permits, r, result, leaseTime, timeUnit);
});
if (!subscribeFuture.isDone()) {
@ -532,11 +584,11 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
@Override
public String tryAcquire(long waitTime, TimeUnit unit) throws InterruptedException {
String res = tryAcquire(1, waitTime, -1, unit);
if (res != null && res.startsWith(":")) {
List<String> ids = tryAcquire(1, waitTime, -1, unit);
if (ids.isEmpty() || hasOnlyNearestTimeout(ids)) {
return null;
}
return res;
return ids.get(0);
}
@Override
@ -544,11 +596,21 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
get(releaseAsync(permitId));
}
@Override
public void release(List<String> permitsIds) {
get(releaseAsync(permitsIds));
}
@Override
public boolean tryRelease(String permitId) {
return get(tryReleaseAsync(permitId));
}
@Override
public int tryRelease(List<String> permitsIds) {
return get(tryReleaseAsync(permitsIds));
}
@Override
public RFuture<Boolean> tryReleaseAsync(String permitId) {
if (permitId == null) {
@ -572,6 +634,45 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
id, 1, System.currentTimeMillis(), getSubscribeService().getPublishCommand());
}
@Override
public RFuture<Integer> tryReleaseAsync(List<String> permitsIds) {
if (permitsIds == null || permitsIds.isEmpty()) {
throw new IllegalArgumentException("permitIds can't be null or empty");
}
List<Object> params = new ArrayList<>(permitsIds.size() + 3);
params.add(permitsIds.size());
params.add(System.currentTimeMillis());
params.add(getSubscribeService().getPublishCommand());
for (String permitId : permitsIds) {
params.add(ByteBufUtil.decodeHexDump(permitId));
}
return commandExecutor.syncedEvalWithRetry(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_INTEGER,
"local expiredIds = redis.call('zrangebyscore', KEYS[3], 0, ARGV[2], 'limit', 0, -1); " +
"if #expiredIds > 0 then " +
"redis.call('zrem', KEYS[3], unpack(expiredIds)); " +
"local value = redis.call('incrby', KEYS[1], #expiredIds); " +
"if tonumber(value) > 0 then " +
"redis.call(ARGV[3], KEYS[2], value); " +
"end; " +
"end; " +
"local keys = {}; " +
"for i = 4, #ARGV, 1 do " +
"table.insert(keys, ARGV[i]); " +
"end; " +
"local removed = redis.call('zrem', KEYS[3], unpack(keys)); " +
"if tonumber(removed) == 0 then " +
"return 0;" +
"end; " +
"redis.call('incrby', KEYS[1], removed); " +
"redis.call(ARGV[3], KEYS[2], removed); " +
"return removed;",
Arrays.asList(getRawName(), channelName, timeoutName),
params.toArray());
}
@Override
public RFuture<Long> sizeInMemoryAsync() {
List<Object> keys = Arrays.<Object>asList(getRawName(), timeoutName);
@ -613,6 +714,21 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
return new CompletableFutureWrapper<>(f);
}
@Override
public RFuture<Void> releaseAsync(List<String> permitsIds) {
CompletionStage<Void> f = tryReleaseAsync(permitsIds).handle((res, e) -> {
if (e != null) {
throw new CompletionException(e);
}
if (res == permitsIds.size()) {
return null;
}
throw new CompletionException(new IllegalArgumentException("Permits with ids " + permitsIds + " have already been released or don't exist"));
});
return new CompletableFutureWrapper<>(f);
}
@Override
public int availablePermits() {
return get(availablePermitsAsync());
@ -779,4 +895,15 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
return get(updateLeaseTimeAsync(permitId, leaseTime, unit));
}
private static boolean hasOnlyNearestTimeout(List<String> ids) {
return ids.size() == 1 && ids.get(0).startsWith(":");
}
private static String getFirstOrNull(List<String> ids) {
if (ids.isEmpty()) {
return null;
}
return ids.get(0);
}
}

@ -15,6 +15,7 @@
*/
package org.redisson.api;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
@ -39,6 +40,17 @@ public interface RPermitExpirableSemaphore extends RExpirable, RPermitExpirableS
*/
String acquire() throws InterruptedException;
/**
* Acquires defined amount of <code>permits</code>.
* Waits if necessary until enough permits became available.
*
* @param permits the number of permits to acquire
* @return permits ids
* @throws InterruptedException if the current thread is interrupted
* @throws IllegalArgumentException if <code>permits</code> is negative
*/
List<String> acquire(int permits) throws InterruptedException;
/**
* Acquires a permit with defined <code>leaseTime</code> and return its id.
* Waits if necessary until a permit became available.
@ -50,6 +62,19 @@ public interface RPermitExpirableSemaphore extends RExpirable, RPermitExpirableS
*/
String acquire(long leaseTime, TimeUnit unit) throws InterruptedException;
/**
* Acquires defined amount of <code>permits</code> with defined <code>leaseTime</code> and returns ids.
* Waits if necessary until enough permits became available.
*
* @param permits the number of permits to acquire
* @param leaseTime permit lease time
* @param unit time unit
* @return permits ids
* @throws InterruptedException if the current thread is interrupted
* @throws IllegalArgumentException if <code>permits</code> is negative
*/
List<String> acquire(int permits, long leaseTime, TimeUnit unit) throws InterruptedException;
/**
* Tries to acquire currently available permit and return its id.
*
@ -58,6 +83,16 @@ public interface RPermitExpirableSemaphore extends RExpirable, RPermitExpirableS
*/
String tryAcquire();
/**
* Tries to acquire defined amount of currently available <code>permits</code> and returns ids.
*
* @param permits the number of permits to acquire
* @return permits ids if permits were acquired and empty collection
* otherwise
* @throws IllegalArgumentException if <code>permits</code> is negative
*/
List<String> tryAcquire(int permits);
/**
* Tries to acquire currently available permit and return its id.
* Waits up to defined <code>waitTime</code> if necessary until a permit became available.
@ -84,23 +119,59 @@ public interface RPermitExpirableSemaphore extends RExpirable, RPermitExpirableS
*/
String tryAcquire(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException;
/**
* Tries to acquire defined amount of currently available <code>permits</code>
* with defined <code>leaseTime</code> and return ids.
* Waits up to defined <code>waitTime</code> if necessary until enough permits became available.
*
* @param permits the number of permits to acquire
* @param waitTime the maximum time to wait
* @param leaseTime permit lease time, use -1 to make it permanent
* @param unit the time unit
* @return permits ids if permits were acquired and empty collection
* if the waiting time elapsed before permits were acquired
* @throws InterruptedException if the current thread is interrupted
* @throws IllegalArgumentException if <code>permits</code> is negative
*/
List<String> tryAcquire(int permits, long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException;
/**
* Tries to release permit by its id.
*
* @param permitId permit id
* @return <code>true</code> if a permit has been released and <code>false</code>
* otherwise
* @throws IllegalArgumentException if <code>permitId</code> is null
*/
boolean tryRelease(String permitId);
/**
* Tries to release permits by their ids.
*
* @param permitsIds - permits ids
* @return amount of released permits
* @throws IllegalArgumentException if <code>permitsIds</code> is null or empty
*/
int tryRelease(List<String> permitsIds);
/**
* Releases a permit by its id. Increases the number of available permits.
* Throws an exception if permit id doesn't exist or has already been released.
*
* @param permitId - permit id
* @throws IllegalArgumentException if <code>permitId</code> is null
*/
void release(String permitId);
/**
* Releases permits by their ids. Increases the number of available permits.
* Throws an exception if permit id doesn't exist or has already been released.
*
* @param permitsIds - permits ids
* @throws IllegalArgumentException if <code>permitsIds</code> is null or empty
*/
void release(List<String> permitsIds);
/**
* Returns number of available permits.
*

@ -15,6 +15,7 @@
*/
package org.redisson.api;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
@ -38,6 +39,16 @@ public interface RPermitExpirableSemaphoreAsync extends RExpirableAsync {
*/
RFuture<String> acquireAsync();
/**
* Acquires defined amount of <code>permits</code> and returns their ids.
* Waits if necessary until all permits became available.
*
* @param permits the number of permits to acquire
* @return permits ids
* @throws IllegalArgumentException if <code>permits</code> is negative
*/
RFuture<List<String>> acquireAsync(int permits);
/**
* Acquires a permit with defined <code>leaseTime</code> and return its id.
* Waits if necessary until a permit became available.
@ -48,6 +59,18 @@ public interface RPermitExpirableSemaphoreAsync extends RExpirableAsync {
*/
RFuture<String> acquireAsync(long leaseTime, TimeUnit unit);
/**
* Acquires defined amount of <code>permits</code> and return their ids.
* Waits if necessary until all permits became available.
*
* @param permits the number of permits to acquire
* @param leaseTime permit lease time
* @param unit time unit
* @return permits ids
* @throws IllegalArgumentException if <code>permits</code> is negative
*/
RFuture<List<String>> acquireAsync(int permits, long leaseTime, TimeUnit unit);
/**
* Tries to acquire currently available permit and return its id.
*
@ -56,6 +79,16 @@ public interface RPermitExpirableSemaphoreAsync extends RExpirableAsync {
*/
RFuture<String> tryAcquireAsync();
/**
* Tries to acquire defined amount of currently available <code>permits</code> and returns their ids.
*
* @param permits the number of permits to acquire
* @return permits ids if permits were acquired and empty list
* otherwise
* @throws IllegalArgumentException if <code>permits</code> is negative
*/
RFuture<List<String>> tryAcquireAsync(int permits);
/**
* Tries to acquire currently available permit and return its id.
* Waits up to defined <code>waitTime</code> if necessary until a permit became available.
@ -80,6 +113,21 @@ public interface RPermitExpirableSemaphoreAsync extends RExpirableAsync {
*/
RFuture<String> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit);
/**
* Tries to acquire defined amount of currently available <code>permits</code>
* with defined <code>leaseTime</code> and returns their ids.
* Waits up to defined <code>waitTime</code> if necessary until permits became available.
*
* @param permits the number of permits to acquire
* @param waitTime the maximum time to wait
* @param leaseTime permit lease time, use -1 to make it permanent
* @param unit the time unit
* @return permits ids if permits were acquired and empty list
* if the waiting time elapsed before permit were acquired
* @throws IllegalArgumentException if <code>permits</code> is negative
*/
RFuture<List<String>> tryAcquireAsync(int permits, long waitTime, long leaseTime, TimeUnit unit);
/**
* Tries to release permit by its id.
*
@ -89,6 +137,15 @@ public interface RPermitExpirableSemaphoreAsync extends RExpirableAsync {
*/
RFuture<Boolean> tryReleaseAsync(String permitId);
/**
* Tries to release defined permits by their ids.
*
* @param permitsIds - permits ids
* @return amount of released permits
* @throws IllegalArgumentException if <code>permitsIds</code> is null or empty
*/
RFuture<Integer> tryReleaseAsync(List<String> permitsIds);
/**
* Releases a permit by its id. Increases the number of available permits.
* Throws an exception if permit id doesn't exist or has already been released.
@ -98,6 +155,16 @@ public interface RPermitExpirableSemaphoreAsync extends RExpirableAsync {
*/
RFuture<Void> releaseAsync(String permitId);
/**
* Releases permits by their ids.
* Increases the number of available permits.
* Throws an exception if permits ids don't exist or have already been released.
*
* @param permitsIds - permit id
* @throws IllegalArgumentException if <code>permitsIds</code> is null or empty
*/
RFuture<Void> releaseAsync(List<String> permitsIds);
/**
* Returns number of available permits.
*

@ -15,6 +15,7 @@
*/
package org.redisson.api;
import java.util.List;
import java.util.concurrent.TimeUnit;
import reactor.core.publisher.Mono;
@ -39,6 +40,15 @@ public interface RPermitExpirableSemaphoreReactive extends RExpirableReactive {
* @return permit id
*/
Mono<String> acquire();
/**
* Acquires defined amount of <code>permits</code>.
* Waits if necessary until all permits became available.
*
* @param permits the number of permits to acquire
* @return permits ids
*/
Mono<List<String>> acquire(int permits);
/**
* Acquires a permit with defined <code>leaseTime</code> and return its id.
@ -49,6 +59,17 @@ public interface RPermitExpirableSemaphoreReactive extends RExpirableReactive {
* @return permit id
*/
Mono<String> acquire(long leaseTime, TimeUnit unit);
/**
* Acquires defined amount of <code>permits</code> with defined <code>leaseTime</code> and returns ids.
* Waits if necessary until all permits became available.
*
* @param permits the number of permits to acquire
* @param leaseTime permits lease time
* @param unit time unit
* @return permits ids
*/
Mono<List<String>> acquire(int permits, long leaseTime, TimeUnit unit);
/**
* Tries to acquire currently available permit and return its id.
@ -58,6 +79,15 @@ public interface RPermitExpirableSemaphoreReactive extends RExpirableReactive {
*/
Mono<String> tryAcquire();
/**
* Tries to acquire defined amount of currently available <code>permits</code> and returns ids.
*
* @param permits the number of permits to acquire
* @return permits ids if permits were acquired and empty collection
* otherwise
*/
Mono<List<String>> tryAcquire(int permits);
/**
* Tries to acquire currently available permit and return its id.
* Waits up to defined <code>waitTime</code> if necessary until a permit became available.
@ -81,6 +111,20 @@ public interface RPermitExpirableSemaphoreReactive extends RExpirableReactive {
* if the waiting time elapsed before a permit was acquired
*/
Mono<String> tryAcquire(long waitTime, long leaseTime, TimeUnit unit);
/**
* Tries to acquire defined amount of currently available <code>permits</code>
* with defined <code>leaseTime</code> and return their ids.
* Waits up to defined <code>waitTime</code> if necessary until enough permits became available.
*
* @param permits the number of permits to acquire
* @param waitTime the maximum time to wait
* @param leaseTime permits lease time, use -1 to make them permanent
* @param unit the time unit
* @return permits ids if permits were acquired and empty collection
* if the waiting time elapsed before permits were acquired
*/
Mono<List<String>> tryAcquire(int permits, long waitTime, long leaseTime, TimeUnit unit);
/**
* Tries to release permit by its id.
@ -91,6 +135,14 @@ public interface RPermitExpirableSemaphoreReactive extends RExpirableReactive {
*/
Mono<Boolean> tryRelease(String permitId);
/**
* Tries to release permits by their ids.
*
* @param permitsIds permits ids
* @return amount of released permits
*/
Mono<Integer> tryRelease(List<String> permitsIds);
/**
* Releases a permit by its id. Increases the number of available permits.
* Throws an exception if permit id doesn't exist or has already been released.
@ -99,6 +151,15 @@ public interface RPermitExpirableSemaphoreReactive extends RExpirableReactive {
* @return void
*/
Mono<Void> release(String permitId);
/**
* Releases a permits by their ids. Increases the number of available permits.
* Throws an exception if permit id doesn't exist or has already been released.
*
* @param permitsIds - permits ids
* @return void
*/
Mono<Void> release(List<String> permitsIds);
/**
* Returns amount of available permits.

@ -15,6 +15,7 @@
*/
package org.redisson.api;
import java.util.List;
import java.util.concurrent.TimeUnit;
import io.reactivex.rxjava3.core.Completable;
@ -54,7 +55,29 @@ public interface RPermitExpirableSemaphoreRx extends RExpirableRx {
* @return permit id
*/
Single<String> acquire();
/**
* Acquires defined amount of <code>permits</code> from this semaphore, blocking until enough permits are
* available, or the thread is {@linkplain Thread#interrupt interrupted}.
*
* <p>Acquires <code>permits</code> permits, if they are available and returns their ids,
* reducing the number of available permits by <code>permits</code>.
*
* <p>If not enough permits are available then the current thread becomes
* disabled for thread scheduling purposes and lies dormant until
* one of two things happens:
* <ul>
* <li>Some other thread invokes the {@link #release(String)} method for this
* semaphore and the current thread is next to be assigned a permit; or
* <li>Some other thread {@linkplain Thread#interrupt interrupts}
* the current thread.
* </ul>
*
* @param permits - the number of permits to acquire
* @return permits ids
*/
Single<List<String>> acquire(int permits);
/**
* Acquires a permit with defined lease time from this semaphore,
* blocking until one is available,
@ -78,7 +101,32 @@ public interface RPermitExpirableSemaphoreRx extends RExpirableRx {
* @return permit id
*/
Single<String> acquire(long leaseTime, TimeUnit unit);
/**
* Acquires defined amount of <code>permits</code> with defined lease time from this semaphore,
* blocking until enough permits are available,
* or the thread is {@linkplain Thread#interrupt interrupted}.
*
* <p>Acquires <code>permits</code> permits, if they are available and returns their ids,
* reducing the number of available permits by <code>permits</code>.
*
* <p>If not enough permits are available then the current thread becomes
* disabled for thread scheduling purposes and lies dormant until
* one of two things happens:
* <ul>
* <li>Some other thread invokes the {@link #release} method for this
* semaphore and the current thread is next to be assigned a permit; or
* <li>Some other thread {@linkplain Thread#interrupt interrupts}
* the current thread.
* </ul>
*
* @param permits - the number of permits to acquire
* @param leaseTime - permit lease time
* @param unit - time unit
* @return permits ids
*/
Single<List<String>> acquire(int permits, long leaseTime, TimeUnit unit);
/**
* Acquires a permit only if one is available at the
* time of invocation.
@ -95,6 +143,23 @@ public interface RPermitExpirableSemaphoreRx extends RExpirableRx {
*/
Maybe<String> tryAcquire();
/**
* Acquires defined amount of <code>permits</code> only if they are available at the
* time of invocation.
*
* <p>Acquires <code>permits</code> permits, if they are available and returns immediately,
* with the permits ids,
* reducing the number of available permits by <code>permits</code>.
*
* <p>If not enough permits are available then this method will return
* immediately with empty collection.
*
* @param permits - the number of permits to acquire
* @return permits ids if permit were acquired and empty collection
* otherwise
*/
Single<List<String>> tryAcquire(int permits);
/**
* Acquires a permit from this semaphore, if one becomes available
* within the given waiting time and the current thread has not
@ -163,6 +228,42 @@ public interface RPermitExpirableSemaphoreRx extends RExpirableRx {
*/
Maybe<String> tryAcquire(long waitTime, long leaseTime, TimeUnit unit);
/**
* Acquires defined amount of <code>permits</code> with defined lease time from this semaphore,
* if enough permits become available
* within the given waiting time and the current thread has not
* been {@linkplain Thread#interrupt interrupted}.
*
* <p>Acquires <code>permits</code> permits, if they are available and returns immediately,
* with the permits ids,
* reducing the number of available permits by <code>permits</code>.
*
* <p>If not enough permits are available then the current thread becomes
* disabled for thread scheduling purposes and lies dormant until
* one of three things happens:
* <ul>
* <li>Some other thread invokes the {@link #release(String)} method for this
* semaphore and the current thread is next to be assigned a permit; or
* <li>Some other thread {@linkplain Thread#interrupt interrupts}
* the current thread; or
* <li>The specified waiting time elapses.
* </ul>
*
* <p>If permit are acquired then permits ids are returned.
*
* <p>If the specified waiting time elapses then the empty collection
* is returned. If the time is less than or equal to zero, the method
* will not wait at all.
*
* @param permits the number of permits to acquire
* @param waitTime the maximum time to wait for permits
* @param leaseTime permits lease time
* @param unit the time unit of the {@code timeout} argument
* @return permits ids if permit were acquired and empty collection
* if the waiting time elapsed before permits were acquired
*/
Single<List<String>> tryAcquire(int permits, long waitTime, long leaseTime, TimeUnit unit);
/**
* Releases a permit by its id, returning it to the semaphore.
*
@ -181,6 +282,23 @@ public interface RPermitExpirableSemaphoreRx extends RExpirableRx {
*/
Single<Boolean> tryRelease(String permitId);
/**
* Releases permits by their ids, returning them to the semaphore.
*
* <p>Releases <code>permits</code> permits, increasing the number of available permits
* by released amount. If any threads of Redisson client are trying to acquire a permit,
* then one is selected and given one of the permits that were just released.
*
* <p>There is no requirement that a thread that releases permits must
* have acquired that permit by calling {@link #acquire()}.
* Correct usage of a semaphore is established by programming convention
* in the application.
*
* @param permitsIds - permits ids
* @return amount of released permits
*/
Single<Integer> tryRelease(List<String> permitsIds);
/**
* Releases a permit by its id, returning it to the semaphore.
*
@ -200,6 +318,25 @@ public interface RPermitExpirableSemaphoreRx extends RExpirableRx {
*/
Completable release(String permitId);
/**
* Releases permits by their ids, returning them to the semaphore.
*
* <p>Releases <code>permits</code> permits, increasing the number of available permits
* by released amount. If any threads of Redisson client are trying to acquire a permit,
* then one is selected and given the permit that were just released.
*
* <p>There is no requirement that a thread that releases permits must
* have acquired that permit by calling {@link #acquire()}.
* Correct usage of a semaphore is established by programming convention
* in the application.
*
* <p>Throws an exception if permit id doesn't exist or has already been release
*
* @param permitsIds - permits ids
* @return void
*/
Completable release(List<String> permitsIds);
/**
* Returns the current number of available permits.
*

@ -424,8 +424,6 @@ public interface RedisCommands {
RedisStrictCommand<Boolean> EVAL_NULL_BOOLEAN = new RedisStrictCommand<Boolean>("EVAL", new BooleanNullReplayConvertor());
RedisStrictCommand<String> EVAL_STRING = new RedisStrictCommand("EVAL",
new ObjectDecoder(StringCodec.INSTANCE.getValueDecoder()));
RedisStrictCommand<String> EVAL_PERMIT_DATA = new RedisStrictCommand("EVAL",
new ObjectDecoder(new PermitDecoder()));
RedisStrictCommand<Integer> EVAL_INTEGER = new RedisStrictCommand<Integer>("EVAL", new IntegerReplayConvertor());
RedisStrictCommand<Double> EVAL_DOUBLE = new RedisStrictCommand<Double>("EVAL", new DoubleNullSafeReplayConvertor());
RedisStrictCommand<Long> EVAL_LONG = new RedisStrictCommand<Long>("EVAL");

@ -1,44 +0,0 @@
/**
* Copyright (c) 2013-2022 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client.protocol.decoder;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.util.CharsetUtil;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder;
/**
*
* @author Nikita Koksharov
*
*/
public class PermitDecoder implements Decoder<String> {
@Override
public String decode(ByteBuf buf, State state) {
if (!buf.isReadable()) {
return null;
}
if (buf.isReadable(14)
&& !buf.isReadable(16)
&& buf.getByte(buf.readerIndex()) == (byte) ':') {
return buf.toString(CharsetUtil.UTF_8);
}
return ByteBufUtil.hexDump(buf);
}
}

@ -12,6 +12,8 @@ import org.redisson.config.Config;
import java.io.IOException;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@ -325,6 +327,17 @@ public class RedissonPermitExpirableSemaphoreTest extends BaseConcurrentTest {
assertThat(s.availablePermits()).isEqualTo(0);
}
@Test
public void testBlockingAcquireMany() throws InterruptedException {
RPermitExpirableSemaphore s = redisson.getPermitExpirableSemaphore("test");
s.trySetPermits(10);
List<String> permitsIds = s.acquire(6);
assertThat(s.availablePermits()).isEqualTo(4);
s.release(permitsIds);
assertThat(s.availablePermits()).isEqualTo(10);
}
@Test
public void testTryAcquire() throws InterruptedException {
RPermitExpirableSemaphore s = redisson.getPermitExpirableSemaphore("test");
@ -358,6 +371,19 @@ public class RedissonPermitExpirableSemaphoreTest extends BaseConcurrentTest {
assertThat(s.availablePermits()).isEqualTo(0);
}
@Test
public void testTryAcquireMany() {
RPermitExpirableSemaphore s = redisson.getPermitExpirableSemaphore("test");
s.trySetPermits(10);
List<String> permitsIds = s.tryAcquire(4);
assertThat(permitsIds).hasSize(4);
List<String> permitsIds2 = s.tryAcquire(5);
assertThat(permitsIds2).hasSize(5);
assertThat(s.availablePermits()).isEqualTo(1);
}
@Test
public void testReleaseWithoutPermits() {
Assertions.assertThrows(RedisException.class, () -> {
@ -440,4 +466,101 @@ public class RedissonPermitExpirableSemaphoreTest extends BaseConcurrentTest {
});
}
@Test
public void testRelease() throws InterruptedException {
RPermitExpirableSemaphore semaphore = redisson.getPermitExpirableSemaphore("test");
semaphore.trySetPermits(10);
List<String> permitsIds = semaphore.acquire(6);
assertThat(permitsIds).hasSize(6);
assertThat(semaphore.availablePermits()).isEqualTo(4);
List<String> permitsIdsFirstPart = permitsIds.subList(0, 4);
semaphore.release(permitsIdsFirstPart);
assertThat(semaphore.availablePermits()).isEqualTo(8);
List<String> permitsIdsSecondPart = permitsIds.subList(4, 6);
semaphore.release(permitsIdsSecondPart);
assertThat(semaphore.availablePermits()).isEqualTo(10);
Assertions.assertThrows(RedisException.class, () -> semaphore.release(permitsIds));
}
@Test
public void testAcquireAsyncMany() throws ExecutionException, InterruptedException {
RPermitExpirableSemaphore semaphore = redisson.getPermitExpirableSemaphore("test");
semaphore.trySetPermits(10);
RFuture<List<String>> permitsIds = semaphore.acquireAsync(6);
Awaitility.await().atMost(Duration.ofMillis(100)).pollDelay(Duration.ofMillis(10)).untilAsserted(() -> {
assertThat(permitsIds.isDone()).isTrue();
});
assertThat(permitsIds.get()).hasSize(6);
assertThat(semaphore.availablePermits()).isEqualTo(4);
}
@Test
public void testReleaseAsyncMany() throws InterruptedException, ExecutionException {
RPermitExpirableSemaphore semaphore = redisson.getPermitExpirableSemaphore("test");
semaphore.trySetPermits(10);
List<String> permitsIds = semaphore.acquire(6);
assertThat(permitsIds).hasSize(6);
assertThat(semaphore.availablePermits()).isEqualTo(4);
List<String> permitsIdsFirstPart = permitsIds.subList(0, 4);
RFuture<Integer> releaseResult1 = semaphore.tryReleaseAsync(permitsIdsFirstPart);
Awaitility.await().atMost(Duration.ofMillis(100)).pollDelay(Duration.ofMillis(10)).untilAsserted(() -> {
assertThat(releaseResult1.isDone()).isTrue();
});
assertThat(releaseResult1.get()).isEqualTo(4);
assertThat(semaphore.availablePermits()).isEqualTo(8);
List<String> permitsIdsSecondPart = permitsIds.subList(4, 6);
RFuture<Integer> releaseResult2 = semaphore.tryReleaseAsync(permitsIdsSecondPart);
Awaitility.await().atMost(Duration.ofMillis(100)).pollDelay(Duration.ofMillis(10)).untilAsserted(() -> {
assertThat(releaseResult2.isDone()).isTrue();
});
assertThat(releaseResult2.get()).isEqualTo(2);
assertThat(semaphore.availablePermits()).isEqualTo(10);
}
@Test
public void testReleaseManyExpiredDoesNotThrow() throws InterruptedException, ExecutionException {
RPermitExpirableSemaphore semaphore = redisson.getPermitExpirableSemaphore("test");
semaphore.trySetPermits(10);
List<String> permitsIds = semaphore.acquire(6, 100, TimeUnit.MILLISECONDS);
assertThat(permitsIds).hasSize(6);
assertThat(semaphore.availablePermits()).isEqualTo(4);
Thread.sleep(250);
semaphore.acquire(100, TimeUnit.MILLISECONDS);
assertThat(semaphore.availablePermits()).isEqualTo(9);
Awaitility.await().atMost(Duration.ofMillis(250)).pollDelay(Duration.ofMillis(10)).untilAsserted(() -> {
assertThat(semaphore.availablePermits()).isEqualTo(10);
});
Assertions.assertDoesNotThrow(() -> semaphore.release(permitsIds));
}
@Test
public void testTryReleaseManyExpired() throws InterruptedException {
RPermitExpirableSemaphore s = redisson.getPermitExpirableSemaphore("test");
s.trySetPermits(10);
List<String> timedPermitsIds = s.tryAcquire(3, 100, 100, TimeUnit.MILLISECONDS);
List<String> infinitePermitsIds = s.tryAcquire(3);
assertThat(s.availablePermits()).isEqualTo(4);
Thread.sleep(200);
int released = s.tryRelease(infinitePermitsIds);
assertThat(released).isEqualTo(3);
assertThat(s.availablePermits()).isEqualTo(10);
released = s.tryRelease(timedPermitsIds);
assertThat(released).isEqualTo(0);
assertThat(s.availablePermits()).isEqualTo(10);
}
}

Loading…
Cancel
Save