diff --git a/README.md b/README.md index a5f3797f3..8e3b45ed4 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ -Redisson: Redis based In-Memory Data Grid for Java. +Redisson: Redis based In-Memory Data Grid for Java.
State of the Art Redis client ==== -[Quick start](https://github.com/redisson/redisson#quick-start) | [Documentation](https://github.com/redisson/redisson/wiki) | [Javadocs](http://www.javadoc.io/doc/org.redisson/redisson/3.6.5) | [Changelog](https://github.com/redisson/redisson/blob/master/CHANGELOG.md) | [Code examples](https://github.com/redisson/redisson-examples) | [FAQs](https://github.com/redisson/redisson/wiki/16.-FAQ) | [Support chat](https://gitter.im/mrniko/redisson) | **[Redisson PRO](https://redisson.pro)** +[Quick start](https://github.com/redisson/redisson#quick-start) | [Documentation](https://github.com/redisson/redisson/wiki) | [Javadocs](http://www.javadoc.io/doc/org.redisson/redisson/3.7.0) | [Changelog](https://github.com/redisson/redisson/blob/master/CHANGELOG.md) | [Code examples](https://github.com/redisson/redisson-examples) | [FAQs](https://github.com/redisson/redisson/wiki/16.-FAQ) | [Support chat](https://gitter.im/mrniko/redisson) | **[Redisson PRO](https://redisson.pro)** Based on high-performance async and lock-free Java Redis client and [Netty](http://netty.io) framework. diff --git a/redisson/src/main/java/org/redisson/RedissonExecutorService.java b/redisson/src/main/java/org/redisson/RedissonExecutorService.java index 6d43a5471..a5d0d66eb 100644 --- a/redisson/src/main/java/org/redisson/RedissonExecutorService.java +++ b/redisson/src/main/java/org/redisson/RedissonExecutorService.java @@ -92,7 +92,7 @@ import io.netty.util.internal.PlatformDependent; */ public class RedissonExecutorService implements RScheduledExecutorService { - private static final Logger log = LoggerFactory.getLogger(RedissonExecutorService.class); + private static final Logger LOGGER = LoggerFactory.getLogger(RedissonExecutorService.class); private static final RemoteInvocationOptions RESULT_OPTIONS = RemoteInvocationOptions.defaults().noAck().expectResultWithin(1, TimeUnit.HOURS); @@ -250,6 +250,7 @@ public class RedissonExecutorService implements RScheduledExecutorService { + "redis.call('zrem', KEYS[2], unpack(expiredTaskIds));" + "if retryInterval ~= false then " + "local startTime = tonumber(ARGV[1]) + tonumber(retryInterval);" + + "for i = 1, #expiredTaskIds, 1 do " + "local name = expiredTaskIds[i];" + "local scheduledName = expiredTaskIds[i];" @@ -266,7 +267,12 @@ public class RedissonExecutorService implements RScheduledExecutorService { + "if v[1] == expiredTaskIds[i] then " + "redis.call('publish', KEYS[3], startTime); " + "end;" - + "redis.call('rpush', KEYS[1], name);" + + + "if redis.call('linsert', KEYS[1], 'before', name, name) < 1 then " + + "redis.call('rpush', KEYS[1], name); " + + "else " + + "redis.call('lrem', KEYS[1], -1, name); " + + "end; " + "end; " + "else " + "redis.call('rpush', KEYS[1], unpack(expiredTaskIds));" @@ -279,7 +285,7 @@ public class RedissonExecutorService implements RScheduledExecutorService { + "end " + "return nil;", Arrays.asList(requestQueueName, schedulerQueueName, schedulerChannelName, tasksRetryIntervalName), - System.currentTimeMillis(), 100); + System.currentTimeMillis(), 50); } }; queueTransferService.schedule(getName(), task); @@ -303,8 +309,6 @@ public class RedissonExecutorService implements RScheduledExecutorService { } }); } - - private long repeatInterval = 5000; @Override public void execute(Runnable task) { diff --git a/redisson/src/main/java/org/redisson/RedissonObject.java b/redisson/src/main/java/org/redisson/RedissonObject.java index 927624228..369893943 100644 --- a/redisson/src/main/java/org/redisson/RedissonObject.java +++ b/redisson/src/main/java/org/redisson/RedissonObject.java @@ -70,9 +70,17 @@ public abstract class RedissonObject implements RObject { return "{" + name + "}:" + suffix; } - protected V get(RFuture future) { + protected final V get(RFuture future) { return commandExecutor.get(future); } + + protected final long toSeconds(long timeout, TimeUnit unit) { + long seconds = unit.toSeconds(timeout); + if (timeout != 0 && seconds == 0) { + seconds = 1; + } + return seconds; + } @Override public String getName() { diff --git a/redisson/src/main/java/org/redisson/RedissonQueue.java b/redisson/src/main/java/org/redisson/RedissonQueue.java index f48ac7abb..fa25de8c0 100644 --- a/redisson/src/main/java/org/redisson/RedissonQueue.java +++ b/redisson/src/main/java/org/redisson/RedissonQueue.java @@ -68,14 +68,6 @@ public class RedissonQueue extends RedissonList implements RQueue { return value; } - protected long toSeconds(long timeout, TimeUnit unit) { - long seconds = unit.toSeconds(timeout); - if (timeout != 0 && seconds == 0) { - seconds = 1; - } - return seconds; - } - @Override public V remove() { return removeFirst(); diff --git a/redisson/src/main/java/org/redisson/RedissonScoredSortedSet.java b/redisson/src/main/java/org/redisson/RedissonScoredSortedSet.java index 36a03e64d..ef5044c91 100644 --- a/redisson/src/main/java/org/redisson/RedissonScoredSortedSet.java +++ b/redisson/src/main/java/org/redisson/RedissonScoredSortedSet.java @@ -24,6 +24,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.concurrent.TimeUnit; import java.util.Set; import org.redisson.api.RFuture; @@ -36,6 +37,7 @@ import org.redisson.client.codec.Codec; import org.redisson.client.codec.DoubleCodec; import org.redisson.client.codec.LongCodec; import org.redisson.client.codec.ScanCodec; +import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.ScoredEntry; import org.redisson.client.protocol.decoder.ListScanResult; @@ -89,25 +91,104 @@ public class RedissonScoredSortedSet extends RedissonExpirable implements RSc return get(pollLastAsync()); } + @Override + public Collection pollFirst(int count) { + return get(pollFirstAsync(count)); + } + + @Override + public Collection pollLast(int count) { + return get(pollLastAsync(count)); + } + + @Override + public RFuture> pollFirstAsync(int count) { + if (count <= 0) { + return RedissonPromise.>newSucceededFuture(Collections.emptyList()); + } + + return poll(0, count-1, RedisCommands.EVAL_LIST); + } + + @Override + public RFuture> pollLastAsync(int count) { + if (count <= 0) { + return RedissonPromise.>newSucceededFuture(Collections.emptyList()); + } + return poll(-count, -1, RedisCommands.EVAL_LIST); + } + @Override public RFuture pollFirstAsync() { - return poll(0); + return poll(0, 0, RedisCommands.EVAL_FIRST_LIST); } @Override public RFuture pollLastAsync() { - return poll(-1); + return poll(-1, -1, RedisCommands.EVAL_FIRST_LIST); } - private RFuture poll(int index) { - return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_OBJECT, + private RFuture poll(int from, int to, RedisCommand command) { + return commandExecutor.evalWriteAsync(getName(), codec, command, "local v = redis.call('zrange', KEYS[1], ARGV[1], ARGV[2]); " - + "if v[1] ~= nil then " + + "if #v > 0 then " + "redis.call('zremrangebyrank', KEYS[1], ARGV[1], ARGV[2]); " - + "return v[1]; " + + "return v; " + "end " - + "return nil;", - Collections.singletonList(getName()), index, index); + + "return v;", + Collections.singletonList(getName()), from, to); + } + + @Override + public V pollFirst(long timeout, TimeUnit unit) { + return get(pollFirstAsync(timeout, unit)); + } + + @Override + public RFuture pollFirstAsync(long timeout, TimeUnit unit) { + return commandExecutor.writeAsync(getName(), codec, RedisCommands.BZPOPMIN_VALUE, getName(), toSeconds(timeout, unit)); + } + + @Override + public V pollFirstFromAny(long timeout, TimeUnit unit, String ... queueNames) { + return get(pollFirstFromAnyAsync(timeout, unit, queueNames)); + } + + @Override + public RFuture pollFirstFromAnyAsync(long timeout, TimeUnit unit, String ... queueNames) { + List params = new ArrayList(queueNames.length + 1); + params.add(getName()); + for (Object name : queueNames) { + params.add(name); + } + params.add(toSeconds(timeout, unit)); + return commandExecutor.writeAsync(getName(), codec, RedisCommands.BZPOPMIN_VALUE, params.toArray()); + } + + @Override + public V pollLastFromAny(long timeout, TimeUnit unit, String ... queueNames) { + return get(pollLastFromAnyAsync(timeout, unit, queueNames)); + } + + @Override + public RFuture pollLastFromAnyAsync(long timeout, TimeUnit unit, String ... queueNames) { + List params = new ArrayList(queueNames.length + 1); + params.add(getName()); + for (Object name : queueNames) { + params.add(name); + } + params.add(toSeconds(timeout, unit)); + return commandExecutor.writeAsync(getName(), codec, RedisCommands.BZPOPMAX_VALUE, params.toArray()); + } + + @Override + public V pollLast(long timeout, TimeUnit unit) { + return get(pollLastAsync(timeout, unit)); + } + + @Override + public RFuture pollLastAsync(long timeout, TimeUnit unit) { + return commandExecutor.writeAsync(getName(), codec, RedisCommands.BZPOPMAX_VALUE, getName(), toSeconds(timeout, unit)); } @Override diff --git a/redisson/src/main/java/org/redisson/api/RBitSetAsync.java b/redisson/src/main/java/org/redisson/api/RBitSetAsync.java index 6af40ffd6..ff4d592b5 100644 --- a/redisson/src/main/java/org/redisson/api/RBitSetAsync.java +++ b/redisson/src/main/java/org/redisson/api/RBitSetAsync.java @@ -32,7 +32,6 @@ public interface RBitSetAsync extends RExpirableAsync { * Returns zero if there are no any set bit. * * @return "logical size" = index of highest set bit plus one - * @return void */ RFuture lengthAsync(); diff --git a/redisson/src/main/java/org/redisson/api/RBitSetReactive.java b/redisson/src/main/java/org/redisson/api/RBitSetReactive.java index b056df5a4..eac201faa 100644 --- a/redisson/src/main/java/org/redisson/api/RBitSetReactive.java +++ b/redisson/src/main/java/org/redisson/api/RBitSetReactive.java @@ -20,6 +20,7 @@ import java.util.BitSet; import org.reactivestreams.Publisher; /** + * Vector of bits that grows as needed. * * @author Nikita Koksharov * @@ -30,36 +31,141 @@ public interface RBitSetReactive extends RExpirableReactive { Publisher toByteArray(); + /** + * Returns "logical size" = index of highest set bit plus one. + * Returns zero if there are no any set bit. + * + * @return "logical size" = index of highest set bit plus one + */ Publisher length(); + /** + * Set all bits to value from fromIndex (inclusive) to toIndex (exclusive) + * + * @param fromIndex inclusive + * @param toIndex exclusive + * @param value true = 1, false = 0 + * @return void + * + */ Publisher set(long fromIndex, long toIndex, boolean value); + /** + * Set all bits to zero from fromIndex (inclusive) to toIndex (exclusive) + * + * @param fromIndex inclusive + * @param toIndex exclusive + * @return void + * + */ Publisher clear(long fromIndex, long toIndex); + /** + * Copy bits state of source BitSet object to this object + * + * @param bs - BitSet source + * @return void + */ Publisher set(BitSet bs); + /** + * Executes NOT operation over all bits + * + * @return void + */ Publisher not(); + /** + * Set all bits to one from fromIndex (inclusive) to toIndex (exclusive) + * + * @param fromIndex inclusive + * @param toIndex exclusive + * @return void + */ Publisher set(long fromIndex, long toIndex); + /** + * Returns number of set bits. + * + * @return number of set bits. + */ Publisher size(); + /** + * Returns true if bit set to one and false overwise. + * + * @param bitIndex - index of bit + * @return true if bit set to one and false overwise. + */ Publisher get(long bitIndex); + /** + * Set bit to one at specified bitIndex + * + * @param bitIndex - index of bit + * @return true - if previous value was true, + * false - if previous value was false + */ Publisher set(long bitIndex); + /** + * Set bit to value at specified bitIndex + * + * @param bitIndex - index of bit + * @param value true = 1, false = 0 + * @return true - if previous value was true, + * false - if previous value was false + */ Publisher set(long bitIndex, boolean value); + /** + * Returns the number of bits set to one. + * + * @return number of bits + */ Publisher cardinality(); + /** + * Set bit to zero at specified bitIndex + * + * @param bitIndex - index of bit + * @return true - if previous value was true, + * false - if previous value was false + */ Publisher clear(long bitIndex); + /** + * Set all bits to zero + * + * @return void + */ Publisher clear(); + /** + * Executes OR operation over this object and specified bitsets. + * Stores result into this object. + * + * @param bitSetNames - name of stored bitsets + * @return void + */ Publisher or(String... bitSetNames); + /** + * Executes AND operation over this object and specified bitsets. + * Stores result into this object. + * + * @param bitSetNames - name of stored bitsets + * @return void + */ Publisher and(String... bitSetNames); + /** + * Executes XOR operation over this object and specified bitsets. + * Stores result into this object. + * + * @param bitSetNames - name of stored bitsets + * @return void + */ Publisher xor(String... bitSetNames); } diff --git a/redisson/src/main/java/org/redisson/api/RScoredSortedSet.java b/redisson/src/main/java/org/redisson/api/RScoredSortedSet.java index 518178702..9eae10c10 100644 --- a/redisson/src/main/java/org/redisson/api/RScoredSortedSet.java +++ b/redisson/src/main/java/org/redisson/api/RScoredSortedSet.java @@ -18,6 +18,7 @@ package org.redisson.api; import java.util.Collection; import java.util.Map; import java.util.Set; +import java.util.concurrent.TimeUnit; import org.redisson.api.mapreduce.RCollectionMapReduce; import org.redisson.client.protocol.ScoredEntry; @@ -45,16 +46,119 @@ public interface RScoredSortedSet extends RScoredSortedSetAsync, Iterable< */ RCollectionMapReduce mapReduce(); + /** + * Removes and returns first available tail element of any sorted set, + * waiting up to the specified wait time if necessary for an element to become available + * in any of defined sorted sets including this one. + *

+ * Requires Redis 5.0.0 and higher. + * + * @param queueNames - names of queue + * @param timeout how long to wait before giving up, in units of + * {@code unit} + * @param unit a {@code TimeUnit} determining how to interpret the + * {@code timeout} parameter + * @return the tail element, or {@code null} if all sorted sets are empty + */ + V pollLastFromAny(long timeout, TimeUnit unit, String ... queueNames); + + /** + * Removes and returns first available head element of any sorted set, + * waiting up to the specified wait time if necessary for an element to become available + * in any of defined sorted sets including this one. + *

+ * Requires Redis 5.0.0 and higher. + * + * @param queueNames - names of queue + * @param timeout how long to wait before giving up, in units of + * {@code unit} + * @param unit a {@code TimeUnit} determining how to interpret the + * {@code timeout} parameter + * @return the head element, or {@code null} if all sorted sets are empty + */ + V pollFirstFromAny(long timeout, TimeUnit unit, String ... queueNames); + + /** + * Removes and returns the head element or {@code null} if this sorted set is empty. + * + * @param timeout how long to wait before giving up, in units of + * {@code unit} + * @param unit a {@code TimeUnit} determining how to interpret the + * {@code timeout} parameter + * @return the head element, + * or {@code null} if this sorted set is empty + */ + V pollFirst(long timeout, TimeUnit unit); + + /** + * Removes and returns the tail element or {@code null} if this sorted set is empty. + * + * @param timeout how long to wait before giving up, in units of + * {@code unit} + * @param unit a {@code TimeUnit} determining how to interpret the + * {@code timeout} parameter + * @return the tail element or {@code null} if this sorted set is empty + */ + V pollLast(long timeout, TimeUnit unit); + + /** + * Removes and returns the head elements or {@code null} if this sorted set is empty. + * + * @param count - elements amount + * @return the head element, + * or {@code null} if this sorted set is empty + */ + Collection pollFirst(int count); + + /** + * Removes and returns the tail elements or {@code null} if this sorted set is empty. + * + * @param count - elements amount + * @return the tail element or {@code null} if this sorted set is empty + */ + Collection pollLast(int count); + + /** + * Removes and returns the head element or {@code null} if this sorted set is empty. + * + * @return the head element, + * or {@code null} if this sorted set is empty + */ V pollFirst(); + /** + * Removes and returns the tail element or {@code null} if this sorted set is empty. + * + * @return the tail element or {@code null} if this sorted set is empty + */ V pollLast(); + /** + * Returns the head element or {@code null} if this sorted set is empty. + * + * @return the head element or {@code null} if this sorted set is empty + */ V first(); + /** + * Returns the tail element or {@code null} if this sorted set is empty. + * + * @return the tail element or {@code null} if this sorted set is empty + */ V last(); - + + /** + * Returns score of the tail element or returns {@code null} if this sorted set is empty. + * + * @return the tail element or {@code null} if this sorted set is empty + */ Double firstScore(); - + + /** + * Returns score of the head element or returns {@code null} if this sorted set is empty. + * + * @return the tail element or {@code null} if this sorted set is empty + */ Double lastScore(); Long addAll(Map objects); diff --git a/redisson/src/main/java/org/redisson/api/RScoredSortedSetAsync.java b/redisson/src/main/java/org/redisson/api/RScoredSortedSetAsync.java index c2abadf63..7b6cead46 100644 --- a/redisson/src/main/java/org/redisson/api/RScoredSortedSetAsync.java +++ b/redisson/src/main/java/org/redisson/api/RScoredSortedSetAsync.java @@ -18,6 +18,7 @@ package org.redisson.api; import java.util.Collection; import java.util.Map; import java.util.Set; +import java.util.concurrent.TimeUnit; import org.redisson.api.RScoredSortedSet.Aggregate; import org.redisson.client.protocol.ScoredEntry; @@ -30,16 +31,124 @@ import org.redisson.client.protocol.ScoredEntry; */ public interface RScoredSortedSetAsync extends RExpirableAsync, RSortableAsync> { - RFuture pollLastAsync(); + /** + * Removes and returns first available tail element of any sorted set, + * waiting up to the specified wait time if necessary for an element to become available + * in any of defined sorted sets including this one. + *

+ * Requires Redis 5.0.0 and higher. + * + * @param queueNames - names of queue + * @param timeout how long to wait before giving up, in units of + * {@code unit} + * @param unit a {@code TimeUnit} determining how to interpret the + * {@code timeout} parameter + * @return the tail element, or {@code null} if all sorted sets are empty + */ + RFuture pollLastFromAnyAsync(long timeout, TimeUnit unit, String ... queueNames); + + /** + * Removes and returns first available head element of any sorted set, + * waiting up to the specified wait time if necessary for an element to become available + * in any of defined sorted sets including this one. + *

+ * Requires Redis 5.0.0 and higher. + * + * @param queueNames - names of queue + * @param timeout how long to wait before giving up, in units of + * {@code unit} + * @param unit a {@code TimeUnit} determining how to interpret the + * {@code timeout} parameter + * @return the head element, or {@code null} if all sorted sets are empty + * + */ + RFuture pollFirstFromAnyAsync(long timeout, TimeUnit unit, String ... queueNames); + + /** + * Removes and returns the head element or {@code null} if this sorted set is empty. + *

+ * Requires Redis 5.0.0 and higher. + * + * @param timeout how long to wait before giving up, in units of + * {@code unit} + * @param unit a {@code TimeUnit} determining how to interpret the + * {@code timeout} parameter + * @return the head element, + * or {@code null} if this sorted set is empty + */ + RFuture pollFirstAsync(long timeout, TimeUnit unit); + /** + * Removes and returns the tail element or {@code null} if this sorted set is empty. + *

+ * Requires Redis 5.0.0 and higher. + * + * @param timeout how long to wait before giving up, in units of + * {@code unit} + * @param unit a {@code TimeUnit} determining how to interpret the + * {@code timeout} parameter + * @return the tail element or {@code null} if this sorted set is empty + */ + RFuture pollLastAsync(long timeout, TimeUnit unit); + + /** + * Removes and returns the head elements or {@code null} if this sorted set is empty. + * + * @param count - elements amount + * @return the head element, + * or {@code null} if this sorted set is empty + */ + RFuture> pollFirstAsync(int count); + + /** + * Removes and returns the tail elements or {@code null} if this sorted set is empty. + * + * @param count - elements amount + * @return the tail element or {@code null} if this sorted set is empty + */ + RFuture> pollLastAsync(int count); + + /** + * Removes and returns the head element or {@code null} if this sorted set is empty. + * + * @return the head element, + * or {@code null} if this sorted set is empty + */ RFuture pollFirstAsync(); + /** + * Removes and returns the tail element or {@code null} if this sorted set is empty. + * + * @return the tail element or {@code null} if this sorted set is empty + */ + RFuture pollLastAsync(); + + /** + * Returns the head element or {@code null} if this sorted set is empty. + * + * @return the head element or {@code null} if this sorted set is empty + */ RFuture firstAsync(); + /** + * Returns the tail element or {@code null} if this sorted set is empty. + * + * @return the tail element or {@code null} if this sorted set is empty + */ RFuture lastAsync(); - + + /** + * Returns score of the head element or returns {@code null} if this sorted set is empty. + * + * @return the tail element or {@code null} if this sorted set is empty + */ RFuture firstScoreAsync(); - + + /** + * Returns score of the tail element or returns {@code null} if this sorted set is empty. + * + * @return the tail element or {@code null} if this sorted set is empty + */ RFuture lastScoreAsync(); RFuture addAllAsync(Map objects); @@ -48,8 +157,20 @@ public interface RScoredSortedSetAsync extends RExpirableAsync, RSortableAsyn RFuture removeRangeByRankAsync(int startIndex, int endIndex); + /** + * Returns rank of value, with the scores ordered from low to high. + * + * @param o - object + * @return rank or null if value does not exist + */ RFuture rankAsync(V o); + /** + * Returns rank of value, with the scores ordered from high to low. + * + * @param o - object + * @return rank or null if value does not exist + */ RFuture revRankAsync(V o); /** @@ -90,7 +211,7 @@ public interface RScoredSortedSetAsync extends RExpirableAsync, RSortableAsyn /** * Adds element to this set only if has not been added before. *

- * Works only with Redis 3.0.2 and higher. + * Requires Redis 3.0.2 and higher. * * @param score - object score * @param object - object itself diff --git a/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java b/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java index 7b225ddaa..ec60ff7f2 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java +++ b/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java @@ -24,7 +24,6 @@ import java.util.Set; import org.redisson.api.RType; import org.redisson.client.protocol.RedisCommand.ValueType; -import org.redisson.client.protocol.convertor.BitSetReplayConvertor; import org.redisson.client.protocol.convertor.BitsSizeReplayConvertor; import org.redisson.client.protocol.convertor.BooleanAmountReplayConvertor; import org.redisson.client.protocol.convertor.BooleanNotNullReplayConvertor; @@ -36,13 +35,14 @@ import org.redisson.client.protocol.convertor.DoubleNullSafeReplayConvertor; import org.redisson.client.protocol.convertor.DoubleReplayConvertor; import org.redisson.client.protocol.convertor.IntegerReplayConvertor; import org.redisson.client.protocol.convertor.KeyValueConvertor; -import org.redisson.client.protocol.convertor.TimeObjectDecoder; import org.redisson.client.protocol.convertor.LongReplayConvertor; +import org.redisson.client.protocol.convertor.TimeObjectDecoder; import org.redisson.client.protocol.convertor.TrueReplayConvertor; import org.redisson.client.protocol.convertor.TypeConvertor; import org.redisson.client.protocol.convertor.VoidReplayConvertor; import org.redisson.client.protocol.decoder.ClusterNodesDecoder; import org.redisson.client.protocol.decoder.KeyValueObjectDecoder; +import org.redisson.client.protocol.decoder.ListFirstObjectDecoder; import org.redisson.client.protocol.decoder.ListMultiDecoder; import org.redisson.client.protocol.decoder.ListResultReplayDecoder; import org.redisson.client.protocol.decoder.ListScanResult; @@ -51,12 +51,12 @@ import org.redisson.client.protocol.decoder.Long2MultiDecoder; import org.redisson.client.protocol.decoder.LongMultiDecoder; import org.redisson.client.protocol.decoder.MapScanResult; import org.redisson.client.protocol.decoder.MapScanResultReplayDecoder; -import org.redisson.client.protocol.decoder.ObjectFirstResultReplayDecoder; import org.redisson.client.protocol.decoder.ObjectFirstScoreReplayDecoder; import org.redisson.client.protocol.decoder.ObjectListReplayDecoder; import org.redisson.client.protocol.decoder.ObjectMapEntryReplayDecoder; import org.redisson.client.protocol.decoder.ObjectMapReplayDecoder; import org.redisson.client.protocol.decoder.ObjectSetReplayDecoder; +import org.redisson.client.protocol.decoder.ScoredSortedSetPolledObjectDecoder; import org.redisson.client.protocol.decoder.ScoredSortedSetReplayDecoder; import org.redisson.client.protocol.decoder.ScoredSortedSetScanDecoder; import org.redisson.client.protocol.decoder.ScoredSortedSetScanReplayDecoder; @@ -93,8 +93,6 @@ public interface RedisCommands { RedisStrictCommand BITPOS = new RedisStrictCommand("BITPOS", new IntegerReplayConvertor()); RedisStrictCommand SETBIT_VOID = new RedisStrictCommand("SETBIT", new VoidReplayConvertor()); RedisStrictCommand SETBIT = new RedisStrictCommand("SETBIT", new BooleanReplayConvertor()); - RedisStrictCommand SETBIT_TRUE = new RedisStrictCommand("SETBIT", new BitSetReplayConvertor(0)); - RedisStrictCommand SETBIT_FALSE = new RedisStrictCommand("SETBIT", new BitSetReplayConvertor(1)); RedisStrictCommand BITOP = new RedisStrictCommand("BITOP", new VoidReplayConvertor()); RedisStrictCommand WAIT = new RedisStrictCommand("WAIT", new IntegerReplayConvertor()); @@ -119,9 +117,11 @@ public interface RedisCommands { RedisStrictCommand ZSCORE = new RedisStrictCommand("ZSCORE", new DoubleReplayConvertor()); RedisCommand ZRANK_INT = new RedisCommand("ZRANK", new IntegerReplayConvertor()); RedisCommand ZREVRANK_INT = new RedisCommand("ZREVRANK", new IntegerReplayConvertor()); - RedisCommand ZRANGE_SINGLE = new RedisCommand("ZRANGE", new ObjectFirstResultReplayDecoder()); + RedisCommand ZRANGE_SINGLE = new RedisCommand("ZRANGE", new ListFirstObjectDecoder()); RedisStrictCommand ZRANGE_SINGLE_SCORE = new RedisStrictCommand("ZRANGE", new ObjectFirstScoreReplayDecoder()); RedisCommand> ZRANGE = new RedisCommand>("ZRANGE", new ObjectListReplayDecoder()); + RedisCommand> ZPOPMIN = new RedisCommand>("ZPOPMIN", new ObjectListReplayDecoder()); + RedisCommand> ZPOPMAX = new RedisCommand>("ZPOPMAX", new ObjectListReplayDecoder()); RedisStrictCommand ZREMRANGEBYRANK = new RedisStrictCommand("ZREMRANGEBYRANK", new IntegerReplayConvertor()); RedisStrictCommand ZREMRANGEBYSCORE = new RedisStrictCommand("ZREMRANGEBYSCORE", new IntegerReplayConvertor()); RedisStrictCommand ZREMRANGEBYLEX = new RedisStrictCommand("ZREMRANGEBYLEX", new IntegerReplayConvertor()); @@ -191,9 +191,11 @@ public interface RedisCommands { RedisCommand BRPOPLPUSH = new RedisCommand("BRPOPLPUSH"); RedisCommand BLPOP_VALUE = new RedisCommand("BLPOP", new KeyValueObjectDecoder(), new KeyValueConvertor()); RedisCommand BRPOP_VALUE = new RedisCommand("BRPOP", new KeyValueObjectDecoder(), new KeyValueConvertor()); + RedisCommand BZPOPMIN_VALUE = new RedisCommand("BZPOPMIN", new ScoredSortedSetPolledObjectDecoder()); + RedisCommand BZPOPMAX_VALUE = new RedisCommand("BZPOPMAX", new ScoredSortedSetPolledObjectDecoder()); Set BLOCKING_COMMANDS = new HashSet( - Arrays.asList(BLPOP_VALUE.getName(), BRPOP_VALUE.getName(), BRPOPLPUSH.getName())); + Arrays.asList(BLPOP_VALUE.getName(), BRPOP_VALUE.getName(), BRPOPLPUSH.getName(), BZPOPMIN_VALUE.getName(), BZPOPMAX_VALUE.getName())); RedisCommand PFADD = new RedisCommand("PFADD", new BooleanReplayConvertor()); RedisStrictCommand PFCOUNT = new RedisStrictCommand("PFCOUNT"); @@ -228,6 +230,7 @@ public interface RedisCommands { RedisStrictCommand EVAL_LONG = new RedisStrictCommand("EVAL"); RedisStrictCommand EVAL_LONG_SAFE = new RedisStrictCommand("EVAL", new LongReplayConvertor()); RedisStrictCommand EVAL_VOID = new RedisStrictCommand("EVAL", new VoidReplayConvertor()); + RedisCommand EVAL_FIRST_LIST = new RedisCommand("EVAL", new ListFirstObjectDecoder()); RedisCommand> EVAL_LIST = new RedisCommand>("EVAL", new ObjectListReplayDecoder()); RedisCommand> EVAL_SET = new RedisCommand>("EVAL", new ObjectSetReplayDecoder()); RedisCommand EVAL_OBJECT = new RedisCommand("EVAL"); diff --git a/redisson/src/main/java/org/redisson/client/protocol/convertor/BitSetReplayConvertor.java b/redisson/src/main/java/org/redisson/client/protocol/convertor/BitSetReplayConvertor.java deleted file mode 100644 index 31bfa6294..000000000 --- a/redisson/src/main/java/org/redisson/client/protocol/convertor/BitSetReplayConvertor.java +++ /dev/null @@ -1,38 +0,0 @@ -/** - * Copyright 2018 Nikita Koksharov - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.redisson.client.protocol.convertor; - -/** - * - * @author Nikita Koksharov - * - */ -public class BitSetReplayConvertor extends SingleConvertor { - - private final int expectedValue; - - public BitSetReplayConvertor(int expectedValue) { - super(); - this.expectedValue = expectedValue; - } - - @Override - public Boolean convert(Object obj) { - return expectedValue == (Long)obj; - } - - -} diff --git a/redisson/src/main/java/org/redisson/client/protocol/decoder/ClusterNodesDecoder.java b/redisson/src/main/java/org/redisson/client/protocol/decoder/ClusterNodesDecoder.java index f8576b88e..e8ab3db02 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/decoder/ClusterNodesDecoder.java +++ b/redisson/src/main/java/org/redisson/client/protocol/decoder/ClusterNodesDecoder.java @@ -23,6 +23,7 @@ import org.redisson.client.handler.State; import org.redisson.client.protocol.Decoder; import org.redisson.cluster.ClusterNodeInfo; import org.redisson.cluster.ClusterSlotRange; +import org.redisson.cluster.ClusterNodeInfo.Flag; import io.netty.buffer.ByteBuf; import io.netty.util.CharsetUtil; @@ -53,18 +54,20 @@ public class ClusterNodesDecoder implements Decoder> { String nodeId = params[0]; node.setNodeId(nodeId); - String protocol = "redis://"; - if (ssl) { - protocol = "rediss://"; - } - String addr = protocol + params[1].split("@")[0]; - node.setAddress(addr); - String flags = params[2]; for (String flag : flags.split(",")) { String flagValue = flag.toUpperCase().replaceAll("\\?", ""); node.addFlag(ClusterNodeInfo.Flag.valueOf(flagValue)); } + + if (!node.containsFlag(Flag.NOADDR)) { + String protocol = "redis://"; + if (ssl) { + protocol = "rediss://"; + } + String addr = protocol + params[1].split("@")[0]; + node.setAddress(addr); + } String slaveOf = params[3]; if (!"-".equals(slaveOf)) { diff --git a/redisson/src/main/java/org/redisson/client/protocol/decoder/ObjectFirstScoreReplayDecoder.java b/redisson/src/main/java/org/redisson/client/protocol/decoder/ObjectFirstScoreReplayDecoder.java index a6cc0ff0a..3e886c541 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/decoder/ObjectFirstScoreReplayDecoder.java +++ b/redisson/src/main/java/org/redisson/client/protocol/decoder/ObjectFirstScoreReplayDecoder.java @@ -39,6 +39,9 @@ public class ObjectFirstScoreReplayDecoder implements MultiDecoder { @Override public Double decode(List parts, State state) { + if (parts.isEmpty()) { + return null; + } return (Double) parts.get(1); } diff --git a/redisson/src/main/java/org/redisson/client/protocol/decoder/ObjectFirstResultReplayDecoder.java b/redisson/src/main/java/org/redisson/client/protocol/decoder/ScoredSortedSetPolledObjectDecoder.java similarity index 63% rename from redisson/src/main/java/org/redisson/client/protocol/decoder/ObjectFirstResultReplayDecoder.java rename to redisson/src/main/java/org/redisson/client/protocol/decoder/ScoredSortedSetPolledObjectDecoder.java index 1466652a4..98c84a652 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/decoder/ObjectFirstResultReplayDecoder.java +++ b/redisson/src/main/java/org/redisson/client/protocol/decoder/ScoredSortedSetPolledObjectDecoder.java @@ -17,6 +17,8 @@ package org.redisson.client.protocol.decoder; import java.util.List; +import org.redisson.client.codec.DoubleCodec; +import org.redisson.client.codec.StringCodec; import org.redisson.client.handler.State; import org.redisson.client.protocol.Decoder; @@ -24,17 +26,25 @@ import org.redisson.client.protocol.Decoder; * * @author Nikita Koksharov * - * @param type */ -public class ObjectFirstResultReplayDecoder implements MultiDecoder { +public class ScoredSortedSetPolledObjectDecoder implements MultiDecoder { @Override - public T decode(List parts, State state) { - return (T) parts.get(0); + public Object decode(List parts, State state) { + if (!parts.isEmpty()) { + return parts.get(2); + } + return null; } @Override public Decoder getDecoder(int paramNum, State state) { + if (paramNum == 0) { + return StringCodec.INSTANCE.getValueDecoder(); + } + if (paramNum == 1) { + return DoubleCodec.INSTANCE.getValueDecoder(); + } return null; } diff --git a/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java b/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java index 20d1f28be..dd5585e50 100644 --- a/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java +++ b/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java @@ -288,7 +288,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { public void run() { if (isConfigEndpoint) { final URI uri = cfg.getNodeAddresses().iterator().next(); - final AddressResolver resolver = createResolverGroup().getResolver(getGroup().next()); + final AddressResolver resolver = resolverGroup.getResolver(getGroup().next()); Future> allNodes = resolver.resolveAll(InetSocketAddress.createUnresolved(uri.getHost(), uri.getPort())); allNodes.addListener(new FutureListener>() { @Override diff --git a/redisson/src/main/java/org/redisson/cluster/ClusterNodeInfo.java b/redisson/src/main/java/org/redisson/cluster/ClusterNodeInfo.java index 03e622c3e..c37d9225b 100644 --- a/redisson/src/main/java/org/redisson/cluster/ClusterNodeInfo.java +++ b/redisson/src/main/java/org/redisson/cluster/ClusterNodeInfo.java @@ -16,6 +16,7 @@ package org.redisson.cluster; import java.net.URI; +import java.util.EnumSet; import java.util.HashSet; import java.util.Set; import org.redisson.misc.URIBuilder; @@ -33,7 +34,7 @@ public class ClusterNodeInfo { private String nodeId; private URI address; - private final Set flags = new HashSet(); + private final Set flags = EnumSet.noneOf(Flag.class); private String slaveOf; private final Set slotRanges = new HashSet(); diff --git a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index e9150eb58..fb1e0693b 100644 --- a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -211,19 +211,6 @@ public class MasterSlaveConnectionManager implements ConnectionManager { this.commandExecutor = new CommandSyncService(this); } - /* - * Remove it once https://github.com/netty/netty/issues/7882 get resolved - */ - protected DnsAddressResolverGroup createResolverGroup() { - if (cfg.getTransportMode() == TransportMode.EPOLL) { - return cfg.getAddressResolverGroupFactory().create(EpollDatagramChannel.class, DnsServerAddressStreamProviders.platformDefault()); - } else if (cfg.getTransportMode() == TransportMode.KQUEUE) { - return cfg.getAddressResolverGroupFactory().create(KQueueDatagramChannel.class, DnsServerAddressStreamProviders.platformDefault()); - } - - return cfg.getAddressResolverGroupFactory().create(NioDatagramChannel.class, DnsServerAddressStreamProviders.platformDefault()); - } - protected void closeNodeConnections() { List> futures = new ArrayList>(); for (RedisConnection connection : nodeConnections.values()) { diff --git a/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java b/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java index 5ea876118..ae1c801cd 100644 --- a/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java +++ b/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java @@ -32,7 +32,6 @@ import org.redisson.client.RedisConnection; import org.redisson.client.RedisPubSubConnection; import org.redisson.client.protocol.CommandData; import org.redisson.client.protocol.RedisCommand; -import org.redisson.client.protocol.RedisCommands; import org.redisson.cluster.ClusterConnectionManager; import org.redisson.cluster.ClusterSlotRange; import org.redisson.config.MasterSlaveServersConfig; @@ -249,7 +248,7 @@ public class MasterSlaveEntry { return; } - RFuture newConnection = connectionReadOp(RedisCommands.BLPOP_VALUE); + RFuture newConnection = connectionWriteOp(commandData.getCommand()); newConnection.addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { @@ -263,7 +262,7 @@ public class MasterSlaveEntry { final FutureListener listener = new FutureListener() { @Override public void operationComplete(Future future) throws Exception { - releaseRead(newConnection); + releaseWrite(newConnection); } }; commandData.getPromise().addListener(listener); @@ -277,7 +276,7 @@ public class MasterSlaveEntry { if (!future.isSuccess()) { listener.operationComplete(null); commandData.getPromise().removeListener(listener); - releaseRead(newConnection); + releaseWrite(newConnection); log.error("Can't resubscribe blocking queue {}", commandData); } } diff --git a/redisson/src/main/java/org/redisson/connection/SingleConnectionManager.java b/redisson/src/main/java/org/redisson/connection/SingleConnectionManager.java index 341fb7435..71b2ce14e 100644 --- a/redisson/src/main/java/org/redisson/connection/SingleConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/SingleConnectionManager.java @@ -37,6 +37,7 @@ public class SingleConnectionManager extends MasterSlaveConnectionManager { private static MasterSlaveServersConfig create(SingleServerConfig cfg) { MasterSlaveServersConfig newconfig = new MasterSlaveServersConfig(); + newconfig.setPingConnectionInterval(cfg.getPingConnectionInterval()); newconfig.setSslEnableEndpointIdentification(cfg.isSslEnableEndpointIdentification()); newconfig.setSslProvider(cfg.getSslProvider()); newconfig.setSslTruststore(cfg.getSslTruststore()); diff --git a/redisson/src/main/java/org/redisson/pubsub/LockPubSub.java b/redisson/src/main/java/org/redisson/pubsub/LockPubSub.java index 08b023799..1ed966325 100644 --- a/redisson/src/main/java/org/redisson/pubsub/LockPubSub.java +++ b/redisson/src/main/java/org/redisson/pubsub/LockPubSub.java @@ -35,27 +35,16 @@ public class LockPubSub extends PublishSubscribe { @Override protected void onMessage(RedissonLockEntry value, Long message) { if (message.equals(unlockMessage)) { - value.getLatch().release(); - while (true) { - Runnable runnableToExecute = null; - synchronized (value) { - Runnable runnable = value.getListeners().poll(); - if (runnable != null) { - if (value.getLatch().tryAcquire()) { - runnableToExecute = runnable; - } else { - value.addListener(runnable); - } - } + Runnable runnableToExecute = value.getListeners().poll(); + if (runnableToExecute == null) { + break; } - if (runnableToExecute != null) { - runnableToExecute.run(); - } else { - return; - } + runnableToExecute.run(); } + + value.getLatch().release(); } } diff --git a/redisson/src/main/java/org/redisson/pubsub/SemaphorePubSub.java b/redisson/src/main/java/org/redisson/pubsub/SemaphorePubSub.java index 31268d737..c76a886f2 100644 --- a/redisson/src/main/java/org/redisson/pubsub/SemaphorePubSub.java +++ b/redisson/src/main/java/org/redisson/pubsub/SemaphorePubSub.java @@ -32,27 +32,16 @@ public class SemaphorePubSub extends PublishSubscribe { @Override protected void onMessage(RedissonLockEntry value, Long message) { - value.getLatch().release(message.intValue()); - while (true) { - Runnable runnableToExecute = null; - synchronized (value) { - Runnable runnable = value.getListeners().poll(); - if (runnable != null) { - if (value.getLatch().tryAcquire()) { - runnableToExecute = runnable; - } else { - value.addListener(runnable); - } - } + Runnable runnableToExecute = value.getListeners().poll(); + if (runnableToExecute == null) { + break; } - if (runnableToExecute != null) { - runnableToExecute.run(); - } else { - return; - } + runnableToExecute.run(); } + + value.getLatch().release(message.intValue()); } } diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonSetCacheReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonSetCacheReactive.java index 7b7ec8c78..ed9ab52ea 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonSetCacheReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonSetCacheReactive.java @@ -85,7 +85,12 @@ public class RedissonSetCacheReactive extends RedissonExpirableReactive imple @Override public Publisher size() { - return commandExecutor.readReactive(getName(), codec, RedisCommands.ZCARD_INT, getName()); + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.sizeAsync(); + } + }); } @Override diff --git a/redisson/src/main/resources/org/redisson/spring/support/redisson-1.2.xsd b/redisson/src/main/resources/org/redisson/spring/support/redisson-1.2.xsd new file mode 100644 index 000000000..9123f4ce2 --- /dev/null +++ b/redisson/src/main/resources/org/redisson/spring/support/redisson-1.2.xsd @@ -0,0 +1,2427 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + netty-tcnative lib is required to be in classpath. + ]]> + + + + + + + + + + + + + + + <qualifier> is not used. + ]]> + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + timeout time + and current connections amount bigger than minimum idle connections pool + size, then it will closed and removed from pool. + Value in milliseconds. + + Default: 10000 + ]]> + + + + + Node.ping and Node.pingAll + operation. Value in milliseconds. + + Default: 1000 + ]]> + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + slaveFailsInterval value. + + Default is 60000 + ]]> + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + each slave + node. + + Default: 10 + ]]> + + + + + each slave + node. + + Default: 64 + ]]> + + + + + each slave + node. + + Default: 10 + ]]> + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + NB: applications must ensure the JVM DNS cache TTL is low enough to + support this. e.g., http://docs.aws.amazon.com/AWSSdkDocsJava/latest/DeveloperGuide/java-dg-jvm-ttl.html + + Default: false + ]]> + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + true then invalidation + message which removes corresponding entry from cache will be sent to all + other RLocalCachedMap instances on each entry update/remove operation. + if false then invalidation message won't be sent. + ]]> + + + + + LRU - uses cache with LRU (least recently used) eviction + policy. +

LFU - uses cache with LFU (least frequently used) + eviction policy. +

SOFT - uses cache with soft references. The garbage + collector will evict items from the cache when the JVM is + running out of memory. JVM flag -XX:SoftRefLRUPolicyMSPerMB=??? + is required to function. +

NONE - doesn't use eviction policy, but timeToLive and + maxIdleTime params are still working. + ]]> + + + + + 0 then local cache is unbounded. + ]]> + + + + + 0 then timeout is not applied. + + Default unit is MILLISECONDS + ]]> + + + + + + + + + + 0 then timeout is not applied. + + Default unit is MILLISECONDS + ]]> + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + NOT mandatory + since the class will also be registered lazily when it is first used. + + All classed registered with the service is stored in a class cache. + + The cache is independent between different RedissonClient instances. When + a class is registered in one RLiveObjectService instance it is also + accessible in another RLiveObjectService instance so long as they are + created by the same RedissonClient instance. + ]]> + + + + + + + + + + + + + + + + + + NOT mandatory + since the class will also be registered lazily when it is first used. + + All classed registered with the service is stored in a class cache. + + The cache is independent between different RedissonClient instances. When + a class is registered in one RLiveObjectService instance it is also + accessible in another RLiveObjectService instance so long as they are + created by the same RedissonClient instance. + + One of "object-id" or "object-id-ref" attribute is required. + ]]> + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + Set eviction + + Redisson distributed Set for Java with eviction support implemented by + separate RSetCache object which extends RSet interface. It also + implements java.util.Set interface. + + Current redis implementation doesn't has set value eviction + functionality. Therefore expired values are cleaned by + org.redisson.EvictionScheduler. It removes 100 expired values at once. + Task launch time tuned automatically and depends on expired entries + amount deleted in previous time and varies between 1 second to 2 hours. + Thus if clean task deletes 100 values each time it will be executed + every second (minimum execution delay). But if current expired values + amount is lower than previous one then execution delay will be increased + by 1.5 times. + ]]> + + + + + + + + + + + Map eviction + + Redisson distributed Map for Java with eviction support implemented by + separate RMapCache object which extends RMap interface. It keeps + elements in insertion order and implements + java.util.concurrent.ConcurrentMap and java.util.Map interfaces. + Redisson has a Spring Cache integration which based on Map and MapCache + objects. + + Current redis implementation doesn't has map entry eviction + functionality. Therefore expired entries are cleaned by + org.redisson.EvictionScheduler. It removes 100 expired entries at once. + Task launch time tuned automatically and depends on expired entries + amount deleted in previous time and varies between 1 second to 2 hours. + Thus if clean task deletes 100 entries each time it will be executed + every second (minimum execution delay). But if current expired entries + amount is lower than previous one then execution delay will be increased + by 1.5 times. + ]]> + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + Map local cache + + In case when a Map is used mostly for read operations and/or network + roundtrips are undesirable. Redisson offers RLocalCachedMap object which + caches Map entries on Redisson side. + ]]> + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + interface. + Keeps elements uniqueness via element state comparison. + ]]> + + + + + + + + + + + interface. + Keeps elements uniqueness via element state comparison. + ]]> + + + + + + + + + + + + + + + + + interface. + Keeps elements uniqueness via element state comparison. + ]]> + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + Live distributed object (also abbreviated as live object) refers to a + running instance of a distributed multi-party (or peer-to-peer) protocol, + viewed from the object-oriented perspective, as an entity that has a + distinct identity, may encapsulate internal state and threads of + execution, and that exhibits a well-defined externally visible behavior. + + + Redisson Live Object (RLO) realised this idea by mapping all the fields + inside a Java class to a redis hash through a runtime-constructed proxy + class. All the get/set methods of each field are translated to hget/hset + commands operated on the redis hash, making it accessable to/from any + clients connected to the same redis server. As we all know, the field + values of an object represent its state; having them stored in a remote + repository, redis, makes it a distributed object. This object is a + Redisson Live Object. + + By using RLO, sharing an object between applications and/or servers is + the same as sharing one in a standalone application. This removes the + need for serialization and deserialization, and at the same time reduces + the complexity of the programming model: Changes made to one field + is (almost^) immediately accessable to other processes, applications and + servers. (^Redis' eventual consistant replication rule still applies + when connected to slave nodes) + + Since the redis server is a single-threaded application, all field + access to the live object is automatically executed in atomic fashion: a + value will not be changed when you are reading it. + + With RLO, you can treat the redis server as a shared Heap space for all + connected JVMs. + ]]> + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + Define and create a Redisson instance. + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + Define and create a RedisClient instance. + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/redisson/src/test/java/org/redisson/RedissonBlockingDequeTest.java b/redisson/src/test/java/org/redisson/RedissonBlockingDequeTest.java index aa57a684f..afe7fc356 100644 --- a/redisson/src/test/java/org/redisson/RedissonBlockingDequeTest.java +++ b/redisson/src/test/java/org/redisson/RedissonBlockingDequeTest.java @@ -16,7 +16,7 @@ public class RedissonBlockingDequeTest extends BaseTest { RBlockingDeque blockingDeque = redisson.getBlockingDeque("blocking_deque"); long start = System.currentTimeMillis(); String redisTask = blockingDeque.pollLastAndOfferFirstTo("deque", 1, TimeUnit.SECONDS); - assertThat(System.currentTimeMillis() - start).isBetween(950L, 1050L); + assertThat(System.currentTimeMillis() - start).isBetween(950L, 1100L); assertThat(redisTask).isNull(); } diff --git a/redisson/src/test/java/org/redisson/RedissonRateLimiterTest.java b/redisson/src/test/java/org/redisson/RedissonRateLimiterTest.java index 15b997fbd..55f044b38 100644 --- a/redisson/src/test/java/org/redisson/RedissonRateLimiterTest.java +++ b/redisson/src/test/java/org/redisson/RedissonRateLimiterTest.java @@ -35,7 +35,7 @@ public class RedissonRateLimiterTest extends BaseTest { } @Test - public void test3() throws InterruptedException { + public void testConcurrency() throws InterruptedException { RRateLimiter rr = redisson.getRateLimiter("test"); assertThat(rr.trySetRate(RateType.OVERALL, 10, 1, RateIntervalUnit.SECONDS)).isTrue(); assertThat(rr.trySetRate(RateType.OVERALL, 20, 1, RateIntervalUnit.SECONDS)).isFalse(); @@ -55,7 +55,8 @@ public class RedissonRateLimiterTest extends BaseTest { } } try { - Thread.sleep(ThreadLocalRandom.current().nextInt(10)); } catch (InterruptedException e) { + Thread.sleep(ThreadLocalRandom.current().nextInt(10)); + } catch (InterruptedException e) { e.printStackTrace(); } diff --git a/redisson/src/test/java/org/redisson/RedissonScoredSortedSetTest.java b/redisson/src/test/java/org/redisson/RedissonScoredSortedSetTest.java index fbc23b52d..ff7c6abdb 100644 --- a/redisson/src/test/java/org/redisson/RedissonScoredSortedSetTest.java +++ b/redisson/src/test/java/org/redisson/RedissonScoredSortedSetTest.java @@ -12,6 +12,8 @@ import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import org.junit.Assert; import org.junit.Assume; @@ -28,6 +30,42 @@ import org.redisson.client.protocol.ScoredEntry; public class RedissonScoredSortedSetTest extends BaseTest { + @Test + public void testPollFirstFromAny() throws InterruptedException { + final RScoredSortedSet queue1 = redisson.getScoredSortedSet("queue:pollany"); + Executors.newSingleThreadScheduledExecutor().schedule(() -> { + RScoredSortedSet queue2 = redisson.getScoredSortedSet("queue:pollany1"); + RScoredSortedSet queue3 = redisson.getScoredSortedSet("queue:pollany2"); + queue3.add(0.1, 2); + queue1.add(0.1, 1); + queue2.add(0.1, 3); + }, 3, TimeUnit.SECONDS); + + long s = System.currentTimeMillis(); + int l = queue1.pollFirstFromAny(4, TimeUnit.SECONDS, "queue:pollany1", "queue:pollany2"); + + Assert.assertEquals(2, l); + Assert.assertTrue(System.currentTimeMillis() - s > 2000); + } + + @Test + public void testPollLastFromAny() throws InterruptedException { + final RScoredSortedSet queue1 = redisson.getScoredSortedSet("queue:pollany"); + Executors.newSingleThreadScheduledExecutor().schedule(() -> { + RScoredSortedSet queue2 = redisson.getScoredSortedSet("queue:pollany1"); + RScoredSortedSet queue3 = redisson.getScoredSortedSet("queue:pollany2"); + queue3.add(0.1, 2); + queue1.add(0.1, 1); + queue2.add(0.1, 3); + }, 3, TimeUnit.SECONDS); + + long s = System.currentTimeMillis(); + int l = queue1.pollLastFromAny(4, TimeUnit.SECONDS, "queue:pollany1", "queue:pollany2"); + + Assert.assertEquals(2, l); + Assert.assertTrue(System.currentTimeMillis() - s > 2000); + } + @Test public void testSortOrder() { RScoredSortedSet set = redisson.getScoredSortedSet("list", IntegerCodec.INSTANCE); @@ -249,7 +287,59 @@ public class RedissonScoredSortedSetTest extends BaseTest { Assert.assertEquals("c", set.pollLast()); assertThat(set).containsExactly("a", "b"); } + + @Test + public void testPollLastAmount() { + RScoredSortedSet set = redisson.getScoredSortedSet("simple"); + assertThat(set.pollLast(2)).isEmpty(); + + set.add(0.1, "a"); + set.add(0.2, "b"); + set.add(0.3, "c"); + + assertThat(set.pollLast(2)).containsExactly("b", "c"); + assertThat(set).containsExactly("a"); + } + + @Test + public void testPollLastTimeout() { + RScoredSortedSet set = redisson.getScoredSortedSet("simple"); + assertThat(set.pollLast(1, TimeUnit.SECONDS)).isNull(); + + set.add(0.1, "a"); + set.add(0.2, "b"); + set.add(0.3, "c"); + + assertThat(set.pollLast(1, TimeUnit.SECONDS)).isEqualTo("c"); + assertThat(set).containsExactly("a", "b"); + } + + @Test + public void testPollFirstTimeout() { + RScoredSortedSet set = redisson.getScoredSortedSet("simple"); + assertThat(set.pollFirst(1, TimeUnit.SECONDS)).isNull(); + + set.add(0.1, "a"); + set.add(0.2, "b"); + set.add(0.3, "c"); + + assertThat(set.pollFirst(1, TimeUnit.SECONDS)).isEqualTo("a"); + assertThat(set).containsExactly("b", "c"); + } + + @Test + public void testPollFistAmount() { + RScoredSortedSet set = redisson.getScoredSortedSet("simple"); + assertThat(set.pollFirst(2)).isEmpty(); + + set.add(0.1, "a"); + set.add(0.2, "b"); + set.add(0.3, "c"); + assertThat(set.pollFirst(2)).containsExactly("a", "b"); + assertThat(set).containsExactly("c"); + } + @Test public void testPollFirst() { RScoredSortedSet set = redisson.getScoredSortedSet("simple"); @@ -271,6 +361,9 @@ public class RedissonScoredSortedSetTest extends BaseTest { set.add(0.3, "c"); set.add(0.4, "d"); + RScoredSortedSet set2 = redisson.getScoredSortedSet("simple2"); + assertThat(set2.first()).isNull(); + assertThat(set2.last()).isNull(); Assert.assertEquals("a", set.first()); Assert.assertEquals("d", set.last()); } @@ -283,6 +376,9 @@ public class RedissonScoredSortedSetTest extends BaseTest { set.add(0.3, "c"); set.add(0.4, "d"); + RScoredSortedSet set2 = redisson.getScoredSortedSet("simple2"); + assertThat(set2.firstScore()).isNull(); + assertThat(set2.lastScore()).isNull(); assertThat(set.firstScore()).isEqualTo(0.1); assertThat(set.lastScore()).isEqualTo(0.4); } diff --git a/redisson/src/test/java/org/redisson/transaction/RedissonBaseTransactionalMapTest.java b/redisson/src/test/java/org/redisson/transaction/RedissonBaseTransactionalMapTest.java index 6e9d521aa..27c7c4240 100644 --- a/redisson/src/test/java/org/redisson/transaction/RedissonBaseTransactionalMapTest.java +++ b/redisson/src/test/java/org/redisson/transaction/RedissonBaseTransactionalMapTest.java @@ -1,9 +1,12 @@ package org.redisson.transaction; -import static org.assertj.core.api.Assertions.*; +import static org.assertj.core.api.Assertions.assertThat; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import org.junit.Test; import org.redisson.BaseTest; @@ -17,6 +20,25 @@ public abstract class RedissonBaseTransactionalMapTest extends BaseTest { protected abstract RMap getTransactionalMap(RTransaction transaction); + @Test + public void testFastPut() throws InterruptedException { + ExecutorService executor = Executors.newFixedThreadPool(16); + for (int i = 0; i < 500; i++) { + executor.submit(() -> { + for (int j = 0; j < 100; j++) { + RTransaction t = redisson.createTransaction(TransactionOptions.defaults()); + RMap map = getTransactionalMap(t); + map.fastPut("1", "1"); + t.commit(); + } + }); + } + + executor.shutdown(); + assertThat(executor.awaitTermination(1, TimeUnit.MINUTES)).isTrue(); + } + + @Test public void testPutAll() { RMap m = getMap();