From 2275a553fd42d0fd6f542f292d7b50088af0e49f Mon Sep 17 00:00:00 2001 From: Nikita Date: Fri, 25 May 2018 10:09:39 +0300 Subject: [PATCH] RScoredSortedSet.pollFirstFromAny and pollLastFromAny methods added. #1452 --- .../org/redisson/RedissonScoredSortedSet.java | 32 +++++++++++++++ .../org/redisson/api/RScoredSortedSet.java | 32 +++++++++++++++ .../redisson/api/RScoredSortedSetAsync.java | 39 ++++++++++++++++++- .../redisson/connection/MasterSlaveEntry.java | 7 ++-- .../redisson/RedissonScoredSortedSetTest.java | 37 ++++++++++++++++++ 5 files changed, 142 insertions(+), 5 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonScoredSortedSet.java b/redisson/src/main/java/org/redisson/RedissonScoredSortedSet.java index ca8b4a84b..ef5044c91 100644 --- a/redisson/src/main/java/org/redisson/RedissonScoredSortedSet.java +++ b/redisson/src/main/java/org/redisson/RedissonScoredSortedSet.java @@ -149,6 +149,38 @@ public class RedissonScoredSortedSet extends RedissonExpirable implements RSc return commandExecutor.writeAsync(getName(), codec, RedisCommands.BZPOPMIN_VALUE, getName(), toSeconds(timeout, unit)); } + @Override + public V pollFirstFromAny(long timeout, TimeUnit unit, String ... queueNames) { + return get(pollFirstFromAnyAsync(timeout, unit, queueNames)); + } + + @Override + public RFuture pollFirstFromAnyAsync(long timeout, TimeUnit unit, String ... queueNames) { + List params = new ArrayList(queueNames.length + 1); + params.add(getName()); + for (Object name : queueNames) { + params.add(name); + } + params.add(toSeconds(timeout, unit)); + return commandExecutor.writeAsync(getName(), codec, RedisCommands.BZPOPMIN_VALUE, params.toArray()); + } + + @Override + public V pollLastFromAny(long timeout, TimeUnit unit, String ... queueNames) { + return get(pollLastFromAnyAsync(timeout, unit, queueNames)); + } + + @Override + public RFuture pollLastFromAnyAsync(long timeout, TimeUnit unit, String ... queueNames) { + List params = new ArrayList(queueNames.length + 1); + params.add(getName()); + for (Object name : queueNames) { + params.add(name); + } + params.add(toSeconds(timeout, unit)); + return commandExecutor.writeAsync(getName(), codec, RedisCommands.BZPOPMAX_VALUE, params.toArray()); + } + @Override public V pollLast(long timeout, TimeUnit unit) { return get(pollLastAsync(timeout, unit)); diff --git a/redisson/src/main/java/org/redisson/api/RScoredSortedSet.java b/redisson/src/main/java/org/redisson/api/RScoredSortedSet.java index 4750aa576..9eae10c10 100644 --- a/redisson/src/main/java/org/redisson/api/RScoredSortedSet.java +++ b/redisson/src/main/java/org/redisson/api/RScoredSortedSet.java @@ -46,6 +46,38 @@ public interface RScoredSortedSet extends RScoredSortedSetAsync, Iterable< */ RCollectionMapReduce mapReduce(); + /** + * Removes and returns first available tail element of any sorted set, + * waiting up to the specified wait time if necessary for an element to become available + * in any of defined sorted sets including this one. + *

+ * Requires Redis 5.0.0 and higher. + * + * @param queueNames - names of queue + * @param timeout how long to wait before giving up, in units of + * {@code unit} + * @param unit a {@code TimeUnit} determining how to interpret the + * {@code timeout} parameter + * @return the tail element, or {@code null} if all sorted sets are empty + */ + V pollLastFromAny(long timeout, TimeUnit unit, String ... queueNames); + + /** + * Removes and returns first available head element of any sorted set, + * waiting up to the specified wait time if necessary for an element to become available + * in any of defined sorted sets including this one. + *

+ * Requires Redis 5.0.0 and higher. + * + * @param queueNames - names of queue + * @param timeout how long to wait before giving up, in units of + * {@code unit} + * @param unit a {@code TimeUnit} determining how to interpret the + * {@code timeout} parameter + * @return the head element, or {@code null} if all sorted sets are empty + */ + V pollFirstFromAny(long timeout, TimeUnit unit, String ... queueNames); + /** * Removes and returns the head element or {@code null} if this sorted set is empty. * diff --git a/redisson/src/main/java/org/redisson/api/RScoredSortedSetAsync.java b/redisson/src/main/java/org/redisson/api/RScoredSortedSetAsync.java index fa034e916..7b6cead46 100644 --- a/redisson/src/main/java/org/redisson/api/RScoredSortedSetAsync.java +++ b/redisson/src/main/java/org/redisson/api/RScoredSortedSetAsync.java @@ -31,8 +31,43 @@ import org.redisson.client.protocol.ScoredEntry; */ public interface RScoredSortedSetAsync extends RExpirableAsync, RSortableAsync> { + /** + * Removes and returns first available tail element of any sorted set, + * waiting up to the specified wait time if necessary for an element to become available + * in any of defined sorted sets including this one. + *

+ * Requires Redis 5.0.0 and higher. + * + * @param queueNames - names of queue + * @param timeout how long to wait before giving up, in units of + * {@code unit} + * @param unit a {@code TimeUnit} determining how to interpret the + * {@code timeout} parameter + * @return the tail element, or {@code null} if all sorted sets are empty + */ + RFuture pollLastFromAnyAsync(long timeout, TimeUnit unit, String ... queueNames); + + /** + * Removes and returns first available head element of any sorted set, + * waiting up to the specified wait time if necessary for an element to become available + * in any of defined sorted sets including this one. + *

+ * Requires Redis 5.0.0 and higher. + * + * @param queueNames - names of queue + * @param timeout how long to wait before giving up, in units of + * {@code unit} + * @param unit a {@code TimeUnit} determining how to interpret the + * {@code timeout} parameter + * @return the head element, or {@code null} if all sorted sets are empty + * + */ + RFuture pollFirstFromAnyAsync(long timeout, TimeUnit unit, String ... queueNames); + /** * Removes and returns the head element or {@code null} if this sorted set is empty. + *

+ * Requires Redis 5.0.0 and higher. * * @param timeout how long to wait before giving up, in units of * {@code unit} @@ -45,6 +80,8 @@ public interface RScoredSortedSetAsync extends RExpirableAsync, RSortableAsyn /** * Removes and returns the tail element or {@code null} if this sorted set is empty. + *

+ * Requires Redis 5.0.0 and higher. * * @param timeout how long to wait before giving up, in units of * {@code unit} @@ -174,7 +211,7 @@ public interface RScoredSortedSetAsync extends RExpirableAsync, RSortableAsyn /** * Adds element to this set only if has not been added before. *

- * Works only with Redis 3.0.2 and higher. + * Requires Redis 3.0.2 and higher. * * @param score - object score * @param object - object itself diff --git a/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java b/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java index 5ea876118..ae1c801cd 100644 --- a/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java +++ b/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java @@ -32,7 +32,6 @@ import org.redisson.client.RedisConnection; import org.redisson.client.RedisPubSubConnection; import org.redisson.client.protocol.CommandData; import org.redisson.client.protocol.RedisCommand; -import org.redisson.client.protocol.RedisCommands; import org.redisson.cluster.ClusterConnectionManager; import org.redisson.cluster.ClusterSlotRange; import org.redisson.config.MasterSlaveServersConfig; @@ -249,7 +248,7 @@ public class MasterSlaveEntry { return; } - RFuture newConnection = connectionReadOp(RedisCommands.BLPOP_VALUE); + RFuture newConnection = connectionWriteOp(commandData.getCommand()); newConnection.addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { @@ -263,7 +262,7 @@ public class MasterSlaveEntry { final FutureListener listener = new FutureListener() { @Override public void operationComplete(Future future) throws Exception { - releaseRead(newConnection); + releaseWrite(newConnection); } }; commandData.getPromise().addListener(listener); @@ -277,7 +276,7 @@ public class MasterSlaveEntry { if (!future.isSuccess()) { listener.operationComplete(null); commandData.getPromise().removeListener(listener); - releaseRead(newConnection); + releaseWrite(newConnection); log.error("Can't resubscribe blocking queue {}", commandData); } } diff --git a/redisson/src/test/java/org/redisson/RedissonScoredSortedSetTest.java b/redisson/src/test/java/org/redisson/RedissonScoredSortedSetTest.java index f15c51054..ff7c6abdb 100644 --- a/redisson/src/test/java/org/redisson/RedissonScoredSortedSetTest.java +++ b/redisson/src/test/java/org/redisson/RedissonScoredSortedSetTest.java @@ -12,6 +12,7 @@ import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import org.junit.Assert; @@ -29,6 +30,42 @@ import org.redisson.client.protocol.ScoredEntry; public class RedissonScoredSortedSetTest extends BaseTest { + @Test + public void testPollFirstFromAny() throws InterruptedException { + final RScoredSortedSet queue1 = redisson.getScoredSortedSet("queue:pollany"); + Executors.newSingleThreadScheduledExecutor().schedule(() -> { + RScoredSortedSet queue2 = redisson.getScoredSortedSet("queue:pollany1"); + RScoredSortedSet queue3 = redisson.getScoredSortedSet("queue:pollany2"); + queue3.add(0.1, 2); + queue1.add(0.1, 1); + queue2.add(0.1, 3); + }, 3, TimeUnit.SECONDS); + + long s = System.currentTimeMillis(); + int l = queue1.pollFirstFromAny(4, TimeUnit.SECONDS, "queue:pollany1", "queue:pollany2"); + + Assert.assertEquals(2, l); + Assert.assertTrue(System.currentTimeMillis() - s > 2000); + } + + @Test + public void testPollLastFromAny() throws InterruptedException { + final RScoredSortedSet queue1 = redisson.getScoredSortedSet("queue:pollany"); + Executors.newSingleThreadScheduledExecutor().schedule(() -> { + RScoredSortedSet queue2 = redisson.getScoredSortedSet("queue:pollany1"); + RScoredSortedSet queue3 = redisson.getScoredSortedSet("queue:pollany2"); + queue3.add(0.1, 2); + queue1.add(0.1, 1); + queue2.add(0.1, 3); + }, 3, TimeUnit.SECONDS); + + long s = System.currentTimeMillis(); + int l = queue1.pollLastFromAny(4, TimeUnit.SECONDS, "queue:pollany1", "queue:pollany2"); + + Assert.assertEquals(2, l); + Assert.assertTrue(System.currentTimeMillis() - s > 2000); + } + @Test public void testSortOrder() { RScoredSortedSet set = redisson.getScoredSortedSet("list", IntegerCodec.INSTANCE);