|
|
|
@ -26,6 +26,8 @@ import org.redisson.command.CommandAsyncExecutor;
|
|
|
|
|
import org.redisson.misc.RPromise;
|
|
|
|
|
import org.redisson.misc.RedissonPromise;
|
|
|
|
|
import org.redisson.pubsub.SemaphorePubSub;
|
|
|
|
|
import org.slf4j.Logger;
|
|
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
|
|
|
|
|
import java.util.Arrays;
|
|
|
|
|
import java.util.Collections;
|
|
|
|
@ -44,6 +46,8 @@ import java.util.concurrent.atomic.AtomicReference;
|
|
|
|
|
*/
|
|
|
|
|
public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
|
|
|
|
|
|
|
|
|
|
private static final Logger log = LoggerFactory.getLogger(RSemaphore.class);
|
|
|
|
|
|
|
|
|
|
private final SemaphorePubSub semaphorePubSub;
|
|
|
|
|
|
|
|
|
|
final CommandAsyncExecutor commandExecutor;
|
|
|
|
@ -279,48 +283,58 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public boolean tryAcquire(int permits, long waitTime, TimeUnit unit) throws InterruptedException {
|
|
|
|
|
log.debug("trying to acquire, permits: {}, waitTime: {}, unit: {}, name: {}", permits, waitTime, unit, getName());
|
|
|
|
|
|
|
|
|
|
long time = unit.toMillis(waitTime);
|
|
|
|
|
long current = System.currentTimeMillis();
|
|
|
|
|
|
|
|
|
|
if (tryAcquire(permits)) {
|
|
|
|
|
log.debug("acquired, permits: {}, waitTime: {}, unit: {}, name: {}", permits, waitTime, unit, getName());
|
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
time -= System.currentTimeMillis() - current;
|
|
|
|
|
if (time <= 0) {
|
|
|
|
|
log.debug("unable to acquire, permits: {}, name: {}", permits, getName());
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
current = System.currentTimeMillis();
|
|
|
|
|
RFuture<RedissonLockEntry> future = subscribe();
|
|
|
|
|
if (!future.await(time, TimeUnit.MILLISECONDS)) {
|
|
|
|
|
log.debug("unable to subscribe for permits acquisition, permits: {}, name: {}", permits, getName());
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
time -= System.currentTimeMillis() - current;
|
|
|
|
|
if (time <= 0) {
|
|
|
|
|
log.debug("unable to acquire, permits: {}, name: {}", permits, getName());
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
while (true) {
|
|
|
|
|
current = System.currentTimeMillis();
|
|
|
|
|
if (tryAcquire(permits)) {
|
|
|
|
|
log.debug("acquired, permits: {}, wait-time: {}, unit: {}, name: {}", permits, waitTime, unit, getName());
|
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
time -= System.currentTimeMillis() - current;
|
|
|
|
|
if (time <= 0) {
|
|
|
|
|
log.debug("unable to acquire, permits: {}, name: {}", permits, getName());
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// waiting for message
|
|
|
|
|
current = System.currentTimeMillis();
|
|
|
|
|
|
|
|
|
|
log.debug("wait for acquisition, permits: {}, wait-time(ms): {}, name: {}", permits, time, getName());
|
|
|
|
|
future.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
|
|
|
|
|
|
|
|
|
|
time -= System.currentTimeMillis() - current;
|
|
|
|
|
if (time <= 0) {
|
|
|
|
|
log.debug("unable to acquire, permits: {}, name: {}", permits, getName());
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@ -434,10 +448,18 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
|
|
|
|
|
return RedissonPromise.newSucceededFuture(null);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return commandExecutor.evalWriteAsync(getRawName(), StringCodec.INSTANCE, RedisCommands.EVAL_VOID,
|
|
|
|
|
"local value = redis.call('incrby', KEYS[1], ARGV[1]); " +
|
|
|
|
|
"redis.call('publish', KEYS[2], value); ",
|
|
|
|
|
Arrays.<Object>asList(getRawName(), getChannelName()), permits);
|
|
|
|
|
RFuture<Void> future = commandExecutor.evalWriteAsync(getRawName(), StringCodec.INSTANCE, RedisCommands.EVAL_VOID,
|
|
|
|
|
"local value = redis.call('incrby', KEYS[1], ARGV[1]); " +
|
|
|
|
|
"redis.call('publish', KEYS[2], value); ",
|
|
|
|
|
Arrays.asList(getRawName(), getChannelName()), permits);
|
|
|
|
|
if (log.isDebugEnabled()) {
|
|
|
|
|
future.onComplete((o, e) -> {
|
|
|
|
|
if (e == null) {
|
|
|
|
|
log.debug("released, permits: {}, name: {}", permits, getName());
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
return future;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
@ -474,15 +496,26 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public RFuture<Boolean> trySetPermitsAsync(int permits) {
|
|
|
|
|
return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
|
|
|
|
|
RFuture<Boolean> future = commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
|
|
|
|
|
"local value = redis.call('get', KEYS[1]); " +
|
|
|
|
|
"if (value == false or value == 0) then "
|
|
|
|
|
+ "redis.call('set', KEYS[1], ARGV[1]); "
|
|
|
|
|
+ "redis.call('publish', KEYS[2], ARGV[1]); "
|
|
|
|
|
+ "return 1;"
|
|
|
|
|
+ "end;"
|
|
|
|
|
+ "return 0;",
|
|
|
|
|
Arrays.<Object>asList(getRawName(), getChannelName()), permits);
|
|
|
|
|
"if (value == false or value == 0) then "
|
|
|
|
|
+ "redis.call('set', KEYS[1], ARGV[1]); "
|
|
|
|
|
+ "redis.call('publish', KEYS[2], ARGV[1]); "
|
|
|
|
|
+ "return 1;"
|
|
|
|
|
+ "end;"
|
|
|
|
|
+ "return 0;",
|
|
|
|
|
Arrays.asList(getRawName(), getChannelName()), permits);
|
|
|
|
|
|
|
|
|
|
if (log.isDebugEnabled()) {
|
|
|
|
|
future.onComplete((r, e) -> {
|
|
|
|
|
if (r) {
|
|
|
|
|
log.debug("permits set, permits: {}, name: {}", permits, getName());
|
|
|
|
|
} else {
|
|
|
|
|
log.debug("unable to set permits, permits: {}, name: {}", permits, getName());
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
return future;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|