From 3b4f11cc0365b537ddda73026b7848bf87f1ebed Mon Sep 17 00:00:00 2001 From: Nikita Date: Thu, 24 May 2018 16:39:29 +0300 Subject: [PATCH] RScoredSortedSet.pollFirst and pollLast methods with timeout added. #1452 --- .../java/org/redisson/RedissonObject.java | 10 +++- .../main/java/org/redisson/RedissonQueue.java | 8 --- .../org/redisson/RedissonScoredSortedSet.java | 21 ++++++++ .../org/redisson/api/RScoredSortedSet.java | 24 +++++++++ .../redisson/api/RScoredSortedSetAsync.java | 24 +++++++++ .../client/protocol/RedisCommands.java | 8 +-- .../convertor/BitSetReplayConvertor.java | 38 -------------- .../ScoredSortedSetPolledObjectDecoder.java | 51 +++++++++++++++++++ .../redisson/RedissonScoredSortedSetTest.java | 27 ++++++++++ 9 files changed, 160 insertions(+), 51 deletions(-) delete mode 100644 redisson/src/main/java/org/redisson/client/protocol/convertor/BitSetReplayConvertor.java create mode 100644 redisson/src/main/java/org/redisson/client/protocol/decoder/ScoredSortedSetPolledObjectDecoder.java diff --git a/redisson/src/main/java/org/redisson/RedissonObject.java b/redisson/src/main/java/org/redisson/RedissonObject.java index 927624228..369893943 100644 --- a/redisson/src/main/java/org/redisson/RedissonObject.java +++ b/redisson/src/main/java/org/redisson/RedissonObject.java @@ -70,9 +70,17 @@ public abstract class RedissonObject implements RObject { return "{" + name + "}:" + suffix; } - protected V get(RFuture future) { + protected final V get(RFuture future) { return commandExecutor.get(future); } + + protected final long toSeconds(long timeout, TimeUnit unit) { + long seconds = unit.toSeconds(timeout); + if (timeout != 0 && seconds == 0) { + seconds = 1; + } + return seconds; + } @Override public String getName() { diff --git a/redisson/src/main/java/org/redisson/RedissonQueue.java b/redisson/src/main/java/org/redisson/RedissonQueue.java index f48ac7abb..fa25de8c0 100644 --- a/redisson/src/main/java/org/redisson/RedissonQueue.java +++ b/redisson/src/main/java/org/redisson/RedissonQueue.java @@ -68,14 +68,6 @@ public class RedissonQueue extends RedissonList implements RQueue { return value; } - protected long toSeconds(long timeout, TimeUnit unit) { - long seconds = unit.toSeconds(timeout); - if (timeout != 0 && seconds == 0) { - seconds = 1; - } - return seconds; - } - @Override public V remove() { return removeFirst(); diff --git a/redisson/src/main/java/org/redisson/RedissonScoredSortedSet.java b/redisson/src/main/java/org/redisson/RedissonScoredSortedSet.java index 2decb2198..ca8b4a84b 100644 --- a/redisson/src/main/java/org/redisson/RedissonScoredSortedSet.java +++ b/redisson/src/main/java/org/redisson/RedissonScoredSortedSet.java @@ -24,6 +24,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.concurrent.TimeUnit; import java.util.Set; import org.redisson.api.RFuture; @@ -138,6 +139,26 @@ public class RedissonScoredSortedSet extends RedissonExpirable implements RSc Collections.singletonList(getName()), from, to); } + @Override + public V pollFirst(long timeout, TimeUnit unit) { + return get(pollFirstAsync(timeout, unit)); + } + + @Override + public RFuture pollFirstAsync(long timeout, TimeUnit unit) { + return commandExecutor.writeAsync(getName(), codec, RedisCommands.BZPOPMIN_VALUE, getName(), toSeconds(timeout, unit)); + } + + @Override + public V pollLast(long timeout, TimeUnit unit) { + return get(pollLastAsync(timeout, unit)); + } + + @Override + public RFuture pollLastAsync(long timeout, TimeUnit unit) { + return commandExecutor.writeAsync(getName(), codec, RedisCommands.BZPOPMAX_VALUE, getName(), toSeconds(timeout, unit)); + } + @Override public boolean add(double score, V object) { return get(addAsync(score, object)); diff --git a/redisson/src/main/java/org/redisson/api/RScoredSortedSet.java b/redisson/src/main/java/org/redisson/api/RScoredSortedSet.java index 7b107bd84..4750aa576 100644 --- a/redisson/src/main/java/org/redisson/api/RScoredSortedSet.java +++ b/redisson/src/main/java/org/redisson/api/RScoredSortedSet.java @@ -18,6 +18,7 @@ package org.redisson.api; import java.util.Collection; import java.util.Map; import java.util.Set; +import java.util.concurrent.TimeUnit; import org.redisson.api.mapreduce.RCollectionMapReduce; import org.redisson.client.protocol.ScoredEntry; @@ -45,6 +46,29 @@ public interface RScoredSortedSet extends RScoredSortedSetAsync, Iterable< */ RCollectionMapReduce mapReduce(); + /** + * Removes and returns the head element or {@code null} if this sorted set is empty. + * + * @param timeout how long to wait before giving up, in units of + * {@code unit} + * @param unit a {@code TimeUnit} determining how to interpret the + * {@code timeout} parameter + * @return the head element, + * or {@code null} if this sorted set is empty + */ + V pollFirst(long timeout, TimeUnit unit); + + /** + * Removes and returns the tail element or {@code null} if this sorted set is empty. + * + * @param timeout how long to wait before giving up, in units of + * {@code unit} + * @param unit a {@code TimeUnit} determining how to interpret the + * {@code timeout} parameter + * @return the tail element or {@code null} if this sorted set is empty + */ + V pollLast(long timeout, TimeUnit unit); + /** * Removes and returns the head elements or {@code null} if this sorted set is empty. * diff --git a/redisson/src/main/java/org/redisson/api/RScoredSortedSetAsync.java b/redisson/src/main/java/org/redisson/api/RScoredSortedSetAsync.java index 87d9e67ec..fa034e916 100644 --- a/redisson/src/main/java/org/redisson/api/RScoredSortedSetAsync.java +++ b/redisson/src/main/java/org/redisson/api/RScoredSortedSetAsync.java @@ -18,6 +18,7 @@ package org.redisson.api; import java.util.Collection; import java.util.Map; import java.util.Set; +import java.util.concurrent.TimeUnit; import org.redisson.api.RScoredSortedSet.Aggregate; import org.redisson.client.protocol.ScoredEntry; @@ -30,6 +31,29 @@ import org.redisson.client.protocol.ScoredEntry; */ public interface RScoredSortedSetAsync extends RExpirableAsync, RSortableAsync> { + /** + * Removes and returns the head element or {@code null} if this sorted set is empty. + * + * @param timeout how long to wait before giving up, in units of + * {@code unit} + * @param unit a {@code TimeUnit} determining how to interpret the + * {@code timeout} parameter + * @return the head element, + * or {@code null} if this sorted set is empty + */ + RFuture pollFirstAsync(long timeout, TimeUnit unit); + + /** + * Removes and returns the tail element or {@code null} if this sorted set is empty. + * + * @param timeout how long to wait before giving up, in units of + * {@code unit} + * @param unit a {@code TimeUnit} determining how to interpret the + * {@code timeout} parameter + * @return the tail element or {@code null} if this sorted set is empty + */ + RFuture pollLastAsync(long timeout, TimeUnit unit); + /** * Removes and returns the head elements or {@code null} if this sorted set is empty. * diff --git a/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java b/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java index ea2efa058..ec60ff7f2 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java +++ b/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java @@ -24,7 +24,6 @@ import java.util.Set; import org.redisson.api.RType; import org.redisson.client.protocol.RedisCommand.ValueType; -import org.redisson.client.protocol.convertor.BitSetReplayConvertor; import org.redisson.client.protocol.convertor.BitsSizeReplayConvertor; import org.redisson.client.protocol.convertor.BooleanAmountReplayConvertor; import org.redisson.client.protocol.convertor.BooleanNotNullReplayConvertor; @@ -57,6 +56,7 @@ import org.redisson.client.protocol.decoder.ObjectListReplayDecoder; import org.redisson.client.protocol.decoder.ObjectMapEntryReplayDecoder; import org.redisson.client.protocol.decoder.ObjectMapReplayDecoder; import org.redisson.client.protocol.decoder.ObjectSetReplayDecoder; +import org.redisson.client.protocol.decoder.ScoredSortedSetPolledObjectDecoder; import org.redisson.client.protocol.decoder.ScoredSortedSetReplayDecoder; import org.redisson.client.protocol.decoder.ScoredSortedSetScanDecoder; import org.redisson.client.protocol.decoder.ScoredSortedSetScanReplayDecoder; @@ -93,8 +93,6 @@ public interface RedisCommands { RedisStrictCommand BITPOS = new RedisStrictCommand("BITPOS", new IntegerReplayConvertor()); RedisStrictCommand SETBIT_VOID = new RedisStrictCommand("SETBIT", new VoidReplayConvertor()); RedisStrictCommand SETBIT = new RedisStrictCommand("SETBIT", new BooleanReplayConvertor()); - RedisStrictCommand SETBIT_TRUE = new RedisStrictCommand("SETBIT", new BitSetReplayConvertor(0)); - RedisStrictCommand SETBIT_FALSE = new RedisStrictCommand("SETBIT", new BitSetReplayConvertor(1)); RedisStrictCommand BITOP = new RedisStrictCommand("BITOP", new VoidReplayConvertor()); RedisStrictCommand WAIT = new RedisStrictCommand("WAIT", new IntegerReplayConvertor()); @@ -193,9 +191,11 @@ public interface RedisCommands { RedisCommand BRPOPLPUSH = new RedisCommand("BRPOPLPUSH"); RedisCommand BLPOP_VALUE = new RedisCommand("BLPOP", new KeyValueObjectDecoder(), new KeyValueConvertor()); RedisCommand BRPOP_VALUE = new RedisCommand("BRPOP", new KeyValueObjectDecoder(), new KeyValueConvertor()); + RedisCommand BZPOPMIN_VALUE = new RedisCommand("BZPOPMIN", new ScoredSortedSetPolledObjectDecoder()); + RedisCommand BZPOPMAX_VALUE = new RedisCommand("BZPOPMAX", new ScoredSortedSetPolledObjectDecoder()); Set BLOCKING_COMMANDS = new HashSet( - Arrays.asList(BLPOP_VALUE.getName(), BRPOP_VALUE.getName(), BRPOPLPUSH.getName())); + Arrays.asList(BLPOP_VALUE.getName(), BRPOP_VALUE.getName(), BRPOPLPUSH.getName(), BZPOPMIN_VALUE.getName(), BZPOPMAX_VALUE.getName())); RedisCommand PFADD = new RedisCommand("PFADD", new BooleanReplayConvertor()); RedisStrictCommand PFCOUNT = new RedisStrictCommand("PFCOUNT"); diff --git a/redisson/src/main/java/org/redisson/client/protocol/convertor/BitSetReplayConvertor.java b/redisson/src/main/java/org/redisson/client/protocol/convertor/BitSetReplayConvertor.java deleted file mode 100644 index 31bfa6294..000000000 --- a/redisson/src/main/java/org/redisson/client/protocol/convertor/BitSetReplayConvertor.java +++ /dev/null @@ -1,38 +0,0 @@ -/** - * Copyright 2018 Nikita Koksharov - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.redisson.client.protocol.convertor; - -/** - * - * @author Nikita Koksharov - * - */ -public class BitSetReplayConvertor extends SingleConvertor { - - private final int expectedValue; - - public BitSetReplayConvertor(int expectedValue) { - super(); - this.expectedValue = expectedValue; - } - - @Override - public Boolean convert(Object obj) { - return expectedValue == (Long)obj; - } - - -} diff --git a/redisson/src/main/java/org/redisson/client/protocol/decoder/ScoredSortedSetPolledObjectDecoder.java b/redisson/src/main/java/org/redisson/client/protocol/decoder/ScoredSortedSetPolledObjectDecoder.java new file mode 100644 index 000000000..98c84a652 --- /dev/null +++ b/redisson/src/main/java/org/redisson/client/protocol/decoder/ScoredSortedSetPolledObjectDecoder.java @@ -0,0 +1,51 @@ +/** + * Copyright 2018 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.client.protocol.decoder; + +import java.util.List; + +import org.redisson.client.codec.DoubleCodec; +import org.redisson.client.codec.StringCodec; +import org.redisson.client.handler.State; +import org.redisson.client.protocol.Decoder; + +/** + * + * @author Nikita Koksharov + * + */ +public class ScoredSortedSetPolledObjectDecoder implements MultiDecoder { + + @Override + public Object decode(List parts, State state) { + if (!parts.isEmpty()) { + return parts.get(2); + } + return null; + } + + @Override + public Decoder getDecoder(int paramNum, State state) { + if (paramNum == 0) { + return StringCodec.INSTANCE.getValueDecoder(); + } + if (paramNum == 1) { + return DoubleCodec.INSTANCE.getValueDecoder(); + } + return null; + } + +} diff --git a/redisson/src/test/java/org/redisson/RedissonScoredSortedSetTest.java b/redisson/src/test/java/org/redisson/RedissonScoredSortedSetTest.java index 21405cfdb..f15c51054 100644 --- a/redisson/src/test/java/org/redisson/RedissonScoredSortedSetTest.java +++ b/redisson/src/test/java/org/redisson/RedissonScoredSortedSetTest.java @@ -12,6 +12,7 @@ import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import org.junit.Assert; import org.junit.Assume; @@ -262,7 +263,33 @@ public class RedissonScoredSortedSetTest extends BaseTest { assertThat(set.pollLast(2)).containsExactly("b", "c"); assertThat(set).containsExactly("a"); } + + @Test + public void testPollLastTimeout() { + RScoredSortedSet set = redisson.getScoredSortedSet("simple"); + assertThat(set.pollLast(1, TimeUnit.SECONDS)).isNull(); + + set.add(0.1, "a"); + set.add(0.2, "b"); + set.add(0.3, "c"); + + assertThat(set.pollLast(1, TimeUnit.SECONDS)).isEqualTo("c"); + assertThat(set).containsExactly("a", "b"); + } + @Test + public void testPollFirstTimeout() { + RScoredSortedSet set = redisson.getScoredSortedSet("simple"); + assertThat(set.pollFirst(1, TimeUnit.SECONDS)).isNull(); + + set.add(0.1, "a"); + set.add(0.2, "b"); + set.add(0.3, "c"); + + assertThat(set.pollFirst(1, TimeUnit.SECONDS)).isEqualTo("a"); + assertThat(set).containsExactly("b", "c"); + } + @Test public void testPollFistAmount() { RScoredSortedSet set = redisson.getScoredSortedSet("simple");