diff --git a/redisson-spring-data/redisson-spring-data-32/pom.xml b/redisson-spring-data/redisson-spring-data-32/pom.xml
index 1984d983b..472a67ed1 100644
--- a/redisson-spring-data/redisson-spring-data-32/pom.xml
+++ b/redisson-spring-data/redisson-spring-data-32/pom.xml
@@ -26,10 +26,28 @@
1.4.12
test
+
+
+ com.github.hazendaz.jmockit
+ jmockit
+ test
+
+
+ org.apache.maven.plugins
+ maven-surefire-plugin
+
+ true
+
+ 4
+ true
+ ${argLine} -javaagent:"${settings.localRepository}"/com/github/hazendaz/jmockit/jmockit/1.52.0/jmockit-1.52.0.jar
+
+
+
maven-jar-plugin
3.4.1
diff --git a/redisson-spring-data/redisson-spring-data-32/src/test/java/org/redisson/BaseTest.java b/redisson-spring-data/redisson-spring-data-32/src/test/java/org/redisson/BaseTest.java
index 0f00625d8..69e6077a8 100644
--- a/redisson-spring-data/redisson-spring-data-32/src/test/java/org/redisson/BaseTest.java
+++ b/redisson-spring-data/redisson-spring-data-32/src/test/java/org/redisson/BaseTest.java
@@ -1,5 +1,9 @@
package org.redisson;
+import com.github.dockerjava.api.model.ContainerNetwork;
+import com.github.dockerjava.api.model.ExposedPort;
+import com.github.dockerjava.api.model.PortBinding;
+import com.github.dockerjava.api.model.Ports;
import org.junit.Before;
import org.junit.BeforeClass;
import org.redisson.api.NatMapper;
@@ -9,19 +13,22 @@ import org.redisson.config.Protocol;
import org.redisson.misc.RedisURI;
import org.redisson.spring.data.connection.RedissonClusterConnection;
import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
import org.testcontainers.containers.startupcheck.MinimumDurationRunningStartupCheckStrategy;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.Map;
+import java.util.function.BiConsumer;
import java.util.function.Consumer;
public abstract class BaseTest {
protected static final String NOTIFY_KEYSPACE_EVENTS = "--notify-keyspace-events";
- private static final GenericContainer> REDIS = createRedis();
+ protected static final GenericContainer> REDIS = createRedis();
protected static final Protocol protocol = Protocol.RESP2;
@@ -130,6 +137,141 @@ public abstract class BaseTest {
redissonCallback.accept(new RedissonClusterConnection(redissonCluster));
}
+ protected void withSentinel(BiConsumer>, Config> callback, int slaves) throws InterruptedException {
+ Network network = Network.newNetwork();
+
+ List>> nodes = new ArrayList<>();
+
+ GenericContainer> master =
+ new GenericContainer<>("bitnami/redis:7.2.4")
+ .withNetwork(network)
+ .withEnv("REDIS_REPLICATION_MODE", "master")
+ .withEnv("ALLOW_EMPTY_PASSWORD", "yes")
+ .withNetworkAliases("redis")
+ .withExposedPorts(6379);
+ master.start();
+ assert master.getNetwork() == network;
+ int masterPort = master.getFirstMappedPort();
+ master.withCreateContainerCmdModifier(cmd -> {
+ cmd.getHostConfig().withPortBindings(
+ new PortBinding(Ports.Binding.bindPort(Integer.valueOf(masterPort)),
+ cmd.getExposedPorts()[0]));
+ });
+ nodes.add(master);
+
+ for (int i = 0; i < slaves; i++) {
+ GenericContainer> slave =
+ new GenericContainer<>("bitnami/redis:7.2.4")
+ .withNetwork(network)
+ .withEnv("REDIS_REPLICATION_MODE", "slave")
+ .withEnv("REDIS_MASTER_HOST", "redis")
+ .withEnv("ALLOW_EMPTY_PASSWORD", "yes")
+ .withNetworkAliases("slave" + i)
+ .withExposedPorts(6379);
+ slave.start();
+ int slavePort = slave.getFirstMappedPort();
+ slave.withCreateContainerCmdModifier(cmd -> {
+ cmd.getHostConfig().withPortBindings(
+ new PortBinding(Ports.Binding.bindPort(Integer.valueOf(slavePort)),
+ cmd.getExposedPorts()[0]));
+ });
+ nodes.add(slave);
+ }
+
+ GenericContainer> sentinel1 =
+ new GenericContainer<>("bitnami/redis-sentinel:7.2.4")
+
+ .withNetwork(network)
+ .withEnv("REDIS_SENTINEL_DOWN_AFTER_MILLISECONDS", "5000")
+ .withEnv("REDIS_SENTINEL_FAILOVER_TIMEOUT", "10000")
+ .withNetworkAliases("sentinel1")
+ .withExposedPorts(26379);
+ sentinel1.start();
+ int sentinel1Port = sentinel1.getFirstMappedPort();
+ sentinel1.withCreateContainerCmdModifier(cmd -> {
+ cmd.getHostConfig().withPortBindings(
+ new PortBinding(Ports.Binding.bindPort(Integer.valueOf(sentinel1Port)),
+ cmd.getExposedPorts()[0]));
+ });
+ nodes.add(sentinel1);
+
+ GenericContainer> sentinel2 =
+ new GenericContainer<>("bitnami/redis-sentinel:7.2.4")
+ .withNetwork(network)
+ .withEnv("REDIS_SENTINEL_DOWN_AFTER_MILLISECONDS", "5000")
+ .withEnv("REDIS_SENTINEL_FAILOVER_TIMEOUT", "10000")
+ .withNetworkAliases("sentinel2")
+ .withExposedPorts(26379);
+ sentinel2.start();
+ int sentinel2Port = sentinel2.getFirstMappedPort();
+ sentinel2.withCreateContainerCmdModifier(cmd -> {
+ cmd.getHostConfig().withPortBindings(
+ new PortBinding(Ports.Binding.bindPort(Integer.valueOf(sentinel2Port)),
+ cmd.getExposedPorts()[0]));
+ });
+ nodes.add(sentinel2);
+
+ GenericContainer> sentinel3 =
+ new GenericContainer<>("bitnami/redis-sentinel:7.2.4")
+ .withNetwork(network)
+ .withEnv("REDIS_SENTINEL_DOWN_AFTER_MILLISECONDS", "5000")
+ .withEnv("REDIS_SENTINEL_FAILOVER_TIMEOUT", "10000")
+ .withNetworkAliases("sentinel3")
+ .withExposedPorts(26379);
+ sentinel3.start();
+ int sentinel3Port = sentinel3.getFirstMappedPort();
+ sentinel3.withCreateContainerCmdModifier(cmd -> {
+ cmd.getHostConfig().withPortBindings(
+ new PortBinding(Ports.Binding.bindPort(Integer.valueOf(sentinel3Port)),
+ cmd.getExposedPorts()[0]));
+ });
+ nodes.add(sentinel3);
+
+ Thread.sleep(5000);
+
+ Config config = new Config();
+ config.setProtocol(protocol);
+ config.useSentinelServers()
+ .setPingConnectionInterval(0)
+ .setNatMapper(new NatMapper() {
+
+ @Override
+ public RedisURI map(RedisURI uri) {
+ for (GenericContainer extends GenericContainer>> node : nodes) {
+ if (node.getContainerInfo() == null) {
+ continue;
+ }
+
+ Ports.Binding[] mappedPort = node.getContainerInfo().getNetworkSettings()
+ .getPorts().getBindings().get(new ExposedPort(uri.getPort()));
+
+ Map ss = node.getContainerInfo().getNetworkSettings().getNetworks();
+ ContainerNetwork s = ss.values().iterator().next();
+
+ if (uri.getPort() == 6379
+ && !uri.getHost().equals("redis")
+ && BaseTest.this.getClass() == BaseTest.class
+ && node.getNetworkAliases().contains("slave0")) {
+ return new RedisURI(uri.getScheme(), "127.0.0.1", Integer.valueOf(mappedPort[0].getHostPortSpec()));
+ }
+
+ if (mappedPort != null
+ && s.getIpAddress().equals(uri.getHost())) {
+ return new RedisURI(uri.getScheme(), "127.0.0.1", Integer.valueOf(mappedPort[0].getHostPortSpec()));
+ }
+ }
+ return uri;
+ }
+ })
+ .addSentinelAddress("redis://127.0.0.1:" + sentinel1.getFirstMappedPort())
+ .setMasterName("mymaster");
+
+ callback.accept(nodes, config);
+
+ nodes.forEach(n -> n.stop());
+ network.close();
+ }
+
@Before
public void beforeEach() {
redisson.getKeys().flushall();
diff --git a/redisson-spring-data/redisson-spring-data-32/src/test/java/org/redisson/spring/data/connection/RedissonStreamTest.java b/redisson-spring-data/redisson-spring-data-32/src/test/java/org/redisson/spring/data/connection/RedissonStreamTest.java
index dce81f229..1b7ebaa0a 100644
--- a/redisson-spring-data/redisson-spring-data-32/src/test/java/org/redisson/spring/data/connection/RedissonStreamTest.java
+++ b/redisson-spring-data/redisson-spring-data-32/src/test/java/org/redisson/spring/data/connection/RedissonStreamTest.java
@@ -1,10 +1,26 @@
package org.redisson.spring.data.connection;
+import mockit.Invocation;
+import mockit.Mock;
+import mockit.MockUp;
+import org.assertj.core.api.Assertions;
import org.junit.Test;
+import org.redisson.Redisson;
+import org.redisson.api.RedissonClient;
+import org.redisson.client.protocol.CommandData;
+import org.redisson.connection.ClientConnectionsEntry;
+import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.*;
+import org.springframework.data.redis.core.StringRedisTemplate;
+import org.springframework.data.redis.stream.StreamListener;
+import org.springframework.data.redis.stream.StreamMessageListenerContainer;
+import org.springframework.data.redis.stream.Subscription;
+import java.time.Duration;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import static org.assertj.core.api.Assertions.assertThat;
@@ -13,6 +29,105 @@ import static org.assertj.core.api.Assertions.assertThat;
*/
public class RedissonStreamTest extends BaseConnectionTest {
+ @Test
+ public void testReattachment() throws InterruptedException {
+ withSentinel((nodes, config) -> {
+ RedissonClient redissonClient = Redisson.create(config);
+
+ RedisConnectionFactory redisConnectionFactory = new RedissonConnectionFactory(redissonClient);
+
+ StreamMessageListenerContainer listenerContainer = StreamMessageListenerContainer.create(redisConnectionFactory, getOptions());
+
+
+ Consumer consumer = Consumer.from("group", "consumer1");
+ StreamOffset streamOffset = StreamOffset.create("test", ReadOffset.from(">"));
+ String channel = "test";
+ AtomicInteger counter = new AtomicInteger();
+ Subscription subscription = listenerContainer.register(getReadRequest(consumer, streamOffset),
+ listener(redisConnectionFactory, channel, consumer, counter));
+
+ StringRedisTemplate t1 = new StringRedisTemplate(redisConnectionFactory);
+ t1.opsForStream().createGroup("test", "group");
+
+ listenerContainer.start();
+
+ AtomicReference invoked = new AtomicReference<>();
+
+ new MockUp() {
+
+ @Mock
+ void reattachBlockingQueue(Invocation inv, CommandData, ?> commandData) {
+ try {
+ inv.proceed(commandData);
+ invoked.compareAndSet(null, true);
+ } catch (Exception e) {
+ e.printStackTrace();
+ invoked.set(false);
+ throw e;
+ }
+ }
+ };
+
+ for (int i = 0; i < 3; i++) {
+ StringRedisTemplate stringRedisTemplate = new StringRedisTemplate(redisConnectionFactory);
+ ObjectRecord record = StreamRecords.newRecord()
+ .ofObject("message")
+ .withStreamKey(channel);
+ RecordId recordId = stringRedisTemplate.opsForStream().add(record);
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ nodes.get(0).stop();
+ try {
+ Thread.sleep(15000);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ assertThat(invoked.get()).isTrue();
+
+ Assertions.assertThat(counter.get()).isEqualTo(3);
+ listenerContainer.stop();
+ redissonClient.shutdown();
+ }, 2);
+ }
+
+ private StreamMessageListenerContainer.StreamMessageListenerContainerOptions> getOptions() {
+ return StreamMessageListenerContainer
+ .StreamMessageListenerContainerOptions
+ .builder()
+ .pollTimeout(Duration.ofSeconds(1))
+ .targetType(String.class)
+ .build();
+ }
+
+ private StreamMessageListenerContainer.StreamReadRequest getReadRequest(Consumer consumer, StreamOffset streamOffset) {
+ return StreamMessageListenerContainer.StreamReadRequest
+ .builder(streamOffset)
+ .consumer(consumer)
+ .autoAcknowledge(false)
+ .cancelOnError((err) -> false) // do not stop consuming after error
+ .build();
+ }
+
+ private StreamListener listener(RedisConnectionFactory redisConnectionFactory, String channel, Consumer consumer, AtomicInteger counter) {
+
+ return message -> {
+ try {
+ System.out.println("Acknowledging message: " + message.getId());
+ StringRedisTemplate stringRedisTemplate = new StringRedisTemplate(redisConnectionFactory);
+ stringRedisTemplate.opsForStream().acknowledge(channel, consumer.getGroup(), message.getId());
+ System.out.println("RECEIVED " + consumer + " " + message);
+ counter.incrementAndGet();
+ } catch(Exception e) {
+ e.printStackTrace();
+ }
+ };
+ }
+
@Test
public void testPending() {
connection.streamCommands().xGroupCreate("test".getBytes(), "testGroup", ReadOffset.latest(), true);
diff --git a/redisson/src/main/java/org/redisson/connection/ClientConnectionsEntry.java b/redisson/src/main/java/org/redisson/connection/ClientConnectionsEntry.java
index cd9146155..4990cf48e 100644
--- a/redisson/src/main/java/org/redisson/connection/ClientConnectionsEntry.java
+++ b/redisson/src/main/java/org/redisson/connection/ClientConnectionsEntry.java
@@ -163,7 +163,7 @@ public class ClientConnectionsEntry {
connectionsHolder.getAllConnections().clear();
}
- private void reattachBlockingQueue(CommandData, ?> commandData) {
+ void reattachBlockingQueue(CommandData, ?> commandData) {
if (commandData == null
|| !commandData.isBlockingCommand()
|| commandData.getPromise().isDone()) {