diff --git a/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java b/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java index 26309b254..f20330741 100644 --- a/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java +++ b/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java @@ -280,7 +280,7 @@ public class PublishSubscribeService { return; } - long newTimeout = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); + long newTimeout = timeout - TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); subscribeNoTimeout(codec, channelName, entry, clientEntry, promise, type, lock, new AtomicInteger(), listeners); timeout(promise, newTimeout); }); diff --git a/redisson/src/test/java/org/redisson/RedissonLocalCachedMapTest.java b/redisson/src/test/java/org/redisson/RedissonLocalCachedMapTest.java index dd42760b8..7cac270ec 100644 --- a/redisson/src/test/java/org/redisson/RedissonLocalCachedMapTest.java +++ b/redisson/src/test/java/org/redisson/RedissonLocalCachedMapTest.java @@ -30,6 +30,9 @@ import java.io.IOException; import java.time.Duration; import java.util.*; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import static org.assertj.core.api.Assertions.assertThat; @@ -143,6 +146,26 @@ public class RedissonLocalCachedMapTest extends BaseMapTest { assertThat(entries.values()).containsOnly(null, null); } + @Test + public void testSubscriptionTimeout() { + Config config = new Config(); + config.useSingleServer() + .setSubscriptionsPerConnection(2) + .setSubscriptionConnectionPoolSize(1) + .setAddress(RedisRunner.getDefaultRedisServerBindAddressAndPort()); + RedissonClient redisson = Redisson.create(config); + + RLocalCachedMap m1 = redisson.getLocalCachedMap("pubsub_test1", LocalCachedMapOptions.defaults()); + ScheduledExecutorService e = Executors.newSingleThreadScheduledExecutor(); + e.schedule(() -> { + m1.destroy(); + }, 1, TimeUnit.SECONDS); + + RLocalCachedMap m2 = redisson.getLocalCachedMap("pubsub_test2", LocalCachedMapOptions.defaults()); + + redisson.shutdown(); + } + @Test public void testExpiration() throws IOException, InterruptedException { RedisRunner.RedisProcess instance = new RedisRunner() @@ -520,10 +543,10 @@ public class RedissonLocalCachedMapTest extends BaseMapTest { .cacheSize(5) .syncStrategy(SyncStrategy.INVALIDATE); - RLocalCachedMap map1 = redisson.getLocalCachedMap("test", options); + RLocalCachedMap map1 = redisson.getLocalCachedMap("testLocalCacheClear", options); Map cache1 = map1.getCachedMap(); - RLocalCachedMap map2 = redisson.getLocalCachedMap("test", options); + RLocalCachedMap map2 = redisson.getLocalCachedMap("testLocalCacheClear", options); Map cache2 = map2.getCachedMap(); map1.put("1", 1);