Fixed - RRateLimiter decreases limit over the time in highly concurrent environment. #3804

pull/3848/head
Nikita Koksharov 3 years ago
parent 33e5b5872b
commit d30a636862

@ -200,7 +200,7 @@ public class RedissonRateLimiter extends RedissonExpirable implements RRateLimit
+ "local expiredValues = redis.call('zrangebyscore', permitsName, 0, tonumber(ARGV[2]) - interval); "
+ "local released = 0; "
+ "for i, v in ipairs(expiredValues) do "
+ "local random, permits = struct.unpack('fI', v);"
+ "local random, permits = struct.unpack('Lc0I', v);"
+ "released = released + permits;"
+ "end; "
@ -218,13 +218,13 @@ public class RedissonRateLimiter extends RedissonExpirable implements RRateLimit
+ "local firstValue = redis.call('zrange', permitsName, 0, 0, 'withscores'); "
+ "return 3 + interval - (tonumber(ARGV[2]) - tonumber(firstValue[2]));"
+ "else "
+ "redis.call('zadd', permitsName, ARGV[2], struct.pack('fI', ARGV[3], ARGV[1])); "
+ "redis.call('zadd', permitsName, ARGV[2], struct.pack('Lc0I', string.len(ARGV[3]), ARGV[3], ARGV[1])); "
+ "redis.call('decrby', valueName, ARGV[1]); "
+ "return nil; "
+ "end; "
+ "else "
+ "redis.call('set', valueName, rate); "
+ "redis.call('zadd', permitsName, ARGV[2], struct.pack('fI', ARGV[3], ARGV[1])); "
+ "redis.call('zadd', permitsName, ARGV[2], struct.pack('Lc0I', string.len(ARGV[3]), ARGV[3], ARGV[1])); "
+ "redis.call('decrby', valueName, ARGV[1]); "
+ "return nil; "
+ "end;",
@ -318,7 +318,7 @@ public class RedissonRateLimiter extends RedissonExpirable implements RRateLimit
+ "local expiredValues = redis.call('zrangebyscore', permitsName, 0, tonumber(ARGV[1]) - interval); "
+ "local released = 0; "
+ "for i, v in ipairs(expiredValues) do "
+ "local random, permits = struct.unpack('fI', v);"
+ "local random, permits = struct.unpack('Lc0I', v);"
+ "released = released + permits;"
+ "end; "

@ -1,25 +1,49 @@
package org.redisson;
import static org.assertj.core.api.Assertions.assertThat;
import java.time.Duration;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.redisson.api.RRateLimiter;
import org.redisson.api.RScoredSortedSet;
import org.redisson.api.RateIntervalUnit;
import org.redisson.api.RateType;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
import static org.assertj.core.api.Assertions.assertThat;
public class RedissonRateLimiterTest extends BaseTest {
@Test
public void testRateValue() throws InterruptedException {
RRateLimiter rateLimiter = redisson.getRateLimiter("test1");
int rate = 10_000;
rateLimiter.setRate(RateType.OVERALL, rate, 10_000, RateIntervalUnit.MILLISECONDS);
ExecutorService e = Executors.newFixedThreadPool(200);
for (int i = 0; i < 200; i++) {
e.execute(() -> {
while (true) {
rateLimiter.acquire();
}
});
}
RScoredSortedSet<Object> sortedSet = redisson.getScoredSortedSet("{test1}:permits");
List<Integer> sizes = new ArrayList<>();
for (int i = 0; i < 20; i++) {
sizes.add(sortedSet.size());
Thread.sleep(1000);
}
assertThat(sizes.stream().filter(s -> s == rate).count()).isGreaterThan(16);
e.shutdownNow();
}
@Test
public void testExpire() throws InterruptedException {
RRateLimiter rr = redisson.getRateLimiter("limiter");

Loading…
Cancel
Save