diff --git a/core/src/main/java/com/taobao/arthas/core/command/model/CommandRequestModel.java b/core/src/main/java/com/taobao/arthas/core/command/model/CommandRequestModel.java
new file mode 100644
index 000000000..47b6f5306
--- /dev/null
+++ b/core/src/main/java/com/taobao/arthas/core/command/model/CommandRequestModel.java
@@ -0,0 +1,54 @@
+package com.taobao.arthas.core.command.model;
+
+import com.taobao.arthas.core.shell.term.impl.http.api.ApiState;
+
+/**
+ * Command async exec process result, not the command exec result
+ * @author gongdewei 2020/4/2
+ */
+public class CommandRequestModel extends ResultModel {
+
+ private ApiState state;
+ private String command;
+ private String message;
+
+ public CommandRequestModel(String command, ApiState state) {
+ this.command = command;
+ this.state = state;
+ }
+
+ public CommandRequestModel(String command, ApiState state, String message) {
+ this.state = state;
+ this.command = command;
+ this.message = message;
+ }
+
+ public String getCommand() {
+ return command;
+ }
+
+ public void setCommand(String command) {
+ this.command = command;
+ }
+
+ public ApiState getState() {
+ return state;
+ }
+
+ public void setState(ApiState state) {
+ this.state = state;
+ }
+
+ public String getMessage() {
+ return message;
+ }
+
+ public void setMessage(String message) {
+ this.message = message;
+ }
+
+ @Override
+ public String getType() {
+ return "command";
+ }
+}
diff --git a/core/src/main/java/com/taobao/arthas/core/command/model/InputStatus.java b/core/src/main/java/com/taobao/arthas/core/command/model/InputStatus.java
new file mode 100644
index 000000000..182a4a2f9
--- /dev/null
+++ b/core/src/main/java/com/taobao/arthas/core/command/model/InputStatus.java
@@ -0,0 +1,22 @@
+package com.taobao.arthas.core.command.model;
+
+/**
+ * Command input status for webui
+ * @author gongdewei 2020/4/14
+ */
+public enum InputStatus {
+ /**
+ * Allow input new commands
+ */
+ ALLOW_INPUT,
+
+ /**
+ * Allow interrupt running job
+ */
+ ALLOW_INTERRUPT,
+
+ /**
+ * Disable input and interrupt
+ */
+ DISABLED
+}
diff --git a/core/src/main/java/com/taobao/arthas/core/command/model/InputStatusModel.java b/core/src/main/java/com/taobao/arthas/core/command/model/InputStatusModel.java
new file mode 100644
index 000000000..b1da685a0
--- /dev/null
+++ b/core/src/main/java/com/taobao/arthas/core/command/model/InputStatusModel.java
@@ -0,0 +1,28 @@
+package com.taobao.arthas.core.command.model;
+
+/**
+ * Input status for webui
+ * @author gongdewei 2020/4/14
+ */
+public class InputStatusModel extends ResultModel {
+
+ private InputStatus inputStatus;
+
+ public InputStatusModel(InputStatus inputStatus) {
+ this.inputStatus = inputStatus;
+ }
+
+ public InputStatus getInputStatus() {
+ return inputStatus;
+ }
+
+ public void setInputStatus(InputStatus inputStatus) {
+ this.inputStatus = inputStatus;
+ }
+
+ @Override
+ public String getType() {
+ return "input_status";
+ }
+
+}
diff --git a/core/src/main/java/com/taobao/arthas/core/command/model/WelcomeModel.java b/core/src/main/java/com/taobao/arthas/core/command/model/WelcomeModel.java
new file mode 100644
index 000000000..11bafd572
--- /dev/null
+++ b/core/src/main/java/com/taobao/arthas/core/command/model/WelcomeModel.java
@@ -0,0 +1,61 @@
+package com.taobao.arthas.core.command.model;
+
+/**
+ * @author gongdewei 2020/4/20
+ */
+public class WelcomeModel extends ResultModel {
+
+ private String pid;
+ private String time;
+ private String version;
+ private String wiki;
+ private String tutorials;
+
+ public WelcomeModel() {
+ }
+
+ @Override
+ public String getType() {
+ return "welcome";
+ }
+
+ public String getPid() {
+ return pid;
+ }
+
+ public void setPid(String pid) {
+ this.pid = pid;
+ }
+
+ public String getTime() {
+ return time;
+ }
+
+ public void setTime(String time) {
+ this.time = time;
+ }
+
+ public String getVersion() {
+ return version;
+ }
+
+ public void setVersion(String version) {
+ this.version = version;
+ }
+
+ public String getWiki() {
+ return wiki;
+ }
+
+ public void setWiki(String wiki) {
+ this.wiki = wiki;
+ }
+
+ public String getTutorials() {
+ return tutorials;
+ }
+
+ public void setTutorials(String tutorials) {
+ this.tutorials = tutorials;
+ }
+}
diff --git a/core/src/main/java/com/taobao/arthas/core/shell/term/impl/http/HttpRequestHandler.java b/core/src/main/java/com/taobao/arthas/core/shell/term/impl/http/HttpRequestHandler.java
index 8c101168e..d417e3518 100644
--- a/core/src/main/java/com/taobao/arthas/core/shell/term/impl/http/HttpRequestHandler.java
+++ b/core/src/main/java/com/taobao/arthas/core/shell/term/impl/http/HttpRequestHandler.java
@@ -1,6 +1,7 @@
package com.taobao.arthas.core.shell.term.impl.http;
import java.io.File;
+import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URL;
@@ -8,6 +9,7 @@ import java.net.URL;
import com.alibaba.arthas.deps.org.slf4j.Logger;
import com.alibaba.arthas.deps.org.slf4j.LoggerFactory;
import com.taobao.arthas.common.IOUtils;
+import com.taobao.arthas.core.shell.term.impl.http.api.HttpApiHandler;
import com.taobao.arthas.core.shell.term.impl.httptelnet.HttpTelnetTermServer;
import io.netty.channel.ChannelFuture;
@@ -15,7 +17,6 @@ import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
-import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaderNames;
@@ -27,9 +28,13 @@ import io.netty.handler.codec.http.LastHttpContent;
import io.termd.core.http.HttpTtyConnection;
import io.termd.core.util.Logging;
+import static com.taobao.arthas.core.util.HttpUtils.createRedirectResponse;
+import static com.taobao.arthas.core.util.HttpUtils.createResponse;
+
/**
* @author Julien Viet
* @author hengyunabc 2019-11-06
+ * @author gongdewei 2020-03-18
*/
public class HttpRequestHandler extends SimpleChannelInboundHandler {
private static final Logger logger = LoggerFactory.getLogger(HttpTelnetTermServer.class);
@@ -38,10 +43,13 @@ public class HttpRequestHandler extends SimpleChannelInboundHandler options;
+
+ @Override
+ public String toString() {
+ return "ApiRequest{" +
+ "action='" + action + '\'' +
+ ", command='" + command + '\'' +
+ ", requestId='" + requestId + '\'' +
+ ", sessionId='" + sessionId + '\'' +
+ ", consumerId='" + consumerId + '\'' +
+ ", timeout=" + timeout +
+ ", options=" + options +
+ '}';
+ }
+
+ public String getAction() {
+ return action;
+ }
+
+ public void setAction(String action) {
+ this.action = action;
+ }
+
+ public String getCommand() {
+ return command;
+ }
+
+ public void setCommand(String command) {
+ this.command = command;
+ }
+
+ public Map getOptions() {
+ return options;
+ }
+
+ public void setOptions(Map options) {
+ this.options = options;
+ }
+
+ public String getRequestId() {
+ return requestId;
+ }
+
+ public void setRequestId(String requestId) {
+ this.requestId = requestId;
+ }
+
+ public String getSessionId() {
+ return sessionId;
+ }
+
+ public void setSessionId(String sessionId) {
+ this.sessionId = sessionId;
+ }
+
+ public String getConsumerId() {
+ return consumerId;
+ }
+
+ public void setConsumerId(String consumerId) {
+ this.consumerId = consumerId;
+ }
+
+ public Integer getTimeout() {
+ return timeout;
+ }
+
+ public void setTimeout(Integer timeout) {
+ this.timeout = timeout;
+ }
+}
diff --git a/core/src/main/java/com/taobao/arthas/core/shell/term/impl/http/api/ApiResponse.java b/core/src/main/java/com/taobao/arthas/core/shell/term/impl/http/api/ApiResponse.java
new file mode 100644
index 000000000..e0d16a518
--- /dev/null
+++ b/core/src/main/java/com/taobao/arthas/core/shell/term/impl/http/api/ApiResponse.java
@@ -0,0 +1,79 @@
+package com.taobao.arthas.core.shell.term.impl.http.api;
+
+/**
+ * Http Api exception
+ * @author gongdewei 2020-03-19
+ */
+public class ApiResponse {
+ private String requestId;
+ private ApiState state;
+ private String message;
+ private String sessionId;
+ private String consumerId;
+ private String jobId;
+ private T body;
+
+ public String getRequestId() {
+ return requestId;
+ }
+
+ public ApiResponse setRequestId(String requestId) {
+ this.requestId = requestId;
+ return this;
+ }
+
+ public ApiState getState() {
+ return state;
+ }
+
+ public ApiResponse setState(ApiState state) {
+ this.state = state;
+ return this;
+ }
+
+ public String getMessage() {
+ return message;
+ }
+
+ public ApiResponse setMessage(String message) {
+ this.message = message;
+ return this;
+ }
+
+ public String getSessionId() {
+ return sessionId;
+ }
+
+ public ApiResponse setSessionId(String sessionId) {
+ this.sessionId = sessionId;
+ return this;
+ }
+
+ public String getConsumerId() {
+ return consumerId;
+ }
+
+ public ApiResponse setConsumerId(String consumerId) {
+ this.consumerId = consumerId;
+ return this;
+ }
+
+ public String getJobId() {
+ return jobId;
+ }
+
+ public ApiResponse setJobId(String jobId) {
+ this.jobId = jobId;
+ return this;
+ }
+
+ public T getBody() {
+ return body;
+ }
+
+ public ApiResponse setBody(T body) {
+ this.body = body;
+ return this;
+ }
+
+}
diff --git a/core/src/main/java/com/taobao/arthas/core/shell/term/impl/http/api/ApiState.java b/core/src/main/java/com/taobao/arthas/core/shell/term/impl/http/api/ApiState.java
new file mode 100644
index 000000000..e222bf50e
--- /dev/null
+++ b/core/src/main/java/com/taobao/arthas/core/shell/term/impl/http/api/ApiState.java
@@ -0,0 +1,35 @@
+package com.taobao.arthas.core.shell.term.impl.http.api;
+
+/**
+ * Http API response state
+ *
+ * @author gongdewei 2020-03-19
+ */
+public enum ApiState {
+ /**
+ * Scheduled async exec job
+ */
+ SCHEDULED,
+
+// RUNNING,
+
+ /**
+ * Request processed successfully
+ */
+ SUCCEEDED,
+
+ /**
+ * Request processing interrupt
+ */
+ INTERRUPTED,
+
+ /**
+ * Request processing failed
+ */
+ FAILED,
+
+ /**
+ * Request is refused
+ */
+ REFUSED
+}
diff --git a/core/src/main/java/com/taobao/arthas/core/shell/term/impl/http/api/HttpApiHandler.java b/core/src/main/java/com/taobao/arthas/core/shell/term/impl/http/api/HttpApiHandler.java
new file mode 100644
index 000000000..b614ca580
--- /dev/null
+++ b/core/src/main/java/com/taobao/arthas/core/shell/term/impl/http/api/HttpApiHandler.java
@@ -0,0 +1,708 @@
+package com.taobao.arthas.core.shell.term.impl.http.api;
+
+import com.alibaba.arthas.deps.org.slf4j.Logger;
+import com.alibaba.arthas.deps.org.slf4j.LoggerFactory;
+import com.alibaba.fastjson.JSON;
+import com.taobao.arthas.common.PidUtils;
+import com.taobao.arthas.core.command.model.*;
+import com.taobao.arthas.core.distribution.PackingResultDistributor;
+import com.taobao.arthas.core.distribution.ResultConsumer;
+import com.taobao.arthas.core.distribution.ResultDistributor;
+import com.taobao.arthas.core.distribution.impl.PackingResultDistributorImpl;
+import com.taobao.arthas.core.distribution.impl.ResultConsumerImpl;
+import com.taobao.arthas.core.server.ArthasBootstrap;
+import com.taobao.arthas.core.shell.cli.CliToken;
+import com.taobao.arthas.core.shell.cli.CliTokens;
+import com.taobao.arthas.core.shell.cli.Completion;
+import com.taobao.arthas.core.shell.handlers.Handler;
+import com.taobao.arthas.core.shell.history.HistoryManager;
+import com.taobao.arthas.core.shell.history.impl.HistoryManagerImpl;
+import com.taobao.arthas.core.shell.session.Session;
+import com.taobao.arthas.core.shell.session.SessionManager;
+import com.taobao.arthas.core.shell.system.Job;
+import com.taobao.arthas.core.shell.system.JobListener;
+import com.taobao.arthas.core.shell.system.impl.InternalCommandManager;
+import com.taobao.arthas.core.shell.system.impl.JobControllerImpl;
+import com.taobao.arthas.core.shell.term.SignalHandler;
+import com.taobao.arthas.core.shell.term.Term;
+import com.taobao.arthas.core.util.ArthasBanner;
+import com.taobao.arthas.core.util.DateUtils;
+import com.taobao.arthas.core.util.JsonUtils;
+import com.taobao.arthas.core.util.StringUtils;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufOutputStream;
+import io.netty.buffer.Unpooled;
+import io.netty.handler.codec.http.*;
+import io.netty.util.CharsetUtil;
+import io.termd.core.function.Function;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+
+/**
+ * Http Restful Api Handler
+ *
+ * @author gongdewei 2020-03-18
+ */
+public class HttpApiHandler {
+
+ private static final Logger logger = LoggerFactory.getLogger(HttpApiHandler.class);
+ public static final int DEFAULT_EXEC_TIMEOUT = 30000;
+ private final SessionManager sessionManager;
+ private final AtomicInteger requestIdGenerator = new AtomicInteger(0);
+ private static HttpApiHandler instance;
+ private final InternalCommandManager commandManager;
+ private final JobControllerImpl jobController;
+ private final HistoryManager historyManager;
+
+ private int jsonBufferSize = 1024 * 256;
+ private int poolSize = 8;
+ private ArrayBlockingQueue byteBufPool = new ArrayBlockingQueue(poolSize);
+ private ArrayBlockingQueue charsBufPool = new ArrayBlockingQueue(poolSize);
+ private ArrayBlockingQueue bytesPool = new ArrayBlockingQueue(poolSize);
+
+ public static HttpApiHandler getInstance() {
+ if (instance == null) {
+ synchronized (HttpApiHandler.class) {
+ instance = new HttpApiHandler();
+ }
+ }
+ return instance;
+ }
+
+ private HttpApiHandler() {
+ sessionManager = ArthasBootstrap.getInstance().getSessionManager();
+ commandManager = sessionManager.getCommandManager();
+ jobController = sessionManager.getJobController();
+ historyManager = HistoryManagerImpl.getInstance();
+
+ //init buf pool
+ JsonUtils.setSerializeWriterBufferThreshold(jsonBufferSize);
+ for (int i = 0; i < poolSize; i++) {
+ byteBufPool.offer(Unpooled.buffer(jsonBufferSize));
+ charsBufPool.offer(new char[jsonBufferSize]);
+ bytesPool.offer(new byte[jsonBufferSize]);
+ }
+ }
+
+ public HttpResponse handle(FullHttpRequest request) throws Exception {
+
+ ApiResponse result;
+ String requestBody = null;
+ String requestId = "req_" + requestIdGenerator.addAndGet(1);
+ try {
+ HttpMethod method = request.method();
+ if (HttpMethod.POST.equals(method)) {
+ requestBody = getBody(request);
+ ApiRequest apiRequest = parseRequest(requestBody);
+ apiRequest.setRequestId(requestId);
+ result = processRequest(apiRequest);
+ } else {
+ result = createResponse(ApiState.REFUSED, "Unsupported http method: " + method.name());
+ }
+ } catch (Throwable e) {
+ result = createResponse(ApiState.FAILED, "Process request error: " + e.getMessage());
+ logger.error("arthas process http api request error: " + request.uri() + ", request body: " + requestBody, e);
+ }
+ if (result == null) {
+ result = createResponse(ApiState.FAILED, "The request was not processed");
+ }
+ result.setRequestId(requestId);
+
+
+ //http response content
+ ByteBuf content = null;
+ //fastjson buf
+ char[] charsBuf = null;
+ byte[] bytesBuf = null;
+
+ try {
+ //apply response content buf first
+ content = byteBufPool.poll(2000, TimeUnit.MILLISECONDS);
+ if (content == null) {
+ throw new ApiException("get response content buf failure");
+ }
+
+ //apply fastjson buf from pool
+ charsBuf = charsBufPool.poll();
+ bytesBuf = bytesPool.poll();
+ if (charsBuf == null || bytesBuf == null) {
+ throw new ApiException("get json buf failure");
+ }
+ JsonUtils.setSerializeWriterBufThreadLocal(charsBuf, bytesBuf);
+
+ //create http response
+ DefaultFullHttpResponse response = new DefaultFullHttpResponse(request.protocolVersion(),
+ HttpResponseStatus.OK, content.retain());
+ response.headers().set(HttpHeaderNames.CONTENT_TYPE, "application/json; charset=utf-8");
+ writeResult(response, result);
+ return response;
+ } catch (Exception e) {
+ //response is discarded
+ if (content != null) {
+ content.release();
+ byteBufPool.offer(content);
+ }
+ throw e;
+ } finally {
+ //give back json buf to pool
+ JsonUtils.setSerializeWriterBufThreadLocal(null, null);
+ if (charsBuf != null) {
+ charsBufPool.offer(charsBuf);
+ }
+ if (bytesBuf != null) {
+ bytesPool.offer(bytesBuf);
+ }
+ }
+ }
+
+ public void onCompleted(DefaultFullHttpResponse httpResponse) {
+ ByteBuf content = httpResponse.content();
+ content.clear();
+ if (content.capacity() == jsonBufferSize) {
+ if (!byteBufPool.offer(content)) {
+ content.release();
+ }
+ } else {
+ //replace content ByteBuf
+ content.release();
+ if (byteBufPool.remainingCapacity() > 0) {
+ byteBufPool.offer(Unpooled.buffer(jsonBufferSize));
+ }
+ }
+ }
+
+ private void writeResult(DefaultFullHttpResponse response, Object result) throws IOException {
+ ByteBufOutputStream out = new ByteBufOutputStream(response.content());
+ try {
+ JSON.writeJSONString(out, result);
+ } catch (IOException e) {
+ logger.error("write json to response failed", e);
+ throw e;
+ }
+ }
+
+ private ApiRequest parseRequest(String requestBody) throws ApiException {
+ if (StringUtils.isBlank(requestBody)) {
+ throw new ApiException("parse request failed: request body is empty");
+ }
+ try {
+ //ObjectMapper objectMapper = new ObjectMapper();
+ //return objectMapper.readValue(requestBody, ApiRequest.class);
+ return JSON.parseObject(requestBody, ApiRequest.class);
+ } catch (Exception e) {
+ throw new ApiException("parse request failed: " + e.getMessage(), e);
+ }
+ }
+
+ private ApiResponse processRequest(ApiRequest apiRequest) {
+
+ String actionStr = apiRequest.getAction();
+ try {
+ if (StringUtils.isBlank(actionStr)) {
+ throw new ApiException("'action' is required");
+ }
+ ApiAction action;
+ try {
+ action = ApiAction.valueOf(actionStr.trim().toUpperCase());
+ } catch (IllegalArgumentException e) {
+ throw new ApiException("unknown action: " + actionStr);
+ }
+
+ //no session required
+ if (ApiAction.INIT_SESSION.equals(action)) {
+ return processInitSessionRequest(apiRequest);
+ }
+
+ //required session
+ String sessionId = apiRequest.getSessionId();
+ if (StringUtils.isBlank(sessionId)) {
+ throw new ApiException("'sessionId' is required");
+ }
+ Session session = sessionManager.getSession(sessionId);
+ if (session == null) {
+ throw new ApiException("session not found: " + sessionId);
+ }
+ sessionManager.updateAccessTime(session);
+
+ //dispatch requests
+ ApiResponse response = dispatchRequest(action, apiRequest, session);
+ if (response != null) {
+ return response;
+ }
+
+ } catch (ApiException e) {
+ logger.info("process http api request failed: {}", e.getMessage());
+ return createResponse(ApiState.FAILED, e.getMessage());
+ } catch (Throwable e) {
+ logger.error("process http api request failed: " + e.getMessage(), e);
+ return createResponse(ApiState.FAILED, "process http api request failed: " + e.getMessage());
+ }
+
+ return createResponse(ApiState.REFUSED, "Unsupported action: " + actionStr);
+ }
+
+ private ApiResponse dispatchRequest(ApiAction action, ApiRequest apiRequest, Session session) throws ApiException {
+ switch (action) {
+ case EXEC:
+ return processExecRequest(apiRequest, session);
+ case ASYNC_EXEC:
+ return processAsyncExecRequest(apiRequest, session);
+ case INTERRUPT_JOB:
+ return processInterruptJob(apiRequest, session);
+ case PULL_RESULTS:
+ return processPullResultsRequest(apiRequest, session);
+ case SESSION_INFO:
+ return processSessionInfoRequest(apiRequest, session);
+ case JOIN_SESSION:
+ return processJoinSessionRequest(apiRequest, session);
+ case CLOSE_SESSION:
+ return processCloseSessionRequest(apiRequest, session);
+ case INIT_SESSION:
+ break;
+ }
+ return null;
+ }
+
+ private ApiResponse processInitSessionRequest(ApiRequest apiRequest) throws ApiException {
+ ApiResponse response = new ApiResponse();
+
+ //create session
+ Session session = sessionManager.createSession();
+ if (session != null) {
+
+ //create consumer
+ ResultConsumer resultConsumer = new ResultConsumerImpl();
+ session.getResultDistributor().addConsumer(resultConsumer);
+
+ session.getResultDistributor().appendResult(new MessageModel("Welcome to arthas!"));
+
+ //welcome message
+ WelcomeModel welcomeModel = new WelcomeModel();
+ welcomeModel.setVersion(ArthasBanner.version());
+ welcomeModel.setWiki(ArthasBanner.wiki());
+ welcomeModel.setTutorials(ArthasBanner.tutorials());
+ welcomeModel.setPid(PidUtils.currentPid());
+ welcomeModel.setTime(DateUtils.getCurrentDate());
+ session.getResultDistributor().appendResult(welcomeModel);
+
+ //allow input
+ updateSessionInputStatus(session, InputStatus.ALLOW_INPUT);
+
+ response.setSessionId(session.getSessionId())
+ .setConsumerId(resultConsumer.getConsumerId())
+ .setState(ApiState.SUCCEEDED);
+ } else {
+ throw new ApiException("create api session failed");
+ }
+ return response;
+ }
+
+ /**
+ * Update session input status for all consumer
+ *
+ * @param session
+ * @param inputStatus
+ */
+ private void updateSessionInputStatus(Session session, InputStatus inputStatus) {
+ session.getResultDistributor().appendResult(new InputStatusModel(inputStatus));
+ }
+
+ private ApiResponse processJoinSessionRequest(ApiRequest apiRequest, Session session) {
+
+ //create consumer
+ ResultConsumer resultConsumer = new ResultConsumerImpl();
+ //disable input and interrupt
+ resultConsumer.appendResult(new InputStatusModel(InputStatus.DISABLED));
+ session.getResultDistributor().addConsumer(resultConsumer);
+
+ ApiResponse response = new ApiResponse();
+ response.setSessionId(session.getSessionId())
+ .setConsumerId(resultConsumer.getConsumerId())
+ .setState(ApiState.SUCCEEDED);
+ return response;
+ }
+
+ private ApiResponse processSessionInfoRequest(ApiRequest apiRequest, Session session) {
+ ApiResponse response = new ApiResponse();
+ Map body = new TreeMap();
+ body.put("pid", session.getPid());
+ body.put("createTime", session.getCreateTime());
+ body.put("lastAccessTime", session.getLastAccessTime());
+
+ response.setState(ApiState.SUCCEEDED)
+ .setSessionId(session.getSessionId())
+ //.setConsumerId(consumerId)
+ .setBody(body);
+ return response;
+ }
+
+ private ApiResponse processCloseSessionRequest(ApiRequest apiRequest, Session session) {
+ sessionManager.removeSession(session.getSessionId());
+ ApiResponse response = new ApiResponse();
+ response.setState(ApiState.SUCCEEDED);
+ return response;
+ }
+
+ /**
+ * Execute command sync, wait for job finish or timeout, sending results immediately
+ *
+ * @param apiRequest
+ * @param session
+ * @return
+ */
+ private ApiResponse processExecRequest(ApiRequest apiRequest, Session session) {
+ String commandLine = apiRequest.getCommand();
+ Map body = new TreeMap();
+ body.put("command", commandLine);
+
+ ApiResponse response = new ApiResponse();
+ response.setSessionId(session.getSessionId())
+ .setBody(body);
+
+ if (!session.tryLock()) {
+ response.setState(ApiState.REFUSED)
+ .setMessage("Another command is executing.");
+ return response;
+ }
+
+ int lock = session.getLock();
+ PackingResultDistributor packingResultDistributor = null;
+ Job job = null;
+ try {
+ Job foregroundJob = session.getForegroundJob();
+ if (foregroundJob != null) {
+ response.setState(ApiState.REFUSED)
+ .setMessage("Another job is running.");
+ logger.info("Another job is running, jobId: {}", foregroundJob.id());
+ return response;
+ }
+
+ //distribute result message both to origin session channel and request channel by CompositeResultDistributor
+ packingResultDistributor = new PackingResultDistributorImpl(session);
+ //ResultDistributor resultDistributor = new CompositeResultDistributorImpl(packingResultDistributor, session.getResultDistributor());
+ job = this.createJob(commandLine, session, packingResultDistributor);
+ session.setForegroundJob(job);
+ updateSessionInputStatus(session, InputStatus.ALLOW_INTERRUPT);
+
+ job.run();
+
+ } catch (Throwable e) {
+ logger.error("Exec command failed:" + e.getMessage() + ", command:" + commandLine, e);
+ response.setState(ApiState.FAILED).setMessage("Exec command failed:" + e.getMessage());
+ return response;
+ } finally {
+ if (session.getLock() == lock) {
+ session.unLock();
+ }
+ }
+
+ //wait for job completed or timeout
+ Integer timeout = apiRequest.getTimeout();
+ if (timeout == null || timeout <= 0) {
+ timeout = DEFAULT_EXEC_TIMEOUT;
+ }
+ boolean timeExpired = !waitForJob(job, timeout);
+ if (timeExpired) {
+ logger.warn("Job is exceeded time limit, force interrupt it, jobId: {}", job.id());
+ job.interrupt();
+ response.setState(ApiState.INTERRUPTED).setMessage("The job is exceeded time limit, force interrupt");
+ } else {
+ response.setState(ApiState.SUCCEEDED);
+ }
+
+ //packing results
+ body.put("jobId", job.id());
+ body.put("jobStatus", job.status());
+ body.put("timeExpired", timeExpired);
+ if (timeExpired) {
+ body.put("timeout", timeout);
+ }
+ body.put("results", packingResultDistributor.getResults());
+
+ response.setSessionId(session.getSessionId())
+ //.setConsumerId(consumerId)
+ .setBody(body);
+ return response;
+ }
+
+ /**
+ * Execute command async, create and schedule the job running, but no wait for the results.
+ *
+ * @param apiRequest
+ * @param session
+ * @return
+ */
+ private ApiResponse processAsyncExecRequest(ApiRequest apiRequest, Session session) {
+ String commandLine = apiRequest.getCommand();
+ Map body = new TreeMap();
+ body.put("command", commandLine);
+
+ ApiResponse response = new ApiResponse();
+ response.setSessionId(session.getSessionId())
+ .setBody(body);
+
+ if (!session.tryLock()) {
+ response.setState(ApiState.REFUSED)
+ .setMessage("Another command is executing.");
+ return response;
+ }
+ int lock = session.getLock();
+ try {
+
+ Job foregroundJob = session.getForegroundJob();
+ if (foregroundJob != null) {
+ response.setState(ApiState.REFUSED)
+ .setMessage("Another job is running.");
+ logger.info("Another job is running, jobId: {}", foregroundJob.id());
+ return response;
+ }
+
+ //create job
+ Job job = this.createJob(commandLine, session, session.getResultDistributor());
+ body.put("jobId", job.id());
+ body.put("jobStatus", job.status());
+ response.setState(ApiState.SCHEDULED);
+
+ //add command before exec job
+ CommandRequestModel commandRequestModel = new CommandRequestModel(commandLine, response.getState());
+ commandRequestModel.setJobId(job.id());
+ session.getResultDistributor().appendResult(commandRequestModel);
+ session.setForegroundJob(job);
+ updateSessionInputStatus(session, InputStatus.ALLOW_INTERRUPT);
+
+ //run job
+ job.run();
+
+ return response;
+ } catch (Throwable e) {
+ logger.error("Async exec command failed:" + e.getMessage() + ", command:" + commandLine, e);
+ response.setState(ApiState.FAILED).setMessage("Async exec command failed:" + e.getMessage());
+ CommandRequestModel commandRequestModel = new CommandRequestModel(commandLine, response.getState(), response.getMessage());
+ session.getResultDistributor().appendResult(commandRequestModel);
+ return response;
+ } finally {
+ if (session.getLock() == lock) {
+ session.unLock();
+ }
+ }
+ }
+
+ private ApiResponse processInterruptJob(ApiRequest apiRequest, Session session) {
+ Job job = session.getForegroundJob();
+ if (job == null) {
+ return new ApiResponse().setState(ApiState.FAILED).setMessage("no foreground job is running");
+ }
+ job.interrupt();
+
+ Map body = new TreeMap();
+ body.put("jobId", job.id());
+ body.put("jobStatus", job.status());
+ return new ApiResponse()
+ .setState(ApiState.SUCCEEDED)
+ .setBody(body);
+ }
+
+ /**
+ * Pull results from result queue
+ *
+ * @param apiRequest
+ * @param session
+ * @return
+ */
+ private ApiResponse processPullResultsRequest(ApiRequest apiRequest, Session session) throws ApiException {
+ String consumerId = apiRequest.getConsumerId();
+ if (StringUtils.isBlank(consumerId)) {
+ throw new ApiException("'consumerId' is required");
+ }
+ ResultConsumer consumer = session.getResultDistributor().getConsumer(consumerId);
+ if (consumer == null) {
+ throw new ApiException("consumer not found: " + consumerId);
+ }
+
+ List results = consumer.pollResults();
+ Map body = new TreeMap();
+ body.put("results", results);
+
+ ApiResponse response = new ApiResponse();
+ response.setState(ApiState.SUCCEEDED)
+ .setSessionId(session.getSessionId())
+ .setConsumerId(consumerId)
+ .setBody(body);
+ return response;
+ }
+
+ private boolean waitForJob(Job job, int timeout) {
+ long startTime = System.currentTimeMillis();
+ while (true) {
+ switch (job.status()) {
+ case STOPPED:
+ case TERMINATED:
+ return true;
+ }
+ if (System.currentTimeMillis() - startTime > timeout) {
+ return false;
+ }
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ }
+ }
+ }
+
+ private synchronized Job createJob(List args, Session session, ResultDistributor resultDistributor) {
+ Job job = jobController.createJob(commandManager, args, session, new ApiJobHandler(session), new ApiTerm(session), resultDistributor);
+ return job;
+ }
+
+ private Job createJob(String line, Session session, ResultDistributor resultDistributor) {
+ historyManager.addHistory(line);
+ historyManager.saveHistory();
+ return createJob(CliTokens.tokenize(line), session, resultDistributor);
+ }
+
+ private ApiResponse createResponse(ApiState apiState, String message) {
+ ApiResponse apiResponse = new ApiResponse();
+ apiResponse.setState(apiState);
+ apiResponse.setMessage(message);
+ return apiResponse;
+ }
+
+ private String getBody(FullHttpRequest request) {
+ ByteBuf buf = request.content();
+ return buf.toString(CharsetUtil.UTF_8);
+ }
+
+ private class ApiJobHandler implements JobListener {
+
+ private Session session;
+
+ public ApiJobHandler(Session session) {
+ this.session = session;
+ }
+
+ @Override
+ public void onForeground(Job job) {
+ session.setForegroundJob(job);
+ }
+
+ @Override
+ public void onBackground(Job job) {
+ if (session.getForegroundJob() == job) {
+ session.setForegroundJob(null);
+ updateSessionInputStatus(session, InputStatus.ALLOW_INPUT);
+ }
+ }
+
+ @Override
+ public void onTerminated(Job job) {
+ if (session.getForegroundJob() == job) {
+ session.setForegroundJob(null);
+ updateSessionInputStatus(session, InputStatus.ALLOW_INPUT);
+ }
+ }
+
+ @Override
+ public void onSuspend(Job job) {
+ if (session.getForegroundJob() == job) {
+ session.setForegroundJob(null);
+ updateSessionInputStatus(session, InputStatus.ALLOW_INPUT);
+ }
+ }
+ }
+
+ private class ApiTerm implements Term {
+
+ private Session session;
+
+ public ApiTerm(Session session) {
+ this.session = session;
+ }
+
+ @Override
+ public Term resizehandler(Handler handler) {
+ return this;
+ }
+
+ @Override
+ public String type() {
+ return "web";
+ }
+
+ @Override
+ public int width() {
+ return 1000;
+ }
+
+ @Override
+ public int height() {
+ return 200;
+ }
+
+ @Override
+ public Term stdinHandler(Handler handler) {
+ return this;
+ }
+
+ @Override
+ public Term stdoutHandler(Function handler) {
+ return this;
+ }
+
+ @Override
+ public Term write(String data) {
+ return this;
+ }
+
+ @Override
+ public long lastAccessedTime() {
+ return session.getLastAccessTime();
+ }
+
+ @Override
+ public Term echo(String text) {
+ return this;
+ }
+
+ @Override
+ public Term setSession(Session session) {
+ return this;
+ }
+
+ @Override
+ public Term interruptHandler(SignalHandler handler) {
+ return this;
+ }
+
+ @Override
+ public Term suspendHandler(SignalHandler handler) {
+ return this;
+ }
+
+ @Override
+ public void readline(String prompt, Handler lineHandler) {
+
+ }
+
+ @Override
+ public void readline(String prompt, Handler lineHandler, Handler completionHandler) {
+
+ }
+
+ @Override
+ public Term closeHandler(Handler handler) {
+ return this;
+ }
+
+ @Override
+ public void close() {
+
+ }
+ }
+}
diff --git a/core/src/main/java/com/taobao/arthas/core/util/HttpUtils.java b/core/src/main/java/com/taobao/arthas/core/util/HttpUtils.java
new file mode 100644
index 000000000..5d8fdca8b
--- /dev/null
+++ b/core/src/main/java/com/taobao/arthas/core/util/HttpUtils.java
@@ -0,0 +1,60 @@
+package com.taobao.arthas.core.util;
+
+import io.netty.handler.codec.http.*;
+import io.netty.handler.codec.http.cookie.Cookie;
+import io.netty.handler.codec.http.cookie.ServerCookieEncoder;
+
+import java.io.UnsupportedEncodingException;
+import java.util.Set;
+
+/**
+ * @author gongdewei 2020/3/31
+ */
+public class HttpUtils {
+
+ /**
+ * Get cookie value by name
+ * @param cookies request cookies
+ * @param cookieName the cookie name
+ */
+ public static String getCookieValue(Set cookies, String cookieName) {
+ for (Cookie cookie : cookies) {
+ if(cookie.name().equals(cookieName)){
+ return cookie.value();
+ }
+ }
+ return null;
+ }
+
+ /**
+ *
+ * @param response
+ * @param name
+ * @param value
+ */
+ public static void setCookie(DefaultFullHttpResponse response, String name, String value) {
+ response.headers().add(HttpHeaderNames.SET_COOKIE, ServerCookieEncoder.STRICT.encode(name, value));
+ }
+
+ /**
+ * Create http response with status code and content
+ * @param request request
+ * @param status response status code
+ * @param content response content
+ */
+ public static DefaultHttpResponse createResponse(FullHttpRequest request, HttpResponseStatus status, String content) {
+ DefaultFullHttpResponse response = new DefaultFullHttpResponse(request.protocolVersion(), status);
+ response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/html; charset=utf-8");
+ try {
+ response.content().writeBytes(content.getBytes("UTF-8"));
+ } catch (UnsupportedEncodingException e) {
+ }
+ return response;
+ }
+
+ public static HttpResponse createRedirectResponse(FullHttpRequest request, String url) {
+ DefaultFullHttpResponse response = new DefaultFullHttpResponse(request.protocolVersion(), HttpResponseStatus.FOUND);
+ response.headers().set(HttpHeaderNames.LOCATION, url);
+ return response;
+ }
+}
diff --git a/core/src/main/java/com/taobao/arthas/core/util/JsonUtils.java b/core/src/main/java/com/taobao/arthas/core/util/JsonUtils.java
new file mode 100644
index 000000000..5ab6acb67
--- /dev/null
+++ b/core/src/main/java/com/taobao/arthas/core/util/JsonUtils.java
@@ -0,0 +1,93 @@
+package com.taobao.arthas.core.util;
+
+import com.alibaba.fastjson.serializer.SerializeWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Field;
+
+/**
+ * @author gongdewei 2020/5/15
+ */
+public class JsonUtils {
+ private static final Logger logger = LoggerFactory.getLogger(JsonUtils.class);
+ private static Field serializeWriterBufLocalField;
+ private static Field serializeWriterBytesBufLocal;
+ private static Field serializeWriterBufferThreshold;
+
+ /**
+ * Set Fastjson SerializeWriter Buffer Threshold
+ * @param value
+ */
+ public static void setSerializeWriterBufferThreshold(int value) {
+ Class clazz = SerializeWriter.class;
+ try {
+ if (serializeWriterBufferThreshold == null) {
+ serializeWriterBufferThreshold = clazz.getDeclaredField("BUFFER_THRESHOLD");
+ }
+ serializeWriterBufferThreshold.setAccessible(true);
+ serializeWriterBufferThreshold.set(null, value);
+ } catch (Exception e) {
+ logger.error("update SerializeWriter.BUFFER_THRESHOLD value failed", e);
+ }
+ }
+
+ /**
+ * Set Fastjson SerializeWriter ThreadLocal value
+ * @param bufSize
+ */
+ public static void setSerializeWriterBufThreadLocal(int bufSize) {
+ Class clazz = SerializeWriter.class;
+ try {
+ //set threadLocal value
+ if (serializeWriterBufLocalField == null) {
+ serializeWriterBufLocalField = clazz.getDeclaredField("bufLocal");
+ }
+ serializeWriterBufLocalField.setAccessible(true);
+ ThreadLocal bufLocal = (ThreadLocal) serializeWriterBufLocalField.get(null);
+ char[] charsLocal = bufLocal.get();
+ if (charsLocal == null || charsLocal.length < bufSize) {
+ bufLocal.set(new char[bufSize]);
+ }
+
+ if (serializeWriterBytesBufLocal == null) {
+ serializeWriterBytesBufLocal = clazz.getDeclaredField("bytesBufLocal");
+ }
+ serializeWriterBytesBufLocal.setAccessible(true);
+ ThreadLocal bytesBufLocal = (ThreadLocal) serializeWriterBytesBufLocal.get(null);
+ byte[] bytesLocal = bytesBufLocal.get();
+ if (bytesLocal == null || bytesLocal.length < bufSize) {
+ bytesBufLocal.set(new byte[bufSize]);
+ }
+ } catch (Exception e) {
+ logger.error("update SerializeWriter.BUFFER_THRESHOLD value failed", e);
+ }
+ }
+
+ /**
+ * Set Fastjson SerializeWriter ThreadLocal value
+ */
+ public static void setSerializeWriterBufThreadLocal(char[] charsBuf, byte[] bytesBuf) {
+ Class clazz = SerializeWriter.class;
+ try {
+ //set threadLocal value
+ if (serializeWriterBufLocalField == null) {
+ serializeWriterBufLocalField = clazz.getDeclaredField("bufLocal");
+ }
+ serializeWriterBufLocalField.setAccessible(true);
+ ThreadLocal bufLocal = (ThreadLocal) serializeWriterBufLocalField.get(null);
+ bufLocal.set(charsBuf);
+
+ if (serializeWriterBytesBufLocal == null) {
+ serializeWriterBytesBufLocal = clazz.getDeclaredField("bytesBufLocal");
+ }
+ serializeWriterBytesBufLocal.setAccessible(true);
+ ThreadLocal bytesBufLocal = (ThreadLocal) serializeWriterBytesBufLocal.get(null);
+ bytesBufLocal.set(bytesBuf);
+ } catch (Exception e) {
+ logger.error("update SerializeWriter.BUFFER_THRESHOLD value failed", e);
+ }
+ }
+
+
+}