From e63d1c77cf9e3a0a3bc099663da6dcb00c075724 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Wed, 3 May 2023 10:45:56 +0300 Subject: [PATCH] refactoring --- .../org/redisson/RedissonDelayedQueue.java | 46 ++++++++----------- .../redisson/connection/ServiceManager.java | 5 +- 2 files changed, 24 insertions(+), 27 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonDelayedQueue.java b/redisson/src/main/java/org/redisson/RedissonDelayedQueue.java index 27a20ad4c..58a2a7d5d 100644 --- a/redisson/src/main/java/org/redisson/RedissonDelayedQueue.java +++ b/redisson/src/main/java/org/redisson/RedissonDelayedQueue.java @@ -15,15 +15,6 @@ */ package org.redisson; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.NoSuchElementException; -import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.TimeUnit; - import org.redisson.api.RDelayedQueue; import org.redisson.api.RFuture; import org.redisson.api.RTopic; @@ -33,6 +24,9 @@ import org.redisson.client.protocol.RedisCommands; import org.redisson.command.CommandAsyncExecutor; import org.redisson.misc.CompletableFutureWrapper; +import java.util.*; +import java.util.concurrent.TimeUnit; + /** * * @author Nikita Koksharov @@ -60,7 +54,7 @@ public class RedissonDelayedQueue extends RedissonExpirable implements RDelay "local expiredValues = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]); " + "if #expiredValues > 0 then " + "for i, v in ipairs(expiredValues) do " - + "local randomId, value = struct.unpack('dLc0', v);" + + "local randomId, value = struct.unpack('Bc0Lc0', v);" + "redis.call('rpush', KEYS[1], value);" + "redis.call('lrem', KEYS[3], 1, v);" + "end; " @@ -100,10 +94,10 @@ public class RedissonDelayedQueue extends RedissonExpirable implements RDelay long delayInMs = timeUnit.toMillis(delay); long timeout = System.currentTimeMillis() + delayInMs; - - long randomId = ThreadLocalRandom.current().nextLong(); + + byte[] random = getServiceManager().generateIdArray(8); return commandExecutor.evalWriteNoRetryAsync(getRawName(), codec, RedisCommands.EVAL_VOID, - "local value = struct.pack('dLc0', tonumber(ARGV[2]), string.len(ARGV[3]), ARGV[3]);" + "local value = struct.pack('Bc0Lc0', string.len(ARGV[2]), ARGV[2], string.len(ARGV[3]), ARGV[3]);" + "redis.call('zadd', KEYS[2], ARGV[1], value);" + "redis.call('rpush', KEYS[3], value);" // if new object added to queue head when publish its startTime @@ -113,7 +107,7 @@ public class RedissonDelayedQueue extends RedissonExpirable implements RDelay + "redis.call('publish', KEYS[4], ARGV[1]); " + "end;", Arrays.asList(getRawName(), timeoutSetName, queueName, channelName), - timeout, randomId, encode(e)); + timeout, random, encode(e)); } @Override @@ -173,7 +167,7 @@ public class RedissonDelayedQueue extends RedissonExpirable implements RDelay return (V) get(commandExecutor.evalReadAsync(getRawName(), codec, RedisCommands.EVAL_OBJECT, "local v = redis.call('lindex', KEYS[1], ARGV[1]); " + "if v ~= false then " - + "local randomId, value = struct.unpack('dLc0', v);" + + "local randomId, value = struct.unpack('Bc0Lc0', v);" + "return value; " + "end " + "return nil;", @@ -184,7 +178,7 @@ public class RedissonDelayedQueue extends RedissonExpirable implements RDelay get(commandExecutor.evalWriteAsync(getRawName(), null, RedisCommands.EVAL_VOID, "local v = redis.call('lindex', KEYS[1], ARGV[1]);" + "if v ~= false then " + - "local randomId, value = struct.unpack('dLc0', v);" + + "local randomId, value = struct.unpack('Bc0Lc0', v);" + "redis.call('lrem', KEYS[1], 1, v);" + "redis.call('zrem', KEYS[2], v);" + "end; ", @@ -266,7 +260,7 @@ public class RedissonDelayedQueue extends RedissonExpirable implements RDelay "local result = {}; " + "local items = redis.call('lrange', KEYS[1], 0, -1); " + "for i, v in ipairs(items) do " - + "local randomId, value = struct.unpack('dLc0', v); " + + "local randomId, value = struct.unpack('Bc0Lc0', v); " + "table.insert(result, value);" + "end; " + "return result; ", @@ -281,7 +275,7 @@ public class RedissonDelayedQueue extends RedissonExpirable implements RDelay "local v = redis.call('lpop', KEYS[1]);" + "if v ~= false then " + "redis.call('zrem', KEYS[2], v); " + - "local randomId, value = struct.unpack('dLc0', v);" + + "local randomId, value = struct.unpack('Bc0Lc0', v);" + "table.insert(result, value);" + "else " + "return result;" + @@ -306,7 +300,7 @@ public class RedissonDelayedQueue extends RedissonExpirable implements RDelay "local s = redis.call('llen', KEYS[1]);" + "for i = 0, s-1, 1 do " + "local v = redis.call('lindex', KEYS[1], i);" - + "local randomId, value = struct.unpack('dLc0', v);" + + "local randomId, value = struct.unpack('Bc0Lc0', v);" + "if ARGV[1] == value then " + "redis.call('zrem', KEYS[2], v);" + "redis.call('lrem', KEYS[1], 1, v);" @@ -327,7 +321,7 @@ public class RedissonDelayedQueue extends RedissonExpirable implements RDelay "local s = redis.call('llen', KEYS[1]);" + "for i = 0, s-1, 1 do " + "local v = redis.call('lindex', KEYS[1], i);" - + "local randomId, value = struct.unpack('dLc0', v);" + + "local randomId, value = struct.unpack('Bc0Lc0', v);" + "for j = 1, #ARGV, 1 do " + "if value == ARGV[j] then " @@ -361,7 +355,7 @@ public class RedissonDelayedQueue extends RedissonExpirable implements RDelay "local i = 0;" + "while i < s do " + "local v = redis.call('lindex', KEYS[1], i);" - + "local randomId, value = struct.unpack('dLc0', v);" + + "local randomId, value = struct.unpack('Bc0Lc0', v);" + "for j = 1, #ARGV, 1 do " + "if value == ARGV[j] then " @@ -400,7 +394,7 @@ public class RedissonDelayedQueue extends RedissonExpirable implements RDelay "local items = redis.call('lrange', KEYS[1], 0, -1); " + "local i = 1; " + "while i <= #items do " - + "local randomId, element = struct.unpack('dLc0', items[i]); " + + "local randomId, element = struct.unpack('Bc0Lc0', items[i]); " + "local isInAgrs = false; " + "for j = 1, #ARGV, 1 do " + "if ARGV[j] == element then " @@ -454,7 +448,7 @@ public class RedissonDelayedQueue extends RedissonExpirable implements RDelay return commandExecutor.evalReadAsync(getRawName(), codec, RedisCommands.EVAL_OBJECT, "local v = redis.call('lindex', KEYS[1], 0); " + "if v ~= false then " - + "local randomId, value = struct.unpack('dLc0', v);" + + "local randomId, value = struct.unpack('Bc0Lc0', v);" + "return value; " + "end " + "return nil;", @@ -467,7 +461,7 @@ public class RedissonDelayedQueue extends RedissonExpirable implements RDelay "local v = redis.call('lpop', KEYS[1]); " + "if v ~= false then " + "redis.call('zrem', KEYS[2], v); " - + "local randomId, value = struct.unpack('dLc0', v);" + + "local randomId, value = struct.unpack('Bc0Lc0', v);" + "return value; " + "end " + "return nil;", @@ -485,7 +479,7 @@ public class RedissonDelayedQueue extends RedissonExpirable implements RDelay "local v = redis.call('rpop', KEYS[1]); " + "if v ~= false then " + "redis.call('zrem', KEYS[2], v); " - + "local randomId, value = struct.unpack('dLc0', v);" + + "local randomId, value = struct.unpack('Bc0Lc0', v);" + "redis.call('lpush', KEYS[3], value); " + "return value; " + "end " @@ -499,7 +493,7 @@ public class RedissonDelayedQueue extends RedissonExpirable implements RDelay "local s = redis.call('llen', KEYS[1]);" + "for i = 0, s-1, 1 do " + "local v = redis.call('lindex', KEYS[1], i);" - + "local randomId, value = struct.unpack('dLc0', v);" + + "local randomId, value = struct.unpack('Bc0Lc0', v);" + "if ARGV[1] == value then " + "return 1;" + "end; " diff --git a/redisson/src/main/java/org/redisson/connection/ServiceManager.java b/redisson/src/main/java/org/redisson/connection/ServiceManager.java index 54cf488b5..6eb7589e2 100644 --- a/redisson/src/main/java/org/redisson/connection/ServiceManager.java +++ b/redisson/src/main/java/org/redisson/connection/ServiceManager.java @@ -424,7 +424,10 @@ public class ServiceManager { } public byte[] generateIdArray() { - byte[] id = new byte[16]; + return generateIdArray(16); + } + public byte[] generateIdArray(int size) { + byte[] id = new byte[size]; ThreadLocalRandom.current().nextBytes(id); return id; }