refactoring

pull/5679/head
Nikita Koksharov 11 months ago
parent 60870f06be
commit 1db68fc164

@ -12,9 +12,6 @@ import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.RedisStrictCommand;
import org.redisson.client.protocol.pubsub.PubSubType;
import org.redisson.config.Protocol;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import java.util.ArrayList;
import java.util.Arrays;
@ -25,21 +22,15 @@ import java.util.concurrent.atomic.AtomicInteger;
import static org.assertj.core.api.Assertions.assertThat;
@Testcontainers
public class RedisClientTest {
@Container
private static final GenericContainer<?> REDIS =
new GenericContainer<>("redis:7.2")
.withExposedPorts(6379);
private static RedisClient redisClient;
@BeforeAll
public static void beforeAll() {
RedisClientConfig config = new RedisClientConfig();
config.setProtocol(Protocol.RESP3);
config.setAddress("redis://127.0.0.1:" + REDIS.getFirstMappedPort());
config.setAddress("redis://127.0.0.1:" + RedisDockerTest.REDIS.getFirstMappedPort());
redisClient = RedisClient.create(config);
}

@ -5,7 +5,6 @@ import com.github.dockerjava.api.model.ContainerNetwork;
import com.github.dockerjava.api.model.ExposedPort;
import com.github.dockerjava.api.model.PortBinding;
import com.github.dockerjava.api.model.Ports;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.redisson.api.NatMapper;
import org.redisson.api.RedissonClient;
@ -20,10 +19,12 @@ import org.testcontainers.containers.startupcheck.MinimumDurationRunningStartupC
import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy;
import java.io.File;
import java.io.IOException;
import java.time.Duration;
import java.util.*;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
public class RedisDockerTest {
@ -453,7 +454,7 @@ public class RedisDockerTest {
network.close();
}
protected void withNewCluster(Consumer<RedissonClient> callback) {
protected void withNewCluster(BiConsumer<List<ContainerState>, RedissonClient> callback) {
LogMessageWaitStrategy wait2 = new LogMessageWaitStrategy().withRegEx(".*REPLICA\ssync\\:\sFinished\swith\ssuccess.*");
@ -515,13 +516,53 @@ public class RedisDockerTest {
RedissonClient redisson = Redisson.create(config);
try {
callback.accept(redisson);
callback.accept(nodes, redisson);
} finally {
redisson.shutdown();
environment.stop();
}
}
protected String execute(ContainerState node, String... commands) {
try {
return node.execInContainer(commands).getStdout();
} catch (IOException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
protected List<ContainerState> getSlaveNodes(List<ContainerState> nodes) {
return nodes.stream().filter(node -> {
if (!node.isRunning()) {
return false;
}
String r = execute(node, "redis-cli", "info", "replication");
if (r.contains("role:slave")) {
return true;
}
return false;
}).collect(Collectors.toList());
}
protected List<ContainerState> getMasterNodes(List<ContainerState> nodes) {
return nodes.stream().filter(node -> {
if (!node.isRunning()) {
return false;
}
String r = execute(node, "redis-cli", "info", "replication");
if (r.contains("role:master")) {
return true;
}
return false;
}).collect(Collectors.toList());
}
protected void stop(ContainerState node) {
execute(node, "redis-cli", "shutdown");
}
protected void restart(GenericContainer<?> redis) {
redis.setPortBindings(Arrays.asList(redis.getFirstMappedPort() + ":" + redis.getExposedPorts().get(0)));
redis.stop();

@ -43,7 +43,7 @@ public class RedissonBatchTest extends RedisDockerTest {
@ParameterizedTest
@MethodSource("data")
public void testSlotMigrationInCluster(BatchOptions batchOptions) {
withNewCluster(redissonClient -> {
withNewCluster((nodes, redissonClient) -> {
Config config = redissonClient.getConfig();
config.useClusterServers()
.setScanInterval(1000)

@ -8,13 +8,8 @@ import org.redisson.api.Entry;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RFuture;
import org.redisson.api.RedissonClient;
import org.redisson.api.redisnode.RedisCluster;
import org.redisson.api.redisnode.RedisClusterMaster;
import org.redisson.api.redisnode.RedisNodes;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisClientConfig;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.config.Config;
import org.testcontainers.containers.ContainerState;
import org.testcontainers.containers.GenericContainer;
import java.io.IOException;
@ -154,7 +149,7 @@ public class RedissonBlockingQueueTest extends RedissonQueueTest {
@Test
public void testTakeReattachCluster() {
withNewCluster(redisson -> {
withNewCluster((nodes, redisson) -> {
List<RFuture<Integer>> futures = new ArrayList<>();
for (int i = 0; i < 10; i++) {
RBlockingQueue<Integer> queue = redisson.getBlockingQueue("queue" + i);
@ -168,14 +163,8 @@ public class RedissonBlockingQueueTest extends RedissonQueueTest {
throw new RuntimeException(e);
}
RedisCluster rnc = redisson.getRedisNodes(RedisNodes.CLUSTER);
Optional<RedisClusterMaster> ff = rnc.getMasters().stream().findFirst();
RedisClusterMaster master = ff.get();
RedisClientConfig cc = new RedisClientConfig();
cc.setAddress("redis://" + master.getAddr().getHostString() + ":" + master.getAddr().getPort());
RedisClient c = RedisClient.create(cc);
c.connect().async(RedisCommands.SHUTDOWN);
c.shutdown();
List<ContainerState> masters = getMasterNodes(nodes);
stop(masters.get(0));
try {
Thread.sleep(TimeUnit.SECONDS.toMillis(30));
@ -186,7 +175,7 @@ public class RedissonBlockingQueueTest extends RedissonQueueTest {
for (int i = 0; i < 10; i++) {
RBlockingQueue<Integer> queue = redisson.getBlockingQueue("queue" + i);
try {
queue.put(i*100);
queue.put(i * 100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
@ -200,7 +189,7 @@ public class RedissonBlockingQueueTest extends RedissonQueueTest {
// skip
}
Integer result = f.toCompletableFuture().getNow(null);
assertThat(result).isEqualTo(i*100);
assertThat(result).isEqualTo(i * 100);
}
redisson.shutdown();

@ -1,6 +1,5 @@
package org.redisson;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.redisson.api.RFuture;
import org.redisson.api.RedissonClient;
@ -9,7 +8,8 @@ import org.redisson.api.redisnode.RedisNodes;
import org.redisson.config.Config;
import org.redisson.config.ReadMode;
import org.redisson.config.SubscriptionMode;
import org.redisson.connection.balancer.RandomLoadBalancer;
import org.testcontainers.containers.ContainerState;
import org.testcontainers.containers.GenericContainer;
import java.net.InetSocketAddress;
import java.util.*;
@ -17,380 +17,294 @@ import java.util.concurrent.CompletionException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.assertj.core.api.Assertions.*;
import static org.assertj.core.api.Assertions.assertThat;
public class RedissonFailoverTest {
public class RedissonFailoverTest extends RedisDockerTest {
@Test
public void testFailoverInCluster() throws Exception {
RedisRunner master1 = new RedisRunner().port(6890).randomDir().nosave();
RedisRunner master2 = new RedisRunner().port(6891).randomDir().nosave();
RedisRunner master3 = new RedisRunner().port(6892).randomDir().nosave();
RedisRunner slave1 = new RedisRunner().port(6900).randomDir().nosave();
RedisRunner slave2 = new RedisRunner().port(6901).randomDir().nosave();
RedisRunner slave3 = new RedisRunner().port(6902).randomDir().nosave();
RedisRunner slave4 = new RedisRunner().port(6903).randomDir().nosave();
ClusterRunner clusterRunner = new ClusterRunner()
.addNode(master1, slave1, slave4)
.addNode(master2, slave2)
.addNode(master3, slave3);
ClusterRunner.ClusterProcesses process = clusterRunner.run();
Thread.sleep(7000);
Config config = new Config();
config.useClusterServers()
.addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort());
RedissonClient redisson = Redisson.create(config);
RedisRunner.RedisProcess master = process.getNodes().stream().filter(x -> x.getRedisServerPort() == master1.getPort()).findFirst().get();
List<RFuture<?>> futures = new ArrayList<RFuture<?>>();
CountDownLatch latch = new CountDownLatch(1);
Thread t = new Thread() {
public void run() {
for (int i = 0; i < 2000; i++) {
RFuture<?> f1 = redisson.getBucket("i" + i).getAsync();
RFuture<?> f2 = redisson.getBucket("i" + i).setAsync("");
RFuture<?> f3 = redisson.getTopic("topic").publishAsync("testmsg");
futures.add(f1);
futures.add(f2);
futures.add(f3);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
public void testFailoverInCluster() {
withNewCluster((nodes, redisson) -> {
List<ContainerState> masters = getMasterNodes(nodes);
List<RFuture<?>> futures = new ArrayList<>();
CountDownLatch latch = new CountDownLatch(1);
Thread t = new Thread() {
public void run() {
for (int i = 0; i < 2000; i++) {
RFuture<?> f1 = redisson.getBucket("i" + i).getAsync();
RFuture<?> f2 = redisson.getBucket("i" + i).setAsync("");
RFuture<?> f3 = redisson.getTopic("topic").publishAsync("testmsg");
futures.add(f1);
futures.add(f2);
futures.add(f3);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
if (i % 100 == 0) {
System.out.println("step: " + i);
}
}
if (i % 100 == 0) {
System.out.println("step: " + i);
}
}
latch.countDown();
latch.countDown();
};
};
};
t.start();
t.join(1000);
Set<InetSocketAddress> oldMasters = new HashSet<>();
Collection<RedisClusterMaster> masterNodes = redisson.getRedisNodes(org.redisson.api.redisnode.RedisNodes.CLUSTER).getMasters();
for (RedisClusterMaster clusterNode : masterNodes) {
oldMasters.add(clusterNode.getAddr());
}
master.stop();
System.out.println("master " + master.getRedisServerAddressAndPort() + " has been stopped!");
TimeUnit.SECONDS.sleep(15);
RedisRunner.RedisProcess newMaster = null;
Collection<RedisClusterMaster> newMasterNodes = redisson.getRedisNodes(RedisNodes.CLUSTER).getMasters();
for (RedisClusterMaster clusterNode : newMasterNodes) {
if (!oldMasters.contains(clusterNode.getAddr())) {
newMaster = process.getNodes().stream().filter(x -> x.getRedisServerPort() == clusterNode.getAddr().getPort()).findFirst().get();
break;
t.start();
try {
t.join(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
assertThat(newMaster).isNotNull();
stop(masters.get(0));
System.out.println("master " + masters.get(0).getFirstMappedPort() + " has been stopped!");
TimeUnit.SECONDS.sleep(30);
newMaster.stop();
try {
TimeUnit.SECONDS.sleep(25);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("new master " + newMaster.getRedisServerAddressAndPort() + " has been stopped!");
List<ContainerState> newMasters = getMasterNodes(nodes);
newMasters.removeAll(masters);
assertThat(newMasters).hasSize(1);
assertThat(latch.await(180, TimeUnit.SECONDS)).isTrue();
try {
TimeUnit.SECONDS.sleep(30);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
int errors = 0;
int success = 0;
int readonlyErrors = 0;
stop(newMasters.get(0));
System.out.println("new master " + newMasters.get(0).getFirstMappedPort() + " has been stopped!");
for (RFuture<?> rFuture : futures) {
try {
rFuture.toCompletableFuture().join();
success++;
} catch (Exception e) {
errors++;
assertThat(latch.await(180, TimeUnit.SECONDS)).isTrue();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
redisson.shutdown();
process.shutdown();
int errors = 0;
int success = 0;
int readonlyErrors = 0;
for (RFuture<?> rFuture : futures) {
try {
rFuture.toCompletableFuture().join();
success++;
} catch (Exception e) {
errors++;
}
}
assertThat(readonlyErrors).isZero();
assertThat(errors).isLessThan(1000);
assertThat(success).isGreaterThan(5000);
assertThat(readonlyErrors).isZero();
assertThat(errors).isLessThan(2900);
assertThat(success).isGreaterThan(3000);
});
}
@Test
public void testFailoverInClusterSlave() throws Exception {
RedisRunner master1 = new RedisRunner().port(6890).randomDir().nosave();
RedisRunner master2 = new RedisRunner().port(6891).randomDir().nosave();
RedisRunner master3 = new RedisRunner().port(6892).randomDir().nosave();
RedisRunner slave1 = new RedisRunner().port(6900).randomDir().nosave();
RedisRunner slave2 = new RedisRunner().port(6901).randomDir().nosave();
RedisRunner slave3 = new RedisRunner().port(6902).randomDir().nosave();
ClusterRunner clusterRunner = new ClusterRunner()
.addNode(master1, slave1)
.addNode(master2, slave2)
.addNode(master3, slave3);
ClusterRunner.ClusterProcesses process = clusterRunner.run();
Thread.sleep(7000);
Config config = new Config();
config.useClusterServers()
.setLoadBalancer(new RandomLoadBalancer())
.addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort());
RedissonClient redisson = Redisson.create(config);
RedisRunner.RedisProcess slave = process.getNodes().stream().filter(x -> x.getRedisServerPort() == slave1.getPort()).findFirst().get();
List<RFuture<?>> futures = new ArrayList<RFuture<?>>();
CountDownLatch latch = new CountDownLatch(1);
Thread t = new Thread() {
public void run() {
for (int i = 0; i < 600; i++) {
RFuture<?> f1 = redisson.getBucket("i" + i).getAsync();
futures.add(f1);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
public void testFailoverInClusterSlave() {
withNewCluster((nodes, redisson) -> {
ContainerState slave = getSlaveNodes(nodes).get(0);
List<RFuture<?>> futures = new ArrayList<RFuture<?>>();
CountDownLatch latch = new CountDownLatch(1);
Thread t = new Thread() {
public void run() {
for (int i = 0; i < 600; i++) {
RFuture<?> f1 = redisson.getBucket("i" + i).getAsync();
futures.add(f1);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
if (i % 100 == 0) {
System.out.println("step: " + i);
}
}
if (i % 100 == 0) {
System.out.println("step: " + i);
}
}
latch.countDown();
latch.countDown();
};
};
};
t.start();
t.join(1000);
slave.restart(20);
System.out.println("slave " + slave.getRedisServerAddressAndPort() + " has been stopped!");
assertThat(latch.await(70, TimeUnit.SECONDS)).isTrue();
t.start();
try {
t.join(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
int errors = 0;
int success = 0;
int readonlyErrors = 0;
slave.getDockerClient().pauseContainerCmd(slave.getContainerId()).exec();
System.out.println("slave " + slave.getFirstMappedPort() + " has been stopped!");
try {
TimeUnit.SECONDS.sleep(20);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
slave.getDockerClient().unpauseContainerCmd(slave.getContainerId()).exec();
for (RFuture<?> rFuture : futures) {
try {
rFuture.toCompletableFuture().join();
success++;
} catch (Exception e) {
e.printStackTrace();
errors++;
// skip
assertThat(latch.await(70, TimeUnit.SECONDS)).isTrue();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
redisson.shutdown();
process.shutdown();
int errors = 0;
int success = 0;
int readonlyErrors = 0;
for (RFuture<?> rFuture : futures) {
try {
rFuture.toCompletableFuture().join();
success++;
} catch (Exception e) {
errors++;
// skip
}
}
assertThat(readonlyErrors).isZero();
assertThat(errors).isLessThan(200);
assertThat(success).isGreaterThan(600 - 200);
assertThat(futures.get(futures.size() - 1).isDone()).isTrue();
assertThat(futures.get(futures.size() - 1).toCompletableFuture().isCompletedExceptionally()).isFalse();
assertThat(readonlyErrors).isZero();
assertThat(errors).isBetween(15, 200);
assertThat(success).isGreaterThan(600 - 200);
assertThat(futures.get(futures.size() - 1).isDone()).isTrue();
assertThat(futures.get(futures.size() - 1).toCompletableFuture().isCompletedExceptionally()).isFalse();
});
}
@Test
public void testFailoverInSentinel() throws Exception {
RedisRunner.RedisProcess master = new RedisRunner()
.nosave()
.randomDir()
.run();
RedisRunner.RedisProcess slave1 = new RedisRunner()
.port(6380)
.nosave()
.randomDir()
.slaveof("127.0.0.1", 6379)
.run();
RedisRunner.RedisProcess slave2 = new RedisRunner()
.port(6381)
.nosave()
.randomDir()
.slaveof("127.0.0.1", 6379)
.run();
RedisRunner.RedisProcess sentinel1 = new RedisRunner()
.nosave()
.randomDir()
.port(26379)
.sentinel()
.sentinelMonitor("myMaster", "127.0.0.1", 6379, 2)
.run();
RedisRunner.RedisProcess sentinel2 = new RedisRunner()
.nosave()
.randomDir()
.port(26380)
.sentinel()
.sentinelMonitor("myMaster", "127.0.0.1", 6379, 2)
.run();
RedisRunner.RedisProcess sentinel3 = new RedisRunner()
.nosave()
.randomDir()
.port(26381)
.sentinel()
.sentinelMonitor("myMaster", "127.0.0.1", 6379, 2)
.run();
Thread.sleep(5000);
Config config = new Config();
config.useSentinelServers()
.setLoadBalancer(new RandomLoadBalancer())
.addSentinelAddress(sentinel3.getRedisServerAddressAndPort()).setMasterName("myMaster");
RedissonClient redisson = Redisson.create(config);
List<RFuture<?>> futures = new ArrayList<RFuture<?>>();
CountDownLatch latch = new CountDownLatch(1);
Thread t = new Thread() {
public void run() {
for (int i = 0; i < 1000; i++) {
RFuture<?> f1 = redisson.getBucket("i" + i).getAsync();
RFuture<?> f2 = redisson.getBucket("i" + i).setAsync("");
RFuture<?> f3 = redisson.getTopic("topic").publishAsync("testmsg");
futures.add(f1);
futures.add(f2);
futures.add(f3);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
withSentinel((nodes, config) -> {
RedissonClient redisson = Redisson.create(config);
List<RFuture<?>> futures = new ArrayList<RFuture<?>>();
CountDownLatch latch = new CountDownLatch(1);
Thread t = new Thread() {
public void run() {
for (int i = 0; i < 1000; i++) {
RFuture<?> f1 = redisson.getBucket("i" + i).getAsync();
RFuture<?> f2 = redisson.getBucket("i" + i).setAsync("");
RFuture<?> f3 = redisson.getTopic("topic").publishAsync("testmsg");
futures.add(f1);
futures.add(f2);
futures.add(f3);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
latch.countDown();
latch.countDown();
};
};
};
t.start();
t.join(1000);
master.stop();
System.out.println("master " + master.getRedisServerAddressAndPort() + " stopped!");
Thread.sleep(TimeUnit.SECONDS.toMillis(70));
master = new RedisRunner()
.port(master.getRedisServerPort())
.nosave()
.randomDir()
.run();
t.start();
try {
t.join(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("master " + master.getRedisServerAddressAndPort() + " started!");
GenericContainer master = nodes.get(0);
master.setPortBindings(Arrays.asList(master.getFirstMappedPort() + ":" + master.getExposedPorts().get(0)));
master.stop();
System.out.println("master has been stopped!");
try {
Thread.sleep(TimeUnit.SECONDS.toMillis(70));
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
Thread.sleep(15000);
master.start();
System.out.println("master has been started!");
assertThat(latch.await(30, TimeUnit.SECONDS)).isTrue();
int errors = 0;
int success = 0;
int readonlyErrors = 0;
try {
Thread.sleep(15000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
for (RFuture<?> rFuture : futures) {
try {
rFuture.toCompletableFuture().join();
success++;
} catch (CompletionException e) {
if (e.getCause().getMessage().contains("READONLY You can't write against")) {
readonlyErrors++;
assertThat(latch.await(30, TimeUnit.SECONDS)).isTrue();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
int errors = 0;
int success = 0;
int readonlyErrors = 0;
for (RFuture<?> rFuture : futures) {
try {
rFuture.toCompletableFuture().join();
success++;
} catch (CompletionException e) {
if (e.getCause().getMessage().contains("READONLY You can't write against")) {
readonlyErrors++;
}
errors++;
// skip
}
errors++;
// skip
}
}
System.out.println("errors " + errors + " success " + success + " readonly " + readonlyErrors);
assertThat(futures.get(futures.size() - 1).isDone()).isTrue();
assertThat(futures.get(futures.size() - 1).toCompletableFuture().isCompletedExceptionally()).isFalse();
assertThat(errors).isLessThan(820);
assertThat(readonlyErrors).isZero();
redisson.shutdown();
sentinel1.stop();
sentinel2.stop();
sentinel3.stop();
master.stop();
slave1.stop();
slave2.stop();
System.out.println("errors " + errors + " success " + success + " readonly " + readonlyErrors);
assertThat(futures.get(futures.size() - 1).isDone()).isTrue();
assertThat(futures.get(futures.size() - 1).toCompletableFuture().isCompletedExceptionally()).isFalse();
assertThat(errors).isBetween(150, 820);
assertThat(readonlyErrors).isZero();
redisson.shutdown();
}, 2);
}
@Test
public void testFailoverWithoutErrorsInCluster() throws Exception {
RedisRunner master1 = new RedisRunner().port(6890).randomDir().nosave();
RedisRunner master2 = new RedisRunner().port(6891).randomDir().nosave();
RedisRunner master3 = new RedisRunner().port(6892).randomDir().nosave();
RedisRunner slave1 = new RedisRunner().port(6900).randomDir().nosave();
RedisRunner slave2 = new RedisRunner().port(6901).randomDir().nosave();
RedisRunner slave3 = new RedisRunner().port(6902).randomDir().nosave();
ClusterRunner clusterRunner = new ClusterRunner()
.addNode(master1, slave1)
.addNode(master2, slave2)
.addNode(master3, slave3);
ClusterRunner.ClusterProcesses process = clusterRunner.run();
Config config = new Config();
config.useClusterServers()
.setRetryAttempts(30)
.setReadMode(ReadMode.MASTER)
.setSubscriptionMode(SubscriptionMode.MASTER)
.setLoadBalancer(new RandomLoadBalancer())
.addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort());
RedissonClient redisson = Redisson.create(config);
RedisRunner.RedisProcess master = process.getNodes().stream().filter(x -> x.getRedisServerPort() == master1.getPort()).findFirst().get();
List<RFuture<?>> futures = new ArrayList<RFuture<?>>();
Set<InetSocketAddress> oldMasters = new HashSet<>();
Collection<RedisClusterMaster> masterNodes = redisson.getRedisNodes(RedisNodes.CLUSTER).getMasters();
for (RedisClusterMaster clusterNode : masterNodes) {
oldMasters.add(clusterNode.getAddr());
}
master.stop();
for (int j = 0; j < 2000; j++) {
RFuture<?> f2 = redisson.getBucket("" + j).setAsync("");
futures.add(f2);
}
System.out.println("master " + master.getRedisServerAddressAndPort() + " has been stopped!");
Thread.sleep(TimeUnit.SECONDS.toMillis(40));
RedisRunner.RedisProcess newMaster = null;
Collection<RedisClusterMaster> newMasterNodes = redisson.getRedisNodes(RedisNodes.CLUSTER).getMasters();
for (RedisClusterMaster clusterNode : newMasterNodes) {
if (!oldMasters.contains(clusterNode.getAddr())) {
newMaster = process.getNodes().stream().filter(x -> x.getRedisServerPort() == clusterNode.getAddr().getPort()).findFirst().get();
break;
public void testFailoverWithoutErrorsInCluster() {
withNewCluster((nodes, redissonClient) -> {
Config config = redissonClient.getConfig();
config.useClusterServers()
.setRetryAttempts(30)
.setReadMode(ReadMode.MASTER)
.setSubscriptionMode(SubscriptionMode.MASTER);
RedissonClient redisson = Redisson.create(config);
List<ContainerState> masters = getMasterNodes(nodes);
Set<InetSocketAddress> oldMasters = new HashSet<>();
Collection<RedisClusterMaster> masterNodes = redisson.getRedisNodes(RedisNodes.CLUSTER).getMasters();
for (RedisClusterMaster clusterNode : masterNodes) {
oldMasters.add(clusterNode.getAddr());
}
}
assertThat(newMaster).isNotNull();
stop(masters.get(0));
List<RFuture<?>> futures = new ArrayList<>();
for (int j = 0; j < 2000; j++) {
RFuture<?> f2 = redisson.getBucket("" + j).setAsync("");
futures.add(f2);
}
System.out.println("master " + masters.get(0).getFirstMappedPort() + " has been stopped!");
for (RFuture<?> rFuture : futures) {
try {
Thread.sleep(TimeUnit.SECONDS.toMillis(40));
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
List<ContainerState> newMasters = getMasterNodes(nodes);
newMasters.removeAll(masters);
assertThat(newMasters).hasSize(1);
for (RFuture<?> rFuture : futures) {
rFuture.toCompletableFuture().join();
} catch (Exception e) {
Assertions.fail(e.getMessage());
}
}
redisson.shutdown();
process.shutdown();
redisson.shutdown();
});
}

@ -8,16 +8,10 @@ import org.redisson.api.RTopic;
import org.redisson.api.RedissonClient;
import org.redisson.api.listener.MessageListener;
import org.redisson.api.listener.StatusListener;
import org.redisson.api.redisnode.RedisCluster;
import org.redisson.api.redisnode.RedisClusterMaster;
import org.redisson.api.redisnode.RedisNodes;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisClientConfig;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisException;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.config.Config;
import org.redisson.config.SubscriptionMode;
import org.testcontainers.containers.ContainerState;
import org.testcontainers.containers.GenericContainer;
import java.time.Duration;
@ -50,7 +44,7 @@ public class RedissonShardedTopicTest extends RedisDockerTest {
@Test
public void testReattachInClusterMaster() {
withNewCluster(redissonClient -> {
withNewCluster((nodes, redissonClient) -> {
Config cfg = redissonClient.getConfig();
cfg.useClusterServers()
.setPingConnectionInterval(0)
@ -81,17 +75,13 @@ public class RedissonShardedTopicTest extends RedisDockerTest {
assertThat(topic.countSubscribers()).isEqualTo(1);
RedisCluster rnc = redisson.getRedisNodes(RedisNodes.CLUSTER);
for (RedisClusterMaster master : rnc.getMasters()) {
RedisClientConfig cc = new RedisClientConfig();
cc.setAddress("redis://" + master.getAddr().getHostString() + ":" + master.getAddr().getPort());
RedisClient c = RedisClient.create(cc);
RedisConnection cn = c.connect();
List<String> channels = cn.sync(RedisCommands.PUBSUB_SHARDCHANNELS);
if (channels.contains("3")) {
cn.async(RedisCommands.SHUTDOWN);
List<ContainerState> masters = getMasterNodes(nodes);
for (ContainerState master : masters) {
String r = execute(master, "redis-cli", "pubsub", "shardchannels");
if (r.contains("3")) {
stop(master);
break;
}
c.shutdown();
}
Awaitility.waitAtMost(Duration.ofSeconds(30)).until(() -> subscriptions.get() == 2);

@ -6,7 +6,6 @@ import io.netty.util.CharsetUtil;
import net.bytebuddy.utility.RandomString;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.Test;
import org.redisson.api.*;
import org.redisson.api.redisnode.RedisClusterMaster;
@ -32,7 +31,6 @@ import org.redisson.config.CredentialsResolver;
import org.redisson.connection.CRC16;
import org.redisson.connection.ConnectionListener;
import org.redisson.connection.MasterSlaveConnectionManager;
import org.redisson.connection.balancer.RandomLoadBalancer;
import org.redisson.misc.RedisURI;
import org.testcontainers.containers.FixedHostPortGenericContainer;
import org.testcontainers.containers.GenericContainer;
@ -673,7 +671,7 @@ public class RedissonTest extends RedisDockerTest {
@Test
public void testMovedRedirectInCluster() throws Exception {
withNewCluster(redissonClient -> {
withNewCluster((nodes, redissonClient) -> {
Config config = redissonClient.getConfig();
config.useClusterServers()
.setScanInterval(100000);

@ -1,9 +1,5 @@
package org.redisson;
import com.github.dockerjava.api.command.InspectContainerResponse;
import com.github.dockerjava.api.model.ContainerNetwork;
import com.github.dockerjava.api.model.ExposedPort;
import com.github.dockerjava.api.model.Ports;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@ -26,12 +22,9 @@ import org.redisson.config.SubscriptionMode;
import org.redisson.connection.balancer.RandomLoadBalancer;
import org.redisson.misc.RedisURI;
import org.testcontainers.containers.ContainerState;
import org.testcontainers.containers.DockerComposeContainer;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.startupcheck.MinimumDurationRunningStartupCheckStrategy;
import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy;
import java.io.File;
import java.io.Serializable;
import java.time.Duration;
import java.util.*;
@ -40,7 +33,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import static org.assertj.core.api.Assertions.assertThat;
@ -405,7 +397,7 @@ public class RedissonTopicTest extends RedisDockerTest {
@Test
public void testSlotMigrationInCluster() {
withNewCluster(client -> {
withNewCluster((nodes, client) -> {
Config config = client.getConfig();
config.useClusterServers()
.setScanInterval(1000)
@ -1170,7 +1162,7 @@ public class RedissonTopicTest extends RedisDockerTest {
@Test
public void testReattachInClusterSlave() {
withNewCluster(client -> {
withNewCluster((nodes2, client) -> {
Config config = client.getConfig();
config.useClusterServers()
.setSubscriptionMode(SubscriptionMode.SLAVE);
@ -1203,13 +1195,9 @@ public class RedissonTopicTest extends RedisDockerTest {
assertThat(subscriptions.get()).isEqualTo(1);
RedisCluster nodes = redisson.getRedisNodes(RedisNodes.CLUSTER);
for (RedisClusterSlave slave : nodes.getSlaves()) {
RedisClientConfig cc = new RedisClientConfig();
cc.setAddress("redis://" + slave.getAddr().getHostString() + ":" + slave.getAddr().getPort());
RedisClient c = RedisClient.create(cc);
c.connect().async(RedisCommands.SHUTDOWN);
c.shutdown();
List<ContainerState> slaves = getSlaveNodes(nodes2);
for (ContainerState slave : slaves) {
stop(slave);
}
await().atMost(25, TimeUnit.SECONDS).until(() -> subscriptions.get() == 2);
@ -1320,7 +1308,7 @@ public class RedissonTopicTest extends RedisDockerTest {
@Test
public void testReattachInClusterMaster2() {
withNewCluster(redisson -> {
withNewCluster((nodes, redisson) -> {
Queue<String> messages = new ConcurrentLinkedQueue<>();
Queue<String> subscriptions = new ConcurrentLinkedQueue<>();
@ -1343,14 +1331,8 @@ public class RedissonTopicTest extends RedisDockerTest {
topic.addListener(String.class, (channel, msg) -> messages.add(msg));
}
RedisCluster rnc = redisson.getRedisNodes(RedisNodes.CLUSTER);
Optional<RedisClusterMaster> f = rnc.getMasters().stream().findFirst();
RedisClusterMaster master = f.get();
RedisClientConfig cc = new RedisClientConfig();
cc.setAddress("redis://" + master.getAddr().getHostString() + ":" + master.getAddr().getPort());
RedisClient c = RedisClient.create(cc);
c.connect().async(RedisCommands.SHUTDOWN);
c.shutdown();
List<ContainerState> masters = getMasterNodes(nodes);
stop(masters.get(0));
Awaitility.waitAtMost(Duration.ofSeconds(40)).untilAsserted(() -> {
assertThat(subscriptions).hasSizeGreaterThan(125);
@ -1372,7 +1354,7 @@ public class RedissonTopicTest extends RedisDockerTest {
@Test
public void testReattachInClusterMaster() {
withNewCluster(redissonClient -> {
withNewCluster((nodes, redissonClient) -> {
Config cfg = redissonClient.getConfig();
cfg.useClusterServers().setSubscriptionMode(SubscriptionMode.MASTER);
@ -1401,17 +1383,13 @@ public class RedissonTopicTest extends RedisDockerTest {
sendCommands(redisson, "3");
RedisCluster rnc = redisson.getRedisNodes(RedisNodes.CLUSTER);
for (RedisClusterMaster master : rnc.getMasters()) {
RedisClientConfig cc = new RedisClientConfig();
cc.setAddress("redis://" + master.getAddr().getHostString() + ":" + master.getAddr().getPort());
RedisClient c = RedisClient.create(cc);
RedisConnection cn = c.connect();
List<String> channels = cn.sync(RedisCommands.PUBSUB_CHANNELS);
if (channels.contains("3")) {
cn.async(RedisCommands.SHUTDOWN);
List<ContainerState> masters = getMasterNodes(nodes);
for (ContainerState master : masters) {
String r = execute(master, "redis-cli", "pubsub", "channels");
if (r.contains("3")) {
stop(master);
break;
}
c.shutdown();
}
try {
@ -1431,7 +1409,7 @@ public class RedissonTopicTest extends RedisDockerTest {
@Test
public void testReattachPatternTopicListenersOnClusterFailover() {
withNewCluster(redisson -> {
withNewCluster((nodes2, redisson) -> {
RedisCluster nodes = redisson.getRedisNodes(RedisNodes.CLUSTER);
for (RedisClusterMaster master : nodes.getMasters()) {
master.setConfig("notify-keyspace-events", "K$");
@ -1467,21 +1445,13 @@ public class RedissonTopicTest extends RedisDockerTest {
return messagesReceived.get() == 100;
});
RedisCluster rnc = redisson.getRedisNodes(RedisNodes.CLUSTER);
for (RedisClusterMaster master : rnc.getMasters()) {
RedisClientConfig cc = new RedisClientConfig();
cc.setAddress("redis://" + master.getAddr().getHostString() + ":" + master.getAddr().getPort());
RedisClient c = RedisClient.create(cc);
RedisConnection cn = c.connect();
try {
Boolean res = cn.sync(RedisCommands.EXISTS, "i99");
if (res) {
cn.async(RedisCommands.SHUTDOWN);
}
} catch (Exception e) {
// skip
List<ContainerState> masters = getMasterNodes(nodes2);
for (ContainerState master : masters) {
String r = execute(master, "redis-cli", "exists", "i99");
if (r.contains("1")) {
stop(master);
break;
}
c.shutdown();
}
await().atMost(30, TimeUnit.SECONDS).until(() -> {

Loading…
Cancel
Save