Fixed - RedissonPermitExpirableSemaphore throws CROSSSLOT error in cluster if nameMapper is used. #4708

pull/3899/head^2
Nikita Koksharov 2 years ago
parent b0fbb5f6e2
commit 711cc3d719

@ -40,6 +40,7 @@ import java.util.concurrent.atomic.AtomicReference;
*/
public class RedissonPermitExpirableSemaphore extends RedissonExpirable implements RPermitExpirableSemaphore {
private final String channelName;
private final SemaphorePubSub semaphorePubSub;
final CommandAsyncExecutor commandExecutor;
@ -50,20 +51,10 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
public RedissonPermitExpirableSemaphore(CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
this.timeoutName = suffixName(name, "timeout");
this.timeoutName = suffixName(getRawName(), "timeout");
this.commandExecutor = commandExecutor;
this.semaphorePubSub = commandExecutor.getConnectionManager().getSubscribeService().getSemaphorePubSub();
}
String getChannelName() {
return getChannelName(getRawName());
}
public static String getChannelName(String name) {
if (name.contains("{")) {
return "redisson_sc:" + name;
}
return "redisson_sc:{" + name + "}";
this.channelName = prefixName("redisson_sc", getRawName());
}
@Override
@ -383,7 +374,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
"return ':' .. tostring(v[2]); " +
"end " +
"return nil;",
Arrays.asList(getRawName(), timeoutName, getChannelName()),
Arrays.asList(getRawName(), timeoutName, channelName),
permits, timeoutDate, id, System.currentTimeMillis(), nonExpirableTimeout);
}
@ -534,11 +525,11 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
}
private CompletableFuture<RedissonLockEntry> subscribe() {
return semaphorePubSub.subscribe(getRawName(), getChannelName());
return semaphorePubSub.subscribe(getRawName(), channelName);
}
private void unsubscribe(RedissonLockEntry entry) {
semaphorePubSub.unsubscribe(entry, getRawName(), getChannelName());
semaphorePubSub.unsubscribe(entry, getRawName(), channelName);
}
@Override
@ -578,7 +569,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
"return 0;" +
"end;" +
"return 1;",
Arrays.asList(getRawName(), getChannelName(), timeoutName),
Arrays.asList(getRawName(), channelName, timeoutName),
id, 1, System.currentTimeMillis());
}
@ -642,7 +633,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
"end; " +
"local ret = redis.call('get', KEYS[1]); " +
"return ret == false and 0 or ret;",
Arrays.<Object>asList(getRawName(), timeoutName, getChannelName()), System.currentTimeMillis());
Arrays.<Object>asList(getRawName(), timeoutName, channelName), System.currentTimeMillis());
}
@Override
@ -660,7 +651,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
+ "return 1;"
+ "end;"
+ "return 0;",
Arrays.<Object>asList(getRawName(), getChannelName()), permits);
Arrays.<Object>asList(getRawName(), channelName), permits);
}
@Override
@ -679,7 +670,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
+ "if tonumber(ARGV[1]) > 0 then "
+ "redis.call('publish', KEYS[2], ARGV[1]); "
+ "end;",
Arrays.<Object>asList(getRawName(), getChannelName()), permits);
Arrays.<Object>asList(getRawName(), channelName), permits);
}
@Override
@ -702,7 +693,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
+ "return 1;"
+ "end;"
+ "return 0;",
Arrays.asList(getRawName(), timeoutName, getChannelName()),
Arrays.asList(getRawName(), timeoutName, channelName),
id, timeoutDate, System.currentTimeMillis());
}

@ -1,21 +1,65 @@
package org.redisson;
import static org.assertj.core.api.Assertions.assertThat;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.redisson.api.NameMapper;
import org.redisson.api.RFuture;
import org.redisson.api.RPermitExpirableSemaphore;
import org.redisson.api.RedissonClient;
import org.redisson.client.RedisException;
import org.redisson.config.Config;
import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static org.assertj.core.api.Assertions.assertThat;
public class RedissonPermitExpirableSemaphoreTest extends BaseConcurrentTest {
@Test
public void testGetInClusterNameMapper() throws RedisRunner.FailedToStartRedisException, IOException, InterruptedException {
RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner master2 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner master3 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave1 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave2 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave3 = new RedisRunner().randomPort().randomDir().nosave();
ClusterRunner clusterRunner = new ClusterRunner()
.addNode(master1, slave1)
.addNode(master2, slave2)
.addNode(master3, slave3);
ClusterRunner.ClusterProcesses process = clusterRunner.run();
Config config = new Config();
config.useClusterServers()
.setNameMapper(new NameMapper() {
@Override
public String map(String name) {
return "test::" + name;
}
@Override
public String unmap(String name) {
return name.replace("test::", "");
}
})
.addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort());
RedissonClient redisson = Redisson.create(config);
RPermitExpirableSemaphore s = redisson.getPermitExpirableSemaphore("semaphore");
s.trySetPermits(1);
String v = s.acquire();
s.release(v);
redisson.shutdown();
process.shutdown();
}
@Test
public void testUpdateLeaseTime() throws InterruptedException {
RPermitExpirableSemaphore semaphore = redisson.getPermitExpirableSemaphore("test");

Loading…
Cancel
Save