refactoring

pull/5676/head
Nikita Koksharov 1 year ago
parent e5a70dfc90
commit 91f6db8be4

@ -190,7 +190,7 @@ public class PublishSubscribeService {
return new PubSubPatternStatusListener((PubSubPatternStatusListener) l) { return new PubSubPatternStatusListener((PubSubPatternStatusListener) l) {
@Override @Override
public void onStatus(PubSubType type, CharSequence channel) { public void onStatus(PubSubType type, CharSequence channel) {
if (statusCounter.decrementAndGet() == 0) { if (statusCounter.get() == 0 || statusCounter.decrementAndGet() == 0) {
super.onStatus(type, channel); super.onStatus(type, channel);
} }
} }
@ -224,8 +224,8 @@ public class PublishSubscribeService {
public boolean isMultiEntity(ChannelName channelName) { public boolean isMultiEntity(ChannelName channelName) {
return connectionManager.isClusterMode() return connectionManager.isClusterMode()
&& (channelName.toString().startsWith("__keyspace@") && (channelName.toString().startsWith("__keyspace")
|| channelName.toString().startsWith("__keyevent@")); || channelName.toString().startsWith("__keyevent"));
} }
public CompletableFuture<PubSubConnectionEntry> subscribe(MasterSlaveEntry entry, ClientConnectionsEntry clientEntry, public CompletableFuture<PubSubConnectionEntry> subscribe(MasterSlaveEntry entry, ClientConnectionsEntry clientEntry,
@ -356,7 +356,7 @@ public class PublishSubscribeService {
return new PubSubStatusListener(((PubSubStatusListener) l).getListener(), ((PubSubStatusListener) l).getName()) { return new PubSubStatusListener(((PubSubStatusListener) l).getListener(), ((PubSubStatusListener) l).getName()) {
@Override @Override
public void onStatus(PubSubType type, CharSequence channel) { public void onStatus(PubSubType type, CharSequence channel) {
if (statusCounter.decrementAndGet() == 0) { if (statusCounter.get() == 0 || statusCounter.decrementAndGet() == 0) {
super.onStatus(type, channel); super.onStatus(type, channel);
} }
} }

@ -1,5 +1,10 @@
package org.redisson; package org.redisson;
import com.github.dockerjava.api.command.InspectContainerResponse;
import com.github.dockerjava.api.model.ContainerNetwork;
import com.github.dockerjava.api.model.ExposedPort;
import com.github.dockerjava.api.model.PortBinding;
import com.github.dockerjava.api.model.Ports;
import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.redisson.api.NatMapper; import org.redisson.api.NatMapper;
@ -7,20 +12,24 @@ import org.redisson.api.RedissonClient;
import org.redisson.config.Config; import org.redisson.config.Config;
import org.redisson.config.Protocol; import org.redisson.config.Protocol;
import org.redisson.misc.RedisURI; import org.redisson.misc.RedisURI;
import org.testcontainers.containers.ContainerState;
import org.testcontainers.containers.DockerComposeContainer;
import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.startupcheck.MinimumDurationRunningStartupCheckStrategy; import org.testcontainers.containers.startupcheck.MinimumDurationRunningStartupCheckStrategy;
import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy;
import java.io.File;
import java.time.Duration; import java.time.Duration;
import java.util.ArrayList; import java.util.*;
import java.util.Arrays; import java.util.function.BiConsumer;
import java.util.List;
import java.util.function.Consumer; import java.util.function.Consumer;
public class RedisDockerTest { public class RedisDockerTest {
protected static final String NOTIFY_KEYSPACE_EVENTS = "--notify-keyspace-events"; protected static final String NOTIFY_KEYSPACE_EVENTS = "--notify-keyspace-events";
private static final GenericContainer<?> REDIS = createRedis(); protected static final GenericContainer<?> REDIS = createRedis();
protected static final Protocol protocol = Protocol.RESP2; protected static final Protocol protocol = Protocol.RESP2;
@ -35,9 +44,16 @@ public class RedisDockerTest {
} }
protected static GenericContainer<?> createRedis(String version) { protected static GenericContainer<?> createRedis(String version) {
return createRedis(version, "--save", "");
}
protected static GenericContainer<?> createRedis(String version, String... params) {
return new GenericContainer<>("redis:" + version) return new GenericContainer<>("redis:" + version)
.withCreateContainerCmdModifier(cmd -> { .withCreateContainerCmdModifier(cmd -> {
cmd.withCmd("redis-server", "--save", "''"); List<String> args = new ArrayList<>();
args.add("redis-server");
args.addAll(Arrays.asList(params));
cmd.withCmd(args);
}) })
.withExposedPorts(6379); .withExposedPorts(6379);
} }
@ -65,10 +81,14 @@ public class RedisDockerTest {
} }
protected static Config createConfig() { protected static Config createConfig() {
return createConfig(REDIS);
}
protected static Config createConfig(GenericContainer<?> container) {
Config config = new Config(); Config config = new Config();
config.setProtocol(protocol); config.setProtocol(protocol);
config.useSingleServer() config.useSingleServer()
.setAddress("redis://127.0.0.1:" + REDIS.getFirstMappedPort()); .setAddress("redis://127.0.0.1:" + container.getFirstMappedPort());
return config; return config;
} }
@ -77,6 +97,29 @@ public class RedisDockerTest {
return Redisson.create(config); return Redisson.create(config);
} }
protected void withRedisParams(Consumer<Config> redissonCallback, String... params) {
GenericContainer<?> redis =
new GenericContainer<>("redis:7.2")
.withCreateContainerCmdModifier(cmd -> {
List<String> args = new ArrayList<>();
args.add("redis-server");
args.addAll(Arrays.asList(params));
cmd.withCmd(args);
})
.withExposedPorts(6379);
redis.start();
Config config = new Config();
config.setProtocol(protocol);
config.useSingleServer().setAddress("redis://127.0.0.1:" + redis.getFirstMappedPort());
try {
redissonCallback.accept(config);
} finally {
redis.stop();
}
}
protected void testWithParams(Consumer<RedissonClient> redissonCallback, String... params) { protected void testWithParams(Consumer<RedissonClient> redissonCallback, String... params) {
GenericContainer<?> redis = GenericContainer<?> redis =
new GenericContainer<>("redis:7.2") new GenericContainer<>("redis:7.2")
@ -100,7 +143,6 @@ public class RedisDockerTest {
redisson.shutdown(); redisson.shutdown();
redis.stop(); redis.stop();
} }
} }
protected void testInCluster(Consumer<RedissonClient> redissonCallback) { protected void testInCluster(Consumer<RedissonClient> redissonCallback) {
@ -137,4 +179,200 @@ public class RedisDockerTest {
} }
} }
protected void withSentinel(BiConsumer<List<GenericContainer<?>>, Config> callback, int slaves) throws InterruptedException {
Network network = Network.newNetwork();
List<GenericContainer<? extends GenericContainer<?>>> nodes = new ArrayList<>();
GenericContainer<?> master =
new GenericContainer<>("bitnami/redis:7.2.4")
.withNetwork(network)
.withEnv("REDIS_REPLICATION_MODE", "master")
.withEnv("ALLOW_EMPTY_PASSWORD", "yes")
.withNetworkAliases("redis")
.withExposedPorts(6379);
master.start();
assert master.getNetwork() == network;
int masterPort = master.getFirstMappedPort();
master.withCreateContainerCmdModifier(cmd -> {
cmd.getHostConfig().withPortBindings(
new PortBinding(Ports.Binding.bindPort(Integer.valueOf(masterPort)),
cmd.getExposedPorts()[0]));
});
nodes.add(master);
for (int i = 0; i < slaves; i++) {
GenericContainer<?> slave =
new GenericContainer<>("bitnami/redis:7.2.4")
.withNetwork(network)
.withEnv("REDIS_REPLICATION_MODE", "slave")
.withEnv("REDIS_MASTER_HOST", "redis")
.withEnv("ALLOW_EMPTY_PASSWORD", "yes")
.withNetworkAliases("slave" + i)
.withExposedPorts(6379);
slave.start();
int slavePort = slave.getFirstMappedPort();
slave.withCreateContainerCmdModifier(cmd -> {
cmd.getHostConfig().withPortBindings(
new PortBinding(Ports.Binding.bindPort(Integer.valueOf(slavePort)),
cmd.getExposedPorts()[0]));
});
nodes.add(slave);
}
GenericContainer<?> sentinel1 =
new GenericContainer<>("bitnami/redis-sentinel:7.2.4")
.withNetwork(network)
.withEnv("REDIS_SENTINEL_DOWN_AFTER_MILLISECONDS", "5000")
.withEnv("REDIS_SENTINEL_FAILOVER_TIMEOUT", "10000")
.withNetworkAliases("sentinel1")
.withExposedPorts(26379);
sentinel1.start();
int sentinel1Port = sentinel1.getFirstMappedPort();
sentinel1.withCreateContainerCmdModifier(cmd -> {
cmd.getHostConfig().withPortBindings(
new PortBinding(Ports.Binding.bindPort(Integer.valueOf(sentinel1Port)),
cmd.getExposedPorts()[0]));
});
nodes.add(sentinel1);
GenericContainer<?> sentinel2 =
new GenericContainer<>("bitnami/redis-sentinel:7.2.4")
.withNetwork(network)
.withEnv("REDIS_SENTINEL_DOWN_AFTER_MILLISECONDS", "5000")
.withEnv("REDIS_SENTINEL_FAILOVER_TIMEOUT", "10000")
.withNetworkAliases("sentinel2")
.withExposedPorts(26379);
sentinel2.start();
int sentinel2Port = sentinel2.getFirstMappedPort();
sentinel2.withCreateContainerCmdModifier(cmd -> {
cmd.getHostConfig().withPortBindings(
new PortBinding(Ports.Binding.bindPort(Integer.valueOf(sentinel2Port)),
cmd.getExposedPorts()[0]));
});
nodes.add(sentinel2);
GenericContainer<?> sentinel3 =
new GenericContainer<>("bitnami/redis-sentinel:7.2.4")
.withNetwork(network)
.withEnv("REDIS_SENTINEL_DOWN_AFTER_MILLISECONDS", "5000")
.withEnv("REDIS_SENTINEL_FAILOVER_TIMEOUT", "10000")
.withNetworkAliases("sentinel3")
.withExposedPorts(26379);
sentinel3.start();
int sentinel3Port = sentinel3.getFirstMappedPort();
sentinel3.withCreateContainerCmdModifier(cmd -> {
cmd.getHostConfig().withPortBindings(
new PortBinding(Ports.Binding.bindPort(Integer.valueOf(sentinel3Port)),
cmd.getExposedPorts()[0]));
});
nodes.add(sentinel3);
Thread.sleep(5000);
Config config = new Config();
config.setProtocol(protocol);
config.useSentinelServers()
.setNatMapper(new NatMapper() {
@Override
public RedisURI map(RedisURI uri) {
for (GenericContainer<? extends GenericContainer<?>> node : nodes) {
if (node.getContainerInfo() == null) {
continue;
}
Ports.Binding[] mappedPort = node.getContainerInfo().getNetworkSettings()
.getPorts().getBindings().get(new ExposedPort(uri.getPort()));
Map<String, ContainerNetwork> ss = node.getContainerInfo().getNetworkSettings().getNetworks();
ContainerNetwork s = ss.values().iterator().next();
if (uri.getPort() == 6379 && node.getNetworkAliases().contains("slave0")) {
return new RedisURI(uri.getScheme(), "127.0.0.1", Integer.valueOf(mappedPort[0].getHostPortSpec()));
}
if (mappedPort != null
&& s.getIpAddress().equals(uri.getHost())) {
return new RedisURI(uri.getScheme(), "127.0.0.1", Integer.valueOf(mappedPort[0].getHostPortSpec()));
}
}
return uri;
}
})
.addSentinelAddress("redis://127.0.0.1:" + sentinel1.getFirstMappedPort())
.setMasterName("mymaster");
callback.accept(nodes, config);
nodes.forEach(n -> n.stop());
network.close();
}
protected void withNewCluster(Consumer<RedissonClient> callback) {
List<InspectContainerResponse> nodes = new ArrayList<>();
LogMessageWaitStrategy wait2 = new LogMessageWaitStrategy().withRegEx(".*REPLICA\ssync\\:\sFinished\swith\ssuccess.*");
DockerComposeContainer environment =
new DockerComposeContainer(new File("src/test/resources/docker-compose.yml"))
.withExposedService("redis-node-0", 6379)
.withExposedService("redis-node-1", 6379)
.withExposedService("redis-node-2", 6379)
.withExposedService("redis-node-3", 6379)
.withExposedService("redis-node-4", 6379)
.withExposedService("redis-node-5", 6379, wait2);
environment.start();
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
for (int i = 0; i < 6; i++) {
Optional<ContainerState> cc = environment.getContainerByServiceName("redis-node-" + i);
nodes.add(cc.get().getContainerInfo());
}
Optional<ContainerState> cc2 = environment.getContainerByServiceName("redis-node-0");
Config config = new Config();
config.useClusterServers()
.setNatMapper(new NatMapper() {
@Override
public RedisURI map(RedisURI uri) {
for (InspectContainerResponse node : nodes) {
Ports.Binding[] mappedPort = node.getNetworkSettings()
.getPorts().getBindings().get(new ExposedPort(uri.getPort()));
Map<String, ContainerNetwork> ss = node.getNetworkSettings().getNetworks();
ContainerNetwork s = ss.values().iterator().next();
if (mappedPort != null
&& s.getIpAddress().equals(uri.getHost())) {
return new RedisURI(uri.getScheme(), "127.0.0.1", Integer.valueOf(mappedPort[0].getHostPortSpec()));
}
}
return uri;
}
})
.addNodeAddress("redis://127.0.0.1:" + cc2.get().getFirstMappedPort());
RedissonClient redisson = Redisson.create(config);
callback.accept(redisson);
redisson.shutdown();
environment.stop();
}
protected void restart(GenericContainer<?> redis) {
redis.setPortBindings(Arrays.asList(redis.getFirstMappedPort() + ":6379"));
redis.stop();
redis.start();
}
} }

@ -20,9 +20,6 @@ import org.redisson.codec.JsonJacksonCodec;
import org.redisson.command.BatchPromise; import org.redisson.command.BatchPromise;
import org.redisson.config.Config; import org.redisson.config.Config;
import org.redisson.config.SubscriptionMode; import org.redisson.config.SubscriptionMode;
import org.redisson.misc.RedisURI;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.startupcheck.MinimumDurationRunningStartupCheckStrategy;
import java.time.Duration; import java.time.Duration;
import java.util.*; import java.util.*;
@ -45,45 +42,32 @@ public class RedissonBatchTest extends RedisDockerTest {
@ParameterizedTest @ParameterizedTest
@MethodSource("data") @MethodSource("data")
public void testSlotMigrationInCluster(BatchOptions batchOptions) throws Exception { public void testSlotMigrationInCluster(BatchOptions batchOptions) {
RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave(); withNewCluster(redissonClient -> {
RedisRunner master2 = new RedisRunner().randomPort().randomDir().nosave(); Config config = redissonClient.getConfig();
RedisRunner master3 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slot1 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slot2 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slot3 = new RedisRunner().randomPort().randomDir().nosave();
ClusterRunner clusterRunner = new ClusterRunner()
.addNode(master1, slot1)
.addNode(master2, slot2)
.addNode(master3, slot3);
ClusterRunner.ClusterProcesses process = clusterRunner.run();
Config config = new Config();
config.useClusterServers() config.useClusterServers()
.setScanInterval(1000) .setScanInterval(1000)
.setSubscriptionMode(SubscriptionMode.MASTER) .setSubscriptionMode(SubscriptionMode.MASTER);
.addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort());
RedissonClient redisson = Redisson.create(config); RedissonClient redisson = Redisson.create(config);
RedisClientConfig cfg = new RedisClientConfig(); RedisClientConfig cfg = new RedisClientConfig();
cfg.setAddress(process.getNodes().iterator().next().getRedisServerAddressAndPort()); cfg.setAddress(config.useClusterServers().getNodeAddresses().get(0));
RedisClient c = RedisClient.create(cfg); RedisClient c = RedisClient.create(cfg);
RedisConnection cc = c.connect(); RedisConnection cc = c.connect();
List<ClusterNodeInfo> mastersList = cc.sync(RedisCommands.CLUSTER_NODES); List<ClusterNodeInfo> mastersList = cc.sync(RedisCommands.CLUSTER_NODES);
mastersList = mastersList.stream().filter(i -> i.containsFlag(ClusterNodeInfo.Flag.MASTER)).collect(Collectors.toList()); mastersList = mastersList.stream().filter(i -> i.containsFlag(ClusterNodeInfo.Flag.MASTER)).collect(Collectors.toList());
c.shutdown(); c.shutdown();
ClusterNodeInfo destination = mastersList.stream().filter(i -> i.getSlotRanges().iterator().next().getStartSlot() != 10922).findAny().get(); ClusterNodeInfo destination = mastersList.stream().filter(i -> i.getSlotRanges().stream().noneMatch(s -> s.hasSlot(10922))).findAny().get();
ClusterNodeInfo source = mastersList.stream().filter(i -> i.getSlotRanges().iterator().next().getStartSlot() == 10922).findAny().get(); ClusterNodeInfo source = mastersList.stream().filter(i -> i.getSlotRanges().stream().anyMatch(s -> s.hasSlot(10922))).findAny().get();
RedisClientConfig sourceCfg = new RedisClientConfig(); RedisClientConfig sourceCfg = new RedisClientConfig();
sourceCfg.setAddress(source.getAddress()); sourceCfg.setAddress(config.useClusterServers().getNatMapper().map(source.getAddress()));
RedisClient sourceClient = RedisClient.create(sourceCfg); RedisClient sourceClient = RedisClient.create(sourceCfg);
RedisConnection sourceConnection = sourceClient.connect(); RedisConnection sourceConnection = sourceClient.connect();
RedisClientConfig destinationCfg = new RedisClientConfig(); RedisClientConfig destinationCfg = new RedisClientConfig();
destinationCfg.setAddress(destination.getAddress()); destinationCfg.setAddress(config.useClusterServers().getNatMapper().map(destination.getAddress()));
RedisClient destinationClient = RedisClient.create(destinationCfg); RedisClient destinationClient = RedisClient.create(destinationCfg);
RedisConnection destinationConnection = destinationClient.connect(); RedisConnection destinationConnection = destinationClient.connect();
@ -112,14 +96,18 @@ public class RedissonBatchTest extends RedisDockerTest {
for (ClusterNodeInfo node : mastersList) { for (ClusterNodeInfo node : mastersList) {
RedisClientConfig cc1 = new RedisClientConfig(); RedisClientConfig cc1 = new RedisClientConfig();
cc1.setAddress(node.getAddress()); cc1.setAddress(config.useClusterServers().getNatMapper().map(node.getAddress()));
RedisClient ccc = RedisClient.create(cc1); RedisClient ccc = RedisClient.create(cc1);
RedisConnection connection = ccc.connect(); RedisConnection connection = ccc.connect();
connection.sync(RedisCommands.CLUSTER_SETSLOT, source.getSlotRanges().iterator().next().getStartSlot(), "NODE", destination.getNodeId()); connection.sync(RedisCommands.CLUSTER_SETSLOT, source.getSlotRanges().iterator().next().getStartSlot(), "NODE", destination.getNodeId());
ccc.shutdownAsync(); ccc.shutdownAsync();
} }
try {
Thread.sleep(2000); Thread.sleep(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
batch.execute(); batch.execute();
@ -128,17 +116,15 @@ public class RedissonBatchTest extends RedisDockerTest {
f.toCompletableFuture().get(1, TimeUnit.MILLISECONDS); f.toCompletableFuture().get(1, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) { } catch (TimeoutException e) {
org.junit.jupiter.api.Assertions.fail(e); org.junit.jupiter.api.Assertions.fail(e);
} catch (ExecutionException e) { } catch (Exception e) {
e.printStackTrace(); // skip
} catch (InterruptedException e) {
e.printStackTrace();
} }
}); });
sourceClient.shutdown(); sourceClient.shutdown();
destinationClient.shutdown(); destinationClient.shutdown();
redisson.shutdown(); redisson.shutdown();
process.shutdown(); });
} }
@ParameterizedTest @ParameterizedTest

@ -2,17 +2,21 @@ package org.redisson;
import org.awaitility.Awaitility; import org.awaitility.Awaitility;
import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout; import org.junit.jupiter.api.Timeout;
import org.redisson.ClusterRunner.ClusterProcesses;
import org.redisson.RedisRunner.RedisProcess;
import org.redisson.api.Entry; import org.redisson.api.Entry;
import org.redisson.api.RBlockingQueue; import org.redisson.api.RBlockingQueue;
import org.redisson.api.RFuture; import org.redisson.api.RFuture;
import org.redisson.api.RedissonClient; import org.redisson.api.RedissonClient;
import org.redisson.api.redisnode.RedisCluster;
import org.redisson.api.redisnode.RedisClusterMaster;
import org.redisson.api.redisnode.RedisNodes;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisClientConfig;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.config.Config; import org.redisson.config.Config;
import org.redisson.connection.balancer.RandomLoadBalancer; import org.redisson.connection.balancer.RandomLoadBalancer;
import org.testcontainers.containers.GenericContainer;
import java.io.IOException; import java.io.IOException;
import java.time.Duration; import java.time.Duration;
@ -42,26 +46,19 @@ public class RedissonBlockingQueueTest extends RedissonQueueTest {
} }
@Test @Test
public void testPollWithBrokenConnection() throws IOException, InterruptedException, ExecutionException { public void testPollWithBrokenConnection() throws InterruptedException, ExecutionException {
RedisProcess runner = new RedisRunner() GenericContainer<?> redis = createRedis();
.nosave() redis.start();
.randomDir()
.randomPort()
.run();
Config config = new Config(); Config config = createConfig(redis);
config.useSingleServer().setAddress(runner.getRedisServerAddressAndPort());
RedissonClient redisson = Redisson.create(config); RedissonClient redisson = Redisson.create(config);
final RBlockingQueue<Integer> queue1 = getQueue(redisson); RBlockingQueue<Integer> queue1 = getQueue(redisson);
RFuture<Integer> f = queue1.pollAsync(5, TimeUnit.SECONDS); RFuture<Integer> f = queue1.pollAsync(5, TimeUnit.SECONDS);
try { Assertions.assertThrows(TimeoutException.class, () -> {
f.toCompletableFuture().get(1, TimeUnit.SECONDS); f.toCompletableFuture().get(1, TimeUnit.SECONDS);
Assertions.fail(); });
} catch (TimeoutException e) { redis.stop();
// skip
}
runner.stop();
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
assertThat(f.get()).isNull(); assertThat(f.get()).isNull();
@ -79,17 +76,12 @@ public class RedissonBlockingQueueTest extends RedissonQueueTest {
} }
@Test @Test
public void testPollReattach() throws InterruptedException, IOException { public void testPollReattach() throws InterruptedException {
RedisProcess runner = new RedisRunner() GenericContainer<?> redis = createRedis("latest","--requirepass", "1234");
.nosave() redis.start();
.randomDir()
.randomPort()
.requirepass("1234")
.run();
Config config = new Config(); Config config = createConfig(redis);
config.useSingleServer().setAddress(runner.getRedisServerAddressAndPort()) config.useSingleServer().setPassword("1234");
.setPassword("1234");
RedissonClient redisson = Redisson.create(config); RedissonClient redisson = Redisson.create(config);
final AtomicBoolean executed = new AtomicBoolean(); final AtomicBoolean executed = new AtomicBoolean();
@ -112,14 +104,8 @@ public class RedissonBlockingQueueTest extends RedissonQueueTest {
t.start(); t.start();
t.join(1000); t.join(1000);
runner.stop();
runner = new RedisRunner() restart(redis);
.port(runner.getRedisServerPort())
.nosave()
.randomDir()
.requirepass("1234")
.run();
Thread.sleep(1000); Thread.sleep(1000);
@ -131,19 +117,15 @@ public class RedissonBlockingQueueTest extends RedissonQueueTest {
await().atMost(7, TimeUnit.SECONDS).untilTrue(executed); await().atMost(7, TimeUnit.SECONDS).untilTrue(executed);
redisson.shutdown(); redisson.shutdown();
runner.stop(); redis.stop();
} }
@Test @Test
public void testPollAsyncReattach() throws InterruptedException, IOException, ExecutionException, TimeoutException { public void testPollAsyncReattach() throws InterruptedException, IOException, ExecutionException, TimeoutException {
RedisProcess runner = new RedisRunner() GenericContainer<?> redis = createRedis();
.nosave() redis.start();
.randomDir()
.randomPort()
.run();
Config config = new Config(); Config config = createConfig(redis);
config.useSingleServer().setAddress(runner.getRedisServerAddressAndPort());
RedissonClient redisson = Redisson.create(config); RedissonClient redisson = Redisson.create(config);
RBlockingQueue<Integer> queue1 = getQueue(redisson); RBlockingQueue<Integer> queue1 = getQueue(redisson);
@ -153,13 +135,9 @@ public class RedissonBlockingQueueTest extends RedissonQueueTest {
} catch (ExecutionException | TimeoutException e) { } catch (ExecutionException | TimeoutException e) {
// skip // skip
} }
runner.stop();
runner = new RedisRunner() restart(redis);
.port(runner.getRedisServerPort())
.nosave()
.randomDir()
.run();
queue1.put(123); queue1.put(123);
// check connection rotation // check connection rotation
@ -172,60 +150,54 @@ public class RedissonBlockingQueueTest extends RedissonQueueTest {
assertThat(result).isEqualTo(123); assertThat(result).isEqualTo(123);
redisson.shutdown(); redisson.shutdown();
runner.stop(); redis.stop();
} }
@Test @Test
public void testTakeReattachCluster() throws IOException, InterruptedException { public void testTakeReattachCluster() {
RedisRunner master1 = new RedisRunner().port(6890).randomDir().nosave(); withNewCluster(redisson -> {
RedisRunner master2 = new RedisRunner().port(6891).randomDir().nosave();
RedisRunner master3 = new RedisRunner().port(6892).randomDir().nosave();
RedisRunner slave1 = new RedisRunner().port(6900).randomDir().nosave();
RedisRunner slave2 = new RedisRunner().port(6901).randomDir().nosave();
RedisRunner slave3 = new RedisRunner().port(6902).randomDir().nosave();
ClusterRunner clusterRunner = new ClusterRunner()
.addNode(master1, slave1)
.addNode(master2, slave2)
.addNode(master3, slave3);
ClusterProcesses process = clusterRunner.run();
Thread.sleep(1000);
Config config = new Config();
config.useClusterServers()
.setLoadBalancer(new RandomLoadBalancer())
.addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort());
RedissonClient redisson = Redisson.create(config);
RedisProcess master = process.getNodes().stream().filter(x -> x.getRedisServerPort() == master1.getPort()).findFirst().get();
List<RFuture<Integer>> futures = new ArrayList<>(); List<RFuture<Integer>> futures = new ArrayList<>();
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
RBlockingQueue<Integer> queue = redisson.getBlockingQueue("queue" + i); RBlockingQueue<Integer> queue = redisson.getBlockingQueue("queue" + i);
RFuture<Integer> f = queue.takeAsync(); RFuture<Integer> f = queue.takeAsync();
try {
f.toCompletableFuture().get(1, TimeUnit.SECONDS);
} catch (ExecutionException | TimeoutException e) {
// skip
}
futures.add(f); futures.add(f);
} }
master.stop(); try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
RedisCluster rnc = redisson.getRedisNodes(RedisNodes.CLUSTER);
Optional<RedisClusterMaster> ff = rnc.getMasters().stream().findFirst();
RedisClusterMaster master = ff.get();
RedisClientConfig cc = new RedisClientConfig();
cc.setAddress("redis://" + master.getAddr().getHostString() + ":" + master.getAddr().getPort());
RedisClient c = RedisClient.create(cc);
c.connect().async(RedisCommands.SHUTDOWN);
c.shutdown();
Thread.sleep(TimeUnit.SECONDS.toMillis(80)); try {
Thread.sleep(TimeUnit.SECONDS.toMillis(30));
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
RBlockingQueue<Integer> queue = redisson.getBlockingQueue("queue" + i); RBlockingQueue<Integer> queue = redisson.getBlockingQueue("queue" + i);
try {
queue.put(i*100); queue.put(i*100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
} }
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
RFuture<Integer> f = futures.get(i); RFuture<Integer> f = futures.get(i);
try { try {
f.toCompletableFuture().get(20, TimeUnit.SECONDS); f.toCompletableFuture().get(20, TimeUnit.SECONDS);
} catch (ExecutionException | TimeoutException e) { } catch (Exception e) {
// skip // skip
} }
Integer result = f.toCompletableFuture().getNow(null); Integer result = f.toCompletableFuture().getNow(null);
@ -233,7 +205,8 @@ public class RedissonBlockingQueueTest extends RedissonQueueTest {
} }
redisson.shutdown(); redisson.shutdown();
process.shutdown();
});
} }
@Test @Test
@ -319,14 +292,10 @@ public class RedissonBlockingQueueTest extends RedissonQueueTest {
@Test @Test
public void testTakeReattach() throws Exception { public void testTakeReattach() throws Exception {
RedisProcess runner = new RedisRunner() GenericContainer<?> redis = createRedis();
.nosave() redis.start();
.randomDir()
.randomPort()
.run();
Config config = new Config(); Config config = createConfig(redis);
config.useSingleServer().setAddress(runner.getRedisServerAddressAndPort());
RedissonClient redisson = Redisson.create(config); RedissonClient redisson = Redisson.create(config);
RBlockingQueue<Integer> queue1 = getQueue(redisson); RBlockingQueue<Integer> queue1 = getQueue(redisson);
@ -336,13 +305,8 @@ public class RedissonBlockingQueueTest extends RedissonQueueTest {
} catch (ExecutionException | TimeoutException e) { } catch (ExecutionException | TimeoutException e) {
e.printStackTrace(); e.printStackTrace();
} }
runner.stop();
runner = new RedisRunner() restart(redis);
.port(runner.getRedisServerPort())
.nosave()
.randomDir()
.run();
queue1.put(123); queue1.put(123);
// check connection rotation // check connection rotation
@ -353,9 +317,9 @@ public class RedissonBlockingQueueTest extends RedissonQueueTest {
Integer result = f.get(1, TimeUnit.SECONDS); Integer result = f.get(1, TimeUnit.SECONDS);
assertThat(result).isEqualTo(123); assertThat(result).isEqualTo(123);
runner.stop();
redisson.shutdown(); redisson.shutdown();
redis.stop();
} }
@Test @Test
@ -444,28 +408,8 @@ public class RedissonBlockingQueueTest extends RedissonQueueTest {
} }
@Test @Test
public void testPollFromAnyInCluster() throws Exception { public void testPollFromAnyInCluster() {
RedisRunner master1 = new RedisRunner().port(6890).randomDir().nosave(); testInCluster(redissonClient -> {
RedisRunner master2 = new RedisRunner().port(6891).randomDir().nosave();
RedisRunner master3 = new RedisRunner().port(6892).randomDir().nosave();
RedisRunner slave1 = new RedisRunner().port(6900).randomDir().nosave();
RedisRunner slave2 = new RedisRunner().port(6901).randomDir().nosave();
RedisRunner slave3 = new RedisRunner().port(6902).randomDir().nosave();
ClusterRunner clusterRunner = new ClusterRunner()
.addNode(master1, slave1)
.addNode(master2, slave2)
.addNode(master3, slave3);
ClusterProcesses process = clusterRunner.run();
Thread.sleep(5000);
Config config = new Config();
config.useClusterServers()
.setLoadBalancer(new RandomLoadBalancer())
.addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort());
RedissonClient redisson = Redisson.create(config);
RBlockingQueue<Integer> queue1 = redisson.getBlockingQueue("queue:pollany"); RBlockingQueue<Integer> queue1 = redisson.getBlockingQueue("queue:pollany");
Executors.newSingleThreadScheduledExecutor().schedule(() -> { Executors.newSingleThreadScheduledExecutor().schedule(() -> {
RBlockingQueue<Integer> queue2 = redisson.getBlockingQueue("queue:pollany1"); RBlockingQueue<Integer> queue2 = redisson.getBlockingQueue("queue:pollany1");
@ -483,9 +427,7 @@ public class RedissonBlockingQueueTest extends RedissonQueueTest {
int value = queue1.pollFromAny(4, TimeUnit.SECONDS, "queue:pollany1", "queue:pollany2"); int value = queue1.pollFromAny(4, TimeUnit.SECONDS, "queue:pollany1", "queue:pollany2");
assertThat(value).isEqualTo(1); assertThat(value).isEqualTo(1);
}); });
});
redisson.shutdown();
process.shutdown();
} }
@Test @Test
@ -557,12 +499,9 @@ public class RedissonBlockingQueueTest extends RedissonQueueTest {
@Test @Test
public void testPollLastFromAny() throws InterruptedException { public void testPollLastFromAny() throws InterruptedException {
Assumptions.assumeTrue(RedisRunner.getDefaultRedisServerInstance().getRedisVersion().compareTo("7.0.0") > 0);
RBlockingQueue<Integer> queue1 = redisson.getBlockingQueue("queue:pollany"); RBlockingQueue<Integer> queue1 = redisson.getBlockingQueue("queue:pollany");
RBlockingQueue<Integer> queue2 = redisson.getBlockingQueue("queue:pollany1"); RBlockingQueue<Integer> queue2 = redisson.getBlockingQueue("queue:pollany1");
RBlockingQueue<Integer> queue3 = redisson.getBlockingQueue("queue:pollany2"); RBlockingQueue<Integer> queue3 = redisson.getBlockingQueue("queue:pollany2");
Assertions.assertDoesNotThrow(() -> {
queue3.put(1); queue3.put(1);
queue3.put(2); queue3.put(2);
queue3.put(3); queue3.put(3);
@ -572,7 +511,6 @@ public class RedissonBlockingQueueTest extends RedissonQueueTest {
queue2.put(7); queue2.put(7);
queue2.put(8); queue2.put(8);
queue2.put(9); queue2.put(9);
});
Map<String, List<Integer>> res = queue1.pollLastFromAny(Duration.ofSeconds(4), 2, "queue:pollany1", "queue:pollany2"); Map<String, List<Integer>> res = queue1.pollLastFromAny(Duration.ofSeconds(4), 2, "queue:pollany1", "queue:pollany2");
assertThat(res.get("queue:pollany")).containsExactly(6, 5); assertThat(res.get("queue:pollany")).containsExactly(6, 5);

@ -1,78 +1,49 @@
package org.redisson; package org.redisson;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.redisson.api.RLock; import org.redisson.api.RLock;
import org.redisson.api.RedissonClient; import org.redisson.api.RedissonClient;
import org.redisson.config.Config; import org.redisson.config.Config;
import org.testcontainers.containers.GenericContainer;
import java.io.IOException;
import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.assertj.core.api.Assertions.assertThatThrownBy;
public class RedissonLockExpirationRenewalTest { public class RedissonLockExpirationRenewalTest extends RedisDockerTest {
private static final String LOCK_KEY = "LOCK_KEY"; private static final String LOCK_KEY = "LOCK_KEY";
public static final long LOCK_WATCHDOG_TIMEOUT = 1_000L; public static final long LOCK_WATCHDOG_TIMEOUT = 1_000L;
private RedissonClient redisson; @Test
public void testExpirationRenewalIsWorkingAfterTimeout() throws InterruptedException {
@BeforeEach GenericContainer<?> redis = createRedis();
public void before() throws IOException, InterruptedException { redis.start();
RedisRunner.startDefaultRedisServerInstance();
redisson = createInstance();
}
@AfterEach Config c = createConfig(redis);
public void after() throws InterruptedException { c.setLockWatchdogTimeout(LOCK_WATCHDOG_TIMEOUT);
redisson.shutdown(); RedissonClient redisson = Redisson.create(c);
RedisRunner.shutDownDefaultRedisServerInstance();
}
@Test
public void testExpirationRenewalIsWorkingAfterTimeout() throws IOException, InterruptedException {
{
RLock lock = redisson.getLock(LOCK_KEY); RLock lock = redisson.getLock(LOCK_KEY);
lock.lock(); lock.lock();
try { try {
// force expiration renewal error // force expiration renewal error
restartRedisServer(); restart(redis);
// wait for timeout // wait for timeout
Thread.sleep(LOCK_WATCHDOG_TIMEOUT * 2); Thread.sleep(LOCK_WATCHDOG_TIMEOUT * 2);
} finally { } finally {
assertThatThrownBy(lock::unlock).isInstanceOf(IllegalMonitorStateException.class); assertThatThrownBy(lock::unlock).isInstanceOf(IllegalMonitorStateException.class);
} }
}
{ RLock lock2 = redisson.getLock(LOCK_KEY);
RLock lock = redisson.getLock(LOCK_KEY); lock2.lock();
lock.lock();
try { try {
// wait for timeout // wait for timeout
Thread.sleep(LOCK_WATCHDOG_TIMEOUT * 2); Thread.sleep(LOCK_WATCHDOG_TIMEOUT * 2);
} finally { } finally {
lock.unlock(); lock2.unlock();
}
}
} }
private void restartRedisServer() throws InterruptedException, IOException { redisson.shutdown();
int currentPort = RedisRunner.defaultRedisInstance.getRedisServerPort(); redis.stop();
RedisRunner.shutDownDefaultRedisServerInstance();
RedisRunner.defaultRedisInstance = new RedisRunner().nosave().randomDir().port(currentPort).run();
}
public static Config createConfig() {
Config config = new Config();
config.useSingleServer()
.setAddress(RedisRunner.getDefaultRedisServerBindAddressAndPort());
config.setLockWatchdogTimeout(LOCK_WATCHDOG_TIMEOUT);
return config;
} }
public static RedissonClient createInstance() {
Config config = createConfig();
return Redisson.create(config);
}
} }

@ -14,7 +14,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
public class RedissonLockHeavyTest extends BaseTest { public class RedissonLockHeavyTest extends RedisDockerTest {
public static Collection<Arguments> data() { public static Collection<Arguments> data() {
return Arrays.asList(Arguments.of(2, 5000), return Arrays.asList(Arguments.of(2, 5000),

@ -1,20 +1,20 @@
package org.redisson; package org.redisson;
import static org.assertj.core.api.Assertions.assertThat;
import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.redisson.RedisRunner.RedisProcess;
import org.redisson.api.RBlockingQueue; import org.redisson.api.RBlockingQueue;
import org.redisson.api.RFuture; import org.redisson.api.RFuture;
import org.redisson.api.RedissonClient; import org.redisson.api.RedissonClient;
import org.redisson.config.Config; import org.redisson.config.Config;
import org.testcontainers.containers.GenericContainer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import static org.assertj.core.api.Assertions.assertThat;
public class RedissonPriorityBlockingQueueTest extends RedissonBlockingQueueTest { public class RedissonPriorityBlockingQueueTest extends RedissonBlockingQueueTest {
@ -34,15 +34,11 @@ public class RedissonPriorityBlockingQueueTest extends RedissonBlockingQueueTest
} }
@Test @Test
public void testPollAsyncReattach() throws InterruptedException, IOException, ExecutionException, TimeoutException { public void testPollAsyncReattach() throws InterruptedException, ExecutionException {
RedisProcess runner = new RedisRunner() GenericContainer<?> redis = createRedis();
.nosave() redis.start();
.randomDir()
.randomPort() Config config = createConfig(redis);
.run();
Config config = new Config();
config.useSingleServer().setAddress(runner.getRedisServerAddressAndPort());
RedissonClient redisson = Redisson.create(config); RedissonClient redisson = Redisson.create(config);
RBlockingQueue<Integer> queue1 = getQueue(redisson); RBlockingQueue<Integer> queue1 = getQueue(redisson);
@ -52,13 +48,10 @@ public class RedissonPriorityBlockingQueueTest extends RedissonBlockingQueueTest
} catch (ExecutionException | TimeoutException e) { } catch (ExecutionException | TimeoutException e) {
// skip // skip
} }
runner.stop(); redis.setPortBindings(Arrays.asList(redis.getFirstMappedPort() + ":6379"));
redis.stop();
runner = new RedisRunner() redis.start();
.port(runner.getRedisServerPort())
.nosave()
.randomDir()
.run();
queue1.put(123); queue1.put(123);
// check connection rotation // check connection rotation
@ -71,20 +64,17 @@ public class RedissonPriorityBlockingQueueTest extends RedissonBlockingQueueTest
assertThat(result).isEqualTo(123); assertThat(result).isEqualTo(123);
redisson.shutdown(); redisson.shutdown();
runner.stop(); redis.stop();
} }
@Test @Test
public void testTakeReattach() throws Exception { public void testTakeReattach() throws Exception {
RedisProcess runner = new RedisRunner() GenericContainer<?> redis = createRedis();
.nosave() redis.start();
.randomDir()
.randomPort() Config config = createConfig(redis);
.run();
Config config = new Config();
config.useSingleServer().setAddress(runner.getRedisServerAddressAndPort());
RedissonClient redisson = Redisson.create(config); RedissonClient redisson = Redisson.create(config);
RBlockingQueue<Integer> queue1 = getQueue(redisson); RBlockingQueue<Integer> queue1 = getQueue(redisson);
RFuture<Integer> f = queue1.takeAsync(); RFuture<Integer> f = queue1.takeAsync();
try { try {
@ -92,13 +82,10 @@ public class RedissonPriorityBlockingQueueTest extends RedissonBlockingQueueTest
} catch (ExecutionException | TimeoutException e) { } catch (ExecutionException | TimeoutException e) {
// skip // skip
} }
runner.stop(); redis.setPortBindings(Arrays.asList(redis.getFirstMappedPort() + ":6379"));
redis.stop();
runner = new RedisRunner() redis.start();
.port(runner.getRedisServerPort())
.nosave()
.randomDir()
.run();
queue1.put(123); queue1.put(123);
// check connection rotation // check connection rotation
@ -109,9 +96,9 @@ public class RedissonPriorityBlockingQueueTest extends RedissonBlockingQueueTest
Integer result = f.get(); Integer result = f.get();
assertThat(result).isEqualTo(123); assertThat(result).isEqualTo(123);
assertThat(queue1.size()).isEqualTo(10); assertThat(queue1.size()).isEqualTo(10);
runner.stop();
redisson.shutdown(); redisson.shutdown();
redis.stop();
} }

@ -14,7 +14,7 @@ import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.redisson.api.RQueue; import org.redisson.api.RQueue;
public class RedissonQueueTest extends BaseTest { public class RedissonQueueTest extends RedisDockerTest {
<T> RQueue<T> getQueue() { <T> RQueue<T> getQueue() {
return redisson.getQueue("queue"); return redisson.getQueue("queue");

@ -1,5 +1,6 @@
package org.redisson; package org.redisson;
import com.github.dockerjava.api.model.ContainerNetwork;
import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.redisson.api.RedissonClient; import org.redisson.api.RedissonClient;
@ -7,12 +8,8 @@ import org.redisson.api.redisnode.RedisNodes;
import org.redisson.api.redisnode.*; import org.redisson.api.redisnode.*;
import org.redisson.client.protocol.Time; import org.redisson.client.protocol.Time;
import org.redisson.cluster.ClusterSlotRange; import org.redisson.cluster.ClusterSlotRange;
import org.redisson.config.Config;
import org.redisson.connection.balancer.RandomLoadBalancer;
import org.redisson.misc.RedisURI; import org.redisson.misc.RedisURI;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
@ -24,7 +21,7 @@ import static org.assertj.core.api.Assertions.assertThat;
* @author Nikita Koksharov * @author Nikita Koksharov
* *
*/ */
public class RedissonRedisNodesTest extends BaseTest { public class RedissonRedisNodesTest extends RedisDockerTest {
@Test @Test
public void testNode() { public void testNode() {
@ -100,96 +97,26 @@ public class RedissonRedisNodesTest extends BaseTest {
} }
@Test @Test
public void testSentinelFailover() throws IOException, InterruptedException { public void testSentinelFailover() throws InterruptedException {
RedisRunner.RedisProcess master = new RedisRunner() withSentinel((nns, config) -> {
.nosave()
.randomDir()
.run();
RedisRunner.RedisProcess slave1 = new RedisRunner()
.port(6380)
.nosave()
.randomDir()
.slaveof("127.0.0.1", 6379)
.run();
RedisRunner.RedisProcess slave2 = new RedisRunner()
.port(6381)
.nosave()
.randomDir()
.slaveof("127.0.0.1", 6379)
.run();
RedisRunner.RedisProcess sentinel1 = new RedisRunner()
.nosave()
.randomDir()
.port(26379)
.sentinel()
.sentinelMonitor("myMaster", "127.0.0.1", 6379, 2)
.run();
RedisRunner.RedisProcess sentinel2 = new RedisRunner()
.nosave()
.randomDir()
.port(26380)
.sentinel()
.sentinelMonitor("myMaster", "127.0.0.1", 6379, 2)
.run();
RedisRunner.RedisProcess sentinel3 = new RedisRunner()
.nosave()
.randomDir()
.port(26381)
.sentinel()
.sentinelMonitor("myMaster", "127.0.0.1", 6379, 2)
.run();
Config config = new Config();
config.useSentinelServers()
.setLoadBalancer(new RandomLoadBalancer())
.addSentinelAddress(sentinel3.getRedisServerAddressAndPort()).setMasterName("myMaster");
long t = System.currentTimeMillis();
RedissonClient redisson = Redisson.create(config); RedissonClient redisson = Redisson.create(config);
RedisSentinelMasterSlave nodes = redisson.getRedisNodes(RedisNodes.SENTINEL_MASTER_SLAVE); RedisSentinelMasterSlave nodes = redisson.getRedisNodes(RedisNodes.SENTINEL_MASTER_SLAVE);
RedisSentinel sentinel = nodes.getSentinels().iterator().next(); RedisSentinel sentinel = nodes.getSentinels().iterator().next();
sentinel.failover("myMaster"); sentinel.failover(config.useSentinelServers().getMasterName());
redisson.shutdown(); redisson.shutdown();
}, 2);
sentinel1.stop();
sentinel2.stop();
sentinel3.stop();
master.stop();
slave1.stop();
slave2.stop();
} }
@Test @Test
public void testCluster() throws Exception { public void testCluster() {
RedisRunner master1 = new RedisRunner().port(6890).randomDir().nosave(); testInCluster(redisson -> {
RedisRunner master2 = new RedisRunner().port(6891).randomDir().nosave();
RedisRunner master3 = new RedisRunner().port(6892).randomDir().nosave();
RedisRunner slave1 = new RedisRunner().port(6900).randomDir().nosave();
RedisRunner slave2 = new RedisRunner().port(6901).randomDir().nosave();
RedisRunner slave3 = new RedisRunner().port(6902).randomDir().nosave();
RedisRunner slave4 = new RedisRunner().port(6903).randomDir().nosave();
ClusterRunner clusterRunner = new ClusterRunner()
.addNode(master1, slave1, slave4)
.addNode(master2, slave2)
.addNode(master3, slave3);
ClusterRunner.ClusterProcesses process = clusterRunner.run();
Thread.sleep(5000);
Config config = new Config();
config.useClusterServers()
.setLoadBalancer(new RandomLoadBalancer())
.addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort());
RedissonClient redisson = Redisson.create(config);
RedisCluster nodes = redisson.getRedisNodes(RedisNodes.CLUSTER); RedisCluster nodes = redisson.getRedisNodes(RedisNodes.CLUSTER);
assertThat(nodes.getMaster("redis://127.0.0.1:6890")).isNotNull(); RedisClusterMaster n = nodes.getMasters().iterator().next();
assertThat(nodes.getMaster("redis://" + n.getAddr().getHostString() + ":" + n.getAddr().getPort())).isNotNull();
assertThat(nodes.getMaster("redis://127.0.0.1:6899")).isNull(); assertThat(nodes.getMaster("redis://127.0.0.1:6899")).isNull();
assertThat(nodes.getMasters()).hasSize(3); assertThat(nodes.getMasters()).hasSize(3);
assertThat(nodes.getSlaves()).hasSize(4); assertThat(nodes.getSlaves()).hasSize(3);
for (RedisClusterMaster master : nodes.getMasters()) { for (RedisClusterMaster master : nodes.getMasters()) {
master.clusterDeleteSlots(1, 2); master.clusterDeleteSlots(1, 2);
@ -217,56 +144,12 @@ public class RedissonRedisNodesTest extends BaseTest {
Map<ClusterSlotRange, Set<String>> slots = slave.clusterSlots(); Map<ClusterSlotRange, Set<String>> slots = slave.clusterSlots();
assertThat(slots.entrySet().size()).isBetween(3, 5); assertThat(slots.entrySet().size()).isBetween(3, 5);
} }
redisson.shutdown(); });
process.shutdown();
} }
@Test @Test
public void testSentinel() throws IOException, InterruptedException { public void testSentinel() throws InterruptedException {
RedisRunner.RedisProcess master = new RedisRunner() withSentinel((nns, config) -> {
.nosave()
.randomDir()
.run();
RedisRunner.RedisProcess slave1 = new RedisRunner()
.port(6380)
.nosave()
.randomDir()
.slaveof("127.0.0.1", 6379)
.run();
RedisRunner.RedisProcess slave2 = new RedisRunner()
.port(6381)
.nosave()
.randomDir()
.slaveof("127.0.0.1", 6379)
.run();
RedisRunner.RedisProcess sentinel1 = new RedisRunner()
.nosave()
.randomDir()
.port(26379)
.sentinel()
.sentinelMonitor("myMaster", "127.0.0.1", 6379, 2)
.run();
RedisRunner.RedisProcess sentinel2 = new RedisRunner()
.nosave()
.randomDir()
.port(26380)
.sentinel()
.sentinelMonitor("myMaster", "127.0.0.1", 6379, 2)
.run();
RedisRunner.RedisProcess sentinel3 = new RedisRunner()
.nosave()
.randomDir()
.port(26381)
.sentinel()
.sentinelMonitor("myMaster", "127.0.0.1", 6379, 2)
.run();
Config config = new Config();
config.useSentinelServers()
.setLoadBalancer(new RandomLoadBalancer())
.addSentinelAddress(sentinel3.getRedisServerAddressAndPort()).setMasterName("myMaster");
long t = System.currentTimeMillis();
RedissonClient redisson = Redisson.create(config); RedissonClient redisson = Redisson.create(config);
RedisSentinelMasterSlave nodes = redisson.getRedisNodes(RedisNodes.SENTINEL_MASTER_SLAVE); RedisSentinelMasterSlave nodes = redisson.getRedisNodes(RedisNodes.SENTINEL_MASTER_SLAVE);
@ -276,20 +159,25 @@ public class RedissonRedisNodesTest extends BaseTest {
for (RedisSentinel sentinel : nodes.getSentinels()) { for (RedisSentinel sentinel : nodes.getSentinels()) {
Assertions.assertTrue(sentinel.ping()); Assertions.assertTrue(sentinel.ping());
RedisURI addr = sentinel.getMasterAddr("myMaster"); RedisURI addr = sentinel.getMasterAddr(config.useSentinelServers().getMasterName());
assertThat(addr.getHost()).isEqualTo("127.0.0.1");
assertThat(addr.getPort()).isEqualTo(master.getRedisServerPort()); Map<String, ContainerNetwork> ss = nns.get(0).getContainerInfo().getNetworkSettings().getNetworks();
ContainerNetwork s = ss.values().iterator().next();
Integer port = nns.get(0).getExposedPorts().get(0);
Map<String, String> masterMap = sentinel.getMaster("myMaster"); assertThat(addr.getHost()).isEqualTo(s.getIpAddress());
assertThat(addr.getPort()).isEqualTo(port);
Map<String, String> masterMap = sentinel.getMaster(config.useSentinelServers().getMasterName());
assertThat(masterMap).isNotEmpty(); assertThat(masterMap).isNotEmpty();
List<Map<String, String>> masters = sentinel.getMasters(); List<Map<String, String>> masters = sentinel.getMasters();
assertThat(masters).hasSize(1); assertThat(masters).hasSize(1);
Map<String, String> m = masters.get(0); Map<String, String> m = masters.get(0);
assertThat(m.get("ip")).isEqualTo("127.0.0.1"); assertThat(m.get("ip")).isEqualTo(s.getIpAddress());
assertThat(Integer.valueOf(m.get("port"))).isEqualTo(master.getRedisServerPort()); assertThat(Integer.valueOf(m.get("port"))).isEqualTo(port);
List<Map<String, String>> slaves = sentinel.getSlaves("myMaster"); List<Map<String, String>> slaves = sentinel.getSlaves(config.useSentinelServers().getMasterName());
assertThat(slaves).hasSize(2); assertThat(slaves).hasSize(2);
} }
nodes.getSlaves().forEach((node) -> { nodes.getSlaves().forEach((node) -> {
@ -297,37 +185,12 @@ public class RedissonRedisNodesTest extends BaseTest {
}); });
redisson.shutdown(); redisson.shutdown();
}, 2);
sentinel1.stop();
sentinel2.stop();
sentinel3.stop();
master.stop();
slave1.stop();
slave2.stop();
} }
@Test @Test
public void testNodesInCluster() throws Exception { public void testNodesInCluster() {
RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave(); testInCluster(redisson -> {
RedisRunner master2 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner master3 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slot1 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slot2 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slot3 = new RedisRunner().randomPort().randomDir().nosave();
ClusterRunner clusterRunner = new ClusterRunner()
.addNode(master1, slot1)
.addNode(master2, slot2)
.addNode(master3, slot3);
ClusterRunner.ClusterProcesses process = clusterRunner.run();
Config config = new Config();
config.useClusterServers()
.setLoadBalancer(new RandomLoadBalancer())
.addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort());
RedissonClient redisson = Redisson.create(config);
RedisCluster nodes = redisson.getRedisNodes(RedisNodes.CLUSTER); RedisCluster nodes = redisson.getRedisNodes(RedisNodes.CLUSTER);
assertThat(nodes.getMasters()).hasSize(3); assertThat(nodes.getMasters()).hasSize(3);
for (RedisClusterMaster node : nodes.getMasters()) { for (RedisClusterMaster node : nodes.getMasters()) {
@ -337,9 +200,7 @@ public class RedissonRedisNodesTest extends BaseTest {
for (RedisClusterSlave node : nodes.getSlaves()) { for (RedisClusterSlave node : nodes.getSlaves()) {
assertThat(node.info(RedisNode.InfoSection.ALL)).isNotEmpty(); assertThat(node.info(RedisNode.InfoSection.ALL)).isNotEmpty();
} }
});
redisson.shutdown();
process.shutdown();
} }
@Test @Test

@ -1,36 +1,22 @@
package org.redisson; package org.redisson;
import static org.assertj.core.api.Assertions.assertThat;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.assertj.core.api.ListAssert;
import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.redisson.ClusterRunner.ClusterProcesses;
import org.redisson.RedisRunner.FailedToStartRedisException; import org.redisson.RedisRunner.FailedToStartRedisException;
import org.redisson.api.RFuture; import org.redisson.api.RFuture;
import org.redisson.api.RList; import org.redisson.api.RList;
import org.redisson.api.RSet; import org.redisson.api.RSet;
import org.redisson.api.RedissonClient;
import org.redisson.api.SortOrder; import org.redisson.api.SortOrder;
import org.redisson.client.codec.IntegerCodec; import org.redisson.client.codec.IntegerCodec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.StringCodec; import org.redisson.client.codec.StringCodec;
import org.redisson.config.Config;
import org.redisson.connection.balancer.RandomLoadBalancer; import java.io.Serializable;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static org.assertj.core.api.Assertions.assertThat;
public class RedissonSetTest extends RedisDockerTest { public class RedissonSetTest extends RedisDockerTest {

@ -8,8 +8,6 @@ import org.awaitility.Awaitility;
import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Assumptions; import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.redisson.ClusterRunner.ClusterProcesses;
import org.redisson.RedisRunner.RedisProcess;
import org.redisson.api.*; import org.redisson.api.*;
import org.redisson.api.redisnode.RedisClusterMaster; import org.redisson.api.redisnode.RedisClusterMaster;
import org.redisson.api.redisnode.RedisMaster; import org.redisson.api.redisnode.RedisMaster;
@ -27,11 +25,17 @@ import org.redisson.cluster.ClusterNodeInfo;
import org.redisson.cluster.ClusterNodeInfo.Flag; import org.redisson.cluster.ClusterNodeInfo.Flag;
import org.redisson.codec.JsonJacksonCodec; import org.redisson.codec.JsonJacksonCodec;
import org.redisson.codec.SerializationCodec; import org.redisson.codec.SerializationCodec;
import org.redisson.config.*; import org.redisson.config.Config;
import org.redisson.config.ConfigSupport;
import org.redisson.config.Credentials;
import org.redisson.config.CredentialsResolver;
import org.redisson.connection.CRC16; import org.redisson.connection.CRC16;
import org.redisson.connection.ConnectionListener; import org.redisson.connection.ConnectionListener;
import org.redisson.connection.MasterSlaveConnectionManager; import org.redisson.connection.MasterSlaveConnectionManager;
import org.redisson.connection.balancer.RandomLoadBalancer; import org.redisson.connection.balancer.RandomLoadBalancer;
import org.redisson.misc.RedisURI;
import org.testcontainers.containers.FixedHostPortGenericContainer;
import org.testcontainers.containers.GenericContainer;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
@ -46,7 +50,7 @@ import java.util.stream.Collectors;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await; import static org.awaitility.Awaitility.await;
public class RedissonTest extends BaseTest { public class RedissonTest extends RedisDockerTest {
@Test @Test
public void testVirtualThreads() { public void testVirtualThreads() {
@ -110,7 +114,7 @@ public class RedissonTest extends BaseTest {
} }
@Test @Test
public void testLazyInitialization() throws IOException, InterruptedException { public void testLazyInitialization() {
Config config = new Config(); Config config = new Config();
config.setLazyInitialization(true); config.setLazyInitialization(true);
config.useSingleServer() config.useSingleServer()
@ -125,16 +129,14 @@ public class RedissonTest extends BaseTest {
redisson.getStream("test").createGroup(StreamCreateGroupArgs.name("test").makeStream()); redisson.getStream("test").createGroup(StreamCreateGroupArgs.name("test").makeStream());
}); });
RedisProcess pp = new RedisRunner() FixedHostPortGenericContainer c = new FixedHostPortGenericContainer("redis:7.2")
.nosave() .withFixedExposedPort(4431, 6379);
.port(4431) c.start();
.randomDir()
.run();
redisson.getStream("test").createGroup(StreamCreateGroupArgs.name("test").makeStream()); redisson.getStream("test").createGroup(StreamCreateGroupArgs.name("test").makeStream());
redisson.shutdown(); redisson.shutdown();
pp.stop(); c.stop();
} }
@Test @Test
@ -152,20 +154,19 @@ public class RedissonTest extends BaseTest {
} }
ex.shutdown(); ex.shutdown();
assertThat(ex.awaitTermination(8, TimeUnit.SECONDS)).isTrue(); assertThat(ex.awaitTermination(20, TimeUnit.SECONDS)).isTrue();
inst.shutdown(); inst.shutdown();
} }
@Test @Test
public void testResponseHandling2() throws InterruptedException { public void testResponseHandling2() throws InterruptedException {
Config config = new Config(); Config config = createConfig();
config.useSingleServer() config.useSingleServer()
.setTimeout(10) .setTimeout(10)
.setRetryAttempts(0) .setRetryAttempts(0)
.setConnectionPoolSize(1) .setConnectionPoolSize(1)
.setConnectionMinimumIdleSize(1) .setConnectionMinimumIdleSize(1)
.setPingConnectionInterval(0) .setPingConnectionInterval(0);
.setAddress(RedisRunner.getDefaultRedisServerBindAddressAndPort());
RedissonClient redisson = Redisson.create(config); RedissonClient redisson = Redisson.create(config);
@ -225,7 +226,7 @@ public class RedissonTest extends BaseTest {
}); });
} }
e.shutdown(); e.shutdown();
assertThat(e.awaitTermination(40, TimeUnit.SECONDS)).isTrue(); assertThat(e.awaitTermination(70, TimeUnit.SECONDS)).isTrue();
assertThat(counter.get()).isEqualTo(10000 * 100); assertThat(counter.get()).isEqualTo(10000 * 100);
} }
@ -247,11 +248,10 @@ public class RedissonTest extends BaseTest {
@Test @Test
public void testSmallPool() throws InterruptedException { public void testSmallPool() throws InterruptedException {
Config config = new Config(); Config config = createConfig();
config.useSingleServer() config.useSingleServer()
.setConnectionMinimumIdleSize(3) .setConnectionMinimumIdleSize(3)
.setConnectionPoolSize(3) .setConnectionPoolSize(3);
.setAddress(RedisRunner.getDefaultRedisServerBindAddressAndPort());
RedissonClient localRedisson = Redisson.create(config); RedissonClient localRedisson = Redisson.create(config);
@ -283,12 +283,11 @@ public class RedissonTest extends BaseTest {
} }
@Test @Test
public void testNextResponseAfterDecoderError() throws Exception { public void testNextResponseAfterDecoderError() {
Config config = new Config(); Config config = createConfig();
config.useSingleServer() config.useSingleServer()
.setConnectionMinimumIdleSize(1) .setConnectionMinimumIdleSize(1)
.setConnectionPoolSize(1) .setConnectionPoolSize(1);
.setAddress(RedisRunner.getDefaultRedisServerBindAddressAndPort());
RedissonClient redisson = Redisson.create(config); RedissonClient redisson = Redisson.create(config);
@ -299,7 +298,7 @@ public class RedissonTest extends BaseTest {
RBuckets buckets = redisson.getBuckets(new JsonJacksonCodec()); RBuckets buckets = redisson.getBuckets(new JsonJacksonCodec());
buckets.get("test2", "test1"); buckets.get("test2", "test1");
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); // skip
} }
assertThat(getStringValue(redisson, "test3")).isEqualTo("\"test3\""); assertThat(getStringValue(redisson, "test3")).isEqualTo("\"test3\"");
@ -324,81 +323,76 @@ public class RedissonTest extends BaseTest {
@Test @Test
public void testSer() { public void testSer() {
Config config = new Config(); Config config = createConfig();
config.useSingleServer().setAddress(RedisRunner.getDefaultRedisServerBindAddressAndPort());
config.setCodec(new SerializationCodec()); config.setCodec(new SerializationCodec());
Assertions.assertThrows(IllegalArgumentException.class, () -> {
RedissonClient r = Redisson.create(config); RedissonClient r = Redisson.create(config);
Assertions.assertThrows(IllegalArgumentException.class, () -> {
r.getMap("test").put("1", new Dummy()); r.getMap("test").put("1", new Dummy());
}); });
r.shutdown();
} }
@Test @Test
public void testMemoryScript() throws IOException, InterruptedException { public void testMemoryScript() {
RedisProcess p = redisTestSmallMemory(); testWithParams(redissonClient -> {
Config c = redissonClient.getConfig();
Config config = new Config(); c.useSingleServer().setTimeout(100000);
config.useSingleServer().setAddress(p.getRedisServerAddressAndPort()).setTimeout(100000);
Assertions.assertThrows(RedisOutOfMemoryException.class, () -> { Assertions.assertThrows(RedisOutOfMemoryException.class, () -> {
RedissonClient r = null; RedissonClient r = null;
try { try {
r = Redisson.create(config); r = Redisson.create(c);
r.getKeys().flushall();
for (int i = 0; i < 10000; i++) { for (int i = 0; i < 10000; i++) {
r.getMap("test").put("" + i, "" + i); r.getMap("test").put("" + i, "" + i);
} }
} finally { } finally {
r.shutdown(); r.shutdown();
p.stop();
} }
}); });
}, "--maxmemory", "1mb");
} }
@Test @Test
public void testMemoryCommand() throws IOException, InterruptedException { public void testMemoryCommand() {
RedisProcess p = redisTestSmallMemory(); testWithParams(redissonClient -> {
Config c = redissonClient.getConfig();
Config config = new Config(); c.useSingleServer().setTimeout(100000);
config.useSingleServer().setAddress(p.getRedisServerAddressAndPort()).setTimeout(100000);
Assertions.assertThrows(RedisOutOfMemoryException.class, () -> { Assertions.assertThrows(RedisOutOfMemoryException.class, () -> {
RedissonClient r = null; RedissonClient r = null;
try { try {
r = Redisson.create(config); r = Redisson.create(c);
r.getKeys().flushall();
for (int i = 0; i < 10000; i++) { for (int i = 0; i < 10000; i++) {
r.getMap("test").fastPut("" + i, "" + i); r.getMap("test").fastPut("" + i, "" + i);
} }
} finally { } finally {
r.shutdown(); r.shutdown();
p.stop();
} }
}); });
}, "--maxmemory", "1mb");
} }
@Test @Test
public void testConfigValidation() { public void testConfigValidation() {
Assertions.assertThrows(IllegalArgumentException.class, () -> { Assertions.assertThrows(IllegalArgumentException.class, () -> {
Config redissonConfig = new Config(); Config redissonConfig = createConfig();
redissonConfig.useSingleServer() redissonConfig.useSingleServer()
.setAddress(RedisRunner.getDefaultRedisServerBindAddressAndPort())
.setConnectionPoolSize(2); .setConnectionPoolSize(2);
Redisson.create(redissonConfig); Redisson.create(redissonConfig);
}); });
} }
@Test @Test
public void testConnectionListener() throws IOException, InterruptedException, TimeoutException { public void testConnectionListener() {
GenericContainer<?> redis = createRedis();
final RedisProcess p = redisTestConnection(); redis.start();
final AtomicInteger connectCounter = new AtomicInteger(); final AtomicInteger connectCounter = new AtomicInteger();
final AtomicInteger disconnectCounter = new AtomicInteger(); final AtomicInteger disconnectCounter = new AtomicInteger();
Config config = new Config(); Config config = createConfig(redis);
config.useSingleServer().setAddress(p.getRedisServerAddressAndPort());
config.setConnectionListener(new ConnectionListener() { config.setConnectionListener(new ConnectionListener() {
@Override @Override
@ -407,7 +401,7 @@ public class RedissonTest extends BaseTest {
@Override @Override
public void onDisconnect(InetSocketAddress addr, NodeType nodeType) { public void onDisconnect(InetSocketAddress addr, NodeType nodeType) {
assertThat(addr).isEqualTo(new InetSocketAddress(p.getRedisServerBindAddress(), p.getRedisServerPort())); assertThat(addr).isEqualTo(new InetSocketAddress(redis.getHost(), redis.getFirstMappedPort()));
assertThat(nodeType).isEqualTo(NodeType.MASTER); assertThat(nodeType).isEqualTo(NodeType.MASTER);
disconnectCounter.incrementAndGet(); disconnectCounter.incrementAndGet();
} }
@ -418,7 +412,7 @@ public class RedissonTest extends BaseTest {
@Override @Override
public void onConnect(InetSocketAddress addr, NodeType nodeType) { public void onConnect(InetSocketAddress addr, NodeType nodeType) {
assertThat(addr).isEqualTo(new InetSocketAddress(p.getRedisServerBindAddress(), p.getRedisServerPort())); assertThat(addr).isEqualTo(new InetSocketAddress(redis.getHost(), redis.getFirstMappedPort()));
assertThat(nodeType).isEqualTo(NodeType.MASTER); assertThat(nodeType).isEqualTo(NodeType.MASTER);
connectCounter.incrementAndGet(); connectCounter.incrementAndGet();
} }
@ -427,23 +421,21 @@ public class RedissonTest extends BaseTest {
RedissonClient r = Redisson.create(config); RedissonClient r = Redisson.create(config);
r.getBucket("1").get(); r.getBucket("1").get();
Assertions.assertEquals(0, p.stop()); redis.setPortBindings(Arrays.asList(redis.getFirstMappedPort() + ":6379"));
redis.stop();
await().atMost(2, TimeUnit.SECONDS).until(() -> disconnectCounter.get() == 1); await().atMost(2, TimeUnit.SECONDS).until(() -> disconnectCounter.get() == 1);
try { try {
r.getBucket("1").get(); r.getBucket("1").get();
} catch (Exception e) { } catch (Exception e) {
// skip
} }
assertThat(connectCounter.get()).isEqualTo(1); assertThat(connectCounter.get()).isEqualTo(1);
assertThat(disconnectCounter.get()).isEqualTo(1); assertThat(disconnectCounter.get()).isEqualTo(1);
RedisProcess pp = new RedisRunner() redis.start();
.nosave()
.port(p.getRedisServerPort())
.randomDir()
.run();
r.getBucket("1").get(); r.getBucket("1").get();
@ -451,7 +443,7 @@ public class RedissonTest extends BaseTest {
assertThat(disconnectCounter.get()).isEqualTo(1); assertThat(disconnectCounter.get()).isEqualTo(1);
r.shutdown(); r.shutdown();
Assertions.assertEquals(0, pp.stop()); redis.stop();
} }
public static class SlowCodec extends BaseCodec { public static class SlowCodec extends BaseCodec {
@ -501,27 +493,20 @@ public class RedissonTest extends BaseTest {
} }
@Test @Test
public void testReconnection() throws IOException, InterruptedException, TimeoutException { public void testReconnection() {
RedisProcess runner = new RedisRunner() Config config = redisson.getConfig();
.appendonly(true)
.randomDir()
.randomPort()
.run();
Config config = new Config();
config.useSingleServer() config.useSingleServer()
.setConnectionMinimumIdleSize(20) .setConnectionMinimumIdleSize(20)
.setConnectionPoolSize(20) .setConnectionPoolSize(20)
.setSubscriptionConnectionMinimumIdleSize(20) .setSubscriptionConnectionMinimumIdleSize(20)
.setSubscriptionConnectionPoolSize(20) .setSubscriptionConnectionPoolSize(20);
.setAddress(runner.getRedisServerAddressAndPort());
RedissonClient r = Redisson.create(config); RedissonClient r = Redisson.create(config);
r.getBucket("myBucket").set(1); r.getBucket("myBucket").set(1);
assertThat(r.getBucket("myBucket").get()).isEqualTo(1); assertThat(r.getBucket("myBucket").get()).isEqualTo(1);
Assertions.assertEquals(0, runner.stop()); REDIS.getDockerClient().pauseContainerCmd(REDIS.getContainerId()).exec();
AtomicBoolean hasError = new AtomicBoolean(); AtomicBoolean hasError = new AtomicBoolean();
try { try {
@ -533,17 +518,11 @@ public class RedissonTest extends BaseTest {
assertThat(hasError.get()).isTrue(); assertThat(hasError.get()).isTrue();
RedisProcess pp = new RedisRunner() REDIS.getDockerClient().unpauseContainerCmd(REDIS.getContainerId()).exec();
.appendonly(true)
.port(runner.getRedisServerPort())
.dir(runner.getDefaultDir())
.run();
assertThat(r.getBucket("myBucket").get()).isEqualTo(1); assertThat(r.getBucket("myBucket").get()).isEqualTo(1);
r.shutdown(); r.shutdown();
Assertions.assertEquals(0, pp.stop());
} }
@ -565,10 +544,7 @@ public class RedissonTest extends BaseTest {
@Test @Test
public void testShutdown() { public void testShutdown() {
Config config = new Config(); RedissonClient r = createInstance();
config.useSingleServer().setAddress(RedisRunner.getDefaultRedisServerBindAddressAndPort());
RedissonClient r = Redisson.create(config);
Assertions.assertFalse(r.isShuttingDown()); Assertions.assertFalse(r.isShuttingDown());
Assertions.assertFalse(r.isShutdown()); Assertions.assertFalse(r.isShutdown());
r.shutdown(); r.shutdown();
@ -577,30 +553,22 @@ public class RedissonTest extends BaseTest {
} }
@Test @Test
public void testCredentials() throws IOException, InterruptedException { public void testCredentials() {
RedisProcess runner = new RedisRunner() withRedisParams(config -> {
.nosave()
.randomDir()
.randomPort()
.requirepass("1234")
.run();
Config config = new Config();
config.useSingleServer() config.useSingleServer()
.setCredentialsResolver(new CredentialsResolver() { .setCredentialsResolver(new CredentialsResolver() {
@Override @Override
public CompletionStage<Credentials> resolve(InetSocketAddress address) { public CompletionStage<Credentials> resolve(InetSocketAddress address) {
return CompletableFuture.completedFuture(new Credentials(null, "1234")); return CompletableFuture.completedFuture(new Credentials(null, "1234"));
} }
}) });
.setAddress(runner.getRedisServerAddressAndPort());
RedissonClient redisson = Redisson.create(config); RedissonClient redisson = Redisson.create(config);
RBucket<String> b = redisson.getBucket("test"); RBucket<String> b = redisson.getBucket("test");
b.set("123"); b.set("123");
redisson.shutdown(); redisson.shutdown();
runner.stop(); }, "--requirepass", "1234");
} }
@Test @Test
@ -617,29 +585,23 @@ public class RedissonTest extends BaseTest {
RedisException e = Assertions.assertThrows(RedisException.class, () -> { RedisException e = Assertions.assertThrows(RedisException.class, () -> {
b.compareAndSet("test", "v1"); b.compareAndSet("test", "v1");
}); });
assertThat(e.getMessage()).startsWith("ERR unknown command `EVAL_111`"); assertThat(e.getMessage()).startsWith("ERR unknown command 'EVAL_111'");
redisson.shutdown(); redisson.shutdown();
} }
@Test @Test
public void testURIPassword() throws InterruptedException, IOException { public void testURIPassword() {
RedisProcess runner = new RedisRunner() withRedisParams(config -> {
.nosave() RedisURI ur = new RedisURI(config.useSingleServer().getAddress());
.randomDir()
.randomPort()
.requirepass("1234")
.run();
Config config = new Config();
config.useSingleServer() config.useSingleServer()
.setAddress("redis://:1234@" + runner.getRedisServerBindAddress() + ":" + runner.getRedisServerPort()); .setAddress("redis://:1234@" + ur.getHost() + ":" + ur.getPort());
RedissonClient redisson = Redisson.create(config); RedissonClient redisson = Redisson.create(config);
RBucket<String> b = redisson.getBucket("test"); RBucket<String> b = redisson.getBucket("test");
b.set("123"); b.set("123");
redisson.shutdown(); redisson.shutdown();
runner.stop(); }, "--requirepass", "1234");
} }
@Test @Test
@ -716,67 +678,17 @@ public class RedissonTest extends BaseTest {
@Test @Test
public void testSentinelStartup() throws Exception { public void testSentinelStartup() throws Exception {
RedisRunner.RedisProcess master = new RedisRunner() withSentinel((nodes, config) -> {
.nosave()
.randomDir()
.run();
RedisRunner.RedisProcess slave1 = new RedisRunner()
.port(6380)
.nosave()
.randomDir()
.slaveof("127.0.0.1", 6379)
.run();
RedisRunner.RedisProcess slave2 = new RedisRunner()
.port(6381)
.nosave()
.randomDir()
.slaveof("127.0.0.1", 6379)
.run();
RedisRunner.RedisProcess sentinel1 = new RedisRunner()
.nosave()
.randomDir()
.port(26379)
.sentinel()
.sentinelMonitor("myMaster", "127.0.0.1", 6379, 2)
.run();
RedisRunner.RedisProcess sentinel2 = new RedisRunner()
.nosave()
.randomDir()
.port(26380)
.sentinel()
.sentinelMonitor("myMaster", "127.0.0.1", 6379, 2)
.run();
RedisRunner.RedisProcess sentinel3 = new RedisRunner()
.nosave()
.randomDir()
.port(26381)
.sentinel()
.sentinelMonitor("myMaster", "127.0.0.1", 6379, 2)
.run();
Thread.sleep(5000);
Config config = new Config();
config.useSentinelServers()
.setLoadBalancer(new RandomLoadBalancer())
.addSentinelAddress(sentinel3.getRedisServerAddressAndPort()).setMasterName("myMaster");
long t = System.currentTimeMillis(); long t = System.currentTimeMillis();
RedissonClient redisson = Redisson.create(config); RedissonClient redisson = Redisson.create(config);
assertThat(System.currentTimeMillis() - t).isLessThan(2000L); assertThat(System.currentTimeMillis() - t).isLessThan(2000L);
redisson.shutdown(); redisson.shutdown();
}, 2);
sentinel1.stop();
sentinel2.stop();
sentinel3.stop();
master.stop();
slave1.stop();
slave2.stop();
} }
@Test @Test
public void testSingleConfigYAML() throws IOException { public void testSingleConfigYAML() throws IOException {
RedissonClient r = BaseTest.createInstance(); RedissonClient r = createInstance();
String t = r.getConfig().toYAML(); String t = r.getConfig().toYAML();
Config c = Config.fromYAML(t); Config c = Config.fromYAML(t);
assertThat(c.toYAML()).isEqualTo(t); assertThat(c.toYAML()).isEqualTo(t);
@ -802,27 +714,11 @@ public class RedissonTest extends BaseTest {
} }
@Test @Test
public void testEvalCache() throws InterruptedException, IOException { public void testEvalCache() {
RedisRunner master1 = new RedisRunner().port(6896).randomDir().nosave(); testInCluster(redissonClient -> {
RedisRunner master2 = new RedisRunner().port(6891).randomDir().nosave(); Config config = redissonClient.getConfig();
RedisRunner master3 = new RedisRunner().port(6892).randomDir().nosave();
RedisRunner slave1 = new RedisRunner().port(6900).randomDir().nosave();
RedisRunner slave2 = new RedisRunner().port(6901).randomDir().nosave();
RedisRunner slave3 = new RedisRunner().port(6902).randomDir().nosave();
ClusterRunner clusterRunner = new ClusterRunner()
.addNode(master1, slave1)
.addNode(master2, slave2)
.addNode(master3, slave3);
ClusterRunner.ClusterProcesses process = clusterRunner.run();
Thread.sleep(5000);
Config config = new Config();
config.setUseScriptCache(true); config.setUseScriptCache(true);
config.useClusterServers()
.setLoadBalancer(new RandomLoadBalancer())
.addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort());
RedissonClient redisson = Redisson.create(config); RedissonClient redisson = Redisson.create(config);
RTimeSeries<String, Object> t = redisson.getTimeSeries("test"); RTimeSeries<String, Object> t = redisson.getTimeSeries("test");
@ -831,32 +727,26 @@ public class RedissonTest extends BaseTest {
t.add(1, "10", 1, TimeUnit.SECONDS); t.add(1, "10", 1, TimeUnit.SECONDS);
t.size(); t.size();
redisson.shutdown();
});
} }
@Test @Test
public void testMovedRedirectInCluster() throws Exception { public void testMovedRedirectInCluster() throws Exception {
RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave(); withNewCluster(redissonClient -> {
RedisRunner master2 = new RedisRunner().randomPort().randomDir().nosave(); Config config = redissonClient.getConfig();
RedisRunner master3 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slot1 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slot2 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slot3 = new RedisRunner().randomPort().randomDir().nosave();
ClusterRunner clusterRunner = new ClusterRunner()
.addNode(master1, slot1)
.addNode(master2, slot2)
.addNode(master3, slot3);
ClusterProcesses process = clusterRunner.run();
Config config = new Config();
config.useClusterServers() config.useClusterServers()
.setScanInterval(100000) .setScanInterval(100000);
.setLoadBalancer(new RandomLoadBalancer())
.addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort());
RedissonClient redisson = Redisson.create(config); RedissonClient redisson = Redisson.create(config);
Collection<RedisClusterMaster> ms = redisson.getRedisNodes(RedisNodes.CLUSTER).getMasters();
RedisClusterMaster m = ms.iterator().next();
RedisURI a = config.useClusterServers().getNatMapper().map(
new RedisURI("redis://" + m.getAddr().getHostString() + ":" + m.getAddr().getPort()));
RedisClientConfig cfg = new RedisClientConfig(); RedisClientConfig cfg = new RedisClientConfig();
cfg.setAddress(process.getNodes().iterator().next().getRedisServerAddressAndPort()); cfg.setAddress(a);
RedisClient c = RedisClient.create(cfg); RedisClient c = RedisClient.create(cfg);
RedisConnection cc = c.connect(); RedisConnection cc = c.connect();
List<ClusterNodeInfo> cn = cc.sync(RedisCommands.CLUSTER_NODES); List<ClusterNodeInfo> cn = cc.sync(RedisCommands.CLUSTER_NODES);
@ -869,12 +759,12 @@ public class RedissonTest extends BaseTest {
ClusterNodeInfo destination = nodesIter.next(); ClusterNodeInfo destination = nodesIter.next();
RedisClientConfig sourceCfg = new RedisClientConfig(); RedisClientConfig sourceCfg = new RedisClientConfig();
sourceCfg.setAddress(source.getAddress()); sourceCfg.setAddress(config.useClusterServers().getNatMapper().map(source.getAddress()));
RedisClient sourceClient = RedisClient.create(sourceCfg); RedisClient sourceClient = RedisClient.create(sourceCfg);
RedisConnection sourceConnection = sourceClient.connect(); RedisConnection sourceConnection = sourceClient.connect();
RedisClientConfig destinationCfg = new RedisClientConfig(); RedisClientConfig destinationCfg = new RedisClientConfig();
destinationCfg.setAddress(destination.getAddress()); destinationCfg.setAddress(config.useClusterServers().getNatMapper().map(destination.getAddress()));
RedisClient destinationClient = RedisClient.create(destinationCfg); RedisClient destinationClient = RedisClient.create(destinationCfg);
RedisConnection destinationConnection = destinationClient.connect(); RedisConnection destinationConnection = destinationClient.connect();
@ -906,7 +796,7 @@ public class RedissonTest extends BaseTest {
for (ClusterNodeInfo node : cn) { for (ClusterNodeInfo node : cn) {
RedisClientConfig cc1 = new RedisClientConfig(); RedisClientConfig cc1 = new RedisClientConfig();
cc1.setAddress(node.getAddress()); cc1.setAddress(config.useClusterServers().getNatMapper().map(node.getAddress()));
RedisClient ccc = RedisClient.create(cc1); RedisClient ccc = RedisClient.create(cc1);
RedisConnection connection = ccc.connect(); RedisConnection connection = ccc.connect();
connection.sync(RedisCommands.CLUSTER_SETSLOT, slot, "NODE", destination.getNodeId()); connection.sync(RedisCommands.CLUSTER_SETSLOT, slot, "NODE", destination.getNodeId());
@ -919,7 +809,7 @@ public class RedissonTest extends BaseTest {
sourceClient.shutdown(); sourceClient.shutdown();
destinationClient.shutdown(); destinationClient.shutdown();
redisson.shutdown(); redisson.shutdown();
process.shutdown(); });
} }
@ -995,31 +885,13 @@ public class RedissonTest extends BaseTest {
@Test @Test
public void testManyConnections() { public void testManyConnections() {
Assumptions.assumeFalse(RedissonRuntimeEnvironment.isTravis); Config redisConfig = createConfig();
Config redisConfig = new Config();
redisConfig.useSingleServer() redisConfig.useSingleServer()
.setConnectionMinimumIdleSize(5000) .setConnectionMinimumIdleSize(5000)
.setConnectionPoolSize(5000) .setConnectionPoolSize(5000);
.setAddress(RedisRunner.getDefaultRedisServerBindAddressAndPort());
RedissonClient r = Redisson.create(redisConfig); RedissonClient r = Redisson.create(redisConfig);
r.shutdown(); r.shutdown();
} }
private RedisProcess redisTestSmallMemory() throws IOException, InterruptedException {
return new RedisRunner()
.maxmemory("1mb")
.nosave()
.randomDir()
.randomPort()
.run();
}
private RedisProcess redisTestConnection() throws IOException, InterruptedException {
return new RedisRunner()
.nosave()
.randomDir()
.randomPort()
.run();
}
} }

@ -404,6 +404,8 @@ public class RedissonTopicPatternTest extends RedisDockerTest {
.addNode(master3, slave3); .addNode(master3, slave3);
ClusterRunner.ClusterProcesses process = clusterRunner.run(); ClusterRunner.ClusterProcesses process = clusterRunner.run();
Thread.sleep(7000);
Config config = new Config(); Config config = new Config();
config.useClusterServers() config.useClusterServers()
.setSubscriptionMode(subscriptionMode) .setSubscriptionMode(subscriptionMode)

@ -3,25 +3,23 @@ package org.redisson;
import com.github.dockerjava.api.command.InspectContainerResponse; import com.github.dockerjava.api.command.InspectContainerResponse;
import com.github.dockerjava.api.model.ContainerNetwork; import com.github.dockerjava.api.model.ContainerNetwork;
import com.github.dockerjava.api.model.ExposedPort; import com.github.dockerjava.api.model.ExposedPort;
import com.github.dockerjava.api.model.PortBinding;
import com.github.dockerjava.api.model.Ports; import com.github.dockerjava.api.model.Ports;
import org.awaitility.Awaitility; import org.awaitility.Awaitility;
import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.redisson.ClusterRunner.ClusterProcesses;
import org.redisson.RedisRunner.KEYSPACE_EVENTS_OPTIONS;
import org.redisson.RedisRunner.RedisProcess;
import org.redisson.api.*; import org.redisson.api.*;
import org.redisson.api.listener.*; import org.redisson.api.listener.*;
import org.redisson.api.redisnode.RedisCluster; import org.redisson.api.redisnode.RedisCluster;
import org.redisson.api.redisnode.RedisClusterMaster; import org.redisson.api.redisnode.RedisClusterMaster;
import org.redisson.api.redisnode.RedisClusterSlave; import org.redisson.api.redisnode.RedisClusterSlave;
import org.redisson.api.redisnode.RedisNodes; import org.redisson.api.redisnode.RedisNodes;
import org.redisson.client.*; import org.redisson.client.RedisClient;
import org.redisson.client.RedisClientConfig;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisException;
import org.redisson.client.codec.LongCodec; import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.StringCodec; import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.RedisStrictCommand;
import org.redisson.cluster.ClusterNodeInfo; import org.redisson.cluster.ClusterNodeInfo;
import org.redisson.config.Config; import org.redisson.config.Config;
import org.redisson.config.SubscriptionMode; import org.redisson.config.SubscriptionMode;
@ -30,11 +28,10 @@ import org.redisson.misc.RedisURI;
import org.testcontainers.containers.ContainerState; import org.testcontainers.containers.ContainerState;
import org.testcontainers.containers.DockerComposeContainer; import org.testcontainers.containers.DockerComposeContainer;
import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.startupcheck.MinimumDurationRunningStartupCheckStrategy; import org.testcontainers.containers.startupcheck.MinimumDurationRunningStartupCheckStrategy;
import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy;
import java.io.File; import java.io.File;
import java.io.IOException;
import java.io.Serializable; import java.io.Serializable;
import java.time.Duration; import java.time.Duration;
import java.util.*; import java.util.*;
@ -43,7 +40,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -87,7 +83,7 @@ public class RedissonTopicTest extends RedisDockerTest {
} }
@Test @Test
public void testCluster() throws IOException, InterruptedException { public void testCluster() throws InterruptedException {
GenericContainer redisCluster = new GenericContainer<>("vishnunair/docker-redis-cluster") GenericContainer redisCluster = new GenericContainer<>("vishnunair/docker-redis-cluster")
.withExposedPorts(6379, 6380, 6381, 6382, 6383, 6384) .withExposedPorts(6379, 6380, 6381, 6382, 6383, 6384)
.withStartupCheckStrategy(new MinimumDurationRunningStartupCheckStrategy(Duration.ofSeconds(10))); .withStartupCheckStrategy(new MinimumDurationRunningStartupCheckStrategy(Duration.ofSeconds(10)));
@ -275,7 +271,7 @@ public class RedissonTopicTest extends RedisDockerTest {
} }
@Test @Test
public void testTopicState() throws InterruptedException { public void testTopicState() {
RTopic stringTopic = redisson.getTopic("test1", StringCodec.INSTANCE); RTopic stringTopic = redisson.getTopic("test1", StringCodec.INSTANCE);
for (int i = 0; i < 3; i++) { for (int i = 0; i < 3; i++) {
AtomicInteger stringMessageReceived = new AtomicInteger(); AtomicInteger stringMessageReceived = new AtomicInteger();
@ -305,7 +301,7 @@ public class RedissonTopicTest extends RedisDockerTest {
} }
@Test @Test
public void testMultiTypeConnection() throws InterruptedException { public void testMultiTypeConnection() {
RTopic stringTopic = redisson.getTopic("test1", StringCodec.INSTANCE); RTopic stringTopic = redisson.getTopic("test1", StringCodec.INSTANCE);
AtomicBoolean stringMessageReceived = new AtomicBoolean(); AtomicBoolean stringMessageReceived = new AtomicBoolean();
stringTopic.addListener(String.class, new MessageListener<String>() { stringTopic.addListener(String.class, new MessageListener<String>() {
@ -585,7 +581,7 @@ public class RedissonTopicTest extends RedisDockerTest {
} }
@Test @Test
public void testRemoveAllListeners2() throws InterruptedException { public void testRemoveAllListeners2() {
RTopic topic1 = redisson.getTopic("topic1"); RTopic topic1 = redisson.getTopic("topic1");
AtomicInteger counter = new AtomicInteger(); AtomicInteger counter = new AtomicInteger();
@ -605,7 +601,7 @@ public class RedissonTopicTest extends RedisDockerTest {
} }
@Test @Test
public void testRemoveByInstance() throws InterruptedException { public void testRemoveByInstance() {
RTopic topic1 = redisson.getTopic("topic1"); RTopic topic1 = redisson.getTopic("topic1");
MessageListener listener = new MessageListener() { MessageListener listener = new MessageListener() {
@Override @Override
@ -724,9 +720,7 @@ public class RedissonTopicTest extends RedisDockerTest {
@Test @Test
public void testReattach() throws Exception { public void testReattach() throws Exception {
GenericContainer<?> redis = GenericContainer<?> redis = createRedis();
new GenericContainer<>("redis:7.2")
.withExposedPorts(6379);
redis.start(); redis.start();
Config config = new Config(); Config config = new Config();
@ -774,9 +768,7 @@ public class RedissonTopicTest extends RedisDockerTest {
@Test @Test
public void testAddListenerFailover() throws Exception { public void testAddListenerFailover() throws Exception {
GenericContainer<?> redis = GenericContainer<?> redis = createRedis();
new GenericContainer<>("redis:7.2")
.withExposedPorts(6379);
redis.start(); redis.start();
Config config = new Config(); Config config = new Config();
@ -908,148 +900,17 @@ public class RedissonTopicTest extends RedisDockerTest {
assertThat(status.peekLast()).isEqualTo("ok"); assertThat(status.peekLast()).isEqualTo("ok");
executor1.shutdown(); executor1.shutdown();
try {
assertThat(executor1.awaitTermination(20, TimeUnit.SECONDS)).isTrue();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
redissonClient.shutdown(); redissonClient.shutdown();
}, 1); }, 1);
} }
private void withSentinel(BiConsumer<List<GenericContainer<?>>, Config> callback, int slaves) throws InterruptedException {
Network network = Network.newNetwork();
List<GenericContainer<? extends GenericContainer<?>>> nodes = new ArrayList<>();
GenericContainer<?> master =
new GenericContainer<>("bitnami/redis:7.2.4")
.withNetwork(network)
.withEnv("REDIS_REPLICATION_MODE", "master")
.withEnv("ALLOW_EMPTY_PASSWORD", "yes")
.withNetworkAliases("redis")
.withExposedPorts(6379);
master.start();
assert master.getNetwork() == network;
int masterPort = master.getFirstMappedPort();
master.withCreateContainerCmdModifier(cmd -> {
cmd.getHostConfig().withPortBindings(
new PortBinding(Ports.Binding.bindPort(Integer.valueOf(masterPort)),
cmd.getExposedPorts()[0]));
});
nodes.add(master);
for (int i = 0; i < slaves; i++) {
GenericContainer<?> slave =
new GenericContainer<>("bitnami/redis:7.2.4")
.withNetwork(network)
.withEnv("REDIS_REPLICATION_MODE", "slave")
.withEnv("REDIS_MASTER_HOST", "redis")
.withEnv("ALLOW_EMPTY_PASSWORD", "yes")
.withNetworkAliases("slave" + i)
.withExposedPorts(6379);
slave.start();
int slavePort = slave.getFirstMappedPort();
slave.withCreateContainerCmdModifier(cmd -> {
cmd.getHostConfig().withPortBindings(
new PortBinding(Ports.Binding.bindPort(Integer.valueOf(slavePort)),
cmd.getExposedPorts()[0]));
});
nodes.add(slave);
}
GenericContainer<?> sentinel1 =
new GenericContainer<>("bitnami/redis-sentinel:7.2.4")
.withNetwork(network)
.withEnv("REDIS_SENTINEL_DOWN_AFTER_MILLISECONDS", "5000")
.withEnv("REDIS_SENTINEL_FAILOVER_TIMEOUT", "10000")
.withNetworkAliases("sentinel1")
.withExposedPorts(26379);
sentinel1.start();
int sentinel1Port = sentinel1.getFirstMappedPort();
sentinel1.withCreateContainerCmdModifier(cmd -> {
cmd.getHostConfig().withPortBindings(
new PortBinding(Ports.Binding.bindPort(Integer.valueOf(sentinel1Port)),
cmd.getExposedPorts()[0]));
});
nodes.add(sentinel1);
GenericContainer<?> sentinel2 =
new GenericContainer<>("bitnami/redis-sentinel:7.2.4")
.withNetwork(network)
.withEnv("REDIS_SENTINEL_DOWN_AFTER_MILLISECONDS", "5000")
.withEnv("REDIS_SENTINEL_FAILOVER_TIMEOUT", "10000")
.withNetworkAliases("sentinel2")
.withExposedPorts(26379);
sentinel2.start();
int sentinel2Port = sentinel2.getFirstMappedPort();
sentinel2.withCreateContainerCmdModifier(cmd -> {
cmd.getHostConfig().withPortBindings(
new PortBinding(Ports.Binding.bindPort(Integer.valueOf(sentinel2Port)),
cmd.getExposedPorts()[0]));
});
nodes.add(sentinel2);
GenericContainer<?> sentinel3 =
new GenericContainer<>("bitnami/redis-sentinel:7.2.4")
.withNetwork(network)
.withEnv("REDIS_SENTINEL_DOWN_AFTER_MILLISECONDS", "5000")
.withEnv("REDIS_SENTINEL_FAILOVER_TIMEOUT", "10000")
.withNetworkAliases("sentinel3")
.withExposedPorts(26379);
sentinel3.start();
int sentinel3Port = sentinel3.getFirstMappedPort();
sentinel3.withCreateContainerCmdModifier(cmd -> {
cmd.getHostConfig().withPortBindings(
new PortBinding(Ports.Binding.bindPort(Integer.valueOf(sentinel3Port)),
cmd.getExposedPorts()[0]));
});
nodes.add(sentinel3);
Thread.sleep(5000);
Config config = new Config();
config.setProtocol(protocol);
config.useSentinelServers()
.setNatMapper(new NatMapper() {
@Override
public RedisURI map(RedisURI uri) {
for (GenericContainer<? extends GenericContainer<?>> node : nodes) {
if (node.getContainerInfo() == null) {
continue;
}
Ports.Binding[] mappedPort = node.getContainerInfo().getNetworkSettings()
.getPorts().getBindings().get(new ExposedPort(uri.getPort()));
Map<String, ContainerNetwork> ss = node.getContainerInfo().getNetworkSettings().getNetworks();
ContainerNetwork s = ss.values().iterator().next();
if (uri.getPort() == 6379 && node.getNetworkAliases().contains("slave0")) {
return new RedisURI(uri.getScheme(), "127.0.0.1", Integer.valueOf(mappedPort[0].getHostPortSpec()));
}
if ("redis".equals(uri.getHost())
&& node.getNetworkAliases().contains(uri.getHost())) {
return new RedisURI(uri.getScheme(), "127.0.0.1", Integer.valueOf(mappedPort[0].getHostPortSpec()));
}
if (mappedPort != null
&& s.getIpAddress().equals(uri.getHost())) {
return new RedisURI(uri.getScheme(), "127.0.0.1", Integer.valueOf(mappedPort[0].getHostPortSpec()));
}
}
return uri;
}
})
.addSentinelAddress("redis://127.0.0.1:" + sentinel1.getFirstMappedPort())
.setMasterName("mymaster");
callback.accept(nodes, config);
nodes.forEach(n -> n.stop());
network.close();
}
@Test @Test
public void testReattachInSentinel() throws Exception { public void testReattachInSentinel() throws Exception {
withSentinel((nodes, config) -> { withSentinel((nodes, config) -> {
@ -1289,7 +1150,7 @@ public class RedissonTopicTest extends RedisDockerTest {
} }
@Test @Test
public void testClusterSharding() throws IOException, InterruptedException { public void testClusterSharding() {
testInCluster(redisson -> { testInCluster(redisson -> {
AtomicInteger counter = new AtomicInteger(); AtomicInteger counter = new AtomicInteger();
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
@ -1454,6 +1315,11 @@ public class RedissonTopicTest extends RedisDockerTest {
assertThat(status.peekLast()).isEqualTo("ok"); assertThat(status.peekLast()).isEqualTo("ok");
executor1.shutdown(); executor1.shutdown();
try {
assertThat(executor1.awaitTermination(20, TimeUnit.SECONDS)).isTrue();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
redissonClient.shutdown(); redissonClient.shutdown();
@ -1461,26 +1327,8 @@ public class RedissonTopicTest extends RedisDockerTest {
} }
@Test @Test
public void testReattachInClusterMaster2() throws Exception { public void testReattachInClusterMaster2() {
RedisRunner master1 = new RedisRunner().port(6890).randomDir().nosave(); withClusterFailover(redisson -> {
RedisRunner master2 = new RedisRunner().port(6891).randomDir().nosave();
RedisRunner master3 = new RedisRunner().port(6892).randomDir().nosave();
RedisRunner slave1 = new RedisRunner().port(6900).randomDir().nosave();
RedisRunner slave2 = new RedisRunner().port(6901).randomDir().nosave();
RedisRunner slave3 = new RedisRunner().port(6902).randomDir().nosave();
ClusterRunner clusterRunner = new ClusterRunner()
.addNode(master1, slave1)
.addNode(master2, slave2)
.addNode(master3, slave3);
ClusterProcesses process = clusterRunner.run();
Thread.sleep(7000);
Config config = new Config();
config.useClusterServers()
.addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort());
RedissonClient redisson = Redisson.create(config);
Queue<String> messages = new ConcurrentLinkedQueue<>(); Queue<String> messages = new ConcurrentLinkedQueue<>();
Queue<String> subscriptions = new ConcurrentLinkedQueue<>(); Queue<String> subscriptions = new ConcurrentLinkedQueue<>();
@ -1503,20 +1351,31 @@ public class RedissonTopicTest extends RedisDockerTest {
topic.addListener(String.class, (channel, msg) -> messages.add(msg)); topic.addListener(String.class, (channel, msg) -> messages.add(msg));
} }
RedisRunner.RedisProcess master = process.getNodes().stream().filter(x -> x.getRedisServerPort() == master1.getPort()).findFirst().get(); RedisCluster rnc = redisson.getRedisNodes(RedisNodes.CLUSTER);
master.stop(); Optional<RedisClusterMaster> f = rnc.getMasters().stream().findFirst();
RedisClusterMaster master = f.get();
Thread.sleep(TimeUnit.SECONDS.toMillis(40)); RedisClientConfig cc = new RedisClientConfig();
cc.setAddress("redis://" + master.getAddr().getHostString() + ":" + master.getAddr().getPort());
RedisClient c = RedisClient.create(cc);
c.connect().async(RedisCommands.SHUTDOWN);
c.shutdown();
assertThat(subscriptions).hasSize(140); Awaitility.waitAtMost(Duration.ofSeconds(40)).untilAsserted(() -> {
assertThat(subscriptions).hasSizeGreaterThan(125);
});
for (int i = 0; i < topicsAmount; i++) { for (int i = 0; i < topicsAmount; i++) {
RTopic topic = redisson.getTopic("topic" + i); RTopic topic = redisson.getTopic("topic" + i);
topic.publish("topic" + i); topic.publish("topic" + i);
} }
try {
Thread.sleep(100); Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
assertThat(messages).hasSize(topicsAmount); assertThat(messages).hasSize(topicsAmount);
});
} }
public void withCluster(Consumer<RedissonClient> callback) { public void withCluster(Consumer<RedissonClient> callback) {
@ -1545,9 +1404,11 @@ public class RedissonTopicTest extends RedisDockerTest {
redisCluster.stop(); redisCluster.stop();
} }
public void withCluster2(Consumer<RedissonClient> callback) { protected void withClusterFailover(Consumer<RedissonClient> callback) {
List<InspectContainerResponse> nodes = new ArrayList<>(); List<InspectContainerResponse> nodes = new ArrayList<>();
LogMessageWaitStrategy wait2 = new LogMessageWaitStrategy().withRegEx(".*REPLICA\ssync\\:\sFinished\swith\ssuccess.*");
DockerComposeContainer environment = DockerComposeContainer environment =
new DockerComposeContainer(new File("src/test/resources/docker-compose.yml")) new DockerComposeContainer(new File("src/test/resources/docker-compose.yml"))
.withExposedService("redis-node-0", 6379) .withExposedService("redis-node-0", 6379)
@ -1555,19 +1416,17 @@ public class RedissonTopicTest extends RedisDockerTest {
.withExposedService("redis-node-2", 6379) .withExposedService("redis-node-2", 6379)
.withExposedService("redis-node-3", 6379) .withExposedService("redis-node-3", 6379)
.withExposedService("redis-node-4", 6379) .withExposedService("redis-node-4", 6379)
.withExposedService("redis-node-5", 6379) .withExposedService("redis-node-5", 6379, wait2);
.withExposedService("redis-node-6", 6379)
.withExposedService("redis-node-7", 6379);
environment.start(); environment.start();
try { try {
Thread.sleep(25000); Thread.sleep(5000);
} catch (InterruptedException e) { } catch (InterruptedException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
for (int i = 0; i < 8; i++) { for (int i = 0; i < 6; i++) {
Optional<ContainerState> cc = environment.getContainerByServiceName("redis-node-" + i); Optional<ContainerState> cc = environment.getContainerByServiceName("redis-node-" + i);
nodes.add(cc.get().getContainerInfo()); nodes.add(cc.get().getContainerInfo());
} }
@ -1599,44 +1458,18 @@ public class RedissonTopicTest extends RedisDockerTest {
RedissonClient redisson = Redisson.create(config); RedissonClient redisson = Redisson.create(config);
RedisCluster nodes2 = redisson.getRedisNodes(RedisNodes.CLUSTER);
for (RedisClusterSlave slave : nodes2.getSlaves()) {
slave.setConfig("cluster-node-timeout", "1000");
}
for (RedisClusterMaster master : nodes2.getMasters()) {
master.setConfig("cluster-node-timeout", "1000");
}
callback.accept(redisson); callback.accept(redisson);
redisson.shutdown(); redisson.shutdown();
environment.stop(); environment.stop();
} }
@Test @Test
public void testReattachInClusterMaster() throws Exception { public void testReattachInClusterMaster() {
RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave(); withClusterFailover(redissonClient -> {
RedisRunner master2 = new RedisRunner().randomPort().randomDir().nosave(); Config cfg = redissonClient.getConfig();
RedisRunner master3 = new RedisRunner().randomPort().randomDir().nosave(); cfg.useClusterServers().setSubscriptionMode(SubscriptionMode.MASTER);
RedisRunner slave1 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave2 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave3 = new RedisRunner().randomPort().randomDir().nosave();
ClusterRunner clusterRunner = new ClusterRunner()
.addNode(master1, slave1)
.addNode(master2, slave2)
.addNode(master3, slave3);
ClusterProcesses process = clusterRunner.run();
Thread.sleep(5000);
Config config = new Config();
config.useClusterServers()
.setSubscriptionMode(SubscriptionMode.MASTER)
.setLoadBalancer(new RandomLoadBalancer())
.addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort());
RedissonClient redisson = Redisson.create(config);
RedissonClient redisson = Redisson.create(cfg);
final AtomicBoolean executed = new AtomicBoolean(); final AtomicBoolean executed = new AtomicBoolean();
final AtomicInteger subscriptions = new AtomicInteger(); final AtomicInteger subscriptions = new AtomicInteger();
@ -1661,18 +1494,24 @@ public class RedissonTopicTest extends RedisDockerTest {
sendCommands(redisson, "3"); sendCommands(redisson, "3");
process.getNodes().stream().filter(x -> master1.getPort() == x.getRedisServerPort()) RedisCluster rnc = redisson.getRedisNodes(RedisNodes.CLUSTER);
.forEach(x -> { for (RedisClusterMaster master : rnc.getMasters()) {
try { RedisClientConfig cc = new RedisClientConfig();
x.stop(); cc.setAddress("redis://" + master.getAddr().getHostString() + ":" + master.getAddr().getPort());
Thread.sleep(18000); RedisClient c = RedisClient.create(cc);
} catch (InterruptedException e) { RedisConnection cn = c.connect();
// TODO Auto-generated catch block List<String> channels = cn.sync(RedisCommands.PUBSUB_CHANNELS);
e.printStackTrace(); if (channels.contains("3")) {
cn.async(RedisCommands.SHUTDOWN);
}
c.shutdown();
} }
});
try {
Thread.sleep(25000); Thread.sleep(25000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
redisson.getTopic("3").publish(1); redisson.getTopic("3").publish(1);
@ -1680,31 +1519,24 @@ public class RedissonTopicTest extends RedisDockerTest {
assertThat(executed.get()).isTrue(); assertThat(executed.get()).isTrue();
redisson.shutdown(); redisson.shutdown();
process.shutdown(); });
} }
@Test @Test
public void testReattachPatternTopicListenersOnClusterFailover() throws Exception { public void testReattachPatternTopicListenersOnClusterFailover() {
final KEYSPACE_EVENTS_OPTIONS keyspaceEvents[] = withClusterFailover(redisson -> {
{KEYSPACE_EVENTS_OPTIONS.K, KEYSPACE_EVENTS_OPTIONS.E, KEYSPACE_EVENTS_OPTIONS.A}; RedisCluster nodes = redisson.getRedisNodes(RedisNodes.CLUSTER);
final RedisRunner master = new RedisRunner().randomPort().randomDir().nosave() for (RedisClusterMaster master : nodes.getMasters()) {
.notifyKeyspaceEvents(keyspaceEvents); master.setConfig("notify-keyspace-events", "K$");
final RedisRunner slave = new RedisRunner().randomPort().randomDir().nosave() }
.notifyKeyspaceEvents(keyspaceEvents); for (RedisClusterSlave slave : nodes.getSlaves()) {
slave.setConfig("notify-keyspace-events", "K$");
final ClusterRunner clusterRunner = new ClusterRunner().addNode(master, slave); }
final ClusterProcesses process = clusterRunner.run();
final Config config = new Config();
config.useClusterServers().addNodeAddress(
process.getNodes().stream().findAny().get().getRedisServerAddressAndPort());
final RedissonClient redisson = Redisson.create(config);
final AtomicInteger subscriptions = new AtomicInteger(); AtomicInteger subscriptions = new AtomicInteger();
final AtomicInteger messagesReceived = new AtomicInteger(); AtomicInteger messagesReceived = new AtomicInteger();
final RPatternTopic topic = RPatternTopic topic =
redisson.getPatternTopic("__keyspace*__:i*", StringCodec.INSTANCE); redisson.getPatternTopic("__keyspace*__:i*", StringCodec.INSTANCE);
topic.addListener(new PatternStatusListener() { topic.addListener(new PatternStatusListener() {
@Override @Override
@ -1719,42 +1551,46 @@ public class RedissonTopicTest extends RedisDockerTest {
(pattern, channel, msg) -> messagesReceived.incrementAndGet()); (pattern, channel, msg) -> messagesReceived.incrementAndGet());
assertThat(subscriptions.get()).isEqualTo(1); assertThat(subscriptions.get()).isEqualTo(1);
try {
sendCommands(redisson, "dummy").join(); sendCommands(redisson, "dummy").join();
await().atMost(30, TimeUnit.SECONDS).until(() -> messagesReceived.get() == 100); } catch (InterruptedException e) {
throw new RuntimeException(e);
failover(process, master, slave);
redisson.getBucket("i100").set("");
await().atMost(30, TimeUnit.SECONDS).until(() -> subscriptions.get() == 2);
await().atMost(5, TimeUnit.SECONDS).until(() -> messagesReceived.get() == 101);
redisson.shutdown();
process.shutdown();
} }
await().atMost(30, TimeUnit.SECONDS).until(() -> {
return messagesReceived.get() == 100;
});
private void failover(ClusterProcesses processes, RedisRunner master, RedisRunner slave) RedisCluster rnc = redisson.getRedisNodes(RedisNodes.CLUSTER);
throws InterruptedException { for (RedisClusterMaster master : rnc.getMasters()) {
final RedisClient masterClient = connect(processes, master); RedisClientConfig cc = new RedisClientConfig();
cc.setAddress("redis://" + master.getAddr().getHostString() + ":" + master.getAddr().getPort());
RedisClient c = RedisClient.create(cc);
RedisConnection cn = c.connect();
try { try {
masterClient.connect().sync(new RedisStrictCommand<Void>("DEBUG", "SEGFAULT")); Boolean res = cn.sync(RedisCommands.EXISTS, "i99");
} catch (RedisTimeoutException e) { if (res) {
// node goes down, so this command times out waiting for the response cn.async(RedisCommands.SHUTDOWN);
}
} catch (Exception e) {
// skip
} }
Thread.sleep(java.time.Duration.ofSeconds(25).toMillis()); c.shutdown();
}
await().atMost(30, TimeUnit.SECONDS).until(() -> {
return subscriptions.get() == 2;
});
final RedisClient slaveClient = connect(processes, slave); for (RedisClusterMaster master : nodes.getMasters()) {
slaveClient.connect().sync(new RedisStrictCommand<Void>("CLUSTER", "FAILOVER"), "TAKEOVER"); master.setConfig("notify-keyspace-events", "K$");
Thread.sleep(java.time.Duration.ofSeconds(25).toMillis());
} }
private RedisClient connect(ClusterProcesses processes, RedisRunner runner) { redisson.getBucket("i99").set("");
return RedisClient.create(new RedisClientConfig() await().atMost(1, TimeUnit.SECONDS).until(() -> {
.setAddress(processes.getNodes().stream() System.out.println("messagesReceived.get() " + messagesReceived.get());
.filter(node -> node.getRedisServerPort() == runner.getPort()) return messagesReceived.get() == 101;
.findFirst() });
.map(RedisProcess::getRedisServerAddressAndPort) });
.orElseThrow(() -> new IllegalArgumentException(
"Failed to find node running at port: " + runner.getPort()
+ " in cluster processes"))));
} }
} }

@ -8,6 +8,7 @@ import java.util.Map;
import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.redisson.RedisDockerTest;
import org.redisson.RedisRunner; import org.redisson.RedisRunner;
import org.redisson.RedisRunner.RedisProcess; import org.redisson.RedisRunner.RedisProcess;
import org.redisson.Redisson; import org.redisson.Redisson;
@ -15,28 +16,33 @@ import org.redisson.api.RedissonClient;
import org.redisson.client.WriteRedisConnectionException; import org.redisson.client.WriteRedisConnectionException;
import org.redisson.config.Config; import org.redisson.config.Config;
import org.redisson.config.ReadMode; import org.redisson.config.ReadMode;
import org.testcontainers.containers.GenericContainer;
public class WeightedRoundRobinBalancerTest { public class WeightedRoundRobinBalancerTest extends RedisDockerTest {
@Test @Test
public void testUseMasterForReadsIfNoConnectionsToSlaves() { public void testUseMasterForReadsIfNoConnectionsToSlaves() {
Assertions.assertThrows(WriteRedisConnectionException.class, () -> { GenericContainer<?> master = null;
RedisProcess master = null; GenericContainer<?> slave = null;
RedisProcess slave = null;
RedissonClient client = null; RedissonClient client = null;
try { try {
master = redisTestInstance(); master = createRedis();
slave = redisTestInstance(); master.start();
slave = createRedis();
slave.start();
String masterurl = "redis://" + master.getHost() + ":" + master.getFirstMappedPort();
String slaveurl = "redis://" + slave.getHost() + ":" + slave.getFirstMappedPort();
Map<String, Integer> weights = new HashMap<>(); Map<String, Integer> weights = new HashMap<>();
weights.put(master.getRedisServerAddressAndPort(), 1); weights.put(masterurl, 1);
weights.put(slave.getRedisServerAddressAndPort(), 2); weights.put(slaveurl, 2);
Config config = new Config(); Config config = new Config();
config.useMasterSlaveServers() config.useMasterSlaveServers()
.setReadMode(ReadMode.SLAVE) .setReadMode(ReadMode.SLAVE)
.setMasterAddress(master.getRedisServerAddressAndPort()) .setMasterAddress(masterurl)
.addSlaveAddress(slave.getRedisServerAddressAndPort()) .addSlaveAddress(slaveurl)
.setLoadBalancer(new WeightedRoundRobinBalancer(weights, 1)); .setLoadBalancer(new WeightedRoundRobinBalancer(weights, 1));
client = Redisson.create(config); client = Redisson.create(config);
@ -47,7 +53,9 @@ public class WeightedRoundRobinBalancerTest {
slave.stop(); slave.stop();
RedissonClient clientCopy = client; RedissonClient clientCopy = client;
assertThat(clientCopy.getBucket("key").get()).isNull(); Assertions.assertThrows(WriteRedisConnectionException.class, () -> {
clientCopy.getBucket("key").get();
});
} finally { } finally {
if (master != null) { if (master != null) {
master.stop(); master.stop();
@ -59,14 +67,6 @@ public class WeightedRoundRobinBalancerTest {
client.shutdown(); client.shutdown();
} }
} }
});
} }
private RedisProcess redisTestInstance() throws IOException, InterruptedException {
return new RedisRunner()
.nosave()
.randomDir()
.randomPort()
.run();
}
} }

@ -1,33 +1,28 @@
package org.redisson.executor; package org.redisson.executor;
import com.github.dockerjava.api.model.ContainerNetwork;
import com.github.dockerjava.api.model.ExposedPort;
import com.github.dockerjava.api.model.PortBinding;
import com.github.dockerjava.api.model.Ports;
import mockit.Invocation; import mockit.Invocation;
import mockit.Mock; import mockit.Mock;
import mockit.MockUp; import mockit.MockUp;
import org.awaitility.Awaitility; import org.awaitility.Awaitility;
import org.junit.jupiter.api.*; import org.junit.jupiter.api.*;
import org.redisson.*; import org.redisson.RedisDockerTest;
import org.redisson.Redisson;
import org.redisson.RedissonNode;
import org.redisson.api.*; import org.redisson.api.*;
import org.redisson.api.annotation.RInject; import org.redisson.api.annotation.RInject;
import org.redisson.api.executor.TaskFinishedListener; import org.redisson.api.executor.TaskFinishedListener;
import org.redisson.api.executor.TaskStartedListener; import org.redisson.api.executor.TaskStartedListener;
import org.redisson.config.Config; import org.redisson.config.Config;
import org.redisson.config.RedissonNodeConfig; import org.redisson.config.RedissonNodeConfig;
import org.redisson.connection.balancer.RandomLoadBalancer;
import org.redisson.misc.RedisURI;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import java.io.IOException; import java.io.IOException;
import java.io.Serializable; import java.io.Serializable;
import java.time.Duration; import java.time.Duration;
import java.util.*; import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await; import static org.awaitility.Awaitility.await;
@ -37,7 +32,7 @@ public class RedissonExecutorServiceTest extends RedisDockerTest {
private static RedissonNode node; private static RedissonNode node;
@BeforeEach @BeforeEach
public void before() throws IOException, InterruptedException { public void before() {
Config config = createConfig(); Config config = createConfig();
RedissonNodeConfig nodeConfig = new RedissonNodeConfig(config); RedissonNodeConfig nodeConfig = new RedissonNodeConfig(config);
nodeConfig.setExecutorServiceWorkers(Collections.singletonMap("test", 1)); nodeConfig.setExecutorServiceWorkers(Collections.singletonMap("test", 1));
@ -674,140 +669,4 @@ public class RedissonExecutorServiceTest extends RedisDockerTest {
}); });
} }
private void withSentinel(BiConsumer<List<GenericContainer<?>>, Config> callback, int slaves) throws InterruptedException {
Network network = Network.newNetwork();
List<GenericContainer<? extends GenericContainer<?>>> nodes = new ArrayList<>();
GenericContainer<?> master =
new GenericContainer<>("bitnami/redis:7.2.4")
.withNetwork(network)
.withEnv("REDIS_REPLICATION_MODE", "master")
.withEnv("ALLOW_EMPTY_PASSWORD", "yes")
.withNetworkAliases("redis")
.withExposedPorts(6379);
master.start();
assert master.getNetwork() == network;
int masterPort = master.getFirstMappedPort();
master.withCreateContainerCmdModifier(cmd -> {
cmd.getHostConfig().withPortBindings(
new PortBinding(Ports.Binding.bindPort(Integer.valueOf(masterPort)),
cmd.getExposedPorts()[0]));
});
nodes.add(master);
for (int i = 0; i < slaves; i++) {
GenericContainer<?> slave =
new GenericContainer<>("bitnami/redis:7.2.4")
.withNetwork(network)
.withEnv("REDIS_REPLICATION_MODE", "slave")
.withEnv("REDIS_MASTER_HOST", "redis")
.withEnv("ALLOW_EMPTY_PASSWORD", "yes")
.withNetworkAliases("slave" + i)
.withExposedPorts(6379);
slave.start();
int slavePort = slave.getFirstMappedPort();
slave.withCreateContainerCmdModifier(cmd -> {
cmd.getHostConfig().withPortBindings(
new PortBinding(Ports.Binding.bindPort(Integer.valueOf(slavePort)),
cmd.getExposedPorts()[0]));
});
nodes.add(slave);
}
GenericContainer<?> sentinel1 =
new GenericContainer<>("bitnami/redis-sentinel:7.2.4")
.withNetwork(network)
.withEnv("REDIS_SENTINEL_DOWN_AFTER_MILLISECONDS", "5000")
.withEnv("REDIS_SENTINEL_FAILOVER_TIMEOUT", "10000")
.withNetworkAliases("sentinel1")
.withExposedPorts(26379);
sentinel1.start();
int sentinel1Port = sentinel1.getFirstMappedPort();
sentinel1.withCreateContainerCmdModifier(cmd -> {
cmd.getHostConfig().withPortBindings(
new PortBinding(Ports.Binding.bindPort(Integer.valueOf(sentinel1Port)),
cmd.getExposedPorts()[0]));
});
nodes.add(sentinel1);
GenericContainer<?> sentinel2 =
new GenericContainer<>("bitnami/redis-sentinel:7.2.4")
.withNetwork(network)
.withEnv("REDIS_SENTINEL_DOWN_AFTER_MILLISECONDS", "5000")
.withEnv("REDIS_SENTINEL_FAILOVER_TIMEOUT", "10000")
.withNetworkAliases("sentinel2")
.withExposedPorts(26379);
sentinel2.start();
int sentinel2Port = sentinel2.getFirstMappedPort();
sentinel2.withCreateContainerCmdModifier(cmd -> {
cmd.getHostConfig().withPortBindings(
new PortBinding(Ports.Binding.bindPort(Integer.valueOf(sentinel2Port)),
cmd.getExposedPorts()[0]));
});
nodes.add(sentinel2);
GenericContainer<?> sentinel3 =
new GenericContainer<>("bitnami/redis-sentinel:7.2.4")
.withNetwork(network)
.withEnv("REDIS_SENTINEL_DOWN_AFTER_MILLISECONDS", "5000")
.withEnv("REDIS_SENTINEL_FAILOVER_TIMEOUT", "10000")
.withNetworkAliases("sentinel3")
.withExposedPorts(26379);
sentinel3.start();
int sentinel3Port = sentinel3.getFirstMappedPort();
sentinel3.withCreateContainerCmdModifier(cmd -> {
cmd.getHostConfig().withPortBindings(
new PortBinding(Ports.Binding.bindPort(Integer.valueOf(sentinel3Port)),
cmd.getExposedPorts()[0]));
});
nodes.add(sentinel3);
Thread.sleep(5000);
Config config = new Config();
config.setProtocol(protocol);
config.useSentinelServers()
.setNatMapper(new NatMapper() {
@Override
public RedisURI map(RedisURI uri) {
for (GenericContainer<? extends GenericContainer<?>> node : nodes) {
if (node.getContainerInfo() == null) {
continue;
}
Ports.Binding[] mappedPort = node.getContainerInfo().getNetworkSettings()
.getPorts().getBindings().get(new ExposedPort(uri.getPort()));
Map<String, ContainerNetwork> ss = node.getContainerInfo().getNetworkSettings().getNetworks();
ContainerNetwork s = ss.values().iterator().next();
if (uri.getPort() == 6379 && node.getNetworkAliases().contains("slave0")) {
return new RedisURI(uri.getScheme(), "127.0.0.1", Integer.valueOf(mappedPort[0].getHostPortSpec()));
}
if ("redis".equals(uri.getHost())
&& node.getNetworkAliases().contains(uri.getHost())) {
return new RedisURI(uri.getScheme(), "127.0.0.1", Integer.valueOf(mappedPort[0].getHostPortSpec()));
}
if (mappedPort != null
&& s.getIpAddress().equals(uri.getHost())) {
return new RedisURI(uri.getScheme(), "127.0.0.1", Integer.valueOf(mappedPort[0].getHostPortSpec()));
}
}
return uri;
}
})
.addSentinelAddress("redis://127.0.0.1:" + sentinel1.getFirstMappedPort())
.setMasterName("mymaster");
callback.accept(nodes, config);
nodes.forEach(n -> n.stop());
network.close();
}
} }

@ -0,0 +1,100 @@
# Copyright VMware, Inc.
# SPDX-License-Identifier: APACHE-2.0
version: '2'
networks:
app-tier:
driver: bridge
services:
redis-node-0:
image: docker.io/bitnami/redis-cluster:7.2.4
environment:
- 'ALLOW_EMPTY_PASSWORD=yes'
- 'REDIS_NODES=redis-node-0 redis-node-1 redis-node-2 redis-node-3 redis-node-4 redis-node-5'
ports:
- '6379'
networks:
- app-tier
redis-node-1:
image: docker.io/bitnami/redis-cluster:7.2.4
environment:
- 'ALLOW_EMPTY_PASSWORD=yes'
- 'REDIS_NODES=redis-node-0 redis-node-1 redis-node-2 redis-node-3 redis-node-4 redis-node-5'
ports:
- '6379'
networks:
- app-tier
redis-node-2:
image: docker.io/bitnami/redis-cluster:7.2.4
environment:
- 'ALLOW_EMPTY_PASSWORD=yes'
- 'REDIS_NODES=redis-node-0 redis-node-1 redis-node-2 redis-node-3 redis-node-4 redis-node-5'
ports:
- '6379'
networks:
- app-tier
redis-node-3:
image: docker.io/bitnami/redis-cluster:7.2.4
environment:
- 'ALLOW_EMPTY_PASSWORD=yes'
- 'REDIS_NODES=redis-node-0 redis-node-1 redis-node-2 redis-node-3 redis-node-4 redis-node-5'
ports:
- '6379'
networks:
- app-tier
redis-node-4:
image: docker.io/bitnami/redis-cluster:7.2.4
environment:
- 'ALLOW_EMPTY_PASSWORD=yes'
- 'REDIS_NODES=redis-node-0 redis-node-1 redis-node-2 redis-node-3 redis-node-4 redis-node-5'
ports:
- '6379'
networks:
- app-tier
# redis-node-5:
# image: docker.io/bitnami/redis-cluster:7.2.4
# environment:
# - 'ALLOW_EMPTY_PASSWORD=yes'
# - 'REDIS_NODES=redis-node-0 redis-node-1 redis-node-2 redis-node-3 redis-node-4 redis-node-5'
# ports:
# - '6379'
# networks:
# - app-tier
# redis-node-6:
# image: docker.io/bitnami/redis-cluster:7.2.4
# environment:
# - 'ALLOW_EMPTY_PASSWORD=yes'
# - 'REDIS_NODES=redis-node-0 redis-node-1 redis-node-2 redis-node-3 redis-node-4 redis-node-5'
# ports:
# - '6379'
# networks:
# - app-tier
redis-node-5:
image: docker.io/bitnami/redis-cluster:7.2.4
depends_on:
- redis-node-0
- redis-node-1
- redis-node-2
- redis-node-3
- redis-node-4
# - redis-node-5
# - redis-node-6
environment:
- 'ALLOW_EMPTY_PASSWORD=yes'
- 'REDIS_CLUSTER_REPLICAS=1'
- 'REDIS_NODES=redis-node-0 redis-node-1 redis-node-2 redis-node-3 redis-node-4 redis-node-5'
# - 'REDIS_NODES=redis-node-0 redis-node-1 redis-node-2 redis-node-3 redis-node-4 redis-node-5 redis-node-6 redis-node-7'
- 'REDIS_CLUSTER_CREATOR=yes'
ports:
- '6379'
networks:
- app-tier
Loading…
Cancel
Save