JCache expiration listener doesn't work

pull/762/head
Nikita 8 years ago
parent 14fb190772
commit bcc09d9d43

@ -155,10 +155,6 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
return "jcache_updated_sync_channel:{" + getName() + "}"; return "jcache_updated_sync_channel:{" + getName() + "}";
} }
String getExpiredSyncChannelName() {
return "jcache_expired_sync_channel:{" + getName() + "}";
}
String getRemovedSyncChannelName() { String getRemovedSyncChannelName() {
return "jcache_removed_sync_channel:{" + getName() + "}"; return "jcache_removed_sync_channel:{" + getName() + "}";
} }
@ -2365,22 +2361,15 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
} }
if (CacheEntryExpiredListener.class.isAssignableFrom(listener.getClass())) { if (CacheEntryExpiredListener.class.isAssignableFrom(listener.getClass())) {
String channelName = getExpiredChannelName(); String channelName = getExpiredChannelName();
if (sync) {
channelName = getExpiredSyncChannelName();
}
RTopic<List<Object>> topic = redisson.getTopic(channelName, new JCacheEventCodec(codec, sync)); RTopic<List<Object>> topic = redisson.getTopic(channelName, new JCacheEventCodec(codec, false));
int listenerId = topic.addListener(new MessageListener<List<Object>>() { int listenerId = topic.addListener(new MessageListener<List<Object>>() {
@Override @Override
public void onMessage(String channel, List<Object> msg) { public void onMessage(String channel, List<Object> msg) {
JCacheEntryEvent<K, V> event = new JCacheEntryEvent<K, V>(JCache.this, EventType.EXPIRED, msg.get(0), msg.get(1)); JCacheEntryEvent<K, V> event = new JCacheEntryEvent<K, V>(JCache.this, EventType.EXPIRED, msg.get(0), msg.get(1));
try { if (filter == null || filter.evaluate(event)) {
if (filter == null || filter.evaluate(event)) { List<CacheEntryEvent<? extends K, ? extends V>> events = Collections.<CacheEntryEvent<? extends K, ? extends V>>singletonList(event);
List<CacheEntryEvent<? extends K, ? extends V>> events = Collections.<CacheEntryEvent<? extends K, ? extends V>>singletonList(event); ((CacheEntryExpiredListener<K, V>) listener).onExpired(events);
((CacheEntryExpiredListener<K, V>) listener).onExpired(events);
}
} finally {
sendSync(sync, msg);
} }
} }
}); });

@ -0,0 +1,113 @@
package org.redisson.jcache;
import static org.assertj.core.api.Assertions.assertThat;
import java.io.Serializable;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.cache.Cache;
import javax.cache.Caching;
import javax.cache.configuration.FactoryBuilder;
import javax.cache.configuration.MutableCacheEntryListenerConfiguration;
import javax.cache.configuration.MutableConfiguration;
import javax.cache.event.CacheEntryEvent;
import javax.cache.event.CacheEntryExpiredListener;
import javax.cache.event.CacheEntryListenerException;
import javax.cache.expiry.CreatedExpiryPolicy;
import javax.cache.expiry.Duration;
import org.junit.Assert;
import org.junit.Test;
import org.redisson.BaseTest;
import org.redisson.api.RMap;
public class JCacheTest extends BaseTest {
@Test
public void testMapPutGet() throws InterruptedException, IllegalArgumentException, URISyntaxException {
MutableConfiguration<String, String> config = new MutableConfiguration<>();
config.setStoreByValue(true);
URI configUri = getClass().getResource("redisson-jcache.json").toURI();
Cache<String, String> cache = Caching.getCachingProvider().getCacheManager(configUri, null)
.createCache("test", config);
long startTime = System.nanoTime();
cache.get("123");
long spentTime = System.nanoTime() - startTime;
System.out.println("get spentTime: " + spentTime);
}
@Test
public void testMapPutGet2() throws InterruptedException, IllegalArgumentException, URISyntaxException {
RMap<String, String> cache = redisson.getMap("test");
long startTime = System.nanoTime();
cache.put("123", "90");
long spentTime = System.nanoTime() - startTime;
System.out.println("put spentTime: " + spentTime);
startTime = System.nanoTime();
cache.get("123");
spentTime = System.nanoTime() - startTime;
System.out.println("get spentTime: " + spentTime);
}
// @Test
public void testExpiration() throws InterruptedException, IllegalArgumentException, URISyntaxException {
MutableConfiguration<String, String> config = new MutableConfiguration<>();
config.setExpiryPolicyFactory(CreatedExpiryPolicy.factoryOf(new Duration(TimeUnit.SECONDS, 1)));
config.setStoreByValue(true);
URI configUri = getClass().getResource("redisson-jcache.json").toURI();
Cache<String, String> cache = Caching.getCachingProvider().getCacheManager(configUri, null)
.createCache("test", config);
CountDownLatch latch = new CountDownLatch(1);
String key = "123";
ExpiredListener clientListener = new ExpiredListener(latch, key, "90");
MutableCacheEntryListenerConfiguration<String, String> listenerConfiguration =
new MutableCacheEntryListenerConfiguration<String, String>(FactoryBuilder.factoryOf(clientListener), null, true, true);
cache.registerCacheEntryListener(listenerConfiguration);
cache.put(key, "90");
Assert.assertNotNull(cache.get(key));
latch.await();
Assert.assertNull(cache.get(key));
}
public static class ExpiredListener implements CacheEntryExpiredListener<String, String>, Serializable {
private Object key;
private Object value;
private CountDownLatch latch;
public ExpiredListener(CountDownLatch latch, Object key, Object value) {
super();
this.latch = latch;
this.key = key;
this.value = value;
}
@Override
public void onExpired(Iterable<CacheEntryEvent<? extends String, ? extends String>> events)
throws CacheEntryListenerException {
CacheEntryEvent<? extends String, ? extends String> entry = events.iterator().next();
assertThat(entry.getKey()).isEqualTo(key);
assertThat(entry.getValue()).isEqualTo(value);
latch.countDown();
}
}
}

@ -1,7 +1,5 @@
{ {
"singleServerConfig":{ "singleServerConfig":{
"address":[ "address": "redis://127.0.0.1:6379"
"//127.0.0.1:6379"
]
} }
} }
Loading…
Cancel
Save