test added

pull/5863/head
Nikita Koksharov 10 months ago
parent a82bd11f3e
commit 05c5b8a637

@ -26,10 +26,28 @@
<version>1.4.12</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.github.hazendaz.jmockit</groupId>
<artifactId>jmockit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<redirectTestOutputToFile>true</redirectTestOutputToFile>
<forkCount>4</forkCount>
<reuseForks>true</reuseForks>
<argLine>${argLine} -javaagent:"${settings.localRepository}"/com/github/hazendaz/jmockit/jmockit/1.52.0/jmockit-1.52.0.jar</argLine>
</configuration>
</plugin>
<plugin>
<artifactId>maven-jar-plugin</artifactId>
<version>3.4.1</version>

@ -1,5 +1,9 @@
package org.redisson;
import com.github.dockerjava.api.model.ContainerNetwork;
import com.github.dockerjava.api.model.ExposedPort;
import com.github.dockerjava.api.model.PortBinding;
import com.github.dockerjava.api.model.Ports;
import org.junit.Before;
import org.junit.BeforeClass;
import org.redisson.api.NatMapper;
@ -9,19 +13,22 @@ import org.redisson.config.Protocol;
import org.redisson.misc.RedisURI;
import org.redisson.spring.data.connection.RedissonClusterConnection;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.startupcheck.MinimumDurationRunningStartupCheckStrategy;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
public abstract class BaseTest {
protected static final String NOTIFY_KEYSPACE_EVENTS = "--notify-keyspace-events";
private static final GenericContainer<?> REDIS = createRedis();
protected static final GenericContainer<?> REDIS = createRedis();
protected static final Protocol protocol = Protocol.RESP2;
@ -130,6 +137,141 @@ public abstract class BaseTest {
redissonCallback.accept(new RedissonClusterConnection(redissonCluster));
}
protected void withSentinel(BiConsumer<List<GenericContainer<?>>, Config> callback, int slaves) throws InterruptedException {
Network network = Network.newNetwork();
List<GenericContainer<? extends GenericContainer<?>>> nodes = new ArrayList<>();
GenericContainer<?> master =
new GenericContainer<>("bitnami/redis:7.2.4")
.withNetwork(network)
.withEnv("REDIS_REPLICATION_MODE", "master")
.withEnv("ALLOW_EMPTY_PASSWORD", "yes")
.withNetworkAliases("redis")
.withExposedPorts(6379);
master.start();
assert master.getNetwork() == network;
int masterPort = master.getFirstMappedPort();
master.withCreateContainerCmdModifier(cmd -> {
cmd.getHostConfig().withPortBindings(
new PortBinding(Ports.Binding.bindPort(Integer.valueOf(masterPort)),
cmd.getExposedPorts()[0]));
});
nodes.add(master);
for (int i = 0; i < slaves; i++) {
GenericContainer<?> slave =
new GenericContainer<>("bitnami/redis:7.2.4")
.withNetwork(network)
.withEnv("REDIS_REPLICATION_MODE", "slave")
.withEnv("REDIS_MASTER_HOST", "redis")
.withEnv("ALLOW_EMPTY_PASSWORD", "yes")
.withNetworkAliases("slave" + i)
.withExposedPorts(6379);
slave.start();
int slavePort = slave.getFirstMappedPort();
slave.withCreateContainerCmdModifier(cmd -> {
cmd.getHostConfig().withPortBindings(
new PortBinding(Ports.Binding.bindPort(Integer.valueOf(slavePort)),
cmd.getExposedPorts()[0]));
});
nodes.add(slave);
}
GenericContainer<?> sentinel1 =
new GenericContainer<>("bitnami/redis-sentinel:7.2.4")
.withNetwork(network)
.withEnv("REDIS_SENTINEL_DOWN_AFTER_MILLISECONDS", "5000")
.withEnv("REDIS_SENTINEL_FAILOVER_TIMEOUT", "10000")
.withNetworkAliases("sentinel1")
.withExposedPorts(26379);
sentinel1.start();
int sentinel1Port = sentinel1.getFirstMappedPort();
sentinel1.withCreateContainerCmdModifier(cmd -> {
cmd.getHostConfig().withPortBindings(
new PortBinding(Ports.Binding.bindPort(Integer.valueOf(sentinel1Port)),
cmd.getExposedPorts()[0]));
});
nodes.add(sentinel1);
GenericContainer<?> sentinel2 =
new GenericContainer<>("bitnami/redis-sentinel:7.2.4")
.withNetwork(network)
.withEnv("REDIS_SENTINEL_DOWN_AFTER_MILLISECONDS", "5000")
.withEnv("REDIS_SENTINEL_FAILOVER_TIMEOUT", "10000")
.withNetworkAliases("sentinel2")
.withExposedPorts(26379);
sentinel2.start();
int sentinel2Port = sentinel2.getFirstMappedPort();
sentinel2.withCreateContainerCmdModifier(cmd -> {
cmd.getHostConfig().withPortBindings(
new PortBinding(Ports.Binding.bindPort(Integer.valueOf(sentinel2Port)),
cmd.getExposedPorts()[0]));
});
nodes.add(sentinel2);
GenericContainer<?> sentinel3 =
new GenericContainer<>("bitnami/redis-sentinel:7.2.4")
.withNetwork(network)
.withEnv("REDIS_SENTINEL_DOWN_AFTER_MILLISECONDS", "5000")
.withEnv("REDIS_SENTINEL_FAILOVER_TIMEOUT", "10000")
.withNetworkAliases("sentinel3")
.withExposedPorts(26379);
sentinel3.start();
int sentinel3Port = sentinel3.getFirstMappedPort();
sentinel3.withCreateContainerCmdModifier(cmd -> {
cmd.getHostConfig().withPortBindings(
new PortBinding(Ports.Binding.bindPort(Integer.valueOf(sentinel3Port)),
cmd.getExposedPorts()[0]));
});
nodes.add(sentinel3);
Thread.sleep(5000);
Config config = new Config();
config.setProtocol(protocol);
config.useSentinelServers()
.setPingConnectionInterval(0)
.setNatMapper(new NatMapper() {
@Override
public RedisURI map(RedisURI uri) {
for (GenericContainer<? extends GenericContainer<?>> node : nodes) {
if (node.getContainerInfo() == null) {
continue;
}
Ports.Binding[] mappedPort = node.getContainerInfo().getNetworkSettings()
.getPorts().getBindings().get(new ExposedPort(uri.getPort()));
Map<String, ContainerNetwork> ss = node.getContainerInfo().getNetworkSettings().getNetworks();
ContainerNetwork s = ss.values().iterator().next();
if (uri.getPort() == 6379
&& !uri.getHost().equals("redis")
&& BaseTest.this.getClass() == BaseTest.class
&& node.getNetworkAliases().contains("slave0")) {
return new RedisURI(uri.getScheme(), "127.0.0.1", Integer.valueOf(mappedPort[0].getHostPortSpec()));
}
if (mappedPort != null
&& s.getIpAddress().equals(uri.getHost())) {
return new RedisURI(uri.getScheme(), "127.0.0.1", Integer.valueOf(mappedPort[0].getHostPortSpec()));
}
}
return uri;
}
})
.addSentinelAddress("redis://127.0.0.1:" + sentinel1.getFirstMappedPort())
.setMasterName("mymaster");
callback.accept(nodes, config);
nodes.forEach(n -> n.stop());
network.close();
}
@Before
public void beforeEach() {
redisson.getKeys().flushall();

@ -1,10 +1,26 @@
package org.redisson.spring.data.connection;
import mockit.Invocation;
import mockit.Mock;
import mockit.MockUp;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.client.protocol.CommandData;
import org.redisson.connection.ClientConnectionsEntry;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.*;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import static org.assertj.core.api.Assertions.assertThat;
@ -13,6 +29,105 @@ import static org.assertj.core.api.Assertions.assertThat;
*/
public class RedissonStreamTest extends BaseConnectionTest {
@Test
public void testReattachment() throws InterruptedException {
withSentinel((nodes, config) -> {
RedissonClient redissonClient = Redisson.create(config);
RedisConnectionFactory redisConnectionFactory = new RedissonConnectionFactory(redissonClient);
StreamMessageListenerContainer listenerContainer = StreamMessageListenerContainer.create(redisConnectionFactory, getOptions());
Consumer consumer = Consumer.from("group", "consumer1");
StreamOffset<String> streamOffset = StreamOffset.create("test", ReadOffset.from(">"));
String channel = "test";
AtomicInteger counter = new AtomicInteger();
Subscription subscription = listenerContainer.register(getReadRequest(consumer, streamOffset),
listener(redisConnectionFactory, channel, consumer, counter));
StringRedisTemplate t1 = new StringRedisTemplate(redisConnectionFactory);
t1.opsForStream().createGroup("test", "group");
listenerContainer.start();
AtomicReference<Boolean> invoked = new AtomicReference<>();
new MockUp<ClientConnectionsEntry>() {
@Mock
void reattachBlockingQueue(Invocation inv, CommandData<?, ?> commandData) {
try {
inv.proceed(commandData);
invoked.compareAndSet(null, true);
} catch (Exception e) {
e.printStackTrace();
invoked.set(false);
throw e;
}
}
};
for (int i = 0; i < 3; i++) {
StringRedisTemplate stringRedisTemplate = new StringRedisTemplate(redisConnectionFactory);
ObjectRecord<String, String> record = StreamRecords.newRecord()
.ofObject("message")
.withStreamKey(channel);
RecordId recordId = stringRedisTemplate.opsForStream().add(record);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
nodes.get(0).stop();
try {
Thread.sleep(15000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
assertThat(invoked.get()).isTrue();
Assertions.assertThat(counter.get()).isEqualTo(3);
listenerContainer.stop();
redissonClient.shutdown();
}, 2);
}
private StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, String>> getOptions() {
return StreamMessageListenerContainer
.StreamMessageListenerContainerOptions
.builder()
.pollTimeout(Duration.ofSeconds(1))
.targetType(String.class)
.build();
}
private StreamMessageListenerContainer.StreamReadRequest<String> getReadRequest(Consumer consumer, StreamOffset<String> streamOffset) {
return StreamMessageListenerContainer.StreamReadRequest
.builder(streamOffset)
.consumer(consumer)
.autoAcknowledge(false)
.cancelOnError((err) -> false) // do not stop consuming after error
.build();
}
private <T> StreamListener listener(RedisConnectionFactory redisConnectionFactory, String channel, Consumer consumer, AtomicInteger counter) {
return message -> {
try {
System.out.println("Acknowledging message: " + message.getId());
StringRedisTemplate stringRedisTemplate = new StringRedisTemplate(redisConnectionFactory);
stringRedisTemplate.opsForStream().acknowledge(channel, consumer.getGroup(), message.getId());
System.out.println("RECEIVED " + consumer + " " + message);
counter.incrementAndGet();
} catch(Exception e) {
e.printStackTrace();
}
};
}
@Test
public void testPending() {
connection.streamCommands().xGroupCreate("test".getBytes(), "testGroup", ReadOffset.latest(), true);

@ -163,7 +163,7 @@ public class ClientConnectionsEntry {
connectionsHolder.getAllConnections().clear();
}
private void reattachBlockingQueue(CommandData<?, ?> commandData) {
void reattachBlockingQueue(CommandData<?, ?> commandData) {
if (commandData == null
|| !commandData.isBlockingCommand()
|| commandData.getPromise().isDone()) {

Loading…
Cancel
Save