From 907b5c8b0dc2ff248563529121ffc6367910e006 Mon Sep 17 00:00:00 2001 From: gongdewei Date: Thu, 21 May 2020 21:37:06 +0800 Subject: [PATCH] add SessionManager --- .../arthas/core/server/ArthasBootstrap.java | 40 ++++ .../arthas/core/shell/session/Session.java | 77 ++++++- .../core/shell/session/SessionManager.java | 29 +++ .../core/shell/session/impl/SessionImpl.java | 55 ++++- .../session/impl/SessionManagerImpl.java | 193 ++++++++++++++++++ 5 files changed, 380 insertions(+), 14 deletions(-) create mode 100644 core/src/main/java/com/taobao/arthas/core/shell/session/SessionManager.java create mode 100644 core/src/main/java/com/taobao/arthas/core/shell/session/impl/SessionManagerImpl.java diff --git a/core/src/main/java/com/taobao/arthas/core/server/ArthasBootstrap.java b/core/src/main/java/com/taobao/arthas/core/server/ArthasBootstrap.java index e30bf900b..6aa9b6fe3 100644 --- a/core/src/main/java/com/taobao/arthas/core/server/ArthasBootstrap.java +++ b/core/src/main/java/com/taobao/arthas/core/server/ArthasBootstrap.java @@ -40,6 +40,8 @@ import com.taobao.arthas.core.shell.ShellServerOptions; import com.taobao.arthas.core.shell.command.CommandResolver; import com.taobao.arthas.core.shell.handlers.BindHandler; import com.taobao.arthas.core.shell.impl.ShellServerImpl; +import com.taobao.arthas.core.shell.session.SessionManager; +import com.taobao.arthas.core.shell.session.impl.SessionManagerImpl; import com.taobao.arthas.core.shell.term.impl.HttpTermServer; import com.taobao.arthas.core.shell.term.impl.httptelnet.HttpTelnetTermServer; import com.taobao.arthas.core.util.ArthasBanner; @@ -48,10 +50,29 @@ import com.taobao.arthas.core.util.LogUtil; import com.taobao.arthas.core.util.UserStatUtil; import io.netty.channel.ChannelFuture; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.util.concurrent.EventExecutorGroup; + +import java.arthas.SpyAPI; +import java.io.File; +import java.io.IOException; +import java.lang.instrument.Instrumentation; +import java.lang.reflect.Method; +import java.net.URI; +import java.security.CodeSource; +import java.util.*; +import java.util.Map.Entry; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + /** * @author vlinux on 15/5/2. + * @author gongdewei 2020-03-25 */ public class ArthasBootstrap { public static final String ARTHAS_HOME_PROPERTY = "arthas.home"; @@ -71,6 +92,7 @@ public class ArthasBootstrap { private Thread shutdown; private ShellServer shellServer; private ScheduledExecutorService executorService; + private SessionManager sessionManager; private TunnelClient tunnelClient; private File arthasOutputDir; @@ -286,8 +308,12 @@ public class ArthasBootstrap { shellServer.listen(new BindHandler(isBindRef)); + //http api session manager + sessionManager = new SessionManagerImpl(options, this, shellServer.getCommandManager(), shellServer.getJobController()); + logger().info("as-server listening on network={};telnet={};http={};timeout={};", configure.getIp(), configure.getTelnetPort(), configure.getHttpPort(), options.getConnectionTimeout()); + // 异步回报启动次数 if (configure.getStatUrl() != null) { logger().info("arthas stat url: {}", configure.getStatUrl()); @@ -301,6 +327,10 @@ public class ArthasBootstrap { if (shellServer != null) { shellServer.close(); } + if (sessionManager != null){ + sessionManager.close(); + } + //shutdownWorkGroup(); throw e; } } @@ -337,6 +367,8 @@ public class ArthasBootstrap { if (loggerContext != null) { loggerContext.stop(); } + shellServer = null; + sessionManager = null; } /** @@ -385,6 +417,14 @@ public class ArthasBootstrap { return tunnelClient; } + public ShellServer getShellServer() { + return shellServer; + } + + public SessionManager getSessionManager() { + return sessionManager; + } + public Timer getTimer() { return this.timer; } diff --git a/core/src/main/java/com/taobao/arthas/core/shell/session/Session.java b/core/src/main/java/com/taobao/arthas/core/shell/session/Session.java index 5242a684e..8ca1c902e 100644 --- a/core/src/main/java/com/taobao/arthas/core/shell/session/Session.java +++ b/core/src/main/java/com/taobao/arthas/core/shell/session/Session.java @@ -1,7 +1,8 @@ package com.taobao.arthas.core.shell.session; -import com.taobao.arthas.core.shell.ShellServer; +import com.taobao.arthas.core.distribution.SharingResultDistributor; import com.taobao.arthas.core.shell.command.CommandResolver; +import com.taobao.arthas.core.shell.system.Job; import java.lang.instrument.Instrumentation; import java.util.List; @@ -10,6 +11,7 @@ import java.util.List; * A shell session. * * @author Julien Viet + * @author gongdewei 2020-03-23 */ public interface Session { String COMMAND_MANAGER = "arthas-command-manager"; @@ -22,6 +24,27 @@ public interface Session { */ String TTY = "tty"; + /** + * Session create time + */ + String CREATE_TIME = "createTime"; + + /** + * Session last active time + */ + String LAST_ACCESS_TIME = "lastAccessedTime"; + + /** + * Command Result Distributor + */ + String RESULT_DISTRIBUTOR = "resultDistributor"; + + /** + * The executing foreground job + */ + String FOREGROUND_JOB = "foregroundJob"; + + /** * Put some data in a session * @@ -80,13 +103,6 @@ public interface Session { */ String getSessionId(); - /** - * Get shell server - * - * @return shell server - */ - ShellServer getServer(); - /** * Get Java PID * @@ -107,4 +123,49 @@ public interface Session { * @return instrumentation instance */ Instrumentation getInstrumentation(); + + /** + * Update session last access time + * @param time new time + */ + void setLastAccessTime(long time); + + /** + * Get session last access time + * @return session last access time + */ + long getLastAccessTime(); + + /** + * Get session create time + * @return session create time + */ + long getCreateTime(); + + /** + * Update session's command result distributor + * @param resultDistributor + */ + void setResultDistributor(SharingResultDistributor resultDistributor); + + /** + * Get session's command result distributor + * @return + */ + SharingResultDistributor getResultDistributor(); + + /** + * Set the foreground job + */ + void setForegroundJob(Job job); + + /** + * Get the foreground job + */ + Job getForegroundJob(); + + /** + * Whether the session is tty term + */ + boolean isTty(); } diff --git a/core/src/main/java/com/taobao/arthas/core/shell/session/SessionManager.java b/core/src/main/java/com/taobao/arthas/core/shell/session/SessionManager.java new file mode 100644 index 000000000..e5198aee1 --- /dev/null +++ b/core/src/main/java/com/taobao/arthas/core/shell/session/SessionManager.java @@ -0,0 +1,29 @@ +package com.taobao.arthas.core.shell.session; + +import com.taobao.arthas.core.shell.system.impl.InternalCommandManager; +import com.taobao.arthas.core.shell.system.impl.JobControllerImpl; + +import java.lang.instrument.Instrumentation; + +/** + * Arthas Session Manager + * @author gongdewei 2020-03-20 + */ +public interface SessionManager { + + Session createSession(); + + Session getSession(String sessionId); + + Session removeSession(String sessionId); + + void updateAccessTime(Session session); + + void close(); + + InternalCommandManager getCommandManager(); + + Instrumentation getInstrumentation(); + + JobControllerImpl getJobController(); +} diff --git a/core/src/main/java/com/taobao/arthas/core/shell/session/impl/SessionImpl.java b/core/src/main/java/com/taobao/arthas/core/shell/session/impl/SessionImpl.java index a0a10497c..2bf8bca6f 100644 --- a/core/src/main/java/com/taobao/arthas/core/shell/session/impl/SessionImpl.java +++ b/core/src/main/java/com/taobao/arthas/core/shell/session/impl/SessionImpl.java @@ -1,8 +1,9 @@ package com.taobao.arthas.core.shell.session.impl; -import com.taobao.arthas.core.shell.ShellServer; +import com.taobao.arthas.core.distribution.SharingResultDistributor; import com.taobao.arthas.core.shell.command.CommandResolver; import com.taobao.arthas.core.shell.session.Session; +import com.taobao.arthas.core.shell.system.Job; import com.taobao.arthas.core.shell.system.impl.InternalCommandManager; import java.lang.instrument.Instrumentation; @@ -21,6 +22,12 @@ public class SessionImpl implements Session { private Map data = new HashMap(); + public SessionImpl() { + long now = System.currentTimeMillis(); + data.put(CREATE_TIME, now); + this.setLastAccessTime(now); + } + @Override public Session put(String key, Object obj) { if (obj == null) { @@ -69,11 +76,6 @@ public class SessionImpl implements Session { return (String) data.get(ID); } - @Override - public ShellServer getServer() { - return (ShellServer) data.get(SERVER); - } - @Override public long getPid() { return (Long) data.get(PID); @@ -89,4 +91,45 @@ public class SessionImpl implements Session { public Instrumentation getInstrumentation() { return (Instrumentation) data.get(INSTRUMENTATION); } + + @Override + public void setLastAccessTime(long time) { + data.put(LAST_ACCESS_TIME, time); + } + + @Override + public long getLastAccessTime() { + return (Long)data.get(LAST_ACCESS_TIME); + } + + @Override + public long getCreateTime() { + return (Long)data.get(CREATE_TIME); + } + + @Override + public void setResultDistributor(SharingResultDistributor resultDistributor) { + data.put(RESULT_DISTRIBUTOR, resultDistributor); + } + + @Override + public SharingResultDistributor getResultDistributor() { + return (SharingResultDistributor) data.get(RESULT_DISTRIBUTOR); + } + + @Override + public void setForegroundJob(Job job) { + data.put(FOREGROUND_JOB, job); + } + + @Override + public Job getForegroundJob() { + return (Job) data.get(FOREGROUND_JOB); + } + + @Override + public boolean isTty() { + return get(TTY) != null; + } + } diff --git a/core/src/main/java/com/taobao/arthas/core/shell/session/impl/SessionManagerImpl.java b/core/src/main/java/com/taobao/arthas/core/shell/session/impl/SessionManagerImpl.java new file mode 100644 index 000000000..22ec12e62 --- /dev/null +++ b/core/src/main/java/com/taobao/arthas/core/shell/session/impl/SessionManagerImpl.java @@ -0,0 +1,193 @@ +package com.taobao.arthas.core.shell.session.impl; + +import com.alibaba.arthas.deps.org.slf4j.Logger; +import com.alibaba.arthas.deps.org.slf4j.LoggerFactory; +import com.taobao.arthas.core.command.model.MessageModel; +import com.taobao.arthas.core.distribution.ResultConsumer; +import com.taobao.arthas.core.distribution.SharingResultDistributor; +import com.taobao.arthas.core.distribution.impl.SharingResultDistributorImpl; +import com.taobao.arthas.core.server.ArthasBootstrap; +import com.taobao.arthas.core.shell.ShellServerOptions; +import com.taobao.arthas.core.shell.session.Session; +import com.taobao.arthas.core.shell.session.SessionManager; +import com.taobao.arthas.core.shell.system.Job; +import com.taobao.arthas.core.shell.system.impl.InternalCommandManager; +import com.taobao.arthas.core.shell.system.impl.JobControllerImpl; + +import java.lang.instrument.Instrumentation; +import java.util.*; +import java.util.concurrent.*; + +/** + * Arthas Session Manager + * + * @author gongdewei 2020-03-20 + */ +public class SessionManagerImpl implements SessionManager { + private static final Logger logger = LoggerFactory.getLogger(SessionManagerImpl.class); + private final ArthasBootstrap bootstrap; + private final InternalCommandManager commandManager; + private final Instrumentation instrumentation; + private final JobControllerImpl jobController; + private final long timeoutMillis; + private final long reaperInterval; + private final Map sessions; + private final long pid; + private boolean closed = false; + private ScheduledExecutorService scheduledExecutorService; + + public SessionManagerImpl(ShellServerOptions options, ArthasBootstrap bootstrap, InternalCommandManager commandManager, + JobControllerImpl jobController) { + this.bootstrap = bootstrap; + this.commandManager = commandManager; + this.jobController = jobController; + this.sessions = new ConcurrentHashMap(); + this.timeoutMillis = options.getSessionTimeout(); + this.reaperInterval = options.getReaperInterval(); + this.instrumentation = options.getInstrumentation(); + this.pid = options.getPid(); + //start evict session timer + this.setEvictTimer(); + } + + + @Override + public Session createSession() { + Session session = new SessionImpl(); + session.put(Session.COMMAND_MANAGER, commandManager); + session.put(Session.INSTRUMENTATION, instrumentation); + session.put(Session.PID, pid); + //session.put(Session.SERVER, server); + //session.put(Session.TTY, term); + String sessionId = UUID.randomUUID().toString(); + session.put(Session.ID, sessionId); + + //Result Distributor + session.setResultDistributor(new SharingResultDistributorImpl(session)); + + sessions.put(sessionId, session); + return session; + } + + @Override + public Session getSession(String sessionId) { + return sessions.get(sessionId); + } + + @Override + public Session removeSession(String sessionId) { + return sessions.remove(sessionId); + } + + @Override + public void updateAccessTime(Session session) { + session.setLastAccessTime(System.currentTimeMillis()); + } + + @Override + public void close() { + //TODO clear resources while shutdown arthas + closed = true; + if (scheduledExecutorService != null) { + scheduledExecutorService.shutdownNow(); + } + + ArrayList sessions = new ArrayList(this.sessions.values()); + for (Session session : sessions) { + SharingResultDistributor resultDistributor = session.getResultDistributor(); + resultDistributor.appendResult(new MessageModel("arthas server is going to shutdown.")); + resultDistributor.close(); + logger.info("Removing session before shutdown: {}, last access time: {}", session.getSessionId(), session.getLastAccessTime()); + this.removeSession(session.getSessionId()); + } + + jobController.close(); + bootstrap.destroy(); + } + + private synchronized void setEvictTimer() { + if (!closed && reaperInterval > 0) { + scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + final Thread t = new Thread(r, "arthas-shell-server"); + return t; + } + }); + scheduledExecutorService.scheduleAtFixedRate(new Runnable() { + + @Override + public void run() { + evictSessions(); + } + }, 0, reaperInterval, TimeUnit.MILLISECONDS); + } + } + + /** + * Check and remove inactive session + */ + public void evictSessions() { + long now = System.currentTimeMillis(); + List toClose = new ArrayList(); + for (Session session : sessions.values()) { + // do not close if there is still job running, + // e.g. trace command might wait for a long time before condition is met + //TODO check background job size + if (now - session.getLastAccessTime() > timeoutMillis && session.getForegroundJob() == null) { + toClose.add(session); + } + evictConsumers(session); + } + for (Session session : toClose) { + //interrupt foreground job + Job job = session.getForegroundJob(); + if (job != null) { + job.interrupt(); + } + long timeOutInMinutes = timeoutMillis / 1000 / 60; + String reason = "session is inactive for " + timeOutInMinutes + " min(s)."; + session.getResultDistributor().appendResult(new MessageModel(reason)); + this.removeSession(session.getSessionId()); + logger.info("Removing inactive session: {}, last access time: {}", session.getSessionId(), session.getLastAccessTime()); + } + } + + /** + * Check and remove inactive consumer + */ + public void evictConsumers(Session session) { + SharingResultDistributor distributor = session.getResultDistributor(); + if (distributor instanceof SharingResultDistributor) { + SharingResultDistributor sharingResultDistributor = (SharingResultDistributor) distributor; + List consumers = sharingResultDistributor.getConsumers(); + //remove inactive consumer from session directly + long now = System.currentTimeMillis(); + for (ResultConsumer consumer : consumers) { + long inactiveTime = now - consumer.getLastAccessTime(); + if (inactiveTime > 30000) { + //inactive duration must be large than pollTimeLimit + logger.info("Removing inactive consumer from session, sessionId: {}, consumerId: {}, inactive duration: {}", + session.getSessionId(), consumer.getConsumerId(), inactiveTime); + consumer.appendResult(new MessageModel("consumer is inactive for a while, please refresh the page.")); + sharingResultDistributor.removeConsumer(consumer); + } + } + } + } + + @Override + public InternalCommandManager getCommandManager() { + return commandManager; + } + + @Override + public Instrumentation getInstrumentation() { + return instrumentation; + } + + @Override + public JobControllerImpl getJobController() { + return jobController; + } +}