Merge pull request #2179 from jchobantonov/master

UpdateValve should be installed only once, remove MessageListener from topic when SessionManager stopInternal is invoked #2178
pull/2190/head
Nikita Koksharov 6 years ago committed by GitHub
commit 4c76960ae4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -17,10 +17,13 @@ package org.redisson.tomcat;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import javax.servlet.http.HttpSession;
@ -71,8 +74,12 @@ public class RedissonSessionManager extends ManagerBase implements Lifecycle {
private final String nodeId = UUID.randomUUID().toString();
private UpdateValve updateValve;
private static UpdateValve updateValve;
private static Set<String> contextInUse = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
private MessageListener messageListener;
private Codec codecToUse;
public String getNodeId() { return nodeId; }
@ -269,18 +276,20 @@ public class RedissonSessionManager extends ManagerBase implements Lifecycle {
throw new LifecycleException(e);
}
if (updateMode == UpdateMode.AFTER_REQUEST) {
Pipeline pipeline = getEngine().getPipeline();
if (updateValve != null) { // in case startInternal is called without stopInternal cleaning the updateValve
pipeline.removeValve(updateValve);
Pipeline pipeline = getEngine().getPipeline();
synchronized (pipeline) {
contextInUse.add(((Context) getContainer()).getName());
if (updateMode == UpdateMode.AFTER_REQUEST) {
if (updateValve == null) {
updateValve = new UpdateValve();
pipeline.addValve(updateValve);
}
}
updateValve = new UpdateValve(this);
pipeline.addValve(updateValve);
}
if (readMode == ReadMode.MEMORY || broadcastSessionEvents) {
RTopic updatesTopic = getTopic();
updatesTopic.addListener(AttributeMessage.class, new MessageListener<AttributeMessage>() {
messageListener = new MessageListener<AttributeMessage>() {
@Override
public void onMessage(CharSequence channel, AttributeMessage msg) {
@ -338,7 +347,9 @@ public class RedissonSessionManager extends ManagerBase implements Lifecycle {
log.error("Unable to handle topic message", e);
}
}
});
};
updatesTopic.addListener(AttributeMessage.class, messageListener);
}
lifecycle.fireLifecycleEvent(START_EVENT, null);
@ -367,9 +378,21 @@ public class RedissonSessionManager extends ManagerBase implements Lifecycle {
@Override
public void stop() throws LifecycleException {
if (updateValve != null) {
getEngine().getPipeline().removeValve(updateValve);
updateValve = null;
Pipeline pipeline = getEngine().getPipeline();
synchronized (pipeline) {
contextInUse.remove(((Context) getContainer()).getName());
//remove valves when all of the RedissonSessionManagers (web apps) are not in use anymore
if (contextInUse.isEmpty()) {
if (updateValve != null) {
pipeline.removeValve(updateValve);
updateValve = null;
}
}
}
if (messageListener != null) {
RTopic updatesTopic = getTopic();
updatesTopic.removeListener(messageListener);
}
codecToUse = null;

@ -19,6 +19,7 @@ import java.io.IOException;
import javax.servlet.ServletException;
import org.apache.catalina.Manager;
import org.apache.catalina.connector.Request;
import org.apache.catalina.connector.Response;
import org.apache.catalina.valves.ValveBase;
@ -32,11 +33,9 @@ import org.apache.catalina.valves.ValveBase;
public class UpdateValve extends ValveBase {
private static final String ALREADY_FILTERED_NOTE = UpdateValve.class.getName() + ".ALREADY_FILTERED_NOTE";
private final RedissonSessionManager manager;
public UpdateValve(RedissonSessionManager manager) {
public UpdateValve() {
super();
this.manager = manager;
}
@Override
@ -52,7 +51,8 @@ public class UpdateValve extends ValveBase {
try {
ClassLoader applicationClassLoader = request.getContext().getLoader().getClassLoader();
Thread.currentThread().setContextClassLoader(applicationClassLoader);
manager.store(request.getSession(false));
Manager manager = request.getContext().getManager();
((RedissonSessionManager)manager).store(request.getSession(false));
} finally {
Thread.currentThread().setContextClassLoader(classLoader);
}

@ -17,9 +17,12 @@ package org.redisson.tomcat;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import javax.servlet.http.HttpSession;
@ -68,8 +71,12 @@ public class RedissonSessionManager extends ManagerBase {
private final String nodeId = UUID.randomUUID().toString();
private UpdateValve updateValve;
private static UpdateValve updateValve;
private static Set<String> contextInUse = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
private MessageListener messageListener;
private Codec codecToUse;
public String getNodeId() { return nodeId; }
@ -249,18 +256,20 @@ public class RedissonSessionManager extends ManagerBase {
throw new LifecycleException(e);
}
if (updateMode == UpdateMode.AFTER_REQUEST) {
Pipeline pipeline = getEngine().getPipeline();
if (updateValve != null) { // in case startInternal is called without stopInternal cleaning the updateValve
pipeline.removeValve(updateValve);
Pipeline pipeline = getEngine().getPipeline();
synchronized (pipeline) {
contextInUse.add(((Context) getContainer()).getName());
if (updateMode == UpdateMode.AFTER_REQUEST) {
if (updateValve == null) {
updateValve = new UpdateValve();
pipeline.addValve(updateValve);
}
}
updateValve = new UpdateValve(this);
pipeline.addValve(updateValve);
}
if (readMode == ReadMode.MEMORY || broadcastSessionEvents) {
RTopic updatesTopic = getTopic();
updatesTopic.addListener(AttributeMessage.class, new MessageListener<AttributeMessage>() {
messageListener = new MessageListener<AttributeMessage>() {
@Override
public void onMessage(CharSequence channel, AttributeMessage msg) {
@ -319,7 +328,9 @@ public class RedissonSessionManager extends ManagerBase {
log.error("Unable to handle topic message", e);
}
}
});
};
updatesTopic.addListener(AttributeMessage.class, messageListener);
}
setState(LifecycleState.STARTING);
@ -352,9 +363,21 @@ public class RedissonSessionManager extends ManagerBase {
setState(LifecycleState.STOPPING);
if (updateValve != null) {
getEngine().getPipeline().removeValve(updateValve);
updateValve = null;
Pipeline pipeline = getEngine().getPipeline();
synchronized (pipeline) {
contextInUse.remove(((Context) getContainer()).getName());
//remove valves when all of the RedissonSessionManagers (web apps) are not in use anymore
if (contextInUse.isEmpty()) {
if (updateValve != null) {
pipeline.removeValve(updateValve);
updateValve = null;
}
}
}
if (messageListener != null) {
RTopic updatesTopic = getTopic();
updatesTopic.removeListener(messageListener);
}
codecToUse = null;

@ -19,6 +19,7 @@ import java.io.IOException;
import javax.servlet.ServletException;
import org.apache.catalina.Manager;
import org.apache.catalina.connector.Request;
import org.apache.catalina.connector.Response;
import org.apache.catalina.valves.ValveBase;
@ -32,11 +33,9 @@ import org.apache.catalina.valves.ValveBase;
public class UpdateValve extends ValveBase {
private static final String ALREADY_FILTERED_NOTE = UpdateValve.class.getName() + ".ALREADY_FILTERED_NOTE";
private final RedissonSessionManager manager;
public UpdateValve(RedissonSessionManager manager) {
public UpdateValve() {
super(true);
this.manager = manager;
}
@Override
@ -52,7 +51,8 @@ public class UpdateValve extends ValveBase {
try {
ClassLoader applicationClassLoader = request.getContext().getLoader().getClassLoader();
Thread.currentThread().setContextClassLoader(applicationClassLoader);
manager.store(request.getSession(false));
Manager manager = request.getContext().getManager();
((RedissonSessionManager)manager).store(request.getSession(false));
} finally {
Thread.currentThread().setContextClassLoader(classLoader);
}

@ -17,9 +17,12 @@ package org.redisson.tomcat;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import javax.servlet.http.HttpSession;
@ -67,8 +70,12 @@ public class RedissonSessionManager extends ManagerBase {
private final String nodeId = UUID.randomUUID().toString();
private UpdateValve updateValve;
private static UpdateValve updateValve;
private static Set<String> contextInUse = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
private MessageListener messageListener;
private Codec codecToUse;
public String getNodeId() { return nodeId; }
@ -247,18 +254,20 @@ public class RedissonSessionManager extends ManagerBase {
throw new LifecycleException(e);
}
if (updateMode == UpdateMode.AFTER_REQUEST) {
Pipeline pipeline = getEngine().getPipeline();
if (updateValve != null) { // in case startInternal is called without stopInternal cleaning the updateValve
pipeline.removeValve(updateValve);
Pipeline pipeline = getEngine().getPipeline();
synchronized (pipeline) {
contextInUse.add(getContext().getName());
if (updateMode == UpdateMode.AFTER_REQUEST) {
if (updateValve == null) {
updateValve = new UpdateValve();
pipeline.addValve(updateValve);
}
}
updateValve = new UpdateValve(this);
pipeline.addValve(updateValve);
}
if (readMode == ReadMode.MEMORY || broadcastSessionEvents) {
RTopic updatesTopic = getTopic();
updatesTopic.addListener(AttributeMessage.class, new MessageListener<AttributeMessage>() {
messageListener = new MessageListener<AttributeMessage>() {
@Override
public void onMessage(CharSequence channel, AttributeMessage msg) {
@ -317,7 +326,9 @@ public class RedissonSessionManager extends ManagerBase {
log.error("Unable to handle topic message", e);
}
}
});
};
updatesTopic.addListener(AttributeMessage.class, messageListener);
}
setState(LifecycleState.STARTING);
@ -350,9 +361,21 @@ public class RedissonSessionManager extends ManagerBase {
setState(LifecycleState.STOPPING);
if (updateValve != null) {
getEngine().getPipeline().removeValve(updateValve);
updateValve = null;
Pipeline pipeline = getEngine().getPipeline();
synchronized (pipeline) {
contextInUse.remove(getContext().getName());
//remove valves when all of the RedissonSessionManagers (web apps) are not in use anymore
if (contextInUse.isEmpty()) {
if (updateValve != null) {
pipeline.removeValve(updateValve);
updateValve = null;
}
}
}
if (messageListener != null) {
RTopic updatesTopic = getTopic();
updatesTopic.removeListener(messageListener);
}
codecToUse = null;

@ -19,6 +19,7 @@ import java.io.IOException;
import javax.servlet.ServletException;
import org.apache.catalina.Manager;
import org.apache.catalina.connector.Request;
import org.apache.catalina.connector.Response;
import org.apache.catalina.valves.ValveBase;
@ -32,11 +33,9 @@ import org.apache.catalina.valves.ValveBase;
public class UpdateValve extends ValveBase {
private static final String ALREADY_FILTERED_NOTE = UpdateValve.class.getName() + ".ALREADY_FILTERED_NOTE";
private final RedissonSessionManager manager;
public UpdateValve(RedissonSessionManager manager) {
public UpdateValve() {
super(true);
this.manager = manager;
}
@Override
@ -52,7 +51,8 @@ public class UpdateValve extends ValveBase {
try {
ClassLoader applicationClassLoader = request.getContext().getLoader().getClassLoader();
Thread.currentThread().setContextClassLoader(applicationClassLoader);
manager.store(request.getSession(false));
Manager manager = request.getContext().getManager();
((RedissonSessionManager)manager).store(request.getSession(false));
} finally {
Thread.currentThread().setContextClassLoader(classLoader);
}

@ -17,9 +17,12 @@ package org.redisson.tomcat;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import javax.servlet.http.HttpSession;
@ -67,8 +70,12 @@ public class RedissonSessionManager extends ManagerBase {
private final String nodeId = UUID.randomUUID().toString();
private UpdateValve updateValve;
private static UpdateValve updateValve;
private static Set<String> contextInUse = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
private MessageListener messageListener;
private Codec codecToUse;
public String getNodeId() { return nodeId; }
@ -247,18 +254,20 @@ public class RedissonSessionManager extends ManagerBase {
throw new LifecycleException(e);
}
if (updateMode == UpdateMode.AFTER_REQUEST) {
Pipeline pipeline = getEngine().getPipeline();
if (updateValve != null) { // in case startInternal is called without stopInternal cleaning the updateValve
pipeline.removeValve(updateValve);
Pipeline pipeline = getEngine().getPipeline();
synchronized (pipeline) {
contextInUse.add(getContext().getName());
if (updateMode == UpdateMode.AFTER_REQUEST) {
if (updateValve == null) {
updateValve = new UpdateValve();
pipeline.addValve(updateValve);
}
}
updateValve = new UpdateValve(this);
pipeline.addValve(updateValve);
}
if (readMode == ReadMode.MEMORY || broadcastSessionEvents) {
RTopic updatesTopic = getTopic();
updatesTopic.addListener(AttributeMessage.class, new MessageListener<AttributeMessage>() {
messageListener = new MessageListener<AttributeMessage>() {
@Override
public void onMessage(CharSequence channel, AttributeMessage msg) {
@ -317,7 +326,9 @@ public class RedissonSessionManager extends ManagerBase {
log.error("Unable to handle topic message", e);
}
}
});
};
updatesTopic.addListener(AttributeMessage.class, messageListener);
}
setState(LifecycleState.STARTING);
@ -350,9 +361,21 @@ public class RedissonSessionManager extends ManagerBase {
setState(LifecycleState.STOPPING);
if (updateValve != null) {
getEngine().getPipeline().removeValve(updateValve);
updateValve = null;
Pipeline pipeline = getEngine().getPipeline();
synchronized (pipeline) {
contextInUse.remove(getContext().getName());
//remove valves when all of the RedissonSessionManagers (web apps) are not in use anymore
if (contextInUse.isEmpty()) {
if (updateValve != null) {
pipeline.removeValve(updateValve);
updateValve = null;
}
}
}
if (messageListener != null) {
RTopic updatesTopic = getTopic();
updatesTopic.removeListener(messageListener);
}
codecToUse = null;

@ -19,6 +19,7 @@ import java.io.IOException;
import javax.servlet.ServletException;
import org.apache.catalina.Manager;
import org.apache.catalina.connector.Request;
import org.apache.catalina.connector.Response;
import org.apache.catalina.valves.ValveBase;
@ -32,11 +33,9 @@ import org.apache.catalina.valves.ValveBase;
public class UpdateValve extends ValveBase {
private static final String ALREADY_FILTERED_NOTE = UpdateValve.class.getName() + ".ALREADY_FILTERED_NOTE";
private final RedissonSessionManager manager;
public UpdateValve(RedissonSessionManager manager) {
public UpdateValve() {
super(true);
this.manager = manager;
}
@Override
@ -52,7 +51,8 @@ public class UpdateValve extends ValveBase {
try {
ClassLoader applicationClassLoader = request.getContext().getLoader().getClassLoader();
Thread.currentThread().setContextClassLoader(applicationClassLoader);
manager.store(request.getSession(false));
Manager manager = request.getContext().getManager();
((RedissonSessionManager)manager).store(request.getSession(false));
} finally {
Thread.currentThread().setContextClassLoader(classLoader);
}

Loading…
Cancel
Save