Merge branch 'master' into 3.0.0

# Conflicts:
#	pom.xml
#	redisson-all/pom.xml
#	redisson-tomcat/pom.xml
#	redisson-tomcat/redisson-tomcat-6/pom.xml
#	redisson-tomcat/redisson-tomcat-7/pom.xml
#	redisson-tomcat/redisson-tomcat-8/pom.xml
#	redisson-tomcat/redisson-tomcat-9/pom.xml
#	redisson/pom.xml
pull/1821/head
Nikita 7 years ago
commit f13cd8b9af

@ -4,7 +4,18 @@ Redisson Releases History
Try __[Redisson PRO](https://redisson.pro)__ version.
### 14-Jun-2018 - versions 2.12.1 and 3.7.1 released
### 27-Jun-2018 - versions 2.12.3 and 3.7.3 released
Feature - added `RKeys.getKeys` method with batch size
Feature - added `SnappyCodecV2` codec
Fixed - `SerializationCodec` doesn't support proxied classes
Fixed - NPE if `RScheduledExecutorService`'s task is scheduled with a cron expression for a finite number of executions
Fixed - validation of cron expression parameter of `RScheduledExecutorService.schedule` method
Feature - Iterator with batch size param for all `RSet`, `RMap`, `RMapCached` objects
Fixed - missing PubSub messages when `pingConnectionInterval` setting is specified
Fixed - excessive memory consumption if batched commands queued on Redis side
Fixed - `RRateLimiter.acquire` method throws NPE
### 14-Jun-2018 - versions 2.12.2 and 3.7.2 released
Feature - `RBatchOptions.executionMode` setting added. Please refer to [documentation](https://github.com/redisson/redisson/wiki/10.-additional-features#103-execution-batches-of-commands) for more details
Fixed - NPE in JCacheManager.close method

@ -1,13 +1,13 @@
Redisson: Redis based In-Memory Data Grid for Java.<br/> State of the Art Redis client
====
[Quick start](https://github.com/redisson/redisson#quick-start) | [Documentation](https://github.com/redisson/redisson/wiki) | [Javadocs](http://www.javadoc.io/doc/org.redisson/redisson/3.7.1) | [Changelog](https://github.com/redisson/redisson/blob/master/CHANGELOG.md) | [Code examples](https://github.com/redisson/redisson-examples) | [FAQs](https://github.com/redisson/redisson/wiki/16.-FAQ) | [Support chat](https://gitter.im/mrniko/redisson) | **[Redisson PRO](https://redisson.pro)**
[Quick start](https://github.com/redisson/redisson#quick-start) | [Documentation](https://github.com/redisson/redisson/wiki) | [Javadocs](http://www.javadoc.io/doc/org.redisson/redisson/3.7.3) | [Changelog](https://github.com/redisson/redisson/blob/master/CHANGELOG.md) | [Code examples](https://github.com/redisson/redisson-examples) | [FAQs](https://github.com/redisson/redisson/wiki/16.-FAQ) | [Support chat](https://gitter.im/mrniko/redisson) | **[Redisson PRO](https://redisson.pro)**
Based on high-performance async and lock-free Java Redis client and [Netty](http://netty.io) framework.
| Stable <br/> Release Version | Release Date | JDK Version<br/> compatibility | `CompletionStage` <br/> support | `ProjectReactor` version<br/> compatibility |
| ------------- | ------------- | ------------| -----------| -----------|
| 3.7.2 | 14.06.2018 | 1.8, 1.9, 1.10+ | Yes | 3.1.x |
| 2.12.2 | 14.06.2018 | 1.6, 1.7, 1.8, 1.9, 1.10, Android | No | 2.0.8 |
| 3.7.3 | 27.06.2018 | 1.8, 1.9, 1.10+ | Yes | 3.1.x |
| 2.12.3 | 27.06.2018 | 1.6, 1.7, 1.8, 1.9, 1.10, Android | No | 2.0.8 |
Features
@ -100,23 +100,23 @@ Quick start
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.7.2</version>
<version>3.7.3</version>
</dependency>
<!-- JDK 1.6+ compatible -->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>2.12.2</version>
<version>2.12.3</version>
</dependency>
#### Gradle
// JDK 1.8+ compatible
compile 'org.redisson:redisson:3.7.2'
compile 'org.redisson:redisson:3.7.3'
// JDK 1.6+ compatible
compile 'org.redisson:redisson:2.12.2'
compile 'org.redisson:redisson:2.12.3'
#### Java
@ -141,11 +141,11 @@ RExecutorService executor = redisson.getExecutorService("myExecutorService");
Downloads
===============================
[Redisson 3.7.2](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson&v=3.7.2&e=jar),
[Redisson node 3.7.2](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-all&v=3.7.2&e=jar)
[Redisson 3.7.3](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson&v=3.7.3&e=jar),
[Redisson node 3.7.3](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-all&v=3.7.3&e=jar)
[Redisson 2.12.2](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson&v=2.12.2&e=jar),
[Redisson node 2.12.2](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-all&v=2.12.2&e=jar)
[Redisson 2.12.3](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson&v=2.12.3&e=jar),
[Redisson node 2.12.3](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-all&v=2.12.3&e=jar)
FAQs
===============================

@ -93,7 +93,7 @@ public class RedissonSession extends StandardSession {
newMap.put("session:thisAccessedTime", thisAccessedTime);
map.putAll(newMap);
if (readMode == ReadMode.MEMORY) {
topic.publish(new AttributesPutAllMessage(getId(), newMap));
topic.publish(createPutAllMessage(newMap));
}
}
}
@ -108,7 +108,7 @@ public class RedissonSession extends StandardSession {
newMap.put("session:thisAccessedTime", thisAccessedTime);
map.putAll(newMap);
if (readMode == ReadMode.MEMORY) {
topic.publish(new AttributesPutAllMessage(getId(), newMap));
topic.publish(createPutAllMessage(newMap));
}
if (getMaxInactiveInterval() >= 0) {
map.expire(getMaxInactiveInterval(), TimeUnit.SECONDS);
@ -116,6 +116,14 @@ public class RedissonSession extends StandardSession {
}
}
/**
 * Builds the pub/sub message broadcasting a bulk attribute update.
 *
 * Snapshots the given attributes into a plain {@code HashMap} so the
 * published message carries its own detached copy of the entries rather
 * than a reference to the caller's map. {@code HashMap}'s copy constructor
 * performs the same per-entry copy the previous manual loop did.
 *
 * @param newMap attributes (including the session meta entries) to publish
 * @return message tagged with this session's id
 */
protected AttributesPutAllMessage createPutAllMessage(Map<String, Object> newMap) {
    Map<String, Object> map = new HashMap<String, Object>(newMap);
    return new AttributesPutAllMessage(getId(), map);
}
@Override
public void setMaxInactiveInterval(int interval) {
super.setMaxInactiveInterval(interval);
@ -213,7 +221,7 @@ public class RedissonSession extends StandardSession {
map.putAll(newMap);
if (readMode == ReadMode.MEMORY) {
topic.publish(new AttributesPutAllMessage(getId(), newMap));
topic.publish(createPutAllMessage(newMap));
}
if (maxInactiveInterval >= 0) {

@ -137,6 +137,7 @@ public class RedissonSessionManager extends ManagerBase implements Lifecycle {
sessionId = generateSessionId();
}
session.setManager(this);
session.setId(sessionId);
session.save();
@ -150,7 +151,7 @@ public class RedissonSessionManager extends ManagerBase implements Lifecycle {
}
public RTopic<AttributeMessage> getTopic() {
return redisson.getTopic("redisson:tomcat_session_updates");
return redisson.getTopic("redisson:tomcat_session_updates:" + container.getName());
}
@Override
@ -166,6 +167,7 @@ public class RedissonSessionManager extends ManagerBase implements Lifecycle {
RedissonSession session = (RedissonSession) createEmptySession();
session.setId(id);
session.setManager(this);
session.load(attrs);
session.access();
@ -260,6 +262,15 @@ public class RedissonSessionManager extends ManagerBase implements Lifecycle {
}
try {
try {
Config c = new Config(config);
Codec codec = c.getCodec().getClass().getConstructor(ClassLoader.class)
.newInstance(Thread.currentThread().getContextClassLoader());
config.setCodec(codec);
} catch (Exception e) {
throw new IllegalStateException("Unable to initialize codec with ClassLoader parameter", e);
}
return Redisson.create(config);
} catch (Exception e) {
throw new LifecycleException(e);
@ -285,8 +296,10 @@ public class RedissonSessionManager extends ManagerBase implements Lifecycle {
}
if (updateMode == UpdateMode.AFTER_REQUEST) {
RedissonSession sess = (RedissonSession) findSession(session.getId());
sess.save();
RedissonSession sess = (RedissonSession) super.findSession(session.getId());
if (sess != null) {
sess.save();
}
}
}

@ -19,6 +19,7 @@ import java.io.IOException;
import javax.servlet.ServletException;
import org.apache.catalina.Session;
import org.apache.catalina.connector.Request;
import org.apache.catalina.connector.Response;
import org.apache.catalina.valves.ValveBase;
@ -40,6 +41,19 @@ public class UpdateValve extends ValveBase {
@Override
public void invoke(Request request, Response response) throws IOException, ServletException {
String sessionId = request.getRequestedSessionId();
Session session = request.getContext().getManager().findSession(sessionId);
if (session != null) {
if (!session.isValid()) {
session.expire();
request.getContext().getManager().remove(session);
} else {
manager.add(session);
session.access();
session.endAccess();
}
}
try {
getNext().invoke(request, response);
} finally {

@ -93,7 +93,7 @@ public class RedissonSession extends StandardSession {
newMap.put("session:thisAccessedTime", thisAccessedTime);
map.putAll(newMap);
if (readMode == ReadMode.MEMORY) {
topic.publish(new AttributesPutAllMessage(getId(), newMap));
topic.publish(createPutAllMessage(newMap));
}
}
}
@ -108,13 +108,21 @@ public class RedissonSession extends StandardSession {
newMap.put("session:thisAccessedTime", thisAccessedTime);
map.putAll(newMap);
if (readMode == ReadMode.MEMORY) {
topic.publish(new AttributesPutAllMessage(getId(), newMap));
topic.publish(createPutAllMessage(newMap));
}
if (getMaxInactiveInterval() >= 0) {
map.expire(getMaxInactiveInterval(), TimeUnit.SECONDS);
}
}
}
/**
 * Builds the pub/sub message broadcasting a bulk attribute update.
 *
 * Snapshots the given attributes into a plain {@code HashMap} so the
 * published message carries its own detached copy of the entries rather
 * than a reference to the caller's map. {@code HashMap}'s copy constructor
 * performs the same per-entry copy the previous manual loop did.
 *
 * @param newMap attributes (including the session meta entries) to publish
 * @return message tagged with this session's id
 */
protected AttributesPutAllMessage createPutAllMessage(Map<String, Object> newMap) {
    Map<String, Object> map = new HashMap<String, Object>(newMap);
    return new AttributesPutAllMessage(getId(), map);
}
@Override
public void setMaxInactiveInterval(int interval) {
@ -213,7 +221,7 @@ public class RedissonSession extends StandardSession {
map.putAll(newMap);
if (readMode == ReadMode.MEMORY) {
topic.publish(new AttributesPutAllMessage(getId(), newMap));
topic.publish(createPutAllMessage(newMap));
}
if (maxInactiveInterval >= 0) {

@ -57,7 +57,7 @@ public class RedissonSessionManager extends ManagerBase {
private UpdateMode updateMode = UpdateMode.DEFAULT;
private String keyPrefix = "";
public String getUpdateMode() {
return updateMode.toString();
}
@ -116,6 +116,7 @@ public class RedissonSessionManager extends ManagerBase {
sessionId = generateSessionId();
}
session.setManager(this);
session.setId(sessionId);
session.save();
@ -129,7 +130,7 @@ public class RedissonSessionManager extends ManagerBase {
}
public RTopic<AttributeMessage> getTopic() {
return redisson.getTopic("redisson:tomcat_session_updates");
return redisson.getTopic("redisson:tomcat_session_updates:" + container.getName());
}
@Override
@ -145,6 +146,7 @@ public class RedissonSessionManager extends ManagerBase {
RedissonSession session = (RedissonSession) createEmptySession();
session.setId(id);
session.setManager(this);
session.load(attrs);
session.access();
@ -277,8 +279,10 @@ public class RedissonSessionManager extends ManagerBase {
}
if (updateMode == UpdateMode.AFTER_REQUEST) {
RedissonSession sess = (RedissonSession) findSession(session.getId());
sess.save();
RedissonSession sess = (RedissonSession) super.findSession(session.getId());
if (sess != null) {
sess.save();
}
}
}

@ -19,6 +19,7 @@ import java.io.IOException;
import javax.servlet.ServletException;
import org.apache.catalina.Session;
import org.apache.catalina.connector.Request;
import org.apache.catalina.connector.Response;
import org.apache.catalina.valves.ValveBase;
@ -40,6 +41,19 @@ public class UpdateValve extends ValveBase {
@Override
public void invoke(Request request, Response response) throws IOException, ServletException {
String sessionId = request.getRequestedSessionId();
Session session = request.getContext().getManager().findSession(sessionId);
if (session != null) {
if (!session.isValid()) {
session.expire();
request.getContext().getManager().remove(session);
} else {
manager.add(session);
session.access();
session.endAccess();
}
}
try {
getNext().invoke(request, response);
} finally {

@ -93,7 +93,7 @@ public class RedissonSession extends StandardSession {
newMap.put("session:thisAccessedTime", thisAccessedTime);
map.putAll(newMap);
if (readMode == ReadMode.MEMORY) {
topic.publish(new AttributesPutAllMessage(getId(), newMap));
topic.publish(createPutAllMessage(newMap));
}
}
}
@ -108,7 +108,7 @@ public class RedissonSession extends StandardSession {
newMap.put("session:thisAccessedTime", thisAccessedTime);
map.putAll(newMap);
if (readMode == ReadMode.MEMORY) {
topic.publish(new AttributesPutAllMessage(getId(), newMap));
topic.publish(createPutAllMessage(newMap));
}
if (getMaxInactiveInterval() >= 0) {
map.expire(getMaxInactiveInterval(), TimeUnit.SECONDS);
@ -116,6 +116,14 @@ public class RedissonSession extends StandardSession {
}
}
/**
 * Builds the pub/sub message broadcasting a bulk attribute update.
 *
 * Snapshots the given attributes into a plain {@code HashMap} so the
 * published message carries its own detached copy of the entries rather
 * than a reference to the caller's map. {@code HashMap}'s copy constructor
 * performs the same per-entry copy the previous manual loop did.
 *
 * @param newMap attributes (including the session meta entries) to publish
 * @return message tagged with this session's id
 */
protected AttributesPutAllMessage createPutAllMessage(Map<String, Object> newMap) {
    Map<String, Object> map = new HashMap<String, Object>(newMap);
    return new AttributesPutAllMessage(getId(), map);
}
@Override
public void setMaxInactiveInterval(int interval) {
super.setMaxInactiveInterval(interval);
@ -213,7 +221,7 @@ public class RedissonSession extends StandardSession {
map.putAll(newMap);
if (readMode == ReadMode.MEMORY) {
topic.publish(new AttributesPutAllMessage(getId(), newMap));
topic.publish(createPutAllMessage(newMap));
}
if (maxInactiveInterval >= 0) {

@ -36,6 +36,9 @@ import org.redisson.api.listener.MessageListener;
import org.redisson.client.codec.Codec;
import org.redisson.config.Config;
import io.netty.buffer.ByteBufUtil;
import io.netty.util.internal.PlatformDependent;
/**
* Redisson Session Manager for Apache Tomcat
*
@ -57,6 +60,15 @@ public class RedissonSessionManager extends ManagerBase {
private String keyPrefix = "";
private final String id = ByteBufUtil.hexDump(generateId());
// Produces 16 random bytes; the manager hex-dumps the result into its
// instance id (see the `id` field above).
// NOTE(review): Netty's PlatformDependent is used deliberately — the native
// java.util.concurrent.ThreadLocalRandom is unavailable on the older JDKs
// this branch still supports, per the TODO below.
protected static byte[] generateId() {
byte[] id = new byte[16];
// TODO JDK UPGRADE replace to native ThreadLocalRandom
PlatformDependent.threadLocalRandom().nextBytes(id);
return id;
}
public String getUpdateMode() {
return updateMode.toString();
}
@ -115,6 +127,7 @@ public class RedissonSessionManager extends ManagerBase {
sessionId = generateSessionId();
}
session.setManager(this);
session.setId(sessionId);
session.save();
@ -128,7 +141,7 @@ public class RedissonSessionManager extends ManagerBase {
}
public RTopic<AttributeMessage> getTopic() {
return redisson.getTopic("redisson:tomcat_session_updates");
return redisson.getTopic("redisson:tomcat_session_updates:" + getContext().getName());
}
@Override
@ -144,6 +157,7 @@ public class RedissonSessionManager extends ManagerBase {
RedissonSession session = (RedissonSession) createEmptySession();
session.setId(id);
session.setManager(this);
session.load(attrs);
session.access();
@ -239,10 +253,14 @@ public class RedissonSessionManager extends ManagerBase {
}
try {
try {
Config c = new Config(config);
Codec codec = c.getCodec().getClass().getConstructor(ClassLoader.class)
.newInstance(Thread.currentThread().getContextClassLoader());
config.setCodec(codec);
} catch (Exception e) {
throw new IllegalStateException("Unable to initialize codec with ClassLoader parameter", e);
}
return Redisson.create(config);
} catch (Exception e) {
@ -272,9 +290,11 @@ public class RedissonSessionManager extends ManagerBase {
}
if (updateMode == UpdateMode.AFTER_REQUEST) {
RedissonSession sess = (RedissonSession) findSession(session.getId());
RedissonSession sess = (RedissonSession) super.findSession(session.getId());
if (sess != null) {
sess.save();
}
}
}
}

@ -19,6 +19,7 @@ import java.io.IOException;
import javax.servlet.ServletException;
import org.apache.catalina.Session;
import org.apache.catalina.connector.Request;
import org.apache.catalina.connector.Response;
import org.apache.catalina.valves.ValveBase;
@ -40,6 +41,19 @@ public class UpdateValve extends ValveBase {
@Override
public void invoke(Request request, Response response) throws IOException, ServletException {
String sessionId = request.getRequestedSessionId();
Session session = request.getContext().getManager().findSession(sessionId);
if (session != null) {
if (!session.isValid()) {
session.expire();
request.getContext().getManager().remove(session);
} else {
manager.add(session);
session.access();
session.endAccess();
}
}
try {
getNext().invoke(request, response);
} finally {

@ -93,7 +93,7 @@ public class RedissonSession extends StandardSession {
newMap.put("session:thisAccessedTime", thisAccessedTime);
map.putAll(newMap);
if (readMode == ReadMode.MEMORY) {
topic.publish(new AttributesPutAllMessage(getId(), newMap));
topic.publish(createPutAllMessage(newMap));
}
}
}
@ -108,7 +108,7 @@ public class RedissonSession extends StandardSession {
newMap.put("session:thisAccessedTime", thisAccessedTime);
map.putAll(newMap);
if (readMode == ReadMode.MEMORY) {
topic.publish(new AttributesPutAllMessage(getId(), newMap));
topic.publish(createPutAllMessage(newMap));
}
if (getMaxInactiveInterval() >= 0) {
map.expire(getMaxInactiveInterval(), TimeUnit.SECONDS);
@ -116,6 +116,14 @@ public class RedissonSession extends StandardSession {
}
}
/**
 * Builds the pub/sub message broadcasting a bulk attribute update.
 *
 * Snapshots the given attributes into a plain {@code HashMap} so the
 * published message carries its own detached copy of the entries rather
 * than a reference to the caller's map. {@code HashMap}'s copy constructor
 * performs the same per-entry copy the previous manual loop did.
 *
 * @param newMap attributes (including the session meta entries) to publish
 * @return message tagged with this session's id
 */
protected AttributesPutAllMessage createPutAllMessage(Map<String, Object> newMap) {
    Map<String, Object> map = new HashMap<String, Object>(newMap);
    return new AttributesPutAllMessage(getId(), map);
}
@Override
public void setMaxInactiveInterval(int interval) {
super.setMaxInactiveInterval(interval);
@ -213,7 +221,7 @@ public class RedissonSession extends StandardSession {
map.putAll(newMap);
if (readMode == ReadMode.MEMORY) {
topic.publish(new AttributesPutAllMessage(getId(), newMap));
topic.publish(createPutAllMessage(newMap));
}
if (maxInactiveInterval >= 0) {

@ -36,6 +36,9 @@ import org.redisson.api.listener.MessageListener;
import org.redisson.client.codec.Codec;
import org.redisson.config.Config;
import io.netty.buffer.ByteBufUtil;
import io.netty.util.internal.PlatformDependent;
/**
* Redisson Session Manager for Apache Tomcat
*
@ -57,6 +60,15 @@ public class RedissonSessionManager extends ManagerBase {
private String keyPrefix = "";
private final String id = ByteBufUtil.hexDump(generateId());
// Produces 16 random bytes; the manager hex-dumps the result into its
// instance id (see the `id` field above).
// NOTE(review): Netty's PlatformDependent is used deliberately — the native
// java.util.concurrent.ThreadLocalRandom is unavailable on the older JDKs
// this branch still supports, per the TODO below.
protected static byte[] generateId() {
byte[] id = new byte[16];
// TODO JDK UPGRADE replace to native ThreadLocalRandom
PlatformDependent.threadLocalRandom().nextBytes(id);
return id;
}
public String getUpdateMode() {
return updateMode.toString();
}
@ -115,6 +127,7 @@ public class RedissonSessionManager extends ManagerBase {
sessionId = generateSessionId();
}
session.setManager(this);
session.setId(sessionId);
session.save();
@ -128,7 +141,7 @@ public class RedissonSessionManager extends ManagerBase {
}
public RTopic<AttributeMessage> getTopic() {
return redisson.getTopic("redisson:tomcat_session_updates");
return redisson.getTopic("redisson:tomcat_session_updates:" + getContext().getName());
}
@Override
@ -144,6 +157,7 @@ public class RedissonSessionManager extends ManagerBase {
RedissonSession session = (RedissonSession) createEmptySession();
session.setId(id);
session.setManager(this);
session.load(attrs);
session.access();
@ -239,10 +253,14 @@ public class RedissonSessionManager extends ManagerBase {
}
try {
try {
Config c = new Config(config);
Codec codec = c.getCodec().getClass().getConstructor(ClassLoader.class)
.newInstance(Thread.currentThread().getContextClassLoader());
config.setCodec(codec);
} catch (Exception e) {
throw new IllegalStateException("Unable to initialize codec with ClassLoader parameter", e);
}
return Redisson.create(config);
} catch (Exception e) {
@ -272,9 +290,11 @@ public class RedissonSessionManager extends ManagerBase {
}
if (updateMode == UpdateMode.AFTER_REQUEST) {
RedissonSession sess = (RedissonSession) findSession(session.getId());
RedissonSession sess = (RedissonSession) super.findSession(session.getId());
if (sess != null) {
sess.save();
}
}
}
}

@ -19,6 +19,7 @@ import java.io.IOException;
import javax.servlet.ServletException;
import org.apache.catalina.Session;
import org.apache.catalina.connector.Request;
import org.apache.catalina.connector.Response;
import org.apache.catalina.valves.ValveBase;
@ -40,6 +41,19 @@ public class UpdateValve extends ValveBase {
@Override
public void invoke(Request request, Response response) throws IOException, ServletException {
String sessionId = request.getRequestedSessionId();
Session session = request.getContext().getManager().findSession(sessionId);
if (session != null) {
if (!session.isValid()) {
session.expire();
request.getContext().getManager().remove(session);
} else {
manager.add(session);
session.access();
session.endAccess();
}
}
try {
getNext().invoke(request, response);
} finally {

@ -23,7 +23,9 @@ import java.util.concurrent.TimeUnit;
import org.redisson.api.RFuture;
import org.redisson.api.RObject;
import org.redisson.client.codec.ByteArrayCodec;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.misc.RedissonObjectFactory;
@ -257,4 +259,64 @@ public abstract class RedissonObject implements RObject {
}
}
// Blocking variant of dumpAsync(): returns the Redis-serialized state of
// this object (the raw payload of the DUMP command).
@Override
public byte[] dump() {
return get(dumpAsync());
}
// Issues DUMP for this object's key; ByteArrayCodec keeps the serialized
// payload as raw bytes.
@Override
public RFuture<byte[]> dumpAsync() {
return commandExecutor.readAsync(getName(), ByteArrayCodec.INSTANCE, RedisCommands.DUMP, getName());
}
// Blocking variant of restoreAsync(byte[]).
@Override
public void restore(byte[] state) {
get(restoreAsync(state));
}
// Restore with no expiration: delegates with ttl=0 (Redis: "no TTL").
// timeUnit may safely be null here — it is only read when timeToLive > 0.
@Override
public RFuture<Void> restoreAsync(byte[] state) {
return restoreAsync(state, 0, null);
}
// Blocking variant of restoreAsync(byte[], long, TimeUnit).
@Override
public void restore(byte[] state, long timeToLive, TimeUnit timeUnit) {
get(restoreAsync(state, timeToLive, timeUnit));
}
/**
 * Recreates this object from a {@code DUMP} payload via the Redis
 * {@code RESTORE} command.
 *
 * A non-positive {@code timeToLive} maps to 0, which Redis interprets as
 * "no expiration"; {@code timeUnit} is only dereferenced when a positive
 * TTL was supplied, so it may be {@code null} otherwise.
 */
@Override
public RFuture<Void> restoreAsync(byte[] state, long timeToLive, TimeUnit timeUnit) {
    long ttlMillis = (timeToLive > 0) ? timeUnit.toMillis(timeToLive) : 0L;
    return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, RedisCommands.RESTORE, getName(), ttlMillis, state);
}
// Blocking variant of restoreAndReplaceAsync(byte[], long, TimeUnit).
@Override
public void restoreAndReplace(byte[] state, long timeToLive, TimeUnit timeUnit) {
get(restoreAndReplaceAsync(state, timeToLive, timeUnit));
}
/**
 * Recreates this object from a {@code DUMP} payload, overwriting any
 * existing value ({@code RESTORE ... REPLACE}).
 *
 * A non-positive {@code timeToLive} maps to 0 ("no expiration");
 * {@code timeUnit} is only dereferenced for a positive TTL, so it may be
 * {@code null} otherwise.
 */
@Override
public RFuture<Void> restoreAndReplaceAsync(byte[] state, long timeToLive, TimeUnit timeUnit) {
    long ttlMillis = (timeToLive > 0) ? timeUnit.toMillis(timeToLive) : 0L;
    return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, RedisCommands.RESTORE, getName(), ttlMillis, state, "REPLACE");
}
// Blocking variant of restoreAndReplaceAsync(byte[]).
@Override
public void restoreAndReplace(byte[] state) {
get(restoreAndReplaceAsync(state));
}
// Replace-restore with no expiration: delegates with ttl=0. timeUnit may
// safely be null — it is only read when timeToLive > 0.
@Override
public RFuture<Void> restoreAndReplaceAsync(byte[] state) {
return restoreAndReplaceAsync(state, 0, null);
}
}

@ -153,7 +153,7 @@ public class RedissonRateLimiter extends RedissonObject implements RRateLimiter
public void run() {
tryAcquireAsync(permits, promise, timeoutInMillis);
}
}, delay, TimeUnit.SECONDS);
}, delay, TimeUnit.MILLISECONDS);
return;
}

@ -16,7 +16,8 @@
package org.redisson;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@ -147,8 +148,8 @@ public class RedissonStream<K, V> extends RedissonExpirable implements RStream<K
}
@Override
public RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(int count, Collection<String> keys, StreamId ... ids) {
return readAsync(count, -1, null, keys, ids);
public RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(int count, StreamId id, Map<String, StreamId> keyToId) {
return readAsync(count, -1, null, id, keyToId);
}
@Override
@ -157,11 +158,7 @@ public class RedissonStream<K, V> extends RedissonExpirable implements RStream<K
}
@Override
public RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(int count, long timeout, TimeUnit unit, Collection<String> keys, StreamId... ids) {
if (keys.size() + 1 != ids.length) {
throw new IllegalArgumentException("keys amount should be lower by one than ids amount");
}
public RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(int count, long timeout, TimeUnit unit, StreamId id, Map<String, StreamId> keyToId) {
List<Object> params = new ArrayList<Object>();
if (count > 0) {
params.add("COUNT");
@ -175,14 +172,13 @@ public class RedissonStream<K, V> extends RedissonExpirable implements RStream<K
params.add("STREAMS");
params.add(getName());
if (keys != null) {
for (String key : keys) {
params.add(key);
}
for (String key : keyToId.keySet()) {
params.add(key);
}
for (StreamId id : ids) {
params.add(id.toString());
params.add(id);
for (StreamId nextId : keyToId.values()) {
params.add(nextId.toString());
}
if (timeout > 0) {
@ -294,14 +290,13 @@ public class RedissonStream<K, V> extends RedissonExpirable implements RStream<K
}
@Override
public Map<String, Map<StreamId, Map<K, V>>> read(int count, Collection<String> keys, StreamId... ids) {
return get(readAsync(count, keys, ids));
public Map<String, Map<StreamId, Map<K, V>>> read(int count, StreamId id, Map<String, StreamId> keyToId) {
return get(readAsync(count, id, keyToId));
}
@Override
public Map<String, Map<StreamId, Map<K, V>>> read(int count, long timeout, TimeUnit unit, Collection<String> keys,
StreamId... ids) {
return get(readAsync(count, timeout, unit, keys, ids));
public Map<String, Map<StreamId, Map<K, V>>> read(int count, long timeout, TimeUnit unit, StreamId id, Map<String, StreamId> keyToId) {
return get(readAsync(count, timeout, unit, id, keyToId));
}
@Override
@ -355,14 +350,13 @@ public class RedissonStream<K, V> extends RedissonExpirable implements RStream<K
}
@Override
public RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(Collection<String> keys, StreamId... ids) {
return readAsync(0, keys, ids);
public RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(StreamId id, Map<String, StreamId> keyToId) {
return readAsync(0, id, keyToId);
}
@Override
public RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(long timeout, TimeUnit unit,
Collection<String> keys, StreamId... ids) {
return readAsync(0, timeout, unit, keys, ids);
public RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(long timeout, TimeUnit unit, StreamId id, Map<String, StreamId> keyToId) {
return readAsync(0, timeout, unit, id, keyToId);
}
@Override
@ -386,14 +380,13 @@ public class RedissonStream<K, V> extends RedissonExpirable implements RStream<K
}
@Override
public Map<String, Map<StreamId, Map<K, V>>> read(Collection<String> keys, StreamId... ids) {
return read(0, keys, ids);
public Map<String, Map<StreamId, Map<K, V>>> read(StreamId id, Map<String, StreamId> keyToId) {
return read(0, id, keyToId);
}
@Override
public Map<String, Map<StreamId, Map<K, V>>> read(long timeout, TimeUnit unit, Collection<String> keys,
StreamId... ids) {
return read(0, timeout, unit, keys, ids);
public Map<String, Map<StreamId, Map<K, V>>> read(long timeout, TimeUnit unit, StreamId id, Map<String, StreamId> keyToId) {
return read(0, timeout, unit, id, keyToId);
}
@Override
@ -405,5 +398,109 @@ public class RedissonStream<K, V> extends RedissonExpirable implements RStream<K
public Map<StreamId, Map<K, V>> rangeReversed(StreamId startId, StreamId endId) {
return rangeReversed(0, startId, endId);
}
// --- XREAD convenience overloads ------------------------------------------
// Each variant pairs this stream's last-seen id with one or two additional
// (stream name -> id) entries and delegates to the Map-based readAsync().
// The repeated map-building for the three-stream variants is factored into
// a single helper; presized to 4 buckets because new HashMap<>(2) at the
// default 0.75 load factor would rehash on the second put.

// Builds the {stream name -> last seen id} map for the three-stream overloads.
private Map<String, StreamId> mapOfIds(String key2, StreamId id2, String key3, StreamId id3) {
    Map<String, StreamId> ids = new HashMap<String, StreamId>(4);
    ids.put(key2, id2);
    ids.put(key3, id3);
    return ids;
}

@Override
public RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(StreamId id, String key2, StreamId id2) {
    return readAsync(id, Collections.singletonMap(key2, id2));
}

@Override
public RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(StreamId id, String key2, StreamId id2, String key3,
        StreamId id3) {
    return readAsync(id, mapOfIds(key2, id2, key3, id3));
}

@Override
public RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(int count, StreamId id, String key2, StreamId id2) {
    return readAsync(count, id, Collections.singletonMap(key2, id2));
}

@Override
public RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(int count, StreamId id, String key2, StreamId id2,
        String key3, StreamId id3) {
    return readAsync(count, id, mapOfIds(key2, id2, key3, id3));
}

@Override
public RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(long timeout, TimeUnit unit, StreamId id,
        String key2, StreamId id2) {
    return readAsync(timeout, unit, id, Collections.singletonMap(key2, id2));
}

@Override
public RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(long timeout, TimeUnit unit, StreamId id,
        String key2, StreamId id2, String key3, StreamId id3) {
    return readAsync(timeout, unit, id, mapOfIds(key2, id2, key3, id3));
}

@Override
public RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(int count, long timeout, TimeUnit unit, StreamId id,
        String key2, StreamId id2) {
    return readAsync(count, timeout, unit, id, Collections.singletonMap(key2, id2));
}

@Override
public RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(int count, long timeout, TimeUnit unit, StreamId id,
        String key2, StreamId id2, String key3, StreamId id3) {
    return readAsync(count, timeout, unit, id, mapOfIds(key2, id2, key3, id3));
}
// --- Blocking XREAD convenience overloads ---------------------------------
// Each method is a pure delegate: it invokes the matching readAsync(...)
// overload and blocks on the future via get().
@Override
public Map<String, Map<StreamId, Map<K, V>>> read(StreamId id, String key2, StreamId id2) {
return get(readAsync(id, key2, id2));
}
@Override
public Map<String, Map<StreamId, Map<K, V>>> read(StreamId id, String key2, StreamId id2, String key3,
StreamId id3) {
return get(readAsync(id, key2, id2, key3, id3));
}
@Override
public Map<String, Map<StreamId, Map<K, V>>> read(int count, StreamId id, String key2, StreamId id2) {
return get(readAsync(count, id, key2, id2));
}
@Override
public Map<String, Map<StreamId, Map<K, V>>> read(int count, StreamId id, String key2, StreamId id2, String key3,
StreamId id3) {
return get(readAsync(count, id, key2, id2, key3, id3));
}
@Override
public Map<String, Map<StreamId, Map<K, V>>> read(long timeout, TimeUnit unit, StreamId id, String key2,
StreamId id2) {
return get(readAsync(timeout, unit, id, key2, id2));
}
@Override
public Map<String, Map<StreamId, Map<K, V>>> read(long timeout, TimeUnit unit, StreamId id, String key2,
StreamId id2, String key3, StreamId id3) {
return get(readAsync(timeout, unit, id, key2, id2, key3, id3));
}
@Override
public Map<String, Map<StreamId, Map<K, V>>> read(int count, long timeout, TimeUnit unit, StreamId id, String key2,
StreamId id2) {
return get(readAsync(count, timeout, unit, id, key2, id2));
}
@Override
public Map<String, Map<StreamId, Map<K, V>>> read(int count, long timeout, TimeUnit unit, StreamId id, String key2,
StreamId id2, String key3, StreamId id3) {
return get(readAsync(count, timeout, unit, id, key2, id2, key3, id3));
}
}

@ -15,6 +15,8 @@
*/
package org.redisson.api;
import java.util.concurrent.TimeUnit;
import org.redisson.client.codec.Codec;
/**
@ -25,6 +27,45 @@ import org.redisson.client.codec.Codec;
*/
public interface RObject extends RObjectAsync {
/**
* Restores object using its state returned by {@link #dump()} method.
*
* @param state - state of object
*/
void restore(byte[] state);
/**
* Restores object using its state returned by {@link #dump()} method and set time to live for it.
*
* @param state - state of object
* @param timeToLive - time to live of the object
* @param timeUnit - time unit
*/
void restore(byte[] state, long timeToLive, TimeUnit timeUnit);
/**
* Restores and replaces object if it already exists.
*
* @param state - state of the object
*/
void restoreAndReplace(byte[] state);
/**
* Restores and replaces object if it already exists and set time to live for it.
*
* @param state - state of the object
* @param timeToLive - time to live of the object
* @param timeUnit - time unit
*/
void restoreAndReplace(byte[] state, long timeToLive, TimeUnit timeUnit);
/**
* Returns dump of object
*
* @return dump
*/
byte[] dump();
/**
* Update the last access time of an object.
*

@ -15,6 +15,8 @@
*/
package org.redisson.api;
import java.util.concurrent.TimeUnit;
/**
* Base interface for all Redisson objects
*
@ -23,6 +25,49 @@ package org.redisson.api;
*/
public interface RObjectAsync {
/**
* Restores object using its state returned by {@link #dumpAsync()} method.
*
* @param state - state of object
* @return future completed when the object state has been restored
*/
RFuture<Void> restoreAsync(byte[] state);
/**
* Restores object using its state returned by {@link #dumpAsync()} method and set time to live for it.
*
* @param state - state of object
* @param timeToLive - time to live of the object
* @param timeUnit - time unit
* @return future completed when the object state has been restored
*/
RFuture<Void> restoreAsync(byte[] state, long timeToLive, TimeUnit timeUnit);
/**
* Restores and replaces object if it already exists.
*
* @param state - state of the object
* @return future completed when the object has been restored and replaced
*/
RFuture<Void> restoreAndReplaceAsync(byte[] state);
/**
* Restores and replaces object if it already exists and set time to live for it.
*
* @param state - state of the object
* @param timeToLive - time to live of the object
* @param timeUnit - time unit
* @return future completed when the object has been restored and replaced
*/
RFuture<Void> restoreAndReplaceAsync(byte[] state, long timeToLive, TimeUnit timeUnit);
/**
* Returns dump of object
*
* @return dump
*/
RFuture<byte[]> dumpAsync();
/**
* Update the last access time of an object in async mode.
*

@ -163,52 +163,152 @@ public interface RStream<K, V> extends RStreamAsync<K, V>, RExpirable {
Map<StreamId, Map<K, V>> read(int count, long timeout, TimeUnit unit, StreamId ... ids);
/**
* Read stream data by specified collection of keys including this stream and Stream ID per key.
* First Stream ID is related to this stream.
* Read stream data by specified stream name including this stream.
*
* @param keys - collection of keys
* @param ids - collection of Stream IDs
* @param id - id of this stream
* @param name2 - name of second stream
* @param id2 - id of second stream
* @return stream data mapped by key and Stream ID
*/
Map<String, Map<StreamId, Map<K, V>>> read(Collection<String> keys, StreamId ... ids);
Map<String, Map<StreamId, Map<K, V>>> read(StreamId id, String name2, StreamId id2);
/**
* Read stream data by specified stream names including this stream.
*
* @param id - id of this stream
* @param name2 - name of second stream
* @param id2 - id of second stream
* @param name3 - name of third stream
* @param id3 - id of third stream
* @return stream data mapped by key and Stream ID
*/
Map<String, Map<StreamId, Map<K, V>>> read(StreamId id, String name2, StreamId id2, String name3, StreamId id3);
/**
* Read stream data by specified collection of keys including this stream and Stream ID per key.
* First Stream ID is related to this stream.
* Read stream data by specified stream id mapped by name including this stream.
*
* @param id - id of this stream
* @param nameToId - stream id mapped by name
* @return stream data mapped by key and Stream ID
*/
Map<String, Map<StreamId, Map<K, V>>> read(StreamId id, Map<String, StreamId> nameToId);
/**
* Read stream data by specified stream name including this stream.
*
* @param count - stream data size limit
* @param keys - collection of keys
* @param ids - collection of Stream IDs
* @param id - id of this stream
* @param name2 - name of second stream
* @param id2 - id of second stream
* @return stream data mapped by key and Stream ID
*/
Map<String, Map<StreamId, Map<K, V>>> read(int count, StreamId id, String name2, StreamId id2);
/**
* Read stream data by specified stream names including this stream.
*
* @param count - stream data size limit
* @param id - id of this stream
* @param name2 - name of second stream
* @param id2 - id of second stream
* @param name3 - name of third stream
* @param id3 - id of third stream
* @return stream data mapped by key and Stream ID
*/
Map<String, Map<StreamId, Map<K, V>>> read(int count, Collection<String> keys, StreamId ... ids);
Map<String, Map<StreamId, Map<K, V>>> read(int count, StreamId id, String name2, StreamId id2, String name3, StreamId id3);
/**
* Read stream data by specified collection of keys including this stream and Stream ID per key.
* First Stream ID is related to this stream.
* Wait for first stream data availability for specified <code>timeout</code> interval.
* Read stream data by specified stream id mapped by name including this stream.
*
* @param count - stream data size limit
* @param id - id of this stream
* @param nameToId - stream id mapped by name
* @return stream data mapped by key and Stream ID
*/
Map<String, Map<StreamId, Map<K, V>>> read(int count, StreamId id, Map<String, StreamId> nameToId);
/**
* Read stream data by specified stream name including this stream.
* Wait for the first stream data availability for specified <code>timeout</code> interval.
*
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param keys - collection of keys
* @param ids - collection of Stream IDs
* @param id - id of this stream
* @param name2 - name of second stream
* @param id2 - id of second stream
* @return stream data mapped by key and Stream ID
*/
Map<String, Map<StreamId, Map<K, V>>> read(long timeout, TimeUnit unit, StreamId id, String name2, StreamId id2);
/**
* Read stream data by specified stream names including this stream.
* Wait for the first stream data availability for specified <code>timeout</code> interval.
*
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param id - id of this stream
* @param name2 - name of second stream
* @param id2 - id of second stream
* @param name3 - name of third stream
* @param id3 - id of third stream
* @return stream data mapped by key and Stream ID
*/
Map<String, Map<StreamId, Map<K, V>>> read(long timeout, TimeUnit unit, StreamId id, String name2, StreamId id2, String name3, StreamId id3);
/**
* Read stream data by specified stream id mapped by name including this stream.
* Wait for the first stream data availability for specified <code>timeout</code> interval.
*
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param id - id of this stream
* @param nameToId - stream id mapped by name
* @return stream data mapped by key and Stream ID
*/
Map<String, Map<StreamId, Map<K, V>>> read(long timeout, TimeUnit unit, Collection<String> keys, StreamId ... ids);
Map<String, Map<StreamId, Map<K, V>>> read(long timeout, TimeUnit unit, StreamId id, Map<String, StreamId> nameToId);
/**
* Read stream data by specified collection of keys including this stream and Stream ID per key.
* First Stream ID is related to this stream.
* Wait for first stream data availability for specified <code>timeout</code> interval.
* Read stream data by specified stream name including this stream.
* Wait for the first stream data availability for specified <code>timeout</code> interval.
*
* @param count - stream data size limit
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param keys - collection of keys
* @param ids - collection of Stream IDs
* @param id - id of this stream
* @param name2 - name of second stream
* @param id2 - id of second stream
* @return stream data mapped by key and Stream ID
*/
Map<String, Map<StreamId, Map<K, V>>> read(int count, long timeout, TimeUnit unit, StreamId id, String name2, StreamId id2);
/**
* Read stream data by specified stream names including this stream.
* Wait for the first stream data availability for specified <code>timeout</code> interval.
*
* @param count - stream data size limit
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param id - id of this stream
* @param name2 - name of second stream
* @param id2 - id of second stream
* @param name3 - name of third stream
* @param id3 - id of third stream
* @return stream data mapped by key and Stream ID
*/
Map<String, Map<StreamId, Map<K, V>>> read(int count, long timeout, TimeUnit unit, StreamId id, String name2, StreamId id2, String name3, StreamId id3);
/**
* Read stream data by specified stream id mapped by name including this stream.
* Wait for the first stream data availability for specified <code>timeout</code> interval.
*
* @param count - stream data size limit
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param id - id of this stream
* @param nameToId - stream id mapped by name
* @return stream data mapped by key and Stream ID
*/
Map<String, Map<StreamId, Map<K, V>>> read(int count, long timeout, TimeUnit unit, Collection<String> keys, StreamId ... ids);
Map<String, Map<StreamId, Map<K, V>>> read(int count, long timeout, TimeUnit unit, StreamId id, Map<String, StreamId> nameToId);
/**
* Read stream data in range by specified start Stream ID (included) and end Stream ID (included).

@ -15,7 +15,6 @@
*/
package org.redisson.api;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@ -167,52 +166,152 @@ public interface RStreamAsync<K, V> extends RExpirableAsync {
RFuture<Map<StreamId, Map<K, V>>> readAsync(int count, long timeout, TimeUnit unit, StreamId ... ids);
/**
* Read stream data by specified collection of keys including this stream and Stream ID per key.
* First Stream ID is related to this stream.
* Read stream data by specified stream name including this stream.
*
* @param keys - collection of keys
* @param ids - collection of Stream IDs
* @param id - id of this stream
* @param name2 - name of second stream
* @param id2 - id of second stream
* @return stream data mapped by key and Stream ID
*/
RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(Collection<String> keys, StreamId ... ids);
RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(StreamId id, String name2, StreamId id2);
/**
* Read stream data by specified stream names including this stream.
*
* @param id - id of this stream
* @param name2 - name of second stream
* @param id2 - id of second stream
* @param name3 - name of third stream
* @param id3 - id of third stream
* @return stream data mapped by key and Stream ID
*/
RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(StreamId id, String name2, StreamId id2, String name3, StreamId id3);
/**
* Read stream data by specified collection of keys including this stream and Stream ID per key.
* First Stream ID is related to this stream.
* Read stream data by specified stream id mapped by name including this stream.
*
* @param id - id of this stream
* @param nameToId - stream id mapped by name
* @return stream data mapped by key and Stream ID
*/
RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(StreamId id, Map<String, StreamId> nameToId);
/**
* Read stream data by specified stream name including this stream.
*
* @param count - stream data size limit
* @param keys - collection of keys
* @param ids - collection of Stream IDs
* @param id - id of this stream
* @param name2 - name of second stream
* @param id2 - id of second stream
* @return stream data mapped by key and Stream ID
*/
RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(int count, StreamId id, String name2, StreamId id2);
/**
* Read stream data by specified stream names including this stream.
*
* @param count - stream data size limit
* @param id - id of this stream
* @param name2 - name of second stream
* @param id2 - id of second stream
* @param name3 - name of third stream
* @param id3 - id of third stream
* @return stream data mapped by key and Stream ID
*/
RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(int count, StreamId id, String name2, StreamId id2, String name3, StreamId id3);
/**
* Read stream data by specified stream id mapped by name including this stream.
*
* @param count - stream data size limit
* @param id - id of this stream
* @param nameToId - stream id mapped by name
* @return stream data mapped by key and Stream ID
*/
RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(int count, Collection<String> keys, StreamId ... ids);
RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(int count, StreamId id, Map<String, StreamId> nameToId);
/**
* Read stream data by specified collection of keys including this stream and Stream ID per key.
* First Stream ID is related to this stream.
* Wait for first stream data availability for specified <code>timeout</code> interval.
* Read stream data by specified stream name including this stream.
* Wait for the first stream data availability for specified <code>timeout</code> interval.
*
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param keys - collection of keys
* @param ids - collection of Stream IDs
* @param id - id of this stream
* @param name2 - name of second stream
* @param id2 - id of second stream
* @return stream data mapped by key and Stream ID
*/
RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(long timeout, TimeUnit unit, StreamId id, String name2, StreamId id2);
/**
* Read stream data by specified stream names including this stream.
* Wait for the first stream data availability for specified <code>timeout</code> interval.
*
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param id - id of this stream
* @param name2 - name of second stream
* @param id2 - id of second stream
* @param name3 - name of third stream
* @param id3 - id of third stream
* @return stream data mapped by key and Stream ID
*/
RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(long timeout, TimeUnit unit, Collection<String> keys, StreamId ... ids);
RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(long timeout, TimeUnit unit, StreamId id, String name2, StreamId id2, String name3, StreamId id3);
/**
* Read stream data by specified collection of keys including this stream and Stream ID per key.
* First Stream ID is related to this stream.
* Wait for first stream data availability for specified <code>timeout</code> interval.
* Read stream data by specified stream id mapped by name including this stream.
* Wait for the first stream data availability for specified <code>timeout</code> interval.
*
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param id - id of this stream
* @param nameToId - stream id mapped by name
* @return stream data mapped by key and Stream ID
*/
RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(long timeout, TimeUnit unit, StreamId id, Map<String, StreamId> nameToId);
/**
* Read stream data by specified stream name including this stream.
* Wait for the first stream data availability for specified <code>timeout</code> interval.
*
* @param count - stream data size limit
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param keys - collection of keys
* @param ids - collection of Stream IDs
* @param id - id of this stream
* @param name2 - name of second stream
* @param id2 - id of second stream
* @return stream data mapped by key and Stream ID
*/
RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(int count, long timeout, TimeUnit unit, StreamId id, String name2, StreamId id2);
/**
* Read stream data by specified stream names including this stream.
* Wait for the first stream data availability for specified <code>timeout</code> interval.
*
* @param count - stream data size limit
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param id - id of this stream
* @param name2 - name of second stream
* @param id2 - id of second stream
* @param name3 - name of third stream
* @param id3 - id of third stream
* @return stream data mapped by key and Stream ID
*/
RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(int count, long timeout, TimeUnit unit, StreamId id, String name2, StreamId id2, String name3, StreamId id3);
/**
* Read stream data by specified stream id mapped by name including this stream.
* Wait for the first stream data availability for specified <code>timeout</code> interval.
*
* @param count - stream data size limit
* @param timeout - time interval to wait for stream data availability
* @param unit - time interval unit
* @param id - id of this stream
* @param nameToId - stream id mapped by name
* @return stream data mapped by key and Stream ID
*/
RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(int count, long timeout, TimeUnit unit, Collection<String> keys, StreamId ... ids);
RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(int count, long timeout, TimeUnit unit, StreamId id, Map<String, StreamId> nameToId);
/**
* Read stream data in range by specified start Stream ID (included) and end Stream ID (included).

@ -326,12 +326,12 @@ public class RedisClient {
for (Channel channel : channels) {
RedisConnection connection = RedisConnection.getFrom(channel);
if (connection != null) {
connection.setClosed(true);
connection.closeAsync();
}
}
ChannelGroupFuture channelsFuture = channels.close();
final RPromise<Void> result = new RedissonPromise<Void>();
ChannelGroupFuture channelsFuture = channels.newCloseFuture();
channelsFuture.addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {

@ -27,7 +27,7 @@ import org.redisson.client.protocol.CommandsData;
import org.redisson.client.protocol.QueueCommand;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.RedisStrictCommand;
import org.redisson.misc.LogHelper;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
@ -195,7 +195,7 @@ public class RedisConnection implements RedisCommands {
return async(-1, encoder, command, params);
}
public <T, R> RFuture<R> async(long timeout, Codec encoder, RedisCommand<T> command, Object ... params) {
public <T, R> RFuture<R> async(long timeout, Codec encoder, final RedisCommand<T> command, final Object ... params) {
final RPromise<R> promise = new RedissonPromise<R>();
if (timeout == -1) {
timeout = redisClient.getCommandTimeout();
@ -209,7 +209,9 @@ public class RedisConnection implements RedisCommands {
final ScheduledFuture<?> scheduledFuture = redisClient.getEventLoopGroup().schedule(new Runnable() {
@Override
public void run() {
RedisTimeoutException ex = new RedisTimeoutException("Command execution timeout for " + redisClient.getAddr());
RedisTimeoutException ex = new RedisTimeoutException("Command execution timeout for command: "
+ command + ", command params: " + LogHelper.toString(params)
+ ", Redis client: " + redisClient);
promise.tryFailure(ex);
}
}, timeout, TimeUnit.MILLISECONDS);
@ -229,7 +231,7 @@ public class RedisConnection implements RedisCommands {
return new CommandData<T, R>(promise, encoder, command, params);
}
public void setClosed(boolean closed) {
private void setClosed(boolean closed) {
this.closed = closed;
}
@ -246,10 +248,19 @@ public class RedisConnection implements RedisCommands {
fastReconnect = null;
}
private void close() {
CommandData command = getCurrentCommand();
if (command != null && command.isBlockingCommand()) {
channel.close();
} else {
async(RedisCommands.QUIT);
}
}
public RFuture<Void> forceFastReconnectAsync() {
RedissonPromise<Void> promise = new RedissonPromise<Void>();
fastReconnect = promise;
channel.close();
close();
return promise;
}
@ -265,7 +276,8 @@ public class RedisConnection implements RedisCommands {
public ChannelFuture closeAsync() {
setClosed(true);
return channel.close();
close();
return channel.closeFuture();
}
@Override

@ -105,24 +105,29 @@ public class CommandDecoder extends ReplayingDecoder<State> {
state().setDecoderState(null);
if (data == null) {
try {
while (in.writerIndex() > in.readerIndex()) {
decode(in, null, null, ctx.channel());
}
} catch (Exception e) {
log.error("Unable to decode data. channel: {} message: {}", ctx.channel(), in.toString(0, in.writerIndex(), CharsetUtil.UTF_8), e);
decodeCommand(ctx, in, data);
}
protected void sendNext(ChannelHandlerContext ctx, QueueCommand data) {
if (data != null) {
if (data.isExecuted()) {
sendNext(ctx);
throw e;
}
} else if (data instanceof CommandData) {
} else {
sendNext(ctx);
}
}
protected void decodeCommand(ChannelHandlerContext ctx, ByteBuf in, QueueCommand data) throws Exception {
if (data instanceof CommandData) {
CommandData<Object, Object> cmd = (CommandData<Object, Object>)data;
try {
if (state().getLevels().size() > 0) {
decodeFromCheckpoint(ctx, in, data, cmd);
} else {
decode(in, cmd, null, ctx.channel());
decode(in, cmd, null, ctx.channel(), false);
}
sendNext(ctx, data);
} catch (Exception e) {
log.error("Unable to decode data. channel: {} message: {}", ctx.channel(), in.toString(0, in.writerIndex(), CharsetUtil.UTF_8), e);
cmd.tryFailure(e);
@ -138,10 +143,7 @@ public class CommandDecoder extends ReplayingDecoder<State> {
sendNext(ctx);
throw e;
}
return;
}
sendNext(ctx);
}
protected void sendNext(ChannelHandlerContext ctx) {
@ -149,7 +151,7 @@ public class CommandDecoder extends ReplayingDecoder<State> {
state(null);
}
private void decodeFromCheckpoint(ChannelHandlerContext ctx, ByteBuf in, QueueCommand data,
protected void decodeFromCheckpoint(ChannelHandlerContext ctx, ByteBuf in, QueueCommand data,
CommandData<Object, Object> cmd) throws IOException {
if (state().getLevels().size() == 2) {
StateLevel secondLevel = state().getLevels().get(1);
@ -163,7 +165,7 @@ public class CommandDecoder extends ReplayingDecoder<State> {
StateLevel firstLevel = state().getLevels().get(0);
StateLevel secondLevel = state().getLevels().get(1);
decodeList(in, cmd, firstLevel.getParts(), ctx.channel(), secondLevel.getSize(), secondLevel.getParts());
decodeList(in, cmd, firstLevel.getParts(), ctx.channel(), secondLevel.getSize(), secondLevel.getParts(), false);
MultiDecoder<Object> decoder = messageDecoder(cmd, firstLevel.getParts());
if (decoder != null) {
@ -177,23 +179,23 @@ public class CommandDecoder extends ReplayingDecoder<State> {
StateLevel firstLevel = state().getLevels().get(0);
if (firstLevel.getParts().isEmpty() && firstLevel.getLastList() == null) {
state().resetLevel();
decode(in, cmd, null, ctx.channel());
decode(in, cmd, null, ctx.channel(), false);
} else {
if (firstLevel.getLastList() != null) {
if (firstLevel.getLastList().isEmpty()) {
decode(in, cmd, firstLevel.getParts(), ctx.channel());
decode(in, cmd, firstLevel.getParts(), ctx.channel(), false);
} else {
decodeList(in, cmd, firstLevel.getParts(), ctx.channel(), firstLevel.getLastListSize(), firstLevel.getLastList());
decodeList(in, cmd, firstLevel.getParts(), ctx.channel(), firstLevel.getLastListSize(), firstLevel.getLastList(), false);
}
firstLevel.setLastList(null);
firstLevel.setLastListSize(0);
while (in.isReadable() && firstLevel.getParts().size() < firstLevel.getSize()) {
decode(in, cmd, firstLevel.getParts(), ctx.channel());
decode(in, cmd, firstLevel.getParts(), ctx.channel(), false);
}
decodeList(in, cmd, null, ctx.channel(), 0, firstLevel.getParts());
decodeList(in, cmd, null, ctx.channel(), 0, firstLevel.getParts(), false);
} else {
decodeList(in, cmd, null, ctx.channel(), firstLevel.getSize(), firstLevel.getParts());
decodeList(in, cmd, null, ctx.channel(), firstLevel.getSize(), firstLevel.getParts(), false);
}
}
}
@ -226,7 +228,7 @@ public class CommandDecoder extends ReplayingDecoder<State> {
}
try {
decode(in, commandData, null, ctx.channel());
decode(in, commandData, null, ctx.channel(), !commandBatch.isAtomic());
} finally {
if (commandData != null && RedisCommands.EXEC.getName().equals(commandData.getCommand().getName())) {
commandsData.remove();
@ -284,7 +286,7 @@ public class CommandDecoder extends ReplayingDecoder<State> {
}
}
protected void decode(ByteBuf in, CommandData<Object, Object> data, List<Object> parts, Channel channel) throws IOException {
protected void decode(ByteBuf in, CommandData<Object, Object> data, List<Object> parts, Channel channel, boolean skipConvertor) throws IOException {
int code = in.readByte();
if (code == '+') {
ByteBuf rb = in.readBytes(in.bytesBefore((byte) '\r'));
@ -292,7 +294,7 @@ public class CommandDecoder extends ReplayingDecoder<State> {
String result = rb.toString(CharsetUtil.UTF_8);
in.skipBytes(2);
handleResult(data, parts, result, false, channel);
handleResult(data, parts, result, skipConvertor, channel);
} finally {
rb.release();
}
@ -368,7 +370,7 @@ public class CommandDecoder extends ReplayingDecoder<State> {
}
}
decodeList(in, data, parts, channel, size, respParts);
decodeList(in, data, parts, channel, size, respParts, skipConvertor);
if (lastLevel != null && lastLevel.getLastList() != null) {
lastLevel.setLastList(null);
@ -382,7 +384,7 @@ public class CommandDecoder extends ReplayingDecoder<State> {
@SuppressWarnings("unchecked")
private void decodeList(ByteBuf in, CommandData<Object, Object> data, List<Object> parts,
Channel channel, long size, List<Object> respParts)
Channel channel, long size, List<Object> respParts, boolean skipConvertor)
throws IOException {
if (parts == null && commandsData.get() != null) {
List<CommandData<?, ?>> commands = commandsData.get();
@ -392,7 +394,7 @@ public class CommandDecoder extends ReplayingDecoder<State> {
suffix = 1;
}
CommandData<Object, Object> commandData = (CommandData<Object, Object>) commands.get(i+suffix);
decode(in, commandData, respParts, channel);
decode(in, commandData, respParts, channel, skipConvertor);
if (commandData.getPromise().isDone() && !commandData.getPromise().isSuccess()) {
data.tryFailure(commandData.cause());
}
@ -403,7 +405,7 @@ public class CommandDecoder extends ReplayingDecoder<State> {
}
} else {
for (int i = respParts.size(); i < size; i++) {
decode(in, data, respParts, channel);
decode(in, data, respParts, channel, skipConvertor);
if (state().isMakeCheckpoint()) {
checkpoint();
}
@ -426,8 +428,8 @@ public class CommandDecoder extends ReplayingDecoder<State> {
}
}
private void handleResult(CommandData<Object, Object> data, List<Object> parts, Object result, boolean multiResult, Channel channel) {
if (data != null && !multiResult) {
private void handleResult(CommandData<Object, Object> data, List<Object> parts, Object result, boolean skipConvertor, Channel channel) {
if (data != null && !skipConvertor) {
result = data.getCommand().getConvertor().convert(result);
}
if (parts != null) {
@ -448,7 +450,7 @@ public class CommandDecoder extends ReplayingDecoder<State> {
if (parts.isEmpty()) {
return null;
}
}
}
return data.getCommand().getReplayMultiDecoder();
}

@ -27,14 +27,19 @@ import java.util.concurrent.ExecutorService;
import org.redisson.client.RedisPubSubConnection;
import org.redisson.client.protocol.CommandData;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.QueueCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.decoder.ListObjectDecoder;
import org.redisson.client.protocol.decoder.MultiDecoder;
import org.redisson.client.protocol.pubsub.Message;
import org.redisson.client.protocol.pubsub.PubSubMessage;
import org.redisson.client.protocol.pubsub.PubSubPatternMessage;
import org.redisson.client.protocol.pubsub.PubSubStatusMessage;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.CharsetUtil;
import io.netty.util.internal.PlatformDependent;
/**
@ -63,11 +68,42 @@ public class CommandPubSubDecoder extends CommandDecoder {
commands.put(new PubSubKey(channel, operation), data);
}
@Override
protected void decodeCommand(ChannelHandlerContext ctx, ByteBuf in, QueueCommand data) throws Exception {
if (data == null) {
try {
while (in.writerIndex() > in.readerIndex()) {
decode(in, null, null, ctx.channel(), false);
}
sendNext(ctx);
} catch (Exception e) {
log.error("Unable to decode data. channel: {} message: {}", ctx.channel(), in.toString(0, in.writerIndex(), CharsetUtil.UTF_8), e);
sendNext(ctx);
throw e;
}
} else if (data instanceof CommandData) {
CommandData<Object, Object> cmd = (CommandData<Object, Object>)data;
try {
if (state().getLevels().size() > 0) {
decodeFromCheckpoint(ctx, in, data, cmd);
} else {
while (in.writerIndex() > in.readerIndex()) {
decode(in, cmd, null, ctx.channel(), false);
}
}
sendNext(ctx, data);
} catch (Exception e) {
log.error("Unable to decode data. channel: {} message: {}", ctx.channel(), in.toString(0, in.writerIndex(), CharsetUtil.UTF_8), e);
cmd.tryFailure(e);
sendNext(ctx);
throw e;
}
}
}
@Override
protected void decodeResult(CommandData<Object, Object> data, List<Object> parts, Channel channel,
final Object result) throws IOException {
super.decodeResult(data, parts, channel, result);
if (result instanceof Message) {
checkpoint();
@ -117,6 +153,10 @@ public class CommandPubSubDecoder extends CommandDecoder {
}
});
}
} else {
if (data != null && data.getCommand().getName().equals("PING")) {
super.decodeResult(data, parts, channel, result);
}
}
}
@ -157,28 +197,26 @@ public class CommandPubSubDecoder extends CommandDecoder {
@Override
protected MultiDecoder<Object> messageDecoder(CommandData<Object, Object> data, List<Object> parts) {
if (data == null) {
if (parts.isEmpty()) {
return null;
}
String command = parts.get(0).toString();
if (MESSAGES.contains(command)) {
String channelName = parts.get(1).toString();
PubSubKey key = new PubSubKey(channelName, command);
CommandData<Object, Object> commandData = commands.get(key);
if (commandData == null) {
return null;
}
return commandData.getCommand().getReplayMultiDecoder();
} else if (command.equals("message")) {
String channelName = (String) parts.get(1);
return entries.get(channelName).getDecoder();
} else if (command.equals("pmessage")) {
String patternName = (String) parts.get(1);
return entries.get(patternName).getDecoder();
} else if (command.equals("pong")) {
if (parts.isEmpty()) {
return null;
}
String command = parts.get(0).toString();
if (MESSAGES.contains(command)) {
String channelName = parts.get(1).toString();
PubSubKey key = new PubSubKey(channelName, command);
CommandData<Object, Object> commandData = commands.get(key);
if (commandData == null) {
return null;
}
return commandData.getCommand().getReplayMultiDecoder();
} else if (command.equals("message")) {
String channelName = (String) parts.get(1);
return entries.get(channelName).getDecoder();
} else if (command.equals("pmessage")) {
String patternName = (String) parts.get(1);
return entries.get(patternName).getDecoder();
} else if (command.equals("pong")) {
return new ListObjectDecoder<Object>(0);
}
return data.getCommand().getReplayMultiDecoder();
@ -186,7 +224,10 @@ public class CommandPubSubDecoder extends CommandDecoder {
@Override
protected Decoder<Object> selectDecoder(CommandData<Object, Object> data, List<Object> parts) {
if (data == null && parts != null) {
if (parts != null) {
if (data != null && parts.size() == 1 && "pong".equals(parts.get(0))) {
return data.getCodec().getValueDecoder();
}
if (parts.size() == 2 && "message".equals(parts.get(0))) {
String channelName = (String) parts.get(1);
return entries.get(channelName).getDecoder().getDecoder(parts.size(), state());
@ -196,6 +237,7 @@ public class CommandPubSubDecoder extends CommandDecoder {
return entries.get(patternName).getDecoder().getDecoder(parts.size(), state());
}
}
if (data != null && data.getCommand().getName().equals(RedisCommands.PING.getName())) {
return data.getCodec().getValueDecoder();
}

@ -102,4 +102,9 @@ public class CommandData<T, R> implements QueueCommand {
|| RedisCommands.XREAD_BLOCKING == command;
}
@Override
public boolean isExecuted() {
return promise.isDone();
}
}

@ -96,4 +96,9 @@ public class CommandsData implements QueueCommand {
return "CommandsData [commands=" + commands + "]";
}
@Override
public boolean isExecuted() {
return promise.isDone();
}
}

@ -28,4 +28,6 @@ public interface QueueCommand {
boolean tryFailure(Throwable cause);
boolean isExecuted();
}

@ -289,6 +289,9 @@ public interface RedisCommands {
RedisStrictCommand<Long> UNLINK = new RedisStrictCommand<Long>("UNLINK");
RedisStrictCommand<Boolean> UNLINK_BOOL = new RedisStrictCommand<Boolean>("UNLINK", new BooleanNullSafeReplayConvertor());
RedisCommand<Object> DUMP = new RedisCommand<Object>("DUMP");
RedisStrictCommand<Void> RESTORE = new RedisStrictCommand<Void>("RESTORE", new VoidReplayConvertor());
RedisCommand<Object> GET = new RedisCommand<Object>("GET");
RedisStrictCommand<Long> GET_LONG = new RedisStrictCommand<Long>("GET", new LongReplayConvertor());
RedisStrictCommand<Integer> GET_INTEGER = new RedisStrictCommand<Integer>("GET", new IntegerReplayConvertor());
@ -340,6 +343,7 @@ public interface RedisCommands {
RedisStrictCommand<Void> RENAME = new RedisStrictCommand<Void>("RENAME", new VoidReplayConvertor());
RedisStrictCommand<Boolean> MOVE = new RedisStrictCommand<Boolean>("MOVE", new BooleanReplayConvertor());
RedisStrictCommand<Void> MIGRATE = new RedisStrictCommand<Void>("MIGRATE", new VoidReplayConvertor());
RedisStrictCommand<Void> QUIT = new RedisStrictCommand<Void>("QUIT", new VoidReplayConvertor());
RedisStrictCommand<Long> PUBLISH = new RedisStrictCommand<Long>("PUBLISH");

@ -627,12 +627,6 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
}
}
timer.stop();
shutdownLatch.close();
shutdownPromise.trySuccess(true);
shutdownLatch.awaitUninterruptibly();
RPromise<Void> result = new RedissonPromise<Void>();
CountableListener<Void> listener = new CountableListener<Void>(result, null, getEntrySet().size());
for (MasterSlaveEntry entry : getEntrySet()) {
@ -641,6 +635,11 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
result.awaitUninterruptibly(timeout, unit);
resolverGroup.close();
timer.stop();
shutdownLatch.close();
shutdownPromise.trySuccess(true);
shutdownLatch.awaitUninterruptibly();
if (cfg.getEventLoopGroup() == null) {
group.shutdownGracefully(quietPeriod, timeout, unit).syncUninterruptibly();

@ -29,6 +29,7 @@ import org.redisson.api.RFuture;
import org.redisson.api.RListAsync;
import org.redisson.api.RMapAsync;
import org.redisson.api.RMapCacheAsync;
import org.redisson.api.RScoredSortedSet;
import org.redisson.api.RScript;
import org.redisson.api.RScript.Mode;
import org.redisson.api.RedissonClient;
@ -78,6 +79,17 @@ public class RedissonBatchTest extends BaseTest {
List<?> t = batch.execute();
System.out.println(t);
}
@Test
public void testConvertor() {
RBatch batch = redisson.createBatch(batchOptions);
batch.getScoredSortedSet("myZKey").addScoreAsync("abc", 1d);
batch.execute();
RScoredSortedSet<String> set = redisson.getScoredSortedSet("myZKey");
assertThat(set.getScore("abc")).isEqualTo(1d);
}
@Test
public void testConnectionLeakAfterError() throws InterruptedException {

@ -17,6 +17,41 @@ import org.redisson.config.Config;
public class RedissonBucketTest extends BaseTest {
@Test
public void testDumpAndRestore() {
RBucket<Integer> al = redisson.getBucket("test");
al.set(1234);
byte[] state = al.dump();
al.delete();
al.restore(state);
assertThat(al.get()).isEqualTo(1234);
RBucket<Integer> bucket = redisson.getBucket("test2");
bucket.set(300);
bucket.restoreAndReplace(state);
assertThat(bucket.get()).isEqualTo(1234);
}
@Test
public void testDumpAndRestoreTTL() {
RBucket<Integer> al = redisson.getBucket("test");
al.set(1234);
byte[] state = al.dump();
al.delete();
al.restore(state, 10, TimeUnit.SECONDS);
assertThat(al.get()).isEqualTo(1234);
assertThat(al.remainTimeToLive()).isBetween(9500L, 10000L);
RBucket<Integer> bucket = redisson.getBucket("test2");
bucket.set(300);
bucket.restoreAndReplace(state, 10, TimeUnit.SECONDS);
assertThat(bucket.get()).isEqualTo(1234);
}
@Test
public void testGetAndDelete() {
RBucket<Integer> al = redisson.getBucket("test");

@ -19,11 +19,11 @@ public class RedissonRateLimiterTest extends BaseTest {
@Test
public void testAcquire() {
RRateLimiter rr = redisson.getRateLimiter("test");
assertThat(rr.trySetRate(RateType.OVERALL, 10, 1, RateIntervalUnit.SECONDS)).isTrue();
rr.acquire(1);
rr.acquire(5);
rr.acquire(4);
RRateLimiter rr = redisson.getRateLimiter("acquire");
assertThat(rr.trySetRate(RateType.OVERALL, 1, 5, RateIntervalUnit.SECONDS)).isTrue();
for (int i = 0; i < 10; i++) {
rr.acquire(1);
}
assertThat(rr.tryAcquire()).isFalse();
}

@ -2,7 +2,6 @@ package org.redisson;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
@ -85,7 +84,7 @@ public class RedissonStreamTest extends BaseTest {
t.start();
long start = System.currentTimeMillis();
Map<String, Map<StreamId, Map<String, String>>> s = stream.read(2, 5, TimeUnit.SECONDS, Collections.singleton("test1"), new StreamId(0), StreamId.NEWEST);
Map<String, Map<StreamId, Map<String, String>>> s = stream.read(2, 5, TimeUnit.SECONDS, new StreamId(0), "test1", StreamId.NEWEST);
assertThat(System.currentTimeMillis() - start).isBetween(1900L, 2200L);
assertThat(s).hasSize(1);
assertThat(s.get("test").get(new StreamId(1))).isEqualTo(entries1);
@ -141,7 +140,7 @@ public class RedissonStreamTest extends BaseTest {
@Test
public void testReadMultiKeysEmpty() {
RStream<String, String> stream = redisson.getStream("test2");
Map<String, Map<StreamId, Map<String, String>>> s = stream.read(10, Collections.singleton("test1"), new StreamId(0), new StreamId(0));
Map<String, Map<StreamId, Map<String, String>>> s = stream.read(10, new StreamId(0), "test1", new StreamId(0));
assertThat(s).isEmpty();
}
@ -160,7 +159,7 @@ public class RedissonStreamTest extends BaseTest {
entries2.put("6", "66");
stream2.addAll(entries2);
Map<String, Map<StreamId, Map<String, String>>> s = stream2.read(10, Collections.singleton("test1"), new StreamId(0), new StreamId(0));
Map<String, Map<StreamId, Map<String, String>>> s = stream2.read(10, new StreamId(0), "test1", new StreamId(0));
assertThat(s).hasSize(2);
assertThat(s.get("test1").values().iterator().next()).isEqualTo(entries1);
assertThat(s.get("test2").values().iterator().next()).isEqualTo(entries2);

@ -239,14 +239,19 @@ public class RedissonTest {
});
assertThat(id).isNotZero();
r.getBucket("1").get();
Assert.assertEquals(0, p.stop());
await().atMost(2, TimeUnit.SECONDS).until(() -> disconnectCounter.get() == 1);
try {
r.getBucket("1").get();
} catch (Exception e) {
}
assertThat(connectCounter.get()).isEqualTo(0);
assertThat(disconnectCounter.get()).isEqualTo(1);
RedisProcess pp = new RedisRunner()
.nosave()
@ -256,12 +261,11 @@ public class RedissonTest {
r.getBucket("1").get();
r.shutdown();
assertThat(connectCounter.get()).isEqualTo(1);
assertThat(disconnectCounter.get()).isEqualTo(1);
r.shutdown();
Assert.assertEquals(0, pp.stop());
await().atMost(2, TimeUnit.SECONDS).until(() -> connectCounter.get() == 1);
await().atMost(2, TimeUnit.SECONDS).until(() -> disconnectCounter.get() == 1);
}
@Test

Loading…
Cancel
Save