From 823c786a19c5a1f071795578b8fba0e4f6fb6940 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Tue, 30 May 2023 14:40:21 +0300 Subject: [PATCH] Feature - RCountDownLatch, RLock, RPermitExpirableSemaphore, RSemaphore objects uses sharded PubSub in Redis Cluster 7.0+. #5047 --- .../org/redisson/RedissonCountDownLatch.java | 15 +++--- .../java/org/redisson/RedissonFairLock.java | 12 +++-- .../main/java/org/redisson/RedissonLock.java | 9 ++-- .../java/org/redisson/RedissonObject.java | 5 ++ .../RedissonPermitExpirableSemaphore.java | 46 ++++++++++--------- .../java/org/redisson/RedissonReadLock.java | 11 +++-- .../java/org/redisson/RedissonSemaphore.java | 19 ++++---- .../java/org/redisson/RedissonWriteLock.java | 11 +++-- .../pubsub/PublishSubscribeService.java | 11 +++++ 9 files changed, 82 insertions(+), 57 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonCountDownLatch.java b/redisson/src/main/java/org/redisson/RedissonCountDownLatch.java index 49933c33b..a825bce00 100644 --- a/redisson/src/main/java/org/redisson/RedissonCountDownLatch.java +++ b/redisson/src/main/java/org/redisson/RedissonCountDownLatch.java @@ -253,8 +253,9 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown return commandExecutor.evalWriteNoRetryAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "local v = redis.call('decr', KEYS[1]);" + "if v <= 0 then redis.call('del', KEYS[1]) end;" + - "if v == 0 then redis.call('publish', KEYS[2], ARGV[1]) end;", - Arrays.asList(getRawName(), getChannelName()), CountDownLatchPubSub.ZERO_COUNT_MESSAGE); + "if v == 0 then redis.call(ARGV[2], KEYS[2], ARGV[1]) end;", + Arrays.asList(getRawName(), getChannelName()), + CountDownLatchPubSub.ZERO_COUNT_MESSAGE, getSubscribeService().getPublishCommand()); } private String getEntryName() { @@ -285,24 +286,26 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if redis.call('exists', KEYS[1]) == 0 then " + "redis.call('set', KEYS[1], ARGV[2]); " - + "redis.call('publish', KEYS[2], ARGV[1]); " + + "redis.call(ARGV[3], KEYS[2], ARGV[1]); " + "return 1 " + "else " + "return 0 " + "end", - Arrays.asList(getRawName(), getChannelName()), CountDownLatchPubSub.NEW_COUNT_MESSAGE, count); + Arrays.asList(getRawName(), getChannelName()), + CountDownLatchPubSub.NEW_COUNT_MESSAGE, count, getSubscribeService().getPublishCommand()); } @Override public RFuture deleteAsync() { return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if redis.call('del', KEYS[1]) == 1 then " - + "redis.call('publish', KEYS[2], ARGV[1]); " + + "redis.call(ARGV[2], KEYS[2], ARGV[1]); " + "return 1 " + "else " + "return 0 " + "end", - Arrays.asList(getRawName(), getChannelName()), CountDownLatchPubSub.NEW_COUNT_MESSAGE); + Arrays.asList(getRawName(), getChannelName()), + CountDownLatchPubSub.NEW_COUNT_MESSAGE, getSubscribeService().getPublishCommand()); } } diff --git a/redisson/src/main/java/org/redisson/RedissonFairLock.java b/redisson/src/main/java/org/redisson/RedissonFairLock.java index 3b6fa0ac2..ebb12c67e 100644 --- a/redisson/src/main/java/org/redisson/RedissonFairLock.java +++ b/redisson/src/main/java/org/redisson/RedissonFairLock.java @@ -254,7 +254,7 @@ public class RedissonFairLock extends RedissonLock implements RLock { + "if (redis.call('exists', KEYS[1]) == 0) then " + "local nextThreadId = redis.call('lindex', KEYS[2], 0); " + "if nextThreadId ~= false then " + - "redis.call('publish', KEYS[4] .. ':' .. nextThreadId, ARGV[1]); " + + "redis.call(ARGV[5], KEYS[4] .. ':' .. nextThreadId, ARGV[1]); " + "end; " + "return 1; " + "end;" + @@ -270,11 +270,12 @@ public class RedissonFairLock extends RedissonLock implements RLock { "redis.call('del', KEYS[1]); " + "local nextThreadId = redis.call('lindex', KEYS[2], 0); " + "if nextThreadId ~= false then " + - "redis.call('publish', KEYS[4] .. ':' .. nextThreadId, ARGV[1]); " + + "redis.call(ARGV[5], KEYS[4] .. ':' .. nextThreadId, ARGV[1]); " + "end; " + "return 1; ", Arrays.asList(getRawName(), threadsQueueName, timeoutSetName, getChannelName()), - LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId), System.currentTimeMillis()); + LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId), + System.currentTimeMillis(), getSubscribeService().getPublishCommand()); } @Override @@ -332,13 +333,14 @@ public class RedissonFairLock extends RedissonLock implements RLock { "if (redis.call('del', KEYS[1]) == 1) then " + "local nextThreadId = redis.call('lindex', KEYS[2], 0); " + "if nextThreadId ~= false then " + - "redis.call('publish', KEYS[4] .. ':' .. nextThreadId, ARGV[1]); " + + "redis.call(ARGV[3], KEYS[4] .. ':' .. nextThreadId, ARGV[1]); " + "end; " + "return 1; " + "end; " + "return 0;", Arrays.asList(getRawName(), threadsQueueName, timeoutSetName, getChannelName()), - LockPubSub.UNLOCK_MESSAGE, System.currentTimeMillis()); + LockPubSub.UNLOCK_MESSAGE, System.currentTimeMillis(), + getSubscribeService().getPublishCommand()); } } \ No newline at end of file diff --git a/redisson/src/main/java/org/redisson/RedissonLock.java b/redisson/src/main/java/org/redisson/RedissonLock.java index 289a01d0f..4959a67b3 100644 --- a/redisson/src/main/java/org/redisson/RedissonLock.java +++ b/redisson/src/main/java/org/redisson/RedissonLock.java @@ -317,12 +317,12 @@ public class RedissonLock extends RedissonBaseLock { cancelExpirationRenewal(null); return commandExecutor.syncedEvalWithRetry(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('del', KEYS[1]) == 1) then " - + "redis.call('publish', KEYS[2], ARGV[1]); " + + "redis.call(ARGV[2], KEYS[2], ARGV[1]); " + "return 1 " + "else " + "return 0 " + "end", - Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE); + Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, getSubscribeService().getPublishCommand()); } @@ -338,11 +338,12 @@ public class RedissonLock extends RedissonBaseLock { "return 0; " + "else " + "redis.call('del', KEYS[1]); " + - "redis.call('publish', KEYS[2], ARGV[1]); " + + "redis.call(ARGV[4], KEYS[2], ARGV[1]); " + "return 1; " + "end; " + "return nil;", - Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId)); + Arrays.asList(getRawName(), getChannelName()), + LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId), getSubscribeService().getPublishCommand()); } @Override diff --git a/redisson/src/main/java/org/redisson/RedissonObject.java b/redisson/src/main/java/org/redisson/RedissonObject.java index d5cd8bf4e..ec2834251 100644 --- a/redisson/src/main/java/org/redisson/RedissonObject.java +++ b/redisson/src/main/java/org/redisson/RedissonObject.java @@ -26,6 +26,7 @@ import org.redisson.command.CommandAsyncExecutor; import org.redisson.connection.ServiceManager; import org.redisson.misc.CompletableFutureWrapper; import org.redisson.misc.Hash; +import org.redisson.pubsub.PublishSubscribeService; import java.util.*; import java.util.concurrent.CompletableFuture; @@ -485,4 +486,8 @@ public abstract class RedissonObject implements RObject { .collect(Collectors.toList()); } + protected PublishSubscribeService getSubscribeService() { + return commandExecutor.getConnectionManager().getSubscribeService(); + } + } diff --git a/redisson/src/main/java/org/redisson/RedissonPermitExpirableSemaphore.java b/redisson/src/main/java/org/redisson/RedissonPermitExpirableSemaphore.java index 3787cbf0d..ad4cb3ab6 100644 --- a/redisson/src/main/java/org/redisson/RedissonPermitExpirableSemaphore.java +++ b/redisson/src/main/java/org/redisson/RedissonPermitExpirableSemaphore.java @@ -43,8 +43,6 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen private final String channelName; private final SemaphorePubSub semaphorePubSub; - final CommandAsyncExecutor commandExecutor; - private final String timeoutName; private final long nonExpirableTimeout = 922337203685477L; @@ -52,7 +50,6 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen public RedissonPermitExpirableSemaphore(CommandAsyncExecutor commandExecutor, String name) { super(commandExecutor, name); this.timeoutName = suffixName(getRawName(), "timeout"); - this.commandExecutor = commandExecutor; this.semaphorePubSub = commandExecutor.getConnectionManager().getSubscribeService().getSemaphorePubSub(); this.channelName = prefixName("redisson_sc", getRawName()); } @@ -355,7 +352,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen "redis.call('zrem', KEYS[2], unpack(expiredIds)); " + "local value = redis.call('incrby', KEYS[1], #expiredIds); " + "if tonumber(value) > 0 then " + - "redis.call('publish', KEYS[3], value); " + + "redis.call(ARGV[6], KEYS[3], value); " + "end;" + "end; " + "local value = redis.call('get', KEYS[1]); " + @@ -376,7 +373,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen "end " + "return nil;", Arrays.asList(getRawName(), timeoutName, channelName), - permits, timeoutDate, id, System.currentTimeMillis(), nonExpirableTimeout); + permits, timeoutDate, id, System.currentTimeMillis(), nonExpirableTimeout, getSubscribeService().getPublishCommand()); } @Override @@ -566,13 +563,13 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen "return 0;" + "end;" + "local value = redis.call('incrby', KEYS[1], ARGV[2]); " + - "redis.call('publish', KEYS[2], value); " + + "redis.call(ARGV[4], KEYS[2], value); " + "if tonumber(expire) <= tonumber(ARGV[3]) then " + "return 0;" + "end;" + "return 1;", Arrays.asList(getRawName(), channelName, timeoutName), - id, 1, System.currentTimeMillis()); + id, 1, System.currentTimeMillis(), getSubscribeService().getPublishCommand()); } @Override @@ -639,13 +636,14 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen "redis.call('zrem', KEYS[2], unpack(expiredIds)); " + "local value = redis.call('incrby', KEYS[1], #expiredIds); " + "if tonumber(value) > 0 then " + - "redis.call('publish', KEYS[3], value); " + + "redis.call(ARGV[2], KEYS[3], value); " + "end;" + "return value; " + "end; " + "local ret = redis.call('get', KEYS[1]); " + "return ret == false and 0 or ret;", - Arrays.asList(getRawName(), timeoutName, channelName), System.currentTimeMillis()); + Arrays.asList(getRawName(), timeoutName, channelName), + System.currentTimeMillis(), getSubscribeService().getPublishCommand()); } @Override @@ -656,7 +654,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen "redis.call('zrem', KEYS[2], unpack(expiredIds)); " + "local value = redis.call('incrby', KEYS[1], #expiredIds); " + "if tonumber(value) > 0 then " + - "redis.call('publish', KEYS[3], value); " + + "redis.call(ARGV[2], KEYS[3], value); " + "end;" + "end; " + "local available = redis.call('get', KEYS[1]); " + @@ -668,7 +666,8 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen "return tonumber(available) " + "end;" + "return tonumber(available) + acquired;", - Arrays.asList(getRawName(), timeoutName, channelName), System.currentTimeMillis()); + Arrays.asList(getRawName(), timeoutName, channelName), + System.currentTimeMillis(), getSubscribeService().getPublishCommand()); } @Override @@ -679,12 +678,13 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen "redis.call('zrem', KEYS[2], unpack(expiredIds)); " + "local value = redis.call('incrby', KEYS[1], #expiredIds); " + "if tonumber(value) > 0 then " + - "redis.call('publish', KEYS[3], value); " + + "redis.call(ARGV[2], KEYS[3], value); " + "end;" + "end; " + "local acquired = redis.call('zcount', KEYS[2], 0, '+inf'); " + "return acquired == false and 0 or acquired;", - Arrays.asList(getRawName(), timeoutName, channelName), System.currentTimeMillis()); + Arrays.asList(getRawName(), timeoutName, channelName), + System.currentTimeMillis(), getSubscribeService().getPublishCommand()); } @Override @@ -703,7 +703,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen "local available = redis.call('get', KEYS[1]); " + "if (available == false) then " + "redis.call('set', KEYS[1], ARGV[1]); " + - "redis.call('publish', KEYS[2], ARGV[1]); " + + "redis.call(ARGV[2], KEYS[2], ARGV[1]); " + "return;" + "end;" + "local acquired = redis.call('zcount', KEYS[3], 0, '+inf'); " + @@ -712,8 +712,9 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen "return;" + "end;" + "redis.call('incrby', KEYS[1], tonumber(ARGV[1]) - maximum); " + - "redis.call('publish', KEYS[2], ARGV[1]);", - Arrays.asList(getRawName(), channelName, timeoutName), permits); + "redis.call(ARGV[2], KEYS[2], ARGV[1]);", + Arrays.asList(getRawName(), channelName, timeoutName), + permits, getSubscribeService().getPublishCommand()); } @Override @@ -722,11 +723,12 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen "local value = redis.call('get', KEYS[1]); " + "if (value == false) then " + "redis.call('set', KEYS[1], ARGV[1]); " - + "redis.call('publish', KEYS[2], ARGV[1]); " + + "redis.call(ARGV[2], KEYS[2], ARGV[1]); " + "return 1;" + "end;" + "return 0;", - Arrays.asList(getRawName(), channelName), permits); + Arrays.asList(getRawName(), channelName), + permits, getSubscribeService().getPublishCommand()); } @Override @@ -743,9 +745,9 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen + "end;" + "redis.call('set', KEYS[1], tonumber(value) + tonumber(ARGV[1])); " + "if tonumber(ARGV[1]) > 0 then " - + "redis.call('publish', KEYS[2], ARGV[1]); " + + "redis.call(ARGV[2], KEYS[2], ARGV[1]); " + "end;", - Arrays.asList(getRawName(), channelName), permits); + Arrays.asList(getRawName(), channelName), permits, getSubscribeService().getPublishCommand()); } @Override @@ -758,7 +760,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen "redis.call('zrem', KEYS[2], unpack(expiredIds)); " + "local value = redis.call('incrby', KEYS[1], #expiredIds); " + "if tonumber(value) > 0 then " + - "redis.call('publish', KEYS[3], value); " + + "redis.call(ARGV[4], KEYS[3], value); " + "end;" + "end; " + @@ -769,7 +771,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen + "end;" + "return 0;", Arrays.asList(getRawName(), timeoutName, channelName), - id, timeoutDate, System.currentTimeMillis()); + id, timeoutDate, System.currentTimeMillis(), getSubscribeService().getPublishCommand()); } @Override diff --git a/redisson/src/main/java/org/redisson/RedissonReadLock.java b/redisson/src/main/java/org/redisson/RedissonReadLock.java index f34a1f50b..2de09a250 100644 --- a/redisson/src/main/java/org/redisson/RedissonReadLock.java +++ b/redisson/src/main/java/org/redisson/RedissonReadLock.java @@ -88,7 +88,7 @@ public class RedissonReadLock extends RedissonLock implements RLock { return commandExecutor.syncedEval(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "local mode = redis.call('hget', KEYS[1], 'mode'); " + "if (mode == false) then " + - "redis.call('publish', KEYS[2], ARGV[1]); " + + "redis.call(ARGV[3], KEYS[2], ARGV[1]); " + "return 1; " + "end; " + "local lockExists = redis.call('hexists', KEYS[1], ARGV[2]); " + @@ -126,10 +126,10 @@ public class RedissonReadLock extends RedissonLock implements RLock { "end; " + "redis.call('del', KEYS[1]); " + - "redis.call('publish', KEYS[2], ARGV[1]); " + + "redis.call(ARGV[3], KEYS[2], ARGV[1]); " + "return 1; ", Arrays.asList(getRawName(), getChannelName(), timeoutPrefix, keyPrefix), - LockPubSub.UNLOCK_MESSAGE, getLockName(threadId)); + LockPubSub.UNLOCK_MESSAGE, getLockName(threadId), getSubscribeService().getPublishCommand()); } protected String getKeyPrefix(long threadId, String timeoutPrefix) { @@ -176,11 +176,12 @@ public class RedissonReadLock extends RedissonLock implements RLock { return commandExecutor.syncedEvalWithRetry(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hget', KEYS[1], 'mode') == 'read') then " + "redis.call('del', KEYS[1]); " + - "redis.call('publish', KEYS[2], ARGV[1]); " + + "redis.call(ARGV[2], KEYS[2], ARGV[1]); " + "return 1; " + "end; " + "return 0; ", - Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE); + Arrays.asList(getRawName(), getChannelName()), + LockPubSub.UNLOCK_MESSAGE, getSubscribeService().getPublishCommand()); } @Override diff --git a/redisson/src/main/java/org/redisson/RedissonSemaphore.java b/redisson/src/main/java/org/redisson/RedissonSemaphore.java index ad56c4378..bbc6155ce 100644 --- a/redisson/src/main/java/org/redisson/RedissonSemaphore.java +++ b/redisson/src/main/java/org/redisson/RedissonSemaphore.java @@ -49,12 +49,9 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore { private final SemaphorePubSub semaphorePubSub; - final CommandAsyncExecutor commandExecutor; - public RedissonSemaphore(CommandAsyncExecutor commandExecutor, String name) { super(commandExecutor, name); - this.commandExecutor = commandExecutor; - this.semaphorePubSub = commandExecutor.getConnectionManager().getSubscribeService().getSemaphorePubSub(); + this.semaphorePubSub = getSubscribeService().getSemaphorePubSub(); } String getChannelName() { @@ -440,8 +437,8 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore { RFuture future = commandExecutor.syncedEval(getRawName(), StringCodec.INSTANCE, RedisCommands.EVAL_VOID, "local value = redis.call('incrby', KEYS[1], ARGV[1]); " + - "redis.call('publish', KEYS[2], value); ", - Arrays.asList(getRawName(), getChannelName()), permits); + "redis.call(ARGV[2], KEYS[2], value); ", + Arrays.asList(getRawName(), getChannelName()), permits, getSubscribeService().getPublishCommand()); if (log.isDebugEnabled()) { future.thenAccept(o -> { log.debug("released, permits: {}, name: {}", permits, getName()); @@ -488,11 +485,12 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore { "local value = redis.call('get', KEYS[1]); " + "if (value == false) then " + "redis.call('set', KEYS[1], ARGV[1]); " - + "redis.call('publish', KEYS[2], ARGV[1]); " + + "redis.call(ARGV[2], KEYS[2], ARGV[1]); " + "return 1;" + "end;" + "return 0;", - Arrays.asList(getRawName(), getChannelName()), permits); + Arrays.asList(getRawName(), getChannelName()), + permits, getSubscribeService().getPublishCommand()); if (log.isDebugEnabled()) { future.thenAccept(r -> { @@ -519,8 +517,9 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore { + "value = 0;" + "end;" + "redis.call('set', KEYS[1], value + ARGV[1]); " - + "redis.call('publish', KEYS[2], value + ARGV[1]); ", - Arrays.asList(getRawName(), getChannelName()), permits); + + "redis.call(ARGV[2], KEYS[2], value + ARGV[1]); ", + Arrays.asList(getRawName(), getChannelName()), + permits, getSubscribeService().getPublishCommand()); } diff --git a/redisson/src/main/java/org/redisson/RedissonWriteLock.java b/redisson/src/main/java/org/redisson/RedissonWriteLock.java index c02d2d25b..6d643f504 100644 --- a/redisson/src/main/java/org/redisson/RedissonWriteLock.java +++ b/redisson/src/main/java/org/redisson/RedissonWriteLock.java @@ -80,7 +80,7 @@ public class RedissonWriteLock extends RedissonLock implements RLock { return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "local mode = redis.call('hget', KEYS[1], 'mode'); " + "if (mode == false) then " + - "redis.call('publish', KEYS[2], ARGV[1]); " + + "redis.call(ARGV[4], KEYS[2], ARGV[1]); " + "return 1; " + "end;" + "if (mode == 'write') then " + @@ -96,7 +96,7 @@ public class RedissonWriteLock extends RedissonLock implements RLock { "redis.call('hdel', KEYS[1], ARGV[3]); " + "if (redis.call('hlen', KEYS[1]) == 1) then " + "redis.call('del', KEYS[1]); " + - "redis.call('publish', KEYS[2], ARGV[1]); " + + "redis.call(ARGV[4], KEYS[2], ARGV[1]); " + "else " + // has unlocked read-locks "redis.call('hset', KEYS[1], 'mode', 'read'); " + @@ -107,7 +107,7 @@ public class RedissonWriteLock extends RedissonLock implements RLock { "end; " + "return nil;", Arrays.asList(getRawName(), getChannelName()), - LockPubSub.READ_UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId)); + LockPubSub.READ_UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId), getSubscribeService().getPublishCommand()); } @Override @@ -133,11 +133,12 @@ public class RedissonWriteLock extends RedissonLock implements RLock { return commandExecutor.syncedEvalWithRetry(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hget', KEYS[1], 'mode') == 'write') then " + "redis.call('del', KEYS[1]); " + - "redis.call('publish', KEYS[2], ARGV[1]); " + + "redis.call(ARGV[2], KEYS[2], ARGV[1]); " + "return 1; " + "end; " + "return 0; ", - Arrays.asList(getRawName(), getChannelName()), LockPubSub.READ_UNLOCK_MESSAGE); + Arrays.asList(getRawName(), getChannelName()), + LockPubSub.READ_UNLOCK_MESSAGE, getSubscribeService().getPublishCommand()); } @Override diff --git a/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java b/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java index 3ef05f0be..41fccb1c3 100644 --- a/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java +++ b/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java @@ -19,6 +19,7 @@ import io.netty.util.Timeout; import org.redisson.PubSubPatternStatusListener; import org.redisson.client.*; import org.redisson.client.codec.Codec; +import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.pubsub.PubSubType; import org.redisson.config.MasterSlaveServersConfig; import org.redisson.connection.ClientConnectionsEntry; @@ -878,6 +879,16 @@ public class PublishSubscribeService { this.shardingSupported = value; } + public boolean isShardingSupported() { + return shardingSupported; + } + public String getPublishCommand() { + if (shardingSupported) { + return RedisCommands.SPUBLISH.getName(); + } + return RedisCommands.PUBLISH.getName(); + } + @Override public String toString() { return "PublishSubscribeService [name2PubSubConnection=" + name2PubSubConnection + ", entry2PubSubConnection=" + entry2PubSubConnection + "]";