Merge branch 'master' into 3.0.0

pull/1821/head
Nikita 7 years ago
commit 8ace4ceddd

@ -1,6 +1,6 @@
Redisson: Redis based In-Memory Data Grid for Java. Redisson: Redis based In-Memory Data Grid for Java.<br/> State of the Art Redis client
==== ====
[Quick start](https://github.com/redisson/redisson#quick-start) | [Documentation](https://github.com/redisson/redisson/wiki) | [Javadocs](http://www.javadoc.io/doc/org.redisson/redisson/3.6.5) | [Changelog](https://github.com/redisson/redisson/blob/master/CHANGELOG.md) | [Code examples](https://github.com/redisson/redisson-examples) | [FAQs](https://github.com/redisson/redisson/wiki/16.-FAQ) | [Support chat](https://gitter.im/mrniko/redisson) | **[Redisson PRO](https://redisson.pro)** [Quick start](https://github.com/redisson/redisson#quick-start) | [Documentation](https://github.com/redisson/redisson/wiki) | [Javadocs](http://www.javadoc.io/doc/org.redisson/redisson/3.7.0) | [Changelog](https://github.com/redisson/redisson/blob/master/CHANGELOG.md) | [Code examples](https://github.com/redisson/redisson-examples) | [FAQs](https://github.com/redisson/redisson/wiki/16.-FAQ) | [Support chat](https://gitter.im/mrniko/redisson) | **[Redisson PRO](https://redisson.pro)**
Based on high-performance async and lock-free Java Redis client and [Netty](http://netty.io) framework. Based on high-performance async and lock-free Java Redis client and [Netty](http://netty.io) framework.

@ -92,7 +92,7 @@ import io.netty.util.internal.PlatformDependent;
*/ */
public class RedissonExecutorService implements RScheduledExecutorService { public class RedissonExecutorService implements RScheduledExecutorService {
private static final Logger log = LoggerFactory.getLogger(RedissonExecutorService.class); private static final Logger LOGGER = LoggerFactory.getLogger(RedissonExecutorService.class);
private static final RemoteInvocationOptions RESULT_OPTIONS = RemoteInvocationOptions.defaults().noAck().expectResultWithin(1, TimeUnit.HOURS); private static final RemoteInvocationOptions RESULT_OPTIONS = RemoteInvocationOptions.defaults().noAck().expectResultWithin(1, TimeUnit.HOURS);
@ -250,6 +250,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
+ "redis.call('zrem', KEYS[2], unpack(expiredTaskIds));" + "redis.call('zrem', KEYS[2], unpack(expiredTaskIds));"
+ "if retryInterval ~= false then " + "if retryInterval ~= false then "
+ "local startTime = tonumber(ARGV[1]) + tonumber(retryInterval);" + "local startTime = tonumber(ARGV[1]) + tonumber(retryInterval);"
+ "for i = 1, #expiredTaskIds, 1 do " + "for i = 1, #expiredTaskIds, 1 do "
+ "local name = expiredTaskIds[i];" + "local name = expiredTaskIds[i];"
+ "local scheduledName = expiredTaskIds[i];" + "local scheduledName = expiredTaskIds[i];"
@ -266,7 +267,12 @@ public class RedissonExecutorService implements RScheduledExecutorService {
+ "if v[1] == expiredTaskIds[i] then " + "if v[1] == expiredTaskIds[i] then "
+ "redis.call('publish', KEYS[3], startTime); " + "redis.call('publish', KEYS[3], startTime); "
+ "end;" + "end;"
+ "redis.call('rpush', KEYS[1], name);"
+ "if redis.call('linsert', KEYS[1], 'before', name, name) < 1 then "
+ "redis.call('rpush', KEYS[1], name); "
+ "else "
+ "redis.call('lrem', KEYS[1], -1, name); "
+ "end; "
+ "end; " + "end; "
+ "else " + "else "
+ "redis.call('rpush', KEYS[1], unpack(expiredTaskIds));" + "redis.call('rpush', KEYS[1], unpack(expiredTaskIds));"
@ -279,7 +285,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
+ "end " + "end "
+ "return nil;", + "return nil;",
Arrays.<Object>asList(requestQueueName, schedulerQueueName, schedulerChannelName, tasksRetryIntervalName), Arrays.<Object>asList(requestQueueName, schedulerQueueName, schedulerChannelName, tasksRetryIntervalName),
System.currentTimeMillis(), 100); System.currentTimeMillis(), 50);
} }
}; };
queueTransferService.schedule(getName(), task); queueTransferService.schedule(getName(), task);
@ -304,8 +310,6 @@ public class RedissonExecutorService implements RScheduledExecutorService {
}); });
} }
private long repeatInterval = 5000;
@Override @Override
public void execute(Runnable task) { public void execute(Runnable task) {
check(task); check(task);

@ -70,10 +70,18 @@ public abstract class RedissonObject implements RObject {
return "{" + name + "}:" + suffix; return "{" + name + "}:" + suffix;
} }
protected <V> V get(RFuture<V> future) { protected final <V> V get(RFuture<V> future) {
return commandExecutor.get(future); return commandExecutor.get(future);
} }
protected final long toSeconds(long timeout, TimeUnit unit) {
long seconds = unit.toSeconds(timeout);
if (timeout != 0 && seconds == 0) {
seconds = 1;
}
return seconds;
}
@Override @Override
public String getName() { public String getName() {
return name; return name;

@ -68,14 +68,6 @@ public class RedissonQueue<V> extends RedissonList<V> implements RQueue<V> {
return value; return value;
} }
protected long toSeconds(long timeout, TimeUnit unit) {
long seconds = unit.toSeconds(timeout);
if (timeout != 0 && seconds == 0) {
seconds = 1;
}
return seconds;
}
@Override @Override
public V remove() { public V remove() {
return removeFirst(); return removeFirst();

@ -24,6 +24,7 @@ import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import java.util.Set; import java.util.Set;
import org.redisson.api.RFuture; import org.redisson.api.RFuture;
@ -36,6 +37,7 @@ import org.redisson.client.codec.Codec;
import org.redisson.client.codec.DoubleCodec; import org.redisson.client.codec.DoubleCodec;
import org.redisson.client.codec.LongCodec; import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.ScanCodec; import org.redisson.client.codec.ScanCodec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.ScoredEntry; import org.redisson.client.protocol.ScoredEntry;
import org.redisson.client.protocol.decoder.ListScanResult; import org.redisson.client.protocol.decoder.ListScanResult;
@ -89,25 +91,104 @@ public class RedissonScoredSortedSet<V> extends RedissonExpirable implements RSc
return get(pollLastAsync()); return get(pollLastAsync());
} }
@Override
public Collection<V> pollFirst(int count) {
return get(pollFirstAsync(count));
}
@Override
public Collection<V> pollLast(int count) {
return get(pollLastAsync(count));
}
@Override
public RFuture<Collection<V>> pollFirstAsync(int count) {
if (count <= 0) {
return RedissonPromise.<Collection<V>>newSucceededFuture(Collections.<V>emptyList());
}
return poll(0, count-1, RedisCommands.EVAL_LIST);
}
@Override
public RFuture<Collection<V>> pollLastAsync(int count) {
if (count <= 0) {
return RedissonPromise.<Collection<V>>newSucceededFuture(Collections.<V>emptyList());
}
return poll(-count, -1, RedisCommands.EVAL_LIST);
}
@Override @Override
public RFuture<V> pollFirstAsync() { public RFuture<V> pollFirstAsync() {
return poll(0); return poll(0, 0, RedisCommands.EVAL_FIRST_LIST);
} }
@Override @Override
public RFuture<V> pollLastAsync() { public RFuture<V> pollLastAsync() {
return poll(-1); return poll(-1, -1, RedisCommands.EVAL_FIRST_LIST);
} }
private RFuture<V> poll(int index) { private <T> RFuture<T> poll(int from, int to, RedisCommand<?> command) {
return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_OBJECT, return commandExecutor.evalWriteAsync(getName(), codec, command,
"local v = redis.call('zrange', KEYS[1], ARGV[1], ARGV[2]); " "local v = redis.call('zrange', KEYS[1], ARGV[1], ARGV[2]); "
+ "if v[1] ~= nil then " + "if #v > 0 then "
+ "redis.call('zremrangebyrank', KEYS[1], ARGV[1], ARGV[2]); " + "redis.call('zremrangebyrank', KEYS[1], ARGV[1], ARGV[2]); "
+ "return v[1]; " + "return v; "
+ "end " + "end "
+ "return nil;", + "return v;",
Collections.<Object>singletonList(getName()), index, index); Collections.<Object>singletonList(getName()), from, to);
}
@Override
public V pollFirst(long timeout, TimeUnit unit) {
return get(pollFirstAsync(timeout, unit));
}
@Override
public RFuture<V> pollFirstAsync(long timeout, TimeUnit unit) {
return commandExecutor.writeAsync(getName(), codec, RedisCommands.BZPOPMIN_VALUE, getName(), toSeconds(timeout, unit));
}
@Override
public V pollFirstFromAny(long timeout, TimeUnit unit, String ... queueNames) {
return get(pollFirstFromAnyAsync(timeout, unit, queueNames));
}
@Override
public RFuture<V> pollFirstFromAnyAsync(long timeout, TimeUnit unit, String ... queueNames) {
List<Object> params = new ArrayList<Object>(queueNames.length + 1);
params.add(getName());
for (Object name : queueNames) {
params.add(name);
}
params.add(toSeconds(timeout, unit));
return commandExecutor.writeAsync(getName(), codec, RedisCommands.BZPOPMIN_VALUE, params.toArray());
}
@Override
public V pollLastFromAny(long timeout, TimeUnit unit, String ... queueNames) {
return get(pollLastFromAnyAsync(timeout, unit, queueNames));
}
@Override
public RFuture<V> pollLastFromAnyAsync(long timeout, TimeUnit unit, String ... queueNames) {
List<Object> params = new ArrayList<Object>(queueNames.length + 1);
params.add(getName());
for (Object name : queueNames) {
params.add(name);
}
params.add(toSeconds(timeout, unit));
return commandExecutor.writeAsync(getName(), codec, RedisCommands.BZPOPMAX_VALUE, params.toArray());
}
@Override
public V pollLast(long timeout, TimeUnit unit) {
return get(pollLastAsync(timeout, unit));
}
@Override
public RFuture<V> pollLastAsync(long timeout, TimeUnit unit) {
return commandExecutor.writeAsync(getName(), codec, RedisCommands.BZPOPMAX_VALUE, getName(), toSeconds(timeout, unit));
} }
@Override @Override

@ -32,7 +32,6 @@ public interface RBitSetAsync extends RExpirableAsync {
* Returns zero if there are no any set bit. * Returns zero if there are no any set bit.
* *
* @return "logical size" = index of highest set bit plus one * @return "logical size" = index of highest set bit plus one
* @return void
*/ */
RFuture<Long> lengthAsync(); RFuture<Long> lengthAsync();

@ -20,6 +20,7 @@ import java.util.BitSet;
import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;
/** /**
* Vector of bits that grows as needed.
* *
* @author Nikita Koksharov * @author Nikita Koksharov
* *
@ -30,36 +31,141 @@ public interface RBitSetReactive extends RExpirableReactive {
Publisher<byte[]> toByteArray(); Publisher<byte[]> toByteArray();
/**
* Returns "logical size" = index of highest set bit plus one.
* Returns zero if there are no any set bit.
*
* @return "logical size" = index of highest set bit plus one
*/
Publisher<Long> length(); Publisher<Long> length();
/**
* Set all bits to <code>value</code> from <code>fromIndex</code> (inclusive) to <code>toIndex</code> (exclusive)
*
* @param fromIndex inclusive
* @param toIndex exclusive
* @param value true = 1, false = 0
* @return void
*
*/
Publisher<Void> set(long fromIndex, long toIndex, boolean value); Publisher<Void> set(long fromIndex, long toIndex, boolean value);
/**
* Set all bits to zero from <code>fromIndex</code> (inclusive) to <code>toIndex</code> (exclusive)
*
* @param fromIndex inclusive
* @param toIndex exclusive
* @return void
*
*/
Publisher<Void> clear(long fromIndex, long toIndex); Publisher<Void> clear(long fromIndex, long toIndex);
/**
* Copy bits state of source BitSet object to this object
*
* @param bs - BitSet source
* @return void
*/
Publisher<Void> set(BitSet bs); Publisher<Void> set(BitSet bs);
/**
* Executes NOT operation over all bits
*
* @return void
*/
Publisher<Void> not(); Publisher<Void> not();
/**
* Set all bits to one from <code>fromIndex</code> (inclusive) to <code>toIndex</code> (exclusive)
*
* @param fromIndex inclusive
* @param toIndex exclusive
* @return void
*/
Publisher<Void> set(long fromIndex, long toIndex); Publisher<Void> set(long fromIndex, long toIndex);
/**
* Returns number of set bits.
*
* @return number of set bits.
*/
Publisher<Long> size(); Publisher<Long> size();
/**
* Returns <code>true</code> if bit set to one and <code>false</code> overwise.
*
* @param bitIndex - index of bit
* @return <code>true</code> if bit set to one and <code>false</code> overwise.
*/
Publisher<Boolean> get(long bitIndex); Publisher<Boolean> get(long bitIndex);
/**
* Set bit to one at specified bitIndex
*
* @param bitIndex - index of bit
* @return <code>true</code> - if previous value was true,
* <code>false</code> - if previous value was false
*/
Publisher<Boolean> set(long bitIndex); Publisher<Boolean> set(long bitIndex);
/**
* Set bit to <code>value</code> at specified <code>bitIndex</code>
*
* @param bitIndex - index of bit
* @param value true = 1, false = 0
* @return <code>true</code> - if previous value was true,
* <code>false</code> - if previous value was false
*/
Publisher<Boolean> set(long bitIndex, boolean value); Publisher<Boolean> set(long bitIndex, boolean value);
/**
* Returns the number of bits set to one.
*
* @return number of bits
*/
Publisher<Long> cardinality(); Publisher<Long> cardinality();
/**
* Set bit to zero at specified <code>bitIndex</code>
*
* @param bitIndex - index of bit
* @return <code>true</code> - if previous value was true,
* <code>false</code> - if previous value was false
*/
Publisher<Boolean> clear(long bitIndex); Publisher<Boolean> clear(long bitIndex);
/**
* Set all bits to zero
*
* @return void
*/
Publisher<Void> clear(); Publisher<Void> clear();
/**
* Executes OR operation over this object and specified bitsets.
* Stores result into this object.
*
* @param bitSetNames - name of stored bitsets
* @return void
*/
Publisher<Void> or(String... bitSetNames); Publisher<Void> or(String... bitSetNames);
/**
* Executes AND operation over this object and specified bitsets.
* Stores result into this object.
*
* @param bitSetNames - name of stored bitsets
* @return void
*/
Publisher<Void> and(String... bitSetNames); Publisher<Void> and(String... bitSetNames);
/**
* Executes XOR operation over this object and specified bitsets.
* Stores result into this object.
*
* @param bitSetNames - name of stored bitsets
* @return void
*/
Publisher<Void> xor(String... bitSetNames); Publisher<Void> xor(String... bitSetNames);
} }

@ -18,6 +18,7 @@ package org.redisson.api;
import java.util.Collection; import java.util.Collection;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.redisson.api.mapreduce.RCollectionMapReduce; import org.redisson.api.mapreduce.RCollectionMapReduce;
import org.redisson.client.protocol.ScoredEntry; import org.redisson.client.protocol.ScoredEntry;
@ -45,16 +46,119 @@ public interface RScoredSortedSet<V> extends RScoredSortedSetAsync<V>, Iterable<
*/ */
<KOut, VOut> RCollectionMapReduce<V, KOut, VOut> mapReduce(); <KOut, VOut> RCollectionMapReduce<V, KOut, VOut> mapReduce();
/**
* Removes and returns first available tail element of <b>any</b> sorted set,
* waiting up to the specified wait time if necessary for an element to become available
* in any of defined sorted sets <b>including</b> this one.
* <p>
* Requires <b>Redis 5.0.0 and higher.</b>
*
* @param queueNames - names of queue
* @param timeout how long to wait before giving up, in units of
* {@code unit}
* @param unit a {@code TimeUnit} determining how to interpret the
* {@code timeout} parameter
* @return the tail element, or {@code null} if all sorted sets are empty
*/
V pollLastFromAny(long timeout, TimeUnit unit, String ... queueNames);
/**
* Removes and returns first available head element of <b>any</b> sorted set,
* waiting up to the specified wait time if necessary for an element to become available
* in any of defined sorted sets <b>including</b> this one.
* <p>
* Requires <b>Redis 5.0.0 and higher.</b>
*
* @param queueNames - names of queue
* @param timeout how long to wait before giving up, in units of
* {@code unit}
* @param unit a {@code TimeUnit} determining how to interpret the
* {@code timeout} parameter
* @return the head element, or {@code null} if all sorted sets are empty
*/
V pollFirstFromAny(long timeout, TimeUnit unit, String ... queueNames);
/**
* Removes and returns the head element or {@code null} if this sorted set is empty.
*
* @param timeout how long to wait before giving up, in units of
* {@code unit}
* @param unit a {@code TimeUnit} determining how to interpret the
* {@code timeout} parameter
* @return the head element,
* or {@code null} if this sorted set is empty
*/
V pollFirst(long timeout, TimeUnit unit);
/**
* Removes and returns the tail element or {@code null} if this sorted set is empty.
*
* @param timeout how long to wait before giving up, in units of
* {@code unit}
* @param unit a {@code TimeUnit} determining how to interpret the
* {@code timeout} parameter
* @return the tail element or {@code null} if this sorted set is empty
*/
V pollLast(long timeout, TimeUnit unit);
/**
* Removes and returns the head elements or {@code null} if this sorted set is empty.
*
* @param count - elements amount
* @return the head element,
* or {@code null} if this sorted set is empty
*/
Collection<V> pollFirst(int count);
/**
* Removes and returns the tail elements or {@code null} if this sorted set is empty.
*
* @param count - elements amount
* @return the tail element or {@code null} if this sorted set is empty
*/
Collection<V> pollLast(int count);
/**
* Removes and returns the head element or {@code null} if this sorted set is empty.
*
* @return the head element,
* or {@code null} if this sorted set is empty
*/
V pollFirst(); V pollFirst();
/**
* Removes and returns the tail element or {@code null} if this sorted set is empty.
*
* @return the tail element or {@code null} if this sorted set is empty
*/
V pollLast(); V pollLast();
/**
* Returns the head element or {@code null} if this sorted set is empty.
*
* @return the head element or {@code null} if this sorted set is empty
*/
V first(); V first();
/**
* Returns the tail element or {@code null} if this sorted set is empty.
*
* @return the tail element or {@code null} if this sorted set is empty
*/
V last(); V last();
/**
* Returns score of the tail element or returns {@code null} if this sorted set is empty.
*
* @return the tail element or {@code null} if this sorted set is empty
*/
Double firstScore(); Double firstScore();
/**
* Returns score of the head element or returns {@code null} if this sorted set is empty.
*
* @return the tail element or {@code null} if this sorted set is empty
*/
Double lastScore(); Double lastScore();
Long addAll(Map<V, Double> objects); Long addAll(Map<V, Double> objects);

@ -18,6 +18,7 @@ package org.redisson.api;
import java.util.Collection; import java.util.Collection;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.redisson.api.RScoredSortedSet.Aggregate; import org.redisson.api.RScoredSortedSet.Aggregate;
import org.redisson.client.protocol.ScoredEntry; import org.redisson.client.protocol.ScoredEntry;
@ -30,16 +31,124 @@ import org.redisson.client.protocol.ScoredEntry;
*/ */
public interface RScoredSortedSetAsync<V> extends RExpirableAsync, RSortableAsync<Set<V>> { public interface RScoredSortedSetAsync<V> extends RExpirableAsync, RSortableAsync<Set<V>> {
RFuture<V> pollLastAsync(); /**
* Removes and returns first available tail element of <b>any</b> sorted set,
* waiting up to the specified wait time if necessary for an element to become available
* in any of defined sorted sets <b>including</b> this one.
* <p>
* Requires <b>Redis 5.0.0 and higher.</b>
*
* @param queueNames - names of queue
* @param timeout how long to wait before giving up, in units of
* {@code unit}
* @param unit a {@code TimeUnit} determining how to interpret the
* {@code timeout} parameter
* @return the tail element, or {@code null} if all sorted sets are empty
*/
RFuture<V> pollLastFromAnyAsync(long timeout, TimeUnit unit, String ... queueNames);
/**
* Removes and returns first available head element of <b>any</b> sorted set,
* waiting up to the specified wait time if necessary for an element to become available
* in any of defined sorted sets <b>including</b> this one.
* <p>
* Requires <b>Redis 5.0.0 and higher.</b>
*
* @param queueNames - names of queue
* @param timeout how long to wait before giving up, in units of
* {@code unit}
* @param unit a {@code TimeUnit} determining how to interpret the
* {@code timeout} parameter
* @return the head element, or {@code null} if all sorted sets are empty
*
*/
RFuture<V> pollFirstFromAnyAsync(long timeout, TimeUnit unit, String ... queueNames);
/**
* Removes and returns the head element or {@code null} if this sorted set is empty.
* <p>
* Requires <b>Redis 5.0.0 and higher.</b>
*
* @param timeout how long to wait before giving up, in units of
* {@code unit}
* @param unit a {@code TimeUnit} determining how to interpret the
* {@code timeout} parameter
* @return the head element,
* or {@code null} if this sorted set is empty
*/
RFuture<V> pollFirstAsync(long timeout, TimeUnit unit);
/**
* Removes and returns the tail element or {@code null} if this sorted set is empty.
* <p>
* Requires <b>Redis 5.0.0 and higher.</b>
*
* @param timeout how long to wait before giving up, in units of
* {@code unit}
* @param unit a {@code TimeUnit} determining how to interpret the
* {@code timeout} parameter
* @return the tail element or {@code null} if this sorted set is empty
*/
RFuture<V> pollLastAsync(long timeout, TimeUnit unit);
/**
* Removes and returns the head elements or {@code null} if this sorted set is empty.
*
* @param count - elements amount
* @return the head element,
* or {@code null} if this sorted set is empty
*/
RFuture<Collection<V>> pollFirstAsync(int count);
/**
* Removes and returns the tail elements or {@code null} if this sorted set is empty.
*
* @param count - elements amount
* @return the tail element or {@code null} if this sorted set is empty
*/
RFuture<Collection<V>> pollLastAsync(int count);
/**
* Removes and returns the head element or {@code null} if this sorted set is empty.
*
* @return the head element,
* or {@code null} if this sorted set is empty
*/
RFuture<V> pollFirstAsync(); RFuture<V> pollFirstAsync();
/**
* Removes and returns the tail element or {@code null} if this sorted set is empty.
*
* @return the tail element or {@code null} if this sorted set is empty
*/
RFuture<V> pollLastAsync();
/**
* Returns the head element or {@code null} if this sorted set is empty.
*
* @return the head element or {@code null} if this sorted set is empty
*/
RFuture<V> firstAsync(); RFuture<V> firstAsync();
/**
* Returns the tail element or {@code null} if this sorted set is empty.
*
* @return the tail element or {@code null} if this sorted set is empty
*/
RFuture<V> lastAsync(); RFuture<V> lastAsync();
/**
* Returns score of the head element or returns {@code null} if this sorted set is empty.
*
* @return the tail element or {@code null} if this sorted set is empty
*/
RFuture<Double> firstScoreAsync(); RFuture<Double> firstScoreAsync();
/**
* Returns score of the tail element or returns {@code null} if this sorted set is empty.
*
* @return the tail element or {@code null} if this sorted set is empty
*/
RFuture<Double> lastScoreAsync(); RFuture<Double> lastScoreAsync();
RFuture<Long> addAllAsync(Map<V, Double> objects); RFuture<Long> addAllAsync(Map<V, Double> objects);
@ -48,8 +157,20 @@ public interface RScoredSortedSetAsync<V> extends RExpirableAsync, RSortableAsyn
RFuture<Integer> removeRangeByRankAsync(int startIndex, int endIndex); RFuture<Integer> removeRangeByRankAsync(int startIndex, int endIndex);
/**
* Returns rank of value, with the scores ordered from low to high.
*
* @param o - object
* @return rank or <code>null</code> if value does not exist
*/
RFuture<Integer> rankAsync(V o); RFuture<Integer> rankAsync(V o);
/**
* Returns rank of value, with the scores ordered from high to low.
*
* @param o - object
* @return rank or <code>null</code> if value does not exist
*/
RFuture<Integer> revRankAsync(V o); RFuture<Integer> revRankAsync(V o);
/** /**
@ -90,7 +211,7 @@ public interface RScoredSortedSetAsync<V> extends RExpirableAsync, RSortableAsyn
/** /**
* Adds element to this set only if has not been added before. * Adds element to this set only if has not been added before.
* <p> * <p>
* Works only with <b>Redis 3.0.2 and higher.</b> * Requires <b>Redis 3.0.2 and higher.</b>
* *
* @param score - object score * @param score - object score
* @param object - object itself * @param object - object itself

@ -24,7 +24,6 @@ import java.util.Set;
import org.redisson.api.RType; import org.redisson.api.RType;
import org.redisson.client.protocol.RedisCommand.ValueType; import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.convertor.BitSetReplayConvertor;
import org.redisson.client.protocol.convertor.BitsSizeReplayConvertor; import org.redisson.client.protocol.convertor.BitsSizeReplayConvertor;
import org.redisson.client.protocol.convertor.BooleanAmountReplayConvertor; import org.redisson.client.protocol.convertor.BooleanAmountReplayConvertor;
import org.redisson.client.protocol.convertor.BooleanNotNullReplayConvertor; import org.redisson.client.protocol.convertor.BooleanNotNullReplayConvertor;
@ -36,13 +35,14 @@ import org.redisson.client.protocol.convertor.DoubleNullSafeReplayConvertor;
import org.redisson.client.protocol.convertor.DoubleReplayConvertor; import org.redisson.client.protocol.convertor.DoubleReplayConvertor;
import org.redisson.client.protocol.convertor.IntegerReplayConvertor; import org.redisson.client.protocol.convertor.IntegerReplayConvertor;
import org.redisson.client.protocol.convertor.KeyValueConvertor; import org.redisson.client.protocol.convertor.KeyValueConvertor;
import org.redisson.client.protocol.convertor.TimeObjectDecoder;
import org.redisson.client.protocol.convertor.LongReplayConvertor; import org.redisson.client.protocol.convertor.LongReplayConvertor;
import org.redisson.client.protocol.convertor.TimeObjectDecoder;
import org.redisson.client.protocol.convertor.TrueReplayConvertor; import org.redisson.client.protocol.convertor.TrueReplayConvertor;
import org.redisson.client.protocol.convertor.TypeConvertor; import org.redisson.client.protocol.convertor.TypeConvertor;
import org.redisson.client.protocol.convertor.VoidReplayConvertor; import org.redisson.client.protocol.convertor.VoidReplayConvertor;
import org.redisson.client.protocol.decoder.ClusterNodesDecoder; import org.redisson.client.protocol.decoder.ClusterNodesDecoder;
import org.redisson.client.protocol.decoder.KeyValueObjectDecoder; import org.redisson.client.protocol.decoder.KeyValueObjectDecoder;
import org.redisson.client.protocol.decoder.ListFirstObjectDecoder;
import org.redisson.client.protocol.decoder.ListMultiDecoder; import org.redisson.client.protocol.decoder.ListMultiDecoder;
import org.redisson.client.protocol.decoder.ListResultReplayDecoder; import org.redisson.client.protocol.decoder.ListResultReplayDecoder;
import org.redisson.client.protocol.decoder.ListScanResult; import org.redisson.client.protocol.decoder.ListScanResult;
@ -51,12 +51,12 @@ import org.redisson.client.protocol.decoder.Long2MultiDecoder;
import org.redisson.client.protocol.decoder.LongMultiDecoder; import org.redisson.client.protocol.decoder.LongMultiDecoder;
import org.redisson.client.protocol.decoder.MapScanResult; import org.redisson.client.protocol.decoder.MapScanResult;
import org.redisson.client.protocol.decoder.MapScanResultReplayDecoder; import org.redisson.client.protocol.decoder.MapScanResultReplayDecoder;
import org.redisson.client.protocol.decoder.ObjectFirstResultReplayDecoder;
import org.redisson.client.protocol.decoder.ObjectFirstScoreReplayDecoder; import org.redisson.client.protocol.decoder.ObjectFirstScoreReplayDecoder;
import org.redisson.client.protocol.decoder.ObjectListReplayDecoder; import org.redisson.client.protocol.decoder.ObjectListReplayDecoder;
import org.redisson.client.protocol.decoder.ObjectMapEntryReplayDecoder; import org.redisson.client.protocol.decoder.ObjectMapEntryReplayDecoder;
import org.redisson.client.protocol.decoder.ObjectMapReplayDecoder; import org.redisson.client.protocol.decoder.ObjectMapReplayDecoder;
import org.redisson.client.protocol.decoder.ObjectSetReplayDecoder; import org.redisson.client.protocol.decoder.ObjectSetReplayDecoder;
import org.redisson.client.protocol.decoder.ScoredSortedSetPolledObjectDecoder;
import org.redisson.client.protocol.decoder.ScoredSortedSetReplayDecoder; import org.redisson.client.protocol.decoder.ScoredSortedSetReplayDecoder;
import org.redisson.client.protocol.decoder.ScoredSortedSetScanDecoder; import org.redisson.client.protocol.decoder.ScoredSortedSetScanDecoder;
import org.redisson.client.protocol.decoder.ScoredSortedSetScanReplayDecoder; import org.redisson.client.protocol.decoder.ScoredSortedSetScanReplayDecoder;
@ -93,8 +93,6 @@ public interface RedisCommands {
RedisStrictCommand<Integer> BITPOS = new RedisStrictCommand<Integer>("BITPOS", new IntegerReplayConvertor()); RedisStrictCommand<Integer> BITPOS = new RedisStrictCommand<Integer>("BITPOS", new IntegerReplayConvertor());
RedisStrictCommand<Void> SETBIT_VOID = new RedisStrictCommand<Void>("SETBIT", new VoidReplayConvertor()); RedisStrictCommand<Void> SETBIT_VOID = new RedisStrictCommand<Void>("SETBIT", new VoidReplayConvertor());
RedisStrictCommand<Boolean> SETBIT = new RedisStrictCommand<Boolean>("SETBIT", new BooleanReplayConvertor()); RedisStrictCommand<Boolean> SETBIT = new RedisStrictCommand<Boolean>("SETBIT", new BooleanReplayConvertor());
RedisStrictCommand<Boolean> SETBIT_TRUE = new RedisStrictCommand<Boolean>("SETBIT", new BitSetReplayConvertor(0));
RedisStrictCommand<Boolean> SETBIT_FALSE = new RedisStrictCommand<Boolean>("SETBIT", new BitSetReplayConvertor(1));
RedisStrictCommand<Void> BITOP = new RedisStrictCommand<Void>("BITOP", new VoidReplayConvertor()); RedisStrictCommand<Void> BITOP = new RedisStrictCommand<Void>("BITOP", new VoidReplayConvertor());
RedisStrictCommand<Integer> WAIT = new RedisStrictCommand<Integer>("WAIT", new IntegerReplayConvertor()); RedisStrictCommand<Integer> WAIT = new RedisStrictCommand<Integer>("WAIT", new IntegerReplayConvertor());
@ -119,9 +117,11 @@ public interface RedisCommands {
RedisStrictCommand<Double> ZSCORE = new RedisStrictCommand<Double>("ZSCORE", new DoubleReplayConvertor()); RedisStrictCommand<Double> ZSCORE = new RedisStrictCommand<Double>("ZSCORE", new DoubleReplayConvertor());
RedisCommand<Integer> ZRANK_INT = new RedisCommand<Integer>("ZRANK", new IntegerReplayConvertor()); RedisCommand<Integer> ZRANK_INT = new RedisCommand<Integer>("ZRANK", new IntegerReplayConvertor());
RedisCommand<Integer> ZREVRANK_INT = new RedisCommand<Integer>("ZREVRANK", new IntegerReplayConvertor()); RedisCommand<Integer> ZREVRANK_INT = new RedisCommand<Integer>("ZREVRANK", new IntegerReplayConvertor());
RedisCommand<Object> ZRANGE_SINGLE = new RedisCommand<Object>("ZRANGE", new ObjectFirstResultReplayDecoder<Object>()); RedisCommand<Object> ZRANGE_SINGLE = new RedisCommand<Object>("ZRANGE", new ListFirstObjectDecoder());
RedisStrictCommand<Double> ZRANGE_SINGLE_SCORE = new RedisStrictCommand<Double>("ZRANGE", new ObjectFirstScoreReplayDecoder()); RedisStrictCommand<Double> ZRANGE_SINGLE_SCORE = new RedisStrictCommand<Double>("ZRANGE", new ObjectFirstScoreReplayDecoder());
RedisCommand<List<Object>> ZRANGE = new RedisCommand<List<Object>>("ZRANGE", new ObjectListReplayDecoder<Object>()); RedisCommand<List<Object>> ZRANGE = new RedisCommand<List<Object>>("ZRANGE", new ObjectListReplayDecoder<Object>());
RedisCommand<List<Object>> ZPOPMIN = new RedisCommand<List<Object>>("ZPOPMIN", new ObjectListReplayDecoder<Object>());
RedisCommand<List<Object>> ZPOPMAX = new RedisCommand<List<Object>>("ZPOPMAX", new ObjectListReplayDecoder<Object>());
RedisStrictCommand<Integer> ZREMRANGEBYRANK = new RedisStrictCommand<Integer>("ZREMRANGEBYRANK", new IntegerReplayConvertor()); RedisStrictCommand<Integer> ZREMRANGEBYRANK = new RedisStrictCommand<Integer>("ZREMRANGEBYRANK", new IntegerReplayConvertor());
RedisStrictCommand<Integer> ZREMRANGEBYSCORE = new RedisStrictCommand<Integer>("ZREMRANGEBYSCORE", new IntegerReplayConvertor()); RedisStrictCommand<Integer> ZREMRANGEBYSCORE = new RedisStrictCommand<Integer>("ZREMRANGEBYSCORE", new IntegerReplayConvertor());
RedisStrictCommand<Integer> ZREMRANGEBYLEX = new RedisStrictCommand<Integer>("ZREMRANGEBYLEX", new IntegerReplayConvertor()); RedisStrictCommand<Integer> ZREMRANGEBYLEX = new RedisStrictCommand<Integer>("ZREMRANGEBYLEX", new IntegerReplayConvertor());
@ -191,9 +191,11 @@ public interface RedisCommands {
RedisCommand<Object> BRPOPLPUSH = new RedisCommand<Object>("BRPOPLPUSH"); RedisCommand<Object> BRPOPLPUSH = new RedisCommand<Object>("BRPOPLPUSH");
RedisCommand<Object> BLPOP_VALUE = new RedisCommand<Object>("BLPOP", new KeyValueObjectDecoder(), new KeyValueConvertor()); RedisCommand<Object> BLPOP_VALUE = new RedisCommand<Object>("BLPOP", new KeyValueObjectDecoder(), new KeyValueConvertor());
RedisCommand<Object> BRPOP_VALUE = new RedisCommand<Object>("BRPOP", new KeyValueObjectDecoder(), new KeyValueConvertor()); RedisCommand<Object> BRPOP_VALUE = new RedisCommand<Object>("BRPOP", new KeyValueObjectDecoder(), new KeyValueConvertor());
RedisCommand<Object> BZPOPMIN_VALUE = new RedisCommand<Object>("BZPOPMIN", new ScoredSortedSetPolledObjectDecoder());
RedisCommand<Object> BZPOPMAX_VALUE = new RedisCommand<Object>("BZPOPMAX", new ScoredSortedSetPolledObjectDecoder());
Set<String> BLOCKING_COMMANDS = new HashSet<String>( Set<String> BLOCKING_COMMANDS = new HashSet<String>(
Arrays.asList(BLPOP_VALUE.getName(), BRPOP_VALUE.getName(), BRPOPLPUSH.getName())); Arrays.asList(BLPOP_VALUE.getName(), BRPOP_VALUE.getName(), BRPOPLPUSH.getName(), BZPOPMIN_VALUE.getName(), BZPOPMAX_VALUE.getName()));
RedisCommand<Boolean> PFADD = new RedisCommand<Boolean>("PFADD", new BooleanReplayConvertor()); RedisCommand<Boolean> PFADD = new RedisCommand<Boolean>("PFADD", new BooleanReplayConvertor());
RedisStrictCommand<Long> PFCOUNT = new RedisStrictCommand<Long>("PFCOUNT"); RedisStrictCommand<Long> PFCOUNT = new RedisStrictCommand<Long>("PFCOUNT");
@ -228,6 +230,7 @@ public interface RedisCommands {
RedisStrictCommand<Long> EVAL_LONG = new RedisStrictCommand<Long>("EVAL"); RedisStrictCommand<Long> EVAL_LONG = new RedisStrictCommand<Long>("EVAL");
RedisStrictCommand<Long> EVAL_LONG_SAFE = new RedisStrictCommand<Long>("EVAL", new LongReplayConvertor()); RedisStrictCommand<Long> EVAL_LONG_SAFE = new RedisStrictCommand<Long>("EVAL", new LongReplayConvertor());
RedisStrictCommand<Void> EVAL_VOID = new RedisStrictCommand<Void>("EVAL", new VoidReplayConvertor()); RedisStrictCommand<Void> EVAL_VOID = new RedisStrictCommand<Void>("EVAL", new VoidReplayConvertor());
RedisCommand<Object> EVAL_FIRST_LIST = new RedisCommand<Object>("EVAL", new ListFirstObjectDecoder());
RedisCommand<List<Object>> EVAL_LIST = new RedisCommand<List<Object>>("EVAL", new ObjectListReplayDecoder<Object>()); RedisCommand<List<Object>> EVAL_LIST = new RedisCommand<List<Object>>("EVAL", new ObjectListReplayDecoder<Object>());
RedisCommand<Set<Object>> EVAL_SET = new RedisCommand<Set<Object>>("EVAL", new ObjectSetReplayDecoder<Object>()); RedisCommand<Set<Object>> EVAL_SET = new RedisCommand<Set<Object>>("EVAL", new ObjectSetReplayDecoder<Object>());
RedisCommand<Object> EVAL_OBJECT = new RedisCommand<Object>("EVAL"); RedisCommand<Object> EVAL_OBJECT = new RedisCommand<Object>("EVAL");

@ -1,38 +0,0 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client.protocol.convertor;
/**
*
* @author Nikita Koksharov
*
*/
public class BitSetReplayConvertor extends SingleConvertor<Boolean> {
private final int expectedValue;
public BitSetReplayConvertor(int expectedValue) {
super();
this.expectedValue = expectedValue;
}
@Override
public Boolean convert(Object obj) {
return expectedValue == (Long)obj;
}
}

@ -23,6 +23,7 @@ import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder; import org.redisson.client.protocol.Decoder;
import org.redisson.cluster.ClusterNodeInfo; import org.redisson.cluster.ClusterNodeInfo;
import org.redisson.cluster.ClusterSlotRange; import org.redisson.cluster.ClusterSlotRange;
import org.redisson.cluster.ClusterNodeInfo.Flag;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.util.CharsetUtil; import io.netty.util.CharsetUtil;
@ -53,17 +54,19 @@ public class ClusterNodesDecoder implements Decoder<List<ClusterNodeInfo>> {
String nodeId = params[0]; String nodeId = params[0];
node.setNodeId(nodeId); node.setNodeId(nodeId);
String flags = params[2];
for (String flag : flags.split(",")) {
String flagValue = flag.toUpperCase().replaceAll("\\?", "");
node.addFlag(ClusterNodeInfo.Flag.valueOf(flagValue));
}
if (!node.containsFlag(Flag.NOADDR)) {
String protocol = "redis://"; String protocol = "redis://";
if (ssl) { if (ssl) {
protocol = "rediss://"; protocol = "rediss://";
} }
String addr = protocol + params[1].split("@")[0]; String addr = protocol + params[1].split("@")[0];
node.setAddress(addr); node.setAddress(addr);
String flags = params[2];
for (String flag : flags.split(",")) {
String flagValue = flag.toUpperCase().replaceAll("\\?", "");
node.addFlag(ClusterNodeInfo.Flag.valueOf(flagValue));
} }
String slaveOf = params[3]; String slaveOf = params[3];

@ -39,6 +39,9 @@ public class ObjectFirstScoreReplayDecoder implements MultiDecoder<Double> {
@Override @Override
public Double decode(List<Object> parts, State state) { public Double decode(List<Object> parts, State state) {
if (parts.isEmpty()) {
return null;
}
return (Double) parts.get(1); return (Double) parts.get(1);
} }

@ -17,6 +17,8 @@ package org.redisson.client.protocol.decoder;
import java.util.List; import java.util.List;
import org.redisson.client.codec.DoubleCodec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.handler.State; import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder; import org.redisson.client.protocol.Decoder;
@ -24,17 +26,25 @@ import org.redisson.client.protocol.Decoder;
* *
* @author Nikita Koksharov * @author Nikita Koksharov
* *
* @param <T> type
*/ */
public class ObjectFirstResultReplayDecoder<T> implements MultiDecoder<T> { public class ScoredSortedSetPolledObjectDecoder implements MultiDecoder<Object> {
@Override @Override
public T decode(List<Object> parts, State state) { public Object decode(List<Object> parts, State state) {
return (T) parts.get(0); if (!parts.isEmpty()) {
return parts.get(2);
}
return null;
} }
@Override @Override
public Decoder<Object> getDecoder(int paramNum, State state) { public Decoder<Object> getDecoder(int paramNum, State state) {
if (paramNum == 0) {
return StringCodec.INSTANCE.getValueDecoder();
}
if (paramNum == 1) {
return DoubleCodec.INSTANCE.getValueDecoder();
}
return null; return null;
} }

@ -288,7 +288,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
public void run() { public void run() {
if (isConfigEndpoint) { if (isConfigEndpoint) {
final URI uri = cfg.getNodeAddresses().iterator().next(); final URI uri = cfg.getNodeAddresses().iterator().next();
final AddressResolver<InetSocketAddress> resolver = createResolverGroup().getResolver(getGroup().next()); final AddressResolver<InetSocketAddress> resolver = resolverGroup.getResolver(getGroup().next());
Future<List<InetSocketAddress>> allNodes = resolver.resolveAll(InetSocketAddress.createUnresolved(uri.getHost(), uri.getPort())); Future<List<InetSocketAddress>> allNodes = resolver.resolveAll(InetSocketAddress.createUnresolved(uri.getHost(), uri.getPort()));
allNodes.addListener(new FutureListener<List<InetSocketAddress>>() { allNodes.addListener(new FutureListener<List<InetSocketAddress>>() {
@Override @Override

@ -16,6 +16,7 @@
package org.redisson.cluster; package org.redisson.cluster;
import java.net.URI; import java.net.URI;
import java.util.EnumSet;
import java.util.HashSet; import java.util.HashSet;
import java.util.Set; import java.util.Set;
import org.redisson.misc.URIBuilder; import org.redisson.misc.URIBuilder;
@ -33,7 +34,7 @@ public class ClusterNodeInfo {
private String nodeId; private String nodeId;
private URI address; private URI address;
private final Set<Flag> flags = new HashSet<Flag>(); private final Set<Flag> flags = EnumSet.noneOf(Flag.class);
private String slaveOf; private String slaveOf;
private final Set<ClusterSlotRange> slotRanges = new HashSet<ClusterSlotRange>(); private final Set<ClusterSlotRange> slotRanges = new HashSet<ClusterSlotRange>();

@ -211,19 +211,6 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
this.commandExecutor = new CommandSyncService(this); this.commandExecutor = new CommandSyncService(this);
} }
/*
* Remove it once https://github.com/netty/netty/issues/7882 get resolved
*/
protected DnsAddressResolverGroup createResolverGroup() {
if (cfg.getTransportMode() == TransportMode.EPOLL) {
return cfg.getAddressResolverGroupFactory().create(EpollDatagramChannel.class, DnsServerAddressStreamProviders.platformDefault());
} else if (cfg.getTransportMode() == TransportMode.KQUEUE) {
return cfg.getAddressResolverGroupFactory().create(KQueueDatagramChannel.class, DnsServerAddressStreamProviders.platformDefault());
}
return cfg.getAddressResolverGroupFactory().create(NioDatagramChannel.class, DnsServerAddressStreamProviders.platformDefault());
}
protected void closeNodeConnections() { protected void closeNodeConnections() {
List<RFuture<Void>> futures = new ArrayList<RFuture<Void>>(); List<RFuture<Void>> futures = new ArrayList<RFuture<Void>>();
for (RedisConnection connection : nodeConnections.values()) { for (RedisConnection connection : nodeConnections.values()) {

@ -32,7 +32,6 @@ import org.redisson.client.RedisConnection;
import org.redisson.client.RedisPubSubConnection; import org.redisson.client.RedisPubSubConnection;
import org.redisson.client.protocol.CommandData; import org.redisson.client.protocol.CommandData;
import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.cluster.ClusterConnectionManager; import org.redisson.cluster.ClusterConnectionManager;
import org.redisson.cluster.ClusterSlotRange; import org.redisson.cluster.ClusterSlotRange;
import org.redisson.config.MasterSlaveServersConfig; import org.redisson.config.MasterSlaveServersConfig;
@ -249,7 +248,7 @@ public class MasterSlaveEntry {
return; return;
} }
RFuture<RedisConnection> newConnection = connectionReadOp(RedisCommands.BLPOP_VALUE); RFuture<RedisConnection> newConnection = connectionWriteOp(commandData.getCommand());
newConnection.addListener(new FutureListener<RedisConnection>() { newConnection.addListener(new FutureListener<RedisConnection>() {
@Override @Override
public void operationComplete(Future<RedisConnection> future) throws Exception { public void operationComplete(Future<RedisConnection> future) throws Exception {
@ -263,7 +262,7 @@ public class MasterSlaveEntry {
final FutureListener<Object> listener = new FutureListener<Object>() { final FutureListener<Object> listener = new FutureListener<Object>() {
@Override @Override
public void operationComplete(Future<Object> future) throws Exception { public void operationComplete(Future<Object> future) throws Exception {
releaseRead(newConnection); releaseWrite(newConnection);
} }
}; };
commandData.getPromise().addListener(listener); commandData.getPromise().addListener(listener);
@ -277,7 +276,7 @@ public class MasterSlaveEntry {
if (!future.isSuccess()) { if (!future.isSuccess()) {
listener.operationComplete(null); listener.operationComplete(null);
commandData.getPromise().removeListener(listener); commandData.getPromise().removeListener(listener);
releaseRead(newConnection); releaseWrite(newConnection);
log.error("Can't resubscribe blocking queue {}", commandData); log.error("Can't resubscribe blocking queue {}", commandData);
} }
} }

@ -37,6 +37,7 @@ public class SingleConnectionManager extends MasterSlaveConnectionManager {
private static MasterSlaveServersConfig create(SingleServerConfig cfg) { private static MasterSlaveServersConfig create(SingleServerConfig cfg) {
MasterSlaveServersConfig newconfig = new MasterSlaveServersConfig(); MasterSlaveServersConfig newconfig = new MasterSlaveServersConfig();
newconfig.setPingConnectionInterval(cfg.getPingConnectionInterval());
newconfig.setSslEnableEndpointIdentification(cfg.isSslEnableEndpointIdentification()); newconfig.setSslEnableEndpointIdentification(cfg.isSslEnableEndpointIdentification());
newconfig.setSslProvider(cfg.getSslProvider()); newconfig.setSslProvider(cfg.getSslProvider());
newconfig.setSslTruststore(cfg.getSslTruststore()); newconfig.setSslTruststore(cfg.getSslTruststore());

@ -35,27 +35,16 @@ public class LockPubSub extends PublishSubscribe<RedissonLockEntry> {
@Override @Override
protected void onMessage(RedissonLockEntry value, Long message) { protected void onMessage(RedissonLockEntry value, Long message) {
if (message.equals(unlockMessage)) { if (message.equals(unlockMessage)) {
value.getLatch().release();
while (true) { while (true) {
Runnable runnableToExecute = null; Runnable runnableToExecute = value.getListeners().poll();
synchronized (value) { if (runnableToExecute == null) {
Runnable runnable = value.getListeners().poll(); break;
if (runnable != null) {
if (value.getLatch().tryAcquire()) {
runnableToExecute = runnable;
} else {
value.addListener(runnable);
}
}
} }
if (runnableToExecute != null) {
runnableToExecute.run(); runnableToExecute.run();
} else {
return;
}
} }
value.getLatch().release();
} }
} }

@ -32,27 +32,16 @@ public class SemaphorePubSub extends PublishSubscribe<RedissonLockEntry> {
@Override @Override
protected void onMessage(RedissonLockEntry value, Long message) { protected void onMessage(RedissonLockEntry value, Long message) {
value.getLatch().release(message.intValue());
while (true) { while (true) {
Runnable runnableToExecute = null; Runnable runnableToExecute = value.getListeners().poll();
synchronized (value) { if (runnableToExecute == null) {
Runnable runnable = value.getListeners().poll(); break;
if (runnable != null) {
if (value.getLatch().tryAcquire()) {
runnableToExecute = runnable;
} else {
value.addListener(runnable);
}
}
} }
if (runnableToExecute != null) {
runnableToExecute.run(); runnableToExecute.run();
} else {
return;
}
} }
value.getLatch().release(message.intValue());
} }
} }

@ -85,7 +85,12 @@ public class RedissonSetCacheReactive<V> extends RedissonExpirableReactive imple
@Override @Override
public Publisher<Integer> size() { public Publisher<Integer> size() {
return commandExecutor.readReactive(getName(), codec, RedisCommands.ZCARD_INT, getName()); return reactive(new Supplier<RFuture<Integer>>() {
@Override
public RFuture<Integer> get() {
return instance.sizeAsync();
}
});
} }
@Override @Override

@ -16,7 +16,7 @@ public class RedissonBlockingDequeTest extends BaseTest {
RBlockingDeque<String> blockingDeque = redisson.getBlockingDeque("blocking_deque"); RBlockingDeque<String> blockingDeque = redisson.getBlockingDeque("blocking_deque");
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
String redisTask = blockingDeque.pollLastAndOfferFirstTo("deque", 1, TimeUnit.SECONDS); String redisTask = blockingDeque.pollLastAndOfferFirstTo("deque", 1, TimeUnit.SECONDS);
assertThat(System.currentTimeMillis() - start).isBetween(950L, 1050L); assertThat(System.currentTimeMillis() - start).isBetween(950L, 1100L);
assertThat(redisTask).isNull(); assertThat(redisTask).isNull();
} }

@ -35,7 +35,7 @@ public class RedissonRateLimiterTest extends BaseTest {
} }
@Test @Test
public void test3() throws InterruptedException { public void testConcurrency() throws InterruptedException {
RRateLimiter rr = redisson.getRateLimiter("test"); RRateLimiter rr = redisson.getRateLimiter("test");
assertThat(rr.trySetRate(RateType.OVERALL, 10, 1, RateIntervalUnit.SECONDS)).isTrue(); assertThat(rr.trySetRate(RateType.OVERALL, 10, 1, RateIntervalUnit.SECONDS)).isTrue();
assertThat(rr.trySetRate(RateType.OVERALL, 20, 1, RateIntervalUnit.SECONDS)).isFalse(); assertThat(rr.trySetRate(RateType.OVERALL, 20, 1, RateIntervalUnit.SECONDS)).isFalse();
@ -55,7 +55,8 @@ public class RedissonRateLimiterTest extends BaseTest {
} }
} }
try { try {
Thread.sleep(ThreadLocalRandom.current().nextInt(10)); } catch (InterruptedException e) { Thread.sleep(ThreadLocalRandom.current().nextInt(10));
} catch (InterruptedException e) {
e.printStackTrace(); e.printStackTrace();
} }

@ -12,6 +12,8 @@ import java.util.Set;
import java.util.SortedSet; import java.util.SortedSet;
import java.util.TreeSet; import java.util.TreeSet;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Assume; import org.junit.Assume;
@ -28,6 +30,42 @@ import org.redisson.client.protocol.ScoredEntry;
public class RedissonScoredSortedSetTest extends BaseTest { public class RedissonScoredSortedSetTest extends BaseTest {
@Test
public void testPollFirstFromAny() throws InterruptedException {
final RScoredSortedSet<Integer> queue1 = redisson.getScoredSortedSet("queue:pollany");
Executors.newSingleThreadScheduledExecutor().schedule(() -> {
RScoredSortedSet<Integer> queue2 = redisson.getScoredSortedSet("queue:pollany1");
RScoredSortedSet<Integer> queue3 = redisson.getScoredSortedSet("queue:pollany2");
queue3.add(0.1, 2);
queue1.add(0.1, 1);
queue2.add(0.1, 3);
}, 3, TimeUnit.SECONDS);
long s = System.currentTimeMillis();
int l = queue1.pollFirstFromAny(4, TimeUnit.SECONDS, "queue:pollany1", "queue:pollany2");
Assert.assertEquals(2, l);
Assert.assertTrue(System.currentTimeMillis() - s > 2000);
}
@Test
public void testPollLastFromAny() throws InterruptedException {
final RScoredSortedSet<Integer> queue1 = redisson.getScoredSortedSet("queue:pollany");
Executors.newSingleThreadScheduledExecutor().schedule(() -> {
RScoredSortedSet<Integer> queue2 = redisson.getScoredSortedSet("queue:pollany1");
RScoredSortedSet<Integer> queue3 = redisson.getScoredSortedSet("queue:pollany2");
queue3.add(0.1, 2);
queue1.add(0.1, 1);
queue2.add(0.1, 3);
}, 3, TimeUnit.SECONDS);
long s = System.currentTimeMillis();
int l = queue1.pollLastFromAny(4, TimeUnit.SECONDS, "queue:pollany1", "queue:pollany2");
Assert.assertEquals(2, l);
Assert.assertTrue(System.currentTimeMillis() - s > 2000);
}
@Test @Test
public void testSortOrder() { public void testSortOrder() {
RScoredSortedSet<Integer> set = redisson.getScoredSortedSet("list", IntegerCodec.INSTANCE); RScoredSortedSet<Integer> set = redisson.getScoredSortedSet("list", IntegerCodec.INSTANCE);
@ -250,6 +288,58 @@ public class RedissonScoredSortedSetTest extends BaseTest {
assertThat(set).containsExactly("a", "b"); assertThat(set).containsExactly("a", "b");
} }
@Test
public void testPollLastAmount() {
RScoredSortedSet<String> set = redisson.getScoredSortedSet("simple");
assertThat(set.pollLast(2)).isEmpty();
set.add(0.1, "a");
set.add(0.2, "b");
set.add(0.3, "c");
assertThat(set.pollLast(2)).containsExactly("b", "c");
assertThat(set).containsExactly("a");
}
@Test
public void testPollLastTimeout() {
RScoredSortedSet<String> set = redisson.getScoredSortedSet("simple");
assertThat(set.pollLast(1, TimeUnit.SECONDS)).isNull();
set.add(0.1, "a");
set.add(0.2, "b");
set.add(0.3, "c");
assertThat(set.pollLast(1, TimeUnit.SECONDS)).isEqualTo("c");
assertThat(set).containsExactly("a", "b");
}
@Test
public void testPollFirstTimeout() {
RScoredSortedSet<String> set = redisson.getScoredSortedSet("simple");
assertThat(set.pollFirst(1, TimeUnit.SECONDS)).isNull();
set.add(0.1, "a");
set.add(0.2, "b");
set.add(0.3, "c");
assertThat(set.pollFirst(1, TimeUnit.SECONDS)).isEqualTo("a");
assertThat(set).containsExactly("b", "c");
}
@Test
public void testPollFistAmount() {
RScoredSortedSet<String> set = redisson.getScoredSortedSet("simple");
assertThat(set.pollFirst(2)).isEmpty();
set.add(0.1, "a");
set.add(0.2, "b");
set.add(0.3, "c");
assertThat(set.pollFirst(2)).containsExactly("a", "b");
assertThat(set).containsExactly("c");
}
@Test @Test
public void testPollFirst() { public void testPollFirst() {
RScoredSortedSet<String> set = redisson.getScoredSortedSet("simple"); RScoredSortedSet<String> set = redisson.getScoredSortedSet("simple");
@ -271,6 +361,9 @@ public class RedissonScoredSortedSetTest extends BaseTest {
set.add(0.3, "c"); set.add(0.3, "c");
set.add(0.4, "d"); set.add(0.4, "d");
RScoredSortedSet<String> set2 = redisson.getScoredSortedSet("simple2");
assertThat(set2.first()).isNull();
assertThat(set2.last()).isNull();
Assert.assertEquals("a", set.first()); Assert.assertEquals("a", set.first());
Assert.assertEquals("d", set.last()); Assert.assertEquals("d", set.last());
} }
@ -283,6 +376,9 @@ public class RedissonScoredSortedSetTest extends BaseTest {
set.add(0.3, "c"); set.add(0.3, "c");
set.add(0.4, "d"); set.add(0.4, "d");
RScoredSortedSet<String> set2 = redisson.getScoredSortedSet("simple2");
assertThat(set2.firstScore()).isNull();
assertThat(set2.lastScore()).isNull();
assertThat(set.firstScore()).isEqualTo(0.1); assertThat(set.firstScore()).isEqualTo(0.1);
assertThat(set.lastScore()).isEqualTo(0.4); assertThat(set.lastScore()).isEqualTo(0.4);
} }

@ -1,9 +1,12 @@
package org.redisson.transaction; package org.redisson.transaction;
import static org.assertj.core.api.Assertions.*; import static org.assertj.core.api.Assertions.assertThat;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.junit.Test; import org.junit.Test;
import org.redisson.BaseTest; import org.redisson.BaseTest;
@ -17,6 +20,25 @@ public abstract class RedissonBaseTransactionalMapTest extends BaseTest {
protected abstract RMap<String, String> getTransactionalMap(RTransaction transaction); protected abstract RMap<String, String> getTransactionalMap(RTransaction transaction);
@Test
public void testFastPut() throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(16);
for (int i = 0; i < 500; i++) {
executor.submit(() -> {
for (int j = 0; j < 100; j++) {
RTransaction t = redisson.createTransaction(TransactionOptions.defaults());
RMap<String, String> map = getTransactionalMap(t);
map.fastPut("1", "1");
t.commit();
}
});
}
executor.shutdown();
assertThat(executor.awaitTermination(1, TimeUnit.MINUTES)).isTrue();
}
@Test @Test
public void testPutAll() { public void testPutAll() {
RMap<String, String> m = getMap(); RMap<String, String> m = getMap();

Loading…
Cancel
Save