Merge branch 'master' into 3.0.0

# Conflicts:
#	redisson/src/main/java/org/redisson/reactive/RedissonObjectReactive.java
#	redisson/src/main/java/org/redisson/reactive/RedissonScoredSortedSetReactive.java
pull/1821/head
Nikita 7 years ago
commit 776d89dc3e

@ -24,7 +24,7 @@ import org.redisson.api.RLock;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.RedisStrictCommand;
import org.redisson.command.CommandExecutor;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.pubsub.LockPubSub;
/**
@ -40,11 +40,11 @@ import org.redisson.pubsub.LockPubSub;
public class RedissonFairLock extends RedissonLock implements RLock {
private final long threadWaitTime = 5000;
private final CommandExecutor commandExecutor;
private final CommandAsyncExecutor commandExecutor;
private final String threadsQueueName;
private final String timeoutSetName;
protected RedissonFairLock(CommandExecutor commandExecutor, String name) {
protected RedissonFairLock(CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
this.commandExecutor = commandExecutor;
threadsQueueName = prefixName("redisson_lock_queue", name);

@ -16,7 +16,6 @@
package org.redisson;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.UUID;
@ -32,7 +31,7 @@ import org.redisson.api.RBitSetReactive;
import org.redisson.api.RBlockingQueueReactive;
import org.redisson.api.RBucketReactive;
import org.redisson.api.RDequeReactive;
import org.redisson.api.RFuture;
import org.redisson.api.RGeoReactive;
import org.redisson.api.RHyperLogLogReactive;
import org.redisson.api.RKeys;
import org.redisson.api.RKeysReactive;
@ -45,6 +44,7 @@ import org.redisson.api.RMapReactive;
import org.redisson.api.RPatternTopicReactive;
import org.redisson.api.RPermitExpirableSemaphoreReactive;
import org.redisson.api.RQueueReactive;
import org.redisson.api.RRateLimiterReactive;
import org.redisson.api.RReadWriteLockReactive;
import org.redisson.api.RScoredSortedSetReactive;
import org.redisson.api.RScriptReactive;
@ -57,7 +57,6 @@ import org.redisson.api.RTransactionReactive;
import org.redisson.api.RedissonReactiveClient;
import org.redisson.api.TransactionOptions;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.codec.ReferenceCodecProvider;
import org.redisson.command.CommandReactiveService;
import org.redisson.config.Config;
@ -72,6 +71,7 @@ import org.redisson.reactive.RedissonBitSetReactive;
import org.redisson.reactive.RedissonBlockingQueueReactive;
import org.redisson.reactive.RedissonBucketReactive;
import org.redisson.reactive.RedissonDequeReactive;
import org.redisson.reactive.RedissonGeoReactive;
import org.redisson.reactive.RedissonHyperLogLogReactive;
import org.redisson.reactive.RedissonKeysReactive;
import org.redisson.reactive.RedissonLexSortedSetReactive;
@ -83,6 +83,7 @@ import org.redisson.reactive.RedissonMapReactive;
import org.redisson.reactive.RedissonPatternTopicReactive;
import org.redisson.reactive.RedissonPermitExpirableSemaphoreReactive;
import org.redisson.reactive.RedissonQueueReactive;
import org.redisson.reactive.RedissonRateLimiterReactive;
import org.redisson.reactive.RedissonReadWriteLockReactive;
import org.redisson.reactive.RedissonScoredSortedSetReactive;
import org.redisson.reactive.RedissonScriptReactive;
@ -121,6 +122,26 @@ public class RedissonReactive implements RedissonReactiveClient {
codecProvider = config.getReferenceCodecProvider();
}
@Override
public <V> RGeoReactive<V> getGeo(String name) {
return new RedissonGeoReactive<V>(commandExecutor, name);
}
@Override
public <V> RGeoReactive<V> getGeo(String name, Codec codec) {
return new RedissonGeoReactive<V>(codec, commandExecutor, name);
}
@Override
public RLockReactive getFairLock(String name) {
return new RedissonLockReactive(commandExecutor, name, new RedissonFairLock(commandExecutor, name));
}
@Override
public RRateLimiterReactive getRateLimiter(String name) {
return new RedissonRateLimiterReactive(commandExecutor, name);
}
@Override
public RSemaphoreReactive getSemaphore(String name) {
return new RedissonSemaphoreReactive(commandExecutor, name, semaphorePubSub);

@ -393,12 +393,17 @@ public class RedissonScoredSortedSet<V> extends RedissonExpirable implements RSc
}
private ListScanResult<Object> scanIterator(RedisClient client, long startPos, String pattern, int count) {
RFuture<ListScanResult<Object>> f = scanIteratorAsync(client, startPos, pattern, count);
return get(f);
}
public RFuture<ListScanResult<Object>> scanIteratorAsync(RedisClient client, long startPos, String pattern, int count) {
if (pattern == null) {
RFuture<ListScanResult<Object>> f = commandExecutor.readAsync(client, getName(), codec, RedisCommands.ZSCAN, getName(), startPos, "COUNT", count);
return get(f);
return f;
}
RFuture<ListScanResult<Object>> f = commandExecutor.readAsync(client, getName(), codec, RedisCommands.ZSCAN, getName(), startPos, "MATCH", pattern, "COUNT", count);
return get(f);
return f;
}
@Override

@ -0,0 +1,549 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api;
import java.util.List;
import java.util.Map;
import org.reactivestreams.Publisher;
/**
* Geospatial items holder.
*
* @author Nikita Koksharov
*
* @param <V> type of value
*/
public interface RGeoReactive<V> extends RScoredSortedSetReactive<V> {
/**
* Adds geospatial member.
*
* @param longitude - longitude of object
* @param latitude - latitude of object
* @param member - object itself
* @return number of elements added to the sorted set,
* not including elements already existing for which
* the score was updated
*/
Publisher<Long> add(double longitude, double latitude, V member);
/**
* Adds geospatial members.
*
* @param entries - objects
* @return number of elements added to the sorted set,
* not including elements already existing for which
* the score was updated
*/
Publisher<Long> add(GeoEntry... entries);
/**
* Returns distance between members in <code>GeoUnit</code> units.
*
* @param firstMember - first object
* @param secondMember - second object
* @param geoUnit - geo unit
* @return distance
*/
Publisher<Double> dist(V firstMember, V secondMember, GeoUnit geoUnit);
/**
* Returns 11 characters Geohash string mapped by defined member.
*
* @param members - objects
* @return hash mapped by object
*/
Publisher<Map<V, String>> hash(V... members);
/**
* Returns geo-position mapped by defined member.
*
* @param members - objects
* @return geo position mapped by object
*/
Publisher<Map<V, GeoPosition>> pos(V... members);
/**
* Returns the members of a sorted set, which are within the
* borders of the area specified with the center location
* and the maximum distance from the center (the radius)
* in <code>GeoUnit</code> units.
*
* @param longitude - longitude of object
* @param latitude - latitude of object
* @param radius - radius in geo units
* @param geoUnit - geo unit
* @return list of objects
*/
Publisher<List<V>> radius(double longitude, double latitude, double radius, GeoUnit geoUnit);
/**
* Returns the members of a sorted set, which are within the
* borders of the area specified with the center location
* and the maximum distance from the center (the radius)
* in <code>GeoUnit</code> units and limited by count
*
* @param longitude - longitude of object
* @param latitude - latitude of object
* @param radius - radius in geo units
* @param geoUnit - geo unit
* @param count - result limit
* @return list of objects
*/
Publisher<List<V>> radius(double longitude, double latitude, double radius, GeoUnit geoUnit, int count);
/**
* Returns the members of a sorted set, which are within the
* borders of the area specified with the center location
* and the maximum distance from the center (the radius)
* in <code>GeoUnit</code> units with <code>GeoOrder</code>
*
* @param longitude - longitude of object
* @param latitude - latitude of object
* @param radius - radius in geo units
* @param geoUnit - geo unit
* @param geoOrder - order of result
* @return list of objects
*/
Publisher<List<V>> radius(double longitude, double latitude, double radius, GeoUnit geoUnit, GeoOrder geoOrder);
/**
* Returns the members of a sorted set, which are within the
* borders of the area specified with the center location
* and the maximum distance from the center (the radius)
* in <code>GeoUnit</code> units with <code>GeoOrder</code>
* and limited by count
*
* @param longitude - longitude of object
* @param latitude - latitude of object
* @param radius - radius in geo units
* @param geoUnit - geo unit
* @param geoOrder - order of result
* @param count - result limit
* @return list of objects
*/
Publisher<List<V>> radius(double longitude, double latitude, double radius, GeoUnit geoUnit, GeoOrder geoOrder, int count);
/**
* Returns the distance mapped by member, distance between member and the location.
* Members of a sorted set, which are within the
* borders of the area specified with the center location
* and the maximum distance from the center (the radius)
* in <code>GeoUnit</code> units.
*
* @param longitude - longitude of object
* @param latitude - latitude of object
* @param radius - radius in geo units
* @param geoUnit - geo unit
* @return distance mapped by object
*/
Publisher<Map<V, Double>> radiusWithDistance(double longitude, double latitude, double radius, GeoUnit geoUnit);
/**
* Returns the distance mapped by member, distance between member and the location.
* Members of a sorted set, which are within the
* borders of the area specified with the center location
* and the maximum distance from the center (the radius)
* in <code>GeoUnit</code> units and limited by count.
*
* @param longitude - longitude of object
* @param latitude - latitude of object
* @param radius - radius in geo units
* @param geoUnit - geo unit
* @param count - result limit
* @return distance mapped by object
*/
Publisher<Map<V, Double>> radiusWithDistance(double longitude, double latitude, double radius, GeoUnit geoUnit, int count);
/**
* Returns the distance mapped by member, distance between member and the location.
* Members of a sorted set, which are within the
* borders of the area specified with the center location
* and the maximum distance from the center (the radius)
* in <code>GeoUnit</code> units with <code>GeoOrder</code>
*
* @param longitude - longitude of object
* @param latitude - latitude of object
* @param radius - radius in geo units
* @param geoUnit - geo unit
* @param geoOrder - order of result
* @return distance mapped by object
*/
Publisher<Map<V, Double>> radiusWithDistance(double longitude, double latitude, double radius, GeoUnit geoUnit, GeoOrder geoOrder);
/**
* Returns the distance mapped by member, distance between member and the location.
* Members of a sorted set, which are within the
* borders of the area specified with the center location
* and the maximum distance from the center (the radius)
* in <code>GeoUnit</code> units with <code>GeoOrder</code>
* and limited by count
*
* @param longitude - longitude of object
* @param latitude - latitude of object
* @param radius - radius in geo units
* @param geoUnit - geo unit
* @param geoOrder - order of result
* @param count - result limit
* @return distance mapped by object
*/
Publisher<Map<V, Double>> radiusWithDistance(double longitude, double latitude, double radius, GeoUnit geoUnit, GeoOrder geoOrder, int count);
/**
* Returns the geo-position mapped by member.
* Members of a sorted set, which are within the
* borders of the area specified with the center location
* and the maximum distance from the center (the radius)
* in <code>GeoUnit</code> units.
*
* @param longitude - longitude of object
* @param latitude - latitude of object
* @param radius - radius in geo units
* @param geoUnit - geo unit
* @return geo position mapped by object
*/
Publisher<Map<V, GeoPosition>> radiusWithPosition(double longitude, double latitude, double radius, GeoUnit geoUnit);
/**
* Returns the geo-position mapped by member.
* Members of a sorted set, which are within the
* borders of the area specified with the center location
* and the maximum distance from the center (the radius)
* in <code>GeoUnit</code> units and limited by count
*
* @param longitude - longitude of object
* @param latitude - latitude of object
* @param radius - radius in geo units
* @param geoUnit - geo unit
* @param count - result limit
* @return geo position mapped by object
*/
Publisher<Map<V, GeoPosition>> radiusWithPosition(double longitude, double latitude, double radius, GeoUnit geoUnit, int count);
/**
* Returns the geo-position mapped by member.
* Members of a sorted set, which are within the
* borders of the area specified with the center location
* and the maximum distance from the center (the radius)
* in <code>GeoUnit</code> units with <code>GeoOrder</code>
*
* @param longitude - longitude of object
* @param latitude - latitude of object
* @param radius - radius in geo units
* @param geoUnit - geo unit
* @param geoOrder - geo order
* @return geo position mapped by object
*/
Publisher<Map<V, GeoPosition>> radiusWithPosition(double longitude, double latitude, double radius, GeoUnit geoUnit, GeoOrder geoOrder);
/**
* Returns the geo-position mapped by member.
* Members of a sorted set, which are within the
* borders of the area specified with the center location
* and the maximum distance from the center (the radius)
* in <code>GeoUnit</code> units with <code>GeoOrder</code>
* and limited by count
*
* @param longitude - longitude of object
* @param latitude - latitude of object
* @param radius - radius in geo units
* @param geoUnit - geo unit
* @param geoOrder - geo order
* @param count - result limit
* @return geo position mapped by object
*/
Publisher<Map<V, GeoPosition>> radiusWithPosition(double longitude, double latitude, double radius, GeoUnit geoUnit, GeoOrder geoOrder, int count);
/**
* Returns the members of a sorted set, which are within the
* borders of the area specified with the defined member location
* and the maximum distance from the defined member location (the radius)
* in <code>GeoUnit</code> units.
*
* @param member - object
* @param radius - radius in geo units
* @param geoUnit - geo unit
* @return list of objects
*/
Publisher<List<V>> radius(V member, double radius, GeoUnit geoUnit);
/**
* Returns the members of a sorted set, which are within the
* borders of the area specified with the defined member location
* and the maximum distance from the defined member location (the radius)
* in <code>GeoUnit</code> units and limited by count
*
* @param member - object
* @param radius - radius in geo units
* @param geoUnit - geo unit
* @param count - result limit
* @return list of objects
*/
Publisher<List<V>> radius(V member, double radius, GeoUnit geoUnit, int count);
/**
* Returns the members of a sorted set, which are within the
* borders of the area specified with the defined member location
* and the maximum distance from the defined member location (the radius)
* in <code>GeoUnit</code> units with <code>GeoOrder</code>
*
* @param member - object
* @param radius - radius in geo units
* @param geoUnit - geo unit
* @param geoOrder - geo order
* @return list of objects
*/
Publisher<List<V>> radius(V member, double radius, GeoUnit geoUnit, GeoOrder geoOrder);
/**
* Returns the members of a sorted set, which are within the
* borders of the area specified with the defined member location
* and the maximum distance from the defined member location (the radius)
* in <code>GeoUnit</code> units with <code>GeoOrder</code>
*
* @param member - object
* @param radius - radius in geo units
* @param geoUnit - geo unit
* @param geoOrder - geo order
* @param count - result limit
* @return list of objects
*/
Publisher<List<V>> radius(V member, double radius, GeoUnit geoUnit, GeoOrder geoOrder, int count);
/**
* Returns the distance mapped by member, distance between member and the defined member location.
* Members of a sorted set, which are within the
* borders of the area specified with the defined member location
* and the maximum distance from the defined member location (the radius)
* in <code>GeoUnit</code> units.
*
* @param member - object
* @param radius - radius in geo units
* @param geoUnit - geo unit
* @return distance mapped by object
*/
Publisher<Map<V, Double>> radiusWithDistance(V member, double radius, GeoUnit geoUnit);
/**
* Returns the distance mapped by member, distance between member and the defined member location.
* Members of a sorted set, which are within the
* borders of the area specified with the defined member location
* and the maximum distance from the defined member location (the radius)
* in <code>GeoUnit</code> units and limited by count
*
* @param member - object
* @param radius - radius in geo units
* @param geoUnit - geo unit
* @param count - result limit
* @return distance mapped by object
*/
Publisher<Map<V, Double>> radiusWithDistance(V member, double radius, GeoUnit geoUnit, int count);
/**
* Returns the distance mapped by member, distance between member and the defined member location.
* Members of a sorted set, which are within the
* borders of the area specified with the defined member location
* and the maximum distance from the defined member location (the radius)
* in <code>GeoUnit</code> units with <code>GeoOrder</code>
*
* @param member - object
* @param radius - radius in geo units
* @param geoUnit - geo unit
* @param geoOrder - geo
* @return distance mapped by object
*/
Publisher<Map<V, Double>> radiusWithDistance(V member, double radius, GeoUnit geoUnit, GeoOrder geoOrder);
/**
* Returns the distance mapped by member, distance between member and the defined member location.
* Members of a sorted set, which are within the
* borders of the area specified with the defined member location
* and the maximum distance from the defined member location (the radius)
* in <code>GeoUnit</code> units with <code>GeoOrder</code>
* and limited by count
*
* @param member - object
* @param radius - radius in geo units
* @param geoUnit - geo unit
* @param geoOrder - geo
* @param count - result limit
* @return distance mapped by object
*/
Publisher<Map<V, Double>> radiusWithDistance(V member, double radius, GeoUnit geoUnit, GeoOrder geoOrder, int count);
/**
* Returns the geo-position mapped by member.
* Members of a sorted set, which are within the
* borders of the area specified with the defined member location
* and the maximum distance from the defined member location (the radius)
* in <code>GeoUnit</code> units.
*
* @param member - object
* @param radius - radius in geo units
* @param geoUnit - geo unit
* @return geo position mapped by object
*/
Publisher<Map<V, GeoPosition>> radiusWithPosition(V member, double radius, GeoUnit geoUnit);
/**
* Returns the geo-position mapped by member.
* Members of a sorted set, which are within the
* borders of the area specified with the defined member location
* and the maximum distance from the defined member location (the radius)
* in <code>GeoUnit</code> units and limited by count
*
* @param member - object
* @param radius - radius in geo units
* @param geoUnit - geo unit
* @param count - result limit
* @return geo position mapped by object
*/
Publisher<Map<V, GeoPosition>> radiusWithPosition(V member, double radius, GeoUnit geoUnit, int count);
/**
* Returns the geo-position mapped by member.
* Members of a sorted set, which are within the
* borders of the area specified with the defined member location
* and the maximum distance from the defined member location (the radius)
* in <code>GeoUnit</code> units with <code>GeoOrder</code>
*
* @param member - object
* @param radius - radius in geo units
* @param geoUnit - geo unit
* @param geoOrder - geo order
* @return geo position mapped by object
*/
Publisher<Map<V, GeoPosition>> radiusWithPosition(V member, double radius, GeoUnit geoUnit, GeoOrder geoOrder);
/**
* Returns the geo-position mapped by member.
* Members of a sorted set, which are within the
* borders of the area specified with the defined member location
* and the maximum distance from the defined member location (the radius)
* in <code>GeoUnit</code> units with <code>GeoOrder</code>
* and limited by count
*
* @param member - object
* @param radius - radius in geo units
* @param geoUnit - geo unit
* @param geoOrder - geo order
* @param count - result limit
* @return geo position mapped by object
*/
Publisher<Map<V, GeoPosition>> radiusWithPosition(V member, double radius, GeoUnit geoUnit, GeoOrder geoOrder, int count);
/**
* Finds the members of a sorted set, which are within the
* borders of the area specified with the center location
* and the maximum distance from the center (the radius)
* in <code>GeoUnit</code> units.
* Store result to <code>destName</code>.
*
* @param destName - Geo object destination
* @param longitude - longitude of object
* @param latitude - latitude of object
* @param radius - radius in geo units
* @param geoUnit - geo unit
* @return length of result
*/
Publisher<Long> radiusStoreTo(String destName, double longitude, double latitude, double radius, GeoUnit geoUnit);
/**
* Finds the members of a sorted set, which are within the
* borders of the area specified with the center location
* and the maximum distance from the center (the radius)
* in <code>GeoUnit</code> units and limited by count
* Store result to <code>destName</code>.
*
* @param destName - Geo object destination
* @param longitude - longitude of object
* @param latitude - latitude of object
* @param radius - radius in geo units
* @param geoUnit - geo unit
* @param count - result limit
* @return length of result
*/
Publisher<Long> radiusStoreTo(String destName, double longitude, double latitude, double radius, GeoUnit geoUnit, int count);
/**
* Finds the members of a sorted set, which are within the
* borders of the area specified with the center location
* and the maximum distance from the center (the radius)
* in <code>GeoUnit</code> units with <code>GeoOrder</code>
* and limited by count
* Store result to <code>destName</code>.
*
* @param destName - Geo object destination
* @param longitude - longitude of object
* @param latitude - latitude of object
* @param radius - radius in geo units
* @param geoUnit - geo unit
* @param geoOrder - order of result
* @param count - result limit
* @return length of result
*/
Publisher<Long> radiusStoreTo(String destName, double longitude, double latitude, double radius, GeoUnit geoUnit, GeoOrder geoOrder, int count);
/**
* Finds the members of a sorted set, which are within the
* borders of the area specified with the defined member location
* and the maximum distance from the defined member location (the radius)
* in <code>GeoUnit</code> units.
* Store result to <code>destName</code>.
*
* @param destName - Geo object destination
* @param member - object
* @param radius - radius in geo units
* @param geoUnit - geo unit
* @return length of result
*/
Publisher<Long> radiusStoreTo(String destName, V member, double radius, GeoUnit geoUnit);
/**
* Finds the members of a sorted set, which are within the
* borders of the area specified with the defined member location
* and the maximum distance from the defined member location (the radius)
* in <code>GeoUnit</code> units and limited by count
* Store result to <code>destName</code>.
*
* @param destName - Geo object destination
* @param member - object
* @param radius - radius in geo units
* @param geoUnit - geo unit
* @param count - result limit
* @return length of result
*/
Publisher<Long> radiusStoreTo(String destName, V member, double radius, GeoUnit geoUnit, int count);
/**
* Finds the members of a sorted set, which are within the
* borders of the area specified with the defined member location
* and the maximum distance from the defined member location (the radius)
* in <code>GeoUnit</code> units with <code>GeoOrder</code>
* Store result to <code>destName</code>.
*
* @param destName - Geo object destination
* @param member - object
* @param radius - radius in geo units
* @param geoUnit - geo unit
* @param geoOrder - geo order
* @param count - result limit
* @return length of result
*/
Publisher<Long> radiusStoreTo(String destName, V member, double radius, GeoUnit geoUnit, GeoOrder geoOrder, int count);
}

@ -28,7 +28,7 @@ import org.reactivestreams.Publisher;
* @param <V> the type of elements held in this collection
*/
// TODO add sublist support
public interface RListReactive<V> extends RCollectionReactive<V> {
public interface RListReactive<V> extends RCollectionReactive<V>, RSortableReactive<List<V>> {
/**
* Loads elements by specified <code>indexes</code>

@ -15,6 +15,8 @@
*/
package org.redisson.api;
import java.util.concurrent.TimeUnit;
import org.reactivestreams.Publisher;
import org.redisson.client.codec.Codec;
@ -29,6 +31,78 @@ public interface RObjectReactive {
String getName();
Codec getCodec();
/**
* Restores object using its state returned by {@link #dump()} method.
*
* @param state - state of object
* @return void
*/
Publisher<Void> restore(byte[] state);
/**
* Restores object using its state returned by {@link #dump()} method and set time to live for it.
*
* @param state - state of object
* @param timeToLive - time to live of the object
* @param timeUnit - time unit
* @return void
*/
Publisher<Void> restore(byte[] state, long timeToLive, TimeUnit timeUnit);
/**
* Restores and replaces object if it already exists.
*
* @param state - state of the object
* @return void
*/
Publisher<Void> restoreAndReplace(byte[] state);
/**
* Restores and replaces object if it already exists and set time to live for it.
*
* @param state - state of the object
* @param timeToLive - time to live of the object
* @param timeUnit - time unit
* @return void
*/
Publisher<Void> restoreAndReplace(byte[] state, long timeToLive, TimeUnit timeUnit);
/**
* Returns dump of object
*
* @return dump
*/
Publisher<byte[]> dump();
/**
* Update the last access time of an object.
*
* @return <code>true</code> if object was touched else <code>false</code>
*/
Publisher<Boolean> touch();
/**
* Delete the objects.
* Actual removal will happen later asynchronously.
* <p>
* Requires Redis 4.0+
*
* @return <code>true</code> if it was exist and deleted else <code>false</code>
*/
Publisher<Boolean> unlink();
/**
* Copy object from source Redis instance to destination Redis instance
*
* @param host - destination host
* @param port - destination port
* @param database - destination database
* @param timeout - maximum idle time in any moment of the communication with the destination instance in milliseconds
* @return void
*/
Publisher<Void> copy(String host, int port, int database, long timeout);
/**
* Transfer a object from a source Redis instance to a destination Redis instance
* in mode

@ -0,0 +1,144 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api;
import java.util.concurrent.TimeUnit;
import org.reactivestreams.Publisher;
/**
*
* @author Nikita Koksharov
*
*/
public interface RRateLimiterReactive extends RObjectReactive {
/**
* Initializes RateLimiter's state and stores config to Redis server.
*
* @param mode - rate mode
* @param rate - rate
* @param rateInterval - rate time interval
* @param rateIntervalUnit - rate time interval unit
* @return
*/
Publisher<Boolean> trySetRate(RateType mode, long rate, long rateInterval, RateIntervalUnit rateIntervalUnit);
/**
* Acquires a permit only if one is available at the
* time of invocation.
*
* <p>Acquires a permit, if one is available and returns immediately,
* with the value {@code true},
* reducing the number of available permits by one.
*
* <p>If no permit is available then this method will return
* immediately with the value {@code false}.
*
* @return {@code true} if a permit was acquired and {@code false}
* otherwise
*/
Publisher<Boolean> tryAcquire();
/**
* Acquires the given number of <code>permits</code> only if all are available at the
* time of invocation.
*
* <p>Acquires a permits, if all are available and returns immediately,
* with the value {@code true},
* reducing the number of available permits by given number of permits.
*
* <p>If no permits are available then this method will return
* immediately with the value {@code false}.
*
* @param permits the number of permits to acquire
* @return {@code true} if a permit was acquired and {@code false}
* otherwise
*/
Publisher<Boolean> tryAcquire(long permits);
/**
* Acquires a permit from this RateLimiter, blocking until one is available.
*
* <p>Acquires a permit, if one is available and returns immediately,
* reducing the number of available permits by one.
*
*/
Publisher<Void> acquire();
/**
* Acquires a specified <code>permits</code> from this RateLimiter,
* blocking until one is available.
*
* <p>Acquires the given number of permits, if they are available
* and returns immediately, reducing the number of available permits
* by the given amount.
*
* @param permits
*/
Publisher<Void> acquire(long permits);
/**
* Acquires a permit from this RateLimiter, if one becomes available
* within the given waiting time.
*
* <p>Acquires a permit, if one is available and returns immediately,
* with the value {@code true},
* reducing the number of available permits by one.
*
* <p>If no permit is available then the current thread becomes
* disabled for thread scheduling purposes and lies dormant until
* specified waiting time elapses.
*
* <p>If a permit is acquired then the value {@code true} is returned.
*
* <p>If the specified waiting time elapses then the value {@code false}
* is returned. If the time is less than or equal to zero, the method
* will not wait at all.
*
* @param timeout the maximum time to wait for a permit
* @param unit the time unit of the {@code timeout} argument
* @return {@code true} if a permit was acquired and {@code false}
* if the waiting time elapsed before a permit was acquired
*/
Publisher<Boolean> tryAcquire(long timeout, TimeUnit unit);
/**
* Acquires the given number of <code>permits</code> only if all are available
* within the given waiting time.
*
* <p>Acquires the given number of permits, if all are available and returns immediately,
* with the value {@code true}, reducing the number of available permits by one.
*
* <p>If no permit is available then the current thread becomes
* disabled for thread scheduling purposes and lies dormant until
* the specified waiting time elapses.
*
* <p>If a permits is acquired then the value {@code true} is returned.
*
* <p>If the specified waiting time elapses then the value {@code false}
* is returned. If the time is less than or equal to zero, the method
* will not wait at all.
*
* @param permits amount
* @param timeout the maximum time to wait for a permit
* @param unit the time unit of the {@code timeout} argument
* @return {@code true} if a permit was acquired and {@code false}
* if the waiting time elapsed before a permit was acquired
*/
Publisher<Boolean> tryAcquire(long permits, long timeout, TimeUnit unit);
}

@ -17,33 +17,236 @@ package org.redisson.api;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.reactivestreams.Publisher;
import org.redisson.api.RScoredSortedSet.Aggregate;
import org.redisson.client.protocol.ScoredEntry;
public interface RScoredSortedSetReactive<V> extends RExpirableReactive {
/**
*
* @author Nikita Koksharov
*
* @param <V> value type
*/
public interface RScoredSortedSetReactive<V> extends RExpirableReactive, RSortableReactive<Set<V>> {
/**
* Removes and returns first available tail element of <b>any</b> sorted set,
* waiting up to the specified wait time if necessary for an element to become available
* in any of defined sorted sets <b>including</b> this one.
* <p>
* Requires <b>Redis 5.0.0 and higher.</b>
*
* @param queueNames - names of queue
* @param timeout how long to wait before giving up, in units of
* {@code unit}
* @param unit a {@code TimeUnit} determining how to interpret the
* {@code timeout} parameter
* @return the tail element, or {@code null} if all sorted sets are empty
*/
Publisher<V> pollLastFromAny(long timeout, TimeUnit unit, String ... queueNames);
/**
* Removes and returns first available head element of <b>any</b> sorted set,
* waiting up to the specified wait time if necessary for an element to become available
* in any of defined sorted sets <b>including</b> this one.
* <p>
* Requires <b>Redis 5.0.0 and higher.</b>
*
* @param queueNames - names of queue
* @param timeout how long to wait before giving up, in units of
* {@code unit}
* @param unit a {@code TimeUnit} determining how to interpret the
* {@code timeout} parameter
* @return the head element, or {@code null} if all sorted sets are empty
*
*/
Publisher<V> pollFirstFromAny(long timeout, TimeUnit unit, String ... queueNames);
/**
* Removes and returns the head element or {@code null} if this sorted set is empty.
* <p>
* Requires <b>Redis 5.0.0 and higher.</b>
*
* @param timeout how long to wait before giving up, in units of
* {@code unit}
* @param unit a {@code TimeUnit} determining how to interpret the
* {@code timeout} parameter
* @return the head element,
* or {@code null} if this sorted set is empty
*/
Publisher<V> pollFirst(long timeout, TimeUnit unit);
/**
* Removes and returns the tail element or {@code null} if this sorted set is empty.
* <p>
* Requires <b>Redis 5.0.0 and higher.</b>
*
* @param timeout how long to wait before giving up, in units of
* {@code unit}
* @param unit a {@code TimeUnit} determining how to interpret the
* {@code timeout} parameter
* @return the tail element or {@code null} if this sorted set is empty
*/
Publisher<V> pollLast(long timeout, TimeUnit unit);
/**
* Removes and returns the head elements or {@code null} if this sorted set is empty.
*
* @param count - elements amount
* @return the head element,
* or {@code null} if this sorted set is empty
*/
Publisher<Collection<V>> pollFirst(int count);
/**
* Removes and returns the tail elements or {@code null} if this sorted set is empty.
*
* @param count - elements amount
* @return the tail element or {@code null} if this sorted set is empty
*/
Publisher<Collection<V>> pollLast(int count);
/**
* Removes and returns the head element or {@code null} if this sorted set is empty.
*
* @return the head element,
* or {@code null} if this sorted set is empty
*/
Publisher<V> pollFirst();
/**
* Removes and returns the tail element or {@code null} if this sorted set is empty.
*
* @return the tail element or {@code null} if this sorted set is empty
*/
Publisher<V> pollLast();
Publisher<V> iterator();
/**
* Returns the head element or {@code null} if this sorted set is empty.
*
* @return the head element or {@code null} if this sorted set is empty
*/
Publisher<V> first();
/**
* Returns the tail element or {@code null} if this sorted set is empty.
*
* @return the tail element or {@code null} if this sorted set is empty
*/
Publisher<V> last();
/**
* Returns score of the head element or returns {@code null} if this sorted set is empty.
*
* @return the tail element or {@code null} if this sorted set is empty
*/
Publisher<Double> firstScore();
/**
* Returns score of the tail element or returns {@code null} if this sorted set is empty.
*
* @return the tail element or {@code null} if this sorted set is empty
*/
Publisher<Double> lastScore();
/**
* Returns an iterator over elements in this set.
* If <code>pattern</code> is not null then only elements match this pattern are loaded.
*
* @param pattern - search pattern
* @return iterator
*/
Publisher<V> iterator(String pattern);
/**
* Returns an iterator over elements in this set.
* Elements are loaded in batch. Batch size is defined by <code>count</code> param.
*
* @param count - size of elements batch
* @return iterator
*/
Publisher<V> iterator(int count);
/**
* Returns an iterator over elements in this set.
* Elements are loaded in batch. Batch size is defined by <code>count</code> param.
* If pattern is not null then only elements match this pattern are loaded.
*
* @param pattern - search pattern
* @param count - size of elements batch
* @return iterator
*/
Publisher<V> iterator(String pattern, int count);
Publisher<V> iterator();
Publisher<Integer> removeRangeByScore(double startScore, boolean startScoreInclusive, double endScore, boolean endScoreInclusive);
Publisher<Integer> removeRangeByRank(int startIndex, int endIndex);
/**
* Returns rank of value, with the scores ordered from low to high.
*
* @param o - object
* @return rank or <code>null</code> if value does not exist
*/
Publisher<Integer> rank(V o);
/**
* Returns rank of value, with the scores ordered from high to low.
*
* @param o - object
* @return rank or <code>null</code> if value does not exist
*/
Publisher<Integer> revRank(V o);
Publisher<Double> getScore(V o);
/**
* Adds element to this set, overrides previous score if it has been already added.
*
* @param score - object score
* @param object - object itself
* @return <code>true</code> if element has added and <code>false</code> if not.
*/
Publisher<Boolean> add(double score, V object);
Publisher<Long> addAll(Map<V, Double> objects);
/**
* Adds element to this set, overrides previous score if it has been already added.
* Finally return the rank of the item
*
* @param score - object score
* @param object - object itself
* @return rank
*/
Publisher<Integer> addAndGetRank(double score, V object);
/**
* Adds element to this set, overrides previous score if it has been already added.
* Finally return the reverse rank of the item
*
* @param score - object score
* @param object - object itself
* @return reverse rank
*/
Publisher<Integer> addAndGetRevRank(double score, V object);
/**
* Adds element to this set only if has not been added before.
* <p>
* Requires <b>Redis 3.0.2 and higher.</b>
*
* @param score - object score
* @param object - object itself
* @return <code>true</code> if element has added and <code>false</code> if not.
*/
Publisher<Boolean> tryAdd(double score, V object);
Publisher<Boolean> remove(V object);
Publisher<Integer> size();
@ -58,6 +261,24 @@ public interface RScoredSortedSetReactive<V> extends RExpirableReactive {
Publisher<Double> addScore(V object, Number value);
/**
* Adds score to element and returns its reverse rank
*
* @param object - object itself
* @param value - object score
* @return reverse rank
*/
Publisher<Integer> addScoreAndGetRevRank(V object, Number value);
/**
* Adds score to element and returns its rank
*
* @param object - object itself
* @param value - object score
* @return rank
*/
Publisher<Integer> addScoreAndGetRank(V object, Number value);
Publisher<Collection<V>> valueRange(int startIndex, int endIndex);
Publisher<Collection<ScoredEntry<V>>> entryRange(int startIndex, int endIndex);

@ -15,7 +15,6 @@
*/
package org.redisson.api;
import java.util.Iterator;
import java.util.Set;
import org.reactivestreams.Publisher;
@ -27,7 +26,7 @@ import org.reactivestreams.Publisher;
*
* @param <V> value
*/
public interface RSetReactive<V> extends RCollectionReactive<V> {
public interface RSetReactive<V> extends RCollectionReactive<V>, RSortableReactive<Set<V>> {
/**
* Returns an iterator over elements in this set.

@ -0,0 +1,159 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api;
import java.util.Collection;
import java.util.List;
import org.reactivestreams.Publisher;
/**
*
* @author Nikita Koksharov
*
* @param <V> object type
*/
public interface RSortableReactive<V> {
/**
* Read data in sorted view
*
* @param order for sorted data
* @return sorted collection
*/
Publisher<V> readSorted(SortOrder order);
/**
* Read data in sorted view
*
* @param order for sorted data
* @param offset of sorted data
* @param count of sorted data
* @return sorted collection
*/
Publisher<V> readSorted(SortOrder order, int offset, int count);
/**
* Read data in sorted view
*
* @param byPattern that is used to generate the keys that are used for sorting
* @param order for sorted data
* @return sorted collection
*/
Publisher<V> readSorted(String byPattern, SortOrder order);
/**
* Read data in sorted view
*
* @param byPattern that is used to generate the keys that are used for sorting
* @param order for sorted data
* @param offset of sorted data
* @param count of sorted data
* @return sorted collection
*/
Publisher<V> readSorted(String byPattern, SortOrder order, int offset, int count);
/**
* Read data in sorted view
*
* @param <T> object type
* @param byPattern that is used to generate the keys that are used for sorting
* @param getPatterns that is used to load values by keys in sorted view
* @param order for sorted data
* @return sorted collection
*/
<T> Publisher<Collection<T>> readSorted(String byPattern, List<String> getPatterns, SortOrder order);
/**
* Read data in sorted view
*
* @param <T> object type
* @param byPattern that is used to generate the keys that are used for sorting
* @param getPatterns that is used to load values by keys in sorted view
* @param order for sorted data
* @param offset of sorted data
* @param count of sorted data
* @return sorted collection
*/
<T> Publisher<Collection<T>> readSorted(String byPattern, List<String> getPatterns, SortOrder order, int offset, int count);
/**
* Sort data and store to <code>destName</code> list
*
* @param destName list object destination
* @param order for sorted data
* @return length of sorted data
*/
Publisher<Integer> sortTo(String destName, SortOrder order);
/**
* Sort data and store to <code>destName</code> list
*
* @param destName list object destination
* @param order for sorted data
* @param offset of sorted data
* @param count of sorted data
* @return length of sorted data
*/
Publisher<Integer> sortTo(String destName, SortOrder order, int offset, int count);
/**
* Sort data and store to <code>destName</code> list
*
* @param destName list object destination
* @param byPattern that is used to generate the keys that are used for sorting
* @param order for sorted data
* @return length of sorted data
*/
Publisher<Integer> sortTo(String destName, String byPattern, SortOrder order);
/**
* Sort data and store to <code>destName</code> list
*
* @param destName list object destination
* @param byPattern that is used to generate the keys that are used for sorting
* @param order for sorted data
* @param offset of sorted data
* @param count of sorted data
* @return length of sorted data
*/
Publisher<Integer> sortTo(String destName, String byPattern, SortOrder order, int offset, int count);
/**
* Sort data and store to <code>destName</code> list
*
* @param destName list object destination
* @param byPattern that is used to generate the keys that are used for sorting
* @param getPatterns that is used to load values by keys in sorted view
* @param order for sorted data
* @return length of sorted data
*/
Publisher<Integer> sortTo(String destName, String byPattern, List<String> getPatterns, SortOrder order);
/**
* Sort data and store to <code>destName</code> list
*
* @param destName list object destination
* @param byPattern that is used to generate the keys that are used for sorting
* @param getPatterns that is used to load values by keys in sorted view
* @param order for sorted data
* @param offset of sorted data
* @param count of sorted data
* @return length of sorted data
*/
Publisher<Integer> sortTo(String destName, String byPattern, List<String> getPatterns, SortOrder order, int offset, int count);
}

@ -30,6 +30,34 @@ import org.redisson.config.Config;
*/
public interface RedissonReactiveClient {
/**
* Returns geospatial items holder instance by <code>name</code>.
*
* @param <V> type of value
* @param name - name of object
* @return Geo object
*/
<V> RGeoReactive<V> getGeo(String name);
/**
* Returns geospatial items holder instance by <code>name</code>
* using provided codec for geospatial members.
*
* @param <V> type of value
* @param name - name of object
* @param codec - codec for value
* @return Geo object
*/
<V> RGeoReactive<V> getGeo(String name, Codec codec);
/**
* Returns rate limiter instance by <code>name</code>
*
* @param name of rate limiter
* @return RateLimiter object
*/
RRateLimiterReactive getRateLimiter(String name);
/**
* Returns semaphore instance by name
*
@ -55,6 +83,16 @@ public interface RedissonReactiveClient {
*/
RReadWriteLockReactive getReadWriteLock(String name);
/**
* Returns lock instance by name.
* <p>
* Implements a <b>fair</b> locking so it guarantees an acquire order by threads.
*
* @param name - name of object
* @return Lock object
*/
RLockReactive getFairLock(String name);
/**
* Returns lock instance by name.
* <p>

@ -33,12 +33,16 @@ import org.redisson.command.CommandReactiveExecutor;
*/
abstract class RedissonExpirableReactive extends RedissonObjectReactive implements RExpirableReactive {
protected final RExpirableAsync instance;
RedissonExpirableReactive(CommandReactiveExecutor connectionManager, String name, RExpirableAsync instance) {
super(connectionManager, name, instance);
this.instance = instance;
}
RedissonExpirableReactive(Codec codec, CommandReactiveExecutor connectionManager, String name, RExpirableAsync instance) {
super(codec, connectionManager, name, instance);
this.instance = instance;
}
@Override

@ -0,0 +1,430 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.reactive;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import org.redisson.RedissonGeo;
import org.redisson.api.GeoEntry;
import org.redisson.api.GeoOrder;
import org.redisson.api.GeoPosition;
import org.redisson.api.GeoUnit;
import org.redisson.api.RFuture;
import org.redisson.api.RGeoAsync;
import org.redisson.api.RGeoReactive;
import org.redisson.client.codec.Codec;
import org.redisson.command.CommandReactiveExecutor;
/**
*
* @author Nikita Koksharov
*
* @param <V> value type
*/
public class RedissonGeoReactive<V> extends RedissonScoredSortedSetReactive<V> implements RGeoReactive<V> {
private final RGeoAsync<V> instance;
public RedissonGeoReactive(CommandReactiveExecutor commandExecutor, String name) {
this(commandExecutor, name, new RedissonGeo<V>(commandExecutor, name, null));
}
public RedissonGeoReactive(CommandReactiveExecutor commandExecutor, String name, RGeoAsync<V> instance) {
super(commandExecutor, name, instance);
this.instance = instance;
}
public RedissonGeoReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name) {
this(codec, commandExecutor, name, new RedissonGeo<V>(codec, commandExecutor, name, null));
}
public RedissonGeoReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name, RGeoAsync<V> instance) {
super(codec, commandExecutor, name, instance);
this.instance = instance;
}
@Override
public Publisher<Long> add(final double longitude, final double latitude, final V member) {
return reactive(new Supplier<RFuture<Long>>() {
@Override
public RFuture<Long> get() {
return instance.addAsync(longitude, latitude, member);
}
});
}
@Override
public Publisher<Long> add(final GeoEntry... entries) {
return reactive(new Supplier<RFuture<Long>>() {
@Override
public RFuture<Long> get() {
return instance.addAsync(entries);
}
});
}
@Override
public Publisher<Double> dist(final V firstMember, final V secondMember, final GeoUnit geoUnit) {
return reactive(new Supplier<RFuture<Double>>() {
@Override
public RFuture<Double> get() {
return instance.distAsync(firstMember, secondMember, geoUnit);
}
});
}
@Override
public Publisher<Map<V, String>> hash(final V... members) {
return reactive(new Supplier<RFuture<Map<V, String>>>() {
@Override
public RFuture<Map<V, String>> get() {
return instance.hashAsync(members);
}
});
}
@Override
public Publisher<Map<V, GeoPosition>> pos(final V... members) {
return reactive(new Supplier<RFuture<Map<V, GeoPosition>>>() {
@Override
public RFuture<Map<V, GeoPosition>> get() {
return instance.posAsync(members);
}
});
}
@Override
public Publisher<List<V>> radius(final double longitude, final double latitude, final double radius, final GeoUnit geoUnit) {
return reactive(new Supplier<RFuture<List<V>>>() {
@Override
public RFuture<List<V>> get() {
return instance.radiusAsync(longitude, latitude, radius, geoUnit);
}
});
}
@Override
public Publisher<List<V>> radius(final double longitude, final double latitude, final double radius, final GeoUnit geoUnit, final int count) {
return reactive(new Supplier<RFuture<List<V>>>() {
@Override
public RFuture<List<V>> get() {
return instance.radiusAsync(longitude, latitude, radius, geoUnit, count);
}
});
}
@Override
public Publisher<List<V>> radius(final double longitude, final double latitude, final double radius, final GeoUnit geoUnit,
final GeoOrder geoOrder) {
return reactive(new Supplier<RFuture<List<V>>>() {
@Override
public RFuture<List<V>> get() {
return instance.radiusAsync(longitude, latitude, radius, geoUnit, geoOrder);
}
});
}
@Override
public Publisher<List<V>> radius(final double longitude, final double latitude, final double radius, final GeoUnit geoUnit,
final GeoOrder geoOrder, final int count) {
return reactive(new Supplier<RFuture<List<V>>>() {
@Override
public RFuture<List<V>> get() {
return instance.radiusAsync(longitude, latitude, radius, geoUnit, geoOrder, count);
}
});
}
@Override
public Publisher<Map<V, Double>> radiusWithDistance(final double longitude, final double latitude, final double radius,
final GeoUnit geoUnit) {
return reactive(new Supplier<RFuture<Map<V, Double>>>() {
@Override
public RFuture<Map<V, Double>> get() {
return instance.radiusWithDistanceAsync(longitude, latitude, radius, geoUnit);
}
});
}
@Override
public Publisher<Map<V, Double>> radiusWithDistance(final double longitude, final double latitude, final double radius,
final GeoUnit geoUnit, final int count) {
return reactive(new Supplier<RFuture<Map<V, Double>>>() {
@Override
public RFuture<Map<V, Double>> get() {
return instance.radiusWithDistanceAsync(longitude, latitude, radius, geoUnit, count);
}
});
}
@Override
public Publisher<Map<V, Double>> radiusWithDistance(final double longitude, final double latitude, final double radius,
final GeoUnit geoUnit, final GeoOrder geoOrder) {
return reactive(new Supplier<RFuture<Map<V, Double>>>() {
@Override
public RFuture<Map<V, Double>> get() {
return instance.radiusWithDistanceAsync(longitude, latitude, radius, geoUnit, geoOrder);
}
});
}
@Override
public Publisher<Map<V, Double>> radiusWithDistance(final double longitude, final double latitude, final double radius,
final GeoUnit geoUnit, final GeoOrder geoOrder, final int count) {
return reactive(new Supplier<RFuture<Map<V, Double>>>() {
@Override
public RFuture<Map<V, Double>> get() {
return instance.radiusWithDistanceAsync(longitude, latitude, radius, geoUnit, geoOrder, count);
}
});
}
@Override
public Publisher<Map<V, GeoPosition>> radiusWithPosition(final double longitude, final double latitude, final double radius,
final GeoUnit geoUnit) {
return reactive(new Supplier<RFuture<Map<V, GeoPosition>>>() {
@Override
public RFuture<Map<V, GeoPosition>> get() {
return instance.radiusWithPositionAsync(longitude, latitude, radius, geoUnit);
}
});
}
@Override
public Publisher<Map<V, GeoPosition>> radiusWithPosition(final double longitude, final double latitude, final double radius,
final GeoUnit geoUnit, final int count) {
return reactive(new Supplier<RFuture<Map<V, GeoPosition>>>() {
@Override
public RFuture<Map<V, GeoPosition>> get() {
return instance.radiusWithPositionAsync(longitude, latitude, radius, geoUnit, count);
}
});
}
@Override
public Publisher<Map<V, GeoPosition>> radiusWithPosition(final double longitude, final double latitude, final double radius,
final GeoUnit geoUnit, final GeoOrder geoOrder) {
return reactive(new Supplier<RFuture<Map<V, GeoPosition>>>() {
@Override
public RFuture<Map<V, GeoPosition>> get() {
return instance.radiusWithPositionAsync(longitude, latitude, radius, geoUnit, geoOrder);
}
});
}
@Override
public Publisher<Map<V, GeoPosition>> radiusWithPosition(final double longitude, final double latitude, final double radius,
final GeoUnit geoUnit, final GeoOrder geoOrder, final int count) {
return reactive(new Supplier<RFuture<Map<V, GeoPosition>>>() {
@Override
public RFuture<Map<V, GeoPosition>> get() {
return instance.radiusWithPositionAsync(longitude, latitude, radius, geoUnit, geoOrder, count);
}
});
}
@Override
public Publisher<List<V>> radius(final V member, final double radius, final GeoUnit geoUnit) {
return reactive(new Supplier<RFuture<List<V>>>() {
@Override
public RFuture<List<V>> get() {
return instance.radiusAsync(member, radius, geoUnit);
}
});
}
@Override
public Publisher<List<V>> radius(final V member, final double radius, final GeoUnit geoUnit, final int count) {
return reactive(new Supplier<RFuture<List<V>>>() {
@Override
public RFuture<List<V>> get() {
return instance.radiusAsync(member, radius, geoUnit, count);
}
});
}
@Override
public Publisher<List<V>> radius(final V member, final double radius, final GeoUnit geoUnit, final GeoOrder geoOrder) {
return reactive(new Supplier<RFuture<List<V>>>() {
@Override
public RFuture<List<V>> get() {
return instance.radiusAsync(member, radius, geoUnit, geoOrder);
}
});
}
@Override
public Publisher<List<V>> radius(final V member, final double radius, final GeoUnit geoUnit, final GeoOrder geoOrder, final int count) {
return reactive(new Supplier<RFuture<List<V>>>() {
@Override
public RFuture<List<V>> get() {
return instance.radiusAsync(member, radius, geoUnit, geoOrder, count);
}
});
}
@Override
public Publisher<Map<V, Double>> radiusWithDistance(final V member, final double radius, final GeoUnit geoUnit) {
return reactive(new Supplier<RFuture<Map<V, Double>>>() {
@Override
public RFuture<Map<V, Double>> get() {
return instance.radiusWithDistanceAsync(member, radius, geoUnit);
}
});
}
@Override
public Publisher<Map<V, Double>> radiusWithDistance(final V member, final double radius, final GeoUnit geoUnit, final int count) {
return reactive(new Supplier<RFuture<Map<V, Double>>>() {
@Override
public RFuture<Map<V, Double>> get() {
return instance.radiusWithDistanceAsync(member, radius, geoUnit, count);
}
});
}
@Override
public Publisher<Map<V, Double>> radiusWithDistance(final V member, final double radius, final GeoUnit geoUnit, final GeoOrder geoOrder) {
return reactive(new Supplier<RFuture<Map<V, Double>>>() {
@Override
public RFuture<Map<V, Double>> get() {
return instance.radiusWithDistanceAsync(member, radius, geoUnit, geoOrder);
}
});
}
@Override
public Publisher<Map<V, Double>> radiusWithDistance(final V member, final double radius, final GeoUnit geoUnit, final GeoOrder geoOrder,
final int count) {
return reactive(new Supplier<RFuture<Map<V, Double>>>() {
@Override
public RFuture<Map<V, Double>> get() {
return instance.radiusWithDistanceAsync(member, radius, geoUnit, geoOrder, count);
}
});
}
@Override
public Publisher<Map<V, GeoPosition>> radiusWithPosition(final V member, final double radius, final GeoUnit geoUnit) {
return reactive(new Supplier<RFuture<Map<V, GeoPosition>>>() {
@Override
public RFuture<Map<V, GeoPosition>> get() {
return instance.radiusWithPositionAsync(member, radius, geoUnit);
}
});
}
@Override
public Publisher<Map<V, GeoPosition>> radiusWithPosition(final V member, final double radius, final GeoUnit geoUnit, final int count) {
return reactive(new Supplier<RFuture<Map<V, GeoPosition>>>() {
@Override
public RFuture<Map<V, GeoPosition>> get() {
return instance.radiusWithPositionAsync(member, radius, geoUnit, count);
}
});
}
@Override
public Publisher<Map<V, GeoPosition>> radiusWithPosition(final V member, final double radius, final GeoUnit geoUnit,
final GeoOrder geoOrder) {
return reactive(new Supplier<RFuture<Map<V, GeoPosition>>>() {
@Override
public RFuture<Map<V, GeoPosition>> get() {
return instance.radiusWithPositionAsync(member, radius, geoUnit, geoOrder);
}
});
}
@Override
public Publisher<Map<V, GeoPosition>> radiusWithPosition(final V member, final double radius, final GeoUnit geoUnit,
final GeoOrder geoOrder, final int count) {
return reactive(new Supplier<RFuture<Map<V, GeoPosition>>>() {
@Override
public RFuture<Map<V, GeoPosition>> get() {
return instance.radiusWithPositionAsync(member, radius, geoUnit, geoOrder, count);
}
});
}
@Override
public Publisher<Long> radiusStoreTo(final String destName, final double longitude, final double latitude, final double radius,
final GeoUnit geoUnit) {
return reactive(new Supplier<RFuture<Long>>() {
@Override
public RFuture<Long> get() {
return instance.radiusStoreToAsync(destName, longitude, latitude, radius, geoUnit);
}
});
}
@Override
public Publisher<Long> radiusStoreTo(final String destName, final double longitude, final double latitude, final double radius,
final GeoUnit geoUnit, final int count) {
return reactive(new Supplier<RFuture<Long>>() {
@Override
public RFuture<Long> get() {
return instance.radiusStoreToAsync(destName, longitude, latitude, radius, geoUnit, count);
}
});
}
@Override
public Publisher<Long> radiusStoreTo(final String destName, final double longitude, final double latitude, final double radius,
final GeoUnit geoUnit, final GeoOrder geoOrder, final int count) {
return reactive(new Supplier<RFuture<Long>>() {
@Override
public RFuture<Long> get() {
return instance.radiusStoreToAsync(destName, longitude, latitude, radius, geoUnit, geoOrder, count);
}
});
}
@Override
public Publisher<Long> radiusStoreTo(final String destName, final V member, final double radius, final GeoUnit geoUnit) {
return reactive(new Supplier<RFuture<Long>>() {
@Override
public RFuture<Long> get() {
return instance.radiusStoreToAsync(destName, member, radius, geoUnit);
}
});
}
@Override
public Publisher<Long> radiusStoreTo(final String destName, final V member, final double radius, final GeoUnit geoUnit, final int count) {
return reactive(new Supplier<RFuture<Long>>() {
@Override
public RFuture<Long> get() {
return instance.radiusStoreToAsync(destName, member, radius, geoUnit, count);
}
});
}
@Override
public Publisher<Long> radiusStoreTo(final String destName, final V member, final double radius, final GeoUnit geoUnit, final GeoOrder geoOrder,
final int count) {
return reactive(new Supplier<RFuture<Long>>() {
@Override
public RFuture<Long> get() {
return instance.radiusStoreToAsync(destName, member, radius, geoUnit, geoOrder, count);
}
});
}
}

@ -36,6 +36,7 @@ import org.redisson.RedissonList;
import org.redisson.api.RFuture;
import org.redisson.api.RListAsync;
import org.redisson.api.RListReactive;
import org.redisson.api.SortOrder;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.convertor.LongReplayConvertor;
@ -433,4 +434,126 @@ public class RedissonListReactive<V> extends RedissonExpirableReactive implement
return hash;
}
@Override
public Publisher<List<V>> readSorted(final SortOrder order) {
return reactive(new Supplier<RFuture<List<V>>>() {
@Override
public RFuture<List<V>> get() {
return instance.readSortAsync(order);
}
});
}
@Override
public Publisher<List<V>> readSorted(final SortOrder order, final int offset, final int count) {
return reactive(new Supplier<RFuture<List<V>>>() {
@Override
public RFuture<List<V>> get() {
return instance.readSortAsync(order, offset, count);
}
});
}
@Override
public Publisher<List<V>> readSorted(final String byPattern, final SortOrder order) {
return reactive(new Supplier<RFuture<List<V>>>() {
@Override
public RFuture<List<V>> get() {
return instance.readSortAsync(byPattern, order);
}
});
}
@Override
public Publisher<List<V>> readSorted(final String byPattern, final SortOrder order, final int offset, final int count) {
return reactive(new Supplier<RFuture<List<V>>>() {
@Override
public RFuture<List<V>> get() {
return instance.readSortAsync(byPattern, order, offset, count);
}
});
}
@Override
public <T> Publisher<Collection<T>> readSorted(final String byPattern, final List<String> getPatterns, final SortOrder order) {
return reactive(new Supplier<RFuture<Collection<T>>>() {
@Override
public RFuture<Collection<T>> get() {
return instance.readSortAsync(byPattern, getPatterns, order);
}
});
}
@Override
public <T> Publisher<Collection<T>> readSorted(final String byPattern, final List<String> getPatterns, final SortOrder order,
final int offset, final int count) {
return reactive(new Supplier<RFuture<Collection<T>>>() {
@Override
public RFuture<Collection<T>> get() {
return instance.readSortAsync(byPattern, getPatterns, order, offset, count);
}
});
}
@Override
public Publisher<Integer> sortTo(final String destName, final SortOrder order) {
return reactive(new Supplier<RFuture<Integer>>() {
@Override
public RFuture<Integer> get() {
return instance.sortToAsync(destName, order);
}
});
}
@Override
public Publisher<Integer> sortTo(final String destName, final SortOrder order, final int offset, final int count) {
return reactive(new Supplier<RFuture<Integer>>() {
@Override
public RFuture<Integer> get() {
return instance.sortToAsync(destName, order, offset, count);
}
});
}
@Override
public Publisher<Integer> sortTo(final String destName, final String byPattern, final SortOrder order) {
return reactive(new Supplier<RFuture<Integer>>() {
@Override
public RFuture<Integer> get() {
return instance.sortToAsync(destName, byPattern, order);
}
});
}
@Override
public Publisher<Integer> sortTo(final String destName, final String byPattern, final SortOrder order, final int offset, final int count) {
return reactive(new Supplier<RFuture<Integer>>() {
@Override
public RFuture<Integer> get() {
return instance.sortToAsync(destName, byPattern, order, offset, count);
}
});
}
@Override
public Publisher<Integer> sortTo(final String destName, final String byPattern, final List<String> getPatterns, final SortOrder order) {
return reactive(new Supplier<RFuture<Integer>>() {
@Override
public RFuture<Integer> get() {
return instance.sortToAsync(destName, byPattern, getPatterns, order);
}
});
}
@Override
public Publisher<Integer> sortTo(final String destName, final String byPattern, final List<String> getPatterns, final SortOrder order,
final int offset, final int count) {
return reactive(new Supplier<RFuture<Integer>>() {
@Override
public RFuture<Integer> get() {
return instance.sortToAsync(destName, byPattern, getPatterns, order, offset, count);
}
});
}
}

@ -18,11 +18,12 @@ package org.redisson.reactive;
import java.io.IOException;
import java.util.Collection;
import java.util.function.Supplier;
import java.util.concurrent.TimeUnit;
import org.reactivestreams.Publisher;
import org.redisson.RedissonReference;
import org.redisson.api.RExpirableAsync;
import org.redisson.api.RFuture;
import org.redisson.api.RObjectAsync;
import org.redisson.api.RObjectReactive;
import org.redisson.client.codec.Codec;
import org.redisson.command.CommandReactiveExecutor;
@ -42,9 +43,9 @@ abstract class RedissonObjectReactive implements RObjectReactive {
final CommandReactiveExecutor commandExecutor;
private final String name;
final Codec codec;
protected RExpirableAsync instance;
protected RObjectAsync instance;
public RedissonObjectReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name, RExpirableAsync instance) {
public RedissonObjectReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name, RObjectAsync instance) {
this.codec = codec;
this.name = name;
this.commandExecutor = commandExecutor;
@ -55,7 +56,7 @@ abstract class RedissonObjectReactive implements RObjectReactive {
return commandExecutor.reactive(supplier);
}
public RedissonObjectReactive(CommandReactiveExecutor commandExecutor, String name, RExpirableAsync instance) {
public RedissonObjectReactive(CommandReactiveExecutor commandExecutor, String name, RObjectAsync instance) {
this(commandExecutor.getConnectionManager().getCodec(), commandExecutor, name, instance);
}
@ -124,6 +125,86 @@ abstract class RedissonObjectReactive implements RObjectReactive {
}
}
@Override
public Publisher<Void> restore(final byte[] state) {
return reactive(new Supplier<RFuture<Void>>() {
@Override
public RFuture<Void> get() {
return instance.restoreAsync(state);
}
});
}
@Override
public Publisher<Void> restore(final byte[] state, final long timeToLive, final TimeUnit timeUnit) {
return reactive(new Supplier<RFuture<Void>>() {
@Override
public RFuture<Void> get() {
return instance.restoreAsync(state, timeToLive, timeUnit);
}
});
}
@Override
public Publisher<Void> restoreAndReplace(final byte[] state) {
return reactive(new Supplier<RFuture<Void>>() {
@Override
public RFuture<Void> get() {
return instance.restoreAndReplaceAsync(state);
}
});
}
@Override
public Publisher<Void> restoreAndReplace(final byte[] state, final long timeToLive, final TimeUnit timeUnit) {
return reactive(new Supplier<RFuture<Void>>() {
@Override
public RFuture<Void> get() {
return instance.restoreAndReplaceAsync(state, timeToLive, timeUnit);
}
});
}
@Override
public Publisher<byte[]> dump() {
return reactive(new Supplier<RFuture<byte[]>>() {
@Override
public RFuture<byte[]> get() {
return instance.dumpAsync();
}
});
}
@Override
public Publisher<Boolean> touch() {
return reactive(new Supplier<RFuture<Boolean>>() {
@Override
public RFuture<Boolean> get() {
return instance.touchAsync();
}
});
}
@Override
public Publisher<Boolean> unlink() {
return reactive(new Supplier<RFuture<Boolean>>() {
@Override
public RFuture<Boolean> get() {
return instance.unlinkAsync();
}
});
}
@Override
public Publisher<Void> copy(final String host, final int port, final int database, final long timeout) {
return reactive(new Supplier<RFuture<Void>>() {
@Override
public RFuture<Void> get() {
return instance.copyAsync(host, port, database, timeout);
}
});
}
@Override
public Publisher<Void> rename(final String newName) {
return reactive(new Supplier<RFuture<Void>>() {

@ -0,0 +1,119 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.reactive;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import org.redisson.RedissonRateLimiter;
import org.redisson.api.RFuture;
import org.redisson.api.RRateLimiterAsync;
import org.redisson.api.RRateLimiterReactive;
import org.redisson.api.RateIntervalUnit;
import org.redisson.api.RateType;
import org.redisson.command.CommandReactiveExecutor;
/**
*
* @author Nikita Koksharov
*
*/
public class RedissonRateLimiterReactive extends RedissonObjectReactive implements RRateLimiterReactive {
private final RRateLimiterAsync instance;
public RedissonRateLimiterReactive(CommandReactiveExecutor connectionManager, String name) {
this(connectionManager, name, new RedissonRateLimiter(connectionManager, name));
}
private RedissonRateLimiterReactive(CommandReactiveExecutor connectionManager, String name, RRateLimiterAsync instance) {
super(connectionManager, name, instance);
this.instance = instance;
}
@Override
public Publisher<Boolean> trySetRate(final RateType mode, final long rate, final long rateInterval,
final RateIntervalUnit rateIntervalUnit) {
return reactive(new Supplier<RFuture<Boolean>>() {
@Override
public RFuture<Boolean> get() {
return instance.trySetRateAsync(mode, rate, rateInterval, rateIntervalUnit);
}
});
}
@Override
public Publisher<Boolean> tryAcquire() {
return reactive(new Supplier<RFuture<Boolean>>() {
@Override
public RFuture<Boolean> get() {
return instance.tryAcquireAsync();
}
});
}
@Override
public Publisher<Boolean> tryAcquire(final long permits) {
return reactive(new Supplier<RFuture<Boolean>>() {
@Override
public RFuture<Boolean> get() {
return instance.tryAcquireAsync(permits);
}
});
}
@Override
public Publisher<Void> acquire() {
return reactive(new Supplier<RFuture<Void>>() {
@Override
public RFuture<Void> get() {
return instance.acquireAsync();
}
});
}
@Override
public Publisher<Void> acquire(final long permits) {
return reactive(new Supplier<RFuture<Void>>() {
@Override
public RFuture<Void> get() {
return instance.acquireAsync(permits);
}
});
}
@Override
public Publisher<Boolean> tryAcquire(final long timeout, final TimeUnit unit) {
return reactive(new Supplier<RFuture<Boolean>>() {
@Override
public RFuture<Boolean> get() {
return instance.tryAcquireAsync(timeout, unit);
}
});
}
@Override
public Publisher<Boolean> tryAcquire(final long permits, final long timeout, final TimeUnit unit) {
return reactive(new Supplier<RFuture<Boolean>>() {
@Override
public RFuture<Boolean> get() {
return instance.tryAcquireAsync(permits, timeout, unit);
}
});
}
}

@ -16,8 +16,11 @@
package org.redisson.reactive;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.reactivestreams.Publisher;
import org.redisson.RedissonScoredSortedSet;
@ -25,9 +28,9 @@ import org.redisson.api.RFuture;
import org.redisson.api.RScoredSortedSet.Aggregate;
import org.redisson.api.RScoredSortedSetAsync;
import org.redisson.api.RScoredSortedSetReactive;
import org.redisson.api.SortOrder;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.ScoredEntry;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.command.CommandReactiveExecutor;
@ -182,20 +185,25 @@ public class RedissonScoredSortedSetReactive<V> extends RedissonExpirableReactiv
});
}
private Publisher<ListScanResult<Object>> scanIteratorReactive(RedisClient client, long startPos) {
return commandExecutor.readReactive(client, getName(), codec, RedisCommands.ZSCAN, getName(), startPos);
}
@Override
public Publisher<V> iterator() {
private Publisher<V> scanIteratorReactive(final String pattern, final int count) {
return Flux.create(new SetReactiveIterator<V>() {
@Override
protected Publisher<ListScanResult<Object>> scanIteratorReactive(RedisClient client, long nextIterPos) {
return RedissonScoredSortedSetReactive.this.scanIteratorReactive(client, nextIterPos);
protected Publisher<ListScanResult<Object>> scanIteratorReactive(final RedisClient client, final long nextIterPos) {
return reactive(new Supplier<RFuture<ListScanResult<Object>>>() {
@Override
public RFuture<ListScanResult<Object>> get() {
return ((RedissonScoredSortedSet<V>)instance).scanIteratorAsync(client, nextIterPos, pattern, count);
}
});
}
});
}
@Override
public Publisher<V> iterator() {
return scanIteratorReactive(null, 10);
}
@Override
public Publisher<Boolean> containsAll(final Collection<?> c) {
return reactive(new Supplier<RFuture<Boolean>>() {
@ -461,4 +469,291 @@ public class RedissonScoredSortedSetReactive<V> extends RedissonExpirableReactiv
});
}
@Override
public Publisher<V> pollLastFromAny(final long timeout, final TimeUnit unit, final String... queueNames) {
return reactive(new Supplier<RFuture<V>>() {
@Override
public RFuture<V> get() {
return instance.pollLastFromAnyAsync(timeout, unit, queueNames);
}
});
}
@Override
public Publisher<V> pollFirstFromAny(final long timeout, final TimeUnit unit, final String... queueNames) {
return reactive(new Supplier<RFuture<V>>() {
@Override
public RFuture<V> get() {
return instance.pollFirstFromAnyAsync(timeout, unit, queueNames);
}
});
}
@Override
public Publisher<V> pollFirst(final long timeout, final TimeUnit unit) {
return reactive(new Supplier<RFuture<V>>() {
@Override
public RFuture<V> get() {
return instance.pollFirstAsync(timeout, unit);
}
});
}
@Override
public Publisher<V> pollLast(final long timeout, final TimeUnit unit) {
return reactive(new Supplier<RFuture<V>>() {
@Override
public RFuture<V> get() {
return instance.pollLastAsync(timeout, unit);
}
});
}
@Override
public Publisher<Collection<V>> pollFirst(final int count) {
return reactive(new Supplier<RFuture<Collection<V>>>() {
@Override
public RFuture<Collection<V>> get() {
return instance.pollFirstAsync(count);
}
});
}
@Override
public Publisher<Collection<V>> pollLast(final int count) {
return reactive(new Supplier<RFuture<Collection<V>>>() {
@Override
public RFuture<Collection<V>> get() {
return instance.pollLastAsync(count);
}
});
}
@Override
public Publisher<Double> firstScore() {
return reactive(new Supplier<RFuture<Double>>() {
@Override
public RFuture<Double> get() {
return instance.firstScoreAsync();
}
});
}
@Override
public Publisher<Double> lastScore() {
return reactive(new Supplier<RFuture<Double>>() {
@Override
public RFuture<Double> get() {
return instance.lastScoreAsync();
}
});
}
@Override
public Publisher<V> iterator(String pattern) {
return scanIteratorReactive(pattern, 10);
}
@Override
public Publisher<V> iterator(int count) {
return scanIteratorReactive(null, count);
}
@Override
public Publisher<V> iterator(String pattern, int count) {
return scanIteratorReactive(pattern, count);
}
@Override
public Publisher<Integer> revRank(final V o) {
return reactive(new Supplier<RFuture<Integer>>() {
@Override
public RFuture<Integer> get() {
return instance.revRankAsync(o);
}
});
}
@Override
public Publisher<Long> addAll(final Map<V, Double> objects) {
return reactive(new Supplier<RFuture<Long>>() {
@Override
public RFuture<Long> get() {
return instance.addAllAsync(objects);
}
});
}
@Override
public Publisher<Integer> addAndGetRank(final double score, final V object) {
return reactive(new Supplier<RFuture<Integer>>() {
@Override
public RFuture<Integer> get() {
return instance.addAndGetRankAsync(score, object);
}
});
}
@Override
public Publisher<Integer> addAndGetRevRank(final double score, final V object) {
return reactive(new Supplier<RFuture<Integer>>() {
@Override
public RFuture<Integer> get() {
return instance.addAndGetRevRankAsync(score, object);
}
});
}
@Override
public Publisher<Boolean> tryAdd(final double score, final V object) {
return reactive(new Supplier<RFuture<Boolean>>() {
@Override
public RFuture<Boolean> get() {
return instance.tryAddAsync(score, object);
}
});
}
@Override
public Publisher<Integer> addScoreAndGetRevRank(final V object, final Number value) {
return reactive(new Supplier<RFuture<Integer>>() {
@Override
public RFuture<Integer> get() {
return instance.addScoreAndGetRevRankAsync(object, value);
}
});
}
@Override
public Publisher<Integer> addScoreAndGetRank(final V object, final Number value) {
return reactive(new Supplier<RFuture<Integer>>() {
@Override
public RFuture<Integer> get() {
return instance.addScoreAndGetRankAsync(object, value);
}
});
}
@Override
public Publisher<Set<V>> readSorted(final SortOrder order) {
return reactive(new Supplier<RFuture<Set<V>>>() {
@Override
public RFuture<Set<V>> get() {
return instance.readSortAsync(order);
}
});
}
@Override
public Publisher<Set<V>> readSorted(final SortOrder order, final int offset, final int count) {
return reactive(new Supplier<RFuture<Set<V>>>() {
@Override
public RFuture<Set<V>> get() {
return instance.readSortAsync(order, offset, count);
}
});
}
@Override
public Publisher<Set<V>> readSorted(final String byPattern, final SortOrder order) {
return reactive(new Supplier<RFuture<Set<V>>>() {
@Override
public RFuture<Set<V>> get() {
return instance.readSortAsync(byPattern, order);
}
});
}
@Override
public Publisher<Set<V>> readSorted(final String byPattern, final SortOrder order, final int offset, final int count) {
return reactive(new Supplier<RFuture<Set<V>>>() {
@Override
public RFuture<Set<V>> get() {
return instance.readSortAsync(byPattern, order, offset, count);
}
});
}
@Override
public <T> Publisher<Collection<T>> readSorted(final String byPattern, final List<String> getPatterns, final SortOrder order) {
return reactive(new Supplier<RFuture<Collection<T>>>() {
@Override
public RFuture<Collection<T>> get() {
return instance.readSortAsync(byPattern, getPatterns, order);
}
});
}
@Override
public <T> Publisher<Collection<T>> readSorted(final String byPattern, final List<String> getPatterns, final SortOrder order,
final int offset, final int count) {
return reactive(new Supplier<RFuture<Collection<T>>>() {
@Override
public RFuture<Collection<T>> get() {
return instance.readSortAsync(byPattern, getPatterns, order, offset, count);
}
});
}
@Override
public Publisher<Integer> sortTo(final String destName, final SortOrder order) {
return reactive(new Supplier<RFuture<Integer>>() {
@Override
public RFuture<Integer> get() {
return instance.sortToAsync(destName, order);
}
});
}
@Override
public Publisher<Integer> sortTo(final String destName, final SortOrder order, final int offset, final int count) {
return reactive(new Supplier<RFuture<Integer>>() {
@Override
public RFuture<Integer> get() {
return instance.sortToAsync(destName, order, offset, count);
}
});
}
@Override
public Publisher<Integer> sortTo(final String destName, final String byPattern, final SortOrder order) {
return reactive(new Supplier<RFuture<Integer>>() {
@Override
public RFuture<Integer> get() {
return instance.sortToAsync(destName, byPattern, order);
}
});
}
@Override
public Publisher<Integer> sortTo(final String destName, final String byPattern, final SortOrder order, final int offset, final int count) {
return reactive(new Supplier<RFuture<Integer>>() {
@Override
public RFuture<Integer> get() {
return instance.sortToAsync(destName, byPattern, order, offset, count);
}
});
}
@Override
public Publisher<Integer> sortTo(final String destName, final String byPattern, final List<String> getPatterns, final SortOrder order) {
return reactive(new Supplier<RFuture<Integer>>() {
@Override
public RFuture<Integer> get() {
return instance.sortToAsync(destName, byPattern, getPatterns, order);
}
});
}
@Override
public Publisher<Integer> sortTo(final String destName, final String byPattern, final List<String> getPatterns, final SortOrder order,
final int offset, final int count) {
return reactive(new Supplier<RFuture<Integer>>() {
@Override
public RFuture<Integer> get() {
return instance.sortToAsync(destName, byPattern, getPatterns, order, offset, count);
}
});
}
}

@ -27,6 +27,7 @@ import org.redisson.RedissonSet;
import org.redisson.api.RFuture;
import org.redisson.api.RSetAsync;
import org.redisson.api.RSetReactive;
import org.redisson.api.SortOrder;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommands;
@ -281,4 +282,126 @@ public class RedissonSetReactive<V> extends RedissonExpirableReactive implements
return iterator(null, 10);
}
@Override
public Publisher<Set<V>> readSorted(final SortOrder order) {
return reactive(new Supplier<RFuture<Set<V>>>() {
@Override
public RFuture<Set<V>> get() {
return instance.readSortAsync(order);
}
});
}
@Override
public Publisher<Set<V>> readSorted(final SortOrder order, final int offset, final int count) {
return reactive(new Supplier<RFuture<Set<V>>>() {
@Override
public RFuture<Set<V>> get() {
return instance.readSortAsync(order, offset, count);
}
});
}
@Override
public Publisher<Set<V>> readSorted(final String byPattern, final SortOrder order) {
return reactive(new Supplier<RFuture<Set<V>>>() {
@Override
public RFuture<Set<V>> get() {
return instance.readSortAsync(byPattern, order);
}
});
}
@Override
public Publisher<Set<V>> readSorted(final String byPattern, final SortOrder order, final int offset, final int count) {
return reactive(new Supplier<RFuture<Set<V>>>() {
@Override
public RFuture<Set<V>> get() {
return instance.readSortAsync(byPattern, order, offset, count);
}
});
}
@Override
public <T> Publisher<Collection<T>> readSorted(final String byPattern, final List<String> getPatterns, final SortOrder order) {
return reactive(new Supplier<RFuture<Collection<T>>>() {
@Override
public RFuture<Collection<T>> get() {
return instance.readSortAsync(byPattern, getPatterns, order);
}
});
}
@Override
public <T> Publisher<Collection<T>> readSorted(final String byPattern, final List<String> getPatterns, final SortOrder order,
final int offset, final int count) {
return reactive(new Supplier<RFuture<Collection<T>>>() {
@Override
public RFuture<Collection<T>> get() {
return instance.readSortAsync(byPattern, getPatterns, order, offset, count);
}
});
}
@Override
public Publisher<Integer> sortTo(final String destName, final SortOrder order) {
return reactive(new Supplier<RFuture<Integer>>() {
@Override
public RFuture<Integer> get() {
return instance.sortToAsync(destName, order);
}
});
}
@Override
public Publisher<Integer> sortTo(final String destName, final SortOrder order, final int offset, final int count) {
return reactive(new Supplier<RFuture<Integer>>() {
@Override
public RFuture<Integer> get() {
return instance.sortToAsync(destName, order, offset, count);
}
});
}
@Override
public Publisher<Integer> sortTo(final String destName, final String byPattern, final SortOrder order) {
return reactive(new Supplier<RFuture<Integer>>() {
@Override
public RFuture<Integer> get() {
return instance.sortToAsync(destName, byPattern, order);
}
});
}
@Override
public Publisher<Integer> sortTo(final String destName, final String byPattern, final SortOrder order, final int offset, final int count) {
return reactive(new Supplier<RFuture<Integer>>() {
@Override
public RFuture<Integer> get() {
return instance.sortToAsync(destName, byPattern, order, offset, count);
}
});
}
@Override
public Publisher<Integer> sortTo(final String destName, final String byPattern, final List<String> getPatterns, final SortOrder order) {
return reactive(new Supplier<RFuture<Integer>>() {
@Override
public RFuture<Integer> get() {
return instance.sortToAsync(destName, byPattern, getPatterns, order);
}
});
}
@Override
public Publisher<Integer> sortTo(final String destName, final String byPattern, final List<String> getPatterns, final SortOrder order,
final int offset, final int count) {
return reactive(new Supplier<RFuture<Integer>>() {
@Override
public RFuture<Integer> get() {
return instance.sortToAsync(destName, byPattern, getPatterns, order, offset, count);
}
});
}
}

Loading…
Cancel
Save