From 78316f340df9ea6dc1b738e173c50a2c3618470b Mon Sep 17 00:00:00 2001 From: gongdewei Date: Thu, 21 May 2020 21:58:33 +0800 Subject: [PATCH] add http api handler --- .../command/model/CommandRequestModel.java | 54 ++ .../core/command/model/InputStatus.java | 22 + .../core/command/model/InputStatusModel.java | 28 + .../core/command/model/WelcomeModel.java | 61 ++ .../term/impl/http/HttpRequestHandler.java | 132 ++-- .../shell/term/impl/http/api/ApiAction.java | 48 ++ .../term/impl/http/api/ApiException.java | 16 + .../shell/term/impl/http/api/ApiRequest.java | 87 +++ .../shell/term/impl/http/api/ApiResponse.java | 79 ++ .../shell/term/impl/http/api/ApiState.java | 35 + .../term/impl/http/api/HttpApiHandler.java | 708 ++++++++++++++++++ .../taobao/arthas/core/util/HttpUtils.java | 60 ++ .../taobao/arthas/core/util/JsonUtils.java | 93 +++ 13 files changed, 1380 insertions(+), 43 deletions(-) create mode 100644 core/src/main/java/com/taobao/arthas/core/command/model/CommandRequestModel.java create mode 100644 core/src/main/java/com/taobao/arthas/core/command/model/InputStatus.java create mode 100644 core/src/main/java/com/taobao/arthas/core/command/model/InputStatusModel.java create mode 100644 core/src/main/java/com/taobao/arthas/core/command/model/WelcomeModel.java create mode 100644 core/src/main/java/com/taobao/arthas/core/shell/term/impl/http/api/ApiAction.java create mode 100644 core/src/main/java/com/taobao/arthas/core/shell/term/impl/http/api/ApiException.java create mode 100644 core/src/main/java/com/taobao/arthas/core/shell/term/impl/http/api/ApiRequest.java create mode 100644 core/src/main/java/com/taobao/arthas/core/shell/term/impl/http/api/ApiResponse.java create mode 100644 core/src/main/java/com/taobao/arthas/core/shell/term/impl/http/api/ApiState.java create mode 100644 core/src/main/java/com/taobao/arthas/core/shell/term/impl/http/api/HttpApiHandler.java create mode 100644 core/src/main/java/com/taobao/arthas/core/util/HttpUtils.java create mode 100644 core/src/main/java/com/taobao/arthas/core/util/JsonUtils.java diff --git a/core/src/main/java/com/taobao/arthas/core/command/model/CommandRequestModel.java b/core/src/main/java/com/taobao/arthas/core/command/model/CommandRequestModel.java new file mode 100644 index 000000000..47b6f5306 --- /dev/null +++ b/core/src/main/java/com/taobao/arthas/core/command/model/CommandRequestModel.java @@ -0,0 +1,54 @@ +package com.taobao.arthas.core.command.model; + +import com.taobao.arthas.core.shell.term.impl.http.api.ApiState; + +/** + * Command async exec process result, not the command exec result + * @author gongdewei 2020/4/2 + */ +public class CommandRequestModel extends ResultModel { + + private ApiState state; + private String command; + private String message; + + public CommandRequestModel(String command, ApiState state) { + this.command = command; + this.state = state; + } + + public CommandRequestModel(String command, ApiState state, String message) { + this.state = state; + this.command = command; + this.message = message; + } + + public String getCommand() { + return command; + } + + public void setCommand(String command) { + this.command = command; + } + + public ApiState getState() { + return state; + } + + public void setState(ApiState state) { + this.state = state; + } + + public String getMessage() { + return message; + } + + public void setMessage(String message) { + this.message = message; + } + + @Override + public String getType() { + return "command"; + } +} diff --git a/core/src/main/java/com/taobao/arthas/core/command/model/InputStatus.java b/core/src/main/java/com/taobao/arthas/core/command/model/InputStatus.java new file mode 100644 index 000000000..182a4a2f9 --- /dev/null +++ b/core/src/main/java/com/taobao/arthas/core/command/model/InputStatus.java @@ -0,0 +1,22 @@ +package com.taobao.arthas.core.command.model; + +/** + * Command input status for webui + * @author gongdewei 2020/4/14 + */ +public enum InputStatus { + /** + * Allow input new commands + */ + ALLOW_INPUT, + + /** + * Allow interrupt running job + */ + ALLOW_INTERRUPT, + + /** + * Disable input and interrupt + */ + DISABLED +} diff --git a/core/src/main/java/com/taobao/arthas/core/command/model/InputStatusModel.java b/core/src/main/java/com/taobao/arthas/core/command/model/InputStatusModel.java new file mode 100644 index 000000000..b1da685a0 --- /dev/null +++ b/core/src/main/java/com/taobao/arthas/core/command/model/InputStatusModel.java @@ -0,0 +1,28 @@ +package com.taobao.arthas.core.command.model; + +/** + * Input status for webui + * @author gongdewei 2020/4/14 + */ +public class InputStatusModel extends ResultModel { + + private InputStatus inputStatus; + + public InputStatusModel(InputStatus inputStatus) { + this.inputStatus = inputStatus; + } + + public InputStatus getInputStatus() { + return inputStatus; + } + + public void setInputStatus(InputStatus inputStatus) { + this.inputStatus = inputStatus; + } + + @Override + public String getType() { + return "input_status"; + } + +} diff --git a/core/src/main/java/com/taobao/arthas/core/command/model/WelcomeModel.java b/core/src/main/java/com/taobao/arthas/core/command/model/WelcomeModel.java new file mode 100644 index 000000000..11bafd572 --- /dev/null +++ b/core/src/main/java/com/taobao/arthas/core/command/model/WelcomeModel.java @@ -0,0 +1,61 @@ +package com.taobao.arthas.core.command.model; + +/** + * @author gongdewei 2020/4/20 + */ +public class WelcomeModel extends ResultModel { + + private String pid; + private String time; + private String version; + private String wiki; + private String tutorials; + + public WelcomeModel() { + } + + @Override + public String getType() { + return "welcome"; + } + + public String getPid() { + return pid; + } + + public void setPid(String pid) { + this.pid = pid; + } + + public String getTime() { + return time; + } + + public void setTime(String time) { + this.time = time; + } + + public String getVersion() { + return version; + } + + public void setVersion(String version) { + this.version = version; + } + + public String getWiki() { + return wiki; + } + + public void setWiki(String wiki) { + this.wiki = wiki; + } + + public String getTutorials() { + return tutorials; + } + + public void setTutorials(String tutorials) { + this.tutorials = tutorials; + } +} diff --git a/core/src/main/java/com/taobao/arthas/core/shell/term/impl/http/HttpRequestHandler.java b/core/src/main/java/com/taobao/arthas/core/shell/term/impl/http/HttpRequestHandler.java index 8c101168e..d417e3518 100644 --- a/core/src/main/java/com/taobao/arthas/core/shell/term/impl/http/HttpRequestHandler.java +++ b/core/src/main/java/com/taobao/arthas/core/shell/term/impl/http/HttpRequestHandler.java @@ -1,6 +1,7 @@ package com.taobao.arthas.core.shell.term.impl.http; import java.io.File; +import java.io.IOException; import java.io.InputStream; import java.net.URI; import java.net.URL; @@ -8,6 +9,7 @@ import java.net.URL; import com.alibaba.arthas.deps.org.slf4j.Logger; import com.alibaba.arthas.deps.org.slf4j.LoggerFactory; import com.taobao.arthas.common.IOUtils; +import com.taobao.arthas.core.shell.term.impl.http.api.HttpApiHandler; import com.taobao.arthas.core.shell.term.impl.httptelnet.HttpTelnetTermServer; import io.netty.channel.ChannelFuture; @@ -15,7 +17,6 @@ import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.DefaultFullHttpResponse; -import io.netty.handler.codec.http.DefaultHttpResponse; import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.FullHttpResponse; import io.netty.handler.codec.http.HttpHeaderNames; @@ -27,9 +28,13 @@ import io.netty.handler.codec.http.LastHttpContent; import io.termd.core.http.HttpTtyConnection; import io.termd.core.util.Logging; +import static com.taobao.arthas.core.util.HttpUtils.createRedirectResponse; +import static com.taobao.arthas.core.util.HttpUtils.createResponse; + /** * @author Julien Viet * @author hengyunabc 2019-11-06 + * @author gongdewei 2020-03-18 */ public class HttpRequestHandler extends SimpleChannelInboundHandler { private static final Logger logger = LoggerFactory.getLogger(HttpTelnetTermServer.class); @@ -38,10 +43,13 @@ public class HttpRequestHandler extends SimpleChannelInboundHandler options; + + @Override + public String toString() { + return "ApiRequest{" + + "action='" + action + '\'' + + ", command='" + command + '\'' + + ", requestId='" + requestId + '\'' + + ", sessionId='" + sessionId + '\'' + + ", consumerId='" + consumerId + '\'' + + ", timeout=" + timeout + + ", options=" + options + + '}'; + } + + public String getAction() { + return action; + } + + public void setAction(String action) { + this.action = action; + } + + public String getCommand() { + return command; + } + + public void setCommand(String command) { + this.command = command; + } + + public Map getOptions() { + return options; + } + + public void setOptions(Map options) { + this.options = options; + } + + public String getRequestId() { + return requestId; + } + + public void setRequestId(String requestId) { + this.requestId = requestId; + } + + public String getSessionId() { + return sessionId; + } + + public void setSessionId(String sessionId) { + this.sessionId = sessionId; + } + + public String getConsumerId() { + return consumerId; + } + + public void setConsumerId(String consumerId) { + this.consumerId = consumerId; + } + + public Integer getTimeout() { + return timeout; + } + + public void setTimeout(Integer timeout) { + this.timeout = timeout; + } +} diff --git a/core/src/main/java/com/taobao/arthas/core/shell/term/impl/http/api/ApiResponse.java b/core/src/main/java/com/taobao/arthas/core/shell/term/impl/http/api/ApiResponse.java new file mode 100644 index 000000000..e0d16a518 --- /dev/null +++ b/core/src/main/java/com/taobao/arthas/core/shell/term/impl/http/api/ApiResponse.java @@ -0,0 +1,79 @@ +package com.taobao.arthas.core.shell.term.impl.http.api; + +/** + * Http Api exception + * @author gongdewei 2020-03-19 + */ +public class ApiResponse { + private String requestId; + private ApiState state; + private String message; + private String sessionId; + private String consumerId; + private String jobId; + private T body; + + public String getRequestId() { + return requestId; + } + + public ApiResponse setRequestId(String requestId) { + this.requestId = requestId; + return this; + } + + public ApiState getState() { + return state; + } + + public ApiResponse setState(ApiState state) { + this.state = state; + return this; + } + + public String getMessage() { + return message; + } + + public ApiResponse setMessage(String message) { + this.message = message; + return this; + } + + public String getSessionId() { + return sessionId; + } + + public ApiResponse setSessionId(String sessionId) { + this.sessionId = sessionId; + return this; + } + + public String getConsumerId() { + return consumerId; + } + + public ApiResponse setConsumerId(String consumerId) { + this.consumerId = consumerId; + return this; + } + + public String getJobId() { + return jobId; + } + + public ApiResponse setJobId(String jobId) { + this.jobId = jobId; + return this; + } + + public T getBody() { + return body; + } + + public ApiResponse setBody(T body) { + this.body = body; + return this; + } + +} diff --git a/core/src/main/java/com/taobao/arthas/core/shell/term/impl/http/api/ApiState.java b/core/src/main/java/com/taobao/arthas/core/shell/term/impl/http/api/ApiState.java new file mode 100644 index 000000000..e222bf50e --- /dev/null +++ b/core/src/main/java/com/taobao/arthas/core/shell/term/impl/http/api/ApiState.java @@ -0,0 +1,35 @@ +package com.taobao.arthas.core.shell.term.impl.http.api; + +/** + * Http API response state + * + * @author gongdewei 2020-03-19 + */ +public enum ApiState { + /** + * Scheduled async exec job + */ + SCHEDULED, + +// RUNNING, + + /** + * Request processed successfully + */ + SUCCEEDED, + + /** + * Request processing interrupt + */ + INTERRUPTED, + + /** + * Request processing failed + */ + FAILED, + + /** + * Request is refused + */ + REFUSED +} diff --git a/core/src/main/java/com/taobao/arthas/core/shell/term/impl/http/api/HttpApiHandler.java b/core/src/main/java/com/taobao/arthas/core/shell/term/impl/http/api/HttpApiHandler.java new file mode 100644 index 000000000..b614ca580 --- /dev/null +++ b/core/src/main/java/com/taobao/arthas/core/shell/term/impl/http/api/HttpApiHandler.java @@ -0,0 +1,708 @@ +package com.taobao.arthas.core.shell.term.impl.http.api; + +import com.alibaba.arthas.deps.org.slf4j.Logger; +import com.alibaba.arthas.deps.org.slf4j.LoggerFactory; +import com.alibaba.fastjson.JSON; +import com.taobao.arthas.common.PidUtils; +import com.taobao.arthas.core.command.model.*; +import com.taobao.arthas.core.distribution.PackingResultDistributor; +import com.taobao.arthas.core.distribution.ResultConsumer; +import com.taobao.arthas.core.distribution.ResultDistributor; +import com.taobao.arthas.core.distribution.impl.PackingResultDistributorImpl; +import com.taobao.arthas.core.distribution.impl.ResultConsumerImpl; +import com.taobao.arthas.core.server.ArthasBootstrap; +import com.taobao.arthas.core.shell.cli.CliToken; +import com.taobao.arthas.core.shell.cli.CliTokens; +import com.taobao.arthas.core.shell.cli.Completion; +import com.taobao.arthas.core.shell.handlers.Handler; +import com.taobao.arthas.core.shell.history.HistoryManager; +import com.taobao.arthas.core.shell.history.impl.HistoryManagerImpl; +import com.taobao.arthas.core.shell.session.Session; +import com.taobao.arthas.core.shell.session.SessionManager; +import com.taobao.arthas.core.shell.system.Job; +import com.taobao.arthas.core.shell.system.JobListener; +import com.taobao.arthas.core.shell.system.impl.InternalCommandManager; +import com.taobao.arthas.core.shell.system.impl.JobControllerImpl; +import com.taobao.arthas.core.shell.term.SignalHandler; +import com.taobao.arthas.core.shell.term.Term; +import com.taobao.arthas.core.util.ArthasBanner; +import com.taobao.arthas.core.util.DateUtils; +import com.taobao.arthas.core.util.JsonUtils; +import com.taobao.arthas.core.util.StringUtils; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufOutputStream; +import io.netty.buffer.Unpooled; +import io.netty.handler.codec.http.*; +import io.netty.util.CharsetUtil; +import io.termd.core.function.Function; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + + +/** + * Http Restful Api Handler + * + * @author gongdewei 2020-03-18 + */ +public class HttpApiHandler { + + private static final Logger logger = LoggerFactory.getLogger(HttpApiHandler.class); + public static final int DEFAULT_EXEC_TIMEOUT = 30000; + private final SessionManager sessionManager; + private final AtomicInteger requestIdGenerator = new AtomicInteger(0); + private static HttpApiHandler instance; + private final InternalCommandManager commandManager; + private final JobControllerImpl jobController; + private final HistoryManager historyManager; + + private int jsonBufferSize = 1024 * 256; + private int poolSize = 8; + private ArrayBlockingQueue byteBufPool = new ArrayBlockingQueue(poolSize); + private ArrayBlockingQueue charsBufPool = new ArrayBlockingQueue(poolSize); + private ArrayBlockingQueue bytesPool = new ArrayBlockingQueue(poolSize); + + public static HttpApiHandler getInstance() { + if (instance == null) { + synchronized (HttpApiHandler.class) { + instance = new HttpApiHandler(); + } + } + return instance; + } + + private HttpApiHandler() { + sessionManager = ArthasBootstrap.getInstance().getSessionManager(); + commandManager = sessionManager.getCommandManager(); + jobController = sessionManager.getJobController(); + historyManager = HistoryManagerImpl.getInstance(); + + //init buf pool + JsonUtils.setSerializeWriterBufferThreshold(jsonBufferSize); + for (int i = 0; i < poolSize; i++) { + byteBufPool.offer(Unpooled.buffer(jsonBufferSize)); + charsBufPool.offer(new char[jsonBufferSize]); + bytesPool.offer(new byte[jsonBufferSize]); + } + } + + public HttpResponse handle(FullHttpRequest request) throws Exception { + + ApiResponse result; + String requestBody = null; + String requestId = "req_" + requestIdGenerator.addAndGet(1); + try { + HttpMethod method = request.method(); + if (HttpMethod.POST.equals(method)) { + requestBody = getBody(request); + ApiRequest apiRequest = parseRequest(requestBody); + apiRequest.setRequestId(requestId); + result = processRequest(apiRequest); + } else { + result = createResponse(ApiState.REFUSED, "Unsupported http method: " + method.name()); + } + } catch (Throwable e) { + result = createResponse(ApiState.FAILED, "Process request error: " + e.getMessage()); + logger.error("arthas process http api request error: " + request.uri() + ", request body: " + requestBody, e); + } + if (result == null) { + result = createResponse(ApiState.FAILED, "The request was not processed"); + } + result.setRequestId(requestId); + + + //http response content + ByteBuf content = null; + //fastjson buf + char[] charsBuf = null; + byte[] bytesBuf = null; + + try { + //apply response content buf first + content = byteBufPool.poll(2000, TimeUnit.MILLISECONDS); + if (content == null) { + throw new ApiException("get response content buf failure"); + } + + //apply fastjson buf from pool + charsBuf = charsBufPool.poll(); + bytesBuf = bytesPool.poll(); + if (charsBuf == null || bytesBuf == null) { + throw new ApiException("get json buf failure"); + } + JsonUtils.setSerializeWriterBufThreadLocal(charsBuf, bytesBuf); + + //create http response + DefaultFullHttpResponse response = new DefaultFullHttpResponse(request.protocolVersion(), + HttpResponseStatus.OK, content.retain()); + response.headers().set(HttpHeaderNames.CONTENT_TYPE, "application/json; charset=utf-8"); + writeResult(response, result); + return response; + } catch (Exception e) { + //response is discarded + if (content != null) { + content.release(); + byteBufPool.offer(content); + } + throw e; + } finally { + //give back json buf to pool + JsonUtils.setSerializeWriterBufThreadLocal(null, null); + if (charsBuf != null) { + charsBufPool.offer(charsBuf); + } + if (bytesBuf != null) { + bytesPool.offer(bytesBuf); + } + } + } + + public void onCompleted(DefaultFullHttpResponse httpResponse) { + ByteBuf content = httpResponse.content(); + content.clear(); + if (content.capacity() == jsonBufferSize) { + if (!byteBufPool.offer(content)) { + content.release(); + } + } else { + //replace content ByteBuf + content.release(); + if (byteBufPool.remainingCapacity() > 0) { + byteBufPool.offer(Unpooled.buffer(jsonBufferSize)); + } + } + } + + private void writeResult(DefaultFullHttpResponse response, Object result) throws IOException { + ByteBufOutputStream out = new ByteBufOutputStream(response.content()); + try { + JSON.writeJSONString(out, result); + } catch (IOException e) { + logger.error("write json to response failed", e); + throw e; + } + } + + private ApiRequest parseRequest(String requestBody) throws ApiException { + if (StringUtils.isBlank(requestBody)) { + throw new ApiException("parse request failed: request body is empty"); + } + try { + //ObjectMapper objectMapper = new ObjectMapper(); + //return objectMapper.readValue(requestBody, ApiRequest.class); + return JSON.parseObject(requestBody, ApiRequest.class); + } catch (Exception e) { + throw new ApiException("parse request failed: " + e.getMessage(), e); + } + } + + private ApiResponse processRequest(ApiRequest apiRequest) { + + String actionStr = apiRequest.getAction(); + try { + if (StringUtils.isBlank(actionStr)) { + throw new ApiException("'action' is required"); + } + ApiAction action; + try { + action = ApiAction.valueOf(actionStr.trim().toUpperCase()); + } catch (IllegalArgumentException e) { + throw new ApiException("unknown action: " + actionStr); + } + + //no session required + if (ApiAction.INIT_SESSION.equals(action)) { + return processInitSessionRequest(apiRequest); + } + + //required session + String sessionId = apiRequest.getSessionId(); + if (StringUtils.isBlank(sessionId)) { + throw new ApiException("'sessionId' is required"); + } + Session session = sessionManager.getSession(sessionId); + if (session == null) { + throw new ApiException("session not found: " + sessionId); + } + sessionManager.updateAccessTime(session); + + //dispatch requests + ApiResponse response = dispatchRequest(action, apiRequest, session); + if (response != null) { + return response; + } + + } catch (ApiException e) { + logger.info("process http api request failed: {}", e.getMessage()); + return createResponse(ApiState.FAILED, e.getMessage()); + } catch (Throwable e) { + logger.error("process http api request failed: " + e.getMessage(), e); + return createResponse(ApiState.FAILED, "process http api request failed: " + e.getMessage()); + } + + return createResponse(ApiState.REFUSED, "Unsupported action: " + actionStr); + } + + private ApiResponse dispatchRequest(ApiAction action, ApiRequest apiRequest, Session session) throws ApiException { + switch (action) { + case EXEC: + return processExecRequest(apiRequest, session); + case ASYNC_EXEC: + return processAsyncExecRequest(apiRequest, session); + case INTERRUPT_JOB: + return processInterruptJob(apiRequest, session); + case PULL_RESULTS: + return processPullResultsRequest(apiRequest, session); + case SESSION_INFO: + return processSessionInfoRequest(apiRequest, session); + case JOIN_SESSION: + return processJoinSessionRequest(apiRequest, session); + case CLOSE_SESSION: + return processCloseSessionRequest(apiRequest, session); + case INIT_SESSION: + break; + } + return null; + } + + private ApiResponse processInitSessionRequest(ApiRequest apiRequest) throws ApiException { + ApiResponse response = new ApiResponse(); + + //create session + Session session = sessionManager.createSession(); + if (session != null) { + + //create consumer + ResultConsumer resultConsumer = new ResultConsumerImpl(); + session.getResultDistributor().addConsumer(resultConsumer); + + session.getResultDistributor().appendResult(new MessageModel("Welcome to arthas!")); + + //welcome message + WelcomeModel welcomeModel = new WelcomeModel(); + welcomeModel.setVersion(ArthasBanner.version()); + welcomeModel.setWiki(ArthasBanner.wiki()); + welcomeModel.setTutorials(ArthasBanner.tutorials()); + welcomeModel.setPid(PidUtils.currentPid()); + welcomeModel.setTime(DateUtils.getCurrentDate()); + session.getResultDistributor().appendResult(welcomeModel); + + //allow input + updateSessionInputStatus(session, InputStatus.ALLOW_INPUT); + + response.setSessionId(session.getSessionId()) + .setConsumerId(resultConsumer.getConsumerId()) + .setState(ApiState.SUCCEEDED); + } else { + throw new ApiException("create api session failed"); + } + return response; + } + + /** + * Update session input status for all consumer + * + * @param session + * @param inputStatus + */ + private void updateSessionInputStatus(Session session, InputStatus inputStatus) { + session.getResultDistributor().appendResult(new InputStatusModel(inputStatus)); + } + + private ApiResponse processJoinSessionRequest(ApiRequest apiRequest, Session session) { + + //create consumer + ResultConsumer resultConsumer = new ResultConsumerImpl(); + //disable input and interrupt + resultConsumer.appendResult(new InputStatusModel(InputStatus.DISABLED)); + session.getResultDistributor().addConsumer(resultConsumer); + + ApiResponse response = new ApiResponse(); + response.setSessionId(session.getSessionId()) + .setConsumerId(resultConsumer.getConsumerId()) + .setState(ApiState.SUCCEEDED); + return response; + } + + private ApiResponse processSessionInfoRequest(ApiRequest apiRequest, Session session) { + ApiResponse response = new ApiResponse(); + Map body = new TreeMap(); + body.put("pid", session.getPid()); + body.put("createTime", session.getCreateTime()); + body.put("lastAccessTime", session.getLastAccessTime()); + + response.setState(ApiState.SUCCEEDED) + .setSessionId(session.getSessionId()) + //.setConsumerId(consumerId) + .setBody(body); + return response; + } + + private ApiResponse processCloseSessionRequest(ApiRequest apiRequest, Session session) { + sessionManager.removeSession(session.getSessionId()); + ApiResponse response = new ApiResponse(); + response.setState(ApiState.SUCCEEDED); + return response; + } + + /** + * Execute command sync, wait for job finish or timeout, sending results immediately + * + * @param apiRequest + * @param session + * @return + */ + private ApiResponse processExecRequest(ApiRequest apiRequest, Session session) { + String commandLine = apiRequest.getCommand(); + Map body = new TreeMap(); + body.put("command", commandLine); + + ApiResponse response = new ApiResponse(); + response.setSessionId(session.getSessionId()) + .setBody(body); + + if (!session.tryLock()) { + response.setState(ApiState.REFUSED) + .setMessage("Another command is executing."); + return response; + } + + int lock = session.getLock(); + PackingResultDistributor packingResultDistributor = null; + Job job = null; + try { + Job foregroundJob = session.getForegroundJob(); + if (foregroundJob != null) { + response.setState(ApiState.REFUSED) + .setMessage("Another job is running."); + logger.info("Another job is running, jobId: {}", foregroundJob.id()); + return response; + } + + //distribute result message both to origin session channel and request channel by CompositeResultDistributor + packingResultDistributor = new PackingResultDistributorImpl(session); + //ResultDistributor resultDistributor = new CompositeResultDistributorImpl(packingResultDistributor, session.getResultDistributor()); + job = this.createJob(commandLine, session, packingResultDistributor); + session.setForegroundJob(job); + updateSessionInputStatus(session, InputStatus.ALLOW_INTERRUPT); + + job.run(); + + } catch (Throwable e) { + logger.error("Exec command failed:" + e.getMessage() + ", command:" + commandLine, e); + response.setState(ApiState.FAILED).setMessage("Exec command failed:" + e.getMessage()); + return response; + } finally { + if (session.getLock() == lock) { + session.unLock(); + } + } + + //wait for job completed or timeout + Integer timeout = apiRequest.getTimeout(); + if (timeout == null || timeout <= 0) { + timeout = DEFAULT_EXEC_TIMEOUT; + } + boolean timeExpired = !waitForJob(job, timeout); + if (timeExpired) { + logger.warn("Job is exceeded time limit, force interrupt it, jobId: {}", job.id()); + job.interrupt(); + response.setState(ApiState.INTERRUPTED).setMessage("The job is exceeded time limit, force interrupt"); + } else { + response.setState(ApiState.SUCCEEDED); + } + + //packing results + body.put("jobId", job.id()); + body.put("jobStatus", job.status()); + body.put("timeExpired", timeExpired); + if (timeExpired) { + body.put("timeout", timeout); + } + body.put("results", packingResultDistributor.getResults()); + + response.setSessionId(session.getSessionId()) + //.setConsumerId(consumerId) + .setBody(body); + return response; + } + + /** + * Execute command async, create and schedule the job running, but no wait for the results. + * + * @param apiRequest + * @param session + * @return + */ + private ApiResponse processAsyncExecRequest(ApiRequest apiRequest, Session session) { + String commandLine = apiRequest.getCommand(); + Map body = new TreeMap(); + body.put("command", commandLine); + + ApiResponse response = new ApiResponse(); + response.setSessionId(session.getSessionId()) + .setBody(body); + + if (!session.tryLock()) { + response.setState(ApiState.REFUSED) + .setMessage("Another command is executing."); + return response; + } + int lock = session.getLock(); + try { + + Job foregroundJob = session.getForegroundJob(); + if (foregroundJob != null) { + response.setState(ApiState.REFUSED) + .setMessage("Another job is running."); + logger.info("Another job is running, jobId: {}", foregroundJob.id()); + return response; + } + + //create job + Job job = this.createJob(commandLine, session, session.getResultDistributor()); + body.put("jobId", job.id()); + body.put("jobStatus", job.status()); + response.setState(ApiState.SCHEDULED); + + //add command before exec job + CommandRequestModel commandRequestModel = new CommandRequestModel(commandLine, response.getState()); + commandRequestModel.setJobId(job.id()); + session.getResultDistributor().appendResult(commandRequestModel); + session.setForegroundJob(job); + updateSessionInputStatus(session, InputStatus.ALLOW_INTERRUPT); + + //run job + job.run(); + + return response; + } catch (Throwable e) { + logger.error("Async exec command failed:" + e.getMessage() + ", command:" + commandLine, e); + response.setState(ApiState.FAILED).setMessage("Async exec command failed:" + e.getMessage()); + CommandRequestModel commandRequestModel = new CommandRequestModel(commandLine, response.getState(), response.getMessage()); + session.getResultDistributor().appendResult(commandRequestModel); + return response; + } finally { + if (session.getLock() == lock) { + session.unLock(); + } + } + } + + private ApiResponse processInterruptJob(ApiRequest apiRequest, Session session) { + Job job = session.getForegroundJob(); + if (job == null) { + return new ApiResponse().setState(ApiState.FAILED).setMessage("no foreground job is running"); + } + job.interrupt(); + + Map body = new TreeMap(); + body.put("jobId", job.id()); + body.put("jobStatus", job.status()); + return new ApiResponse() + .setState(ApiState.SUCCEEDED) + .setBody(body); + } + + /** + * Pull results from result queue + * + * @param apiRequest + * @param session + * @return + */ + private ApiResponse processPullResultsRequest(ApiRequest apiRequest, Session session) throws ApiException { + String consumerId = apiRequest.getConsumerId(); + if (StringUtils.isBlank(consumerId)) { + throw new ApiException("'consumerId' is required"); + } + ResultConsumer consumer = session.getResultDistributor().getConsumer(consumerId); + if (consumer == null) { + throw new ApiException("consumer not found: " + consumerId); + } + + List results = consumer.pollResults(); + Map body = new TreeMap(); + body.put("results", results); + + ApiResponse response = new ApiResponse(); + response.setState(ApiState.SUCCEEDED) + .setSessionId(session.getSessionId()) + .setConsumerId(consumerId) + .setBody(body); + return response; + } + + private boolean waitForJob(Job job, int timeout) { + long startTime = System.currentTimeMillis(); + while (true) { + switch (job.status()) { + case STOPPED: + case TERMINATED: + return true; + } + if (System.currentTimeMillis() - startTime > timeout) { + return false; + } + try { + Thread.sleep(100); + } catch (InterruptedException e) { + } + } + } + + private synchronized Job createJob(List args, Session session, ResultDistributor resultDistributor) { + Job job = jobController.createJob(commandManager, args, session, new ApiJobHandler(session), new ApiTerm(session), resultDistributor); + return job; + } + + private Job createJob(String line, Session session, ResultDistributor resultDistributor) { + historyManager.addHistory(line); + historyManager.saveHistory(); + return createJob(CliTokens.tokenize(line), session, resultDistributor); + } + + private ApiResponse createResponse(ApiState apiState, String message) { + ApiResponse apiResponse = new ApiResponse(); + apiResponse.setState(apiState); + apiResponse.setMessage(message); + return apiResponse; + } + + private String getBody(FullHttpRequest request) { + ByteBuf buf = request.content(); + return buf.toString(CharsetUtil.UTF_8); + } + + private class ApiJobHandler implements JobListener { + + private Session session; + + public ApiJobHandler(Session session) { + this.session = session; + } + + @Override + public void onForeground(Job job) { + session.setForegroundJob(job); + } + + @Override + public void onBackground(Job job) { + if (session.getForegroundJob() == job) { + session.setForegroundJob(null); + updateSessionInputStatus(session, InputStatus.ALLOW_INPUT); + } + } + + @Override + public void onTerminated(Job job) { + if (session.getForegroundJob() == job) { + session.setForegroundJob(null); + updateSessionInputStatus(session, InputStatus.ALLOW_INPUT); + } + } + + @Override + public void onSuspend(Job job) { + if (session.getForegroundJob() == job) { + session.setForegroundJob(null); + updateSessionInputStatus(session, InputStatus.ALLOW_INPUT); + } + } + } + + private class ApiTerm implements Term { + + private Session session; + + public ApiTerm(Session session) { + this.session = session; + } + + @Override + public Term resizehandler(Handler handler) { + return this; + } + + @Override + public String type() { + return "web"; + } + + @Override + public int width() { + return 1000; + } + + @Override + public int height() { + return 200; + } + + @Override + public Term stdinHandler(Handler handler) { + return this; + } + + @Override + public Term stdoutHandler(Function handler) { + return this; + } + + @Override + public Term write(String data) { + return this; + } + + @Override + public long lastAccessedTime() { + return session.getLastAccessTime(); + } + + @Override + public Term echo(String text) { + return this; + } + + @Override + public Term setSession(Session session) { + return this; + } + + @Override + public Term interruptHandler(SignalHandler handler) { + return this; + } + + @Override + public Term suspendHandler(SignalHandler handler) { + return this; + } + + @Override + public void readline(String prompt, Handler lineHandler) { + + } + + @Override + public void readline(String prompt, Handler lineHandler, Handler completionHandler) { + + } + + @Override + public Term closeHandler(Handler handler) { + return this; + } + + @Override + public void close() { + + } + } +} diff --git a/core/src/main/java/com/taobao/arthas/core/util/HttpUtils.java b/core/src/main/java/com/taobao/arthas/core/util/HttpUtils.java new file mode 100644 index 000000000..5d8fdca8b --- /dev/null +++ b/core/src/main/java/com/taobao/arthas/core/util/HttpUtils.java @@ -0,0 +1,60 @@ +package com.taobao.arthas.core.util; + +import io.netty.handler.codec.http.*; +import io.netty.handler.codec.http.cookie.Cookie; +import io.netty.handler.codec.http.cookie.ServerCookieEncoder; + +import java.io.UnsupportedEncodingException; +import java.util.Set; + +/** + * @author gongdewei 2020/3/31 + */ +public class HttpUtils { + + /** + * Get cookie value by name + * @param cookies request cookies + * @param cookieName the cookie name + */ + public static String getCookieValue(Set cookies, String cookieName) { + for (Cookie cookie : cookies) { + if(cookie.name().equals(cookieName)){ + return cookie.value(); + } + } + return null; + } + + /** + * + * @param response + * @param name + * @param value + */ + public static void setCookie(DefaultFullHttpResponse response, String name, String value) { + response.headers().add(HttpHeaderNames.SET_COOKIE, ServerCookieEncoder.STRICT.encode(name, value)); + } + + /** + * Create http response with status code and content + * @param request request + * @param status response status code + * @param content response content + */ + public static DefaultHttpResponse createResponse(FullHttpRequest request, HttpResponseStatus status, String content) { + DefaultFullHttpResponse response = new DefaultFullHttpResponse(request.protocolVersion(), status); + response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/html; charset=utf-8"); + try { + response.content().writeBytes(content.getBytes("UTF-8")); + } catch (UnsupportedEncodingException e) { + } + return response; + } + + public static HttpResponse createRedirectResponse(FullHttpRequest request, String url) { + DefaultFullHttpResponse response = new DefaultFullHttpResponse(request.protocolVersion(), HttpResponseStatus.FOUND); + response.headers().set(HttpHeaderNames.LOCATION, url); + return response; + } +} diff --git a/core/src/main/java/com/taobao/arthas/core/util/JsonUtils.java b/core/src/main/java/com/taobao/arthas/core/util/JsonUtils.java new file mode 100644 index 000000000..5ab6acb67 --- /dev/null +++ b/core/src/main/java/com/taobao/arthas/core/util/JsonUtils.java @@ -0,0 +1,93 @@ +package com.taobao.arthas.core.util; + +import com.alibaba.fastjson.serializer.SerializeWriter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.reflect.Field; + +/** + * @author gongdewei 2020/5/15 + */ +public class JsonUtils { + private static final Logger logger = LoggerFactory.getLogger(JsonUtils.class); + private static Field serializeWriterBufLocalField; + private static Field serializeWriterBytesBufLocal; + private static Field serializeWriterBufferThreshold; + + /** + * Set Fastjson SerializeWriter Buffer Threshold + * @param value + */ + public static void setSerializeWriterBufferThreshold(int value) { + Class clazz = SerializeWriter.class; + try { + if (serializeWriterBufferThreshold == null) { + serializeWriterBufferThreshold = clazz.getDeclaredField("BUFFER_THRESHOLD"); + } + serializeWriterBufferThreshold.setAccessible(true); + serializeWriterBufferThreshold.set(null, value); + } catch (Exception e) { + logger.error("update SerializeWriter.BUFFER_THRESHOLD value failed", e); + } + } + + /** + * Set Fastjson SerializeWriter ThreadLocal value + * @param bufSize + */ + public static void setSerializeWriterBufThreadLocal(int bufSize) { + Class clazz = SerializeWriter.class; + try { + //set threadLocal value + if (serializeWriterBufLocalField == null) { + serializeWriterBufLocalField = clazz.getDeclaredField("bufLocal"); + } + serializeWriterBufLocalField.setAccessible(true); + ThreadLocal bufLocal = (ThreadLocal) serializeWriterBufLocalField.get(null); + char[] charsLocal = bufLocal.get(); + if (charsLocal == null || charsLocal.length < bufSize) { + bufLocal.set(new char[bufSize]); + } + + if (serializeWriterBytesBufLocal == null) { + serializeWriterBytesBufLocal = clazz.getDeclaredField("bytesBufLocal"); + } + serializeWriterBytesBufLocal.setAccessible(true); + ThreadLocal bytesBufLocal = (ThreadLocal) serializeWriterBytesBufLocal.get(null); + byte[] bytesLocal = bytesBufLocal.get(); + if (bytesLocal == null || bytesLocal.length < bufSize) { + bytesBufLocal.set(new byte[bufSize]); + } + } catch (Exception e) { + logger.error("update SerializeWriter.BUFFER_THRESHOLD value failed", e); + } + } + + /** + * Set Fastjson SerializeWriter ThreadLocal value + */ + public static void setSerializeWriterBufThreadLocal(char[] charsBuf, byte[] bytesBuf) { + Class clazz = SerializeWriter.class; + try { + //set threadLocal value + if (serializeWriterBufLocalField == null) { + serializeWriterBufLocalField = clazz.getDeclaredField("bufLocal"); + } + serializeWriterBufLocalField.setAccessible(true); + ThreadLocal bufLocal = (ThreadLocal) serializeWriterBufLocalField.get(null); + bufLocal.set(charsBuf); + + if (serializeWriterBytesBufLocal == null) { + serializeWriterBytesBufLocal = clazz.getDeclaredField("bytesBufLocal"); + } + serializeWriterBytesBufLocal.setAccessible(true); + ThreadLocal bytesBufLocal = (ThreadLocal) serializeWriterBytesBufLocal.get(null); + bytesBufLocal.set(bytesBuf); + } catch (Exception e) { + logger.error("update SerializeWriter.BUFFER_THRESHOLD value failed", e); + } + } + + +}