From b03a34f20b843dcecff51bbbf37c5254ea5db106 Mon Sep 17 00:00:00 2001 From: Nikita Date: Mon, 14 Dec 2015 13:48:41 +0300 Subject: [PATCH] RSetCacheReactive added. #321 --- .../java/org/redisson/RedissonMapCache.java | 2 +- .../java/org/redisson/RedissonReactive.java | 13 + .../java/org/redisson/RedissonReadLock.java | 4 +- .../java/org/redisson/RedissonSetCache.java | 2 +- .../java/org/redisson/RedissonWriteLock.java | 4 +- .../java/org/redisson/api/RBatchReactive.java | 33 ++ .../org/redisson/api/RSetCacheReactive.java | 42 ++ .../redisson/api/RedissonReactiveClient.java | 24 ++ .../client/protocol/RedisCommands.java | 1 + ...ctionReactive.java => PublisherAdder.java} | 22 +- .../reactive/RedissonBatchReactive.java | 11 + .../reactive/RedissonExpirableReactive.java | 2 +- .../RedissonLexSortedSetReactive.java | 15 +- .../reactive/RedissonListReactive.java | 23 +- .../reactive/RedissonMapCacheReactive.java | 3 +- .../reactive/RedissonObjectReactive.java | 3 - .../RedissonScoredSortedSetReactive.java | 113 +----- .../reactive/RedissonSetCacheReactive.java | 373 ++++++++++++++++++ .../reactive/RedissonSetReactive.java | 123 +----- .../reactive/SetReactiveIterator.java | 133 +++++++ 20 files changed, 667 insertions(+), 279 deletions(-) create mode 100644 src/main/java/org/redisson/api/RSetCacheReactive.java rename src/main/java/org/redisson/reactive/{RedissonCollectionReactive.java => PublisherAdder.java} (72%) create mode 100644 src/main/java/org/redisson/reactive/RedissonSetCacheReactive.java create mode 100644 src/main/java/org/redisson/reactive/SetReactiveIterator.java diff --git a/src/main/java/org/redisson/RedissonMapCache.java b/src/main/java/org/redisson/RedissonMapCache.java index 61de85b61..44ab4c5f9 100644 --- a/src/main/java/org/redisson/RedissonMapCache.java +++ b/src/main/java/org/redisson/RedissonMapCache.java @@ -345,7 +345,7 @@ public class RedissonMapCache extends RedissonMap implements RMapCac "redis.call('zadd', KEYS[2], 92233720368547758, 'redisson__expiretag');" + "redis.call('pexpire', KEYS[2], ARGV[1]); " + "return redis.call('pexpire', KEYS[1], ARGV[1]); ", - Arrays.asList(getName(), getTimeoutSetName()), timeUnit.toSeconds(timeToLive)); + Arrays.asList(getName(), getTimeoutSetName()), timeUnit.toMillis(timeToLive)); } @Override diff --git a/src/main/java/org/redisson/RedissonReactive.java b/src/main/java/org/redisson/RedissonReactive.java index ba1df5c91..ba505fa40 100644 --- a/src/main/java/org/redisson/RedissonReactive.java +++ b/src/main/java/org/redisson/RedissonReactive.java @@ -35,6 +35,7 @@ import org.redisson.api.RPatternTopicReactive; import org.redisson.api.RQueueReactive; import org.redisson.api.RScoredSortedSetReactive; import org.redisson.api.RScriptReactive; +import org.redisson.api.RSetCacheReactive; import org.redisson.api.RSetReactive; import org.redisson.api.RTopicReactive; import org.redisson.api.RedissonReactiveClient; @@ -66,6 +67,7 @@ import org.redisson.reactive.RedissonPatternTopicReactive; import org.redisson.reactive.RedissonQueueReactive; import org.redisson.reactive.RedissonScoredSortedSetReactive; import org.redisson.reactive.RedissonScriptReactive; +import org.redisson.reactive.RedissonSetCacheReactive; import org.redisson.reactive.RedissonSetReactive; import org.redisson.reactive.RedissonTopicReactive; @@ -245,6 +247,16 @@ public class RedissonReactive implements RedissonReactiveClient { return new RedissonDequeReactive(codec, commandExecutor, name); } + @Override + public RSetCacheReactive getSetCache(String name) { + return new RedissonSetCacheReactive(evictionScheduler, commandExecutor, name); + } + + @Override + public RSetCacheReactive getSetCache(String name, Codec codec) { + return new RedissonSetCacheReactive(codec, evictionScheduler, commandExecutor, name); + } + @Override public RAtomicLongReactive getAtomicLong(String name) { return new RedissonAtomicLongReactive(commandExecutor, name); @@ -293,5 +305,6 @@ public class RedissonReactive implements RedissonReactiveClient { connectionManager.shutdown(); } + } diff --git a/src/main/java/org/redisson/RedissonReadLock.java b/src/main/java/org/redisson/RedissonReadLock.java index 5dd9e87a5..fb1d8bd6e 100644 --- a/src/main/java/org/redisson/RedissonReadLock.java +++ b/src/main/java/org/redisson/RedissonReadLock.java @@ -105,7 +105,7 @@ public class RedissonReadLock extends RedissonLock implements RLock { + id + " thread-id: " + Thread.currentThread().getId()); } if (opStatus) { - stopRefreshTask(); + cancelExpirationRenewal(); } } @@ -115,7 +115,7 @@ public class RedissonReadLock extends RedissonLock implements RLock { } Future forceUnlockAsync() { - stopRefreshTask(); + cancelExpirationRenewal(); return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hdel', KEYS[1], KEYS[2]) == 1) then " + "if (redis.call('hlen', KEYS[1]) == 1) then " + diff --git a/src/main/java/org/redisson/RedissonSetCache.java b/src/main/java/org/redisson/RedissonSetCache.java index 02908f71b..bdfaefc98 100644 --- a/src/main/java/org/redisson/RedissonSetCache.java +++ b/src/main/java/org/redisson/RedissonSetCache.java @@ -498,7 +498,7 @@ public class RedissonSetCache extends RedissonExpirable implements RSetCache< "redis.call('zadd', KEYS[2], 92233720368547758, 'redisson__expiretag');" + "redis.call('pexpire', KEYS[2], ARGV[1]); " + "return redis.call('pexpire', KEYS[1], ARGV[1]); ", - Arrays.asList(getName(), getTimeoutSetName()), timeUnit.toSeconds(timeToLive)); + Arrays.asList(getName(), getTimeoutSetName()), timeUnit.toMillis(timeToLive)); } @Override diff --git a/src/main/java/org/redisson/RedissonWriteLock.java b/src/main/java/org/redisson/RedissonWriteLock.java index 526790b31..474f6915c 100644 --- a/src/main/java/org/redisson/RedissonWriteLock.java +++ b/src/main/java/org/redisson/RedissonWriteLock.java @@ -107,7 +107,7 @@ public class RedissonWriteLock extends RedissonLock implements RLock { + id + " thread-id: " + Thread.currentThread().getId()); } if (opStatus) { - stopRefreshTask(); + cancelExpirationRenewal(); } } @@ -117,7 +117,7 @@ public class RedissonWriteLock extends RedissonLock implements RLock { } Future forceUnlockAsync() { - stopRefreshTask(); + cancelExpirationRenewal(); return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hdel', KEYS[1], KEYS[2]) == 1) then " + "if (redis.call('hlen', KEYS[1]) == 1) then " + diff --git a/src/main/java/org/redisson/api/RBatchReactive.java b/src/main/java/org/redisson/api/RBatchReactive.java index ff0a04d53..2ad4d1cd1 100644 --- a/src/main/java/org/redisson/api/RBatchReactive.java +++ b/src/main/java/org/redisson/api/RBatchReactive.java @@ -33,6 +33,33 @@ import org.redisson.client.codec.Codec; */ public interface RBatchReactive { + /** + * Returns set-based cache instance by name. + * Uses map (value_hash, value) under the hood for minimal memory consumption. + * Supports value eviction with a given TTL value. + * + *

If eviction is not required then it's better to use regular map {@link #getSet(String, Codec)}.

+ * + * @param name + * @param codec + * @return + */ + RSetCacheReactive getSetCache(String name); + + /** + * Returns set-based cache instance by name + * using provided codec for values. + * Uses map (value_hash, value) under the hood for minimal memory consumption. + * Supports value eviction with a given TTL value. + * + *

If eviction is not required then it's better to use regular map {@link #getSet(String, Codec)}.

+ * + * @param name + * @param codec + * @return + */ + RSetCacheReactive getSetCache(String name, Codec codec); + /** * Returns map-based cache instance by name * using provided codec for both cache keys and values. @@ -175,6 +202,12 @@ public interface RBatchReactive { */ RLexSortedSetReactive getLexSortedSet(String name); + /** + * Returns bitSet instance by name. + * + * @param name of bitSet + * @return + */ RBitSetReactive getBitSet(String name); /** diff --git a/src/main/java/org/redisson/api/RSetCacheReactive.java b/src/main/java/org/redisson/api/RSetCacheReactive.java new file mode 100644 index 000000000..bc05f8fa1 --- /dev/null +++ b/src/main/java/org/redisson/api/RSetCacheReactive.java @@ -0,0 +1,42 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.api; + +import java.util.concurrent.TimeUnit; + +import org.reactivestreams.Publisher; + +/** + * Async set functions + * + * @author Nikita Koksharov + * + * @param value + */ +public interface RSetCacheReactive extends RCollectionReactive { + + Publisher add(V value, long ttl, TimeUnit unit); + + /** + * Returns the number of elements in cache. + * This number can reflects expired elements too + * due to non realtime cleanup process. + * + */ + @Override + Publisher size(); + +} diff --git a/src/main/java/org/redisson/api/RedissonReactiveClient.java b/src/main/java/org/redisson/api/RedissonReactiveClient.java index 8f2208bd8..039e67993 100644 --- a/src/main/java/org/redisson/api/RedissonReactiveClient.java +++ b/src/main/java/org/redisson/api/RedissonReactiveClient.java @@ -32,6 +32,30 @@ import org.redisson.core.NodesGroup; */ public interface RedissonReactiveClient { + /** + * Returns set-based cache instance by name. + * Supports value eviction with a given TTL value. + * + *

If eviction is not required then it's better to use regular map {@link #getSet(String, Codec)}.

+ * + * @param name + * @param codec + * @return + */ + RSetCacheReactive getSetCache(String name); + + /** + * Returns set-based cache instance by name. + * Supports value eviction with a given TTL value. + * + *

If eviction is not required then it's better to use regular map {@link #getSet(String, Codec)}.

+ * + * @param name + * @param codec + * @return + */ + RSetCacheReactive getSetCache(String name, Codec codec); + /** * Returns map-based cache instance by name * using provided codec for both cache keys and values. diff --git a/src/main/java/org/redisson/client/protocol/RedisCommands.java b/src/main/java/org/redisson/client/protocol/RedisCommands.java index d3e4a339d..de097ac61 100644 --- a/src/main/java/org/redisson/client/protocol/RedisCommands.java +++ b/src/main/java/org/redisson/client/protocol/RedisCommands.java @@ -176,6 +176,7 @@ public interface RedisCommands { RedisCommand> HVALS = new RedisCommand>("HVALS", new ObjectListReplayDecoder(), ValueType.MAP_VALUE); RedisCommand HEXISTS = new RedisCommand("HEXISTS", new BooleanReplayConvertor(), 2, ValueType.MAP_KEY); RedisStrictCommand HLEN = new RedisStrictCommand("HLEN", new IntegerReplayConvertor()); + RedisStrictCommand HLEN_LONG = new RedisStrictCommand("HLEN"); RedisCommand> HKEYS = new RedisCommand>("HKEYS", new ObjectSetReplayDecoder(), ValueType.MAP_KEY); RedisCommand HMSET = new RedisCommand("HMSET", new VoidReplayConvertor(), 2, ValueType.MAP); RedisCommand> HMGET = new RedisCommand>("HMGET", new ObjectListReplayDecoder(), 2, ValueType.MAP_KEY, ValueType.MAP_VALUE); diff --git a/src/main/java/org/redisson/reactive/RedissonCollectionReactive.java b/src/main/java/org/redisson/reactive/PublisherAdder.java similarity index 72% rename from src/main/java/org/redisson/reactive/RedissonCollectionReactive.java rename to src/main/java/org/redisson/reactive/PublisherAdder.java index 38deade2b..2d2ec7c80 100644 --- a/src/main/java/org/redisson/reactive/RedissonCollectionReactive.java +++ b/src/main/java/org/redisson/reactive/PublisherAdder.java @@ -17,27 +17,25 @@ package org.redisson.reactive; import org.reactivestreams.Publisher; import org.reactivestreams.Subscription; -import org.redisson.client.codec.Codec; -import org.redisson.command.CommandReactiveExecutor; +import org.redisson.api.RCollectionReactive; -import reactor.fn.BiFunction; -import reactor.fn.Function; import reactor.rx.Promise; import reactor.rx.Promises; import reactor.rx.action.support.DefaultSubscriber; +public class PublisherAdder { -abstract class RedissonCollectionReactive extends RedissonExpirableReactive { + private final RCollectionReactive destination; - RedissonCollectionReactive(CommandReactiveExecutor connectionManager, String name) { - super(connectionManager, name); + public PublisherAdder(RCollectionReactive destination) { + this.destination = destination; } - RedissonCollectionReactive(Codec codec, CommandReactiveExecutor connectionManager, String name) { - super(codec, connectionManager, name); + public Long sum(Long first, Long second) { + return first + second; } - protected Publisher addAll(Publisher c, final Function> function, final BiFunction sizeFunc) { + public Publisher addAll(Publisher c) { final Promise promise = Promises.prepare(); c.subscribe(new DefaultSubscriber() { @@ -55,7 +53,7 @@ abstract class RedissonCollectionReactive extends RedissonExpirableReactive { @Override public void onNext(V o) { lastValue = o; - function.apply(o).subscribe(new DefaultSubscriber() { + destination.add(o).subscribe(new DefaultSubscriber() { @Override public void onSubscribe(Subscription s) { @@ -69,7 +67,7 @@ abstract class RedissonCollectionReactive extends RedissonExpirableReactive { @Override public void onNext(Long o) { - lastSize = sizeFunc.apply(lastSize, o); + lastSize = sum(lastSize, o); } @Override diff --git a/src/main/java/org/redisson/reactive/RedissonBatchReactive.java b/src/main/java/org/redisson/reactive/RedissonBatchReactive.java index 3ce533000..17c282323 100644 --- a/src/main/java/org/redisson/reactive/RedissonBatchReactive.java +++ b/src/main/java/org/redisson/reactive/RedissonBatchReactive.java @@ -34,6 +34,7 @@ import org.redisson.api.RMapReactive; import org.redisson.api.RQueueReactive; import org.redisson.api.RScoredSortedSetReactive; import org.redisson.api.RScriptReactive; +import org.redisson.api.RSetCacheReactive; import org.redisson.api.RSetReactive; import org.redisson.api.RTopicReactive; import org.redisson.client.codec.Codec; @@ -155,6 +156,16 @@ public class RedissonBatchReactive implements RBatchReactive { return new RedissonAtomicLongReactive(executorService, name); } + @Override + public RSetCacheReactive getSetCache(String name) { + return new RedissonSetCacheReactive(evictionScheduler, executorService, name); + } + + @Override + public RSetCacheReactive getSetCache(String name, Codec codec) { + return new RedissonSetCacheReactive(codec, evictionScheduler, executorService, name); + } + @Override public RScoredSortedSetReactive getScoredSortedSet(String name) { return new RedissonScoredSortedSetReactive(executorService, name); diff --git a/src/main/java/org/redisson/reactive/RedissonExpirableReactive.java b/src/main/java/org/redisson/reactive/RedissonExpirableReactive.java index 21b129125..a70faa029 100644 --- a/src/main/java/org/redisson/reactive/RedissonExpirableReactive.java +++ b/src/main/java/org/redisson/reactive/RedissonExpirableReactive.java @@ -38,7 +38,7 @@ abstract class RedissonExpirableReactive extends RedissonObjectReactive implemen @Override public Publisher expire(long timeToLive, TimeUnit timeUnit) { - return commandExecutor.writeReactive(getName(), StringCodec.INSTANCE, RedisCommands.PEXPIRE, getName(), timeUnit.toSeconds(timeToLive)); + return commandExecutor.writeReactive(getName(), StringCodec.INSTANCE, RedisCommands.PEXPIRE, getName(), timeUnit.toMillis(timeToLive)); } @Override diff --git a/src/main/java/org/redisson/reactive/RedissonLexSortedSetReactive.java b/src/main/java/org/redisson/reactive/RedissonLexSortedSetReactive.java index 8fe89f923..8eb13b674 100644 --- a/src/main/java/org/redisson/reactive/RedissonLexSortedSetReactive.java +++ b/src/main/java/org/redisson/reactive/RedissonLexSortedSetReactive.java @@ -25,9 +25,6 @@ import org.redisson.client.codec.StringCodec; import org.redisson.client.protocol.RedisCommands; import org.redisson.command.CommandReactiveExecutor; -import reactor.fn.BiFunction; -import reactor.fn.Function; - public class RedissonLexSortedSetReactive extends RedissonScoredSortedSetReactive implements RLexSortedSetReactive { public RedissonLexSortedSetReactive(CommandReactiveExecutor commandExecutor, String name) { @@ -36,17 +33,7 @@ public class RedissonLexSortedSetReactive extends RedissonScoredSortedSetReactiv @Override public Publisher addAll(Publisher c) { - return addAll(c, new Function>() { - @Override - public Publisher apply(String o) { - return add(o); - } - }, new BiFunction() { - @Override - public Long apply(Long left, Long right) { - return left + right; - } - }); + return new PublisherAdder(this).addAll(c); } @Override diff --git a/src/main/java/org/redisson/reactive/RedissonListReactive.java b/src/main/java/org/redisson/reactive/RedissonListReactive.java index 57637f5dd..c58c515b8 100644 --- a/src/main/java/org/redisson/reactive/RedissonListReactive.java +++ b/src/main/java/org/redisson/reactive/RedissonListReactive.java @@ -33,8 +33,8 @@ import org.reactivestreams.Subscription; import org.redisson.api.RListReactive; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommand; -import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisCommand.ValueType; +import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.convertor.BooleanNumberReplayConvertor; import org.redisson.client.protocol.convertor.Convertor; import org.redisson.client.protocol.convertor.IntegerReplayConvertor; @@ -54,7 +54,7 @@ import reactor.rx.subscription.ReactiveSubscription; * * @param the type of elements held in this collection */ -public class RedissonListReactive extends RedissonCollectionReactive implements RListReactive { +public class RedissonListReactive extends RedissonExpirableReactive implements RListReactive { public RedissonListReactive(CommandReactiveExecutor commandExecutor, String name) { super(commandExecutor, name); @@ -175,17 +175,14 @@ public class RedissonListReactive extends RedissonCollectionReactive imple @Override public Publisher addAll(Publisher c) { - return addAll(c, new Function>() { - @Override - public Publisher apply(V o) { - return add(o); - } - }, new BiFunction() { + return new PublisherAdder(this) { + @Override - public Long apply(Long left, Long right) { - return right; + public Long sum(Long first, Long second) { + return second; } - }); + + }.addAll(c); } @Override @@ -276,10 +273,6 @@ public class RedissonListReactive extends RedissonCollectionReactive imple return commandExecutor.readReactive(getName(), codec, LINDEX, getName(), index); } - private boolean isPositionInRange(long index, long size) { - return index >= 0 && index <= size; - } - @Override public Publisher set(long index, V element) { return commandExecutor.evalWriteReactive(getName(), codec, new RedisCommand("EVAL", 5), diff --git a/src/main/java/org/redisson/reactive/RedissonMapCacheReactive.java b/src/main/java/org/redisson/reactive/RedissonMapCacheReactive.java index 2f4f28190..b9dd061c7 100644 --- a/src/main/java/org/redisson/reactive/RedissonMapCacheReactive.java +++ b/src/main/java/org/redisson/reactive/RedissonMapCacheReactive.java @@ -278,7 +278,6 @@ public class RedissonMapCacheReactive extends RedissonMapReactive im } }); - } @Override @@ -351,7 +350,7 @@ public class RedissonMapCacheReactive extends RedissonMapReactive im "redis.call('zadd', KEYS[2], 92233720368547758, 'redisson__expiretag');" + "redis.call('pexpire', KEYS[2], ARGV[1]); " + "return redis.call('pexpire', KEYS[1], ARGV[1]); ", - Arrays.asList(getName(), getTimeoutSetName()), timeUnit.toSeconds(timeToLive)); + Arrays.asList(getName(), getTimeoutSetName()), timeUnit.toMillis(timeToLive)); } @Override diff --git a/src/main/java/org/redisson/reactive/RedissonObjectReactive.java b/src/main/java/org/redisson/reactive/RedissonObjectReactive.java index 6a17d955d..3a2ab55eb 100644 --- a/src/main/java/org/redisson/reactive/RedissonObjectReactive.java +++ b/src/main/java/org/redisson/reactive/RedissonObjectReactive.java @@ -16,14 +16,11 @@ package org.redisson.reactive; import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; import org.redisson.api.RObjectReactive; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommands; import org.redisson.command.CommandReactiveExecutor; -import reactor.core.reactivestreams.SubscriberBarrier; import reactor.rx.Stream; import reactor.rx.Streams; diff --git a/src/main/java/org/redisson/reactive/RedissonScoredSortedSetReactive.java b/src/main/java/org/redisson/reactive/RedissonScoredSortedSetReactive.java index b53310902..50af3b9a4 100644 --- a/src/main/java/org/redisson/reactive/RedissonScoredSortedSetReactive.java +++ b/src/main/java/org/redisson/reactive/RedissonScoredSortedSetReactive.java @@ -17,29 +17,22 @@ package org.redisson.reactive; import java.math.BigDecimal; import java.net.InetSocketAddress; -import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.List; import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; import org.redisson.api.RScoredSortedSetReactive; import org.redisson.client.codec.Codec; import org.redisson.client.codec.StringCodec; import org.redisson.client.protocol.RedisCommand; +import org.redisson.client.protocol.RedisCommand.ValueType; import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.ScoredEntry; -import org.redisson.client.protocol.RedisCommand.ValueType; import org.redisson.client.protocol.convertor.BooleanReplayConvertor; import org.redisson.client.protocol.decoder.ListScanResult; import org.redisson.command.CommandReactiveExecutor; -import reactor.core.reactivestreams.SubscriberBarrier; -import reactor.rx.Stream; - -public class RedissonScoredSortedSetReactive extends RedissonCollectionReactive implements RScoredSortedSetReactive { +public class RedissonScoredSortedSetReactive extends RedissonExpirableReactive implements RScoredSortedSetReactive { public RedissonScoredSortedSetReactive(CommandReactiveExecutor commandExecutor, String name) { super(commandExecutor, name); @@ -135,107 +128,11 @@ public class RedissonScoredSortedSetReactive extends RedissonCollectionReacti @Override public Publisher iterator() { - return new Stream() { - + return new SetReactiveIterator() { @Override - public void subscribe(final Subscriber t) { - t.onSubscribe(new SubscriberBarrier(t) { - - private List firstValues; - private long nextIterPos; - private InetSocketAddress client; - - private long currentIndex; - private List prevValues = new ArrayList(); - - @Override - protected void doRequest(long n) { - currentIndex = n; - - if (!prevValues.isEmpty()) { - List vals = new ArrayList(prevValues); - prevValues.clear(); - - handle(vals); - - if (currentIndex == 0) { - return; - } - } - - nextValues(); - } - - private void handle(List vals) { - for (V val : vals) { - if (currentIndex > 0) { - onNext(val); - } else { - prevValues.add(val); - } - currentIndex--; - if (currentIndex == 0) { - onComplete(); - } - } - } - - protected void nextValues() { - final SubscriberBarrier m = this; - scanIteratorReactive(client, nextIterPos).subscribe(new Subscriber>() { - - @Override - public void onSubscribe(Subscription s) { - s.request(Long.MAX_VALUE); - } - - @Override - public void onNext(ListScanResult res) { - client = res.getRedisClient(); - - long prevIterPos = nextIterPos; - if (nextIterPos == 0 && firstValues == null) { - firstValues = res.getValues(); - } else if (res.getValues().equals(firstValues)) { - m.onComplete(); - currentIndex = 0; - return; - } - - nextIterPos = res.getPos(); - if (prevIterPos == nextIterPos) { - nextIterPos = -1; - } - - handle(res.getValues()); - - if (currentIndex == 0) { - return; - } - - if (nextIterPos == -1) { - m.onComplete(); - currentIndex = 0; - } - } - - @Override - public void onError(Throwable error) { - m.onError(error); - } - - @Override - public void onComplete() { - if (currentIndex == 0) { - return; - } - nextValues(); - } - }); - } - }); + protected Publisher> scanIteratorReactive(InetSocketAddress client, long nextIterPos) { + return RedissonScoredSortedSetReactive.this.scanIteratorReactive(client, nextIterPos); } - }; } diff --git a/src/main/java/org/redisson/reactive/RedissonSetCacheReactive.java b/src/main/java/org/redisson/reactive/RedissonSetCacheReactive.java new file mode 100644 index 000000000..2d43fdbe3 --- /dev/null +++ b/src/main/java/org/redisson/reactive/RedissonSetCacheReactive.java @@ -0,0 +1,373 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.reactive; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscription; +import org.redisson.EvictionScheduler; +import org.redisson.api.RSetCacheReactive; +import org.redisson.client.codec.Codec; +import org.redisson.client.codec.LongCodec; +import org.redisson.client.protocol.RedisCommand; +import org.redisson.client.protocol.RedisCommands; +import org.redisson.client.protocol.RedisStrictCommand; +import org.redisson.client.protocol.convertor.BooleanReplayConvertor; +import org.redisson.client.protocol.convertor.Convertor; +import org.redisson.client.protocol.convertor.LongReplayConvertor; +import org.redisson.client.protocol.convertor.VoidReplayConvertor; +import org.redisson.client.protocol.decoder.ListScanResult; +import org.redisson.client.protocol.decoder.ObjectListReplayDecoder; +import org.redisson.command.CommandReactiveExecutor; + +import net.openhft.hashing.LongHashFunction; +import reactor.rx.Promise; +import reactor.rx.Promises; +import reactor.rx.action.support.DefaultSubscriber; + +/** + *

Set-based cache with ability to set TTL for each entry via + * {@link #put(Object, Object, long, TimeUnit)} method. + * And therefore has an complex lua-scripts inside. + * Uses map(value_hash, value) to tie with sorted set which contains expiration record for every value with TTL. + *

+ * + *

Current Redis implementation doesn't have set entry eviction functionality. + * Thus values are checked for TTL expiration during any value read operation. + * If entry expired then it doesn't returns and clean task runs hronous. + * Clean task deletes removes 100 expired entries at once. + * In addition there is {@link org.redisson.EvictionScheduler}. This scheduler + * deletes expired entries in time interval between 5 seconds to 2 hours.

+ * + *

If eviction is not required then it's better to use {@link org.redisson.reactive.RedissonSet}.

+ * + * @author Nikita Koksharov + * + * @param key + * @param value + */ +public class RedissonSetCacheReactive extends RedissonExpirableReactive implements RSetCacheReactive { + + private static final RedisCommand ADD_ALL = new RedisCommand("HMSET", new VoidReplayConvertor()); + private static final RedisCommand EVAL_ADD = new RedisCommand("EVAL", new LongReplayConvertor(), 5); + private static final RedisCommand> EVAL_CONTAINS_KEY = new RedisCommand>("EVAL", new ObjectListReplayDecoder()); + private static final RedisStrictCommand HDEL = new RedisStrictCommand("HDEL", new BooleanReplayConvertor()); + + private final EvictionScheduler evictionScheduler; + + public RedissonSetCacheReactive(EvictionScheduler evictionScheduler, CommandReactiveExecutor commandExecutor, String name) { + super(commandExecutor, name); + this.evictionScheduler = evictionScheduler; + evictionScheduler.schedule(getName(), getTimeoutSetName()); + } + + public RedissonSetCacheReactive(Codec codec, EvictionScheduler evictionScheduler, CommandReactiveExecutor commandExecutor, String name) { + super(codec, commandExecutor, name); + this.evictionScheduler = evictionScheduler; + evictionScheduler.schedule(getName(), getTimeoutSetName()); + } + + @Override + public Publisher size() { + return commandExecutor.readReactive(getName(), codec, RedisCommands.HLEN_LONG, getName()); + } + + private byte[] hash(Object o) { + if (o == null) { + throw new NullPointerException("Value can't be null"); + } + try { + byte[] objectState = codec.getValueEncoder().encode(o); + return hash(objectState); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private byte[] hash(byte[] objectState) { + long h1 = LongHashFunction.farmUo().hashBytes(objectState); + long h2 = LongHashFunction.xx_r39().hashBytes(objectState); + + return ByteBuffer.allocate((2 * Long.SIZE) / Byte.SIZE) + .putLong(h1) + .putLong(h2) + .array(); + } + + String getTimeoutSetName() { + return "redisson__timeout__set__{" + getName() + "}"; + } + + @Override + public Publisher contains(Object o) { + Promise result = Promises.prepare(); + + byte[] key = hash(o); + + Publisher> future = commandExecutor.evalReadReactive(getName(), codec, EVAL_CONTAINS_KEY, + "local value = redis.call('hexists', KEYS[1], KEYS[3]); " + + "local expireDate = 92233720368547758; " + + "if value == 1 then " + + "local expireDateScore = redis.call('zscore', KEYS[2], KEYS[3]); " + + "if expireDateScore ~= false then " + + "expireDate = tonumber(expireDateScore) " + + "end; " + + "end;" + + "return {expireDate, value}; ", + Arrays.asList(getName(), getTimeoutSetName(), key)); + + addExpireListener(result, future, new BooleanReplayConvertor(), false); + + return result; + } + + private void addExpireListener(final Promise result, Publisher> publisher, final Convertor convertor, final T nullValue) { + publisher.subscribe(new DefaultSubscriber>() { + + @Override + public void onSubscribe(Subscription s) { + s.request(1); + } + + @Override + public void onNext(List res) { + Long expireDate = (Long) res.get(0); + long currentDate = System.currentTimeMillis(); + if (expireDate <= currentDate) { + result.onNext(nullValue); + result.onComplete(); + evictionScheduler.runCleanTask(getName(), getTimeoutSetName(), currentDate); + return; + } + + if (convertor != null) { + result.onNext((T) convertor.convert(res.get(1))); + } else { + result.onNext((T) res.get(1)); + } + result.onComplete(); + } + + @Override + public void onError(Throwable t) { + result.onError(t); + } + + }); + } + + Publisher> scanIterator(InetSocketAddress client, long startPos) { + return commandExecutor.evalReadReactive(client, getName(), codec, RedisCommands.EVAL_SSCAN, + "local result = {}; " + + "local res = redis.call('hscan', KEYS[1], ARGV[1]); " + + "for i, value in ipairs(res[2]) do " + + "if i % 2 == 0 then " + + "local key = res[2][i-1]; " + + "local expireDate = redis.call('zscore', KEYS[2], key); " + + "if (expireDate == false) or (expireDate ~= false and tonumber(expireDate) > tonumber(ARGV[2])) then " + + "table.insert(result, value); " + + "end; " + + "end; " + + "end;" + + "return {res[1], result};", Arrays.asList(getName(), getTimeoutSetName()), startPos, System.currentTimeMillis()); + } + + @Override + public Publisher iterator() { + return new SetReactiveIterator() { + @Override + protected Publisher> scanIteratorReactive(InetSocketAddress client, long nextIterPos) { + return RedissonSetCacheReactive.this.scanIterator(client, nextIterPos); + } + }; + } + + @Override + public Publisher add(V value, long ttl, TimeUnit unit) { + if (unit == null) { + throw new NullPointerException("TimeUnit param can't be null"); + } + + try { + byte[] objectState = encode(value); + byte[] key = hash(objectState); + + long timeoutDate = System.currentTimeMillis() + unit.toMillis(ttl); + return commandExecutor.evalWriteReactive(getName(), codec, RedisCommands.EVAL_BOOLEAN, + "redis.call('zadd', KEYS[2], ARGV[1], KEYS[3]); " + + "if redis.call('hexists', KEYS[1], KEYS[3]) == 0 then " + + "redis.call('hset', KEYS[1], KEYS[3], ARGV[2]); " + + "return 1; " + + "end;" + + "return 0; ", + Arrays.asList(getName(), getTimeoutSetName(), key), timeoutDate, objectState); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private byte[] encode(V value) throws IOException { + return codec.getValueEncoder().encode(value); + } + + @Override + public Publisher add(V e) { + byte[] key = hash(e); + return commandExecutor.evalWriteReactive(getName(), codec, EVAL_ADD, + "if redis.call('hexists', KEYS[1], KEYS[2]) == 0 then " + + "redis.call('hset', KEYS[1], KEYS[2], ARGV[1]); " + + "return 1; " + + "end; " + + "return 0; ", + Arrays.asList(getName(), key), e); + } + + @Override + public Publisher remove(Object o) { + byte[] key = hash(o); + return commandExecutor.writeReactive(getName(), codec, HDEL, getName(), key); + } + + @Override + public Publisher containsAll(Collection c) { + return commandExecutor.evalReadReactive(getName(), codec, RedisCommands.EVAL_BOOLEAN_WITH_VALUES, + "local s = redis.call('hvals', KEYS[1]);" + + "for i = 0, table.getn(s), 1 do " + + "for j = 0, table.getn(ARGV), 1 do " + + "if ARGV[j] == s[i] then " + + "table.remove(ARGV, j) " + + "end " + + "end; " + + "end;" + + "return table.getn(ARGV) == 0 and 1 or 0; ", + Collections.singletonList(getName()), c.toArray()); + } + + @Override + public Publisher addAll(Collection c) { + if (c.isEmpty()) { + return newSucceeded(0L); + } + + List params = new ArrayList(c.size()*2 + 1); + params.add(getName()); + try { + for (V value : c) { + byte[] objectState = encode(value); + byte[] key = hash(objectState); + params.add(key); + params.add(objectState); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + + return commandExecutor.writeReactive(getName(), codec, ADD_ALL, params.toArray()); + } + + @Override + public Publisher retainAll(Collection c) { + List params = new ArrayList(c.size()); + for (Object object : c) { + params.add(hash(object)); + } + return commandExecutor.evalWriteReactive(getName(), codec, RedisCommands.EVAL_BOOLEAN, + "local keys = redis.call('hkeys', KEYS[1]); " + + "local i=1;" + + "while i <= #keys do " + + "local changed = false;" + + "local element = keys[i];" + + "for j, argElement in pairs(ARGV) do " + + "if argElement == element then " + + "changed = true;" + + "table.remove(keys, i); " + + "table.remove(ARGV, j); " + + "break; " + + "end; " + + "end; " + + "if changed == false then " + + "i = i + 1 " + + "end " + + "end " + + "if #keys > 0 then " + + "for i=1, #keys,5000 do " + + "redis.call('hdel', KEYS[1], unpack(keys, i, math.min(i+4999, #keys))); " + + "redis.call('zrem', KEYS[2], unpack(keys, i, math.min(i+4999, #keys))); " + + "end " + + "return 1;" + + "end; " + + "return 0; ", + Arrays.asList(getName(), getTimeoutSetName()), params.toArray()); + } + + @Override + public Publisher removeAll(Collection c) { + List params = new ArrayList(c.size()+1); + params.add(getName()); + for (Object object : c) { + params.add(hash(object)); + } + + return commandExecutor.writeReactive(getName(), codec, HDEL, params.toArray()); + } + + @Override + public Publisher delete() { + return commandExecutor.writeReactive(getName(), RedisCommands.DEL_SINGLE, getName(), getTimeoutSetName()); + } + + @Override + public Publisher expire(long timeToLive, TimeUnit timeUnit) { + return commandExecutor.evalWriteReactive(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, + "redis.call('zadd', KEYS[2], 92233720368547758, 'redisson__expiretag');" + + "redis.call('pexpire', KEYS[2], ARGV[1]); " + + "return redis.call('pexpire', KEYS[1], ARGV[1]); ", + Arrays.asList(getName(), getTimeoutSetName()), timeUnit.toMillis(timeToLive)); + } + + @Override + public Publisher expireAt(long timestamp) { + return commandExecutor.evalWriteReactive(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, + "redis.call('zadd', KEYS[2], 92233720368547758, 'redisson__expiretag');" + + "redis.call('pexpireat', KEYS[2], ARGV[1]); " + + "return redis.call('pexpireat', KEYS[1], ARGV[1]); ", + Arrays.asList(getName(), getTimeoutSetName()), timestamp); + } + + @Override + public Publisher clearExpire() { + return commandExecutor.evalWriteReactive(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, + "redis.call('zrem', KEYS[2], 'redisson__expiretag'); " + + "redis.call('persist', KEYS[2]); " + + "return redis.call('persist', KEYS[1]); ", + Arrays.asList(getName(), getTimeoutSetName())); + } + + @Override + public Publisher addAll(Publisher c) { + return new PublisherAdder(this).addAll(c); + } + +} diff --git a/src/main/java/org/redisson/reactive/RedissonSetReactive.java b/src/main/java/org/redisson/reactive/RedissonSetReactive.java index a6c81ebaa..016a066b8 100644 --- a/src/main/java/org/redisson/reactive/RedissonSetReactive.java +++ b/src/main/java/org/redisson/reactive/RedissonSetReactive.java @@ -22,19 +22,12 @@ import java.util.Collections; import java.util.List; import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; import org.redisson.api.RSetReactive; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.decoder.ListScanResult; import org.redisson.command.CommandReactiveExecutor; -import reactor.core.reactivestreams.SubscriberBarrier; -import reactor.fn.BiFunction; -import reactor.fn.Function; -import reactor.rx.Stream; - /** * Distributed and concurrent implementation of {@link java.util.Set} * @@ -42,7 +35,7 @@ import reactor.rx.Stream; * * @param value */ -public class RedissonSetReactive extends RedissonCollectionReactive implements RSetReactive { +public class RedissonSetReactive extends RedissonExpirableReactive implements RSetReactive { public RedissonSetReactive(CommandReactiveExecutor commandExecutor, String name) { super(commandExecutor, name); @@ -54,17 +47,7 @@ public class RedissonSetReactive extends RedissonCollectionReactive implem @Override public Publisher addAll(Publisher c) { - return addAll(c, new Function>() { - @Override - public Publisher apply(V o) { - return add(o); - } - }, new BiFunction() { - @Override - public Long apply(Long left, Long right) { - return left + right; - } - }); + return new PublisherAdder(this).addAll(c); } @Override @@ -157,107 +140,11 @@ public class RedissonSetReactive extends RedissonCollectionReactive implem @Override public Publisher iterator() { - return new Stream() { - + return new SetReactiveIterator() { @Override - public void subscribe(final Subscriber t) { - t.onSubscribe(new SubscriberBarrier(t) { - - private List firstValues; - private long nextIterPos; - private InetSocketAddress client; - - private long currentIndex; - private List prevValues = new ArrayList(); - - @Override - protected void doRequest(long n) { - currentIndex = n; - - if (!prevValues.isEmpty()) { - List vals = new ArrayList(prevValues); - prevValues.clear(); - - handle(vals); - - if (currentIndex == 0) { - return; - } - } - - nextValues(); - } - - private void handle(List vals) { - for (V val : vals) { - if (currentIndex > 0) { - onNext(val); - } else { - prevValues.add(val); - } - currentIndex--; - if (currentIndex == 0) { - onComplete(); - } - } - } - - protected void nextValues() { - final SubscriberBarrier m = this; - scanIteratorReactive(client, nextIterPos).subscribe(new Subscriber>() { - - @Override - public void onSubscribe(Subscription s) { - s.request(Long.MAX_VALUE); - } - - @Override - public void onNext(ListScanResult res) { - client = res.getRedisClient(); - - long prevIterPos = nextIterPos; - if (nextIterPos == 0 && firstValues == null) { - firstValues = res.getValues(); - } else if (res.getValues().equals(firstValues)) { - m.onComplete(); - currentIndex = 0; - return; - } - - nextIterPos = res.getPos(); - if (prevIterPos == nextIterPos) { - nextIterPos = -1; - } - - handle(res.getValues()); - - if (currentIndex == 0) { - return; - } - - if (nextIterPos == -1) { - m.onComplete(); - currentIndex = 0; - } - } - - @Override - public void onError(Throwable error) { - m.onError(error); - } - - @Override - public void onComplete() { - if (currentIndex == 0) { - return; - } - nextValues(); - } - }); - } - }); + protected Publisher> scanIteratorReactive(InetSocketAddress client, long nextIterPos) { + return RedissonSetReactive.this.scanIteratorReactive(client, nextIterPos); } - }; } diff --git a/src/main/java/org/redisson/reactive/SetReactiveIterator.java b/src/main/java/org/redisson/reactive/SetReactiveIterator.java new file mode 100644 index 000000000..b4855f1b3 --- /dev/null +++ b/src/main/java/org/redisson/reactive/SetReactiveIterator.java @@ -0,0 +1,133 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.reactive; + +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; + +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import org.redisson.client.protocol.decoder.ListScanResult; + +import reactor.core.reactivestreams.SubscriberBarrier; +import reactor.rx.Stream; + +public abstract class SetReactiveIterator extends Stream { + + @Override + public void subscribe(final Subscriber t) { + t.onSubscribe(new SubscriberBarrier(t) { + + private List firstValues; + private long nextIterPos; + private InetSocketAddress client; + + private long currentIndex; + private List prevValues = new ArrayList(); + + @Override + protected void doRequest(long n) { + currentIndex = n; + + if (!prevValues.isEmpty()) { + List vals = new ArrayList(prevValues); + prevValues.clear(); + + handle(vals); + + if (currentIndex == 0) { + return; + } + } + + nextValues(); + } + + private void handle(List vals) { + for (V val : vals) { + if (currentIndex > 0) { + onNext(val); + } else { + prevValues.add(val); + } + currentIndex--; + if (currentIndex == 0) { + onComplete(); + } + } + } + + protected void nextValues() { + final SubscriberBarrier m = this; + scanIteratorReactive(client, nextIterPos).subscribe(new Subscriber>() { + + @Override + public void onSubscribe(Subscription s) { + s.request(Long.MAX_VALUE); + } + + @Override + public void onNext(ListScanResult res) { + client = res.getRedisClient(); + + long prevIterPos = nextIterPos; + if (nextIterPos == 0 && firstValues == null) { + firstValues = res.getValues(); + } else if (res.getValues().equals(firstValues)) { + m.onComplete(); + currentIndex = 0; + return; + } + + nextIterPos = res.getPos(); + if (prevIterPos == nextIterPos) { + nextIterPos = -1; + } + + handle(res.getValues()); + + if (currentIndex == 0) { + return; + } + + if (nextIterPos == -1) { + m.onComplete(); + currentIndex = 0; + } + } + + @Override + public void onError(Throwable error) { + m.onError(error); + } + + @Override + public void onComplete() { + if (currentIndex == 0) { + return; + } + nextValues(); + } + }); + } + }); + } + + protected abstract Publisher> scanIteratorReactive(InetSocketAddress client, long nextIterPos); + +}