Merge branch '3.0.0' into feature/map_cache_max_size

pull/985/head
Zun 8 years ago
commit 0feb684f39

@ -2,7 +2,25 @@ Redisson Releases History
================================ ================================
### Please Note: trunk is current development branch. ### Please Note: trunk is current development branch.
__[Redisson PRO](https://redisson.pro)__ now costs only __$975__ per year and supports unlimited Redisson instances Try __[Redisson PRO](https://redisson.pro)__ version.
### 28-Jul-2017 - versions 2.10.0 and 3.5.0 released
Feature - __Local Cache support for Hibernate Cache__ Please read [documentation](https://github.com/redisson/redisson/wiki/14.-Integration-with-frameworks/#1431-hibernate-cache-local-cache) for more details
Feature - __Local Cache support for Spring Cache__ Please read [documentation](https://github.com/redisson/redisson/wiki/14.-Integration-with-frameworks/#1421-spring-cache-local-cache) for more details
Feature - __`RedissonLocalCachedMapCache` object added__ Please read [documentation](https://github.com/redisson/redisson/wiki/7.-distributed-collections/#713-map-local-cache-for-expiring-entries) for more details
Feature - __`BlockingFairDeque` object added__ Please read [documentation](https://github.com/redisson/redisson/wiki/7.-distributed-collections#714-blocking-fair-deque) for more details
Feature - __`RLockReactive` object added__
Feature - __`RReadWriteLockReactive` object added__
Feature - __`RSemaphoreReactive` object added__
Feature - `unlink`, `flushdbParallel`, `flushallParallel` methods added
Fixed - ContextClassLoader should be used by Redisson Codec for Tomcat session's object serialization
Fixed - Spring Cache `NullValue` does not implement Serializable
Fixed - `RLocalCachedMap` doesn't work with non-json and non-binary codecs
Fixed - Tomcat RedissonSessionManager doesn't remove session on invalidation/expiration
Fixed - `RedissonBatch` shouldn't require `reactor.fn.Supplier` dependency
Fixed - Spring Session 1.3.x compatibility (thanks to Vcgoyo)
Fixed - priority queues should acquire lock before polling the element
### 12-Jul-2017 - versions 2.9.4 and 3.4.4 released ### 12-Jul-2017 - versions 2.9.4 and 3.4.4 released
@ -16,6 +34,7 @@ Feature - ability to submit few tasks atomically (in batch) through `RExecutorSe
Feature - [Config.keepPubSubOrder](https://github.com/redisson/redisson/wiki/2.-Configuration#keeppubsuborder) setting added Feature - [Config.keepPubSubOrder](https://github.com/redisson/redisson/wiki/2.-Configuration#keeppubsuborder) setting added
Improvement - make `RMapReactive` and `RMapCacheReactive` interfaces match with `RMap` and `RMapCache` Improvement - make `RMapReactive` and `RMapCacheReactive` interfaces match with `RMap` and `RMapCache`
Improvement - `RLexSortedSet` should extend `RSortedSet` Improvement - `RLexSortedSet` should extend `RSortedSet`
Fixed - connection listener is not invoked in some cases
Fixed - `RMapCache` `remove`, `put`, `putIfAbsent` and `replace` methods aren't respect entry expiration Fixed - `RMapCache` `remove`, `put`, `putIfAbsent` and `replace` methods aren't respect entry expiration
Fixed - `SCAN` command should be used in `RKeys.deleteByPattern` method Fixed - `SCAN` command should be used in `RKeys.deleteByPattern` method
Fixed - `RBinaryStream` doesn't work in Redis cluster environment Fixed - `RBinaryStream` doesn't work in Redis cluster environment

@ -6,8 +6,8 @@ Based on high-performance async and lock-free Java Redis client and [Netty](http
| Stable Release Version | JDK Version compatibility | Release Date | | Stable Release Version | JDK Version compatibility | Release Date |
| ------------- | ------------- | ------------| | ------------- | ------------- | ------------|
| 3.4.4 | 1.8+ | 12.07.2017 | | 3.5.0 | 1.8+ | 28.07.2017 |
| 2.9.4 | 1.6, 1.7, 1.8 and Android | 12.07.2017 | | 2.10.0 | 1.6, 1.7, 1.8 and Android | 28.07.2017 |
__NOTE__: Both version lines have same features except `CompletionStage` interface added in 3.x.x __NOTE__: Both version lines have same features except `CompletionStage` interface added in 3.x.x
@ -82,23 +82,23 @@ Quick start
<dependency> <dependency>
<groupId>org.redisson</groupId> <groupId>org.redisson</groupId>
<artifactId>redisson</artifactId> <artifactId>redisson</artifactId>
<version>3.4.4</version> <version>3.5.0</version>
</dependency> </dependency>
<!-- JDK 1.6+ compatible --> <!-- JDK 1.6+ compatible -->
<dependency> <dependency>
<groupId>org.redisson</groupId> <groupId>org.redisson</groupId>
<artifactId>redisson</artifactId> <artifactId>redisson</artifactId>
<version>2.9.4</version> <version>2.10.0</version>
</dependency> </dependency>
#### Gradle #### Gradle
// JDK 1.8+ compatible // JDK 1.8+ compatible
compile 'org.redisson:redisson:3.4.4' compile 'org.redisson:redisson:3.5.0'
// JDK 1.6+ compatible // JDK 1.6+ compatible
compile 'org.redisson:redisson:2.9.4' compile 'org.redisson:redisson:2.10.0'
#### Java #### Java
@ -123,11 +123,11 @@ RExecutorService executor = redisson.getExecutorService("myExecutorService");
Downloads Downloads
=============================== ===============================
[Redisson 3.4.4](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson&v=3.4.4&e=jar), [Redisson 3.5.0](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson&v=3.5.0&e=jar),
[Redisson node 3.4.4](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-all&v=3.4.4&e=jar) [Redisson node 3.5.0](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-all&v=3.5.0&e=jar)
[Redisson 2.9.4](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson&v=2.9.4&e=jar), [Redisson 2.10.0](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson&v=2.10.0&e=jar),
[Redisson node 2.9.4](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-all&v=2.9.4&e=jar) [Redisson node 2.10.0](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-all&v=2.10.0&e=jar)
### Supported by ### Supported by

@ -3,7 +3,7 @@
<groupId>org.redisson</groupId> <groupId>org.redisson</groupId>
<artifactId>redisson-parent</artifactId> <artifactId>redisson-parent</artifactId>
<version>3.4.5-SNAPSHOT</version> <version>3.5.2-SNAPSHOT</version>
<packaging>pom</packaging> <packaging>pom</packaging>
<name>Redisson</name> <name>Redisson</name>

@ -4,7 +4,7 @@
<parent> <parent>
<groupId>org.redisson</groupId> <groupId>org.redisson</groupId>
<artifactId>redisson-parent</artifactId> <artifactId>redisson-parent</artifactId>
<version>3.4.5-SNAPSHOT</version> <version>3.5.2-SNAPSHOT</version>
<relativePath>../</relativePath> <relativePath>../</relativePath>
</parent> </parent>

@ -26,22 +26,22 @@ Usage
**2** Copy two jars into `TOMCAT_BASE/lib` directory: **2** Copy two jars into `TOMCAT_BASE/lib` directory:
1. __For JDK 1.8+__ 1. __For JDK 1.8+__
[redisson-all-3.4.4.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-all&v=3.4.4&e=jar) [redisson-all-3.5.0.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-all&v=3.5.0&e=jar)
for Tomcat 6.x for Tomcat 6.x
[redisson-tomcat-6-3.4.4.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-tomcat-6&v=3.4.4&e=jar) [redisson-tomcat-6-3.5.0.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-tomcat-6&v=3.5.0&e=jar)
for Tomcat 7.x for Tomcat 7.x
[redisson-tomcat-7-3.4.4.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-tomcat-7&v=3.4.4&e=jar) [redisson-tomcat-7-3.5.0.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-tomcat-7&v=3.5.0&e=jar)
for Tomcat 8.x for Tomcat 8.x
[redisson-tomcat-8-3.4.4.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-tomcat-8&v=3.4.4&e=jar) [redisson-tomcat-8-3.5.0.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-tomcat-8&v=3.5.0&e=jar)
2. __For JDK 1.6+__ 2. __For JDK 1.6+__
[redisson-all-2.9.4.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-all&v=2.9.4&e=jar) [redisson-all-2.10.0.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-all&v=2.10.0&e=jar)
for Tomcat 6.x for Tomcat 6.x
[redisson-tomcat-6-2.9.4.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-tomcat-6&v=2.9.4&e=jar) [redisson-tomcat-6-2.10.0.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-tomcat-6&v=2.10.0&e=jar)
for Tomcat 7.x for Tomcat 7.x
[redisson-tomcat-7-2.9.4.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-tomcat-7&v=2.9.4&e=jar) [redisson-tomcat-7-2.10.0.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-tomcat-7&v=2.10.0&e=jar)
for Tomcat 8.x for Tomcat 8.x
[redisson-tomcat-8-2.9.4.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-tomcat-8&v=2.9.4&e=jar) [redisson-tomcat-8-2.10.0.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-tomcat-8&v=2.10.0&e=jar)

@ -4,7 +4,7 @@
<parent> <parent>
<groupId>org.redisson</groupId> <groupId>org.redisson</groupId>
<artifactId>redisson-parent</artifactId> <artifactId>redisson-parent</artifactId>
<version>3.4.5-SNAPSHOT</version> <version>3.5.2-SNAPSHOT</version>
<relativePath>../</relativePath> <relativePath>../</relativePath>
</parent> </parent>

@ -4,7 +4,7 @@
<parent> <parent>
<groupId>org.redisson</groupId> <groupId>org.redisson</groupId>
<artifactId>redisson-tomcat</artifactId> <artifactId>redisson-tomcat</artifactId>
<version>3.4.5-SNAPSHOT</version> <version>3.5.2-SNAPSHOT</version>
<relativePath>../</relativePath> <relativePath>../</relativePath>
</parent> </parent>

@ -19,11 +19,11 @@ import java.lang.reflect.Field;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.catalina.session.StandardSession; import org.apache.catalina.session.StandardSession;
import org.redisson.api.RMap; import org.redisson.api.RMap;
import org.redisson.tomcat.RedissonSessionManager.ReadMode;
/** /**
* Redisson Session object for Apache Tomcat * Redisson Session object for Apache Tomcat
@ -36,10 +36,12 @@ public class RedissonSession extends StandardSession {
private final RedissonSessionManager redissonManager; private final RedissonSessionManager redissonManager;
private final Map<String, Object> attrs; private final Map<String, Object> attrs;
private RMap<String, Object> map; private RMap<String, Object> map;
private final RedissonSessionManager.ReadMode readMode;
public RedissonSession(RedissonSessionManager manager) { public RedissonSession(RedissonSessionManager manager, RedissonSessionManager.ReadMode readMode) {
super(manager); super(manager);
this.redissonManager = manager; this.redissonManager = manager;
this.readMode = readMode;
try { try {
Field attr = StandardSession.class.getDeclaredField("attributes"); Field attr = StandardSession.class.getDeclaredField("attributes");
@ -51,6 +53,15 @@ public class RedissonSession extends StandardSession {
private static final long serialVersionUID = -2518607181636076487L; private static final long serialVersionUID = -2518607181636076487L;
@Override
public Object getAttribute(String name) {
if (readMode == ReadMode.REDIS) {
return map.get(name);
}
return super.getAttribute(name);
}
@Override @Override
public void setId(String id, boolean notify) { public void setId(String id, boolean notify) {
super.setId(id, notify); super.setId(id, notify);
@ -163,24 +174,30 @@ public class RedissonSession extends StandardSession {
} }
} }
public void load() { public void load(Map<String, Object> attrs) {
Set<Entry<String, Object>> entrySet = map.readAllEntrySet(); Long creationTime = (Long) attrs.remove("session:creationTime");
for (Entry<String, Object> entry : entrySet) { if (creationTime != null) {
if ("session:creationTime".equals(entry.getKey())) { this.creationTime = creationTime;
creationTime = (Long) entry.getValue(); }
} else if ("session:lastAccessedTime".equals(entry.getKey())) { Long lastAccessedTime = (Long) attrs.remove("session:lastAccessedTime");
lastAccessedTime = (Long) entry.getValue(); if (lastAccessedTime != null) {
} else if ("session:thisAccessedTime".equals(entry.getKey())) { this.lastAccessedTime = lastAccessedTime;
thisAccessedTime = (Long) entry.getValue(); }
} else if ("session:maxInactiveInterval".equals(entry.getKey())) { Long thisAccessedTime = (Long) attrs.remove("session:thisAccessedTime");
maxInactiveInterval = (Integer) entry.getValue(); if (thisAccessedTime != null) {
} else if ("session:isValid".equals(entry.getKey())) { this.thisAccessedTime = thisAccessedTime;
isValid = (Boolean) entry.getValue(); }
} else if ("session:isNew".equals(entry.getKey())) { Boolean isValid = (Boolean) attrs.remove("session:isValid");
isNew = (Boolean) entry.getValue(); if (isValid != null) {
} else { this.isValid = isValid;
setAttribute(entry.getKey(), entry.getValue(), false); }
} Boolean isNew = (Boolean) attrs.remove("session:isNew");
if (isNew != null) {
this.isNew = isNew;
}
for (Entry<String, Object> entry : attrs.entrySet()) {
setAttribute(entry.getKey(), entry.getValue(), false);
} }
} }

@ -17,6 +17,7 @@ package org.redisson.tomcat;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.Map;
import org.apache.juli.logging.Log; import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory; import org.apache.juli.logging.LogFactory;
@ -40,12 +41,23 @@ import org.redisson.config.Config;
*/ */
public class RedissonSessionManager extends ManagerBase implements Lifecycle { public class RedissonSessionManager extends ManagerBase implements Lifecycle {
public enum ReadMode {REDIS, MEMORY}
private final Log log = LogFactory.getLog(RedissonSessionManager.class); private final Log log = LogFactory.getLog(RedissonSessionManager.class);
protected LifecycleSupport lifecycle = new LifecycleSupport(this); protected LifecycleSupport lifecycle = new LifecycleSupport(this);
private RedissonClient redisson; private RedissonClient redisson;
private String configPath; private String configPath;
private ReadMode readMode = ReadMode.MEMORY;
public String getReadMode() {
return readMode.toString();
}
public void setReadMode(String readMode) {
this.readMode = ReadMode.valueOf(readMode);
}
public void setConfigPath(String configPath) { public void setConfigPath(String configPath) {
this.configPath = configPath; this.configPath = configPath;
@ -114,9 +126,18 @@ public class RedissonSessionManager extends ManagerBase implements Lifecycle {
public Session findSession(String id) throws IOException { public Session findSession(String id) throws IOException {
Session result = super.findSession(id); Session result = super.findSession(id);
if (result == null && id != null) { if (result == null && id != null) {
Map<String, Object> attrs = getMap(id).readAllMap();
if (attrs.isEmpty()) {
log.info("Session " + id + " can't be found");
return null;
}
RedissonSession session = (RedissonSession) createEmptySession(); RedissonSession session = (RedissonSession) createEmptySession();
session.setId(id); session.setId(id);
session.load(); session.load(attrs);
session.access();
session.endAccess();
return session; return session;
} }
@ -125,7 +146,7 @@ public class RedissonSessionManager extends ManagerBase implements Lifecycle {
@Override @Override
public Session createEmptySession() { public Session createEmptySession() {
return new RedissonSession(this); return new RedissonSession(this, readMode);
} }
@Override @Override

@ -4,7 +4,7 @@
<parent> <parent>
<groupId>org.redisson</groupId> <groupId>org.redisson</groupId>
<artifactId>redisson-tomcat</artifactId> <artifactId>redisson-tomcat</artifactId>
<version>3.4.5-SNAPSHOT</version> <version>3.5.2-SNAPSHOT</version>
<relativePath>../</relativePath> <relativePath>../</relativePath>
</parent> </parent>

@ -19,11 +19,11 @@ import java.lang.reflect.Field;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.catalina.session.StandardSession; import org.apache.catalina.session.StandardSession;
import org.redisson.api.RMap; import org.redisson.api.RMap;
import org.redisson.tomcat.RedissonSessionManager.ReadMode;
/** /**
* Redisson Session object for Apache Tomcat * Redisson Session object for Apache Tomcat
@ -36,10 +36,12 @@ public class RedissonSession extends StandardSession {
private final RedissonSessionManager redissonManager; private final RedissonSessionManager redissonManager;
private final Map<String, Object> attrs; private final Map<String, Object> attrs;
private RMap<String, Object> map; private RMap<String, Object> map;
private final RedissonSessionManager.ReadMode readMode;
public RedissonSession(RedissonSessionManager manager) { public RedissonSession(RedissonSessionManager manager, RedissonSessionManager.ReadMode readMode) {
super(manager); super(manager);
this.redissonManager = manager; this.redissonManager = manager;
this.readMode = readMode;
try { try {
Field attr = StandardSession.class.getDeclaredField("attributes"); Field attr = StandardSession.class.getDeclaredField("attributes");
attrs = (Map<String, Object>) attr.get(this); attrs = (Map<String, Object>) attr.get(this);
@ -50,6 +52,15 @@ public class RedissonSession extends StandardSession {
private static final long serialVersionUID = -2518607181636076487L; private static final long serialVersionUID = -2518607181636076487L;
@Override
public Object getAttribute(String name) {
if (readMode == ReadMode.REDIS) {
return map.get(name);
}
return super.getAttribute(name);
}
@Override @Override
public void setId(String id, boolean notify) { public void setId(String id, boolean notify) {
super.setId(id, notify); super.setId(id, notify);
@ -167,24 +178,30 @@ public class RedissonSession extends StandardSession {
} }
} }
public void load() { public void load(Map<String, Object> attrs) {
Set<Entry<String, Object>> entrySet = map.readAllEntrySet(); Long creationTime = (Long) attrs.remove("session:creationTime");
for (Entry<String, Object> entry : entrySet) { if (creationTime != null) {
if ("session:creationTime".equals(entry.getKey())) { this.creationTime = creationTime;
creationTime = (Long) entry.getValue(); }
} else if ("session:lastAccessedTime".equals(entry.getKey())) { Long lastAccessedTime = (Long) attrs.remove("session:lastAccessedTime");
lastAccessedTime = (Long) entry.getValue(); if (lastAccessedTime != null) {
} else if ("session:thisAccessedTime".equals(entry.getKey())) { this.lastAccessedTime = lastAccessedTime;
thisAccessedTime = (Long) entry.getValue(); }
} else if ("session:maxInactiveInterval".equals(entry.getKey())) { Long thisAccessedTime = (Long) attrs.remove("session:thisAccessedTime");
maxInactiveInterval = (Integer) entry.getValue(); if (thisAccessedTime != null) {
} else if ("session:isValid".equals(entry.getKey())) { this.thisAccessedTime = thisAccessedTime;
isValid = (Boolean) entry.getValue(); }
} else if ("session:isNew".equals(entry.getKey())) { Boolean isValid = (Boolean) attrs.remove("session:isValid");
isNew = (Boolean) entry.getValue(); if (isValid != null) {
} else { this.isValid = isValid;
setAttribute(entry.getKey(), entry.getValue(), false); }
} Boolean isNew = (Boolean) attrs.remove("session:isNew");
if (isNew != null) {
this.isNew = isNew;
}
for (Entry<String, Object> entry : attrs.entrySet()) {
setAttribute(entry.getKey(), entry.getValue(), false);
} }
} }

@ -17,6 +17,7 @@ package org.redisson.tomcat;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.Map;
import org.apache.catalina.Context; import org.apache.catalina.Context;
import org.apache.catalina.LifecycleException; import org.apache.catalina.LifecycleException;
@ -28,6 +29,7 @@ import org.apache.juli.logging.LogFactory;
import org.redisson.Redisson; import org.redisson.Redisson;
import org.redisson.api.RMap; import org.redisson.api.RMap;
import org.redisson.api.RedissonClient; import org.redisson.api.RedissonClient;
import org.redisson.client.codec.Codec;
import org.redisson.config.Config; import org.redisson.config.Config;
/** /**
@ -38,10 +40,21 @@ import org.redisson.config.Config;
*/ */
public class RedissonSessionManager extends ManagerBase { public class RedissonSessionManager extends ManagerBase {
public enum ReadMode {REDIS, MEMORY}
private final Log log = LogFactory.getLog(RedissonSessionManager.class); private final Log log = LogFactory.getLog(RedissonSessionManager.class);
private RedissonClient redisson; private RedissonClient redisson;
private String configPath; private String configPath;
private ReadMode readMode = ReadMode.MEMORY;
public String getReadMode() {
return readMode.toString();
}
public void setReadMode(String readMode) {
this.readMode = ReadMode.valueOf(readMode);
}
public void setConfigPath(String configPath) { public void setConfigPath(String configPath) {
this.configPath = configPath; this.configPath = configPath;
@ -91,9 +104,18 @@ public class RedissonSessionManager extends ManagerBase {
public Session findSession(String id) throws IOException { public Session findSession(String id) throws IOException {
Session result = super.findSession(id); Session result = super.findSession(id);
if (result == null && id != null) { if (result == null && id != null) {
Map<String, Object> attrs = getMap(id).readAllMap();
if (attrs.isEmpty()) {
log.info("Session " + id + " can't be found");
return null;
}
RedissonSession session = (RedissonSession) createEmptySession(); RedissonSession session = (RedissonSession) createEmptySession();
session.setId(id); session.setId(id);
session.load(); session.load(attrs);
session.access();
session.endAccess();
return session; return session;
} }
@ -102,7 +124,7 @@ public class RedissonSessionManager extends ManagerBase {
@Override @Override
public Session createEmptySession() { public Session createEmptySession() {
return new RedissonSession(this); return new RedissonSession(this, readMode);
} }
@Override @Override
@ -135,6 +157,15 @@ public class RedissonSessionManager extends ManagerBase {
} }
try { try {
try {
Config c = new Config(config);
Codec codec = c.getCodec().getClass().getConstructor(ClassLoader.class)
.newInstance(Thread.currentThread().getContextClassLoader());
config.setCodec(codec);
} catch (Exception e) {
throw new IllegalStateException("Unable to initialize codec with ClassLoader parameter", e);
}
redisson = Redisson.create(config); redisson = Redisson.create(config);
} catch (Exception e) { } catch (Exception e) {
throw new LifecycleException(e); throw new LifecycleException(e);

@ -4,7 +4,7 @@
<parent> <parent>
<groupId>org.redisson</groupId> <groupId>org.redisson</groupId>
<artifactId>redisson-tomcat</artifactId> <artifactId>redisson-tomcat</artifactId>
<version>3.4.5-SNAPSHOT</version> <version>3.5.2-SNAPSHOT</version>
<relativePath>../</relativePath> <relativePath>../</relativePath>
</parent> </parent>

@ -19,11 +19,11 @@ import java.lang.reflect.Field;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.catalina.session.StandardSession; import org.apache.catalina.session.StandardSession;
import org.redisson.api.RMap; import org.redisson.api.RMap;
import org.redisson.tomcat.RedissonSessionManager.ReadMode;
/** /**
* Redisson Session object for Apache Tomcat * Redisson Session object for Apache Tomcat
@ -36,10 +36,13 @@ public class RedissonSession extends StandardSession {
private final RedissonSessionManager redissonManager; private final RedissonSessionManager redissonManager;
private final Map<String, Object> attrs; private final Map<String, Object> attrs;
private RMap<String, Object> map; private RMap<String, Object> map;
private final RedissonSessionManager.ReadMode readMode;
public RedissonSession(RedissonSessionManager manager) { public RedissonSession(RedissonSessionManager manager, RedissonSessionManager.ReadMode readMode) {
super(manager); super(manager);
this.redissonManager = manager; this.redissonManager = manager;
this.readMode = readMode;
try { try {
Field attr = StandardSession.class.getDeclaredField("attributes"); Field attr = StandardSession.class.getDeclaredField("attributes");
attrs = (Map<String, Object>) attr.get(this); attrs = (Map<String, Object>) attr.get(this);
@ -50,6 +53,15 @@ public class RedissonSession extends StandardSession {
private static final long serialVersionUID = -2518607181636076487L; private static final long serialVersionUID = -2518607181636076487L;
@Override
public Object getAttribute(String name) {
if (readMode == ReadMode.REDIS) {
return map.get(name);
}
return super.getAttribute(name);
}
@Override @Override
public void setId(String id, boolean notify) { public void setId(String id, boolean notify) {
super.setId(id, notify); super.setId(id, notify);
@ -167,24 +179,30 @@ public class RedissonSession extends StandardSession {
} }
} }
public void load() { public void load(Map<String, Object> attrs) {
Set<Entry<String, Object>> entrySet = map.readAllEntrySet(); Long creationTime = (Long) attrs.remove("session:creationTime");
for (Entry<String, Object> entry : entrySet) { if (creationTime != null) {
if ("session:creationTime".equals(entry.getKey())) { this.creationTime = creationTime;
creationTime = (Long) entry.getValue(); }
} else if ("session:lastAccessedTime".equals(entry.getKey())) { Long lastAccessedTime = (Long) attrs.remove("session:lastAccessedTime");
lastAccessedTime = (Long) entry.getValue(); if (lastAccessedTime != null) {
} else if ("session:thisAccessedTime".equals(entry.getKey())) { this.lastAccessedTime = lastAccessedTime;
thisAccessedTime = (Long) entry.getValue(); }
} else if ("session:maxInactiveInterval".equals(entry.getKey())) { Long thisAccessedTime = (Long) attrs.remove("session:thisAccessedTime");
maxInactiveInterval = (Integer) entry.getValue(); if (thisAccessedTime != null) {
} else if ("session:isValid".equals(entry.getKey())) { this.thisAccessedTime = thisAccessedTime;
isValid = (Boolean) entry.getValue(); }
} else if ("session:isNew".equals(entry.getKey())) { Boolean isValid = (Boolean) attrs.remove("session:isValid");
isNew = (Boolean) entry.getValue(); if (isValid != null) {
} else { this.isValid = isValid;
setAttribute(entry.getKey(), entry.getValue(), false); }
} Boolean isNew = (Boolean) attrs.remove("session:isNew");
if (isNew != null) {
this.isNew = isNew;
}
for (Entry<String, Object> entry : attrs.entrySet()) {
setAttribute(entry.getKey(), entry.getValue(), false);
} }
} }

@ -17,6 +17,7 @@ package org.redisson.tomcat;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.Map;
import org.apache.catalina.LifecycleException; import org.apache.catalina.LifecycleException;
import org.apache.catalina.LifecycleState; import org.apache.catalina.LifecycleState;
@ -27,6 +28,7 @@ import org.apache.juli.logging.LogFactory;
import org.redisson.Redisson; import org.redisson.Redisson;
import org.redisson.api.RMap; import org.redisson.api.RMap;
import org.redisson.api.RedissonClient; import org.redisson.api.RedissonClient;
import org.redisson.client.codec.Codec;
import org.redisson.config.Config; import org.redisson.config.Config;
/** /**
@ -37,11 +39,23 @@ import org.redisson.config.Config;
*/ */
public class RedissonSessionManager extends ManagerBase { public class RedissonSessionManager extends ManagerBase {
public enum ReadMode {REDIS, MEMORY}
private final Log log = LogFactory.getLog(RedissonSessionManager.class); private final Log log = LogFactory.getLog(RedissonSessionManager.class);
private RedissonClient redisson; private RedissonClient redisson;
private String configPath; private String configPath;
private ReadMode readMode = ReadMode.MEMORY;
public String getReadMode() {
return readMode.toString();
}
public void setReadMode(String readMode) {
this.readMode = ReadMode.valueOf(readMode);
}
public void setConfigPath(String configPath) { public void setConfigPath(String configPath) {
this.configPath = configPath; this.configPath = configPath;
} }
@ -90,9 +104,18 @@ public class RedissonSessionManager extends ManagerBase {
public Session findSession(String id) throws IOException { public Session findSession(String id) throws IOException {
Session result = super.findSession(id); Session result = super.findSession(id);
if (result == null && id != null) { if (result == null && id != null) {
Map<String, Object> attrs = getMap(id).readAllMap();
if (attrs.isEmpty()) {
log.info("Session " + id + " can't be found");
return null;
}
RedissonSession session = (RedissonSession) createEmptySession(); RedissonSession session = (RedissonSession) createEmptySession();
session.setId(id); session.setId(id);
session.load(); session.load(attrs);
session.access();
session.endAccess();
return session; return session;
} }
@ -101,7 +124,7 @@ public class RedissonSessionManager extends ManagerBase {
@Override @Override
public Session createEmptySession() { public Session createEmptySession() {
return new RedissonSession(this); return new RedissonSession(this, readMode);
} }
@Override @Override
@ -134,6 +157,11 @@ public class RedissonSessionManager extends ManagerBase {
} }
try { try {
Config c = new Config(config);
Codec codec = c.getCodec().getClass().getConstructor(ClassLoader.class)
.newInstance(Thread.currentThread().getContextClassLoader());
config.setCodec(codec);
redisson = Redisson.create(config); redisson = Redisson.create(config);
} catch (Exception e) { } catch (Exception e) {
throw new LifecycleException(e); throw new LifecycleException(e);

@ -4,7 +4,7 @@
<parent> <parent>
<groupId>org.redisson</groupId> <groupId>org.redisson</groupId>
<artifactId>redisson-parent</artifactId> <artifactId>redisson-parent</artifactId>
<version>3.4.5-SNAPSHOT</version> <version>3.5.2-SNAPSHOT</version>
<relativePath>../</relativePath> <relativePath>../</relativePath>
</parent> </parent>

@ -15,6 +15,7 @@
*/ */
package org.redisson; package org.redisson;
import java.io.IOException;
import java.io.Serializable; import java.io.Serializable;
import java.math.BigDecimal; import java.math.BigDecimal;
import java.util.AbstractCollection; import java.util.AbstractCollection;
@ -60,13 +61,17 @@ import org.redisson.client.protocol.convertor.NumberConvertor;
import org.redisson.client.protocol.decoder.ObjectMapEntryReplayDecoder; import org.redisson.client.protocol.decoder.ObjectMapEntryReplayDecoder;
import org.redisson.client.protocol.decoder.ObjectMapReplayDecoder; import org.redisson.client.protocol.decoder.ObjectMapReplayDecoder;
import org.redisson.client.protocol.decoder.ObjectSetReplayDecoder; import org.redisson.client.protocol.decoder.ObjectSetReplayDecoder;
import org.redisson.codec.JsonJacksonCodec;
import org.redisson.command.CommandAsyncExecutor; import org.redisson.command.CommandAsyncExecutor;
import org.redisson.eviction.EvictionScheduler; import org.redisson.eviction.EvictionScheduler;
import org.redisson.misc.Hash; import org.redisson.misc.Hash;
import org.redisson.misc.RPromise; import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonObjectFactory;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.FutureListener;
import io.netty.util.internal.ThreadLocalRandom; import io.netty.util.internal.ThreadLocalRandom;
@ -178,6 +183,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
private int invalidationListenerId; private int invalidationListenerId;
private int invalidationStatusListenerId; private int invalidationStatusListenerId;
private volatile long lastInvalidate; private volatile long lastInvalidate;
private Codec topicCodec;
protected RedissonLocalCachedMap(CommandAsyncExecutor commandExecutor, String name, LocalCachedMapOptions<K, V> options, EvictionScheduler evictionScheduler, RedissonClient redisson) { protected RedissonLocalCachedMap(CommandAsyncExecutor commandExecutor, String name, LocalCachedMapOptions<K, V> options, EvictionScheduler evictionScheduler, RedissonClient redisson) {
super(commandExecutor, name, redisson, options); super(commandExecutor, name, redisson, options);
@ -207,7 +213,28 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
} }
private void addListeners(String name, final LocalCachedMapOptions<K, V> options, final RedissonClient redisson) { private void addListeners(String name, final LocalCachedMapOptions<K, V> options, final RedissonClient redisson) {
invalidationTopic = new RedissonTopic<Object>(commandExecutor, suffixName(name, "topic")); topicCodec = codec;
LocalCachedMapInvalidate msg = new LocalCachedMapInvalidate(new byte[] {1, 2, 3}, new byte[] {4, 5, 6});
ByteBuf buf = null;
try {
byte[] s = topicCodec.getValueEncoder().encode(msg);
buf = Unpooled.wrappedBuffer(s);
msg = (LocalCachedMapInvalidate) topicCodec.getValueDecoder().decode(buf, null);
if (!Arrays.equals(msg.getExcludedId(), new byte[] {1, 2, 3})
|| !Arrays.equals(msg.getKeyHashes()[0], new byte[] {4, 5, 6})) {
throw new IllegalArgumentException();
}
} catch (Exception e) {
log.warn("Defined {} codec doesn't encode service messages properly. Default JsonJacksonCodec used to encode messages!", topicCodec.getClass());
topicCodec = new JsonJacksonCodec();
} finally {
if (buf != null) {
buf.release();
}
}
invalidationTopic = new RedissonTopic<Object>(topicCodec, commandExecutor, suffixName(name, "topic"));
if (options.getInvalidationPolicy() == InvalidationPolicy.NONE) { if (options.getInvalidationPolicy() == InvalidationPolicy.NONE) {
return; return;
@ -1345,4 +1372,19 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
return future; return future;
} }
protected byte[] encode(Object value) {
if (commandExecutor.isRedissonReferenceSupportEnabled()) {
RedissonReference reference = RedissonObjectFactory.toReference(commandExecutor.getConnectionManager().getCfg(), value);
if (reference != null) {
value = reference;
}
}
try {
return topicCodec.getValueEncoder().encode(value);
} catch (IOException e) {
throw new IllegalArgumentException(e);
}
}
} }

@ -1576,18 +1576,21 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
} }
return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_VOID, return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_VOID,
"for i, value in ipairs(ARGV) do " "for i=1, #ARGV, 5000 do "
+ "if i % 2 == 0 then " + "redis.call('hmset', KEYS[1], unpack(ARGV, i, math.min(i+4999, table.getn(ARGV)))) "
+ "end; "
+ "for i, value in ipairs(ARGV) do "
+ "if i % 2 == 0 then "
+ "local val = struct.pack('dLc0', 0, string.len(value), value); " + "local val = struct.pack('dLc0', 0, string.len(value), value); "
+ "ARGV[i] = val; " + "ARGV[i] = val; "
+ "local key = ARGV[i-1];" + "local key = ARGV[i-1];"
+ "local msg = struct.pack('Lc0Lc0', string.len(key), key, string.len(value), value); " + "local msg = struct.pack('Lc0Lc0', string.len(key), key, string.len(value), value); "
+ "redis.call('publish', KEYS[2], msg); " + "redis.call('publish', KEYS[2], msg); "
+ "end;"
+ "end;" + "end;"
+ "return redis.call('hmset', KEYS[1], unpack(ARGV)); ", + "end;",
Arrays.<Object>asList(getName(), getCreatedChannelName()), params.toArray()); Arrays.<Object>asList(getName(), getCreatedChannelName()), params.toArray());
} }
@Override @Override

@ -20,16 +20,13 @@ import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.ListIterator; import java.util.ListIterator;
import java.util.Queue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import org.redisson.api.RFuture; import org.redisson.api.RFuture;
import org.redisson.api.RLock; import org.redisson.api.RLock;
import io.netty.util.concurrent.Future;
import io.netty.util.internal.ThreadLocalRandom; import io.netty.util.internal.ThreadLocalRandom;
/** /**
@ -226,12 +223,4 @@ public class RedissonMultiLock implements Lock {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
protected boolean isLockFailed(Future<Boolean> future) {
return !future.isSuccess();
}
protected boolean isAllLocksAcquired(AtomicReference<RLock> lockedLockHolder, AtomicReference<Throwable> failed, Queue<RLock> lockedLocks) {
return lockedLockHolder.get() == null && failed.get() == null;
}
} }

@ -43,6 +43,7 @@ import org.redisson.api.RQueueReactive;
import org.redisson.api.RReadWriteLockReactive; import org.redisson.api.RReadWriteLockReactive;
import org.redisson.api.RScoredSortedSetReactive; import org.redisson.api.RScoredSortedSetReactive;
import org.redisson.api.RScriptReactive; import org.redisson.api.RScriptReactive;
import org.redisson.api.RSemaphoreReactive;
import org.redisson.api.RSetCacheReactive; import org.redisson.api.RSetCacheReactive;
import org.redisson.api.RSetReactive; import org.redisson.api.RSetReactive;
import org.redisson.api.RTopicReactive; import org.redisson.api.RTopicReactive;
@ -55,6 +56,7 @@ import org.redisson.config.Config;
import org.redisson.config.ConfigSupport; import org.redisson.config.ConfigSupport;
import org.redisson.connection.ConnectionManager; import org.redisson.connection.ConnectionManager;
import org.redisson.eviction.EvictionScheduler; import org.redisson.eviction.EvictionScheduler;
import org.redisson.pubsub.SemaphorePubSub;
import org.redisson.reactive.RedissonAtomicLongReactive; import org.redisson.reactive.RedissonAtomicLongReactive;
import org.redisson.reactive.RedissonBatchReactive; import org.redisson.reactive.RedissonBatchReactive;
import org.redisson.reactive.RedissonBitSetReactive; import org.redisson.reactive.RedissonBitSetReactive;
@ -73,6 +75,7 @@ import org.redisson.reactive.RedissonQueueReactive;
import org.redisson.reactive.RedissonReadWriteLockReactive; import org.redisson.reactive.RedissonReadWriteLockReactive;
import org.redisson.reactive.RedissonScoredSortedSetReactive; import org.redisson.reactive.RedissonScoredSortedSetReactive;
import org.redisson.reactive.RedissonScriptReactive; import org.redisson.reactive.RedissonScriptReactive;
import org.redisson.reactive.RedissonSemaphoreReactive;
import org.redisson.reactive.RedissonSetCacheReactive; import org.redisson.reactive.RedissonSetCacheReactive;
import org.redisson.reactive.RedissonSetReactive; import org.redisson.reactive.RedissonSetReactive;
import org.redisson.reactive.RedissonTopicReactive; import org.redisson.reactive.RedissonTopicReactive;
@ -91,7 +94,9 @@ public class RedissonReactive implements RedissonReactiveClient {
protected final ConnectionManager connectionManager; protected final ConnectionManager connectionManager;
protected final Config config; protected final Config config;
protected final CodecProvider codecProvider; protected final CodecProvider codecProvider;
protected final UUID id = UUID.randomUUID(); protected final UUID id = UUID.randomUUID();
protected final SemaphorePubSub semaphorePubSub = new SemaphorePubSub();
protected RedissonReactive(Config config) { protected RedissonReactive(Config config) {
this.config = config; this.config = config;
@ -103,6 +108,11 @@ public class RedissonReactive implements RedissonReactiveClient {
codecProvider = config.getCodecProvider(); codecProvider = config.getCodecProvider();
} }
@Override
public RSemaphoreReactive getSemaphore(String name) {
return new RedissonSemaphoreReactive(commandExecutor, name, semaphorePubSub);
}
@Override @Override
public RReadWriteLockReactive getReadWriteLock(String name) { public RReadWriteLockReactive getReadWriteLock(String name) {
return new RedissonReadWriteLockReactive(commandExecutor, name, id); return new RedissonReadWriteLockReactive(commandExecutor, name, id);

@ -16,13 +16,9 @@
package org.redisson; package org.redisson;
import java.util.List; import java.util.List;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicReference;
import org.redisson.api.RLock; import org.redisson.api.RLock;
import io.netty.util.concurrent.Future;
/** /**
* RedLock locking algorithm implementation for multiple locks. * RedLock locking algorithm implementation for multiple locks.
* It manages all locks as one. * It manages all locks as one.
@ -58,14 +54,4 @@ public class RedissonRedLock extends RedissonMultiLock {
unlockInner(locks); unlockInner(locks);
} }
@Override
protected boolean isLockFailed(Future<Boolean> future) {
return false;
}
@Override
protected boolean isAllLocksAcquired(AtomicReference<RLock> lockedLockHolder, AtomicReference<Throwable> failed, Queue<RLock> lockedLocks) {
return (lockedLockHolder.get() == null && failed.get() == null) || lockedLocks.size() >= minLocksAmount(locks);
}
} }

@ -27,7 +27,7 @@ import org.redisson.api.RSemaphore;
import org.redisson.client.codec.LongCodec; import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.StringCodec; import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandExecutor; import org.redisson.command.CommandAsyncExecutor;
import org.redisson.misc.RPromise; import org.redisson.misc.RPromise;
import org.redisson.pubsub.SemaphorePubSub; import org.redisson.pubsub.SemaphorePubSub;
@ -48,9 +48,9 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
private final SemaphorePubSub semaphorePubSub; private final SemaphorePubSub semaphorePubSub;
final CommandExecutor commandExecutor; final CommandAsyncExecutor commandExecutor;
protected RedissonSemaphore(CommandExecutor commandExecutor, String name, SemaphorePubSub semaphorePubSub) { public RedissonSemaphore(CommandAsyncExecutor commandExecutor, String name, SemaphorePubSub semaphorePubSub) {
super(commandExecutor, name); super(commandExecutor, name);
this.commandExecutor = commandExecutor; this.commandExecutor = commandExecutor;
this.semaphorePubSub = semaphorePubSub; this.semaphorePubSub = semaphorePubSub;
@ -478,7 +478,7 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
@Override @Override
public int drainPermits() { public int drainPermits() {
Long res = commandExecutor.evalWrite(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG, RFuture<Long> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG,
"local value = redis.call('get', KEYS[1]); " + "local value = redis.call('get', KEYS[1]); " +
"if (value == false or value == 0) then " + "if (value == false or value == 0) then " +
"return 0; " + "return 0; " +
@ -486,12 +486,14 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
"redis.call('set', KEYS[1], 0); " + "redis.call('set', KEYS[1], 0); " +
"return value;", "return value;",
Collections.<Object>singletonList(getName())); Collections.<Object>singletonList(getName()));
Long res = get(future);
return res.intValue(); return res.intValue();
} }
@Override @Override
public int availablePermits() { public int availablePermits() {
Long res = commandExecutor.write(getName(), LongCodec.INSTANCE, RedisCommands.GET_LONG, getName()); RFuture<Long> future = commandExecutor.writeAsync(getName(), LongCodec.INSTANCE, RedisCommands.GET_LONG, getName());
Long res = get(future);
return res.intValue(); return res.intValue();
} }

@ -236,7 +236,7 @@ public interface RKeys extends RKeysAsync {
* <p> * <p>
* Requires Redis 4.0+ * Requires Redis 4.0+
* *
* @param keys * @param keys of objects
* @return number of removed keys * @return number of removed keys
*/ */
long unlink(String ... keys); long unlink(String ... keys);

@ -0,0 +1,179 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api;
import java.util.concurrent.TimeUnit;
import org.reactivestreams.Publisher;
/**
*
* @author Nikita Koksharov
*
*/
public interface RSemaphoreReactive extends RExpirableReactive {
/**
* Acquires a permit only if one is available at the
* time of invocation.
*
* <p>Acquires a permit, if one is available and returns immediately,
* with the value {@code true},
* reducing the number of available permits by one.
*
* <p>If no permit is available then this method will return
* immediately with the value {@code false}.
*
* @return {@code true} if a permit was acquired and {@code false}
* otherwise
*/
Publisher<Boolean> tryAcquire();
/**
* Acquires the given number of permits only if all are available at the
* time of invocation.
*
* <p>Acquires a permits, if all are available and returns immediately,
* with the value {@code true},
* reducing the number of available permits by given number of permits.
*
* <p>If no permits are available then this method will return
* immediately with the value {@code false}.
*
* @param permits the number of permits to acquire
* @return {@code true} if a permit was acquired and {@code false}
* otherwise
*/
Publisher<Boolean> tryAcquire(int permits);
/**
* Acquires a permit from this semaphore.
*
* <p>Acquires a permit, if one is available and returns immediately,
* reducing the number of available permits by one.
*
* @return void
*
*/
Publisher<Void> acquire();
/**
* Acquires the given number of permits, if they are available,
* and returns immediately, reducing the number of available permits
* by the given amount.
*
* @param permits the number of permits to acquire
* @throws IllegalArgumentException if {@code permits} is negative
* @return void
*/
Publisher<Void> acquire(int permits);
/**
* Releases a permit, returning it to the semaphore.
*
* <p>Releases a permit, increasing the number of available permits by
* one. If any threads of Redisson client are trying to acquire a permit,
* then one is selected and given the permit that was just released.
*
* <p>There is no requirement that a thread that releases a permit must
* have acquired that permit by calling {@link #acquire()}.
* Correct usage of a semaphore is established by programming convention
* in the application.
*
* @return void
*/
Publisher<Void> release();
/**
* Releases the given number of permits, returning them to the semaphore.
*
* <p>Releases the given number of permits, increasing the number of available permits by
* the given number of permits. If any threads of Redisson client are trying to
* acquire a permits, then next threads is selected and tries to acquire the permits that was just released.
*
* <p>There is no requirement that a thread that releases a permits must
* have acquired that permit by calling {@link #acquire()}.
* Correct usage of a semaphore is established by programming convention
* in the application.
*
* @param permits amount
* @return void
*/
Publisher<Void> release(int permits);
/**
* Sets number of permits.
*
* @param permits - number of permits
* @return <code>true</code> if permits has been set successfully, otherwise <code>false</code>.
*/
Publisher<Boolean> trySetPermits(int permits);
/**
* <p>Acquires a permit, if one is available and returns immediately,
* with the value {@code true},
* reducing the number of available permits by one.
*
* <p>If a permit is acquired then the value {@code true} is returned.
*
* <p>If the specified waiting time elapses then the value {@code false}
* is returned. If the time is less than or equal to zero, the method
* will not wait at all.
*
* @param waitTime the maximum time to wait for a permit
* @param unit the time unit of the {@code timeout} argument
* @return {@code true} if a permit was acquired and {@code false}
* if the waiting time elapsed before a permit was acquired
*/
Publisher<Boolean> tryAcquire(long waitTime, TimeUnit unit);
/**
* Acquires the given number of permits only if all are available
* within the given waiting time.
*
* <p>Acquires a permits, if all are available and returns immediately,
* with the value {@code true},
* reducing the number of available permits by one.
*
* <p>If a permits is acquired then the value {@code true} is returned.
*
* <p>If the specified waiting time elapses then the value {@code false}
* is returned. If the time is less than or equal to zero, the method
* will not wait at all.
*
* @param permits amount
* @param waitTime the maximum time to wait for a available permits
* @param unit the time unit of the {@code timeout} argument
* @return {@code true} if a permit was acquired and {@code false}
* if the waiting time elapsed before a permit was acquired
*/
Publisher<Boolean> tryAcquire(int permits, long waitTime, TimeUnit unit);
/**
* Shrinks the number of available permits by the indicated
* reduction. This method can be useful in subclasses that use
* semaphores to track resources that become unavailable. This
* method differs from {@link #acquire()} in that it does not block
* waiting for permits to become available.
*
* @param permits - reduction the number of permits to remove
* @return void
* @throws IllegalArgumentException if {@code reduction} is negative
*/
Publisher<Void> reducePermits(int permits);
}

@ -30,6 +30,14 @@ import org.redisson.config.Config;
*/ */
public interface RedissonReactiveClient { public interface RedissonReactiveClient {
/**
* Returns semaphore instance by name
*
* @param name - name of object
* @return Semaphore object
*/
RSemaphoreReactive getSemaphore(String name);
/** /**
* Returns readWriteLock instance by name. * Returns readWriteLock instance by name.
* *

@ -175,6 +175,9 @@ public class RedisClient {
this.commandTimeout = config.getCommandTimeout(); this.commandTimeout = config.getCommandTimeout();
} }
public String getIpAddr() {
return addr.getAddress().getHostAddress() + ":" + addr.getPort();
}
public InetSocketAddress getAddr() { public InetSocketAddress getAddr() {
return addr; return addr;

@ -39,6 +39,11 @@ import io.netty.util.TimerTask;
import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.FutureListener;
/**
*
* @author Nikita Koksharov
*
*/
public class ConnectionWatchdog extends ChannelInboundHandlerAdapter { public class ConnectionWatchdog extends ChannelInboundHandlerAdapter {
private final Logger log = LoggerFactory.getLogger(getClass()); private final Logger log = LoggerFactory.getLogger(getClass());
@ -82,12 +87,16 @@ public class ConnectionWatchdog extends ChannelInboundHandlerAdapter {
return; return;
} }
timer.newTimeout(new TimerTask() { try {
@Override timer.newTimeout(new TimerTask() {
public void run(Timeout timeout) throws Exception { @Override
tryReconnect(connection, Math.min(BACKOFF_CAP, attempts + 1)); public void run(Timeout timeout) throws Exception {
} tryReconnect(connection, Math.min(BACKOFF_CAP, attempts + 1));
}, timeout, TimeUnit.MILLISECONDS); }
}, timeout, TimeUnit.MILLISECONDS);
} catch (IllegalStateException e) {
// skip
}
} }
private void tryReconnect(final RedisConnection connection, final int nextAttempt) { private void tryReconnect(final RedisConnection connection, final int nextAttempt) {

@ -82,7 +82,6 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
this.config = create(cfg); this.config = create(cfg);
initTimer(this.config); initTimer(this.config);
init(this.config);
Throwable lastException = null; Throwable lastException = null;
List<String> failedMasters = new ArrayList<String>(); List<String> failedMasters = new ArrayList<String>();
@ -92,13 +91,11 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
RedisConnection connection = connectionFuture.syncUninterruptibly().getNow(); RedisConnection connection = connectionFuture.syncUninterruptibly().getNow();
List<ClusterNodeInfo> nodes = connection.sync(RedisCommands.CLUSTER_NODES); List<ClusterNodeInfo> nodes = connection.sync(RedisCommands.CLUSTER_NODES);
if (log.isDebugEnabled()) { StringBuilder nodesValue = new StringBuilder();
StringBuilder nodesValue = new StringBuilder(); for (ClusterNodeInfo clusterNodeInfo : nodes) {
for (ClusterNodeInfo clusterNodeInfo : nodes) { nodesValue.append(clusterNodeInfo.getNodeInfo()).append("\n");
nodesValue.append(clusterNodeInfo.getNodeInfo()).append("\n");
}
log.debug("cluster nodes state from {}:\n{}", connection.getRedisClient().getAddr(), nodesValue);
} }
log.info("Redis cluster nodes configuration got from {}:\n{}", connection.getRedisClient().getAddr(), nodesValue);
lastClusterNode = addr; lastClusterNode = addr;
@ -185,23 +182,19 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
} }
RedisConnection connection = future.getNow(); RedisConnection connection = future.getNow();
if (connection.isActive()) { if (connection.isActive()) {
nodeConnections.put(addr, connection); nodeConnections.put(addr, connection);
result.trySuccess(connection); result.trySuccess(connection);
} else { } else {
connection.closeAsync(); connection.closeAsync();
result.tryFailure(new RedisException("Connection to " + connection.getRedisClient().getAddr() + " is not active!")); result.tryFailure(new RedisException("Connection to " + connection.getRedisClient().getAddr() + " is not active!"));
} }
} }
}); });
return result; return result;
} }
@Override
protected void initEntry(MasterSlaveServersConfig config) {
}
private RFuture<Collection<RFuture<Void>>> addMasterEntry(final ClusterPartition partition, final ClusterServersConfig cfg) { private RFuture<Collection<RFuture<Void>>> addMasterEntry(final ClusterPartition partition, final ClusterServersConfig cfg) {
if (partition.isMasterFail()) { if (partition.isMasterFail()) {
RedisException e = new RedisException("Failed to add master: " + RedisException e = new RedisException("Failed to add master: " +
@ -253,7 +246,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
final MasterSlaveEntry e; final MasterSlaveEntry e;
List<RFuture<Void>> futures = new ArrayList<RFuture<Void>>(); List<RFuture<Void>> futures = new ArrayList<RFuture<Void>>();
if (config.getReadMode() == ReadMode.MASTER) { if (config.checkSkipSlavesInit()) {
e = new SingleEntry(partition.getSlotRanges(), ClusterConnectionManager.this, config); e = new SingleEntry(partition.getSlotRanges(), ClusterConnectionManager.this, config);
} else { } else {
config.setSlaveAddresses(partition.getSlaveAddresses()); config.setSlaveAddresses(partition.getSlaveAddresses());
@ -426,7 +419,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
aliveSlaves.removeAll(newPart.getFailedSlaveAddresses()); aliveSlaves.removeAll(newPart.getFailedSlaveAddresses());
for (URI uri : aliveSlaves) { for (URI uri : aliveSlaves) {
currentPart.removeFailedSlaveAddress(uri); currentPart.removeFailedSlaveAddress(uri);
if (entry.slaveUp(uri.getHost(), uri.getPort(), FreezeReason.MANAGER)) { if (entry.slaveUp(uri, FreezeReason.MANAGER)) {
log.info("slave: {} has up for slot ranges: {}", uri, currentPart.getSlotRanges()); log.info("slave: {} has up for slot ranges: {}", uri, currentPart.getSlotRanges());
} }
} }
@ -435,7 +428,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
failedSlaves.removeAll(currentPart.getFailedSlaveAddresses()); failedSlaves.removeAll(currentPart.getFailedSlaveAddresses());
for (URI uri : failedSlaves) { for (URI uri : failedSlaves) {
currentPart.addFailedSlaveAddress(uri); currentPart.addFailedSlaveAddress(uri);
if (entry.slaveDown(uri.getHost(), uri.getPort(), FreezeReason.MANAGER)) { if (entry.slaveDown(uri, FreezeReason.MANAGER)) {
log.warn("slave: {} has down for slot ranges: {}", uri, currentPart.getSlotRanges()); log.warn("slave: {} has down for slot ranges: {}", uri, currentPart.getSlotRanges());
} }
} }
@ -448,7 +441,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
for (URI uri : removedSlaves) { for (URI uri : removedSlaves) {
currentPart.removeSlaveAddress(uri); currentPart.removeSlaveAddress(uri);
if (entry.slaveDown(uri.getHost(), uri.getPort(), FreezeReason.MANAGER)) { if (entry.slaveDown(uri, FreezeReason.MANAGER)) {
log.info("slave {} removed for slot ranges: {}", uri, currentPart.getSlotRanges()); log.info("slave {} removed for slot ranges: {}", uri, currentPart.getSlotRanges());
} }
} }
@ -466,7 +459,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
} }
currentPart.addSlaveAddress(uri); currentPart.addSlaveAddress(uri);
entry.slaveUp(uri.getHost(), uri.getPort(), FreezeReason.MANAGER); entry.slaveUp(uri, FreezeReason.MANAGER);
log.info("slave: {} added for slot ranges: {}", uri, currentPart.getSlotRanges()); log.info("slave: {} added for slot ranges: {}", uri, currentPart.getSlotRanges());
} }
}); });
@ -510,8 +503,6 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
ClusterPartition newMasterPart = find(newPartitions, slot); ClusterPartition newMasterPart = find(newPartitions, slot);
// does partition has a new master? // does partition has a new master?
if (!newMasterPart.getMasterAddress().equals(currentPart.getMasterAddress())) { if (!newMasterPart.getMasterAddress().equals(currentPart.getMasterAddress())) {
log.info("changing master from {} to {} for {}",
currentPart.getMasterAddress(), newMasterPart.getMasterAddress(), slot);
URI newUri = newMasterPart.getMasterAddress(); URI newUri = newMasterPart.getMasterAddress();
URI oldUri = currentPart.getMasterAddress(); URI oldUri = currentPart.getMasterAddress();

@ -0,0 +1,74 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.codec;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.Encoder;
/**
*
* @author Nikita Koksharov
*
*/
public class CompositeCodec implements Codec {
private final Codec mapKeyCodec;
private final Codec mapValueCodec;
private final Codec valueCodec;
public CompositeCodec(Codec mapKeyCodec, Codec mapValueCodec) {
this(mapKeyCodec, mapValueCodec, null);
}
public CompositeCodec(Codec mapKeyCodec, Codec mapValueCodec, Codec valueCodec) {
super();
this.mapKeyCodec = mapKeyCodec;
this.mapValueCodec = mapValueCodec;
this.valueCodec = valueCodec;
}
@Override
public Decoder<Object> getMapValueDecoder() {
return mapValueCodec.getMapKeyDecoder();
}
@Override
public Encoder getMapValueEncoder() {
return mapValueCodec.getMapValueEncoder();
}
@Override
public Decoder<Object> getMapKeyDecoder() {
return mapKeyCodec.getMapKeyDecoder();
}
@Override
public Encoder getMapKeyEncoder() {
return mapKeyCodec.getMapKeyEncoder();
}
@Override
public Decoder<Object> getValueDecoder() {
return valueCodec.getValueDecoder();
}
@Override
public Encoder getValueEncoder() {
return valueCodec.getValueEncoder();
}
}

@ -65,6 +65,8 @@ public class BaseMasterSlaveServersConfig<T extends BaseMasterSlaveServersConfig
*/ */
private int subscriptionConnectionPoolSize = 50; private int subscriptionConnectionPoolSize = 50;
private long dnsMonitoringInterval = 5000;
public BaseMasterSlaveServersConfig() { public BaseMasterSlaveServersConfig() {
} }
@ -79,6 +81,7 @@ public class BaseMasterSlaveServersConfig<T extends BaseMasterSlaveServersConfig
setSubscriptionConnectionMinimumIdleSize(config.getSubscriptionConnectionMinimumIdleSize()); setSubscriptionConnectionMinimumIdleSize(config.getSubscriptionConnectionMinimumIdleSize());
setReadMode(config.getReadMode()); setReadMode(config.getReadMode());
setSubscriptionMode(config.getSubscriptionMode()); setSubscriptionMode(config.getSubscriptionMode());
setDnsMonitoringInterval(config.getDnsMonitoringInterval());
} }
/** /**
@ -257,6 +260,10 @@ public class BaseMasterSlaveServersConfig<T extends BaseMasterSlaveServersConfig
return readMode; return readMode;
} }
public boolean checkSkipSlavesInit() {
return getReadMode() == ReadMode.MASTER && getSubscriptionMode() == SubscriptionMode.MASTER;
}
/** /**
* Set node type used for subscription operation. * Set node type used for subscription operation.
* <p> * <p>
@ -273,5 +280,22 @@ public class BaseMasterSlaveServersConfig<T extends BaseMasterSlaveServersConfig
return subscriptionMode; return subscriptionMode;
} }
/**
* Interval in milliseconds to check the endpoint's DNS<p>
* Applications must ensure the JVM DNS cache TTL is low enough to support this.<p>
* Set <code>-1</code> to disable.
* <p>
* Default is <code>5000</code>.
*
* @param dnsMonitoringInterval time
* @return config
*/
public T setDnsMonitoringInterval(long dnsMonitoringInterval) {
this.dnsMonitoringInterval = dnsMonitoringInterval;
return (T) this;
}
public long getDnsMonitoringInterval() {
return dnsMonitoringInterval;
}
} }

@ -32,6 +32,11 @@ import org.slf4j.LoggerFactory;
import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.FutureListener;
/**
*
* @author Nikita Koksharov
*
*/
public class ClientConnectionsEntry { public class ClientConnectionsEntry {
final Logger log = LoggerFactory.getLogger(getClass()); final Logger log = LoggerFactory.getLogger(getClass());
@ -49,17 +54,17 @@ public class ClientConnectionsEntry {
private FreezeReason freezeReason; private FreezeReason freezeReason;
final RedisClient client; final RedisClient client;
private final NodeType nodeType; private NodeType nodeType;
private ConnectionManager connectionManager; private ConnectionManager connectionManager;
private final AtomicInteger failedAttempts = new AtomicInteger(); private final AtomicInteger failedAttempts = new AtomicInteger();
public ClientConnectionsEntry(RedisClient client, int poolMinSize, int poolMaxSize, int subscribePoolMinSize, int subscribePoolMaxSize, public ClientConnectionsEntry(RedisClient client, int poolMinSize, int poolMaxSize, int subscribePoolMinSize, int subscribePoolMaxSize,
ConnectionManager connectionManager, NodeType serverMode) { ConnectionManager connectionManager, NodeType nodeType) {
this.client = client; this.client = client;
this.freeConnectionsCounter = new AsyncSemaphore(poolMaxSize); this.freeConnectionsCounter = new AsyncSemaphore(poolMaxSize);
this.connectionManager = connectionManager; this.connectionManager = connectionManager;
this.nodeType = serverMode; this.nodeType = nodeType;
this.freeSubscribeConnectionsCounter = new AsyncSemaphore(subscribePoolMaxSize); this.freeSubscribeConnectionsCounter = new AsyncSemaphore(subscribePoolMaxSize);
if (subscribePoolMaxSize > 0) { if (subscribePoolMaxSize > 0) {
@ -68,6 +73,9 @@ public class ClientConnectionsEntry {
connectionManager.getConnectionWatcher().add(poolMinSize, poolMaxSize, freeConnections, freeConnectionsCounter); connectionManager.getConnectionWatcher().add(poolMinSize, poolMaxSize, freeConnections, freeConnectionsCounter);
} }
public void setNodeType(NodeType nodeType) {
this.nodeType = nodeType;
}
public NodeType getNodeType() { public NodeType getNodeType() {
return nodeType; return nodeType;
} }

@ -0,0 +1,139 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.connection;
import java.net.InetAddress;
import java.net.URI;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.redisson.client.RedisConnectionException;
import org.redisson.connection.ClientConnectionsEntry.FreezeReason;
import org.redisson.misc.URIBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.util.concurrent.GlobalEventExecutor;
import io.netty.util.concurrent.ScheduledFuture;
/**
* DNS changes monitor.
*
* @author Nikita Koksharov
*
*/
public class DNSMonitor {
private static final Logger log = LoggerFactory.getLogger(DNSMonitor.class);
private ScheduledFuture<?> dnsMonitorFuture;
private ConnectionManager connectionManager;
private final Map<URI, InetAddress> masters = new HashMap<URI, InetAddress>();
private final Map<URI, InetAddress> slaves = new HashMap<URI, InetAddress>();
private long dnsMonitoringInterval;
public DNSMonitor(ConnectionManager connectionManager, Set<URI> masterHosts, Set<URI> slaveHosts, long dnsMonitoringInterval) {
for (URI host : masterHosts) {
try {
masters.put(host, InetAddress.getByName(host.getHost()));
} catch (UnknownHostException e) {
throw new RedisConnectionException("Unknown host: " + host.getHost(), e);
}
}
for (URI host : slaveHosts) {
try {
slaves.put(host, InetAddress.getByName(host.getHost()));
} catch (UnknownHostException e) {
throw new RedisConnectionException("Unknown host: " + host.getHost(), e);
}
}
this.connectionManager = connectionManager;
this.dnsMonitoringInterval = dnsMonitoringInterval;
}
public void start() {
monitorDnsChange();
log.debug("DNS monitoring enabled; Current masters: {}, slaves: {}", masters, slaves);
}
public void stop() {
if (dnsMonitorFuture != null) {
dnsMonitorFuture.cancel(true);
}
}
private void monitorDnsChange() {
dnsMonitorFuture = GlobalEventExecutor.INSTANCE.schedule(new Runnable() {
@Override
public void run() {
// As InetAddress.getByName call is blocking. Method should be run in dedicated thread
connectionManager.getExecutor().execute(new Runnable() {
@Override
public void run() {
try {
for (Entry<URI, InetAddress> entry : masters.entrySet()) {
InetAddress master = entry.getValue();
InetAddress now = InetAddress.getByName(entry.getKey().getHost());
if (!now.getHostAddress().equals(master.getHostAddress())) {
log.info("Detected DNS change. {} has changed from {} to {}", entry.getKey().getHost(), master.getHostAddress(), now.getHostAddress());
for (MasterSlaveEntry entrySet : connectionManager.getEntrySet()) {
if (entrySet.getClient().getAddr().getHostName().equals(entry.getKey().getHost())
&& entrySet.getClient().getAddr().getPort() == entry.getKey().getPort()) {
entrySet.changeMaster(entry.getKey());
}
}
masters.put(entry.getKey(), now);
log.info("Master {} has been changed", entry.getKey().getHost());
}
}
for (Entry<URI, InetAddress> entry : slaves.entrySet()) {
InetAddress slave = entry.getValue();
InetAddress updatedSlave = InetAddress.getByName(entry.getKey().getHost());
if (!updatedSlave.getHostAddress().equals(slave.getHostAddress())) {
log.info("Detected DNS change. {} has changed from {} to {}", entry.getKey().getHost(), slave.getHostAddress(), updatedSlave.getHostAddress());
for (MasterSlaveEntry masterSlaveEntry : connectionManager.getEntrySet()) {
URI uri = URIBuilder.create("redis://" + slave.getHostAddress() + ":" + entry.getKey().getPort());
if (masterSlaveEntry.slaveDown(uri, FreezeReason.MANAGER)) {
masterSlaveEntry.slaveUp(entry.getKey(), FreezeReason.MANAGER);
}
}
slaves.put(entry.getKey(), updatedSlave);
log.info("Slave {} has been changed", entry.getKey().getHost());
}
}
} catch (Exception e) {
log.error(e.getMessage(), e);
} finally {
monitorDnsChange();
}
}
});
}
}, dnsMonitoringInterval, TimeUnit.MILLISECONDS);
}
}

@ -51,8 +51,6 @@ import org.redisson.command.CommandSyncService;
import org.redisson.config.BaseMasterSlaveServersConfig; import org.redisson.config.BaseMasterSlaveServersConfig;
import org.redisson.config.Config; import org.redisson.config.Config;
import org.redisson.config.MasterSlaveServersConfig; import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.config.ReadMode;
import org.redisson.connection.ClientConnectionsEntry.FreezeReason;
import org.redisson.misc.InfinitySemaphoreLatch; import org.redisson.misc.InfinitySemaphoreLatch;
import org.redisson.misc.RPromise; import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise; import org.redisson.misc.RedissonPromise;
@ -128,6 +126,8 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
protected final Queue<PubSubConnectionEntry> freePubSubConnections = new ConcurrentLinkedQueue<PubSubConnectionEntry>(); protected final Queue<PubSubConnectionEntry> freePubSubConnections = new ConcurrentLinkedQueue<PubSubConnectionEntry>();
protected DNSMonitor dnsMonitor;
protected MasterSlaveServersConfig config; protected MasterSlaveServersConfig config;
private final Map<Integer, MasterSlaveEntry> entries = PlatformDependent.newConcurrentHashMap(); private final Map<Integer, MasterSlaveEntry> entries = PlatformDependent.newConcurrentHashMap();
@ -161,7 +161,8 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
public MasterSlaveConnectionManager(MasterSlaveServersConfig cfg, Config config) { public MasterSlaveConnectionManager(MasterSlaveServersConfig cfg, Config config) {
this(config); this(config);
initTimer(cfg); initTimer(cfg);
init(cfg); this.config = cfg;
initSingleEntry();
} }
public MasterSlaveConnectionManager(Config cfg) { public MasterSlaveConnectionManager(Config cfg) {
@ -237,19 +238,6 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
return new HashSet<MasterSlaveEntry>(entries.values()); return new HashSet<MasterSlaveEntry>(entries.values());
} }
protected void init(MasterSlaveServersConfig config) {
this.config = config;
connectionWatcher = new IdleConnectionWatcher(this, config);
try {
initEntry(config);
} catch (RuntimeException e) {
stopThreads();
throw e;
}
}
protected void initTimer(MasterSlaveServersConfig config) { protected void initTimer(MasterSlaveServersConfig config) {
int[] timeouts = new int[]{config.getRetryInterval(), config.getTimeout(), config.getReconnectionTimeout()}; int[] timeouts = new int[]{config.getRetryInterval(), config.getTimeout(), config.getReconnectionTimeout()};
Arrays.sort(timeouts); Arrays.sort(timeouts);
@ -273,23 +261,35 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
throw new IllegalStateException(e); throw new IllegalStateException(e);
} }
connectionWatcher = new IdleConnectionWatcher(this, config);
} }
protected void initEntry(MasterSlaveServersConfig config) { protected void initSingleEntry() {
HashSet<ClusterSlotRange> slots = new HashSet<ClusterSlotRange>(); try {
slots.add(singleSlotRange); HashSet<ClusterSlotRange> slots = new HashSet<ClusterSlotRange>();
slots.add(singleSlotRange);
MasterSlaveEntry entry;
if (config.checkSkipSlavesInit()) {
entry = new SingleEntry(slots, this, config);
RFuture<Void> f = entry.setupMasterEntry(config.getMasterAddress());
f.syncUninterruptibly();
} else {
entry = createMasterSlaveEntry(config, slots);
}
MasterSlaveEntry entry; for (int slot = singleSlotRange.getStartSlot(); slot < singleSlotRange.getEndSlot() + 1; slot++) {
if (config.getReadMode() == ReadMode.MASTER) { addEntry(slot, entry);
entry = new SingleEntry(slots, this, config); }
RFuture<Void> f = entry.setupMasterEntry(config.getMasterAddress());
f.syncUninterruptibly();
} else {
entry = createMasterSlaveEntry(config, slots);
}
for (int slot = singleSlotRange.getStartSlot(); slot < singleSlotRange.getEndSlot() + 1; slot++) { if (config.getDnsMonitoringInterval() != -1) {
addEntry(slot, entry); dnsMonitor = new DNSMonitor(this, Collections.singleton(config.getMasterAddress()),
config.getSlaveAddresses(), config.getDnsMonitoringInterval());
dnsMonitor.start();
}
} catch (RuntimeException e) {
stopThreads();
throw e;
} }
} }
@ -349,7 +349,9 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
@Override @Override
public void shutdownAsync(RedisClient client) { public void shutdownAsync(RedisClient client) {
clientEntries.remove(client); if (clientEntries.remove(client) == null) {
log.error("Can't find client {}", client);
}
client.shutdownAsync(); client.shutdownAsync();
} }
@ -683,10 +685,6 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
return entries.get(slot); return entries.get(slot);
} }
protected void slaveDown(ClusterSlotRange slotRange, String host, int port, FreezeReason freezeReason) {
getEntry(slotRange.getStartSlot()).slaveDown(host, port, freezeReason);
}
protected void changeMaster(int slot, URI address) { protected void changeMaster(int slot, URI address) {
getEntry(slot).changeMaster(address); getEntry(slot).changeMaster(address);
} }
@ -779,6 +777,10 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
@Override @Override
public void shutdown(long quietPeriod, long timeout, TimeUnit unit) { public void shutdown(long quietPeriod, long timeout, TimeUnit unit) {
if (dnsMonitor != null) {
dnsMonitor.stop();
}
shutdownLatch.close(); shutdownLatch.close();
shutdownPromise.trySuccess(true); shutdownPromise.trySuccess(true);
shutdownLatch.awaitUninterruptibly(); shutdownLatch.awaitUninterruptibly();

@ -42,6 +42,7 @@ import org.redisson.connection.ClientConnectionsEntry.FreezeReason;
import org.redisson.connection.balancer.LoadBalancerManager; import org.redisson.connection.balancer.LoadBalancerManager;
import org.redisson.connection.pool.MasterConnectionPool; import org.redisson.connection.pool.MasterConnectionPool;
import org.redisson.connection.pool.MasterPubSubConnectionPool; import org.redisson.connection.pool.MasterPubSubConnectionPool;
import org.redisson.misc.URIBuilder;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -86,9 +87,13 @@ public class MasterSlaveEntry {
pubSubConnectionHolder = new MasterPubSubConnectionPool(config, connectionManager, this); pubSubConnectionHolder = new MasterPubSubConnectionPool(config, connectionManager, this);
} }
public MasterSlaveServersConfig getConfig() {
return config;
}
public List<RFuture<Void>> initSlaveBalancer(Collection<URI> disconnectedNodes) { public List<RFuture<Void>> initSlaveBalancer(Collection<URI> disconnectedNodes) {
boolean freezeMasterAsSlave = !config.getSlaveAddresses().isEmpty() boolean freezeMasterAsSlave = !config.getSlaveAddresses().isEmpty()
&& config.getReadMode() == ReadMode.SLAVE && !config.checkSkipSlavesInit()
&& disconnectedNodes.size() < config.getSlaveAddresses().size(); && disconnectedNodes.size() < config.getSlaveAddresses().size();
List<RFuture<Void>> result = new LinkedList<RFuture<Void>>(); List<RFuture<Void>> result = new LinkedList<RFuture<Void>>();
@ -120,17 +125,8 @@ public class MasterSlaveEntry {
return writeConnectionHolder.add(masterEntry); return writeConnectionHolder.add(masterEntry);
} }
private boolean slaveDown(ClientConnectionsEntry entry, FreezeReason freezeReason) { public boolean slaveDown(URI address, FreezeReason freezeReason) {
ClientConnectionsEntry e = slaveBalancer.freeze(entry, freezeReason); ClientConnectionsEntry entry = slaveBalancer.freeze(address, freezeReason);
if (e == null) {
return false;
}
return slaveDown(e, freezeReason == FreezeReason.SYSTEM);
}
public boolean slaveDown(String host, int port, FreezeReason freezeReason) {
ClientConnectionsEntry entry = slaveBalancer.freeze(host, port, freezeReason);
if (entry == null) { if (entry == null) {
return false; return false;
} }
@ -140,10 +136,10 @@ public class MasterSlaveEntry {
private boolean slaveDown(ClientConnectionsEntry entry, boolean temporaryDown) { private boolean slaveDown(ClientConnectionsEntry entry, boolean temporaryDown) {
// add master as slave if no more slaves available // add master as slave if no more slaves available
if (config.getReadMode() == ReadMode.SLAVE && slaveBalancer.getAvailableClients() == 0) { if (!config.checkSkipSlavesInit() && slaveBalancer.getAvailableClients() == 0) {
InetSocketAddress addr = masterEntry.getClient().getAddr(); URI addr = masterEntry.getClient().getConfig().getAddress();
if (slaveUp(addr.getHostName(), addr.getPort(), FreezeReason.SYSTEM)) { if (slaveUp(addr, FreezeReason.SYSTEM)) {
log.info("master {}:{} used as slave", addr.getHostName(), addr.getPort()); log.info("master {} used as slave", addr);
} }
} }
@ -310,17 +306,21 @@ public class MasterSlaveEntry {
return slaveBalancer.contains(addr); return slaveBalancer.contains(addr);
} }
public boolean hasSlave(String addr) {
return slaveBalancer.contains(addr);
}
public RFuture<Void> addSlave(URI address) { public RFuture<Void> addSlave(URI address) {
return addSlave(address, true, NodeType.SLAVE); return addSlave(address, true, NodeType.SLAVE);
} }
private RFuture<Void> addSlave(URI address, boolean freezed, NodeType mode) { private RFuture<Void> addSlave(URI address, boolean freezed, NodeType nodeType) {
RedisClient client = connectionManager.createClient(NodeType.SLAVE, address); RedisClient client = connectionManager.createClient(NodeType.SLAVE, address);
ClientConnectionsEntry entry = new ClientConnectionsEntry(client, ClientConnectionsEntry entry = new ClientConnectionsEntry(client,
this.config.getSlaveConnectionMinimumIdleSize(), this.config.getSlaveConnectionMinimumIdleSize(),
this.config.getSlaveConnectionPoolSize(), this.config.getSlaveConnectionPoolSize(),
this.config.getSubscriptionConnectionMinimumIdleSize(), this.config.getSubscriptionConnectionMinimumIdleSize(),
this.config.getSubscriptionConnectionPoolSize(), connectionManager, mode); this.config.getSubscriptionConnectionPoolSize(), connectionManager, nodeType);
if (freezed) { if (freezed) {
synchronized (entry) { synchronized (entry) {
entry.setFreezed(freezed); entry.setFreezed(freezed);
@ -334,17 +334,18 @@ public class MasterSlaveEntry {
return masterEntry.getClient(); return masterEntry.getClient();
} }
public boolean slaveUp(String host, int port, FreezeReason freezeReason) { public boolean slaveUp(URI address, FreezeReason freezeReason) {
if (!slaveBalancer.unfreeze(host, port, freezeReason)) { if (!slaveBalancer.unfreeze(address, freezeReason)) {
return false; return false;
} }
InetSocketAddress naddress = new InetSocketAddress(address.getHost(), address.getPort());
InetSocketAddress addr = masterEntry.getClient().getAddr(); InetSocketAddress addr = masterEntry.getClient().getAddr();
// exclude master from slaves // exclude master from slaves
if (config.getReadMode() == ReadMode.SLAVE if (!config.checkSkipSlavesInit()
&& (!addr.getHostName().equals(host) || port != addr.getPort())) { && (!addr.getAddress().getHostAddress().equals(naddress.getAddress().getHostAddress()) || naddress.getPort() != addr.getPort())) {
slaveDown(addr.getHostName(), addr.getPort(), FreezeReason.SYSTEM); slaveDown(address, FreezeReason.SYSTEM);
log.info("master {}:{} excluded from slaves", addr.getHostName(), addr.getPort()); log.info("master {} excluded from slaves", addr);
} }
return true; return true;
} }
@ -364,14 +365,21 @@ public class MasterSlaveEntry {
public void operationComplete(Future<Void> future) throws Exception { public void operationComplete(Future<Void> future) throws Exception {
writeConnectionHolder.remove(oldMaster); writeConnectionHolder.remove(oldMaster);
pubSubConnectionHolder.remove(oldMaster); pubSubConnectionHolder.remove(oldMaster);
slaveDown(oldMaster, FreezeReason.MANAGER);
oldMaster.freezeMaster(FreezeReason.MANAGER);
slaveDown(oldMaster, false);
slaveDown(URIBuilder.create("redis://" + oldMaster.getClient().getIpAddr()), FreezeReason.MANAGER);
slaveBalancer.changeType(oldMaster.getClient().getAddr(), NodeType.SLAVE);
slaveBalancer.changeType(address, NodeType.MASTER);
// more than one slave available, so master can be removed from slaves // more than one slave available, so master can be removed from slaves
if (config.getReadMode() == ReadMode.SLAVE if (!config.checkSkipSlavesInit()
&& slaveBalancer.getAvailableClients() > 1) { && slaveBalancer.getAvailableClients() > 1) {
slaveDown(address.getHost(), address.getPort(), FreezeReason.SYSTEM); slaveDown(address, FreezeReason.SYSTEM);
} }
connectionManager.shutdownAsync(oldMaster.getClient()); connectionManager.shutdownAsync(oldMaster.getClient());
log.info("master {} has changed to {}", oldMaster.getClient().getAddr(), address);
} }
}); });
} }
@ -410,10 +418,16 @@ public class MasterSlaveEntry {
} }
public RFuture<RedisConnection> connectionReadOp(RedisCommand<?> command) { public RFuture<RedisConnection> connectionReadOp(RedisCommand<?> command) {
if (config.getReadMode() == ReadMode.MASTER) {
return connectionWriteOp(command);
}
return slaveBalancer.nextConnection(command); return slaveBalancer.nextConnection(command);
} }
public RFuture<RedisConnection> connectionReadOp(RedisCommand<?> command, InetSocketAddress addr) { public RFuture<RedisConnection> connectionReadOp(RedisCommand<?> command, InetSocketAddress addr) {
if (config.getReadMode() == ReadMode.MASTER) {
return connectionWriteOp(command);
}
return slaveBalancer.getConnection(command, addr); return slaveBalancer.getConnection(command, addr);
} }
@ -438,6 +452,10 @@ public class MasterSlaveEntry {
} }
public void releaseRead(RedisConnection connection) { public void releaseRead(RedisConnection connection) {
if (config.getReadMode() == ReadMode.MASTER) {
releaseWrite(connection);
return;
}
slaveBalancer.returnConnection(connection); slaveBalancer.returnConnection(connection);
} }

@ -101,7 +101,7 @@ public class ReplicatedConnectionManager extends MasterSlaveConnectionManager {
throw new RedisConnectionException("Can't connect to servers!"); throw new RedisConnectionException("Can't connect to servers!");
} }
init(this.config); initSingleEntry();
scheduleMasterChangeCheck(cfg); scheduleMasterChangeCheck(cfg);
} }
@ -192,7 +192,6 @@ public class ReplicatedConnectionManager extends MasterSlaveConnectionManager {
if (master.equals(addr)) { if (master.equals(addr)) {
log.debug("Current master {} unchanged", master); log.debug("Current master {} unchanged", master);
} else if (currentMaster.compareAndSet(master, addr)) { } else if (currentMaster.compareAndSet(master, addr)) {
log.info("Master has changed from {} to {}", master, addr);
changeMaster(singleSlotRange.getStartSlot(), addr); changeMaster(singleSlotRange.getStartSlot(), addr);
} }
} }

@ -39,16 +39,15 @@ import org.redisson.cluster.ClusterSlotRange;
import org.redisson.config.BaseMasterSlaveServersConfig; import org.redisson.config.BaseMasterSlaveServersConfig;
import org.redisson.config.Config; import org.redisson.config.Config;
import org.redisson.config.MasterSlaveServersConfig; import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.config.ReadMode;
import org.redisson.config.SentinelServersConfig; import org.redisson.config.SentinelServersConfig;
import org.redisson.connection.ClientConnectionsEntry.FreezeReason; import org.redisson.connection.ClientConnectionsEntry.FreezeReason;
import org.redisson.misc.URIBuilder;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.FutureListener;
import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.PlatformDependent;
import org.redisson.misc.URIBuilder;
/** /**
* *
@ -68,13 +67,13 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
public SentinelConnectionManager(SentinelServersConfig cfg, Config config) { public SentinelConnectionManager(SentinelServersConfig cfg, Config config) {
super(config); super(config);
this.config = create(cfg);
initTimer(this.config);
if (cfg.getMasterName() == null) { if (cfg.getMasterName() == null) {
throw new IllegalArgumentException("masterName parameter is not defined!"); throw new IllegalArgumentException("masterName parameter is not defined!");
} }
this.config = create(cfg);
initTimer(this.config);
for (URI addr : cfg.getSentinelAddresses()) { for (URI addr : cfg.getSentinelAddresses()) {
RedisClient client = createClient(NodeType.SENTINEL, addr, this.config.getConnectTimeout(), this.config.getRetryInterval() * this.config.getRetryAttempts()); RedisClient client = createClient(NodeType.SENTINEL, addr, this.config.getConnectTimeout(), this.config.getRetryInterval() * this.config.getRetryAttempts());
try { try {
@ -126,7 +125,8 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
stopThreads(); stopThreads();
throw new RedisConnectionException("Can't connect to servers!"); throw new RedisConnectionException("Can't connect to servers!");
} }
init(this.config);
initSingleEntry();
List<RFuture<RedisPubSubConnection>> connectionFutures = new ArrayList<RFuture<RedisPubSubConnection>>(cfg.getSentinelAddresses().size()); List<RFuture<RedisPubSubConnection>> connectionFutures = new ArrayList<RFuture<RedisPubSubConnection>>(cfg.getSentinelAddresses().size());
for (URI addr : cfg.getSentinelAddresses()) { for (URI addr : cfg.getSentinelAddresses()) {
@ -171,7 +171,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
@Override @Override
public void operationComplete(Future<RedisPubSubConnection> future) throws Exception { public void operationComplete(Future<RedisPubSubConnection> future) throws Exception {
if (!future.isSuccess()) { if (!future.isSuccess()) {
log.warn("Can't connect to sentinel: {}:{}", addr.getHost(), addr.getPort()); log.warn("Can't connect to sentinel: {}", addr);
return; return;
} }
@ -220,8 +220,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
String ip = parts[2]; String ip = parts[2];
String port = parts[3]; String port = parts[3];
String addr = createAddress(ip, port); URI uri = convert(ip, port);
URI uri = URIBuilder.create(addr);
registerSentinel(cfg, uri, c); registerSentinel(cfg, uri, c);
} }
} }
@ -241,7 +240,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
} }
// to avoid addition twice // to avoid addition twice
if (slaves.putIfAbsent(slaveAddr, true) == null) { if (slaves.putIfAbsent(slaveAddr, true) == null && !config.checkSkipSlavesInit()) {
final MasterSlaveEntry entry = getEntry(singleSlotRange.getStartSlot()); final MasterSlaveEntry entry = getEntry(singleSlotRange.getStartSlot());
RFuture<Void> future = entry.addSlave(URIBuilder.create(slaveAddr)); RFuture<Void> future = entry.addSlave(URIBuilder.create(slaveAddr));
future.addListener(new FutureListener<Void>() { future.addListener(new FutureListener<Void>() {
@ -253,11 +252,13 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
return; return;
} }
if (entry.slaveUp(ip, Integer.valueOf(port), FreezeReason.MANAGER)) { URI uri = convert(ip, port);
if (entry.slaveUp(uri, FreezeReason.MANAGER)) {
String slaveAddr = ip + ":" + port; String slaveAddr = ip + ":" + port;
log.info("slave: {} added", slaveAddr); log.info("slave: {} added", slaveAddr);
} }
} }
}); });
} else { } else {
slaveUp(ip, port); slaveUp(ip, port);
@ -267,6 +268,12 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
} }
} }
protected URI convert(String ip, String port) {
String addr = createAddress(ip, port);
URI uri = URIBuilder.create(addr);
return uri;
}
private void onNodeDown(URI sentinelAddr, String msg) { private void onNodeDown(URI sentinelAddr, String msg) {
String[] parts = msg.split(" "); String[] parts = msg.split(" ");
@ -305,11 +312,12 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
} }
private void slaveDown(String ip, String port) { private void slaveDown(String ip, String port) {
if (config.getReadMode() == ReadMode.MASTER) { if (config.checkSkipSlavesInit()) {
log.warn("slave: {}:{} has down", ip, port); log.warn("slave: {}:{} has down", ip, port);
} else { } else {
MasterSlaveEntry entry = getEntry(singleSlotRange.getStartSlot()); MasterSlaveEntry entry = getEntry(singleSlotRange.getStartSlot());
if (entry.slaveDown(ip, Integer.valueOf(port), FreezeReason.MANAGER)) { URI uri = convert(ip, port);
if (entry.slaveDown(uri, FreezeReason.MANAGER)) {
log.warn("slave: {}:{} has down", ip, port); log.warn("slave: {}:{} has down", ip, port);
} }
} }
@ -361,13 +369,14 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
} }
private void slaveUp(String ip, String port) { private void slaveUp(String ip, String port) {
if (config.getReadMode() == ReadMode.MASTER) { if (config.checkSkipSlavesInit()) {
String slaveAddr = ip + ":" + port; String slaveAddr = ip + ":" + port;
log.info("slave: {} has up", slaveAddr); log.info("slave: {} has up", slaveAddr);
return; return;
} }
if (getEntry(singleSlotRange.getStartSlot()).slaveUp(ip, Integer.valueOf(port), FreezeReason.MANAGER)) { URI uri = convert(ip, port);
if (getEntry(singleSlotRange.getStartSlot()).slaveUp(uri, FreezeReason.MANAGER)) {
String slaveAddr = ip + ":" + port; String slaveAddr = ip + ":" + port;
log.info("slave: {} has up", slaveAddr); log.info("slave: {} has up", slaveAddr);
} }
@ -386,7 +395,6 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
if (!newMaster.equals(current) if (!newMaster.equals(current)
&& currentMaster.compareAndSet(current, newMaster)) { && currentMaster.compareAndSet(current, newMaster)) {
changeMaster(singleSlotRange.getStartSlot(), URIBuilder.create(newMaster)); changeMaster(singleSlotRange.getStartSlot(), URIBuilder.create(newMaster));
log.info("master {} changed to {}", current, newMaster);
} }
} }
} else { } else {

@ -15,22 +15,11 @@
*/ */
package org.redisson.connection; package org.redisson.connection;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.redisson.client.RedisConnectionException;
import org.redisson.config.Config; import org.redisson.config.Config;
import org.redisson.config.MasterSlaveServersConfig; import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.config.ReadMode; import org.redisson.config.ReadMode;
import org.redisson.config.SingleServerConfig; import org.redisson.config.SingleServerConfig;
import org.redisson.config.SubscriptionMode; import org.redisson.config.SubscriptionMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.util.concurrent.GlobalEventExecutor;
import io.netty.util.concurrent.ScheduledFuture;
/** /**
* *
@ -39,24 +28,8 @@ import io.netty.util.concurrent.ScheduledFuture;
*/ */
public class SingleConnectionManager extends MasterSlaveConnectionManager { public class SingleConnectionManager extends MasterSlaveConnectionManager {
private final Logger log = LoggerFactory.getLogger(getClass());
private final AtomicReference<InetAddress> currentMaster = new AtomicReference<InetAddress>();
private ScheduledFuture<?> monitorFuture;
public SingleConnectionManager(SingleServerConfig cfg, Config config) { public SingleConnectionManager(SingleServerConfig cfg, Config config) {
super(create(cfg), config); super(create(cfg), config);
if (cfg.isDnsMonitoring()) {
try {
this.currentMaster.set(InetAddress.getByName(cfg.getAddress().getHost()));
} catch (UnknownHostException e) {
throw new RedisConnectionException("Unknown host: " + cfg.getAddress().getHost(), e);
}
log.debug("DNS monitoring enabled; Current master set to {}", currentMaster.get());
monitorDnsChange(cfg);
}
} }
private static MasterSlaveServersConfig create(SingleServerConfig cfg) { private static MasterSlaveServersConfig create(SingleServerConfig cfg) {
@ -84,6 +57,11 @@ public class SingleConnectionManager extends MasterSlaveConnectionManager {
newconfig.setIdleConnectionTimeout(cfg.getIdleConnectionTimeout()); newconfig.setIdleConnectionTimeout(cfg.getIdleConnectionTimeout());
newconfig.setFailedAttempts(cfg.getFailedAttempts()); newconfig.setFailedAttempts(cfg.getFailedAttempts());
newconfig.setReconnectionTimeout(cfg.getReconnectionTimeout()); newconfig.setReconnectionTimeout(cfg.getReconnectionTimeout());
if (cfg.isDnsMonitoring()) {
newconfig.setDnsMonitoringInterval(cfg.getDnsMonitoringInterval());
} else {
newconfig.setDnsMonitoringInterval(-1);
}
newconfig.setMasterConnectionMinimumIdleSize(cfg.getConnectionMinimumIdleSize()); newconfig.setMasterConnectionMinimumIdleSize(cfg.getConnectionMinimumIdleSize());
newconfig.setSubscriptionConnectionMinimumIdleSize(cfg.getSubscriptionConnectionMinimumIdleSize()); newconfig.setSubscriptionConnectionMinimumIdleSize(cfg.getSubscriptionConnectionMinimumIdleSize());
@ -92,41 +70,4 @@ public class SingleConnectionManager extends MasterSlaveConnectionManager {
return newconfig; return newconfig;
} }
private void monitorDnsChange(final SingleServerConfig cfg) {
monitorFuture = GlobalEventExecutor.INSTANCE.schedule(new Runnable() {
@Override
public void run() {
// As InetAddress.getByName call is blocking. Method should be run in dedicated thread
getExecutor().execute(new Runnable() {
@Override
public void run() {
try {
InetAddress master = currentMaster.get();
InetAddress now = InetAddress.getByName(cfg.getAddress().getHost());
if (!now.getHostAddress().equals(master.getHostAddress())) {
log.info("Detected DNS change. {} has changed from {} to {}", cfg.getAddress().getHost(), master.getHostAddress(), now.getHostAddress());
if (currentMaster.compareAndSet(master, now)) {
changeMaster(singleSlotRange.getStartSlot(), cfg.getAddress());
log.info("Master has been changed");
}
}
} catch (Exception e) {
log.error(e.getMessage(), e);
} finally {
monitorDnsChange(cfg);
}
}
});
}
}, cfg.getDnsMonitoringInterval(), TimeUnit.MILLISECONDS);
}
@Override
public void shutdown() {
if (monitorFuture != null) {
monitorFuture.cancel(true);
}
super.shutdown();
}
} }

@ -16,15 +16,18 @@
package org.redisson.connection.balancer; package org.redisson.connection.balancer;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.URI;
import java.util.Map; import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.redisson.api.NodeType;
import org.redisson.api.RFuture; import org.redisson.api.RFuture;
import org.redisson.client.RedisConnection; import org.redisson.client.RedisConnection;
import org.redisson.client.RedisConnectionException; import org.redisson.client.RedisConnectionException;
import org.redisson.client.RedisPubSubConnection; import org.redisson.client.RedisPubSubConnection;
import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommand;
import org.redisson.config.MasterSlaveServersConfig; import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.config.ReadMode;
import org.redisson.connection.ClientConnectionsEntry; import org.redisson.connection.ClientConnectionsEntry;
import org.redisson.connection.ClientConnectionsEntry.FreezeReason; import org.redisson.connection.ClientConnectionsEntry.FreezeReason;
import org.redisson.connection.ConnectionManager; import org.redisson.connection.ConnectionManager;
@ -39,21 +42,46 @@ import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.FutureListener;
import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.PlatformDependent;
/**
*
* @author Nikita Koksharov
*
*/
public class LoadBalancerManager { public class LoadBalancerManager {
private final Logger log = LoggerFactory.getLogger(getClass()); private final Logger log = LoggerFactory.getLogger(getClass());
private final ConnectionManager connectionManager; private final ConnectionManager connectionManager;
private final Map<InetSocketAddress, ClientConnectionsEntry> addr2Entry = PlatformDependent.newConcurrentHashMap();
private final PubSubConnectionPool pubSubConnectionPool; private final PubSubConnectionPool pubSubConnectionPool;
private final SlaveConnectionPool slaveConnectionPool; private final SlaveConnectionPool slaveConnectionPool;
private final Map<String, ClientConnectionsEntry> ip2Entry = PlatformDependent.newConcurrentHashMap();
public LoadBalancerManager(MasterSlaveServersConfig config, ConnectionManager connectionManager, MasterSlaveEntry entry) { public LoadBalancerManager(MasterSlaveServersConfig config, ConnectionManager connectionManager, MasterSlaveEntry entry) {
this.connectionManager = connectionManager; this.connectionManager = connectionManager;
slaveConnectionPool = new SlaveConnectionPool(config, connectionManager, entry); slaveConnectionPool = new SlaveConnectionPool(config, connectionManager, entry);
pubSubConnectionPool = new PubSubConnectionPool(config, connectionManager, entry); pubSubConnectionPool = new PubSubConnectionPool(config, connectionManager, entry);
} }
public void changeType(InetSocketAddress addr, NodeType nodeType) {
ClientConnectionsEntry entry = ip2Entry.get(addr.getAddress().getHostAddress() + ":" + addr.getPort());
changeType(addr, nodeType, entry);
}
protected void changeType(Object addr, NodeType nodeType, ClientConnectionsEntry entry) {
if (entry != null) {
if (connectionManager.isClusterMode()) {
entry.getClient().getConfig().setReadOnly(nodeType == NodeType.SLAVE && connectionManager.getConfig().getReadMode() != ReadMode.MASTER);
}
entry.setNodeType(nodeType);
}
}
public void changeType(URI address, NodeType nodeType) {
ClientConnectionsEntry entry = getEntry(address);
changeType(address, nodeType, entry);
}
public RFuture<Void> add(final ClientConnectionsEntry entry) { public RFuture<Void> add(final ClientConnectionsEntry entry) {
final RPromise<Void> result = connectionManager.newPromise(); final RPromise<Void> result = connectionManager.newPromise();
FutureListener<Void> listener = new FutureListener<Void>() { FutureListener<Void> listener = new FutureListener<Void>() {
@ -65,7 +93,8 @@ public class LoadBalancerManager {
return; return;
} }
if (counter.decrementAndGet() == 0) { if (counter.decrementAndGet() == 0) {
addr2Entry.put(entry.getClient().getAddr(), entry); String addr = entry.getClient().getIpAddr();
ip2Entry.put(addr, entry);
result.trySuccess(null); result.trySuccess(null);
} }
} }
@ -80,7 +109,7 @@ public class LoadBalancerManager {
public int getAvailableClients() { public int getAvailableClients() {
int count = 0; int count = 0;
for (ClientConnectionsEntry connectionEntry : addr2Entry.values()) { for (ClientConnectionsEntry connectionEntry : ip2Entry.values()) {
if (!connectionEntry.isFreezed()) { if (!connectionEntry.isFreezed()) {
count++; count++;
} }
@ -88,11 +117,10 @@ public class LoadBalancerManager {
return count; return count;
} }
public boolean unfreeze(String host, int port, FreezeReason freezeReason) { public boolean unfreeze(URI address, FreezeReason freezeReason) {
InetSocketAddress addr = new InetSocketAddress(host, port); ClientConnectionsEntry entry = getEntry(address);
ClientConnectionsEntry entry = addr2Entry.get(addr);
if (entry == null) { if (entry == null) {
throw new IllegalStateException("Can't find " + addr + " in slaves!"); throw new IllegalStateException("Can't find " + address + " in slaves!");
} }
synchronized (entry) { synchronized (entry) {
@ -111,12 +139,21 @@ public class LoadBalancerManager {
return false; return false;
} }
public ClientConnectionsEntry freeze(String host, int port, FreezeReason freezeReason) { private String convert(URI address) {
InetSocketAddress addr = new InetSocketAddress(host, port); InetSocketAddress addr = new InetSocketAddress(address.getHost(), address.getPort());
ClientConnectionsEntry connectionEntry = addr2Entry.get(addr); return addr.getAddress().getHostAddress() + ":" + addr.getPort();
}
public ClientConnectionsEntry freeze(URI address, FreezeReason freezeReason) {
ClientConnectionsEntry connectionEntry = getEntry(address);
return freeze(connectionEntry, freezeReason); return freeze(connectionEntry, freezeReason);
} }
private ClientConnectionsEntry getEntry(URI address) {
String addr = convert(address);
return ip2Entry.get(addr);
}
public ClientConnectionsEntry freeze(ClientConnectionsEntry connectionEntry, FreezeReason freezeReason) { public ClientConnectionsEntry freeze(ClientConnectionsEntry connectionEntry, FreezeReason freezeReason) {
if (connectionEntry == null) { if (connectionEntry == null) {
return null; return null;
@ -143,11 +180,15 @@ public class LoadBalancerManager {
} }
public boolean contains(InetSocketAddress addr) { public boolean contains(InetSocketAddress addr) {
return addr2Entry.containsKey(addr); return ip2Entry.containsKey(addr.getAddress().getHostAddress() + ":" + addr.getPort());
}
public boolean contains(String addr) {
return ip2Entry.containsKey(addr);
} }
public RFuture<RedisConnection> getConnection(RedisCommand<?> command, InetSocketAddress addr) { public RFuture<RedisConnection> getConnection(RedisCommand<?> command, InetSocketAddress addr) {
ClientConnectionsEntry entry = addr2Entry.get(addr); ClientConnectionsEntry entry = ip2Entry.get(addr.getAddress().getHostAddress());
if (entry != null) { if (entry != null) {
return slaveConnectionPool.get(command, entry); return slaveConnectionPool.get(command, entry);
} }
@ -160,23 +201,23 @@ public class LoadBalancerManager {
} }
public void returnPubSubConnection(RedisPubSubConnection connection) { public void returnPubSubConnection(RedisPubSubConnection connection) {
ClientConnectionsEntry entry = addr2Entry.get(connection.getRedisClient().getAddr()); ClientConnectionsEntry entry = ip2Entry.get(connection.getRedisClient().getAddr().getAddress().getHostAddress());
pubSubConnectionPool.returnConnection(entry, connection); pubSubConnectionPool.returnConnection(entry, connection);
} }
public void returnConnection(RedisConnection connection) { public void returnConnection(RedisConnection connection) {
ClientConnectionsEntry entry = addr2Entry.get(connection.getRedisClient().getAddr()); ClientConnectionsEntry entry = ip2Entry.get(connection.getRedisClient().getAddr().getAddress().getHostAddress());
slaveConnectionPool.returnConnection(entry, connection); slaveConnectionPool.returnConnection(entry, connection);
} }
public void shutdown() { public void shutdown() {
for (ClientConnectionsEntry entry : addr2Entry.values()) { for (ClientConnectionsEntry entry : ip2Entry.values()) {
entry.getClient().shutdown(); entry.getClient().shutdown();
} }
} }
public void shutdownAsync() { public void shutdownAsync() {
for (ClientConnectionsEntry entry : addr2Entry.values()) { for (ClientConnectionsEntry entry : ip2Entry.values()) {
connectionManager.shutdownAsync(entry.getClient()); connectionManager.shutdownAsync(entry.getClient());
} }
} }

@ -204,7 +204,7 @@ abstract class ConnectionPool<T extends RedisConnection> {
} }
RedisConnectionException exception = new RedisConnectionException( RedisConnectionException exception = new RedisConnectionException(
"Can't aquire connection to " + entry.getClient().getAddr()); "Can't aquire connection to " + entry);
return connectionManager.newFailedFuture(exception); return connectionManager.newFailedFuture(exception);
} }
@ -296,7 +296,7 @@ abstract class ConnectionPool<T extends RedisConnection> {
private void promiseFailure(ClientConnectionsEntry entry, RPromise<T> promise, Throwable cause) { private void promiseFailure(ClientConnectionsEntry entry, RPromise<T> promise, Throwable cause) {
if (entry.incFailedAttempts() == config.getFailedAttempts()) { if (entry.incFailedAttempts() == config.getFailedAttempts()) {
checkForReconnect(entry); checkForReconnect(entry, cause);
} }
releaseConnection(entry); releaseConnection(entry);
@ -308,7 +308,7 @@ abstract class ConnectionPool<T extends RedisConnection> {
int attempts = entry.incFailedAttempts(); int attempts = entry.incFailedAttempts();
if (attempts == config.getFailedAttempts()) { if (attempts == config.getFailedAttempts()) {
conn.closeAsync(); conn.closeAsync();
checkForReconnect(entry); checkForReconnect(entry, null);
} else if (attempts < config.getFailedAttempts()) { } else if (attempts < config.getFailedAttempts()) {
releaseConnection(entry, conn); releaseConnection(entry, conn);
} else { } else {
@ -321,15 +321,14 @@ abstract class ConnectionPool<T extends RedisConnection> {
promise.tryFailure(cause); promise.tryFailure(cause);
} }
private void checkForReconnect(ClientConnectionsEntry entry) { private void checkForReconnect(ClientConnectionsEntry entry, Throwable cause) {
if (entry.getNodeType() == NodeType.SLAVE) { if (entry.getNodeType() == NodeType.SLAVE) {
masterSlaveEntry.slaveDown(entry.getClient().getAddr().getHostName(), masterSlaveEntry.slaveDown(entry.getClient().getConfig().getAddress(), FreezeReason.RECONNECT);
entry.getClient().getAddr().getPort(), FreezeReason.RECONNECT); log.error("slave " + entry.getClient().getAddr() + " disconnected due to failedAttempts=" + config.getFailedAttempts() + " limit reached", cause);
log.warn("slave {} disconnected due to failedAttempts={} limit reached", entry.getClient().getAddr(), config.getFailedAttempts());
scheduleCheck(entry); scheduleCheck(entry);
} else { } else {
if (entry.freezeMaster(FreezeReason.RECONNECT)) { if (entry.freezeMaster(FreezeReason.RECONNECT)) {
log.warn("host {} disconnected due to failedAttempts={} limit reached", entry.getClient().getAddr(), config.getFailedAttempts()); log.error("host " + entry.getClient().getAddr() + " disconnected due to failedAttempts=" + config.getFailedAttempts() + " limit reached", cause);
scheduleCheck(entry); scheduleCheck(entry);
} }
} }
@ -342,19 +341,23 @@ abstract class ConnectionPool<T extends RedisConnection> {
connectionManager.newTimeout(new TimerTask() { connectionManager.newTimeout(new TimerTask() {
@Override @Override
public void run(Timeout timeout) throws Exception { public void run(Timeout timeout) throws Exception {
if (entry.getFreezeReason() != FreezeReason.RECONNECT synchronized (entry) {
|| !entry.isFreezed() if (entry.getFreezeReason() != FreezeReason.RECONNECT
|| !entry.isFreezed()
|| connectionManager.isShuttingDown()) { || connectionManager.isShuttingDown()) {
return; return;
}
} }
RFuture<RedisConnection> connectionFuture = entry.getClient().connectAsync(); RFuture<RedisConnection> connectionFuture = entry.getClient().connectAsync();
connectionFuture.addListener(new FutureListener<RedisConnection>() { connectionFuture.addListener(new FutureListener<RedisConnection>() {
@Override @Override
public void operationComplete(Future<RedisConnection> future) throws Exception { public void operationComplete(Future<RedisConnection> future) throws Exception {
if (entry.getFreezeReason() != FreezeReason.RECONNECT synchronized (entry) {
|| !entry.isFreezed()) { if (entry.getFreezeReason() != FreezeReason.RECONNECT
return; || !entry.isFreezed()) {
return;
}
} }
if (!future.isSuccess()) { if (!future.isSuccess()) {
@ -372,9 +375,11 @@ abstract class ConnectionPool<T extends RedisConnection> {
@Override @Override
public void operationComplete(Future<String> future) throws Exception { public void operationComplete(Future<String> future) throws Exception {
try { try {
if (entry.getFreezeReason() != FreezeReason.RECONNECT synchronized (entry) {
|| !entry.isFreezed()) { if (entry.getFreezeReason() != FreezeReason.RECONNECT
return; || !entry.isFreezed()) {
return;
}
} }
if (future.isSuccess() && "PONG".equals(future.getNow())) { if (future.isSuccess() && "PONG".equals(future.getNow())) {
@ -385,14 +390,14 @@ abstract class ConnectionPool<T extends RedisConnection> {
public void operationComplete(Future<Void> future) public void operationComplete(Future<Void> future)
throws Exception { throws Exception {
if (entry.getNodeType() == NodeType.SLAVE) { if (entry.getNodeType() == NodeType.SLAVE) {
masterSlaveEntry.slaveUp(entry.getClient().getAddr().getHostName(), entry.getClient().getAddr().getPort(), FreezeReason.RECONNECT); masterSlaveEntry.slaveUp(entry.getClient().getConfig().getAddress(), FreezeReason.RECONNECT);
log.info("slave {} successfully reconnected", entry.getClient().getAddr()); log.info("slave {} has been successfully reconnected", entry.getClient().getAddr());
} else { } else {
synchronized (entry) { synchronized (entry) {
if (entry.getFreezeReason() == FreezeReason.RECONNECT) { if (entry.getFreezeReason() == FreezeReason.RECONNECT) {
entry.setFreezed(false); entry.setFreezed(false);
entry.setFreezeReason(null); entry.setFreezeReason(null);
log.info("host {} successfully reconnected", entry.getClient().getAddr()); log.info("host {} has been successfully reconnected", entry.getClient().getAddr());
} }
} }
} }

@ -67,7 +67,7 @@ public class TasksRunnerService implements RemoteExecutorService, RemoteParams {
try { try {
this.codec = codec.getClass().getConstructor(ClassLoader.class).newInstance(classLoader); this.codec = codec.getClass().getConstructor(ClassLoader.class).newInstance(classLoader);
} catch (Exception e) { } catch (Exception e) {
throw new IllegalStateException(e); throw new IllegalStateException("Unable to initialize codec with ClassLoader parameter", e);
} }
} }

@ -0,0 +1,152 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.reactive;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.reactivestreams.Publisher;
import org.redisson.RedissonLock;
import org.redisson.RedissonSemaphore;
import org.redisson.api.RFuture;
import org.redisson.api.RLockAsync;
import org.redisson.api.RSemaphoreAsync;
import org.redisson.api.RSemaphoreReactive;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.command.CommandReactiveExecutor;
import org.redisson.pubsub.SemaphorePubSub;
import reactor.fn.Supplier;
/**
*
* @author Nikita Koksharov
*
*/
public class RedissonSemaphoreReactive extends RedissonExpirableReactive implements RSemaphoreReactive {
private final RSemaphoreAsync instance;
public RedissonSemaphoreReactive(CommandReactiveExecutor connectionManager, String name, SemaphorePubSub semaphorePubSub) {
super(connectionManager, name);
instance = new RedissonSemaphore(commandExecutor, name, semaphorePubSub);
}
protected RLockAsync createLock(CommandAsyncExecutor connectionManager, String name, UUID id) {
return new RedissonLock(commandExecutor, name, id);
}
@Override
public Publisher<Boolean> tryAcquire() {
return reactive(new Supplier<RFuture<Boolean>>() {
@Override
public RFuture<Boolean> get() {
return instance.tryAcquireAsync();
}
});
}
@Override
public Publisher<Boolean> tryAcquire(final int permits) {
return reactive(new Supplier<RFuture<Boolean>>() {
@Override
public RFuture<Boolean> get() {
return instance.tryAcquireAsync(permits);
}
});
}
@Override
public Publisher<Void> acquire() {
return reactive(new Supplier<RFuture<Void>>() {
@Override
public RFuture<Void> get() {
return instance.acquireAsync();
}
});
}
@Override
public Publisher<Void> acquire(final int permits) {
return reactive(new Supplier<RFuture<Void>>() {
@Override
public RFuture<Void> get() {
return instance.acquireAsync(permits);
}
});
}
@Override
public Publisher<Void> release() {
return reactive(new Supplier<RFuture<Void>>() {
@Override
public RFuture<Void> get() {
return instance.releaseAsync();
}
});
}
@Override
public Publisher<Void> release(final int permits) {
return reactive(new Supplier<RFuture<Void>>() {
@Override
public RFuture<Void> get() {
return instance.releaseAsync(permits);
}
});
}
@Override
public Publisher<Boolean> trySetPermits(final int permits) {
return reactive(new Supplier<RFuture<Boolean>>() {
@Override
public RFuture<Boolean> get() {
return instance.trySetPermitsAsync(permits);
}
});
}
@Override
public Publisher<Boolean> tryAcquire(final long waitTime, final TimeUnit unit) {
return reactive(new Supplier<RFuture<Boolean>>() {
@Override
public RFuture<Boolean> get() {
return instance.tryAcquireAsync(waitTime, unit);
}
});
}
@Override
public Publisher<Boolean> tryAcquire(final int permits, final long waitTime, final TimeUnit unit) {
return reactive(new Supplier<RFuture<Boolean>>() {
@Override
public RFuture<Boolean> get() {
return instance.tryAcquireAsync(permits, waitTime, unit);
}
});
}
@Override
public Publisher<Void> reducePermits(final int permits) {
return reactive(new Supplier<RFuture<Void>>() {
@Override
public RFuture<Void> get() {
return instance.reducePermitsAsync(permits);
}
});
}
}

@ -15,6 +15,8 @@
*/ */
package org.redisson.spring.cache; package org.redisson.spring.cache;
import java.io.Serializable;
import org.springframework.cache.Cache.ValueWrapper; import org.springframework.cache.Cache.ValueWrapper;
/** /**
@ -22,7 +24,9 @@ import org.springframework.cache.Cache.ValueWrapper;
* @author Nikita Koksharov * @author Nikita Koksharov
* *
*/ */
public class NullValue implements ValueWrapper { public class NullValue implements ValueWrapper, Serializable {
private static final long serialVersionUID = -8310337775544536701L;
public static final NullValue INSTANCE = new NullValue(); public static final NullValue INSTANCE = new NullValue();

@ -55,7 +55,7 @@ public class RedissonSpringCacheManager implements CacheManager, ResourceLoaderA
RedissonClient redisson; RedissonClient redisson;
Map<String, CacheConfig> configMap = new ConcurrentHashMap<String, CacheConfig>(); Map<String, CacheConfig> configMap = new ConcurrentHashMap<String, CacheConfig>();
private ConcurrentMap<String, Cache> instanceMap = new ConcurrentHashMap<String, Cache>(); ConcurrentMap<String, Cache> instanceMap = new ConcurrentHashMap<String, Cache>();
String configLocation; String configLocation;

@ -8,7 +8,6 @@ import java.util.HashSet;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
@ -22,6 +21,7 @@ import org.redisson.api.LocalCachedMapOptions.InvalidationPolicy;
import org.redisson.api.RLocalCachedMap; import org.redisson.api.RLocalCachedMap;
import org.redisson.api.RMap; import org.redisson.api.RMap;
import org.redisson.cache.Cache; import org.redisson.cache.Cache;
import org.redisson.client.codec.StringCodec;
import mockit.Deencapsulation; import mockit.Deencapsulation;
@ -149,6 +149,32 @@ public class RedissonLocalCachedMapTest extends BaseMapTest {
}.execute(); }.execute();
} }
@Test
public void testInvalidationOnUpdateNonBinaryCodec() throws InterruptedException {
LocalCachedMapOptions<String, String> options = LocalCachedMapOptions.<String, String>defaults().evictionPolicy(EvictionPolicy.LFU).cacheSize(5);
RLocalCachedMap<String, String> map1 = redisson.getLocalCachedMap("test", new StringCodec(), options);
Cache<CacheKey, CacheValue> cache1 = Deencapsulation.getField(map1, "cache");
RLocalCachedMap<String, String> map2 = redisson.getLocalCachedMap("test", new StringCodec(), options);
Cache<CacheKey, CacheValue> cache2 = Deencapsulation.getField(map2, "cache");
map1.put("1", "1");
map1.put("2", "2");
assertThat(map2.get("1")).isEqualTo("1");
assertThat(map2.get("2")).isEqualTo("2");
assertThat(cache1.size()).isEqualTo(2);
assertThat(cache2.size()).isEqualTo(2);
map1.put("1", "3");
map2.put("2", "4");
Thread.sleep(50);
assertThat(cache1.size()).isEqualTo(1);
assertThat(cache2.size()).isEqualTo(1);
}
@Test @Test
public void testInvalidationOnUpdate() throws InterruptedException { public void testInvalidationOnUpdate() throws InterruptedException {
new InvalidationTest() { new InvalidationTest() {

@ -457,6 +457,19 @@ public class RedissonMapCacheTest extends BaseMapTest {
Assert.assertEquals(1, map.size()); Assert.assertEquals(1, map.size());
} }
@Test
public void testPutAllBig() {
Map<Integer, String> joinMap = new HashMap<Integer, String>();
for (int i = 0; i < 100000; i++) {
joinMap.put(i, "" + i);
}
Map<Integer, String> map = redisson.getMapCache("simple");
map.putAll(joinMap);
assertThat(map.size()).isEqualTo(joinMap.size());
}
@Test @Test
public void testPutAll() { public void testPutAll() {
Map<Integer, String> map = redisson.getMapCache("simple"); Map<Integer, String> map = redisson.getMapCache("simple");

@ -32,7 +32,6 @@ import org.redisson.api.NodesGroup;
import org.redisson.api.RMap; import org.redisson.api.RMap;
import org.redisson.api.RedissonClient; import org.redisson.api.RedissonClient;
import org.redisson.client.RedisConnectionException; import org.redisson.client.RedisConnectionException;
import org.redisson.client.RedisException;
import org.redisson.client.RedisOutOfMemoryException; import org.redisson.client.RedisOutOfMemoryException;
import org.redisson.client.protocol.decoder.ListScanResult; import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry; import org.redisson.client.protocol.decoder.ScanObjectEntry;
@ -536,7 +535,7 @@ public class RedissonTest {
@Test(expected = RedisConnectionException.class) @Test(expected = RedisConnectionException.class)
public void testSentinelConnectionFail() throws InterruptedException { public void testSentinelConnectionFail() throws InterruptedException {
Config config = new Config(); Config config = new Config();
config.useSentinelServers().addSentinelAddress("redis://127.99.0.1:1111"); config.useSentinelServers().addSentinelAddress("redis://127.99.0.1:1111").setMasterName("test");
Redisson.create(config); Redisson.create(config);
Thread.sleep(1500); Thread.sleep(1500);

Loading…
Cancel
Save