Feature - takeFirst and takeLast methods added to RScoredSortedSet object

pull/1792/head
Nikita Koksharov 6 years ago
parent e9edbd27d9
commit 45120859ae

@ -1066,4 +1066,24 @@ public class RedissonScoredSortedSet<V> extends RedissonExpirable implements RSc
return commandExecutor.readAsync(getName(), codec, RedisCommands.SORT_SET, params.toArray());
}
@Override
public RFuture<V> takeFirstAsync() {
return pollFirstAsync(0, TimeUnit.SECONDS);
}
@Override
public RFuture<V> takeLastAsync() {
return pollLastAsync(0, TimeUnit.SECONDS);
}
@Override
public V takeFirst() {
return get(takeFirstAsync());
}
@Override
public V takeLast() {
return get(takeLastAsync());
}
}

@ -78,7 +78,21 @@ public interface RScoredSortedSet<V> extends RScoredSortedSetAsync<V>, Iterable<
* @return the head element, or {@code null} if all sorted sets are empty
*/
V pollFirstFromAny(long timeout, TimeUnit unit, String ... queueNames);
/**
* Removes and returns the head element waiting if necessary for an element to become available.
*
* @return the head element
*/
V takeFirst();
/**
* Removes and returns the tail element waiting if necessary for an element to become available.
*
* @return the tail element
*/
V takeLast();
/**
* Removes and returns the head element or {@code null} if this sorted set is empty.
*

@ -78,6 +78,20 @@ public interface RScoredSortedSetAsync<V> extends RExpirableAsync, RSortableAsyn
*/
RFuture<V> pollFirstAsync(long timeout, TimeUnit unit);
/**
* Removes and returns the head element waiting if necessary for an element to become available.
*
* @return the head element
*/
RFuture<V> takeFirstAsync();
/**
* Removes and returns the tail element waiting if necessary for an element to become available.
*
* @return the tail element
*/
RFuture<V> takeLastAsync();
/**
* Removes and returns the tail element or {@code null} if this sorted set is empty.
* <p>

@ -415,5 +415,18 @@ public interface RScoredSortedSetReactive<V> extends RExpirableReactive, RSortab
*/
Publisher<Integer> union(Aggregate aggregate, Map<String, Double> nameWithWeight);
/**
* Removes and returns the head element waiting if necessary for an element to become available.
*
* @return the head element
*/
Publisher<V> takeFirst();
/**
* Removes and returns the tail element waiting if necessary for an element to become available.
*
* @return the tail element
*/
Publisher<V> takeLast();
}

@ -416,5 +416,18 @@ public interface RScoredSortedSetRx<V> extends RExpirableRx, RSortableRx<Set<V>>
*/
Flowable<Integer> union(Aggregate aggregate, Map<String, Double> nameWithWeight);
/**
* Removes and returns the head element waiting if necessary for an element to become available.
*
* @return the head element
*/
Flowable<V> takeFirst();
/**
* Removes and returns the tail element waiting if necessary for an element to become available.
*
* @return the tail element
*/
Flowable<V> takeLast();
}

@ -32,7 +32,7 @@ public class ScoredSortedSetPolledObjectDecoder implements MultiDecoder<Object>
@Override
public Object decode(List<Object> parts, State state) {
if (!parts.isEmpty()) {
return parts.get(2);
return parts.get(1);
}
return null;
}
@ -42,7 +42,7 @@ public class ScoredSortedSetPolledObjectDecoder implements MultiDecoder<Object>
if (paramNum == 0) {
return StringCodec.INSTANCE.getValueDecoder();
}
if (paramNum == 1) {
if (paramNum == 2) {
return DoubleCodec.INSTANCE.getValueDecoder();
}
return null;

@ -0,0 +1,79 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.rx;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.redisson.api.RFuture;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.reactivex.Flowable;
import io.reactivex.functions.Action;
import io.reactivex.functions.LongConsumer;
import io.reactivex.processors.ReplayProcessor;
/**
*
* @author Nikita Koksharov
*
*/
public class ElementsStream {
public static <V> Flowable<V> takeElements(final Callable<RFuture<V>> callable) {
final ReplayProcessor<V> p = ReplayProcessor.create();
return p.doOnRequest(new LongConsumer() {
@Override
public void accept(long n) throws Exception {
final AtomicLong counter = new AtomicLong(n);
final AtomicReference<RFuture<V>> futureRef = new AtomicReference<RFuture<V>>();
take(callable, p, counter, futureRef);
p.doOnCancel(new Action() {
@Override
public void run() throws Exception {
futureRef.get().cancel(true);
}
});
}
});
}
private static <V> void take(final Callable<RFuture<V>> factory, final ReplayProcessor<V> p, final AtomicLong counter, final AtomicReference<RFuture<V>> futureRef) throws Exception {
RFuture<V> future = factory.call();
futureRef.set(future);
future.addListener(new FutureListener<V>() {
@Override
public void operationComplete(Future<V> future) throws Exception {
if (!future.isSuccess()) {
p.onError(future.cause());
return;
}
p.onNext(future.getNow());
if (counter.decrementAndGet() == 0) {
p.onComplete();
}
take(factory, p, counter, futureRef);
}
});
}
}

@ -38,7 +38,7 @@ public class RedissonBlockingDequeRx<V> extends RedissonBlockingQueueRx<V> {
}
public Flowable<V> takeFirstElements() {
return takeElements(new Callable<RFuture<V>>() {
return ElementsStream.takeElements(new Callable<RFuture<V>>() {
@Override
public RFuture<V> call() throws Exception {
return queue.takeFirstAsync();
@ -47,7 +47,7 @@ public class RedissonBlockingDequeRx<V> extends RedissonBlockingQueueRx<V> {
}
public Flowable<V> takeLastElements() {
return takeElements(new Callable<RFuture<V>>() {
return ElementsStream.takeElements(new Callable<RFuture<V>>() {
@Override
public RFuture<V> call() throws Exception {
return queue.takeLastAsync();

@ -16,19 +16,12 @@
package org.redisson.rx;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.redisson.api.RBlockingQueueAsync;
import org.redisson.api.RFuture;
import org.redisson.api.RListAsync;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.reactivex.Flowable;
import io.reactivex.functions.Action;
import io.reactivex.functions.LongConsumer;
import io.reactivex.processors.ReplayProcessor;
/**
*
@ -46,7 +39,7 @@ public class RedissonBlockingQueueRx<V> extends RedissonListRx<V> {
}
public Flowable<V> takeElements() {
return takeElements(new Callable<RFuture<V>>() {
return ElementsStream.takeElements(new Callable<RFuture<V>>() {
@Override
public RFuture<V> call() throws Exception {
return queue.takeAsync();
@ -54,45 +47,4 @@ public class RedissonBlockingQueueRx<V> extends RedissonListRx<V> {
});
}
protected final Flowable<V> takeElements(final Callable<RFuture<V>> callable) {
final ReplayProcessor<V> p = ReplayProcessor.create();
return p.doOnRequest(new LongConsumer() {
@Override
public void accept(long n) throws Exception {
final AtomicLong counter = new AtomicLong(n);
final AtomicReference<RFuture<V>> futureRef = new AtomicReference<RFuture<V>>();
take(callable, p, counter, futureRef);
p.doOnCancel(new Action() {
@Override
public void run() throws Exception {
futureRef.get().cancel(true);
}
});
}
});
}
private void take(final Callable<RFuture<V>> factory, final ReplayProcessor<V> p, final AtomicLong counter, final AtomicReference<RFuture<V>> futureRef) throws Exception {
RFuture<V> future = factory.call();
futureRef.set(future);
future.addListener(new FutureListener<V>() {
@Override
public void operationComplete(Future<V> future) throws Exception {
if (!future.isSuccess()) {
p.onError(future.cause());
return;
}
p.onNext(future.getNow());
if (counter.decrementAndGet() == 0) {
p.onComplete();
}
take(factory, p, counter, futureRef);
}
});
}
}

@ -15,7 +15,8 @@
*/
package org.redisson.rx;
import org.reactivestreams.Publisher;
import java.util.concurrent.Callable;
import org.redisson.RedissonScoredSortedSet;
import org.redisson.api.RFuture;
import org.redisson.api.RScoredSortedSetAsync;
@ -47,23 +48,41 @@ public class RedissonScoredSortedSetRx<V> {
}.create();
}
public Flowable<V> takeFirst() {
return ElementsStream.takeElements(new Callable<RFuture<V>>() {
@Override
public RFuture<V> call() throws Exception {
return instance.takeFirstAsync();
}
});
}
public Flowable<V> takeLast() {
return ElementsStream.takeElements(new Callable<RFuture<V>>() {
@Override
public RFuture<V> call() throws Exception {
return instance.takeLastAsync();
}
});
}
public String getName() {
return ((RedissonScoredSortedSet<V>)instance).getName();
}
public Publisher<V> iterator() {
public Flowable<V> iterator() {
return scanIteratorReactive(null, 10);
}
public Publisher<V> iterator(String pattern) {
public Flowable<V> iterator(String pattern) {
return scanIteratorReactive(pattern, 10);
}
public Publisher<V> iterator(int count) {
public Flowable<V> iterator(int count) {
return scanIteratorReactive(null, count);
}
public Publisher<V> iterator(String pattern, int count) {
public Flowable<V> iterator(String pattern, int count) {
return scanIteratorReactive(pattern, count);
}

@ -30,6 +30,21 @@ import org.redisson.client.protocol.ScoredEntry;
public class RedissonScoredSortedSetTest extends BaseTest {
@Test
public void testTakeFirst() {
final RScoredSortedSet<Integer> queue1 = redisson.getScoredSortedSet("queue:pollany");
Executors.newSingleThreadScheduledExecutor().schedule(() -> {
RScoredSortedSet<Integer> queue2 = redisson.getScoredSortedSet("queue:pollany1");
RScoredSortedSet<Integer> queue3 = redisson.getScoredSortedSet("queue:pollany2");
queue1.add(0.1, 1);
}, 3, TimeUnit.SECONDS);
long s = System.currentTimeMillis();
int l = queue1.takeFirst();
Assert.assertEquals(1, l);
Assert.assertTrue(System.currentTimeMillis() - s > 2000);
}
@Test
public void testPollFirstFromAny() throws InterruptedException {
final RScoredSortedSet<Integer> queue1 = redisson.getScoredSortedSet("queue:pollany");

Loading…
Cancel
Save