From 1e7f132742cf318cf19ffa42880bb07a7b224263 Mon Sep 17 00:00:00 2001 From: Nikita Date: Wed, 18 Jul 2018 13:10:01 +0300 Subject: [PATCH 1/5] RObjectReactive and RScoredSortedSetReactive interfaces synced with RObjectAsync and RScoredSortedSetAsync --- .../org/redisson/RedissonScoredSortedSet.java | 9 +- .../org/redisson/api/RObjectReactive.java | 74 ++++++ .../api/RScoredSortedSetReactive.java | 224 +++++++++++++++++- .../reactive/RedissonObjectReactive.java | 81 +++++++ .../RedissonScoredSortedSetReactive.java | 188 ++++++++++++++- 5 files changed, 563 insertions(+), 13 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonScoredSortedSet.java b/redisson/src/main/java/org/redisson/RedissonScoredSortedSet.java index 4344e839a..7f8a51890 100644 --- a/redisson/src/main/java/org/redisson/RedissonScoredSortedSet.java +++ b/redisson/src/main/java/org/redisson/RedissonScoredSortedSet.java @@ -393,12 +393,17 @@ public class RedissonScoredSortedSet extends RedissonExpirable implements RSc } private ListScanResult scanIterator(RedisClient client, long startPos, String pattern, int count) { + RFuture> f = scanIteratorAsync(client, startPos, pattern, count); + return get(f); + } + + public RFuture> scanIteratorAsync(RedisClient client, long startPos, String pattern, int count) { if (pattern == null) { RFuture> f = commandExecutor.readAsync(client, getName(), codec, RedisCommands.ZSCAN, getName(), startPos, "COUNT", count); - return get(f); + return f; } RFuture> f = commandExecutor.readAsync(client, getName(), codec, RedisCommands.ZSCAN, getName(), startPos, "MATCH", pattern, "COUNT", count); - return get(f); + return f; } @Override diff --git a/redisson/src/main/java/org/redisson/api/RObjectReactive.java b/redisson/src/main/java/org/redisson/api/RObjectReactive.java index 088f3527e..94ea5da11 100644 --- a/redisson/src/main/java/org/redisson/api/RObjectReactive.java +++ b/redisson/src/main/java/org/redisson/api/RObjectReactive.java @@ -15,6 +15,8 @@ */ package org.redisson.api; +import java.util.concurrent.TimeUnit; + import org.reactivestreams.Publisher; import org.redisson.client.codec.Codec; @@ -29,6 +31,78 @@ public interface RObjectReactive { String getName(); Codec getCodec(); + + /** + * Restores object using its state returned by {@link #dump()} method. + * + * @param state - state of object + * @return void + */ + Publisher restore(byte[] state); + + /** + * Restores object using its state returned by {@link #dump()} method and set time to live for it. + * + * @param state - state of object + * @param timeToLive - time to live of the object + * @param timeUnit - time unit + * @return void + */ + Publisher restore(byte[] state, long timeToLive, TimeUnit timeUnit); + + /** + * Restores and replaces object if it already exists. + * + * @param state - state of the object + * @return void + */ + Publisher restoreAndReplace(byte[] state); + + /** + * Restores and replaces object if it already exists and set time to live for it. + * + * @param state - state of the object + * @param timeToLive - time to live of the object + * @param timeUnit - time unit + * @return void + */ + Publisher restoreAndReplace(byte[] state, long timeToLive, TimeUnit timeUnit); + + /** + * Returns dump of object + * + * @return dump + */ + Publisher dump(); + + /** + * Update the last access time of an object. + * + * @return true if object was touched else false + */ + Publisher touch(); + + /** + * Delete the objects. + * Actual removal will happen later asynchronously. + *

+ * Requires Redis 4.0+ + * + * @return true if it was exist and deleted else false + */ + Publisher unlink(); + + /** + * Copy object from source Redis instance to destination Redis instance + * + * @param host - destination host + * @param port - destination port + * @param database - destination database + * @param timeout - maximum idle time in any moment of the communication with the destination instance in milliseconds + * @return void + */ + Publisher copy(String host, int port, int database, long timeout); + /** * Transfer a object from a source Redis instance to a destination Redis instance * in mode diff --git a/redisson/src/main/java/org/redisson/api/RScoredSortedSetReactive.java b/redisson/src/main/java/org/redisson/api/RScoredSortedSetReactive.java index 3bf788487..98e538516 100644 --- a/redisson/src/main/java/org/redisson/api/RScoredSortedSetReactive.java +++ b/redisson/src/main/java/org/redisson/api/RScoredSortedSetReactive.java @@ -17,33 +17,235 @@ package org.redisson.api; import java.util.Collection; import java.util.Map; +import java.util.concurrent.TimeUnit; import org.reactivestreams.Publisher; import org.redisson.api.RScoredSortedSet.Aggregate; import org.redisson.client.protocol.ScoredEntry; +/** + * + * @author Nikita Koksharov + * + * @param value type + */ public interface RScoredSortedSetReactive extends RExpirableReactive { + /** + * Removes and returns first available tail element of any sorted set, + * waiting up to the specified wait time if necessary for an element to become available + * in any of defined sorted sets including this one. + *

+ * Requires Redis 5.0.0 and higher. + * + * @param queueNames - names of queue + * @param timeout how long to wait before giving up, in units of + * {@code unit} + * @param unit a {@code TimeUnit} determining how to interpret the + * {@code timeout} parameter + * @return the tail element, or {@code null} if all sorted sets are empty + */ + Publisher pollLastFromAny(long timeout, TimeUnit unit, String ... queueNames); + + /** + * Removes and returns first available head element of any sorted set, + * waiting up to the specified wait time if necessary for an element to become available + * in any of defined sorted sets including this one. + *

+ * Requires Redis 5.0.0 and higher. + * + * @param queueNames - names of queue + * @param timeout how long to wait before giving up, in units of + * {@code unit} + * @param unit a {@code TimeUnit} determining how to interpret the + * {@code timeout} parameter + * @return the head element, or {@code null} if all sorted sets are empty + * + */ + Publisher pollFirstFromAny(long timeout, TimeUnit unit, String ... queueNames); + + /** + * Removes and returns the head element or {@code null} if this sorted set is empty. + *

+ * Requires Redis 5.0.0 and higher. + * + * @param timeout how long to wait before giving up, in units of + * {@code unit} + * @param unit a {@code TimeUnit} determining how to interpret the + * {@code timeout} parameter + * @return the head element, + * or {@code null} if this sorted set is empty + */ + Publisher pollFirst(long timeout, TimeUnit unit); + + /** + * Removes and returns the tail element or {@code null} if this sorted set is empty. + *

+ * Requires Redis 5.0.0 and higher. + * + * @param timeout how long to wait before giving up, in units of + * {@code unit} + * @param unit a {@code TimeUnit} determining how to interpret the + * {@code timeout} parameter + * @return the tail element or {@code null} if this sorted set is empty + */ + Publisher pollLast(long timeout, TimeUnit unit); + + /** + * Removes and returns the head elements or {@code null} if this sorted set is empty. + * + * @param count - elements amount + * @return the head element, + * or {@code null} if this sorted set is empty + */ + Publisher> pollFirst(int count); + + /** + * Removes and returns the tail elements or {@code null} if this sorted set is empty. + * + * @param count - elements amount + * @return the tail element or {@code null} if this sorted set is empty + */ + Publisher> pollLast(int count); + + /** + * Removes and returns the head element or {@code null} if this sorted set is empty. + * + * @return the head element, + * or {@code null} if this sorted set is empty + */ Publisher pollFirst(); + /** + * Removes and returns the tail element or {@code null} if this sorted set is empty. + * + * @return the tail element or {@code null} if this sorted set is empty + */ Publisher pollLast(); - Publisher iterator(); - + /** + * Returns the head element or {@code null} if this sorted set is empty. + * + * @return the head element or {@code null} if this sorted set is empty + */ Publisher first(); + /** + * Returns the tail element or {@code null} if this sorted set is empty. + * + * @return the tail element or {@code null} if this sorted set is empty + */ Publisher last(); + /** + * Returns score of the head element or returns {@code null} if this sorted set is empty. + * + * @return the tail element or {@code null} if this sorted set is empty + */ + Publisher firstScore(); + + /** + * Returns score of the tail element or returns {@code null} if this sorted set is empty. + * + * @return the tail element or {@code null} if this sorted set is empty + */ + Publisher lastScore(); + + /** + * Returns an iterator over elements in this set. + * If pattern is not null then only elements match this pattern are loaded. + * + * @param pattern - search pattern + * @return iterator + */ + Publisher iterator(String pattern); + + /** + * Returns an iterator over elements in this set. + * Elements are loaded in batch. Batch size is defined by count param. + * + * @param count - size of elements batch + * @return iterator + */ + Publisher iterator(int count); + + /** + * Returns an iterator over elements in this set. + * Elements are loaded in batch. Batch size is defined by count param. + * If pattern is not null then only elements match this pattern are loaded. + * + * @param pattern - search pattern + * @param count - size of elements batch + * @return iterator + */ + Publisher iterator(String pattern, int count); + + Publisher iterator(); + Publisher removeRangeByScore(double startScore, boolean startScoreInclusive, double endScore, boolean endScoreInclusive); Publisher removeRangeByRank(int startIndex, int endIndex); + /** + * Returns rank of value, with the scores ordered from low to high. + * + * @param o - object + * @return rank or null if value does not exist + */ Publisher rank(V o); + + /** + * Returns rank of value, with the scores ordered from high to low. + * + * @param o - object + * @return rank or null if value does not exist + */ + Publisher revRank(V o); Publisher getScore(V o); + /** + * Adds element to this set, overrides previous score if it has been already added. + * + * @param score - object score + * @param object - object itself + * @return true if element has added and false if not. + */ Publisher add(double score, V object); + Publisher addAll(Map objects); + + /** + * Adds element to this set, overrides previous score if it has been already added. + * Finally return the rank of the item + * + * @param score - object score + * @param object - object itself + * @return rank + */ + Publisher addAndGetRank(double score, V object); + + /** + * Adds element to this set, overrides previous score if it has been already added. + * Finally return the reverse rank of the item + * + * @param score - object score + * @param object - object itself + * @return reverse rank + */ + Publisher addAndGetRevRank(double score, V object); + + /** + * Adds element to this set only if has not been added before. + *

+ * Requires Redis 3.0.2 and higher. + * + * @param score - object score + * @param object - object itself + * @return true if element has added and false if not. + */ + Publisher tryAdd(double score, V object); + Publisher remove(V object); Publisher size(); @@ -58,6 +260,24 @@ public interface RScoredSortedSetReactive extends RExpirableReactive { Publisher addScore(V object, Number value); + /** + * Adds score to element and returns its reverse rank + * + * @param object - object itself + * @param value - object score + * @return reverse rank + */ + Publisher addScoreAndGetRevRank(V object, Number value); + + /** + * Adds score to element and returns its rank + * + * @param object - object itself + * @param value - object score + * @return rank + */ + Publisher addScoreAndGetRank(V object, Number value); + Publisher> valueRange(int startIndex, int endIndex); Publisher>> entryRange(int startIndex, int endIndex); diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonObjectReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonObjectReactive.java index 1604b52f4..0e6106052 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonObjectReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonObjectReactive.java @@ -17,6 +17,7 @@ package org.redisson.reactive; import java.io.IOException; import java.util.Collection; +import java.util.concurrent.TimeUnit; import org.reactivestreams.Publisher; import org.redisson.RedissonReference; @@ -125,6 +126,86 @@ abstract class RedissonObjectReactive implements RObjectReactive { } } + @Override + public Publisher restore(final byte[] state) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.restoreAsync(state); + } + }); + } + + @Override + public Publisher restore(final byte[] state, final long timeToLive, final TimeUnit timeUnit) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.restoreAsync(state, timeToLive, timeUnit); + } + }); + } + + @Override + public Publisher restoreAndReplace(final byte[] state) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.restoreAndReplaceAsync(state); + } + }); + } + + @Override + public Publisher restoreAndReplace(final byte[] state, final long timeToLive, final TimeUnit timeUnit) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.restoreAndReplaceAsync(state, timeToLive, timeUnit); + } + }); + } + + @Override + public Publisher dump() { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.dumpAsync(); + } + }); + } + + @Override + public Publisher touch() { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.touchAsync(); + } + }); + } + + @Override + public Publisher unlink() { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.unlinkAsync(); + } + }); + } + + @Override + public Publisher copy(final String host, final int port, final int database, final long timeout) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.copyAsync(host, port, database, timeout); + } + }); + } + @Override public Publisher rename(final String newName) { return reactive(new Supplier>() { diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonScoredSortedSetReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonScoredSortedSetReactive.java index 8f7311327..6039168c2 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonScoredSortedSetReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonScoredSortedSetReactive.java @@ -17,6 +17,7 @@ package org.redisson.reactive; import java.util.Collection; import java.util.Map; +import java.util.concurrent.TimeUnit; import org.reactivestreams.Publisher; import org.redisson.RedissonScoredSortedSet; @@ -26,7 +27,6 @@ import org.redisson.api.RScoredSortedSetAsync; import org.redisson.api.RScoredSortedSetReactive; import org.redisson.client.RedisClient; import org.redisson.client.codec.Codec; -import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.ScoredEntry; import org.redisson.client.protocol.decoder.ListScanResult; import org.redisson.command.CommandReactiveExecutor; @@ -181,20 +181,25 @@ public class RedissonScoredSortedSetReactive extends RedissonExpirableReactiv }); } - private Publisher> scanIteratorReactive(RedisClient client, long startPos) { - return commandExecutor.readReactive(client, getName(), codec, RedisCommands.ZSCAN, getName(), startPos); - } - - @Override - public Publisher iterator() { + private Publisher scanIteratorReactive(final String pattern, final int count) { return new SetReactiveIterator() { @Override - protected Publisher> scanIteratorReactive(RedisClient client, long nextIterPos) { - return RedissonScoredSortedSetReactive.this.scanIteratorReactive(client, nextIterPos); + protected Publisher> scanIteratorReactive(final RedisClient client, final long nextIterPos) { + return reactive(new Supplier>>() { + @Override + public RFuture> get() { + return ((RedissonScoredSortedSet)instance).scanIteratorAsync(client, nextIterPos, pattern, count); + } + }); } }; } + @Override + public Publisher iterator() { + return scanIteratorReactive(null, 10); + } + @Override public Publisher containsAll(final Collection c) { return reactive(new Supplier>() { @@ -460,4 +465,169 @@ public class RedissonScoredSortedSetReactive extends RedissonExpirableReactiv }); } + @Override + public Publisher pollLastFromAny(final long timeout, final TimeUnit unit, final String... queueNames) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.pollLastFromAnyAsync(timeout, unit, queueNames); + } + }); + } + + @Override + public Publisher pollFirstFromAny(final long timeout, final TimeUnit unit, final String... queueNames) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.pollFirstFromAnyAsync(timeout, unit, queueNames); + } + }); + } + + @Override + public Publisher pollFirst(final long timeout, final TimeUnit unit) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.pollFirstAsync(timeout, unit); + } + }); + } + + @Override + public Publisher pollLast(final long timeout, final TimeUnit unit) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.pollLastAsync(timeout, unit); + } + }); + } + + @Override + public Publisher> pollFirst(final int count) { + return reactive(new Supplier>>() { + @Override + public RFuture> get() { + return instance.pollFirstAsync(count); + } + }); + } + + @Override + public Publisher> pollLast(final int count) { + return reactive(new Supplier>>() { + @Override + public RFuture> get() { + return instance.pollLastAsync(count); + } + }); + } + + @Override + public Publisher firstScore() { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.firstScoreAsync(); + } + }); + } + + @Override + public Publisher lastScore() { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.lastScoreAsync(); + } + }); + } + + @Override + public Publisher iterator(String pattern) { + return scanIteratorReactive(pattern, 10); + } + + @Override + public Publisher iterator(int count) { + return scanIteratorReactive(null, count); + } + + @Override + public Publisher iterator(String pattern, int count) { + return scanIteratorReactive(pattern, count); + } + + @Override + public Publisher revRank(final V o) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.revRankAsync(o); + } + }); + } + + @Override + public Publisher addAll(final Map objects) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.addAllAsync(objects); + } + }); + } + + @Override + public Publisher addAndGetRank(final double score, final V object) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.addAndGetRankAsync(score, object); + } + }); + } + + @Override + public Publisher addAndGetRevRank(final double score, final V object) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.addAndGetRevRankAsync(score, object); + } + }); + } + + @Override + public Publisher tryAdd(final double score, final V object) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.tryAddAsync(score, object); + } + }); + } + + @Override + public Publisher addScoreAndGetRevRank(final V object, final Number value) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.addScoreAndGetRevRankAsync(object, value); + } + }); + } + + @Override + public Publisher addScoreAndGetRank(final V object, final Number value) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.addScoreAndGetRankAsync(object, value); + } + }); + } + } From 13305dfcbc9daa17a91a2807f062d1a5a51bb026 Mon Sep 17 00:00:00 2001 From: Nikita Date: Wed, 18 Jul 2018 13:44:12 +0300 Subject: [PATCH 2/5] RRateLimiterReactive object added. #1553 --- .../java/org/redisson/RedissonReactive.java | 10 +- .../redisson/api/RRateLimiterReactive.java | 144 ++++++++++++++++++ .../redisson/api/RedissonReactiveClient.java | 8 + .../reactive/RedissonExpirableReactive.java | 4 + .../reactive/RedissonObjectReactive.java | 8 +- .../reactive/RedissonRateLimiterReactive.java | 120 +++++++++++++++ 6 files changed, 287 insertions(+), 7 deletions(-) create mode 100644 redisson/src/main/java/org/redisson/api/RRateLimiterReactive.java create mode 100644 redisson/src/main/java/org/redisson/reactive/RedissonRateLimiterReactive.java diff --git a/redisson/src/main/java/org/redisson/RedissonReactive.java b/redisson/src/main/java/org/redisson/RedissonReactive.java index a21c3fae3..e0a60598d 100644 --- a/redisson/src/main/java/org/redisson/RedissonReactive.java +++ b/redisson/src/main/java/org/redisson/RedissonReactive.java @@ -16,7 +16,6 @@ package org.redisson; import java.util.ArrayList; -import java.util.Collection; import java.util.List; import java.util.UUID; @@ -32,7 +31,6 @@ import org.redisson.api.RBitSetReactive; import org.redisson.api.RBlockingQueueReactive; import org.redisson.api.RBucketReactive; import org.redisson.api.RDequeReactive; -import org.redisson.api.RFuture; import org.redisson.api.RHyperLogLogReactive; import org.redisson.api.RKeys; import org.redisson.api.RKeysReactive; @@ -45,6 +43,7 @@ import org.redisson.api.RMapReactive; import org.redisson.api.RPatternTopicReactive; import org.redisson.api.RPermitExpirableSemaphoreReactive; import org.redisson.api.RQueueReactive; +import org.redisson.api.RRateLimiterReactive; import org.redisson.api.RReadWriteLockReactive; import org.redisson.api.RScoredSortedSetReactive; import org.redisson.api.RScriptReactive; @@ -57,7 +56,6 @@ import org.redisson.api.RTransactionReactive; import org.redisson.api.RedissonReactiveClient; import org.redisson.api.TransactionOptions; import org.redisson.client.codec.Codec; -import org.redisson.client.protocol.RedisCommands; import org.redisson.codec.ReferenceCodecProvider; import org.redisson.command.CommandReactiveService; import org.redisson.config.Config; @@ -83,6 +81,7 @@ import org.redisson.reactive.RedissonMapReactive; import org.redisson.reactive.RedissonPatternTopicReactive; import org.redisson.reactive.RedissonPermitExpirableSemaphoreReactive; import org.redisson.reactive.RedissonQueueReactive; +import org.redisson.reactive.RedissonRateLimiterReactive; import org.redisson.reactive.RedissonReadWriteLockReactive; import org.redisson.reactive.RedissonScoredSortedSetReactive; import org.redisson.reactive.RedissonScriptReactive; @@ -121,6 +120,11 @@ public class RedissonReactive implements RedissonReactiveClient { codecProvider = config.getReferenceCodecProvider(); } + @Override + public RRateLimiterReactive getRateLimiter(String name) { + return new RedissonRateLimiterReactive(commandExecutor, name); + } + @Override public RSemaphoreReactive getSemaphore(String name) { return new RedissonSemaphoreReactive(commandExecutor, name, semaphorePubSub); diff --git a/redisson/src/main/java/org/redisson/api/RRateLimiterReactive.java b/redisson/src/main/java/org/redisson/api/RRateLimiterReactive.java new file mode 100644 index 000000000..1934ffca1 --- /dev/null +++ b/redisson/src/main/java/org/redisson/api/RRateLimiterReactive.java @@ -0,0 +1,144 @@ +/** + * Copyright 2018 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.api; + +import java.util.concurrent.TimeUnit; + +import org.reactivestreams.Publisher; + +/** + * + * @author Nikita Koksharov + * + */ +public interface RRateLimiterReactive extends RObjectReactive { + + /** + * Initializes RateLimiter's state and stores config to Redis server. + * + * @param mode - rate mode + * @param rate - rate + * @param rateInterval - rate time interval + * @param rateIntervalUnit - rate time interval unit + * @return + */ + Publisher trySetRate(RateType mode, long rate, long rateInterval, RateIntervalUnit rateIntervalUnit); + + /** + * Acquires a permit only if one is available at the + * time of invocation. + * + *

Acquires a permit, if one is available and returns immediately, + * with the value {@code true}, + * reducing the number of available permits by one. + * + *

If no permit is available then this method will return + * immediately with the value {@code false}. + * + * @return {@code true} if a permit was acquired and {@code false} + * otherwise + */ + Publisher tryAcquire(); + + /** + * Acquires the given number of permits only if all are available at the + * time of invocation. + * + *

Acquires a permits, if all are available and returns immediately, + * with the value {@code true}, + * reducing the number of available permits by given number of permits. + * + *

If no permits are available then this method will return + * immediately with the value {@code false}. + * + * @param permits the number of permits to acquire + * @return {@code true} if a permit was acquired and {@code false} + * otherwise + */ + Publisher tryAcquire(long permits); + + /** + * Acquires a permit from this RateLimiter, blocking until one is available. + * + *

Acquires a permit, if one is available and returns immediately, + * reducing the number of available permits by one. + * + */ + Publisher acquire(); + + /** + * Acquires a specified permits from this RateLimiter, + * blocking until one is available. + * + *

Acquires the given number of permits, if they are available + * and returns immediately, reducing the number of available permits + * by the given amount. + * + * @param permits + */ + Publisher acquire(long permits); + + /** + * Acquires a permit from this RateLimiter, if one becomes available + * within the given waiting time. + * + *

Acquires a permit, if one is available and returns immediately, + * with the value {@code true}, + * reducing the number of available permits by one. + * + *

If no permit is available then the current thread becomes + * disabled for thread scheduling purposes and lies dormant until + * specified waiting time elapses. + * + *

If a permit is acquired then the value {@code true} is returned. + * + *

If the specified waiting time elapses then the value {@code false} + * is returned. If the time is less than or equal to zero, the method + * will not wait at all. + * + * @param timeout the maximum time to wait for a permit + * @param unit the time unit of the {@code timeout} argument + * @return {@code true} if a permit was acquired and {@code false} + * if the waiting time elapsed before a permit was acquired + */ + Publisher tryAcquire(long timeout, TimeUnit unit); + + /** + * Acquires the given number of permits only if all are available + * within the given waiting time. + * + *

Acquires the given number of permits, if all are available and returns immediately, + * with the value {@code true}, reducing the number of available permits by one. + * + *

If no permit is available then the current thread becomes + * disabled for thread scheduling purposes and lies dormant until + * the specified waiting time elapses. + * + *

If a permits is acquired then the value {@code true} is returned. + * + *

If the specified waiting time elapses then the value {@code false} + * is returned. If the time is less than or equal to zero, the method + * will not wait at all. + * + * @param permits amount + * @param timeout the maximum time to wait for a permit + * @param unit the time unit of the {@code timeout} argument + * @return {@code true} if a permit was acquired and {@code false} + * if the waiting time elapsed before a permit was acquired + */ + Publisher tryAcquire(long permits, long timeout, TimeUnit unit); + +} diff --git a/redisson/src/main/java/org/redisson/api/RedissonReactiveClient.java b/redisson/src/main/java/org/redisson/api/RedissonReactiveClient.java index 42803b932..a3862ae51 100644 --- a/redisson/src/main/java/org/redisson/api/RedissonReactiveClient.java +++ b/redisson/src/main/java/org/redisson/api/RedissonReactiveClient.java @@ -30,6 +30,14 @@ import org.redisson.config.Config; */ public interface RedissonReactiveClient { + /** + * Returns rate limiter instance by name + * + * @param name of rate limiter + * @return RateLimiter object + */ + RRateLimiterReactive getRateLimiter(String name); + /** * Returns semaphore instance by name * diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonExpirableReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonExpirableReactive.java index 05d96898c..604395f12 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonExpirableReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonExpirableReactive.java @@ -34,12 +34,16 @@ import reactor.fn.Supplier; */ abstract class RedissonExpirableReactive extends RedissonObjectReactive implements RExpirableReactive { + protected final RExpirableAsync instance; + RedissonExpirableReactive(CommandReactiveExecutor connectionManager, String name, RExpirableAsync instance) { super(connectionManager, name, instance); + this.instance = instance; } RedissonExpirableReactive(Codec codec, CommandReactiveExecutor connectionManager, String name, RExpirableAsync instance) { super(codec, connectionManager, name, instance); + this.instance = instance; } @Override diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonObjectReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonObjectReactive.java index 0e6106052..1da539cd2 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonObjectReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonObjectReactive.java @@ -21,8 +21,8 @@ import java.util.concurrent.TimeUnit; import org.reactivestreams.Publisher; import org.redisson.RedissonReference; -import org.redisson.api.RExpirableAsync; import org.redisson.api.RFuture; +import org.redisson.api.RObjectAsync; import org.redisson.api.RObjectReactive; import org.redisson.client.codec.Codec; import org.redisson.command.CommandReactiveExecutor; @@ -44,9 +44,9 @@ abstract class RedissonObjectReactive implements RObjectReactive { final CommandReactiveExecutor commandExecutor; private final String name; final Codec codec; - protected RExpirableAsync instance; + protected RObjectAsync instance; - public RedissonObjectReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name, RExpirableAsync instance) { + public RedissonObjectReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name, RObjectAsync instance) { this.codec = codec; this.name = name; this.commandExecutor = commandExecutor; @@ -57,7 +57,7 @@ abstract class RedissonObjectReactive implements RObjectReactive { return commandExecutor.reactive(supplier); } - public RedissonObjectReactive(CommandReactiveExecutor commandExecutor, String name, RExpirableAsync instance) { + public RedissonObjectReactive(CommandReactiveExecutor commandExecutor, String name, RObjectAsync instance) { this(commandExecutor.getConnectionManager().getCodec(), commandExecutor, name, instance); } diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonRateLimiterReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonRateLimiterReactive.java new file mode 100644 index 000000000..63911757a --- /dev/null +++ b/redisson/src/main/java/org/redisson/reactive/RedissonRateLimiterReactive.java @@ -0,0 +1,120 @@ +/** + * Copyright 2018 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.reactive; + +import java.util.concurrent.TimeUnit; + +import org.reactivestreams.Publisher; +import org.redisson.RedissonRateLimiter; +import org.redisson.api.RFuture; +import org.redisson.api.RRateLimiterAsync; +import org.redisson.api.RRateLimiterReactive; +import org.redisson.api.RateIntervalUnit; +import org.redisson.api.RateType; +import org.redisson.command.CommandReactiveExecutor; + +import reactor.fn.Supplier; + +/** + * + * @author Nikita Koksharov + * + */ +public class RedissonRateLimiterReactive extends RedissonObjectReactive implements RRateLimiterReactive { + + private final RRateLimiterAsync instance; + + public RedissonRateLimiterReactive(CommandReactiveExecutor connectionManager, String name) { + this(connectionManager, name, new RedissonRateLimiter(connectionManager, name)); + } + + private RedissonRateLimiterReactive(CommandReactiveExecutor connectionManager, String name, RRateLimiterAsync instance) { + super(connectionManager, name, instance); + this.instance = instance; + } + + @Override + public Publisher trySetRate(final RateType mode, final long rate, final long rateInterval, + final RateIntervalUnit rateIntervalUnit) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.trySetRateAsync(mode, rate, rateInterval, rateIntervalUnit); + } + }); + } + + @Override + public Publisher tryAcquire() { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.tryAcquireAsync(); + } + }); + } + + @Override + public Publisher tryAcquire(final long permits) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.tryAcquireAsync(permits); + } + }); + } + + @Override + public Publisher acquire() { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.acquireAsync(); + } + }); + } + + @Override + public Publisher acquire(final long permits) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.acquireAsync(permits); + } + }); + } + + @Override + public Publisher tryAcquire(final long timeout, final TimeUnit unit) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.tryAcquireAsync(timeout, unit); + } + }); + } + + @Override + public Publisher tryAcquire(final long permits, final long timeout, final TimeUnit unit) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.tryAcquireAsync(permits, timeout, unit); + } + }); + } + +} From 4a7f63051c43239fdf8d5ab414a420cfe2049d08 Mon Sep 17 00:00:00 2001 From: Nikita Date: Wed, 18 Jul 2018 14:16:43 +0300 Subject: [PATCH 3/5] Added reactive version of FairLock. #1554 --- .../src/main/java/org/redisson/RedissonFairLock.java | 6 +++--- .../src/main/java/org/redisson/RedissonReactive.java | 5 +++++ .../java/org/redisson/api/RedissonReactiveClient.java | 10 ++++++++++ 3 files changed, 18 insertions(+), 3 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonFairLock.java b/redisson/src/main/java/org/redisson/RedissonFairLock.java index dd040f656..747e50f61 100644 --- a/redisson/src/main/java/org/redisson/RedissonFairLock.java +++ b/redisson/src/main/java/org/redisson/RedissonFairLock.java @@ -24,7 +24,7 @@ import org.redisson.api.RLock; import org.redisson.client.codec.LongCodec; import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisStrictCommand; -import org.redisson.command.CommandExecutor; +import org.redisson.command.CommandAsyncExecutor; import org.redisson.pubsub.LockPubSub; /** @@ -40,11 +40,11 @@ import org.redisson.pubsub.LockPubSub; public class RedissonFairLock extends RedissonLock implements RLock { private final long threadWaitTime = 5000; - private final CommandExecutor commandExecutor; + private final CommandAsyncExecutor commandExecutor; private final String threadsQueueName; private final String timeoutSetName; - protected RedissonFairLock(CommandExecutor commandExecutor, String name) { + protected RedissonFairLock(CommandAsyncExecutor commandExecutor, String name) { super(commandExecutor, name); this.commandExecutor = commandExecutor; threadsQueueName = prefixName("redisson_lock_queue", name); diff --git a/redisson/src/main/java/org/redisson/RedissonReactive.java b/redisson/src/main/java/org/redisson/RedissonReactive.java index e0a60598d..b589ff3de 100644 --- a/redisson/src/main/java/org/redisson/RedissonReactive.java +++ b/redisson/src/main/java/org/redisson/RedissonReactive.java @@ -120,6 +120,11 @@ public class RedissonReactive implements RedissonReactiveClient { codecProvider = config.getReferenceCodecProvider(); } + @Override + public RLockReactive getFairLock(String name) { + return new RedissonLockReactive(commandExecutor, name, new RedissonFairLock(commandExecutor, name)); + } + @Override public RRateLimiterReactive getRateLimiter(String name) { return new RedissonRateLimiterReactive(commandExecutor, name); diff --git a/redisson/src/main/java/org/redisson/api/RedissonReactiveClient.java b/redisson/src/main/java/org/redisson/api/RedissonReactiveClient.java index a3862ae51..a9023ca2e 100644 --- a/redisson/src/main/java/org/redisson/api/RedissonReactiveClient.java +++ b/redisson/src/main/java/org/redisson/api/RedissonReactiveClient.java @@ -63,6 +63,16 @@ public interface RedissonReactiveClient { */ RReadWriteLockReactive getReadWriteLock(String name); + /** + * Returns lock instance by name. + *

+ * Implements a fair locking so it guarantees an acquire order by threads. + * + * @param name - name of object + * @return Lock object + */ + RLockReactive getFairLock(String name); + /** * Returns lock instance by name. *

From 14aad8e0e0fb56efd0c037ff2c3d14ff35c4e879 Mon Sep 17 00:00:00 2001 From: Nikita Date: Wed, 18 Jul 2018 15:10:16 +0300 Subject: [PATCH 4/5] Added RGeoReactive. #1555 --- .../java/org/redisson/RedissonReactive.java | 12 + .../java/org/redisson/api/RGeoReactive.java | 549 ++++++++++++++++++ .../redisson/api/RedissonReactiveClient.java | 20 + .../reactive/RedissonGeoReactive.java | 431 ++++++++++++++ 4 files changed, 1012 insertions(+) create mode 100644 redisson/src/main/java/org/redisson/api/RGeoReactive.java create mode 100644 redisson/src/main/java/org/redisson/reactive/RedissonGeoReactive.java diff --git a/redisson/src/main/java/org/redisson/RedissonReactive.java b/redisson/src/main/java/org/redisson/RedissonReactive.java index b589ff3de..da2f99049 100644 --- a/redisson/src/main/java/org/redisson/RedissonReactive.java +++ b/redisson/src/main/java/org/redisson/RedissonReactive.java @@ -31,6 +31,7 @@ import org.redisson.api.RBitSetReactive; import org.redisson.api.RBlockingQueueReactive; import org.redisson.api.RBucketReactive; import org.redisson.api.RDequeReactive; +import org.redisson.api.RGeoReactive; import org.redisson.api.RHyperLogLogReactive; import org.redisson.api.RKeys; import org.redisson.api.RKeysReactive; @@ -70,6 +71,7 @@ import org.redisson.reactive.RedissonBitSetReactive; import org.redisson.reactive.RedissonBlockingQueueReactive; import org.redisson.reactive.RedissonBucketReactive; import org.redisson.reactive.RedissonDequeReactive; +import org.redisson.reactive.RedissonGeoReactive; import org.redisson.reactive.RedissonHyperLogLogReactive; import org.redisson.reactive.RedissonKeysReactive; import org.redisson.reactive.RedissonLexSortedSetReactive; @@ -120,6 +122,16 @@ public class RedissonReactive implements RedissonReactiveClient { codecProvider = config.getReferenceCodecProvider(); } + @Override + public RGeoReactive getGeo(String name) { + return new RedissonGeoReactive(commandExecutor, name); + } + + @Override + public RGeoReactive getGeo(String name, Codec codec) { + return new RedissonGeoReactive(codec, commandExecutor, name); + } + @Override public RLockReactive getFairLock(String name) { return new RedissonLockReactive(commandExecutor, name, new RedissonFairLock(commandExecutor, name)); diff --git a/redisson/src/main/java/org/redisson/api/RGeoReactive.java b/redisson/src/main/java/org/redisson/api/RGeoReactive.java new file mode 100644 index 000000000..01f6a6aaa --- /dev/null +++ b/redisson/src/main/java/org/redisson/api/RGeoReactive.java @@ -0,0 +1,549 @@ +/** + * Copyright 2018 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.api; + +import java.util.List; +import java.util.Map; + +import org.reactivestreams.Publisher; + +/** + * Geospatial items holder. + * + * @author Nikita Koksharov + * + * @param type of value + */ +public interface RGeoReactive extends RScoredSortedSetReactive { + + /** + * Adds geospatial member. + * + * @param longitude - longitude of object + * @param latitude - latitude of object + * @param member - object itself + * @return number of elements added to the sorted set, + * not including elements already existing for which + * the score was updated + */ + Publisher add(double longitude, double latitude, V member); + + /** + * Adds geospatial members. + * + * @param entries - objects + * @return number of elements added to the sorted set, + * not including elements already existing for which + * the score was updated + */ + Publisher add(GeoEntry... entries); + + /** + * Returns distance between members in GeoUnit units. + * + * @param firstMember - first object + * @param secondMember - second object + * @param geoUnit - geo unit + * @return distance + */ + Publisher dist(V firstMember, V secondMember, GeoUnit geoUnit); + + /** + * Returns 11 characters Geohash string mapped by defined member. + * + * @param members - objects + * @return hash mapped by object + */ + Publisher> hash(V... members); + + /** + * Returns geo-position mapped by defined member. + * + * @param members - objects + * @return geo position mapped by object + */ + Publisher> pos(V... members); + + /** + * Returns the members of a sorted set, which are within the + * borders of the area specified with the center location + * and the maximum distance from the center (the radius) + * in GeoUnit units. + * + * @param longitude - longitude of object + * @param latitude - latitude of object + * @param radius - radius in geo units + * @param geoUnit - geo unit + * @return list of objects + */ + Publisher> radius(double longitude, double latitude, double radius, GeoUnit geoUnit); + + /** + * Returns the members of a sorted set, which are within the + * borders of the area specified with the center location + * and the maximum distance from the center (the radius) + * in GeoUnit units and limited by count + * + * @param longitude - longitude of object + * @param latitude - latitude of object + * @param radius - radius in geo units + * @param geoUnit - geo unit + * @param count - result limit + * @return list of objects + */ + Publisher> radius(double longitude, double latitude, double radius, GeoUnit geoUnit, int count); + + /** + * Returns the members of a sorted set, which are within the + * borders of the area specified with the center location + * and the maximum distance from the center (the radius) + * in GeoUnit units with GeoOrder + * + * @param longitude - longitude of object + * @param latitude - latitude of object + * @param radius - radius in geo units + * @param geoUnit - geo unit + * @param geoOrder - order of result + * @return list of objects + */ + Publisher> radius(double longitude, double latitude, double radius, GeoUnit geoUnit, GeoOrder geoOrder); + + /** + * Returns the members of a sorted set, which are within the + * borders of the area specified with the center location + * and the maximum distance from the center (the radius) + * in GeoUnit units with GeoOrder + * and limited by count + * + * @param longitude - longitude of object + * @param latitude - latitude of object + * @param radius - radius in geo units + * @param geoUnit - geo unit + * @param geoOrder - order of result + * @param count - result limit + * @return list of objects + */ + Publisher> radius(double longitude, double latitude, double radius, GeoUnit geoUnit, GeoOrder geoOrder, int count); + + /** + * Returns the distance mapped by member, distance between member and the location. + * Members of a sorted set, which are within the + * borders of the area specified with the center location + * and the maximum distance from the center (the radius) + * in GeoUnit units. + * + * @param longitude - longitude of object + * @param latitude - latitude of object + * @param radius - radius in geo units + * @param geoUnit - geo unit + * @return distance mapped by object + */ + Publisher> radiusWithDistance(double longitude, double latitude, double radius, GeoUnit geoUnit); + + /** + * Returns the distance mapped by member, distance between member and the location. + * Members of a sorted set, which are within the + * borders of the area specified with the center location + * and the maximum distance from the center (the radius) + * in GeoUnit units and limited by count. + * + * @param longitude - longitude of object + * @param latitude - latitude of object + * @param radius - radius in geo units + * @param geoUnit - geo unit + * @param count - result limit + * @return distance mapped by object + */ + Publisher> radiusWithDistance(double longitude, double latitude, double radius, GeoUnit geoUnit, int count); + + /** + * Returns the distance mapped by member, distance between member and the location. + * Members of a sorted set, which are within the + * borders of the area specified with the center location + * and the maximum distance from the center (the radius) + * in GeoUnit units with GeoOrder + * + * @param longitude - longitude of object + * @param latitude - latitude of object + * @param radius - radius in geo units + * @param geoUnit - geo unit + * @param geoOrder - order of result + * @return distance mapped by object + */ + Publisher> radiusWithDistance(double longitude, double latitude, double radius, GeoUnit geoUnit, GeoOrder geoOrder); + + /** + * Returns the distance mapped by member, distance between member and the location. + * Members of a sorted set, which are within the + * borders of the area specified with the center location + * and the maximum distance from the center (the radius) + * in GeoUnit units with GeoOrder + * and limited by count + * + * @param longitude - longitude of object + * @param latitude - latitude of object + * @param radius - radius in geo units + * @param geoUnit - geo unit + * @param geoOrder - order of result + * @param count - result limit + * @return distance mapped by object + */ + Publisher> radiusWithDistance(double longitude, double latitude, double radius, GeoUnit geoUnit, GeoOrder geoOrder, int count); + + /** + * Returns the geo-position mapped by member. + * Members of a sorted set, which are within the + * borders of the area specified with the center location + * and the maximum distance from the center (the radius) + * in GeoUnit units. + * + * @param longitude - longitude of object + * @param latitude - latitude of object + * @param radius - radius in geo units + * @param geoUnit - geo unit + * @return geo position mapped by object + */ + Publisher> radiusWithPosition(double longitude, double latitude, double radius, GeoUnit geoUnit); + + /** + * Returns the geo-position mapped by member. + * Members of a sorted set, which are within the + * borders of the area specified with the center location + * and the maximum distance from the center (the radius) + * in GeoUnit units and limited by count + * + * @param longitude - longitude of object + * @param latitude - latitude of object + * @param radius - radius in geo units + * @param geoUnit - geo unit + * @param count - result limit + * @return geo position mapped by object + */ + Publisher> radiusWithPosition(double longitude, double latitude, double radius, GeoUnit geoUnit, int count); + + /** + * Returns the geo-position mapped by member. + * Members of a sorted set, which are within the + * borders of the area specified with the center location + * and the maximum distance from the center (the radius) + * in GeoUnit units with GeoOrder + * + * @param longitude - longitude of object + * @param latitude - latitude of object + * @param radius - radius in geo units + * @param geoUnit - geo unit + * @param geoOrder - geo order + * @return geo position mapped by object + */ + Publisher> radiusWithPosition(double longitude, double latitude, double radius, GeoUnit geoUnit, GeoOrder geoOrder); + + /** + * Returns the geo-position mapped by member. + * Members of a sorted set, which are within the + * borders of the area specified with the center location + * and the maximum distance from the center (the radius) + * in GeoUnit units with GeoOrder + * and limited by count + * + * @param longitude - longitude of object + * @param latitude - latitude of object + * @param radius - radius in geo units + * @param geoUnit - geo unit + * @param geoOrder - geo order + * @param count - result limit + * @return geo position mapped by object + */ + Publisher> radiusWithPosition(double longitude, double latitude, double radius, GeoUnit geoUnit, GeoOrder geoOrder, int count); + + /** + * Returns the members of a sorted set, which are within the + * borders of the area specified with the defined member location + * and the maximum distance from the defined member location (the radius) + * in GeoUnit units. + * + * @param member - object + * @param radius - radius in geo units + * @param geoUnit - geo unit + * @return list of objects + */ + Publisher> radius(V member, double radius, GeoUnit geoUnit); + + /** + * Returns the members of a sorted set, which are within the + * borders of the area specified with the defined member location + * and the maximum distance from the defined member location (the radius) + * in GeoUnit units and limited by count + * + * @param member - object + * @param radius - radius in geo units + * @param geoUnit - geo unit + * @param count - result limit + * @return list of objects + */ + Publisher> radius(V member, double radius, GeoUnit geoUnit, int count); + + /** + * Returns the members of a sorted set, which are within the + * borders of the area specified with the defined member location + * and the maximum distance from the defined member location (the radius) + * in GeoUnit units with GeoOrder + * + * @param member - object + * @param radius - radius in geo units + * @param geoUnit - geo unit + * @param geoOrder - geo order + * @return list of objects + */ + Publisher> radius(V member, double radius, GeoUnit geoUnit, GeoOrder geoOrder); + + /** + * Returns the members of a sorted set, which are within the + * borders of the area specified with the defined member location + * and the maximum distance from the defined member location (the radius) + * in GeoUnit units with GeoOrder + * + * @param member - object + * @param radius - radius in geo units + * @param geoUnit - geo unit + * @param geoOrder - geo order + * @param count - result limit + * @return list of objects + */ + Publisher> radius(V member, double radius, GeoUnit geoUnit, GeoOrder geoOrder, int count); + + /** + * Returns the distance mapped by member, distance between member and the defined member location. + * Members of a sorted set, which are within the + * borders of the area specified with the defined member location + * and the maximum distance from the defined member location (the radius) + * in GeoUnit units. + * + * @param member - object + * @param radius - radius in geo units + * @param geoUnit - geo unit + * @return distance mapped by object + */ + Publisher> radiusWithDistance(V member, double radius, GeoUnit geoUnit); + + /** + * Returns the distance mapped by member, distance between member and the defined member location. + * Members of a sorted set, which are within the + * borders of the area specified with the defined member location + * and the maximum distance from the defined member location (the radius) + * in GeoUnit units and limited by count + * + * @param member - object + * @param radius - radius in geo units + * @param geoUnit - geo unit + * @param count - result limit + * @return distance mapped by object + */ + Publisher> radiusWithDistance(V member, double radius, GeoUnit geoUnit, int count); + + /** + * Returns the distance mapped by member, distance between member and the defined member location. + * Members of a sorted set, which are within the + * borders of the area specified with the defined member location + * and the maximum distance from the defined member location (the radius) + * in GeoUnit units with GeoOrder + * + * @param member - object + * @param radius - radius in geo units + * @param geoUnit - geo unit + * @param geoOrder - geo + * @return distance mapped by object + */ + Publisher> radiusWithDistance(V member, double radius, GeoUnit geoUnit, GeoOrder geoOrder); + + /** + * Returns the distance mapped by member, distance between member and the defined member location. + * Members of a sorted set, which are within the + * borders of the area specified with the defined member location + * and the maximum distance from the defined member location (the radius) + * in GeoUnit units with GeoOrder + * and limited by count + * + * @param member - object + * @param radius - radius in geo units + * @param geoUnit - geo unit + * @param geoOrder - geo + * @param count - result limit + * @return distance mapped by object + */ + Publisher> radiusWithDistance(V member, double radius, GeoUnit geoUnit, GeoOrder geoOrder, int count); + + /** + * Returns the geo-position mapped by member. + * Members of a sorted set, which are within the + * borders of the area specified with the defined member location + * and the maximum distance from the defined member location (the radius) + * in GeoUnit units. + * + * @param member - object + * @param radius - radius in geo units + * @param geoUnit - geo unit + * @return geo position mapped by object + */ + Publisher> radiusWithPosition(V member, double radius, GeoUnit geoUnit); + + /** + * Returns the geo-position mapped by member. + * Members of a sorted set, which are within the + * borders of the area specified with the defined member location + * and the maximum distance from the defined member location (the radius) + * in GeoUnit units and limited by count + * + * @param member - object + * @param radius - radius in geo units + * @param geoUnit - geo unit + * @param count - result limit + * @return geo position mapped by object + */ + Publisher> radiusWithPosition(V member, double radius, GeoUnit geoUnit, int count); + + /** + * Returns the geo-position mapped by member. + * Members of a sorted set, which are within the + * borders of the area specified with the defined member location + * and the maximum distance from the defined member location (the radius) + * in GeoUnit units with GeoOrder + * + * @param member - object + * @param radius - radius in geo units + * @param geoUnit - geo unit + * @param geoOrder - geo order + * @return geo position mapped by object + */ + Publisher> radiusWithPosition(V member, double radius, GeoUnit geoUnit, GeoOrder geoOrder); + + /** + * Returns the geo-position mapped by member. + * Members of a sorted set, which are within the + * borders of the area specified with the defined member location + * and the maximum distance from the defined member location (the radius) + * in GeoUnit units with GeoOrder + * and limited by count + * + * @param member - object + * @param radius - radius in geo units + * @param geoUnit - geo unit + * @param geoOrder - geo order + * @param count - result limit + * @return geo position mapped by object + */ + Publisher> radiusWithPosition(V member, double radius, GeoUnit geoUnit, GeoOrder geoOrder, int count); + + /** + * Finds the members of a sorted set, which are within the + * borders of the area specified with the center location + * and the maximum distance from the center (the radius) + * in GeoUnit units. + * Store result to destName. + * + * @param destName - Geo object destination + * @param longitude - longitude of object + * @param latitude - latitude of object + * @param radius - radius in geo units + * @param geoUnit - geo unit + * @return length of result + */ + Publisher radiusStoreTo(String destName, double longitude, double latitude, double radius, GeoUnit geoUnit); + + /** + * Finds the members of a sorted set, which are within the + * borders of the area specified with the center location + * and the maximum distance from the center (the radius) + * in GeoUnit units and limited by count + * Store result to destName. + * + * @param destName - Geo object destination + * @param longitude - longitude of object + * @param latitude - latitude of object + * @param radius - radius in geo units + * @param geoUnit - geo unit + * @param count - result limit + * @return length of result + */ + Publisher radiusStoreTo(String destName, double longitude, double latitude, double radius, GeoUnit geoUnit, int count); + + /** + * Finds the members of a sorted set, which are within the + * borders of the area specified with the center location + * and the maximum distance from the center (the radius) + * in GeoUnit units with GeoOrder + * and limited by count + * Store result to destName. + * + * @param destName - Geo object destination + * @param longitude - longitude of object + * @param latitude - latitude of object + * @param radius - radius in geo units + * @param geoUnit - geo unit + * @param geoOrder - order of result + * @param count - result limit + * @return length of result + */ + Publisher radiusStoreTo(String destName, double longitude, double latitude, double radius, GeoUnit geoUnit, GeoOrder geoOrder, int count); + + /** + * Finds the members of a sorted set, which are within the + * borders of the area specified with the defined member location + * and the maximum distance from the defined member location (the radius) + * in GeoUnit units. + * Store result to destName. + * + * @param destName - Geo object destination + * @param member - object + * @param radius - radius in geo units + * @param geoUnit - geo unit + * @return length of result + */ + Publisher radiusStoreTo(String destName, V member, double radius, GeoUnit geoUnit); + + /** + * Finds the members of a sorted set, which are within the + * borders of the area specified with the defined member location + * and the maximum distance from the defined member location (the radius) + * in GeoUnit units and limited by count + * Store result to destName. + * + * @param destName - Geo object destination + * @param member - object + * @param radius - radius in geo units + * @param geoUnit - geo unit + * @param count - result limit + * @return length of result + */ + Publisher radiusStoreTo(String destName, V member, double radius, GeoUnit geoUnit, int count); + + /** + * Finds the members of a sorted set, which are within the + * borders of the area specified with the defined member location + * and the maximum distance from the defined member location (the radius) + * in GeoUnit units with GeoOrder + * Store result to destName. + * + * @param destName - Geo object destination + * @param member - object + * @param radius - radius in geo units + * @param geoUnit - geo unit + * @param geoOrder - geo order + * @param count - result limit + * @return length of result + */ + Publisher radiusStoreTo(String destName, V member, double radius, GeoUnit geoUnit, GeoOrder geoOrder, int count); + +} diff --git a/redisson/src/main/java/org/redisson/api/RedissonReactiveClient.java b/redisson/src/main/java/org/redisson/api/RedissonReactiveClient.java index a9023ca2e..181953612 100644 --- a/redisson/src/main/java/org/redisson/api/RedissonReactiveClient.java +++ b/redisson/src/main/java/org/redisson/api/RedissonReactiveClient.java @@ -30,6 +30,26 @@ import org.redisson.config.Config; */ public interface RedissonReactiveClient { + /** + * Returns geospatial items holder instance by name. + * + * @param type of value + * @param name - name of object + * @return Geo object + */ + RGeoReactive getGeo(String name); + + /** + * Returns geospatial items holder instance by name + * using provided codec for geospatial members. + * + * @param type of value + * @param name - name of object + * @param codec - codec for value + * @return Geo object + */ + RGeoReactive getGeo(String name, Codec codec); + /** * Returns rate limiter instance by name * diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonGeoReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonGeoReactive.java new file mode 100644 index 000000000..f8de937f4 --- /dev/null +++ b/redisson/src/main/java/org/redisson/reactive/RedissonGeoReactive.java @@ -0,0 +1,431 @@ +/** + * Copyright 2018 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.reactive; + +import java.util.List; +import java.util.Map; + +import org.reactivestreams.Publisher; +import org.redisson.RedissonGeo; +import org.redisson.api.GeoEntry; +import org.redisson.api.GeoOrder; +import org.redisson.api.GeoPosition; +import org.redisson.api.GeoUnit; +import org.redisson.api.RFuture; +import org.redisson.api.RGeoAsync; +import org.redisson.api.RGeoReactive; +import org.redisson.client.codec.Codec; +import org.redisson.command.CommandReactiveExecutor; + +import reactor.fn.Supplier; + +/** + * + * @author Nikita Koksharov + * + * @param value type + */ +public class RedissonGeoReactive extends RedissonScoredSortedSetReactive implements RGeoReactive { + + private final RGeoAsync instance; + + public RedissonGeoReactive(CommandReactiveExecutor commandExecutor, String name) { + this(commandExecutor, name, new RedissonGeo(commandExecutor, name, null)); + } + + public RedissonGeoReactive(CommandReactiveExecutor commandExecutor, String name, RGeoAsync instance) { + super(commandExecutor, name, instance); + this.instance = instance; + } + + public RedissonGeoReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name) { + this(codec, commandExecutor, name, new RedissonGeo(codec, commandExecutor, name, null)); + } + + public RedissonGeoReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name, RGeoAsync instance) { + super(codec, commandExecutor, name, instance); + this.instance = instance; + } + + @Override + public Publisher add(final double longitude, final double latitude, final V member) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.addAsync(longitude, latitude, member); + } + }); + } + + @Override + public Publisher add(final GeoEntry... entries) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.addAsync(entries); + } + }); + } + + @Override + public Publisher dist(final V firstMember, final V secondMember, final GeoUnit geoUnit) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.distAsync(firstMember, secondMember, geoUnit); + } + }); + } + + @Override + public Publisher> hash(final V... members) { + return reactive(new Supplier>>() { + @Override + public RFuture> get() { + return instance.hashAsync(members); + } + }); + } + + @Override + public Publisher> pos(final V... members) { + return reactive(new Supplier>>() { + @Override + public RFuture> get() { + return instance.posAsync(members); + } + }); + } + + @Override + public Publisher> radius(final double longitude, final double latitude, final double radius, final GeoUnit geoUnit) { + return reactive(new Supplier>>() { + @Override + public RFuture> get() { + return instance.radiusAsync(longitude, latitude, radius, geoUnit); + } + }); + } + + @Override + public Publisher> radius(final double longitude, final double latitude, final double radius, final GeoUnit geoUnit, final int count) { + return reactive(new Supplier>>() { + @Override + public RFuture> get() { + return instance.radiusAsync(longitude, latitude, radius, geoUnit, count); + } + }); + } + + @Override + public Publisher> radius(final double longitude, final double latitude, final double radius, final GeoUnit geoUnit, + final GeoOrder geoOrder) { + return reactive(new Supplier>>() { + @Override + public RFuture> get() { + return instance.radiusAsync(longitude, latitude, radius, geoUnit, geoOrder); + } + }); + } + + @Override + public Publisher> radius(final double longitude, final double latitude, final double radius, final GeoUnit geoUnit, + final GeoOrder geoOrder, final int count) { + return reactive(new Supplier>>() { + @Override + public RFuture> get() { + return instance.radiusAsync(longitude, latitude, radius, geoUnit, geoOrder, count); + } + }); + } + + @Override + public Publisher> radiusWithDistance(final double longitude, final double latitude, final double radius, + final GeoUnit geoUnit) { + return reactive(new Supplier>>() { + @Override + public RFuture> get() { + return instance.radiusWithDistanceAsync(longitude, latitude, radius, geoUnit); + } + }); + } + + @Override + public Publisher> radiusWithDistance(final double longitude, final double latitude, final double radius, + final GeoUnit geoUnit, final int count) { + return reactive(new Supplier>>() { + @Override + public RFuture> get() { + return instance.radiusWithDistanceAsync(longitude, latitude, radius, geoUnit, count); + } + }); + } + + @Override + public Publisher> radiusWithDistance(final double longitude, final double latitude, final double radius, + final GeoUnit geoUnit, final GeoOrder geoOrder) { + return reactive(new Supplier>>() { + @Override + public RFuture> get() { + return instance.radiusWithDistanceAsync(longitude, latitude, radius, geoUnit, geoOrder); + } + }); + } + + @Override + public Publisher> radiusWithDistance(final double longitude, final double latitude, final double radius, + final GeoUnit geoUnit, final GeoOrder geoOrder, final int count) { + return reactive(new Supplier>>() { + @Override + public RFuture> get() { + return instance.radiusWithDistanceAsync(longitude, latitude, radius, geoUnit, geoOrder, count); + } + }); + } + + @Override + public Publisher> radiusWithPosition(final double longitude, final double latitude, final double radius, + final GeoUnit geoUnit) { + return reactive(new Supplier>>() { + @Override + public RFuture> get() { + return instance.radiusWithPositionAsync(longitude, latitude, radius, geoUnit); + } + }); + } + + @Override + public Publisher> radiusWithPosition(final double longitude, final double latitude, final double radius, + final GeoUnit geoUnit, final int count) { + return reactive(new Supplier>>() { + @Override + public RFuture> get() { + return instance.radiusWithPositionAsync(longitude, latitude, radius, geoUnit, count); + } + }); + } + + @Override + public Publisher> radiusWithPosition(final double longitude, final double latitude, final double radius, + final GeoUnit geoUnit, final GeoOrder geoOrder) { + return reactive(new Supplier>>() { + @Override + public RFuture> get() { + return instance.radiusWithPositionAsync(longitude, latitude, radius, geoUnit, geoOrder); + } + }); + } + + @Override + public Publisher> radiusWithPosition(final double longitude, final double latitude, final double radius, + final GeoUnit geoUnit, final GeoOrder geoOrder, final int count) { + return reactive(new Supplier>>() { + @Override + public RFuture> get() { + return instance.radiusWithPositionAsync(longitude, latitude, radius, geoUnit, geoOrder, count); + } + }); + } + + @Override + public Publisher> radius(final V member, final double radius, final GeoUnit geoUnit) { + return reactive(new Supplier>>() { + @Override + public RFuture> get() { + return instance.radiusAsync(member, radius, geoUnit); + } + }); + } + + @Override + public Publisher> radius(final V member, final double radius, final GeoUnit geoUnit, final int count) { + return reactive(new Supplier>>() { + @Override + public RFuture> get() { + return instance.radiusAsync(member, radius, geoUnit, count); + } + }); + } + + @Override + public Publisher> radius(final V member, final double radius, final GeoUnit geoUnit, final GeoOrder geoOrder) { + return reactive(new Supplier>>() { + @Override + public RFuture> get() { + return instance.radiusAsync(member, radius, geoUnit, geoOrder); + } + }); + } + + @Override + public Publisher> radius(final V member, final double radius, final GeoUnit geoUnit, final GeoOrder geoOrder, final int count) { + return reactive(new Supplier>>() { + @Override + public RFuture> get() { + return instance.radiusAsync(member, radius, geoUnit, geoOrder, count); + } + }); + } + + @Override + public Publisher> radiusWithDistance(final V member, final double radius, final GeoUnit geoUnit) { + return reactive(new Supplier>>() { + @Override + public RFuture> get() { + return instance.radiusWithDistanceAsync(member, radius, geoUnit); + } + }); + } + + @Override + public Publisher> radiusWithDistance(final V member, final double radius, final GeoUnit geoUnit, final int count) { + return reactive(new Supplier>>() { + @Override + public RFuture> get() { + return instance.radiusWithDistanceAsync(member, radius, geoUnit, count); + } + }); + } + + @Override + public Publisher> radiusWithDistance(final V member, final double radius, final GeoUnit geoUnit, final GeoOrder geoOrder) { + return reactive(new Supplier>>() { + @Override + public RFuture> get() { + return instance.radiusWithDistanceAsync(member, radius, geoUnit, geoOrder); + } + }); + } + + @Override + public Publisher> radiusWithDistance(final V member, final double radius, final GeoUnit geoUnit, final GeoOrder geoOrder, + final int count) { + return reactive(new Supplier>>() { + @Override + public RFuture> get() { + return instance.radiusWithDistanceAsync(member, radius, geoUnit, geoOrder, count); + } + }); + } + + @Override + public Publisher> radiusWithPosition(final V member, final double radius, final GeoUnit geoUnit) { + return reactive(new Supplier>>() { + @Override + public RFuture> get() { + return instance.radiusWithPositionAsync(member, radius, geoUnit); + } + }); + } + + @Override + public Publisher> radiusWithPosition(final V member, final double radius, final GeoUnit geoUnit, final int count) { + return reactive(new Supplier>>() { + @Override + public RFuture> get() { + return instance.radiusWithPositionAsync(member, radius, geoUnit, count); + } + }); + } + + @Override + public Publisher> radiusWithPosition(final V member, final double radius, final GeoUnit geoUnit, + final GeoOrder geoOrder) { + return reactive(new Supplier>>() { + @Override + public RFuture> get() { + return instance.radiusWithPositionAsync(member, radius, geoUnit, geoOrder); + } + }); + } + + @Override + public Publisher> radiusWithPosition(final V member, final double radius, final GeoUnit geoUnit, + final GeoOrder geoOrder, final int count) { + return reactive(new Supplier>>() { + @Override + public RFuture> get() { + return instance.radiusWithPositionAsync(member, radius, geoUnit, geoOrder, count); + } + }); + } + + @Override + public Publisher radiusStoreTo(final String destName, final double longitude, final double latitude, final double radius, + final GeoUnit geoUnit) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.radiusStoreToAsync(destName, longitude, latitude, radius, geoUnit); + } + }); + } + + @Override + public Publisher radiusStoreTo(final String destName, final double longitude, final double latitude, final double radius, + final GeoUnit geoUnit, final int count) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.radiusStoreToAsync(destName, longitude, latitude, radius, geoUnit, count); + } + }); + } + + @Override + public Publisher radiusStoreTo(final String destName, final double longitude, final double latitude, final double radius, + final GeoUnit geoUnit, final GeoOrder geoOrder, final int count) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.radiusStoreToAsync(destName, longitude, latitude, radius, geoUnit, geoOrder, count); + } + }); + } + + @Override + public Publisher radiusStoreTo(final String destName, final V member, final double radius, final GeoUnit geoUnit) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.radiusStoreToAsync(destName, member, radius, geoUnit); + } + }); + } + + @Override + public Publisher radiusStoreTo(final String destName, final V member, final double radius, final GeoUnit geoUnit, final int count) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.radiusStoreToAsync(destName, member, radius, geoUnit, count); + } + }); + } + + @Override + public Publisher radiusStoreTo(final String destName, final V member, final double radius, final GeoUnit geoUnit, final GeoOrder geoOrder, + final int count) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.radiusStoreToAsync(destName, member, radius, geoUnit, geoOrder, count); + } + }); + } + + +} From 11c3e8770eecab395e0ca47b158011083d1acd22 Mon Sep 17 00:00:00 2001 From: Nikita Date: Wed, 18 Jul 2018 16:07:03 +0300 Subject: [PATCH 5/5] RScoredSortedSetReactive, RSetReactive, RListReactive implements RSortableReactive interface. #1556 --- .../java/org/redisson/api/RListReactive.java | 2 +- .../api/RScoredSortedSetReactive.java | 3 +- .../java/org/redisson/api/RSetReactive.java | 3 +- .../org/redisson/api/RSortableReactive.java | 159 ++++++++++++++++++ .../reactive/RedissonListReactive.java | 123 ++++++++++++++ .../RedissonScoredSortedSetReactive.java | 125 ++++++++++++++ .../reactive/RedissonSetReactive.java | 123 ++++++++++++++ 7 files changed, 534 insertions(+), 4 deletions(-) create mode 100644 redisson/src/main/java/org/redisson/api/RSortableReactive.java diff --git a/redisson/src/main/java/org/redisson/api/RListReactive.java b/redisson/src/main/java/org/redisson/api/RListReactive.java index c27d3c91b..a94c56434 100644 --- a/redisson/src/main/java/org/redisson/api/RListReactive.java +++ b/redisson/src/main/java/org/redisson/api/RListReactive.java @@ -28,7 +28,7 @@ import org.reactivestreams.Publisher; * @param the type of elements held in this collection */ // TODO add sublist support -public interface RListReactive extends RCollectionReactive { +public interface RListReactive extends RCollectionReactive, RSortableReactive> { /** * Loads elements by specified indexes diff --git a/redisson/src/main/java/org/redisson/api/RScoredSortedSetReactive.java b/redisson/src/main/java/org/redisson/api/RScoredSortedSetReactive.java index 98e538516..0fea838c0 100644 --- a/redisson/src/main/java/org/redisson/api/RScoredSortedSetReactive.java +++ b/redisson/src/main/java/org/redisson/api/RScoredSortedSetReactive.java @@ -17,6 +17,7 @@ package org.redisson.api; import java.util.Collection; import java.util.Map; +import java.util.Set; import java.util.concurrent.TimeUnit; import org.reactivestreams.Publisher; @@ -29,7 +30,7 @@ import org.redisson.client.protocol.ScoredEntry; * * @param value type */ -public interface RScoredSortedSetReactive extends RExpirableReactive { +public interface RScoredSortedSetReactive extends RExpirableReactive, RSortableReactive> { /** * Removes and returns first available tail element of any sorted set, diff --git a/redisson/src/main/java/org/redisson/api/RSetReactive.java b/redisson/src/main/java/org/redisson/api/RSetReactive.java index c04dbd404..652342b3b 100644 --- a/redisson/src/main/java/org/redisson/api/RSetReactive.java +++ b/redisson/src/main/java/org/redisson/api/RSetReactive.java @@ -15,7 +15,6 @@ */ package org.redisson.api; -import java.util.Iterator; import java.util.Set; import org.reactivestreams.Publisher; @@ -27,7 +26,7 @@ import org.reactivestreams.Publisher; * * @param value */ -public interface RSetReactive extends RCollectionReactive { +public interface RSetReactive extends RCollectionReactive, RSortableReactive> { /** * Returns an iterator over elements in this set. diff --git a/redisson/src/main/java/org/redisson/api/RSortableReactive.java b/redisson/src/main/java/org/redisson/api/RSortableReactive.java new file mode 100644 index 000000000..10e83af68 --- /dev/null +++ b/redisson/src/main/java/org/redisson/api/RSortableReactive.java @@ -0,0 +1,159 @@ +/** + * Copyright 2018 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.api; + +import java.util.Collection; +import java.util.List; + +import org.reactivestreams.Publisher; + +/** + * + * @author Nikita Koksharov + * + * @param object type + */ +public interface RSortableReactive { + + /** + * Read data in sorted view + * + * @param order for sorted data + * @return sorted collection + */ + Publisher readSorted(SortOrder order); + + /** + * Read data in sorted view + * + * @param order for sorted data + * @param offset of sorted data + * @param count of sorted data + * @return sorted collection + */ + Publisher readSorted(SortOrder order, int offset, int count); + + /** + * Read data in sorted view + * + * @param byPattern that is used to generate the keys that are used for sorting + * @param order for sorted data + * @return sorted collection + */ + Publisher readSorted(String byPattern, SortOrder order); + + /** + * Read data in sorted view + * + * @param byPattern that is used to generate the keys that are used for sorting + * @param order for sorted data + * @param offset of sorted data + * @param count of sorted data + * @return sorted collection + */ + Publisher readSorted(String byPattern, SortOrder order, int offset, int count); + + /** + * Read data in sorted view + * + * @param object type + * @param byPattern that is used to generate the keys that are used for sorting + * @param getPatterns that is used to load values by keys in sorted view + * @param order for sorted data + * @return sorted collection + */ + Publisher> readSorted(String byPattern, List getPatterns, SortOrder order); + + /** + * Read data in sorted view + * + * @param object type + * @param byPattern that is used to generate the keys that are used for sorting + * @param getPatterns that is used to load values by keys in sorted view + * @param order for sorted data + * @param offset of sorted data + * @param count of sorted data + * @return sorted collection + */ + Publisher> readSorted(String byPattern, List getPatterns, SortOrder order, int offset, int count); + + /** + * Sort data and store to destName list + * + * @param destName list object destination + * @param order for sorted data + * @return length of sorted data + */ + Publisher sortTo(String destName, SortOrder order); + + /** + * Sort data and store to destName list + * + * @param destName list object destination + * @param order for sorted data + * @param offset of sorted data + * @param count of sorted data + * @return length of sorted data + */ + Publisher sortTo(String destName, SortOrder order, int offset, int count); + + /** + * Sort data and store to destName list + * + * @param destName list object destination + * @param byPattern that is used to generate the keys that are used for sorting + * @param order for sorted data + * @return length of sorted data + */ + Publisher sortTo(String destName, String byPattern, SortOrder order); + + /** + * Sort data and store to destName list + * + * @param destName list object destination + * @param byPattern that is used to generate the keys that are used for sorting + * @param order for sorted data + * @param offset of sorted data + * @param count of sorted data + * @return length of sorted data + */ + Publisher sortTo(String destName, String byPattern, SortOrder order, int offset, int count); + + /** + * Sort data and store to destName list + * + * @param destName list object destination + * @param byPattern that is used to generate the keys that are used for sorting + * @param getPatterns that is used to load values by keys in sorted view + * @param order for sorted data + * @return length of sorted data + */ + Publisher sortTo(String destName, String byPattern, List getPatterns, SortOrder order); + + /** + * Sort data and store to destName list + * + * @param destName list object destination + * @param byPattern that is used to generate the keys that are used for sorting + * @param getPatterns that is used to load values by keys in sorted view + * @param order for sorted data + * @param offset of sorted data + * @param count of sorted data + * @return length of sorted data + */ + Publisher sortTo(String destName, String byPattern, List getPatterns, SortOrder order, int offset, int count); + +} diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonListReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonListReactive.java index f8c43d953..1080a0b14 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonListReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonListReactive.java @@ -31,6 +31,7 @@ import org.redisson.RedissonList; import org.redisson.api.RFuture; import org.redisson.api.RListAsync; import org.redisson.api.RListReactive; +import org.redisson.api.SortOrder; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.convertor.LongReplayConvertor; @@ -427,4 +428,126 @@ public class RedissonListReactive extends RedissonExpirableReactive implement return hash; } + @Override + public Publisher> readSorted(final SortOrder order) { + return reactive(new Supplier>>() { + @Override + public RFuture> get() { + return instance.readSortAsync(order); + } + }); + } + + @Override + public Publisher> readSorted(final SortOrder order, final int offset, final int count) { + return reactive(new Supplier>>() { + @Override + public RFuture> get() { + return instance.readSortAsync(order, offset, count); + } + }); + } + + @Override + public Publisher> readSorted(final String byPattern, final SortOrder order) { + return reactive(new Supplier>>() { + @Override + public RFuture> get() { + return instance.readSortAsync(byPattern, order); + } + }); + } + + @Override + public Publisher> readSorted(final String byPattern, final SortOrder order, final int offset, final int count) { + return reactive(new Supplier>>() { + @Override + public RFuture> get() { + return instance.readSortAsync(byPattern, order, offset, count); + } + }); + } + + @Override + public Publisher> readSorted(final String byPattern, final List getPatterns, final SortOrder order) { + return reactive(new Supplier>>() { + @Override + public RFuture> get() { + return instance.readSortAsync(byPattern, getPatterns, order); + } + }); + } + + @Override + public Publisher> readSorted(final String byPattern, final List getPatterns, final SortOrder order, + final int offset, final int count) { + return reactive(new Supplier>>() { + @Override + public RFuture> get() { + return instance.readSortAsync(byPattern, getPatterns, order, offset, count); + } + }); + } + + @Override + public Publisher sortTo(final String destName, final SortOrder order) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.sortToAsync(destName, order); + } + }); + } + + @Override + public Publisher sortTo(final String destName, final SortOrder order, final int offset, final int count) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.sortToAsync(destName, order, offset, count); + } + }); + } + + @Override + public Publisher sortTo(final String destName, final String byPattern, final SortOrder order) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.sortToAsync(destName, byPattern, order); + } + }); + } + + @Override + public Publisher sortTo(final String destName, final String byPattern, final SortOrder order, final int offset, final int count) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.sortToAsync(destName, byPattern, order, offset, count); + } + }); + } + + @Override + public Publisher sortTo(final String destName, final String byPattern, final List getPatterns, final SortOrder order) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.sortToAsync(destName, byPattern, getPatterns, order); + } + }); + } + + @Override + public Publisher sortTo(final String destName, final String byPattern, final List getPatterns, final SortOrder order, + final int offset, final int count) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.sortToAsync(destName, byPattern, getPatterns, order, offset, count); + } + }); + } + } diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonScoredSortedSetReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonScoredSortedSetReactive.java index 6039168c2..07de78611 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonScoredSortedSetReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonScoredSortedSetReactive.java @@ -16,7 +16,9 @@ package org.redisson.reactive; import java.util.Collection; +import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.TimeUnit; import org.reactivestreams.Publisher; @@ -25,6 +27,7 @@ import org.redisson.api.RFuture; import org.redisson.api.RScoredSortedSet.Aggregate; import org.redisson.api.RScoredSortedSetAsync; import org.redisson.api.RScoredSortedSetReactive; +import org.redisson.api.SortOrder; import org.redisson.client.RedisClient; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.ScoredEntry; @@ -630,4 +633,126 @@ public class RedissonScoredSortedSetReactive extends RedissonExpirableReactiv }); } + @Override + public Publisher> readSorted(final SortOrder order) { + return reactive(new Supplier>>() { + @Override + public RFuture> get() { + return instance.readSortAsync(order); + } + }); + } + + @Override + public Publisher> readSorted(final SortOrder order, final int offset, final int count) { + return reactive(new Supplier>>() { + @Override + public RFuture> get() { + return instance.readSortAsync(order, offset, count); + } + }); + } + + @Override + public Publisher> readSorted(final String byPattern, final SortOrder order) { + return reactive(new Supplier>>() { + @Override + public RFuture> get() { + return instance.readSortAsync(byPattern, order); + } + }); + } + + @Override + public Publisher> readSorted(final String byPattern, final SortOrder order, final int offset, final int count) { + return reactive(new Supplier>>() { + @Override + public RFuture> get() { + return instance.readSortAsync(byPattern, order, offset, count); + } + }); + } + + @Override + public Publisher> readSorted(final String byPattern, final List getPatterns, final SortOrder order) { + return reactive(new Supplier>>() { + @Override + public RFuture> get() { + return instance.readSortAsync(byPattern, getPatterns, order); + } + }); + } + + @Override + public Publisher> readSorted(final String byPattern, final List getPatterns, final SortOrder order, + final int offset, final int count) { + return reactive(new Supplier>>() { + @Override + public RFuture> get() { + return instance.readSortAsync(byPattern, getPatterns, order, offset, count); + } + }); + } + + @Override + public Publisher sortTo(final String destName, final SortOrder order) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.sortToAsync(destName, order); + } + }); + } + + @Override + public Publisher sortTo(final String destName, final SortOrder order, final int offset, final int count) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.sortToAsync(destName, order, offset, count); + } + }); + } + + @Override + public Publisher sortTo(final String destName, final String byPattern, final SortOrder order) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.sortToAsync(destName, byPattern, order); + } + }); + } + + @Override + public Publisher sortTo(final String destName, final String byPattern, final SortOrder order, final int offset, final int count) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.sortToAsync(destName, byPattern, order, offset, count); + } + }); + } + + @Override + public Publisher sortTo(final String destName, final String byPattern, final List getPatterns, final SortOrder order) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.sortToAsync(destName, byPattern, getPatterns, order); + } + }); + } + + @Override + public Publisher sortTo(final String destName, final String byPattern, final List getPatterns, final SortOrder order, + final int offset, final int count) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.sortToAsync(destName, byPattern, getPatterns, order, offset, count); + } + }); + } + } diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonSetReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonSetReactive.java index 823b4261f..3169260a0 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonSetReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonSetReactive.java @@ -26,6 +26,7 @@ import org.redisson.RedissonSet; import org.redisson.api.RFuture; import org.redisson.api.RSetAsync; import org.redisson.api.RSetReactive; +import org.redisson.api.SortOrder; import org.redisson.client.RedisClient; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommands; @@ -280,4 +281,126 @@ public class RedissonSetReactive extends RedissonExpirableReactive implements return iterator(null, 10); } + @Override + public Publisher> readSorted(final SortOrder order) { + return reactive(new Supplier>>() { + @Override + public RFuture> get() { + return instance.readSortAsync(order); + } + }); + } + + @Override + public Publisher> readSorted(final SortOrder order, final int offset, final int count) { + return reactive(new Supplier>>() { + @Override + public RFuture> get() { + return instance.readSortAsync(order, offset, count); + } + }); + } + + @Override + public Publisher> readSorted(final String byPattern, final SortOrder order) { + return reactive(new Supplier>>() { + @Override + public RFuture> get() { + return instance.readSortAsync(byPattern, order); + } + }); + } + + @Override + public Publisher> readSorted(final String byPattern, final SortOrder order, final int offset, final int count) { + return reactive(new Supplier>>() { + @Override + public RFuture> get() { + return instance.readSortAsync(byPattern, order, offset, count); + } + }); + } + + @Override + public Publisher> readSorted(final String byPattern, final List getPatterns, final SortOrder order) { + return reactive(new Supplier>>() { + @Override + public RFuture> get() { + return instance.readSortAsync(byPattern, getPatterns, order); + } + }); + } + + @Override + public Publisher> readSorted(final String byPattern, final List getPatterns, final SortOrder order, + final int offset, final int count) { + return reactive(new Supplier>>() { + @Override + public RFuture> get() { + return instance.readSortAsync(byPattern, getPatterns, order, offset, count); + } + }); + } + + @Override + public Publisher sortTo(final String destName, final SortOrder order) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.sortToAsync(destName, order); + } + }); + } + + @Override + public Publisher sortTo(final String destName, final SortOrder order, final int offset, final int count) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.sortToAsync(destName, order, offset, count); + } + }); + } + + @Override + public Publisher sortTo(final String destName, final String byPattern, final SortOrder order) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.sortToAsync(destName, byPattern, order); + } + }); + } + + @Override + public Publisher sortTo(final String destName, final String byPattern, final SortOrder order, final int offset, final int count) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.sortToAsync(destName, byPattern, order, offset, count); + } + }); + } + + @Override + public Publisher sortTo(final String destName, final String byPattern, final List getPatterns, final SortOrder order) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.sortToAsync(destName, byPattern, getPatterns, order); + } + }); + } + + @Override + public Publisher sortTo(final String destName, final String byPattern, final List getPatterns, final SortOrder order, + final int offset, final int count) { + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.sortToAsync(destName, byPattern, getPatterns, order, offset, count); + } + }); + } + }