Fixed - EntryExpiredListener isn't invoked by RMapCache instance in Redis Cluster 7+ #5447

pull/5457/head
Nikita Koksharov 1 year ago
parent 052bc248b1
commit 0182913956

@ -44,6 +44,8 @@ public class MapCacheEvictionTask extends EvictionTask {
private EvictionScheduler evictionScheduler;
private String publishCommand;
public MapCacheEvictionTask(String name, String timeoutSetName, String maxIdleSetName,
String expiredChannelName, String lastAccessTimeSetName, CommandAsyncExecutor executor,
boolean removeEmpty, EvictionScheduler evictionScheduler) {
@ -56,6 +58,7 @@ public class MapCacheEvictionTask extends EvictionTask {
this.executeTaskOnceLatchName = RedissonObject.prefixName("redisson__execute_task_once_latch", name);
this.removeEmpty = removeEmpty;
this.evictionScheduler = evictionScheduler;
this.publishCommand = executor.getConnectionManager().getSubscribeService().getPublishCommand();
}
@Override
@ -77,7 +80,7 @@ public class MapCacheEvictionTask extends EvictionTask {
+ "if v ~= false then "
+ "local t, val = struct.unpack('dLc0', v); "
+ "local msg = struct.pack('Lc0Lc0', string.len(key), key, string.len(val), val); "
+ "local listeners = redis.call('publish', KEYS[4], msg); "
+ "local listeners = redis.call(ARGV[5], KEYS[4], msg); "
+ "if (listeners == 0) then "
+ "break;"
+ "end; "
@ -95,7 +98,7 @@ public class MapCacheEvictionTask extends EvictionTask {
+ "if v ~= false then "
+ "local t, val = struct.unpack('dLc0', v); "
+ "local msg = struct.pack('Lc0Lc0', string.len(key), key, string.len(val), val); "
+ "local listeners = redis.call('publish', KEYS[4], msg); "
+ "local listeners = redis.call(ARGV[5], KEYS[4], msg); "
+ "if (listeners == 0) then "
+ "break;"
+ "end; "
@ -109,7 +112,7 @@ public class MapCacheEvictionTask extends EvictionTask {
+ "end; "
+ "return #expiredKeys1 + #expiredKeys2;",
Arrays.asList(name, timeoutSetName, maxIdleSetName, expiredChannelName, lastAccessTimeSetName, executeTaskOnceLatchName),
System.currentTimeMillis(), keysLimit, latchExpireTime, 1);
System.currentTimeMillis(), keysLimit, latchExpireTime, 1, publishCommand);
if (removeEmpty) {
CompletionStage<Integer> r = expiredFuture.thenCompose(removed -> {

@ -1,43 +1,29 @@
package org.redisson;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import java.time.Duration;
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import org.awaitility.Awaitility;
import org.joor.Reflect;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.Test;
import org.redisson.api.*;
import org.redisson.api.MapOptions.WriteMode;
import org.redisson.api.map.event.EntryCreatedListener;
import org.redisson.api.map.event.EntryEvent;
import org.redisson.api.map.event.EntryExpiredListener;
import org.redisson.api.map.event.EntryRemovedListener;
import org.redisson.api.map.event.EntryUpdatedListener;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.DoubleCodec;
import org.redisson.client.codec.IntegerCodec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.StringCodec;
import org.redisson.api.map.event.*;
import org.redisson.client.codec.*;
import org.redisson.codec.CompositeCodec;
import org.redisson.config.Config;
import org.redisson.eviction.EvictionScheduler;
import java.time.Duration;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
public class RedissonMapCacheTest extends BaseMapTest {
@Test
@ -872,6 +858,23 @@ public class RedissonMapCacheTest extends BaseMapTest {
map.destroy();
}
@Test
public void testExpirationInCluster() {
testInCluster(r -> {
AtomicBoolean executed = new AtomicBoolean();
RMapCache<String, String> map = r.getMapCache("simple");
map.addListener(new EntryExpiredListener() {
@Override
public void onExpired(EntryEvent event) {
executed.set(true);
}
});
map.put("1", "2", 1, TimeUnit.SECONDS);
Awaitility.await().atMost(Duration.ofSeconds(10)).untilAsserted(() -> assertThat(executed.get()).isTrue());
});
}
@Test
public void testReplaceValueTTL() throws InterruptedException {
RMapCache<SimpleKey, SimpleValue> map = redisson.getMapCache("simple");

Loading…
Cancel
Save