RScoredSortedSet.pollFirstFromAny and pollLastFromAny methods added. #1452

pull/1461/head
Nikita 7 years ago
parent 3b4f11cc03
commit 2275a553fd

@ -149,6 +149,38 @@ public class RedissonScoredSortedSet<V> extends RedissonExpirable implements RSc
return commandExecutor.writeAsync(getName(), codec, RedisCommands.BZPOPMIN_VALUE, getName(), toSeconds(timeout, unit));
}
@Override
public V pollFirstFromAny(long timeout, TimeUnit unit, String ... queueNames) {
return get(pollFirstFromAnyAsync(timeout, unit, queueNames));
}
@Override
public RFuture<V> pollFirstFromAnyAsync(long timeout, TimeUnit unit, String ... queueNames) {
List<Object> params = new ArrayList<Object>(queueNames.length + 1);
params.add(getName());
for (Object name : queueNames) {
params.add(name);
}
params.add(toSeconds(timeout, unit));
return commandExecutor.writeAsync(getName(), codec, RedisCommands.BZPOPMIN_VALUE, params.toArray());
}
@Override
public V pollLastFromAny(long timeout, TimeUnit unit, String ... queueNames) {
return get(pollLastFromAnyAsync(timeout, unit, queueNames));
}
@Override
public RFuture<V> pollLastFromAnyAsync(long timeout, TimeUnit unit, String ... queueNames) {
List<Object> params = new ArrayList<Object>(queueNames.length + 1);
params.add(getName());
for (Object name : queueNames) {
params.add(name);
}
params.add(toSeconds(timeout, unit));
return commandExecutor.writeAsync(getName(), codec, RedisCommands.BZPOPMAX_VALUE, params.toArray());
}
@Override
public V pollLast(long timeout, TimeUnit unit) {
return get(pollLastAsync(timeout, unit));

@ -46,6 +46,38 @@ public interface RScoredSortedSet<V> extends RScoredSortedSetAsync<V>, Iterable<
*/
<KOut, VOut> RCollectionMapReduce<V, KOut, VOut> mapReduce();
/**
* Removes and returns first available tail element of <b>any</b> sorted set,
* waiting up to the specified wait time if necessary for an element to become available
* in any of defined sorted sets <b>including</b> this one.
* <p>
* Requires <b>Redis 5.0.0 and higher.</b>
*
* @param queueNames - names of queue
* @param timeout how long to wait before giving up, in units of
* {@code unit}
* @param unit a {@code TimeUnit} determining how to interpret the
* {@code timeout} parameter
* @return the tail element, or {@code null} if all sorted sets are empty
*/
V pollLastFromAny(long timeout, TimeUnit unit, String ... queueNames);
/**
* Removes and returns first available head element of <b>any</b> sorted set,
* waiting up to the specified wait time if necessary for an element to become available
* in any of defined sorted sets <b>including</b> this one.
* <p>
* Requires <b>Redis 5.0.0 and higher.</b>
*
* @param queueNames - names of queue
* @param timeout how long to wait before giving up, in units of
* {@code unit}
* @param unit a {@code TimeUnit} determining how to interpret the
* {@code timeout} parameter
* @return the head element, or {@code null} if all sorted sets are empty
*/
V pollFirstFromAny(long timeout, TimeUnit unit, String ... queueNames);
/**
* Removes and returns the head element or {@code null} if this sorted set is empty.
*

@ -31,8 +31,43 @@ import org.redisson.client.protocol.ScoredEntry;
*/
public interface RScoredSortedSetAsync<V> extends RExpirableAsync, RSortableAsync<Set<V>> {
/**
* Removes and returns first available tail element of <b>any</b> sorted set,
* waiting up to the specified wait time if necessary for an element to become available
* in any of defined sorted sets <b>including</b> this one.
* <p>
* Requires <b>Redis 5.0.0 and higher.</b>
*
* @param queueNames - names of queue
* @param timeout how long to wait before giving up, in units of
* {@code unit}
* @param unit a {@code TimeUnit} determining how to interpret the
* {@code timeout} parameter
* @return the tail element, or {@code null} if all sorted sets are empty
*/
RFuture<V> pollLastFromAnyAsync(long timeout, TimeUnit unit, String ... queueNames);
/**
* Removes and returns first available head element of <b>any</b> sorted set,
* waiting up to the specified wait time if necessary for an element to become available
* in any of defined sorted sets <b>including</b> this one.
* <p>
* Requires <b>Redis 5.0.0 and higher.</b>
*
* @param queueNames - names of queue
* @param timeout how long to wait before giving up, in units of
* {@code unit}
* @param unit a {@code TimeUnit} determining how to interpret the
* {@code timeout} parameter
* @return the head element, or {@code null} if all sorted sets are empty
*
*/
RFuture<V> pollFirstFromAnyAsync(long timeout, TimeUnit unit, String ... queueNames);
/**
* Removes and returns the head element or {@code null} if this sorted set is empty.
* <p>
* Requires <b>Redis 5.0.0 and higher.</b>
*
* @param timeout how long to wait before giving up, in units of
* {@code unit}
@ -45,6 +80,8 @@ public interface RScoredSortedSetAsync<V> extends RExpirableAsync, RSortableAsyn
/**
* Removes and returns the tail element or {@code null} if this sorted set is empty.
* <p>
* Requires <b>Redis 5.0.0 and higher.</b>
*
* @param timeout how long to wait before giving up, in units of
* {@code unit}
@ -174,7 +211,7 @@ public interface RScoredSortedSetAsync<V> extends RExpirableAsync, RSortableAsyn
/**
* Adds element to this set only if has not been added before.
* <p>
* Works only with <b>Redis 3.0.2 and higher.</b>
* Requires <b>Redis 3.0.2 and higher.</b>
*
* @param score - object score
* @param object - object itself

@ -32,7 +32,6 @@ import org.redisson.client.RedisConnection;
import org.redisson.client.RedisPubSubConnection;
import org.redisson.client.protocol.CommandData;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.cluster.ClusterConnectionManager;
import org.redisson.cluster.ClusterSlotRange;
import org.redisson.config.MasterSlaveServersConfig;
@ -249,7 +248,7 @@ public class MasterSlaveEntry {
return;
}
RFuture<RedisConnection> newConnection = connectionReadOp(RedisCommands.BLPOP_VALUE);
RFuture<RedisConnection> newConnection = connectionWriteOp(commandData.getCommand());
newConnection.addListener(new FutureListener<RedisConnection>() {
@Override
public void operationComplete(Future<RedisConnection> future) throws Exception {
@ -263,7 +262,7 @@ public class MasterSlaveEntry {
final FutureListener<Object> listener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
releaseRead(newConnection);
releaseWrite(newConnection);
}
};
commandData.getPromise().addListener(listener);
@ -277,7 +276,7 @@ public class MasterSlaveEntry {
if (!future.isSuccess()) {
listener.operationComplete(null);
commandData.getPromise().removeListener(listener);
releaseRead(newConnection);
releaseWrite(newConnection);
log.error("Can't resubscribe blocking queue {}", commandData);
}
}

@ -12,6 +12,7 @@ import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.junit.Assert;
@ -29,6 +30,42 @@ import org.redisson.client.protocol.ScoredEntry;
public class RedissonScoredSortedSetTest extends BaseTest {
@Test
public void testPollFirstFromAny() throws InterruptedException {
final RScoredSortedSet<Integer> queue1 = redisson.getScoredSortedSet("queue:pollany");
Executors.newSingleThreadScheduledExecutor().schedule(() -> {
RScoredSortedSet<Integer> queue2 = redisson.getScoredSortedSet("queue:pollany1");
RScoredSortedSet<Integer> queue3 = redisson.getScoredSortedSet("queue:pollany2");
queue3.add(0.1, 2);
queue1.add(0.1, 1);
queue2.add(0.1, 3);
}, 3, TimeUnit.SECONDS);
long s = System.currentTimeMillis();
int l = queue1.pollFirstFromAny(4, TimeUnit.SECONDS, "queue:pollany1", "queue:pollany2");
Assert.assertEquals(2, l);
Assert.assertTrue(System.currentTimeMillis() - s > 2000);
}
@Test
public void testPollLastFromAny() throws InterruptedException {
final RScoredSortedSet<Integer> queue1 = redisson.getScoredSortedSet("queue:pollany");
Executors.newSingleThreadScheduledExecutor().schedule(() -> {
RScoredSortedSet<Integer> queue2 = redisson.getScoredSortedSet("queue:pollany1");
RScoredSortedSet<Integer> queue3 = redisson.getScoredSortedSet("queue:pollany2");
queue3.add(0.1, 2);
queue1.add(0.1, 1);
queue2.add(0.1, 3);
}, 3, TimeUnit.SECONDS);
long s = System.currentTimeMillis();
int l = queue1.pollLastFromAny(4, TimeUnit.SECONDS, "queue:pollany1", "queue:pollany2");
Assert.assertEquals(2, l);
Assert.assertTrue(System.currentTimeMillis() - s > 2000);
}
@Test
public void testSortOrder() {
RScoredSortedSet<Integer> set = redisson.getScoredSortedSet("list", IntegerCodec.INSTANCE);

Loading…
Cancel
Save