|
|
|
@ -68,7 +68,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
|
|
|
|
|
@Override
|
|
|
|
|
public String acquire(long leaseTime, TimeUnit timeUnit) throws InterruptedException {
|
|
|
|
|
List<String> ids = acquire(1, leaseTime, timeUnit);
|
|
|
|
|
return ids.isEmpty() ? null : ids.get(0);
|
|
|
|
|
return getFirstOrNull(ids);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
@ -82,7 +82,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
|
|
|
|
|
@Override
|
|
|
|
|
public List<String> acquire(int permits, long ttl, TimeUnit timeUnit) throws InterruptedException {
|
|
|
|
|
List<String> permitsIds = tryAcquire(permits, ttl, timeUnit);
|
|
|
|
|
if (!permitsIds.isEmpty() && !permitsIds.get(0).startsWith(":")) {
|
|
|
|
|
if (!permitsIds.isEmpty() && !hasOnlyNextTimeout(permitsIds)) {
|
|
|
|
|
return permitsIds;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -95,12 +95,10 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
|
|
|
|
|
permitsIds = tryAcquire(permits, ttl, timeUnit);
|
|
|
|
|
if (permitsIds.isEmpty()) {
|
|
|
|
|
nearestTimeout = null;
|
|
|
|
|
} else if (hasOnlyNextTimeout(permitsIds)) {
|
|
|
|
|
nearestTimeout = Long.parseLong(permitsIds.get(0).substring(1)) - System.currentTimeMillis();
|
|
|
|
|
} else {
|
|
|
|
|
if (permitsIds.get(0).startsWith(":")) {
|
|
|
|
|
nearestTimeout = Long.parseLong(permitsIds.get(0).substring(1)) - System.currentTimeMillis();
|
|
|
|
|
} else {
|
|
|
|
|
return permitsIds;
|
|
|
|
|
}
|
|
|
|
|
return permitsIds;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (nearestTimeout != null) {
|
|
|
|
@ -131,20 +129,18 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
|
|
|
|
|
long timeoutDate = calcTimeout(ttl, timeUnit);
|
|
|
|
|
RFuture<List<String>> tryAcquireFuture = tryAcquireAsync(permits, timeoutDate);
|
|
|
|
|
CompletionStage<List<String>> f = tryAcquireFuture.thenCompose(permitsIds -> {
|
|
|
|
|
if (!permitsIds.isEmpty() && !permitsIds.get(0).startsWith(":")) {
|
|
|
|
|
if (!permitsIds.isEmpty() && !hasOnlyNextTimeout(permitsIds)) {
|
|
|
|
|
return CompletableFuture.completedFuture(permitsIds);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
CompletableFuture<RedissonLockEntry> subscribeFuture = subscribe();
|
|
|
|
|
semaphorePubSub.timeout(subscribeFuture);
|
|
|
|
|
return subscribeFuture.thenCompose(res -> {
|
|
|
|
|
return acquireAsync(permits, res, ttl, timeUnit);
|
|
|
|
|
});
|
|
|
|
|
return subscribeFuture.thenCompose(res -> acquireAsync(permits, res, ttl, timeUnit));
|
|
|
|
|
});
|
|
|
|
|
f.whenComplete((r, e) -> {
|
|
|
|
|
if (f.toCompletableFuture().isCancelled()) {
|
|
|
|
|
tryAcquireFuture.whenComplete((permitsIds, ex) -> {
|
|
|
|
|
if (!permitsIds.isEmpty() && !permitsIds.get(0).startsWith(":")) {
|
|
|
|
|
if (!permitsIds.isEmpty() && !hasOnlyNextTimeout(permitsIds)) {
|
|
|
|
|
releaseAsync(permitsIds);
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
@ -178,16 +174,14 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
|
|
|
|
|
Long nearestTimeout;
|
|
|
|
|
if (permitsIds.isEmpty()) {
|
|
|
|
|
nearestTimeout = null;
|
|
|
|
|
} else if (hasOnlyNextTimeout(permitsIds)) {
|
|
|
|
|
nearestTimeout = Long.parseLong(permitsIds.get(0).substring(1)) - System.currentTimeMillis();
|
|
|
|
|
} else {
|
|
|
|
|
if (permitsIds.get(0).startsWith(":")) {
|
|
|
|
|
nearestTimeout = Long.parseLong(permitsIds.get(0).substring(1)) - System.currentTimeMillis();
|
|
|
|
|
} else {
|
|
|
|
|
unsubscribe(entry);
|
|
|
|
|
if (!result.complete(permitsIds)) {
|
|
|
|
|
releaseAsync(permitsIds);
|
|
|
|
|
}
|
|
|
|
|
return;
|
|
|
|
|
unsubscribe(entry);
|
|
|
|
|
if (!result.complete(permitsIds)) {
|
|
|
|
|
releaseAsync(permitsIds);
|
|
|
|
|
}
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
long el = System.currentTimeMillis() - curr;
|
|
|
|
@ -275,13 +269,11 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
|
|
|
|
|
Long nearestTimeout;
|
|
|
|
|
if (permitsIds.isEmpty()) {
|
|
|
|
|
nearestTimeout = null;
|
|
|
|
|
} else if (hasOnlyNextTimeout(permitsIds)) {
|
|
|
|
|
nearestTimeout = Long.parseLong(permitsIds.get(0).substring(1)) - System.currentTimeMillis();
|
|
|
|
|
} else {
|
|
|
|
|
if (permitsIds.size() == 1 && permitsIds.get(0).startsWith(":")) {
|
|
|
|
|
nearestTimeout = Long.parseLong(permitsIds.get(0).substring(1)) - System.currentTimeMillis();
|
|
|
|
|
} else {
|
|
|
|
|
unsubscribe(entry);
|
|
|
|
|
return CompletableFuture.completedFuture(permitsIds);
|
|
|
|
|
}
|
|
|
|
|
unsubscribe(entry);
|
|
|
|
|
return CompletableFuture.completedFuture(permitsIds);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (entry.getLatch().tryAcquire(permits)) {
|
|
|
|
@ -315,13 +307,13 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
|
|
|
|
|
@Override
|
|
|
|
|
public String tryAcquire() {
|
|
|
|
|
List<String> ids = tryAcquire(1);
|
|
|
|
|
return ids.isEmpty() ? null : ids.get(0);
|
|
|
|
|
return getFirstOrNull(ids);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public List<String> tryAcquire(int permits) {
|
|
|
|
|
List<String> ids = tryAcquire(permits, -1, TimeUnit.MILLISECONDS);
|
|
|
|
|
if (ids.size() == 1 && ids.get(0).startsWith(":")) {
|
|
|
|
|
if (hasOnlyNextTimeout(ids)) {
|
|
|
|
|
return Collections.emptyList();
|
|
|
|
|
}
|
|
|
|
|
return ids;
|
|
|
|
@ -342,7 +334,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
|
|
|
|
|
@Override
|
|
|
|
|
public RFuture<String> tryAcquireAsync() {
|
|
|
|
|
CompletionStage<String> future = tryAcquireAsync(1)
|
|
|
|
|
.thenApply(r -> r.isEmpty() ? null : r.get(0));
|
|
|
|
|
.thenApply(RedissonPermitExpirableSemaphore::getFirstOrNull);
|
|
|
|
|
return new CompletableFutureWrapper<>(future);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -350,13 +342,13 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
|
|
|
|
|
public RFuture<List<String>> tryAcquireAsync(int permits) {
|
|
|
|
|
CompletableFuture<List<String>> future = tryAcquireAsync(permits, nonExpirableTimeout).toCompletableFuture()
|
|
|
|
|
.thenApply(permitsIds -> {
|
|
|
|
|
if (permitsIds.size() == 1 && permitsIds.get(0).startsWith(":")) {
|
|
|
|
|
if (hasOnlyNextTimeout(permitsIds)) {
|
|
|
|
|
return null;
|
|
|
|
|
}
|
|
|
|
|
return permitsIds;
|
|
|
|
|
});
|
|
|
|
|
future.whenComplete((permitsIds, e) -> {
|
|
|
|
|
if (future.isCancelled() && !permitsIds.isEmpty() && !permitsIds.get(0).startsWith(":")) {
|
|
|
|
|
if (future.isCancelled() && !permitsIds.isEmpty() && !hasOnlyNextTimeout(permitsIds)) {
|
|
|
|
|
releaseAsync(permitsIds);
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
@ -425,20 +417,20 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
|
|
|
|
|
@Override
|
|
|
|
|
public RFuture<String> tryAcquireAsync(long waitTime, TimeUnit unit) {
|
|
|
|
|
CompletionStage<String> future = tryAcquireAsync(1, waitTime, -1, unit)
|
|
|
|
|
.thenApply(ids -> ids.isEmpty() ? null : ids.get(0));
|
|
|
|
|
.thenApply(RedissonPermitExpirableSemaphore::getFirstOrNull);
|
|
|
|
|
return new CompletableFutureWrapper<>(future);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public String tryAcquire(long waitTime, long ttl, TimeUnit unit) throws InterruptedException {
|
|
|
|
|
List<String> ids = tryAcquire(1, waitTime, ttl, unit);
|
|
|
|
|
return ids.isEmpty() ? null : ids.get(0);
|
|
|
|
|
return getFirstOrNull(ids);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public RFuture<String> tryAcquireAsync(long waitTime, long ttl, TimeUnit unit) {
|
|
|
|
|
CompletionStage<String> future = tryAcquireAsync(1, waitTime, ttl, unit)
|
|
|
|
|
.thenApply(ids -> ids.isEmpty() ? null : ids.get(0));
|
|
|
|
|
.thenApply(RedissonPermitExpirableSemaphore::getFirstOrNull);
|
|
|
|
|
return new CompletableFutureWrapper<>(future);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -448,7 +440,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
|
|
|
|
|
long current = System.currentTimeMillis();
|
|
|
|
|
|
|
|
|
|
List<String> permitsIds = tryAcquire(permits, ttl, unit);
|
|
|
|
|
if (!permitsIds.isEmpty() && !permitsIds.get(0).startsWith(":")) {
|
|
|
|
|
if (!permitsIds.isEmpty() && !hasOnlyNextTimeout(permitsIds)) {
|
|
|
|
|
return permitsIds;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -478,12 +470,10 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
|
|
|
|
|
permitsIds = tryAcquire(permits, ttl, unit);
|
|
|
|
|
if (permitsIds.isEmpty()) {
|
|
|
|
|
nearestTimeout = null;
|
|
|
|
|
} else if (hasOnlyNextTimeout(permitsIds)) {
|
|
|
|
|
nearestTimeout = Long.parseLong(permitsIds.get(0).substring(1)) - System.currentTimeMillis();
|
|
|
|
|
} else {
|
|
|
|
|
if (permitsIds.get(0).startsWith(":")) {
|
|
|
|
|
nearestTimeout = Long.parseLong(permitsIds.get(0).substring(1)) - System.currentTimeMillis();
|
|
|
|
|
} else {
|
|
|
|
|
return permitsIds;
|
|
|
|
|
}
|
|
|
|
|
return permitsIds;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
time -= System.currentTimeMillis() - current;
|
|
|
|
@ -524,7 +514,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (!permitsIds.isEmpty() && !permitsIds.get(0).startsWith(":")) {
|
|
|
|
|
if (!permitsIds.isEmpty() && !hasOnlyNextTimeout(permitsIds)) {
|
|
|
|
|
if (!result.complete(permitsIds)) {
|
|
|
|
|
releaseAsync(permitsIds);
|
|
|
|
|
}
|
|
|
|
@ -584,11 +574,11 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public String tryAcquire(long waitTime, TimeUnit unit) throws InterruptedException {
|
|
|
|
|
List<String> res = tryAcquire(1, waitTime, -1, unit);
|
|
|
|
|
if (res.isEmpty() || res.get(0).startsWith(":")) {
|
|
|
|
|
List<String> ids = tryAcquire(1, waitTime, -1, unit);
|
|
|
|
|
if (ids.isEmpty() || hasOnlyNextTimeout(ids)) {
|
|
|
|
|
return null;
|
|
|
|
|
}
|
|
|
|
|
return res.get(0);
|
|
|
|
|
return ids.get(0);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
@ -874,4 +864,15 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
|
|
|
|
|
return get(updateLeaseTimeAsync(permitId, leaseTime, unit));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private static boolean hasOnlyNextTimeout(List<String> list) {
|
|
|
|
|
return list.size() == 1 && list.get(0).startsWith(":");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private static String getFirstOrNull(List<String> list) {
|
|
|
|
|
if (list.isEmpty()) {
|
|
|
|
|
return null;
|
|
|
|
|
}
|
|
|
|
|
return list.get(0);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|