availablePermitsAsync and drainPermitsAsync methods added to RSemaphoreAsync

pull/2563/head
Nikita Koksharov 5 years ago
parent d2d391762b
commit 97c1aaa191

@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicReference;
import org.redisson.api.RFuture; import org.redisson.api.RFuture;
import org.redisson.api.RSemaphore; import org.redisson.api.RSemaphore;
import org.redisson.client.codec.IntegerCodec;
import org.redisson.client.codec.LongCodec; import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.StringCodec; import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisCommands;
@ -441,26 +442,32 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
Arrays.<Object>asList(getName(), getChannelName()), permits); Arrays.<Object>asList(getName(), getChannelName()), permits);
} }
@Override @Override
public int drainPermits() { public int drainPermits() {
RFuture<Long> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG, return get(drainPermitsAsync());
}
@Override
public RFuture<Integer> drainPermitsAsync() {
return commandExecutor.evalWriteAsync(getName(), IntegerCodec.INSTANCE, RedisCommands.EVAL_LONG,
"local value = redis.call('get', KEYS[1]); " + "local value = redis.call('get', KEYS[1]); " +
"if (value == false or value == 0) then " + "if (value == false or value == 0) then " +
"return 0; " + "return 0; " +
"end; " + "end; " +
"redis.call('set', KEYS[1], 0); " + "redis.call('set', KEYS[1], 0); " +
"return value;", "return value;",
Collections.<Object>singletonList(getName())); Collections.singletonList(getName()));
Long res = get(future);
return res.intValue();
} }
@Override @Override
public int availablePermits() { public int availablePermits() {
RFuture<Long> future = commandExecutor.writeAsync(getName(), LongCodec.INSTANCE, RedisCommands.GET_LONG, getName()); RFuture<Integer> future = commandExecutor.writeAsync(getName(), LongCodec.INSTANCE, RedisCommands.GET_INTEGER, getName());
Long res = get(future); return get(future);
return res.intValue(); }
@Override
public RFuture<Integer> availablePermitsAsync() {
return commandExecutor.writeAsync(getName(), LongCodec.INSTANCE, RedisCommands.GET_INTEGER, getName());
} }
@Override @Override

@ -123,4 +123,18 @@ public interface RSemaphoreAsync extends RExpirableAsync {
*/ */
RFuture<Void> addPermitsAsync(int permits); RFuture<Void> addPermitsAsync(int permits);
/**
* Returns amount of available permits.
*
* @return number of permits
*/
RFuture<Integer> availablePermitsAsync();
/**
* Acquires and returns all permits that are immediately available.
*
* @return number of permits
*/
RFuture<Integer> drainPermitsAsync();
} }

@ -15,10 +15,10 @@
*/ */
package org.redisson.api; package org.redisson.api;
import java.util.concurrent.TimeUnit;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import java.util.concurrent.TimeUnit;
/** /**
* Reactive interface of Redis based {@link java.util.concurrent.Semaphore}. * Reactive interface of Redis based {@link java.util.concurrent.Semaphore}.
* <p> * <p>
@ -125,5 +125,18 @@ public interface RSemaphoreReactive extends RExpirableReactive {
*/ */
Mono<Void> addPermits(int permits); Mono<Void> addPermits(int permits);
/**
* Returns amount of available permits.
*
* @return number of permits
*/
Mono<Integer> availablePermits();
/**
* Acquires and returns all permits that are immediately available.
*
* @return number of permits
*/
Mono<Integer> drainPermits();
} }

@ -126,4 +126,18 @@ public interface RSemaphoreRx extends RExpirableRx {
*/ */
Completable addPermits(int permits); Completable addPermits(int permits);
/**
* Returns amount of available permits.
*
* @return number of permits
*/
Single<Integer> availablePermits();
/**
* Acquires and returns all permits that are immediately available.
*
* @return number of permits
*/
Single<Integer> drainPermits();
} }

Loading…
Cancel
Save