From c3d922cf710f9cf195cb1d5b74e7426b0b63b25b Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Tue, 20 Nov 2018 19:03:30 +0300 Subject: [PATCH 1/5] Update README.md --- redisson-all/README.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/redisson-all/README.md b/redisson-all/README.md index 3d734b00c..d5fac5185 100644 --- a/redisson-all/README.md +++ b/redisson-all/README.md @@ -1,5 +1,7 @@ ## Redisson standalone node -Redisson offers ability to run as standalone node and participate in distributed computing. Such standalone nodes could be used to run [ExecutorService](./9.-distributed-services#93-executor-service), [ScheduledExecutorService](https://github.com/mrniko/redisson/wiki/9.-distributed-services#94-scheduled-executor-service) tasks or [RemoteService](./9.-distributed-services#91-remote-service) services. It's just a single jar and could be downloaded from [here](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-all&v=2.3.0&e=jar) +Redisson offers ability to run as standalone node and participate in distributed computing. Such Nodes are used to run [MapReduce](./9.-distributed-services/#95-distributed-mapreduce-service), [ExecutorService](./9.-distributed-services#93-executor-service), [ScheduledExecutorService](https://github.com/mrniko/redisson/wiki/9.-distributed-services#94-scheduled-executor-service) tasks or [RemoteService](./9.-distributed-services#91-remote-service) services. All tasks are kept in Redis until their execution moment. + +Packaged as a single jar and could be [downloaded](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-all&v=3.9.0&e=jar). [Documentation](https://github.com/mrniko/redisson/wiki/12.-Standalone-node) about Redisson standalone node. From 45120859ae394ca246bbf92cba968af11cf11cf7 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Wed, 21 Nov 2018 12:27:06 +0300 Subject: [PATCH 2/5] Feature - takeFirst and takeLast methods added to RScoredSortedSet object --- .../org/redisson/RedissonScoredSortedSet.java | 20 +++++ .../org/redisson/api/RScoredSortedSet.java | 16 +++- .../redisson/api/RScoredSortedSetAsync.java | 14 ++++ .../api/RScoredSortedSetReactive.java | 13 +++ .../org/redisson/api/RScoredSortedSetRx.java | 13 +++ .../ScoredSortedSetPolledObjectDecoder.java | 4 +- .../java/org/redisson/rx/ElementsStream.java | 79 +++++++++++++++++++ .../redisson/rx/RedissonBlockingDequeRx.java | 4 +- .../redisson/rx/RedissonBlockingQueueRx.java | 50 +----------- .../rx/RedissonScoredSortedSetRx.java | 29 +++++-- .../redisson/RedissonScoredSortedSetTest.java | 15 ++++ 11 files changed, 198 insertions(+), 59 deletions(-) create mode 100644 redisson/src/main/java/org/redisson/rx/ElementsStream.java diff --git a/redisson/src/main/java/org/redisson/RedissonScoredSortedSet.java b/redisson/src/main/java/org/redisson/RedissonScoredSortedSet.java index 00459f7dd..0aa341ec5 100644 --- a/redisson/src/main/java/org/redisson/RedissonScoredSortedSet.java +++ b/redisson/src/main/java/org/redisson/RedissonScoredSortedSet.java @@ -1066,4 +1066,24 @@ public class RedissonScoredSortedSet extends RedissonExpirable implements RSc return commandExecutor.readAsync(getName(), codec, RedisCommands.SORT_SET, params.toArray()); } + + @Override + public RFuture takeFirstAsync() { + return pollFirstAsync(0, TimeUnit.SECONDS); + } + + @Override + public RFuture takeLastAsync() { + return pollLastAsync(0, TimeUnit.SECONDS); + } + + @Override + public V takeFirst() { + return get(takeFirstAsync()); + } + + @Override + public V takeLast() { + return get(takeLastAsync()); + } } diff --git a/redisson/src/main/java/org/redisson/api/RScoredSortedSet.java b/redisson/src/main/java/org/redisson/api/RScoredSortedSet.java index d9005de60..d8bbf29fc 100644 --- a/redisson/src/main/java/org/redisson/api/RScoredSortedSet.java +++ b/redisson/src/main/java/org/redisson/api/RScoredSortedSet.java @@ -78,7 +78,21 @@ public interface RScoredSortedSet extends RScoredSortedSetAsync, Iterable< * @return the head element, or {@code null} if all sorted sets are empty */ V pollFirstFromAny(long timeout, TimeUnit unit, String ... queueNames); - + + /** + * Removes and returns the head element waiting if necessary for an element to become available. + * + * @return the head element + */ + V takeFirst(); + + /** + * Removes and returns the tail element waiting if necessary for an element to become available. + * + * @return the tail element + */ + V takeLast(); + /** * Removes and returns the head element or {@code null} if this sorted set is empty. * diff --git a/redisson/src/main/java/org/redisson/api/RScoredSortedSetAsync.java b/redisson/src/main/java/org/redisson/api/RScoredSortedSetAsync.java index a8f481507..804ac5ab3 100644 --- a/redisson/src/main/java/org/redisson/api/RScoredSortedSetAsync.java +++ b/redisson/src/main/java/org/redisson/api/RScoredSortedSetAsync.java @@ -78,6 +78,20 @@ public interface RScoredSortedSetAsync extends RExpirableAsync, RSortableAsyn */ RFuture pollFirstAsync(long timeout, TimeUnit unit); + /** + * Removes and returns the head element waiting if necessary for an element to become available. + * + * @return the head element + */ + RFuture takeFirstAsync(); + + /** + * Removes and returns the tail element waiting if necessary for an element to become available. + * + * @return the tail element + */ + RFuture takeLastAsync(); + /** * Removes and returns the tail element or {@code null} if this sorted set is empty. *

diff --git a/redisson/src/main/java/org/redisson/api/RScoredSortedSetReactive.java b/redisson/src/main/java/org/redisson/api/RScoredSortedSetReactive.java index df8fdb95a..2c59678d9 100644 --- a/redisson/src/main/java/org/redisson/api/RScoredSortedSetReactive.java +++ b/redisson/src/main/java/org/redisson/api/RScoredSortedSetReactive.java @@ -415,5 +415,18 @@ public interface RScoredSortedSetReactive extends RExpirableReactive, RSortab */ Publisher union(Aggregate aggregate, Map nameWithWeight); + /** + * Removes and returns the head element waiting if necessary for an element to become available. + * + * @return the head element + */ + Publisher takeFirst(); + + /** + * Removes and returns the tail element waiting if necessary for an element to become available. + * + * @return the tail element + */ + Publisher takeLast(); } diff --git a/redisson/src/main/java/org/redisson/api/RScoredSortedSetRx.java b/redisson/src/main/java/org/redisson/api/RScoredSortedSetRx.java index c2610bbdb..9703c13ff 100644 --- a/redisson/src/main/java/org/redisson/api/RScoredSortedSetRx.java +++ b/redisson/src/main/java/org/redisson/api/RScoredSortedSetRx.java @@ -416,5 +416,18 @@ public interface RScoredSortedSetRx extends RExpirableRx, RSortableRx> */ Flowable union(Aggregate aggregate, Map nameWithWeight); + /** + * Removes and returns the head element waiting if necessary for an element to become available. + * + * @return the head element + */ + Flowable takeFirst(); + + /** + * Removes and returns the tail element waiting if necessary for an element to become available. + * + * @return the tail element + */ + Flowable takeLast(); } diff --git a/redisson/src/main/java/org/redisson/client/protocol/decoder/ScoredSortedSetPolledObjectDecoder.java b/redisson/src/main/java/org/redisson/client/protocol/decoder/ScoredSortedSetPolledObjectDecoder.java index 98c84a652..29f36ee26 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/decoder/ScoredSortedSetPolledObjectDecoder.java +++ b/redisson/src/main/java/org/redisson/client/protocol/decoder/ScoredSortedSetPolledObjectDecoder.java @@ -32,7 +32,7 @@ public class ScoredSortedSetPolledObjectDecoder implements MultiDecoder @Override public Object decode(List parts, State state) { if (!parts.isEmpty()) { - return parts.get(2); + return parts.get(1); } return null; } @@ -42,7 +42,7 @@ public class ScoredSortedSetPolledObjectDecoder implements MultiDecoder if (paramNum == 0) { return StringCodec.INSTANCE.getValueDecoder(); } - if (paramNum == 1) { + if (paramNum == 2) { return DoubleCodec.INSTANCE.getValueDecoder(); } return null; diff --git a/redisson/src/main/java/org/redisson/rx/ElementsStream.java b/redisson/src/main/java/org/redisson/rx/ElementsStream.java new file mode 100644 index 000000000..95fc51e07 --- /dev/null +++ b/redisson/src/main/java/org/redisson/rx/ElementsStream.java @@ -0,0 +1,79 @@ +/** + * Copyright 2018 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.rx; + +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +import org.redisson.api.RFuture; + +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.FutureListener; +import io.reactivex.Flowable; +import io.reactivex.functions.Action; +import io.reactivex.functions.LongConsumer; +import io.reactivex.processors.ReplayProcessor; + +/** + * + * @author Nikita Koksharov + * + */ +public class ElementsStream { + + public static Flowable takeElements(final Callable> callable) { + final ReplayProcessor p = ReplayProcessor.create(); + return p.doOnRequest(new LongConsumer() { + @Override + public void accept(long n) throws Exception { + final AtomicLong counter = new AtomicLong(n); + final AtomicReference> futureRef = new AtomicReference>(); + + take(callable, p, counter, futureRef); + + p.doOnCancel(new Action() { + @Override + public void run() throws Exception { + futureRef.get().cancel(true); + } + }); + } + }); + } + + private static void take(final Callable> factory, final ReplayProcessor p, final AtomicLong counter, final AtomicReference> futureRef) throws Exception { + RFuture future = factory.call(); + futureRef.set(future); + future.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + p.onError(future.cause()); + return; + } + + p.onNext(future.getNow()); + if (counter.decrementAndGet() == 0) { + p.onComplete(); + } + + take(factory, p, counter, futureRef); + } + }); + } + +} diff --git a/redisson/src/main/java/org/redisson/rx/RedissonBlockingDequeRx.java b/redisson/src/main/java/org/redisson/rx/RedissonBlockingDequeRx.java index d66285b17..d6f1934bd 100644 --- a/redisson/src/main/java/org/redisson/rx/RedissonBlockingDequeRx.java +++ b/redisson/src/main/java/org/redisson/rx/RedissonBlockingDequeRx.java @@ -38,7 +38,7 @@ public class RedissonBlockingDequeRx extends RedissonBlockingQueueRx { } public Flowable takeFirstElements() { - return takeElements(new Callable>() { + return ElementsStream.takeElements(new Callable>() { @Override public RFuture call() throws Exception { return queue.takeFirstAsync(); @@ -47,7 +47,7 @@ public class RedissonBlockingDequeRx extends RedissonBlockingQueueRx { } public Flowable takeLastElements() { - return takeElements(new Callable>() { + return ElementsStream.takeElements(new Callable>() { @Override public RFuture call() throws Exception { return queue.takeLastAsync(); diff --git a/redisson/src/main/java/org/redisson/rx/RedissonBlockingQueueRx.java b/redisson/src/main/java/org/redisson/rx/RedissonBlockingQueueRx.java index 82aaa11d8..05d233aea 100644 --- a/redisson/src/main/java/org/redisson/rx/RedissonBlockingQueueRx.java +++ b/redisson/src/main/java/org/redisson/rx/RedissonBlockingQueueRx.java @@ -16,19 +16,12 @@ package org.redisson.rx; import java.util.concurrent.Callable; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; import org.redisson.api.RBlockingQueueAsync; import org.redisson.api.RFuture; import org.redisson.api.RListAsync; -import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.FutureListener; import io.reactivex.Flowable; -import io.reactivex.functions.Action; -import io.reactivex.functions.LongConsumer; -import io.reactivex.processors.ReplayProcessor; /** * @@ -46,7 +39,7 @@ public class RedissonBlockingQueueRx extends RedissonListRx { } public Flowable takeElements() { - return takeElements(new Callable>() { + return ElementsStream.takeElements(new Callable>() { @Override public RFuture call() throws Exception { return queue.takeAsync(); @@ -54,45 +47,4 @@ public class RedissonBlockingQueueRx extends RedissonListRx { }); } - protected final Flowable takeElements(final Callable> callable) { - final ReplayProcessor p = ReplayProcessor.create(); - return p.doOnRequest(new LongConsumer() { - @Override - public void accept(long n) throws Exception { - final AtomicLong counter = new AtomicLong(n); - final AtomicReference> futureRef = new AtomicReference>(); - - take(callable, p, counter, futureRef); - - p.doOnCancel(new Action() { - @Override - public void run() throws Exception { - futureRef.get().cancel(true); - } - }); - } - }); - } - - private void take(final Callable> factory, final ReplayProcessor p, final AtomicLong counter, final AtomicReference> futureRef) throws Exception { - RFuture future = factory.call(); - futureRef.set(future); - future.addListener(new FutureListener() { - @Override - public void operationComplete(Future future) throws Exception { - if (!future.isSuccess()) { - p.onError(future.cause()); - return; - } - - p.onNext(future.getNow()); - if (counter.decrementAndGet() == 0) { - p.onComplete(); - } - - take(factory, p, counter, futureRef); - } - }); - } - } diff --git a/redisson/src/main/java/org/redisson/rx/RedissonScoredSortedSetRx.java b/redisson/src/main/java/org/redisson/rx/RedissonScoredSortedSetRx.java index ad06baff7..2ce92e155 100644 --- a/redisson/src/main/java/org/redisson/rx/RedissonScoredSortedSetRx.java +++ b/redisson/src/main/java/org/redisson/rx/RedissonScoredSortedSetRx.java @@ -15,7 +15,8 @@ */ package org.redisson.rx; -import org.reactivestreams.Publisher; +import java.util.concurrent.Callable; + import org.redisson.RedissonScoredSortedSet; import org.redisson.api.RFuture; import org.redisson.api.RScoredSortedSetAsync; @@ -47,23 +48,41 @@ public class RedissonScoredSortedSetRx { }.create(); } + public Flowable takeFirst() { + return ElementsStream.takeElements(new Callable>() { + @Override + public RFuture call() throws Exception { + return instance.takeFirstAsync(); + } + }); + } + + public Flowable takeLast() { + return ElementsStream.takeElements(new Callable>() { + @Override + public RFuture call() throws Exception { + return instance.takeLastAsync(); + } + }); + } + public String getName() { return ((RedissonScoredSortedSet)instance).getName(); } - public Publisher iterator() { + public Flowable iterator() { return scanIteratorReactive(null, 10); } - public Publisher iterator(String pattern) { + public Flowable iterator(String pattern) { return scanIteratorReactive(pattern, 10); } - public Publisher iterator(int count) { + public Flowable iterator(int count) { return scanIteratorReactive(null, count); } - public Publisher iterator(String pattern, int count) { + public Flowable iterator(String pattern, int count) { return scanIteratorReactive(pattern, count); } diff --git a/redisson/src/test/java/org/redisson/RedissonScoredSortedSetTest.java b/redisson/src/test/java/org/redisson/RedissonScoredSortedSetTest.java index 2cbcd1403..101297ea8 100644 --- a/redisson/src/test/java/org/redisson/RedissonScoredSortedSetTest.java +++ b/redisson/src/test/java/org/redisson/RedissonScoredSortedSetTest.java @@ -30,6 +30,21 @@ import org.redisson.client.protocol.ScoredEntry; public class RedissonScoredSortedSetTest extends BaseTest { + @Test + public void testTakeFirst() { + final RScoredSortedSet queue1 = redisson.getScoredSortedSet("queue:pollany"); + Executors.newSingleThreadScheduledExecutor().schedule(() -> { + RScoredSortedSet queue2 = redisson.getScoredSortedSet("queue:pollany1"); + RScoredSortedSet queue3 = redisson.getScoredSortedSet("queue:pollany2"); + queue1.add(0.1, 1); + }, 3, TimeUnit.SECONDS); + + long s = System.currentTimeMillis(); + int l = queue1.takeFirst(); + Assert.assertEquals(1, l); + Assert.assertTrue(System.currentTimeMillis() - s > 2000); + } + @Test public void testPollFirstFromAny() throws InterruptedException { final RScoredSortedSet queue1 = redisson.getScoredSortedSet("queue:pollany"); From 7352815440a15b91abee31055485502e41ba62e0 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Wed, 21 Nov 2018 12:27:44 +0300 Subject: [PATCH 3/5] Fixed - Exception serialization by Jackson codec --- .../src/main/java/org/redisson/codec/JsonJacksonCodec.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/redisson/src/main/java/org/redisson/codec/JsonJacksonCodec.java b/redisson/src/main/java/org/redisson/codec/JsonJacksonCodec.java index 59228e686..708787126 100755 --- a/redisson/src/main/java/org/redisson/codec/JsonJacksonCodec.java +++ b/redisson/src/main/java/org/redisson/codec/JsonJacksonCodec.java @@ -61,7 +61,10 @@ public class JsonJacksonCodec extends BaseCodec { public static final JsonJacksonCodec INSTANCE = new JsonJacksonCodec(); @JsonIdentityInfo(generator=ObjectIdGenerators.IntSequenceGenerator.class, property="@id") - @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.PUBLIC_ONLY, setterVisibility = Visibility.PUBLIC_ONLY, isGetterVisibility = Visibility.PUBLIC_ONLY) + @JsonAutoDetect(fieldVisibility = Visibility.ANY, + getterVisibility = Visibility.PUBLIC_ONLY, + setterVisibility = Visibility.NONE, + isGetterVisibility = Visibility.NONE) public static class ThrowableMixIn { } From 12dc23db8d09daedb91376a48a8334f6c6b9020e Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Wed, 21 Nov 2018 12:27:57 +0300 Subject: [PATCH 4/5] refactoring --- .../client/protocol/RedisCommands.java | 34 +++++----- .../decoder/ObjectMapReplayDecoder.java | 31 ++-------- .../decoder/StreamObjectMapReplayDecoder.java | 62 +++++++++++++++++++ 3 files changed, 84 insertions(+), 43 deletions(-) create mode 100644 redisson/src/main/java/org/redisson/client/protocol/decoder/StreamObjectMapReplayDecoder.java diff --git a/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java b/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java index 078e10f30..26a724c1a 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java +++ b/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java @@ -57,7 +57,6 @@ import org.redisson.client.protocol.decoder.MapScanResultReplayDecoder; import org.redisson.client.protocol.decoder.ObjectDecoder; import org.redisson.client.protocol.decoder.ObjectFirstScoreReplayDecoder; import org.redisson.client.protocol.decoder.ObjectListReplayDecoder; -import org.redisson.client.protocol.decoder.ObjectMapDecoder; import org.redisson.client.protocol.decoder.ObjectMapEntryReplayDecoder; import org.redisson.client.protocol.decoder.ObjectMapJoinDecoder; import org.redisson.client.protocol.decoder.ObjectMapReplayDecoder; @@ -71,6 +70,7 @@ import org.redisson.client.protocol.decoder.ScoredSortedSetScanReplayDecoder; import org.redisson.client.protocol.decoder.SlotsDecoder; import org.redisson.client.protocol.decoder.StreamIdDecoder; import org.redisson.client.protocol.decoder.StreamIdListDecoder; +import org.redisson.client.protocol.decoder.StreamObjectMapReplayDecoder; import org.redisson.client.protocol.decoder.StreamResultDecoder; import org.redisson.client.protocol.decoder.StringDataDecoder; import org.redisson.client.protocol.decoder.StringListReplayDecoder; @@ -329,8 +329,8 @@ public interface RedisCommands { RedisCommand>>> XRANGE = new RedisCommand>>>("XRANGE", new ListMultiDecoder( new ObjectDecoder(new StreamIdDecoder()), - new ObjectMapReplayDecoder(), - new ObjectMapReplayDecoder(ListMultiDecoder.RESET), + new StreamObjectMapReplayDecoder(), + new StreamObjectMapReplayDecoder(ListMultiDecoder.RESET), new ObjectMapJoinDecoder())); RedisCommand>>> XREVRANGE = @@ -340,10 +340,10 @@ public interface RedisCommands { new ListMultiDecoder( new ObjectDecoder(StringCodec.INSTANCE.getValueDecoder()), new ObjectDecoder(new StreamIdDecoder()), - new ObjectMapReplayDecoder(), - new ObjectMapReplayDecoder(ListMultiDecoder.RESET_1), + new StreamObjectMapReplayDecoder(), + new StreamObjectMapReplayDecoder(ListMultiDecoder.RESET_1), new ObjectMapJoinDecoder(), - new ObjectMapReplayDecoder(ListMultiDecoder.RESET_INDEX), + new StreamObjectMapReplayDecoder(ListMultiDecoder.RESET_INDEX), new StreamResultDecoder())); RedisCommand>>> XREAD_BLOCKING = new RedisCommand>>>("XREAD", XREAD.getReplayMultiDecoder()); @@ -352,10 +352,10 @@ public interface RedisCommands { new ListMultiDecoder( new ObjectDecoder(StringCodec.INSTANCE.getValueDecoder()), new ObjectDecoder(new StreamIdDecoder()), - new ObjectMapReplayDecoder(), - new ObjectMapReplayDecoder(ListMultiDecoder.RESET_1), + new StreamObjectMapReplayDecoder(), + new StreamObjectMapReplayDecoder(ListMultiDecoder.RESET_1), new ObjectMapJoinDecoder(), - new ObjectMapReplayDecoder(), + new StreamObjectMapReplayDecoder(), new StreamResultDecoder())); RedisCommand>> XREAD_BLOCKING_SINGLE = @@ -365,10 +365,10 @@ public interface RedisCommands { new ListMultiDecoder( new ObjectDecoder(StringCodec.INSTANCE.getValueDecoder()), new ObjectDecoder(new StreamIdDecoder()), - new ObjectMapReplayDecoder(), - new ObjectMapReplayDecoder(ListMultiDecoder.RESET_1), + new StreamObjectMapReplayDecoder(), + new StreamObjectMapReplayDecoder(ListMultiDecoder.RESET_1), new ObjectMapJoinDecoder(), - new ObjectMapReplayDecoder(ListMultiDecoder.RESET_INDEX), + new StreamObjectMapReplayDecoder(ListMultiDecoder.RESET_INDEX), new StreamResultDecoder())); RedisCommand>>> XREADGROUP_BLOCKING = @@ -378,10 +378,10 @@ public interface RedisCommands { new ListMultiDecoder( new ObjectDecoder(StringCodec.INSTANCE.getValueDecoder()), new ObjectDecoder(new StreamIdDecoder()), - new ObjectMapReplayDecoder(), - new ObjectMapReplayDecoder(ListMultiDecoder.RESET_1), + new StreamObjectMapReplayDecoder(), + new StreamObjectMapReplayDecoder(ListMultiDecoder.RESET_1), new ObjectMapJoinDecoder(), - new ObjectMapReplayDecoder(), + new StreamObjectMapReplayDecoder(), new StreamResultDecoder())); RedisCommand> XCLAIM_IDS = new RedisCommand>("XCLAIM", new StreamIdListDecoder()); @@ -389,8 +389,8 @@ public interface RedisCommands { RedisCommand>> XCLAIM = new RedisCommand>>("XCLAIM", new ListMultiDecoder( new ObjectDecoder(new StreamIdDecoder()), - new ObjectMapReplayDecoder(), - new ObjectMapReplayDecoder(ListMultiDecoder.RESET), + new StreamObjectMapReplayDecoder(), + new StreamObjectMapReplayDecoder(ListMultiDecoder.RESET), new ObjectMapJoinDecoder())); RedisCommand>> XREADGROUP_BLOCKING_SINGLE = new RedisCommand>>("XREADGROUP", diff --git a/redisson/src/main/java/org/redisson/client/protocol/decoder/ObjectMapReplayDecoder.java b/redisson/src/main/java/org/redisson/client/protocol/decoder/ObjectMapReplayDecoder.java index 10cde48de..7880b9798 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/decoder/ObjectMapReplayDecoder.java +++ b/redisson/src/main/java/org/redisson/client/protocol/decoder/ObjectMapReplayDecoder.java @@ -29,40 +29,19 @@ import org.redisson.client.protocol.Decoder; */ public class ObjectMapReplayDecoder implements MultiDecoder> { - private Decoder codec; - - public ObjectMapReplayDecoder() { - } - - public ObjectMapReplayDecoder(Decoder codec) { - super(); - this.codec = codec; - } - @Override public Map decode(List parts, State state) { - if (parts.get(0) instanceof Map) { - Map result = new LinkedHashMap(parts.size()); - for (int i = 0; i < parts.size(); i++) { - result.putAll((Map) parts.get(i)); + Map result = new LinkedHashMap(parts.size()/2); + for (int i = 0; i < parts.size(); i++) { + if (i % 2 != 0) { + result.put(parts.get(i-1), parts.get(i)); } - return result; - } else { - Map result = new LinkedHashMap(parts.size()/2); - for (int i = 0; i < parts.size(); i++) { - if (i % 2 != 0) { - result.put(parts.get(i-1), parts.get(i)); - } - } - return result; } + return result; } @Override public Decoder getDecoder(int paramNum, State state) { - if (codec != null) { - return codec; - } return null; } diff --git a/redisson/src/main/java/org/redisson/client/protocol/decoder/StreamObjectMapReplayDecoder.java b/redisson/src/main/java/org/redisson/client/protocol/decoder/StreamObjectMapReplayDecoder.java new file mode 100644 index 000000000..c0c67b16e --- /dev/null +++ b/redisson/src/main/java/org/redisson/client/protocol/decoder/StreamObjectMapReplayDecoder.java @@ -0,0 +1,62 @@ +/** + * Copyright 2018 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.client.protocol.decoder; + +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +import org.redisson.client.handler.State; +import org.redisson.client.protocol.Decoder; + +/** + * + * @author Nikita Koksharov + * + */ +public class StreamObjectMapReplayDecoder extends ObjectMapReplayDecoder { + + private Decoder codec; + + public StreamObjectMapReplayDecoder() { + } + + public StreamObjectMapReplayDecoder(Decoder codec) { + super(); + this.codec = codec; + } + + @Override + public Map decode(List parts, State state) { + if (parts.get(0) instanceof Map) { + Map result = new LinkedHashMap(parts.size()); + for (int i = 0; i < parts.size(); i++) { + result.putAll((Map) parts.get(i)); + } + return result; + } + return super.decode(parts, state); + } + + @Override + public Decoder getDecoder(int paramNum, State state) { + if (codec != null) { + return codec; + } + return null; + } + +} From d1ce467f8efcefe62866736377cc46e1952914c7 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Wed, 21 Nov 2018 12:28:06 +0300 Subject: [PATCH 5/5] javadocs fixed --- .../main/java/org/redisson/api/RBlockingDequeRx.java | 10 ++++++---- .../main/java/org/redisson/api/RBlockingQueueRx.java | 6 +++--- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/redisson/src/main/java/org/redisson/api/RBlockingDequeRx.java b/redisson/src/main/java/org/redisson/api/RBlockingDequeRx.java index 73b9d58a0..9ab520301 100644 --- a/redisson/src/main/java/org/redisson/api/RBlockingDequeRx.java +++ b/redisson/src/main/java/org/redisson/api/RBlockingDequeRx.java @@ -112,16 +112,18 @@ public interface RBlockingDequeRx extends RDequeRx, RBlockingQueueRx { Flowable takeFirst(); /** - * Retrieves and removes stream of elements from the head of this queue. Waits for an element become available. + * Retrieves and removes continues stream of elements from the head of this queue. + * Waits for next element become available. * - * @return the head element of this queue + * @return stream of head elements */ Flowable takeFirstElements(); /** - * Retrieves and removes stream of elements from the tail of this queue. Waits for an element become available. + * Retrieves and removes continues stream of elements from the tail of this queue. + * Waits for next element become available. * - * @return the head element of this queue + * @return stream of tail elements */ Flowable takeLastElements(); diff --git a/redisson/src/main/java/org/redisson/api/RBlockingQueueRx.java b/redisson/src/main/java/org/redisson/api/RBlockingQueueRx.java index f23fac4b2..a688c1fa2 100644 --- a/redisson/src/main/java/org/redisson/api/RBlockingQueueRx.java +++ b/redisson/src/main/java/org/redisson/api/RBlockingQueueRx.java @@ -155,10 +155,10 @@ public interface RBlockingQueueRx extends RQueueRx { Flowable put(V e); /** - * Retrieves and removes stream of elements from the head of this queue. - * Waits for an element become available. + * Retrieves and removes continues stream of elements from the head of this queue. + * Waits for next element become available. * - * @return stream of messages + * @return stream of elements */ Flowable takeElements();