Extract JobListener and ResultDistributor, adapt to telnet and http api

pull/1203/head
gongdewei
parent 907b5c8b0d
commit 33175d2f3e

@ -0,0 +1,24 @@
package com.taobao.arthas.core.command.model;
/**
* @author gongdewei 2020/4/2
*/
public class MessageModel extends ResultModel {
private String message;
public MessageModel() {
}
public MessageModel(String message) {
this.message = message;
}
public String getMessage() {
return message;
}
@Override
public String getType() {
return "message";
}
}

@ -0,0 +1,27 @@
package com.taobao.arthas.core.command.model;
/**
* Command execute result
*
* @author gongdewei 2020-03-26
*/
public abstract class ResultModel {
private Integer jobId;
/**
* Command type (name)
*
* @return
*/
public abstract String getType();
public Integer getJobId() {
return jobId;
}
public void setJobId(Integer jobId) {
this.jobId = jobId;
}
}

@ -0,0 +1,52 @@
package com.taobao.arthas.core.command.model;
public class StatusModel extends ResultModel {
private int statusCode;
private String message;
public StatusModel() {
}
public StatusModel(int statusCode) {
this.statusCode = statusCode;
}
public StatusModel(int statusCode, String message) {
this.statusCode = statusCode;
this.message = message;
}
public int getStatusCode() {
return statusCode;
}
public StatusModel setStatusCode(int statusCode) {
this.statusCode = statusCode;
return this;
}
public String getMessage() {
return message;
}
public StatusModel setMessage(String message) {
this.message = message;
return this;
}
public StatusModel setStatus(int statusCode, String message) {
this.statusCode = statusCode;
this.message = message;
return this;
}
public StatusModel setStatus(int statusCode) {
return this.setStatus(statusCode, null);
}
@Override
public String getType() {
return "status";
}
}

@ -0,0 +1,14 @@
package com.taobao.arthas.core.command.view;
import com.taobao.arthas.core.command.model.MessageModel;
import com.taobao.arthas.core.shell.command.CommandProcess;
/**
* @author gongdewei 2020/4/2
*/
public class MessageView extends ResultView<MessageModel> {
@Override
public void draw(CommandProcess process, MessageModel result) {
writeln(process, result.getMessage());
}
}

@ -0,0 +1,30 @@
package com.taobao.arthas.core.command.view;
import com.taobao.arthas.core.command.model.ResultModel;
import com.taobao.arthas.core.shell.command.CommandProcess;
/**
* Command result view for telnet term/tty.
* Note: Result view is a reusable and stateless instance
*
* @author gongdewei 2020/3/27
*/
public abstract class ResultView<T extends ResultModel> {
/**
* formatted printing data to term/tty
*
* @param process
*/
public abstract void draw(CommandProcess process, T result);
/**
* write str and append a new line
*
* @param process
* @param str
*/
protected void writeln(CommandProcess process, String str) {
process.write(str).write("\n");
}
}

@ -0,0 +1,65 @@
package com.taobao.arthas.core.command.view;
import com.alibaba.arthas.deps.org.slf4j.Logger;
import com.alibaba.arthas.deps.org.slf4j.LoggerFactory;
import com.taobao.arthas.core.command.model.*;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author gongdewei 2020/3/27
*/
public class ResultViewResolver {
private static final Logger logger = LoggerFactory.getLogger(ResultViewResolver.class);
private Map<String, ResultView> resultViewMap = new ConcurrentHashMap<String, ResultView>();
private static ResultViewResolver viewResolver;
public static ResultViewResolver getInstance() {
if (viewResolver == null) {
synchronized (ResultViewResolver.class) {
viewResolver = new ResultViewResolver();
}
}
return viewResolver;
}
static {
getInstance().registerResultViews();
}
private void registerResultViews() {
try {
registerView(new StatusModel(), new StatusView());
registerView(new VersionModel(), new VersionView());
registerView(new MessageModel(), new MessageView());
registerView(new HelpModel(), new HelpView());
//registerView(new HistoryModel(), new HistoryView());
registerView(new EchoModel(), new EchoView());
} catch (Throwable e) {
logger.error("register result view failed", e);
}
}
private ResultViewResolver() {
}
// public void registerView(Class<? extends ExecResult> resultClass, ResultView view) throws IllegalAccessException, InstantiationException {
// ExecResult instance = resultClass.newInstance();
// this.registerView(instance.getType(), view);
// }
public <T extends ResultModel> void registerView(T resultObject, ResultView view) {
this.registerView(resultObject.getType(), view);
}
public void registerView(String resultType, ResultView view) {
resultViewMap.put(resultType, view);
}
public ResultView getResultView(String resultType) {
return resultViewMap.get(resultType);
}
}

@ -0,0 +1,18 @@
package com.taobao.arthas.core.command.view;
import com.taobao.arthas.core.command.model.StatusModel;
import com.taobao.arthas.core.shell.command.CommandProcess;
/**
* @author gongdewei 2020/3/27
*/
public class StatusView extends ResultView<StatusModel> {
@Override
public void draw(CommandProcess process, StatusModel result) {
if (result.getMessage() != null) {
writeln(process, result.getMessage());
}
}
}

@ -0,0 +1,21 @@
package com.taobao.arthas.core.distribution;
import com.taobao.arthas.core.command.model.ResultModel;
/**
* Command result distributor, sending results to consumers who joins in the same session.
* @author gongdewei 2020-03-26
*/
public interface ResultDistributor {
/**
* Append the phased result to queue
* @param result a phased result of the command
*/
void appendResult(ResultModel result);
/**
* Close result distribtor, release resources
*/
void close();
}

@ -0,0 +1,36 @@
package com.taobao.arthas.core.distribution.impl;
import com.taobao.arthas.core.command.model.ResultModel;
import com.taobao.arthas.core.command.view.ResultView;
import com.taobao.arthas.core.command.view.ResultViewResolver;
import com.taobao.arthas.core.distribution.ResultDistributor;
import com.taobao.arthas.core.shell.command.CommandProcess;
/**
* Term/Tty Result Distributor
*
* @author gongdewei 2020-03-26
*/
public class TermResultDistributorImpl implements ResultDistributor {
private final CommandProcess commandProcess;
private final ResultViewResolver resultViewResolver;
public TermResultDistributorImpl(CommandProcess commandProcess) {
this.commandProcess = commandProcess;
this.resultViewResolver = ResultViewResolver.getInstance();
}
@Override
public void appendResult(ResultModel result) {
ResultView resultView = resultViewResolver.getResultView(result.getType());
if (resultView != null) {
resultView.draw(commandProcess, result);
}
}
@Override
public void close() {
}
}

@ -5,6 +5,8 @@ import com.taobao.arthas.core.shell.future.Future;
import com.taobao.arthas.core.shell.handlers.Handler; import com.taobao.arthas.core.shell.handlers.Handler;
import com.taobao.arthas.core.shell.handlers.NoOpHandler; import com.taobao.arthas.core.shell.handlers.NoOpHandler;
import com.taobao.arthas.core.shell.impl.ShellServerImpl; import com.taobao.arthas.core.shell.impl.ShellServerImpl;
import com.taobao.arthas.core.shell.system.impl.InternalCommandManager;
import com.taobao.arthas.core.shell.system.impl.JobControllerImpl;
import com.taobao.arthas.core.shell.term.Term; import com.taobao.arthas.core.shell.term.Term;
import com.taobao.arthas.core.shell.term.TermServer; import com.taobao.arthas.core.shell.term.TermServer;
@ -102,4 +104,14 @@ public abstract class ShellServer {
* @param completionHandler handler for getting notified when service is stopped * @param completionHandler handler for getting notified when service is stopped
*/ */
public abstract void close(Handler<Future<Void>> completionHandler); public abstract void close(Handler<Future<Void>> completionHandler);
/**
* @return global job controller instance
*/
public abstract JobControllerImpl getJobController();
/**
* @return get command manager instance
*/
public abstract InternalCommandManager getCommandManager();
} }

@ -1,6 +1,7 @@
package com.taobao.arthas.core.shell.command; package com.taobao.arthas.core.shell.command;
import com.taobao.arthas.core.advisor.AdviceListener; import com.taobao.arthas.core.advisor.AdviceListener;
import com.taobao.arthas.core.command.model.ResultModel;
import com.taobao.arthas.core.shell.cli.CliToken; import com.taobao.arthas.core.shell.cli.CliToken;
import com.taobao.arthas.core.shell.handlers.Handler; import com.taobao.arthas.core.shell.handlers.Handler;
import com.taobao.arthas.core.shell.session.Session; import com.taobao.arthas.core.shell.session.Session;
@ -119,6 +120,13 @@ public interface CommandProcess extends Tty {
*/ */
void end(int status); void end(int status);
/**
* End the process.
*
* @param status the exit status.
*/
void end(int status, String message);
/** /**
* Register listener * Register listener
@ -167,4 +175,11 @@ public interface CommandProcess extends Tty {
* Whether the process is running * Whether the process is running
*/ */
boolean isRunning(); boolean isRunning();
/**
* Append the phased result to queue
* @param result a phased result of the command
*/
void appendResult(ResultModel result);
} }

@ -7,22 +7,21 @@ import com.taobao.arthas.core.shell.ShellServer;
import com.taobao.arthas.core.shell.cli.CliToken; import com.taobao.arthas.core.shell.cli.CliToken;
import com.taobao.arthas.core.shell.cli.CliTokens; import com.taobao.arthas.core.shell.cli.CliTokens;
import com.taobao.arthas.core.shell.future.Future; import com.taobao.arthas.core.shell.future.Future;
import com.taobao.arthas.core.shell.handlers.shell.CloseHandler; import com.taobao.arthas.core.shell.handlers.shell.*;
import com.taobao.arthas.core.shell.handlers.shell.CommandManagerCompletionHandler;
import com.taobao.arthas.core.shell.handlers.shell.FutureHandler;
import com.taobao.arthas.core.shell.handlers.shell.InterruptHandler;
import com.taobao.arthas.core.shell.handlers.shell.ShellLineHandler;
import com.taobao.arthas.core.shell.handlers.shell.SuspendHandler;
import com.taobao.arthas.core.shell.session.Session; import com.taobao.arthas.core.shell.session.Session;
import com.taobao.arthas.core.shell.session.impl.SessionImpl; import com.taobao.arthas.core.shell.session.impl.SessionImpl;
import com.taobao.arthas.core.shell.system.ExecStatus; import com.taobao.arthas.core.shell.system.ExecStatus;
import com.taobao.arthas.core.shell.system.Job; import com.taobao.arthas.core.shell.system.Job;
import com.taobao.arthas.core.shell.system.JobController; import com.taobao.arthas.core.shell.system.JobController;
import com.taobao.arthas.core.shell.system.JobListener;
import com.taobao.arthas.core.shell.system.impl.InternalCommandManager; import com.taobao.arthas.core.shell.system.impl.InternalCommandManager;
import com.taobao.arthas.core.shell.system.impl.JobControllerImpl; import com.taobao.arthas.core.shell.system.impl.JobControllerImpl;
import com.taobao.arthas.core.shell.term.Term; import com.taobao.arthas.core.shell.term.Term;
import com.taobao.arthas.core.shell.term.impl.httptelnet.HttpTelnetTermServer; import com.taobao.arthas.core.shell.term.impl.TermImpl;
import com.taobao.arthas.core.util.Constants;
import com.taobao.arthas.core.util.FileUtils;
import java.io.File;
import java.lang.instrument.Instrumentation; import java.lang.instrument.Instrumentation;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.List;
@ -79,7 +78,7 @@ public class ShellImpl implements Shell {
@Override @Override
public synchronized Job createJob(List<CliToken> args) { public synchronized Job createJob(List<CliToken> args) {
Job job = jobController.createJob(commandManager, args, this); Job job = jobController.createJob(commandManager, args, session, new ShellJobHandler(this), term, null);
return job; return job;
} }
@ -165,7 +164,7 @@ public class ShellImpl implements Shell {
// sometimes an NPE will be thrown during shutdown via web-socket, // sometimes an NPE will be thrown during shutdown via web-socket,
// this ensures the shutdown process is finished properly // this ensures the shutdown process is finished properly
// https://github.com/alibaba/arthas/issues/320 // https://github.com/alibaba/arthas/issues/320
logger.error("ARTHAS", "Error writing data:", t); logger.error("Error writing data:", t);
} }
term.close(); term.close();
} else { } else {
@ -180,4 +179,53 @@ public class ShellImpl implements Shell {
public Job getForegroundJob() { public Job getForegroundJob() {
return currentForegroundJob; return currentForegroundJob;
} }
private class ShellJobHandler implements JobListener {
ShellImpl shell;
public ShellJobHandler(ShellImpl shell) {
this.shell = shell;
}
@Override
public void onForeground(Job job) {
shell.setForegroundJob(job);
//reset stdin handler to job's origin handler
//shell.term().stdinHandler(job.process().getStdinHandler());
}
@Override
public void onBackground(Job job) {
resetAndReadLine();
}
@Override
public void onTerminated(Job job) {
if (!job.isRunInBackground()){
resetAndReadLine();
}
// save command history
Term term = shell.term();
if (term instanceof TermImpl) {
List<int[]> history = ((TermImpl) term).getReadline().getHistory();
FileUtils.saveCommandHistory(history, new File(Constants.CMD_HISTORY_FILE));
}
}
@Override
public void onSuspend(Job job) {
if (!job.isRunInBackground()){
resetAndReadLine();
}
}
private void resetAndReadLine() {
//reset stdin handler to echo handler
//shell.term().stdinHandler(null);
shell.setForegroundJob(null);
shell.readline();
}
}
} }

@ -242,4 +242,12 @@ public class ShellServerImpl extends ShellServer {
bootstrap.destroy(); bootstrap.destroy();
} }
} }
public JobControllerImpl getJobController() {
return jobController;
}
public InternalCommandManager getCommandManager() {
return commandManager;
}
} }

@ -57,6 +57,11 @@ public interface Job {
*/ */
Job resume(); Job resume();
/**
* @return true if the job is running in background
*/
boolean isRunInBackground();
/** /**
* Send the job to background. * Send the job to background.
* *

@ -1,9 +1,11 @@
package com.taobao.arthas.core.shell.system; package com.taobao.arthas.core.shell.system;
import com.taobao.arthas.core.distribution.ResultDistributor;
import com.taobao.arthas.core.shell.cli.CliToken; import com.taobao.arthas.core.shell.cli.CliToken;
import com.taobao.arthas.core.shell.handlers.Handler; import com.taobao.arthas.core.shell.handlers.Handler;
import com.taobao.arthas.core.shell.impl.ShellImpl; import com.taobao.arthas.core.shell.session.Session;
import com.taobao.arthas.core.shell.system.impl.InternalCommandManager; import com.taobao.arthas.core.shell.system.impl.InternalCommandManager;
import com.taobao.arthas.core.shell.term.Term;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
@ -33,10 +35,13 @@ public interface JobController {
* *
* @param commandManager command manager * @param commandManager command manager
* @param tokens the command tokens * @param tokens the command tokens
* @param shell the current shell * @param session the current session
* @param jobHandler job event handler
* @param term telnet term
* @param resultDistributor
* @return the created job * @return the created job
*/ */
Job createJob(InternalCommandManager commandManager, List<CliToken> tokens, ShellImpl shell); Job createJob(InternalCommandManager commandManager, List<CliToken> tokens, Session session, JobListener jobHandler, Term term, ResultDistributor resultDistributor);
/** /**
* Close the controller and terminate all the underlying jobs, a closed controller does not accept anymore jobs. * Close the controller and terminate all the underlying jobs, a closed controller does not accept anymore jobs.

@ -0,0 +1,16 @@
package com.taobao.arthas.core.shell.system;
/**
* Job listener
* @author gongdewei 2020-03-23
*/
public interface JobListener {
void onForeground(Job job);
void onBackground(Job job);
void onTerminated(Job job);
void onSuspend(Job job);
}

@ -9,11 +9,15 @@ import java.util.concurrent.TimeUnit;
import com.alibaba.arthas.deps.org.slf4j.Logger; import com.alibaba.arthas.deps.org.slf4j.Logger;
import com.alibaba.arthas.deps.org.slf4j.LoggerFactory; import com.alibaba.arthas.deps.org.slf4j.LoggerFactory;
import com.taobao.arthas.core.GlobalOptions; import com.taobao.arthas.core.GlobalOptions;
import com.taobao.arthas.core.distribution.ResultDistributor;
import com.taobao.arthas.core.server.ArthasBootstrap; import com.taobao.arthas.core.server.ArthasBootstrap;
import com.taobao.arthas.core.shell.cli.CliToken; import com.taobao.arthas.core.shell.cli.CliToken;
import com.taobao.arthas.core.shell.handlers.Handler; import com.taobao.arthas.core.shell.handlers.Handler;
import com.taobao.arthas.core.shell.impl.ShellImpl; import com.taobao.arthas.core.shell.session.Session;
import com.taobao.arthas.core.shell.system.Job; import com.taobao.arthas.core.shell.system.Job;
import com.taobao.arthas.core.shell.system.JobListener;
import com.taobao.arthas.core.shell.term.Term;
/** /**
* Job Controller * Job Controller
@ -49,8 +53,8 @@ public class GlobalJobControllerImpl extends JobControllerImpl {
} }
@Override @Override
public Job createJob(InternalCommandManager commandManager, List<CliToken> tokens, ShellImpl shell) { public Job createJob(InternalCommandManager commandManager, List<CliToken> tokens, Session session, JobListener jobHandler, Term term, ResultDistributor resultDistributor) {
final Job job = super.createJob(commandManager, tokens, shell); final Job job = super.createJob(commandManager, tokens, session, jobHandler, term, resultDistributor);
/* /*
* job * job

@ -1,6 +1,7 @@
package com.taobao.arthas.core.shell.system.impl; package com.taobao.arthas.core.shell.system.impl;
import com.taobao.arthas.core.GlobalOptions; import com.taobao.arthas.core.GlobalOptions;
import com.taobao.arthas.core.distribution.ResultDistributor;
import com.taobao.arthas.core.shell.cli.CliToken; import com.taobao.arthas.core.shell.cli.CliToken;
import com.taobao.arthas.core.shell.command.Command; import com.taobao.arthas.core.shell.command.Command;
import com.taobao.arthas.core.shell.command.internal.RedirectHandler; import com.taobao.arthas.core.shell.command.internal.RedirectHandler;
@ -8,33 +9,27 @@ import com.taobao.arthas.core.shell.command.internal.StdoutHandler;
import com.taobao.arthas.core.shell.command.internal.TermHandler; import com.taobao.arthas.core.shell.command.internal.TermHandler;
import com.taobao.arthas.core.shell.future.Future; import com.taobao.arthas.core.shell.future.Future;
import com.taobao.arthas.core.shell.handlers.Handler; import com.taobao.arthas.core.shell.handlers.Handler;
import com.taobao.arthas.core.shell.impl.ShellImpl; import com.taobao.arthas.core.shell.session.Session;
import com.taobao.arthas.core.shell.system.Job; import com.taobao.arthas.core.shell.system.Job;
import com.taobao.arthas.core.shell.system.JobController; import com.taobao.arthas.core.shell.system.JobController;
import com.taobao.arthas.core.shell.system.JobListener;
import com.taobao.arthas.core.shell.system.Process; import com.taobao.arthas.core.shell.system.Process;
import com.taobao.arthas.core.shell.system.impl.ProcessImpl.ProcessOutput; import com.taobao.arthas.core.shell.system.impl.ProcessImpl.ProcessOutput;
import com.taobao.arthas.core.shell.term.Term; import com.taobao.arthas.core.shell.term.Term;
import com.taobao.arthas.core.util.Constants; import com.taobao.arthas.core.util.Constants;
import com.taobao.arthas.core.util.LogUtil; import com.taobao.arthas.core.util.LogUtil;
import com.taobao.arthas.core.util.TokenUtils; import com.taobao.arthas.core.util.TokenUtils;
import io.termd.core.function.Function; import io.termd.core.function.Function;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.*;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.ListIterator;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
/** /**
* @author <a href="mailto:julien@julienviet.com">Julien Viet</a> * @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
* @author hengyunabc 2019-05-14 * @author hengyunabc 2019-05-14
* @author gongdewei 2020-03-23
*/ */
public class JobControllerImpl implements JobController { public class JobControllerImpl implements JobController {
@ -58,16 +53,16 @@ public class JobControllerImpl implements JobController {
} }
@Override @Override
public Job createJob(InternalCommandManager commandManager, List<CliToken> tokens, ShellImpl shell) { public Job createJob(InternalCommandManager commandManager, List<CliToken> tokens, Session session, JobListener jobHandler, Term term, ResultDistributor resultDistributor) {
int jobId = idGenerator.incrementAndGet(); int jobId = idGenerator.incrementAndGet();
StringBuilder line = new StringBuilder(); StringBuilder line = new StringBuilder();
for (CliToken arg : tokens) { for (CliToken arg : tokens) {
line.append(arg.raw()); line.append(arg.raw());
} }
boolean runInBackground = runInBackground(tokens); boolean runInBackground = runInBackground(tokens);
Process process = createProcess(tokens, commandManager, jobId, shell.term()); Process process = createProcess(tokens, commandManager, jobId, term, resultDistributor);
process.setJobId(jobId); process.setJobId(jobId);
JobImpl job = new JobImpl(jobId, this, process, line.toString(), runInBackground, shell); JobImpl job = new JobImpl(jobId, this, process, line.toString(), runInBackground, session, jobHandler);
jobs.put(jobId, job); jobs.put(jobId, job);
return job; return job;
} }
@ -120,9 +115,10 @@ public class JobControllerImpl implements JobController {
* @param commandManager command manager * @param commandManager command manager
* @param jobId job id * @param jobId job id
* @param term term * @param term term
* @param resultDistributor
* @return the created process * @return the created process
*/ */
private Process createProcess(List<CliToken> line, InternalCommandManager commandManager, int jobId, Term term) { private Process createProcess(List<CliToken> line, InternalCommandManager commandManager, int jobId, Term term, ResultDistributor resultDistributor) {
try { try {
ListIterator<CliToken> tokens = line.listIterator(); ListIterator<CliToken> tokens = line.listIterator();
while (tokens.hasNext()) { while (tokens.hasNext()) {
@ -130,7 +126,7 @@ public class JobControllerImpl implements JobController {
if (token.isText()) { if (token.isText()) {
Command command = commandManager.getCommand(token.value()); Command command = commandManager.getCommand(token.value());
if (command != null) { if (command != null) {
return createCommandProcess(command, tokens, jobId, term); return createCommandProcess(command, tokens, jobId, term, resultDistributor);
} else { } else {
throw new IllegalArgumentException(token.value() + ": command not found"); throw new IllegalArgumentException(token.value() + ": command not found");
} }
@ -152,7 +148,7 @@ public class JobControllerImpl implements JobController {
return runInBackground; return runInBackground;
} }
private Process createCommandProcess(Command command, ListIterator<CliToken> tokens, int jobId, Term term) throws IOException { private Process createCommandProcess(Command command, ListIterator<CliToken> tokens, int jobId, Term term, ResultDistributor resultDistributor) throws IOException {
List<CliToken> remaining = new ArrayList<CliToken>(); List<CliToken> remaining = new ArrayList<CliToken>();
List<CliToken> pipelineTokens = new ArrayList<CliToken>(); List<CliToken> pipelineTokens = new ArrayList<CliToken>();
boolean isPipeline = false; boolean isPipeline = false;
@ -199,7 +195,9 @@ public class JobControllerImpl implements JobController {
} }
} }
ProcessOutput ProcessOutput = new ProcessOutput(stdoutHandlerChain, cacheLocation, term); ProcessOutput ProcessOutput = new ProcessOutput(stdoutHandlerChain, cacheLocation, term);
return new ProcessImpl(command, remaining, command.processHandler(), ProcessOutput); ProcessImpl process = new ProcessImpl(command, remaining, command.processHandler(), ProcessOutput, resultDistributor);
process.setTty(term);
return process;
} }
private String getRedirectFileName(ListIterator<CliToken> tokens) { private String getRedirectFileName(ListIterator<CliToken> tokens) {
@ -226,4 +224,5 @@ public class JobControllerImpl implements JobController {
public void close() { public void close() {
close(null); close(null);
} }
} }

@ -7,20 +7,16 @@ import java.util.concurrent.atomic.AtomicBoolean;
import com.taobao.arthas.core.shell.future.Future; import com.taobao.arthas.core.shell.future.Future;
import com.taobao.arthas.core.shell.handlers.Handler; import com.taobao.arthas.core.shell.handlers.Handler;
import com.taobao.arthas.core.shell.handlers.shell.ShellForegroundUpdateHandler;
import com.taobao.arthas.core.shell.impl.ShellImpl;
import com.taobao.arthas.core.shell.session.Session; import com.taobao.arthas.core.shell.session.Session;
import com.taobao.arthas.core.shell.system.ExecStatus; import com.taobao.arthas.core.shell.system.ExecStatus;
import com.taobao.arthas.core.shell.system.Job; import com.taobao.arthas.core.shell.system.Job;
import com.taobao.arthas.core.shell.system.JobListener;
import com.taobao.arthas.core.shell.system.Process; import com.taobao.arthas.core.shell.system.Process;
import com.taobao.arthas.core.shell.term.Term;
import com.taobao.arthas.core.shell.term.impl.TermImpl;
import com.taobao.arthas.core.util.Constants;
import com.taobao.arthas.core.util.FileUtils;
/** /**
* @author <a href="mailto:julien@julienviet.com">Julien Viet</a> * @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
* @author hengyunabc 2019-05-14 * @author hengyunabc 2019-05-14
* @author gongdewei 2020-03-23
*/ */
public class JobImpl implements Job { public class JobImpl implements Job {
@ -28,25 +24,30 @@ public class JobImpl implements Job {
final JobControllerImpl controller; final JobControllerImpl controller;
final Process process; final Process process;
final String line; final String line;
private volatile Session session;
private volatile ExecStatus actualStatus; // Used internally for testing only private volatile ExecStatus actualStatus; // Used internally for testing only
volatile long lastStopped; // When the job was last stopped volatile long lastStopped; // When the job was last stopped
volatile ShellImpl shell; volatile JobListener jobHandler;
volatile Handler<ExecStatus> statusUpdateHandler; volatile Handler<ExecStatus> statusUpdateHandler;
volatile Date timeoutDate; volatile Date timeoutDate;
final Future<Void> terminateFuture; final Future<Void> terminateFuture;
final AtomicBoolean runInBackground; final AtomicBoolean runInBackground;
final Handler<Job> foregroundUpdatedHandler; //final Handler<Job> foregroundUpdatedHandler;
JobImpl(int id, final JobControllerImpl controller, Process process, String line, boolean runInBackground, JobImpl(int id, final JobControllerImpl controller, Process process, String line, boolean runInBackground,
ShellImpl shell) { Session session, JobListener jobHandler) {
this.id = id; this.id = id;
this.controller = controller; this.controller = controller;
this.process = process; this.process = process;
this.line = line; this.line = line;
this.session = session;
this.terminateFuture = Future.future(); this.terminateFuture = Future.future();
this.runInBackground = new AtomicBoolean(runInBackground); this.runInBackground = new AtomicBoolean(runInBackground);
this.shell = shell; this.jobHandler = jobHandler;
this.foregroundUpdatedHandler = new ShellForegroundUpdateHandler(shell); if (jobHandler == null) {
throw new IllegalArgumentException("JobListener is required");
}
//this.foregroundUpdatedHandler = new ShellForegroundUpdateHandler(shell);
process.terminatedHandler(new TerminatedHandler(controller)); process.terminatedHandler(new TerminatedHandler(controller));
} }
@ -76,7 +77,7 @@ public class JobImpl implements Job {
@Override @Override
public Session getSession() { public Session getSession() {
return shell.session(); return session;
} }
@Override @Override
@ -89,19 +90,21 @@ public class JobImpl implements Job {
runInBackground.set(!foreground); runInBackground.set(!foreground);
if (foreground) { // if (foreground) {
if (foregroundUpdatedHandler != null) { // if (foregroundUpdatedHandler != null) {
foregroundUpdatedHandler.handle(this); // foregroundUpdatedHandler.handle(this);
} // }
} // }
if (statusUpdateHandler != null) { if (statusUpdateHandler != null) {
statusUpdateHandler.handle(process.status()); statusUpdateHandler.handle(process.status());
} }
if (foreground) { if (this.status() == ExecStatus.RUNNING) {
shell.setForegroundJob(this); if (foreground) {
} else { jobHandler.onForeground(this);
shell.setForegroundJob(null); } else {
jobHandler.onBackground(this);
}
} }
return this; return this;
} }
@ -113,14 +116,15 @@ public class JobImpl implements Job {
} catch (IllegalStateException ignore) { } catch (IllegalStateException ignore) {
return this; return this;
} }
if (!runInBackground.get() && foregroundUpdatedHandler != null) { // if (!runInBackground.get() && foregroundUpdatedHandler != null) {
foregroundUpdatedHandler.handle(null); // foregroundUpdatedHandler.handle(null);
} // }
if (statusUpdateHandler != null) { if (statusUpdateHandler != null) {
statusUpdateHandler.handle(process.status()); statusUpdateHandler.handle(process.status());
} }
shell.setForegroundJob(null); // shell.setForegroundJob(null);
jobHandler.onSuspend(this);
return this; return this;
} }
@ -147,6 +151,11 @@ public class JobImpl implements Job {
return line; return line;
} }
@Override
public boolean isRunInBackground() {
return runInBackground.get();
}
@Override @Override
public Job toBackground() { public Job toBackground() {
if (!this.runInBackground.get()) { if (!this.runInBackground.get()) {
@ -156,10 +165,12 @@ public class JobImpl implements Job {
if (statusUpdateHandler != null) { if (statusUpdateHandler != null) {
statusUpdateHandler.handle(process.status()); statusUpdateHandler.handle(process.status());
} }
jobHandler.onBackground(this);
} }
} }
shell.setForegroundJob(null); // shell.setForegroundJob(null);
// jobHandler.onBackground(this);
return this; return this;
} }
@ -167,15 +178,16 @@ public class JobImpl implements Job {
public Job toForeground() { public Job toForeground() {
if (this.runInBackground.get()) { if (this.runInBackground.get()) {
if (runInBackground.compareAndSet(true, false)) { if (runInBackground.compareAndSet(true, false)) {
if (foregroundUpdatedHandler != null) { // if (foregroundUpdatedHandler != null) {
foregroundUpdatedHandler.handle(this); // foregroundUpdatedHandler.handle(this);
} // }
process.toForeground(); process.toForeground();
if (statusUpdateHandler != null) { if (statusUpdateHandler != null) {
statusUpdateHandler.handle(process.status()); statusUpdateHandler.handle(process.status());
} }
shell.setForegroundJob(this); // shell.setForegroundJob(this);
jobHandler.onForeground(this);
} }
} }
@ -194,26 +206,34 @@ public class JobImpl implements Job {
@Override @Override
public Job run(boolean foreground) { public Job run(boolean foreground) {
if (foreground && foregroundUpdatedHandler != null) { // if (foreground && foregroundUpdatedHandler != null) {
foregroundUpdatedHandler.handle(this); // foregroundUpdatedHandler.handle(this);
} // }
actualStatus = ExecStatus.RUNNING; actualStatus = ExecStatus.RUNNING;
if (statusUpdateHandler != null) { if (statusUpdateHandler != null) {
statusUpdateHandler.handle(ExecStatus.RUNNING); statusUpdateHandler.handle(ExecStatus.RUNNING);
} }
process.setTty(shell.term()); //set process's tty in JobControllerImpl.createCommandProcess
process.setSession(shell.session()); //process.setTty(shell.term());
process.setSession(this.session);
process.run(foreground); process.run(foreground);
if (!foreground && foregroundUpdatedHandler != null) { // if (!foreground && foregroundUpdatedHandler != null) {
foregroundUpdatedHandler.handle(null); // foregroundUpdatedHandler.handle(null);
} // }
//
if (foreground) { // if (foreground) {
shell.setForegroundJob(this); // shell.setForegroundJob(this);
} else { // } else {
shell.setForegroundJob(null); // shell.setForegroundJob(null);
// }
if (this.status() == ExecStatus.RUNNING) {
if (foreground) {
jobHandler.onForeground(this);
} else {
jobHandler.onBackground(this);
}
} }
return this; return this;
} }
@ -228,24 +248,25 @@ public class JobImpl implements Job {
@Override @Override
public void handle(Integer exitCode) { public void handle(Integer exitCode) {
if (!runInBackground.get() && actualStatus.equals(ExecStatus.RUNNING)) { // if (!runInBackground.get() && actualStatus.equals(ExecStatus.RUNNING)) {
// 只有前台在运行的任务才需要调用foregroundUpdateHandler // 只有前台在运行的任务才需要调用foregroundUpdateHandler
if (foregroundUpdatedHandler != null) { // if (foregroundUpdatedHandler != null) {
foregroundUpdatedHandler.handle(null); // foregroundUpdatedHandler.handle(null);
} // }
} // }
jobHandler.onTerminated(JobImpl.this);
controller.removeJob(JobImpl.this.id); controller.removeJob(JobImpl.this.id);
if (statusUpdateHandler != null) { if (statusUpdateHandler != null) {
statusUpdateHandler.handle(ExecStatus.TERMINATED); statusUpdateHandler.handle(ExecStatus.TERMINATED);
} }
terminateFuture.complete(); terminateFuture.complete();
// save command history // save command history (move to JobControllerImpl.ShellJobHandler.onTerminated)
Term term = shell.term(); // Term term = shell.term();
if (term instanceof TermImpl) { // if (term instanceof TermImpl) {
List<int[]> history = ((TermImpl) term).getReadline().getHistory(); // List<int[]> history = ((TermImpl) term).getReadline().getHistory();
FileUtils.saveCommandHistory(history, new File(Constants.CMD_HISTORY_FILE)); // FileUtils.saveCommandHistory(history, new File(Constants.CMD_HISTORY_FILE));
} // }
} }
} }

@ -4,6 +4,11 @@ import com.alibaba.arthas.deps.org.slf4j.Logger;
import com.alibaba.arthas.deps.org.slf4j.LoggerFactory; import com.alibaba.arthas.deps.org.slf4j.LoggerFactory;
import com.taobao.arthas.core.advisor.AdviceListener; import com.taobao.arthas.core.advisor.AdviceListener;
import com.taobao.arthas.core.advisor.AdviceWeaver; import com.taobao.arthas.core.advisor.AdviceWeaver;
import com.taobao.arthas.core.command.basic1000.HelpCommand;
import com.taobao.arthas.core.command.model.ResultModel;
import com.taobao.arthas.core.command.model.StatusModel;
import com.taobao.arthas.core.distribution.ResultDistributor;
import com.taobao.arthas.core.distribution.impl.TermResultDistributorImpl;
import com.taobao.arthas.core.server.ArthasBootstrap; import com.taobao.arthas.core.server.ArthasBootstrap;
import com.taobao.arthas.core.shell.cli.CliToken; import com.taobao.arthas.core.shell.cli.CliToken;
import com.taobao.arthas.core.shell.command.Command; import com.taobao.arthas.core.shell.command.Command;
@ -16,12 +21,8 @@ import com.taobao.arthas.core.shell.system.ExecStatus;
import com.taobao.arthas.core.shell.system.Process; import com.taobao.arthas.core.shell.system.Process;
import com.taobao.arthas.core.shell.system.ProcessAware; import com.taobao.arthas.core.shell.system.ProcessAware;
import com.taobao.arthas.core.shell.term.Tty; import com.taobao.arthas.core.shell.term.Tty;
import com.taobao.arthas.core.util.usage.StyledUsageFormatter;
import com.taobao.middleware.cli.CLIException; import com.taobao.middleware.cli.CLIException;
import com.taobao.middleware.cli.CommandLine; import com.taobao.middleware.cli.CommandLine;
import com.taobao.middleware.cli.UsageMessageFormatter;
import com.taobao.text.Color;
import io.termd.core.function.Function; import io.termd.core.function.Function;
import java.lang.instrument.ClassFileTransformer; import java.lang.instrument.ClassFileTransformer;
@ -32,6 +33,7 @@ import java.util.concurrent.atomic.AtomicInteger;
/** /**
* @author beiwei30 on 10/11/2016. * @author beiwei30 on 10/11/2016.
* @author gongdewei 2020-03-26
*/ */
public class ProcessImpl implements Process { public class ProcessImpl implements Process {
@ -55,16 +57,18 @@ public class ProcessImpl implements Process {
private Handler<String> stdinHandler; private Handler<String> stdinHandler;
private Handler<Void> resizeHandler; private Handler<Void> resizeHandler;
private Integer exitCode; private Integer exitCode;
private CommandProcess process; private CommandProcessImpl process;
private Date startTime; private Date startTime;
private ProcessOutput processOutput; private ProcessOutput processOutput;
private int jobId; private int jobId;
private ResultDistributor resultDistributor;
public ProcessImpl(Command commandContext, List<CliToken> args, Handler<CommandProcess> handler, public ProcessImpl(Command commandContext, List<CliToken> args, Handler<CommandProcess> handler,
ProcessOutput processOutput) { ProcessOutput processOutput, ResultDistributor resultDistributor) {
this.commandContext = commandContext; this.commandContext = commandContext;
this.handler = handler; this.handler = handler;
this.args = args; this.args = args;
this.resultDistributor = resultDistributor;
this.processStatus = ExecStatus.READY; this.processStatus = ExecStatus.READY;
this.processOutput = processOutput; this.processOutput = processOutput;
} }
@ -236,13 +240,15 @@ public class ProcessImpl implements Process {
@Override @Override
public void terminate(Handler<Void> completionHandler) { public void terminate(Handler<Void> completionHandler) {
if (!terminate(-10, completionHandler)) { if (!terminate(-10, completionHandler, null)) {
throw new IllegalStateException("Cannot terminate terminated process"); throw new IllegalStateException("Cannot terminate terminated process");
} }
} }
private synchronized boolean terminate(int exitCode, Handler<Void> completionHandler) { private synchronized boolean terminate(int exitCode, Handler<Void> completionHandler, String message) {
if (processStatus != ExecStatus.TERMINATED) { if (processStatus != ExecStatus.TERMINATED) {
//add status message
this.appendResult(new StatusModel(exitCode, message));
if (process != null) { if (process != null) {
processOutput.close(); processOutput.close();
} }
@ -256,6 +262,13 @@ public class ProcessImpl implements Process {
} }
} }
private void appendResult(ResultModel result) {
result.setJobId(jobId);
if (resultDistributor != null) {
resultDistributor.appendResult(result);
}
}
private void updateStatus(ExecStatus statusUpdate, Integer exitCodeUpdate, boolean foregroundUpdate, private void updateStatus(ExecStatus statusUpdate, Integer exitCodeUpdate, boolean foregroundUpdate,
Handler<Void> handler, Handler<Integer> terminatedHandler, Handler<Void> handler, Handler<Integer> terminatedHandler,
Handler<Void> completionHandler) { Handler<Void> completionHandler) {
@ -320,6 +333,11 @@ public class ProcessImpl implements Process {
throw new IllegalStateException("Cannot execute process without a TTY set"); throw new IllegalStateException("Cannot execute process without a TTY set");
} }
process = new CommandProcessImpl(this, tty);
if (resultDistributor == null) {
resultDistributor = new TermResultDistributorImpl(process);
}
final List<String> args2 = new LinkedList<String>(); final List<String> args2 = new LinkedList<String>();
for (CliToken arg : args) { for (CliToken arg : args) {
if (arg.isText()) { if (arg.isText()) {
@ -331,25 +349,20 @@ public class ProcessImpl implements Process {
try { try {
if (commandContext.cli() != null) { if (commandContext.cli() != null) {
if (commandContext.cli().parse(args2, false).isAskingForHelp()) { if (commandContext.cli().parse(args2, false).isAskingForHelp()) {
UsageMessageFormatter formatter = new StyledUsageFormatter(Color.green); appendResult(new HelpCommand().createHelpDetailModel(commandContext));
formatter.setWidth(tty.width());
StringBuilder usage = new StringBuilder();
commandContext.cli().usage(usage, formatter);
usage.append('\n');
tty.write(usage.toString());
terminate(); terminate();
return; return;
} }
cl = commandContext.cli().parse(args2); cl = commandContext.cli().parse(args2);
process.setArgs2(args2);
process.setCommandLine(cl);
} }
} catch (CLIException e) { } catch (CLIException e) {
tty.write(e.getMessage() + "\n"); terminate(-10, null, e.getMessage());
terminate();
return; return;
} }
process = new CommandProcessImpl(this, args2, tty, cl);
if (cacheLocation() != null) { if (cacheLocation() != null) {
process.echoTips("job id : " + this.jobId + "\n"); process.echoTips("job id : " + this.jobId + "\n");
process.echoTips("cache location : " + cacheLocation() + "\n"); process.echoTips("cache location : " + cacheLocation() + "\n");
@ -372,27 +385,25 @@ public class ProcessImpl implements Process {
handler.handle(process); handler.handle(process);
} catch (Throwable t) { } catch (Throwable t) {
logger.error("Error during processing the command:", t); logger.error("Error during processing the command:", t);
process.write("Error during processing the command, exception type: " + t.getClass().getName() + ", message:" + t.getMessage() process.end(1, "Error during processing the command: " + t.getClass().getName() + ", message:" + t.getMessage()
+ ", please check $HOME/logs/arthas/arthas.log for more details. \n"); + ", please check $HOME/logs/arthas/arthas.log for more details." );
terminate(1, null);
} }
} }
} }
private class CommandProcessImpl implements CommandProcess { private class CommandProcessImpl implements CommandProcess {
private final Process process; private final Process process;
private final List<String> args2;
private final Tty tty; private final Tty tty;
private final CommandLine commandLine; private List<String> args2;
private CommandLine commandLine;
private AtomicInteger times = new AtomicInteger(); private AtomicInteger times = new AtomicInteger();
private AdviceListener listener = null; private AdviceListener listener = null;
private ClassFileTransformer transformer; private ClassFileTransformer transformer;
public CommandProcessImpl(Process process, List<String> args2, Tty tty, CommandLine commandLine) { public CommandProcessImpl(Process process, Tty tty) {
this.process = process; this.process = process;
this.args2 = args2;
this.tty = tty; this.tty = tty;
this.commandLine = commandLine;
} }
@Override @Override
@ -440,6 +451,14 @@ public class ProcessImpl implements Process {
return times; return times;
} }
public void setArgs2(List<String> args2) {
this.args2 = args2;
}
public void setCommandLine(CommandLine commandLine) {
this.commandLine = commandLine;
}
@Override @Override
public CommandProcess stdinHandler(Handler<String> handler) { public CommandProcess stdinHandler(Handler<String> handler) {
stdinHandler = handler; stdinHandler = handler;
@ -578,13 +597,27 @@ public class ProcessImpl implements Process {
@Override @Override
public void end(int statusCode) { public void end(int statusCode) {
terminate(statusCode, null); end(statusCode, null);
}
@Override
public void end(int statusCode, String message) {
terminate(statusCode, null, message);
} }
@Override @Override
public boolean isRunning() { public boolean isRunning() {
return processStatus == ExecStatus.RUNNING; return processStatus == ExecStatus.RUNNING;
} }
@Override
public void appendResult(ResultModel result) {
if (processStatus != ExecStatus.RUNNING) {
throw new IllegalStateException(
"Cannot write to standard output when " + status().name().toLowerCase());
}
ProcessImpl.this.appendResult(result);
}
} }
static class ProcessOutput { static class ProcessOutput {
@ -620,7 +653,10 @@ public class ProcessImpl implements Process {
private void write(String data) { private void write(String data) {
if (stdoutHandlerChain != null) { if (stdoutHandlerChain != null) {
for (Function<String, String> function : stdoutHandlerChain) { //hotspot, reduce memory fragment (foreach/iterator)
int size = stdoutHandlerChain.size();
for (int i = 0; i < size; i++) {
Function<String, String> function = stdoutHandlerChain.get(i);
data = function.apply(data); data = function.apply(data);
} }
} }

Loading…
Cancel
Save