From cac13b7653c0a6615a9fcece885479c921e34615 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Thu, 24 May 2018 10:20:26 +0300 Subject: [PATCH 01/16] Update README.md --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index a5f3797f3..ab7390e10 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -Redisson: Redis based In-Memory Data Grid for Java. +Redisson: Redis based In-Memory Data Grid for Java. Leading Redis client ==== [Quick start](https://github.com/redisson/redisson#quick-start) | [Documentation](https://github.com/redisson/redisson/wiki) | [Javadocs](http://www.javadoc.io/doc/org.redisson/redisson/3.6.5) | [Changelog](https://github.com/redisson/redisson/blob/master/CHANGELOG.md) | [Code examples](https://github.com/redisson/redisson-examples) | [FAQs](https://github.com/redisson/redisson/wiki/16.-FAQ) | [Support chat](https://gitter.im/mrniko/redisson) | **[Redisson PRO](https://redisson.pro)** From a9d2956a798c9580ec896d1a6d07b5f8fc63617b Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Thu, 24 May 2018 10:28:24 +0300 Subject: [PATCH 02/16] Update README.md --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index ab7390e10..8f0cd4307 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -Redisson: Redis based In-Memory Data Grid for Java. Leading Redis client +Redisson: Redis based In-Memory Data Grid for Java. State of the Art Redis client ==== [Quick start](https://github.com/redisson/redisson#quick-start) | [Documentation](https://github.com/redisson/redisson/wiki) | [Javadocs](http://www.javadoc.io/doc/org.redisson/redisson/3.6.5) | [Changelog](https://github.com/redisson/redisson/blob/master/CHANGELOG.md) | [Code examples](https://github.com/redisson/redisson-examples) | [FAQs](https://github.com/redisson/redisson/wiki/16.-FAQ) | [Support chat](https://gitter.im/mrniko/redisson) | **[Redisson PRO](https://redisson.pro)** From 1c4daa684a28ca1fc9e4df0bdfbb8148d04742b8 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Thu, 24 May 2018 10:28:41 +0300 Subject: [PATCH 03/16] Update README.md --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 8f0cd4307..f2b596f83 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -Redisson: Redis based In-Memory Data Grid for Java. State of the Art Redis client +Redisson: Redis based In-Memory Data Grid for Java.
State of the Art Redis client ==== [Quick start](https://github.com/redisson/redisson#quick-start) | [Documentation](https://github.com/redisson/redisson/wiki) | [Javadocs](http://www.javadoc.io/doc/org.redisson/redisson/3.6.5) | [Changelog](https://github.com/redisson/redisson/blob/master/CHANGELOG.md) | [Code examples](https://github.com/redisson/redisson-examples) | [FAQs](https://github.com/redisson/redisson/wiki/16.-FAQ) | [Support chat](https://gitter.im/mrniko/redisson) | **[Redisson PRO](https://redisson.pro)** From 6f148f6443aff2c0cf2479c55014beb977adbe03 Mon Sep 17 00:00:00 2001 From: Nikita Date: Thu, 24 May 2018 14:57:38 +0300 Subject: [PATCH 04/16] RScoredSortedSet.pollFirst and pollLast with count parameter added --- .../org/redisson/RedissonScoredSortedSet.java | 44 ++++++++++--- .../org/redisson/api/RScoredSortedSet.java | 52 ++++++++++++++- .../redisson/api/RScoredSortedSetAsync.java | 66 ++++++++++++++++++- .../client/protocol/RedisCommands.java | 9 ++- .../ObjectFirstResultReplayDecoder.java | 41 ------------ .../ObjectFirstScoreReplayDecoder.java | 3 + .../redisson/RedissonScoredSortedSetTest.java | 32 +++++++++ 7 files changed, 190 insertions(+), 57 deletions(-) delete mode 100644 redisson/src/main/java/org/redisson/client/protocol/decoder/ObjectFirstResultReplayDecoder.java diff --git a/redisson/src/main/java/org/redisson/RedissonScoredSortedSet.java b/redisson/src/main/java/org/redisson/RedissonScoredSortedSet.java index 36a03e64d..2decb2198 100644 --- a/redisson/src/main/java/org/redisson/RedissonScoredSortedSet.java +++ b/redisson/src/main/java/org/redisson/RedissonScoredSortedSet.java @@ -36,6 +36,7 @@ import org.redisson.client.codec.Codec; import org.redisson.client.codec.DoubleCodec; import org.redisson.client.codec.LongCodec; import org.redisson.client.codec.ScanCodec; +import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.ScoredEntry; import org.redisson.client.protocol.decoder.ListScanResult; @@ -89,25 +90,52 @@ public class RedissonScoredSortedSet extends RedissonExpirable implements RSc return get(pollLastAsync()); } + @Override + public Collection pollFirst(int count) { + return get(pollFirstAsync(count)); + } + + @Override + public Collection pollLast(int count) { + return get(pollLastAsync(count)); + } + + @Override + public RFuture> pollFirstAsync(int count) { + if (count <= 0) { + return RedissonPromise.>newSucceededFuture(Collections.emptyList()); + } + + return poll(0, count-1, RedisCommands.EVAL_LIST); + } + + @Override + public RFuture> pollLastAsync(int count) { + if (count <= 0) { + return RedissonPromise.>newSucceededFuture(Collections.emptyList()); + } + return poll(-count, -1, RedisCommands.EVAL_LIST); + } + @Override public RFuture pollFirstAsync() { - return poll(0); + return poll(0, 0, RedisCommands.EVAL_FIRST_LIST); } @Override public RFuture pollLastAsync() { - return poll(-1); + return poll(-1, -1, RedisCommands.EVAL_FIRST_LIST); } - private RFuture poll(int index) { - return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_OBJECT, + private RFuture poll(int from, int to, RedisCommand command) { + return commandExecutor.evalWriteAsync(getName(), codec, command, "local v = redis.call('zrange', KEYS[1], ARGV[1], ARGV[2]); " - + "if v[1] ~= nil then " + + "if #v > 0 then " + "redis.call('zremrangebyrank', KEYS[1], ARGV[1], ARGV[2]); " - + "return v[1]; " + + "return v; " + "end " - + "return nil;", - Collections.singletonList(getName()), index, index); + + "return v;", + Collections.singletonList(getName()), from, to); } @Override diff --git a/redisson/src/main/java/org/redisson/api/RScoredSortedSet.java b/redisson/src/main/java/org/redisson/api/RScoredSortedSet.java index 518178702..7b107bd84 100644 --- a/redisson/src/main/java/org/redisson/api/RScoredSortedSet.java +++ b/redisson/src/main/java/org/redisson/api/RScoredSortedSet.java @@ -45,16 +45,64 @@ public interface RScoredSortedSet extends RScoredSortedSetAsync, Iterable< */ RCollectionMapReduce mapReduce(); + /** + * Removes and returns the head elements or {@code null} if this sorted set is empty. + * + * @param count - elements amount + * @return the head element, + * or {@code null} if this sorted set is empty + */ + Collection pollFirst(int count); + + /** + * Removes and returns the tail elements or {@code null} if this sorted set is empty. + * + * @param count - elements amount + * @return the tail element or {@code null} if this sorted set is empty + */ + Collection pollLast(int count); + + /** + * Removes and returns the head element or {@code null} if this sorted set is empty. + * + * @return the head element, + * or {@code null} if this sorted set is empty + */ V pollFirst(); + /** + * Removes and returns the tail element or {@code null} if this sorted set is empty. + * + * @return the tail element or {@code null} if this sorted set is empty + */ V pollLast(); + /** + * Returns the head element or {@code null} if this sorted set is empty. + * + * @return the head element or {@code null} if this sorted set is empty + */ V first(); + /** + * Returns the tail element or {@code null} if this sorted set is empty. + * + * @return the tail element or {@code null} if this sorted set is empty + */ V last(); - + + /** + * Returns score of the tail element or returns {@code null} if this sorted set is empty. + * + * @return the tail element or {@code null} if this sorted set is empty + */ Double firstScore(); - + + /** + * Returns score of the head element or returns {@code null} if this sorted set is empty. + * + * @return the tail element or {@code null} if this sorted set is empty + */ Double lastScore(); Long addAll(Map objects); diff --git a/redisson/src/main/java/org/redisson/api/RScoredSortedSetAsync.java b/redisson/src/main/java/org/redisson/api/RScoredSortedSetAsync.java index c2abadf63..87d9e67ec 100644 --- a/redisson/src/main/java/org/redisson/api/RScoredSortedSetAsync.java +++ b/redisson/src/main/java/org/redisson/api/RScoredSortedSetAsync.java @@ -30,16 +30,64 @@ import org.redisson.client.protocol.ScoredEntry; */ public interface RScoredSortedSetAsync extends RExpirableAsync, RSortableAsync> { - RFuture pollLastAsync(); + /** + * Removes and returns the head elements or {@code null} if this sorted set is empty. + * + * @param count - elements amount + * @return the head element, + * or {@code null} if this sorted set is empty + */ + RFuture> pollFirstAsync(int count); + + /** + * Removes and returns the tail elements or {@code null} if this sorted set is empty. + * + * @param count - elements amount + * @return the tail element or {@code null} if this sorted set is empty + */ + RFuture> pollLastAsync(int count); + /** + * Removes and returns the head element or {@code null} if this sorted set is empty. + * + * @return the head element, + * or {@code null} if this sorted set is empty + */ RFuture pollFirstAsync(); + /** + * Removes and returns the tail element or {@code null} if this sorted set is empty. + * + * @return the tail element or {@code null} if this sorted set is empty + */ + RFuture pollLastAsync(); + + /** + * Returns the head element or {@code null} if this sorted set is empty. + * + * @return the head element or {@code null} if this sorted set is empty + */ RFuture firstAsync(); + /** + * Returns the tail element or {@code null} if this sorted set is empty. + * + * @return the tail element or {@code null} if this sorted set is empty + */ RFuture lastAsync(); - + + /** + * Returns score of the head element or returns {@code null} if this sorted set is empty. + * + * @return the tail element or {@code null} if this sorted set is empty + */ RFuture firstScoreAsync(); - + + /** + * Returns score of the tail element or returns {@code null} if this sorted set is empty. + * + * @return the tail element or {@code null} if this sorted set is empty + */ RFuture lastScoreAsync(); RFuture addAllAsync(Map objects); @@ -48,8 +96,20 @@ public interface RScoredSortedSetAsync extends RExpirableAsync, RSortableAsyn RFuture removeRangeByRankAsync(int startIndex, int endIndex); + /** + * Returns rank of value, with the scores ordered from low to high. + * + * @param o - object + * @return rank or null if value does not exist + */ RFuture rankAsync(V o); + /** + * Returns rank of value, with the scores ordered from high to low. + * + * @param o - object + * @return rank or null if value does not exist + */ RFuture revRankAsync(V o); /** diff --git a/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java b/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java index 7b225ddaa..ea2efa058 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java +++ b/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java @@ -36,13 +36,14 @@ import org.redisson.client.protocol.convertor.DoubleNullSafeReplayConvertor; import org.redisson.client.protocol.convertor.DoubleReplayConvertor; import org.redisson.client.protocol.convertor.IntegerReplayConvertor; import org.redisson.client.protocol.convertor.KeyValueConvertor; -import org.redisson.client.protocol.convertor.TimeObjectDecoder; import org.redisson.client.protocol.convertor.LongReplayConvertor; +import org.redisson.client.protocol.convertor.TimeObjectDecoder; import org.redisson.client.protocol.convertor.TrueReplayConvertor; import org.redisson.client.protocol.convertor.TypeConvertor; import org.redisson.client.protocol.convertor.VoidReplayConvertor; import org.redisson.client.protocol.decoder.ClusterNodesDecoder; import org.redisson.client.protocol.decoder.KeyValueObjectDecoder; +import org.redisson.client.protocol.decoder.ListFirstObjectDecoder; import org.redisson.client.protocol.decoder.ListMultiDecoder; import org.redisson.client.protocol.decoder.ListResultReplayDecoder; import org.redisson.client.protocol.decoder.ListScanResult; @@ -51,7 +52,6 @@ import org.redisson.client.protocol.decoder.Long2MultiDecoder; import org.redisson.client.protocol.decoder.LongMultiDecoder; import org.redisson.client.protocol.decoder.MapScanResult; import org.redisson.client.protocol.decoder.MapScanResultReplayDecoder; -import org.redisson.client.protocol.decoder.ObjectFirstResultReplayDecoder; import org.redisson.client.protocol.decoder.ObjectFirstScoreReplayDecoder; import org.redisson.client.protocol.decoder.ObjectListReplayDecoder; import org.redisson.client.protocol.decoder.ObjectMapEntryReplayDecoder; @@ -119,9 +119,11 @@ public interface RedisCommands { RedisStrictCommand ZSCORE = new RedisStrictCommand("ZSCORE", new DoubleReplayConvertor()); RedisCommand ZRANK_INT = new RedisCommand("ZRANK", new IntegerReplayConvertor()); RedisCommand ZREVRANK_INT = new RedisCommand("ZREVRANK", new IntegerReplayConvertor()); - RedisCommand ZRANGE_SINGLE = new RedisCommand("ZRANGE", new ObjectFirstResultReplayDecoder()); + RedisCommand ZRANGE_SINGLE = new RedisCommand("ZRANGE", new ListFirstObjectDecoder()); RedisStrictCommand ZRANGE_SINGLE_SCORE = new RedisStrictCommand("ZRANGE", new ObjectFirstScoreReplayDecoder()); RedisCommand> ZRANGE = new RedisCommand>("ZRANGE", new ObjectListReplayDecoder()); + RedisCommand> ZPOPMIN = new RedisCommand>("ZPOPMIN", new ObjectListReplayDecoder()); + RedisCommand> ZPOPMAX = new RedisCommand>("ZPOPMAX", new ObjectListReplayDecoder()); RedisStrictCommand ZREMRANGEBYRANK = new RedisStrictCommand("ZREMRANGEBYRANK", new IntegerReplayConvertor()); RedisStrictCommand ZREMRANGEBYSCORE = new RedisStrictCommand("ZREMRANGEBYSCORE", new IntegerReplayConvertor()); RedisStrictCommand ZREMRANGEBYLEX = new RedisStrictCommand("ZREMRANGEBYLEX", new IntegerReplayConvertor()); @@ -228,6 +230,7 @@ public interface RedisCommands { RedisStrictCommand EVAL_LONG = new RedisStrictCommand("EVAL"); RedisStrictCommand EVAL_LONG_SAFE = new RedisStrictCommand("EVAL", new LongReplayConvertor()); RedisStrictCommand EVAL_VOID = new RedisStrictCommand("EVAL", new VoidReplayConvertor()); + RedisCommand EVAL_FIRST_LIST = new RedisCommand("EVAL", new ListFirstObjectDecoder()); RedisCommand> EVAL_LIST = new RedisCommand>("EVAL", new ObjectListReplayDecoder()); RedisCommand> EVAL_SET = new RedisCommand>("EVAL", new ObjectSetReplayDecoder()); RedisCommand EVAL_OBJECT = new RedisCommand("EVAL"); diff --git a/redisson/src/main/java/org/redisson/client/protocol/decoder/ObjectFirstResultReplayDecoder.java b/redisson/src/main/java/org/redisson/client/protocol/decoder/ObjectFirstResultReplayDecoder.java deleted file mode 100644 index 1466652a4..000000000 --- a/redisson/src/main/java/org/redisson/client/protocol/decoder/ObjectFirstResultReplayDecoder.java +++ /dev/null @@ -1,41 +0,0 @@ -/** - * Copyright 2018 Nikita Koksharov - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.redisson.client.protocol.decoder; - -import java.util.List; - -import org.redisson.client.handler.State; -import org.redisson.client.protocol.Decoder; - -/** - * - * @author Nikita Koksharov - * - * @param type - */ -public class ObjectFirstResultReplayDecoder implements MultiDecoder { - - @Override - public T decode(List parts, State state) { - return (T) parts.get(0); - } - - @Override - public Decoder getDecoder(int paramNum, State state) { - return null; - } - -} diff --git a/redisson/src/main/java/org/redisson/client/protocol/decoder/ObjectFirstScoreReplayDecoder.java b/redisson/src/main/java/org/redisson/client/protocol/decoder/ObjectFirstScoreReplayDecoder.java index a6cc0ff0a..3e886c541 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/decoder/ObjectFirstScoreReplayDecoder.java +++ b/redisson/src/main/java/org/redisson/client/protocol/decoder/ObjectFirstScoreReplayDecoder.java @@ -39,6 +39,9 @@ public class ObjectFirstScoreReplayDecoder implements MultiDecoder { @Override public Double decode(List parts, State state) { + if (parts.isEmpty()) { + return null; + } return (Double) parts.get(1); } diff --git a/redisson/src/test/java/org/redisson/RedissonScoredSortedSetTest.java b/redisson/src/test/java/org/redisson/RedissonScoredSortedSetTest.java index fbc23b52d..21405cfdb 100644 --- a/redisson/src/test/java/org/redisson/RedissonScoredSortedSetTest.java +++ b/redisson/src/test/java/org/redisson/RedissonScoredSortedSetTest.java @@ -249,7 +249,33 @@ public class RedissonScoredSortedSetTest extends BaseTest { Assert.assertEquals("c", set.pollLast()); assertThat(set).containsExactly("a", "b"); } + + @Test + public void testPollLastAmount() { + RScoredSortedSet set = redisson.getScoredSortedSet("simple"); + assertThat(set.pollLast(2)).isEmpty(); + + set.add(0.1, "a"); + set.add(0.2, "b"); + set.add(0.3, "c"); + + assertThat(set.pollLast(2)).containsExactly("b", "c"); + assertThat(set).containsExactly("a"); + } + + @Test + public void testPollFistAmount() { + RScoredSortedSet set = redisson.getScoredSortedSet("simple"); + assertThat(set.pollFirst(2)).isEmpty(); + + set.add(0.1, "a"); + set.add(0.2, "b"); + set.add(0.3, "c"); + assertThat(set.pollFirst(2)).containsExactly("a", "b"); + assertThat(set).containsExactly("c"); + } + @Test public void testPollFirst() { RScoredSortedSet set = redisson.getScoredSortedSet("simple"); @@ -271,6 +297,9 @@ public class RedissonScoredSortedSetTest extends BaseTest { set.add(0.3, "c"); set.add(0.4, "d"); + RScoredSortedSet set2 = redisson.getScoredSortedSet("simple2"); + assertThat(set2.first()).isNull(); + assertThat(set2.last()).isNull(); Assert.assertEquals("a", set.first()); Assert.assertEquals("d", set.last()); } @@ -283,6 +312,9 @@ public class RedissonScoredSortedSetTest extends BaseTest { set.add(0.3, "c"); set.add(0.4, "d"); + RScoredSortedSet set2 = redisson.getScoredSortedSet("simple2"); + assertThat(set2.firstScore()).isNull(); + assertThat(set2.lastScore()).isNull(); assertThat(set.firstScore()).isEqualTo(0.1); assertThat(set.lastScore()).isEqualTo(0.4); } From 3b4f11cc0365b537ddda73026b7848bf87f1ebed Mon Sep 17 00:00:00 2001 From: Nikita Date: Thu, 24 May 2018 16:39:29 +0300 Subject: [PATCH 05/16] RScoredSortedSet.pollFirst and pollLast methods with timeout added. #1452 --- .../java/org/redisson/RedissonObject.java | 10 +++- .../main/java/org/redisson/RedissonQueue.java | 8 --- .../org/redisson/RedissonScoredSortedSet.java | 21 ++++++++ .../org/redisson/api/RScoredSortedSet.java | 24 +++++++++ .../redisson/api/RScoredSortedSetAsync.java | 24 +++++++++ .../client/protocol/RedisCommands.java | 8 +-- .../convertor/BitSetReplayConvertor.java | 38 -------------- .../ScoredSortedSetPolledObjectDecoder.java | 51 +++++++++++++++++++ .../redisson/RedissonScoredSortedSetTest.java | 27 ++++++++++ 9 files changed, 160 insertions(+), 51 deletions(-) delete mode 100644 redisson/src/main/java/org/redisson/client/protocol/convertor/BitSetReplayConvertor.java create mode 100644 redisson/src/main/java/org/redisson/client/protocol/decoder/ScoredSortedSetPolledObjectDecoder.java diff --git a/redisson/src/main/java/org/redisson/RedissonObject.java b/redisson/src/main/java/org/redisson/RedissonObject.java index 927624228..369893943 100644 --- a/redisson/src/main/java/org/redisson/RedissonObject.java +++ b/redisson/src/main/java/org/redisson/RedissonObject.java @@ -70,9 +70,17 @@ public abstract class RedissonObject implements RObject { return "{" + name + "}:" + suffix; } - protected V get(RFuture future) { + protected final V get(RFuture future) { return commandExecutor.get(future); } + + protected final long toSeconds(long timeout, TimeUnit unit) { + long seconds = unit.toSeconds(timeout); + if (timeout != 0 && seconds == 0) { + seconds = 1; + } + return seconds; + } @Override public String getName() { diff --git a/redisson/src/main/java/org/redisson/RedissonQueue.java b/redisson/src/main/java/org/redisson/RedissonQueue.java index f48ac7abb..fa25de8c0 100644 --- a/redisson/src/main/java/org/redisson/RedissonQueue.java +++ b/redisson/src/main/java/org/redisson/RedissonQueue.java @@ -68,14 +68,6 @@ public class RedissonQueue extends RedissonList implements RQueue { return value; } - protected long toSeconds(long timeout, TimeUnit unit) { - long seconds = unit.toSeconds(timeout); - if (timeout != 0 && seconds == 0) { - seconds = 1; - } - return seconds; - } - @Override public V remove() { return removeFirst(); diff --git a/redisson/src/main/java/org/redisson/RedissonScoredSortedSet.java b/redisson/src/main/java/org/redisson/RedissonScoredSortedSet.java index 2decb2198..ca8b4a84b 100644 --- a/redisson/src/main/java/org/redisson/RedissonScoredSortedSet.java +++ b/redisson/src/main/java/org/redisson/RedissonScoredSortedSet.java @@ -24,6 +24,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.concurrent.TimeUnit; import java.util.Set; import org.redisson.api.RFuture; @@ -138,6 +139,26 @@ public class RedissonScoredSortedSet extends RedissonExpirable implements RSc Collections.singletonList(getName()), from, to); } + @Override + public V pollFirst(long timeout, TimeUnit unit) { + return get(pollFirstAsync(timeout, unit)); + } + + @Override + public RFuture pollFirstAsync(long timeout, TimeUnit unit) { + return commandExecutor.writeAsync(getName(), codec, RedisCommands.BZPOPMIN_VALUE, getName(), toSeconds(timeout, unit)); + } + + @Override + public V pollLast(long timeout, TimeUnit unit) { + return get(pollLastAsync(timeout, unit)); + } + + @Override + public RFuture pollLastAsync(long timeout, TimeUnit unit) { + return commandExecutor.writeAsync(getName(), codec, RedisCommands.BZPOPMAX_VALUE, getName(), toSeconds(timeout, unit)); + } + @Override public boolean add(double score, V object) { return get(addAsync(score, object)); diff --git a/redisson/src/main/java/org/redisson/api/RScoredSortedSet.java b/redisson/src/main/java/org/redisson/api/RScoredSortedSet.java index 7b107bd84..4750aa576 100644 --- a/redisson/src/main/java/org/redisson/api/RScoredSortedSet.java +++ b/redisson/src/main/java/org/redisson/api/RScoredSortedSet.java @@ -18,6 +18,7 @@ package org.redisson.api; import java.util.Collection; import java.util.Map; import java.util.Set; +import java.util.concurrent.TimeUnit; import org.redisson.api.mapreduce.RCollectionMapReduce; import org.redisson.client.protocol.ScoredEntry; @@ -45,6 +46,29 @@ public interface RScoredSortedSet extends RScoredSortedSetAsync, Iterable< */ RCollectionMapReduce mapReduce(); + /** + * Removes and returns the head element or {@code null} if this sorted set is empty. + * + * @param timeout how long to wait before giving up, in units of + * {@code unit} + * @param unit a {@code TimeUnit} determining how to interpret the + * {@code timeout} parameter + * @return the head element, + * or {@code null} if this sorted set is empty + */ + V pollFirst(long timeout, TimeUnit unit); + + /** + * Removes and returns the tail element or {@code null} if this sorted set is empty. + * + * @param timeout how long to wait before giving up, in units of + * {@code unit} + * @param unit a {@code TimeUnit} determining how to interpret the + * {@code timeout} parameter + * @return the tail element or {@code null} if this sorted set is empty + */ + V pollLast(long timeout, TimeUnit unit); + /** * Removes and returns the head elements or {@code null} if this sorted set is empty. * diff --git a/redisson/src/main/java/org/redisson/api/RScoredSortedSetAsync.java b/redisson/src/main/java/org/redisson/api/RScoredSortedSetAsync.java index 87d9e67ec..fa034e916 100644 --- a/redisson/src/main/java/org/redisson/api/RScoredSortedSetAsync.java +++ b/redisson/src/main/java/org/redisson/api/RScoredSortedSetAsync.java @@ -18,6 +18,7 @@ package org.redisson.api; import java.util.Collection; import java.util.Map; import java.util.Set; +import java.util.concurrent.TimeUnit; import org.redisson.api.RScoredSortedSet.Aggregate; import org.redisson.client.protocol.ScoredEntry; @@ -30,6 +31,29 @@ import org.redisson.client.protocol.ScoredEntry; */ public interface RScoredSortedSetAsync extends RExpirableAsync, RSortableAsync> { + /** + * Removes and returns the head element or {@code null} if this sorted set is empty. + * + * @param timeout how long to wait before giving up, in units of + * {@code unit} + * @param unit a {@code TimeUnit} determining how to interpret the + * {@code timeout} parameter + * @return the head element, + * or {@code null} if this sorted set is empty + */ + RFuture pollFirstAsync(long timeout, TimeUnit unit); + + /** + * Removes and returns the tail element or {@code null} if this sorted set is empty. + * + * @param timeout how long to wait before giving up, in units of + * {@code unit} + * @param unit a {@code TimeUnit} determining how to interpret the + * {@code timeout} parameter + * @return the tail element or {@code null} if this sorted set is empty + */ + RFuture pollLastAsync(long timeout, TimeUnit unit); + /** * Removes and returns the head elements or {@code null} if this sorted set is empty. * diff --git a/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java b/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java index ea2efa058..ec60ff7f2 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java +++ b/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java @@ -24,7 +24,6 @@ import java.util.Set; import org.redisson.api.RType; import org.redisson.client.protocol.RedisCommand.ValueType; -import org.redisson.client.protocol.convertor.BitSetReplayConvertor; import org.redisson.client.protocol.convertor.BitsSizeReplayConvertor; import org.redisson.client.protocol.convertor.BooleanAmountReplayConvertor; import org.redisson.client.protocol.convertor.BooleanNotNullReplayConvertor; @@ -57,6 +56,7 @@ import org.redisson.client.protocol.decoder.ObjectListReplayDecoder; import org.redisson.client.protocol.decoder.ObjectMapEntryReplayDecoder; import org.redisson.client.protocol.decoder.ObjectMapReplayDecoder; import org.redisson.client.protocol.decoder.ObjectSetReplayDecoder; +import org.redisson.client.protocol.decoder.ScoredSortedSetPolledObjectDecoder; import org.redisson.client.protocol.decoder.ScoredSortedSetReplayDecoder; import org.redisson.client.protocol.decoder.ScoredSortedSetScanDecoder; import org.redisson.client.protocol.decoder.ScoredSortedSetScanReplayDecoder; @@ -93,8 +93,6 @@ public interface RedisCommands { RedisStrictCommand BITPOS = new RedisStrictCommand("BITPOS", new IntegerReplayConvertor()); RedisStrictCommand SETBIT_VOID = new RedisStrictCommand("SETBIT", new VoidReplayConvertor()); RedisStrictCommand SETBIT = new RedisStrictCommand("SETBIT", new BooleanReplayConvertor()); - RedisStrictCommand SETBIT_TRUE = new RedisStrictCommand("SETBIT", new BitSetReplayConvertor(0)); - RedisStrictCommand SETBIT_FALSE = new RedisStrictCommand("SETBIT", new BitSetReplayConvertor(1)); RedisStrictCommand BITOP = new RedisStrictCommand("BITOP", new VoidReplayConvertor()); RedisStrictCommand WAIT = new RedisStrictCommand("WAIT", new IntegerReplayConvertor()); @@ -193,9 +191,11 @@ public interface RedisCommands { RedisCommand BRPOPLPUSH = new RedisCommand("BRPOPLPUSH"); RedisCommand BLPOP_VALUE = new RedisCommand("BLPOP", new KeyValueObjectDecoder(), new KeyValueConvertor()); RedisCommand BRPOP_VALUE = new RedisCommand("BRPOP", new KeyValueObjectDecoder(), new KeyValueConvertor()); + RedisCommand BZPOPMIN_VALUE = new RedisCommand("BZPOPMIN", new ScoredSortedSetPolledObjectDecoder()); + RedisCommand BZPOPMAX_VALUE = new RedisCommand("BZPOPMAX", new ScoredSortedSetPolledObjectDecoder()); Set BLOCKING_COMMANDS = new HashSet( - Arrays.asList(BLPOP_VALUE.getName(), BRPOP_VALUE.getName(), BRPOPLPUSH.getName())); + Arrays.asList(BLPOP_VALUE.getName(), BRPOP_VALUE.getName(), BRPOPLPUSH.getName(), BZPOPMIN_VALUE.getName(), BZPOPMAX_VALUE.getName())); RedisCommand PFADD = new RedisCommand("PFADD", new BooleanReplayConvertor()); RedisStrictCommand PFCOUNT = new RedisStrictCommand("PFCOUNT"); diff --git a/redisson/src/main/java/org/redisson/client/protocol/convertor/BitSetReplayConvertor.java b/redisson/src/main/java/org/redisson/client/protocol/convertor/BitSetReplayConvertor.java deleted file mode 100644 index 31bfa6294..000000000 --- a/redisson/src/main/java/org/redisson/client/protocol/convertor/BitSetReplayConvertor.java +++ /dev/null @@ -1,38 +0,0 @@ -/** - * Copyright 2018 Nikita Koksharov - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.redisson.client.protocol.convertor; - -/** - * - * @author Nikita Koksharov - * - */ -public class BitSetReplayConvertor extends SingleConvertor { - - private final int expectedValue; - - public BitSetReplayConvertor(int expectedValue) { - super(); - this.expectedValue = expectedValue; - } - - @Override - public Boolean convert(Object obj) { - return expectedValue == (Long)obj; - } - - -} diff --git a/redisson/src/main/java/org/redisson/client/protocol/decoder/ScoredSortedSetPolledObjectDecoder.java b/redisson/src/main/java/org/redisson/client/protocol/decoder/ScoredSortedSetPolledObjectDecoder.java new file mode 100644 index 000000000..98c84a652 --- /dev/null +++ b/redisson/src/main/java/org/redisson/client/protocol/decoder/ScoredSortedSetPolledObjectDecoder.java @@ -0,0 +1,51 @@ +/** + * Copyright 2018 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.client.protocol.decoder; + +import java.util.List; + +import org.redisson.client.codec.DoubleCodec; +import org.redisson.client.codec.StringCodec; +import org.redisson.client.handler.State; +import org.redisson.client.protocol.Decoder; + +/** + * + * @author Nikita Koksharov + * + */ +public class ScoredSortedSetPolledObjectDecoder implements MultiDecoder { + + @Override + public Object decode(List parts, State state) { + if (!parts.isEmpty()) { + return parts.get(2); + } + return null; + } + + @Override + public Decoder getDecoder(int paramNum, State state) { + if (paramNum == 0) { + return StringCodec.INSTANCE.getValueDecoder(); + } + if (paramNum == 1) { + return DoubleCodec.INSTANCE.getValueDecoder(); + } + return null; + } + +} diff --git a/redisson/src/test/java/org/redisson/RedissonScoredSortedSetTest.java b/redisson/src/test/java/org/redisson/RedissonScoredSortedSetTest.java index 21405cfdb..f15c51054 100644 --- a/redisson/src/test/java/org/redisson/RedissonScoredSortedSetTest.java +++ b/redisson/src/test/java/org/redisson/RedissonScoredSortedSetTest.java @@ -12,6 +12,7 @@ import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import org.junit.Assert; import org.junit.Assume; @@ -262,7 +263,33 @@ public class RedissonScoredSortedSetTest extends BaseTest { assertThat(set.pollLast(2)).containsExactly("b", "c"); assertThat(set).containsExactly("a"); } + + @Test + public void testPollLastTimeout() { + RScoredSortedSet set = redisson.getScoredSortedSet("simple"); + assertThat(set.pollLast(1, TimeUnit.SECONDS)).isNull(); + + set.add(0.1, "a"); + set.add(0.2, "b"); + set.add(0.3, "c"); + + assertThat(set.pollLast(1, TimeUnit.SECONDS)).isEqualTo("c"); + assertThat(set).containsExactly("a", "b"); + } + @Test + public void testPollFirstTimeout() { + RScoredSortedSet set = redisson.getScoredSortedSet("simple"); + assertThat(set.pollFirst(1, TimeUnit.SECONDS)).isNull(); + + set.add(0.1, "a"); + set.add(0.2, "b"); + set.add(0.3, "c"); + + assertThat(set.pollFirst(1, TimeUnit.SECONDS)).isEqualTo("a"); + assertThat(set).containsExactly("b", "c"); + } + @Test public void testPollFistAmount() { RScoredSortedSet set = redisson.getScoredSortedSet("simple"); From 2275a553fd42d0fd6f542f292d7b50088af0e49f Mon Sep 17 00:00:00 2001 From: Nikita Date: Fri, 25 May 2018 10:09:39 +0300 Subject: [PATCH 06/16] RScoredSortedSet.pollFirstFromAny and pollLastFromAny methods added. #1452 --- .../org/redisson/RedissonScoredSortedSet.java | 32 +++++++++++++++ .../org/redisson/api/RScoredSortedSet.java | 32 +++++++++++++++ .../redisson/api/RScoredSortedSetAsync.java | 39 ++++++++++++++++++- .../redisson/connection/MasterSlaveEntry.java | 7 ++-- .../redisson/RedissonScoredSortedSetTest.java | 37 ++++++++++++++++++ 5 files changed, 142 insertions(+), 5 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonScoredSortedSet.java b/redisson/src/main/java/org/redisson/RedissonScoredSortedSet.java index ca8b4a84b..ef5044c91 100644 --- a/redisson/src/main/java/org/redisson/RedissonScoredSortedSet.java +++ b/redisson/src/main/java/org/redisson/RedissonScoredSortedSet.java @@ -149,6 +149,38 @@ public class RedissonScoredSortedSet extends RedissonExpirable implements RSc return commandExecutor.writeAsync(getName(), codec, RedisCommands.BZPOPMIN_VALUE, getName(), toSeconds(timeout, unit)); } + @Override + public V pollFirstFromAny(long timeout, TimeUnit unit, String ... queueNames) { + return get(pollFirstFromAnyAsync(timeout, unit, queueNames)); + } + + @Override + public RFuture pollFirstFromAnyAsync(long timeout, TimeUnit unit, String ... queueNames) { + List params = new ArrayList(queueNames.length + 1); + params.add(getName()); + for (Object name : queueNames) { + params.add(name); + } + params.add(toSeconds(timeout, unit)); + return commandExecutor.writeAsync(getName(), codec, RedisCommands.BZPOPMIN_VALUE, params.toArray()); + } + + @Override + public V pollLastFromAny(long timeout, TimeUnit unit, String ... queueNames) { + return get(pollLastFromAnyAsync(timeout, unit, queueNames)); + } + + @Override + public RFuture pollLastFromAnyAsync(long timeout, TimeUnit unit, String ... queueNames) { + List params = new ArrayList(queueNames.length + 1); + params.add(getName()); + for (Object name : queueNames) { + params.add(name); + } + params.add(toSeconds(timeout, unit)); + return commandExecutor.writeAsync(getName(), codec, RedisCommands.BZPOPMAX_VALUE, params.toArray()); + } + @Override public V pollLast(long timeout, TimeUnit unit) { return get(pollLastAsync(timeout, unit)); diff --git a/redisson/src/main/java/org/redisson/api/RScoredSortedSet.java b/redisson/src/main/java/org/redisson/api/RScoredSortedSet.java index 4750aa576..9eae10c10 100644 --- a/redisson/src/main/java/org/redisson/api/RScoredSortedSet.java +++ b/redisson/src/main/java/org/redisson/api/RScoredSortedSet.java @@ -46,6 +46,38 @@ public interface RScoredSortedSet extends RScoredSortedSetAsync, Iterable< */ RCollectionMapReduce mapReduce(); + /** + * Removes and returns first available tail element of any sorted set, + * waiting up to the specified wait time if necessary for an element to become available + * in any of defined sorted sets including this one. + *

+ * Requires Redis 5.0.0 and higher. + * + * @param queueNames - names of queue + * @param timeout how long to wait before giving up, in units of + * {@code unit} + * @param unit a {@code TimeUnit} determining how to interpret the + * {@code timeout} parameter + * @return the tail element, or {@code null} if all sorted sets are empty + */ + V pollLastFromAny(long timeout, TimeUnit unit, String ... queueNames); + + /** + * Removes and returns first available head element of any sorted set, + * waiting up to the specified wait time if necessary for an element to become available + * in any of defined sorted sets including this one. + *

+ * Requires Redis 5.0.0 and higher. + * + * @param queueNames - names of queue + * @param timeout how long to wait before giving up, in units of + * {@code unit} + * @param unit a {@code TimeUnit} determining how to interpret the + * {@code timeout} parameter + * @return the head element, or {@code null} if all sorted sets are empty + */ + V pollFirstFromAny(long timeout, TimeUnit unit, String ... queueNames); + /** * Removes and returns the head element or {@code null} if this sorted set is empty. * diff --git a/redisson/src/main/java/org/redisson/api/RScoredSortedSetAsync.java b/redisson/src/main/java/org/redisson/api/RScoredSortedSetAsync.java index fa034e916..7b6cead46 100644 --- a/redisson/src/main/java/org/redisson/api/RScoredSortedSetAsync.java +++ b/redisson/src/main/java/org/redisson/api/RScoredSortedSetAsync.java @@ -31,8 +31,43 @@ import org.redisson.client.protocol.ScoredEntry; */ public interface RScoredSortedSetAsync extends RExpirableAsync, RSortableAsync> { + /** + * Removes and returns first available tail element of any sorted set, + * waiting up to the specified wait time if necessary for an element to become available + * in any of defined sorted sets including this one. + *

+ * Requires Redis 5.0.0 and higher. + * + * @param queueNames - names of queue + * @param timeout how long to wait before giving up, in units of + * {@code unit} + * @param unit a {@code TimeUnit} determining how to interpret the + * {@code timeout} parameter + * @return the tail element, or {@code null} if all sorted sets are empty + */ + RFuture pollLastFromAnyAsync(long timeout, TimeUnit unit, String ... queueNames); + + /** + * Removes and returns first available head element of any sorted set, + * waiting up to the specified wait time if necessary for an element to become available + * in any of defined sorted sets including this one. + *

+ * Requires Redis 5.0.0 and higher. + * + * @param queueNames - names of queue + * @param timeout how long to wait before giving up, in units of + * {@code unit} + * @param unit a {@code TimeUnit} determining how to interpret the + * {@code timeout} parameter + * @return the head element, or {@code null} if all sorted sets are empty + * + */ + RFuture pollFirstFromAnyAsync(long timeout, TimeUnit unit, String ... queueNames); + /** * Removes and returns the head element or {@code null} if this sorted set is empty. + *

+ * Requires Redis 5.0.0 and higher. * * @param timeout how long to wait before giving up, in units of * {@code unit} @@ -45,6 +80,8 @@ public interface RScoredSortedSetAsync extends RExpirableAsync, RSortableAsyn /** * Removes and returns the tail element or {@code null} if this sorted set is empty. + *

+ * Requires Redis 5.0.0 and higher. * * @param timeout how long to wait before giving up, in units of * {@code unit} @@ -174,7 +211,7 @@ public interface RScoredSortedSetAsync extends RExpirableAsync, RSortableAsyn /** * Adds element to this set only if has not been added before. *

- * Works only with Redis 3.0.2 and higher. + * Requires Redis 3.0.2 and higher. * * @param score - object score * @param object - object itself diff --git a/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java b/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java index 5ea876118..ae1c801cd 100644 --- a/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java +++ b/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java @@ -32,7 +32,6 @@ import org.redisson.client.RedisConnection; import org.redisson.client.RedisPubSubConnection; import org.redisson.client.protocol.CommandData; import org.redisson.client.protocol.RedisCommand; -import org.redisson.client.protocol.RedisCommands; import org.redisson.cluster.ClusterConnectionManager; import org.redisson.cluster.ClusterSlotRange; import org.redisson.config.MasterSlaveServersConfig; @@ -249,7 +248,7 @@ public class MasterSlaveEntry { return; } - RFuture newConnection = connectionReadOp(RedisCommands.BLPOP_VALUE); + RFuture newConnection = connectionWriteOp(commandData.getCommand()); newConnection.addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { @@ -263,7 +262,7 @@ public class MasterSlaveEntry { final FutureListener listener = new FutureListener() { @Override public void operationComplete(Future future) throws Exception { - releaseRead(newConnection); + releaseWrite(newConnection); } }; commandData.getPromise().addListener(listener); @@ -277,7 +276,7 @@ public class MasterSlaveEntry { if (!future.isSuccess()) { listener.operationComplete(null); commandData.getPromise().removeListener(listener); - releaseRead(newConnection); + releaseWrite(newConnection); log.error("Can't resubscribe blocking queue {}", commandData); } } diff --git a/redisson/src/test/java/org/redisson/RedissonScoredSortedSetTest.java b/redisson/src/test/java/org/redisson/RedissonScoredSortedSetTest.java index f15c51054..ff7c6abdb 100644 --- a/redisson/src/test/java/org/redisson/RedissonScoredSortedSetTest.java +++ b/redisson/src/test/java/org/redisson/RedissonScoredSortedSetTest.java @@ -12,6 +12,7 @@ import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import org.junit.Assert; @@ -29,6 +30,42 @@ import org.redisson.client.protocol.ScoredEntry; public class RedissonScoredSortedSetTest extends BaseTest { + @Test + public void testPollFirstFromAny() throws InterruptedException { + final RScoredSortedSet queue1 = redisson.getScoredSortedSet("queue:pollany"); + Executors.newSingleThreadScheduledExecutor().schedule(() -> { + RScoredSortedSet queue2 = redisson.getScoredSortedSet("queue:pollany1"); + RScoredSortedSet queue3 = redisson.getScoredSortedSet("queue:pollany2"); + queue3.add(0.1, 2); + queue1.add(0.1, 1); + queue2.add(0.1, 3); + }, 3, TimeUnit.SECONDS); + + long s = System.currentTimeMillis(); + int l = queue1.pollFirstFromAny(4, TimeUnit.SECONDS, "queue:pollany1", "queue:pollany2"); + + Assert.assertEquals(2, l); + Assert.assertTrue(System.currentTimeMillis() - s > 2000); + } + + @Test + public void testPollLastFromAny() throws InterruptedException { + final RScoredSortedSet queue1 = redisson.getScoredSortedSet("queue:pollany"); + Executors.newSingleThreadScheduledExecutor().schedule(() -> { + RScoredSortedSet queue2 = redisson.getScoredSortedSet("queue:pollany1"); + RScoredSortedSet queue3 = redisson.getScoredSortedSet("queue:pollany2"); + queue3.add(0.1, 2); + queue1.add(0.1, 1); + queue2.add(0.1, 3); + }, 3, TimeUnit.SECONDS); + + long s = System.currentTimeMillis(); + int l = queue1.pollLastFromAny(4, TimeUnit.SECONDS, "queue:pollany1", "queue:pollany2"); + + Assert.assertEquals(2, l); + Assert.assertTrue(System.currentTimeMillis() - s > 2000); + } + @Test public void testSortOrder() { RScoredSortedSet set = redisson.getScoredSortedSet("list", IntegerCodec.INSTANCE); From ddcb93badcdc0ff9041e5e5bbb4fbb1c8d75329b Mon Sep 17 00:00:00 2001 From: Nikita Date: Fri, 25 May 2018 10:09:52 +0300 Subject: [PATCH 07/16] test fixed --- .../src/test/java/org/redisson/RedissonBlockingDequeTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/redisson/src/test/java/org/redisson/RedissonBlockingDequeTest.java b/redisson/src/test/java/org/redisson/RedissonBlockingDequeTest.java index aa57a684f..afe7fc356 100644 --- a/redisson/src/test/java/org/redisson/RedissonBlockingDequeTest.java +++ b/redisson/src/test/java/org/redisson/RedissonBlockingDequeTest.java @@ -16,7 +16,7 @@ public class RedissonBlockingDequeTest extends BaseTest { RBlockingDeque blockingDeque = redisson.getBlockingDeque("blocking_deque"); long start = System.currentTimeMillis(); String redisTask = blockingDeque.pollLastAndOfferFirstTo("deque", 1, TimeUnit.SECONDS); - assertThat(System.currentTimeMillis() - start).isBetween(950L, 1050L); + assertThat(System.currentTimeMillis() - start).isBetween(950L, 1100L); assertThat(redisTask).isNull(); } From 2ba163a32ff2492eedd7fb2b8f8f075908fefbe8 Mon Sep 17 00:00:00 2001 From: Nikita Date: Fri, 25 May 2018 10:22:32 +0300 Subject: [PATCH 08/16] refactoring --- .../java/org/redisson/api/RBitSetAsync.java | 1 - .../org/redisson/api/RBitSetReactive.java | 106 ++++++++++++++++++ .../reactive/RedissonSetCacheReactive.java | 7 +- 3 files changed, 112 insertions(+), 2 deletions(-) diff --git a/redisson/src/main/java/org/redisson/api/RBitSetAsync.java b/redisson/src/main/java/org/redisson/api/RBitSetAsync.java index 6af40ffd6..ff4d592b5 100644 --- a/redisson/src/main/java/org/redisson/api/RBitSetAsync.java +++ b/redisson/src/main/java/org/redisson/api/RBitSetAsync.java @@ -32,7 +32,6 @@ public interface RBitSetAsync extends RExpirableAsync { * Returns zero if there are no any set bit. * * @return "logical size" = index of highest set bit plus one - * @return void */ RFuture lengthAsync(); diff --git a/redisson/src/main/java/org/redisson/api/RBitSetReactive.java b/redisson/src/main/java/org/redisson/api/RBitSetReactive.java index b056df5a4..eac201faa 100644 --- a/redisson/src/main/java/org/redisson/api/RBitSetReactive.java +++ b/redisson/src/main/java/org/redisson/api/RBitSetReactive.java @@ -20,6 +20,7 @@ import java.util.BitSet; import org.reactivestreams.Publisher; /** + * Vector of bits that grows as needed. * * @author Nikita Koksharov * @@ -30,36 +31,141 @@ public interface RBitSetReactive extends RExpirableReactive { Publisher toByteArray(); + /** + * Returns "logical size" = index of highest set bit plus one. + * Returns zero if there are no any set bit. + * + * @return "logical size" = index of highest set bit plus one + */ Publisher length(); + /** + * Set all bits to value from fromIndex (inclusive) to toIndex (exclusive) + * + * @param fromIndex inclusive + * @param toIndex exclusive + * @param value true = 1, false = 0 + * @return void + * + */ Publisher set(long fromIndex, long toIndex, boolean value); + /** + * Set all bits to zero from fromIndex (inclusive) to toIndex (exclusive) + * + * @param fromIndex inclusive + * @param toIndex exclusive + * @return void + * + */ Publisher clear(long fromIndex, long toIndex); + /** + * Copy bits state of source BitSet object to this object + * + * @param bs - BitSet source + * @return void + */ Publisher set(BitSet bs); + /** + * Executes NOT operation over all bits + * + * @return void + */ Publisher not(); + /** + * Set all bits to one from fromIndex (inclusive) to toIndex (exclusive) + * + * @param fromIndex inclusive + * @param toIndex exclusive + * @return void + */ Publisher set(long fromIndex, long toIndex); + /** + * Returns number of set bits. + * + * @return number of set bits. + */ Publisher size(); + /** + * Returns true if bit set to one and false overwise. + * + * @param bitIndex - index of bit + * @return true if bit set to one and false overwise. + */ Publisher get(long bitIndex); + /** + * Set bit to one at specified bitIndex + * + * @param bitIndex - index of bit + * @return true - if previous value was true, + * false - if previous value was false + */ Publisher set(long bitIndex); + /** + * Set bit to value at specified bitIndex + * + * @param bitIndex - index of bit + * @param value true = 1, false = 0 + * @return true - if previous value was true, + * false - if previous value was false + */ Publisher set(long bitIndex, boolean value); + /** + * Returns the number of bits set to one. + * + * @return number of bits + */ Publisher cardinality(); + /** + * Set bit to zero at specified bitIndex + * + * @param bitIndex - index of bit + * @return true - if previous value was true, + * false - if previous value was false + */ Publisher clear(long bitIndex); + /** + * Set all bits to zero + * + * @return void + */ Publisher clear(); + /** + * Executes OR operation over this object and specified bitsets. + * Stores result into this object. + * + * @param bitSetNames - name of stored bitsets + * @return void + */ Publisher or(String... bitSetNames); + /** + * Executes AND operation over this object and specified bitsets. + * Stores result into this object. + * + * @param bitSetNames - name of stored bitsets + * @return void + */ Publisher and(String... bitSetNames); + /** + * Executes XOR operation over this object and specified bitsets. + * Stores result into this object. + * + * @param bitSetNames - name of stored bitsets + * @return void + */ Publisher xor(String... bitSetNames); } diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonSetCacheReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonSetCacheReactive.java index ff4ad7e93..5eb9d2e39 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonSetCacheReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonSetCacheReactive.java @@ -84,7 +84,12 @@ public class RedissonSetCacheReactive extends RedissonExpirableReactive imple @Override public Publisher size() { - return commandExecutor.readReactive(getName(), codec, RedisCommands.ZCARD_INT, getName()); + return reactive(new Supplier>() { + @Override + public RFuture get() { + return instance.sizeAsync(); + } + }); } @Override From 16368a5ed8e5401595b65ab4d2e669f8b51a753f Mon Sep 17 00:00:00 2001 From: Nikita Date: Fri, 25 May 2018 12:12:54 +0300 Subject: [PATCH 09/16] Fixed - setPingConnectionInterval is not propagated for single server configuration. #1455 --- .../java/org/redisson/connection/SingleConnectionManager.java | 1 + 1 file changed, 1 insertion(+) diff --git a/redisson/src/main/java/org/redisson/connection/SingleConnectionManager.java b/redisson/src/main/java/org/redisson/connection/SingleConnectionManager.java index 341fb7435..71b2ce14e 100644 --- a/redisson/src/main/java/org/redisson/connection/SingleConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/SingleConnectionManager.java @@ -37,6 +37,7 @@ public class SingleConnectionManager extends MasterSlaveConnectionManager { private static MasterSlaveServersConfig create(SingleServerConfig cfg) { MasterSlaveServersConfig newconfig = new MasterSlaveServersConfig(); + newconfig.setPingConnectionInterval(cfg.getPingConnectionInterval()); newconfig.setSslEnableEndpointIdentification(cfg.isSslEnableEndpointIdentification()); newconfig.setSslProvider(cfg.getSslProvider()); newconfig.setSslTruststore(cfg.getSslTruststore()); From 7c346cb2b36f8283023d1827f5e730404b7f2940 Mon Sep 17 00:00:00 2001 From: Rui Gu Date: Sat, 26 May 2018 11:27:34 +0100 Subject: [PATCH 10/16] Changed Javadoc version --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index f2b596f83..8e3b45ed4 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ Redisson: Redis based In-Memory Data Grid for Java.
State of the Art Redis client ==== -[Quick start](https://github.com/redisson/redisson#quick-start) | [Documentation](https://github.com/redisson/redisson/wiki) | [Javadocs](http://www.javadoc.io/doc/org.redisson/redisson/3.6.5) | [Changelog](https://github.com/redisson/redisson/blob/master/CHANGELOG.md) | [Code examples](https://github.com/redisson/redisson-examples) | [FAQs](https://github.com/redisson/redisson/wiki/16.-FAQ) | [Support chat](https://gitter.im/mrniko/redisson) | **[Redisson PRO](https://redisson.pro)** +[Quick start](https://github.com/redisson/redisson#quick-start) | [Documentation](https://github.com/redisson/redisson/wiki) | [Javadocs](http://www.javadoc.io/doc/org.redisson/redisson/3.7.0) | [Changelog](https://github.com/redisson/redisson/blob/master/CHANGELOG.md) | [Code examples](https://github.com/redisson/redisson-examples) | [FAQs](https://github.com/redisson/redisson/wiki/16.-FAQ) | [Support chat](https://gitter.im/mrniko/redisson) | **[Redisson PRO](https://redisson.pro)** Based on high-performance async and lock-free Java Redis client and [Netty](http://netty.io) framework. From 3f33c4a9d73b5cb764f055ef3fede3c5e0c68038 Mon Sep 17 00:00:00 2001 From: Nikita Date: Mon, 28 May 2018 14:21:47 +0300 Subject: [PATCH 11/16] Fixed --- .../src/test/java/org/redisson/RedissonRateLimiterTest.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/redisson/src/test/java/org/redisson/RedissonRateLimiterTest.java b/redisson/src/test/java/org/redisson/RedissonRateLimiterTest.java index 15b997fbd..55f044b38 100644 --- a/redisson/src/test/java/org/redisson/RedissonRateLimiterTest.java +++ b/redisson/src/test/java/org/redisson/RedissonRateLimiterTest.java @@ -35,7 +35,7 @@ public class RedissonRateLimiterTest extends BaseTest { } @Test - public void test3() throws InterruptedException { + public void testConcurrency() throws InterruptedException { RRateLimiter rr = redisson.getRateLimiter("test"); assertThat(rr.trySetRate(RateType.OVERALL, 10, 1, RateIntervalUnit.SECONDS)).isTrue(); assertThat(rr.trySetRate(RateType.OVERALL, 20, 1, RateIntervalUnit.SECONDS)).isFalse(); @@ -55,7 +55,8 @@ public class RedissonRateLimiterTest extends BaseTest { } } try { - Thread.sleep(ThreadLocalRandom.current().nextInt(10)); } catch (InterruptedException e) { + Thread.sleep(ThreadLocalRandom.current().nextInt(10)); + } catch (InterruptedException e) { e.printStackTrace(); } From 31d5140b7636cf598722a37b2300cc2b59d2cc0b Mon Sep 17 00:00:00 2001 From: Nikita Date: Thu, 31 May 2018 09:45:05 +0300 Subject: [PATCH 12/16] Fixed - ClusterConnectionManager should use shared resolverGroup --- .../redisson/cluster/ClusterConnectionManager.java | 2 +- .../connection/MasterSlaveConnectionManager.java | 13 ------------- 2 files changed, 1 insertion(+), 14 deletions(-) diff --git a/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java b/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java index 20d1f28be..dd5585e50 100644 --- a/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java +++ b/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java @@ -288,7 +288,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { public void run() { if (isConfigEndpoint) { final URI uri = cfg.getNodeAddresses().iterator().next(); - final AddressResolver resolver = createResolverGroup().getResolver(getGroup().next()); + final AddressResolver resolver = resolverGroup.getResolver(getGroup().next()); Future> allNodes = resolver.resolveAll(InetSocketAddress.createUnresolved(uri.getHost(), uri.getPort())); allNodes.addListener(new FutureListener>() { @Override diff --git a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index e9150eb58..fb1e0693b 100644 --- a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -211,19 +211,6 @@ public class MasterSlaveConnectionManager implements ConnectionManager { this.commandExecutor = new CommandSyncService(this); } - /* - * Remove it once https://github.com/netty/netty/issues/7882 get resolved - */ - protected DnsAddressResolverGroup createResolverGroup() { - if (cfg.getTransportMode() == TransportMode.EPOLL) { - return cfg.getAddressResolverGroupFactory().create(EpollDatagramChannel.class, DnsServerAddressStreamProviders.platformDefault()); - } else if (cfg.getTransportMode() == TransportMode.KQUEUE) { - return cfg.getAddressResolverGroupFactory().create(KQueueDatagramChannel.class, DnsServerAddressStreamProviders.platformDefault()); - } - - return cfg.getAddressResolverGroupFactory().create(NioDatagramChannel.class, DnsServerAddressStreamProviders.platformDefault()); - } - protected void closeNodeConnections() { List> futures = new ArrayList>(); for (RedisConnection connection : nodeConnections.values()) { From c3cdb4f5a8d1988c577c113fd7230c5d4d4c18ff Mon Sep 17 00:00:00 2001 From: Nikita Date: Thu, 31 May 2018 15:55:44 +0300 Subject: [PATCH 13/16] ExecutorService task failover implemented. #1291, #1120 --- .../java/org/redisson/RedissonExecutorService.java | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonExecutorService.java b/redisson/src/main/java/org/redisson/RedissonExecutorService.java index 6d43a5471..a5d0d66eb 100644 --- a/redisson/src/main/java/org/redisson/RedissonExecutorService.java +++ b/redisson/src/main/java/org/redisson/RedissonExecutorService.java @@ -92,7 +92,7 @@ import io.netty.util.internal.PlatformDependent; */ public class RedissonExecutorService implements RScheduledExecutorService { - private static final Logger log = LoggerFactory.getLogger(RedissonExecutorService.class); + private static final Logger LOGGER = LoggerFactory.getLogger(RedissonExecutorService.class); private static final RemoteInvocationOptions RESULT_OPTIONS = RemoteInvocationOptions.defaults().noAck().expectResultWithin(1, TimeUnit.HOURS); @@ -250,6 +250,7 @@ public class RedissonExecutorService implements RScheduledExecutorService { + "redis.call('zrem', KEYS[2], unpack(expiredTaskIds));" + "if retryInterval ~= false then " + "local startTime = tonumber(ARGV[1]) + tonumber(retryInterval);" + + "for i = 1, #expiredTaskIds, 1 do " + "local name = expiredTaskIds[i];" + "local scheduledName = expiredTaskIds[i];" @@ -266,7 +267,12 @@ public class RedissonExecutorService implements RScheduledExecutorService { + "if v[1] == expiredTaskIds[i] then " + "redis.call('publish', KEYS[3], startTime); " + "end;" - + "redis.call('rpush', KEYS[1], name);" + + + "if redis.call('linsert', KEYS[1], 'before', name, name) < 1 then " + + "redis.call('rpush', KEYS[1], name); " + + "else " + + "redis.call('lrem', KEYS[1], -1, name); " + + "end; " + "end; " + "else " + "redis.call('rpush', KEYS[1], unpack(expiredTaskIds));" @@ -279,7 +285,7 @@ public class RedissonExecutorService implements RScheduledExecutorService { + "end " + "return nil;", Arrays.asList(requestQueueName, schedulerQueueName, schedulerChannelName, tasksRetryIntervalName), - System.currentTimeMillis(), 100); + System.currentTimeMillis(), 50); } }; queueTransferService.schedule(getName(), task); @@ -303,8 +309,6 @@ public class RedissonExecutorService implements RScheduledExecutorService { } }); } - - private long repeatInterval = 5000; @Override public void execute(Runnable task) { From 8d347ad22be930b9a92fd55724a5c90440fa0a3c Mon Sep 17 00:00:00 2001 From: Nikita Date: Fri, 1 Jun 2018 10:38:43 +0300 Subject: [PATCH 14/16] Fixed - methods belongs to transactional objects get blocked at high concurrency. #1459 --- .../java/org/redisson/pubsub/LockPubSub.java | 23 +++++------------- .../org/redisson/pubsub/SemaphorePubSub.java | 23 +++++------------- .../RedissonBaseTransactionalMapTest.java | 24 ++++++++++++++++++- 3 files changed, 35 insertions(+), 35 deletions(-) diff --git a/redisson/src/main/java/org/redisson/pubsub/LockPubSub.java b/redisson/src/main/java/org/redisson/pubsub/LockPubSub.java index 08b023799..1ed966325 100644 --- a/redisson/src/main/java/org/redisson/pubsub/LockPubSub.java +++ b/redisson/src/main/java/org/redisson/pubsub/LockPubSub.java @@ -35,27 +35,16 @@ public class LockPubSub extends PublishSubscribe { @Override protected void onMessage(RedissonLockEntry value, Long message) { if (message.equals(unlockMessage)) { - value.getLatch().release(); - while (true) { - Runnable runnableToExecute = null; - synchronized (value) { - Runnable runnable = value.getListeners().poll(); - if (runnable != null) { - if (value.getLatch().tryAcquire()) { - runnableToExecute = runnable; - } else { - value.addListener(runnable); - } - } + Runnable runnableToExecute = value.getListeners().poll(); + if (runnableToExecute == null) { + break; } - if (runnableToExecute != null) { - runnableToExecute.run(); - } else { - return; - } + runnableToExecute.run(); } + + value.getLatch().release(); } } diff --git a/redisson/src/main/java/org/redisson/pubsub/SemaphorePubSub.java b/redisson/src/main/java/org/redisson/pubsub/SemaphorePubSub.java index 31268d737..c76a886f2 100644 --- a/redisson/src/main/java/org/redisson/pubsub/SemaphorePubSub.java +++ b/redisson/src/main/java/org/redisson/pubsub/SemaphorePubSub.java @@ -32,27 +32,16 @@ public class SemaphorePubSub extends PublishSubscribe { @Override protected void onMessage(RedissonLockEntry value, Long message) { - value.getLatch().release(message.intValue()); - while (true) { - Runnable runnableToExecute = null; - synchronized (value) { - Runnable runnable = value.getListeners().poll(); - if (runnable != null) { - if (value.getLatch().tryAcquire()) { - runnableToExecute = runnable; - } else { - value.addListener(runnable); - } - } + Runnable runnableToExecute = value.getListeners().poll(); + if (runnableToExecute == null) { + break; } - if (runnableToExecute != null) { - runnableToExecute.run(); - } else { - return; - } + runnableToExecute.run(); } + + value.getLatch().release(message.intValue()); } } diff --git a/redisson/src/test/java/org/redisson/transaction/RedissonBaseTransactionalMapTest.java b/redisson/src/test/java/org/redisson/transaction/RedissonBaseTransactionalMapTest.java index 6e9d521aa..27c7c4240 100644 --- a/redisson/src/test/java/org/redisson/transaction/RedissonBaseTransactionalMapTest.java +++ b/redisson/src/test/java/org/redisson/transaction/RedissonBaseTransactionalMapTest.java @@ -1,9 +1,12 @@ package org.redisson.transaction; -import static org.assertj.core.api.Assertions.*; +import static org.assertj.core.api.Assertions.assertThat; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import org.junit.Test; import org.redisson.BaseTest; @@ -17,6 +20,25 @@ public abstract class RedissonBaseTransactionalMapTest extends BaseTest { protected abstract RMap getTransactionalMap(RTransaction transaction); + @Test + public void testFastPut() throws InterruptedException { + ExecutorService executor = Executors.newFixedThreadPool(16); + for (int i = 0; i < 500; i++) { + executor.submit(() -> { + for (int j = 0; j < 100; j++) { + RTransaction t = redisson.createTransaction(TransactionOptions.defaults()); + RMap map = getTransactionalMap(t); + map.fastPut("1", "1"); + t.commit(); + } + }); + } + + executor.shutdown(); + assertThat(executor.awaitTermination(1, TimeUnit.MINUTES)).isTrue(); + } + + @Test public void testPutAll() { RMap m = getMap(); From ec2ad2e2cd6a7d82a6828deb96bc0fba122378b3 Mon Sep 17 00:00:00 2001 From: Nikita Date: Fri, 1 Jun 2018 11:32:02 +0300 Subject: [PATCH 15/16] Fixed - Redis nodes with noaddr flag should be parsed correctly. --- .../protocol/decoder/ClusterNodesDecoder.java | 17 ++++++++++------- .../org/redisson/cluster/ClusterNodeInfo.java | 3 ++- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/redisson/src/main/java/org/redisson/client/protocol/decoder/ClusterNodesDecoder.java b/redisson/src/main/java/org/redisson/client/protocol/decoder/ClusterNodesDecoder.java index f8576b88e..e8ab3db02 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/decoder/ClusterNodesDecoder.java +++ b/redisson/src/main/java/org/redisson/client/protocol/decoder/ClusterNodesDecoder.java @@ -23,6 +23,7 @@ import org.redisson.client.handler.State; import org.redisson.client.protocol.Decoder; import org.redisson.cluster.ClusterNodeInfo; import org.redisson.cluster.ClusterSlotRange; +import org.redisson.cluster.ClusterNodeInfo.Flag; import io.netty.buffer.ByteBuf; import io.netty.util.CharsetUtil; @@ -53,18 +54,20 @@ public class ClusterNodesDecoder implements Decoder> { String nodeId = params[0]; node.setNodeId(nodeId); - String protocol = "redis://"; - if (ssl) { - protocol = "rediss://"; - } - String addr = protocol + params[1].split("@")[0]; - node.setAddress(addr); - String flags = params[2]; for (String flag : flags.split(",")) { String flagValue = flag.toUpperCase().replaceAll("\\?", ""); node.addFlag(ClusterNodeInfo.Flag.valueOf(flagValue)); } + + if (!node.containsFlag(Flag.NOADDR)) { + String protocol = "redis://"; + if (ssl) { + protocol = "rediss://"; + } + String addr = protocol + params[1].split("@")[0]; + node.setAddress(addr); + } String slaveOf = params[3]; if (!"-".equals(slaveOf)) { diff --git a/redisson/src/main/java/org/redisson/cluster/ClusterNodeInfo.java b/redisson/src/main/java/org/redisson/cluster/ClusterNodeInfo.java index 03e622c3e..c37d9225b 100644 --- a/redisson/src/main/java/org/redisson/cluster/ClusterNodeInfo.java +++ b/redisson/src/main/java/org/redisson/cluster/ClusterNodeInfo.java @@ -16,6 +16,7 @@ package org.redisson.cluster; import java.net.URI; +import java.util.EnumSet; import java.util.HashSet; import java.util.Set; import org.redisson.misc.URIBuilder; @@ -33,7 +34,7 @@ public class ClusterNodeInfo { private String nodeId; private URI address; - private final Set flags = new HashSet(); + private final Set flags = EnumSet.noneOf(Flag.class); private String slaveOf; private final Set slotRanges = new HashSet(); From ece61c8647305730323e29ab37948f9099fe32c7 Mon Sep 17 00:00:00 2001 From: Nikita Date: Fri, 1 Jun 2018 11:42:31 +0300 Subject: [PATCH 16/16] Fixed - reconnectionTimeout and failedAttempts should be renamed in xsd schema. #1394 --- .../redisson/spring/support/redisson-1.2.xsd | 2427 +++++++++++++++++ 1 file changed, 2427 insertions(+) create mode 100644 redisson/src/main/resources/org/redisson/spring/support/redisson-1.2.xsd diff --git a/redisson/src/main/resources/org/redisson/spring/support/redisson-1.2.xsd b/redisson/src/main/resources/org/redisson/spring/support/redisson-1.2.xsd new file mode 100644 index 000000000..9123f4ce2 --- /dev/null +++ b/redisson/src/main/resources/org/redisson/spring/support/redisson-1.2.xsd @@ -0,0 +1,2427 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + netty-tcnative lib is required to be in classpath. + ]]> + + + + + + + + + + + + + + + <qualifier> is not used. + ]]> + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + timeout time + and current connections amount bigger than minimum idle connections pool + size, then it will closed and removed from pool. + Value in milliseconds. + + Default: 10000 + ]]> + + + + + Node.ping and Node.pingAll + operation. Value in milliseconds. + + Default: 1000 + ]]> + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + slaveFailsInterval value. + + Default is 60000 + ]]> + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + each slave + node. + + Default: 10 + ]]> + + + + + each slave + node. + + Default: 64 + ]]> + + + + + each slave + node. + + Default: 10 + ]]> + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + NB: applications must ensure the JVM DNS cache TTL is low enough to + support this. e.g., http://docs.aws.amazon.com/AWSSdkDocsJava/latest/DeveloperGuide/java-dg-jvm-ttl.html + + Default: false + ]]> + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + true then invalidation + message which removes corresponding entry from cache will be sent to all + other RLocalCachedMap instances on each entry update/remove operation. + if false then invalidation message won't be sent. + ]]> + + + + + LRU - uses cache with LRU (least recently used) eviction + policy. +

LFU - uses cache with LFU (least frequently used) + eviction policy. +

SOFT - uses cache with soft references. The garbage + collector will evict items from the cache when the JVM is + running out of memory. JVM flag -XX:SoftRefLRUPolicyMSPerMB=??? + is required to function. +

NONE - doesn't use eviction policy, but timeToLive and + maxIdleTime params are still working. + ]]> + + + + + 0 then local cache is unbounded. + ]]> + + + + + 0 then timeout is not applied. + + Default unit is MILLISECONDS + ]]> + + + + + + + + + + 0 then timeout is not applied. + + Default unit is MILLISECONDS + ]]> + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + NOT mandatory + since the class will also be registered lazily when it is first used. + + All classed registered with the service is stored in a class cache. + + The cache is independent between different RedissonClient instances. When + a class is registered in one RLiveObjectService instance it is also + accessible in another RLiveObjectService instance so long as they are + created by the same RedissonClient instance. + ]]> + + + + + + + + + + + + + + + + + + NOT mandatory + since the class will also be registered lazily when it is first used. + + All classed registered with the service is stored in a class cache. + + The cache is independent between different RedissonClient instances. When + a class is registered in one RLiveObjectService instance it is also + accessible in another RLiveObjectService instance so long as they are + created by the same RedissonClient instance. + + One of "object-id" or "object-id-ref" attribute is required. + ]]> + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + Set eviction + + Redisson distributed Set for Java with eviction support implemented by + separate RSetCache object which extends RSet interface. It also + implements java.util.Set interface. + + Current redis implementation doesn't has set value eviction + functionality. Therefore expired values are cleaned by + org.redisson.EvictionScheduler. It removes 100 expired values at once. + Task launch time tuned automatically and depends on expired entries + amount deleted in previous time and varies between 1 second to 2 hours. + Thus if clean task deletes 100 values each time it will be executed + every second (minimum execution delay). But if current expired values + amount is lower than previous one then execution delay will be increased + by 1.5 times. + ]]> + + + + + + + + + + + Map eviction + + Redisson distributed Map for Java with eviction support implemented by + separate RMapCache object which extends RMap interface. It keeps + elements in insertion order and implements + java.util.concurrent.ConcurrentMap and java.util.Map interfaces. + Redisson has a Spring Cache integration which based on Map and MapCache + objects. + + Current redis implementation doesn't has map entry eviction + functionality. Therefore expired entries are cleaned by + org.redisson.EvictionScheduler. It removes 100 expired entries at once. + Task launch time tuned automatically and depends on expired entries + amount deleted in previous time and varies between 1 second to 2 hours. + Thus if clean task deletes 100 entries each time it will be executed + every second (minimum execution delay). But if current expired entries + amount is lower than previous one then execution delay will be increased + by 1.5 times. + ]]> + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + Map local cache + + In case when a Map is used mostly for read operations and/or network + roundtrips are undesirable. Redisson offers RLocalCachedMap object which + caches Map entries on Redisson side. + ]]> + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + interface. + Keeps elements uniqueness via element state comparison. + ]]> + + + + + + + + + + + interface. + Keeps elements uniqueness via element state comparison. + ]]> + + + + + + + + + + + + + + + + + interface. + Keeps elements uniqueness via element state comparison. + ]]> + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + Live distributed object (also abbreviated as live object) refers to a + running instance of a distributed multi-party (or peer-to-peer) protocol, + viewed from the object-oriented perspective, as an entity that has a + distinct identity, may encapsulate internal state and threads of + execution, and that exhibits a well-defined externally visible behavior. + + + Redisson Live Object (RLO) realised this idea by mapping all the fields + inside a Java class to a redis hash through a runtime-constructed proxy + class. All the get/set methods of each field are translated to hget/hset + commands operated on the redis hash, making it accessable to/from any + clients connected to the same redis server. As we all know, the field + values of an object represent its state; having them stored in a remote + repository, redis, makes it a distributed object. This object is a + Redisson Live Object. + + By using RLO, sharing an object between applications and/or servers is + the same as sharing one in a standalone application. This removes the + need for serialization and deserialization, and at the same time reduces + the complexity of the programming model: Changes made to one field + is (almost^) immediately accessable to other processes, applications and + servers. (^Redis' eventual consistant replication rule still applies + when connected to slave nodes) + + Since the redis server is a single-threaded application, all field + access to the live object is automatically executed in atomic fashion: a + value will not be changed when you are reading it. + + With RLO, you can treat the redis server as a shared Heap space for all + connected JVMs. + ]]> + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + Define and create a Redisson instance. + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + Define and create a RedisClient instance. + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +