diff --git a/redisson/src/main/java/org/redisson/RedissonRateLimiter.java b/redisson/src/main/java/org/redisson/RedissonRateLimiter.java index c057c7cc9..e9bf767e8 100644 --- a/redisson/src/main/java/org/redisson/RedissonRateLimiter.java +++ b/redisson/src/main/java/org/redisson/RedissonRateLimiter.java @@ -211,8 +211,8 @@ public class RedissonRateLimiter extends RedissonExpirable implements RRateLimit + "end;" + "if tonumber(currentValue) < tonumber(ARGV[1]) then " - + "local nearest = redis.call('zrangebyscore', permitsName, '(' .. (tonumber(ARGV[2]) - interval), '+inf', 'withscores', 'limit', 0, 1); " - + "return tonumber(nearest[2]) - (tonumber(ARGV[2]) - interval);" + + "local firstValue = redis.call('zrange', permitsName, 0, 0, 'withscores'); " + + "return 3 + interval - (tonumber(ARGV[2]) - tonumber(firstValue[2]));" + "else " + "redis.call('zadd', permitsName, ARGV[2], struct.pack('fI', ARGV[3], ARGV[1])); " + "redis.call('decrby', valueName, ARGV[1]); " diff --git a/redisson/src/test/java/org/redisson/RedissonRateLimiterTest.java b/redisson/src/test/java/org/redisson/RedissonRateLimiterTest.java index c2a245e0b..a49fbaaf3 100644 --- a/redisson/src/test/java/org/redisson/RedissonRateLimiterTest.java +++ b/redisson/src/test/java/org/redisson/RedissonRateLimiterTest.java @@ -184,6 +184,56 @@ public class RedissonRateLimiterTest extends BaseTest { assertThat(deleted).isTrue(); } + @Test + public void testConcurrency2() throws InterruptedException { + RRateLimiter rr = redisson.getRateLimiter("test"); + rr.trySetRate(RateType.OVERALL, 18, 1, RateIntervalUnit.SECONDS); + + Queue queue = new ConcurrentLinkedQueue(); + AtomicLong counter = new AtomicLong(); + ExecutorService pool = Executors.newFixedThreadPool(8); + for (int i = 0; i < 8; i++) { + pool.execute(new Runnable() { + @Override + public void run() { + try { + while (true) { + rr.acquire(); + queue.add(System.currentTimeMillis()); + if (counter.incrementAndGet() > 1000) { + break; + } + } + } catch (Exception e) { + e.printStackTrace(); + } + } + }); + } + + pool.shutdown(); + assertThat(pool.awaitTermination(2, TimeUnit.MINUTES)).isTrue(); + + int count = 0; + long start = 0; + boolean skip = true; + for (Long value : queue) { + if (start == 0) { + start = value; + } + count++; + if (value - start >= 1000) { + if (!skip) { + assertThat(count).isLessThanOrEqualTo(18); + } else { + skip = false; + } + start = 0; + count = 0; + } + } + } + @Test public void testConcurrency() throws InterruptedException { RRateLimiter rr = redisson.getRateLimiter("test");