From 61d7eac98ed21bf416b9b1636179d1b03237b205 Mon Sep 17 00:00:00 2001 From: Nikita Date: Tue, 19 Jun 2018 16:51:54 +0300 Subject: [PATCH] Redis Stream implementation. Add, Read and Range methods added #1490 --- .../src/main/java/org/redisson/Redisson.java | 11 + .../main/java/org/redisson/RedissonBatch.java | 11 + .../java/org/redisson/RedissonStream.java | 409 ++++++++++++++++++ .../main/java/org/redisson/api/RBatch.java | 22 + .../main/java/org/redisson/api/RStream.java | 251 +++++++++++ .../java/org/redisson/api/RStreamAsync.java | 255 +++++++++++ .../java/org/redisson/api/RedissonClient.java | 28 ++ .../main/java/org/redisson/api/StreamId.java | 115 +++++ .../redisson/client/protocol/CommandData.java | 4 +- .../client/protocol/RedisCommands.java | 36 +- .../protocol/convertor/StreamIdConvertor.java | 33 ++ .../protocol/decoder/ListMultiDecoder.java | 36 +- .../decoder/ObjectMapJoinDecoder.java | 46 ++ .../decoder/ObjectMapReplayDecoder.java | 27 +- .../protocol/decoder/StreamResultDecoder.java | 47 ++ .../redisson/command/CommandAsyncService.java | 22 +- .../java/org/redisson/RedissonStreamTest.java | 244 +++++++++++ 17 files changed, 1588 insertions(+), 9 deletions(-) create mode 100644 redisson/src/main/java/org/redisson/RedissonStream.java create mode 100644 redisson/src/main/java/org/redisson/api/RStream.java create mode 100644 redisson/src/main/java/org/redisson/api/RStreamAsync.java create mode 100644 redisson/src/main/java/org/redisson/api/StreamId.java create mode 100644 redisson/src/main/java/org/redisson/client/protocol/convertor/StreamIdConvertor.java create mode 100644 redisson/src/main/java/org/redisson/client/protocol/decoder/ObjectMapJoinDecoder.java create mode 100644 redisson/src/main/java/org/redisson/client/protocol/decoder/StreamResultDecoder.java create mode 100644 redisson/src/test/java/org/redisson/RedissonStreamTest.java diff --git a/redisson/src/main/java/org/redisson/Redisson.java b/redisson/src/main/java/org/redisson/Redisson.java index 9b1751952..efb1ae4e7 100755 --- a/redisson/src/main/java/org/redisson/Redisson.java +++ b/redisson/src/main/java/org/redisson/Redisson.java @@ -72,6 +72,7 @@ import org.redisson.api.RSetCache; import org.redisson.api.RSetMultimap; import org.redisson.api.RSetMultimapCache; import org.redisson.api.RSortedSet; +import org.redisson.api.RStream; import org.redisson.api.RTopic; import org.redisson.api.RTransaction; import org.redisson.api.RedissonClient; @@ -190,6 +191,16 @@ public class Redisson implements RedissonClient { return react; } + @Override + public RStream getStream(String name) { + return new RedissonStream(connectionManager.getCommandExecutor(), name); + } + + @Override + public RStream getStream(String name, Codec codec) { + return new RedissonStream(codec, connectionManager.getCommandExecutor(), name); + } + @Override public RBinaryStream getBinaryStream(String name) { return new RedissonBinaryStream(connectionManager.getCommandExecutor(), name); diff --git a/redisson/src/main/java/org/redisson/RedissonBatch.java b/redisson/src/main/java/org/redisson/RedissonBatch.java index bd87bec7d..773c0c0d7 100644 --- a/redisson/src/main/java/org/redisson/RedissonBatch.java +++ b/redisson/src/main/java/org/redisson/RedissonBatch.java @@ -42,6 +42,7 @@ import org.redisson.api.RScoredSortedSetAsync; import org.redisson.api.RScriptAsync; import org.redisson.api.RSetAsync; import org.redisson.api.RSetCacheAsync; +import org.redisson.api.RStreamAsync; import org.redisson.api.RTopicAsync; import org.redisson.client.codec.Codec; import org.redisson.command.CommandBatchService; @@ -326,4 +327,14 @@ public class RedissonBatch implements RBatch { this.executorService.enableRedissonReferenceSupport(redisson); } + @Override + public RStreamAsync getStream(String name) { + return new RedissonStream(executorService, name); + } + + @Override + public RStreamAsync getStream(String name, Codec codec) { + return new RedissonStream(codec, executorService, name); + } + } diff --git a/redisson/src/main/java/org/redisson/RedissonStream.java b/redisson/src/main/java/org/redisson/RedissonStream.java new file mode 100644 index 000000000..1975ab5b8 --- /dev/null +++ b/redisson/src/main/java/org/redisson/RedissonStream.java @@ -0,0 +1,409 @@ +/** + * Copyright 2018 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.redisson.api.RFuture; +import org.redisson.api.RStream; +import org.redisson.api.StreamId; +import org.redisson.client.codec.Codec; +import org.redisson.client.codec.StringCodec; +import org.redisson.client.protocol.RedisCommands; +import org.redisson.command.CommandAsyncExecutor; + +/** + * + * @author Nikita Koksharov + * + * @param key type + * @param value type + */ +public class RedissonStream extends RedissonExpirable implements RStream { + + public RedissonStream(Codec codec, CommandAsyncExecutor connectionManager, String name) { + super(codec, connectionManager, name); + } + + public RedissonStream(CommandAsyncExecutor connectionManager, String name) { + super(connectionManager, name); + } + + protected void checkKey(Object key) { + if (key == null) { + throw new NullPointerException("key can't be null"); + } + } + + protected void checkValue(Object value) { + if (value == null) { + throw new NullPointerException("value can't be null"); + } + } + + @Override + public StreamId addAll(Map entries) { + return addAll(entries, 0, false); + } + + @Override + public RFuture addAllAsync(Map entries) { + return addAllAsync(entries, 0, false); + } + + @Override + public void addAll(StreamId id, Map entries) { + addAll(id, entries, 0, false); + } + + @Override + public RFuture addAllAsync(StreamId id, Map entries) { + return addAllAsync(id, entries, 0, false); + } + + @Override + public StreamId addAll(Map entries, int trimLen, boolean trimStrict) { + return get(addAllAsync(entries, trimLen, trimStrict)); + } + + @Override + public RFuture addAllAsync(Map entries, int trimLen, boolean trimStrict) { + return addAllCustomAsync(null, entries, trimLen, trimStrict); + } + + @Override + public void addAll(StreamId id, Map entries, int trimLen, boolean trimStrict) { + get(addAllAsync(id, entries, trimLen, trimStrict)); + } + + private RFuture addAllCustomAsync(StreamId id, Map entries, int trimLen, boolean trimStrict) { + List params = new ArrayList(entries.size()*2 + 1); + params.add(getName()); + + if (trimLen > 0) { + params.add("MAXLEN"); + if (!trimStrict) { + params.add("~"); + } + params.add(trimLen); + } + + if (id == null) { + params.add("*"); + } else { + params.add(id.toString()); + } + + for (java.util.Map.Entry t : entries.entrySet()) { + checkKey(t.getKey()); + checkValue(t.getValue()); + + params.add(encodeMapKey(t.getKey())); + params.add(encodeMapValue(t.getValue())); + } + + if (id == null) { + return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, RedisCommands.XADD, params.toArray()); + } + return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, RedisCommands.XADD_VOID, params.toArray()); + } + + @Override + public RFuture addAllAsync(StreamId id, Map entries, int trimLen, boolean trimStrict) { + return addAllCustomAsync(id, entries, trimLen, trimStrict); + } + + @Override + public long size() { + return get(sizeAsync()); + } + + @Override + public RFuture sizeAsync() { + return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, RedisCommands.XLEN, getName()); + } + + @Override + public Map> read(int count, StreamId ... ids) { + return get(readAsync(count, ids)); + } + + @Override + public RFuture>>> readAsync(int count, Collection keys, StreamId ... ids) { + return readAsync(count, -1, null, keys, ids); + } + + @Override + public Map> read(int count, long timeout, TimeUnit unit, StreamId... ids) { + return get(readAsync(count, timeout, unit, ids)); + } + + @Override + public RFuture>>> readAsync(int count, long timeout, TimeUnit unit, Collection keys, StreamId... ids) { + if (keys.size() + 1 != ids.length) { + throw new IllegalArgumentException("keys amount should be lower by one than ids amount"); + } + + List params = new ArrayList(); + if (count > 0) { + params.add("COUNT"); + params.add(count); + } + + if (timeout > 0) { + params.add("BLOCK"); + params.add(toSeconds(timeout, unit)*1000); + } + + params.add("STREAMS"); + params.add(getName()); + if (keys != null) { + for (String key : keys) { + params.add(key); + } + } + + for (StreamId id : ids) { + params.add(id.toString()); + } + + if (timeout > 0) { + return commandExecutor.readAsync(getName(), codec, RedisCommands.XREAD_BLOCKING, params.toArray()); + } + return commandExecutor.readAsync(getName(), codec, RedisCommands.XREAD, params.toArray()); + } + + @Override + public RFuture addAsync(K key, V value) { + return addAsync(key, value, 0, false); + } + + @Override + public RFuture addAsync(StreamId id, K key, V value) { + return addAsync(id, key, value, 0, false); + } + + @Override + public RFuture addAsync(K key, V value, int trimLen, boolean trimStrict) { + return addCustomAsync(null, key, value, trimLen, trimStrict); + } + + private RFuture addCustomAsync(StreamId id, K key, V value, int trimLen, boolean trimStrict) { + List params = new LinkedList(); + params.add(getName()); + + if (trimLen > 0) { + params.add("MAXLEN"); + if (!trimStrict) { + params.add("~"); + } + params.add(trimLen); + } + + if (id == null) { + params.add("*"); + } else { + params.add(id.toString()); + } + + checkKey(key); + checkValue(value); + + params.add(encodeMapKey(key)); + params.add(encodeMapValue(value)); + + if (id == null) { + return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, RedisCommands.XADD, params.toArray()); + } + return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, RedisCommands.XADD_VOID, params.toArray()); + } + + @Override + public RFuture addAsync(StreamId id, K key, V value, int trimLen, boolean trimStrict) { + return addCustomAsync(id, key, value, trimLen, trimStrict); + } + + @Override + public StreamId add(K key, V value) { + return get(addAsync(key, value)); + } + + @Override + public void add(StreamId id, K key, V value) { + get(addAsync(id, key, value)); + } + + @Override + public StreamId add(K key, V value, int trimLen, boolean trimStrict) { + return get(addAsync(key, value, trimLen, trimStrict)); + } + + @Override + public void add(StreamId id, K key, V value, int trimLen, boolean trimStrict) { + get(addAsync(id, key, value, trimLen, trimStrict)); + } + + @Override + public RFuture>> readAsync(int count, StreamId... ids) { + return readAsync(count, 0, null, ids); + } + + @Override + public RFuture>> readAsync(int count, long timeout, TimeUnit unit, + StreamId... ids) { + List params = new ArrayList(); + if (count > 0) { + params.add("COUNT"); + params.add(count); + } + + if (timeout > 0) { + params.add("BLOCK"); + params.add(toSeconds(timeout, unit)*1000); + } + + params.add("STREAMS"); + params.add(getName()); + + for (StreamId id : ids) { + params.add(id.toString()); + } + + if (timeout > 0) { + return commandExecutor.readAsync(getName(), codec, RedisCommands.XREAD_BLOCKING_SINGLE, params.toArray()); + } + return commandExecutor.readAsync(getName(), codec, RedisCommands.XREAD_SINGLE, params.toArray()); + } + + @Override + public Map>> read(int count, Collection keys, StreamId... ids) { + return get(readAsync(count, keys, ids)); + } + + @Override + public Map>> read(int count, long timeout, TimeUnit unit, Collection keys, + StreamId... ids) { + return get(readAsync(count, timeout, unit, keys, ids)); + } + + @Override + public RFuture>> rangeAsync(int count, StreamId startId, StreamId endId) { + List params = new LinkedList(); + params.add(getName()); + params.add(startId); + params.add(endId); + + if (count > 0) { + params.add("COUNT"); + params.add(count); + } + + return commandExecutor.readAsync(getName(), codec, RedisCommands.XRANGE, params.toArray()); + } + + @Override + public Map> range(int count, StreamId startId, StreamId endId) { + return get(rangeAsync(count, startId, endId)); + } + + @Override + public RFuture>> rangeReversedAsync(int count, StreamId startId, StreamId endId) { + List params = new LinkedList(); + params.add(getName()); + params.add(startId); + params.add(endId); + + if (count > 0) { + params.add("COUNT"); + params.add(count); + } + + return commandExecutor.readAsync(getName(), codec, RedisCommands.XREVRANGE, params.toArray()); + } + + @Override + public Map> rangeReversed(int count, StreamId startId, StreamId endId) { + return get(rangeReversedAsync(count, startId, endId)); + } + + @Override + public RFuture>> readAsync(StreamId... ids) { + return readAsync(0, ids); + } + + @Override + public RFuture>> readAsync(long timeout, TimeUnit unit, StreamId... ids) { + return readAsync(0, timeout, unit, ids); + } + + @Override + public RFuture>>> readAsync(Collection keys, StreamId... ids) { + return readAsync(0, keys, ids); + } + + @Override + public RFuture>>> readAsync(long timeout, TimeUnit unit, + Collection keys, StreamId... ids) { + return readAsync(0, timeout, unit, keys, ids); + } + + @Override + public RFuture>> rangeAsync(StreamId startId, StreamId endId) { + return rangeAsync(0, startId, endId); + } + + @Override + public RFuture>> rangeReversedAsync(StreamId startId, StreamId endId) { + return rangeReversedAsync(0, startId, endId); + } + + @Override + public Map> read(StreamId... ids) { + return read(0, ids); + } + + @Override + public Map> read(long timeout, TimeUnit unit, StreamId... ids) { + return read(0, timeout, unit, ids); + } + + @Override + public Map>> read(Collection keys, StreamId... ids) { + return read(0, keys, ids); + } + + @Override + public Map>> read(long timeout, TimeUnit unit, Collection keys, + StreamId... ids) { + return read(0, timeout, unit, keys, ids); + } + + @Override + public Map> range(StreamId startId, StreamId endId) { + return range(0, startId, endId); + } + + @Override + public Map> rangeReversed(StreamId startId, StreamId endId) { + return rangeReversed(0, startId, endId); + } + +} diff --git a/redisson/src/main/java/org/redisson/api/RBatch.java b/redisson/src/main/java/org/redisson/api/RBatch.java index 052635d03..42cfdb0fa 100644 --- a/redisson/src/main/java/org/redisson/api/RBatch.java +++ b/redisson/src/main/java/org/redisson/api/RBatch.java @@ -33,6 +33,28 @@ import org.redisson.client.codec.Codec; */ public interface RBatch { + /** + * Returns stream instance by name + * + * @param type of key + * @param type of value + * @param name of stream + * @return RStream object + */ + RStreamAsync getStream(String name); + + /** + * Returns stream instance by name + * using provided codec for entries. + * + * @param type of key + * @param type of value + * @param name - name of stream + * @param codec - codec for entry + * @return RStream object + */ + RStreamAsync getStream(String name, Codec codec); + /** * Returns geospatial items holder instance by name. * diff --git a/redisson/src/main/java/org/redisson/api/RStream.java b/redisson/src/main/java/org/redisson/api/RStream.java new file mode 100644 index 000000000..471d8a546 --- /dev/null +++ b/redisson/src/main/java/org/redisson/api/RStream.java @@ -0,0 +1,251 @@ +/** + * Copyright 2018 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.api; + +import java.util.Collection; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +/** + * Redis Stream implementation. + *

+ * Requires Redis 5.0.0 and higher. + * + * @author Nikita Koksharov + * + * @param key type + * @param value type + */ +public interface RStream extends RStreamAsync, RExpirable { + + /** + * Returns number of entries in stream + * + * @return size of stream + */ + long size(); + + /** + * Appends a new entry and returns generated Stream ID + * + * @param key - key of entry + * @param value - value of entry + * @return Stream ID + */ + StreamId add(K key, V value); + + /** + * Appends a new entry by specified Stream ID + * + * @param id - Stream ID + * @param key - key of entry + * @param value - value of entry + */ + void add(StreamId id, K key, V value); + + /** + * Appends a new entry and returns generated Stream ID. + * Trims stream to a specified trimLen size. + * If trimStrict is false then trims to few tens of entries more than specified length to trim. + * + * @param key - key of entry + * @param value - value of entry + * @param trimLen - length to trim + * @param trimStrict - if false then trims to few tens of entries more than specified length to trim + * @return Stream ID + */ + StreamId add(K key, V value, int trimLen, boolean trimStrict); + + /** + * Appends a new entry by specified Stream ID. + * Trims stream to a specified trimLen size. + * If trimStrict is false then trims to few tens of entries more than specified length to trim. + * + * @param id - Stream ID + * @param key - key of entry + * @param value - value of entry + * @param trimLen - length to trim + * @param trimStrict - if false then trims to few tens of entries more than specified length to trim + */ + void add(StreamId id, K key, V value, int trimLen, boolean trimStrict); + + /** + * Appends new entries and returns generated Stream ID + * + * @param entries - entries to add + * @return Stream ID + */ + StreamId addAll(Map entries); + + /** + * Appends new entries by specified Stream ID + * + * @param id - Stream ID + * @param entries - entries to add + */ + void addAll(StreamId id, Map entries); + + /** + * Appends new entries and returns generated Stream ID. + * Trims stream to a specified trimLen size. + * If trimStrict is false then trims to few tens of entries more than specified length to trim. + * + * @param entries - entries to add + * @param trimLen - length to trim + * @param trimStrict - if false then trims to few tens of entries more than specified length to trim + * @return Stream ID + */ + StreamId addAll(Map entries, int trimLen, boolean trimStrict); + + /** + * Appends new entries by specified Stream ID. + * Trims stream to a specified trimLen size. + * If trimStrict is false then trims to few tens of entries more than specified length to trim. + * + * @param id - Stream ID + * @param entries - entries to add + * @param trimLen - length to trim + * @param trimStrict - if false then trims to few tens of entries more than specified length to trim + */ + void addAll(StreamId id, Map entries, int trimLen, boolean trimStrict); + + /** + * Read stream data by specified collection of Stream IDs. + * + * @param ids - collection of Stream IDs + * @return stream data mapped by Stream ID + */ + Map> read(StreamId ... ids); + + /** + * Read stream data by specified collection of Stream IDs. + * + * @param count - stream data size limit + * @param ids - collection of Stream IDs + * @return stream data mapped by Stream ID + */ + Map> read(int count, StreamId ... ids); + + /** + * Read stream data by specified collection of Stream IDs. + * Wait for stream data availability for specified timeout interval. + * + * @param timeout - time interval to wait for stream data availability + * @param unit - time interval unit + * @param ids - collection of Stream IDs + * @return stream data mapped by Stream ID + */ + Map> read(long timeout, TimeUnit unit, StreamId ... ids); + + /** + * Read stream data by specified collection of Stream IDs. + * Wait for stream data availability for specified timeout interval. + * + * @param count - stream data size limit + * @param timeout - time interval to wait for stream data availability + * @param unit - time interval unit + * @param ids - collection of Stream IDs + * @return stream data mapped by Stream ID + */ + Map> read(int count, long timeout, TimeUnit unit, StreamId ... ids); + + /** + * Read stream data by specified collection of keys including this stream and Stream ID per key. + * First Stream ID is related to this stream. + * + * @param keys - collection of keys + * @param ids - collection of Stream IDs + * @return stream data mapped by key and Stream ID + */ + Map>> read(Collection keys, StreamId ... ids); + + /** + * Read stream data by specified collection of keys including this stream and Stream ID per key. + * First Stream ID is related to this stream. + * + * @param count - stream data size limit + * @param keys - collection of keys + * @param ids - collection of Stream IDs + * @return stream data mapped by key and Stream ID + */ + Map>> read(int count, Collection keys, StreamId ... ids); + + /** + * Read stream data by specified collection of keys including this stream and Stream ID per key. + * First Stream ID is related to this stream. + * Wait for first stream data availability for specified timeout interval. + * + * @param timeout - time interval to wait for stream data availability + * @param unit - time interval unit + * @param keys - collection of keys + * @param ids - collection of Stream IDs + * @return stream data mapped by key and Stream ID + */ + Map>> read(long timeout, TimeUnit unit, Collection keys, StreamId ... ids); + + /** + * Read stream data by specified collection of keys including this stream and Stream ID per key. + * First Stream ID is related to this stream. + * Wait for first stream data availability for specified timeout interval. + * + * @param count - stream data size limit + * @param timeout - time interval to wait for stream data availability + * @param unit - time interval unit + * @param keys - collection of keys + * @param ids - collection of Stream IDs + * @return stream data mapped by key and Stream ID + */ + Map>> read(int count, long timeout, TimeUnit unit, Collection keys, StreamId ... ids); + + /** + * Read stream data in range by specified start Stream ID (included) and end Stream ID (included). + * + * @param startId - start Stream ID + * @param endId - end Stream ID + * @return stream data mapped by Stream ID + */ + Map> range(StreamId startId, StreamId endId); + + /** + * Read stream data in range by specified start Stream ID (included) and end Stream ID (included). + * + * @param count - stream data size limit + * @param startId - start Stream ID + * @param endId - end Stream ID + * @return stream data mapped by Stream ID + */ + Map> range(int count, StreamId startId, StreamId endId); + + /** + * Read stream data in reverse order in range by specified start Stream ID (included) and end Stream ID (included). + * + * @param startId - start Stream ID + * @param endId - end Stream ID + * @return stream data mapped by Stream ID + */ + Map> rangeReversed(StreamId startId, StreamId endId); + + /** + * Read stream data in reverse order in range by specified start Stream ID (included) and end Stream ID (included). + * + * @param count - stream data size limit + * @param startId - start Stream ID + * @param endId - end Stream ID + * @return stream data mapped by Stream ID + */ + Map> rangeReversed(int count, StreamId startId, StreamId endId); + +} diff --git a/redisson/src/main/java/org/redisson/api/RStreamAsync.java b/redisson/src/main/java/org/redisson/api/RStreamAsync.java new file mode 100644 index 000000000..bd8a53a13 --- /dev/null +++ b/redisson/src/main/java/org/redisson/api/RStreamAsync.java @@ -0,0 +1,255 @@ +/** + * Copyright 2018 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.api; + +import java.util.Collection; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +/** + * Redis Stream implementation. + *

+ * Requires Redis 5.0.0 and higher. + * + * @author Nikita Koksharov + * + * @param key type + * @param value type + */ +public interface RStreamAsync extends RExpirableAsync { + + /** + * Returns number of entries in stream + * + * @return size of stream + */ + RFuture sizeAsync(); + + /** + * Appends a new entry and returns generated Stream ID + * + * @param key - key of entry + * @param value - value of entry + * @return Stream ID + */ + RFuture addAsync(K key, V value); + + /** + * Appends a new entry by specified Stream ID + * + * @param id - Stream ID + * @param key - key of entry + * @param value - value of entry + * @return void + */ + RFuture addAsync(StreamId id, K key, V value); + + /** + * Appends a new entry and returns generated Stream ID. + * Trims stream to a specified trimLen size. + * If trimStrict is false then trims to few tens of entries more than specified length to trim. + * + * @param key - key of entry + * @param value - value of entry + * @param trimLen - length to trim + * @param trimStrict - if false then trims to few tens of entries more than specified length to trim + * @return Stream ID + */ + RFuture addAsync(K key, V value, int trimLen, boolean trimStrict); + + /** + * Appends a new entry by specified Stream ID. + * Trims stream to a specified trimLen size. + * If trimStrict is false then trims to few tens of entries more than specified length to trim. + * + * @param id - Stream ID + * @param key - key of entry + * @param value - value of entry + * @param trimLen - length to trim + * @param trimStrict - if false then trims to few tens of entries more than specified length to trim + * @return void + */ + RFuture addAsync(StreamId id, K key, V value, int trimLen, boolean trimStrict); + + /** + * Appends new entries and returns generated Stream ID + * + * @param entries - entries to add + * @return Stream ID + */ + RFuture addAllAsync(Map entries); + + /** + * Appends new entries by specified Stream ID + * + * @param id - Stream ID + * @param entries - entries to add + * @return void + */ + RFuture addAllAsync(StreamId id, Map entries); + + /** + * Appends new entries and returns generated Stream ID. + * Trims stream to a specified trimLen size. + * If trimStrict is false then trims to few tens of entries more than specified length to trim. + * + * @param entries - entries to add + * @param trimLen - length to trim + * @param trimStrict - if false then trims to few tens of entries more than specified length to trim + * @return Stream ID + */ + RFuture addAllAsync(Map entries, int trimLen, boolean trimStrict); + + /** + * Appends new entries by specified Stream ID. + * Trims stream to a specified trimLen size. + * If trimStrict is false then trims to few tens of entries more than specified length to trim. + * + * @param id - Stream ID + * @param entries - entries to add + * @param trimLen - length to trim + * @param trimStrict - if false then trims to few tens of entries more than specified length to trim + * @return void + */ + RFuture addAllAsync(StreamId id, Map entries, int trimLen, boolean trimStrict); + + /** + * Read stream data by specified collection of Stream IDs. + * + * @param ids - collection of Stream IDs + * @return stream data mapped by Stream ID + */ + RFuture>> readAsync(StreamId ... ids); + + /** + * Read stream data by specified collection of Stream IDs. + * + * @param count - stream data size limit + * @param ids - collection of Stream IDs + * @return stream data mapped by Stream ID + */ + RFuture>> readAsync(int count, StreamId ... ids); + + /** + * Read stream data by specified collection of Stream IDs. + * Wait for stream data availability for specified timeout interval. + * + * @param timeout - time interval to wait for stream data availability + * @param unit - time interval unit + * @param ids - collection of Stream IDs + * @return stream data mapped by Stream ID + */ + RFuture>> readAsync(long timeout, TimeUnit unit, StreamId ... ids); + + /** + * Read stream data by specified collection of Stream IDs. + * Wait for stream data availability for specified timeout interval. + * + * @param count - stream data size limit + * @param timeout - time interval to wait for stream data availability + * @param unit - time interval unit + * @param ids - collection of Stream IDs + * @return stream data mapped by Stream ID + */ + RFuture>> readAsync(int count, long timeout, TimeUnit unit, StreamId ... ids); + + /** + * Read stream data by specified collection of keys including this stream and Stream ID per key. + * First Stream ID is related to this stream. + * + * @param keys - collection of keys + * @param ids - collection of Stream IDs + * @return stream data mapped by key and Stream ID + */ + RFuture>>> readAsync(Collection keys, StreamId ... ids); + + /** + * Read stream data by specified collection of keys including this stream and Stream ID per key. + * First Stream ID is related to this stream. + * + * @param count - stream data size limit + * @param keys - collection of keys + * @param ids - collection of Stream IDs + * @return stream data mapped by key and Stream ID + */ + RFuture>>> readAsync(int count, Collection keys, StreamId ... ids); + + /** + * Read stream data by specified collection of keys including this stream and Stream ID per key. + * First Stream ID is related to this stream. + * Wait for first stream data availability for specified timeout interval. + * + * @param timeout - time interval to wait for stream data availability + * @param unit - time interval unit + * @param keys - collection of keys + * @param ids - collection of Stream IDs + * @return stream data mapped by key and Stream ID + */ + RFuture>>> readAsync(long timeout, TimeUnit unit, Collection keys, StreamId ... ids); + + /** + * Read stream data by specified collection of keys including this stream and Stream ID per key. + * First Stream ID is related to this stream. + * Wait for first stream data availability for specified timeout interval. + * + * @param count - stream data size limit + * @param timeout - time interval to wait for stream data availability + * @param unit - time interval unit + * @param keys - collection of keys + * @param ids - collection of Stream IDs + * @return stream data mapped by key and Stream ID + */ + RFuture>>> readAsync(int count, long timeout, TimeUnit unit, Collection keys, StreamId ... ids); + + /** + * Read stream data in range by specified start Stream ID (included) and end Stream ID (included). + * + * @param startId - start Stream ID + * @param endId - end Stream ID + * @return stream data mapped by Stream ID + */ + RFuture>> rangeAsync(StreamId startId, StreamId endId); + + /** + * Read stream data in range by specified start Stream ID (included) and end Stream ID (included). + * + * @param count - stream data size limit + * @param startId - start Stream ID + * @param endId - end Stream ID + * @return stream data mapped by Stream ID + */ + RFuture>> rangeAsync(int count, StreamId startId, StreamId endId); + + /** + * Read stream data in reverse order in range by specified start Stream ID (included) and end Stream ID (included). + * + * @param startId - start Stream ID + * @param endId - end Stream ID + * @return stream data mapped by Stream ID + */ + RFuture>> rangeReversedAsync(StreamId startId, StreamId endId); + + /** + * Read stream data in reverse order in range by specified start Stream ID (included) and end Stream ID (included). + * + * @param count - stream data size limit + * @param startId - start Stream ID + * @param endId - end Stream ID + * @return stream data mapped by Stream ID + */ + RFuture>> rangeReversedAsync(int count, StreamId startId, StreamId endId); + +} diff --git a/redisson/src/main/java/org/redisson/api/RedissonClient.java b/redisson/src/main/java/org/redisson/api/RedissonClient.java index c858888a8..458effdbf 100755 --- a/redisson/src/main/java/org/redisson/api/RedissonClient.java +++ b/redisson/src/main/java/org/redisson/api/RedissonClient.java @@ -23,12 +23,40 @@ import org.redisson.config.Config; /** * Main Redisson interface for access * to all redisson objects with sync/async interface. + * + * @see RedissonReactiveClient * * @author Nikita Koksharov * */ public interface RedissonClient { + /** + * Returns stream instance by name + *

+ * Requires Redis 5.0.0 and higher. + * + * @param type of key + * @param type of value + * @param name of stream + * @return RStream object + */ + RStream getStream(String name); + + /** + * Returns stream instance by name + * using provided codec for entries. + *

+ * Requires Redis 5.0.0 and higher. + * + * @param type of key + * @param type of value + * @param name - name of stream + * @param codec - codec for entry + * @return RStream object + */ + RStream getStream(String name, Codec codec); + /** * Returns rate limiter instance by name * diff --git a/redisson/src/main/java/org/redisson/api/StreamId.java b/redisson/src/main/java/org/redisson/api/StreamId.java new file mode 100644 index 000000000..ddf6bdbc8 --- /dev/null +++ b/redisson/src/main/java/org/redisson/api/StreamId.java @@ -0,0 +1,115 @@ +/** + * Copyright 2018 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.api; + +/** + * Stream ID used by Redis Stream + * + * @author Nikita Koksharov + * + */ +public class StreamId { + + /** + * Defines minimal id. Used in {@link RStream#range} and {@link RStreamAsync#rangeAsync} methods + */ + public static final StreamId MIN = new StreamId(-1); + + /** + * Defines maximal id. Used in {@link RStream#range} and {@link RStreamAsync#rangeAsync} methods + */ + public static final StreamId MAX = new StreamId(-1); + + /** + * Defines latest id to receive Stream entries added since method invocation. + *

+ * Used in {@link RStream#read} and {@link RStreamAsync#readAsync} methods + */ + public static final StreamId NEWEST = new StreamId(-1); + + private long id0; + private long id1; + + public StreamId(long id0) { + super(); + this.id0 = id0; + } + + public StreamId(long id0, long id1) { + super(); + this.id0 = id0; + this.id1 = id1; + } + + /** + * Returns first part of ID + * + * @return first part of ID + */ + public long getId0() { + return id0; + } + + /** + * Returns second part of ID + * + * @return second part of ID + */ + public long getId1() { + return id1; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + (int) (id0 ^ (id0 >>> 32)); + result = prime * result + (int) (id1 ^ (id1 >>> 32)); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + StreamId other = (StreamId) obj; + if (id0 != other.id0) + return false; + if (id1 != other.id1) + return false; + return true; + } + + @Override + public String toString() { + if (this == NEWEST) { + return "$"; + } + if (this == MIN) { + return "-"; + } + if (this == MAX) { + return "+"; + } + + return id0 + "-" + id1; + } + +} diff --git a/redisson/src/main/java/org/redisson/client/protocol/CommandData.java b/redisson/src/main/java/org/redisson/client/protocol/CommandData.java index a276dbd69..b04ee7470 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/CommandData.java +++ b/redisson/src/main/java/org/redisson/client/protocol/CommandData.java @@ -97,7 +97,9 @@ public class CommandData implements QueueCommand { } public boolean isBlockingCommand() { - return RedisCommands.BLOCKING_COMMANDS.contains(command.getName()); + return RedisCommands.BLOCKING_COMMANDS.contains(command.getName()) + || RedisCommands.XREAD_BLOCKING_SINGLE == command + || RedisCommands.XREAD_BLOCKING == command; } } diff --git a/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java b/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java index 83db0d1f9..c719a0ec3 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java +++ b/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java @@ -16,13 +16,17 @@ package org.redisson.client.protocol; import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; import org.redisson.api.RType; +import org.redisson.api.StreamId; +import org.redisson.client.codec.StringCodec; import org.redisson.client.protocol.RedisCommand.ValueType; import org.redisson.client.protocol.convertor.BitsSizeReplayConvertor; import org.redisson.client.protocol.convertor.BooleanAmountReplayConvertor; @@ -36,6 +40,8 @@ import org.redisson.client.protocol.convertor.DoubleReplayConvertor; import org.redisson.client.protocol.convertor.IntegerReplayConvertor; import org.redisson.client.protocol.convertor.KeyValueConvertor; import org.redisson.client.protocol.convertor.LongReplayConvertor; +import org.redisson.client.protocol.convertor.SingleConvertor; +import org.redisson.client.protocol.convertor.StreamIdConvertor; import org.redisson.client.protocol.convertor.TimeObjectDecoder; import org.redisson.client.protocol.convertor.TrueReplayConvertor; import org.redisson.client.protocol.convertor.TypeConvertor; @@ -54,6 +60,7 @@ import org.redisson.client.protocol.decoder.MapScanResultReplayDecoder; import org.redisson.client.protocol.decoder.ObjectFirstScoreReplayDecoder; import org.redisson.client.protocol.decoder.ObjectListReplayDecoder; import org.redisson.client.protocol.decoder.ObjectMapEntryReplayDecoder; +import org.redisson.client.protocol.decoder.ObjectMapJoinDecoder; import org.redisson.client.protocol.decoder.ObjectMapReplayDecoder; import org.redisson.client.protocol.decoder.ObjectSetReplayDecoder; import org.redisson.client.protocol.decoder.ScoredSortedSetPolledObjectDecoder; @@ -61,6 +68,7 @@ import org.redisson.client.protocol.decoder.ScoredSortedSetReplayDecoder; import org.redisson.client.protocol.decoder.ScoredSortedSetScanDecoder; import org.redisson.client.protocol.decoder.ScoredSortedSetScanReplayDecoder; import org.redisson.client.protocol.decoder.SlotsDecoder; +import org.redisson.client.protocol.decoder.StreamResultDecoder; import org.redisson.client.protocol.decoder.StringDataDecoder; import org.redisson.client.protocol.decoder.StringListReplayDecoder; import org.redisson.client.protocol.decoder.StringMapDataDecoder; @@ -298,8 +306,34 @@ public interface RedisCommands { RedisCommand SETPXNX = new RedisCommand("SET", new BooleanNotNullReplayConvertor()); RedisCommand SETNX = new RedisCommand("SETNX", new BooleanReplayConvertor()); RedisCommand PSETEX = new RedisCommand("PSETEX", new VoidReplayConvertor()); + + RedisCommand>>> XREVRANGE = new RedisCommand>>>("XREVRANGE", + new ListMultiDecoder(new ObjectMapReplayDecoder(), new ObjectMapReplayDecoder(ListMultiDecoder.RESET, new StreamIdConvertor()), + new ObjectMapJoinDecoder())); + + RedisCommand>>> XRANGE = new RedisCommand>>>("XRANGE", + new ListMultiDecoder(new ObjectMapReplayDecoder(), new ObjectMapReplayDecoder(ListMultiDecoder.RESET, new StreamIdConvertor()), + new ObjectMapJoinDecoder())); + + RedisCommand>>> XREAD = new RedisCommand>>>("XREAD", + new ListMultiDecoder(new ListResultReplayDecoder(), new ObjectMapReplayDecoder(), new ObjectMapReplayDecoder(ListMultiDecoder.RESET_1, new StreamIdConvertor()), + new ObjectMapJoinDecoder(), new ObjectMapReplayDecoder(ListMultiDecoder.RESET_INDEX))); + + RedisCommand>> XREAD_BLOCKING = new RedisCommand>>("XREAD", + new ListMultiDecoder(new ListResultReplayDecoder(), new ObjectMapReplayDecoder(), new ObjectMapReplayDecoder(ListMultiDecoder.RESET_1, new StreamIdConvertor()), + new ObjectMapJoinDecoder(), new ObjectMapReplayDecoder(ListMultiDecoder.RESET_INDEX))); + + RedisCommand>> XREAD_SINGLE = new RedisCommand>>("XREAD", + new ListMultiDecoder(new ListResultReplayDecoder(), new ObjectMapReplayDecoder(), new ObjectMapReplayDecoder(ListMultiDecoder.RESET_1, new StreamIdConvertor()), + new ObjectMapJoinDecoder(), new ObjectMapReplayDecoder(), new StreamResultDecoder())); - RedisStrictCommand XADD = new RedisStrictCommand("XADD"); + RedisCommand>> XREAD_BLOCKING_SINGLE = new RedisCommand>>("XREAD", + new ListMultiDecoder(new ListResultReplayDecoder(), new ObjectMapReplayDecoder(), new ObjectMapReplayDecoder(ListMultiDecoder.RESET_1, new StreamIdConvertor()), + new ObjectMapJoinDecoder(), new ObjectMapReplayDecoder(), new StreamResultDecoder())); + + RedisStrictCommand XADD = new RedisStrictCommand("XADD", new StreamIdConvertor()); + RedisStrictCommand XADD_VOID = new RedisStrictCommand("XADD", new VoidReplayConvertor()); + RedisStrictCommand XLEN = new RedisStrictCommand("XLEN"); RedisStrictCommand TOUCH_LONG = new RedisStrictCommand("TOUCH", new LongReplayConvertor()); RedisStrictCommand TOUCH = new RedisStrictCommand("TOUCH", new BooleanReplayConvertor()); diff --git a/redisson/src/main/java/org/redisson/client/protocol/convertor/StreamIdConvertor.java b/redisson/src/main/java/org/redisson/client/protocol/convertor/StreamIdConvertor.java new file mode 100644 index 000000000..8e508dab5 --- /dev/null +++ b/redisson/src/main/java/org/redisson/client/protocol/convertor/StreamIdConvertor.java @@ -0,0 +1,33 @@ +/** + * Copyright 2018 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.client.protocol.convertor; + +import org.redisson.api.StreamId; + +/** + * + * @author Nikita Koksharov + * + */ +public class StreamIdConvertor extends SingleConvertor { + + @Override + public StreamId convert(Object id) { + String[] parts = id.toString().split("-"); + return new StreamId(Long.valueOf(parts[0]), Long.valueOf(parts[1])); + } + +} diff --git a/redisson/src/main/java/org/redisson/client/protocol/decoder/ListMultiDecoder.java b/redisson/src/main/java/org/redisson/client/protocol/decoder/ListMultiDecoder.java index b1083b09e..a3af51352 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/decoder/ListMultiDecoder.java +++ b/redisson/src/main/java/org/redisson/client/protocol/decoder/ListMultiDecoder.java @@ -37,6 +37,20 @@ public class ListMultiDecoder implements MultiDecoder { return null; } }; + + public static final Decoder RESET_1 = new Decoder() { + @Override + public Object decode(ByteBuf buf, State state) throws IOException { + return null; + } + }; + + public static final Decoder RESET_INDEX = new Decoder() { + @Override + public Object decode(ByteBuf buf, State state) throws IOException { + return null; + } + }; private final MultiDecoder[] decoders; @@ -53,8 +67,8 @@ public class ListMultiDecoder implements MultiDecoder { this.index = index; } - public void resetIndex() { - index = 0; + public void setIndex(int index) { + this.index = index; } public void resetPartsIndex() { @@ -112,13 +126,20 @@ public class ListMultiDecoder implements MultiDecoder { int index = getDecoder(state).getIndex(); if (index == -1) { - getDecoder(state).resetIndex(); + getDecoder(state).setIndex(0); index = 0; } + Decoder decoder = decoders[index].getDecoder(paramNum, state); if (decoder == RESET) { NestedDecoderState s = getDecoder(state); - s.resetIndex(); + s.setIndex(0); + int ind = s.getIndex(); + return decoders[ind].getDecoder(paramNum, state); + } + if (decoder == RESET_1) { + NestedDecoderState s = getDecoder(state); + s.setIndex(1); int ind = s.getIndex(); return decoders[ind].getDecoder(paramNum, state); } @@ -140,6 +161,13 @@ public class ListMultiDecoder implements MultiDecoder { index = s.incIndex() + s.getPartsIndex(); return decoders[index].decode(parts, state); } + + // TODO refactor it! + Decoder decoder = decoders[index].getDecoder(0, state); + if (decoder == RESET_INDEX) { + s.setIndex(-1); + } + return res; } diff --git a/redisson/src/main/java/org/redisson/client/protocol/decoder/ObjectMapJoinDecoder.java b/redisson/src/main/java/org/redisson/client/protocol/decoder/ObjectMapJoinDecoder.java new file mode 100644 index 000000000..27df48004 --- /dev/null +++ b/redisson/src/main/java/org/redisson/client/protocol/decoder/ObjectMapJoinDecoder.java @@ -0,0 +1,46 @@ +/** + * Copyright 2018 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.client.protocol.decoder; + +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +import org.redisson.client.handler.State; +import org.redisson.client.protocol.Decoder; + +/** + * + * @author Nikita Koksharov + * + */ +public class ObjectMapJoinDecoder implements MultiDecoder> { + + @Override + public Map decode(List parts, State state) { + Map result = new LinkedHashMap(parts.size()); + for (int i = 0; i < parts.size(); i++) { + result.putAll((Map) parts.get(i)); + } + return result; + } + + @Override + public Decoder getDecoder(int paramNum, State state) { + return null; + } + +} diff --git a/redisson/src/main/java/org/redisson/client/protocol/decoder/ObjectMapReplayDecoder.java b/redisson/src/main/java/org/redisson/client/protocol/decoder/ObjectMapReplayDecoder.java index 741740d76..2454c7ff6 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/decoder/ObjectMapReplayDecoder.java +++ b/redisson/src/main/java/org/redisson/client/protocol/decoder/ObjectMapReplayDecoder.java @@ -21,6 +21,7 @@ import java.util.Map; import org.redisson.client.handler.State; import org.redisson.client.protocol.Decoder; +import org.redisson.client.protocol.convertor.Convertor; /** * @@ -29,12 +30,33 @@ import org.redisson.client.protocol.Decoder; */ public class ObjectMapReplayDecoder implements MultiDecoder> { + private Decoder codec; + private Convertor convertor; + + public ObjectMapReplayDecoder() { + } + + public ObjectMapReplayDecoder(Decoder codec) { + super(); + this.codec = codec; + } + + public ObjectMapReplayDecoder(Decoder codec, Convertor convertor) { + super(); + this.codec = codec; + this.convertor = convertor; + } + @Override public Map decode(List parts, State state) { Map result = new LinkedHashMap(parts.size()/2); for (int i = 0; i < parts.size(); i++) { if (i % 2 != 0) { - result.put(parts.get(i-1), parts.get(i)); + if (convertor != null) { + result.put(convertor.convert(parts.get(i-1)), parts.get(i)); + } else { + result.put(parts.get(i-1), parts.get(i)); + } } } return result; @@ -42,6 +64,9 @@ public class ObjectMapReplayDecoder implements MultiDecoder> @Override public Decoder getDecoder(int paramNum, State state) { + if (codec != null) { + return codec; + } return null; } diff --git a/redisson/src/main/java/org/redisson/client/protocol/decoder/StreamResultDecoder.java b/redisson/src/main/java/org/redisson/client/protocol/decoder/StreamResultDecoder.java new file mode 100644 index 000000000..ef0ffb138 --- /dev/null +++ b/redisson/src/main/java/org/redisson/client/protocol/decoder/StreamResultDecoder.java @@ -0,0 +1,47 @@ +/** + * Copyright 2018 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.client.protocol.decoder; + +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import org.redisson.api.StreamId; +import org.redisson.client.handler.State; +import org.redisson.client.protocol.Decoder; + +/** + * + * @author Nikita Koksharov + * + */ +public class StreamResultDecoder implements MultiDecoder { + + @Override + public Object decode(List parts, State state) { + if (!parts.isEmpty()) { + Map>> result = (Map>>) parts.get(0); + return result.values().iterator().next(); + } + return Collections.emptyMap(); + } + + @Override + public Decoder getDecoder(int paramNum, State state) { + return null; + } + +} diff --git a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java index 4d1274920..cfe86bd5b 100644 --- a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java +++ b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java @@ -680,8 +680,26 @@ public class CommandAsyncService implements CommandAsyncExecutor { details.getTimeout().cancel(); long timeoutTime = connectionManager.getConfig().getTimeout(); - if (RedisCommands.BLOCKING_COMMANDS.contains(details.getCommand().getName())) { - Long popTimeout = Long.valueOf(details.getParams()[details.getParams().length - 1].toString()); + if (RedisCommands.BLOCKING_COMMANDS.contains(details.getCommand().getName()) + || RedisCommands.XREAD_BLOCKING_SINGLE == details.getCommand() + || RedisCommands.XREAD_BLOCKING == details.getCommand()) { + Long popTimeout = null; + if (RedisCommands.XREAD_BLOCKING_SINGLE == details.getCommand() + || RedisCommands.XREAD_BLOCKING == details.getCommand()) { + boolean found = false; + for (Object param : details.getParams()) { + if (found) { + popTimeout = Long.valueOf(param.toString()) / 1000; + break; + } + if (param instanceof String) { + found = true; + } + } + } else { + popTimeout = Long.valueOf(details.getParams()[details.getParams().length - 1].toString()); + } + handleBlockingOperations(details, connection, popTimeout); if (popTimeout == 0) { return; diff --git a/redisson/src/test/java/org/redisson/RedissonStreamTest.java b/redisson/src/test/java/org/redisson/RedissonStreamTest.java new file mode 100644 index 000000000..41f2ecaac --- /dev/null +++ b/redisson/src/test/java/org/redisson/RedissonStreamTest.java @@ -0,0 +1,244 @@ +package org.redisson; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.junit.Test; +import org.redisson.api.RStream; +import org.redisson.api.StreamId; + +public class RedissonStreamTest extends BaseTest { + + @Test + public void testRangeReversed() { + RStream stream = redisson.getStream("test"); + assertThat(stream.size()).isEqualTo(0); + + Map entries1 = new HashMap<>(); + entries1.put("1", "11"); + entries1.put("3", "31"); + stream.addAll(new StreamId(1), entries1, 1, false); + assertThat(stream.size()).isEqualTo(1); + + Map entries2 = new HashMap<>(); + entries2.put("5", "55"); + entries2.put("7", "77"); + stream.addAll(new StreamId(2), entries2, 1, false); + + Map> r2 = stream.rangeReversed(10, StreamId.MAX, StreamId.MIN); + assertThat(r2.keySet()).containsExactly(new StreamId(2), new StreamId(1)); + assertThat(r2.get(new StreamId(1))).isEqualTo(entries1); + assertThat(r2.get(new StreamId(2))).isEqualTo(entries2); + } + + @Test + public void testRange() { + RStream stream = redisson.getStream("test"); + assertThat(stream.size()).isEqualTo(0); + + Map entries1 = new HashMap<>(); + entries1.put("1", "11"); + entries1.put("3", "31"); + stream.addAll(new StreamId(1), entries1, 1, false); + assertThat(stream.size()).isEqualTo(1); + + Map entries2 = new HashMap<>(); + entries2.put("5", "55"); + entries2.put("7", "77"); + stream.addAll(new StreamId(2), entries2, 1, false); + + Map> r = stream.range(10, new StreamId(0), new StreamId(1)); + assertThat(r).hasSize(1); + assertThat(r.get(new StreamId(1))).isEqualTo(entries1); + + Map> r2 = stream.range(10, StreamId.MIN, StreamId.MAX); + assertThat(r2.keySet()).containsExactly(new StreamId(1), new StreamId(2)); + assertThat(r2.get(new StreamId(1))).isEqualTo(entries1); + assertThat(r2.get(new StreamId(2))).isEqualTo(entries2); + } + + @Test + public void testPollMultiKeys() { + RStream stream = redisson.getStream("test"); + + Map entries1 = new LinkedHashMap<>(); + entries1.put("1", "11"); + entries1.put("3", "31"); + + Thread t = new Thread() { + @Override + public void run() { + try { + Thread.sleep(2000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + stream.addAll(new StreamId(1), entries1); + } + }; + t.start(); + + long start = System.currentTimeMillis(); + Map>> s = stream.read(2, 5, TimeUnit.SECONDS, Collections.singleton("test1"), new StreamId(0), StreamId.NEWEST); + assertThat(System.currentTimeMillis() - start).isBetween(1900L, 2200L); + assertThat(s).hasSize(1); + assertThat(s.get("test").get(new StreamId(1))).isEqualTo(entries1); + } + + @Test + public void testPoll() { + RStream stream = redisson.getStream("test"); + + Map entries1 = new LinkedHashMap<>(); + entries1.put("1", "11"); + entries1.put("3", "31"); + + Thread t = new Thread() { + @Override + public void run() { + try { + Thread.sleep(2000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + stream.addAll(new StreamId(1), entries1); + } + }; + t.start(); + + long start = System.currentTimeMillis(); + Map> s = stream.read(2, 5, TimeUnit.SECONDS, new StreamId(0)); + assertThat(System.currentTimeMillis() - start).isBetween(1900L, 2200L); + assertThat(s).hasSize(1); + assertThat(s.get(new StreamId(1))).isEqualTo(entries1); + } + + @Test + public void testSize() { + RStream stream = redisson.getStream("test"); + assertThat(stream.size()).isEqualTo(0); + + Map entries1 = new HashMap<>(); + entries1.put("1", "11"); + entries1.put("3", "31"); + stream.addAll(new StreamId(1), entries1, 1, false); + assertThat(stream.size()).isEqualTo(1); + + Map entries2 = new HashMap<>(); + entries2.put("5", "55"); + entries2.put("7", "77"); + stream.addAll(new StreamId(2), entries2, 1, false); + assertThat(stream.size()).isEqualTo(2); + } + + @Test + public void testReadMultiKeysEmpty() { + RStream stream = redisson.getStream("test2"); + Map>> s = stream.read(10, Collections.singleton("test1"), new StreamId(0), new StreamId(0)); + assertThat(s).isEmpty(); + } + + @Test + public void testReadMultiKeys() { + RStream stream1 = redisson.getStream("test1"); + Map entries1 = new LinkedHashMap<>(); + entries1.put("1", "11"); + entries1.put("2", "22"); + entries1.put("3", "33"); + stream1.addAll(entries1); + RStream stream2 = redisson.getStream("test2"); + Map entries2 = new LinkedHashMap<>(); + entries2.put("4", "44"); + entries2.put("5", "55"); + entries2.put("6", "66"); + stream2.addAll(entries2); + + Map>> s = stream2.read(10, Collections.singleton("test1"), new StreamId(0), new StreamId(0)); + assertThat(s).hasSize(2); + assertThat(s.get("test1").values().iterator().next()).isEqualTo(entries1); + assertThat(s.get("test2").values().iterator().next()).isEqualTo(entries2); + } + + @Test + public void testReadMulti() { + RStream stream = redisson.getStream("test"); + + Map entries1 = new LinkedHashMap<>(); + entries1.put("1", "11"); + entries1.put("3", "31"); + stream.addAll(new StreamId(1), entries1, 1, false); + + Map entries2 = new LinkedHashMap<>(); + entries2.put("5", "55"); + entries2.put("7", "77"); + stream.addAll(new StreamId(2), entries2, 1, false); + + Map entries3 = new LinkedHashMap<>(); + entries3.put("15", "05"); + entries3.put("17", "07"); + stream.addAll(new StreamId(3), entries3, 1, false); + + Map> result = stream.read(10, new StreamId(0, 0)); + assertThat(result).hasSize(3); + assertThat(result.get(new StreamId(4))).isNull(); + assertThat(result.get(new StreamId(1))).isEqualTo(entries1); + assertThat(result.get(new StreamId(2))).isEqualTo(entries2); + assertThat(result.get(new StreamId(3))).isEqualTo(entries3); + } + + @Test + public void testReadSingle() { + RStream stream = redisson.getStream("test"); + Map entries1 = new LinkedHashMap<>(); + entries1.put("1", "11"); + entries1.put("3", "31"); + stream.addAll(new StreamId(1), entries1, 1, true); + + Map> result = stream.read(10, new StreamId(0, 0)); + assertThat(result).hasSize(1); + assertThat(result.get(new StreamId(4))).isNull(); + assertThat(result.get(new StreamId(1))).isEqualTo(entries1); + } + + @Test + public void testReadEmpty() { + RStream stream2 = redisson.getStream("test"); + Map> result2 = stream2.read(10, new StreamId(0, 0)); + assertThat(result2).isEmpty(); + } + + @Test + public void testAdd() { + RStream stream = redisson.getStream("test1"); + StreamId s = stream.add("12", "33"); + assertThat(s.getId0()).isNotNegative(); + assertThat(s.getId1()).isNotNegative(); + assertThat(stream.size()).isEqualTo(1); + } + + @Test + public void testAddAll() { + RStream stream = redisson.getStream("test1"); + assertThat(stream.size()).isEqualTo(0); + + Map entries = new HashMap<>(); + entries.put("6", "61"); + entries.put("4", "41"); + stream.addAll(new StreamId(12, 42), entries, 10, false); + assertThat(stream.size()).isEqualTo(1); + + entries.clear(); + entries.put("1", "11"); + entries.put("3", "31"); + stream.addAll(new StreamId(Long.MAX_VALUE), entries, 1, false); + assertThat(stream.size()).isEqualTo(2); + } + +}