diff --git a/redisson/src/main/java/org/redisson/RedissonScoredSortedSet.java b/redisson/src/main/java/org/redisson/RedissonScoredSortedSet.java index 00459f7dd..0aa341ec5 100644 --- a/redisson/src/main/java/org/redisson/RedissonScoredSortedSet.java +++ b/redisson/src/main/java/org/redisson/RedissonScoredSortedSet.java @@ -1066,4 +1066,24 @@ public class RedissonScoredSortedSet extends RedissonExpirable implements RSc return commandExecutor.readAsync(getName(), codec, RedisCommands.SORT_SET, params.toArray()); } + + @Override + public RFuture takeFirstAsync() { + return pollFirstAsync(0, TimeUnit.SECONDS); + } + + @Override + public RFuture takeLastAsync() { + return pollLastAsync(0, TimeUnit.SECONDS); + } + + @Override + public V takeFirst() { + return get(takeFirstAsync()); + } + + @Override + public V takeLast() { + return get(takeLastAsync()); + } } diff --git a/redisson/src/main/java/org/redisson/api/RScoredSortedSet.java b/redisson/src/main/java/org/redisson/api/RScoredSortedSet.java index d9005de60..d8bbf29fc 100644 --- a/redisson/src/main/java/org/redisson/api/RScoredSortedSet.java +++ b/redisson/src/main/java/org/redisson/api/RScoredSortedSet.java @@ -78,7 +78,21 @@ public interface RScoredSortedSet extends RScoredSortedSetAsync, Iterable< * @return the head element, or {@code null} if all sorted sets are empty */ V pollFirstFromAny(long timeout, TimeUnit unit, String ... queueNames); - + + /** + * Removes and returns the head element waiting if necessary for an element to become available. + * + * @return the head element + */ + V takeFirst(); + + /** + * Removes and returns the tail element waiting if necessary for an element to become available. + * + * @return the tail element + */ + V takeLast(); + /** * Removes and returns the head element or {@code null} if this sorted set is empty. * diff --git a/redisson/src/main/java/org/redisson/api/RScoredSortedSetAsync.java b/redisson/src/main/java/org/redisson/api/RScoredSortedSetAsync.java index a8f481507..804ac5ab3 100644 --- a/redisson/src/main/java/org/redisson/api/RScoredSortedSetAsync.java +++ b/redisson/src/main/java/org/redisson/api/RScoredSortedSetAsync.java @@ -78,6 +78,20 @@ public interface RScoredSortedSetAsync extends RExpirableAsync, RSortableAsyn */ RFuture pollFirstAsync(long timeout, TimeUnit unit); + /** + * Removes and returns the head element waiting if necessary for an element to become available. + * + * @return the head element + */ + RFuture takeFirstAsync(); + + /** + * Removes and returns the tail element waiting if necessary for an element to become available. + * + * @return the tail element + */ + RFuture takeLastAsync(); + /** * Removes and returns the tail element or {@code null} if this sorted set is empty. *

diff --git a/redisson/src/main/java/org/redisson/api/RScoredSortedSetReactive.java b/redisson/src/main/java/org/redisson/api/RScoredSortedSetReactive.java index df8fdb95a..2c59678d9 100644 --- a/redisson/src/main/java/org/redisson/api/RScoredSortedSetReactive.java +++ b/redisson/src/main/java/org/redisson/api/RScoredSortedSetReactive.java @@ -415,5 +415,18 @@ public interface RScoredSortedSetReactive extends RExpirableReactive, RSortab */ Publisher union(Aggregate aggregate, Map nameWithWeight); + /** + * Removes and returns the head element waiting if necessary for an element to become available. + * + * @return the head element + */ + Publisher takeFirst(); + + /** + * Removes and returns the tail element waiting if necessary for an element to become available. + * + * @return the tail element + */ + Publisher takeLast(); } diff --git a/redisson/src/main/java/org/redisson/api/RScoredSortedSetRx.java b/redisson/src/main/java/org/redisson/api/RScoredSortedSetRx.java index c2610bbdb..9703c13ff 100644 --- a/redisson/src/main/java/org/redisson/api/RScoredSortedSetRx.java +++ b/redisson/src/main/java/org/redisson/api/RScoredSortedSetRx.java @@ -416,5 +416,18 @@ public interface RScoredSortedSetRx extends RExpirableRx, RSortableRx> */ Flowable union(Aggregate aggregate, Map nameWithWeight); + /** + * Removes and returns the head element waiting if necessary for an element to become available. + * + * @return the head element + */ + Flowable takeFirst(); + + /** + * Removes and returns the tail element waiting if necessary for an element to become available. + * + * @return the tail element + */ + Flowable takeLast(); } diff --git a/redisson/src/main/java/org/redisson/client/protocol/decoder/ScoredSortedSetPolledObjectDecoder.java b/redisson/src/main/java/org/redisson/client/protocol/decoder/ScoredSortedSetPolledObjectDecoder.java index 98c84a652..29f36ee26 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/decoder/ScoredSortedSetPolledObjectDecoder.java +++ b/redisson/src/main/java/org/redisson/client/protocol/decoder/ScoredSortedSetPolledObjectDecoder.java @@ -32,7 +32,7 @@ public class ScoredSortedSetPolledObjectDecoder implements MultiDecoder @Override public Object decode(List parts, State state) { if (!parts.isEmpty()) { - return parts.get(2); + return parts.get(1); } return null; } @@ -42,7 +42,7 @@ public class ScoredSortedSetPolledObjectDecoder implements MultiDecoder if (paramNum == 0) { return StringCodec.INSTANCE.getValueDecoder(); } - if (paramNum == 1) { + if (paramNum == 2) { return DoubleCodec.INSTANCE.getValueDecoder(); } return null; diff --git a/redisson/src/main/java/org/redisson/rx/ElementsStream.java b/redisson/src/main/java/org/redisson/rx/ElementsStream.java new file mode 100644 index 000000000..95fc51e07 --- /dev/null +++ b/redisson/src/main/java/org/redisson/rx/ElementsStream.java @@ -0,0 +1,79 @@ +/** + * Copyright 2018 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.rx; + +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +import org.redisson.api.RFuture; + +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.FutureListener; +import io.reactivex.Flowable; +import io.reactivex.functions.Action; +import io.reactivex.functions.LongConsumer; +import io.reactivex.processors.ReplayProcessor; + +/** + * + * @author Nikita Koksharov + * + */ +public class ElementsStream { + + public static Flowable takeElements(final Callable> callable) { + final ReplayProcessor p = ReplayProcessor.create(); + return p.doOnRequest(new LongConsumer() { + @Override + public void accept(long n) throws Exception { + final AtomicLong counter = new AtomicLong(n); + final AtomicReference> futureRef = new AtomicReference>(); + + take(callable, p, counter, futureRef); + + p.doOnCancel(new Action() { + @Override + public void run() throws Exception { + futureRef.get().cancel(true); + } + }); + } + }); + } + + private static void take(final Callable> factory, final ReplayProcessor p, final AtomicLong counter, final AtomicReference> futureRef) throws Exception { + RFuture future = factory.call(); + futureRef.set(future); + future.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + p.onError(future.cause()); + return; + } + + p.onNext(future.getNow()); + if (counter.decrementAndGet() == 0) { + p.onComplete(); + } + + take(factory, p, counter, futureRef); + } + }); + } + +} diff --git a/redisson/src/main/java/org/redisson/rx/RedissonBlockingDequeRx.java b/redisson/src/main/java/org/redisson/rx/RedissonBlockingDequeRx.java index d66285b17..d6f1934bd 100644 --- a/redisson/src/main/java/org/redisson/rx/RedissonBlockingDequeRx.java +++ b/redisson/src/main/java/org/redisson/rx/RedissonBlockingDequeRx.java @@ -38,7 +38,7 @@ public class RedissonBlockingDequeRx extends RedissonBlockingQueueRx { } public Flowable takeFirstElements() { - return takeElements(new Callable>() { + return ElementsStream.takeElements(new Callable>() { @Override public RFuture call() throws Exception { return queue.takeFirstAsync(); @@ -47,7 +47,7 @@ public class RedissonBlockingDequeRx extends RedissonBlockingQueueRx { } public Flowable takeLastElements() { - return takeElements(new Callable>() { + return ElementsStream.takeElements(new Callable>() { @Override public RFuture call() throws Exception { return queue.takeLastAsync(); diff --git a/redisson/src/main/java/org/redisson/rx/RedissonBlockingQueueRx.java b/redisson/src/main/java/org/redisson/rx/RedissonBlockingQueueRx.java index 82aaa11d8..05d233aea 100644 --- a/redisson/src/main/java/org/redisson/rx/RedissonBlockingQueueRx.java +++ b/redisson/src/main/java/org/redisson/rx/RedissonBlockingQueueRx.java @@ -16,19 +16,12 @@ package org.redisson.rx; import java.util.concurrent.Callable; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; import org.redisson.api.RBlockingQueueAsync; import org.redisson.api.RFuture; import org.redisson.api.RListAsync; -import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.FutureListener; import io.reactivex.Flowable; -import io.reactivex.functions.Action; -import io.reactivex.functions.LongConsumer; -import io.reactivex.processors.ReplayProcessor; /** * @@ -46,7 +39,7 @@ public class RedissonBlockingQueueRx extends RedissonListRx { } public Flowable takeElements() { - return takeElements(new Callable>() { + return ElementsStream.takeElements(new Callable>() { @Override public RFuture call() throws Exception { return queue.takeAsync(); @@ -54,45 +47,4 @@ public class RedissonBlockingQueueRx extends RedissonListRx { }); } - protected final Flowable takeElements(final Callable> callable) { - final ReplayProcessor p = ReplayProcessor.create(); - return p.doOnRequest(new LongConsumer() { - @Override - public void accept(long n) throws Exception { - final AtomicLong counter = new AtomicLong(n); - final AtomicReference> futureRef = new AtomicReference>(); - - take(callable, p, counter, futureRef); - - p.doOnCancel(new Action() { - @Override - public void run() throws Exception { - futureRef.get().cancel(true); - } - }); - } - }); - } - - private void take(final Callable> factory, final ReplayProcessor p, final AtomicLong counter, final AtomicReference> futureRef) throws Exception { - RFuture future = factory.call(); - futureRef.set(future); - future.addListener(new FutureListener() { - @Override - public void operationComplete(Future future) throws Exception { - if (!future.isSuccess()) { - p.onError(future.cause()); - return; - } - - p.onNext(future.getNow()); - if (counter.decrementAndGet() == 0) { - p.onComplete(); - } - - take(factory, p, counter, futureRef); - } - }); - } - } diff --git a/redisson/src/main/java/org/redisson/rx/RedissonScoredSortedSetRx.java b/redisson/src/main/java/org/redisson/rx/RedissonScoredSortedSetRx.java index ad06baff7..2ce92e155 100644 --- a/redisson/src/main/java/org/redisson/rx/RedissonScoredSortedSetRx.java +++ b/redisson/src/main/java/org/redisson/rx/RedissonScoredSortedSetRx.java @@ -15,7 +15,8 @@ */ package org.redisson.rx; -import org.reactivestreams.Publisher; +import java.util.concurrent.Callable; + import org.redisson.RedissonScoredSortedSet; import org.redisson.api.RFuture; import org.redisson.api.RScoredSortedSetAsync; @@ -47,23 +48,41 @@ public class RedissonScoredSortedSetRx { }.create(); } + public Flowable takeFirst() { + return ElementsStream.takeElements(new Callable>() { + @Override + public RFuture call() throws Exception { + return instance.takeFirstAsync(); + } + }); + } + + public Flowable takeLast() { + return ElementsStream.takeElements(new Callable>() { + @Override + public RFuture call() throws Exception { + return instance.takeLastAsync(); + } + }); + } + public String getName() { return ((RedissonScoredSortedSet)instance).getName(); } - public Publisher iterator() { + public Flowable iterator() { return scanIteratorReactive(null, 10); } - public Publisher iterator(String pattern) { + public Flowable iterator(String pattern) { return scanIteratorReactive(pattern, 10); } - public Publisher iterator(int count) { + public Flowable iterator(int count) { return scanIteratorReactive(null, count); } - public Publisher iterator(String pattern, int count) { + public Flowable iterator(String pattern, int count) { return scanIteratorReactive(pattern, count); } diff --git a/redisson/src/test/java/org/redisson/RedissonScoredSortedSetTest.java b/redisson/src/test/java/org/redisson/RedissonScoredSortedSetTest.java index 2cbcd1403..101297ea8 100644 --- a/redisson/src/test/java/org/redisson/RedissonScoredSortedSetTest.java +++ b/redisson/src/test/java/org/redisson/RedissonScoredSortedSetTest.java @@ -30,6 +30,21 @@ import org.redisson.client.protocol.ScoredEntry; public class RedissonScoredSortedSetTest extends BaseTest { + @Test + public void testTakeFirst() { + final RScoredSortedSet queue1 = redisson.getScoredSortedSet("queue:pollany"); + Executors.newSingleThreadScheduledExecutor().schedule(() -> { + RScoredSortedSet queue2 = redisson.getScoredSortedSet("queue:pollany1"); + RScoredSortedSet queue3 = redisson.getScoredSortedSet("queue:pollany2"); + queue1.add(0.1, 1); + }, 3, TimeUnit.SECONDS); + + long s = System.currentTimeMillis(); + int l = queue1.takeFirst(); + Assert.assertEquals(1, l); + Assert.assertTrue(System.currentTimeMillis() - s > 2000); + } + @Test public void testPollFirstFromAny() throws InterruptedException { final RScoredSortedSet queue1 = redisson.getScoredSortedSet("queue:pollany");