diff --git a/CHANGELOG.md b/CHANGELOG.md index cab0fa700..28f2d5aa0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,7 +4,18 @@ Redisson Releases History Try __[Redisson PRO](https://redisson.pro)__ version. -### 14-Jun-2018 - versions 2.12.1 and 3.7.1 released +### 27-Jun-2018 - versions 2.12.3 and 3.7.3 released +Feature - added `RKeys.getKeys` method with batch size +Feature - added `SnappyCodecV2` codec +Fixed - `SerializationCodec` doesn't support proxied classes +Fixed - NPE if `RScheduledExecutorService`'s task scheduled with cron expression for finite number of execution +Fixed - validation of cron expression parameter of `RScheduledExecutorService.schedule` method +Feature - Iterator with batch size param for all `RSet`, `RMap`, `RMapCached` objects +Fixed - missing PubSub messages when `pingConnectionInterval` setting is specified +Fixed - excessive memory consumption if batched commands queued on Redis side +Fixed - `RRateLimiter.acquire` method throws NPE + +### 14-Jun-2018 - versions 2.12.2 and 3.7.2 released Feature - `RBatchOptions.executionMode` setting added. Please refer to [documentation](https://github.com/redisson/redisson/wiki/10.-additional-features#103-execution-batches-of-commands) for more details Fixed - NPE in JCacheManager.close method diff --git a/README.md b/README.md index 431c30bb9..59c980040 100644 --- a/README.md +++ b/README.md @@ -1,13 +1,13 @@ Redisson: Redis based In-Memory Data Grid for Java.<br/> State of the Art Redis client ==== -[Quick start](https://github.com/redisson/redisson#quick-start) | [Documentation](https://github.com/redisson/redisson/wiki) | [Javadocs](http://www.javadoc.io/doc/org.redisson/redisson/3.7.1) | [Changelog](https://github.com/redisson/redisson/blob/master/CHANGELOG.md) | [Code examples](https://github.com/redisson/redisson-examples) | [FAQs](https://github.com/redisson/redisson/wiki/16.-FAQ) | [Support chat](https://gitter.im/mrniko/redisson) | **[Redisson PRO](https://redisson.pro)** +[Quick start](https://github.com/redisson/redisson#quick-start) | [Documentation](https://github.com/redisson/redisson/wiki) | [Javadocs](http://www.javadoc.io/doc/org.redisson/redisson/3.7.3) | [Changelog](https://github.com/redisson/redisson/blob/master/CHANGELOG.md) | [Code examples](https://github.com/redisson/redisson-examples) | [FAQs](https://github.com/redisson/redisson/wiki/16.-FAQ) | [Support chat](https://gitter.im/mrniko/redisson) | **[Redisson PRO](https://redisson.pro)** Based on high-performance async and lock-free Java Redis client and [Netty](http://netty.io) framework. | Stable <br/> Release Version | Release Date | JDK Version<br/> compatibility | `CompletionStage` <br/> support | `ProjectReactor` version<br/> compatibility | | ------------- | ------------- | ------------| -----------| -----------| -| 3.7.2 | 14.06.2018 | 1.8, 1.9, 1.10+ | Yes | 3.1.x | -| 2.12.2 | 14.06.2018 | 1.6, 1.7, 1.8, 1.9, 1.10, Android | No | 2.0.8 | +| 3.7.3 | 27.06.2018 | 1.8, 1.9, 1.10+ | Yes | 3.1.x | +| 2.12.3 | 27.06.2018 | 1.6, 1.7, 1.8, 1.9, 1.10, Android | No | 2.0.8 | Features @@ -100,23 +100,23 @@ Quick start <dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> - <version>3.7.2</version> + <version>3.7.3</version> </dependency> <!-- JDK 1.6+ compatible --> <dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> - <version>2.12.2</version> + <version>2.12.3</version> </dependency> #### Gradle // JDK 1.8+ compatible - compile 'org.redisson:redisson:3.7.2' + compile 'org.redisson:redisson:3.7.3' // JDK 1.6+ compatible - compile 'org.redisson:redisson:2.12.2' + compile 'org.redisson:redisson:2.12.3' #### Java @@ -141,11 +141,11 @@ RExecutorService executor = redisson.getExecutorService("myExecutorService"); Downloads =============================== -[Redisson 3.7.2](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson&v=3.7.2&e=jar), -[Redisson node 3.7.2](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-all&v=3.7.2&e=jar) +[Redisson 3.7.3](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson&v=3.7.3&e=jar), +[Redisson node 3.7.3](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-all&v=3.7.3&e=jar) -[Redisson 2.12.2](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson&v=2.12.2&e=jar), -[Redisson node 2.12.2](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-all&v=2.12.2&e=jar) +[Redisson 2.12.3](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson&v=2.12.3&e=jar), +[Redisson node 2.12.3](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-all&v=2.12.3&e=jar) FAQs =============================== diff --git a/redisson-tomcat/redisson-tomcat-6/src/main/java/org/redisson/tomcat/RedissonSession.java b/redisson-tomcat/redisson-tomcat-6/src/main/java/org/redisson/tomcat/RedissonSession.java index 246a575d9..fc63b9420 100644 --- a/redisson-tomcat/redisson-tomcat-6/src/main/java/org/redisson/tomcat/RedissonSession.java +++ b/redisson-tomcat/redisson-tomcat-6/src/main/java/org/redisson/tomcat/RedissonSession.java @@ -93,7 +93,7 @@ public class RedissonSession extends StandardSession { newMap.put("session:thisAccessedTime", thisAccessedTime); map.putAll(newMap); if (readMode == ReadMode.MEMORY) { - topic.publish(new AttributesPutAllMessage(getId(), newMap)); + topic.publish(createPutAllMessage(newMap)); } } } @@ -108,7 +108,7 @@ public class RedissonSession extends StandardSession { newMap.put("session:thisAccessedTime", thisAccessedTime); map.putAll(newMap); if (readMode == ReadMode.MEMORY) { - topic.publish(new AttributesPutAllMessage(getId(), newMap)); + topic.publish(createPutAllMessage(newMap)); } if (getMaxInactiveInterval() >= 0) { map.expire(getMaxInactiveInterval(), TimeUnit.SECONDS); @@ -116,6 +116,14 @@ public class RedissonSession extends StandardSession { } } + protected AttributesPutAllMessage createPutAllMessage(Map<String, Object> newMap) { + Map<String, Object> map = new HashMap<String, Object>(); + for (Entry<String, Object> entry : newMap.entrySet()) { + map.put(entry.getKey(), entry.getValue()); + } + return new AttributesPutAllMessage(getId(), map); + } + @Override public void setMaxInactiveInterval(int interval) { super.setMaxInactiveInterval(interval); @@ -213,7 +221,7 @@ public class RedissonSession extends StandardSession { map.putAll(newMap); if (readMode == ReadMode.MEMORY) { - topic.publish(new AttributesPutAllMessage(getId(), newMap)); + topic.publish(createPutAllMessage(newMap)); } if (maxInactiveInterval >= 0) { diff --git a/redisson-tomcat/redisson-tomcat-6/src/main/java/org/redisson/tomcat/RedissonSessionManager.java b/redisson-tomcat/redisson-tomcat-6/src/main/java/org/redisson/tomcat/RedissonSessionManager.java index 54f68db0b..988d21bf6 100644 --- a/redisson-tomcat/redisson-tomcat-6/src/main/java/org/redisson/tomcat/RedissonSessionManager.java +++ b/redisson-tomcat/redisson-tomcat-6/src/main/java/org/redisson/tomcat/RedissonSessionManager.java @@ -137,6 +137,7 @@ public class RedissonSessionManager extends ManagerBase implements Lifecycle { sessionId = generateSessionId(); } + session.setManager(this); session.setId(sessionId); session.save(); @@ -150,7 +151,7 @@ public class RedissonSessionManager extends ManagerBase implements Lifecycle { } public RTopic<AttributeMessage> getTopic() { - return redisson.getTopic("redisson:tomcat_session_updates"); + return redisson.getTopic("redisson:tomcat_session_updates:" + container.getName()); } @Override @@ -166,6 +167,7 @@ public class RedissonSessionManager extends ManagerBase implements Lifecycle { RedissonSession session = (RedissonSession) createEmptySession(); session.setId(id); + session.setManager(this); session.load(attrs); session.access(); @@ -260,6 +262,15 @@ public class RedissonSessionManager extends ManagerBase implements Lifecycle { } try { + try { + Config c = new Config(config); + Codec codec = c.getCodec().getClass().getConstructor(ClassLoader.class) + .newInstance(Thread.currentThread().getContextClassLoader()); + config.setCodec(codec); + } catch (Exception e) { + throw new IllegalStateException("Unable to initialize codec with ClassLoader parameter", e); + } + return Redisson.create(config); } catch (Exception e) { throw new LifecycleException(e); @@ -285,8 +296,10 @@ public class RedissonSessionManager extends ManagerBase implements Lifecycle { } if (updateMode == UpdateMode.AFTER_REQUEST) { - RedissonSession sess = (RedissonSession) findSession(session.getId()); - sess.save(); + RedissonSession sess = (RedissonSession) super.findSession(session.getId()); + if (sess != null) { + sess.save(); + } } } diff --git a/redisson-tomcat/redisson-tomcat-6/src/main/java/org/redisson/tomcat/UpdateValve.java b/redisson-tomcat/redisson-tomcat-6/src/main/java/org/redisson/tomcat/UpdateValve.java index a46876828..3eecc4cc9 100644 --- a/redisson-tomcat/redisson-tomcat-6/src/main/java/org/redisson/tomcat/UpdateValve.java +++ b/redisson-tomcat/redisson-tomcat-6/src/main/java/org/redisson/tomcat/UpdateValve.java @@ -19,6 +19,7 @@ import java.io.IOException; import javax.servlet.ServletException; +import org.apache.catalina.Session; import org.apache.catalina.connector.Request; import org.apache.catalina.connector.Response; import org.apache.catalina.valves.ValveBase; @@ -40,6 +41,19 @@ public class UpdateValve extends ValveBase { @Override public void invoke(Request request, Response response) throws IOException, ServletException { + String sessionId = request.getRequestedSessionId(); + Session session = request.getContext().getManager().findSession(sessionId); + if (session != null) { + if (!session.isValid()) { + session.expire(); + request.getContext().getManager().remove(session); + } else { + manager.add(session); + session.access(); + session.endAccess(); + } + } + try { getNext().invoke(request, response); } finally { diff --git a/redisson-tomcat/redisson-tomcat-7/src/main/java/org/redisson/tomcat/RedissonSession.java b/redisson-tomcat/redisson-tomcat-7/src/main/java/org/redisson/tomcat/RedissonSession.java index 7edc34a51..cf1b453ee 100644 --- a/redisson-tomcat/redisson-tomcat-7/src/main/java/org/redisson/tomcat/RedissonSession.java +++ b/redisson-tomcat/redisson-tomcat-7/src/main/java/org/redisson/tomcat/RedissonSession.java @@ -93,7 +93,7 @@ public class RedissonSession extends StandardSession { newMap.put("session:thisAccessedTime", thisAccessedTime); map.putAll(newMap); if (readMode == ReadMode.MEMORY) { - topic.publish(new AttributesPutAllMessage(getId(), newMap)); + topic.publish(createPutAllMessage(newMap)); } } } @@ -108,13 +108,21 @@ public class RedissonSession extends StandardSession { newMap.put("session:thisAccessedTime", thisAccessedTime); map.putAll(newMap); if (readMode == ReadMode.MEMORY) { - topic.publish(new AttributesPutAllMessage(getId(), newMap)); + topic.publish(createPutAllMessage(newMap)); } if (getMaxInactiveInterval() >= 0) { map.expire(getMaxInactiveInterval(), TimeUnit.SECONDS); } } } + + protected AttributesPutAllMessage createPutAllMessage(Map<String, Object> newMap) { + Map<String, Object> map = new HashMap<String, Object>(); + for (Entry<String, Object> entry : newMap.entrySet()) { + map.put(entry.getKey(), entry.getValue()); + } + return new AttributesPutAllMessage(getId(), map); + } @Override public void setMaxInactiveInterval(int interval) { @@ -213,7 +221,7 @@ public class RedissonSession extends StandardSession { map.putAll(newMap); if (readMode == ReadMode.MEMORY) { - topic.publish(new AttributesPutAllMessage(getId(), newMap)); + topic.publish(createPutAllMessage(newMap)); } if (maxInactiveInterval >= 0) { diff --git a/redisson-tomcat/redisson-tomcat-7/src/main/java/org/redisson/tomcat/RedissonSessionManager.java b/redisson-tomcat/redisson-tomcat-7/src/main/java/org/redisson/tomcat/RedissonSessionManager.java index aaf021e87..c2bc23585 100644 --- a/redisson-tomcat/redisson-tomcat-7/src/main/java/org/redisson/tomcat/RedissonSessionManager.java +++ b/redisson-tomcat/redisson-tomcat-7/src/main/java/org/redisson/tomcat/RedissonSessionManager.java @@ -57,7 +57,7 @@ public class RedissonSessionManager extends ManagerBase { private UpdateMode updateMode = UpdateMode.DEFAULT; private String keyPrefix = ""; - + public String getUpdateMode() { return updateMode.toString(); } @@ -116,6 +116,7 @@ public class RedissonSessionManager extends ManagerBase { sessionId = generateSessionId(); } + session.setManager(this); session.setId(sessionId); session.save(); @@ -129,7 +130,7 @@ public class RedissonSessionManager extends ManagerBase { } public RTopic<AttributeMessage> getTopic() { - return redisson.getTopic("redisson:tomcat_session_updates"); + return redisson.getTopic("redisson:tomcat_session_updates:" + container.getName()); } @Override @@ -145,6 +146,7 @@ public class RedissonSessionManager extends ManagerBase { RedissonSession session = (RedissonSession) createEmptySession(); session.setId(id); + session.setManager(this); session.load(attrs); session.access(); @@ -277,8 +279,10 @@ public class RedissonSessionManager extends ManagerBase { } if (updateMode == UpdateMode.AFTER_REQUEST) { - RedissonSession sess = (RedissonSession) findSession(session.getId()); - sess.save(); + RedissonSession sess = (RedissonSession) super.findSession(session.getId()); + if (sess != null) { + sess.save(); + } } } diff --git a/redisson-tomcat/redisson-tomcat-7/src/main/java/org/redisson/tomcat/UpdateValve.java b/redisson-tomcat/redisson-tomcat-7/src/main/java/org/redisson/tomcat/UpdateValve.java index a46876828..3eecc4cc9 100644 --- a/redisson-tomcat/redisson-tomcat-7/src/main/java/org/redisson/tomcat/UpdateValve.java +++ b/redisson-tomcat/redisson-tomcat-7/src/main/java/org/redisson/tomcat/UpdateValve.java @@ -19,6 +19,7 @@ import java.io.IOException; import javax.servlet.ServletException; +import org.apache.catalina.Session; import org.apache.catalina.connector.Request; import org.apache.catalina.connector.Response; import org.apache.catalina.valves.ValveBase; @@ -40,6 +41,19 @@ public class UpdateValve extends ValveBase { @Override public void invoke(Request request, Response response) throws IOException, ServletException { + String sessionId = request.getRequestedSessionId(); + Session session = request.getContext().getManager().findSession(sessionId); + if (session != null) { + if (!session.isValid()) { + session.expire(); + request.getContext().getManager().remove(session); + } else { + manager.add(session); + session.access(); + session.endAccess(); + } + } + try { getNext().invoke(request, response); } finally { diff --git a/redisson-tomcat/redisson-tomcat-8/src/main/java/org/redisson/tomcat/RedissonSession.java b/redisson-tomcat/redisson-tomcat-8/src/main/java/org/redisson/tomcat/RedissonSession.java index bc16efaec..acdee8324 100644 --- a/redisson-tomcat/redisson-tomcat-8/src/main/java/org/redisson/tomcat/RedissonSession.java +++ b/redisson-tomcat/redisson-tomcat-8/src/main/java/org/redisson/tomcat/RedissonSession.java @@ -93,7 +93,7 @@ public class RedissonSession extends StandardSession { newMap.put("session:thisAccessedTime", thisAccessedTime); map.putAll(newMap); if (readMode == ReadMode.MEMORY) { - topic.publish(new AttributesPutAllMessage(getId(), newMap)); + topic.publish(createPutAllMessage(newMap)); } } } @@ -108,7 +108,7 @@ public class RedissonSession extends StandardSession { newMap.put("session:thisAccessedTime", thisAccessedTime); map.putAll(newMap); if (readMode == ReadMode.MEMORY) { - topic.publish(new AttributesPutAllMessage(getId(), newMap)); + topic.publish(createPutAllMessage(newMap)); } if (getMaxInactiveInterval() >= 0) { map.expire(getMaxInactiveInterval(), TimeUnit.SECONDS); @@ -116,6 +116,14 @@ public class RedissonSession extends StandardSession { } } + protected AttributesPutAllMessage createPutAllMessage(Map<String, Object> newMap) { + Map<String, Object> map = new HashMap<String, Object>(); + for (Entry<String, Object> entry : newMap.entrySet()) { + map.put(entry.getKey(), entry.getValue()); + } + return new AttributesPutAllMessage(getId(), map); + } + @Override public void setMaxInactiveInterval(int interval) { super.setMaxInactiveInterval(interval); @@ -213,7 +221,7 @@ public class RedissonSession extends StandardSession { map.putAll(newMap); if (readMode == ReadMode.MEMORY) { - topic.publish(new AttributesPutAllMessage(getId(), newMap)); + topic.publish(createPutAllMessage(newMap)); } if (maxInactiveInterval >= 0) { diff --git a/redisson-tomcat/redisson-tomcat-8/src/main/java/org/redisson/tomcat/RedissonSessionManager.java b/redisson-tomcat/redisson-tomcat-8/src/main/java/org/redisson/tomcat/RedissonSessionManager.java index 6ce1bac1c..8c3208091 100644 --- a/redisson-tomcat/redisson-tomcat-8/src/main/java/org/redisson/tomcat/RedissonSessionManager.java +++ b/redisson-tomcat/redisson-tomcat-8/src/main/java/org/redisson/tomcat/RedissonSessionManager.java @@ -36,6 +36,9 @@ import org.redisson.api.listener.MessageListener; import org.redisson.client.codec.Codec; import org.redisson.config.Config; +import io.netty.buffer.ByteBufUtil; +import io.netty.util.internal.PlatformDependent; + /** * Redisson Session Manager for Apache Tomcat * @@ -57,6 +60,15 @@ public class RedissonSessionManager extends ManagerBase { private String keyPrefix = ""; + private final String id = ByteBufUtil.hexDump(generateId()); + + protected static byte[] generateId() { + byte[] id = new byte[16]; + // TODO JDK UPGRADE replace to native ThreadLocalRandom + PlatformDependent.threadLocalRandom().nextBytes(id); + return id; + } + public String getUpdateMode() { return updateMode.toString(); } @@ -115,6 +127,7 @@ public class RedissonSessionManager extends ManagerBase { sessionId = generateSessionId(); } + session.setManager(this); session.setId(sessionId); session.save(); @@ -128,7 +141,7 @@ public class RedissonSessionManager extends ManagerBase { } public RTopic<AttributeMessage> getTopic() { - return redisson.getTopic("redisson:tomcat_session_updates"); + return redisson.getTopic("redisson:tomcat_session_updates:" + getContext().getName()); } @Override @@ -144,6 +157,7 @@ public class RedissonSessionManager extends ManagerBase { RedissonSession session = (RedissonSession) createEmptySession(); session.setId(id); + session.setManager(this); session.load(attrs); session.access(); @@ -239,10 +253,14 @@ public class RedissonSessionManager extends ManagerBase { } try { + try { Config c = new Config(config); Codec codec = c.getCodec().getClass().getConstructor(ClassLoader.class) .newInstance(Thread.currentThread().getContextClassLoader()); config.setCodec(codec); + } catch (Exception e) { + throw new IllegalStateException("Unable to initialize codec with ClassLoader parameter", e); + } return Redisson.create(config); } catch (Exception e) { @@ -272,9 +290,11 @@ public class RedissonSessionManager extends ManagerBase { } if (updateMode == UpdateMode.AFTER_REQUEST) { - RedissonSession sess = (RedissonSession) findSession(session.getId()); + RedissonSession sess = (RedissonSession) super.findSession(session.getId()); + if (sess != null) { sess.save(); } } + } } diff --git a/redisson-tomcat/redisson-tomcat-8/src/main/java/org/redisson/tomcat/UpdateValve.java b/redisson-tomcat/redisson-tomcat-8/src/main/java/org/redisson/tomcat/UpdateValve.java index a46876828..3eecc4cc9 100644 --- a/redisson-tomcat/redisson-tomcat-8/src/main/java/org/redisson/tomcat/UpdateValve.java +++ b/redisson-tomcat/redisson-tomcat-8/src/main/java/org/redisson/tomcat/UpdateValve.java @@ -19,6 +19,7 @@ import java.io.IOException; import javax.servlet.ServletException; +import org.apache.catalina.Session; import org.apache.catalina.connector.Request; import org.apache.catalina.connector.Response; import org.apache.catalina.valves.ValveBase; @@ -40,6 +41,19 @@ public class UpdateValve extends ValveBase { @Override public void invoke(Request request, Response response) throws IOException, ServletException { + String sessionId = request.getRequestedSessionId(); + Session session = request.getContext().getManager().findSession(sessionId); + if (session != null) { + if (!session.isValid()) { + session.expire(); + request.getContext().getManager().remove(session); + } else { + manager.add(session); + session.access(); + session.endAccess(); + } + } + try { getNext().invoke(request, response); } finally { diff --git a/redisson-tomcat/redisson-tomcat-9/src/main/java/org/redisson/tomcat/RedissonSession.java b/redisson-tomcat/redisson-tomcat-9/src/main/java/org/redisson/tomcat/RedissonSession.java index bc16efaec..acdee8324 100644 --- a/redisson-tomcat/redisson-tomcat-9/src/main/java/org/redisson/tomcat/RedissonSession.java +++ b/redisson-tomcat/redisson-tomcat-9/src/main/java/org/redisson/tomcat/RedissonSession.java @@ -93,7 +93,7 @@ public class RedissonSession extends StandardSession { newMap.put("session:thisAccessedTime", thisAccessedTime); map.putAll(newMap); if (readMode == ReadMode.MEMORY) { - topic.publish(new AttributesPutAllMessage(getId(), newMap)); + topic.publish(createPutAllMessage(newMap)); } } } @@ -108,7 +108,7 @@ public class RedissonSession extends StandardSession { newMap.put("session:thisAccessedTime", thisAccessedTime); map.putAll(newMap); if (readMode == ReadMode.MEMORY) { - topic.publish(new AttributesPutAllMessage(getId(), newMap)); + topic.publish(createPutAllMessage(newMap)); } if (getMaxInactiveInterval() >= 0) { map.expire(getMaxInactiveInterval(), TimeUnit.SECONDS); @@ -116,6 +116,14 @@ public class RedissonSession extends StandardSession { } } + protected AttributesPutAllMessage createPutAllMessage(Map<String, Object> newMap) { + Map<String, Object> map = new HashMap<String, Object>(); + for (Entry<String, Object> entry : newMap.entrySet()) { + map.put(entry.getKey(), entry.getValue()); + } + return new AttributesPutAllMessage(getId(), map); + } + @Override public void setMaxInactiveInterval(int interval) { super.setMaxInactiveInterval(interval); @@ -213,7 +221,7 @@ public class RedissonSession extends StandardSession { map.putAll(newMap); if (readMode == ReadMode.MEMORY) { - topic.publish(new AttributesPutAllMessage(getId(), newMap)); + topic.publish(createPutAllMessage(newMap)); } if (maxInactiveInterval >= 0) { diff --git a/redisson-tomcat/redisson-tomcat-9/src/main/java/org/redisson/tomcat/RedissonSessionManager.java b/redisson-tomcat/redisson-tomcat-9/src/main/java/org/redisson/tomcat/RedissonSessionManager.java index 6ce1bac1c..8c3208091 100644 --- a/redisson-tomcat/redisson-tomcat-9/src/main/java/org/redisson/tomcat/RedissonSessionManager.java +++ b/redisson-tomcat/redisson-tomcat-9/src/main/java/org/redisson/tomcat/RedissonSessionManager.java @@ -36,6 +36,9 @@ import org.redisson.api.listener.MessageListener; import org.redisson.client.codec.Codec; import org.redisson.config.Config; +import io.netty.buffer.ByteBufUtil; +import io.netty.util.internal.PlatformDependent; + /** * Redisson Session Manager for Apache Tomcat * @@ -57,6 +60,15 @@ public class RedissonSessionManager extends ManagerBase { private String keyPrefix = ""; + private final String id = ByteBufUtil.hexDump(generateId()); + + protected static byte[] generateId() { + byte[] id = new byte[16]; + // TODO JDK UPGRADE replace to native ThreadLocalRandom + PlatformDependent.threadLocalRandom().nextBytes(id); + return id; + } + public String getUpdateMode() { return updateMode.toString(); } @@ -115,6 +127,7 @@ public class RedissonSessionManager extends ManagerBase { sessionId = generateSessionId(); } + session.setManager(this); session.setId(sessionId); session.save(); @@ -128,7 +141,7 @@ public class RedissonSessionManager extends ManagerBase { } public RTopic<AttributeMessage> getTopic() { - return redisson.getTopic("redisson:tomcat_session_updates"); + return redisson.getTopic("redisson:tomcat_session_updates:" + getContext().getName()); } @Override @@ -144,6 +157,7 @@ public class RedissonSessionManager extends ManagerBase { RedissonSession session = (RedissonSession) createEmptySession(); session.setId(id); + session.setManager(this); session.load(attrs); session.access(); @@ -239,10 +253,14 @@ public class RedissonSessionManager extends ManagerBase { } try { + try { Config c = new Config(config); Codec codec = c.getCodec().getClass().getConstructor(ClassLoader.class) .newInstance(Thread.currentThread().getContextClassLoader()); config.setCodec(codec); + } catch (Exception e) { + throw new IllegalStateException("Unable to initialize codec with ClassLoader parameter", e); + } return Redisson.create(config); } catch (Exception e) { @@ -272,9 +290,11 @@ public class RedissonSessionManager extends ManagerBase { } if (updateMode == UpdateMode.AFTER_REQUEST) { - RedissonSession sess = (RedissonSession) findSession(session.getId()); + RedissonSession sess = (RedissonSession) super.findSession(session.getId()); + if (sess != null) { sess.save(); } } + } } diff --git a/redisson-tomcat/redisson-tomcat-9/src/main/java/org/redisson/tomcat/UpdateValve.java b/redisson-tomcat/redisson-tomcat-9/src/main/java/org/redisson/tomcat/UpdateValve.java index a46876828..3eecc4cc9 100644 --- a/redisson-tomcat/redisson-tomcat-9/src/main/java/org/redisson/tomcat/UpdateValve.java +++ b/redisson-tomcat/redisson-tomcat-9/src/main/java/org/redisson/tomcat/UpdateValve.java @@ -19,6 +19,7 @@ import java.io.IOException; import javax.servlet.ServletException; +import org.apache.catalina.Session; import org.apache.catalina.connector.Request; import org.apache.catalina.connector.Response; import org.apache.catalina.valves.ValveBase; @@ -40,6 +41,19 @@ public class UpdateValve extends ValveBase { @Override public void invoke(Request request, Response response) throws IOException, ServletException { + String sessionId = request.getRequestedSessionId(); + Session session = request.getContext().getManager().findSession(sessionId); + if (session != null) { + if (!session.isValid()) { + session.expire(); + request.getContext().getManager().remove(session); + } else { + manager.add(session); + session.access(); + session.endAccess(); + } + } + try { getNext().invoke(request, response); } finally { diff --git a/redisson/src/main/java/org/redisson/RedissonObject.java b/redisson/src/main/java/org/redisson/RedissonObject.java index 369893943..054f34f44 100644 --- a/redisson/src/main/java/org/redisson/RedissonObject.java +++ b/redisson/src/main/java/org/redisson/RedissonObject.java @@ -23,7 +23,9 @@ import java.util.concurrent.TimeUnit; import org.redisson.api.RFuture; import org.redisson.api.RObject; +import org.redisson.client.codec.ByteArrayCodec; import org.redisson.client.codec.Codec; +import org.redisson.client.codec.StringCodec; import org.redisson.client.protocol.RedisCommands; import org.redisson.command.CommandAsyncExecutor; import org.redisson.misc.RedissonObjectFactory; @@ -257,4 +259,64 @@ public abstract class RedissonObject implements RObject { } } + @Override + public byte[] dump() { + return get(dumpAsync()); + } + + @Override + public RFuture<byte[]> dumpAsync() { + return commandExecutor.readAsync(getName(), ByteArrayCodec.INSTANCE, RedisCommands.DUMP, getName()); + } + + @Override + public void restore(byte[] state) { + get(restoreAsync(state)); + } + + @Override + public RFuture<Void> restoreAsync(byte[] state) { + return restoreAsync(state, 0, null); + } + + @Override + public void restore(byte[] state, long timeToLive, TimeUnit timeUnit) { + get(restoreAsync(state, timeToLive, timeUnit)); + } + + @Override + public RFuture<Void> restoreAsync(byte[] state, long timeToLive, TimeUnit timeUnit) { + long ttl = 0; + if (timeToLive > 0) { + ttl = timeUnit.toMillis(timeToLive); + } + + return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, RedisCommands.RESTORE, getName(), ttl, state); + } + + @Override + public void restoreAndReplace(byte[] state, long timeToLive, TimeUnit timeUnit) { + get(restoreAndReplaceAsync(state, timeToLive, timeUnit)); + } + + @Override + public RFuture<Void> restoreAndReplaceAsync(byte[] state, long timeToLive, TimeUnit timeUnit) { + long ttl = 0; + if (timeToLive > 0) { + ttl = timeUnit.toMillis(timeToLive); + } + + return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, RedisCommands.RESTORE, getName(), ttl, state, "REPLACE"); + } + + @Override + public void restoreAndReplace(byte[] state) { + get(restoreAndReplaceAsync(state)); + } + + @Override + public RFuture<Void> restoreAndReplaceAsync(byte[] state) { + return restoreAndReplaceAsync(state, 0, null); + } + } diff --git a/redisson/src/main/java/org/redisson/RedissonRateLimiter.java b/redisson/src/main/java/org/redisson/RedissonRateLimiter.java index 81eeaaaa6..992bd175e 100644 --- a/redisson/src/main/java/org/redisson/RedissonRateLimiter.java +++ b/redisson/src/main/java/org/redisson/RedissonRateLimiter.java @@ -153,7 +153,7 @@ public class RedissonRateLimiter extends RedissonObject implements RRateLimiter public void run() { tryAcquireAsync(permits, promise, timeoutInMillis); } - }, delay, TimeUnit.SECONDS); + }, delay, TimeUnit.MILLISECONDS); return; } diff --git a/redisson/src/main/java/org/redisson/RedissonStream.java b/redisson/src/main/java/org/redisson/RedissonStream.java index 1975ab5b8..a1d963e64 100644 --- a/redisson/src/main/java/org/redisson/RedissonStream.java +++ b/redisson/src/main/java/org/redisson/RedissonStream.java @@ -16,7 +16,8 @@ package org.redisson; import java.util.ArrayList; -import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -147,8 +148,8 @@ public class RedissonStream<K, V> extends RedissonExpirable implements RStream<K } @Override - public RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(int count, Collection<String> keys, StreamId ... ids) { - return readAsync(count, -1, null, keys, ids); + public RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(int count, StreamId id, Map<String, StreamId> keyToId) { + return readAsync(count, -1, null, id, keyToId); } @Override @@ -157,11 +158,7 @@ public class RedissonStream<K, V> extends RedissonExpirable implements RStream<K } @Override - public RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(int count, long timeout, TimeUnit unit, Collection<String> keys, StreamId... ids) { - if (keys.size() + 1 != ids.length) { - throw new IllegalArgumentException("keys amount should be lower by one than ids amount"); - } - + public RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(int count, long timeout, TimeUnit unit, StreamId id, Map<String, StreamId> keyToId) { List<Object> params = new ArrayList<Object>(); if (count > 0) { params.add("COUNT"); @@ -175,14 +172,13 @@ public class RedissonStream<K, V> extends RedissonExpirable implements RStream<K params.add("STREAMS"); params.add(getName()); - if (keys != null) { - for (String key : keys) { - params.add(key); - } + for (String key : keyToId.keySet()) { + params.add(key); } - for (StreamId id : ids) { - params.add(id.toString()); + params.add(id); + for (StreamId nextId : keyToId.values()) { + params.add(nextId.toString()); } if (timeout > 0) { @@ -294,14 +290,13 @@ public class RedissonStream<K, V> extends RedissonExpirable implements RStream<K } @Override - public Map<String, Map<StreamId, Map<K, V>>> read(int count, Collection<String> keys, StreamId... ids) { - return get(readAsync(count, keys, ids)); + public Map<String, Map<StreamId, Map<K, V>>> read(int count, StreamId id, Map<String, StreamId> keyToId) { + return get(readAsync(count, id, keyToId)); } @Override - public Map<String, Map<StreamId, Map<K, V>>> read(int count, long timeout, TimeUnit unit, Collection<String> keys, - StreamId... ids) { - return get(readAsync(count, timeout, unit, keys, ids)); + public Map<String, Map<StreamId, Map<K, V>>> read(int count, long timeout, TimeUnit unit, StreamId id, Map<String, StreamId> keyToId) { + return get(readAsync(count, timeout, unit, id, keyToId)); } @Override @@ -355,14 +350,13 @@ public class RedissonStream<K, V> extends RedissonExpirable implements RStream<K } @Override - public RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(Collection<String> keys, StreamId... ids) { - return readAsync(0, keys, ids); + public RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(StreamId id, Map<String, StreamId> keyToId) { + return readAsync(0, id, keyToId); } @Override - public RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(long timeout, TimeUnit unit, - Collection<String> keys, StreamId... ids) { - return readAsync(0, timeout, unit, keys, ids); + public RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(long timeout, TimeUnit unit, StreamId id, Map<String, StreamId> keyToId) { + return readAsync(0, timeout, unit, id, keyToId); } @Override @@ -386,14 +380,13 @@ public class RedissonStream<K, V> extends RedissonExpirable implements RStream<K } @Override - public Map<String, Map<StreamId, Map<K, V>>> read(Collection<String> keys, StreamId... ids) { - return read(0, keys, ids); + public Map<String, Map<StreamId, Map<K, V>>> read(StreamId id, Map<String, StreamId> keyToId) { + return read(0, id, keyToId); } @Override - public Map<String, Map<StreamId, Map<K, V>>> read(long timeout, TimeUnit unit, Collection<String> keys, - StreamId... ids) { - return read(0, timeout, unit, keys, ids); + public Map<String, Map<StreamId, Map<K, V>>> read(long timeout, TimeUnit unit, StreamId id, Map<String, StreamId> keyToId) { + return read(0, timeout, unit, id, keyToId); } @Override @@ -405,5 +398,109 @@ public class RedissonStream<K, V> extends RedissonExpirable implements RStream<K public Map<StreamId, Map<K, V>> rangeReversed(StreamId startId, StreamId endId) { return rangeReversed(0, startId, endId); } - + + @Override + public RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(StreamId id, String key2, StreamId id2) { + return readAsync(id, Collections.singletonMap(key2, id2)); + } + + @Override + public RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(StreamId id, String key2, StreamId id2, String key3, + StreamId id3) { + Map<String, StreamId> params = new HashMap<String, StreamId>(2); + params.put(key2, id2); + params.put(key3, id3); + return readAsync(id, params); + } + + @Override + public RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(int count, StreamId id, String key2, StreamId id2) { + return readAsync(count, id, Collections.singletonMap(key2, id2)); + } + + @Override + public RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(int count, StreamId id, String key2, StreamId id2, + String key3, StreamId id3) { + Map<String, StreamId> params = new HashMap<String, StreamId>(2); + params.put(key2, id2); + params.put(key3, id3); + return readAsync(count, id, params); + } + + @Override + public RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(long timeout, TimeUnit unit, StreamId id, + String key2, StreamId id2) { + return readAsync(timeout, unit, id, Collections.singletonMap(key2, id2)); + } + + @Override + public RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(long timeout, TimeUnit unit, StreamId id, + String key2, StreamId id2, String key3, StreamId id3) { + Map<String, StreamId> params = new HashMap<String, StreamId>(2); + params.put(key2, id2); + params.put(key3, id3); + return readAsync(timeout, unit, id, params); + } + + @Override + public RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(int count, long timeout, TimeUnit unit, StreamId id, + String key2, StreamId id2) { + return readAsync(count, timeout, unit, id, Collections.singletonMap(key2, id2)); + } + + @Override + public RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(int count, long timeout, TimeUnit unit, StreamId id, + String key2, StreamId id2, String key3, StreamId id3) { + Map<String, StreamId> params = new HashMap<String, StreamId>(2); + params.put(key2, id2); + params.put(key3, id3); + return readAsync(count, timeout, unit, id, params); + } + + @Override + public Map<String, Map<StreamId, Map<K, V>>> read(StreamId id, String key2, StreamId id2) { + return get(readAsync(id, key2, id2)); + } + + @Override + public Map<String, Map<StreamId, Map<K, V>>> read(StreamId id, String key2, StreamId id2, String key3, + StreamId id3) { + return get(readAsync(id, key2, id2, key3, id3)); + } + + @Override + public Map<String, Map<StreamId, Map<K, V>>> read(int count, StreamId id, String key2, StreamId id2) { + return get(readAsync(count, id, key2, id2)); + } + + @Override + public Map<String, Map<StreamId, Map<K, V>>> read(int count, StreamId id, String key2, StreamId id2, String key3, + StreamId id3) { + return get(readAsync(count, id, key2, id2, key3, id3)); + } + + @Override + public Map<String, Map<StreamId, Map<K, V>>> read(long timeout, TimeUnit unit, StreamId id, String key2, + StreamId id2) { + return get(readAsync(timeout, unit, id, key2, id2)); + } + + @Override + public Map<String, Map<StreamId, Map<K, V>>> read(long timeout, TimeUnit unit, StreamId id, String key2, + StreamId id2, String key3, StreamId id3) { + return get(readAsync(timeout, unit, id, key2, id2, key3, id3)); + } + + @Override + public Map<String, Map<StreamId, Map<K, V>>> read(int count, long timeout, TimeUnit unit, StreamId id, String key2, + StreamId id2) { + return get(readAsync(count, timeout, unit, id, key2, id2)); + } + + @Override + public Map<String, Map<StreamId, Map<K, V>>> read(int count, long timeout, TimeUnit unit, StreamId id, String key2, + StreamId id2, String key3, StreamId id3) { + return get(readAsync(count, timeout, unit, id, key2, id2, key3, id3)); + } + } diff --git a/redisson/src/main/java/org/redisson/api/RObject.java b/redisson/src/main/java/org/redisson/api/RObject.java index 760ffd2f0..942da09ef 100644 --- a/redisson/src/main/java/org/redisson/api/RObject.java +++ b/redisson/src/main/java/org/redisson/api/RObject.java @@ -15,6 +15,8 @@ */ package org.redisson.api; +import java.util.concurrent.TimeUnit; + import org.redisson.client.codec.Codec; /** @@ -25,6 +27,45 @@ import org.redisson.client.codec.Codec; */ public interface RObject extends RObjectAsync { + /** + * Restores object using its state returned by {@link #dump()} method. + * + * @param state - state of object + */ + void restore(byte[] state); + + /** + * Restores object using its state returned by {@link #dump()} method and set time to live for it. + * + * @param state - state of object + * @param timeToLive - time to live of the object + * @param timeUnit - time unit + */ + void restore(byte[] state, long timeToLive, TimeUnit timeUnit); + + /** + * Restores and replaces object if it already exists. + * + * @param state - state of the object + */ + void restoreAndReplace(byte[] state); + + /** + * Restores and replaces object if it already exists and set time to live for it. + * + * @param state - state of the object + * @param timeToLive - time to live of the object + * @param timeUnit - time unit + */ + void restoreAndReplace(byte[] state, long timeToLive, TimeUnit timeUnit); + + /** + * Returns dump of object + * + * @return dump + */ + byte[] dump(); + /** * Update the last access time of an object. * diff --git a/redisson/src/main/java/org/redisson/api/RObjectAsync.java b/redisson/src/main/java/org/redisson/api/RObjectAsync.java index 11e44eedd..63962f55f 100644 --- a/redisson/src/main/java/org/redisson/api/RObjectAsync.java +++ b/redisson/src/main/java/org/redisson/api/RObjectAsync.java @@ -15,6 +15,8 @@ */ package org.redisson.api; +import java.util.concurrent.TimeUnit; + /** * Base interface for all Redisson objects * @@ -23,6 +25,49 @@ package org.redisson.api; */ public interface RObjectAsync { + /** + * Restores object using its state returned by {@link #dumpAsync()} method. + * + * @param state - state of object + * @return void + */ + RFuture<Void> restoreAsync(byte[] state); + + /** + * Restores object using its state returned by {@link #dumpAsync()} method and set time to live for it. + * + * @param state - state of object + * @param timeToLive - time to live of the object + * @param timeUnit - time unit + * @return void + */ + RFuture<Void> restoreAsync(byte[] state, long timeToLive, TimeUnit timeUnit); + + /** + * Restores and replaces object if it already exists. + * + * @param state - state of the object + * @return void + */ + RFuture<Void> restoreAndReplaceAsync(byte[] state); + + /** + * Restores and replaces object if it already exists and set time to live for it. + * + * @param state - state of the object + * @param timeToLive - time to live of the object + * @param timeUnit - time unit + * @return void + */ + RFuture<Void> restoreAndReplaceAsync(byte[] state, long timeToLive, TimeUnit timeUnit); + + /** + * Returns dump of object + * + * @return dump + */ + RFuture<byte[]> dumpAsync(); + /** * Update the last access time of an object in async mode. * diff --git a/redisson/src/main/java/org/redisson/api/RStream.java b/redisson/src/main/java/org/redisson/api/RStream.java index 471d8a546..db6452d6d 100644 --- a/redisson/src/main/java/org/redisson/api/RStream.java +++ b/redisson/src/main/java/org/redisson/api/RStream.java @@ -163,52 +163,152 @@ public interface RStream<K, V> extends RStreamAsync<K, V>, RExpirable { Map<StreamId, Map<K, V>> read(int count, long timeout, TimeUnit unit, StreamId ... ids); /** - * Read stream data by specified collection of keys including this stream and Stream ID per key. - * First Stream ID is related to this stream. + * Read stream data by specified stream name including this stream. * - * @param keys - collection of keys - * @param ids - collection of Stream IDs + * @param id - id of this stream + * @param name2 - name of second stream + * @param id2 - id of second stream * @return stream data mapped by key and Stream ID */ - Map<String, Map<StreamId, Map<K, V>>> read(Collection<String> keys, StreamId ... ids); + Map<String, Map<StreamId, Map<K, V>>> read(StreamId id, String name2, StreamId id2); + + /** + * Read stream data by specified stream names including this stream. + * + * @param id - id of this stream + * @param name2 - name of second stream + * @param id2 - id of second stream + * @param name3 - name of third stream + * @param id3 - id of third stream + * @return stream data mapped by key and Stream ID + */ + Map<String, Map<StreamId, Map<K, V>>> read(StreamId id, String name2, StreamId id2, String name3, StreamId id3); /** - * Read stream data by specified collection of keys including this stream and Stream ID per key. - * First Stream ID is related to this stream. + * Read stream data by specified stream id mapped by name including this stream. + * + * @param id - id of this stream + * @param nameToId - stream id mapped by name + * @return stream data mapped by key and Stream ID + */ + Map<String, Map<StreamId, Map<K, V>>> read(StreamId id, Map<String, StreamId> nameToId); + + /** + * Read stream data by specified stream name including this stream. * * @param count - stream data size limit - * @param keys - collection of keys - * @param ids - collection of Stream IDs + * @param id - id of this stream + * @param name2 - name of second stream + * @param id2 - id of second stream + * @return stream data mapped by key and Stream ID + */ + Map<String, Map<StreamId, Map<K, V>>> read(int count, StreamId id, String name2, StreamId id2); + + /** + * Read stream data by specified stream names including this stream. + * + * @param count - stream data size limit + * @param id - id of this stream + * @param name2 - name of second stream + * @param id2 - id of second stream + * @param name3 - name of third stream + * @param id3 - id of third stream * @return stream data mapped by key and Stream ID */ - Map<String, Map<StreamId, Map<K, V>>> read(int count, Collection<String> keys, StreamId ... ids); + Map<String, Map<StreamId, Map<K, V>>> read(int count, StreamId id, String name2, StreamId id2, String name3, StreamId id3); /** - * Read stream data by specified collection of keys including this stream and Stream ID per key. - * First Stream ID is related to this stream. - * Wait for first stream data availability for specified <code>timeout</code> interval. + * Read stream data by specified stream id mapped by name including this stream. + * + * @param count - stream data size limit + * @param id - id of this stream + * @param nameToId - stream id mapped by name + * @return stream data mapped by key and Stream ID + */ + Map<String, Map<StreamId, Map<K, V>>> read(int count, StreamId id, Map<String, StreamId> nameToId); + + /** + * Read stream data by specified stream name including this stream. + * Wait for the first stream data availability for specified <code>timeout</code> interval. * * @param timeout - time interval to wait for stream data availability * @param unit - time interval unit - * @param keys - collection of keys - * @param ids - collection of Stream IDs + * @param id - id of this stream + * @param name2 - name of second stream + * @param id2 - id of second stream + * @return stream data mapped by key and Stream ID + */ + Map<String, Map<StreamId, Map<K, V>>> read(long timeout, TimeUnit unit, StreamId id, String name2, StreamId id2); + + /** + * Read stream data by specified stream names including this stream. + * Wait for the first stream data availability for specified <code>timeout</code> interval. + * + * @param timeout - time interval to wait for stream data availability + * @param unit - time interval unit + * @param id - id of this stream + * @param name2 - name of second stream + * @param id2 - id of second stream + * @param name3 - name of third stream + * @param id3 - id of third stream + * @return stream data mapped by key and Stream ID + */ + Map<String, Map<StreamId, Map<K, V>>> read(long timeout, TimeUnit unit, StreamId id, String name2, StreamId id2, String name3, StreamId id3); + + /** + * Read stream data by specified stream id mapped by name including this stream. + * Wait for the first stream data availability for specified <code>timeout</code> interval. + * + * @param timeout - time interval to wait for stream data availability + * @param unit - time interval unit + * @param id - id of this stream + * @param nameToId - stream id mapped by name * @return stream data mapped by key and Stream ID */ - Map<String, Map<StreamId, Map<K, V>>> read(long timeout, TimeUnit unit, Collection<String> keys, StreamId ... ids); + Map<String, Map<StreamId, Map<K, V>>> read(long timeout, TimeUnit unit, StreamId id, Map<String, StreamId> nameToId); /** - * Read stream data by specified collection of keys including this stream and Stream ID per key. - * First Stream ID is related to this stream. - * Wait for first stream data availability for specified <code>timeout</code> interval. + * Read stream data by specified stream name including this stream. + * Wait for the first stream data availability for specified <code>timeout</code> interval. * * @param count - stream data size limit * @param timeout - time interval to wait for stream data availability * @param unit - time interval unit - * @param keys - collection of keys - * @param ids - collection of Stream IDs + * @param id - id of this stream + * @param name2 - name of second stream + * @param id2 - id of second stream + * @return stream data mapped by key and Stream ID + */ + Map<String, Map<StreamId, Map<K, V>>> read(int count, long timeout, TimeUnit unit, StreamId id, String name2, StreamId id2); + + /** + * Read stream data by specified stream names including this stream. + * Wait for the first stream data availability for specified <code>timeout</code> interval. + * + * @param count - stream data size limit + * @param timeout - time interval to wait for stream data availability + * @param unit - time interval unit + * @param id - id of this stream + * @param name2 - name of second stream + * @param id2 - id of second stream + * @param name3 - name of third stream + * @param id3 - id of third stream + * @return stream data mapped by key and Stream ID + */ + Map<String, Map<StreamId, Map<K, V>>> read(int count, long timeout, TimeUnit unit, StreamId id, String name2, StreamId id2, String name3, StreamId id3); + + /** + * Read stream data by specified stream id mapped by name including this stream. + * Wait for the first stream data availability for specified <code>timeout</code> interval. + * + * @param count - stream data size limit + * @param timeout - time interval to wait for stream data availability + * @param unit - time interval unit + * @param id - id of this stream + * @param nameToId - stream id mapped by name * @return stream data mapped by key and Stream ID */ - Map<String, Map<StreamId, Map<K, V>>> read(int count, long timeout, TimeUnit unit, Collection<String> keys, StreamId ... ids); + Map<String, Map<StreamId, Map<K, V>>> read(int count, long timeout, TimeUnit unit, StreamId id, Map<String, StreamId> nameToId); /** * Read stream data in range by specified start Stream ID (included) and end Stream ID (included). diff --git a/redisson/src/main/java/org/redisson/api/RStreamAsync.java b/redisson/src/main/java/org/redisson/api/RStreamAsync.java index bd8a53a13..343a3dbb5 100644 --- a/redisson/src/main/java/org/redisson/api/RStreamAsync.java +++ b/redisson/src/main/java/org/redisson/api/RStreamAsync.java @@ -15,7 +15,6 @@ */ package org.redisson.api; -import java.util.Collection; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -167,52 +166,152 @@ public interface RStreamAsync<K, V> extends RExpirableAsync { RFuture<Map<StreamId, Map<K, V>>> readAsync(int count, long timeout, TimeUnit unit, StreamId ... ids); /** - * Read stream data by specified collection of keys including this stream and Stream ID per key. - * First Stream ID is related to this stream. + * Read stream data by specified stream name including this stream. * - * @param keys - collection of keys - * @param ids - collection of Stream IDs + * @param id - id of this stream + * @param name2 - name of second stream + * @param id2 - id of second stream * @return stream data mapped by key and Stream ID */ - RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(Collection<String> keys, StreamId ... ids); + RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(StreamId id, String name2, StreamId id2); + + /** + * Read stream data by specified stream names including this stream. + * + * @param id - id of this stream + * @param name2 - name of second stream + * @param id2 - id of second stream + * @param name3 - name of third stream + * @param id3 - id of third stream + * @return stream data mapped by key and Stream ID + */ + RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(StreamId id, String name2, StreamId id2, String name3, StreamId id3); /** - * Read stream data by specified collection of keys including this stream and Stream ID per key. - * First Stream ID is related to this stream. + * Read stream data by specified stream id mapped by name including this stream. + * + * @param id - id of this stream + * @param nameToId - stream id mapped by name + * @return stream data mapped by key and Stream ID + */ + RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(StreamId id, Map<String, StreamId> nameToId); + + /** + * Read stream data by specified stream name including this stream. * * @param count - stream data size limit - * @param keys - collection of keys - * @param ids - collection of Stream IDs + * @param id - id of this stream + * @param name2 - name of second stream + * @param id2 - id of second stream + * @return stream data mapped by key and Stream ID + */ + RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(int count, StreamId id, String name2, StreamId id2); + + /** + * Read stream data by specified stream names including this stream. + * + * @param count - stream data size limit + * @param id - id of this stream + * @param name2 - name of second stream + * @param id2 - id of second stream + * @param name3 - name of third stream + * @param id3 - id of third stream + * @return stream data mapped by key and Stream ID + */ + RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(int count, StreamId id, String name2, StreamId id2, String name3, StreamId id3); + + /** + * Read stream data by specified stream id mapped by name including this stream. + * + * @param count - stream data size limit + * @param id - id of this stream + * @param nameToId - stream id mapped by name * @return stream data mapped by key and Stream ID */ - RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(int count, Collection<String> keys, StreamId ... ids); + RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(int count, StreamId id, Map<String, StreamId> nameToId); /** - * Read stream data by specified collection of keys including this stream and Stream ID per key. - * First Stream ID is related to this stream. - * Wait for first stream data availability for specified <code>timeout</code> interval. + * Read stream data by specified stream name including this stream. + * Wait for the first stream data availability for specified <code>timeout</code> interval. * * @param timeout - time interval to wait for stream data availability * @param unit - time interval unit - * @param keys - collection of keys - * @param ids - collection of Stream IDs + * @param id - id of this stream + * @param name2 - name of second stream + * @param id2 - id of second stream + * @return stream data mapped by key and Stream ID + */ + RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(long timeout, TimeUnit unit, StreamId id, String name2, StreamId id2); + + /** + * Read stream data by specified stream names including this stream. + * Wait for the first stream data availability for specified <code>timeout</code> interval. + * + * @param timeout - time interval to wait for stream data availability + * @param unit - time interval unit + * @param id - id of this stream + * @param name2 - name of second stream + * @param id2 - id of second stream + * @param name3 - name of third stream + * @param id3 - id of third stream * @return stream data mapped by key and Stream ID */ - RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(long timeout, TimeUnit unit, Collection<String> keys, StreamId ... ids); + RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(long timeout, TimeUnit unit, StreamId id, String name2, StreamId id2, String name3, StreamId id3); /** - * Read stream data by specified collection of keys including this stream and Stream ID per key. - * First Stream ID is related to this stream. - * Wait for first stream data availability for specified <code>timeout</code> interval. + * Read stream data by specified stream id mapped by name including this stream. + * Wait for the first stream data availability for specified <code>timeout</code> interval. + * + * @param timeout - time interval to wait for stream data availability + * @param unit - time interval unit + * @param id - id of this stream + * @param nameToId - stream id mapped by name + * @return stream data mapped by key and Stream ID + */ + RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(long timeout, TimeUnit unit, StreamId id, Map<String, StreamId> nameToId); + + /** + * Read stream data by specified stream name including this stream. + * Wait for the first stream data availability for specified <code>timeout</code> interval. * * @param count - stream data size limit * @param timeout - time interval to wait for stream data availability * @param unit - time interval unit - * @param keys - collection of keys - * @param ids - collection of Stream IDs + * @param id - id of this stream + * @param name2 - name of second stream + * @param id2 - id of second stream + * @return stream data mapped by key and Stream ID + */ + RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(int count, long timeout, TimeUnit unit, StreamId id, String name2, StreamId id2); + + /** + * Read stream data by specified stream names including this stream. + * Wait for the first stream data availability for specified <code>timeout</code> interval. + * + * @param count - stream data size limit + * @param timeout - time interval to wait for stream data availability + * @param unit - time interval unit + * @param id - id of this stream + * @param name2 - name of second stream + * @param id2 - id of second stream + * @param name3 - name of third stream + * @param id3 - id of third stream + * @return stream data mapped by key and Stream ID + */ + RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(int count, long timeout, TimeUnit unit, StreamId id, String name2, StreamId id2, String name3, StreamId id3); + + /** + * Read stream data by specified stream id mapped by name including this stream. + * Wait for the first stream data availability for specified <code>timeout</code> interval. + * + * @param count - stream data size limit + * @param timeout - time interval to wait for stream data availability + * @param unit - time interval unit + * @param id - id of this stream + * @param nameToId - stream id mapped by name * @return stream data mapped by key and Stream ID */ - RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(int count, long timeout, TimeUnit unit, Collection<String> keys, StreamId ... ids); + RFuture<Map<String, Map<StreamId, Map<K, V>>>> readAsync(int count, long timeout, TimeUnit unit, StreamId id, Map<String, StreamId> nameToId); /** * Read stream data in range by specified start Stream ID (included) and end Stream ID (included). diff --git a/redisson/src/main/java/org/redisson/client/RedisClient.java b/redisson/src/main/java/org/redisson/client/RedisClient.java index fb7c56132..08caf1c3a 100644 --- a/redisson/src/main/java/org/redisson/client/RedisClient.java +++ b/redisson/src/main/java/org/redisson/client/RedisClient.java @@ -326,12 +326,12 @@ public class RedisClient { for (Channel channel : channels) { RedisConnection connection = RedisConnection.getFrom(channel); if (connection != null) { - connection.setClosed(true); + connection.closeAsync(); } } - ChannelGroupFuture channelsFuture = channels.close(); - + final RPromise<Void> result = new RedissonPromise<Void>(); + ChannelGroupFuture channelsFuture = channels.newCloseFuture(); channelsFuture.addListener(new FutureListener<Void>() { @Override public void operationComplete(Future<Void> future) throws Exception { diff --git a/redisson/src/main/java/org/redisson/client/RedisConnection.java b/redisson/src/main/java/org/redisson/client/RedisConnection.java index 4e622c5c7..d6900fb31 100644 --- a/redisson/src/main/java/org/redisson/client/RedisConnection.java +++ b/redisson/src/main/java/org/redisson/client/RedisConnection.java @@ -27,7 +27,7 @@ import org.redisson.client.protocol.CommandsData; import org.redisson.client.protocol.QueueCommand; import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommands; -import org.redisson.client.protocol.RedisStrictCommand; +import org.redisson.misc.LogHelper; import org.redisson.misc.RPromise; import org.redisson.misc.RedissonPromise; @@ -195,7 +195,7 @@ public class RedisConnection implements RedisCommands { return async(-1, encoder, command, params); } - public <T, R> RFuture<R> async(long timeout, Codec encoder, RedisCommand<T> command, Object ... params) { + public <T, R> RFuture<R> async(long timeout, Codec encoder, final RedisCommand<T> command, final Object ... params) { final RPromise<R> promise = new RedissonPromise<R>(); if (timeout == -1) { timeout = redisClient.getCommandTimeout(); @@ -209,7 +209,9 @@ public class RedisConnection implements RedisCommands { final ScheduledFuture<?> scheduledFuture = redisClient.getEventLoopGroup().schedule(new Runnable() { @Override public void run() { - RedisTimeoutException ex = new RedisTimeoutException("Command execution timeout for " + redisClient.getAddr()); + RedisTimeoutException ex = new RedisTimeoutException("Command execution timeout for command: " + + command + ", command params: " + LogHelper.toString(params) + + ", Redis client: " + redisClient); promise.tryFailure(ex); } }, timeout, TimeUnit.MILLISECONDS); @@ -229,7 +231,7 @@ public class RedisConnection implements RedisCommands { return new CommandData<T, R>(promise, encoder, command, params); } - public void setClosed(boolean closed) { + private void setClosed(boolean closed) { this.closed = closed; } @@ -246,10 +248,19 @@ public class RedisConnection implements RedisCommands { fastReconnect = null; } + private void close() { + CommandData command = getCurrentCommand(); + if (command != null && command.isBlockingCommand()) { + channel.close(); + } else { + async(RedisCommands.QUIT); + } + } + public RFuture<Void> forceFastReconnectAsync() { RedissonPromise<Void> promise = new RedissonPromise<Void>(); fastReconnect = promise; - channel.close(); + close(); return promise; } @@ -265,7 +276,8 @@ public class RedisConnection implements RedisCommands { public ChannelFuture closeAsync() { setClosed(true); - return channel.close(); + close(); + return channel.closeFuture(); } @Override diff --git a/redisson/src/main/java/org/redisson/client/handler/CommandDecoder.java b/redisson/src/main/java/org/redisson/client/handler/CommandDecoder.java index 1e47d3515..4b893c3df 100644 --- a/redisson/src/main/java/org/redisson/client/handler/CommandDecoder.java +++ b/redisson/src/main/java/org/redisson/client/handler/CommandDecoder.java @@ -105,24 +105,29 @@ public class CommandDecoder extends ReplayingDecoder<State> { state().setDecoderState(null); - if (data == null) { - try { - while (in.writerIndex() > in.readerIndex()) { - decode(in, null, null, ctx.channel()); - } - } catch (Exception e) { - log.error("Unable to decode data. channel: {} message: {}", ctx.channel(), in.toString(0, in.writerIndex(), CharsetUtil.UTF_8), e); + decodeCommand(ctx, in, data); + } + + protected void sendNext(ChannelHandlerContext ctx, QueueCommand data) { + if (data != null) { + if (data.isExecuted()) { sendNext(ctx); - throw e; } - } else if (data instanceof CommandData) { + } else { + sendNext(ctx); + } + } + + protected void decodeCommand(ChannelHandlerContext ctx, ByteBuf in, QueueCommand data) throws Exception { + if (data instanceof CommandData) { CommandData<Object, Object> cmd = (CommandData<Object, Object>)data; try { if (state().getLevels().size() > 0) { decodeFromCheckpoint(ctx, in, data, cmd); } else { - decode(in, cmd, null, ctx.channel()); + decode(in, cmd, null, ctx.channel(), false); } + sendNext(ctx, data); } catch (Exception e) { log.error("Unable to decode data. channel: {} message: {}", ctx.channel(), in.toString(0, in.writerIndex(), CharsetUtil.UTF_8), e); cmd.tryFailure(e); @@ -138,10 +143,7 @@ public class CommandDecoder extends ReplayingDecoder<State> { sendNext(ctx); throw e; } - return; } - - sendNext(ctx); } protected void sendNext(ChannelHandlerContext ctx) { @@ -149,7 +151,7 @@ public class CommandDecoder extends ReplayingDecoder<State> { state(null); } - private void decodeFromCheckpoint(ChannelHandlerContext ctx, ByteBuf in, QueueCommand data, + protected void decodeFromCheckpoint(ChannelHandlerContext ctx, ByteBuf in, QueueCommand data, CommandData<Object, Object> cmd) throws IOException { if (state().getLevels().size() == 2) { StateLevel secondLevel = state().getLevels().get(1); @@ -163,7 +165,7 @@ public class CommandDecoder extends ReplayingDecoder<State> { StateLevel firstLevel = state().getLevels().get(0); StateLevel secondLevel = state().getLevels().get(1); - decodeList(in, cmd, firstLevel.getParts(), ctx.channel(), secondLevel.getSize(), secondLevel.getParts()); + decodeList(in, cmd, firstLevel.getParts(), ctx.channel(), secondLevel.getSize(), secondLevel.getParts(), false); MultiDecoder<Object> decoder = messageDecoder(cmd, firstLevel.getParts()); if (decoder != null) { @@ -177,23 +179,23 @@ public class CommandDecoder extends ReplayingDecoder<State> { StateLevel firstLevel = state().getLevels().get(0); if (firstLevel.getParts().isEmpty() && firstLevel.getLastList() == null) { state().resetLevel(); - decode(in, cmd, null, ctx.channel()); + decode(in, cmd, null, ctx.channel(), false); } else { if (firstLevel.getLastList() != null) { if (firstLevel.getLastList().isEmpty()) { - decode(in, cmd, firstLevel.getParts(), ctx.channel()); + decode(in, cmd, firstLevel.getParts(), ctx.channel(), false); } else { - decodeList(in, cmd, firstLevel.getParts(), ctx.channel(), firstLevel.getLastListSize(), firstLevel.getLastList()); + decodeList(in, cmd, firstLevel.getParts(), ctx.channel(), firstLevel.getLastListSize(), firstLevel.getLastList(), false); } firstLevel.setLastList(null); firstLevel.setLastListSize(0); while (in.isReadable() && firstLevel.getParts().size() < firstLevel.getSize()) { - decode(in, cmd, firstLevel.getParts(), ctx.channel()); + decode(in, cmd, firstLevel.getParts(), ctx.channel(), false); } - decodeList(in, cmd, null, ctx.channel(), 0, firstLevel.getParts()); + decodeList(in, cmd, null, ctx.channel(), 0, firstLevel.getParts(), false); } else { - decodeList(in, cmd, null, ctx.channel(), firstLevel.getSize(), firstLevel.getParts()); + decodeList(in, cmd, null, ctx.channel(), firstLevel.getSize(), firstLevel.getParts(), false); } } } @@ -226,7 +228,7 @@ public class CommandDecoder extends ReplayingDecoder<State> { } try { - decode(in, commandData, null, ctx.channel()); + decode(in, commandData, null, ctx.channel(), !commandBatch.isAtomic()); } finally { if (commandData != null && RedisCommands.EXEC.getName().equals(commandData.getCommand().getName())) { commandsData.remove(); @@ -284,7 +286,7 @@ public class CommandDecoder extends ReplayingDecoder<State> { } } - protected void decode(ByteBuf in, CommandData<Object, Object> data, List<Object> parts, Channel channel) throws IOException { + protected void decode(ByteBuf in, CommandData<Object, Object> data, List<Object> parts, Channel channel, boolean skipConvertor) throws IOException { int code = in.readByte(); if (code == '+') { ByteBuf rb = in.readBytes(in.bytesBefore((byte) '\r')); @@ -292,7 +294,7 @@ public class CommandDecoder extends ReplayingDecoder<State> { String result = rb.toString(CharsetUtil.UTF_8); in.skipBytes(2); - handleResult(data, parts, result, false, channel); + handleResult(data, parts, result, skipConvertor, channel); } finally { rb.release(); } @@ -368,7 +370,7 @@ public class CommandDecoder extends ReplayingDecoder<State> { } } - decodeList(in, data, parts, channel, size, respParts); + decodeList(in, data, parts, channel, size, respParts, skipConvertor); if (lastLevel != null && lastLevel.getLastList() != null) { lastLevel.setLastList(null); @@ -382,7 +384,7 @@ public class CommandDecoder extends ReplayingDecoder<State> { @SuppressWarnings("unchecked") private void decodeList(ByteBuf in, CommandData<Object, Object> data, List<Object> parts, - Channel channel, long size, List<Object> respParts) + Channel channel, long size, List<Object> respParts, boolean skipConvertor) throws IOException { if (parts == null && commandsData.get() != null) { List<CommandData<?, ?>> commands = commandsData.get(); @@ -392,7 +394,7 @@ public class CommandDecoder extends ReplayingDecoder<State> { suffix = 1; } CommandData<Object, Object> commandData = (CommandData<Object, Object>) commands.get(i+suffix); - decode(in, commandData, respParts, channel); + decode(in, commandData, respParts, channel, skipConvertor); if (commandData.getPromise().isDone() && !commandData.getPromise().isSuccess()) { data.tryFailure(commandData.cause()); } @@ -403,7 +405,7 @@ public class CommandDecoder extends ReplayingDecoder<State> { } } else { for (int i = respParts.size(); i < size; i++) { - decode(in, data, respParts, channel); + decode(in, data, respParts, channel, skipConvertor); if (state().isMakeCheckpoint()) { checkpoint(); } @@ -426,8 +428,8 @@ public class CommandDecoder extends ReplayingDecoder<State> { } } - private void handleResult(CommandData<Object, Object> data, List<Object> parts, Object result, boolean multiResult, Channel channel) { - if (data != null && !multiResult) { + private void handleResult(CommandData<Object, Object> data, List<Object> parts, Object result, boolean skipConvertor, Channel channel) { + if (data != null && !skipConvertor) { result = data.getCommand().getConvertor().convert(result); } if (parts != null) { @@ -448,7 +450,7 @@ public class CommandDecoder extends ReplayingDecoder<State> { if (parts.isEmpty()) { return null; } - } + } return data.getCommand().getReplayMultiDecoder(); } diff --git a/redisson/src/main/java/org/redisson/client/handler/CommandPubSubDecoder.java b/redisson/src/main/java/org/redisson/client/handler/CommandPubSubDecoder.java index c69342fa6..58be99607 100644 --- a/redisson/src/main/java/org/redisson/client/handler/CommandPubSubDecoder.java +++ b/redisson/src/main/java/org/redisson/client/handler/CommandPubSubDecoder.java @@ -27,14 +27,19 @@ import java.util.concurrent.ExecutorService; import org.redisson.client.RedisPubSubConnection; import org.redisson.client.protocol.CommandData; import org.redisson.client.protocol.Decoder; +import org.redisson.client.protocol.QueueCommand; import org.redisson.client.protocol.RedisCommands; +import org.redisson.client.protocol.decoder.ListObjectDecoder; import org.redisson.client.protocol.decoder.MultiDecoder; import org.redisson.client.protocol.pubsub.Message; import org.redisson.client.protocol.pubsub.PubSubMessage; import org.redisson.client.protocol.pubsub.PubSubPatternMessage; import org.redisson.client.protocol.pubsub.PubSubStatusMessage; +import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.util.CharsetUtil; import io.netty.util.internal.PlatformDependent; /** @@ -63,11 +68,42 @@ public class CommandPubSubDecoder extends CommandDecoder { commands.put(new PubSubKey(channel, operation), data); } + @Override + protected void decodeCommand(ChannelHandlerContext ctx, ByteBuf in, QueueCommand data) throws Exception { + if (data == null) { + try { + while (in.writerIndex() > in.readerIndex()) { + decode(in, null, null, ctx.channel(), false); + } + sendNext(ctx); + } catch (Exception e) { + log.error("Unable to decode data. channel: {} message: {}", ctx.channel(), in.toString(0, in.writerIndex(), CharsetUtil.UTF_8), e); + sendNext(ctx); + throw e; + } + } else if (data instanceof CommandData) { + CommandData<Object, Object> cmd = (CommandData<Object, Object>)data; + try { + if (state().getLevels().size() > 0) { + decodeFromCheckpoint(ctx, in, data, cmd); + } else { + while (in.writerIndex() > in.readerIndex()) { + decode(in, cmd, null, ctx.channel(), false); + } + } + sendNext(ctx, data); + } catch (Exception e) { + log.error("Unable to decode data. channel: {} message: {}", ctx.channel(), in.toString(0, in.writerIndex(), CharsetUtil.UTF_8), e); + cmd.tryFailure(e); + sendNext(ctx); + throw e; + } + } + } + @Override protected void decodeResult(CommandData<Object, Object> data, List<Object> parts, Channel channel, final Object result) throws IOException { - super.decodeResult(data, parts, channel, result); - if (result instanceof Message) { checkpoint(); @@ -117,6 +153,10 @@ public class CommandPubSubDecoder extends CommandDecoder { } }); } + } else { + if (data != null && data.getCommand().getName().equals("PING")) { + super.decodeResult(data, parts, channel, result); + } } } @@ -157,28 +197,26 @@ public class CommandPubSubDecoder extends CommandDecoder { @Override protected MultiDecoder<Object> messageDecoder(CommandData<Object, Object> data, List<Object> parts) { - if (data == null) { - if (parts.isEmpty()) { - return null; - } - String command = parts.get(0).toString(); - if (MESSAGES.contains(command)) { - String channelName = parts.get(1).toString(); - PubSubKey key = new PubSubKey(channelName, command); - CommandData<Object, Object> commandData = commands.get(key); - if (commandData == null) { - return null; - } - return commandData.getCommand().getReplayMultiDecoder(); - } else if (command.equals("message")) { - String channelName = (String) parts.get(1); - return entries.get(channelName).getDecoder(); - } else if (command.equals("pmessage")) { - String patternName = (String) parts.get(1); - return entries.get(patternName).getDecoder(); - } else if (command.equals("pong")) { + if (parts.isEmpty()) { + return null; + } + String command = parts.get(0).toString(); + if (MESSAGES.contains(command)) { + String channelName = parts.get(1).toString(); + PubSubKey key = new PubSubKey(channelName, command); + CommandData<Object, Object> commandData = commands.get(key); + if (commandData == null) { return null; } + return commandData.getCommand().getReplayMultiDecoder(); + } else if (command.equals("message")) { + String channelName = (String) parts.get(1); + return entries.get(channelName).getDecoder(); + } else if (command.equals("pmessage")) { + String patternName = (String) parts.get(1); + return entries.get(patternName).getDecoder(); + } else if (command.equals("pong")) { + return new ListObjectDecoder<Object>(0); } return data.getCommand().getReplayMultiDecoder(); @@ -186,7 +224,10 @@ public class CommandPubSubDecoder extends CommandDecoder { @Override protected Decoder<Object> selectDecoder(CommandData<Object, Object> data, List<Object> parts) { - if (data == null && parts != null) { + if (parts != null) { + if (data != null && parts.size() == 1 && "pong".equals(parts.get(0))) { + return data.getCodec().getValueDecoder(); + } if (parts.size() == 2 && "message".equals(parts.get(0))) { String channelName = (String) parts.get(1); return entries.get(channelName).getDecoder().getDecoder(parts.size(), state()); @@ -196,6 +237,7 @@ public class CommandPubSubDecoder extends CommandDecoder { return entries.get(patternName).getDecoder().getDecoder(parts.size(), state()); } } + if (data != null && data.getCommand().getName().equals(RedisCommands.PING.getName())) { return data.getCodec().getValueDecoder(); } diff --git a/redisson/src/main/java/org/redisson/client/protocol/CommandData.java b/redisson/src/main/java/org/redisson/client/protocol/CommandData.java index b04ee7470..b17d94e6f 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/CommandData.java +++ b/redisson/src/main/java/org/redisson/client/protocol/CommandData.java @@ -102,4 +102,9 @@ public class CommandData<T, R> implements QueueCommand { || RedisCommands.XREAD_BLOCKING == command; } + @Override + public boolean isExecuted() { + return promise.isDone(); + } + } diff --git a/redisson/src/main/java/org/redisson/client/protocol/CommandsData.java b/redisson/src/main/java/org/redisson/client/protocol/CommandsData.java index 6f68cc14e..381edd7e4 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/CommandsData.java +++ b/redisson/src/main/java/org/redisson/client/protocol/CommandsData.java @@ -96,4 +96,9 @@ public class CommandsData implements QueueCommand { return "CommandsData [commands=" + commands + "]"; } + @Override + public boolean isExecuted() { + return promise.isDone(); + } + } diff --git a/redisson/src/main/java/org/redisson/client/protocol/QueueCommand.java b/redisson/src/main/java/org/redisson/client/protocol/QueueCommand.java index bad470957..2b31eada3 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/QueueCommand.java +++ b/redisson/src/main/java/org/redisson/client/protocol/QueueCommand.java @@ -28,4 +28,6 @@ public interface QueueCommand { boolean tryFailure(Throwable cause); + boolean isExecuted(); + } diff --git a/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java b/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java index bd88eb949..6a3c0bf1d 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java +++ b/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java @@ -289,6 +289,9 @@ public interface RedisCommands { RedisStrictCommand<Long> UNLINK = new RedisStrictCommand<Long>("UNLINK"); RedisStrictCommand<Boolean> UNLINK_BOOL = new RedisStrictCommand<Boolean>("UNLINK", new BooleanNullSafeReplayConvertor()); + RedisCommand<Object> DUMP = new RedisCommand<Object>("DUMP"); + RedisStrictCommand<Void> RESTORE = new RedisStrictCommand<Void>("RESTORE", new VoidReplayConvertor()); + RedisCommand<Object> GET = new RedisCommand<Object>("GET"); RedisStrictCommand<Long> GET_LONG = new RedisStrictCommand<Long>("GET", new LongReplayConvertor()); RedisStrictCommand<Integer> GET_INTEGER = new RedisStrictCommand<Integer>("GET", new IntegerReplayConvertor()); @@ -340,6 +343,7 @@ public interface RedisCommands { RedisStrictCommand<Void> RENAME = new RedisStrictCommand<Void>("RENAME", new VoidReplayConvertor()); RedisStrictCommand<Boolean> MOVE = new RedisStrictCommand<Boolean>("MOVE", new BooleanReplayConvertor()); RedisStrictCommand<Void> MIGRATE = new RedisStrictCommand<Void>("MIGRATE", new VoidReplayConvertor()); + RedisStrictCommand<Void> QUIT = new RedisStrictCommand<Void>("QUIT", new VoidReplayConvertor()); RedisStrictCommand<Long> PUBLISH = new RedisStrictCommand<Long>("PUBLISH"); diff --git a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index fb1e0693b..5c9d88d6b 100644 --- a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -627,12 +627,6 @@ public class MasterSlaveConnectionManager implements ConnectionManager { } } - timer.stop(); - - shutdownLatch.close(); - shutdownPromise.trySuccess(true); - shutdownLatch.awaitUninterruptibly(); - RPromise<Void> result = new RedissonPromise<Void>(); CountableListener<Void> listener = new CountableListener<Void>(result, null, getEntrySet().size()); for (MasterSlaveEntry entry : getEntrySet()) { @@ -641,6 +635,11 @@ public class MasterSlaveConnectionManager implements ConnectionManager { result.awaitUninterruptibly(timeout, unit); resolverGroup.close(); + + timer.stop(); + shutdownLatch.close(); + shutdownPromise.trySuccess(true); + shutdownLatch.awaitUninterruptibly(); if (cfg.getEventLoopGroup() == null) { group.shutdownGracefully(quietPeriod, timeout, unit).syncUninterruptibly(); diff --git a/redisson/src/test/java/org/redisson/RedissonBatchTest.java b/redisson/src/test/java/org/redisson/RedissonBatchTest.java index b03adb391..089cdceed 100644 --- a/redisson/src/test/java/org/redisson/RedissonBatchTest.java +++ b/redisson/src/test/java/org/redisson/RedissonBatchTest.java @@ -29,6 +29,7 @@ import org.redisson.api.RFuture; import org.redisson.api.RListAsync; import org.redisson.api.RMapAsync; import org.redisson.api.RMapCacheAsync; +import org.redisson.api.RScoredSortedSet; import org.redisson.api.RScript; import org.redisson.api.RScript.Mode; import org.redisson.api.RedissonClient; @@ -78,6 +79,17 @@ public class RedissonBatchTest extends BaseTest { List<?> t = batch.execute(); System.out.println(t); } + + @Test + public void testConvertor() { + RBatch batch = redisson.createBatch(batchOptions); + + batch.getScoredSortedSet("myZKey").addScoreAsync("abc", 1d); + batch.execute(); + + RScoredSortedSet<String> set = redisson.getScoredSortedSet("myZKey"); + assertThat(set.getScore("abc")).isEqualTo(1d); + } @Test public void testConnectionLeakAfterError() throws InterruptedException { diff --git a/redisson/src/test/java/org/redisson/RedissonBucketTest.java b/redisson/src/test/java/org/redisson/RedissonBucketTest.java index f1e2a2bef..d580fcc92 100755 --- a/redisson/src/test/java/org/redisson/RedissonBucketTest.java +++ b/redisson/src/test/java/org/redisson/RedissonBucketTest.java @@ -17,6 +17,41 @@ import org.redisson.config.Config; public class RedissonBucketTest extends BaseTest { + @Test + public void testDumpAndRestore() { + RBucket<Integer> al = redisson.getBucket("test"); + al.set(1234); + + byte[] state = al.dump(); + al.delete(); + + al.restore(state); + assertThat(al.get()).isEqualTo(1234); + + RBucket<Integer> bucket = redisson.getBucket("test2"); + bucket.set(300); + bucket.restoreAndReplace(state); + assertThat(bucket.get()).isEqualTo(1234); + } + + @Test + public void testDumpAndRestoreTTL() { + RBucket<Integer> al = redisson.getBucket("test"); + al.set(1234); + + byte[] state = al.dump(); + al.delete(); + + al.restore(state, 10, TimeUnit.SECONDS); + assertThat(al.get()).isEqualTo(1234); + assertThat(al.remainTimeToLive()).isBetween(9500L, 10000L); + + RBucket<Integer> bucket = redisson.getBucket("test2"); + bucket.set(300); + bucket.restoreAndReplace(state, 10, TimeUnit.SECONDS); + assertThat(bucket.get()).isEqualTo(1234); + } + @Test public void testGetAndDelete() { RBucket<Integer> al = redisson.getBucket("test"); diff --git a/redisson/src/test/java/org/redisson/RedissonRateLimiterTest.java b/redisson/src/test/java/org/redisson/RedissonRateLimiterTest.java index 8c391c88f..441c4c0ed 100644 --- a/redisson/src/test/java/org/redisson/RedissonRateLimiterTest.java +++ b/redisson/src/test/java/org/redisson/RedissonRateLimiterTest.java @@ -19,11 +19,11 @@ public class RedissonRateLimiterTest extends BaseTest { @Test public void testAcquire() { - RRateLimiter rr = redisson.getRateLimiter("test"); - assertThat(rr.trySetRate(RateType.OVERALL, 10, 1, RateIntervalUnit.SECONDS)).isTrue(); - rr.acquire(1); - rr.acquire(5); - rr.acquire(4); + RRateLimiter rr = redisson.getRateLimiter("acquire"); + assertThat(rr.trySetRate(RateType.OVERALL, 1, 5, RateIntervalUnit.SECONDS)).isTrue(); + for (int i = 0; i < 10; i++) { + rr.acquire(1); + } assertThat(rr.tryAcquire()).isFalse(); } diff --git a/redisson/src/test/java/org/redisson/RedissonStreamTest.java b/redisson/src/test/java/org/redisson/RedissonStreamTest.java index 41f2ecaac..567842fa1 100644 --- a/redisson/src/test/java/org/redisson/RedissonStreamTest.java +++ b/redisson/src/test/java/org/redisson/RedissonStreamTest.java @@ -2,7 +2,6 @@ package org.redisson; import static org.assertj.core.api.Assertions.assertThat; -import java.util.Collections; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.Map; @@ -85,7 +84,7 @@ public class RedissonStreamTest extends BaseTest { t.start(); long start = System.currentTimeMillis(); - Map<String, Map<StreamId, Map<String, String>>> s = stream.read(2, 5, TimeUnit.SECONDS, Collections.singleton("test1"), new StreamId(0), StreamId.NEWEST); + Map<String, Map<StreamId, Map<String, String>>> s = stream.read(2, 5, TimeUnit.SECONDS, new StreamId(0), "test1", StreamId.NEWEST); assertThat(System.currentTimeMillis() - start).isBetween(1900L, 2200L); assertThat(s).hasSize(1); assertThat(s.get("test").get(new StreamId(1))).isEqualTo(entries1); @@ -141,7 +140,7 @@ public class RedissonStreamTest extends BaseTest { @Test public void testReadMultiKeysEmpty() { RStream<String, String> stream = redisson.getStream("test2"); - Map<String, Map<StreamId, Map<String, String>>> s = stream.read(10, Collections.singleton("test1"), new StreamId(0), new StreamId(0)); + Map<String, Map<StreamId, Map<String, String>>> s = stream.read(10, new StreamId(0), "test1", new StreamId(0)); assertThat(s).isEmpty(); } @@ -160,7 +159,7 @@ public class RedissonStreamTest extends BaseTest { entries2.put("6", "66"); stream2.addAll(entries2); - Map<String, Map<StreamId, Map<String, String>>> s = stream2.read(10, Collections.singleton("test1"), new StreamId(0), new StreamId(0)); + Map<String, Map<StreamId, Map<String, String>>> s = stream2.read(10, new StreamId(0), "test1", new StreamId(0)); assertThat(s).hasSize(2); assertThat(s.get("test1").values().iterator().next()).isEqualTo(entries1); assertThat(s.get("test2").values().iterator().next()).isEqualTo(entries2); diff --git a/redisson/src/test/java/org/redisson/RedissonTest.java b/redisson/src/test/java/org/redisson/RedissonTest.java index a5f3520eb..4e6e71433 100644 --- a/redisson/src/test/java/org/redisson/RedissonTest.java +++ b/redisson/src/test/java/org/redisson/RedissonTest.java @@ -239,14 +239,19 @@ public class RedissonTest { }); assertThat(id).isNotZero(); - + r.getBucket("1").get(); Assert.assertEquals(0, p.stop()); + await().atMost(2, TimeUnit.SECONDS).until(() -> disconnectCounter.get() == 1); + try { r.getBucket("1").get(); } catch (Exception e) { } + + assertThat(connectCounter.get()).isEqualTo(0); + assertThat(disconnectCounter.get()).isEqualTo(1); RedisProcess pp = new RedisRunner() .nosave() @@ -256,12 +261,11 @@ public class RedissonTest { r.getBucket("1").get(); - r.shutdown(); + assertThat(connectCounter.get()).isEqualTo(1); + assertThat(disconnectCounter.get()).isEqualTo(1); + r.shutdown(); Assert.assertEquals(0, pp.stop()); - - await().atMost(2, TimeUnit.SECONDS).until(() -> connectCounter.get() == 1); - await().atMost(2, TimeUnit.SECONDS).until(() -> disconnectCounter.get() == 1); } @Test