refactoring

pull/5038/head
Nikita Koksharov 2 years ago
parent 7ca3f05ee1
commit e63d1c77cf

@ -15,15 +15,6 @@
*/
package org.redisson;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RFuture;
import org.redisson.api.RTopic;
@ -33,6 +24,9 @@ import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.misc.CompletableFutureWrapper;
import java.util.*;
import java.util.concurrent.TimeUnit;
/**
*
* @author Nikita Koksharov
@ -60,7 +54,7 @@ public class RedissonDelayedQueue<V> extends RedissonExpirable implements RDelay
"local expiredValues = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]); "
+ "if #expiredValues > 0 then "
+ "for i, v in ipairs(expiredValues) do "
+ "local randomId, value = struct.unpack('dLc0', v);"
+ "local randomId, value = struct.unpack('Bc0Lc0', v);"
+ "redis.call('rpush', KEYS[1], value);"
+ "redis.call('lrem', KEYS[3], 1, v);"
+ "end; "
@ -100,10 +94,10 @@ public class RedissonDelayedQueue<V> extends RedissonExpirable implements RDelay
long delayInMs = timeUnit.toMillis(delay);
long timeout = System.currentTimeMillis() + delayInMs;
long randomId = ThreadLocalRandom.current().nextLong();
byte[] random = getServiceManager().generateIdArray(8);
return commandExecutor.evalWriteNoRetryAsync(getRawName(), codec, RedisCommands.EVAL_VOID,
"local value = struct.pack('dLc0', tonumber(ARGV[2]), string.len(ARGV[3]), ARGV[3]);"
"local value = struct.pack('Bc0Lc0', string.len(ARGV[2]), ARGV[2], string.len(ARGV[3]), ARGV[3]);"
+ "redis.call('zadd', KEYS[2], ARGV[1], value);"
+ "redis.call('rpush', KEYS[3], value);"
// if new object added to queue head when publish its startTime
@ -113,7 +107,7 @@ public class RedissonDelayedQueue<V> extends RedissonExpirable implements RDelay
+ "redis.call('publish', KEYS[4], ARGV[1]); "
+ "end;",
Arrays.asList(getRawName(), timeoutSetName, queueName, channelName),
timeout, randomId, encode(e));
timeout, random, encode(e));
}
@Override
@ -173,7 +167,7 @@ public class RedissonDelayedQueue<V> extends RedissonExpirable implements RDelay
return (V) get(commandExecutor.evalReadAsync(getRawName(), codec, RedisCommands.EVAL_OBJECT,
"local v = redis.call('lindex', KEYS[1], ARGV[1]); "
+ "if v ~= false then "
+ "local randomId, value = struct.unpack('dLc0', v);"
+ "local randomId, value = struct.unpack('Bc0Lc0', v);"
+ "return value; "
+ "end "
+ "return nil;",
@ -184,7 +178,7 @@ public class RedissonDelayedQueue<V> extends RedissonExpirable implements RDelay
get(commandExecutor.evalWriteAsync(getRawName(), null, RedisCommands.EVAL_VOID,
"local v = redis.call('lindex', KEYS[1], ARGV[1]);" +
"if v ~= false then " +
"local randomId, value = struct.unpack('dLc0', v);" +
"local randomId, value = struct.unpack('Bc0Lc0', v);" +
"redis.call('lrem', KEYS[1], 1, v);" +
"redis.call('zrem', KEYS[2], v);" +
"end; ",
@ -266,7 +260,7 @@ public class RedissonDelayedQueue<V> extends RedissonExpirable implements RDelay
"local result = {}; " +
"local items = redis.call('lrange', KEYS[1], 0, -1); "
+ "for i, v in ipairs(items) do "
+ "local randomId, value = struct.unpack('dLc0', v); "
+ "local randomId, value = struct.unpack('Bc0Lc0', v); "
+ "table.insert(result, value);"
+ "end; "
+ "return result; ",
@ -281,7 +275,7 @@ public class RedissonDelayedQueue<V> extends RedissonExpirable implements RDelay
"local v = redis.call('lpop', KEYS[1]);" +
"if v ~= false then " +
"redis.call('zrem', KEYS[2], v); " +
"local randomId, value = struct.unpack('dLc0', v);" +
"local randomId, value = struct.unpack('Bc0Lc0', v);" +
"table.insert(result, value);" +
"else " +
"return result;" +
@ -306,7 +300,7 @@ public class RedissonDelayedQueue<V> extends RedissonExpirable implements RDelay
"local s = redis.call('llen', KEYS[1]);" +
"for i = 0, s-1, 1 do "
+ "local v = redis.call('lindex', KEYS[1], i);"
+ "local randomId, value = struct.unpack('dLc0', v);"
+ "local randomId, value = struct.unpack('Bc0Lc0', v);"
+ "if ARGV[1] == value then "
+ "redis.call('zrem', KEYS[2], v);"
+ "redis.call('lrem', KEYS[1], 1, v);"
@ -327,7 +321,7 @@ public class RedissonDelayedQueue<V> extends RedissonExpirable implements RDelay
"local s = redis.call('llen', KEYS[1]);" +
"for i = 0, s-1, 1 do "
+ "local v = redis.call('lindex', KEYS[1], i);"
+ "local randomId, value = struct.unpack('dLc0', v);"
+ "local randomId, value = struct.unpack('Bc0Lc0', v);"
+ "for j = 1, #ARGV, 1 do "
+ "if value == ARGV[j] then "
@ -361,7 +355,7 @@ public class RedissonDelayedQueue<V> extends RedissonExpirable implements RDelay
"local i = 0;" +
"while i < s do "
+ "local v = redis.call('lindex', KEYS[1], i);"
+ "local randomId, value = struct.unpack('dLc0', v);"
+ "local randomId, value = struct.unpack('Bc0Lc0', v);"
+ "for j = 1, #ARGV, 1 do "
+ "if value == ARGV[j] then "
@ -400,7 +394,7 @@ public class RedissonDelayedQueue<V> extends RedissonExpirable implements RDelay
"local items = redis.call('lrange', KEYS[1], 0, -1); "
+ "local i = 1; "
+ "while i <= #items do "
+ "local randomId, element = struct.unpack('dLc0', items[i]); "
+ "local randomId, element = struct.unpack('Bc0Lc0', items[i]); "
+ "local isInAgrs = false; "
+ "for j = 1, #ARGV, 1 do "
+ "if ARGV[j] == element then "
@ -454,7 +448,7 @@ public class RedissonDelayedQueue<V> extends RedissonExpirable implements RDelay
return commandExecutor.evalReadAsync(getRawName(), codec, RedisCommands.EVAL_OBJECT,
"local v = redis.call('lindex', KEYS[1], 0); "
+ "if v ~= false then "
+ "local randomId, value = struct.unpack('dLc0', v);"
+ "local randomId, value = struct.unpack('Bc0Lc0', v);"
+ "return value; "
+ "end "
+ "return nil;",
@ -467,7 +461,7 @@ public class RedissonDelayedQueue<V> extends RedissonExpirable implements RDelay
"local v = redis.call('lpop', KEYS[1]); "
+ "if v ~= false then "
+ "redis.call('zrem', KEYS[2], v); "
+ "local randomId, value = struct.unpack('dLc0', v);"
+ "local randomId, value = struct.unpack('Bc0Lc0', v);"
+ "return value; "
+ "end "
+ "return nil;",
@ -485,7 +479,7 @@ public class RedissonDelayedQueue<V> extends RedissonExpirable implements RDelay
"local v = redis.call('rpop', KEYS[1]); "
+ "if v ~= false then "
+ "redis.call('zrem', KEYS[2], v); "
+ "local randomId, value = struct.unpack('dLc0', v);"
+ "local randomId, value = struct.unpack('Bc0Lc0', v);"
+ "redis.call('lpush', KEYS[3], value); "
+ "return value; "
+ "end "
@ -499,7 +493,7 @@ public class RedissonDelayedQueue<V> extends RedissonExpirable implements RDelay
"local s = redis.call('llen', KEYS[1]);" +
"for i = 0, s-1, 1 do "
+ "local v = redis.call('lindex', KEYS[1], i);"
+ "local randomId, value = struct.unpack('dLc0', v);"
+ "local randomId, value = struct.unpack('Bc0Lc0', v);"
+ "if ARGV[1] == value then "
+ "return 1;"
+ "end; "

@ -424,7 +424,10 @@ public class ServiceManager {
}
public byte[] generateIdArray() {
byte[] id = new byte[16];
return generateIdArray(16);
}
public byte[] generateIdArray(int size) {
byte[] id = new byte[size];
ThreadLocalRandom.current().nextBytes(id);
return id;
}

Loading…
Cancel
Save