Rename ttl to leaseTime (according ot api)

Signed-off-by: Sergey Kuznetsov <sergey.kuznetsov@infobip.com>
pull/5247/head
Sergey Kuznetsov 2 years ago
parent 52e276288d
commit 8a8bb319e5

@ -16,7 +16,6 @@
package org.redisson; package org.redisson;
import io.netty.util.Timeout; import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import org.redisson.api.RFuture; import org.redisson.api.RFuture;
import org.redisson.api.RPermitExpirableSemaphore; import org.redisson.api.RPermitExpirableSemaphore;
import org.redisson.client.codec.ByteArrayCodec; import org.redisson.client.codec.ByteArrayCodec;
@ -70,19 +69,11 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
List<String> ids = acquire(1, leaseTime, timeUnit); List<String> ids = acquire(1, leaseTime, timeUnit);
return getFirstOrNull(ids); return getFirstOrNull(ids);
} }
@Override
public RFuture<String> acquireAsync(long leaseTime, TimeUnit timeUnit) {
CompletionStage<String> future = acquireAsync(1, leaseTime, timeUnit)
.thenApply(RedissonPermitExpirableSemaphore::getFirstOrNull);
return new CompletableFutureWrapper<>(future);
}
@Override @Override
public List<String> acquire(int permits, long ttl, TimeUnit timeUnit) throws InterruptedException { public List<String> acquire(int permits, long leaseTime, TimeUnit timeUnit) throws InterruptedException {
List<String> ids = tryAcquire(permits, ttl, timeUnit); List<String> ids = tryAcquire(permits, leaseTime, timeUnit);
if (!ids.isEmpty() && !hasOnlyNextTimeout(ids)) { if (!ids.isEmpty() && !hasOnlyNearestTimeout(ids)) {
return ids; return ids;
} }
@ -92,10 +83,10 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
try { try {
while (true) { while (true) {
Long nearestTimeout; Long nearestTimeout;
ids = tryAcquire(permits, ttl, timeUnit); ids = tryAcquire(permits, leaseTime, timeUnit);
if (ids.isEmpty()) { if (ids.isEmpty()) {
nearestTimeout = null; nearestTimeout = null;
} else if (hasOnlyNextTimeout(ids)) { } else if (hasOnlyNearestTimeout(ids)) {
nearestTimeout = Long.parseLong(ids.get(0).substring(1)) - System.currentTimeMillis(); nearestTimeout = Long.parseLong(ids.get(0).substring(1)) - System.currentTimeMillis();
} else { } else {
return ids; return ids;
@ -110,7 +101,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
} finally { } finally {
unsubscribe(entry); unsubscribe(entry);
} }
// return get(acquireAsync(permits, ttl, timeUnit)); // return get(acquireAsync(permits, leaseTime, timeUnit));
} }
@Override @Override
@ -126,22 +117,30 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
} }
@Override @Override
public RFuture<List<String>> acquireAsync(int permits, long ttl, TimeUnit timeUnit) { public RFuture<String> acquireAsync(long leaseTime, TimeUnit timeUnit) {
long timeoutDate = calcTimeout(ttl, timeUnit); CompletionStage<String> future = acquireAsync(1, leaseTime, timeUnit)
.thenApply(RedissonPermitExpirableSemaphore::getFirstOrNull);
return new CompletableFutureWrapper<>(future);
}
@Override
public RFuture<List<String>> acquireAsync(int permits, long leaseTime, TimeUnit timeUnit) {
long timeoutDate = calcTimeout(leaseTime, timeUnit);
RFuture<List<String>> tryAcquireFuture = tryAcquireAsync(permits, timeoutDate); RFuture<List<String>> tryAcquireFuture = tryAcquireAsync(permits, timeoutDate);
CompletionStage<List<String>> f = tryAcquireFuture.thenCompose(ids -> { CompletionStage<List<String>> f = tryAcquireFuture.thenCompose(ids -> {
if (!ids.isEmpty() && !hasOnlyNextTimeout(ids)) { if (!ids.isEmpty() && !hasOnlyNearestTimeout(ids)) {
return CompletableFuture.completedFuture(ids); return CompletableFuture.completedFuture(ids);
} }
CompletableFuture<RedissonLockEntry> subscribeFuture = subscribe(); CompletableFuture<RedissonLockEntry> subscribeFuture = subscribe();
semaphorePubSub.timeout(subscribeFuture); semaphorePubSub.timeout(subscribeFuture);
return subscribeFuture.thenCompose(res -> acquireAsync(permits, res, ttl, timeUnit)); return subscribeFuture.thenCompose(res -> acquireAsync(permits, res, leaseTime, timeUnit));
}); });
f.whenComplete((r, e) -> { f.whenComplete((r, e) -> {
if (f.toCompletableFuture().isCancelled()) { if (f.toCompletableFuture().isCancelled()) {
tryAcquireFuture.whenComplete((ids, ex) -> { tryAcquireFuture.whenComplete((ids, ex) -> {
if (!ids.isEmpty() && !hasOnlyNextTimeout(ids)) { if (!ids.isEmpty() && !hasOnlyNearestTimeout(ids)) {
releaseAsync(ids); releaseAsync(ids);
} }
}); });
@ -150,7 +149,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
return new CompletableFutureWrapper<>(f); return new CompletableFutureWrapper<>(f);
} }
private void tryAcquireAsync(AtomicLong time, int permits, RedissonLockEntry entry, CompletableFuture<List<String>> result, long ttl, TimeUnit timeUnit) { private void tryAcquireAsync(AtomicLong time, int permits, RedissonLockEntry entry, CompletableFuture<List<String>> result, long leaseTime, TimeUnit timeUnit) {
if (result.isDone()) { if (result.isDone()) {
unsubscribe(entry); unsubscribe(entry);
return; return;
@ -162,7 +161,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
return; return;
} }
long timeoutDate = calcTimeout(ttl, timeUnit); long timeoutDate = calcTimeout(leaseTime, timeUnit);
long curr = System.currentTimeMillis(); long curr = System.currentTimeMillis();
RFuture<List<String>> tryAcquireFuture = tryAcquireAsync(permits, timeoutDate); RFuture<List<String>> tryAcquireFuture = tryAcquireAsync(permits, timeoutDate);
tryAcquireFuture.whenComplete((ids, e) -> { tryAcquireFuture.whenComplete((ids, e) -> {
@ -175,7 +174,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
Long nearestTimeout; Long nearestTimeout;
if (ids.isEmpty()) { if (ids.isEmpty()) {
nearestTimeout = null; nearestTimeout = null;
} else if (hasOnlyNextTimeout(ids)) { } else if (hasOnlyNearestTimeout(ids)) {
nearestTimeout = Long.parseLong(ids.get(0).substring(1)) - System.currentTimeMillis(); nearestTimeout = Long.parseLong(ids.get(0).substring(1)) - System.currentTimeMillis();
} else { } else {
unsubscribe(entry); unsubscribe(entry);
@ -197,24 +196,21 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
// waiting for message // waiting for message
long current = System.currentTimeMillis(); long current = System.currentTimeMillis();
if (entry.getLatch().tryAcquire()) { if (entry.getLatch().tryAcquire()) {
tryAcquireAsync(time, permits, entry, result, ttl, timeUnit); tryAcquireAsync(time, permits, entry, result, leaseTime, timeUnit);
} else { } else {
AtomicReference<Timeout> waitTimeoutFutureRef = new AtomicReference<>(); AtomicReference<Timeout> waitTimeoutFutureRef = new AtomicReference<>();
Timeout scheduledFuture; Timeout scheduledFuture;
if (nearestTimeout != null) { if (nearestTimeout != null) {
scheduledFuture = getServiceManager().newTimeout(new TimerTask() { scheduledFuture = getServiceManager().newTimeout(timeout -> {
@Override if (waitTimeoutFutureRef.get() != null && !waitTimeoutFutureRef.get().cancel()) {
public void run(Timeout timeout) throws Exception { return;
if (waitTimeoutFutureRef.get() != null && !waitTimeoutFutureRef.get().cancel()) {
return;
}
long elapsed = System.currentTimeMillis() - current;
time.addAndGet(-elapsed);
tryAcquireAsync(time, permits, entry, result, ttl, timeUnit);
} }
long elapsed = System.currentTimeMillis() - current;
time.addAndGet(-elapsed);
tryAcquireAsync(time, permits, entry, result, leaseTime, timeUnit);
}, nearestTimeout, TimeUnit.MILLISECONDS); }, nearestTimeout, TimeUnit.MILLISECONDS);
} else { } else {
scheduledFuture = null; scheduledFuture = null;
@ -233,24 +229,21 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
long elapsed = System.currentTimeMillis() - current; long elapsed = System.currentTimeMillis() - current;
time.addAndGet(-elapsed); time.addAndGet(-elapsed);
tryAcquireAsync(time, permits, entry, result, ttl, timeUnit); tryAcquireAsync(time, permits, entry, result, leaseTime, timeUnit);
}; };
entry.addListener(listener); entry.addListener(listener);
long t = time.get(); long t = time.get();
Timeout waitTimeoutFuture = getServiceManager().newTimeout(new TimerTask() { Timeout waitTimeoutFuture = getServiceManager().newTimeout(timeout -> {
@Override if (scheduledFuture != null && !scheduledFuture.cancel()) {
public void run(Timeout timeout) throws Exception { return;
if (scheduledFuture != null && !scheduledFuture.cancel()) { }
return;
}
if (entry.removeListener(listener)) { if (entry.removeListener(listener)) {
long elapsed = System.currentTimeMillis() - current; long elapsed = System.currentTimeMillis() - current;
time.addAndGet(-elapsed); time.addAndGet(-elapsed);
tryAcquireAsync(time, permits, entry, result, ttl, timeUnit); tryAcquireAsync(time, permits, entry, result, leaseTime, timeUnit);
}
} }
}, t, TimeUnit.MILLISECONDS); }, t, TimeUnit.MILLISECONDS);
waitTimeoutFutureRef.set(waitTimeoutFuture); waitTimeoutFutureRef.set(waitTimeoutFuture);
@ -259,8 +252,8 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
} }
private CompletableFuture<List<String>> acquireAsync(int permits, RedissonLockEntry entry, long ttl, TimeUnit timeUnit) { private CompletableFuture<List<String>> acquireAsync(int permits, RedissonLockEntry entry, long leaseTime, TimeUnit timeUnit) {
long timeoutDate = calcTimeout(ttl, timeUnit); long timeoutDate = calcTimeout(leaseTime, timeUnit);
CompletableFuture<List<String>> tryAcquireFuture = tryAcquireAsync(permits, timeoutDate).toCompletableFuture(); CompletableFuture<List<String>> tryAcquireFuture = tryAcquireAsync(permits, timeoutDate).toCompletableFuture();
return tryAcquireFuture.whenComplete((p, e) -> { return tryAcquireFuture.whenComplete((p, e) -> {
if (e != null) { if (e != null) {
@ -270,7 +263,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
Long nearestTimeout; Long nearestTimeout;
if (ids.isEmpty()) { if (ids.isEmpty()) {
nearestTimeout = null; nearestTimeout = null;
} else if (hasOnlyNextTimeout(ids)) { } else if (hasOnlyNearestTimeout(ids)) {
nearestTimeout = Long.parseLong(ids.get(0).substring(1)) - System.currentTimeMillis(); nearestTimeout = Long.parseLong(ids.get(0).substring(1)) - System.currentTimeMillis();
} else { } else {
unsubscribe(entry); unsubscribe(entry);
@ -278,14 +271,14 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
} }
if (entry.getLatch().tryAcquire(permits)) { if (entry.getLatch().tryAcquire(permits)) {
return acquireAsync(permits, entry, ttl, timeUnit); return acquireAsync(permits, entry, leaseTime, timeUnit);
} }
CompletableFuture<List<String>> res = new CompletableFuture<>(); CompletableFuture<List<String>> res = new CompletableFuture<>();
Timeout scheduledFuture; Timeout scheduledFuture;
if (nearestTimeout != null) { if (nearestTimeout != null) {
scheduledFuture = getServiceManager().newTimeout(timeout -> { scheduledFuture = getServiceManager().newTimeout(timeout -> {
CompletableFuture<List<String>> r = acquireAsync(permits, entry, ttl, timeUnit); CompletableFuture<List<String>> r = acquireAsync(permits, entry, leaseTime, timeUnit);
commandExecutor.transfer(r, res); commandExecutor.transfer(r, res);
}, nearestTimeout, TimeUnit.MILLISECONDS); }, nearestTimeout, TimeUnit.MILLISECONDS);
} else { } else {
@ -297,7 +290,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
entry.getLatch().release(); entry.getLatch().release();
return; return;
} }
CompletableFuture<List<String>> r = acquireAsync(permits, entry, ttl, timeUnit); CompletableFuture<List<String>> r = acquireAsync(permits, entry, leaseTime, timeUnit);
commandExecutor.transfer(r, res); commandExecutor.transfer(r, res);
}; };
entry.addListener(listener); entry.addListener(listener);
@ -314,20 +307,20 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
@Override @Override
public List<String> tryAcquire(int permits) { public List<String> tryAcquire(int permits) {
List<String> ids = tryAcquire(permits, -1, TimeUnit.MILLISECONDS); List<String> ids = tryAcquire(permits, -1, TimeUnit.MILLISECONDS);
if (hasOnlyNextTimeout(ids)) { if (hasOnlyNearestTimeout(ids)) {
return Collections.emptyList(); return Collections.emptyList();
} }
return ids; return ids;
} }
private List<String> tryAcquire(int permits, long ttl, TimeUnit timeUnit) { private List<String> tryAcquire(int permits, long leaseTime, TimeUnit timeUnit) {
long timeoutDate = calcTimeout(ttl, timeUnit); long timeoutDate = calcTimeout(leaseTime, timeUnit);
return get(tryAcquireAsync(permits, timeoutDate)); return get(tryAcquireAsync(permits, timeoutDate));
} }
private long calcTimeout(long ttl, TimeUnit timeUnit) { private long calcTimeout(long leaseTime, TimeUnit timeUnit) {
if (ttl != -1) { if (leaseTime != -1) {
return System.currentTimeMillis() + timeUnit.toMillis(ttl); return System.currentTimeMillis() + timeUnit.toMillis(leaseTime);
} }
return nonExpirableTimeout; return nonExpirableTimeout;
} }
@ -343,13 +336,13 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
public RFuture<List<String>> tryAcquireAsync(int permits) { public RFuture<List<String>> tryAcquireAsync(int permits) {
CompletableFuture<List<String>> future = tryAcquireAsync(permits, nonExpirableTimeout).toCompletableFuture() CompletableFuture<List<String>> future = tryAcquireAsync(permits, nonExpirableTimeout).toCompletableFuture()
.thenApply(ids -> { .thenApply(ids -> {
if (hasOnlyNextTimeout(ids)) { if (hasOnlyNearestTimeout(ids)) {
return null; return null;
} }
return ids; return ids;
}); });
future.whenComplete((ids, e) -> { future.whenComplete((ids, e) -> {
if (future.isCancelled() && !ids.isEmpty() && !hasOnlyNextTimeout(ids)) { if (future.isCancelled() && !ids.isEmpty() && !hasOnlyNearestTimeout(ids)) {
releaseAsync(ids); releaseAsync(ids);
} }
}); });
@ -423,25 +416,25 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
} }
@Override @Override
public String tryAcquire(long waitTime, long ttl, TimeUnit unit) throws InterruptedException { public String tryAcquire(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
List<String> ids = tryAcquire(1, waitTime, ttl, unit); List<String> ids = tryAcquire(1, waitTime, leaseTime, unit);
return getFirstOrNull(ids); return getFirstOrNull(ids);
} }
@Override @Override
public RFuture<String> tryAcquireAsync(long waitTime, long ttl, TimeUnit unit) { public RFuture<String> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit) {
CompletionStage<String> future = tryAcquireAsync(1, waitTime, ttl, unit) CompletionStage<String> future = tryAcquireAsync(1, waitTime, leaseTime, unit)
.thenApply(RedissonPermitExpirableSemaphore::getFirstOrNull); .thenApply(RedissonPermitExpirableSemaphore::getFirstOrNull);
return new CompletableFutureWrapper<>(future); return new CompletableFutureWrapper<>(future);
} }
@Override @Override
public List<String> tryAcquire(int permits, long waitTime, long ttl, TimeUnit unit) throws InterruptedException { public List<String> tryAcquire(int permits, long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long time = unit.toMillis(waitTime); long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis(); long current = System.currentTimeMillis();
List<String> ids = tryAcquire(permits, ttl, unit); List<String> ids = tryAcquire(permits, leaseTime, unit);
if (!ids.isEmpty() && !hasOnlyNextTimeout(ids)) { if (!ids.isEmpty() && !hasOnlyNearestTimeout(ids)) {
return ids; return ids;
} }
@ -468,10 +461,10 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
while (true) { while (true) {
current = System.currentTimeMillis(); current = System.currentTimeMillis();
Long nearestTimeout; Long nearestTimeout;
ids = tryAcquire(permits, ttl, unit); ids = tryAcquire(permits, leaseTime, unit);
if (ids.isEmpty()) { if (ids.isEmpty()) {
nearestTimeout = null; nearestTimeout = null;
} else if (hasOnlyNextTimeout(ids)) { } else if (hasOnlyNearestTimeout(ids)) {
nearestTimeout = Long.parseLong(ids.get(0).substring(1)) - System.currentTimeMillis(); nearestTimeout = Long.parseLong(ids.get(0).substring(1)) - System.currentTimeMillis();
} else { } else {
return ids; return ids;
@ -500,22 +493,22 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
} finally { } finally {
unsubscribe(entry); unsubscribe(entry);
} }
// return get(tryAcquireAsync(permits, waitTime, ttl, unit)); // return get(tryAcquireAsync(permits, waitTime, leaseTime, unit));
} }
@Override @Override
public RFuture<List<String>> tryAcquireAsync(int permits, long waitTime, long ttl, TimeUnit timeUnit) { public RFuture<List<String>> tryAcquireAsync(int permits, long waitTime, long leaseTime, TimeUnit timeUnit) {
CompletableFuture<List<String>> result = new CompletableFuture<>(); CompletableFuture<List<String>> result = new CompletableFuture<>();
AtomicLong time = new AtomicLong(timeUnit.toMillis(waitTime)); AtomicLong time = new AtomicLong(timeUnit.toMillis(waitTime));
long curr = System.currentTimeMillis(); long curr = System.currentTimeMillis();
long timeoutDate = calcTimeout(ttl, timeUnit); long timeoutDate = calcTimeout(leaseTime, timeUnit);
tryAcquireAsync(permits, timeoutDate).whenComplete((ids, e) -> { tryAcquireAsync(permits, timeoutDate).whenComplete((ids, e) -> {
if (e != null) { if (e != null) {
result.completeExceptionally(e); result.completeExceptionally(e);
return; return;
} }
if (!ids.isEmpty() && !hasOnlyNextTimeout(ids)) { if (!ids.isEmpty() && !hasOnlyNearestTimeout(ids)) {
if (!result.complete(ids)) { if (!result.complete(ids)) {
releaseAsync(ids); releaseAsync(ids);
} }
@ -531,7 +524,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
} }
long current = System.currentTimeMillis(); long current = System.currentTimeMillis();
AtomicReference<Timeout> futureRef = new AtomicReference<Timeout>(); AtomicReference<Timeout> futureRef = new AtomicReference<>();
CompletableFuture<RedissonLockEntry> subscribeFuture = subscribe(); CompletableFuture<RedissonLockEntry> subscribeFuture = subscribe();
subscribeFuture.whenComplete((r, ex) -> { subscribeFuture.whenComplete((r, ex) -> {
if (ex != null) { if (ex != null) {
@ -546,16 +539,13 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
long elapsed = System.currentTimeMillis() - current; long elapsed = System.currentTimeMillis() - current;
time.addAndGet(-elapsed); time.addAndGet(-elapsed);
tryAcquireAsync(time, permits, r, result, ttl, timeUnit); tryAcquireAsync(time, permits, r, result, leaseTime, timeUnit);
}); });
if (!subscribeFuture.isDone()) { if (!subscribeFuture.isDone()) {
Timeout scheduledFuture = getServiceManager().newTimeout(new TimerTask() { Timeout scheduledFuture = getServiceManager().newTimeout(timeout -> {
@Override if (!subscribeFuture.isDone()) {
public void run(Timeout timeout) throws Exception { result.complete(null);
if (!subscribeFuture.isDone()) {
result.complete(null);
}
} }
}, time.get(), TimeUnit.MILLISECONDS); }, time.get(), TimeUnit.MILLISECONDS);
futureRef.set(scheduledFuture); futureRef.set(scheduledFuture);
@ -576,7 +566,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
@Override @Override
public String tryAcquire(long waitTime, TimeUnit unit) throws InterruptedException { public String tryAcquire(long waitTime, TimeUnit unit) throws InterruptedException {
List<String> ids = tryAcquire(1, waitTime, -1, unit); List<String> ids = tryAcquire(1, waitTime, -1, unit);
if (ids.isEmpty() || hasOnlyNextTimeout(ids)) { if (ids.isEmpty() || hasOnlyNearestTimeout(ids)) {
return null; return null;
} }
return ids.get(0); return ids.get(0);
@ -865,7 +855,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
return get(updateLeaseTimeAsync(permitId, leaseTime, unit)); return get(updateLeaseTimeAsync(permitId, leaseTime, unit));
} }
private static boolean hasOnlyNextTimeout(List<String> ids) { private static boolean hasOnlyNearestTimeout(List<String> ids) {
return ids.size() == 1 && ids.get(0).startsWith(":"); return ids.size() == 1 && ids.get(0).startsWith(":");
} }

Loading…
Cancel
Save