Merge pull request #4719 from newrelic-forks/mob/track-max-permits

Add tracking and setters for max permits
pull/4799/head
Nikita Koksharov 2 years ago committed by GitHub
commit 1e0053a869
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -618,6 +618,16 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
public int availablePermits() {
return get(availablePermitsAsync());
}
@Override
public int getPermits() {
return get(getPermitsAsync());
}
@Override
public int acquiredPermits() {
return get(acquiredPermitsAsync());
}
@Override
public RFuture<Integer> availablePermitsAsync() {
@ -636,11 +646,74 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
Arrays.<Object>asList(getRawName(), timeoutName, channelName), System.currentTimeMillis());
}
@Override
public RFuture<Integer> getPermitsAsync() {
return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_INTEGER,
"local expiredIds = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, -1); " +
"if #expiredIds > 0 then " +
"redis.call('zrem', KEYS[2], unpack(expiredIds)); " +
"local value = redis.call('incrby', KEYS[1], #expiredIds); " +
"if tonumber(value) > 0 then " +
"redis.call('publish', KEYS[3], value); " +
"end;" +
"end; " +
"local available = redis.call('get', KEYS[1]); " +
"if available == false then " +
"return 0 " +
"end;" +
"local acquired = redis.call('zcount', KEYS[2], 0, '+inf'); " +
"if acquired == false then " +
"return tonumber(available) " +
"end;" +
"return tonumber(available) + acquired;",
Arrays.<Object>asList(getRawName(), timeoutName, channelName), System.currentTimeMillis());
}
@Override
public RFuture<Integer> acquiredPermitsAsync() {
return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_INTEGER,
"local expiredIds = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, -1); " +
"if #expiredIds > 0 then " +
"redis.call('zrem', KEYS[2], unpack(expiredIds)); " +
"local value = redis.call('incrby', KEYS[1], #expiredIds); " +
"if tonumber(value) > 0 then " +
"redis.call('publish', KEYS[3], value); " +
"end;" +
"end; " +
"local acquired = redis.call('zcount', KEYS[2], 0, '+inf'); " +
"return acquired == false and 0 or acquired;",
Arrays.<Object>asList(getRawName(), timeoutName, channelName), System.currentTimeMillis());
}
@Override
public boolean trySetPermits(int permits) {
return get(trySetPermitsAsync(permits));
}
@Override
public void setPermits(int permits) {
get(setPermitsAsync(permits));
}
@Override
public RFuture<Void> setPermitsAsync(int permits) {
return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_VOID,
"local available = redis.call('get', KEYS[1]); " +
"if (available == false) then " +
"redis.call('set', KEYS[1], ARGV[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return;" +
"end;" +
"local acquired = redis.call('zcount', KEYS[3], 0, '+inf'); " +
"local maximum = (acquired == false and 0 or acquired) + tonumber(available); " +
"if (maximum == ARGV[1]) then " +
"return;" +
"end;" +
"redis.call('incrby', KEYS[1], tonumber(ARGV[1]) - maximum); " +
"redis.call('publish', KEYS[2], ARGV[1]);",
Arrays.<Object>asList(getRawName(), channelName, timeoutName), permits);
}
@Override
public RFuture<Boolean> trySetPermitsAsync(int permits) {
return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,

@ -102,20 +102,43 @@ public interface RPermitExpirableSemaphore extends RExpirable, RPermitExpirableS
void release(String permitId);
/**
* Returns amount of available permits.
* Returns number of available permits.
*
* @return number of permits
* @return number of available permits
*/
int availablePermits();
/**
* Tries to set number of permits.
* Returns the number of permits.
*
* @return number of permits
*/
int getPermits();
/**
* Returns the number of acquired permits.
*
* @return number of acquired permits
*/
int acquiredPermits();
/**
* Tries to set the initial number of available permits.
*
* @param permits - number of permits
* @return <code>true</code> if permits has been set successfully, otherwise <code>false</code>.
*/
boolean trySetPermits(int permits);
/**
* Sets the number of permits to the provided value.
* Calculates the <code>delta</code> between the given <code>permits</code> value and the
* current number of permits, then increases the number of available permits by <code>delta</code>.
*
* @param permits - number of permits
*/
void setPermits(int permits);
/**
* Increases or decreases the number of available permits by defined value.
*

@ -99,20 +99,43 @@ public interface RPermitExpirableSemaphoreAsync extends RExpirableAsync {
RFuture<Void> releaseAsync(String permitId);
/**
* Returns amount of available permits.
* Returns number of available permits.
*
* @return number of permits
*/
RFuture<Integer> availablePermitsAsync();
/**
* Tries to set number of permits.
* Returns the number of permits.
*
* @return number of permits
*/
RFuture<Integer> getPermitsAsync();
/**
* Returns the number of acquired permits.
*
* @return number of acquired permits
*/
RFuture<Integer> acquiredPermitsAsync();
/**
* Tries to set number of available permits.
*
* @param permits - number of permits
* @return <code>true</code> if permits has been set successfully, otherwise <code>false</code>.
*/
RFuture<Boolean> trySetPermitsAsync(int permits);
/**
* Sets the number of permits to the provided value.
* Calculates the <code>delta</code> between the given <code>permits</code> value and the
* current number of permits, then increases the number of available permits by <code>delta</code>.
*
* @param permits - number of permits
*/
RFuture<Void> setPermitsAsync(int permits);
/**
* Increases or decreases the number of available permits by defined value.
*

@ -107,6 +107,20 @@ public interface RPermitExpirableSemaphoreReactive extends RExpirableReactive {
*/
Mono<Integer> availablePermits();
/**
* Returns the number of permits.
*
* @return number of permits
*/
Mono<Integer> getPermits();
/**
* Returns the number of acquired permits.
*
* @return number of acquired permits
*/
Mono<Integer> acquiredPermits();
/**
* Tries to set number of permits.
*
@ -115,6 +129,15 @@ public interface RPermitExpirableSemaphoreReactive extends RExpirableReactive {
*/
Mono<Boolean> trySetPermits(int permits);
/**
* Sets the number of permits to the provided value.
* Calculates the <code>delta</code> between the given <code>permits</code> value and the
* current number of permits, then increases the number of available permits by <code>delta</code>.
*
* @param permits - number of permits
*/
Mono<Void> setPermits(int permits);
/**
* Increases or decreases the number of available permits by defined value.
*

@ -207,6 +207,20 @@ public interface RPermitExpirableSemaphoreRx extends RExpirableRx {
*/
Single<Integer> availablePermits();
/**
* Returns the number of permits.
*
* @return number of permits
*/
Single<Integer> getPermits();
/**
* Returns the number of acquired permits.
*
* @return number of acquired permits
*/
Single<Integer> acquiredPermits();
/**
* Sets number of permits.
*
@ -215,6 +229,15 @@ public interface RPermitExpirableSemaphoreRx extends RExpirableRx {
*/
Single<Boolean> trySetPermits(int permits);
/**
* Sets the number of permits to the provided value.
* Calculates the <code>delta</code> between the given <code>permits</code> value and the
* current number of permits, then increases the number of available permits by <code>delta</code>.
*
* @param permits - number of permits
*/
Single<Void> setPermits(int permits);
/**
* Increases or decreases the number of available permits by defined value.
*

@ -103,6 +103,45 @@ public class RedissonPermitExpirableSemaphoreTest extends BaseConcurrentTest {
Assertions.assertEquals(2, semaphore.availablePermits());
}
@Test
public void testAcquiredPermits() throws InterruptedException {
RPermitExpirableSemaphore semaphore = redisson.getPermitExpirableSemaphore("test-semaphore");
assertThat(semaphore.trySetPermits(2)).isTrue();
Assertions.assertEquals(0, semaphore.acquiredPermits());
String acquire1 = semaphore.tryAcquire(200, 1000, TimeUnit.MILLISECONDS);
assertThat(acquire1).isNotNull();
Assertions.assertEquals(1, semaphore.acquiredPermits());
String acquire2 = semaphore.tryAcquire(200, 1000, TimeUnit.MILLISECONDS);
assertThat(acquire2).isNotNull();
String acquire3 = semaphore.tryAcquire(200, 1000, TimeUnit.MILLISECONDS);
assertThat(acquire3).isNull();
Assertions.assertEquals(2, semaphore.acquiredPermits());
Thread.sleep(1100);
String acquire4 = semaphore.tryAcquire(200, 1000, TimeUnit.MILLISECONDS);
assertThat(acquire4).isNotNull();
Thread.sleep(1100);
Assertions.assertEquals(0, semaphore.acquiredPermits());
}
@Test
public void testGetPermits() throws InterruptedException {
RPermitExpirableSemaphore semaphore = redisson.getPermitExpirableSemaphore("test-semaphore");
assertThat(semaphore.trySetPermits(2)).isTrue();
Assertions.assertEquals(2, semaphore.getPermits());
String acquire1 = semaphore.tryAcquire(200, 1000, TimeUnit.MILLISECONDS);
assertThat(acquire1).isNotNull();
String acquire2 = semaphore.tryAcquire(200, 1000, TimeUnit.MILLISECONDS);
assertThat(acquire2).isNotNull();
String acquire3 = semaphore.tryAcquire(200, 1000, TimeUnit.MILLISECONDS);
assertThat(acquire3).isNull();
Assertions.assertEquals(2, semaphore.getPermits());
Thread.sleep(1100);
String acquire4 = semaphore.tryAcquire(200, 1000, TimeUnit.MILLISECONDS);
assertThat(acquire4).isNotNull();
Thread.sleep(1100);
Assertions.assertEquals(2, semaphore.getPermits());
}
@Test
public void testExpiration() throws InterruptedException {
RPermitExpirableSemaphore semaphore = redisson.getPermitExpirableSemaphore("some-key");
@ -139,7 +178,7 @@ public class RedissonPermitExpirableSemaphoreTest extends BaseConcurrentTest {
t.start();
t.join();
assertThat(s.tryRelease(permitId)).isFalse();
assertThat(bool.get()).isTrue();
}
@ -170,7 +209,7 @@ public class RedissonPermitExpirableSemaphoreTest extends BaseConcurrentTest {
t.start();
t.join();
assertThat(s.tryRelease(permitId)).isFalse();
assertThat(bool.get()).isTrue();
}
@ -184,7 +223,65 @@ public class RedissonPermitExpirableSemaphoreTest extends BaseConcurrentTest {
assertThat(s.trySetPermits(15)).isFalse();
assertThat(s.availablePermits()).isEqualTo(10);
}
@Test
public void testSetPermits() throws InterruptedException {
RPermitExpirableSemaphore s = redisson.getPermitExpirableSemaphore("test");
s.setPermits(10);
assertThat(s.getPermits()).isEqualTo(10);
assertThat(s.availablePermits()).isEqualTo(10);
assertThat(s.acquiredPermits()).isEqualTo(0);
// attempts to set available permits fail
assertThat(s.trySetPermits(15)).isFalse();
assertThat(s.getPermits()).isEqualTo(10);
assertThat(s.availablePermits()).isEqualTo(10);
assertThat(s.acquiredPermits()).isEqualTo(0);
// attempts to set max permits succeeds
s.setPermits(15);
assertThat(s.getPermits()).isEqualTo(15);
assertThat(s.availablePermits()).isEqualTo(15);
assertThat(s.acquiredPermits()).isEqualTo(0);
// setting to existing value succeeds
s.setPermits(15);
assertThat(s.getPermits()).isEqualTo(15);
assertThat(s.availablePermits()).isEqualTo(15);
assertThat(s.acquiredPermits()).isEqualTo(0);
// decreasing max permits succeeds
s.setPermits(5);
assertThat(s.getPermits()).isEqualTo(5);
assertThat(s.availablePermits()).isEqualTo(5);
assertThat(s.acquiredPermits()).isEqualTo(0);
// changing the max after acquiring permits succeeds
String acquire1 = s.tryAcquire(200, 1000, TimeUnit.MILLISECONDS);
assertThat(acquire1).isNotNull();
String acquire2 = s.tryAcquire(200, 1000, TimeUnit.MILLISECONDS);
assertThat(acquire2).isNotNull();
String acquire3 = s.tryAcquire(200, 1000, TimeUnit.MILLISECONDS);
assertThat(acquire3).isNotNull();
assertThat(s.getPermits()).isEqualTo(5);
assertThat(s.availablePermits()).isEqualTo(2);
assertThat(s.acquiredPermits()).isEqualTo(3);
// decreasing the max to the number of claimed permits is allowed
s.setPermits(3);
assertThat(s.getPermits()).isEqualTo(3);
assertThat(s.availablePermits()).isEqualTo(0);
assertThat(s.acquiredPermits()).isEqualTo(3);
// decreasing the max to below the number of claimed permits is allowed
// and results in a negative number of available permits
s.setPermits(2);
assertThat(s.getPermits()).isEqualTo(2);
assertThat(s.availablePermits()).isEqualTo(-1);
assertThat(s.acquiredPermits()).isEqualTo(3);
}
@Test
public void testAddPermits() throws InterruptedException {
RPermitExpirableSemaphore s = redisson.getPermitExpirableSemaphore("test");
@ -194,6 +291,8 @@ public class RedissonPermitExpirableSemaphoreTest extends BaseConcurrentTest {
assertThat(s.availablePermits()).isEqualTo(15);
s.addPermits(-10);
assertThat(s.availablePermits()).isEqualTo(5);
s.addPermits(-10);
assertThat(s.availablePermits()).isEqualTo(-5);
}
@Test

Loading…
Cancel
Save