fix session manager error

pull/1490/head
gongdewei 3 years ago
parent 6a8c7acd11
commit 62b9e8278b

@ -416,7 +416,7 @@ public class ChannelRequestHandler implements ChannelClient.RequestListener {
private void tryCloseOneTimeSession(Session session) { private void tryCloseOneTimeSession(Session session) {
if (session.get(ONETIME_SESSION_KEY) != null) { if (session.get(ONETIME_SESSION_KEY) != null) {
sessionManager.closeSession(session.getSessionId()); sessionManager.removeSession(session.getSessionId());
} }
} }
@ -554,7 +554,7 @@ public class ChannelRequestHandler implements ChannelClient.RequestListener {
// } // }
private void processCloseSession(ActionRequest request, Session session) { private void processCloseSession(ActionRequest request, Session session) {
sessionManager.closeSession(session.getSessionId()); sessionManager.removeSession(session.getSessionId());
ActionResponse.Builder response = createResponse(request, session, ResponseStatus.SUCCEEDED); ActionResponse.Builder response = createResponse(request, session, ResponseStatus.SUCCEEDED);
sendResponse(response); sendResponse(response);
} }

@ -15,7 +15,7 @@ public interface SessionManager {
Session getSession(String sessionId); Session getSession(String sessionId);
Session closeSession(String sessionId); Session removeSession(String sessionId);
void updateAccessTime(Session session); void updateAccessTime(Session session);

@ -4,6 +4,7 @@ import com.alibaba.arthas.deps.org.slf4j.Logger;
import com.alibaba.arthas.deps.org.slf4j.LoggerFactory; import com.alibaba.arthas.deps.org.slf4j.LoggerFactory;
import com.taobao.arthas.core.command.model.MessageModel; import com.taobao.arthas.core.command.model.MessageModel;
import com.taobao.arthas.core.distribution.ResultConsumer; import com.taobao.arthas.core.distribution.ResultConsumer;
import com.taobao.arthas.core.distribution.ResultDistributor;
import com.taobao.arthas.core.distribution.SharingResultDistributor; import com.taobao.arthas.core.distribution.SharingResultDistributor;
import com.taobao.arthas.core.shell.ShellServerOptions; import com.taobao.arthas.core.shell.ShellServerOptions;
import com.taobao.arthas.core.shell.session.Session; import com.taobao.arthas.core.shell.session.Session;
@ -13,8 +14,15 @@ import com.taobao.arthas.core.shell.system.JobController;
import com.taobao.arthas.core.shell.system.impl.InternalCommandManager; import com.taobao.arthas.core.shell.system.impl.InternalCommandManager;
import java.lang.instrument.Instrumentation; import java.lang.instrument.Instrumentation;
import java.util.*; import java.util.ArrayList;
import java.util.concurrent.*; import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
/** /**
* Arthas Session Manager * Arthas Session Manager
@ -82,7 +90,7 @@ public class SessionManagerImpl implements SessionManager {
job.interrupt(); job.interrupt();
} }
SharingResultDistributor resultDistributor = session.getResultDistributor(); ResultDistributor resultDistributor = session.getResultDistributor();
if (resultDistributor != null) { if (resultDistributor != null) {
resultDistributor.close(); resultDistributor.close();
} }
@ -105,7 +113,7 @@ public class SessionManagerImpl implements SessionManager {
ArrayList<Session> sessions = new ArrayList<Session>(this.sessions.values()); ArrayList<Session> sessions = new ArrayList<Session>(this.sessions.values());
for (Session session : sessions) { for (Session session : sessions) {
SharingResultDistributor resultDistributor = session.getResultDistributor(); ResultDistributor resultDistributor = session.getResultDistributor();
if (resultDistributor != null) { if (resultDistributor != null) {
resultDistributor.appendResult(new MessageModel("arthas server is going to shutdown.")); resultDistributor.appendResult(new MessageModel("arthas server is going to shutdown."));
} }
@ -159,7 +167,7 @@ public class SessionManagerImpl implements SessionManager {
} }
long timeOutInMinutes = sessionTimeoutMillis / 1000 / 60; long timeOutInMinutes = sessionTimeoutMillis / 1000 / 60;
String reason = "session is inactive for " + timeOutInMinutes + " min(s)."; String reason = "session is inactive for " + timeOutInMinutes + " min(s).";
SharingResultDistributor resultDistributor = session.getResultDistributor(); ResultDistributor resultDistributor = session.getResultDistributor();
if (resultDistributor != null) { if (resultDistributor != null) {
resultDistributor.appendResult(new MessageModel(reason)); resultDistributor.appendResult(new MessageModel(reason));
} }
@ -172,9 +180,10 @@ public class SessionManagerImpl implements SessionManager {
* Check and remove inactive consumer * Check and remove inactive consumer
*/ */
public void evictConsumers(Session session) { public void evictConsumers(Session session) {
SharingResultDistributor distributor = session.getResultDistributor(); ResultDistributor distributor = session.getResultDistributor();
if (distributor != null) { if (distributor instanceof SharingResultDistributor) {
List<ResultConsumer> consumers = distributor.getConsumers(); SharingResultDistributor sharingResultDistributor = (SharingResultDistributor) distributor;
List<ResultConsumer> consumers = sharingResultDistributor.getConsumers();
//remove inactive consumer from session directly //remove inactive consumer from session directly
long now = System.currentTimeMillis(); long now = System.currentTimeMillis();
for (ResultConsumer consumer : consumers) { for (ResultConsumer consumer : consumers) {
@ -184,10 +193,13 @@ public class SessionManagerImpl implements SessionManager {
logger.info("Removing inactive consumer from session, sessionId: {}, consumerId: {}, inactive duration: {}", logger.info("Removing inactive consumer from session, sessionId: {}, consumerId: {}, inactive duration: {}",
session.getSessionId(), consumer.getConsumerId(), inactiveTime); session.getSessionId(), consumer.getConsumerId(), inactiveTime);
consumer.appendResult(new MessageModel("consumer is inactive for a while, please refresh the page.")); consumer.appendResult(new MessageModel("consumer is inactive for a while, please refresh the page."));
distributor.removeConsumer(consumer); sharingResultDistributor.removeConsumer(consumer);
} }
} }
} }
if (distributor != null) {
distributor.close();
}
} }
@Override @Override

@ -5,7 +5,12 @@ import com.alibaba.arthas.deps.org.slf4j.LoggerFactory;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.taobao.arthas.common.ArthasConstants; import com.taobao.arthas.common.ArthasConstants;
import com.taobao.arthas.common.PidUtils; import com.taobao.arthas.common.PidUtils;
import com.taobao.arthas.core.command.model.*; import com.taobao.arthas.core.command.model.CommandRequestModel;
import com.taobao.arthas.core.command.model.InputStatus;
import com.taobao.arthas.core.command.model.InputStatusModel;
import com.taobao.arthas.core.command.model.MessageModel;
import com.taobao.arthas.core.command.model.ResultModel;
import com.taobao.arthas.core.command.model.WelcomeModel;
import com.taobao.arthas.core.config.Configure; import com.taobao.arthas.core.config.Configure;
import com.taobao.arthas.core.distribution.PackingResultDistributor; import com.taobao.arthas.core.distribution.PackingResultDistributor;
import com.taobao.arthas.core.distribution.ResultConsumer; import com.taobao.arthas.core.distribution.ResultConsumer;
@ -37,7 +42,12 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufOutputStream; import io.netty.buffer.ByteBufOutputStream;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.*; import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.util.CharsetUtil; import io.netty.util.CharsetUtil;
import io.termd.core.function.Function; import io.termd.core.function.Function;
@ -379,7 +389,7 @@ public class HttpApiHandler {
} }
private ApiResponse processCloseSessionRequest(ApiRequest apiRequest, Session session) { private ApiResponse processCloseSessionRequest(ApiRequest apiRequest, Session session) {
sessionManager.closeSession(session.getSessionId()); sessionManager.removeSession(session.getSessionId());
ApiResponse response = new ApiResponse(); ApiResponse response = new ApiResponse();
response.setState(ApiState.SUCCEEDED); response.setState(ApiState.SUCCEEDED);
return response; return response;
@ -473,7 +483,7 @@ public class HttpApiHandler {
return response; return response;
} finally { } finally {
if (oneTimeAccess) { if (oneTimeAccess) {
sessionManager.closeSession(session.getSessionId()); sessionManager.removeSession(session.getSessionId());
} }
} }
} }

Loading…
Cancel
Save