Feature - RClientSideCaching object added. #6218

pull/6222/head
Nikita Koksharov 5 months ago
parent 6980a13d23
commit 3b074a28de

@ -1188,6 +1188,11 @@ public final class Redisson implements RedissonClient {
return new RedissonLiveObjectService(liveObjectClassCache, commandExecutor.copy(params));
}
@Override
public RClientSideCaching getClientSideCaching(ClientSideCachingOptions options) {
return new RedissonClientSideCaching(commandExecutor, options);
}
@Override
public void shutdown() {
writeBehindService.stop();

@ -0,0 +1,224 @@
/**
* Copyright (c) 2013-2024 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson;
import org.redisson.api.*;
import org.redisson.api.listener.TrackingListener;
import org.redisson.api.options.ClientSideCachingOptions;
import org.redisson.api.options.ClientSideCachingParams;
import org.redisson.cache.*;
import org.redisson.client.codec.Codec;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.pubsub.PublishSubscribeService;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
public final class RedissonClientSideCaching implements RClientSideCaching {
Map<CacheKeyParams, Object> cache;
final Map<String, Set<CacheKeyParams>> name2cacheKey = new ConcurrentHashMap<>();
final CommandAsyncExecutor commandExecutor;
RedissonClientSideCaching(CommandAsyncExecutor commandExecutor, ClientSideCachingOptions options) {
ClientSideCachingParams params = (ClientSideCachingParams) options;
if (params.getEvictionPolicy() == ClientSideCachingOptions.EvictionPolicy.NONE) {
cache = new NoneCacheMap<>(params.getTtl(), params.getIdleTime());
}
if (params.getEvictionPolicy() == ClientSideCachingOptions.EvictionPolicy.LRU) {
cache = new LRUCacheMap<>(params.getSize(), params.getTtl(), params.getIdleTime());
}
if (params.getEvictionPolicy() == ClientSideCachingOptions.EvictionPolicy.LFU) {
cache = new LFUCacheMap<>(params.getSize(), params.getTtl(), params.getIdleTime());
}
if (params.getEvictionPolicy() == ClientSideCachingOptions.EvictionPolicy.SOFT) {
cache = ReferenceCacheMap.soft(params.getTtl(), params.getIdleTime());
}
if (params.getEvictionPolicy() == ClientSideCachingOptions.EvictionPolicy.WEAK) {
cache = ReferenceCacheMap.weak(params.getTtl(), params.getIdleTime());
}
CommandAsyncExecutor tracked = commandExecutor.copy(true);
this.commandExecutor = create(tracked, CommandAsyncExecutor.class);
PublishSubscribeService subscribeService = this.commandExecutor.getConnectionManager().getSubscribeService();
CompletableFuture<Integer> r = subscribeService.subscribe(this.commandExecutor,
new TrackingListener() {
@Override
public void onChange(String name) {
Set<CacheKeyParams> keys = name2cacheKey.remove(name);
if (keys == null) {
return;
}
for (CacheKeyParams key : keys) {
cache.keySet().remove(key);
}
}
});
r.join();
}
public <T> T create(Object instance, Class<T> clazz) {
InvocationHandler handler = new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
if (!method.getName().contains("read")) {
return method.invoke(instance, args);
}
String name = (String) Arrays.stream(args)
.filter(r -> r instanceof String)
.findFirst()
.orElse(null);
if (name == null) {
return method.invoke(instance, args);
}
CacheKeyParams key = new CacheKeyParams(args);
Set<CacheKeyParams> values = name2cacheKey.computeIfAbsent(name, v -> Collections.newSetFromMap(new ConcurrentHashMap<>()));
values.add(key);
return cache.computeIfAbsent(key, k -> {
try {
return method.invoke(instance, args);
} catch (IllegalAccessException | InvocationTargetException e) {
throw new IllegalStateException(e);
}
});
}
};
return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class[] { clazz }, handler);
}
@Override
public <V> RBucket<V> getBucket(String name) {
return new RedissonBucket<>(commandExecutor, name);
}
@Override
public <V> RBucket<V> getBucket(String name, Codec codec) {
return new RedissonBucket<>(codec, commandExecutor, name);
}
@Override
public <K, V> RStream<K, V> getStream(String name) {
return new RedissonStream<>(commandExecutor, name);
}
@Override
public <K, V> RStream<K, V> getStream(String name, Codec codec) {
return new RedissonStream<>(codec, commandExecutor, name);
}
@Override
public <V> RSet<V> getSet(String name) {
return new RedissonSet<>(commandExecutor, name, null);
}
@Override
public <V> RSet<V> getSet(String name, Codec codec) {
return new RedissonSet<>(codec, commandExecutor, name, null);
}
@Override
public <K, V> RMap<K, V> getMap(String name) {
return new RedissonMap<>(commandExecutor, name, null, null, null);
}
@Override
public <K, V> RMap<K, V> getMap(String name, Codec codec) {
return new RedissonMap<>(codec, commandExecutor, name, null, null, null);
}
@Override
public <V> RScoredSortedSet<V> getScoredSortedSet(String name) {
return new RedissonScoredSortedSet<>(commandExecutor, name, null);
}
@Override
public <V> RScoredSortedSet<V> getScoredSortedSet(String name, Codec codec) {
return new RedissonScoredSortedSet<>(codec, commandExecutor, name, null);
}
@Override
public <V> RList<V> getList(String name) {
return new RedissonList<>(commandExecutor, name, null);
}
@Override
public <V> RList<V> getList(String name, Codec codec) {
return new RedissonList<>(codec, commandExecutor, name, null);
}
@Override
public <V> RQueue<V> getQueue(String name) {
return new RedissonQueue<>(commandExecutor, name, null);
}
@Override
public <V> RQueue<V> getQueue(String name, Codec codec) {
return new RedissonQueue<>(codec, commandExecutor, name, null);
}
@Override
public <V> RDeque<V> getDeque(String name) {
return new RedissonDeque<>(commandExecutor, name, null);
}
@Override
public <V> RDeque<V> getDeque(String name, Codec codec) {
return new RedissonDeque<>(codec, commandExecutor, name, null);
}
@Override
public <V> RBlockingQueue<V> getBlockingQueue(String name) {
return new RedissonBlockingQueue<>(commandExecutor, name, null);
}
@Override
public <V> RBlockingQueue<V> getBlockingQueue(String name, Codec codec) {
return new RedissonBlockingQueue<>(codec, commandExecutor, name, null);
}
@Override
public <V> RBlockingDeque<V> getBlockingDeque(String name) {
return new RedissonBlockingDeque<>(commandExecutor, name, null);
}
@Override
public <V> RBlockingDeque<V> getBlockingDeque(String name, Codec codec) {
return new RedissonBlockingDeque<>(codec, commandExecutor, name, null);
}
@Override
public <V> RGeo<V> getGeo(String name) {
return new RedissonGeo<>(commandExecutor, name, null);
}
@Override
public <V> RGeo<V> getGeo(String name, Codec codec) {
return new RedissonGeo<>(codec, commandExecutor, name, null);
}
}

@ -0,0 +1,252 @@
/**
* Copyright (c) 2013-2024 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api;
import org.redisson.client.codec.Codec;
public interface RClientSideCaching {
/**
* Returns object holder instance by name.
*
* @param <V> type of value
* @param name name of object
* @return Bucket object
*/
<V> RBucket<V> getBucket(String name);
/**
* Returns object holder instance by name
* using provided codec for object.
*
* @param <V> type of value
* @param name name of object
* @param codec codec for values
* @return Bucket object
*/
<V> RBucket<V> getBucket(String name, Codec codec);
/**
* Returns stream instance by <code>name</code>
* <p>
* Requires <b>Redis 5.0.0 and higher.</b>
*
* @param <K> type of key
* @param <V> type of value
* @param name of stream
* @return RStream object
*/
<K, V> RStream<K, V> getStream(String name);
/**
* Returns stream instance by <code>name</code>
* using provided <code>codec</code> for entries.
* <p>
* Requires <b>Redis 5.0.0 and higher.</b>
*
* @param <K> type of key
* @param <V> type of value
* @param name name of stream
* @param codec codec for entry
* @return RStream object
*/
<K, V> RStream<K, V> getStream(String name, Codec codec);
/**
* Returns set instance by name.
*
* @param <V> type of value
* @param name name of object
* @return Set object
*/
<V> RSet<V> getSet(String name);
/**
* Returns set instance by name
* using provided codec for set objects.
*
* @param <V> type of value
* @param name name of object
* @param codec codec for values
* @return Set object
*/
<V> RSet<V> getSet(String name, Codec codec);
/**
* Returns map instance by name.
*
* @param <K> type of key
* @param <V> type of value
* @param name name of object
* @return Map object
*/
<K, V> RMap<K, V> getMap(String name);
/**
* Returns map instance by name
* using provided codec for both map keys and values.
*
* @param <K> type of key
* @param <V> type of value
* @param name name of object
* @param codec codec for keys and values
* @return Map object
*/
<K, V> RMap<K, V> getMap(String name, Codec codec);
/**
* Returns Redis Sorted Set instance by name.
* This sorted set sorts objects by object score.
*
* @param <V> type of value
* @param name name of object
* @return ScoredSortedSet object
*/
<V> RScoredSortedSet<V> getScoredSortedSet(String name);
/**
* Returns Redis Sorted Set instance by name
* using provided codec for sorted set objects.
* This sorted set sorts objects by object score.
*
* @param <V> type of value
* @param name name of object
* @param codec codec for values
* @return ScoredSortedSet object
*/
<V> RScoredSortedSet<V> getScoredSortedSet(String name, Codec codec);
/**
* Returns list instance by name.
*
* @param <V> type of value
* @param name name of object
* @return List object
*/
<V> RList<V> getList(String name);
/**
* Returns list instance by name
* using provided codec for list objects.
*
* @param <V> type of value
* @param name name of object
* @param codec codec for values
* @return List object
*/
<V> RList<V> getList(String name, Codec codec);
/**
* Returns unbounded queue instance by name.
*
* @param <V> type of value
* @param name of object
* @return queue object
*/
<V> RQueue<V> getQueue(String name);
/**
* Returns unbounded queue instance by name
* using provided codec for queue objects.
*
* @param <V> type of value
* @param name name of object
* @param codec codec for message
* @return Queue object
*/
<V> RQueue<V> getQueue(String name, Codec codec);
/**
* Returns unbounded deque instance by name.
*
* @param <V> type of value
* @param name name of object
* @return Deque object
*/
<V> RDeque<V> getDeque(String name);
/**
* Returns unbounded deque instance by name
* using provided codec for deque objects.
*
* @param <V> type of value
* @param name name of object
* @param codec codec for values
* @return Deque object
*/
<V> RDeque<V> getDeque(String name, Codec codec);
/**
* Returns unbounded blocking queue instance by name.
*
* @param <V> type of value
* @param name name of object
* @return BlockingQueue object
*/
<V> RBlockingQueue<V> getBlockingQueue(String name);
/**
* Returns unbounded blocking queue instance by name
* using provided codec for queue objects.
*
* @param <V> type of value
* @param name name of queue
* @param codec queue objects codec
* @return BlockingQueue object
*/
<V> RBlockingQueue<V> getBlockingQueue(String name, Codec codec);
/**
* Returns unbounded blocking deque instance by name.
*
* @param <V> type of value
* @param name name of object
* @return BlockingDeque object
*/
<V> RBlockingDeque<V> getBlockingDeque(String name);
/**
* Returns unbounded blocking deque instance by name
* using provided codec for deque objects.
*
* @param <V> type of value
* @param name name of object
* @param codec deque objects codec
* @return BlockingDeque object
*/
<V> RBlockingDeque<V> getBlockingDeque(String name, Codec codec);
/**
* Returns geospatial items holder instance by <code>name</code>.
*
* @param <V> type of value
* @param name name of object
* @return Geo object
*/
<V> RGeo<V> getGeo(String name);
/**
* Returns geospatial items holder instance by <code>name</code>
* using provided codec for geospatial members.
*
* @param <V> type of value
* @param name name of object
* @param codec codec for value
* @return Geo object
*/
<V> RGeo<V> getGeo(String name, Codec codec);
}

@ -1926,6 +1926,18 @@ public interface RedissonClient {
*/
RLiveObjectService getLiveObjectService(LiveObjectOptions options);
/**
* Returns client side caching facade interface with the specified <code>options</code>.
* <p><strong>
* NOTE: client side caching feature is ineffective for Map or JSON based structures.<br>
* Use local cached <a href="https://redisson.org/docs/data-and-services/collections/#eviction-local-cache-and-data-partitioning">Map</a>, <a href="https://redisson.org/docs/data-and-services/collections/#local-cache">JSON Store</a> instead.
* </strong>
*
* @param options client cache options
* @return Client side caching instance
*/
RClientSideCaching getClientSideCaching(ClientSideCachingOptions options);
/**
* Returns RxJava Redisson instance
*

@ -0,0 +1,109 @@
/**
* Copyright (c) 2013-2024 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api.options;
import java.time.Duration;
/**
*
* @author Nikita Koksharov
*
*/
public interface ClientSideCachingOptions {
enum EvictionPolicy {
/**
* Client cache without eviction.
*/
NONE,
/**
* Least Recently Used client cache eviction policy.
*/
LRU,
/**
* Least Frequently Used client cache eviction policy.
*/
LFU,
/**
* Client cache eviction policy with Soft Reference used for values.
* All references will be collected by GC
*/
SOFT,
/**
* Client cache eviction policy with Weak Reference used for values.
* All references will be collected by GC
*/
WEAK
};
/**
* Creates the default options
*
* @return options instance
*/
static ClientSideCachingOptions defaults() {
return new ClientSideCachingParams();
}
/**
* Defines client cache eviction policy.
*
* @param evictionPolicy
* <p><code>LRU</code> - uses client cache with LRU (least recently used) eviction policy.
* <p><code>LFU</code> - uses client cache with LFU (least frequently used) eviction policy.
* <p><code>SOFT</code> - uses client cache with soft references. The garbage collector will evict items from the client cache when the JVM is running out of memory.
* <p><code>WEAK</code> - uses client cache with weak references. The garbage collector will evict items from the client cache when it became weakly reachable.
* <p><code>NONE</code> - doesn't use eviction policy, but timeToLive and maxIdleTime params are still working.
* @return options instance
*/
ClientSideCachingOptions evictionPolicy(EvictionPolicy evictionPolicy);
/**
* Defines client cache size.
* <p>
* If size is <code>0</code> then client cache is unbounded.
* <p>
* If size is <code>-1</code> then client cache is always empty and doesn't store data.
*
* @param size size of client cache
* @return options instance
*/
ClientSideCachingOptions size(int size);
/**
* Defines time to live in milliseconds of each map entry in client cache.
* If value equals to <code>0</code> then timeout is not applied
*
* @param ttl - time to live in milliseconds
* @return LocalCachedMapOptions instance
*/
ClientSideCachingOptions timeToLive(Duration ttl);
/**
* Defines max idle time in milliseconds of each map entry in client cache.
* If value equals to <code>0</code> then timeout is not applied
*
* @param idleTime time to live in milliseconds
* @return LocalCachedMapOptions instance
*/
ClientSideCachingOptions maxIdle(Duration idleTime);
}

@ -0,0 +1,72 @@
/**
* Copyright (c) 2013-2024 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api.options;
import java.time.Duration;
/**
*
* @author Nikita Koksharov
*
*/
public final class ClientSideCachingParams implements ClientSideCachingOptions {
private EvictionPolicy evictionPolicy = EvictionPolicy.NONE;
private int size;
private Duration ttl = Duration.ZERO;
private Duration idleTime = Duration.ZERO;
@Override
public ClientSideCachingOptions evictionPolicy(EvictionPolicy evictionPolicy) {
this.evictionPolicy = evictionPolicy;
return this;
}
@Override
public ClientSideCachingOptions size(int size) {
this.size = size;
return this;
}
@Override
public ClientSideCachingOptions timeToLive(Duration ttl) {
this.ttl = ttl;
return this;
}
@Override
public ClientSideCachingOptions maxIdle(Duration idleTime) {
this.idleTime = idleTime;
return this;
}
public EvictionPolicy getEvictionPolicy() {
return evictionPolicy;
}
public int getSize() {
return size;
}
public long getTtl() {
return ttl.toMillis();
}
public long getIdleTime() {
return idleTime.toMillis();
}
}

@ -0,0 +1,46 @@
/**
* Copyright (c) 2013-2024 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.cache;
import java.util.Arrays;
/**
*
* @author Nikita Koksharov
*
*/
public final class CacheKeyParams {
private final Object[] values;
public CacheKeyParams(Object[] values) {
this.values = values;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
CacheKeyParams cacheKey = (CacheKeyParams) o;
return Arrays.deepEquals(values, cacheKey.values);
}
@Override
public int hashCode() {
return Arrays.deepHashCode(values);
}
}

@ -263,6 +263,33 @@ public class PublishSubscribeService {
return registerClientTrackingListener(commandExecutor, ffs, listenerId, null);
}
public CompletableFuture<Integer> subscribe(CommandAsyncExecutor commandExecutor, TrackingListener listener) {
int listenerId = System.identityHashCode(listener);
List<CompletableFuture<PubSubConnectionEntry>> ffs = new ArrayList<>();
for (MasterSlaveEntry entry : connectionManager.getEntrySet()) {
RedisPubSubListener<Object> entryListener = new RedisPubSubListener<Object>() {
@Override
public void onMessage(CharSequence channel, Object msg) {
if (msg != null
&& channel.equals(ChannelName.TRACKING.toString())) {
listener.onChange((String) msg);
}
}
};
int entryListenerId = System.identityHashCode(entryListener);
Collection<Integer> listeners = flushListeners.computeIfAbsent(listenerId, k -> new HashSet<>());
listeners.add(entryListenerId);
CompletableFuture<PubSubConnectionEntry> future = subscribe(PubSubType.SUBSCRIBE, StringCodec.INSTANCE,
ChannelName.TRACKING, entry, entry.getEntry(), entryListener);
ffs.add(future);
}
return registerClientTrackingListener(commandExecutor, ffs, listenerId, null);
}
private CompletableFuture<Integer> registerClientTrackingListener(CommandAsyncExecutor commandExecutor,
List<CompletableFuture<PubSubConnectionEntry>> ffs,
int listenerId,

@ -0,0 +1,35 @@
package org.redisson;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.redisson.api.RBucket;
import org.redisson.api.RClientSideCaching;
import org.redisson.api.RedissonClient;
import org.redisson.api.options.ClientSideCachingOptions;
import org.redisson.config.Config;
import org.redisson.config.Protocol;
public class RedissonClientSideCachingTest extends RedisDockerTest {
@Test
public void testBucket() throws InterruptedException {
Config c = redisson.getConfig();
c.setProtocol(Protocol.RESP3);
RedissonClient rs = Redisson.create(c);
RClientSideCaching csc = rs.getClientSideCaching(ClientSideCachingOptions.defaults());
RBucket<String> b = csc.getBucket("test1");
Assertions.assertThat(b.get()).isNull();
Assertions.assertThat(b.get()).isNull();
RBucket<Object> b2 = rs.getBucket("test1");
b2.set("123");
Thread.sleep(100);
Assertions.assertThat(b.get()).isEqualTo("123");
rs.shutdown();
}
}
Loading…
Cancel
Save