Redis Stream implementation. Add, Read and Range methods added #1490

pull/1499/head
Nikita 7 years ago
parent 459bdfa49a
commit 61d7eac98e

@ -72,6 +72,7 @@ import org.redisson.api.RSetCache;
import org.redisson.api.RSetMultimap;
import org.redisson.api.RSetMultimapCache;
import org.redisson.api.RSortedSet;
import org.redisson.api.RStream;
import org.redisson.api.RTopic;
import org.redisson.api.RTransaction;
import org.redisson.api.RedissonClient;
@ -190,6 +191,16 @@ public class Redisson implements RedissonClient {
return react;
}
@Override
public <K, V> RStream<K, V> getStream(String name) {
return new RedissonStream<K, V>(connectionManager.getCommandExecutor(), name);
}
@Override
public <K, V> RStream<K, V> getStream(String name, Codec codec) {
return new RedissonStream<K, V>(codec, connectionManager.getCommandExecutor(), name);
}
@Override
public RBinaryStream getBinaryStream(String name) {
return new RedissonBinaryStream(connectionManager.getCommandExecutor(), name);

@ -42,6 +42,7 @@ import org.redisson.api.RScoredSortedSetAsync;
import org.redisson.api.RScriptAsync;
import org.redisson.api.RSetAsync;
import org.redisson.api.RSetCacheAsync;
import org.redisson.api.RStreamAsync;
import org.redisson.api.RTopicAsync;
import org.redisson.client.codec.Codec;
import org.redisson.command.CommandBatchService;
@ -326,4 +327,14 @@ public class RedissonBatch implements RBatch {
this.executorService.enableRedissonReferenceSupport(redisson);
}
@Override
public <K, V> RStreamAsync<K, V> getStream(String name) {
return new RedissonStream<K, V>(executorService, name);
}
@Override
public <K, V> RStreamAsync<K, V> getStream(String name, Codec codec) {
return new RedissonStream<K, V>(codec, executorService, name);
}
}

@ -0,0 +1,409 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.redisson.api.RFuture;
import org.redisson.api.RStream;
import org.redisson.api.StreamId;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandAsyncExecutor;
/**
*
* @author Nikita Koksharov
*
* @param <K> key type
* @param <V> value type
*/
public class RedissonStream<K, V> extends RedissonExpirable implements RStream<K, V> {
public RedissonStream(Codec codec, CommandAsyncExecutor connectionManager, String name) {
super(codec, connectionManager, name);
}
public RedissonStream(CommandAsyncExecutor connectionManager, String name) {
super(connectionManager, name);
}
protected void checkKey(Object key) {
if (key == null) {
throw new NullPointerException("key can't be null");
}
}
protected void checkValue(Object value) {
if (value == null) {
throw new NullPointerException("value can't be null");
}
}
@Override
public StreamId addAll(Map<K, V> entries) {
return addAll(entries, 0, false);
}
@Override
public RFuture<StreamId> addAllAsync(Map<K, V> entries) {
return addAllAsync(entries, 0, false);
}
@Override
public void addAll(StreamId id, Map<K, V> entries) {
addAll(id, entries, 0, false);
}
@Override
public RFuture<Void> addAllAsync(StreamId id, Map<K, V> entries) {
return addAllAsync(id, entries, 0, false);
}
@Override
public StreamId addAll(Map<K, V> entries, int trimLen, boolean trimStrict) {
return get(addAllAsync(entries, trimLen, trimStrict));
}
@Override
public RFuture<StreamId> addAllAsync(Map<K, V> entries, int trimLen, boolean trimStrict) {
return addAllCustomAsync(null, entries, trimLen, trimStrict);
}
@Override
public void addAll(StreamId id, Map<K, V> entries, int trimLen, boolean trimStrict) {
get(addAllAsync(id, entries, trimLen, trimStrict));
}
private <R> RFuture<R> addAllCustomAsync(StreamId id, Map<K, V> entries, int trimLen, boolean trimStrict) {
List<Object> params = new ArrayList<Object>(entries.size()*2 + 1);
params.add(getName());
if (trimLen > 0) {
params.add("MAXLEN");
if (!trimStrict) {
params.add("~");
}
params.add(trimLen);
}
if (id == null) {
params.add("*");
} else {
params.add(id.toString());
}
for (java.util.Map.Entry<? extends K, ? extends V> t : entries.entrySet()) {
checkKey(t.getKey());
checkValue(t.getValue());
params.add(encodeMapKey(t.getKey()));
params.add(encodeMapValue(t.getValue()));
}
if (id == null) {
return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, RedisCommands.XADD, params.toArray());
}
return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, RedisCommands.XADD_VOID, params.toArray());
}
@Override
public RFuture<Void> addAllAsync(StreamId id, Map<K, V> entries, int trimLen, boolean trimStrict) {
return addAllCustomAsync(id, entries, trimLen, trimStrict);
}
@Override
public long size() {
return get(sizeAsync());
}
@Override
public RFuture<Long> sizeAsync() {
return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, RedisCommands.XLEN, getName());
}
@Override
public Map<StreamId, Map<K, V>> read(int count, StreamId ... ids) {
return get(readAsync(count, ids));
}
@Override
public RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(int count, Collection<String> keys, StreamId ... ids) {
return readAsync(count, -1, null, keys, ids);
}
@Override
public Map<StreamId, Map<K, V>> read(int count, long timeout, TimeUnit unit, StreamId... ids) {
return get(readAsync(count, timeout, unit, ids));
}
@Override
public RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(int count, long timeout, TimeUnit unit, Collection<String> keys, StreamId... ids) {
if (keys.size() + 1 != ids.length) {
throw new IllegalArgumentException("keys amount should be lower by one than ids amount");
}
List<Object> params = new ArrayList<Object>();
if (count > 0) {
params.add("COUNT");
params.add(count);
}
if (timeout > 0) {
params.add("BLOCK");
params.add(toSeconds(timeout, unit)*1000);
}
params.add("STREAMS");
params.add(getName());
if (keys != null) {
for (String key : keys) {
params.add(key);
}
}
for (StreamId id : ids) {
params.add(id.toString());
}
if (timeout > 0) {
return commandExecutor.readAsync(getName(), codec, RedisCommands.XREAD_BLOCKING, params.toArray());
}
return commandExecutor.readAsync(getName(), codec, RedisCommands.XREAD, params.toArray());
}
@Override
public RFuture<StreamId> addAsync(K key, V value) {
return addAsync(key, value, 0, false);
}
@Override
public RFuture<Void> addAsync(StreamId id, K key, V value) {
return addAsync(id, key, value, 0, false);
}
@Override
public RFuture<StreamId> addAsync(K key, V value, int trimLen, boolean trimStrict) {
return addCustomAsync(null, key, value, trimLen, trimStrict);
}
private <R> RFuture<R> addCustomAsync(StreamId id, K key, V value, int trimLen, boolean trimStrict) {
List<Object> params = new LinkedList<Object>();
params.add(getName());
if (trimLen > 0) {
params.add("MAXLEN");
if (!trimStrict) {
params.add("~");
}
params.add(trimLen);
}
if (id == null) {
params.add("*");
} else {
params.add(id.toString());
}
checkKey(key);
checkValue(value);
params.add(encodeMapKey(key));
params.add(encodeMapValue(value));
if (id == null) {
return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, RedisCommands.XADD, params.toArray());
}
return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, RedisCommands.XADD_VOID, params.toArray());
}
@Override
public RFuture<Void> addAsync(StreamId id, K key, V value, int trimLen, boolean trimStrict) {
return addCustomAsync(id, key, value, trimLen, trimStrict);
}
@Override
public StreamId add(K key, V value) {
return get(addAsync(key, value));
}
@Override
public void add(StreamId id, K key, V value) {
get(addAsync(id, key, value));
}
@Override
public StreamId add(K key, V value, int trimLen, boolean trimStrict) {
return get(addAsync(key, value, trimLen, trimStrict));
}
@Override
public void add(StreamId id, K key, V value, int trimLen, boolean trimStrict) {
get(addAsync(id, key, value, trimLen, trimStrict));
}
@Override
public RFuture<Map<StreamId, Map<K, V>>> readAsync(int count, StreamId... ids) {
return readAsync(count, 0, null, ids);
}
@Override
public RFuture<Map<StreamId, Map<K, V>>> readAsync(int count, long timeout, TimeUnit unit,
StreamId... ids) {
List<Object> params = new ArrayList<Object>();
if (count > 0) {
params.add("COUNT");
params.add(count);
}
if (timeout > 0) {
params.add("BLOCK");
params.add(toSeconds(timeout, unit)*1000);
}
params.add("STREAMS");
params.add(getName());
for (StreamId id : ids) {
params.add(id.toString());
}
if (timeout > 0) {
return commandExecutor.readAsync(getName(), codec, RedisCommands.XREAD_BLOCKING_SINGLE, params.toArray());
}
return commandExecutor.readAsync(getName(), codec, RedisCommands.XREAD_SINGLE, params.toArray());
}
@Override
public Map<String, Map<StreamId, Map<K, V>>> read(int count, Collection<String> keys, StreamId... ids) {
return get(readAsync(count, keys, ids));
}
@Override
public Map<String, Map<StreamId, Map<K, V>>> read(int count, long timeout, TimeUnit unit, Collection<String> keys,
StreamId... ids) {
return get(readAsync(count, timeout, unit, keys, ids));
}
@Override
public RFuture<Map<StreamId, Map<K, V>>> rangeAsync(int count, StreamId startId, StreamId endId) {
List<Object> params = new LinkedList<Object>();
params.add(getName());
params.add(startId);
params.add(endId);
if (count > 0) {
params.add("COUNT");
params.add(count);
}
return commandExecutor.readAsync(getName(), codec, RedisCommands.XRANGE, params.toArray());
}
@Override
public Map<StreamId, Map<K, V>> range(int count, StreamId startId, StreamId endId) {
return get(rangeAsync(count, startId, endId));
}
@Override
public RFuture<Map<StreamId, Map<K, V>>> rangeReversedAsync(int count, StreamId startId, StreamId endId) {
List<Object> params = new LinkedList<Object>();
params.add(getName());
params.add(startId);
params.add(endId);
if (count > 0) {
params.add("COUNT");
params.add(count);
}
return commandExecutor.readAsync(getName(), codec, RedisCommands.XREVRANGE, params.toArray());
}
@Override
public Map<StreamId, Map<K, V>> rangeReversed(int count, StreamId startId, StreamId endId) {
return get(rangeReversedAsync(count, startId, endId));
}
@Override
public RFuture<Map<StreamId, Map<K, V>>> readAsync(StreamId... ids) {
return readAsync(0, ids);
}
@Override
public RFuture<Map<StreamId, Map<K, V>>> readAsync(long timeout, TimeUnit unit, StreamId... ids) {
return readAsync(0, timeout, unit, ids);
}
@Override
public RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(Collection<String> keys, StreamId... ids) {
return readAsync(0, keys, ids);
}
@Override
public RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(long timeout, TimeUnit unit,
Collection<String> keys, StreamId... ids) {
return readAsync(0, timeout, unit, keys, ids);
}
@Override
public RFuture<Map<StreamId, Map<K, V>>> rangeAsync(StreamId startId, StreamId endId) {
return rangeAsync(0, startId, endId);
}
@Override
public RFuture<Map<StreamId, Map<K, V>>> rangeReversedAsync(StreamId startId, StreamId endId) {
return rangeReversedAsync(0, startId, endId);
}
@Override
public Map<StreamId, Map<K, V>> read(StreamId... ids) {
return read(0, ids);
}
@Override
public Map<StreamId, Map<K, V>> read(long timeout, TimeUnit unit, StreamId... ids) {
return read(0, timeout, unit, ids);
}
@Override
public Map<String, Map<StreamId, Map<K, V>>> read(Collection<String> keys, StreamId... ids) {
return read(0, keys, ids);
}
@Override
public Map<String, Map<StreamId, Map<K, V>>> read(long timeout, TimeUnit unit, Collection<String> keys,
StreamId... ids) {
return read(0, timeout, unit, keys, ids);
}
@Override
public Map<StreamId, Map<K, V>> range(StreamId startId, StreamId endId) {
return range(0, startId, endId);
}
@Override
public Map<StreamId, Map<K, V>> rangeReversed(StreamId startId, StreamId endId) {
return rangeReversed(0, startId, endId);
}
}

@ -33,6 +33,28 @@ import org.redisson.client.codec.Codec;
*/
public interface RBatch {
/**
* Returns stream instance by <code>name</code>
*
* @param <K> type of key
* @param <V> type of value
* @param name of stream
* @return RStream object
*/
<K, V> RStreamAsync<K, V> getStream(String name);
/**
* Returns stream instance by <code>name</code>
* using provided <code>codec</code> for entries.
*
* @param <K> type of key
* @param <V> type of value
* @param name - name of stream
* @param codec - codec for entry
* @return RStream object
*/
<K, V> RStreamAsync<K, V> getStream(String name, Codec codec);
/**
* Returns geospatial items holder instance by <code>name</code>.
*

@ -0,0 +1,251 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
* Redis Stream implementation.
* <p>
* Requires <b>Redis 5.0.0 and higher.</b>
*
* @author Nikita Koksharov
*
* @param <K> key type
* @param <V> value type
*/
public interface RStream<K, V> extends RStreamAsync<K, V>, RExpirable {
/**
* Returns number of entries in stream
*
* @return size of stream
*/
long size();
/**
* Appends a new entry and returns generated Stream ID
*
* @param key - key of entry
* @param value - value of entry
* @return Stream ID
*/
StreamId add(K key, V value);
/**
* Appends a new entry by specified Stream ID
*
* @param id - Stream ID
* @param key - key of entry
* @param value - value of entry
*/
void add(StreamId id, K key, V value);
/**
* Appends a new entry and returns generated Stream ID.
* Trims stream to a specified <code>trimLen</code> size.
* If <code>trimStrict</code> is <code>false</code> then trims to few tens of entries more than specified length to trim.
*
* @param key - key of entry
* @param value - value of entry
* @param trimLen - length to trim
* @param trimStrict - if <code>false</code> then trims to few tens of entries more than specified length to trim
* @return Stream ID
*/
StreamId add(K key, V value, int trimLen, boolean trimStrict);
/**
* Appends a new entry by specified Stream ID.
* Trims stream to a specified <code>trimLen</code> size.
* If <code>trimStrict</code> is <code>false</code> then trims to few tens of entries more than specified length to trim.
*
* @param id - Stream ID
* @param key - key of entry
* @param value - value of entry
* @param trimLen - length to trim
* @param trimStrict - if <code>false</code> then trims to few tens of entries more than specified length to trim
*/
void add(StreamId id, K key, V value, int trimLen, boolean trimStrict);
/**
* Appends new entries and returns generated Stream ID
*
* @param entries - entries to add
* @return Stream ID
*/
StreamId addAll(Map<K, V> entries);
/**
* Appends new entries by specified Stream ID
*
* @param id - Stream ID
* @param entries - entries to add
*/
void addAll(StreamId id, Map<K, V> entries);
/**
* Appends new entries and returns generated Stream ID.
* Trims stream to a specified <code>trimLen</code> size.
* If <code>trimStrict</code> is <code>false</code> then trims to few tens of entries more than specified length to trim.
*
* @param entries - entries to add
* @param trimLen - length to trim
* @param trimStrict - if <code>false</code> then trims to few tens of entries more than specified length to trim
* @return Stream ID
*/
StreamId addAll(Map<K, V> entries, int trimLen, boolean trimStrict);
/**
* Appends new entries by specified Stream ID.
* Trims stream to a specified <code>trimLen</code> size.
* If <code>trimStrict</code> is <code>false</code> then trims to few tens of entries more than specified length to trim.
*
* @param id - Stream ID
* @param entries - entries to add
* @param trimLen - length to trim
* @param trimStrict - if <code>false</code> then trims to few tens of entries more than specified length to trim
*/
void addAll(StreamId id, Map<K, V> entries, int trimLen, boolean trimStrict);
/**
* Read stream data by specified collection of Stream IDs.
*
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
*/
Map<StreamId, Map<K, V>> read(StreamId ... ids);
/**
* Read stream data by specified collection of Stream IDs.
*
* @param count - stream data size limit
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
*/
Map<StreamId, Map<K, V>> read(int count, StreamId ... ids);
/**
* Read stream data by specified collection of Stream IDs.
* Wait for stream data availability for specified <code>timeout</code> interval.
*
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
*/
Map<StreamId, Map<K, V>> read(long timeout, TimeUnit unit, StreamId ... ids);
/**
* Read stream data by specified collection of Stream IDs.
* Wait for stream data availability for specified <code>timeout</code> interval.
*
* @param count - stream data size limit
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
*/
Map<StreamId, Map<K, V>> read(int count, long timeout, TimeUnit unit, StreamId ... ids);
/**
* Read stream data by specified collection of keys including this stream and Stream ID per key.
* First Stream ID is related to this stream.
*
* @param keys - collection of keys
* @param ids - collection of Stream IDs
* @return stream data mapped by key and Stream ID
*/
Map<String, Map<StreamId, Map<K, V>>> read(Collection<String> keys, StreamId ... ids);
/**
* Read stream data by specified collection of keys including this stream and Stream ID per key.
* First Stream ID is related to this stream.
*
* @param count - stream data size limit
* @param keys - collection of keys
* @param ids - collection of Stream IDs
* @return stream data mapped by key and Stream ID
*/
Map<String, Map<StreamId, Map<K, V>>> read(int count, Collection<String> keys, StreamId ... ids);
/**
* Read stream data by specified collection of keys including this stream and Stream ID per key.
* First Stream ID is related to this stream.
* Wait for first stream data availability for specified <code>timeout</code> interval.
*
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param keys - collection of keys
* @param ids - collection of Stream IDs
* @return stream data mapped by key and Stream ID
*/
Map<String, Map<StreamId, Map<K, V>>> read(long timeout, TimeUnit unit, Collection<String> keys, StreamId ... ids);
/**
* Read stream data by specified collection of keys including this stream and Stream ID per key.
* First Stream ID is related to this stream.
* Wait for first stream data availability for specified <code>timeout</code> interval.
*
* @param count - stream data size limit
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param keys - collection of keys
* @param ids - collection of Stream IDs
* @return stream data mapped by key and Stream ID
*/
Map<String, Map<StreamId, Map<K, V>>> read(int count, long timeout, TimeUnit unit, Collection<String> keys, StreamId ... ids);
/**
* Read stream data in range by specified start Stream ID (included) and end Stream ID (included).
*
* @param startId - start Stream ID
* @param endId - end Stream ID
* @return stream data mapped by Stream ID
*/
Map<StreamId, Map<K, V>> range(StreamId startId, StreamId endId);
/**
* Read stream data in range by specified start Stream ID (included) and end Stream ID (included).
*
* @param count - stream data size limit
* @param startId - start Stream ID
* @param endId - end Stream ID
* @return stream data mapped by Stream ID
*/
Map<StreamId, Map<K, V>> range(int count, StreamId startId, StreamId endId);
/**
* Read stream data in reverse order in range by specified start Stream ID (included) and end Stream ID (included).
*
* @param startId - start Stream ID
* @param endId - end Stream ID
* @return stream data mapped by Stream ID
*/
Map<StreamId, Map<K, V>> rangeReversed(StreamId startId, StreamId endId);
/**
* Read stream data in reverse order in range by specified start Stream ID (included) and end Stream ID (included).
*
* @param count - stream data size limit
* @param startId - start Stream ID
* @param endId - end Stream ID
* @return stream data mapped by Stream ID
*/
Map<StreamId, Map<K, V>> rangeReversed(int count, StreamId startId, StreamId endId);
}

@ -0,0 +1,255 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
* Redis Stream implementation.
* <p>
* Requires <b>Redis 5.0.0 and higher.</b>
*
* @author Nikita Koksharov
*
* @param <K> key type
* @param <V> value type
*/
public interface RStreamAsync<K, V> extends RExpirableAsync {
/**
* Returns number of entries in stream
*
* @return size of stream
*/
RFuture<Long> sizeAsync();
/**
* Appends a new entry and returns generated Stream ID
*
* @param key - key of entry
* @param value - value of entry
* @return Stream ID
*/
RFuture<StreamId> addAsync(K key, V value);
/**
* Appends a new entry by specified Stream ID
*
* @param id - Stream ID
* @param key - key of entry
* @param value - value of entry
* @return void
*/
RFuture<Void> addAsync(StreamId id, K key, V value);
/**
* Appends a new entry and returns generated Stream ID.
* Trims stream to a specified <code>trimLen</code> size.
* If <code>trimStrict</code> is <code>false</code> then trims to few tens of entries more than specified length to trim.
*
* @param key - key of entry
* @param value - value of entry
* @param trimLen - length to trim
* @param trimStrict - if <code>false</code> then trims to few tens of entries more than specified length to trim
* @return Stream ID
*/
RFuture<StreamId> addAsync(K key, V value, int trimLen, boolean trimStrict);
/**
* Appends a new entry by specified Stream ID.
* Trims stream to a specified <code>trimLen</code> size.
* If <code>trimStrict</code> is <code>false</code> then trims to few tens of entries more than specified length to trim.
*
* @param id - Stream ID
* @param key - key of entry
* @param value - value of entry
* @param trimLen - length to trim
* @param trimStrict - if <code>false</code> then trims to few tens of entries more than specified length to trim
* @return void
*/
RFuture<Void> addAsync(StreamId id, K key, V value, int trimLen, boolean trimStrict);
/**
* Appends new entries and returns generated Stream ID
*
* @param entries - entries to add
* @return Stream ID
*/
RFuture<StreamId> addAllAsync(Map<K, V> entries);
/**
* Appends new entries by specified Stream ID
*
* @param id - Stream ID
* @param entries - entries to add
* @return void
*/
RFuture<Void> addAllAsync(StreamId id, Map<K, V> entries);
/**
* Appends new entries and returns generated Stream ID.
* Trims stream to a specified <code>trimLen</code> size.
* If <code>trimStrict</code> is <code>false</code> then trims to few tens of entries more than specified length to trim.
*
* @param entries - entries to add
* @param trimLen - length to trim
* @param trimStrict - if <code>false</code> then trims to few tens of entries more than specified length to trim
* @return Stream ID
*/
RFuture<StreamId> addAllAsync(Map<K, V> entries, int trimLen, boolean trimStrict);
/**
* Appends new entries by specified Stream ID.
* Trims stream to a specified <code>trimLen</code> size.
* If <code>trimStrict</code> is <code>false</code> then trims to few tens of entries more than specified length to trim.
*
* @param id - Stream ID
* @param entries - entries to add
* @param trimLen - length to trim
* @param trimStrict - if <code>false</code> then trims to few tens of entries more than specified length to trim
* @return void
*/
RFuture<Void> addAllAsync(StreamId id, Map<K, V> entries, int trimLen, boolean trimStrict);
/**
* Read stream data by specified collection of Stream IDs.
*
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
*/
RFuture<Map<StreamId, Map<K, V>>> readAsync(StreamId ... ids);
/**
* Read stream data by specified collection of Stream IDs.
*
* @param count - stream data size limit
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
*/
RFuture<Map<StreamId, Map<K, V>>> readAsync(int count, StreamId ... ids);
/**
* Read stream data by specified collection of Stream IDs.
* Wait for stream data availability for specified <code>timeout</code> interval.
*
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
*/
RFuture<Map<StreamId, Map<K, V>>> readAsync(long timeout, TimeUnit unit, StreamId ... ids);
/**
* Read stream data by specified collection of Stream IDs.
* Wait for stream data availability for specified <code>timeout</code> interval.
*
* @param count - stream data size limit
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param ids - collection of Stream IDs
* @return stream data mapped by Stream ID
*/
RFuture<Map<StreamId, Map<K, V>>> readAsync(int count, long timeout, TimeUnit unit, StreamId ... ids);
/**
* Read stream data by specified collection of keys including this stream and Stream ID per key.
* First Stream ID is related to this stream.
*
* @param keys - collection of keys
* @param ids - collection of Stream IDs
* @return stream data mapped by key and Stream ID
*/
RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(Collection<String> keys, StreamId ... ids);
/**
* Read stream data by specified collection of keys including this stream and Stream ID per key.
* First Stream ID is related to this stream.
*
* @param count - stream data size limit
* @param keys - collection of keys
* @param ids - collection of Stream IDs
* @return stream data mapped by key and Stream ID
*/
RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(int count, Collection<String> keys, StreamId ... ids);
/**
* Read stream data by specified collection of keys including this stream and Stream ID per key.
* First Stream ID is related to this stream.
* Wait for first stream data availability for specified <code>timeout</code> interval.
*
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param keys - collection of keys
* @param ids - collection of Stream IDs
* @return stream data mapped by key and Stream ID
*/
RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(long timeout, TimeUnit unit, Collection<String> keys, StreamId ... ids);
/**
* Read stream data by specified collection of keys including this stream and Stream ID per key.
* First Stream ID is related to this stream.
* Wait for first stream data availability for specified <code>timeout</code> interval.
*
* @param count - stream data size limit
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param keys - collection of keys
* @param ids - collection of Stream IDs
* @return stream data mapped by key and Stream ID
*/
RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(int count, long timeout, TimeUnit unit, Collection<String> keys, StreamId ... ids);
/**
* Read stream data in range by specified start Stream ID (included) and end Stream ID (included).
*
* @param startId - start Stream ID
* @param endId - end Stream ID
* @return stream data mapped by Stream ID
*/
RFuture<Map<StreamId, Map<K, V>>> rangeAsync(StreamId startId, StreamId endId);
/**
* Read stream data in range by specified start Stream ID (included) and end Stream ID (included).
*
* @param count - stream data size limit
* @param startId - start Stream ID
* @param endId - end Stream ID
* @return stream data mapped by Stream ID
*/
RFuture<Map<StreamId, Map<K, V>>> rangeAsync(int count, StreamId startId, StreamId endId);
/**
* Read stream data in reverse order in range by specified start Stream ID (included) and end Stream ID (included).
*
* @param startId - start Stream ID
* @param endId - end Stream ID
* @return stream data mapped by Stream ID
*/
RFuture<Map<StreamId, Map<K, V>>> rangeReversedAsync(StreamId startId, StreamId endId);
/**
* Read stream data in reverse order in range by specified start Stream ID (included) and end Stream ID (included).
*
* @param count - stream data size limit
* @param startId - start Stream ID
* @param endId - end Stream ID
* @return stream data mapped by Stream ID
*/
RFuture<Map<StreamId, Map<K, V>>> rangeReversedAsync(int count, StreamId startId, StreamId endId);
}

@ -24,11 +24,39 @@ import org.redisson.config.Config;
* Main Redisson interface for access
* to all redisson objects with sync/async interface.
*
* @see RedissonReactiveClient
*
* @author Nikita Koksharov
*
*/
public interface RedissonClient {
/**
* Returns stream instance by <code>name</code>
* <p>
* Requires <b>Redis 5.0.0 and higher.</b>
*
* @param <K> type of key
* @param <V> type of value
* @param name of stream
* @return RStream object
*/
<K, V> RStream<K, V> getStream(String name);
/**
* Returns stream instance by <code>name</code>
* using provided <code>codec</code> for entries.
* <p>
* Requires <b>Redis 5.0.0 and higher.</b>
*
* @param <K> type of key
* @param <V> type of value
* @param name - name of stream
* @param codec - codec for entry
* @return RStream object
*/
<K, V> RStream<K, V> getStream(String name, Codec codec);
/**
* Returns rate limiter instance by <code>name</code>
*

@ -0,0 +1,115 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api;
/**
* Stream ID used by Redis Stream
*
* @author Nikita Koksharov
*
*/
public class StreamId {
/**
* Defines minimal id. Used in {@link RStream#range} and {@link RStreamAsync#rangeAsync} methods
*/
public static final StreamId MIN = new StreamId(-1);
/**
* Defines maximal id. Used in {@link RStream#range} and {@link RStreamAsync#rangeAsync} methods
*/
public static final StreamId MAX = new StreamId(-1);
/**
* Defines latest id to receive Stream entries added since method invocation.
* <p>
* Used in {@link RStream#read} and {@link RStreamAsync#readAsync} methods
*/
public static final StreamId NEWEST = new StreamId(-1);
private long id0;
private long id1;
public StreamId(long id0) {
super();
this.id0 = id0;
}
public StreamId(long id0, long id1) {
super();
this.id0 = id0;
this.id1 = id1;
}
/**
* Returns first part of ID
*
* @return first part of ID
*/
public long getId0() {
return id0;
}
/**
* Returns second part of ID
*
* @return second part of ID
*/
public long getId1() {
return id1;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + (int) (id0 ^ (id0 >>> 32));
result = prime * result + (int) (id1 ^ (id1 >>> 32));
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
StreamId other = (StreamId) obj;
if (id0 != other.id0)
return false;
if (id1 != other.id1)
return false;
return true;
}
@Override
public String toString() {
if (this == NEWEST) {
return "$";
}
if (this == MIN) {
return "-";
}
if (this == MAX) {
return "+";
}
return id0 + "-" + id1;
}
}

@ -97,7 +97,9 @@ public class CommandData<T, R> implements QueueCommand {
}
public boolean isBlockingCommand() {
return RedisCommands.BLOCKING_COMMANDS.contains(command.getName());
return RedisCommands.BLOCKING_COMMANDS.contains(command.getName())
|| RedisCommands.XREAD_BLOCKING_SINGLE == command
|| RedisCommands.XREAD_BLOCKING == command;
}
}

@ -16,13 +16,17 @@
package org.redisson.client.protocol;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.redisson.api.RType;
import org.redisson.api.StreamId;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.convertor.BitsSizeReplayConvertor;
import org.redisson.client.protocol.convertor.BooleanAmountReplayConvertor;
@ -36,6 +40,8 @@ import org.redisson.client.protocol.convertor.DoubleReplayConvertor;
import org.redisson.client.protocol.convertor.IntegerReplayConvertor;
import org.redisson.client.protocol.convertor.KeyValueConvertor;
import org.redisson.client.protocol.convertor.LongReplayConvertor;
import org.redisson.client.protocol.convertor.SingleConvertor;
import org.redisson.client.protocol.convertor.StreamIdConvertor;
import org.redisson.client.protocol.convertor.TimeObjectDecoder;
import org.redisson.client.protocol.convertor.TrueReplayConvertor;
import org.redisson.client.protocol.convertor.TypeConvertor;
@ -54,6 +60,7 @@ import org.redisson.client.protocol.decoder.MapScanResultReplayDecoder;
import org.redisson.client.protocol.decoder.ObjectFirstScoreReplayDecoder;
import org.redisson.client.protocol.decoder.ObjectListReplayDecoder;
import org.redisson.client.protocol.decoder.ObjectMapEntryReplayDecoder;
import org.redisson.client.protocol.decoder.ObjectMapJoinDecoder;
import org.redisson.client.protocol.decoder.ObjectMapReplayDecoder;
import org.redisson.client.protocol.decoder.ObjectSetReplayDecoder;
import org.redisson.client.protocol.decoder.ScoredSortedSetPolledObjectDecoder;
@ -61,6 +68,7 @@ import org.redisson.client.protocol.decoder.ScoredSortedSetReplayDecoder;
import org.redisson.client.protocol.decoder.ScoredSortedSetScanDecoder;
import org.redisson.client.protocol.decoder.ScoredSortedSetScanReplayDecoder;
import org.redisson.client.protocol.decoder.SlotsDecoder;
import org.redisson.client.protocol.decoder.StreamResultDecoder;
import org.redisson.client.protocol.decoder.StringDataDecoder;
import org.redisson.client.protocol.decoder.StringListReplayDecoder;
import org.redisson.client.protocol.decoder.StringMapDataDecoder;
@ -299,7 +307,33 @@ public interface RedisCommands {
RedisCommand<Boolean> SETNX = new RedisCommand<Boolean>("SETNX", new BooleanReplayConvertor());
RedisCommand<Void> PSETEX = new RedisCommand<Void>("PSETEX", new VoidReplayConvertor());
RedisStrictCommand<String> XADD = new RedisStrictCommand<String>("XADD");
RedisCommand<Map<String, Map<StreamId, Map<Object, Object>>>> XREVRANGE = new RedisCommand<Map<String, Map<StreamId, Map<Object, Object>>>>("XREVRANGE",
new ListMultiDecoder(new ObjectMapReplayDecoder(), new ObjectMapReplayDecoder(ListMultiDecoder.RESET, new StreamIdConvertor()),
new ObjectMapJoinDecoder()));
RedisCommand<Map<String, Map<StreamId, Map<Object, Object>>>> XRANGE = new RedisCommand<Map<String, Map<StreamId, Map<Object, Object>>>>("XRANGE",
new ListMultiDecoder(new ObjectMapReplayDecoder(), new ObjectMapReplayDecoder(ListMultiDecoder.RESET, new StreamIdConvertor()),
new ObjectMapJoinDecoder()));
RedisCommand<Map<String, Map<StreamId, Map<Object, Object>>>> XREAD = new RedisCommand<Map<String, Map<StreamId, Map<Object, Object>>>>("XREAD",
new ListMultiDecoder(new ListResultReplayDecoder(), new ObjectMapReplayDecoder(), new ObjectMapReplayDecoder(ListMultiDecoder.RESET_1, new StreamIdConvertor()),
new ObjectMapJoinDecoder(), new ObjectMapReplayDecoder(ListMultiDecoder.RESET_INDEX)));
RedisCommand<Map<StreamId, Map<Object, Object>>> XREAD_BLOCKING = new RedisCommand<Map<StreamId, Map<Object, Object>>>("XREAD",
new ListMultiDecoder(new ListResultReplayDecoder(), new ObjectMapReplayDecoder(), new ObjectMapReplayDecoder(ListMultiDecoder.RESET_1, new StreamIdConvertor()),
new ObjectMapJoinDecoder(), new ObjectMapReplayDecoder(ListMultiDecoder.RESET_INDEX)));
RedisCommand<Map<StreamId, Map<Object, Object>>> XREAD_SINGLE = new RedisCommand<Map<StreamId, Map<Object, Object>>>("XREAD",
new ListMultiDecoder(new ListResultReplayDecoder(), new ObjectMapReplayDecoder(), new ObjectMapReplayDecoder(ListMultiDecoder.RESET_1, new StreamIdConvertor()),
new ObjectMapJoinDecoder(), new ObjectMapReplayDecoder(), new StreamResultDecoder()));
RedisCommand<Map<StreamId, Map<Object, Object>>> XREAD_BLOCKING_SINGLE = new RedisCommand<Map<StreamId, Map<Object, Object>>>("XREAD",
new ListMultiDecoder(new ListResultReplayDecoder(), new ObjectMapReplayDecoder(), new ObjectMapReplayDecoder(ListMultiDecoder.RESET_1, new StreamIdConvertor()),
new ObjectMapJoinDecoder(), new ObjectMapReplayDecoder(), new StreamResultDecoder()));
RedisStrictCommand<StreamId> XADD = new RedisStrictCommand<StreamId>("XADD", new StreamIdConvertor());
RedisStrictCommand<Void> XADD_VOID = new RedisStrictCommand<Void>("XADD", new VoidReplayConvertor());
RedisStrictCommand<Long> XLEN = new RedisStrictCommand<Long>("XLEN");
RedisStrictCommand<Long> TOUCH_LONG = new RedisStrictCommand<Long>("TOUCH", new LongReplayConvertor());
RedisStrictCommand<Boolean> TOUCH = new RedisStrictCommand<Boolean>("TOUCH", new BooleanReplayConvertor());

@ -0,0 +1,33 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client.protocol.convertor;
import org.redisson.api.StreamId;
/**
*
* @author Nikita Koksharov
*
*/
public class StreamIdConvertor extends SingleConvertor<StreamId> {
@Override
public StreamId convert(Object id) {
String[] parts = id.toString().split("-");
return new StreamId(Long.valueOf(parts[0]), Long.valueOf(parts[1]));
}
}

@ -38,6 +38,20 @@ public class ListMultiDecoder<T> implements MultiDecoder<Object> {
}
};
public static final Decoder<Object> RESET_1 = new Decoder<Object>() {
@Override
public Object decode(ByteBuf buf, State state) throws IOException {
return null;
}
};
public static final Decoder<Object> RESET_INDEX = new Decoder<Object>() {
@Override
public Object decode(ByteBuf buf, State state) throws IOException {
return null;
}
};
private final MultiDecoder<?>[] decoders;
public static class NestedDecoderState implements DecoderState {
@ -53,8 +67,8 @@ public class ListMultiDecoder<T> implements MultiDecoder<Object> {
this.index = index;
}
public void resetIndex() {
index = 0;
public void setIndex(int index) {
this.index = index;
}
public void resetPartsIndex() {
@ -112,13 +126,20 @@ public class ListMultiDecoder<T> implements MultiDecoder<Object> {
int index = getDecoder(state).getIndex();
if (index == -1) {
getDecoder(state).resetIndex();
getDecoder(state).setIndex(0);
index = 0;
}
Decoder<Object> decoder = decoders[index].getDecoder(paramNum, state);
if (decoder == RESET) {
NestedDecoderState s = getDecoder(state);
s.resetIndex();
s.setIndex(0);
int ind = s.getIndex();
return decoders[ind].getDecoder(paramNum, state);
}
if (decoder == RESET_1) {
NestedDecoderState s = getDecoder(state);
s.setIndex(1);
int ind = s.getIndex();
return decoders[ind].getDecoder(paramNum, state);
}
@ -140,6 +161,13 @@ public class ListMultiDecoder<T> implements MultiDecoder<Object> {
index = s.incIndex() + s.getPartsIndex();
return decoders[index].decode(parts, state);
}
// TODO refactor it!
Decoder<Object> decoder = decoders[index].getDecoder(0, state);
if (decoder == RESET_INDEX) {
s.setIndex(-1);
}
return res;
}

@ -0,0 +1,46 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client.protocol.decoder;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder;
/**
*
* @author Nikita Koksharov
*
*/
public class ObjectMapJoinDecoder implements MultiDecoder<Map<Object, Object>> {
@Override
public Map<Object, Object> decode(List<Object> parts, State state) {
Map<Object, Object> result = new LinkedHashMap<Object, Object>(parts.size());
for (int i = 0; i < parts.size(); i++) {
result.putAll((Map<? extends Object, ? extends Object>) parts.get(i));
}
return result;
}
@Override
public Decoder<Object> getDecoder(int paramNum, State state) {
return null;
}
}

@ -21,6 +21,7 @@ import java.util.Map;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.convertor.Convertor;
/**
*
@ -29,19 +30,43 @@ import org.redisson.client.protocol.Decoder;
*/
public class ObjectMapReplayDecoder implements MultiDecoder<Map<Object, Object>> {
private Decoder<Object> codec;
private Convertor<?> convertor;
public ObjectMapReplayDecoder() {
}
public ObjectMapReplayDecoder(Decoder<Object> codec) {
super();
this.codec = codec;
}
public ObjectMapReplayDecoder(Decoder<Object> codec, Convertor<?> convertor) {
super();
this.codec = codec;
this.convertor = convertor;
}
@Override
public Map<Object, Object> decode(List<Object> parts, State state) {
Map<Object, Object> result = new LinkedHashMap<Object, Object>(parts.size()/2);
for (int i = 0; i < parts.size(); i++) {
if (i % 2 != 0) {
if (convertor != null) {
result.put(convertor.convert(parts.get(i-1)), parts.get(i));
} else {
result.put(parts.get(i-1), parts.get(i));
}
}
}
return result;
}
@Override
public Decoder<Object> getDecoder(int paramNum, State state) {
if (codec != null) {
return codec;
}
return null;
}

@ -0,0 +1,47 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client.protocol.decoder;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.redisson.api.StreamId;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder;
/**
*
* @author Nikita Koksharov
*
*/
public class StreamResultDecoder implements MultiDecoder<Object> {
@Override
public Object decode(List<Object> parts, State state) {
if (!parts.isEmpty()) {
Map<String, Map<StreamId, Map<Object, Object>>> result = (Map<String, Map<StreamId, Map<Object, Object>>>) parts.get(0);
return result.values().iterator().next();
}
return Collections.emptyMap();
}
@Override
public Decoder<Object> getDecoder(int paramNum, State state) {
return null;
}
}

@ -680,8 +680,26 @@ public class CommandAsyncService implements CommandAsyncExecutor {
details.getTimeout().cancel();
long timeoutTime = connectionManager.getConfig().getTimeout();
if (RedisCommands.BLOCKING_COMMANDS.contains(details.getCommand().getName())) {
Long popTimeout = Long.valueOf(details.getParams()[details.getParams().length - 1].toString());
if (RedisCommands.BLOCKING_COMMANDS.contains(details.getCommand().getName())
|| RedisCommands.XREAD_BLOCKING_SINGLE == details.getCommand()
|| RedisCommands.XREAD_BLOCKING == details.getCommand()) {
Long popTimeout = null;
if (RedisCommands.XREAD_BLOCKING_SINGLE == details.getCommand()
|| RedisCommands.XREAD_BLOCKING == details.getCommand()) {
boolean found = false;
for (Object param : details.getParams()) {
if (found) {
popTimeout = Long.valueOf(param.toString()) / 1000;
break;
}
if (param instanceof String) {
found = true;
}
}
} else {
popTimeout = Long.valueOf(details.getParams()[details.getParams().length - 1].toString());
}
handleBlockingOperations(details, connection, popTimeout);
if (popTimeout == 0) {
return;

@ -0,0 +1,244 @@
package org.redisson;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
import org.redisson.api.RStream;
import org.redisson.api.StreamId;
public class RedissonStreamTest extends BaseTest {
@Test
public void testRangeReversed() {
RStream<String, String> stream = redisson.getStream("test");
assertThat(stream.size()).isEqualTo(0);
Map<String, String> entries1 = new HashMap<>();
entries1.put("1", "11");
entries1.put("3", "31");
stream.addAll(new StreamId(1), entries1, 1, false);
assertThat(stream.size()).isEqualTo(1);
Map<String, String> entries2 = new HashMap<>();
entries2.put("5", "55");
entries2.put("7", "77");
stream.addAll(new StreamId(2), entries2, 1, false);
Map<StreamId, Map<String, String>> r2 = stream.rangeReversed(10, StreamId.MAX, StreamId.MIN);
assertThat(r2.keySet()).containsExactly(new StreamId(2), new StreamId(1));
assertThat(r2.get(new StreamId(1))).isEqualTo(entries1);
assertThat(r2.get(new StreamId(2))).isEqualTo(entries2);
}
@Test
public void testRange() {
RStream<String, String> stream = redisson.getStream("test");
assertThat(stream.size()).isEqualTo(0);
Map<String, String> entries1 = new HashMap<>();
entries1.put("1", "11");
entries1.put("3", "31");
stream.addAll(new StreamId(1), entries1, 1, false);
assertThat(stream.size()).isEqualTo(1);
Map<String, String> entries2 = new HashMap<>();
entries2.put("5", "55");
entries2.put("7", "77");
stream.addAll(new StreamId(2), entries2, 1, false);
Map<StreamId, Map<String, String>> r = stream.range(10, new StreamId(0), new StreamId(1));
assertThat(r).hasSize(1);
assertThat(r.get(new StreamId(1))).isEqualTo(entries1);
Map<StreamId, Map<String, String>> r2 = stream.range(10, StreamId.MIN, StreamId.MAX);
assertThat(r2.keySet()).containsExactly(new StreamId(1), new StreamId(2));
assertThat(r2.get(new StreamId(1))).isEqualTo(entries1);
assertThat(r2.get(new StreamId(2))).isEqualTo(entries2);
}
@Test
public void testPollMultiKeys() {
RStream<String, String> stream = redisson.getStream("test");
Map<String, String> entries1 = new LinkedHashMap<>();
entries1.put("1", "11");
entries1.put("3", "31");
Thread t = new Thread() {
@Override
public void run() {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
stream.addAll(new StreamId(1), entries1);
}
};
t.start();
long start = System.currentTimeMillis();
Map<String, Map<StreamId, Map<String, String>>> s = stream.read(2, 5, TimeUnit.SECONDS, Collections.singleton("test1"), new StreamId(0), StreamId.NEWEST);
assertThat(System.currentTimeMillis() - start).isBetween(1900L, 2200L);
assertThat(s).hasSize(1);
assertThat(s.get("test").get(new StreamId(1))).isEqualTo(entries1);
}
@Test
public void testPoll() {
RStream<String, String> stream = redisson.getStream("test");
Map<String, String> entries1 = new LinkedHashMap<>();
entries1.put("1", "11");
entries1.put("3", "31");
Thread t = new Thread() {
@Override
public void run() {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
stream.addAll(new StreamId(1), entries1);
}
};
t.start();
long start = System.currentTimeMillis();
Map<StreamId, Map<String, String>> s = stream.read(2, 5, TimeUnit.SECONDS, new StreamId(0));
assertThat(System.currentTimeMillis() - start).isBetween(1900L, 2200L);
assertThat(s).hasSize(1);
assertThat(s.get(new StreamId(1))).isEqualTo(entries1);
}
@Test
public void testSize() {
RStream<String, String> stream = redisson.getStream("test");
assertThat(stream.size()).isEqualTo(0);
Map<String, String> entries1 = new HashMap<>();
entries1.put("1", "11");
entries1.put("3", "31");
stream.addAll(new StreamId(1), entries1, 1, false);
assertThat(stream.size()).isEqualTo(1);
Map<String, String> entries2 = new HashMap<>();
entries2.put("5", "55");
entries2.put("7", "77");
stream.addAll(new StreamId(2), entries2, 1, false);
assertThat(stream.size()).isEqualTo(2);
}
@Test
public void testReadMultiKeysEmpty() {
RStream<String, String> stream = redisson.getStream("test2");
Map<String, Map<StreamId, Map<String, String>>> s = stream.read(10, Collections.singleton("test1"), new StreamId(0), new StreamId(0));
assertThat(s).isEmpty();
}
@Test
public void testReadMultiKeys() {
RStream<String, String> stream1 = redisson.getStream("test1");
Map<String, String> entries1 = new LinkedHashMap<>();
entries1.put("1", "11");
entries1.put("2", "22");
entries1.put("3", "33");
stream1.addAll(entries1);
RStream<String, String> stream2 = redisson.getStream("test2");
Map<String, String> entries2 = new LinkedHashMap<>();
entries2.put("4", "44");
entries2.put("5", "55");
entries2.put("6", "66");
stream2.addAll(entries2);
Map<String, Map<StreamId, Map<String, String>>> s = stream2.read(10, Collections.singleton("test1"), new StreamId(0), new StreamId(0));
assertThat(s).hasSize(2);
assertThat(s.get("test1").values().iterator().next()).isEqualTo(entries1);
assertThat(s.get("test2").values().iterator().next()).isEqualTo(entries2);
}
@Test
public void testReadMulti() {
RStream<String, String> stream = redisson.getStream("test");
Map<String, String> entries1 = new LinkedHashMap<>();
entries1.put("1", "11");
entries1.put("3", "31");
stream.addAll(new StreamId(1), entries1, 1, false);
Map<String, String> entries2 = new LinkedHashMap<>();
entries2.put("5", "55");
entries2.put("7", "77");
stream.addAll(new StreamId(2), entries2, 1, false);
Map<String, String> entries3 = new LinkedHashMap<>();
entries3.put("15", "05");
entries3.put("17", "07");
stream.addAll(new StreamId(3), entries3, 1, false);
Map<StreamId, Map<String, String>> result = stream.read(10, new StreamId(0, 0));
assertThat(result).hasSize(3);
assertThat(result.get(new StreamId(4))).isNull();
assertThat(result.get(new StreamId(1))).isEqualTo(entries1);
assertThat(result.get(new StreamId(2))).isEqualTo(entries2);
assertThat(result.get(new StreamId(3))).isEqualTo(entries3);
}
@Test
public void testReadSingle() {
RStream<String, String> stream = redisson.getStream("test");
Map<String, String> entries1 = new LinkedHashMap<>();
entries1.put("1", "11");
entries1.put("3", "31");
stream.addAll(new StreamId(1), entries1, 1, true);
Map<StreamId, Map<String, String>> result = stream.read(10, new StreamId(0, 0));
assertThat(result).hasSize(1);
assertThat(result.get(new StreamId(4))).isNull();
assertThat(result.get(new StreamId(1))).isEqualTo(entries1);
}
@Test
public void testReadEmpty() {
RStream<String, String> stream2 = redisson.getStream("test");
Map<StreamId, Map<String, String>> result2 = stream2.read(10, new StreamId(0, 0));
assertThat(result2).isEmpty();
}
@Test
public void testAdd() {
RStream<String, String> stream = redisson.getStream("test1");
StreamId s = stream.add("12", "33");
assertThat(s.getId0()).isNotNegative();
assertThat(s.getId1()).isNotNegative();
assertThat(stream.size()).isEqualTo(1);
}
@Test
public void testAddAll() {
RStream<String, String> stream = redisson.getStream("test1");
assertThat(stream.size()).isEqualTo(0);
Map<String, String> entries = new HashMap<>();
entries.put("6", "61");
entries.put("4", "41");
stream.addAll(new StreamId(12, 42), entries, 10, false);
assertThat(stream.size()).isEqualTo(1);
entries.clear();
entries.put("1", "11");
entries.put("3", "31");
stream.addAll(new StreamId(Long.MAX_VALUE), entries, 1, false);
assertThat(stream.size()).isEqualTo(2);
}
}
Loading…
Cancel
Save