RScoredSortedSet.pollFirst and pollLast methods with timeout added. #1452

pull/1461/head
Nikita 7 years ago
parent 4b95d19229
commit 3b4f11cc03

@ -70,9 +70,17 @@ public abstract class RedissonObject implements RObject {
return "{" + name + "}:" + suffix;
}
protected <V> V get(RFuture<V> future) {
protected final <V> V get(RFuture<V> future) {
return commandExecutor.get(future);
}
protected final long toSeconds(long timeout, TimeUnit unit) {
long seconds = unit.toSeconds(timeout);
if (timeout != 0 && seconds == 0) {
seconds = 1;
}
return seconds;
}
@Override
public String getName() {

@ -68,14 +68,6 @@ public class RedissonQueue<V> extends RedissonList<V> implements RQueue<V> {
return value;
}
protected long toSeconds(long timeout, TimeUnit unit) {
long seconds = unit.toSeconds(timeout);
if (timeout != 0 && seconds == 0) {
seconds = 1;
}
return seconds;
}
@Override
public V remove() {
return removeFirst();

@ -24,6 +24,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import java.util.Set;
import org.redisson.api.RFuture;
@ -138,6 +139,26 @@ public class RedissonScoredSortedSet<V> extends RedissonExpirable implements RSc
Collections.<Object>singletonList(getName()), from, to);
}
@Override
public V pollFirst(long timeout, TimeUnit unit) {
return get(pollFirstAsync(timeout, unit));
}
@Override
public RFuture<V> pollFirstAsync(long timeout, TimeUnit unit) {
return commandExecutor.writeAsync(getName(), codec, RedisCommands.BZPOPMIN_VALUE, getName(), toSeconds(timeout, unit));
}
@Override
public V pollLast(long timeout, TimeUnit unit) {
return get(pollLastAsync(timeout, unit));
}
@Override
public RFuture<V> pollLastAsync(long timeout, TimeUnit unit) {
return commandExecutor.writeAsync(getName(), codec, RedisCommands.BZPOPMAX_VALUE, getName(), toSeconds(timeout, unit));
}
@Override
public boolean add(double score, V object) {
return get(addAsync(score, object));

@ -18,6 +18,7 @@ package org.redisson.api;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.redisson.api.mapreduce.RCollectionMapReduce;
import org.redisson.client.protocol.ScoredEntry;
@ -45,6 +46,29 @@ public interface RScoredSortedSet<V> extends RScoredSortedSetAsync<V>, Iterable<
*/
<KOut, VOut> RCollectionMapReduce<V, KOut, VOut> mapReduce();
/**
* Removes and returns the head element or {@code null} if this sorted set is empty.
*
* @param timeout how long to wait before giving up, in units of
* {@code unit}
* @param unit a {@code TimeUnit} determining how to interpret the
* {@code timeout} parameter
* @return the head element,
* or {@code null} if this sorted set is empty
*/
V pollFirst(long timeout, TimeUnit unit);
/**
* Removes and returns the tail element or {@code null} if this sorted set is empty.
*
* @param timeout how long to wait before giving up, in units of
* {@code unit}
* @param unit a {@code TimeUnit} determining how to interpret the
* {@code timeout} parameter
* @return the tail element or {@code null} if this sorted set is empty
*/
V pollLast(long timeout, TimeUnit unit);
/**
* Removes and returns the head elements or {@code null} if this sorted set is empty.
*

@ -18,6 +18,7 @@ package org.redisson.api;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.redisson.api.RScoredSortedSet.Aggregate;
import org.redisson.client.protocol.ScoredEntry;
@ -30,6 +31,29 @@ import org.redisson.client.protocol.ScoredEntry;
*/
public interface RScoredSortedSetAsync<V> extends RExpirableAsync, RSortableAsync<Set<V>> {
/**
* Removes and returns the head element or {@code null} if this sorted set is empty.
*
* @param timeout how long to wait before giving up, in units of
* {@code unit}
* @param unit a {@code TimeUnit} determining how to interpret the
* {@code timeout} parameter
* @return the head element,
* or {@code null} if this sorted set is empty
*/
RFuture<V> pollFirstAsync(long timeout, TimeUnit unit);
/**
* Removes and returns the tail element or {@code null} if this sorted set is empty.
*
* @param timeout how long to wait before giving up, in units of
* {@code unit}
* @param unit a {@code TimeUnit} determining how to interpret the
* {@code timeout} parameter
* @return the tail element or {@code null} if this sorted set is empty
*/
RFuture<V> pollLastAsync(long timeout, TimeUnit unit);
/**
* Removes and returns the head elements or {@code null} if this sorted set is empty.
*

@ -24,7 +24,6 @@ import java.util.Set;
import org.redisson.api.RType;
import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.convertor.BitSetReplayConvertor;
import org.redisson.client.protocol.convertor.BitsSizeReplayConvertor;
import org.redisson.client.protocol.convertor.BooleanAmountReplayConvertor;
import org.redisson.client.protocol.convertor.BooleanNotNullReplayConvertor;
@ -57,6 +56,7 @@ import org.redisson.client.protocol.decoder.ObjectListReplayDecoder;
import org.redisson.client.protocol.decoder.ObjectMapEntryReplayDecoder;
import org.redisson.client.protocol.decoder.ObjectMapReplayDecoder;
import org.redisson.client.protocol.decoder.ObjectSetReplayDecoder;
import org.redisson.client.protocol.decoder.ScoredSortedSetPolledObjectDecoder;
import org.redisson.client.protocol.decoder.ScoredSortedSetReplayDecoder;
import org.redisson.client.protocol.decoder.ScoredSortedSetScanDecoder;
import org.redisson.client.protocol.decoder.ScoredSortedSetScanReplayDecoder;
@ -93,8 +93,6 @@ public interface RedisCommands {
RedisStrictCommand<Integer> BITPOS = new RedisStrictCommand<Integer>("BITPOS", new IntegerReplayConvertor());
RedisStrictCommand<Void> SETBIT_VOID = new RedisStrictCommand<Void>("SETBIT", new VoidReplayConvertor());
RedisStrictCommand<Boolean> SETBIT = new RedisStrictCommand<Boolean>("SETBIT", new BooleanReplayConvertor());
RedisStrictCommand<Boolean> SETBIT_TRUE = new RedisStrictCommand<Boolean>("SETBIT", new BitSetReplayConvertor(0));
RedisStrictCommand<Boolean> SETBIT_FALSE = new RedisStrictCommand<Boolean>("SETBIT", new BitSetReplayConvertor(1));
RedisStrictCommand<Void> BITOP = new RedisStrictCommand<Void>("BITOP", new VoidReplayConvertor());
RedisStrictCommand<Integer> WAIT = new RedisStrictCommand<Integer>("WAIT", new IntegerReplayConvertor());
@ -193,9 +191,11 @@ public interface RedisCommands {
RedisCommand<Object> BRPOPLPUSH = new RedisCommand<Object>("BRPOPLPUSH");
RedisCommand<Object> BLPOP_VALUE = new RedisCommand<Object>("BLPOP", new KeyValueObjectDecoder(), new KeyValueConvertor());
RedisCommand<Object> BRPOP_VALUE = new RedisCommand<Object>("BRPOP", new KeyValueObjectDecoder(), new KeyValueConvertor());
RedisCommand<Object> BZPOPMIN_VALUE = new RedisCommand<Object>("BZPOPMIN", new ScoredSortedSetPolledObjectDecoder());
RedisCommand<Object> BZPOPMAX_VALUE = new RedisCommand<Object>("BZPOPMAX", new ScoredSortedSetPolledObjectDecoder());
Set<String> BLOCKING_COMMANDS = new HashSet<String>(
Arrays.asList(BLPOP_VALUE.getName(), BRPOP_VALUE.getName(), BRPOPLPUSH.getName()));
Arrays.asList(BLPOP_VALUE.getName(), BRPOP_VALUE.getName(), BRPOPLPUSH.getName(), BZPOPMIN_VALUE.getName(), BZPOPMAX_VALUE.getName()));
RedisCommand<Boolean> PFADD = new RedisCommand<Boolean>("PFADD", new BooleanReplayConvertor());
RedisStrictCommand<Long> PFCOUNT = new RedisStrictCommand<Long>("PFCOUNT");

@ -1,38 +0,0 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client.protocol.convertor;
/**
*
* @author Nikita Koksharov
*
*/
public class BitSetReplayConvertor extends SingleConvertor<Boolean> {
private final int expectedValue;
public BitSetReplayConvertor(int expectedValue) {
super();
this.expectedValue = expectedValue;
}
@Override
public Boolean convert(Object obj) {
return expectedValue == (Long)obj;
}
}

@ -0,0 +1,51 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client.protocol.decoder;
import java.util.List;
import org.redisson.client.codec.DoubleCodec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder;
/**
*
* @author Nikita Koksharov
*
*/
public class ScoredSortedSetPolledObjectDecoder implements MultiDecoder<Object> {
@Override
public Object decode(List<Object> parts, State state) {
if (!parts.isEmpty()) {
return parts.get(2);
}
return null;
}
@Override
public Decoder<Object> getDecoder(int paramNum, State state) {
if (paramNum == 0) {
return StringCodec.INSTANCE.getValueDecoder();
}
if (paramNum == 1) {
return DoubleCodec.INSTANCE.getValueDecoder();
}
return null;
}
}

@ -12,6 +12,7 @@ import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.junit.Assert;
import org.junit.Assume;
@ -262,7 +263,33 @@ public class RedissonScoredSortedSetTest extends BaseTest {
assertThat(set.pollLast(2)).containsExactly("b", "c");
assertThat(set).containsExactly("a");
}
@Test
public void testPollLastTimeout() {
RScoredSortedSet<String> set = redisson.getScoredSortedSet("simple");
assertThat(set.pollLast(1, TimeUnit.SECONDS)).isNull();
set.add(0.1, "a");
set.add(0.2, "b");
set.add(0.3, "c");
assertThat(set.pollLast(1, TimeUnit.SECONDS)).isEqualTo("c");
assertThat(set).containsExactly("a", "b");
}
@Test
public void testPollFirstTimeout() {
RScoredSortedSet<String> set = redisson.getScoredSortedSet("simple");
assertThat(set.pollFirst(1, TimeUnit.SECONDS)).isNull();
set.add(0.1, "a");
set.add(0.2, "b");
set.add(0.3, "c");
assertThat(set.pollFirst(1, TimeUnit.SECONDS)).isEqualTo("a");
assertThat(set).containsExactly("b", "c");
}
@Test
public void testPollFistAmount() {
RScoredSortedSet<String> set = redisson.getScoredSortedSet("simple");

Loading…
Cancel
Save