Feature - LocalCacheUpdateListener and LocalCacheInvalidateListener listeners added for RLocalCachedMap object. #4686

pull/5004/head
Nikita Koksharov 2 years ago
parent 76957f81df
commit cf241af5c0

@ -16,12 +16,11 @@
package org.redisson;
import io.netty.buffer.ByteBuf;
import org.redisson.api.LocalCachedMapOptions;
import org.redisson.api.*;
import org.redisson.api.LocalCachedMapOptions.ReconnectionStrategy;
import org.redisson.api.LocalCachedMapOptions.SyncStrategy;
import org.redisson.api.RFuture;
import org.redisson.api.RLocalCachedMap;
import org.redisson.api.RedissonClient;
import org.redisson.api.listener.LocalCacheInvalidateListener;
import org.redisson.api.listener.LocalCacheUpdateListener;
import org.redisson.cache.*;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec;
@ -86,11 +85,12 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
listener = new LocalCacheListener(getRawName(), commandExecutor, this, codec, options, cacheUpdateLogTime) {
@Override
protected void updateCache(ByteBuf keyBuf, ByteBuf valueBuf) throws IOException {
protected CacheValue updateCache(ByteBuf keyBuf, ByteBuf valueBuf) throws IOException {
CacheKey cacheKey = localCacheView.toCacheKey(keyBuf);
Object key = codec.getMapKeyDecoder().decode(keyBuf, null);
Object value = codec.getMapValueDecoder().decode(valueBuf, null);
cachePut(cacheKey, key, value);
return new CacheValue(key, value);
}
};
@ -132,8 +132,16 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
if (listener.isDisabled(cacheKey)) {
return null;
}
return cache.put(cacheKey, new CacheValue(key, value));
CacheValue newValue = new CacheValue(key, value);
CacheValue oldValue = cache.put(cacheKey, newValue);
Object oldV = null;
if (oldValue != null) {
oldV = oldValue.getValue();
}
listener.notifyInvalidate(new CacheValue(key, oldV));
listener.notifyUpdate(newValue);
return oldValue;
}
private CacheValue cachePutIfAbsent(CacheKey cacheKey, Object key, Object value) {
@ -185,6 +193,13 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
return cache.remove(cacheKey, new CacheValue(key, value));
}
private CacheValue cacheRemove(CacheKey cacheKey) {
CacheValue v = cache.remove(cacheKey);
listener.notifyInvalidate(v);
listener.notifyUpdate(v);
return v;
}
@Override
public RFuture<Integer> sizeAsync() {
if (storeMode == LocalCachedMapOptions.StoreMode.LOCALCACHE) {
@ -371,7 +386,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
protected RFuture<V> removeOperationAsync(K key) {
ByteBuf keyEncoded = encodeMapKey(key);
CacheKey cacheKey = localCacheView.toCacheKey(keyEncoded);
CacheValue value = cache.remove(cacheKey);
CacheValue value = cacheRemove(cacheKey);
if (storeMode == LocalCachedMapOptions.StoreMode.LOCALCACHE) {
keyEncoded.release();
@ -417,7 +432,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
params.add(keyEncoded);
CacheKey cacheKey = localCacheView.toCacheKey(keyEncoded);
cache.remove(cacheKey);
cacheRemove(cacheKey);
ByteBuf msgEncoded = encode(new LocalCachedMapInvalidate(instanceId, cacheKey.getKeyHash()));
params.add(msgEncoded);
}
@ -444,7 +459,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
params.add(keyEncoded);
CacheKey cacheKey = localCacheView.toCacheKey(keyEncoded);
cache.remove(cacheKey);
cacheRemove(cacheKey);
ByteBuf msgEncoded = encode(new LocalCachedMapInvalidate(instanceId, cacheKey.getKeyHash()));
params.add(msgEncoded);
@ -473,7 +488,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
params.add(keyEncoded);
CacheKey cacheKey = localCacheView.toCacheKey(keyEncoded);
cache.remove(cacheKey);
cacheRemove(cacheKey);
}
RFuture<List<Long>> future = commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_LIST,
@ -494,7 +509,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
long count = 0;
for (K k : keys) {
CacheKey cacheKey = localCacheView.toCacheKey(k);
CacheValue val = cache.remove(cacheKey);
CacheValue val = cacheRemove(cacheKey);
if (val != null) {
count++;
LocalCachedMapInvalidate msg = new LocalCachedMapInvalidate(instanceId, cacheKey.getKeyHash());
@ -511,7 +526,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
params.add(keyEncoded);
CacheKey cacheKey = localCacheView.toCacheKey(keyEncoded);
cache.remove(cacheKey);
cacheRemove(cacheKey);
ByteBuf msgEncoded = encode(new LocalCachedMapInvalidate(instanceId, cacheKey.getKeyHash()));
params.add(msgEncoded);
}
@ -537,7 +552,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
params.add(keyEncoded);
CacheKey cacheKey = localCacheView.toCacheKey(keyEncoded);
cache.remove(cacheKey);
cacheRemove(cacheKey);
ByteBuf msgEncoded = encode(new LocalCachedMapInvalidate(instanceId, cacheKey.getKeyHash()));
params.add(msgEncoded);
@ -566,7 +581,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
params.add(keyEncoded);
CacheKey cacheKey = localCacheView.toCacheKey(keyEncoded);
cache.remove(cacheKey);
cacheRemove(cacheKey);
}
return commandExecutor.writeAsync(getRawName(), codec, RedisCommands.HDEL, params.toArray());
@ -1173,7 +1188,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
CompletionStage<Boolean> f = future.thenApply(res -> {
if (res) {
CacheKey cacheKey = localCacheView.toCacheKey(key);
cache.remove(cacheKey);
cacheRemove(cacheKey);
}
return res;
});
@ -1285,4 +1300,27 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
return super.entrySet(keyPattern, count);
}
@Override
public int addListener(ObjectListener listener) {
if (listener instanceof LocalCacheInvalidateListener) {
return this.listener.addListener((LocalCacheInvalidateListener) listener);
}
if (listener instanceof LocalCacheUpdateListener) {
return this.listener.addListener((LocalCacheUpdateListener) listener);
}
return super.addListener(listener);
}
@Override
public RFuture<Integer> addListenerAsync(ObjectListener listener) {
if (listener instanceof LocalCacheInvalidateListener) {
int r = this.listener.addListener((LocalCacheInvalidateListener) listener);
return new CompletableFutureWrapper<>(r);
}
if (listener instanceof LocalCacheUpdateListener) {
int r = this.listener.addListener((LocalCacheUpdateListener) listener);
return new CompletableFutureWrapper<>(r);
}
return super.addListenerAsync(listener);
}
}

@ -0,0 +1,36 @@
/**
* Copyright (c) 2013-2022 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api.listener;
import org.redisson.api.ObjectListener;
/**
* Redisson Object Event listener for local cache invalidation event published by Redis.
*
* @author Nikita Koksharov
*
*/
public interface LocalCacheInvalidateListener<K, V> extends ObjectListener {
/**
* Invoked on event of map entry invalidation
*
* @param key key to remove
* @param value value to remove
*/
void onInvalidate(K key, V value);
}

@ -0,0 +1,36 @@
/**
* Copyright (c) 2013-2022 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api.listener;
import org.redisson.api.ObjectListener;
/**
* Redisson Object Event listener for local cache update event published by Redis.
*
* @author Nikita Koksharov
*
*/
public interface LocalCacheUpdateListener<K, V> extends ObjectListener {
/**
* Invoked on event of map entry udpate
*
* @param key key to update
* @param value new value
*/
void onUpdate(K key, V value);
}

@ -25,6 +25,8 @@ import org.redisson.api.LocalCachedMapOptions.EvictionPolicy;
import org.redisson.api.LocalCachedMapOptions.ReconnectionStrategy;
import org.redisson.api.LocalCachedMapOptions.SyncStrategy;
import org.redisson.api.listener.BaseStatusListener;
import org.redisson.api.listener.LocalCacheInvalidateListener;
import org.redisson.api.listener.LocalCacheUpdateListener;
import org.redisson.api.listener.MessageListener;
import org.redisson.client.codec.ByteArrayCodec;
import org.redisson.client.codec.Codec;
@ -55,7 +57,7 @@ public abstract class LocalCacheListener {
private String name;
private CommandAsyncExecutor commandExecutor;
private Map<?, ?> cache;
private Map<CacheKey, ? extends CacheValue> cache;
private RObject object;
private byte[] instanceId = new byte[16];
private Codec codec;
@ -66,7 +68,11 @@ public abstract class LocalCacheListener {
private RTopic invalidationTopic;
private int syncListenerId;
private int reconnectionListenerId;
private final Map<Integer, LocalCacheInvalidateListener<?, ?>> invalidateListeners = new ConcurrentHashMap<>();
private final Map<Integer, LocalCacheUpdateListener<?, ?>> updateListeners = new ConcurrentHashMap<>();
public LocalCacheListener(String name, CommandAsyncExecutor commandExecutor,
RObject object, Codec codec, LocalCachedMapOptions<?, ?> options, long cacheUpdateLogTime) {
super();
@ -133,7 +139,7 @@ public abstract class LocalCacheListener {
return disabledKeys.containsKey(key);
}
public void add(Map<?, ?> cache) {
public void add(Map<CacheKey, ? extends CacheValue> cache) {
this.cache = cache;
invalidationTopic = RedissonTopic.createRaw(LocalCachedMessageCodec.INSTANCE, commandExecutor, getInvalidationTopicName());
@ -200,7 +206,8 @@ public abstract class LocalCacheListener {
if (!Arrays.equals(invalidateMsg.getExcludedId(), instanceId)) {
for (byte[] keyHash : invalidateMsg.getKeyHashes()) {
CacheKey key = new CacheKey(keyHash);
cache.remove(key);
CacheValue value = cache.remove(key);
notifyInvalidate(value);
}
}
}
@ -213,7 +220,8 @@ public abstract class LocalCacheListener {
ByteBuf keyBuf = Unpooled.wrappedBuffer(entry.getKey());
ByteBuf valueBuf = Unpooled.wrappedBuffer(entry.getValue());
try {
updateCache(keyBuf, valueBuf);
CacheValue value = updateCache(keyBuf, valueBuf);
notifyUpdate(value);
} catch (IOException e) {
log.error("Can't decode map entry", e);
} finally {
@ -245,7 +253,19 @@ public abstract class LocalCacheListener {
}
}
}
public void notifyUpdate(CacheValue value) {
for (LocalCacheUpdateListener listener : updateListeners.values()) {
listener.onUpdate(value.getKey(), value.getValue());
}
}
public void notifyInvalidate(CacheValue value) {
for (LocalCacheInvalidateListener listener : invalidateListeners.values()) {
listener.onInvalidate(value.getKey(), value.getValue());
}
}
public RFuture<Void> clearLocalCacheAsync() {
cache.clear();
if (syncListenerId == 0) {
@ -276,7 +296,7 @@ public abstract class LocalCacheListener {
return RedissonObject.suffixName(name, TOPIC_SUFFIX);
}
protected abstract void updateCache(ByteBuf keyBuf, ByteBuf valueBuf) throws IOException;
protected abstract CacheValue updateCache(ByteBuf keyBuf, ByteBuf valueBuf) throws IOException;
private void disableKeys(final String requestId, final Set<CacheKey> keys, long timeout) {
for (CacheKey key : keys) {
@ -350,4 +370,16 @@ public abstract class LocalCacheListener {
return semaphore;
}
public <K, V> int addListener(LocalCacheInvalidateListener<K, V> listener) {
int listenerId = System.identityHashCode(listener);
invalidateListeners.put(listenerId, listener);
return listenerId;
}
public <K, V> int addListener(LocalCacheUpdateListener<K, V> listener) {
int listenerId = System.identityHashCode(listener);
updateListeners.put(listenerId, listener);
return listenerId;
}
}

@ -260,8 +260,8 @@ public class LocalCacheView<K, V> {
return new CacheKey(Hash.hash128toArray(encodedKey));
}
public ConcurrentMap<CacheKey, CacheValue> getCache() {
return cache;
public <K1, V1> ConcurrentMap<K1, V1> getCache() {
return (ConcurrentMap<K1, V1>) cache;
}
public ConcurrentMap<CacheKey, CacheValue> createCache(LocalCachedMapOptions<?, ?> options) {

@ -11,6 +11,7 @@ import org.redisson.api.LocalCachedMapOptions.EvictionPolicy;
import org.redisson.api.LocalCachedMapOptions.ReconnectionStrategy;
import org.redisson.api.LocalCachedMapOptions.SyncStrategy;
import org.redisson.api.MapOptions.WriteMode;
import org.redisson.api.listener.LocalCacheInvalidateListener;
import org.redisson.api.map.MapLoader;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisClientConfig;
@ -124,6 +125,22 @@ public class RedissonLocalCachedMapTest extends BaseMapTest {
}
}
@Test
public void testListeners() throws InterruptedException {
RLocalCachedMap<String, String> map = redisson.getLocalCachedMap("test", LocalCachedMapOptions.defaults());
Map<String, String> entries = new HashMap();
map.addListener((LocalCacheInvalidateListener<String, String>) (key, value) -> {
entries.put(key, value);
});
map.put("v1", "v2");
map.put("v3", "v4");
Thread.sleep(100);
assertThat(entries.keySet()).containsOnly("v1", "v3");
assertThat(entries.values()).containsOnly(null, null);
}
@Test
public void testMapLoaderGet() {
Map<String, String> cache = new HashMap<>();
@ -158,12 +175,12 @@ public class RedissonLocalCachedMapTest extends BaseMapTest {
@Override
protected <K, V> RMap<K, V> getMap(String name) {
return redisson.getLocalCachedMap(name, LocalCachedMapOptions.<K, V>defaults());
return redisson.getLocalCachedMap(name, LocalCachedMapOptions.defaults());
}
@Override
protected <K, V> RMap<K, V> getMap(String name, Codec codec) {
return redisson.getLocalCachedMap(name, codec, LocalCachedMapOptions.<K, V>defaults());
return redisson.getLocalCachedMap(name, codec, LocalCachedMapOptions.defaults());
}
@Override

Loading…
Cancel
Save