Merge branch 'master' into 3.0.0

pull/1821/head
Nikita 7 years ago
commit 204f482d43

@ -134,7 +134,7 @@
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-bom</artifactId>
<version>4.1.23.Final</version>
<version>4.1.24.Final</version>
<type>pom</type>
<scope>import</scope>
</dependency>

@ -0,0 +1,38 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.tomcat;
/**
*
* @author Nikita Koksharov
*
*/
public class AttributeMessage {
private String sessionId;
public AttributeMessage() {
}
public AttributeMessage(String sessionId) {
this.sessionId = sessionId;
}
public String getSessionId() {
return sessionId;
}
}

@ -0,0 +1,40 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.tomcat;
/**
*
* @author Nikita Koksharov
*
*/
public class AttributeRemoveMessage extends AttributeMessage {
private String name;
public AttributeRemoveMessage() {
super();
}
public AttributeRemoveMessage(String sessionId, String name) {
super(sessionId);
this.name = name;
}
public String getName() {
return name;
}
}

@ -0,0 +1,45 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.tomcat;
/**
*
* @author Nikita Koksharov
*
*/
public class AttributeUpdateMessage extends AttributeMessage {
private String name;
private Object value;
public AttributeUpdateMessage() {
}
public AttributeUpdateMessage(String sessionId, String name, Object value) {
super(sessionId);
this.name = name;
this.value = value;
}
public String getName() {
return name;
}
public Object getValue() {
return value;
}
}

@ -0,0 +1,32 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.tomcat;
/**
*
* @author Nikita Koksharov
*
*/
public class AttributesClearMessage extends AttributeMessage {
public AttributesClearMessage() {
}
public AttributesClearMessage(String sessionId) {
super(sessionId);
}
}

@ -0,0 +1,41 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.tomcat;
import java.util.Map;
/**
*
* @author Nikita Koksharov
*
*/
public class AttributesPutAllMessage extends AttributeMessage {
private Map<String, Object> attrs;
public AttributesPutAllMessage() {
}
public AttributesPutAllMessage(String sessionId, Map<String, Object> attrs) {
super(sessionId);
this.attrs = attrs;
}
public Map<String, Object> getAttrs() {
return attrs;
}
}

@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.catalina.session.StandardSession;
import org.redisson.api.RMap;
import org.redisson.api.RTopic;
import org.redisson.tomcat.RedissonSessionManager.ReadMode;
import org.redisson.tomcat.RedissonSessionManager.UpdateMode;
@ -37,6 +38,7 @@ public class RedissonSession extends StandardSession {
private final RedissonSessionManager redissonManager;
private final Map<String, Object> attrs;
private RMap<String, Object> map;
private RTopic<AttributeMessage> topic;
private final RedissonSessionManager.ReadMode readMode;
private final UpdateMode updateMode;
@ -69,6 +71,15 @@ public class RedissonSession extends StandardSession {
public void setId(String id, boolean notify) {
super.setId(id, notify);
map = redissonManager.getMap(id);
topic = redissonManager.getTopic();
}
public void delete() {
map.delete();
if (readMode == ReadMode.MEMORY) {
topic.publish(new AttributesClearMessage(getId()));
}
map = null;
}
@Override
@ -81,6 +92,9 @@ public class RedissonSession extends StandardSession {
newMap.put("session:lastAccessedTime", lastAccessedTime);
newMap.put("session:thisAccessedTime", thisAccessedTime);
map.putAll(newMap);
if (readMode == ReadMode.MEMORY) {
topic.publish(new AttributesPutAllMessage(getId(), newMap));
}
}
}
@ -93,6 +107,9 @@ public class RedissonSession extends StandardSession {
newMap.put("session:lastAccessedTime", lastAccessedTime);
newMap.put("session:thisAccessedTime", thisAccessedTime);
map.putAll(newMap);
if (readMode == ReadMode.MEMORY) {
topic.publish(new AttributesPutAllMessage(getId(), newMap));
}
if (getMaxInactiveInterval() >= 0) {
map.expire(getMaxInactiveInterval(), TimeUnit.SECONDS);
}
@ -104,13 +121,20 @@ public class RedissonSession extends StandardSession {
super.setMaxInactiveInterval(interval);
if (map != null) {
map.fastPut("session:maxInactiveInterval", maxInactiveInterval);
fastPut("session:maxInactiveInterval", maxInactiveInterval);
if (maxInactiveInterval >= 0) {
map.expire(getMaxInactiveInterval(), TimeUnit.SECONDS);
}
}
}
private void fastPut(String name, Object value) {
map.fastPut(name, value);
if (readMode == ReadMode.MEMORY) {
topic.publish(new AttributeUpdateMessage(getId(), name, value));
}
}
@Override
public void setValid(boolean isValid) {
super.setValid(isValid);
@ -120,7 +144,7 @@ public class RedissonSession extends StandardSession {
return;
}
map.fastPut("session:isValid", isValid);
fastPut("session:isValid", isValid);
}
}
@ -129,7 +153,7 @@ public class RedissonSession extends StandardSession {
super.setNew(isNew);
if (map != null) {
map.fastPut("session:isNew", isNew);
fastPut("session:isNew", isNew);
}
}
@ -139,25 +163,36 @@ public class RedissonSession extends StandardSession {
super.endAccess();
if (isNew != oldValue) {
map.fastPut("session:isNew", isNew);
fastPut("session:isNew", isNew);
}
}
public void superSetAttribute(String name, Object value, boolean notify) {
super.setAttribute(name, value, notify);
}
@Override
public void setAttribute(String name, Object value, boolean notify) {
super.setAttribute(name, value, notify);
if (updateMode == UpdateMode.DEFAULT && map != null && value != null) {
map.fastPut(name, value);
fastPut(name, value);
}
}
public void superRemoveAttributeInternal(String name, boolean notify) {
super.removeAttributeInternal(name, notify);
}
@Override
protected void removeAttributeInternal(String name, boolean notify) {
super.removeAttributeInternal(name, notify);
if (updateMode == UpdateMode.DEFAULT && map != null) {
map.fastRemove(name);
if (readMode == ReadMode.MEMORY) {
topic.publish(new AttributeRemoveMessage(getId(), name));
}
}
}
@ -177,6 +212,9 @@ public class RedissonSession extends StandardSession {
}
map.putAll(newMap);
if (readMode == ReadMode.MEMORY) {
topic.publish(new AttributesPutAllMessage(getId(), newMap));
}
if (maxInactiveInterval >= 0) {
map.expire(getMaxInactiveInterval(), TimeUnit.SECONDS);
@ -210,7 +248,7 @@ public class RedissonSession extends StandardSession {
}
for (Entry<String, Object> entry : attrs.entrySet()) {
setAttribute(entry.getKey(), entry.getValue(), false);
super.setAttribute(entry.getKey(), entry.getValue(), false);
}
}

@ -18,6 +18,7 @@ package org.redisson.tomcat;
import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.Map.Entry;
import javax.servlet.http.HttpSession;
@ -32,7 +33,10 @@ import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;
import org.redisson.Redisson;
import org.redisson.api.RMap;
import org.redisson.api.RTopic;
import org.redisson.api.RedissonClient;
import org.redisson.api.listener.MessageListener;
import org.redisson.client.codec.Codec;
import org.redisson.config.Config;
/**
@ -141,7 +145,12 @@ public class RedissonSessionManager extends ManagerBase implements Lifecycle {
public RMap<String, Object> getMap(String sessionId) {
String separator = keyPrefix == null || keyPrefix.isEmpty() ? "" : ":";
return redisson.getMap(keyPrefix + separator + "redisson_tomcat_session:" + sessionId);
final String name = keyPrefix + separator + "redisson:tomcat_session:" + sessionId;
return redisson.getMap(name);
}
public RTopic<AttributeMessage> getTopic() {
return redisson.getTopic("redisson:tomcat_session_updates");
}
@Override
@ -149,6 +158,7 @@ public class RedissonSessionManager extends ManagerBase implements Lifecycle {
Session result = super.findSession(id);
if (result == null && id != null) {
Map<String, Object> attrs = getMap(id).readAllMap();
if (attrs.isEmpty() || !Boolean.valueOf(String.valueOf(attrs.get("session:isValid")))) {
log.info("Session " + id + " can't be found");
return null;
@ -178,7 +188,9 @@ public class RedissonSessionManager extends ManagerBase implements Lifecycle {
public void remove(Session session) {
super.remove(session);
getMap(session.getId()).delete();
if (session.getIdInternal() != null) {
((RedissonSession)session).delete();
}
}
public RedissonClient getRedisson() {
@ -193,6 +205,43 @@ public class RedissonSessionManager extends ManagerBase implements Lifecycle {
getEngine().getPipeline().addValve(new UpdateValve(this));
}
if (readMode == ReadMode.MEMORY) {
RTopic<AttributeMessage> updatesTopic = getTopic();
updatesTopic.addListener(new MessageListener<AttributeMessage>() {
@Override
public void onMessage(String channel, AttributeMessage msg) {
try {
// TODO make it thread-safe
RedissonSession session = (RedissonSession) RedissonSessionManager.super.findSession(msg.getSessionId());
if (session != null) {
if (msg instanceof AttributeRemoveMessage) {
session.superRemoveAttributeInternal(((AttributeRemoveMessage)msg).getName(), true);
}
if (msg instanceof AttributesClearMessage) {
RedissonSessionManager.super.remove(session);
}
if (msg instanceof AttributesPutAllMessage) {
AttributesPutAllMessage m = (AttributesPutAllMessage) msg;
for (Entry<String, Object> entry : m.getAttrs().entrySet()) {
session.superSetAttribute(entry.getKey(), entry.getValue(), true);
}
}
if (msg instanceof AttributeUpdateMessage) {
AttributeUpdateMessage m = (AttributeUpdateMessage)msg;
session.superSetAttribute(m.getName(), m.getValue(), true);
}
}
} catch (IOException e) {
log.error("Can't handle topic message", e);
}
}
});
}
lifecycle.fireLifecycleEvent(START_EVENT, null);
}

@ -18,6 +18,30 @@ import org.redisson.config.Config;
public class RedissonSessionManagerTest {
@Test
public void testUpdateTwoServers() throws Exception {
TomcatServer server1 = new TomcatServer("myapp", 8080, "src/test/");
server1.start();
Executor executor = Executor.newInstance();
BasicCookieStore cookieStore = new BasicCookieStore();
executor.use(cookieStore);
write(executor, "test", "1234");
TomcatServer server2 = new TomcatServer("myapp", 8081, "src/test/");
server2.start();
read(8081, executor, "test", "1234");
read(executor, "test", "1234");
write(executor, "test", "324");
read(8081, executor, "test", "324");
Executor.closeIdleConnections();
server1.stop();
server2.stop();
}
@Test
public void testExpiration() throws Exception {
TomcatServer server1 = new TomcatServer("myapp", 8080, "/src/test/");

@ -0,0 +1,38 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.tomcat;
/**
*
* @author Nikita Koksharov
*
*/
public class AttributeMessage {
private String sessionId;
public AttributeMessage() {
}
public AttributeMessage(String sessionId) {
this.sessionId = sessionId;
}
public String getSessionId() {
return sessionId;
}
}

@ -0,0 +1,40 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.tomcat;
/**
*
* @author Nikita Koksharov
*
*/
public class AttributeRemoveMessage extends AttributeMessage {
private String name;
public AttributeRemoveMessage() {
super();
}
public AttributeRemoveMessage(String sessionId, String name) {
super(sessionId);
this.name = name;
}
public String getName() {
return name;
}
}

@ -0,0 +1,45 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.tomcat;
/**
*
* @author Nikita Koksharov
*
*/
public class AttributeUpdateMessage extends AttributeMessage {
private String name;
private Object value;
public AttributeUpdateMessage() {
}
public AttributeUpdateMessage(String sessionId, String name, Object value) {
super(sessionId);
this.name = name;
this.value = value;
}
public String getName() {
return name;
}
public Object getValue() {
return value;
}
}

@ -0,0 +1,32 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.tomcat;
/**
*
* @author Nikita Koksharov
*
*/
public class AttributesClearMessage extends AttributeMessage {
public AttributesClearMessage() {
}
public AttributesClearMessage(String sessionId) {
super(sessionId);
}
}

@ -0,0 +1,41 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.tomcat;
import java.util.Map;
/**
*
* @author Nikita Koksharov
*
*/
public class AttributesPutAllMessage extends AttributeMessage {
private Map<String, Object> attrs;
public AttributesPutAllMessage() {
}
public AttributesPutAllMessage(String sessionId, Map<String, Object> attrs) {
super(sessionId);
this.attrs = attrs;
}
public Map<String, Object> getAttrs() {
return attrs;
}
}

@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.catalina.session.StandardSession;
import org.redisson.api.RMap;
import org.redisson.api.RTopic;
import org.redisson.tomcat.RedissonSessionManager.ReadMode;
import org.redisson.tomcat.RedissonSessionManager.UpdateMode;
@ -37,6 +38,7 @@ public class RedissonSession extends StandardSession {
private final RedissonSessionManager redissonManager;
private final Map<String, Object> attrs;
private RMap<String, Object> map;
private RTopic<AttributeMessage> topic;
private final RedissonSessionManager.ReadMode readMode;
private final UpdateMode updateMode;
@ -69,10 +71,14 @@ public class RedissonSession extends StandardSession {
public void setId(String id, boolean notify) {
super.setId(id, notify);
map = redissonManager.getMap(id);
topic = redissonManager.getTopic();
}
public void delete() {
map.delete();
if (readMode == ReadMode.MEMORY) {
topic.publish(new AttributesClearMessage(getId()));
}
map = null;
}
@ -86,6 +92,9 @@ public class RedissonSession extends StandardSession {
newMap.put("session:lastAccessedTime", lastAccessedTime);
newMap.put("session:thisAccessedTime", thisAccessedTime);
map.putAll(newMap);
if (readMode == ReadMode.MEMORY) {
topic.publish(new AttributesPutAllMessage(getId(), newMap));
}
}
}
@ -98,6 +107,9 @@ public class RedissonSession extends StandardSession {
newMap.put("session:lastAccessedTime", lastAccessedTime);
newMap.put("session:thisAccessedTime", thisAccessedTime);
map.putAll(newMap);
if (readMode == ReadMode.MEMORY) {
topic.publish(new AttributesPutAllMessage(getId(), newMap));
}
if (getMaxInactiveInterval() >= 0) {
map.expire(getMaxInactiveInterval(), TimeUnit.SECONDS);
}
@ -109,12 +121,19 @@ public class RedissonSession extends StandardSession {
super.setMaxInactiveInterval(interval);
if (map != null) {
map.fastPut("session:maxInactiveInterval", maxInactiveInterval);
fastPut("session:maxInactiveInterval", maxInactiveInterval);
if (maxInactiveInterval >= 0) {
map.expire(getMaxInactiveInterval(), TimeUnit.SECONDS);
}
}
}
private void fastPut(String name, Object value) {
map.fastPut(name, value);
if (readMode == ReadMode.MEMORY) {
topic.publish(new AttributeUpdateMessage(getId(), name, value));
}
}
@Override
public void setValid(boolean isValid) {
@ -124,8 +143,8 @@ public class RedissonSession extends StandardSession {
if (!isValid && !map.isExists()) {
return;
}
map.fastPut("session:isValid", isValid);
fastPut("session:isValid", isValid);
}
}
@ -134,7 +153,7 @@ public class RedissonSession extends StandardSession {
super.setNew(isNew);
if (map != null) {
map.fastPut("session:isNew", isNew);
fastPut("session:isNew", isNew);
}
}
@ -144,25 +163,36 @@ public class RedissonSession extends StandardSession {
super.endAccess();
if (isNew != oldValue) {
map.fastPut("session:isNew", isNew);
fastPut("session:isNew", isNew);
}
}
public void superSetAttribute(String name, Object value, boolean notify) {
super.setAttribute(name, value, notify);
}
@Override
public void setAttribute(String name, Object value, boolean notify) {
super.setAttribute(name, value, notify);
if (updateMode == UpdateMode.DEFAULT && map != null && value != null) {
map.fastPut(name, value);
fastPut(name, value);
}
}
public void superRemoveAttributeInternal(String name, boolean notify) {
super.removeAttributeInternal(name, notify);
}
@Override
protected void removeAttributeInternal(String name, boolean notify) {
super.removeAttributeInternal(name, notify);
if (updateMode == UpdateMode.DEFAULT && map != null) {
map.fastRemove(name);
if (readMode == ReadMode.MEMORY) {
topic.publish(new AttributeRemoveMessage(getId(), name));
}
}
}
@ -182,6 +212,9 @@ public class RedissonSession extends StandardSession {
}
map.putAll(newMap);
if (readMode == ReadMode.MEMORY) {
topic.publish(new AttributesPutAllMessage(getId(), newMap));
}
if (maxInactiveInterval >= 0) {
map.expire(getMaxInactiveInterval(), TimeUnit.SECONDS);
@ -215,7 +248,7 @@ public class RedissonSession extends StandardSession {
}
for (Entry<String, Object> entry : attrs.entrySet()) {
setAttribute(entry.getKey(), entry.getValue(), false);
super.setAttribute(entry.getKey(), entry.getValue(), false);
}
}

@ -18,6 +18,7 @@ package org.redisson.tomcat;
import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.Map.Entry;
import javax.servlet.http.HttpSession;
@ -30,7 +31,9 @@ import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;
import org.redisson.Redisson;
import org.redisson.api.RMap;
import org.redisson.api.RTopic;
import org.redisson.api.RedissonClient;
import org.redisson.api.listener.MessageListener;
import org.redisson.client.codec.Codec;
import org.redisson.config.Config;
@ -121,7 +124,12 @@ public class RedissonSessionManager extends ManagerBase {
public RMap<String, Object> getMap(String sessionId) {
String separator = keyPrefix == null || keyPrefix.isEmpty() ? "" : ":";
return redisson.getMap(keyPrefix + separator + "redisson_tomcat_session:" + sessionId);
final String name = keyPrefix + separator + "redisson:tomcat_session:" + sessionId;
return redisson.getMap(name);
}
public RTopic<AttributeMessage> getTopic() {
return redisson.getTopic("redisson:tomcat_session_updates");
}
@Override
@ -129,6 +137,7 @@ public class RedissonSessionManager extends ManagerBase {
Session result = super.findSession(id);
if (result == null && id != null) {
Map<String, Object> attrs = getMap(id).readAllMap();
if (attrs.isEmpty() || !Boolean.valueOf(String.valueOf(attrs.get("session:isValid")))) {
log.info("Session " + id + " can't be found");
return null;
@ -142,7 +151,7 @@ public class RedissonSessionManager extends ManagerBase {
session.endAccess();
return session;
}
result.access();
result.endAccess();
@ -170,13 +179,49 @@ public class RedissonSessionManager extends ManagerBase {
@Override
protected void startInternal() throws LifecycleException {
super.startInternal();
redisson = buildClient();
if (updateMode == UpdateMode.AFTER_REQUEST) {
getEngine().getPipeline().addValve(new UpdateValve(this));
}
if (readMode == ReadMode.MEMORY) {
RTopic<AttributeMessage> updatesTopic = getTopic();
updatesTopic.addListener(new MessageListener<AttributeMessage>() {
@Override
public void onMessage(String channel, AttributeMessage msg) {
try {
// TODO make it thread-safe
RedissonSession session = (RedissonSession) RedissonSessionManager.super.findSession(msg.getSessionId());
if (session != null) {
if (msg instanceof AttributeRemoveMessage) {
session.superRemoveAttributeInternal(((AttributeRemoveMessage)msg).getName(), true);
}
if (msg instanceof AttributesClearMessage) {
RedissonSessionManager.super.remove(session, false);
}
if (msg instanceof AttributesPutAllMessage) {
AttributesPutAllMessage m = (AttributesPutAllMessage) msg;
for (Entry<String, Object> entry : m.getAttrs().entrySet()) {
session.superSetAttribute(entry.getKey(), entry.getValue(), true);
}
}
if (msg instanceof AttributeUpdateMessage) {
AttributeUpdateMessage m = (AttributeUpdateMessage)msg;
session.superSetAttribute(m.getName(), m.getValue(), true);
}
}
} catch (IOException e) {
log.error("Can't handle topic message", e);
}
}
});
}
setState(LifecycleState.STARTING);
}

@ -17,6 +17,31 @@ import org.redisson.config.Config;
public class RedissonSessionManagerTest {
@Test
public void testUpdateTwoServers() throws Exception {
TomcatServer server1 = new TomcatServer("myapp", 8080, "src/test/");
server1.start();
Executor executor = Executor.newInstance();
BasicCookieStore cookieStore = new BasicCookieStore();
executor.use(cookieStore);
write(executor, "test", "1234");
TomcatServer server2 = new TomcatServer("myapp", 8081, "src/test/");
server2.start();
read(8081, executor, "test", "1234");
read(executor, "test", "1234");
write(executor, "test", "324");
read(8081, executor, "test", "324");
Executor.closeIdleConnections();
server1.stop();
server2.stop();
}
@Test
public void testExpiration() throws Exception {
TomcatServer server1 = new TomcatServer("myapp", 8080, "src/test/");
@ -170,7 +195,7 @@ public class RedissonSessionManagerTest {
String response = executor.execute(Request.Get(url)).returnContent().asString();
Assert.assertEquals(value, response);
}
private void read(Executor executor, String key, String value) throws IOException, ClientProtocolException {
String url = "http://localhost:8080/myapp/read?key=" + key;
String response = executor.execute(Request.Get(url)).returnContent().asString();

@ -0,0 +1,38 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.tomcat;
/**
*
* @author Nikita Koksharov
*
*/
public class AttributeMessage {
private String sessionId;
public AttributeMessage() {
}
public AttributeMessage(String sessionId) {
this.sessionId = sessionId;
}
public String getSessionId() {
return sessionId;
}
}

@ -0,0 +1,40 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.tomcat;
/**
*
* @author Nikita Koksharov
*
*/
public class AttributeRemoveMessage extends AttributeMessage {
private String name;
public AttributeRemoveMessage() {
super();
}
public AttributeRemoveMessage(String sessionId, String name) {
super(sessionId);
this.name = name;
}
public String getName() {
return name;
}
}

@ -0,0 +1,45 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.tomcat;
/**
*
* @author Nikita Koksharov
*
*/
public class AttributeUpdateMessage extends AttributeMessage {
private String name;
private Object value;
public AttributeUpdateMessage() {
}
public AttributeUpdateMessage(String sessionId, String name, Object value) {
super(sessionId);
this.name = name;
this.value = value;
}
public String getName() {
return name;
}
public Object getValue() {
return value;
}
}

@ -0,0 +1,32 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.tomcat;
/**
*
* @author Nikita Koksharov
*
*/
public class AttributesClearMessage extends AttributeMessage {
public AttributesClearMessage() {
}
public AttributesClearMessage(String sessionId) {
super(sessionId);
}
}

@ -0,0 +1,41 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.tomcat;
import java.util.Map;
/**
*
* @author Nikita Koksharov
*
*/
public class AttributesPutAllMessage extends AttributeMessage {
private Map<String, Object> attrs;
public AttributesPutAllMessage() {
}
public AttributesPutAllMessage(String sessionId, Map<String, Object> attrs) {
super(sessionId);
this.attrs = attrs;
}
public Map<String, Object> getAttrs() {
return attrs;
}
}

@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.catalina.session.StandardSession;
import org.redisson.api.RMap;
import org.redisson.api.RTopic;
import org.redisson.tomcat.RedissonSessionManager.ReadMode;
import org.redisson.tomcat.RedissonSessionManager.UpdateMode;
@ -37,6 +38,7 @@ public class RedissonSession extends StandardSession {
private final RedissonSessionManager redissonManager;
private final Map<String, Object> attrs;
private RMap<String, Object> map;
private RTopic<AttributeMessage> topic;
private final RedissonSessionManager.ReadMode readMode;
private final UpdateMode updateMode;
@ -69,10 +71,14 @@ public class RedissonSession extends StandardSession {
public void setId(String id, boolean notify) {
super.setId(id, notify);
map = redissonManager.getMap(id);
topic = redissonManager.getTopic();
}
public void delete() {
map.delete();
if (readMode == ReadMode.MEMORY) {
topic.publish(new AttributesClearMessage(getId()));
}
map = null;
}
@ -86,6 +92,9 @@ public class RedissonSession extends StandardSession {
newMap.put("session:lastAccessedTime", lastAccessedTime);
newMap.put("session:thisAccessedTime", thisAccessedTime);
map.putAll(newMap);
if (readMode == ReadMode.MEMORY) {
topic.publish(new AttributesPutAllMessage(getId(), newMap));
}
}
}
@ -98,6 +107,9 @@ public class RedissonSession extends StandardSession {
newMap.put("session:lastAccessedTime", lastAccessedTime);
newMap.put("session:thisAccessedTime", thisAccessedTime);
map.putAll(newMap);
if (readMode == ReadMode.MEMORY) {
topic.publish(new AttributesPutAllMessage(getId(), newMap));
}
if (getMaxInactiveInterval() >= 0) {
map.expire(getMaxInactiveInterval(), TimeUnit.SECONDS);
}
@ -109,12 +121,19 @@ public class RedissonSession extends StandardSession {
super.setMaxInactiveInterval(interval);
if (map != null) {
map.fastPut("session:maxInactiveInterval", maxInactiveInterval);
fastPut("session:maxInactiveInterval", maxInactiveInterval);
if (maxInactiveInterval >= 0) {
map.expire(getMaxInactiveInterval(), TimeUnit.SECONDS);
}
}
}
private void fastPut(String name, Object value) {
map.fastPut(name, value);
if (readMode == ReadMode.MEMORY) {
topic.publish(new AttributeUpdateMessage(getId(), name, value));
}
}
@Override
public void setValid(boolean isValid) {
@ -125,7 +144,7 @@ public class RedissonSession extends StandardSession {
return;
}
map.fastPut("session:isValid", isValid);
fastPut("session:isValid", isValid);
}
}
@ -134,7 +153,7 @@ public class RedissonSession extends StandardSession {
super.setNew(isNew);
if (map != null) {
map.fastPut("session:isNew", isNew);
fastPut("session:isNew", isNew);
}
}
@ -144,25 +163,36 @@ public class RedissonSession extends StandardSession {
super.endAccess();
if (isNew != oldValue) {
map.fastPut("session:isNew", isNew);
fastPut("session:isNew", isNew);
}
}
public void superSetAttribute(String name, Object value, boolean notify) {
super.setAttribute(name, value, notify);
}
@Override
public void setAttribute(String name, Object value, boolean notify) {
super.setAttribute(name, value, notify);
if (updateMode == UpdateMode.DEFAULT && map != null && value != null) {
map.fastPut(name, value);
fastPut(name, value);
}
}
public void superRemoveAttributeInternal(String name, boolean notify) {
super.removeAttributeInternal(name, notify);
}
@Override
protected void removeAttributeInternal(String name, boolean notify) {
super.removeAttributeInternal(name, notify);
if (updateMode == UpdateMode.DEFAULT && map != null) {
map.fastRemove(name);
if (readMode == ReadMode.MEMORY) {
topic.publish(new AttributeRemoveMessage(getId(), name));
}
}
}
@ -182,6 +212,9 @@ public class RedissonSession extends StandardSession {
}
map.putAll(newMap);
if (readMode == ReadMode.MEMORY) {
topic.publish(new AttributesPutAllMessage(getId(), newMap));
}
if (maxInactiveInterval >= 0) {
map.expire(getMaxInactiveInterval(), TimeUnit.SECONDS);
@ -215,7 +248,7 @@ public class RedissonSession extends StandardSession {
}
for (Entry<String, Object> entry : attrs.entrySet()) {
setAttribute(entry.getKey(), entry.getValue(), false);
super.setAttribute(entry.getKey(), entry.getValue(), false);
}
}

@ -18,6 +18,7 @@ package org.redisson.tomcat;
import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.Map.Entry;
import javax.servlet.http.HttpSession;
@ -29,7 +30,9 @@ import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;
import org.redisson.Redisson;
import org.redisson.api.RMap;
import org.redisson.api.RTopic;
import org.redisson.api.RedissonClient;
import org.redisson.api.listener.MessageListener;
import org.redisson.client.codec.Codec;
import org.redisson.config.Config;
@ -120,7 +123,12 @@ public class RedissonSessionManager extends ManagerBase {
public RMap<String, Object> getMap(String sessionId) {
String separator = keyPrefix == null || keyPrefix.isEmpty() ? "" : ":";
return redisson.getMap(keyPrefix + separator + "redisson_tomcat_session:" + sessionId);
final String name = keyPrefix + separator + "redisson:tomcat_session:" + sessionId;
return redisson.getMap(name);
}
public RTopic<AttributeMessage> getTopic() {
return redisson.getTopic("redisson:tomcat_session_updates");
}
@Override
@ -176,6 +184,43 @@ public class RedissonSessionManager extends ManagerBase {
getEngine().getPipeline().addValve(new UpdateValve(this));
}
if (readMode == ReadMode.MEMORY) {
RTopic<AttributeMessage> updatesTopic = getTopic();
updatesTopic.addListener(new MessageListener<AttributeMessage>() {
@Override
public void onMessage(String channel, AttributeMessage msg) {
try {
// TODO make it thread-safe
RedissonSession session = (RedissonSession) RedissonSessionManager.super.findSession(msg.getSessionId());
if (session != null) {
if (msg instanceof AttributeRemoveMessage) {
session.superRemoveAttributeInternal(((AttributeRemoveMessage)msg).getName(), true);
}
if (msg instanceof AttributesClearMessage) {
RedissonSessionManager.super.remove(session, false);
}
if (msg instanceof AttributesPutAllMessage) {
AttributesPutAllMessage m = (AttributesPutAllMessage) msg;
for (Entry<String, Object> entry : m.getAttrs().entrySet()) {
session.superSetAttribute(entry.getKey(), entry.getValue(), true);
}
}
if (msg instanceof AttributeUpdateMessage) {
AttributeUpdateMessage m = (AttributeUpdateMessage)msg;
session.superSetAttribute(m.getName(), m.getValue(), true);
}
}
} catch (IOException e) {
log.error("Can't handle topic message", e);
}
}
});
}
setState(LifecycleState.STARTING);
}

@ -17,6 +17,31 @@ import org.redisson.config.Config;
public class RedissonSessionManagerTest {
@Test
public void testUpdateTwoServers() throws Exception {
TomcatServer server1 = new TomcatServer("myapp", 8080, "src/test/");
server1.start();
Executor executor = Executor.newInstance();
BasicCookieStore cookieStore = new BasicCookieStore();
executor.use(cookieStore);
write(executor, "test", "1234");
TomcatServer server2 = new TomcatServer("myapp", 8081, "src/test/");
server2.start();
read(8081, executor, "test", "1234");
read(executor, "test", "1234");
write(executor, "test", "324");
read(8081, executor, "test", "324");
Executor.closeIdleConnections();
server1.stop();
server2.stop();
}
@Test
public void testExpiration() throws Exception {
TomcatServer server1 = new TomcatServer("myapp", 8080, "src/test/");

@ -0,0 +1,38 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.tomcat;
/**
*
* @author Nikita Koksharov
*
*/
public class AttributeMessage {
private String sessionId;
public AttributeMessage() {
}
public AttributeMessage(String sessionId) {
this.sessionId = sessionId;
}
public String getSessionId() {
return sessionId;
}
}

@ -0,0 +1,40 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.tomcat;
/**
*
* @author Nikita Koksharov
*
*/
public class AttributeRemoveMessage extends AttributeMessage {
private String name;
public AttributeRemoveMessage() {
super();
}
public AttributeRemoveMessage(String sessionId, String name) {
super(sessionId);
this.name = name;
}
public String getName() {
return name;
}
}

@ -0,0 +1,45 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.tomcat;
/**
*
* @author Nikita Koksharov
*
*/
public class AttributeUpdateMessage extends AttributeMessage {
private String name;
private Object value;
public AttributeUpdateMessage() {
}
public AttributeUpdateMessage(String sessionId, String name, Object value) {
super(sessionId);
this.name = name;
this.value = value;
}
public String getName() {
return name;
}
public Object getValue() {
return value;
}
}

@ -0,0 +1,32 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.tomcat;
/**
*
* @author Nikita Koksharov
*
*/
public class AttributesClearMessage extends AttributeMessage {
public AttributesClearMessage() {
}
public AttributesClearMessage(String sessionId) {
super(sessionId);
}
}

@ -0,0 +1,41 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.tomcat;
import java.util.Map;
/**
*
* @author Nikita Koksharov
*
*/
public class AttributesPutAllMessage extends AttributeMessage {
private Map<String, Object> attrs;
public AttributesPutAllMessage() {
}
public AttributesPutAllMessage(String sessionId, Map<String, Object> attrs) {
super(sessionId);
this.attrs = attrs;
}
public Map<String, Object> getAttrs() {
return attrs;
}
}

@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.catalina.session.StandardSession;
import org.redisson.api.RMap;
import org.redisson.api.RTopic;
import org.redisson.tomcat.RedissonSessionManager.ReadMode;
import org.redisson.tomcat.RedissonSessionManager.UpdateMode;
@ -37,6 +38,7 @@ public class RedissonSession extends StandardSession {
private final RedissonSessionManager redissonManager;
private final Map<String, Object> attrs;
private RMap<String, Object> map;
private RTopic<AttributeMessage> topic;
private final RedissonSessionManager.ReadMode readMode;
private final UpdateMode updateMode;
@ -69,10 +71,14 @@ public class RedissonSession extends StandardSession {
public void setId(String id, boolean notify) {
super.setId(id, notify);
map = redissonManager.getMap(id);
topic = redissonManager.getTopic();
}
public void delete() {
map.delete();
if (readMode == ReadMode.MEMORY) {
topic.publish(new AttributesClearMessage(getId()));
}
map = null;
}
@ -86,6 +92,9 @@ public class RedissonSession extends StandardSession {
newMap.put("session:lastAccessedTime", lastAccessedTime);
newMap.put("session:thisAccessedTime", thisAccessedTime);
map.putAll(newMap);
if (readMode == ReadMode.MEMORY) {
topic.publish(new AttributesPutAllMessage(getId(), newMap));
}
}
}
@ -98,6 +107,9 @@ public class RedissonSession extends StandardSession {
newMap.put("session:lastAccessedTime", lastAccessedTime);
newMap.put("session:thisAccessedTime", thisAccessedTime);
map.putAll(newMap);
if (readMode == ReadMode.MEMORY) {
topic.publish(new AttributesPutAllMessage(getId(), newMap));
}
if (getMaxInactiveInterval() >= 0) {
map.expire(getMaxInactiveInterval(), TimeUnit.SECONDS);
}
@ -109,12 +121,19 @@ public class RedissonSession extends StandardSession {
super.setMaxInactiveInterval(interval);
if (map != null) {
map.fastPut("session:maxInactiveInterval", maxInactiveInterval);
fastPut("session:maxInactiveInterval", maxInactiveInterval);
if (maxInactiveInterval >= 0) {
map.expire(getMaxInactiveInterval(), TimeUnit.SECONDS);
}
}
}
private void fastPut(String name, Object value) {
map.fastPut(name, value);
if (readMode == ReadMode.MEMORY) {
topic.publish(new AttributeUpdateMessage(getId(), name, value));
}
}
@Override
public void setValid(boolean isValid) {
@ -125,7 +144,7 @@ public class RedissonSession extends StandardSession {
return;
}
map.fastPut("session:isValid", isValid);
fastPut("session:isValid", isValid);
}
}
@ -134,7 +153,7 @@ public class RedissonSession extends StandardSession {
super.setNew(isNew);
if (map != null) {
map.fastPut("session:isNew", isNew);
fastPut("session:isNew", isNew);
}
}
@ -144,25 +163,36 @@ public class RedissonSession extends StandardSession {
super.endAccess();
if (isNew != oldValue) {
map.fastPut("session:isNew", isNew);
fastPut("session:isNew", isNew);
}
}
public void superSetAttribute(String name, Object value, boolean notify) {
super.setAttribute(name, value, notify);
}
@Override
public void setAttribute(String name, Object value, boolean notify) {
super.setAttribute(name, value, notify);
if (updateMode == UpdateMode.DEFAULT && map != null && value != null) {
map.fastPut(name, value);
fastPut(name, value);
}
}
public void superRemoveAttributeInternal(String name, boolean notify) {
super.removeAttributeInternal(name, notify);
}
@Override
protected void removeAttributeInternal(String name, boolean notify) {
super.removeAttributeInternal(name, notify);
if (updateMode == UpdateMode.DEFAULT && map != null) {
map.fastRemove(name);
if (readMode == ReadMode.MEMORY) {
topic.publish(new AttributeRemoveMessage(getId(), name));
}
}
}
@ -182,6 +212,9 @@ public class RedissonSession extends StandardSession {
}
map.putAll(newMap);
if (readMode == ReadMode.MEMORY) {
topic.publish(new AttributesPutAllMessage(getId(), newMap));
}
if (maxInactiveInterval >= 0) {
map.expire(getMaxInactiveInterval(), TimeUnit.SECONDS);
@ -215,7 +248,7 @@ public class RedissonSession extends StandardSession {
}
for (Entry<String, Object> entry : attrs.entrySet()) {
setAttribute(entry.getKey(), entry.getValue(), false);
super.setAttribute(entry.getKey(), entry.getValue(), false);
}
}

@ -18,6 +18,7 @@ package org.redisson.tomcat;
import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.Map.Entry;
import javax.servlet.http.HttpSession;
@ -29,7 +30,9 @@ import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;
import org.redisson.Redisson;
import org.redisson.api.RMap;
import org.redisson.api.RTopic;
import org.redisson.api.RedissonClient;
import org.redisson.api.listener.MessageListener;
import org.redisson.client.codec.Codec;
import org.redisson.config.Config;
@ -120,7 +123,12 @@ public class RedissonSessionManager extends ManagerBase {
public RMap<String, Object> getMap(String sessionId) {
String separator = keyPrefix == null || keyPrefix.isEmpty() ? "" : ":";
return redisson.getMap(keyPrefix + separator + "redisson_tomcat_session:" + sessionId);
final String name = keyPrefix + separator + "redisson:tomcat_session:" + sessionId;
return redisson.getMap(name);
}
public RTopic<AttributeMessage> getTopic() {
return redisson.getTopic("redisson:tomcat_session_updates");
}
@Override
@ -142,7 +150,7 @@ public class RedissonSessionManager extends ManagerBase {
session.endAccess();
return session;
}
result.access();
result.endAccess();
@ -176,6 +184,43 @@ public class RedissonSessionManager extends ManagerBase {
getEngine().getPipeline().addValve(new UpdateValve(this));
}
if (readMode == ReadMode.MEMORY) {
RTopic<AttributeMessage> updatesTopic = getTopic();
updatesTopic.addListener(new MessageListener<AttributeMessage>() {
@Override
public void onMessage(String channel, AttributeMessage msg) {
try {
// TODO make it thread-safe
RedissonSession session = (RedissonSession) RedissonSessionManager.super.findSession(msg.getSessionId());
if (session != null) {
if (msg instanceof AttributeRemoveMessage) {
session.superRemoveAttributeInternal(((AttributeRemoveMessage)msg).getName(), true);
}
if (msg instanceof AttributesClearMessage) {
RedissonSessionManager.super.remove(session, false);
}
if (msg instanceof AttributesPutAllMessage) {
AttributesPutAllMessage m = (AttributesPutAllMessage) msg;
for (Entry<String, Object> entry : m.getAttrs().entrySet()) {
session.superSetAttribute(entry.getKey(), entry.getValue(), true);
}
}
if (msg instanceof AttributeUpdateMessage) {
AttributeUpdateMessage m = (AttributeUpdateMessage)msg;
session.superSetAttribute(m.getName(), m.getValue(), true);
}
}
} catch (IOException e) {
log.error("Can't handle topic message", e);
}
}
});
}
setState(LifecycleState.STARTING);
}

@ -17,6 +17,31 @@ import org.redisson.config.Config;
public class RedissonSessionManagerTest {
@Test
public void testUpdateTwoServers() throws Exception {
TomcatServer server1 = new TomcatServer("myapp", 8080, "src/test/");
server1.start();
Executor executor = Executor.newInstance();
BasicCookieStore cookieStore = new BasicCookieStore();
executor.use(cookieStore);
write(executor, "test", "1234");
TomcatServer server2 = new TomcatServer("myapp", 8081, "src/test/");
server2.start();
read(8081, executor, "test", "1234");
read(executor, "test", "1234");
write(executor, "test", "324");
read(8081, executor, "test", "324");
Executor.closeIdleConnections();
server1.stop();
server2.stop();
}
@Test
public void testExpiration() throws Exception {
TomcatServer server1 = new TomcatServer("myapp", 8080, "src/test/");
@ -44,7 +69,7 @@ public class RedissonSessionManagerTest {
server1.stop();
server2.stop();
}
@Test
public void testSwitchServer() throws Exception {
// start the server at http://localhost:8080/myapp

@ -24,7 +24,8 @@ import java.util.List;
import org.redisson.api.RBitSet;
import org.redisson.api.RFuture;
import org.redisson.client.codec.ByteArrayCodec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.command.CommandBatchService;
@ -57,12 +58,12 @@ public class RedissonBitSet extends RedissonExpirable implements RBitSet {
@Override
public RFuture<Boolean> getAsync(long bitIndex) {
return commandExecutor.readAsync(getName(), codec, RedisCommands.GETBIT, getName(), bitIndex);
return commandExecutor.readAsync(getName(), LongCodec.INSTANCE, RedisCommands.GETBIT, getName(), bitIndex);
}
@Override
public void set(long bitIndex) {
get(setAsync(bitIndex, true));
public boolean set(long bitIndex) {
return get(setAsync(bitIndex, true));
}
@Override
@ -82,11 +83,7 @@ public class RedissonBitSet extends RedissonExpirable implements RBitSet {
@Override
public RFuture<Boolean> setAsync(long bitIndex, boolean value) {
RedisCommand<Boolean> command = RedisCommands.SETBIT_TRUE;
if (!value) {
command = RedisCommands.SETBIT_FALSE;
}
return commandExecutor.writeAsync(getName(), codec, command, getName(), bitIndex, value ? 1 : 0);
return commandExecutor.writeAsync(getName(), LongCodec.INSTANCE, RedisCommands.SETBIT, getName(), bitIndex, value ? 1 : 0);
}
@Override
@ -204,7 +201,7 @@ public class RedissonBitSet extends RedissonExpirable implements RBitSet {
public RFuture<Void> setAsync(long fromIndex, long toIndex, boolean value) {
CommandBatchService executorService = new CommandBatchService(commandExecutor.getConnectionManager());
for (long i = fromIndex; i < toIndex; i++) {
executorService.writeAsync(getName(), codec, RedisCommands.SETBIT_VOID, getName(), i, value ? 1 : 0);
executorService.writeAsync(getName(), LongCodec.INSTANCE, RedisCommands.SETBIT_VOID, getName(), i, value ? 1 : 0);
}
return executorService.executeAsyncVoid();
}
@ -231,7 +228,7 @@ public class RedissonBitSet extends RedissonExpirable implements RBitSet {
@Override
public RFuture<Long> sizeAsync() {
return commandExecutor.readAsync(getName(), codec, RedisCommands.BITS_SIZE, getName());
return commandExecutor.readAsync(getName(), LongCodec.INSTANCE, RedisCommands.BITS_SIZE, getName());
}
@Override
@ -241,7 +238,7 @@ public class RedissonBitSet extends RedissonExpirable implements RBitSet {
@Override
public RFuture<Long> cardinalityAsync() {
return commandExecutor.readAsync(getName(), codec, RedisCommands.BITCOUNT, getName());
return commandExecutor.readAsync(getName(), LongCodec.INSTANCE, RedisCommands.BITCOUNT, getName());
}
@Override

@ -52,8 +52,16 @@ public interface RBitSet extends RExpirable, RBitSetAsync {
*/
void clear(long fromIndex, long toIndex);
/**
* Copy bits state of source BitSet object to this object
*
* @param bs - BitSet source
*/
void set(BitSet bs);
/**
* Executes NOT operation over all bits
*/
void not();
/**
@ -84,9 +92,10 @@ public interface RBitSet extends RExpirable, RBitSetAsync {
* Set bit to one at specified bitIndex
*
* @param bitIndex - index of bit
*
* @return <code>true</code> - if previous value was true,
* <code>false</code> - if previous value was false
*/
void set(long bitIndex);
boolean set(long bitIndex);
/**
* Set bit to <code>value</code> at specified <code>bitIndex</code>
@ -122,10 +131,28 @@ public interface RBitSet extends RExpirable, RBitSetAsync {
BitSet asBitSet();
/**
* Executes OR operation over this object and specified bitsets.
* Stores result into this object.
*
* @param bitSetNames - name of stored bitsets
*/
void or(String... bitSetNames);
/**
* Executes AND operation over this object and specified bitsets.
* Stores result into this object.
*
* @param bitSetNames - name of stored bitsets
*/
void and(String... bitSetNames);
/**
* Executes XOR operation over this object and specified bitsets.
* Stores result into this object.
*
* @param bitSetNames - name of stored bitsets
*/
void xor(String... bitSetNames);
}

@ -57,8 +57,19 @@ public interface RBitSetAsync extends RExpirableAsync {
*/
RFuture<Void> clearAsync(long fromIndex, long toIndex);
/**
* Copy bits state of source BitSet object to this object
*
* @param bs - BitSet source
* @return void
*/
RFuture<Void> setAsync(BitSet bs);
/**
* Executes NOT operation over all bits
*
* @return void
*/
RFuture<Void> notAsync();
/**
@ -67,7 +78,6 @@ public interface RBitSetAsync extends RExpirableAsync {
* @param fromIndex inclusive
* @param toIndex exclusive
* @return void
*
*/
RFuture<Void> setAsync(long fromIndex, long toIndex);
@ -90,7 +100,8 @@ public interface RBitSetAsync extends RExpirableAsync {
* Set bit to one at specified bitIndex
*
* @param bitIndex - index of bit
* @return void
* @return <code>true</code> - if previous value was true,
* <code>false</code> - if previous value was false
*/
RFuture<Boolean> setAsync(long bitIndex);
@ -99,9 +110,9 @@ public interface RBitSetAsync extends RExpirableAsync {
*
* @param bitIndex - index of bit
* @param value true = 1, false = 0
* @return previous value
*
*/
* @return <code>true</code> - if previous value was true,
* <code>false</code> - if previous value was false
*/
RFuture<Boolean> setAsync(long bitIndex, boolean value);
/**
@ -127,10 +138,31 @@ public interface RBitSetAsync extends RExpirableAsync {
*/
RFuture<Void> clearAsync();
/**
* Executes OR operation over this object and specified bitsets.
* Stores result into this object.
*
* @param bitSetNames - name of stored bitsets
* @return void
*/
RFuture<Void> orAsync(String... bitSetNames);
/**
* Executes AND operation over this object and specified bitsets.
* Stores result into this object.
*
* @param bitSetNames - name of stored bitsets
* @return void
*/
RFuture<Void> andAsync(String... bitSetNames);
/**
* Executes XOR operation over this object and specified bitsets.
* Stores result into this object.
*
* @param bitSetNames - name of stored bitsets
* @return void
*/
RFuture<Void> xorAsync(String... bitSetNames);
}

@ -46,7 +46,7 @@ public class TransactionOptions {
/**
* Defines timeout for Redis response.
* Starts to countdown when Redis command has been successfully sent.
* Starts to countdown when transaction has been successfully sent.
* <p>
* Default is <code>3000 milliseconds</code>
*
@ -64,7 +64,7 @@ public class TransactionOptions {
}
/**
* Defines attempts amount to send Redis commands batch
* Defines attempts amount to send transaction
* if it hasn't been sent already.
* <p>
* Default is <code>3 attempts</code>
@ -82,7 +82,7 @@ public class TransactionOptions {
}
/**
* Defines time interval for each attempt to send Redis commands batch
* Defines time interval for each attempt to send transaction
* if it hasn't been sent already.
* <p>
* Default is <code>1500 milliseconds</code>
@ -99,7 +99,7 @@ public class TransactionOptions {
/**
* Synchronization data timeout between Redis master participating in transaction and its slaves.
* <p>
* Default syncSlaves is <code>5000 milliseconds</code>
* Default is <code>5000 milliseconds</code>
*
* @param syncTimeout - synchronization timeout
* @param syncUnit - synchronization timeout time unit
@ -117,7 +117,7 @@ public class TransactionOptions {
return timeout;
}
/**
* If transaction hasn't committed within <code>timeout</code> it will rollback automatically.
* If transaction hasn't been committed within <code>timeout</code> it will rollback automatically.
* <p>
* Default is <code>5000 milliseconds</code>
*

@ -92,6 +92,7 @@ public interface RedisCommands {
RedisStrictCommand<Long> BITCOUNT = new RedisStrictCommand<Long>("BITCOUNT");
RedisStrictCommand<Integer> BITPOS = new RedisStrictCommand<Integer>("BITPOS", new IntegerReplayConvertor());
RedisStrictCommand<Void> SETBIT_VOID = new RedisStrictCommand<Void>("SETBIT", new VoidReplayConvertor());
RedisStrictCommand<Boolean> SETBIT = new RedisStrictCommand<Boolean>("SETBIT", new BooleanReplayConvertor());
RedisStrictCommand<Boolean> SETBIT_TRUE = new RedisStrictCommand<Boolean>("SETBIT", new BitSetReplayConvertor(0));
RedisStrictCommand<Boolean> SETBIT_FALSE = new RedisStrictCommand<Boolean>("SETBIT", new BitSetReplayConvertor(1));
RedisStrictCommand<Void> BITOP = new RedisStrictCommand<Void>("BITOP", new VoidReplayConvertor());

@ -800,7 +800,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
}
});
}
private <R, V> void checkAttemptFuture(final NodeSource source, final AsyncDetails<V, R> details,
Future<R> future, final boolean ignoreRedirect) {
details.getTimeout().cancel();
@ -808,62 +808,72 @@ public class CommandAsyncService implements CommandAsyncExecutor {
return;
}
details.removeMainPromiseListener();
if (future.cause() instanceof RedisMovedException && !ignoreRedirect) {
RedisMovedException ex = (RedisMovedException) future.cause();
async(details.isReadOnlyMode(), new NodeSource(ex.getSlot(), ex.getUrl(), Redirect.MOVED), details.getCodec(),
details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt(), ignoreRedirect);
AsyncDetails.release(details);
return;
}
if (future.cause() instanceof RedisAskException && !ignoreRedirect) {
RedisAskException ex = (RedisAskException) future.cause();
async(details.isReadOnlyMode(), new NodeSource(ex.getSlot(), ex.getUrl(), Redirect.ASK), details.getCodec(),
details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt(), ignoreRedirect);
AsyncDetails.release(details);
return;
}
if (future.cause() instanceof RedisLoadingException) {
async(details.isReadOnlyMode(), source, details.getCodec(),
details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt(), ignoreRedirect);
AsyncDetails.release(details);
return;
}
if (future.cause() instanceof RedisTryAgainException) {
connectionManager.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
async(details.isReadOnlyMode(), source, details.getCodec(),
details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt(), ignoreRedirect);
try {
details.removeMainPromiseListener();
if (future.cause() instanceof RedisMovedException && !ignoreRedirect) {
RedisMovedException ex = (RedisMovedException) future.cause();
if (source.getRedirect() == Redirect.MOVED) {
details.getMainPromise().tryFailure(new RedisException("MOVED redirection loop detected. Node " + source.getAddr() + " has further redirect to " + ex.getUrl()));
return;
}
}, 1, TimeUnit.SECONDS);
AsyncDetails.release(details);
return;
}
free(details);
if (future.isSuccess()) {
R res = future.getNow();
if (res instanceof RedisClientResult) {
((RedisClientResult) res).setRedisClient(details.getConnectionFuture().getNow().getRedisClient());
async(details.isReadOnlyMode(), new NodeSource(ex.getSlot(), ex.getUrl(), Redirect.MOVED), details.getCodec(),
details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt(), ignoreRedirect);
AsyncDetails.release(details);
return;
}
if (isRedissonReferenceSupportEnabled()) {
handleReference(details.getMainPromise(), res);
if (future.cause() instanceof RedisAskException && !ignoreRedirect) {
RedisAskException ex = (RedisAskException) future.cause();
async(details.isReadOnlyMode(), new NodeSource(ex.getSlot(), ex.getUrl(), Redirect.ASK), details.getCodec(),
details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt(), ignoreRedirect);
AsyncDetails.release(details);
return;
}
if (future.cause() instanceof RedisLoadingException) {
async(details.isReadOnlyMode(), source, details.getCodec(),
details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt(), ignoreRedirect);
AsyncDetails.release(details);
return;
}
if (future.cause() instanceof RedisTryAgainException) {
connectionManager.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
async(details.isReadOnlyMode(), source, details.getCodec(),
details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt(), ignoreRedirect);
}
}, 1, TimeUnit.SECONDS);
AsyncDetails.release(details);
return;
}
free(details);
if (future.isSuccess()) {
R res = future.getNow();
if (res instanceof RedisClientResult) {
((RedisClientResult) res).setRedisClient(details.getConnectionFuture().getNow().getRedisClient());
}
if (isRedissonReferenceSupportEnabled()) {
handleReference(details.getMainPromise(), res);
} else {
details.getMainPromise().trySuccess(res);
}
} else {
details.getMainPromise().trySuccess(res);
details.getMainPromise().tryFailure(future.cause());
}
} else {
details.getMainPromise().tryFailure(future.cause());
AsyncDetails.release(details);
} catch (RuntimeException e) {
details.getMainPromise().tryFailure(e);
throw e;
}
AsyncDetails.release(details);
}
private <R, V> void handleReference(RPromise<R> mainPromise, R res) {

@ -547,4 +547,9 @@ public class MasterSlaveEntry {
return slots;
}
@Override
public String toString() {
return "MasterSlaveEntry [masterEntry=" + masterEntry + "]";
}
}

@ -65,8 +65,9 @@ public class RedissonBitSetTest extends BaseTest {
@Test
public void testSet() {
RBitSet bs = redisson.getBitSet("testbitset");
bs.set(3);
bs.set(5);
assertThat(bs.set(3)).isFalse();
assertThat(bs.set(5)).isFalse();
assertThat(bs.set(5)).isTrue();
assertThat(bs.toString()).isEqualTo("{3, 5}");
BitSet bs1 = new BitSet();

@ -19,6 +19,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.junit.After;
import org.junit.AfterClass;
@ -27,6 +28,7 @@ import org.junit.Assume;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.redisson.ClusterRunner.ClusterProcesses;
import org.redisson.RedisRunner.RedisProcess;
import org.redisson.api.ClusterNode;
import org.redisson.api.Node;
@ -36,15 +38,22 @@ import org.redisson.api.RFuture;
import org.redisson.api.RMap;
import org.redisson.api.RedissonClient;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisClientConfig;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisConnectionException;
import org.redisson.client.RedisOutOfMemoryException;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.cluster.ClusterNodeInfo;
import org.redisson.cluster.ClusterNodeInfo.Flag;
import org.redisson.codec.JsonJacksonCodec;
import org.redisson.codec.SerializationCodec;
import org.redisson.config.Config;
import org.redisson.connection.CRC16;
import org.redisson.connection.ConnectionListener;
import org.redisson.connection.MasterSlaveConnectionManager;
import org.redisson.connection.balancer.RandomLoadBalancer;
import org.redisson.misc.HashValue;
@ -630,6 +639,92 @@ public class RedissonTest {
Assert.assertTrue(nodes.pingAll());
}
@Test
public void testMovedRedirectInCluster() throws Exception {
RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner master2 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner master3 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slot1 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slot2 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slot3 = new RedisRunner().randomPort().randomDir().nosave();
ClusterRunner clusterRunner = new ClusterRunner()
.addNode(master1, slot1)
.addNode(master2, slot2)
.addNode(master3, slot3);
ClusterProcesses process = clusterRunner.run();
Config config = new Config();
config.useClusterServers()
.setScanInterval(100000)
.setLoadBalancer(new RandomLoadBalancer())
.addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort());
RedissonClient redisson = Redisson.create(config);
RedisClientConfig cfg = new RedisClientConfig();
cfg.setAddress(process.getNodes().iterator().next().getRedisServerAddressAndPort());
RedisClient c = RedisClient.create(cfg);
RedisConnection cc = c.connect();
List<ClusterNodeInfo> cn = cc.sync(RedisCommands.CLUSTER_NODES);
cn = cn.stream().filter(i -> i.containsFlag(Flag.MASTER)).collect(Collectors.toList());
Iterator<ClusterNodeInfo> nodesIter = cn.iterator();
ClusterNodeInfo source = nodesIter.next();
ClusterNodeInfo destination = nodesIter.next();
RedisClientConfig sourceCfg = new RedisClientConfig();
sourceCfg.setAddress(source.getAddress());
RedisClient sourceClient = RedisClient.create(sourceCfg);
RedisConnection sourceConnection = sourceClient.connect();
RedisClientConfig destinationCfg = new RedisClientConfig();
destinationCfg.setAddress(destination.getAddress());
RedisClient destinationClient = RedisClient.create(destinationCfg);
RedisConnection destinationConnection = destinationClient.connect();
String key = null;
int slot = 0;
for (int i = 0; i < 100000; i++) {
key = "" + i;
slot = CRC16.crc16(key.getBytes()) % MasterSlaveConnectionManager.MAX_SLOT;
if (source.getSlotRanges().iterator().next().getStartSlot() == slot) {
break;
}
}
redisson.getBucket(key).set("123");
destinationConnection.sync(RedisCommands.CLUSTER_SETSLOT, source.getSlotRanges().iterator().next().getStartSlot(), "IMPORTING", source.getNodeId());
sourceConnection.sync(RedisCommands.CLUSTER_SETSLOT, source.getSlotRanges().iterator().next().getStartSlot(), "MIGRATING", destination.getNodeId());
List<String> keys = sourceConnection.sync(RedisCommands.CLUSTER_GETKEYSINSLOT, source.getSlotRanges().iterator().next().getStartSlot(), 100);
List<Object> params = new ArrayList<Object>();
params.add(destination.getAddress().getHost());
params.add(destination.getAddress().getPort());
params.add("");
params.add(0);
params.add(2000);
params.add("KEYS");
params.addAll(keys);
sourceConnection.async(RedisCommands.MIGRATE, params.toArray());
for (ClusterNodeInfo node : cn) {
RedisClientConfig cc1 = new RedisClientConfig();
cc1.setAddress(node.getAddress());
RedisClient ccc = RedisClient.create(cc1);
RedisConnection connection = ccc.connect();
connection.sync(RedisCommands.CLUSTER_SETSLOT, slot, "NODE", destination.getNodeId());
}
redisson.getBucket(key).set("123");
redisson.getBucket(key).get();
redisson.shutdown();
process.shutdown();
}
@Test(expected = RedisConnectionException.class)
public void testSingleConnectionFail() throws InterruptedException {

Loading…
Cancel
Save