|
|
|
@ -20,11 +20,16 @@ import java.util.Collection;
|
|
|
|
|
import java.util.List;
|
|
|
|
|
|
|
|
|
|
import org.reactivestreams.Publisher;
|
|
|
|
|
import org.redisson.RedissonLexSortedSet;
|
|
|
|
|
import org.redisson.api.RFuture;
|
|
|
|
|
import org.redisson.api.RLexSortedSetAsync;
|
|
|
|
|
import org.redisson.api.RLexSortedSetReactive;
|
|
|
|
|
import org.redisson.client.codec.StringCodec;
|
|
|
|
|
import org.redisson.client.protocol.RedisCommands;
|
|
|
|
|
import org.redisson.command.CommandReactiveExecutor;
|
|
|
|
|
|
|
|
|
|
import reactor.fn.Supplier;
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
*
|
|
|
|
|
* @author Nikita Koksharov
|
|
|
|
@ -32,8 +37,11 @@ import org.redisson.command.CommandReactiveExecutor;
|
|
|
|
|
*/
|
|
|
|
|
public class RedissonLexSortedSetReactive extends RedissonScoredSortedSetReactive<String> implements RLexSortedSetReactive {
|
|
|
|
|
|
|
|
|
|
private final RLexSortedSetAsync instance;
|
|
|
|
|
|
|
|
|
|
public RedissonLexSortedSetReactive(CommandReactiveExecutor commandExecutor, String name) {
|
|
|
|
|
super(StringCodec.INSTANCE, commandExecutor, name);
|
|
|
|
|
instance = new RedissonLexSortedSet(commandExecutor, name, null);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
@ -42,101 +50,128 @@ public class RedissonLexSortedSetReactive extends RedissonScoredSortedSetReactiv
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public Publisher<Integer> removeRangeHeadByLex(String toElement, boolean toInclusive) {
|
|
|
|
|
String toValue = value(toElement, toInclusive);
|
|
|
|
|
return commandExecutor.writeReactive(getName(), StringCodec.INSTANCE, RedisCommands.ZREMRANGEBYLEX, getName(), "-", toValue);
|
|
|
|
|
public Publisher<Integer> removeRangeHead(final String toElement, final boolean toInclusive) {
|
|
|
|
|
return reactive(new Supplier<RFuture<Integer>>() {
|
|
|
|
|
@Override
|
|
|
|
|
public RFuture<Integer> get() {
|
|
|
|
|
return instance.removeRangeHeadAsync(toElement, toInclusive);
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public Publisher<Integer> removeRangeTailByLex(String fromElement, boolean fromInclusive) {
|
|
|
|
|
String fromValue = value(fromElement, fromInclusive);
|
|
|
|
|
return commandExecutor.writeReactive(getName(), StringCodec.INSTANCE, RedisCommands.ZREMRANGEBYLEX, getName(), fromValue, "+");
|
|
|
|
|
public Publisher<Integer> removeRangeTail(final String fromElement, final boolean fromInclusive) {
|
|
|
|
|
return reactive(new Supplier<RFuture<Integer>>() {
|
|
|
|
|
@Override
|
|
|
|
|
public RFuture<Integer> get() {
|
|
|
|
|
return instance.removeRangeTailAsync(fromElement, fromInclusive);
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public Publisher<Integer> removeRangeByLex(String fromElement, boolean fromInclusive, String toElement, boolean toInclusive) {
|
|
|
|
|
String fromValue = value(fromElement, fromInclusive);
|
|
|
|
|
String toValue = value(toElement, toInclusive);
|
|
|
|
|
|
|
|
|
|
return commandExecutor.writeReactive(getName(), StringCodec.INSTANCE, RedisCommands.ZREMRANGEBYLEX, getName(), fromValue, toValue);
|
|
|
|
|
public Publisher<Integer> removeRange(final String fromElement, final boolean fromInclusive, final String toElement, final boolean toInclusive) {
|
|
|
|
|
return reactive(new Supplier<RFuture<Integer>>() {
|
|
|
|
|
@Override
|
|
|
|
|
public RFuture<Integer> get() {
|
|
|
|
|
return instance.removeRangeAsync(fromElement, fromInclusive, toElement, toInclusive);
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public Publisher<Collection<String>> lexRangeHead(String toElement, boolean toInclusive) {
|
|
|
|
|
String toValue = value(toElement, toInclusive);
|
|
|
|
|
return commandExecutor.readReactive(getName(), StringCodec.INSTANCE, RedisCommands.ZRANGEBYLEX, getName(), "-", toValue);
|
|
|
|
|
public Publisher<Collection<String>> rangeHead(final String toElement, final boolean toInclusive) {
|
|
|
|
|
return reactive(new Supplier<RFuture<Collection<String>>>() {
|
|
|
|
|
@Override
|
|
|
|
|
public RFuture<Collection<String>> get() {
|
|
|
|
|
return instance.rangeHeadAsync(toElement, toInclusive);
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public Publisher<Collection<String>> lexRangeTail(String fromElement, boolean fromInclusive) {
|
|
|
|
|
String fromValue = value(fromElement, fromInclusive);
|
|
|
|
|
return commandExecutor.readReactive(getName(), StringCodec.INSTANCE, RedisCommands.ZRANGEBYLEX, getName(), fromValue, "+");
|
|
|
|
|
public Publisher<Collection<String>> rangeTail(final String fromElement, final boolean fromInclusive) {
|
|
|
|
|
return reactive(new Supplier<RFuture<Collection<String>>>() {
|
|
|
|
|
@Override
|
|
|
|
|
public RFuture<Collection<String>> get() {
|
|
|
|
|
return instance.rangeTailAsync(fromElement, fromInclusive);
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public Publisher<Collection<String>> lexRange(String fromElement, boolean fromInclusive, String toElement, boolean toInclusive) {
|
|
|
|
|
String fromValue = value(fromElement, fromInclusive);
|
|
|
|
|
String toValue = value(toElement, toInclusive);
|
|
|
|
|
|
|
|
|
|
return commandExecutor.readReactive(getName(), StringCodec.INSTANCE, RedisCommands.ZRANGEBYLEX, getName(), fromValue, toValue);
|
|
|
|
|
public Publisher<Collection<String>> range(final String fromElement, final boolean fromInclusive, final String toElement, final boolean toInclusive) {
|
|
|
|
|
return reactive(new Supplier<RFuture<Collection<String>>>() {
|
|
|
|
|
@Override
|
|
|
|
|
public RFuture<Collection<String>> get() {
|
|
|
|
|
return instance.rangeAsync(fromElement, fromInclusive, toElement, toInclusive);
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public Publisher<Collection<String>> lexRangeHead(String toElement, boolean toInclusive, int offset, int count) {
|
|
|
|
|
String toValue = value(toElement, toInclusive);
|
|
|
|
|
return commandExecutor.readReactive(getName(), StringCodec.INSTANCE, RedisCommands.ZRANGEBYLEX, getName(), "-", toValue, "LIMIT", offset, count);
|
|
|
|
|
public Publisher<Collection<String>> rangeHead(final String toElement, final boolean toInclusive, final int offset, final int count) {
|
|
|
|
|
return reactive(new Supplier<RFuture<Collection<String>>>() {
|
|
|
|
|
@Override
|
|
|
|
|
public RFuture<Collection<String>> get() {
|
|
|
|
|
return instance.rangeHeadAsync(toElement, toInclusive, offset, count);
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public Publisher<Collection<String>> lexRangeTail(String fromElement, boolean fromInclusive, int offset, int count) {
|
|
|
|
|
String fromValue = value(fromElement, fromInclusive);
|
|
|
|
|
return commandExecutor.readReactive(getName(), StringCodec.INSTANCE, RedisCommands.ZRANGEBYLEX, getName(), fromValue, "+", "LIMIT", offset, count);
|
|
|
|
|
public Publisher<Collection<String>> rangeTail(final String fromElement, final boolean fromInclusive, final int offset, final int count) {
|
|
|
|
|
return reactive(new Supplier<RFuture<Collection<String>>>() {
|
|
|
|
|
@Override
|
|
|
|
|
public RFuture<Collection<String>> get() {
|
|
|
|
|
return instance.rangeTailAsync(fromElement, fromInclusive, offset, count);
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public Publisher<Collection<String>> lexRange(String fromElement, boolean fromInclusive, String toElement, boolean toInclusive, int offset, int count) {
|
|
|
|
|
String fromValue = value(fromElement, fromInclusive);
|
|
|
|
|
String toValue = value(toElement, toInclusive);
|
|
|
|
|
|
|
|
|
|
return commandExecutor.readReactive(getName(), StringCodec.INSTANCE, RedisCommands.ZRANGEBYLEX, getName(), fromValue, toValue, "LIMIT", offset, count);
|
|
|
|
|
public Publisher<Collection<String>> range(final String fromElement, final boolean fromInclusive, final String toElement, final boolean toInclusive, final int offset, final int count) {
|
|
|
|
|
return reactive(new Supplier<RFuture<Collection<String>>>() {
|
|
|
|
|
@Override
|
|
|
|
|
public RFuture<Collection<String>> get() {
|
|
|
|
|
return instance.rangeAsync(fromElement, fromInclusive, toElement, toInclusive, offset, count);
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public Publisher<Integer> lexCountTail(String fromElement, boolean fromInclusive) {
|
|
|
|
|
String fromValue = value(fromElement, fromInclusive);
|
|
|
|
|
|
|
|
|
|
return commandExecutor.readReactive(getName(), StringCodec.INSTANCE, RedisCommands.ZLEXCOUNT, getName(), fromValue, "+");
|
|
|
|
|
public Publisher<Integer> countTail(final String fromElement, final boolean fromInclusive) {
|
|
|
|
|
return reactive(new Supplier<RFuture<Integer>>() {
|
|
|
|
|
@Override
|
|
|
|
|
public RFuture<Integer> get() {
|
|
|
|
|
return instance.countTailAsync(fromElement, fromInclusive);
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public Publisher<Integer> lexCountHead(String toElement, boolean toInclusive) {
|
|
|
|
|
String toValue = value(toElement, toInclusive);
|
|
|
|
|
|
|
|
|
|
return commandExecutor.readReactive(getName(), StringCodec.INSTANCE, RedisCommands.ZLEXCOUNT, getName(), "-", toValue);
|
|
|
|
|
public Publisher<Integer> countHead(final String toElement, final boolean toInclusive) {
|
|
|
|
|
return reactive(new Supplier<RFuture<Integer>>() {
|
|
|
|
|
@Override
|
|
|
|
|
public RFuture<Integer> get() {
|
|
|
|
|
return instance.countHeadAsync(toElement, toInclusive);
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public Publisher<Integer> lexCount(String fromElement, boolean fromInclusive, String toElement, boolean toInclusive) {
|
|
|
|
|
String fromValue = value(fromElement, fromInclusive);
|
|
|
|
|
String toValue = value(toElement, toInclusive);
|
|
|
|
|
|
|
|
|
|
return commandExecutor.readReactive(getName(), StringCodec.INSTANCE, RedisCommands.ZLEXCOUNT, getName(), fromValue, toValue);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private String value(String fromElement, boolean fromInclusive) {
|
|
|
|
|
String fromValue = fromElement.toString();
|
|
|
|
|
if (fromInclusive) {
|
|
|
|
|
fromValue = "[" + fromValue;
|
|
|
|
|
} else {
|
|
|
|
|
fromValue = "(" + fromValue;
|
|
|
|
|
}
|
|
|
|
|
return fromValue;
|
|
|
|
|
public Publisher<Integer> count(final String fromElement, final boolean fromInclusive, final String toElement, final boolean toInclusive) {
|
|
|
|
|
return reactive(new Supplier<RFuture<Integer>>() {
|
|
|
|
|
@Override
|
|
|
|
|
public RFuture<Integer> get() {
|
|
|
|
|
return instance.countAsync(fromElement, fromInclusive, toElement, toInclusive);
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public Publisher<Integer> add(String e) {
|
|
|
|
|
return commandExecutor.writeReactive(getName(), StringCodec.INSTANCE, RedisCommands.ZADD_RAW, getName(), 0, e);
|
|
|
|
|
public Publisher<Integer> add(final String e) {
|
|
|
|
|
return commandExecutor.writeReactive(getName(), StringCodec.INSTANCE, RedisCommands.ZADD_INT, getName(), 0, e);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
@ -146,7 +181,12 @@ public class RedissonLexSortedSetReactive extends RedissonScoredSortedSetReactiv
|
|
|
|
|
params.add(0);
|
|
|
|
|
params.add(param);
|
|
|
|
|
}
|
|
|
|
|
return commandExecutor.writeReactive(getName(), StringCodec.INSTANCE, RedisCommands.ZADD_RAW, getName(), params.toArray());
|
|
|
|
|
return commandExecutor.writeReactive(getName(), StringCodec.INSTANCE, RedisCommands.ZADD_INT, getName(), params.toArray());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public Publisher<Collection<String>> range(int startIndex, int endIndex) {
|
|
|
|
|
return valueRange(startIndex, endIndex);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|