Merge branch 'master' into 3.0.0

pull/1821/head
Nikita Koksharov 6 years ago
commit 4abd81e49f

@ -1,5 +1,7 @@
## Redisson standalone node ## Redisson standalone node
Redisson offers ability to run as standalone node and participate in distributed computing. Such standalone nodes could be used to run [ExecutorService](./9.-distributed-services#93-executor-service), [ScheduledExecutorService](https://github.com/mrniko/redisson/wiki/9.-distributed-services#94-scheduled-executor-service) tasks or [RemoteService](./9.-distributed-services#91-remote-service) services. It's just a single jar and could be downloaded from [here](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-all&v=2.3.0&e=jar) Redisson offers ability to run as standalone node and participate in distributed computing. Such Nodes are used to run [MapReduce](./9.-distributed-services/#95-distributed-mapreduce-service), [ExecutorService](./9.-distributed-services#93-executor-service), [ScheduledExecutorService](https://github.com/mrniko/redisson/wiki/9.-distributed-services#94-scheduled-executor-service) tasks or [RemoteService](./9.-distributed-services#91-remote-service) services. All tasks are kept in Redis until their execution moment.
Packaged as a single jar and could be [downloaded](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-all&v=3.9.0&e=jar).
[Documentation](https://github.com/mrniko/redisson/wiki/12.-Standalone-node) about Redisson standalone node. [Documentation](https://github.com/mrniko/redisson/wiki/12.-Standalone-node) about Redisson standalone node.

@ -1066,4 +1066,24 @@ public class RedissonScoredSortedSet<V> extends RedissonExpirable implements RSc
return commandExecutor.readAsync(getName(), codec, RedisCommands.SORT_SET, params.toArray()); return commandExecutor.readAsync(getName(), codec, RedisCommands.SORT_SET, params.toArray());
} }
@Override
public RFuture<V> takeFirstAsync() {
return pollFirstAsync(0, TimeUnit.SECONDS);
}
@Override
public RFuture<V> takeLastAsync() {
return pollLastAsync(0, TimeUnit.SECONDS);
}
@Override
public V takeFirst() {
return get(takeFirstAsync());
}
@Override
public V takeLast() {
return get(takeLastAsync());
}
} }

@ -112,16 +112,18 @@ public interface RBlockingDequeRx<V> extends RDequeRx<V>, RBlockingQueueRx<V> {
Flowable<V> takeFirst(); Flowable<V> takeFirst();
/** /**
* Retrieves and removes stream of elements from the head of this queue. Waits for an element become available. * Retrieves and removes continues stream of elements from the head of this queue.
* Waits for next element become available.
* *
* @return the head element of this queue * @return stream of head elements
*/ */
Flowable<V> takeFirstElements(); Flowable<V> takeFirstElements();
/** /**
* Retrieves and removes stream of elements from the tail of this queue. Waits for an element become available. * Retrieves and removes continues stream of elements from the tail of this queue.
* Waits for next element become available.
* *
* @return the head element of this queue * @return stream of tail elements
*/ */
Flowable<V> takeLastElements(); Flowable<V> takeLastElements();

@ -155,10 +155,10 @@ public interface RBlockingQueueRx<V> extends RQueueRx<V> {
Flowable<Void> put(V e); Flowable<Void> put(V e);
/** /**
* Retrieves and removes stream of elements from the head of this queue. * Retrieves and removes continues stream of elements from the head of this queue.
* Waits for an element become available. * Waits for next element become available.
* *
* @return stream of messages * @return stream of elements
*/ */
Flowable<V> takeElements(); Flowable<V> takeElements();

@ -79,6 +79,20 @@ public interface RScoredSortedSet<V> extends RScoredSortedSetAsync<V>, Iterable<
*/ */
V pollFirstFromAny(long timeout, TimeUnit unit, String ... queueNames); V pollFirstFromAny(long timeout, TimeUnit unit, String ... queueNames);
/**
* Removes and returns the head element waiting if necessary for an element to become available.
*
* @return the head element
*/
V takeFirst();
/**
* Removes and returns the tail element waiting if necessary for an element to become available.
*
* @return the tail element
*/
V takeLast();
/** /**
* Removes and returns the head element or {@code null} if this sorted set is empty. * Removes and returns the head element or {@code null} if this sorted set is empty.
* *

@ -78,6 +78,20 @@ public interface RScoredSortedSetAsync<V> extends RExpirableAsync, RSortableAsyn
*/ */
RFuture<V> pollFirstAsync(long timeout, TimeUnit unit); RFuture<V> pollFirstAsync(long timeout, TimeUnit unit);
/**
* Removes and returns the head element waiting if necessary for an element to become available.
*
* @return the head element
*/
RFuture<V> takeFirstAsync();
/**
* Removes and returns the tail element waiting if necessary for an element to become available.
*
* @return the tail element
*/
RFuture<V> takeLastAsync();
/** /**
* Removes and returns the tail element or {@code null} if this sorted set is empty. * Removes and returns the tail element or {@code null} if this sorted set is empty.
* <p> * <p>

@ -417,5 +417,18 @@ public interface RScoredSortedSetReactive<V> extends RExpirableReactive, RSortab
*/ */
Mono<Integer> union(Aggregate aggregate, Map<String, Double> nameWithWeight); Mono<Integer> union(Aggregate aggregate, Map<String, Double> nameWithWeight);
/**
* Removes and returns the head element waiting if necessary for an element to become available.
*
* @return the head element
*/
Publisher<V> takeFirst();
/**
* Removes and returns the tail element waiting if necessary for an element to become available.
*
* @return the tail element
*/
Publisher<V> takeLast();
} }

@ -416,5 +416,18 @@ public interface RScoredSortedSetRx<V> extends RExpirableRx, RSortableRx<Set<V>>
*/ */
Flowable<Integer> union(Aggregate aggregate, Map<String, Double> nameWithWeight); Flowable<Integer> union(Aggregate aggregate, Map<String, Double> nameWithWeight);
/**
* Removes and returns the head element waiting if necessary for an element to become available.
*
* @return the head element
*/
Flowable<V> takeFirst();
/**
* Removes and returns the tail element waiting if necessary for an element to become available.
*
* @return the tail element
*/
Flowable<V> takeLast();
} }

@ -57,7 +57,6 @@ import org.redisson.client.protocol.decoder.MapScanResultReplayDecoder;
import org.redisson.client.protocol.decoder.ObjectDecoder; import org.redisson.client.protocol.decoder.ObjectDecoder;
import org.redisson.client.protocol.decoder.ObjectFirstScoreReplayDecoder; import org.redisson.client.protocol.decoder.ObjectFirstScoreReplayDecoder;
import org.redisson.client.protocol.decoder.ObjectListReplayDecoder; import org.redisson.client.protocol.decoder.ObjectListReplayDecoder;
import org.redisson.client.protocol.decoder.ObjectMapDecoder;
import org.redisson.client.protocol.decoder.ObjectMapEntryReplayDecoder; import org.redisson.client.protocol.decoder.ObjectMapEntryReplayDecoder;
import org.redisson.client.protocol.decoder.ObjectMapJoinDecoder; import org.redisson.client.protocol.decoder.ObjectMapJoinDecoder;
import org.redisson.client.protocol.decoder.ObjectMapReplayDecoder; import org.redisson.client.protocol.decoder.ObjectMapReplayDecoder;
@ -71,6 +70,7 @@ import org.redisson.client.protocol.decoder.ScoredSortedSetScanReplayDecoder;
import org.redisson.client.protocol.decoder.SlotsDecoder; import org.redisson.client.protocol.decoder.SlotsDecoder;
import org.redisson.client.protocol.decoder.StreamIdDecoder; import org.redisson.client.protocol.decoder.StreamIdDecoder;
import org.redisson.client.protocol.decoder.StreamIdListDecoder; import org.redisson.client.protocol.decoder.StreamIdListDecoder;
import org.redisson.client.protocol.decoder.StreamObjectMapReplayDecoder;
import org.redisson.client.protocol.decoder.StreamResultDecoder; import org.redisson.client.protocol.decoder.StreamResultDecoder;
import org.redisson.client.protocol.decoder.StringDataDecoder; import org.redisson.client.protocol.decoder.StringDataDecoder;
import org.redisson.client.protocol.decoder.StringListReplayDecoder; import org.redisson.client.protocol.decoder.StringListReplayDecoder;
@ -329,8 +329,8 @@ public interface RedisCommands {
RedisCommand<Map<String, Map<StreamMessageId, Map<Object, Object>>>> XRANGE = new RedisCommand<Map<String, Map<StreamMessageId, Map<Object, Object>>>>("XRANGE", RedisCommand<Map<String, Map<StreamMessageId, Map<Object, Object>>>> XRANGE = new RedisCommand<Map<String, Map<StreamMessageId, Map<Object, Object>>>>("XRANGE",
new ListMultiDecoder( new ListMultiDecoder(
new ObjectDecoder(new StreamIdDecoder()), new ObjectDecoder(new StreamIdDecoder()),
new ObjectMapReplayDecoder(), new StreamObjectMapReplayDecoder(),
new ObjectMapReplayDecoder(ListMultiDecoder.RESET), new StreamObjectMapReplayDecoder(ListMultiDecoder.RESET),
new ObjectMapJoinDecoder())); new ObjectMapJoinDecoder()));
RedisCommand<Map<String, Map<StreamMessageId, Map<Object, Object>>>> XREVRANGE = RedisCommand<Map<String, Map<StreamMessageId, Map<Object, Object>>>> XREVRANGE =
@ -340,10 +340,10 @@ public interface RedisCommands {
new ListMultiDecoder( new ListMultiDecoder(
new ObjectDecoder(StringCodec.INSTANCE.getValueDecoder()), new ObjectDecoder(StringCodec.INSTANCE.getValueDecoder()),
new ObjectDecoder(new StreamIdDecoder()), new ObjectDecoder(new StreamIdDecoder()),
new ObjectMapReplayDecoder(), new StreamObjectMapReplayDecoder(),
new ObjectMapReplayDecoder(ListMultiDecoder.RESET_1), new StreamObjectMapReplayDecoder(ListMultiDecoder.RESET_1),
new ObjectMapJoinDecoder(), new ObjectMapJoinDecoder(),
new ObjectMapReplayDecoder(ListMultiDecoder.RESET_INDEX), new StreamObjectMapReplayDecoder(ListMultiDecoder.RESET_INDEX),
new StreamResultDecoder())); new StreamResultDecoder()));
RedisCommand<Map<String, Map<StreamMessageId, Map<Object, Object>>>> XREAD_BLOCKING = new RedisCommand<Map<String, Map<StreamMessageId, Map<Object, Object>>>>("XREAD", XREAD.getReplayMultiDecoder()); RedisCommand<Map<String, Map<StreamMessageId, Map<Object, Object>>>> XREAD_BLOCKING = new RedisCommand<Map<String, Map<StreamMessageId, Map<Object, Object>>>>("XREAD", XREAD.getReplayMultiDecoder());
@ -352,10 +352,10 @@ public interface RedisCommands {
new ListMultiDecoder( new ListMultiDecoder(
new ObjectDecoder(StringCodec.INSTANCE.getValueDecoder()), new ObjectDecoder(StringCodec.INSTANCE.getValueDecoder()),
new ObjectDecoder(new StreamIdDecoder()), new ObjectDecoder(new StreamIdDecoder()),
new ObjectMapReplayDecoder(), new StreamObjectMapReplayDecoder(),
new ObjectMapReplayDecoder(ListMultiDecoder.RESET_1), new StreamObjectMapReplayDecoder(ListMultiDecoder.RESET_1),
new ObjectMapJoinDecoder(), new ObjectMapJoinDecoder(),
new ObjectMapReplayDecoder(), new StreamObjectMapReplayDecoder(),
new StreamResultDecoder())); new StreamResultDecoder()));
RedisCommand<Map<StreamMessageId, Map<Object, Object>>> XREAD_BLOCKING_SINGLE = RedisCommand<Map<StreamMessageId, Map<Object, Object>>> XREAD_BLOCKING_SINGLE =
@ -365,10 +365,10 @@ public interface RedisCommands {
new ListMultiDecoder( new ListMultiDecoder(
new ObjectDecoder(StringCodec.INSTANCE.getValueDecoder()), new ObjectDecoder(StringCodec.INSTANCE.getValueDecoder()),
new ObjectDecoder(new StreamIdDecoder()), new ObjectDecoder(new StreamIdDecoder()),
new ObjectMapReplayDecoder(), new StreamObjectMapReplayDecoder(),
new ObjectMapReplayDecoder(ListMultiDecoder.RESET_1), new StreamObjectMapReplayDecoder(ListMultiDecoder.RESET_1),
new ObjectMapJoinDecoder(), new ObjectMapJoinDecoder(),
new ObjectMapReplayDecoder(ListMultiDecoder.RESET_INDEX), new StreamObjectMapReplayDecoder(ListMultiDecoder.RESET_INDEX),
new StreamResultDecoder())); new StreamResultDecoder()));
RedisCommand<Map<String, Map<StreamMessageId, Map<Object, Object>>>> XREADGROUP_BLOCKING = RedisCommand<Map<String, Map<StreamMessageId, Map<Object, Object>>>> XREADGROUP_BLOCKING =
@ -378,10 +378,10 @@ public interface RedisCommands {
new ListMultiDecoder( new ListMultiDecoder(
new ObjectDecoder(StringCodec.INSTANCE.getValueDecoder()), new ObjectDecoder(StringCodec.INSTANCE.getValueDecoder()),
new ObjectDecoder(new StreamIdDecoder()), new ObjectDecoder(new StreamIdDecoder()),
new ObjectMapReplayDecoder(), new StreamObjectMapReplayDecoder(),
new ObjectMapReplayDecoder(ListMultiDecoder.RESET_1), new StreamObjectMapReplayDecoder(ListMultiDecoder.RESET_1),
new ObjectMapJoinDecoder(), new ObjectMapJoinDecoder(),
new ObjectMapReplayDecoder(), new StreamObjectMapReplayDecoder(),
new StreamResultDecoder())); new StreamResultDecoder()));
RedisCommand<List<StreamMessageId>> XCLAIM_IDS = new RedisCommand<List<StreamMessageId>>("XCLAIM", new StreamIdListDecoder()); RedisCommand<List<StreamMessageId>> XCLAIM_IDS = new RedisCommand<List<StreamMessageId>>("XCLAIM", new StreamIdListDecoder());
@ -389,8 +389,8 @@ public interface RedisCommands {
RedisCommand<Map<StreamMessageId, Map<Object, Object>>> XCLAIM = new RedisCommand<Map<StreamMessageId, Map<Object, Object>>>("XCLAIM", RedisCommand<Map<StreamMessageId, Map<Object, Object>>> XCLAIM = new RedisCommand<Map<StreamMessageId, Map<Object, Object>>>("XCLAIM",
new ListMultiDecoder( new ListMultiDecoder(
new ObjectDecoder(new StreamIdDecoder()), new ObjectDecoder(new StreamIdDecoder()),
new ObjectMapReplayDecoder(), new StreamObjectMapReplayDecoder(),
new ObjectMapReplayDecoder(ListMultiDecoder.RESET), new StreamObjectMapReplayDecoder(ListMultiDecoder.RESET),
new ObjectMapJoinDecoder())); new ObjectMapJoinDecoder()));
RedisCommand<Map<StreamMessageId, Map<Object, Object>>> XREADGROUP_BLOCKING_SINGLE = new RedisCommand<Map<StreamMessageId, Map<Object, Object>>>("XREADGROUP", RedisCommand<Map<StreamMessageId, Map<Object, Object>>> XREADGROUP_BLOCKING_SINGLE = new RedisCommand<Map<StreamMessageId, Map<Object, Object>>>("XREADGROUP",

@ -29,25 +29,8 @@ import org.redisson.client.protocol.Decoder;
*/ */
public class ObjectMapReplayDecoder implements MultiDecoder<Map<Object, Object>> { public class ObjectMapReplayDecoder implements MultiDecoder<Map<Object, Object>> {
private Decoder<Object> codec;
public ObjectMapReplayDecoder() {
}
public ObjectMapReplayDecoder(Decoder<Object> codec) {
super();
this.codec = codec;
}
@Override @Override
public Map<Object, Object> decode(List<Object> parts, State state) { public Map<Object, Object> decode(List<Object> parts, State state) {
if (parts.get(0) instanceof Map) {
Map<Object, Object> result = new LinkedHashMap<Object, Object>(parts.size());
for (int i = 0; i < parts.size(); i++) {
result.putAll((Map<? extends Object, ? extends Object>) parts.get(i));
}
return result;
} else {
Map<Object, Object> result = new LinkedHashMap<Object, Object>(parts.size()/2); Map<Object, Object> result = new LinkedHashMap<Object, Object>(parts.size()/2);
for (int i = 0; i < parts.size(); i++) { for (int i = 0; i < parts.size(); i++) {
if (i % 2 != 0) { if (i % 2 != 0) {
@ -56,13 +39,9 @@ public class ObjectMapReplayDecoder implements MultiDecoder<Map<Object, Object>>
} }
return result; return result;
} }
}
@Override @Override
public Decoder<Object> getDecoder(int paramNum, State state) { public Decoder<Object> getDecoder(int paramNum, State state) {
if (codec != null) {
return codec;
}
return null; return null;
} }

@ -32,7 +32,7 @@ public class ScoredSortedSetPolledObjectDecoder implements MultiDecoder<Object>
@Override @Override
public Object decode(List<Object> parts, State state) { public Object decode(List<Object> parts, State state) {
if (!parts.isEmpty()) { if (!parts.isEmpty()) {
return parts.get(2); return parts.get(1);
} }
return null; return null;
} }
@ -42,7 +42,7 @@ public class ScoredSortedSetPolledObjectDecoder implements MultiDecoder<Object>
if (paramNum == 0) { if (paramNum == 0) {
return StringCodec.INSTANCE.getValueDecoder(); return StringCodec.INSTANCE.getValueDecoder();
} }
if (paramNum == 1) { if (paramNum == 2) {
return DoubleCodec.INSTANCE.getValueDecoder(); return DoubleCodec.INSTANCE.getValueDecoder();
} }
return null; return null;

@ -0,0 +1,62 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client.protocol.decoder;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder;
/**
*
* @author Nikita Koksharov
*
*/
public class StreamObjectMapReplayDecoder extends ObjectMapReplayDecoder {
private Decoder<Object> codec;
public StreamObjectMapReplayDecoder() {
}
public StreamObjectMapReplayDecoder(Decoder<Object> codec) {
super();
this.codec = codec;
}
@Override
public Map<Object, Object> decode(List<Object> parts, State state) {
if (parts.get(0) instanceof Map) {
Map<Object, Object> result = new LinkedHashMap<Object, Object>(parts.size());
for (int i = 0; i < parts.size(); i++) {
result.putAll((Map<? extends Object, ? extends Object>) parts.get(i));
}
return result;
}
return super.decode(parts, state);
}
@Override
public Decoder<Object> getDecoder(int paramNum, State state) {
if (codec != null) {
return codec;
}
return null;
}
}

@ -61,7 +61,10 @@ public class JsonJacksonCodec extends BaseCodec {
public static final JsonJacksonCodec INSTANCE = new JsonJacksonCodec(); public static final JsonJacksonCodec INSTANCE = new JsonJacksonCodec();
@JsonIdentityInfo(generator=ObjectIdGenerators.IntSequenceGenerator.class, property="@id") @JsonIdentityInfo(generator=ObjectIdGenerators.IntSequenceGenerator.class, property="@id")
@JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.PUBLIC_ONLY, setterVisibility = Visibility.PUBLIC_ONLY, isGetterVisibility = Visibility.PUBLIC_ONLY) @JsonAutoDetect(fieldVisibility = Visibility.ANY,
getterVisibility = Visibility.PUBLIC_ONLY,
setterVisibility = Visibility.NONE,
isGetterVisibility = Visibility.NONE)
public static class ThrowableMixIn { public static class ThrowableMixIn {
} }

@ -0,0 +1,79 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.rx;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.redisson.api.RFuture;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.reactivex.Flowable;
import io.reactivex.functions.Action;
import io.reactivex.functions.LongConsumer;
import io.reactivex.processors.ReplayProcessor;
/**
*
* @author Nikita Koksharov
*
*/
public class ElementsStream {
public static <V> Flowable<V> takeElements(final Callable<RFuture<V>> callable) {
final ReplayProcessor<V> p = ReplayProcessor.create();
return p.doOnRequest(new LongConsumer() {
@Override
public void accept(long n) throws Exception {
final AtomicLong counter = new AtomicLong(n);
final AtomicReference<RFuture<V>> futureRef = new AtomicReference<RFuture<V>>();
take(callable, p, counter, futureRef);
p.doOnCancel(new Action() {
@Override
public void run() throws Exception {
futureRef.get().cancel(true);
}
});
}
});
}
private static <V> void take(final Callable<RFuture<V>> factory, final ReplayProcessor<V> p, final AtomicLong counter, final AtomicReference<RFuture<V>> futureRef) throws Exception {
RFuture<V> future = factory.call();
futureRef.set(future);
future.addListener(new FutureListener<V>() {
@Override
public void operationComplete(Future<V> future) throws Exception {
if (!future.isSuccess()) {
p.onError(future.cause());
return;
}
p.onNext(future.getNow());
if (counter.decrementAndGet() == 0) {
p.onComplete();
}
take(factory, p, counter, futureRef);
}
});
}
}

@ -38,7 +38,7 @@ public class RedissonBlockingDequeRx<V> extends RedissonBlockingQueueRx<V> {
} }
public Flowable<V> takeFirstElements() { public Flowable<V> takeFirstElements() {
return takeElements(new Callable<RFuture<V>>() { return ElementsStream.takeElements(new Callable<RFuture<V>>() {
@Override @Override
public RFuture<V> call() throws Exception { public RFuture<V> call() throws Exception {
return queue.takeFirstAsync(); return queue.takeFirstAsync();
@ -47,7 +47,7 @@ public class RedissonBlockingDequeRx<V> extends RedissonBlockingQueueRx<V> {
} }
public Flowable<V> takeLastElements() { public Flowable<V> takeLastElements() {
return takeElements(new Callable<RFuture<V>>() { return ElementsStream.takeElements(new Callable<RFuture<V>>() {
@Override @Override
public RFuture<V> call() throws Exception { public RFuture<V> call() throws Exception {
return queue.takeLastAsync(); return queue.takeLastAsync();

@ -16,19 +16,12 @@
package org.redisson.rx; package org.redisson.rx;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.redisson.api.RBlockingQueueAsync; import org.redisson.api.RBlockingQueueAsync;
import org.redisson.api.RFuture; import org.redisson.api.RFuture;
import org.redisson.api.RListAsync; import org.redisson.api.RListAsync;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.reactivex.Flowable; import io.reactivex.Flowable;
import io.reactivex.functions.Action;
import io.reactivex.functions.LongConsumer;
import io.reactivex.processors.ReplayProcessor;
/** /**
* *
@ -46,7 +39,7 @@ public class RedissonBlockingQueueRx<V> extends RedissonListRx<V> {
} }
public Flowable<V> takeElements() { public Flowable<V> takeElements() {
return takeElements(new Callable<RFuture<V>>() { return ElementsStream.takeElements(new Callable<RFuture<V>>() {
@Override @Override
public RFuture<V> call() throws Exception { public RFuture<V> call() throws Exception {
return queue.takeAsync(); return queue.takeAsync();
@ -54,45 +47,4 @@ public class RedissonBlockingQueueRx<V> extends RedissonListRx<V> {
}); });
} }
protected final Flowable<V> takeElements(final Callable<RFuture<V>> callable) {
final ReplayProcessor<V> p = ReplayProcessor.create();
return p.doOnRequest(new LongConsumer() {
@Override
public void accept(long n) throws Exception {
final AtomicLong counter = new AtomicLong(n);
final AtomicReference<RFuture<V>> futureRef = new AtomicReference<RFuture<V>>();
take(callable, p, counter, futureRef);
p.doOnCancel(new Action() {
@Override
public void run() throws Exception {
futureRef.get().cancel(true);
}
});
}
});
}
private void take(final Callable<RFuture<V>> factory, final ReplayProcessor<V> p, final AtomicLong counter, final AtomicReference<RFuture<V>> futureRef) throws Exception {
RFuture<V> future = factory.call();
futureRef.set(future);
future.addListener(new FutureListener<V>() {
@Override
public void operationComplete(Future<V> future) throws Exception {
if (!future.isSuccess()) {
p.onError(future.cause());
return;
}
p.onNext(future.getNow());
if (counter.decrementAndGet() == 0) {
p.onComplete();
}
take(factory, p, counter, futureRef);
}
});
}
} }

@ -15,7 +15,8 @@
*/ */
package org.redisson.rx; package org.redisson.rx;
import org.reactivestreams.Publisher; import java.util.concurrent.Callable;
import org.redisson.RedissonScoredSortedSet; import org.redisson.RedissonScoredSortedSet;
import org.redisson.api.RFuture; import org.redisson.api.RFuture;
import org.redisson.api.RScoredSortedSetAsync; import org.redisson.api.RScoredSortedSetAsync;
@ -47,23 +48,41 @@ public class RedissonScoredSortedSetRx<V> {
}.create(); }.create();
} }
public Flowable<V> takeFirst() {
return ElementsStream.takeElements(new Callable<RFuture<V>>() {
@Override
public RFuture<V> call() throws Exception {
return instance.takeFirstAsync();
}
});
}
public Flowable<V> takeLast() {
return ElementsStream.takeElements(new Callable<RFuture<V>>() {
@Override
public RFuture<V> call() throws Exception {
return instance.takeLastAsync();
}
});
}
public String getName() { public String getName() {
return ((RedissonScoredSortedSet<V>)instance).getName(); return ((RedissonScoredSortedSet<V>)instance).getName();
} }
public Publisher<V> iterator() { public Flowable<V> iterator() {
return scanIteratorReactive(null, 10); return scanIteratorReactive(null, 10);
} }
public Publisher<V> iterator(String pattern) { public Flowable<V> iterator(String pattern) {
return scanIteratorReactive(pattern, 10); return scanIteratorReactive(pattern, 10);
} }
public Publisher<V> iterator(int count) { public Flowable<V> iterator(int count) {
return scanIteratorReactive(null, count); return scanIteratorReactive(null, count);
} }
public Publisher<V> iterator(String pattern, int count) { public Flowable<V> iterator(String pattern, int count) {
return scanIteratorReactive(pattern, count); return scanIteratorReactive(pattern, count);
} }

@ -30,6 +30,21 @@ import org.redisson.client.protocol.ScoredEntry;
public class RedissonScoredSortedSetTest extends BaseTest { public class RedissonScoredSortedSetTest extends BaseTest {
@Test
public void testTakeFirst() {
final RScoredSortedSet<Integer> queue1 = redisson.getScoredSortedSet("queue:pollany");
Executors.newSingleThreadScheduledExecutor().schedule(() -> {
RScoredSortedSet<Integer> queue2 = redisson.getScoredSortedSet("queue:pollany1");
RScoredSortedSet<Integer> queue3 = redisson.getScoredSortedSet("queue:pollany2");
queue1.add(0.1, 1);
}, 3, TimeUnit.SECONDS);
long s = System.currentTimeMillis();
int l = queue1.takeFirst();
Assert.assertEquals(1, l);
Assert.assertTrue(System.currentTimeMillis() - s > 2000);
}
@Test @Test
public void testPollFirstFromAny() throws InterruptedException { public void testPollFirstFromAny() throws InterruptedException {
final RScoredSortedSet<Integer> queue1 = redisson.getScoredSortedSet("queue:pollany"); final RScoredSortedSet<Integer> queue1 = redisson.getScoredSortedSet("queue:pollany");

Loading…
Cancel
Save