diff --git a/redisson/src/main/java/org/redisson/RedissonRateLimiter.java b/redisson/src/main/java/org/redisson/RedissonRateLimiter.java index 6a53029a1..fa58c0cc4 100644 --- a/redisson/src/main/java/org/redisson/RedissonRateLimiter.java +++ b/redisson/src/main/java/org/redisson/RedissonRateLimiter.java @@ -200,7 +200,7 @@ public class RedissonRateLimiter extends RedissonExpirable implements RRateLimit + "local expiredValues = redis.call('zrangebyscore', permitsName, 0, tonumber(ARGV[2]) - interval); " + "local released = 0; " + "for i, v in ipairs(expiredValues) do " - + "local random, permits = struct.unpack('fI', v);" + + "local random, permits = struct.unpack('Lc0I', v);" + "released = released + permits;" + "end; " @@ -218,13 +218,13 @@ public class RedissonRateLimiter extends RedissonExpirable implements RRateLimit + "local firstValue = redis.call('zrange', permitsName, 0, 0, 'withscores'); " + "return 3 + interval - (tonumber(ARGV[2]) - tonumber(firstValue[2]));" + "else " - + "redis.call('zadd', permitsName, ARGV[2], struct.pack('fI', ARGV[3], ARGV[1])); " + + "redis.call('zadd', permitsName, ARGV[2], struct.pack('Lc0I', string.len(ARGV[3]), ARGV[3], ARGV[1])); " + "redis.call('decrby', valueName, ARGV[1]); " + "return nil; " + "end; " + "else " + "redis.call('set', valueName, rate); " - + "redis.call('zadd', permitsName, ARGV[2], struct.pack('fI', ARGV[3], ARGV[1])); " + + "redis.call('zadd', permitsName, ARGV[2], struct.pack('Lc0I', string.len(ARGV[3]), ARGV[3], ARGV[1])); " + "redis.call('decrby', valueName, ARGV[1]); " + "return nil; " + "end;", @@ -318,7 +318,7 @@ public class RedissonRateLimiter extends RedissonExpirable implements RRateLimit + "local expiredValues = redis.call('zrangebyscore', permitsName, 0, tonumber(ARGV[1]) - interval); " + "local released = 0; " + "for i, v in ipairs(expiredValues) do " - + "local random, permits = struct.unpack('fI', v);" + + "local random, permits = struct.unpack('Lc0I', v);" + "released = released + permits;" + "end; " diff --git a/redisson/src/test/java/org/redisson/RedissonRateLimiterTest.java b/redisson/src/test/java/org/redisson/RedissonRateLimiterTest.java index a49fbaaf3..fb3d26006 100644 --- a/redisson/src/test/java/org/redisson/RedissonRateLimiterTest.java +++ b/redisson/src/test/java/org/redisson/RedissonRateLimiterTest.java @@ -1,25 +1,49 @@ package org.redisson; -import static org.assertj.core.api.Assertions.assertThat; - -import java.time.Duration; -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; - import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.redisson.api.RRateLimiter; +import org.redisson.api.RScoredSortedSet; import org.redisson.api.RateIntervalUnit; import org.redisson.api.RateType; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicLong; + +import static org.assertj.core.api.Assertions.assertThat; + public class RedissonRateLimiterTest extends BaseTest { + @Test + public void testRateValue() throws InterruptedException { + RRateLimiter rateLimiter = redisson.getRateLimiter("test1"); + int rate = 10_000; + rateLimiter.setRate(RateType.OVERALL, rate, 10_000, RateIntervalUnit.MILLISECONDS); + + ExecutorService e = Executors.newFixedThreadPool(200); + for (int i = 0; i < 200; i++) { + e.execute(() -> { + while (true) { + rateLimiter.acquire(); + } + }); + } + + RScoredSortedSet sortedSet = redisson.getScoredSortedSet("{test1}:permits"); + List sizes = new ArrayList<>(); + for (int i = 0; i < 20; i++) { + sizes.add(sortedSet.size()); + Thread.sleep(1000); + } + + assertThat(sizes.stream().filter(s -> s == rate).count()).isGreaterThan(16); + e.shutdownNow(); + } + @Test public void testExpire() throws InterruptedException { RRateLimiter rr = redisson.getRateLimiter("limiter");