|
|
|
@ -81,9 +81,9 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public List<String> acquire(int permits, long ttl, TimeUnit timeUnit) throws InterruptedException {
|
|
|
|
|
List<String> permitsIds = tryAcquire(permits, ttl, timeUnit);
|
|
|
|
|
if (!permitsIds.isEmpty() && !hasOnlyNextTimeout(permitsIds)) {
|
|
|
|
|
return permitsIds;
|
|
|
|
|
List<String> ids = tryAcquire(permits, ttl, timeUnit);
|
|
|
|
|
if (!ids.isEmpty() && !hasOnlyNextTimeout(ids)) {
|
|
|
|
|
return ids;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
CompletableFuture<RedissonLockEntry> future = subscribe();
|
|
|
|
@ -92,13 +92,13 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
|
|
|
|
|
try {
|
|
|
|
|
while (true) {
|
|
|
|
|
Long nearestTimeout;
|
|
|
|
|
permitsIds = tryAcquire(permits, ttl, timeUnit);
|
|
|
|
|
if (permitsIds.isEmpty()) {
|
|
|
|
|
ids = tryAcquire(permits, ttl, timeUnit);
|
|
|
|
|
if (ids.isEmpty()) {
|
|
|
|
|
nearestTimeout = null;
|
|
|
|
|
} else if (hasOnlyNextTimeout(permitsIds)) {
|
|
|
|
|
nearestTimeout = Long.parseLong(permitsIds.get(0).substring(1)) - System.currentTimeMillis();
|
|
|
|
|
} else if (hasOnlyNextTimeout(ids)) {
|
|
|
|
|
nearestTimeout = Long.parseLong(ids.get(0).substring(1)) - System.currentTimeMillis();
|
|
|
|
|
} else {
|
|
|
|
|
return permitsIds;
|
|
|
|
|
return ids;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (nearestTimeout != null) {
|
|
|
|
@ -129,9 +129,9 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
|
|
|
|
|
public RFuture<List<String>> acquireAsync(int permits, long ttl, TimeUnit timeUnit) {
|
|
|
|
|
long timeoutDate = calcTimeout(ttl, timeUnit);
|
|
|
|
|
RFuture<List<String>> tryAcquireFuture = tryAcquireAsync(permits, timeoutDate);
|
|
|
|
|
CompletionStage<List<String>> f = tryAcquireFuture.thenCompose(permitsIds -> {
|
|
|
|
|
if (!permitsIds.isEmpty() && !hasOnlyNextTimeout(permitsIds)) {
|
|
|
|
|
return CompletableFuture.completedFuture(permitsIds);
|
|
|
|
|
CompletionStage<List<String>> f = tryAcquireFuture.thenCompose(ids -> {
|
|
|
|
|
if (!ids.isEmpty() && !hasOnlyNextTimeout(ids)) {
|
|
|
|
|
return CompletableFuture.completedFuture(ids);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
CompletableFuture<RedissonLockEntry> subscribeFuture = subscribe();
|
|
|
|
@ -140,9 +140,9 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
|
|
|
|
|
});
|
|
|
|
|
f.whenComplete((r, e) -> {
|
|
|
|
|
if (f.toCompletableFuture().isCancelled()) {
|
|
|
|
|
tryAcquireFuture.whenComplete((permitsIds, ex) -> {
|
|
|
|
|
if (!permitsIds.isEmpty() && !hasOnlyNextTimeout(permitsIds)) {
|
|
|
|
|
releaseAsync(permitsIds);
|
|
|
|
|
tryAcquireFuture.whenComplete((ids, ex) -> {
|
|
|
|
|
if (!ids.isEmpty() && !hasOnlyNextTimeout(ids)) {
|
|
|
|
|
releaseAsync(ids);
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
@ -165,7 +165,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
|
|
|
|
|
long timeoutDate = calcTimeout(ttl, timeUnit);
|
|
|
|
|
long curr = System.currentTimeMillis();
|
|
|
|
|
RFuture<List<String>> tryAcquireFuture = tryAcquireAsync(permits, timeoutDate);
|
|
|
|
|
tryAcquireFuture.whenComplete((permitsIds, e) -> {
|
|
|
|
|
tryAcquireFuture.whenComplete((ids, e) -> {
|
|
|
|
|
if (e != null) {
|
|
|
|
|
unsubscribe(entry);
|
|
|
|
|
result.completeExceptionally(e);
|
|
|
|
@ -173,14 +173,14 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
Long nearestTimeout;
|
|
|
|
|
if (permitsIds.isEmpty()) {
|
|
|
|
|
if (ids.isEmpty()) {
|
|
|
|
|
nearestTimeout = null;
|
|
|
|
|
} else if (hasOnlyNextTimeout(permitsIds)) {
|
|
|
|
|
nearestTimeout = Long.parseLong(permitsIds.get(0).substring(1)) - System.currentTimeMillis();
|
|
|
|
|
} else if (hasOnlyNextTimeout(ids)) {
|
|
|
|
|
nearestTimeout = Long.parseLong(ids.get(0).substring(1)) - System.currentTimeMillis();
|
|
|
|
|
} else {
|
|
|
|
|
unsubscribe(entry);
|
|
|
|
|
if (!result.complete(permitsIds)) {
|
|
|
|
|
releaseAsync(permitsIds);
|
|
|
|
|
if (!result.complete(ids)) {
|
|
|
|
|
releaseAsync(ids);
|
|
|
|
|
}
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
@ -266,15 +266,15 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
|
|
|
|
|
if (e != null) {
|
|
|
|
|
unsubscribe(entry);
|
|
|
|
|
}
|
|
|
|
|
}).thenCompose(permitsIds -> {
|
|
|
|
|
}).thenCompose(ids -> {
|
|
|
|
|
Long nearestTimeout;
|
|
|
|
|
if (permitsIds.isEmpty()) {
|
|
|
|
|
if (ids.isEmpty()) {
|
|
|
|
|
nearestTimeout = null;
|
|
|
|
|
} else if (hasOnlyNextTimeout(permitsIds)) {
|
|
|
|
|
nearestTimeout = Long.parseLong(permitsIds.get(0).substring(1)) - System.currentTimeMillis();
|
|
|
|
|
} else if (hasOnlyNextTimeout(ids)) {
|
|
|
|
|
nearestTimeout = Long.parseLong(ids.get(0).substring(1)) - System.currentTimeMillis();
|
|
|
|
|
} else {
|
|
|
|
|
unsubscribe(entry);
|
|
|
|
|
return CompletableFuture.completedFuture(permitsIds);
|
|
|
|
|
return CompletableFuture.completedFuture(ids);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (entry.getLatch().tryAcquire(permits)) {
|
|
|
|
@ -342,15 +342,15 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
|
|
|
|
|
@Override
|
|
|
|
|
public RFuture<List<String>> tryAcquireAsync(int permits) {
|
|
|
|
|
CompletableFuture<List<String>> future = tryAcquireAsync(permits, nonExpirableTimeout).toCompletableFuture()
|
|
|
|
|
.thenApply(permitsIds -> {
|
|
|
|
|
if (hasOnlyNextTimeout(permitsIds)) {
|
|
|
|
|
.thenApply(ids -> {
|
|
|
|
|
if (hasOnlyNextTimeout(ids)) {
|
|
|
|
|
return null;
|
|
|
|
|
}
|
|
|
|
|
return permitsIds;
|
|
|
|
|
return ids;
|
|
|
|
|
});
|
|
|
|
|
future.whenComplete((permitsIds, e) -> {
|
|
|
|
|
if (future.isCancelled() && !permitsIds.isEmpty() && !hasOnlyNextTimeout(permitsIds)) {
|
|
|
|
|
releaseAsync(permitsIds);
|
|
|
|
|
future.whenComplete((ids, e) -> {
|
|
|
|
|
if (future.isCancelled() && !ids.isEmpty() && !hasOnlyNextTimeout(ids)) {
|
|
|
|
|
releaseAsync(ids);
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
@ -440,9 +440,9 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
|
|
|
|
|
long time = unit.toMillis(waitTime);
|
|
|
|
|
long current = System.currentTimeMillis();
|
|
|
|
|
|
|
|
|
|
List<String> permitsIds = tryAcquire(permits, ttl, unit);
|
|
|
|
|
if (!permitsIds.isEmpty() && !hasOnlyNextTimeout(permitsIds)) {
|
|
|
|
|
return permitsIds;
|
|
|
|
|
List<String> ids = tryAcquire(permits, ttl, unit);
|
|
|
|
|
if (!ids.isEmpty() && !hasOnlyNextTimeout(ids)) {
|
|
|
|
|
return ids;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
time -= System.currentTimeMillis() - current;
|
|
|
|
@ -468,13 +468,13 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
|
|
|
|
|
while (true) {
|
|
|
|
|
current = System.currentTimeMillis();
|
|
|
|
|
Long nearestTimeout;
|
|
|
|
|
permitsIds = tryAcquire(permits, ttl, unit);
|
|
|
|
|
if (permitsIds.isEmpty()) {
|
|
|
|
|
ids = tryAcquire(permits, ttl, unit);
|
|
|
|
|
if (ids.isEmpty()) {
|
|
|
|
|
nearestTimeout = null;
|
|
|
|
|
} else if (hasOnlyNextTimeout(permitsIds)) {
|
|
|
|
|
nearestTimeout = Long.parseLong(permitsIds.get(0).substring(1)) - System.currentTimeMillis();
|
|
|
|
|
} else if (hasOnlyNextTimeout(ids)) {
|
|
|
|
|
nearestTimeout = Long.parseLong(ids.get(0).substring(1)) - System.currentTimeMillis();
|
|
|
|
|
} else {
|
|
|
|
|
return permitsIds;
|
|
|
|
|
return ids;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
time -= System.currentTimeMillis() - current;
|
|
|
|
@ -509,15 +509,15 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
|
|
|
|
|
AtomicLong time = new AtomicLong(timeUnit.toMillis(waitTime));
|
|
|
|
|
long curr = System.currentTimeMillis();
|
|
|
|
|
long timeoutDate = calcTimeout(ttl, timeUnit);
|
|
|
|
|
tryAcquireAsync(permits, timeoutDate).whenComplete((permitsIds, e) -> {
|
|
|
|
|
tryAcquireAsync(permits, timeoutDate).whenComplete((ids, e) -> {
|
|
|
|
|
if (e != null) {
|
|
|
|
|
result.completeExceptionally(e);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (!permitsIds.isEmpty() && !hasOnlyNextTimeout(permitsIds)) {
|
|
|
|
|
if (!result.complete(permitsIds)) {
|
|
|
|
|
releaseAsync(permitsIds);
|
|
|
|
|
if (!ids.isEmpty() && !hasOnlyNextTimeout(ids)) {
|
|
|
|
|
if (!result.complete(ids)) {
|
|
|
|
|
releaseAsync(ids);
|
|
|
|
|
}
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
@ -865,15 +865,15 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
|
|
|
|
|
return get(updateLeaseTimeAsync(permitId, leaseTime, unit));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private static boolean hasOnlyNextTimeout(List<String> list) {
|
|
|
|
|
return list.size() == 1 && list.get(0).startsWith(":");
|
|
|
|
|
private static boolean hasOnlyNextTimeout(List<String> ids) {
|
|
|
|
|
return ids.size() == 1 && ids.get(0).startsWith(":");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private static String getFirstOrNull(List<String> list) {
|
|
|
|
|
if (list.isEmpty()) {
|
|
|
|
|
private static String getFirstOrNull(List<String> ids) {
|
|
|
|
|
if (ids.isEmpty()) {
|
|
|
|
|
return null;
|
|
|
|
|
}
|
|
|
|
|
return list.get(0);
|
|
|
|
|
return ids.get(0);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|