diff --git a/redisson/src/main/java/org/redisson/Redisson.java b/redisson/src/main/java/org/redisson/Redisson.java index 8cb039eea..fbaeccd22 100755 --- a/redisson/src/main/java/org/redisson/Redisson.java +++ b/redisson/src/main/java/org/redisson/Redisson.java @@ -1188,6 +1188,11 @@ public final class Redisson implements RedissonClient { return new RedissonLiveObjectService(liveObjectClassCache, commandExecutor.copy(params)); } + @Override + public RClientSideCaching getClientSideCaching(ClientSideCachingOptions options) { + return new RedissonClientSideCaching(commandExecutor, options); + } + @Override public void shutdown() { writeBehindService.stop(); diff --git a/redisson/src/main/java/org/redisson/RedissonClientSideCaching.java b/redisson/src/main/java/org/redisson/RedissonClientSideCaching.java new file mode 100644 index 000000000..fb627c871 --- /dev/null +++ b/redisson/src/main/java/org/redisson/RedissonClientSideCaching.java @@ -0,0 +1,224 @@ +/** + * Copyright (c) 2013-2024 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson; + +import org.redisson.api.*; +import org.redisson.api.listener.TrackingListener; +import org.redisson.api.options.ClientSideCachingOptions; +import org.redisson.api.options.ClientSideCachingParams; +import org.redisson.cache.*; +import org.redisson.client.codec.Codec; +import org.redisson.command.CommandAsyncExecutor; +import org.redisson.pubsub.PublishSubscribeService; + +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; +import java.util.Arrays; +import java.util.Collections; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; + +public final class RedissonClientSideCaching implements RClientSideCaching { + + Map cache; + final Map> name2cacheKey = new ConcurrentHashMap<>(); + + final CommandAsyncExecutor commandExecutor; + + RedissonClientSideCaching(CommandAsyncExecutor commandExecutor, ClientSideCachingOptions options) { + ClientSideCachingParams params = (ClientSideCachingParams) options; + if (params.getEvictionPolicy() == ClientSideCachingOptions.EvictionPolicy.NONE) { + cache = new NoneCacheMap<>(params.getTtl(), params.getIdleTime()); + } + if (params.getEvictionPolicy() == ClientSideCachingOptions.EvictionPolicy.LRU) { + cache = new LRUCacheMap<>(params.getSize(), params.getTtl(), params.getIdleTime()); + } + if (params.getEvictionPolicy() == ClientSideCachingOptions.EvictionPolicy.LFU) { + cache = new LFUCacheMap<>(params.getSize(), params.getTtl(), params.getIdleTime()); + } + if (params.getEvictionPolicy() == ClientSideCachingOptions.EvictionPolicy.SOFT) { + cache = ReferenceCacheMap.soft(params.getTtl(), params.getIdleTime()); + } + if (params.getEvictionPolicy() == ClientSideCachingOptions.EvictionPolicy.WEAK) { + cache = ReferenceCacheMap.weak(params.getTtl(), params.getIdleTime()); + } + + CommandAsyncExecutor tracked = commandExecutor.copy(true); + this.commandExecutor = create(tracked, CommandAsyncExecutor.class); + + PublishSubscribeService subscribeService = this.commandExecutor.getConnectionManager().getSubscribeService(); + CompletableFuture r = subscribeService.subscribe(this.commandExecutor, + new TrackingListener() { + @Override + public void onChange(String name) { + Set keys = name2cacheKey.remove(name); + if (keys == null) { + return; + } + + for (CacheKeyParams key : keys) { + cache.keySet().remove(key); + } + } + }); + r.join(); + } + + public T create(Object instance, Class clazz) { + InvocationHandler handler = new InvocationHandler() { + @Override + public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { + if (!method.getName().contains("read")) { + return method.invoke(instance, args); + } + + String name = (String) Arrays.stream(args) + .filter(r -> r instanceof String) + .findFirst() + .orElse(null); + if (name == null) { + return method.invoke(instance, args); + } + + CacheKeyParams key = new CacheKeyParams(args); + Set values = name2cacheKey.computeIfAbsent(name, v -> Collections.newSetFromMap(new ConcurrentHashMap<>())); + values.add(key); + return cache.computeIfAbsent(key, k -> { + try { + return method.invoke(instance, args); + } catch (IllegalAccessException | InvocationTargetException e) { + throw new IllegalStateException(e); + } + }); + } + }; + return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class[] { clazz }, handler); + } + + @Override + public RBucket getBucket(String name) { + return new RedissonBucket<>(commandExecutor, name); + } + + @Override + public RBucket getBucket(String name, Codec codec) { + return new RedissonBucket<>(codec, commandExecutor, name); + } + + @Override + public RStream getStream(String name) { + return new RedissonStream<>(commandExecutor, name); + } + + @Override + public RStream getStream(String name, Codec codec) { + return new RedissonStream<>(codec, commandExecutor, name); + } + + @Override + public RSet getSet(String name) { + return new RedissonSet<>(commandExecutor, name, null); + } + + @Override + public RSet getSet(String name, Codec codec) { + return new RedissonSet<>(codec, commandExecutor, name, null); + } + + @Override + public RMap getMap(String name) { + return new RedissonMap<>(commandExecutor, name, null, null, null); + } + + @Override + public RMap getMap(String name, Codec codec) { + return new RedissonMap<>(codec, commandExecutor, name, null, null, null); + } + + @Override + public RScoredSortedSet getScoredSortedSet(String name) { + return new RedissonScoredSortedSet<>(commandExecutor, name, null); + } + + @Override + public RScoredSortedSet getScoredSortedSet(String name, Codec codec) { + return new RedissonScoredSortedSet<>(codec, commandExecutor, name, null); + } + + @Override + public RList getList(String name) { + return new RedissonList<>(commandExecutor, name, null); + } + + @Override + public RList getList(String name, Codec codec) { + return new RedissonList<>(codec, commandExecutor, name, null); + } + + @Override + public RQueue getQueue(String name) { + return new RedissonQueue<>(commandExecutor, name, null); + } + + @Override + public RQueue getQueue(String name, Codec codec) { + return new RedissonQueue<>(codec, commandExecutor, name, null); + } + + @Override + public RDeque getDeque(String name) { + return new RedissonDeque<>(commandExecutor, name, null); + } + + @Override + public RDeque getDeque(String name, Codec codec) { + return new RedissonDeque<>(codec, commandExecutor, name, null); + } + + @Override + public RBlockingQueue getBlockingQueue(String name) { + return new RedissonBlockingQueue<>(commandExecutor, name, null); + } + + @Override + public RBlockingQueue getBlockingQueue(String name, Codec codec) { + return new RedissonBlockingQueue<>(codec, commandExecutor, name, null); + } + + @Override + public RBlockingDeque getBlockingDeque(String name) { + return new RedissonBlockingDeque<>(commandExecutor, name, null); + } + + @Override + public RBlockingDeque getBlockingDeque(String name, Codec codec) { + return new RedissonBlockingDeque<>(codec, commandExecutor, name, null); + } + + @Override + public RGeo getGeo(String name) { + return new RedissonGeo<>(commandExecutor, name, null); + } + + @Override + public RGeo getGeo(String name, Codec codec) { + return new RedissonGeo<>(codec, commandExecutor, name, null); + } +} diff --git a/redisson/src/main/java/org/redisson/api/RClientSideCaching.java b/redisson/src/main/java/org/redisson/api/RClientSideCaching.java new file mode 100644 index 000000000..7219e86cd --- /dev/null +++ b/redisson/src/main/java/org/redisson/api/RClientSideCaching.java @@ -0,0 +1,252 @@ +/** + * Copyright (c) 2013-2024 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.api; + +import org.redisson.client.codec.Codec; + +public interface RClientSideCaching { + + /** + * Returns object holder instance by name. + * + * @param type of value + * @param name name of object + * @return Bucket object + */ + RBucket getBucket(String name); + + /** + * Returns object holder instance by name + * using provided codec for object. + * + * @param type of value + * @param name name of object + * @param codec codec for values + * @return Bucket object + */ + RBucket getBucket(String name, Codec codec); + + /** + * Returns stream instance by name + *

+ * Requires Redis 5.0.0 and higher. + * + * @param type of key + * @param type of value + * @param name of stream + * @return RStream object + */ + RStream getStream(String name); + + /** + * Returns stream instance by name + * using provided codec for entries. + *

+ * Requires Redis 5.0.0 and higher. + * + * @param type of key + * @param type of value + * @param name name of stream + * @param codec codec for entry + * @return RStream object + */ + RStream getStream(String name, Codec codec); + + /** + * Returns set instance by name. + * + * @param type of value + * @param name name of object + * @return Set object + */ + RSet getSet(String name); + + /** + * Returns set instance by name + * using provided codec for set objects. + * + * @param type of value + * @param name name of object + * @param codec codec for values + * @return Set object + */ + RSet getSet(String name, Codec codec); + + /** + * Returns map instance by name. + * + * @param type of key + * @param type of value + * @param name name of object + * @return Map object + */ + RMap getMap(String name); + + /** + * Returns map instance by name + * using provided codec for both map keys and values. + * + * @param type of key + * @param type of value + * @param name name of object + * @param codec codec for keys and values + * @return Map object + */ + RMap getMap(String name, Codec codec); + + /** + * Returns Redis Sorted Set instance by name. + * This sorted set sorts objects by object score. + * + * @param type of value + * @param name name of object + * @return ScoredSortedSet object + */ + RScoredSortedSet getScoredSortedSet(String name); + + /** + * Returns Redis Sorted Set instance by name + * using provided codec for sorted set objects. + * This sorted set sorts objects by object score. + * + * @param type of value + * @param name name of object + * @param codec codec for values + * @return ScoredSortedSet object + */ + RScoredSortedSet getScoredSortedSet(String name, Codec codec); + + /** + * Returns list instance by name. + * + * @param type of value + * @param name name of object + * @return List object + */ + RList getList(String name); + + /** + * Returns list instance by name + * using provided codec for list objects. + * + * @param type of value + * @param name name of object + * @param codec codec for values + * @return List object + */ + RList getList(String name, Codec codec); + + /** + * Returns unbounded queue instance by name. + * + * @param type of value + * @param name of object + * @return queue object + */ + RQueue getQueue(String name); + + /** + * Returns unbounded queue instance by name + * using provided codec for queue objects. + * + * @param type of value + * @param name name of object + * @param codec codec for message + * @return Queue object + */ + RQueue getQueue(String name, Codec codec); + + /** + * Returns unbounded deque instance by name. + * + * @param type of value + * @param name name of object + * @return Deque object + */ + RDeque getDeque(String name); + + /** + * Returns unbounded deque instance by name + * using provided codec for deque objects. + * + * @param type of value + * @param name name of object + * @param codec codec for values + * @return Deque object + */ + RDeque getDeque(String name, Codec codec); + + /** + * Returns unbounded blocking queue instance by name. + * + * @param type of value + * @param name name of object + * @return BlockingQueue object + */ + RBlockingQueue getBlockingQueue(String name); + + /** + * Returns unbounded blocking queue instance by name + * using provided codec for queue objects. + * + * @param type of value + * @param name name of queue + * @param codec queue objects codec + * @return BlockingQueue object + */ + RBlockingQueue getBlockingQueue(String name, Codec codec); + + /** + * Returns unbounded blocking deque instance by name. + * + * @param type of value + * @param name name of object + * @return BlockingDeque object + */ + RBlockingDeque getBlockingDeque(String name); + + /** + * Returns unbounded blocking deque instance by name + * using provided codec for deque objects. + * + * @param type of value + * @param name name of object + * @param codec deque objects codec + * @return BlockingDeque object + */ + RBlockingDeque getBlockingDeque(String name, Codec codec); + + /** + * Returns geospatial items holder instance by name. + * + * @param type of value + * @param name name of object + * @return Geo object + */ + RGeo getGeo(String name); + + /** + * Returns geospatial items holder instance by name + * using provided codec for geospatial members. + * + * @param type of value + * @param name name of object + * @param codec codec for value + * @return Geo object + */ + RGeo getGeo(String name, Codec codec); + +} diff --git a/redisson/src/main/java/org/redisson/api/RedissonClient.java b/redisson/src/main/java/org/redisson/api/RedissonClient.java index 48ac51987..33af755b4 100755 --- a/redisson/src/main/java/org/redisson/api/RedissonClient.java +++ b/redisson/src/main/java/org/redisson/api/RedissonClient.java @@ -1926,6 +1926,18 @@ public interface RedissonClient { */ RLiveObjectService getLiveObjectService(LiveObjectOptions options); + /** + * Returns client side caching facade interface with the specified options. + *

+ * NOTE: client side caching feature is ineffective for Map or JSON based structures.
+ * Use local cached Map, JSON Store instead. + *
+ * + * @param options client cache options + * @return Client side caching instance + */ + RClientSideCaching getClientSideCaching(ClientSideCachingOptions options); + /** * Returns RxJava Redisson instance * diff --git a/redisson/src/main/java/org/redisson/api/options/ClientSideCachingOptions.java b/redisson/src/main/java/org/redisson/api/options/ClientSideCachingOptions.java new file mode 100644 index 000000000..c2e8fc005 --- /dev/null +++ b/redisson/src/main/java/org/redisson/api/options/ClientSideCachingOptions.java @@ -0,0 +1,109 @@ +/** + * Copyright (c) 2013-2024 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.api.options; + +import java.time.Duration; + +/** + * + * @author Nikita Koksharov + * + */ +public interface ClientSideCachingOptions { + + enum EvictionPolicy { + + /** + * Client cache without eviction. + */ + NONE, + + /** + * Least Recently Used client cache eviction policy. + */ + LRU, + + /** + * Least Frequently Used client cache eviction policy. + */ + LFU, + + /** + * Client cache eviction policy with Soft Reference used for values. + * All references will be collected by GC + */ + SOFT, + + /** + * Client cache eviction policy with Weak Reference used for values. + * All references will be collected by GC + */ + WEAK + }; + + /** + * Creates the default options + * + * @return options instance + */ + static ClientSideCachingOptions defaults() { + return new ClientSideCachingParams(); + } + + /** + * Defines client cache eviction policy. + * + * @param evictionPolicy + *

LRU - uses client cache with LRU (least recently used) eviction policy. + *

LFU - uses client cache with LFU (least frequently used) eviction policy. + *

SOFT - uses client cache with soft references. The garbage collector will evict items from the client cache when the JVM is running out of memory. + *

WEAK - uses client cache with weak references. The garbage collector will evict items from the client cache when it became weakly reachable. + *

NONE - doesn't use eviction policy, but timeToLive and maxIdleTime params are still working. + * @return options instance + */ + ClientSideCachingOptions evictionPolicy(EvictionPolicy evictionPolicy); + + /** + * Defines client cache size. + *

+ * If size is 0 then client cache is unbounded. + *

+ * If size is -1 then client cache is always empty and doesn't store data. + * + * @param size size of client cache + * @return options instance + */ + ClientSideCachingOptions size(int size); + + /** + * Defines time to live in milliseconds of each map entry in client cache. + * If value equals to 0 then timeout is not applied + * + * @param ttl - time to live in milliseconds + * @return LocalCachedMapOptions instance + */ + ClientSideCachingOptions timeToLive(Duration ttl); + + /** + * Defines max idle time in milliseconds of each map entry in client cache. + * If value equals to 0 then timeout is not applied + * + * @param idleTime time to live in milliseconds + * @return LocalCachedMapOptions instance + */ + ClientSideCachingOptions maxIdle(Duration idleTime); + +} diff --git a/redisson/src/main/java/org/redisson/api/options/ClientSideCachingParams.java b/redisson/src/main/java/org/redisson/api/options/ClientSideCachingParams.java new file mode 100644 index 000000000..7eb14c939 --- /dev/null +++ b/redisson/src/main/java/org/redisson/api/options/ClientSideCachingParams.java @@ -0,0 +1,72 @@ +/** + * Copyright (c) 2013-2024 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.api.options; + +import java.time.Duration; + + +/** + * + * @author Nikita Koksharov + * + */ +public final class ClientSideCachingParams implements ClientSideCachingOptions { + + private EvictionPolicy evictionPolicy = EvictionPolicy.NONE; + private int size; + private Duration ttl = Duration.ZERO; + private Duration idleTime = Duration.ZERO; + + @Override + public ClientSideCachingOptions evictionPolicy(EvictionPolicy evictionPolicy) { + this.evictionPolicy = evictionPolicy; + return this; + } + + @Override + public ClientSideCachingOptions size(int size) { + this.size = size; + return this; + } + + @Override + public ClientSideCachingOptions timeToLive(Duration ttl) { + this.ttl = ttl; + return this; + } + + @Override + public ClientSideCachingOptions maxIdle(Duration idleTime) { + this.idleTime = idleTime; + return this; + } + + public EvictionPolicy getEvictionPolicy() { + return evictionPolicy; + } + + public int getSize() { + return size; + } + + public long getTtl() { + return ttl.toMillis(); + } + + public long getIdleTime() { + return idleTime.toMillis(); + } +} diff --git a/redisson/src/main/java/org/redisson/cache/CacheKeyParams.java b/redisson/src/main/java/org/redisson/cache/CacheKeyParams.java new file mode 100644 index 000000000..5b12b4ae9 --- /dev/null +++ b/redisson/src/main/java/org/redisson/cache/CacheKeyParams.java @@ -0,0 +1,46 @@ +/** + * Copyright (c) 2013-2024 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.cache; + +import java.util.Arrays; + +/** + * + * @author Nikita Koksharov + * + */ +public final class CacheKeyParams { + + private final Object[] values; + + public CacheKeyParams(Object[] values) { + this.values = values; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + CacheKeyParams cacheKey = (CacheKeyParams) o; + return Arrays.deepEquals(values, cacheKey.values); + } + + @Override + public int hashCode() { + return Arrays.deepHashCode(values); + } + +} diff --git a/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java b/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java index eb303aaf2..b06b3076a 100644 --- a/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java +++ b/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java @@ -263,6 +263,33 @@ public class PublishSubscribeService { return registerClientTrackingListener(commandExecutor, ffs, listenerId, null); } + public CompletableFuture subscribe(CommandAsyncExecutor commandExecutor, TrackingListener listener) { + int listenerId = System.identityHashCode(listener); + + List> ffs = new ArrayList<>(); + for (MasterSlaveEntry entry : connectionManager.getEntrySet()) { + RedisPubSubListener entryListener = new RedisPubSubListener() { + @Override + public void onMessage(CharSequence channel, Object msg) { + if (msg != null + && channel.equals(ChannelName.TRACKING.toString())) { + listener.onChange((String) msg); + } + } + }; + int entryListenerId = System.identityHashCode(entryListener); + + Collection listeners = flushListeners.computeIfAbsent(listenerId, k -> new HashSet<>()); + listeners.add(entryListenerId); + + CompletableFuture future = subscribe(PubSubType.SUBSCRIBE, StringCodec.INSTANCE, + ChannelName.TRACKING, entry, entry.getEntry(), entryListener); + ffs.add(future); + } + + return registerClientTrackingListener(commandExecutor, ffs, listenerId, null); + } + private CompletableFuture registerClientTrackingListener(CommandAsyncExecutor commandExecutor, List> ffs, int listenerId, diff --git a/redisson/src/test/java/org/redisson/RedissonClientSideCachingTest.java b/redisson/src/test/java/org/redisson/RedissonClientSideCachingTest.java new file mode 100644 index 000000000..956c2ebeb --- /dev/null +++ b/redisson/src/test/java/org/redisson/RedissonClientSideCachingTest.java @@ -0,0 +1,35 @@ +package org.redisson; + +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; +import org.redisson.api.RBucket; +import org.redisson.api.RClientSideCaching; +import org.redisson.api.RedissonClient; +import org.redisson.api.options.ClientSideCachingOptions; +import org.redisson.config.Config; +import org.redisson.config.Protocol; + +public class RedissonClientSideCachingTest extends RedisDockerTest { + + @Test + public void testBucket() throws InterruptedException { + Config c = redisson.getConfig(); + c.setProtocol(Protocol.RESP3); + + RedissonClient rs = Redisson.create(c); + + RClientSideCaching csc = rs.getClientSideCaching(ClientSideCachingOptions.defaults()); + RBucket b = csc.getBucket("test1"); + Assertions.assertThat(b.get()).isNull(); + Assertions.assertThat(b.get()).isNull(); + + RBucket b2 = rs.getBucket("test1"); + b2.set("123"); + Thread.sleep(100); + + Assertions.assertThat(b.get()).isEqualTo("123"); + + rs.shutdown(); + } + +}