From 33175d2f3eacc7bb137d503d7446cf2ac9b0f90d Mon Sep 17 00:00:00 2001 From: gongdewei Date: Thu, 21 May 2020 21:45:57 +0800 Subject: [PATCH] Extract JobListener and ResultDistributor, adapt to telnet and http api --- .../core/command/model/MessageModel.java | 24 ++++ .../core/command/model/ResultModel.java | 27 ++++ .../core/command/model/StatusModel.java | 52 +++++++ .../arthas/core/command/view/MessageView.java | 14 ++ .../arthas/core/command/view/ResultView.java | 30 ++++ .../core/command/view/ResultViewResolver.java | 65 +++++++++ .../arthas/core/command/view/StatusView.java | 18 +++ .../core/distribution/ResultDistributor.java | 21 +++ .../impl/TermResultDistributorImpl.java | 36 +++++ .../taobao/arthas/core/shell/ShellServer.java | 12 ++ .../core/shell/command/CommandProcess.java | 15 ++ .../arthas/core/shell/impl/ShellImpl.java | 66 +++++++-- .../core/shell/impl/ShellServerImpl.java | 8 ++ .../taobao/arthas/core/shell/system/Job.java | 5 + .../core/shell/system/JobController.java | 11 +- .../arthas/core/shell/system/JobListener.java | 16 +++ .../system/impl/GlobalJobControllerImpl.java | 10 +- .../shell/system/impl/JobControllerImpl.java | 33 +++-- .../core/shell/system/impl/JobImpl.java | 129 ++++++++++-------- .../core/shell/system/impl/ProcessImpl.java | 90 ++++++++---- 20 files changed, 569 insertions(+), 113 deletions(-) create mode 100644 core/src/main/java/com/taobao/arthas/core/command/model/MessageModel.java create mode 100644 core/src/main/java/com/taobao/arthas/core/command/model/ResultModel.java create mode 100644 core/src/main/java/com/taobao/arthas/core/command/model/StatusModel.java create mode 100644 core/src/main/java/com/taobao/arthas/core/command/view/MessageView.java create mode 100644 core/src/main/java/com/taobao/arthas/core/command/view/ResultView.java create mode 100644 core/src/main/java/com/taobao/arthas/core/command/view/ResultViewResolver.java create mode 100644 core/src/main/java/com/taobao/arthas/core/command/view/StatusView.java create mode 100644 core/src/main/java/com/taobao/arthas/core/distribution/ResultDistributor.java create mode 100644 core/src/main/java/com/taobao/arthas/core/distribution/impl/TermResultDistributorImpl.java create mode 100644 core/src/main/java/com/taobao/arthas/core/shell/system/JobListener.java diff --git a/core/src/main/java/com/taobao/arthas/core/command/model/MessageModel.java b/core/src/main/java/com/taobao/arthas/core/command/model/MessageModel.java new file mode 100644 index 000000000..686629008 --- /dev/null +++ b/core/src/main/java/com/taobao/arthas/core/command/model/MessageModel.java @@ -0,0 +1,24 @@ +package com.taobao.arthas.core.command.model; + +/** + * @author gongdewei 2020/4/2 + */ +public class MessageModel extends ResultModel { + private String message; + + public MessageModel() { + } + + public MessageModel(String message) { + this.message = message; + } + + public String getMessage() { + return message; + } + + @Override + public String getType() { + return "message"; + } +} diff --git a/core/src/main/java/com/taobao/arthas/core/command/model/ResultModel.java b/core/src/main/java/com/taobao/arthas/core/command/model/ResultModel.java new file mode 100644 index 000000000..636d9b732 --- /dev/null +++ b/core/src/main/java/com/taobao/arthas/core/command/model/ResultModel.java @@ -0,0 +1,27 @@ +package com.taobao.arthas.core.command.model; + +/** + * Command execute result + * + * @author gongdewei 2020-03-26 + */ +public abstract class ResultModel { + + private Integer jobId; + + /** + * Command type (name) + * + * @return + */ + public abstract String getType(); + + + public Integer getJobId() { + return jobId; + } + + public void setJobId(Integer jobId) { + this.jobId = jobId; + } +} diff --git a/core/src/main/java/com/taobao/arthas/core/command/model/StatusModel.java b/core/src/main/java/com/taobao/arthas/core/command/model/StatusModel.java new file mode 100644 index 000000000..9af2439a0 --- /dev/null +++ b/core/src/main/java/com/taobao/arthas/core/command/model/StatusModel.java @@ -0,0 +1,52 @@ +package com.taobao.arthas.core.command.model; + +public class StatusModel extends ResultModel { + private int statusCode; + private String message; + + public StatusModel() { + } + + public StatusModel(int statusCode) { + this.statusCode = statusCode; + } + + public StatusModel(int statusCode, String message) { + this.statusCode = statusCode; + this.message = message; + } + + public int getStatusCode() { + return statusCode; + } + + public StatusModel setStatusCode(int statusCode) { + this.statusCode = statusCode; + return this; + } + + public String getMessage() { + return message; + } + + public StatusModel setMessage(String message) { + this.message = message; + return this; + } + + public StatusModel setStatus(int statusCode, String message) { + this.statusCode = statusCode; + this.message = message; + return this; + } + + public StatusModel setStatus(int statusCode) { + return this.setStatus(statusCode, null); + } + + @Override + public String getType() { + return "status"; + } + +} diff --git a/core/src/main/java/com/taobao/arthas/core/command/view/MessageView.java b/core/src/main/java/com/taobao/arthas/core/command/view/MessageView.java new file mode 100644 index 000000000..f1846113d --- /dev/null +++ b/core/src/main/java/com/taobao/arthas/core/command/view/MessageView.java @@ -0,0 +1,14 @@ +package com.taobao.arthas.core.command.view; + +import com.taobao.arthas.core.command.model.MessageModel; +import com.taobao.arthas.core.shell.command.CommandProcess; + +/** + * @author gongdewei 2020/4/2 + */ +public class MessageView extends ResultView { + @Override + public void draw(CommandProcess process, MessageModel result) { + writeln(process, result.getMessage()); + } +} diff --git a/core/src/main/java/com/taobao/arthas/core/command/view/ResultView.java b/core/src/main/java/com/taobao/arthas/core/command/view/ResultView.java new file mode 100644 index 000000000..674955dbc --- /dev/null +++ b/core/src/main/java/com/taobao/arthas/core/command/view/ResultView.java @@ -0,0 +1,30 @@ +package com.taobao.arthas.core.command.view; + +import com.taobao.arthas.core.command.model.ResultModel; +import com.taobao.arthas.core.shell.command.CommandProcess; + +/** + * Command result view for telnet term/tty. + * Note: Result view is a reusable and stateless instance + * + * @author gongdewei 2020/3/27 + */ +public abstract class ResultView { + + /** + * formatted printing data to term/tty + * + * @param process + */ + public abstract void draw(CommandProcess process, T result); + + /** + * write str and append a new line + * + * @param process + * @param str + */ + protected void writeln(CommandProcess process, String str) { + process.write(str).write("\n"); + } +} diff --git a/core/src/main/java/com/taobao/arthas/core/command/view/ResultViewResolver.java b/core/src/main/java/com/taobao/arthas/core/command/view/ResultViewResolver.java new file mode 100644 index 000000000..d2956fb86 --- /dev/null +++ b/core/src/main/java/com/taobao/arthas/core/command/view/ResultViewResolver.java @@ -0,0 +1,65 @@ +package com.taobao.arthas.core.command.view; + +import com.alibaba.arthas.deps.org.slf4j.Logger; +import com.alibaba.arthas.deps.org.slf4j.LoggerFactory; +import com.taobao.arthas.core.command.model.*; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * @author gongdewei 2020/3/27 + */ +public class ResultViewResolver { + private static final Logger logger = LoggerFactory.getLogger(ResultViewResolver.class); + + private Map resultViewMap = new ConcurrentHashMap(); + + private static ResultViewResolver viewResolver; + + public static ResultViewResolver getInstance() { + if (viewResolver == null) { + synchronized (ResultViewResolver.class) { + viewResolver = new ResultViewResolver(); + } + } + return viewResolver; + } + + static { + getInstance().registerResultViews(); + } + + private void registerResultViews() { + try { + registerView(new StatusModel(), new StatusView()); + registerView(new VersionModel(), new VersionView()); + registerView(new MessageModel(), new MessageView()); + registerView(new HelpModel(), new HelpView()); + //registerView(new HistoryModel(), new HistoryView()); + registerView(new EchoModel(), new EchoView()); + } catch (Throwable e) { + logger.error("register result view failed", e); + } + } + + private ResultViewResolver() { + } + +// public void registerView(Class resultClass, ResultView view) throws IllegalAccessException, InstantiationException { +// ExecResult instance = resultClass.newInstance(); +// this.registerView(instance.getType(), view); +// } + + public void registerView(T resultObject, ResultView view) { + this.registerView(resultObject.getType(), view); + } + + public void registerView(String resultType, ResultView view) { + resultViewMap.put(resultType, view); + } + + public ResultView getResultView(String resultType) { + return resultViewMap.get(resultType); + } +} diff --git a/core/src/main/java/com/taobao/arthas/core/command/view/StatusView.java b/core/src/main/java/com/taobao/arthas/core/command/view/StatusView.java new file mode 100644 index 000000000..6ea60c9d0 --- /dev/null +++ b/core/src/main/java/com/taobao/arthas/core/command/view/StatusView.java @@ -0,0 +1,18 @@ +package com.taobao.arthas.core.command.view; + +import com.taobao.arthas.core.command.model.StatusModel; +import com.taobao.arthas.core.shell.command.CommandProcess; + +/** + * @author gongdewei 2020/3/27 + */ +public class StatusView extends ResultView { + + @Override + public void draw(CommandProcess process, StatusModel result) { + if (result.getMessage() != null) { + writeln(process, result.getMessage()); + } + } + +} diff --git a/core/src/main/java/com/taobao/arthas/core/distribution/ResultDistributor.java b/core/src/main/java/com/taobao/arthas/core/distribution/ResultDistributor.java new file mode 100644 index 000000000..5b6711bf7 --- /dev/null +++ b/core/src/main/java/com/taobao/arthas/core/distribution/ResultDistributor.java @@ -0,0 +1,21 @@ +package com.taobao.arthas.core.distribution; + +import com.taobao.arthas.core.command.model.ResultModel; + +/** + * Command result distributor, sending results to consumers who joins in the same session. + * @author gongdewei 2020-03-26 + */ +public interface ResultDistributor { + + /** + * Append the phased result to queue + * @param result a phased result of the command + */ + void appendResult(ResultModel result); + + /** + * Close result distribtor, release resources + */ + void close(); +} diff --git a/core/src/main/java/com/taobao/arthas/core/distribution/impl/TermResultDistributorImpl.java b/core/src/main/java/com/taobao/arthas/core/distribution/impl/TermResultDistributorImpl.java new file mode 100644 index 000000000..c47d69e04 --- /dev/null +++ b/core/src/main/java/com/taobao/arthas/core/distribution/impl/TermResultDistributorImpl.java @@ -0,0 +1,36 @@ +package com.taobao.arthas.core.distribution.impl; + +import com.taobao.arthas.core.command.model.ResultModel; +import com.taobao.arthas.core.command.view.ResultView; +import com.taobao.arthas.core.command.view.ResultViewResolver; +import com.taobao.arthas.core.distribution.ResultDistributor; +import com.taobao.arthas.core.shell.command.CommandProcess; + +/** + * Term/Tty Result Distributor + * + * @author gongdewei 2020-03-26 + */ +public class TermResultDistributorImpl implements ResultDistributor { + + private final CommandProcess commandProcess; + private final ResultViewResolver resultViewResolver; + + public TermResultDistributorImpl(CommandProcess commandProcess) { + this.commandProcess = commandProcess; + this.resultViewResolver = ResultViewResolver.getInstance(); + } + + @Override + public void appendResult(ResultModel result) { + ResultView resultView = resultViewResolver.getResultView(result.getType()); + if (resultView != null) { + resultView.draw(commandProcess, result); + } + } + + @Override + public void close() { + } + +} diff --git a/core/src/main/java/com/taobao/arthas/core/shell/ShellServer.java b/core/src/main/java/com/taobao/arthas/core/shell/ShellServer.java index 49192c84b..b69ae4255 100644 --- a/core/src/main/java/com/taobao/arthas/core/shell/ShellServer.java +++ b/core/src/main/java/com/taobao/arthas/core/shell/ShellServer.java @@ -5,6 +5,8 @@ import com.taobao.arthas.core.shell.future.Future; import com.taobao.arthas.core.shell.handlers.Handler; import com.taobao.arthas.core.shell.handlers.NoOpHandler; import com.taobao.arthas.core.shell.impl.ShellServerImpl; +import com.taobao.arthas.core.shell.system.impl.InternalCommandManager; +import com.taobao.arthas.core.shell.system.impl.JobControllerImpl; import com.taobao.arthas.core.shell.term.Term; import com.taobao.arthas.core.shell.term.TermServer; @@ -102,4 +104,14 @@ public abstract class ShellServer { * @param completionHandler handler for getting notified when service is stopped */ public abstract void close(Handler> completionHandler); + + /** + * @return global job controller instance + */ + public abstract JobControllerImpl getJobController(); + + /** + * @return get command manager instance + */ + public abstract InternalCommandManager getCommandManager(); } diff --git a/core/src/main/java/com/taobao/arthas/core/shell/command/CommandProcess.java b/core/src/main/java/com/taobao/arthas/core/shell/command/CommandProcess.java index 0c2694ff8..3cdf16af3 100644 --- a/core/src/main/java/com/taobao/arthas/core/shell/command/CommandProcess.java +++ b/core/src/main/java/com/taobao/arthas/core/shell/command/CommandProcess.java @@ -1,6 +1,7 @@ package com.taobao.arthas.core.shell.command; import com.taobao.arthas.core.advisor.AdviceListener; +import com.taobao.arthas.core.command.model.ResultModel; import com.taobao.arthas.core.shell.cli.CliToken; import com.taobao.arthas.core.shell.handlers.Handler; import com.taobao.arthas.core.shell.session.Session; @@ -119,6 +120,13 @@ public interface CommandProcess extends Tty { */ void end(int status); + /** + * End the process. + * + * @param status the exit status. + */ + void end(int status, String message); + /** * Register listener @@ -167,4 +175,11 @@ public interface CommandProcess extends Tty { * Whether the process is running */ boolean isRunning(); + + /** + * Append the phased result to queue + * @param result a phased result of the command + */ + void appendResult(ResultModel result); + } diff --git a/core/src/main/java/com/taobao/arthas/core/shell/impl/ShellImpl.java b/core/src/main/java/com/taobao/arthas/core/shell/impl/ShellImpl.java index cdd8bad73..a9e3ac54b 100644 --- a/core/src/main/java/com/taobao/arthas/core/shell/impl/ShellImpl.java +++ b/core/src/main/java/com/taobao/arthas/core/shell/impl/ShellImpl.java @@ -7,22 +7,21 @@ import com.taobao.arthas.core.shell.ShellServer; import com.taobao.arthas.core.shell.cli.CliToken; import com.taobao.arthas.core.shell.cli.CliTokens; import com.taobao.arthas.core.shell.future.Future; -import com.taobao.arthas.core.shell.handlers.shell.CloseHandler; -import com.taobao.arthas.core.shell.handlers.shell.CommandManagerCompletionHandler; -import com.taobao.arthas.core.shell.handlers.shell.FutureHandler; -import com.taobao.arthas.core.shell.handlers.shell.InterruptHandler; -import com.taobao.arthas.core.shell.handlers.shell.ShellLineHandler; -import com.taobao.arthas.core.shell.handlers.shell.SuspendHandler; +import com.taobao.arthas.core.shell.handlers.shell.*; import com.taobao.arthas.core.shell.session.Session; import com.taobao.arthas.core.shell.session.impl.SessionImpl; import com.taobao.arthas.core.shell.system.ExecStatus; import com.taobao.arthas.core.shell.system.Job; import com.taobao.arthas.core.shell.system.JobController; +import com.taobao.arthas.core.shell.system.JobListener; import com.taobao.arthas.core.shell.system.impl.InternalCommandManager; import com.taobao.arthas.core.shell.system.impl.JobControllerImpl; import com.taobao.arthas.core.shell.term.Term; -import com.taobao.arthas.core.shell.term.impl.httptelnet.HttpTelnetTermServer; +import com.taobao.arthas.core.shell.term.impl.TermImpl; +import com.taobao.arthas.core.util.Constants; +import com.taobao.arthas.core.util.FileUtils; +import java.io.File; import java.lang.instrument.Instrumentation; import java.util.Date; import java.util.List; @@ -79,7 +78,7 @@ public class ShellImpl implements Shell { @Override public synchronized Job createJob(List args) { - Job job = jobController.createJob(commandManager, args, this); + Job job = jobController.createJob(commandManager, args, session, new ShellJobHandler(this), term, null); return job; } @@ -165,7 +164,7 @@ public class ShellImpl implements Shell { // sometimes an NPE will be thrown during shutdown via web-socket, // this ensures the shutdown process is finished properly // https://github.com/alibaba/arthas/issues/320 - logger.error("ARTHAS", "Error writing data:", t); + logger.error("Error writing data:", t); } term.close(); } else { @@ -180,4 +179,53 @@ public class ShellImpl implements Shell { public Job getForegroundJob() { return currentForegroundJob; } + + private class ShellJobHandler implements JobListener { + ShellImpl shell; + + public ShellJobHandler(ShellImpl shell) { + this.shell = shell; + } + + @Override + public void onForeground(Job job) { + shell.setForegroundJob(job); + //reset stdin handler to job's origin handler + //shell.term().stdinHandler(job.process().getStdinHandler()); + } + + @Override + public void onBackground(Job job) { + resetAndReadLine(); + } + + @Override + public void onTerminated(Job job) { + if (!job.isRunInBackground()){ + resetAndReadLine(); + } + + // save command history + Term term = shell.term(); + if (term instanceof TermImpl) { + List history = ((TermImpl) term).getReadline().getHistory(); + FileUtils.saveCommandHistory(history, new File(Constants.CMD_HISTORY_FILE)); + } + } + + @Override + public void onSuspend(Job job) { + if (!job.isRunInBackground()){ + resetAndReadLine(); + } + } + + private void resetAndReadLine() { + //reset stdin handler to echo handler + //shell.term().stdinHandler(null); + shell.setForegroundJob(null); + shell.readline(); + } + } + } diff --git a/core/src/main/java/com/taobao/arthas/core/shell/impl/ShellServerImpl.java b/core/src/main/java/com/taobao/arthas/core/shell/impl/ShellServerImpl.java index 802171fb4..0d649c919 100644 --- a/core/src/main/java/com/taobao/arthas/core/shell/impl/ShellServerImpl.java +++ b/core/src/main/java/com/taobao/arthas/core/shell/impl/ShellServerImpl.java @@ -242,4 +242,12 @@ public class ShellServerImpl extends ShellServer { bootstrap.destroy(); } } + + public JobControllerImpl getJobController() { + return jobController; + } + + public InternalCommandManager getCommandManager() { + return commandManager; + } } diff --git a/core/src/main/java/com/taobao/arthas/core/shell/system/Job.java b/core/src/main/java/com/taobao/arthas/core/shell/system/Job.java index d884ba36b..e061264f2 100644 --- a/core/src/main/java/com/taobao/arthas/core/shell/system/Job.java +++ b/core/src/main/java/com/taobao/arthas/core/shell/system/Job.java @@ -57,6 +57,11 @@ public interface Job { */ Job resume(); + /** + * @return true if the job is running in background + */ + boolean isRunInBackground(); + /** * Send the job to background. * diff --git a/core/src/main/java/com/taobao/arthas/core/shell/system/JobController.java b/core/src/main/java/com/taobao/arthas/core/shell/system/JobController.java index a00454865..4ba2455c0 100644 --- a/core/src/main/java/com/taobao/arthas/core/shell/system/JobController.java +++ b/core/src/main/java/com/taobao/arthas/core/shell/system/JobController.java @@ -1,9 +1,11 @@ package com.taobao.arthas.core.shell.system; +import com.taobao.arthas.core.distribution.ResultDistributor; import com.taobao.arthas.core.shell.cli.CliToken; import com.taobao.arthas.core.shell.handlers.Handler; -import com.taobao.arthas.core.shell.impl.ShellImpl; +import com.taobao.arthas.core.shell.session.Session; import com.taobao.arthas.core.shell.system.impl.InternalCommandManager; +import com.taobao.arthas.core.shell.term.Term; import java.util.List; import java.util.Set; @@ -33,10 +35,13 @@ public interface JobController { * * @param commandManager command manager * @param tokens the command tokens - * @param shell the current shell + * @param session the current session + * @param jobHandler job event handler + * @param term telnet term + * @param resultDistributor * @return the created job */ - Job createJob(InternalCommandManager commandManager, List tokens, ShellImpl shell); + Job createJob(InternalCommandManager commandManager, List tokens, Session session, JobListener jobHandler, Term term, ResultDistributor resultDistributor); /** * Close the controller and terminate all the underlying jobs, a closed controller does not accept anymore jobs. diff --git a/core/src/main/java/com/taobao/arthas/core/shell/system/JobListener.java b/core/src/main/java/com/taobao/arthas/core/shell/system/JobListener.java new file mode 100644 index 000000000..b3c889171 --- /dev/null +++ b/core/src/main/java/com/taobao/arthas/core/shell/system/JobListener.java @@ -0,0 +1,16 @@ +package com.taobao.arthas.core.shell.system; + +/** + * Job listener + * @author gongdewei 2020-03-23 + */ +public interface JobListener { + + void onForeground(Job job); + + void onBackground(Job job); + + void onTerminated(Job job); + + void onSuspend(Job job); +} diff --git a/core/src/main/java/com/taobao/arthas/core/shell/system/impl/GlobalJobControllerImpl.java b/core/src/main/java/com/taobao/arthas/core/shell/system/impl/GlobalJobControllerImpl.java index 8fe83ae56..31137f3bc 100644 --- a/core/src/main/java/com/taobao/arthas/core/shell/system/impl/GlobalJobControllerImpl.java +++ b/core/src/main/java/com/taobao/arthas/core/shell/system/impl/GlobalJobControllerImpl.java @@ -9,11 +9,15 @@ import java.util.concurrent.TimeUnit; import com.alibaba.arthas.deps.org.slf4j.Logger; import com.alibaba.arthas.deps.org.slf4j.LoggerFactory; import com.taobao.arthas.core.GlobalOptions; +import com.taobao.arthas.core.distribution.ResultDistributor; import com.taobao.arthas.core.server.ArthasBootstrap; import com.taobao.arthas.core.shell.cli.CliToken; import com.taobao.arthas.core.shell.handlers.Handler; -import com.taobao.arthas.core.shell.impl.ShellImpl; +import com.taobao.arthas.core.shell.session.Session; import com.taobao.arthas.core.shell.system.Job; +import com.taobao.arthas.core.shell.system.JobListener; +import com.taobao.arthas.core.shell.term.Term; + /** * 全局的Job Controller,不应该存在启停的概念,不需要在连接的断开时关闭, @@ -49,8 +53,8 @@ public class GlobalJobControllerImpl extends JobControllerImpl { } @Override - public Job createJob(InternalCommandManager commandManager, List tokens, ShellImpl shell) { - final Job job = super.createJob(commandManager, tokens, shell); + public Job createJob(InternalCommandManager commandManager, List tokens, Session session, JobListener jobHandler, Term term, ResultDistributor resultDistributor) { + final Job job = super.createJob(commandManager, tokens, session, jobHandler, term, resultDistributor); /* * 达到超时时间将会停止job diff --git a/core/src/main/java/com/taobao/arthas/core/shell/system/impl/JobControllerImpl.java b/core/src/main/java/com/taobao/arthas/core/shell/system/impl/JobControllerImpl.java index eedbddf7f..fee1b1675 100644 --- a/core/src/main/java/com/taobao/arthas/core/shell/system/impl/JobControllerImpl.java +++ b/core/src/main/java/com/taobao/arthas/core/shell/system/impl/JobControllerImpl.java @@ -1,6 +1,7 @@ package com.taobao.arthas.core.shell.system.impl; import com.taobao.arthas.core.GlobalOptions; +import com.taobao.arthas.core.distribution.ResultDistributor; import com.taobao.arthas.core.shell.cli.CliToken; import com.taobao.arthas.core.shell.command.Command; import com.taobao.arthas.core.shell.command.internal.RedirectHandler; @@ -8,33 +9,27 @@ import com.taobao.arthas.core.shell.command.internal.StdoutHandler; import com.taobao.arthas.core.shell.command.internal.TermHandler; import com.taobao.arthas.core.shell.future.Future; import com.taobao.arthas.core.shell.handlers.Handler; -import com.taobao.arthas.core.shell.impl.ShellImpl; +import com.taobao.arthas.core.shell.session.Session; import com.taobao.arthas.core.shell.system.Job; import com.taobao.arthas.core.shell.system.JobController; +import com.taobao.arthas.core.shell.system.JobListener; import com.taobao.arthas.core.shell.system.Process; import com.taobao.arthas.core.shell.system.impl.ProcessImpl.ProcessOutput; import com.taobao.arthas.core.shell.term.Term; import com.taobao.arthas.core.util.Constants; import com.taobao.arthas.core.util.LogUtil; import com.taobao.arthas.core.util.TokenUtils; - import io.termd.core.function.Function; import java.io.File; import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.ListIterator; -import java.util.Set; -import java.util.SortedMap; -import java.util.TreeMap; +import java.util.*; import java.util.concurrent.atomic.AtomicInteger; /** * @author Julien Viet * @author hengyunabc 2019-05-14 + * @author gongdewei 2020-03-23 */ public class JobControllerImpl implements JobController { @@ -58,16 +53,16 @@ public class JobControllerImpl implements JobController { } @Override - public Job createJob(InternalCommandManager commandManager, List tokens, ShellImpl shell) { + public Job createJob(InternalCommandManager commandManager, List tokens, Session session, JobListener jobHandler, Term term, ResultDistributor resultDistributor) { int jobId = idGenerator.incrementAndGet(); StringBuilder line = new StringBuilder(); for (CliToken arg : tokens) { line.append(arg.raw()); } boolean runInBackground = runInBackground(tokens); - Process process = createProcess(tokens, commandManager, jobId, shell.term()); + Process process = createProcess(tokens, commandManager, jobId, term, resultDistributor); process.setJobId(jobId); - JobImpl job = new JobImpl(jobId, this, process, line.toString(), runInBackground, shell); + JobImpl job = new JobImpl(jobId, this, process, line.toString(), runInBackground, session, jobHandler); jobs.put(jobId, job); return job; } @@ -120,9 +115,10 @@ public class JobControllerImpl implements JobController { * @param commandManager command manager * @param jobId job id * @param term term + * @param resultDistributor * @return the created process */ - private Process createProcess(List line, InternalCommandManager commandManager, int jobId, Term term) { + private Process createProcess(List line, InternalCommandManager commandManager, int jobId, Term term, ResultDistributor resultDistributor) { try { ListIterator tokens = line.listIterator(); while (tokens.hasNext()) { @@ -130,7 +126,7 @@ public class JobControllerImpl implements JobController { if (token.isText()) { Command command = commandManager.getCommand(token.value()); if (command != null) { - return createCommandProcess(command, tokens, jobId, term); + return createCommandProcess(command, tokens, jobId, term, resultDistributor); } else { throw new IllegalArgumentException(token.value() + ": command not found"); } @@ -152,7 +148,7 @@ public class JobControllerImpl implements JobController { return runInBackground; } - private Process createCommandProcess(Command command, ListIterator tokens, int jobId, Term term) throws IOException { + private Process createCommandProcess(Command command, ListIterator tokens, int jobId, Term term, ResultDistributor resultDistributor) throws IOException { List remaining = new ArrayList(); List pipelineTokens = new ArrayList(); boolean isPipeline = false; @@ -199,7 +195,9 @@ public class JobControllerImpl implements JobController { } } ProcessOutput ProcessOutput = new ProcessOutput(stdoutHandlerChain, cacheLocation, term); - return new ProcessImpl(command, remaining, command.processHandler(), ProcessOutput); + ProcessImpl process = new ProcessImpl(command, remaining, command.processHandler(), ProcessOutput, resultDistributor); + process.setTty(term); + return process; } private String getRedirectFileName(ListIterator tokens) { @@ -226,4 +224,5 @@ public class JobControllerImpl implements JobController { public void close() { close(null); } + } diff --git a/core/src/main/java/com/taobao/arthas/core/shell/system/impl/JobImpl.java b/core/src/main/java/com/taobao/arthas/core/shell/system/impl/JobImpl.java index 6f37e8632..c960f5d53 100644 --- a/core/src/main/java/com/taobao/arthas/core/shell/system/impl/JobImpl.java +++ b/core/src/main/java/com/taobao/arthas/core/shell/system/impl/JobImpl.java @@ -7,20 +7,16 @@ import java.util.concurrent.atomic.AtomicBoolean; import com.taobao.arthas.core.shell.future.Future; import com.taobao.arthas.core.shell.handlers.Handler; -import com.taobao.arthas.core.shell.handlers.shell.ShellForegroundUpdateHandler; -import com.taobao.arthas.core.shell.impl.ShellImpl; import com.taobao.arthas.core.shell.session.Session; import com.taobao.arthas.core.shell.system.ExecStatus; import com.taobao.arthas.core.shell.system.Job; +import com.taobao.arthas.core.shell.system.JobListener; import com.taobao.arthas.core.shell.system.Process; -import com.taobao.arthas.core.shell.term.Term; -import com.taobao.arthas.core.shell.term.impl.TermImpl; -import com.taobao.arthas.core.util.Constants; -import com.taobao.arthas.core.util.FileUtils; /** * @author Julien Viet * @author hengyunabc 2019-05-14 + * @author gongdewei 2020-03-23 */ public class JobImpl implements Job { @@ -28,25 +24,30 @@ public class JobImpl implements Job { final JobControllerImpl controller; final Process process; final String line; + private volatile Session session; private volatile ExecStatus actualStatus; // Used internally for testing only volatile long lastStopped; // When the job was last stopped - volatile ShellImpl shell; + volatile JobListener jobHandler; volatile Handler statusUpdateHandler; volatile Date timeoutDate; final Future terminateFuture; final AtomicBoolean runInBackground; - final Handler foregroundUpdatedHandler; + //final Handler foregroundUpdatedHandler; JobImpl(int id, final JobControllerImpl controller, Process process, String line, boolean runInBackground, - ShellImpl shell) { + Session session, JobListener jobHandler) { this.id = id; this.controller = controller; this.process = process; this.line = line; + this.session = session; this.terminateFuture = Future.future(); this.runInBackground = new AtomicBoolean(runInBackground); - this.shell = shell; - this.foregroundUpdatedHandler = new ShellForegroundUpdateHandler(shell); + this.jobHandler = jobHandler; + if (jobHandler == null) { + throw new IllegalArgumentException("JobListener is required"); + } + //this.foregroundUpdatedHandler = new ShellForegroundUpdateHandler(shell); process.terminatedHandler(new TerminatedHandler(controller)); } @@ -76,7 +77,7 @@ public class JobImpl implements Job { @Override public Session getSession() { - return shell.session(); + return session; } @Override @@ -89,19 +90,21 @@ public class JobImpl implements Job { runInBackground.set(!foreground); - if (foreground) { - if (foregroundUpdatedHandler != null) { - foregroundUpdatedHandler.handle(this); - } - } +// if (foreground) { +// if (foregroundUpdatedHandler != null) { +// foregroundUpdatedHandler.handle(this); +// } +// } if (statusUpdateHandler != null) { statusUpdateHandler.handle(process.status()); } - if (foreground) { - shell.setForegroundJob(this); - } else { - shell.setForegroundJob(null); + if (this.status() == ExecStatus.RUNNING) { + if (foreground) { + jobHandler.onForeground(this); + } else { + jobHandler.onBackground(this); + } } return this; } @@ -113,14 +116,15 @@ public class JobImpl implements Job { } catch (IllegalStateException ignore) { return this; } - if (!runInBackground.get() && foregroundUpdatedHandler != null) { - foregroundUpdatedHandler.handle(null); - } +// if (!runInBackground.get() && foregroundUpdatedHandler != null) { +// foregroundUpdatedHandler.handle(null); +// } if (statusUpdateHandler != null) { statusUpdateHandler.handle(process.status()); } - shell.setForegroundJob(null); +// shell.setForegroundJob(null); + jobHandler.onSuspend(this); return this; } @@ -147,6 +151,11 @@ public class JobImpl implements Job { return line; } + @Override + public boolean isRunInBackground() { + return runInBackground.get(); + } + @Override public Job toBackground() { if (!this.runInBackground.get()) { @@ -156,10 +165,12 @@ public class JobImpl implements Job { if (statusUpdateHandler != null) { statusUpdateHandler.handle(process.status()); } + jobHandler.onBackground(this); } } - shell.setForegroundJob(null); +// shell.setForegroundJob(null); +// jobHandler.onBackground(this); return this; } @@ -167,15 +178,16 @@ public class JobImpl implements Job { public Job toForeground() { if (this.runInBackground.get()) { if (runInBackground.compareAndSet(true, false)) { - if (foregroundUpdatedHandler != null) { - foregroundUpdatedHandler.handle(this); - } +// if (foregroundUpdatedHandler != null) { +// foregroundUpdatedHandler.handle(this); +// } process.toForeground(); if (statusUpdateHandler != null) { statusUpdateHandler.handle(process.status()); } - shell.setForegroundJob(this); +// shell.setForegroundJob(this); + jobHandler.onForeground(this); } } @@ -194,26 +206,34 @@ public class JobImpl implements Job { @Override public Job run(boolean foreground) { - if (foreground && foregroundUpdatedHandler != null) { - foregroundUpdatedHandler.handle(this); - } +// if (foreground && foregroundUpdatedHandler != null) { +// foregroundUpdatedHandler.handle(this); +// } actualStatus = ExecStatus.RUNNING; if (statusUpdateHandler != null) { statusUpdateHandler.handle(ExecStatus.RUNNING); } - process.setTty(shell.term()); - process.setSession(shell.session()); + //set process's tty in JobControllerImpl.createCommandProcess + //process.setTty(shell.term()); + process.setSession(this.session); process.run(foreground); - if (!foreground && foregroundUpdatedHandler != null) { - foregroundUpdatedHandler.handle(null); - } - - if (foreground) { - shell.setForegroundJob(this); - } else { - shell.setForegroundJob(null); +// if (!foreground && foregroundUpdatedHandler != null) { +// foregroundUpdatedHandler.handle(null); +// } +// +// if (foreground) { +// shell.setForegroundJob(this); +// } else { +// shell.setForegroundJob(null); +// } + if (this.status() == ExecStatus.RUNNING) { + if (foreground) { + jobHandler.onForeground(this); + } else { + jobHandler.onBackground(this); + } } return this; } @@ -228,24 +248,25 @@ public class JobImpl implements Job { @Override public void handle(Integer exitCode) { - if (!runInBackground.get() && actualStatus.equals(ExecStatus.RUNNING)) { +// if (!runInBackground.get() && actualStatus.equals(ExecStatus.RUNNING)) { // 只有前台在运行的任务,才需要调用foregroundUpdateHandler - if (foregroundUpdatedHandler != null) { - foregroundUpdatedHandler.handle(null); - } - } +// if (foregroundUpdatedHandler != null) { +// foregroundUpdatedHandler.handle(null); +// } +// } + jobHandler.onTerminated(JobImpl.this); controller.removeJob(JobImpl.this.id); if (statusUpdateHandler != null) { statusUpdateHandler.handle(ExecStatus.TERMINATED); } terminateFuture.complete(); - // save command history - Term term = shell.term(); - if (term instanceof TermImpl) { - List history = ((TermImpl) term).getReadline().getHistory(); - FileUtils.saveCommandHistory(history, new File(Constants.CMD_HISTORY_FILE)); - } + // save command history (move to JobControllerImpl.ShellJobHandler.onTerminated) +// Term term = shell.term(); +// if (term instanceof TermImpl) { +// List history = ((TermImpl) term).getReadline().getHistory(); +// FileUtils.saveCommandHistory(history, new File(Constants.CMD_HISTORY_FILE)); +// } } } diff --git a/core/src/main/java/com/taobao/arthas/core/shell/system/impl/ProcessImpl.java b/core/src/main/java/com/taobao/arthas/core/shell/system/impl/ProcessImpl.java index b09711172..249bcf7ad 100644 --- a/core/src/main/java/com/taobao/arthas/core/shell/system/impl/ProcessImpl.java +++ b/core/src/main/java/com/taobao/arthas/core/shell/system/impl/ProcessImpl.java @@ -4,6 +4,11 @@ import com.alibaba.arthas.deps.org.slf4j.Logger; import com.alibaba.arthas.deps.org.slf4j.LoggerFactory; import com.taobao.arthas.core.advisor.AdviceListener; import com.taobao.arthas.core.advisor.AdviceWeaver; +import com.taobao.arthas.core.command.basic1000.HelpCommand; +import com.taobao.arthas.core.command.model.ResultModel; +import com.taobao.arthas.core.command.model.StatusModel; +import com.taobao.arthas.core.distribution.ResultDistributor; +import com.taobao.arthas.core.distribution.impl.TermResultDistributorImpl; import com.taobao.arthas.core.server.ArthasBootstrap; import com.taobao.arthas.core.shell.cli.CliToken; import com.taobao.arthas.core.shell.command.Command; @@ -16,12 +21,8 @@ import com.taobao.arthas.core.shell.system.ExecStatus; import com.taobao.arthas.core.shell.system.Process; import com.taobao.arthas.core.shell.system.ProcessAware; import com.taobao.arthas.core.shell.term.Tty; -import com.taobao.arthas.core.util.usage.StyledUsageFormatter; import com.taobao.middleware.cli.CLIException; import com.taobao.middleware.cli.CommandLine; -import com.taobao.middleware.cli.UsageMessageFormatter; -import com.taobao.text.Color; - import io.termd.core.function.Function; import java.lang.instrument.ClassFileTransformer; @@ -32,6 +33,7 @@ import java.util.concurrent.atomic.AtomicInteger; /** * @author beiwei30 on 10/11/2016. + * @author gongdewei 2020-03-26 */ public class ProcessImpl implements Process { @@ -55,16 +57,18 @@ public class ProcessImpl implements Process { private Handler stdinHandler; private Handler resizeHandler; private Integer exitCode; - private CommandProcess process; + private CommandProcessImpl process; private Date startTime; private ProcessOutput processOutput; private int jobId; + private ResultDistributor resultDistributor; public ProcessImpl(Command commandContext, List args, Handler handler, - ProcessOutput processOutput) { + ProcessOutput processOutput, ResultDistributor resultDistributor) { this.commandContext = commandContext; this.handler = handler; this.args = args; + this.resultDistributor = resultDistributor; this.processStatus = ExecStatus.READY; this.processOutput = processOutput; } @@ -236,13 +240,15 @@ public class ProcessImpl implements Process { @Override public void terminate(Handler completionHandler) { - if (!terminate(-10, completionHandler)) { + if (!terminate(-10, completionHandler, null)) { throw new IllegalStateException("Cannot terminate terminated process"); } } - private synchronized boolean terminate(int exitCode, Handler completionHandler) { + private synchronized boolean terminate(int exitCode, Handler completionHandler, String message) { if (processStatus != ExecStatus.TERMINATED) { + //add status message + this.appendResult(new StatusModel(exitCode, message)); if (process != null) { processOutput.close(); } @@ -256,6 +262,13 @@ public class ProcessImpl implements Process { } } + private void appendResult(ResultModel result) { + result.setJobId(jobId); + if (resultDistributor != null) { + resultDistributor.appendResult(result); + } + } + private void updateStatus(ExecStatus statusUpdate, Integer exitCodeUpdate, boolean foregroundUpdate, Handler handler, Handler terminatedHandler, Handler completionHandler) { @@ -320,6 +333,11 @@ public class ProcessImpl implements Process { throw new IllegalStateException("Cannot execute process without a TTY set"); } + process = new CommandProcessImpl(this, tty); + if (resultDistributor == null) { + resultDistributor = new TermResultDistributorImpl(process); + } + final List args2 = new LinkedList(); for (CliToken arg : args) { if (arg.isText()) { @@ -331,25 +349,20 @@ public class ProcessImpl implements Process { try { if (commandContext.cli() != null) { if (commandContext.cli().parse(args2, false).isAskingForHelp()) { - UsageMessageFormatter formatter = new StyledUsageFormatter(Color.green); - formatter.setWidth(tty.width()); - StringBuilder usage = new StringBuilder(); - commandContext.cli().usage(usage, formatter); - usage.append('\n'); - tty.write(usage.toString()); + appendResult(new HelpCommand().createHelpDetailModel(commandContext)); terminate(); return; } cl = commandContext.cli().parse(args2); + process.setArgs2(args2); + process.setCommandLine(cl); } } catch (CLIException e) { - tty.write(e.getMessage() + "\n"); - terminate(); + terminate(-10, null, e.getMessage()); return; } - process = new CommandProcessImpl(this, args2, tty, cl); if (cacheLocation() != null) { process.echoTips("job id : " + this.jobId + "\n"); process.echoTips("cache location : " + cacheLocation() + "\n"); @@ -372,27 +385,25 @@ public class ProcessImpl implements Process { handler.handle(process); } catch (Throwable t) { logger.error("Error during processing the command:", t); - process.write("Error during processing the command, exception type: " + t.getClass().getName() + ", message:" + t.getMessage() - + ", please check $HOME/logs/arthas/arthas.log for more details. \n"); - terminate(1, null); + process.end(1, "Error during processing the command: " + t.getClass().getName() + ", message:" + t.getMessage() + + ", please check $HOME/logs/arthas/arthas.log for more details." ); } } } private class CommandProcessImpl implements CommandProcess { + private final Process process; - private final List args2; private final Tty tty; - private final CommandLine commandLine; + private List args2; + private CommandLine commandLine; private AtomicInteger times = new AtomicInteger(); private AdviceListener listener = null; private ClassFileTransformer transformer; - public CommandProcessImpl(Process process, List args2, Tty tty, CommandLine commandLine) { + public CommandProcessImpl(Process process, Tty tty) { this.process = process; - this.args2 = args2; this.tty = tty; - this.commandLine = commandLine; } @Override @@ -440,6 +451,14 @@ public class ProcessImpl implements Process { return times; } + public void setArgs2(List args2) { + this.args2 = args2; + } + + public void setCommandLine(CommandLine commandLine) { + this.commandLine = commandLine; + } + @Override public CommandProcess stdinHandler(Handler handler) { stdinHandler = handler; @@ -578,13 +597,27 @@ public class ProcessImpl implements Process { @Override public void end(int statusCode) { - terminate(statusCode, null); + end(statusCode, null); + } + + @Override + public void end(int statusCode, String message) { + terminate(statusCode, null, message); } @Override public boolean isRunning() { return processStatus == ExecStatus.RUNNING; } + + @Override + public void appendResult(ResultModel result) { + if (processStatus != ExecStatus.RUNNING) { + throw new IllegalStateException( + "Cannot write to standard output when " + status().name().toLowerCase()); + } + ProcessImpl.this.appendResult(result); + } } static class ProcessOutput { @@ -620,7 +653,10 @@ public class ProcessImpl implements Process { private void write(String data) { if (stdoutHandlerChain != null) { - for (Function function : stdoutHandlerChain) { + //hotspot, reduce memory fragment (foreach/iterator) + int size = stdoutHandlerChain.size(); + for (int i = 0; i < size; i++) { + Function function = stdoutHandlerChain.get(i); data = function.apply(data); } }