Feature - RCountDownLatch, RLock, RPermitExpirableSemaphore, RSemaphore objects uses sharded PubSub in Redis Cluster 7.0+. #5047

pull/5079/head
Nikita Koksharov 2 years ago
parent 52aa237a6b
commit 823c786a19

@ -253,8 +253,9 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown
return commandExecutor.evalWriteNoRetryAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"local v = redis.call('decr', KEYS[1]);" +
"if v <= 0 then redis.call('del', KEYS[1]) end;" +
"if v == 0 then redis.call('publish', KEYS[2], ARGV[1]) end;",
Arrays.<Object>asList(getRawName(), getChannelName()), CountDownLatchPubSub.ZERO_COUNT_MESSAGE);
"if v == 0 then redis.call(ARGV[2], KEYS[2], ARGV[1]) end;",
Arrays.<Object>asList(getRawName(), getChannelName()),
CountDownLatchPubSub.ZERO_COUNT_MESSAGE, getSubscribeService().getPublishCommand());
}
private String getEntryName() {
@ -285,24 +286,26 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown
return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if redis.call('exists', KEYS[1]) == 0 then "
+ "redis.call('set', KEYS[1], ARGV[2]); "
+ "redis.call('publish', KEYS[2], ARGV[1]); "
+ "redis.call(ARGV[3], KEYS[2], ARGV[1]); "
+ "return 1 "
+ "else "
+ "return 0 "
+ "end",
Arrays.asList(getRawName(), getChannelName()), CountDownLatchPubSub.NEW_COUNT_MESSAGE, count);
Arrays.asList(getRawName(), getChannelName()),
CountDownLatchPubSub.NEW_COUNT_MESSAGE, count, getSubscribeService().getPublishCommand());
}
@Override
public RFuture<Boolean> deleteAsync() {
return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if redis.call('del', KEYS[1]) == 1 then "
+ "redis.call('publish', KEYS[2], ARGV[1]); "
+ "redis.call(ARGV[2], KEYS[2], ARGV[1]); "
+ "return 1 "
+ "else "
+ "return 0 "
+ "end",
Arrays.asList(getRawName(), getChannelName()), CountDownLatchPubSub.NEW_COUNT_MESSAGE);
Arrays.asList(getRawName(), getChannelName()),
CountDownLatchPubSub.NEW_COUNT_MESSAGE, getSubscribeService().getPublishCommand());
}
}

@ -254,7 +254,7 @@ public class RedissonFairLock extends RedissonLock implements RLock {
+ "if (redis.call('exists', KEYS[1]) == 0) then " +
"local nextThreadId = redis.call('lindex', KEYS[2], 0); " +
"if nextThreadId ~= false then " +
"redis.call('publish', KEYS[4] .. ':' .. nextThreadId, ARGV[1]); " +
"redis.call(ARGV[5], KEYS[4] .. ':' .. nextThreadId, ARGV[1]); " +
"end; " +
"return 1; " +
"end;" +
@ -270,11 +270,12 @@ public class RedissonFairLock extends RedissonLock implements RLock {
"redis.call('del', KEYS[1]); " +
"local nextThreadId = redis.call('lindex', KEYS[2], 0); " +
"if nextThreadId ~= false then " +
"redis.call('publish', KEYS[4] .. ':' .. nextThreadId, ARGV[1]); " +
"redis.call(ARGV[5], KEYS[4] .. ':' .. nextThreadId, ARGV[1]); " +
"end; " +
"return 1; ",
Arrays.asList(getRawName(), threadsQueueName, timeoutSetName, getChannelName()),
LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId), System.currentTimeMillis());
LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId),
System.currentTimeMillis(), getSubscribeService().getPublishCommand());
}
@Override
@ -332,13 +333,14 @@ public class RedissonFairLock extends RedissonLock implements RLock {
"if (redis.call('del', KEYS[1]) == 1) then " +
"local nextThreadId = redis.call('lindex', KEYS[2], 0); " +
"if nextThreadId ~= false then " +
"redis.call('publish', KEYS[4] .. ':' .. nextThreadId, ARGV[1]); " +
"redis.call(ARGV[3], KEYS[4] .. ':' .. nextThreadId, ARGV[1]); " +
"end; " +
"return 1; " +
"end; " +
"return 0;",
Arrays.asList(getRawName(), threadsQueueName, timeoutSetName, getChannelName()),
LockPubSub.UNLOCK_MESSAGE, System.currentTimeMillis());
LockPubSub.UNLOCK_MESSAGE, System.currentTimeMillis(),
getSubscribeService().getPublishCommand());
}
}

@ -317,12 +317,12 @@ public class RedissonLock extends RedissonBaseLock {
cancelExpirationRenewal(null);
return commandExecutor.syncedEvalWithRetry(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('del', KEYS[1]) == 1) then "
+ "redis.call('publish', KEYS[2], ARGV[1]); "
+ "redis.call(ARGV[2], KEYS[2], ARGV[1]); "
+ "return 1 "
+ "else "
+ "return 0 "
+ "end",
Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE);
Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, getSubscribeService().getPublishCommand());
}
@ -338,11 +338,12 @@ public class RedissonLock extends RedissonBaseLock {
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"redis.call(ARGV[4], KEYS[2], ARGV[1]); " +
"return 1; " +
"end; " +
"return nil;",
Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
Arrays.asList(getRawName(), getChannelName()),
LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId), getSubscribeService().getPublishCommand());
}
@Override

@ -26,6 +26,7 @@ import org.redisson.command.CommandAsyncExecutor;
import org.redisson.connection.ServiceManager;
import org.redisson.misc.CompletableFutureWrapper;
import org.redisson.misc.Hash;
import org.redisson.pubsub.PublishSubscribeService;
import java.util.*;
import java.util.concurrent.CompletableFuture;
@ -485,4 +486,8 @@ public abstract class RedissonObject implements RObject {
.collect(Collectors.toList());
}
protected PublishSubscribeService getSubscribeService() {
return commandExecutor.getConnectionManager().getSubscribeService();
}
}

@ -43,8 +43,6 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
private final String channelName;
private final SemaphorePubSub semaphorePubSub;
final CommandAsyncExecutor commandExecutor;
private final String timeoutName;
private final long nonExpirableTimeout = 922337203685477L;
@ -52,7 +50,6 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
public RedissonPermitExpirableSemaphore(CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
this.timeoutName = suffixName(getRawName(), "timeout");
this.commandExecutor = commandExecutor;
this.semaphorePubSub = commandExecutor.getConnectionManager().getSubscribeService().getSemaphorePubSub();
this.channelName = prefixName("redisson_sc", getRawName());
}
@ -355,7 +352,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
"redis.call('zrem', KEYS[2], unpack(expiredIds)); " +
"local value = redis.call('incrby', KEYS[1], #expiredIds); " +
"if tonumber(value) > 0 then " +
"redis.call('publish', KEYS[3], value); " +
"redis.call(ARGV[6], KEYS[3], value); " +
"end;" +
"end; " +
"local value = redis.call('get', KEYS[1]); " +
@ -376,7 +373,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
"end " +
"return nil;",
Arrays.asList(getRawName(), timeoutName, channelName),
permits, timeoutDate, id, System.currentTimeMillis(), nonExpirableTimeout);
permits, timeoutDate, id, System.currentTimeMillis(), nonExpirableTimeout, getSubscribeService().getPublishCommand());
}
@Override
@ -566,13 +563,13 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
"return 0;" +
"end;" +
"local value = redis.call('incrby', KEYS[1], ARGV[2]); " +
"redis.call('publish', KEYS[2], value); " +
"redis.call(ARGV[4], KEYS[2], value); " +
"if tonumber(expire) <= tonumber(ARGV[3]) then " +
"return 0;" +
"end;" +
"return 1;",
Arrays.asList(getRawName(), channelName, timeoutName),
id, 1, System.currentTimeMillis());
id, 1, System.currentTimeMillis(), getSubscribeService().getPublishCommand());
}
@Override
@ -639,13 +636,14 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
"redis.call('zrem', KEYS[2], unpack(expiredIds)); " +
"local value = redis.call('incrby', KEYS[1], #expiredIds); " +
"if tonumber(value) > 0 then " +
"redis.call('publish', KEYS[3], value); " +
"redis.call(ARGV[2], KEYS[3], value); " +
"end;" +
"return value; " +
"end; " +
"local ret = redis.call('get', KEYS[1]); " +
"return ret == false and 0 or ret;",
Arrays.<Object>asList(getRawName(), timeoutName, channelName), System.currentTimeMillis());
Arrays.<Object>asList(getRawName(), timeoutName, channelName),
System.currentTimeMillis(), getSubscribeService().getPublishCommand());
}
@Override
@ -656,7 +654,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
"redis.call('zrem', KEYS[2], unpack(expiredIds)); " +
"local value = redis.call('incrby', KEYS[1], #expiredIds); " +
"if tonumber(value) > 0 then " +
"redis.call('publish', KEYS[3], value); " +
"redis.call(ARGV[2], KEYS[3], value); " +
"end;" +
"end; " +
"local available = redis.call('get', KEYS[1]); " +
@ -668,7 +666,8 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
"return tonumber(available) " +
"end;" +
"return tonumber(available) + acquired;",
Arrays.<Object>asList(getRawName(), timeoutName, channelName), System.currentTimeMillis());
Arrays.<Object>asList(getRawName(), timeoutName, channelName),
System.currentTimeMillis(), getSubscribeService().getPublishCommand());
}
@Override
@ -679,12 +678,13 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
"redis.call('zrem', KEYS[2], unpack(expiredIds)); " +
"local value = redis.call('incrby', KEYS[1], #expiredIds); " +
"if tonumber(value) > 0 then " +
"redis.call('publish', KEYS[3], value); " +
"redis.call(ARGV[2], KEYS[3], value); " +
"end;" +
"end; " +
"local acquired = redis.call('zcount', KEYS[2], 0, '+inf'); " +
"return acquired == false and 0 or acquired;",
Arrays.<Object>asList(getRawName(), timeoutName, channelName), System.currentTimeMillis());
Arrays.<Object>asList(getRawName(), timeoutName, channelName),
System.currentTimeMillis(), getSubscribeService().getPublishCommand());
}
@Override
@ -703,7 +703,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
"local available = redis.call('get', KEYS[1]); " +
"if (available == false) then " +
"redis.call('set', KEYS[1], ARGV[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"redis.call(ARGV[2], KEYS[2], ARGV[1]); " +
"return;" +
"end;" +
"local acquired = redis.call('zcount', KEYS[3], 0, '+inf'); " +
@ -712,8 +712,9 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
"return;" +
"end;" +
"redis.call('incrby', KEYS[1], tonumber(ARGV[1]) - maximum); " +
"redis.call('publish', KEYS[2], ARGV[1]);",
Arrays.<Object>asList(getRawName(), channelName, timeoutName), permits);
"redis.call(ARGV[2], KEYS[2], ARGV[1]);",
Arrays.<Object>asList(getRawName(), channelName, timeoutName),
permits, getSubscribeService().getPublishCommand());
}
@Override
@ -722,11 +723,12 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
"local value = redis.call('get', KEYS[1]); " +
"if (value == false) then "
+ "redis.call('set', KEYS[1], ARGV[1]); "
+ "redis.call('publish', KEYS[2], ARGV[1]); "
+ "redis.call(ARGV[2], KEYS[2], ARGV[1]); "
+ "return 1;"
+ "end;"
+ "return 0;",
Arrays.<Object>asList(getRawName(), channelName), permits);
Arrays.<Object>asList(getRawName(), channelName),
permits, getSubscribeService().getPublishCommand());
}
@Override
@ -743,9 +745,9 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
+ "end;"
+ "redis.call('set', KEYS[1], tonumber(value) + tonumber(ARGV[1])); "
+ "if tonumber(ARGV[1]) > 0 then "
+ "redis.call('publish', KEYS[2], ARGV[1]); "
+ "redis.call(ARGV[2], KEYS[2], ARGV[1]); "
+ "end;",
Arrays.asList(getRawName(), channelName), permits);
Arrays.asList(getRawName(), channelName), permits, getSubscribeService().getPublishCommand());
}
@Override
@ -758,7 +760,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
"redis.call('zrem', KEYS[2], unpack(expiredIds)); " +
"local value = redis.call('incrby', KEYS[1], #expiredIds); " +
"if tonumber(value) > 0 then " +
"redis.call('publish', KEYS[3], value); " +
"redis.call(ARGV[4], KEYS[3], value); " +
"end;" +
"end; " +
@ -769,7 +771,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
+ "end;"
+ "return 0;",
Arrays.asList(getRawName(), timeoutName, channelName),
id, timeoutDate, System.currentTimeMillis());
id, timeoutDate, System.currentTimeMillis(), getSubscribeService().getPublishCommand());
}
@Override

@ -88,7 +88,7 @@ public class RedissonReadLock extends RedissonLock implements RLock {
return commandExecutor.syncedEval(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"local mode = redis.call('hget', KEYS[1], 'mode'); " +
"if (mode == false) then " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"redis.call(ARGV[3], KEYS[2], ARGV[1]); " +
"return 1; " +
"end; " +
"local lockExists = redis.call('hexists', KEYS[1], ARGV[2]); " +
@ -126,10 +126,10 @@ public class RedissonReadLock extends RedissonLock implements RLock {
"end; " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"redis.call(ARGV[3], KEYS[2], ARGV[1]); " +
"return 1; ",
Arrays.<Object>asList(getRawName(), getChannelName(), timeoutPrefix, keyPrefix),
LockPubSub.UNLOCK_MESSAGE, getLockName(threadId));
LockPubSub.UNLOCK_MESSAGE, getLockName(threadId), getSubscribeService().getPublishCommand());
}
protected String getKeyPrefix(long threadId, String timeoutPrefix) {
@ -176,11 +176,12 @@ public class RedissonReadLock extends RedissonLock implements RLock {
return commandExecutor.syncedEvalWithRetry(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hget', KEYS[1], 'mode') == 'read') then " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"redis.call(ARGV[2], KEYS[2], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0; ",
Arrays.<Object>asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE);
Arrays.asList(getRawName(), getChannelName()),
LockPubSub.UNLOCK_MESSAGE, getSubscribeService().getPublishCommand());
}
@Override

@ -49,12 +49,9 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
private final SemaphorePubSub semaphorePubSub;
final CommandAsyncExecutor commandExecutor;
public RedissonSemaphore(CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
this.commandExecutor = commandExecutor;
this.semaphorePubSub = commandExecutor.getConnectionManager().getSubscribeService().getSemaphorePubSub();
this.semaphorePubSub = getSubscribeService().getSemaphorePubSub();
}
String getChannelName() {
@ -440,8 +437,8 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
RFuture<Void> future = commandExecutor.syncedEval(getRawName(), StringCodec.INSTANCE, RedisCommands.EVAL_VOID,
"local value = redis.call('incrby', KEYS[1], ARGV[1]); " +
"redis.call('publish', KEYS[2], value); ",
Arrays.asList(getRawName(), getChannelName()), permits);
"redis.call(ARGV[2], KEYS[2], value); ",
Arrays.asList(getRawName(), getChannelName()), permits, getSubscribeService().getPublishCommand());
if (log.isDebugEnabled()) {
future.thenAccept(o -> {
log.debug("released, permits: {}, name: {}", permits, getName());
@ -488,11 +485,12 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
"local value = redis.call('get', KEYS[1]); " +
"if (value == false) then "
+ "redis.call('set', KEYS[1], ARGV[1]); "
+ "redis.call('publish', KEYS[2], ARGV[1]); "
+ "redis.call(ARGV[2], KEYS[2], ARGV[1]); "
+ "return 1;"
+ "end;"
+ "return 0;",
Arrays.asList(getRawName(), getChannelName()), permits);
Arrays.asList(getRawName(), getChannelName()),
permits, getSubscribeService().getPublishCommand());
if (log.isDebugEnabled()) {
future.thenAccept(r -> {
@ -519,8 +517,9 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
+ "value = 0;"
+ "end;"
+ "redis.call('set', KEYS[1], value + ARGV[1]); "
+ "redis.call('publish', KEYS[2], value + ARGV[1]); ",
Arrays.asList(getRawName(), getChannelName()), permits);
+ "redis.call(ARGV[2], KEYS[2], value + ARGV[1]); ",
Arrays.asList(getRawName(), getChannelName()),
permits, getSubscribeService().getPublishCommand());
}

@ -80,7 +80,7 @@ public class RedissonWriteLock extends RedissonLock implements RLock {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"local mode = redis.call('hget', KEYS[1], 'mode'); " +
"if (mode == false) then " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"redis.call(ARGV[4], KEYS[2], ARGV[1]); " +
"return 1; " +
"end;" +
"if (mode == 'write') then " +
@ -96,7 +96,7 @@ public class RedissonWriteLock extends RedissonLock implements RLock {
"redis.call('hdel', KEYS[1], ARGV[3]); " +
"if (redis.call('hlen', KEYS[1]) == 1) then " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"redis.call(ARGV[4], KEYS[2], ARGV[1]); " +
"else " +
// has unlocked read-locks
"redis.call('hset', KEYS[1], 'mode', 'read'); " +
@ -107,7 +107,7 @@ public class RedissonWriteLock extends RedissonLock implements RLock {
"end; "
+ "return nil;",
Arrays.<Object>asList(getRawName(), getChannelName()),
LockPubSub.READ_UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
LockPubSub.READ_UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId), getSubscribeService().getPublishCommand());
}
@Override
@ -133,11 +133,12 @@ public class RedissonWriteLock extends RedissonLock implements RLock {
return commandExecutor.syncedEvalWithRetry(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hget', KEYS[1], 'mode') == 'write') then " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"redis.call(ARGV[2], KEYS[2], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0; ",
Arrays.<Object>asList(getRawName(), getChannelName()), LockPubSub.READ_UNLOCK_MESSAGE);
Arrays.asList(getRawName(), getChannelName()),
LockPubSub.READ_UNLOCK_MESSAGE, getSubscribeService().getPublishCommand());
}
@Override

@ -19,6 +19,7 @@ import io.netty.util.Timeout;
import org.redisson.PubSubPatternStatusListener;
import org.redisson.client.*;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.pubsub.PubSubType;
import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.connection.ClientConnectionsEntry;
@ -878,6 +879,16 @@ public class PublishSubscribeService {
this.shardingSupported = value;
}
public boolean isShardingSupported() {
return shardingSupported;
}
public String getPublishCommand() {
if (shardingSupported) {
return RedisCommands.SPUBLISH.getName();
}
return RedisCommands.PUBLISH.getName();
}
@Override
public String toString() {
return "PublishSubscribeService [name2PubSubConnection=" + name2PubSubConnection + ", entry2PubSubConnection=" + entry2PubSubConnection + "]";

Loading…
Cancel
Save