add SessionManager

pull/1203/head
gongdewei 5 years ago
parent 1dd7f6c9bd
commit 907b5c8b0d

@ -40,6 +40,8 @@ import com.taobao.arthas.core.shell.ShellServerOptions;
import com.taobao.arthas.core.shell.command.CommandResolver;
import com.taobao.arthas.core.shell.handlers.BindHandler;
import com.taobao.arthas.core.shell.impl.ShellServerImpl;
import com.taobao.arthas.core.shell.session.SessionManager;
import com.taobao.arthas.core.shell.session.impl.SessionManagerImpl;
import com.taobao.arthas.core.shell.term.impl.HttpTermServer;
import com.taobao.arthas.core.shell.term.impl.httptelnet.HttpTelnetTermServer;
import com.taobao.arthas.core.util.ArthasBanner;
@ -48,10 +50,29 @@ import com.taobao.arthas.core.util.LogUtil;
import com.taobao.arthas.core.util.UserStatUtil;
import io.netty.channel.ChannelFuture;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.EventExecutorGroup;
import java.arthas.SpyAPI;
import java.io.File;
import java.io.IOException;
import java.lang.instrument.Instrumentation;
import java.lang.reflect.Method;
import java.net.URI;
import java.security.CodeSource;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* @author vlinux on 15/5/2.
* @author gongdewei 2020-03-25
*/
public class ArthasBootstrap {
public static final String ARTHAS_HOME_PROPERTY = "arthas.home";
@ -71,6 +92,7 @@ public class ArthasBootstrap {
private Thread shutdown;
private ShellServer shellServer;
private ScheduledExecutorService executorService;
private SessionManager sessionManager;
private TunnelClient tunnelClient;
private File arthasOutputDir;
@ -286,8 +308,12 @@ public class ArthasBootstrap {
shellServer.listen(new BindHandler(isBindRef));
//http api session manager
sessionManager = new SessionManagerImpl(options, this, shellServer.getCommandManager(), shellServer.getJobController());
logger().info("as-server listening on network={};telnet={};http={};timeout={};", configure.getIp(),
configure.getTelnetPort(), configure.getHttpPort(), options.getConnectionTimeout());
// 异步回报启动次数
if (configure.getStatUrl() != null) {
logger().info("arthas stat url: {}", configure.getStatUrl());
@ -301,6 +327,10 @@ public class ArthasBootstrap {
if (shellServer != null) {
shellServer.close();
}
if (sessionManager != null){
sessionManager.close();
}
//shutdownWorkGroup();
throw e;
}
}
@ -337,6 +367,8 @@ public class ArthasBootstrap {
if (loggerContext != null) {
loggerContext.stop();
}
shellServer = null;
sessionManager = null;
}
/**
@ -385,6 +417,14 @@ public class ArthasBootstrap {
return tunnelClient;
}
public ShellServer getShellServer() {
return shellServer;
}
public SessionManager getSessionManager() {
return sessionManager;
}
public Timer getTimer() {
return this.timer;
}

@ -1,7 +1,8 @@
package com.taobao.arthas.core.shell.session;
import com.taobao.arthas.core.shell.ShellServer;
import com.taobao.arthas.core.distribution.SharingResultDistributor;
import com.taobao.arthas.core.shell.command.CommandResolver;
import com.taobao.arthas.core.shell.system.Job;
import java.lang.instrument.Instrumentation;
import java.util.List;
@ -10,6 +11,7 @@ import java.util.List;
* A shell session.
*
* @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
* @author gongdewei 2020-03-23
*/
public interface Session {
String COMMAND_MANAGER = "arthas-command-manager";
@ -22,6 +24,27 @@ public interface Session {
*/
String TTY = "tty";
/**
* Session create time
*/
String CREATE_TIME = "createTime";
/**
* Session last active time
*/
String LAST_ACCESS_TIME = "lastAccessedTime";
/**
* Command Result Distributor
*/
String RESULT_DISTRIBUTOR = "resultDistributor";
/**
* The executing foreground job
*/
String FOREGROUND_JOB = "foregroundJob";
/**
* Put some data in a session
*
@ -80,13 +103,6 @@ public interface Session {
*/
String getSessionId();
/**
* Get shell server
*
* @return shell server
*/
ShellServer getServer();
/**
* Get Java PID
*
@ -107,4 +123,49 @@ public interface Session {
* @return instrumentation instance
*/
Instrumentation getInstrumentation();
/**
* Update session last access time
* @param time new time
*/
void setLastAccessTime(long time);
/**
* Get session last access time
* @return session last access time
*/
long getLastAccessTime();
/**
* Get session create time
* @return session create time
*/
long getCreateTime();
/**
* Update session's command result distributor
* @param resultDistributor
*/
void setResultDistributor(SharingResultDistributor resultDistributor);
/**
* Get session's command result distributor
* @return
*/
SharingResultDistributor getResultDistributor();
/**
* Set the foreground job
*/
void setForegroundJob(Job job);
/**
* Get the foreground job
*/
Job getForegroundJob();
/**
* Whether the session is tty term
*/
boolean isTty();
}

@ -0,0 +1,29 @@
package com.taobao.arthas.core.shell.session;
import com.taobao.arthas.core.shell.system.impl.InternalCommandManager;
import com.taobao.arthas.core.shell.system.impl.JobControllerImpl;
import java.lang.instrument.Instrumentation;
/**
* Arthas Session Manager
* @author gongdewei 2020-03-20
*/
public interface SessionManager {
Session createSession();
Session getSession(String sessionId);
Session removeSession(String sessionId);
void updateAccessTime(Session session);
void close();
InternalCommandManager getCommandManager();
Instrumentation getInstrumentation();
JobControllerImpl getJobController();
}

@ -1,8 +1,9 @@
package com.taobao.arthas.core.shell.session.impl;
import com.taobao.arthas.core.shell.ShellServer;
import com.taobao.arthas.core.distribution.SharingResultDistributor;
import com.taobao.arthas.core.shell.command.CommandResolver;
import com.taobao.arthas.core.shell.session.Session;
import com.taobao.arthas.core.shell.system.Job;
import com.taobao.arthas.core.shell.system.impl.InternalCommandManager;
import java.lang.instrument.Instrumentation;
@ -21,6 +22,12 @@ public class SessionImpl implements Session {
private Map<String, Object> data = new HashMap<String, Object>();
public SessionImpl() {
long now = System.currentTimeMillis();
data.put(CREATE_TIME, now);
this.setLastAccessTime(now);
}
@Override
public Session put(String key, Object obj) {
if (obj == null) {
@ -69,11 +76,6 @@ public class SessionImpl implements Session {
return (String) data.get(ID);
}
@Override
public ShellServer getServer() {
return (ShellServer) data.get(SERVER);
}
@Override
public long getPid() {
return (Long) data.get(PID);
@ -89,4 +91,45 @@ public class SessionImpl implements Session {
public Instrumentation getInstrumentation() {
return (Instrumentation) data.get(INSTRUMENTATION);
}
@Override
public void setLastAccessTime(long time) {
data.put(LAST_ACCESS_TIME, time);
}
@Override
public long getLastAccessTime() {
return (Long)data.get(LAST_ACCESS_TIME);
}
@Override
public long getCreateTime() {
return (Long)data.get(CREATE_TIME);
}
@Override
public void setResultDistributor(SharingResultDistributor resultDistributor) {
data.put(RESULT_DISTRIBUTOR, resultDistributor);
}
@Override
public SharingResultDistributor getResultDistributor() {
return (SharingResultDistributor) data.get(RESULT_DISTRIBUTOR);
}
@Override
public void setForegroundJob(Job job) {
data.put(FOREGROUND_JOB, job);
}
@Override
public Job getForegroundJob() {
return (Job) data.get(FOREGROUND_JOB);
}
@Override
public boolean isTty() {
return get(TTY) != null;
}
}

@ -0,0 +1,193 @@
package com.taobao.arthas.core.shell.session.impl;
import com.alibaba.arthas.deps.org.slf4j.Logger;
import com.alibaba.arthas.deps.org.slf4j.LoggerFactory;
import com.taobao.arthas.core.command.model.MessageModel;
import com.taobao.arthas.core.distribution.ResultConsumer;
import com.taobao.arthas.core.distribution.SharingResultDistributor;
import com.taobao.arthas.core.distribution.impl.SharingResultDistributorImpl;
import com.taobao.arthas.core.server.ArthasBootstrap;
import com.taobao.arthas.core.shell.ShellServerOptions;
import com.taobao.arthas.core.shell.session.Session;
import com.taobao.arthas.core.shell.session.SessionManager;
import com.taobao.arthas.core.shell.system.Job;
import com.taobao.arthas.core.shell.system.impl.InternalCommandManager;
import com.taobao.arthas.core.shell.system.impl.JobControllerImpl;
import java.lang.instrument.Instrumentation;
import java.util.*;
import java.util.concurrent.*;
/**
* Arthas Session Manager
*
* @author gongdewei 2020-03-20
*/
public class SessionManagerImpl implements SessionManager {
private static final Logger logger = LoggerFactory.getLogger(SessionManagerImpl.class);
private final ArthasBootstrap bootstrap;
private final InternalCommandManager commandManager;
private final Instrumentation instrumentation;
private final JobControllerImpl jobController;
private final long timeoutMillis;
private final long reaperInterval;
private final Map<String, Session> sessions;
private final long pid;
private boolean closed = false;
private ScheduledExecutorService scheduledExecutorService;
public SessionManagerImpl(ShellServerOptions options, ArthasBootstrap bootstrap, InternalCommandManager commandManager,
JobControllerImpl jobController) {
this.bootstrap = bootstrap;
this.commandManager = commandManager;
this.jobController = jobController;
this.sessions = new ConcurrentHashMap<String, Session>();
this.timeoutMillis = options.getSessionTimeout();
this.reaperInterval = options.getReaperInterval();
this.instrumentation = options.getInstrumentation();
this.pid = options.getPid();
//start evict session timer
this.setEvictTimer();
}
@Override
public Session createSession() {
Session session = new SessionImpl();
session.put(Session.COMMAND_MANAGER, commandManager);
session.put(Session.INSTRUMENTATION, instrumentation);
session.put(Session.PID, pid);
//session.put(Session.SERVER, server);
//session.put(Session.TTY, term);
String sessionId = UUID.randomUUID().toString();
session.put(Session.ID, sessionId);
//Result Distributor
session.setResultDistributor(new SharingResultDistributorImpl(session));
sessions.put(sessionId, session);
return session;
}
@Override
public Session getSession(String sessionId) {
return sessions.get(sessionId);
}
@Override
public Session removeSession(String sessionId) {
return sessions.remove(sessionId);
}
@Override
public void updateAccessTime(Session session) {
session.setLastAccessTime(System.currentTimeMillis());
}
@Override
public void close() {
//TODO clear resources while shutdown arthas
closed = true;
if (scheduledExecutorService != null) {
scheduledExecutorService.shutdownNow();
}
ArrayList<Session> sessions = new ArrayList<Session>(this.sessions.values());
for (Session session : sessions) {
SharingResultDistributor resultDistributor = session.getResultDistributor();
resultDistributor.appendResult(new MessageModel("arthas server is going to shutdown."));
resultDistributor.close();
logger.info("Removing session before shutdown: {}, last access time: {}", session.getSessionId(), session.getLastAccessTime());
this.removeSession(session.getSessionId());
}
jobController.close();
bootstrap.destroy();
}
private synchronized void setEvictTimer() {
if (!closed && reaperInterval > 0) {
scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
final Thread t = new Thread(r, "arthas-shell-server");
return t;
}
});
scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
evictSessions();
}
}, 0, reaperInterval, TimeUnit.MILLISECONDS);
}
}
/**
* Check and remove inactive session
*/
public void evictSessions() {
long now = System.currentTimeMillis();
List<Session> toClose = new ArrayList<Session>();
for (Session session : sessions.values()) {
// do not close if there is still job running,
// e.g. trace command might wait for a long time before condition is met
//TODO check background job size
if (now - session.getLastAccessTime() > timeoutMillis && session.getForegroundJob() == null) {
toClose.add(session);
}
evictConsumers(session);
}
for (Session session : toClose) {
//interrupt foreground job
Job job = session.getForegroundJob();
if (job != null) {
job.interrupt();
}
long timeOutInMinutes = timeoutMillis / 1000 / 60;
String reason = "session is inactive for " + timeOutInMinutes + " min(s).";
session.getResultDistributor().appendResult(new MessageModel(reason));
this.removeSession(session.getSessionId());
logger.info("Removing inactive session: {}, last access time: {}", session.getSessionId(), session.getLastAccessTime());
}
}
/**
* Check and remove inactive consumer
*/
public void evictConsumers(Session session) {
SharingResultDistributor distributor = session.getResultDistributor();
if (distributor instanceof SharingResultDistributor) {
SharingResultDistributor sharingResultDistributor = (SharingResultDistributor) distributor;
List<ResultConsumer> consumers = sharingResultDistributor.getConsumers();
//remove inactive consumer from session directly
long now = System.currentTimeMillis();
for (ResultConsumer consumer : consumers) {
long inactiveTime = now - consumer.getLastAccessTime();
if (inactiveTime > 30000) {
//inactive duration must be large than pollTimeLimit
logger.info("Removing inactive consumer from session, sessionId: {}, consumerId: {}, inactive duration: {}",
session.getSessionId(), consumer.getConsumerId(), inactiveTime);
consumer.appendResult(new MessageModel("consumer is inactive for a while, please refresh the page."));
sharingResultDistributor.removeConsumer(consumer);
}
}
}
}
@Override
public InternalCommandManager getCommandManager() {
return commandManager;
}
@Override
public Instrumentation getInstrumentation() {
return instrumentation;
}
@Override
public JobControllerImpl getJobController() {
return jobController;
}
}
Loading…
Cancel
Save