From 018291395655494d472ffc34cea8408ff4d2e047 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Fri, 24 Nov 2023 09:49:29 +0300 Subject: [PATCH] Fixed - EntryExpiredListener isn't invoked by RMapCache instance in Redis Cluster 7+ #5447 --- .../eviction/MapCacheEvictionTask.java | 9 ++- .../org/redisson/RedissonMapCacheTest.java | 59 ++++++++++--------- 2 files changed, 37 insertions(+), 31 deletions(-) diff --git a/redisson/src/main/java/org/redisson/eviction/MapCacheEvictionTask.java b/redisson/src/main/java/org/redisson/eviction/MapCacheEvictionTask.java index 23a9237df..bc780b5f2 100644 --- a/redisson/src/main/java/org/redisson/eviction/MapCacheEvictionTask.java +++ b/redisson/src/main/java/org/redisson/eviction/MapCacheEvictionTask.java @@ -44,6 +44,8 @@ public class MapCacheEvictionTask extends EvictionTask { private EvictionScheduler evictionScheduler; + private String publishCommand; + public MapCacheEvictionTask(String name, String timeoutSetName, String maxIdleSetName, String expiredChannelName, String lastAccessTimeSetName, CommandAsyncExecutor executor, boolean removeEmpty, EvictionScheduler evictionScheduler) { @@ -56,6 +58,7 @@ public class MapCacheEvictionTask extends EvictionTask { this.executeTaskOnceLatchName = RedissonObject.prefixName("redisson__execute_task_once_latch", name); this.removeEmpty = removeEmpty; this.evictionScheduler = evictionScheduler; + this.publishCommand = executor.getConnectionManager().getSubscribeService().getPublishCommand(); } @Override @@ -77,7 +80,7 @@ public class MapCacheEvictionTask extends EvictionTask { + "if v ~= false then " + "local t, val = struct.unpack('dLc0', v); " + "local msg = struct.pack('Lc0Lc0', string.len(key), key, string.len(val), val); " - + "local listeners = redis.call('publish', KEYS[4], msg); " + + "local listeners = redis.call(ARGV[5], KEYS[4], msg); " + "if (listeners == 0) then " + "break;" + "end; " @@ -95,7 +98,7 @@ public class MapCacheEvictionTask extends EvictionTask { + "if v ~= false then " + "local t, val = struct.unpack('dLc0', v); " + "local msg = struct.pack('Lc0Lc0', string.len(key), key, string.len(val), val); " - + "local listeners = redis.call('publish', KEYS[4], msg); " + + "local listeners = redis.call(ARGV[5], KEYS[4], msg); " + "if (listeners == 0) then " + "break;" + "end; " @@ -109,7 +112,7 @@ public class MapCacheEvictionTask extends EvictionTask { + "end; " + "return #expiredKeys1 + #expiredKeys2;", Arrays.asList(name, timeoutSetName, maxIdleSetName, expiredChannelName, lastAccessTimeSetName, executeTaskOnceLatchName), - System.currentTimeMillis(), keysLimit, latchExpireTime, 1); + System.currentTimeMillis(), keysLimit, latchExpireTime, 1, publishCommand); if (removeEmpty) { CompletionStage r = expiredFuture.thenCompose(removed -> { diff --git a/redisson/src/test/java/org/redisson/RedissonMapCacheTest.java b/redisson/src/test/java/org/redisson/RedissonMapCacheTest.java index 7b99de7ed..4357030a0 100644 --- a/redisson/src/test/java/org/redisson/RedissonMapCacheTest.java +++ b/redisson/src/test/java/org/redisson/RedissonMapCacheTest.java @@ -1,43 +1,29 @@ package org.redisson; -import static org.assertj.core.api.Assertions.assertThat; -import static org.awaitility.Awaitility.await; - -import java.time.Duration; -import java.util.AbstractMap; -import java.util.Arrays; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedHashMap; -import java.util.Map; -import java.util.Map.Entry; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Consumer; - import org.awaitility.Awaitility; import org.joor.Reflect; import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Assumptions; import org.junit.jupiter.api.Test; import org.redisson.api.*; import org.redisson.api.MapOptions.WriteMode; -import org.redisson.api.map.event.EntryCreatedListener; -import org.redisson.api.map.event.EntryEvent; -import org.redisson.api.map.event.EntryExpiredListener; -import org.redisson.api.map.event.EntryRemovedListener; -import org.redisson.api.map.event.EntryUpdatedListener; -import org.redisson.client.codec.Codec; -import org.redisson.client.codec.DoubleCodec; -import org.redisson.client.codec.IntegerCodec; -import org.redisson.client.codec.LongCodec; -import org.redisson.client.codec.StringCodec; +import org.redisson.api.map.event.*; +import org.redisson.client.codec.*; import org.redisson.codec.CompositeCodec; import org.redisson.config.Config; import org.redisson.eviction.EvictionScheduler; +import java.time.Duration; +import java.util.*; +import java.util.Map.Entry; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + public class RedissonMapCacheTest extends BaseMapTest { @Test @@ -872,6 +858,23 @@ public class RedissonMapCacheTest extends BaseMapTest { map.destroy(); } + @Test + public void testExpirationInCluster() { + testInCluster(r -> { + AtomicBoolean executed = new AtomicBoolean(); + RMapCache map = r.getMapCache("simple"); + map.addListener(new EntryExpiredListener() { + @Override + public void onExpired(EntryEvent event) { + executed.set(true); + } + }); + map.put("1", "2", 1, TimeUnit.SECONDS); + + Awaitility.await().atMost(Duration.ofSeconds(10)).untilAsserted(() -> assertThat(executed.get()).isTrue()); + }); + } + @Test public void testReplaceValueTTL() throws InterruptedException { RMapCache map = redisson.getMapCache("simple");