RPermitExpirableSemaphore.availablePermits doesn't return actual permits account under certain conditions #659

pull/660/head
Nikita 8 years ago
parent 6af83a1ca0
commit 47743daf03

@ -662,7 +662,18 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
@Override
public RFuture<Integer> availablePermitsAsync() {
return commandExecutor.writeAsync(getName(), LongCodec.INSTANCE, RedisCommands.GET_INTEGER, getName());
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_INTEGER,
"local expiredIds = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, -1); " +
"if #expiredIds > 0 then " +
"redis.call('zrem', KEYS[2], unpack(expiredIds)); " +
"local value = redis.call('incrby', KEYS[1], #expiredIds); " +
"if tonumber(value) > 0 then " +
"redis.call('publish', KEYS[3], value); " +
"end;" +
"return value; " +
"end; " +
"return redis.call('get', KEYS[1]); ",
Arrays.<Object>asList(getName(), timeoutName, getChannelName()), System.currentTimeMillis());
}
@Override

@ -6,12 +6,32 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Assert;
import org.junit.Test;
import org.redisson.api.RPermitExpirableSemaphore;
import org.redisson.client.RedisException;
public class RedissonPermitExpirableSemaphoreTest extends BaseConcurrentTest {
@Test
public void testAvailablePermits() throws InterruptedException {
RPermitExpirableSemaphore semaphore = redisson.getPermitExpirableSemaphore("test-semaphore");
assertThat(semaphore.trySetPermits(2)).isTrue();
Assert.assertEquals(2, semaphore.availablePermits());
String acquire1 = semaphore.tryAcquire(200, 1000, TimeUnit.MILLISECONDS);
assertThat(acquire1).isNotNull();
String acquire2 = semaphore.tryAcquire(200, 1000, TimeUnit.MILLISECONDS);
assertThat(acquire2).isNotNull();
String acquire3 = semaphore.tryAcquire(200, 1000, TimeUnit.MILLISECONDS);
assertThat(acquire3).isNull();
Assert.assertEquals(0, semaphore.availablePermits());
Thread.sleep(1100);
String acquire4 = semaphore.tryAcquire(200, 1000, TimeUnit.MILLISECONDS);
assertThat(acquire4).isNotNull();
Thread.sleep(1100);
Assert.assertEquals(2, semaphore.availablePermits());
}
@Test
public void testExpire() throws InterruptedException {
RPermitExpirableSemaphore s = redisson.getPermitExpirableSemaphore("test");

Loading…
Cancel
Save