Merge branch 'master' into 3.0.0

pull/1933/head
Nikita Koksharov 6 years ago
commit f132165e25

@ -51,7 +51,7 @@ public class RedissonSession extends StandardSession {
private final RedissonSessionManager redissonManager;
private final Map<String, Object> attrs;
private RMap<String, Object> map;
private RTopic topic;
private final RTopic topic;
private final RedissonSessionManager.ReadMode readMode;
private final UpdateMode updateMode;
@ -60,6 +60,7 @@ public class RedissonSession extends StandardSession {
this.redissonManager = manager;
this.readMode = readMode;
this.updateMode = updateMode;
this.topic = redissonManager.getTopic();
try {
Field attr = StandardSession.class.getDeclaredField("attributes");
@ -80,13 +81,6 @@ public class RedissonSession extends StandardSession {
return super.getAttribute(name);
}
@Override
public void setId(String id, boolean notify) {
super.setId(id, notify);
map = redissonManager.getMap(id);
topic = redissonManager.getTopic();
}
public void delete() {
map.delete();
if (readMode == ReadMode.MEMORY) {
@ -123,9 +117,13 @@ public class RedissonSession extends StandardSession {
if (readMode == ReadMode.MEMORY) {
topic.publish(createPutAllMessage(newMap));
}
if (getMaxInactiveInterval() >= 0) {
map.expire(getMaxInactiveInterval(), TimeUnit.SECONDS);
}
expireSession();
}
}
protected void expireSession() {
if (maxInactiveInterval >= 0) {
map.expire(maxInactiveInterval + 60, TimeUnit.SECONDS);
}
}
@ -143,9 +141,7 @@ public class RedissonSession extends StandardSession {
if (map != null) {
fastPut(MAX_INACTIVE_INTERVAL_ATTR, maxInactiveInterval);
if (maxInactiveInterval >= 0) {
map.expire(getMaxInactiveInterval(), TimeUnit.SECONDS);
}
expireSession();
}
}
@ -183,7 +179,7 @@ public class RedissonSession extends StandardSession {
boolean oldValue = isNew;
super.endAccess();
if (isNew != oldValue) {
if (isNew != oldValue && map != null) {
fastPut(IS_NEW_ATTR, isNew);
}
}
@ -218,6 +214,10 @@ public class RedissonSession extends StandardSession {
}
public void save() {
if (map == null) {
map = redissonManager.getMap(id);
}
Map<String, Object> newMap = new HashMap<String, Object>();
newMap.put(CREATION_TIME_ATTR, creationTime);
newMap.put(LAST_ACCESSED_TIME_ATTR, lastAccessedTime);
@ -237,9 +237,7 @@ public class RedissonSession extends StandardSession {
topic.publish(createPutAllMessage(newMap));
}
if (maxInactiveInterval >= 0) {
map.expire(getMaxInactiveInterval(), TimeUnit.SECONDS);
}
expireSession();
}
public void load(Map<String, Object> attrs) {

@ -39,6 +39,8 @@ import org.redisson.api.RTopic;
import org.redisson.api.RedissonClient;
import org.redisson.api.listener.MessageListener;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.StringCodec;
import org.redisson.codec.CompositeCodec;
import org.redisson.config.Config;
/**
@ -145,15 +147,14 @@ public class RedissonSessionManager extends ManagerBase implements Lifecycle {
session.setManager(this);
session.setId(sessionId);
session.save();
return session;
}
public RMap<String, Object> getMap(String sessionId) {
String separator = keyPrefix == null || keyPrefix.isEmpty() ? "" : ":";
final String name = keyPrefix + separator + "redisson:tomcat_session:" + sessionId;
return redisson.getMap(name);
String name = keyPrefix + separator + "redisson:tomcat_session:" + sessionId;
return redisson.getMap(name, new CompositeCodec(StringCodec.INSTANCE, redisson.getConfig().getCodec()));
}
public RTopic getTopic() {
@ -204,6 +205,12 @@ public class RedissonSessionManager extends ManagerBase implements Lifecycle {
return new RedissonSession(this, readMode, updateMode);
}
@Override
public void add(Session session) {
super.add(session);
((RedissonSession)session).save();
}
@Override
public void remove(Session session) {
super.remove(session);

@ -51,7 +51,7 @@ public class RedissonSession extends StandardSession {
private final RedissonSessionManager redissonManager;
private final Map<String, Object> attrs;
private RMap<String, Object> map;
private RTopic topic;
private final RTopic topic;
private final RedissonSessionManager.ReadMode readMode;
private final UpdateMode updateMode;
@ -60,6 +60,7 @@ public class RedissonSession extends StandardSession {
this.redissonManager = manager;
this.readMode = readMode;
this.updateMode = updateMode;
this.topic = redissonManager.getTopic();
try {
Field attr = StandardSession.class.getDeclaredField("attributes");
@ -80,13 +81,6 @@ public class RedissonSession extends StandardSession {
return super.getAttribute(name);
}
@Override
public void setId(String id, boolean notify) {
super.setId(id, notify);
map = redissonManager.getMap(id);
topic = redissonManager.getTopic();
}
public void delete() {
map.delete();
if (readMode == ReadMode.MEMORY) {
@ -123,9 +117,13 @@ public class RedissonSession extends StandardSession {
if (readMode == ReadMode.MEMORY) {
topic.publish(createPutAllMessage(newMap));
}
if (getMaxInactiveInterval() >= 0) {
map.expire(getMaxInactiveInterval(), TimeUnit.SECONDS);
}
expireSession();
}
}
protected void expireSession() {
if (maxInactiveInterval >= 0) {
map.expire(maxInactiveInterval + 60, TimeUnit.SECONDS);
}
}
@ -143,9 +141,7 @@ public class RedissonSession extends StandardSession {
if (map != null) {
fastPut(MAX_INACTIVE_INTERVAL_ATTR, maxInactiveInterval);
if (maxInactiveInterval >= 0) {
map.expire(getMaxInactiveInterval(), TimeUnit.SECONDS);
}
expireSession();
}
}
@ -183,7 +179,7 @@ public class RedissonSession extends StandardSession {
boolean oldValue = isNew;
super.endAccess();
if (isNew != oldValue) {
if (isNew != oldValue && map != null) {
fastPut(IS_NEW_ATTR, isNew);
}
}
@ -218,6 +214,10 @@ public class RedissonSession extends StandardSession {
}
public void save() {
if (map == null) {
map = redissonManager.getMap(id);
}
Map<String, Object> newMap = new HashMap<String, Object>();
newMap.put(CREATION_TIME_ATTR, creationTime);
newMap.put(LAST_ACCESSED_TIME_ATTR, lastAccessedTime);
@ -237,9 +237,7 @@ public class RedissonSession extends StandardSession {
topic.publish(createPutAllMessage(newMap));
}
if (maxInactiveInterval >= 0) {
map.expire(getMaxInactiveInterval(), TimeUnit.SECONDS);
}
expireSession();
}
public void load(Map<String, Object> attrs) {

@ -37,6 +37,8 @@ import org.redisson.api.RTopic;
import org.redisson.api.RedissonClient;
import org.redisson.api.listener.MessageListener;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.StringCodec;
import org.redisson.codec.CompositeCodec;
import org.redisson.config.Config;
/**
@ -124,15 +126,14 @@ public class RedissonSessionManager extends ManagerBase {
session.setManager(this);
session.setId(sessionId);
session.save();
return session;
}
public RMap<String, Object> getMap(String sessionId) {
String separator = keyPrefix == null || keyPrefix.isEmpty() ? "" : ":";
final String name = keyPrefix + separator + "redisson:tomcat_session:" + sessionId;
return redisson.getMap(name);
String name = keyPrefix + separator + "redisson:tomcat_session:" + sessionId;
return redisson.getMap(name, new CompositeCodec(StringCodec.INSTANCE, redisson.getConfig().getCodec()));
}
public RTopic getTopic() {
@ -183,6 +184,12 @@ public class RedissonSessionManager extends ManagerBase {
return new RedissonSession(this, readMode, updateMode);
}
@Override
public void add(Session session) {
super.add(session);
((RedissonSession)session).save();
}
@Override
public void remove(Session session, boolean update) {
super.remove(session, update);

@ -51,7 +51,7 @@ public class RedissonSession extends StandardSession {
private final RedissonSessionManager redissonManager;
private final Map<String, Object> attrs;
private RMap<String, Object> map;
private RTopic topic;
private final RTopic topic;
private final RedissonSessionManager.ReadMode readMode;
private final UpdateMode updateMode;
@ -60,6 +60,7 @@ public class RedissonSession extends StandardSession {
this.redissonManager = manager;
this.readMode = readMode;
this.updateMode = updateMode;
this.topic = redissonManager.getTopic();
try {
Field attr = StandardSession.class.getDeclaredField("attributes");
@ -80,13 +81,6 @@ public class RedissonSession extends StandardSession {
return super.getAttribute(name);
}
@Override
public void setId(String id, boolean notify) {
super.setId(id, notify);
map = redissonManager.getMap(id);
topic = redissonManager.getTopic();
}
public void delete() {
map.delete();
if (readMode == ReadMode.MEMORY) {
@ -123,12 +117,16 @@ public class RedissonSession extends StandardSession {
if (readMode == ReadMode.MEMORY) {
topic.publish(createPutAllMessage(newMap));
}
if (getMaxInactiveInterval() >= 0) {
map.expire(getMaxInactiveInterval(), TimeUnit.SECONDS);
}
expireSession();
}
}
protected void expireSession() {
if (maxInactiveInterval >= 0) {
map.expire(maxInactiveInterval + 60, TimeUnit.SECONDS);
}
}
protected AttributesPutAllMessage createPutAllMessage(Map<String, Object> newMap) {
Map<String, Object> map = new HashMap<String, Object>();
for (Entry<String, Object> entry : newMap.entrySet()) {
@ -143,9 +141,7 @@ public class RedissonSession extends StandardSession {
if (map != null) {
fastPut(MAX_INACTIVE_INTERVAL_ATTR, maxInactiveInterval);
if (maxInactiveInterval >= 0) {
map.expire(getMaxInactiveInterval(), TimeUnit.SECONDS);
}
expireSession();
}
}
@ -183,7 +179,7 @@ public class RedissonSession extends StandardSession {
boolean oldValue = isNew;
super.endAccess();
if (isNew != oldValue) {
if (isNew != oldValue && map != null) {
fastPut(IS_NEW_ATTR, isNew);
}
}
@ -218,6 +214,10 @@ public class RedissonSession extends StandardSession {
}
public void save() {
if (map == null) {
map = redissonManager.getMap(id);
}
Map<String, Object> newMap = new HashMap<String, Object>();
newMap.put(CREATION_TIME_ATTR, creationTime);
newMap.put(LAST_ACCESSED_TIME_ATTR, lastAccessedTime);
@ -237,9 +237,7 @@ public class RedissonSession extends StandardSession {
topic.publish(createPutAllMessage(newMap));
}
if (maxInactiveInterval >= 0) {
map.expire(getMaxInactiveInterval(), TimeUnit.SECONDS);
}
expireSession();
}
public void load(Map<String, Object> attrs) {

@ -36,6 +36,8 @@ import org.redisson.api.RTopic;
import org.redisson.api.RedissonClient;
import org.redisson.api.listener.MessageListener;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.StringCodec;
import org.redisson.codec.CompositeCodec;
import org.redisson.config.Config;
/**
@ -123,15 +125,14 @@ public class RedissonSessionManager extends ManagerBase {
session.setManager(this);
session.setId(sessionId);
session.save();
return session;
}
public RMap<String, Object> getMap(String sessionId) {
String separator = keyPrefix == null || keyPrefix.isEmpty() ? "" : ":";
final String name = keyPrefix + separator + "redisson:tomcat_session:" + sessionId;
return redisson.getMap(name);
String name = keyPrefix + separator + "redisson:tomcat_session:" + sessionId;
return redisson.getMap(name, new CompositeCodec(StringCodec.INSTANCE, redisson.getConfig().getCodec()));
}
public RTopic getTopic() {
@ -193,6 +194,12 @@ public class RedissonSessionManager extends ManagerBase {
}
}
@Override
public void add(Session session) {
super.add(session);
((RedissonSession)session).save();
}
public RedissonClient getRedisson() {
return redisson;
}

@ -51,7 +51,7 @@ public class RedissonSession extends StandardSession {
private final RedissonSessionManager redissonManager;
private final Map<String, Object> attrs;
private RMap<String, Object> map;
private RTopic topic;
private final RTopic topic;
private final RedissonSessionManager.ReadMode readMode;
private final UpdateMode updateMode;
@ -60,6 +60,7 @@ public class RedissonSession extends StandardSession {
this.redissonManager = manager;
this.readMode = readMode;
this.updateMode = updateMode;
this.topic = redissonManager.getTopic();
try {
Field attr = StandardSession.class.getDeclaredField("attributes");
@ -80,13 +81,6 @@ public class RedissonSession extends StandardSession {
return super.getAttribute(name);
}
@Override
public void setId(String id, boolean notify) {
super.setId(id, notify);
map = redissonManager.getMap(id);
topic = redissonManager.getTopic();
}
public void delete() {
map.delete();
if (readMode == ReadMode.MEMORY) {
@ -123,12 +117,16 @@ public class RedissonSession extends StandardSession {
if (readMode == ReadMode.MEMORY) {
topic.publish(createPutAllMessage(newMap));
}
if (getMaxInactiveInterval() >= 0) {
map.expire(getMaxInactiveInterval(), TimeUnit.SECONDS);
}
expireSession();
}
}
protected void expireSession() {
if (maxInactiveInterval >= 0) {
map.expire(maxInactiveInterval + 60, TimeUnit.SECONDS);
}
}
protected AttributesPutAllMessage createPutAllMessage(Map<String, Object> newMap) {
Map<String, Object> map = new HashMap<String, Object>();
for (Entry<String, Object> entry : newMap.entrySet()) {
@ -143,9 +141,7 @@ public class RedissonSession extends StandardSession {
if (map != null) {
fastPut(MAX_INACTIVE_INTERVAL_ATTR, maxInactiveInterval);
if (maxInactiveInterval >= 0) {
map.expire(getMaxInactiveInterval(), TimeUnit.SECONDS);
}
expireSession();
}
}
@ -183,7 +179,7 @@ public class RedissonSession extends StandardSession {
boolean oldValue = isNew;
super.endAccess();
if (isNew != oldValue) {
if (isNew != oldValue && map != null) {
fastPut(IS_NEW_ATTR, isNew);
}
}
@ -218,6 +214,10 @@ public class RedissonSession extends StandardSession {
}
public void save() {
if (map == null) {
map = redissonManager.getMap(id);
}
Map<String, Object> newMap = new HashMap<String, Object>();
newMap.put(CREATION_TIME_ATTR, creationTime);
newMap.put(LAST_ACCESSED_TIME_ATTR, lastAccessedTime);
@ -236,10 +236,7 @@ public class RedissonSession extends StandardSession {
if (readMode == ReadMode.MEMORY) {
topic.publish(createPutAllMessage(newMap));
}
if (maxInactiveInterval >= 0) {
map.expire(getMaxInactiveInterval(), TimeUnit.SECONDS);
}
expireSession();
}
public void load(Map<String, Object> attrs) {

@ -36,6 +36,8 @@ import org.redisson.api.RTopic;
import org.redisson.api.RedissonClient;
import org.redisson.api.listener.MessageListener;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.StringCodec;
import org.redisson.codec.CompositeCodec;
import org.redisson.config.Config;
/**
@ -123,15 +125,14 @@ public class RedissonSessionManager extends ManagerBase {
session.setManager(this);
session.setId(sessionId);
session.save();
return session;
}
public RMap<String, Object> getMap(String sessionId) {
String separator = keyPrefix == null || keyPrefix.isEmpty() ? "" : ":";
final String name = keyPrefix + separator + "redisson:tomcat_session:" + sessionId;
return redisson.getMap(name);
String name = keyPrefix + separator + "redisson:tomcat_session:" + sessionId;
return redisson.getMap(name, new CompositeCodec(StringCodec.INSTANCE, redisson.getConfig().getCodec()));
}
public RTopic getTopic() {
@ -191,6 +192,12 @@ public class RedissonSessionManager extends ManagerBase {
}
}
@Override
public void add(Session session) {
super.add(session);
((RedissonSession)session).save();
}
public RedissonClient getRedisson() {
return redisson;
}

@ -150,7 +150,7 @@ public class RedissonBlockingQueue<V> extends RedissonQueue<V> implements RBlock
return commandExecutor.evalWriteAsync(getName(), codec, new RedisCommand<Object>("EVAL", new ListDrainToDecoder(c)),
"local vals = redis.call('lrange', KEYS[1], 0, -1); " +
"redis.call('ltrim', KEYS[1], -1, 0); " +
"redis.call('del', KEYS[1]); " +
"return vals", Collections.<Object>singletonList(getName()));
}

@ -712,7 +712,6 @@ public class CommandAsyncService implements CommandAsyncExecutor {
+ ", command: " + command + ", command params: " + LogHelper.toString(details.getParams())
+ " after " + details.getAttempt() + " retry attempts"));
}
connectionManager.getShutdownLatch().release();
} else {
if (details.getConnectionFuture().isSuccess()) {
if (details.getWriteFuture() == null || !details.getWriteFuture().isDone()) {
@ -781,6 +780,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
@Override
public void operationComplete(Future<RedisConnection> connFuture) throws Exception {
if (connFuture.isCancelled()) {
connectionManager.getShutdownLatch().release();
return;
}

@ -691,7 +691,6 @@ public class CommandBatchService extends CommandAsyncService {
details.setException(new RedisTimeoutException("Unable to get connection! "
+ "Node source: " + source + " after " + attempts + " retry attempts"));
}
connectionManager.getShutdownLatch().release();
} else {
if (connectionFuture.isSuccess()) {
if (details.getWriteFuture() == null || !details.getWriteFuture().isDone()) {
@ -859,6 +858,7 @@ public class CommandBatchService extends CommandAsyncService {
RFuture<RedisConnection> connFuture, final boolean noResult, final long responseTimeout, final int attempts,
ExecutionMode executionMode, final AtomicInteger slots) {
if (connFuture.isCancelled()) {
connectionManager.getShutdownLatch().release();
return;
}

@ -166,5 +166,10 @@ public class RedisClientEntry implements ClusterNode {
}
throw new IllegalStateException();
}
@Override
public String toString() {
return "RedisClientEntry [client=" + client + ", type=" + type + "]";
}
}

@ -30,6 +30,7 @@ import org.redisson.client.protocol.RedisCommands;
import org.redisson.config.BaseMasterSlaveServersConfig;
import org.redisson.config.Config;
import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.config.ReadMode;
import org.redisson.config.ReplicatedServersConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -94,8 +95,8 @@ public class ReplicatedConnectionManager extends MasterSlaveConnectionManager {
stopThreads();
throw new RedisConnectionException("Can't connect to servers!");
}
if (this.config.getSlaveAddresses().isEmpty()) {
log.warn("Slave nodes not found! Please specify all nodes in replicated mode.");
if (this.config.getReadMode() != ReadMode.MASTER && this.config.getSlaveAddresses().isEmpty()) {
log.warn("ReadMode = " + this.config.getReadMode() + ", but slave nodes are not found! Please specify all nodes in replicated mode.");
}
initSingleEntry();
@ -111,6 +112,10 @@ public class ReplicatedConnectionManager extends MasterSlaveConnectionManager {
}
private void scheduleMasterChangeCheck(final ReplicatedServersConfig cfg) {
if (isShuttingDown()) {
return;
}
monitorFuture = group.schedule(new Runnable() {
@Override
public void run() {

@ -41,6 +41,7 @@ import org.redisson.client.protocol.RedisCommands;
import org.redisson.config.BaseMasterSlaveServersConfig;
import org.redisson.config.Config;
import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.config.ReadMode;
import org.redisson.config.SentinelServersConfig;
import org.redisson.connection.ClientConnectionsEntry.FreezeReason;
import org.redisson.misc.CountableListener;
@ -162,6 +163,9 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
stopThreads();
throw new RedisConnectionException("Can't connect to servers!");
}
if (this.config.getReadMode() != ReadMode.MASTER && this.config.getSlaveAddresses().isEmpty()) {
log.warn("ReadMode = " + this.config.getReadMode() + ", but slave nodes are not found!");
}
initSingleEntry();

@ -31,6 +31,7 @@ import org.redisson.api.RTopic;
import org.redisson.api.RedissonClient;
import org.redisson.api.listener.PatternMessageListener;
import org.redisson.client.codec.StringCodec;
import org.redisson.codec.CompositeCodec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationEvent;
@ -60,7 +61,7 @@ public class RedissonSessionRepository implements FindByIndexNameSessionReposito
public RedissonSession() {
this.delegate = new MapSession();
map = redisson.getMap(keyPrefix + delegate.getId());
map = redisson.getMap(keyPrefix + delegate.getId(), new CompositeCodec(StringCodec.INSTANCE, redisson.getConfig().getCodec()));
principalName = resolvePrincipal(delegate);
Map<String, Object> newMap = new HashMap<String, Object>(3);
@ -84,7 +85,7 @@ public class RedissonSessionRepository implements FindByIndexNameSessionReposito
public RedissonSession(String sessionId) {
this.delegate = new MapSession(sessionId);
map = redisson.getMap(keyPrefix + sessionId);
map = redisson.getMap(keyPrefix + sessionId, new CompositeCodec(StringCodec.INSTANCE, redisson.getConfig().getCodec()));
principalName = resolvePrincipal(delegate);
}
@ -382,7 +383,7 @@ public class RedissonSessionRepository implements FindByIndexNameSessionReposito
private RSet<String> getPrincipalSet(String indexValue) {
String principalKey = getPrincipalKey(indexValue);
return redisson.getSet(principalKey);
return redisson.getSet(principalKey, StringCodec.INSTANCE);
}
}

@ -467,6 +467,18 @@ public class RedissonBlockingQueueTest extends RedissonQueueTest {
assertThat(queue2).containsExactly(3, 4, 5, 6);
}
@Test
public void testDrainToSingle() {
RBlockingQueue<Integer> queue = getQueue();
Assert.assertTrue(queue.add(1));
Assert.assertEquals(1, queue.size());
Set<Integer> batch = new HashSet<Integer>();
int count = queue.drainTo(batch);
Assert.assertEquals(1, count);
Assert.assertEquals(1, batch.size());
Assert.assertTrue(queue.isEmpty());
}
@Test
public void testDrainTo() {
RBlockingQueue<Integer> queue = getQueue();

Loading…
Cancel
Save