refactoring

pull/5775/head
Nikita Koksharov 10 months ago
parent a36f0f4781
commit 6482246358

@ -28,6 +28,7 @@ import org.redisson.pubsub.SemaphorePubSub;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.*;
@ -280,16 +281,21 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
public RFuture<Boolean> tryAcquireAsync(long waitTime, TimeUnit unit) {
return tryAcquireAsync(1, waitTime, unit);
}
@Override
public boolean tryAcquire(int permits, long waitTime, TimeUnit unit) throws InterruptedException {
LOGGER.debug("trying to acquire, permits: {}, waitTime: {}, unit: {}, name: {}", permits, waitTime, unit, getName());
public boolean tryAcquire(Duration waitTime) throws InterruptedException {
return tryAcquire(1, waitTime);
}
long time = unit.toMillis(waitTime);
@Override
public boolean tryAcquire(int permits, Duration waitTime) throws InterruptedException {
LOGGER.debug("trying to acquire, permits: {}, waitTime: {}, name: {}", permits, waitTime, getName());
long time = waitTime.toMillis();
long current = System.currentTimeMillis();
if (tryAcquire(permits)) {
LOGGER.debug("acquired, permits: {}, waitTime: {}, unit: {}, name: {}", permits, waitTime, unit, getName());
LOGGER.debug("acquired, permits: {}, waitTime: {}, name: {}", permits, waitTime, getName());
return true;
}
@ -318,11 +324,11 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
LOGGER.debug("unable to acquire, permits: {}, name: {}", permits, getName());
return false;
}
while (true) {
current = System.currentTimeMillis();
if (tryAcquire(permits)) {
LOGGER.debug("acquired, permits: {}, wait-time: {}, unit: {}, name: {}", permits, waitTime, unit, getName());
LOGGER.debug("acquired, permits: {}, wait-time: {}, name: {}", permits, waitTime, getName());
return true;
}
@ -347,13 +353,18 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
} finally {
unsubscribe(entry);
}
// return get(tryAcquireAsync(permits, waitTime, unit));
// return get(tryAcquireAsync(permits, waitTime));
}
@Override
public RFuture<Boolean> tryAcquireAsync(int permits, long waitTime, TimeUnit unit) {
public RFuture<Boolean> tryAcquireAsync(Duration waitTime) {
return tryAcquireAsync(1, waitTime);
}
@Override
public RFuture<Boolean> tryAcquireAsync(int permits, Duration waitTime) {
CompletableFuture<Boolean> result = new CompletableFuture<>();
AtomicLong time = new AtomicLong(unit.toMillis(waitTime));
AtomicLong time = new AtomicLong(waitTime.toMillis());
long curr = System.currentTimeMillis();
RFuture<Boolean> tryAcquireFuture = tryAcquireAsync(permits);
tryAcquireFuture.whenComplete((res, e) -> {
@ -368,15 +379,15 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
}
return;
}
long elap = System.currentTimeMillis() - curr;
time.addAndGet(-elap);
if (time.get() <= 0) {
result.complete(false);
return;
}
long current = System.currentTimeMillis();
CompletableFuture<RedissonLockEntry> subscribeFuture = subscribe();
semaphorePubSub.timeout(subscribeFuture, time.get());
@ -388,19 +399,29 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
long elapsed = System.currentTimeMillis() - current;
time.addAndGet(-elapsed);
if (time.get() < 0) {
unsubscribe(r);
result.complete(false);
return;
}
tryAcquireAsync(time, permits, r, result);
});
});
return new CompletableFutureWrapper<>(result);
}
@Override
public boolean tryAcquire(int permits, long waitTime, TimeUnit unit) throws InterruptedException {
return tryAcquire(permits, Duration.ofMillis(unit.toMillis(waitTime)));
}
@Override
public RFuture<Boolean> tryAcquireAsync(int permits, long waitTime, TimeUnit unit) {
return tryAcquireAsync(permits, Duration.ofMillis(unit.toMillis(waitTime)));
}
private CompletableFuture<RedissonLockEntry> subscribe() {
return semaphorePubSub.subscribe(getRawName(), getChannelName());
}

@ -15,6 +15,7 @@
*/
package org.redisson.api;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
/**
@ -67,16 +68,26 @@ public interface RSemaphore extends RExpirable, RSemaphoreAsync {
* Waits up to defined <code>waitTime</code> if necessary until a permit became available.
*
* @param waitTime the maximum time to wait
* @return <code>true</code> if a permit was acquired and <code>false</code>
* otherwise
* @throws InterruptedException if the current thread was interrupted
*/
boolean tryAcquire(Duration waitTime) throws InterruptedException;
/**
* Use {@link #tryAcquire(Duration)} instead
*
* @param waitTime the maximum time to wait
* @param unit the time unit
* @return <code>true</code> if a permit was acquired and <code>false</code>
* otherwise
* @throws InterruptedException if the current thread was interrupted
*/
@Deprecated
boolean tryAcquire(long waitTime, TimeUnit unit) throws InterruptedException;
/**
* Tries to acquire defined amount of currently available <code>permits</code>.
* Waits up to defined <code>waitTime</code> if necessary until all permits became available.
* Use {@link #tryAcquire(int, Duration)} instead
*
* @param permits amount of permits
* @param waitTime the maximum time to wait
@ -85,8 +96,21 @@ public interface RSemaphore extends RExpirable, RSemaphoreAsync {
* otherwise
* @throws InterruptedException if the current thread was interrupted
*/
@Deprecated
boolean tryAcquire(int permits, long waitTime, TimeUnit unit) throws InterruptedException;
/**
* Tries to acquire defined amount of currently available <code>permits</code>.
* Waits up to defined <code>waitTime</code> if necessary until all permits became available.
*
* @param permits amount of permits
* @param waitTime the maximum time to wait
* @return <code>true</code> if permits were acquired and <code>false</code>
* otherwise
* @throws InterruptedException if the current thread was interrupted
*/
boolean tryAcquire(int permits, Duration waitTime) throws InterruptedException;
/**
* Releases a permit. Increases the number of available permits.
*

@ -15,6 +15,7 @@
*/
package org.redisson.api;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
/**
@ -88,19 +89,28 @@ public interface RSemaphoreAsync extends RExpirableAsync {
RFuture<Boolean> trySetPermitsAsync(int permits);
/**
* Tries to acquire currently available permit.
* Waits up to defined <code>waitTime</code> if necessary until a permit became available.
* Use {@link #tryAcquireAsync(Duration)} instead
*
* @param waitTime the maximum time to wait
* @param unit the time unit
* @return <code>true</code> if a permit was acquired and <code>false</code>
* otherwise
*/
@Deprecated
RFuture<Boolean> tryAcquireAsync(long waitTime, TimeUnit unit);
/**
* Tries to acquire defined amount of currently available <code>permits</code>.
* Waits up to defined <code>waitTime</code> if necessary until all permits became available.
* Tries to acquire currently available permit.
* Waits up to defined <code>waitTime</code> if necessary until a permit became available.
*
* @param waitTime the maximum time to wait
* @return <code>true</code> if a permit was acquired and <code>false</code>
* otherwise
*/
RFuture<Boolean> tryAcquireAsync(Duration waitTime);
/**
* Use {@link #tryAcquireAsync(int, Duration)} instead
*
* @param permits amount of permits
* @param waitTime the maximum time to wait
@ -108,8 +118,20 @@ public interface RSemaphoreAsync extends RExpirableAsync {
* @return <code>true</code> if permits were acquired and <code>false</code>
* otherwise
*/
@Deprecated
RFuture<Boolean> tryAcquireAsync(int permits, long waitTime, TimeUnit unit);
/**
* Tries to acquire defined amount of currently available <code>permits</code>.
* Waits up to defined <code>waitTime</code> if necessary until all permits became available.
*
* @param permits amount of permits
* @param waitTime the maximum time to wait
* @return <code>true</code> if permits were acquired and <code>false</code>
* otherwise
*/
RFuture<Boolean> tryAcquireAsync(int permits, Duration waitTime);
/**
* Increases or decreases the number of available permits by defined value.
*

@ -17,6 +17,7 @@ package org.redisson.api;
import reactor.core.publisher.Mono;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
/**
@ -90,16 +91,38 @@ public interface RSemaphoreReactive extends RExpirableReactive {
Mono<Boolean> trySetPermits(int permits);
/**
* Tries to acquire currently available permit.
* Waits up to defined <code>waitTime</code> if necessary until a permit became available.
* Use {@link #tryAcquire(Duration)} instead
*
* @param waitTime the maximum time to wait
* @param unit the time unit
* @return <code>true</code> if a permit was acquired and <code>false</code>
* otherwise
*/
@Deprecated
Mono<Boolean> tryAcquire(long waitTime, TimeUnit unit);
/**
* Tries to acquire currently available permit.
* Waits up to defined <code>waitTime</code> if necessary until a permit became available.
*
* @param waitTime the maximum time to wait
* @return <code>true</code> if a permit was acquired and <code>false</code>
* otherwise
*/
Mono<Boolean> tryAcquire(Duration waitTime);
/**
* Use {@link #tryAcquire(int, Duration)} instead
*
* @param permits amount of permits
* @param waitTime the maximum time to wait
* @param unit the time unit
* @return <code>true</code> if permits were acquired and <code>false</code>
* otherwise
*/
@Deprecated
Mono<Boolean> tryAcquire(int permits, long waitTime, TimeUnit unit);
/**
* Tries to acquire defined amount of currently available <code>permits</code>.
* Waits up to defined <code>waitTime</code> if necessary until all permits became available.
@ -110,7 +133,7 @@ public interface RSemaphoreReactive extends RExpirableReactive {
* @return <code>true</code> if permits were acquired and <code>false</code>
* otherwise
*/
Mono<Boolean> tryAcquire(int permits, long waitTime, TimeUnit unit);
Mono<Boolean> tryAcquire(int permits, Duration waitTime);
/**
* Increases or decreases the number of available permits by defined value.

@ -18,6 +18,7 @@ package org.redisson.api;
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Single;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
/**
@ -91,19 +92,28 @@ public interface RSemaphoreRx extends RExpirableRx {
Single<Boolean> trySetPermits(int permits);
/**
* Tries to acquire currently available permit.
* Waits up to defined <code>waitTime</code> if necessary until a permit became available.
* Use {@link #tryAcquire(Duration)} instead
*
* @param waitTime the maximum time to wait
* @param unit the time unit
* @return <code>true</code> if a permit was acquired and <code>false</code>
* otherwise
*/
@Deprecated
Single<Boolean> tryAcquire(long waitTime, TimeUnit unit);
/**
* Tries to acquire defined amount of currently available <code>permits</code>.
* Waits up to defined <code>waitTime</code> if necessary until all permits became available.
* Tries to acquire currently available permit.
* Waits up to defined <code>waitTime</code> if necessary until a permit became available.
*
* @param waitTime the maximum time to wait
* @return <code>true</code> if a permit was acquired and <code>false</code>
* otherwise
*/
Single<Boolean> tryAcquire(Duration waitTime);
/**
* Use {@link #tryAcquire(int, Duration)} instead
*
* @param permits amount of permits
* @param waitTime the maximum time to wait
@ -111,8 +121,20 @@ public interface RSemaphoreRx extends RExpirableRx {
* @return <code>true</code> if permits were acquired and <code>false</code>
* otherwise
*/
@Deprecated
Single<Boolean> tryAcquire(int permits, long waitTime, TimeUnit unit);
/**
* Tries to acquire defined amount of currently available <code>permits</code>.
* Waits up to defined <code>waitTime</code> if necessary until all permits became available.
*
* @param permits amount of permits
* @param waitTime the maximum time to wait
* @return <code>true</code> if permits were acquired and <code>false</code>
* otherwise
*/
Single<Boolean> tryAcquire(int permits, Duration waitTime);
/**
* Increases or decreases the number of available permits by defined value.
*

@ -44,7 +44,7 @@ public class RedissonSemaphoreTest extends BaseConcurrentTest {
@Test
public void testZero() throws InterruptedException {
RSemaphore s = redisson.getSemaphore("test");
assertThat(s.tryAcquire(0, 10, TimeUnit.MINUTES)).isTrue();
assertThat(s.tryAcquire(0, Duration.ofMinutes(10))).isTrue();
s.release(0);
assertThat(s.availablePermits()).isZero();
}
@ -191,7 +191,7 @@ public class RedissonSemaphoreTest extends BaseConcurrentTest {
t.join(1);
Awaitility.await().between(Duration.ofMillis(900), Duration.ofMillis(1200)).untilAsserted(() -> {
assertThat(s.tryAcquire(4, 2, TimeUnit.SECONDS)).isTrue();
assertThat(s.tryAcquire(4, Duration.ofSeconds(2))).isTrue();
});
assertThat(s.availablePermits()).isEqualTo(0);

Loading…
Cancel
Save