From cf241af5c0b8995fbff723594430e16981d8d3c9 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Fri, 28 Apr 2023 11:03:02 +0300 Subject: [PATCH] Feature - LocalCacheUpdateListener and LocalCacheInvalidateListener listeners added for RLocalCachedMap object. #4686 --- .../org/redisson/RedissonLocalCachedMap.java | 70 ++++++++++++++----- .../LocalCacheInvalidateListener.java | 36 ++++++++++ .../listener/LocalCacheUpdateListener.java | 36 ++++++++++ .../redisson/cache/LocalCacheListener.java | 46 ++++++++++-- .../org/redisson/cache/LocalCacheView.java | 4 +- .../redisson/RedissonLocalCachedMapTest.java | 21 +++++- 6 files changed, 186 insertions(+), 27 deletions(-) create mode 100644 redisson/src/main/java/org/redisson/api/listener/LocalCacheInvalidateListener.java create mode 100644 redisson/src/main/java/org/redisson/api/listener/LocalCacheUpdateListener.java diff --git a/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java b/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java index 1e686e8f9..ff2a6b724 100644 --- a/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java +++ b/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java @@ -16,12 +16,11 @@ package org.redisson; import io.netty.buffer.ByteBuf; -import org.redisson.api.LocalCachedMapOptions; +import org.redisson.api.*; import org.redisson.api.LocalCachedMapOptions.ReconnectionStrategy; import org.redisson.api.LocalCachedMapOptions.SyncStrategy; -import org.redisson.api.RFuture; -import org.redisson.api.RLocalCachedMap; -import org.redisson.api.RedissonClient; +import org.redisson.api.listener.LocalCacheInvalidateListener; +import org.redisson.api.listener.LocalCacheUpdateListener; import org.redisson.cache.*; import org.redisson.client.codec.Codec; import org.redisson.client.codec.LongCodec; @@ -86,11 +85,12 @@ public class RedissonLocalCachedMap extends RedissonMap implements R listener = new LocalCacheListener(getRawName(), commandExecutor, this, codec, options, cacheUpdateLogTime) { @Override - protected void updateCache(ByteBuf keyBuf, ByteBuf valueBuf) throws IOException { + protected CacheValue updateCache(ByteBuf keyBuf, ByteBuf valueBuf) throws IOException { CacheKey cacheKey = localCacheView.toCacheKey(keyBuf); Object key = codec.getMapKeyDecoder().decode(keyBuf, null); Object value = codec.getMapValueDecoder().decode(valueBuf, null); cachePut(cacheKey, key, value); + return new CacheValue(key, value); } }; @@ -132,8 +132,16 @@ public class RedissonLocalCachedMap extends RedissonMap implements R if (listener.isDisabled(cacheKey)) { return null; } - - return cache.put(cacheKey, new CacheValue(key, value)); + + CacheValue newValue = new CacheValue(key, value); + CacheValue oldValue = cache.put(cacheKey, newValue); + Object oldV = null; + if (oldValue != null) { + oldV = oldValue.getValue(); + } + listener.notifyInvalidate(new CacheValue(key, oldV)); + listener.notifyUpdate(newValue); + return oldValue; } private CacheValue cachePutIfAbsent(CacheKey cacheKey, Object key, Object value) { @@ -185,6 +193,13 @@ public class RedissonLocalCachedMap extends RedissonMap implements R return cache.remove(cacheKey, new CacheValue(key, value)); } + private CacheValue cacheRemove(CacheKey cacheKey) { + CacheValue v = cache.remove(cacheKey); + listener.notifyInvalidate(v); + listener.notifyUpdate(v); + return v; + } + @Override public RFuture sizeAsync() { if (storeMode == LocalCachedMapOptions.StoreMode.LOCALCACHE) { @@ -371,7 +386,7 @@ public class RedissonLocalCachedMap extends RedissonMap implements R protected RFuture removeOperationAsync(K key) { ByteBuf keyEncoded = encodeMapKey(key); CacheKey cacheKey = localCacheView.toCacheKey(keyEncoded); - CacheValue value = cache.remove(cacheKey); + CacheValue value = cacheRemove(cacheKey); if (storeMode == LocalCachedMapOptions.StoreMode.LOCALCACHE) { keyEncoded.release(); @@ -417,7 +432,7 @@ public class RedissonLocalCachedMap extends RedissonMap implements R params.add(keyEncoded); CacheKey cacheKey = localCacheView.toCacheKey(keyEncoded); - cache.remove(cacheKey); + cacheRemove(cacheKey); ByteBuf msgEncoded = encode(new LocalCachedMapInvalidate(instanceId, cacheKey.getKeyHash())); params.add(msgEncoded); } @@ -444,7 +459,7 @@ public class RedissonLocalCachedMap extends RedissonMap implements R params.add(keyEncoded); CacheKey cacheKey = localCacheView.toCacheKey(keyEncoded); - cache.remove(cacheKey); + cacheRemove(cacheKey); ByteBuf msgEncoded = encode(new LocalCachedMapInvalidate(instanceId, cacheKey.getKeyHash())); params.add(msgEncoded); @@ -473,7 +488,7 @@ public class RedissonLocalCachedMap extends RedissonMap implements R params.add(keyEncoded); CacheKey cacheKey = localCacheView.toCacheKey(keyEncoded); - cache.remove(cacheKey); + cacheRemove(cacheKey); } RFuture> future = commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_LIST, @@ -494,7 +509,7 @@ public class RedissonLocalCachedMap extends RedissonMap implements R long count = 0; for (K k : keys) { CacheKey cacheKey = localCacheView.toCacheKey(k); - CacheValue val = cache.remove(cacheKey); + CacheValue val = cacheRemove(cacheKey); if (val != null) { count++; LocalCachedMapInvalidate msg = new LocalCachedMapInvalidate(instanceId, cacheKey.getKeyHash()); @@ -511,7 +526,7 @@ public class RedissonLocalCachedMap extends RedissonMap implements R params.add(keyEncoded); CacheKey cacheKey = localCacheView.toCacheKey(keyEncoded); - cache.remove(cacheKey); + cacheRemove(cacheKey); ByteBuf msgEncoded = encode(new LocalCachedMapInvalidate(instanceId, cacheKey.getKeyHash())); params.add(msgEncoded); } @@ -537,7 +552,7 @@ public class RedissonLocalCachedMap extends RedissonMap implements R params.add(keyEncoded); CacheKey cacheKey = localCacheView.toCacheKey(keyEncoded); - cache.remove(cacheKey); + cacheRemove(cacheKey); ByteBuf msgEncoded = encode(new LocalCachedMapInvalidate(instanceId, cacheKey.getKeyHash())); params.add(msgEncoded); @@ -566,7 +581,7 @@ public class RedissonLocalCachedMap extends RedissonMap implements R params.add(keyEncoded); CacheKey cacheKey = localCacheView.toCacheKey(keyEncoded); - cache.remove(cacheKey); + cacheRemove(cacheKey); } return commandExecutor.writeAsync(getRawName(), codec, RedisCommands.HDEL, params.toArray()); @@ -1173,7 +1188,7 @@ public class RedissonLocalCachedMap extends RedissonMap implements R CompletionStage f = future.thenApply(res -> { if (res) { CacheKey cacheKey = localCacheView.toCacheKey(key); - cache.remove(cacheKey); + cacheRemove(cacheKey); } return res; }); @@ -1285,4 +1300,27 @@ public class RedissonLocalCachedMap extends RedissonMap implements R return super.entrySet(keyPattern, count); } + @Override + public int addListener(ObjectListener listener) { + if (listener instanceof LocalCacheInvalidateListener) { + return this.listener.addListener((LocalCacheInvalidateListener) listener); + } + if (listener instanceof LocalCacheUpdateListener) { + return this.listener.addListener((LocalCacheUpdateListener) listener); + } + return super.addListener(listener); + } + + @Override + public RFuture addListenerAsync(ObjectListener listener) { + if (listener instanceof LocalCacheInvalidateListener) { + int r = this.listener.addListener((LocalCacheInvalidateListener) listener); + return new CompletableFutureWrapper<>(r); + } + if (listener instanceof LocalCacheUpdateListener) { + int r = this.listener.addListener((LocalCacheUpdateListener) listener); + return new CompletableFutureWrapper<>(r); + } + return super.addListenerAsync(listener); + } } diff --git a/redisson/src/main/java/org/redisson/api/listener/LocalCacheInvalidateListener.java b/redisson/src/main/java/org/redisson/api/listener/LocalCacheInvalidateListener.java new file mode 100644 index 000000000..7cdc8871d --- /dev/null +++ b/redisson/src/main/java/org/redisson/api/listener/LocalCacheInvalidateListener.java @@ -0,0 +1,36 @@ +/** + * Copyright (c) 2013-2022 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.api.listener; + +import org.redisson.api.ObjectListener; + +/** + * Redisson Object Event listener for local cache invalidation event published by Redis. + * + * @author Nikita Koksharov + * + */ +public interface LocalCacheInvalidateListener extends ObjectListener { + + /** + * Invoked on event of map entry invalidation + * + * @param key key to remove + * @param value value to remove + */ + void onInvalidate(K key, V value); + +} diff --git a/redisson/src/main/java/org/redisson/api/listener/LocalCacheUpdateListener.java b/redisson/src/main/java/org/redisson/api/listener/LocalCacheUpdateListener.java new file mode 100644 index 000000000..dc83bf5c0 --- /dev/null +++ b/redisson/src/main/java/org/redisson/api/listener/LocalCacheUpdateListener.java @@ -0,0 +1,36 @@ +/** + * Copyright (c) 2013-2022 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.api.listener; + +import org.redisson.api.ObjectListener; + +/** + * Redisson Object Event listener for local cache update event published by Redis. + * + * @author Nikita Koksharov + * + */ +public interface LocalCacheUpdateListener extends ObjectListener { + + /** + * Invoked on event of map entry udpate + * + * @param key key to update + * @param value new value + */ + void onUpdate(K key, V value); + +} diff --git a/redisson/src/main/java/org/redisson/cache/LocalCacheListener.java b/redisson/src/main/java/org/redisson/cache/LocalCacheListener.java index 4905bb290..920c9a287 100644 --- a/redisson/src/main/java/org/redisson/cache/LocalCacheListener.java +++ b/redisson/src/main/java/org/redisson/cache/LocalCacheListener.java @@ -25,6 +25,8 @@ import org.redisson.api.LocalCachedMapOptions.EvictionPolicy; import org.redisson.api.LocalCachedMapOptions.ReconnectionStrategy; import org.redisson.api.LocalCachedMapOptions.SyncStrategy; import org.redisson.api.listener.BaseStatusListener; +import org.redisson.api.listener.LocalCacheInvalidateListener; +import org.redisson.api.listener.LocalCacheUpdateListener; import org.redisson.api.listener.MessageListener; import org.redisson.client.codec.ByteArrayCodec; import org.redisson.client.codec.Codec; @@ -55,7 +57,7 @@ public abstract class LocalCacheListener { private String name; private CommandAsyncExecutor commandExecutor; - private Map cache; + private Map cache; private RObject object; private byte[] instanceId = new byte[16]; private Codec codec; @@ -66,7 +68,11 @@ public abstract class LocalCacheListener { private RTopic invalidationTopic; private int syncListenerId; private int reconnectionListenerId; - + + private final Map> invalidateListeners = new ConcurrentHashMap<>(); + + private final Map> updateListeners = new ConcurrentHashMap<>(); + public LocalCacheListener(String name, CommandAsyncExecutor commandExecutor, RObject object, Codec codec, LocalCachedMapOptions options, long cacheUpdateLogTime) { super(); @@ -133,7 +139,7 @@ public abstract class LocalCacheListener { return disabledKeys.containsKey(key); } - public void add(Map cache) { + public void add(Map cache) { this.cache = cache; invalidationTopic = RedissonTopic.createRaw(LocalCachedMessageCodec.INSTANCE, commandExecutor, getInvalidationTopicName()); @@ -200,7 +206,8 @@ public abstract class LocalCacheListener { if (!Arrays.equals(invalidateMsg.getExcludedId(), instanceId)) { for (byte[] keyHash : invalidateMsg.getKeyHashes()) { CacheKey key = new CacheKey(keyHash); - cache.remove(key); + CacheValue value = cache.remove(key); + notifyInvalidate(value); } } } @@ -213,7 +220,8 @@ public abstract class LocalCacheListener { ByteBuf keyBuf = Unpooled.wrappedBuffer(entry.getKey()); ByteBuf valueBuf = Unpooled.wrappedBuffer(entry.getValue()); try { - updateCache(keyBuf, valueBuf); + CacheValue value = updateCache(keyBuf, valueBuf); + notifyUpdate(value); } catch (IOException e) { log.error("Can't decode map entry", e); } finally { @@ -245,7 +253,19 @@ public abstract class LocalCacheListener { } } } - + + public void notifyUpdate(CacheValue value) { + for (LocalCacheUpdateListener listener : updateListeners.values()) { + listener.onUpdate(value.getKey(), value.getValue()); + } + } + + public void notifyInvalidate(CacheValue value) { + for (LocalCacheInvalidateListener listener : invalidateListeners.values()) { + listener.onInvalidate(value.getKey(), value.getValue()); + } + } + public RFuture clearLocalCacheAsync() { cache.clear(); if (syncListenerId == 0) { @@ -276,7 +296,7 @@ public abstract class LocalCacheListener { return RedissonObject.suffixName(name, TOPIC_SUFFIX); } - protected abstract void updateCache(ByteBuf keyBuf, ByteBuf valueBuf) throws IOException; + protected abstract CacheValue updateCache(ByteBuf keyBuf, ByteBuf valueBuf) throws IOException; private void disableKeys(final String requestId, final Set keys, long timeout) { for (CacheKey key : keys) { @@ -350,4 +370,16 @@ public abstract class LocalCacheListener { return semaphore; } + public int addListener(LocalCacheInvalidateListener listener) { + int listenerId = System.identityHashCode(listener); + invalidateListeners.put(listenerId, listener); + return listenerId; + } + + public int addListener(LocalCacheUpdateListener listener) { + int listenerId = System.identityHashCode(listener); + updateListeners.put(listenerId, listener); + return listenerId; + } + } diff --git a/redisson/src/main/java/org/redisson/cache/LocalCacheView.java b/redisson/src/main/java/org/redisson/cache/LocalCacheView.java index 508f12381..c0843e2b8 100644 --- a/redisson/src/main/java/org/redisson/cache/LocalCacheView.java +++ b/redisson/src/main/java/org/redisson/cache/LocalCacheView.java @@ -260,8 +260,8 @@ public class LocalCacheView { return new CacheKey(Hash.hash128toArray(encodedKey)); } - public ConcurrentMap getCache() { - return cache; + public ConcurrentMap getCache() { + return (ConcurrentMap) cache; } public ConcurrentMap createCache(LocalCachedMapOptions options) { diff --git a/redisson/src/test/java/org/redisson/RedissonLocalCachedMapTest.java b/redisson/src/test/java/org/redisson/RedissonLocalCachedMapTest.java index 2be48631c..be5572786 100644 --- a/redisson/src/test/java/org/redisson/RedissonLocalCachedMapTest.java +++ b/redisson/src/test/java/org/redisson/RedissonLocalCachedMapTest.java @@ -11,6 +11,7 @@ import org.redisson.api.LocalCachedMapOptions.EvictionPolicy; import org.redisson.api.LocalCachedMapOptions.ReconnectionStrategy; import org.redisson.api.LocalCachedMapOptions.SyncStrategy; import org.redisson.api.MapOptions.WriteMode; +import org.redisson.api.listener.LocalCacheInvalidateListener; import org.redisson.api.map.MapLoader; import org.redisson.client.RedisClient; import org.redisson.client.RedisClientConfig; @@ -124,6 +125,22 @@ public class RedissonLocalCachedMapTest extends BaseMapTest { } } + @Test + public void testListeners() throws InterruptedException { + RLocalCachedMap map = redisson.getLocalCachedMap("test", LocalCachedMapOptions.defaults()); + Map entries = new HashMap(); + map.addListener((LocalCacheInvalidateListener) (key, value) -> { + entries.put(key, value); + }); + map.put("v1", "v2"); + map.put("v3", "v4"); + + Thread.sleep(100); + + assertThat(entries.keySet()).containsOnly("v1", "v3"); + assertThat(entries.values()).containsOnly(null, null); + } + @Test public void testMapLoaderGet() { Map cache = new HashMap<>(); @@ -158,12 +175,12 @@ public class RedissonLocalCachedMapTest extends BaseMapTest { @Override protected RMap getMap(String name) { - return redisson.getLocalCachedMap(name, LocalCachedMapOptions.defaults()); + return redisson.getLocalCachedMap(name, LocalCachedMapOptions.defaults()); } @Override protected RMap getMap(String name, Codec codec) { - return redisson.getLocalCachedMap(name, codec, LocalCachedMapOptions.defaults()); + return redisson.getLocalCachedMap(name, codec, LocalCachedMapOptions.defaults()); } @Override