Merge pull request #6348 from seakider/fix_releaseAsync

Fixed - RPermitExpirableSemaphore.release
pull/6356/head
Nikita Koksharov 1 month ago committed by GitHub
commit a0bac462fb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -657,6 +657,12 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
}
return commandExecutor.syncedEvalWithRetry(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_INTEGER,
"for i = 4, #ARGV, 1 do " +
"local expire = redis.call('zscore', KEYS[3], ARGV[i]);" +
"if expire== false or tonumber(expire) <= tonumber(ARGV[2]) then " +
"return 0;" +
"end; " +
"end; " +
"local expiredIds = redis.call('zrangebyscore', KEYS[3], 0, ARGV[2], 'limit', 0, -1); " +
"if #expiredIds > 0 then " +
"redis.call('zrem', KEYS[3], unpack(expiredIds)); " +
@ -670,9 +676,6 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
"table.insert(keys, ARGV[i]); " +
"end; " +
"local removed = redis.call('zrem', KEYS[3], unpack(keys)); " +
"if tonumber(removed) == 0 then " +
"return 0;" +
"end; " +
"redis.call('incrby', KEYS[1], removed); " +
"redis.call(ARGV[3], KEYS[2], removed); " +
"return removed;",
@ -736,6 +739,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
if (res == permitsIds.size()) {
return null;
}
throw new CompletionException(new IllegalArgumentException("Permits with ids " + permitsIds + " have already been released or don't exist"));
});
return new CompletableFutureWrapper<>(f);
@ -931,7 +935,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
if (e != null) {
throw new CompletionException(e);
}
if (res == 0) {
throw new CompletionException(new IllegalArgumentException("Permit with id " + permitId + " has already been released or doesn't exist"));
}

@ -476,6 +476,7 @@ public class RedissonPermitExpirableSemaphoreTest extends BaseConcurrentTest {
assertThat(semaphore.availablePermits()).isEqualTo(10);
Assertions.assertThrows(RedisException.class, () -> semaphore.release(permitsIds));
}
@Test
@ -553,6 +554,38 @@ public class RedissonPermitExpirableSemaphoreTest extends BaseConcurrentTest {
assertThat(released).isEqualTo(0);
assertThat(s.availablePermits()).isEqualTo(10);
}
@Test
public void testReleaseManyNotExistOrExpired() throws InterruptedException {
RPermitExpirableSemaphore s = redisson.getPermitExpirableSemaphore("test");
s.trySetPermits(10);
List<String> timedPermitsIds = s.acquire(2, 100, TimeUnit.MILLISECONDS);
List<String> permitsIds = s.tryAcquire(8);
List<String> permitsIdsFirstPart = permitsIds.subList(0, 2);
int released = s.tryRelease(permitsIdsFirstPart);
assertThat(released).isEqualTo(2);
assertThat(s.availablePermits()).isEqualTo(2);
Thread.sleep(100);
Assertions.assertThrows(RedisException.class, () -> s.release(timedPermitsIds));
assertThat(s.availablePermits()).isEqualTo(4);
List<String> permitsIdsSecondPart = permitsIds.subList(2, 4);
permitsIdsSecondPart.addAll(permitsIdsFirstPart);
Assertions.assertThrows(RedisException.class, () -> s.release(permitsIdsSecondPart));
assertThat(s.availablePermits()).isEqualTo(4);
Assertions.assertThrows(RedisException.class, () -> s.release(List.of("1234")));
assertThat(s.availablePermits()).isEqualTo(4);
List<String> permitsIdsThirdPart = permitsIds.subList(4, 6);
permitsIdsThirdPart.add("4567");
Assertions.assertThrows(RedisException.class, () -> s.release(permitsIdsThirdPart));
assertThat(s.availablePermits()).isEqualTo(4);
}
@Test
public void testGetLeaseTime() throws InterruptedException {

Loading…
Cancel
Save