UpdateValve should be installed only once, remove MessageListener from topic when SessionManager stopInternal is invoked #2178

pull/2179/head
Zhelyazko Chobantonov 6 years ago
parent 7ec36c23f9
commit 772e2b9dab

@ -17,10 +17,13 @@ package org.redisson.tomcat;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import javax.servlet.http.HttpSession;
@ -71,8 +74,12 @@ public class RedissonSessionManager extends ManagerBase implements Lifecycle {
private final String nodeId = UUID.randomUUID().toString();
private UpdateValve updateValve;
private static UpdateValve updateValve;
private static Set<String> contextWithInstalledValves = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
private MessageListener messageListener;
private Codec codecToUse;
public String getNodeId() { return nodeId; }
@ -271,16 +278,17 @@ public class RedissonSessionManager extends ManagerBase implements Lifecycle {
if (updateMode == UpdateMode.AFTER_REQUEST) {
Pipeline pipeline = getEngine().getPipeline();
if (updateValve != null) { // in case startInternal is called without stopInternal cleaning the updateValve
pipeline.removeValve(updateValve);
synchronized (pipeline) {
contextWithInstalledValves.add(((Context) getContainer()).getName());
updateValve = new UpdateValve();
pipeline.addValve(updateValve);
}
updateValve = new UpdateValve(this);
pipeline.addValve(updateValve);
}
if (readMode == ReadMode.MEMORY || broadcastSessionEvents) {
RTopic updatesTopic = getTopic();
updatesTopic.addListener(AttributeMessage.class, new MessageListener<AttributeMessage>() {
messageListener = new MessageListener<AttributeMessage>() {
@Override
public void onMessage(CharSequence channel, AttributeMessage msg) {
@ -338,7 +346,9 @@ public class RedissonSessionManager extends ManagerBase implements Lifecycle {
log.error("Unable to handle topic message", e);
}
}
});
};
updatesTopic.addListener(AttributeMessage.class, messageListener);
}
lifecycle.fireLifecycleEvent(START_EVENT, null);
@ -368,8 +378,20 @@ public class RedissonSessionManager extends ManagerBase implements Lifecycle {
@Override
public void stop() throws LifecycleException {
if (updateValve != null) {
getEngine().getPipeline().removeValve(updateValve);
updateValve = null;
Pipeline pipeline = getEngine().getPipeline();
synchronized (pipeline) {
contextWithInstalledValves.remove(((Context) getContainer()).getName());
//remove valves when all of the RedissonSessionManagers (web apps) are not in use anymore
if (contextWithInstalledValves.size() == 0) {
pipeline.removeValve(updateValve);
updateValve = null;
}
}
}
if (messageListener != null) {
RTopic updatesTopic = getTopic();
updatesTopic.removeListener(messageListener);
}
codecToUse = null;

@ -19,6 +19,7 @@ import java.io.IOException;
import javax.servlet.ServletException;
import org.apache.catalina.Manager;
import org.apache.catalina.connector.Request;
import org.apache.catalina.connector.Response;
import org.apache.catalina.valves.ValveBase;
@ -32,11 +33,9 @@ import org.apache.catalina.valves.ValveBase;
public class UpdateValve extends ValveBase {
private static final String ALREADY_FILTERED_NOTE = UpdateValve.class.getName() + ".ALREADY_FILTERED_NOTE";
private final RedissonSessionManager manager;
public UpdateValve(RedissonSessionManager manager) {
public UpdateValve() {
super();
this.manager = manager;
}
@Override
@ -52,7 +51,10 @@ public class UpdateValve extends ValveBase {
try {
ClassLoader applicationClassLoader = request.getContext().getLoader().getClassLoader();
Thread.currentThread().setContextClassLoader(applicationClassLoader);
manager.store(request.getSession(false));
Manager manager = request.getContext().getManager();
if (manager instanceof RedissonSessionManager) {
((RedissonSessionManager)manager).store(request.getSession(false));
}
} finally {
Thread.currentThread().setContextClassLoader(classLoader);
}

@ -17,9 +17,12 @@ package org.redisson.tomcat;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import javax.servlet.http.HttpSession;
@ -68,8 +71,12 @@ public class RedissonSessionManager extends ManagerBase {
private final String nodeId = UUID.randomUUID().toString();
private UpdateValve updateValve;
private static UpdateValve updateValve;
private static Set<String> contextWithInstalledValves = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
private MessageListener messageListener;
private Codec codecToUse;
public String getNodeId() { return nodeId; }
@ -251,16 +258,17 @@ public class RedissonSessionManager extends ManagerBase {
if (updateMode == UpdateMode.AFTER_REQUEST) {
Pipeline pipeline = getEngine().getPipeline();
if (updateValve != null) { // in case startInternal is called without stopInternal cleaning the updateValve
pipeline.removeValve(updateValve);
synchronized (pipeline) {
contextWithInstalledValves.add(((Context) getContainer()).getName());
updateValve = new UpdateValve();
pipeline.addValve(updateValve);
}
updateValve = new UpdateValve(this);
pipeline.addValve(updateValve);
}
if (readMode == ReadMode.MEMORY || broadcastSessionEvents) {
RTopic updatesTopic = getTopic();
updatesTopic.addListener(AttributeMessage.class, new MessageListener<AttributeMessage>() {
messageListener = new MessageListener<AttributeMessage>() {
@Override
public void onMessage(CharSequence channel, AttributeMessage msg) {
@ -319,7 +327,9 @@ public class RedissonSessionManager extends ManagerBase {
log.error("Unable to handle topic message", e);
}
}
});
};
updatesTopic.addListener(AttributeMessage.class, messageListener);
}
setState(LifecycleState.STARTING);
@ -353,8 +363,20 @@ public class RedissonSessionManager extends ManagerBase {
setState(LifecycleState.STOPPING);
if (updateValve != null) {
getEngine().getPipeline().removeValve(updateValve);
updateValve = null;
Pipeline pipeline = getEngine().getPipeline();
synchronized (pipeline) {
contextWithInstalledValves.remove(((Context) getContainer()).getName());
//remove valves when all of the RedissonSessionManagers (web apps) are not in use anymore
if (contextWithInstalledValves.size() == 0) {
pipeline.removeValve(updateValve);
updateValve = null;
}
}
}
if (messageListener != null) {
RTopic updatesTopic = getTopic();
updatesTopic.removeListener(messageListener);
}
codecToUse = null;

@ -19,6 +19,7 @@ import java.io.IOException;
import javax.servlet.ServletException;
import org.apache.catalina.Manager;
import org.apache.catalina.connector.Request;
import org.apache.catalina.connector.Response;
import org.apache.catalina.valves.ValveBase;
@ -32,11 +33,9 @@ import org.apache.catalina.valves.ValveBase;
public class UpdateValve extends ValveBase {
private static final String ALREADY_FILTERED_NOTE = UpdateValve.class.getName() + ".ALREADY_FILTERED_NOTE";
private final RedissonSessionManager manager;
public UpdateValve(RedissonSessionManager manager) {
public UpdateValve() {
super(true);
this.manager = manager;
}
@Override
@ -52,7 +51,10 @@ public class UpdateValve extends ValveBase {
try {
ClassLoader applicationClassLoader = request.getContext().getLoader().getClassLoader();
Thread.currentThread().setContextClassLoader(applicationClassLoader);
manager.store(request.getSession(false));
Manager manager = request.getContext().getManager();
if (manager instanceof RedissonSessionManager) {
((RedissonSessionManager)manager).store(request.getSession(false));
}
} finally {
Thread.currentThread().setContextClassLoader(classLoader);
}

@ -17,9 +17,12 @@ package org.redisson.tomcat;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import javax.servlet.http.HttpSession;
@ -67,8 +70,12 @@ public class RedissonSessionManager extends ManagerBase {
private final String nodeId = UUID.randomUUID().toString();
private UpdateValve updateValve;
private static UpdateValve updateValve;
private static Set<String> contextWithInstalledValves = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
private MessageListener messageListener;
private Codec codecToUse;
public String getNodeId() { return nodeId; }
@ -249,16 +256,17 @@ public class RedissonSessionManager extends ManagerBase {
if (updateMode == UpdateMode.AFTER_REQUEST) {
Pipeline pipeline = getEngine().getPipeline();
if (updateValve != null) { // in case startInternal is called without stopInternal cleaning the updateValve
pipeline.removeValve(updateValve);
synchronized (pipeline) {
contextWithInstalledValves.add(getContext().getName());
updateValve = new UpdateValve();
pipeline.addValve(updateValve);
}
updateValve = new UpdateValve(this);
pipeline.addValve(updateValve);
}
if (readMode == ReadMode.MEMORY || broadcastSessionEvents) {
RTopic updatesTopic = getTopic();
updatesTopic.addListener(AttributeMessage.class, new MessageListener<AttributeMessage>() {
messageListener = new MessageListener<AttributeMessage>() {
@Override
public void onMessage(CharSequence channel, AttributeMessage msg) {
@ -317,7 +325,9 @@ public class RedissonSessionManager extends ManagerBase {
log.error("Unable to handle topic message", e);
}
}
});
};
updatesTopic.addListener(AttributeMessage.class, messageListener);
}
setState(LifecycleState.STARTING);
@ -351,8 +361,20 @@ public class RedissonSessionManager extends ManagerBase {
setState(LifecycleState.STOPPING);
if (updateValve != null) {
getEngine().getPipeline().removeValve(updateValve);
updateValve = null;
Pipeline pipeline = getEngine().getPipeline();
synchronized (pipeline) {
contextWithInstalledValves.remove(getContext().getName());
//remove valves when all of the RedissonSessionManagers (web apps) are not in use anymore
if (contextWithInstalledValves.size() == 0) {
pipeline.removeValve(updateValve);
updateValve = null;
}
}
}
if (messageListener != null) {
RTopic updatesTopic = getTopic();
updatesTopic.removeListener(messageListener);
}
codecToUse = null;

@ -19,6 +19,7 @@ import java.io.IOException;
import javax.servlet.ServletException;
import org.apache.catalina.Manager;
import org.apache.catalina.connector.Request;
import org.apache.catalina.connector.Response;
import org.apache.catalina.valves.ValveBase;
@ -32,11 +33,9 @@ import org.apache.catalina.valves.ValveBase;
public class UpdateValve extends ValveBase {
private static final String ALREADY_FILTERED_NOTE = UpdateValve.class.getName() + ".ALREADY_FILTERED_NOTE";
private final RedissonSessionManager manager;
public UpdateValve(RedissonSessionManager manager) {
public UpdateValve() {
super(true);
this.manager = manager;
}
@Override
@ -52,7 +51,10 @@ public class UpdateValve extends ValveBase {
try {
ClassLoader applicationClassLoader = request.getContext().getLoader().getClassLoader();
Thread.currentThread().setContextClassLoader(applicationClassLoader);
manager.store(request.getSession(false));
Manager manager = request.getContext().getManager();
if (manager instanceof RedissonSessionManager) {
((RedissonSessionManager)manager).store(request.getSession(false));
}
} finally {
Thread.currentThread().setContextClassLoader(classLoader);
}

@ -17,9 +17,12 @@ package org.redisson.tomcat;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import javax.servlet.http.HttpSession;
@ -67,8 +70,12 @@ public class RedissonSessionManager extends ManagerBase {
private final String nodeId = UUID.randomUUID().toString();
private UpdateValve updateValve;
private static UpdateValve updateValve;
private static Set<String> contextWithInstalledValves = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
private MessageListener messageListener;
private Codec codecToUse;
public String getNodeId() { return nodeId; }
@ -249,16 +256,17 @@ public class RedissonSessionManager extends ManagerBase {
if (updateMode == UpdateMode.AFTER_REQUEST) {
Pipeline pipeline = getEngine().getPipeline();
if (updateValve != null) { // in case startInternal is called without stopInternal cleaning the updateValve
pipeline.removeValve(updateValve);
synchronized (pipeline) {
contextWithInstalledValves.add(getContext().getName());
updateValve = new UpdateValve();
pipeline.addValve(updateValve);
}
updateValve = new UpdateValve(this);
pipeline.addValve(updateValve);
}
if (readMode == ReadMode.MEMORY || broadcastSessionEvents) {
RTopic updatesTopic = getTopic();
updatesTopic.addListener(AttributeMessage.class, new MessageListener<AttributeMessage>() {
messageListener = new MessageListener<AttributeMessage>() {
@Override
public void onMessage(CharSequence channel, AttributeMessage msg) {
@ -317,7 +325,9 @@ public class RedissonSessionManager extends ManagerBase {
log.error("Unable to handle topic message", e);
}
}
});
};
updatesTopic.addListener(AttributeMessage.class, messageListener);
}
setState(LifecycleState.STARTING);
@ -351,8 +361,20 @@ public class RedissonSessionManager extends ManagerBase {
setState(LifecycleState.STOPPING);
if (updateValve != null) {
getEngine().getPipeline().removeValve(updateValve);
updateValve = null;
Pipeline pipeline = getEngine().getPipeline();
synchronized (pipeline) {
contextWithInstalledValves.remove(getContext().getName());
//remove valves when all of the RedissonSessionManagers (web apps) are not in use anymore
if (contextWithInstalledValves.size() == 0) {
pipeline.removeValve(updateValve);
updateValve = null;
}
}
}
if (messageListener != null) {
RTopic updatesTopic = getTopic();
updatesTopic.removeListener(messageListener);
}
codecToUse = null;

@ -19,6 +19,7 @@ import java.io.IOException;
import javax.servlet.ServletException;
import org.apache.catalina.Manager;
import org.apache.catalina.connector.Request;
import org.apache.catalina.connector.Response;
import org.apache.catalina.valves.ValveBase;
@ -32,11 +33,9 @@ import org.apache.catalina.valves.ValveBase;
public class UpdateValve extends ValveBase {
private static final String ALREADY_FILTERED_NOTE = UpdateValve.class.getName() + ".ALREADY_FILTERED_NOTE";
private final RedissonSessionManager manager;
public UpdateValve(RedissonSessionManager manager) {
public UpdateValve() {
super(true);
this.manager = manager;
}
@Override
@ -52,7 +51,10 @@ public class UpdateValve extends ValveBase {
try {
ClassLoader applicationClassLoader = request.getContext().getLoader().getClassLoader();
Thread.currentThread().setContextClassLoader(applicationClassLoader);
manager.store(request.getSession(false));
Manager manager = request.getContext().getManager();
if (manager instanceof RedissonSessionManager) {
((RedissonSessionManager)manager).store(request.getSession(false));
}
} finally {
Thread.currentThread().setContextClassLoader(classLoader);
}

Loading…
Cancel
Save