Review fixes:

- use byte arrays as keys

Signed-off-by: Sergey Kuznetsov <sergey.kuznetsov@infobip.com>
pull/5247/head
Sergey Kuznetsov 2 years ago
parent e24d413119
commit d513705bc6

@ -15,6 +15,7 @@
*/
package org.redisson;
import io.netty.buffer.ByteBufUtil;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import org.redisson.api.RFuture;
@ -26,10 +27,7 @@ import org.redisson.command.CommandAsyncExecutor;
import org.redisson.misc.CompletableFutureWrapper;
import org.redisson.pubsub.SemaphorePubSub;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@ -373,36 +371,47 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
}
private RFuture<List<String>> tryAcquireAsync(List<String> ids, long timeoutDate) {
List<Object> params = new ArrayList<>();
params.add(ids.size());
params.add(timeoutDate);
params.add(System.currentTimeMillis());
params.add(nonExpirableTimeout);
params.add(getSubscribeService().getPublishCommand());
for (String permitId: ids) {
params.add(ByteBufUtil.decodeHexDump(permitId));
}
CompletionStage<List<String>> future = commandExecutor.syncedEval(getRawName(), ByteArrayCodec.INSTANCE, RedisCommands.EVAL_STRING,
"local expiredIds = redis.call('zrangebyscore', KEYS[2], 0, ARGV[4], 'limit', 0, ARGV[1]); " +
"if #expiredIds > 0 then " +
"redis.call('zrem', KEYS[2], unpack(expiredIds)); " +
"local value = redis.call('incrby', KEYS[1], #expiredIds); " +
"if tonumber(value) > 0 then " +
"redis.call(ARGV[6], KEYS[3], value); " +
"end;" +
"end; " +
"local value = redis.call('get', KEYS[1]); " +
"if (value ~= false and tonumber(value) >= tonumber(ARGV[1])) then " +
"redis.call('decrby', KEYS[1], ARGV[1]); " +
"for key in string.gmatch(ARGV[3], '%w+') do " +
"redis.call('zadd', KEYS[2], ARGV[2], key); " +
"end; " +
"local ttl = redis.call('pttl', KEYS[1]); " +
"if ttl > 0 then " +
"redis.call('pexpire', KEYS[2], ttl); " +
"end; " +
"return 'OK'; " +
"end; " +
"local v = redis.call('zrange', KEYS[2], 0, 0, 'WITHSCORES'); " +
"if v[1] ~= nil and v[2] ~= ARGV[5] then " +
"return ':' .. tostring(v[2]); " +
"end " +
"return nil;",
Arrays.asList(getRawName(), timeoutName, channelName),
ids.size(), timeoutDate, ids, System.currentTimeMillis(), nonExpirableTimeout, getSubscribeService().getPublishCommand()
"local expiredIds = redis.call('zrangebyscore', KEYS[2], 0, ARGV[3], 'limit', 0, ARGV[1]); " +
"if #expiredIds > 0 then " +
"redis.call('zrem', KEYS[2], unpack(expiredIds)); " +
"local value = redis.call('incrby', KEYS[1], #expiredIds); " +
"if tonumber(value) > 0 then " +
"redis.call(ARGV[5], KEYS[3], value); " +
"end;" +
"end; " +
"local value = redis.call('get', KEYS[1]); " +
"if (value ~= false and tonumber(value) >= tonumber(ARGV[1])) then " +
"redis.call('decrby', KEYS[1], ARGV[1]); " +
"for i = 6, #ARGV, 1 do " +
"redis.call('zadd', KEYS[2], ARGV[2], ARGV[i]); " +
"end; " +
"local ttl = redis.call('pttl', KEYS[1]); " +
"if ttl > 0 then " +
"redis.call('pexpire', KEYS[2], ttl); " +
"end; " +
"return 'OK'; " +
"end; " +
"local v = redis.call('zrange', KEYS[2], 0, 0, 'WITHSCORES'); " +
"if v[1] ~= nil and v[2] ~= ARGV[4] then " +
"return ':' .. tostring(v[2]); " +
"end " +
"return nil;",
Arrays.asList(getRawName(), timeoutName, channelName),
params.toArray()
).thenApply(result -> {
if (result == null) {
return Collections.emptyList();
@ -608,6 +617,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
throw new IllegalArgumentException("permitId can't be null");
}
byte[] id = ByteBufUtil.decodeHexDump(permitId);
return commandExecutor.syncedEvalWithRetry(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"local expire = redis.call('zscore', KEYS[3], ARGV[1]);" +
"local removed = redis.call('zrem', KEYS[3], ARGV[1]);" +
@ -621,7 +631,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
"end;" +
"return 1;",
Arrays.asList(getRawName(), channelName, timeoutName),
permitId, 1, System.currentTimeMillis(), getSubscribeService().getPublishCommand());
id, 1, System.currentTimeMillis(), getSubscribeService().getPublishCommand());
}
@Override
@ -630,28 +640,37 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
throw new IllegalArgumentException("permitIds can't be null or empty");
}
List<Object> params = new ArrayList<>(permitsIds.size() + 3);
params.add(permitsIds.size());
params.add(System.currentTimeMillis());
params.add(getSubscribeService().getPublishCommand());
for (String permitId : permitsIds) {
params.add(ByteBufUtil.decodeHexDump(permitId));
}
return commandExecutor.syncedEvalWithRetry(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_INTEGER,
"local expiredIds = redis.call('zrangebyscore', KEYS[3], 0, ARGV[3], 'limit', 0, -1); " +
"local expiredIds = redis.call('zrangebyscore', KEYS[3], 0, ARGV[2], 'limit', 0, -1); " +
"if #expiredIds > 0 then " +
"redis.call('zrem', KEYS[3], unpack(expiredIds)); " +
"local value = redis.call('incrby', KEYS[1], #expiredIds); " +
"if tonumber(value) > 0 then " +
"redis.call(ARGV[4], KEYS[2], value); " +
"redis.call(ARGV[3], KEYS[2], value); " +
"end; " +
"end; " +
"local keys = {}; " +
"for key in string.gmatch(ARGV[1], '%w+') do " +
"table.insert(keys, key); " +
"for i = 4, #ARGV, 1 do " +
"table.insert(keys, ARGV[i]); " +
"end; " +
"local removed = redis.call('zrem', KEYS[3], unpack(keys)); " +
"if tonumber(removed) == 0 then " +
"return 0;" +
"end; " +
"redis.call('incrby', KEYS[1], removed); " +
"redis.call(ARGV[4], KEYS[2], removed); " +
"redis.call(ARGV[3], KEYS[2], removed); " +
"return removed;",
Arrays.asList(getRawName(), channelName, timeoutName),
permitsIds, permitsIds.size(), System.currentTimeMillis(), getSubscribeService().getPublishCommand());
params.toArray());
}
@Override
@ -840,6 +859,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
@Override
public RFuture<Boolean> updateLeaseTimeAsync(String permitId, long leaseTime, TimeUnit unit) {
long timeoutDate = calcTimeout(leaseTime, unit);
byte[] id = ByteBufUtil.decodeHexDump(permitId);
return commandExecutor.syncedEvalWithRetry(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"local expiredIds = redis.call('zrangebyscore', KEYS[2], 0, ARGV[3], 'limit', 0, -1); " +
"if #expiredIds > 0 then " +
@ -857,7 +877,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
+ "end;"
+ "return 0;",
Arrays.asList(getRawName(), timeoutName, channelName),
permitId, timeoutDate, System.currentTimeMillis(), getSubscribeService().getPublishCommand());
id, timeoutDate, System.currentTimeMillis(), getSubscribeService().getPublishCommand());
}
@Override

Loading…
Cancel
Save