|
|
|
@ -627,6 +627,16 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
|
|
|
|
|
public int availablePermits() {
|
|
|
|
|
return get(availablePermitsAsync());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public int totalPermits() {
|
|
|
|
|
return get(totalPermitsAsync());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public int claimedPermits() {
|
|
|
|
|
return get(claimedPermitsAsync());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public RFuture<Integer> availablePermitsAsync() {
|
|
|
|
@ -645,11 +655,75 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
|
|
|
|
|
Arrays.<Object>asList(getRawName(), timeoutName, getChannelName()), System.currentTimeMillis());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public RFuture<Integer> totalPermitsAsync() {
|
|
|
|
|
return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_INTEGER,
|
|
|
|
|
"local expiredIds = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, -1); " +
|
|
|
|
|
"if #expiredIds > 0 then " +
|
|
|
|
|
"redis.call('zrem', KEYS[2], unpack(expiredIds)); " +
|
|
|
|
|
"local value = redis.call('incrby', KEYS[1], #expiredIds); " +
|
|
|
|
|
"if tonumber(value) > 0 then " +
|
|
|
|
|
"redis.call('publish', KEYS[3], value); " +
|
|
|
|
|
"end;" +
|
|
|
|
|
"return value; " +
|
|
|
|
|
"end; " +
|
|
|
|
|
"local available = redis.call('get', KEYS[1]); " +
|
|
|
|
|
"local claimed = redis.call('zcount', KEYS[2], 0, ARGV[1]); " +
|
|
|
|
|
"return available == false and 0 or (available + (claimed == false and 0 or claimed));",
|
|
|
|
|
Arrays.<Object>asList(getRawName(), timeoutName, getChannelName()), System.currentTimeMillis());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public RFuture<Integer> claimedPermitsAsync() {
|
|
|
|
|
return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_INTEGER,
|
|
|
|
|
"local expiredIds = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, -1); " +
|
|
|
|
|
"if #expiredIds > 0 then " +
|
|
|
|
|
"redis.call('zrem', KEYS[2], unpack(expiredIds)); " +
|
|
|
|
|
"local value = redis.call('incrby', KEYS[1], #expiredIds); " +
|
|
|
|
|
"if tonumber(value) > 0 then " +
|
|
|
|
|
"redis.call('publish', KEYS[3], value); " +
|
|
|
|
|
"end;" +
|
|
|
|
|
"return value; " +
|
|
|
|
|
"end; " +
|
|
|
|
|
"local claimed = redis.call('zcount', KEYS[2], 0, ARGV[1]); " +
|
|
|
|
|
"return claimed == false and 0 or claimed;",
|
|
|
|
|
Arrays.<Object>asList(getRawName(), timeoutName, getChannelName()), System.currentTimeMillis());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public boolean trySetPermits(int permits) {
|
|
|
|
|
return get(trySetPermitsAsync(permits));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public boolean trySetMaximumPermits(int permits) {
|
|
|
|
|
return get(trySetMaximumPermitsAsync(permits));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public RFuture<Boolean> trySetMaximumPermitsAsync(int permits) {
|
|
|
|
|
return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
|
|
|
|
|
"local available = redis.call('get', KEYS[1]); " +
|
|
|
|
|
"if (available == false) then "
|
|
|
|
|
+ "redis.call('set', KEYS[1], ARGV[1]); "
|
|
|
|
|
+ "redis.call('publish', KEYS[2], ARGV[1]); "
|
|
|
|
|
+ "return 1;"
|
|
|
|
|
+ "end;"
|
|
|
|
|
"local claimed = redis.call('zcount', KEYS[2], 0, ARGV[1]); " +
|
|
|
|
|
"local maximum = (claimed == false and 0 or claimed) + tonumber(available); " +
|
|
|
|
|
"if (maximum == ARGV[1]) then "
|
|
|
|
|
+ "return 0;"
|
|
|
|
|
+ "end;"
|
|
|
|
|
"local delta = maximum - tonumber(ARGV[1]); " +
|
|
|
|
|
"if (delta == 0) then "
|
|
|
|
|
+ "return 0;"
|
|
|
|
|
+ "end;"
|
|
|
|
|
"redis.call('incrby', KEYS[1], tonumber(delta)); " +
|
|
|
|
|
"redis.call('publish', KEYS[2], permits); " +
|
|
|
|
|
"return 0;",
|
|
|
|
|
Arrays.<Object>asList(getRawName(), getChannelName()), permits);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public RFuture<Boolean> trySetPermitsAsync(int permits) {
|
|
|
|
|
return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
|
|
|
|
@ -677,7 +751,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
|
|
|
|
|
+ "end;"
|
|
|
|
|
+ "redis.call('set', KEYS[1], tonumber(value) + tonumber(ARGV[1])); "
|
|
|
|
|
+ "if tonumber(ARGV[1]) > 0 then "
|
|
|
|
|
+ "redis.call('publish', KEYS[2], ARGV[1]); "
|
|
|
|
|
+ "redis.call('publish', KEYS[2], tonumber(value) + tonumbers(ARGV[1])); "
|
|
|
|
|
+ "end;",
|
|
|
|
|
Arrays.<Object>asList(getRawName(), getChannelName()), permits);
|
|
|
|
|
}
|
|
|
|
|