From 11563c04545af00962b6fe64033598d7df0200b8 Mon Sep 17 00:00:00 2001 From: gongdewei Date: Tue, 25 May 2021 12:03:32 +0800 Subject: [PATCH] Support /legacy_api, improve message cleanup --- .../src/main/resources/application.properties | 8 + .../ChannelServerAutoConfiguration.java | 12 +- .../ChannelServerProperties.java | 42 +++- channel/channel-server/pom.xml | 42 +++- .../impl/MessageExchangeServiceImpl.java | 44 +++- .../RedisMessageExchangeServiceImpl.java | 31 ++- .../server/web/LegacyApiController.java | 237 ++++++++++++++++++ .../core/channel/ChannelRequestHandler.java | 3 +- .../arthas/core/server/ArthasBootstrap.java | 2 +- .../shell/term/impl/http/api/ApiRequest.java | 1 - .../shell/term/impl/http/api/ApiResponse.java | 12 +- .../term/impl/http/api/HttpApiHandler.java | 11 +- 12 files changed, 423 insertions(+), 22 deletions(-) create mode 100644 channel/channel-server/src/main/java/com/alibaba/arthas/channel/server/web/LegacyApiController.java diff --git a/channel/channel-server-app/src/main/resources/application.properties b/channel/channel-server-app/src/main/resources/application.properties index 3d09e8162..bd49b9086 100755 --- a/channel/channel-server-app/src/main/resources/application.properties +++ b/channel/channel-server-app/src/main/resources/application.properties @@ -15,6 +15,14 @@ arthas.channel.server.websocket.port=8801 # whether start agent cleaner arthas.channel.server.agent-cleaner.enabled=true +# message exchange topic survival time +arthas.channel.server.message-exchange.topic-survival-time-mills=60000 + +# message exchange topic capacity +arthas.channel.server.message-exchange.topic-capacity=1000 + + + # for all endpoints management.endpoints.web.exposure.include=* diff --git a/channel/channel-server-starter/src/main/java/com/alibaba/arthas/channel/server/autoconfigure/ChannelServerAutoConfiguration.java b/channel/channel-server-starter/src/main/java/com/alibaba/arthas/channel/server/autoconfigure/ChannelServerAutoConfiguration.java index 9ab634495..66b3ee06c 100644 --- a/channel/channel-server-starter/src/main/java/com/alibaba/arthas/channel/server/autoconfigure/ChannelServerAutoConfiguration.java +++ b/channel/channel-server-starter/src/main/java/com/alibaba/arthas/channel/server/autoconfigure/ChannelServerAutoConfiguration.java @@ -131,8 +131,10 @@ public class ChannelServerAutoConfiguration { @Bean(initMethod = "start", destroyMethod = "stop") @ConditionalOnMissingBean - public MessageExchangeService messageExchangeService() { - return new MessageExchangeServiceImpl(); + public MessageExchangeService messageExchangeService(ChannelServerProperties serverProperties) { + int capacity = serverProperties.getMessageExchange().getTopicCapacity(); + int survivalTimeMills = serverProperties.getMessageExchange().getTopicSurvivalTimeMills(); + return new MessageExchangeServiceImpl(capacity, survivalTimeMills); } } @@ -148,8 +150,10 @@ public class ChannelServerAutoConfiguration { @Bean @ConditionalOnMissingBean - public MessageExchangeService messageExchangeService() { - return new RedisMessageExchangeServiceImpl(); + public MessageExchangeService messageExchangeService(ChannelServerProperties serverProperties) { + int capacity = serverProperties.getMessageExchange().getTopicCapacity(); + int survivalTimeMills = serverProperties.getMessageExchange().getTopicSurvivalTimeMills(); + return new RedisMessageExchangeServiceImpl(capacity, survivalTimeMills); } @Bean diff --git a/channel/channel-server-starter/src/main/java/com/alibaba/arthas/channel/server/autoconfigure/ChannelServerProperties.java b/channel/channel-server-starter/src/main/java/com/alibaba/arthas/channel/server/autoconfigure/ChannelServerProperties.java index 1b0adf462..e97f425fb 100644 --- a/channel/channel-server-starter/src/main/java/com/alibaba/arthas/channel/server/autoconfigure/ChannelServerProperties.java +++ b/channel/channel-server-starter/src/main/java/com/alibaba/arthas/channel/server/autoconfigure/ChannelServerProperties.java @@ -8,6 +8,7 @@ public class ChannelServerProperties { public static final String PREFIX = "arthas.channel.server"; private Server websocket = new Server(); private Server backend = new Server(); + private MessageExchange messageExchange = new MessageExchange(); private AgentCleaner agentCleaner = new AgentCleaner(); @@ -35,6 +36,14 @@ public class ChannelServerProperties { this.agentCleaner = agentCleaner; } + public MessageExchange getMessageExchange() { + return messageExchange; + } + + public void setMessageExchange(MessageExchange messageExchange) { + this.messageExchange = messageExchange; + } + public static class Server { private String host; private int port; @@ -76,9 +85,9 @@ public class ChannelServerProperties { public static class AgentCleaner { private int cleanIntervalMills = 5000; - private int outOfServiceTimeoutMills = 15*1000; - private int downTimeoutMills = 30*1000; - private int removingTimeoutMills = 1800*1000; + private int outOfServiceTimeoutMills = 15000; + private int downTimeoutMills = 30000; + private int removingTimeoutMills = 1800000; private boolean enabled = true; public int getCleanIntervalMills() { @@ -122,4 +131,31 @@ public class ChannelServerProperties { } } + public static class MessageExchange { + private int topicSurvivalTimeMills = 60000; + private int topicCapacity = 1000; + + public int getTopicSurvivalTimeMills() { + return topicSurvivalTimeMills; + } + + /** + * message topic survival seconds + */ + public void setTopicSurvivalTimeMills(int topicSurvivalTimeMills) { + this.topicSurvivalTimeMills = topicSurvivalTimeMills; + } + + public int getTopicCapacity() { + return topicCapacity; + } + + /** + * message queue capacity + */ + public void setTopicCapacity(int topicCapacity) { + this.topicCapacity = topicCapacity; + } + } + } diff --git a/channel/channel-server/pom.xml b/channel/channel-server/pom.xml index 791db64cd..5271db3ee 100644 --- a/channel/channel-server/pom.xml +++ b/channel/channel-server/pom.xml @@ -69,8 +69,6 @@ com.taobao.arthas arthas-core ${project.version} - test - true arthas-tunnel-client @@ -80,6 +78,46 @@ termd-core com.alibaba.middleware + + arthas-channel-client + com.taobao.arthas + + + arthas-memorycompiler + com.taobao.arthas + + + arthas-vmtool + com.taobao.arthas + + + ognl + ognl + + + bytekit-core + com.alibaba + + + cfr + org.benf + + + cli + com.alibaba.middleware + + + text-ui + com.taobao.text + + + rsyntaxtextarea + com.fifesoft + + + fastjson + com.alibaba + diff --git a/channel/channel-server/src/main/java/com/alibaba/arthas/channel/server/message/impl/MessageExchangeServiceImpl.java b/channel/channel-server/src/main/java/com/alibaba/arthas/channel/server/message/impl/MessageExchangeServiceImpl.java index 1e9dd165d..9edd133b6 100644 --- a/channel/channel-server/src/main/java/com/alibaba/arthas/channel/server/message/impl/MessageExchangeServiceImpl.java +++ b/channel/channel-server/src/main/java/com/alibaba/arthas/channel/server/message/impl/MessageExchangeServiceImpl.java @@ -3,6 +3,7 @@ package com.alibaba.arthas.channel.server.message.impl; import com.alibaba.arthas.channel.server.conf.ScheduledExecutorConfig; import com.alibaba.arthas.channel.server.message.MessageExchangeException; import com.alibaba.arthas.channel.server.message.MessageExchangeService; +import com.alibaba.arthas.channel.server.message.topic.ActionRequestTopic; import com.alibaba.arthas.channel.server.message.topic.Topic; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; @@ -37,15 +38,29 @@ public class MessageExchangeServiceImpl implements MessageExchangeService { private ScheduledExecutorConfig executorServiceConfig; private ScheduledFuture scheduledFuture; + // topic survival time ms + private int topicSurvivalTimeMills = 60*1000; + + // topic message queue capacity + private int topicCapacity = 1000; + + public MessageExchangeServiceImpl() { + } + + public MessageExchangeServiceImpl(int topicCapacity, int topicSurvivalTimeMills) { + this.topicCapacity = topicCapacity; + this.topicSurvivalTimeMills = topicSurvivalTimeMills; + } + @Override public void start() throws MessageExchangeException { scheduledFuture = executorServiceConfig.getExecutorService().scheduleWithFixedDelay(() -> { try { - cleanIdleTopics(10000); + cleanIdleTopics(topicSurvivalTimeMills); } catch (Exception e) { logger.error("clean idle topics failure", e); } - }, 5000, 5000, TimeUnit.MILLISECONDS); + }, 5, 5, TimeUnit.SECONDS); } @Override @@ -102,7 +117,7 @@ public class MessageExchangeServiceImpl implements MessageExchangeService { private TopicData getAndCheckTopicExists(Topic topic) throws MessageExchangeException { TopicData topicData = topicMap.get(topic); if (topicData == null) { - throw new MessageExchangeException("topic is not exists"); + throw new MessageExchangeException("topic is not exists: " + topic); } return topicData; } @@ -190,6 +205,9 @@ public class MessageExchangeServiceImpl implements MessageExchangeService { long now = System.currentTimeMillis(); List topicDataList = new ArrayList<>(topicMap.values()); for (TopicData topicData : topicDataList) { + if (topicData.topic instanceof ActionRequestTopic) { + continue; + } long idle = now - topicData.getLastActiveTime(); if (!topicData.isSubscribed() && idle > timeout) { try { @@ -202,7 +220,23 @@ public class MessageExchangeServiceImpl implements MessageExchangeService { } } - static class TopicData { + public int getTopicSurvivalTimeMills() { + return topicSurvivalTimeMills; + } + + public void setTopicSurvivalTimeMills(int topicSurvivalTimeMills) { + this.topicSurvivalTimeMills = topicSurvivalTimeMills; + } + + public int getTopicCapacity() { + return topicCapacity; + } + + public void setTopicCapacity(int topicCapacity) { + this.topicCapacity = topicCapacity; + } + + class TopicData { private BlockingQueue messageQueue; private Topic topic; private long createTime; @@ -211,7 +245,7 @@ public class MessageExchangeServiceImpl implements MessageExchangeService { public TopicData(Topic topic) { this.topic = topic; - messageQueue = new LinkedBlockingQueue(1000); + messageQueue = new LinkedBlockingQueue(topicCapacity); createTime = System.currentTimeMillis(); lastActiveTime = createTime; } diff --git a/channel/channel-server/src/main/java/com/alibaba/arthas/channel/server/redis/RedisMessageExchangeServiceImpl.java b/channel/channel-server/src/main/java/com/alibaba/arthas/channel/server/redis/RedisMessageExchangeServiceImpl.java index c9632ac81..a10c7b1ae 100644 --- a/channel/channel-server/src/main/java/com/alibaba/arthas/channel/server/redis/RedisMessageExchangeServiceImpl.java +++ b/channel/channel-server/src/main/java/com/alibaba/arthas/channel/server/redis/RedisMessageExchangeServiceImpl.java @@ -25,6 +25,19 @@ public class RedisMessageExchangeServiceImpl implements MessageExchangeService { private static final String topicPrefix = "arthas:channel:topics:agent:"; + // topic survival time ms + private int topicSurvivalTimeMills = 60*1000; + + // topic message queue capacity + private int topicCapacity = 1000; + + public RedisMessageExchangeServiceImpl() { + } + + public RedisMessageExchangeServiceImpl(int topicSurvivalTimeMills, int topicCapacity) { + this.topicSurvivalTimeMills = topicSurvivalTimeMills; + this.topicCapacity = topicCapacity; + } @Autowired private ReactiveRedisTemplate redisTemplate; @@ -70,9 +83,10 @@ public class RedisMessageExchangeServiceImpl implements MessageExchangeService { public void pushMessage(Topic topic, byte[] messageBytes) throws MessageExchangeException { String key = topic.getTopic(); redisTemplate.opsForList().leftPush(key, messageBytes).doOnSuccess(value -> { - redisTemplate.expire(key, Duration.ofMillis(10000)).subscribe(); + redisTemplate.expire(key, Duration.ofMillis(topicSurvivalTimeMills)).subscribe(); }).subscribe(); + //TODO check topic capacity } @Override @@ -117,4 +131,19 @@ public class RedisMessageExchangeServiceImpl implements MessageExchangeService { } + public int getTopicSurvivalTimeMills() { + return topicSurvivalTimeMills; + } + + public void setTopicSurvivalTimeMills(int topicSurvivalTimeMills) { + this.topicSurvivalTimeMills = topicSurvivalTimeMills; + } + + public int getTopicCapacity() { + return topicCapacity; + } + + public void setTopicCapacity(int topicCapacity) { + this.topicCapacity = topicCapacity; + } } diff --git a/channel/channel-server/src/main/java/com/alibaba/arthas/channel/server/web/LegacyApiController.java b/channel/channel-server/src/main/java/com/alibaba/arthas/channel/server/web/LegacyApiController.java new file mode 100644 index 000000000..4b0943dec --- /dev/null +++ b/channel/channel-server/src/main/java/com/alibaba/arthas/channel/server/web/LegacyApiController.java @@ -0,0 +1,237 @@ +package com.alibaba.arthas.channel.server.web; + +import com.alibaba.arthas.channel.proto.ActionRequest; +import com.alibaba.arthas.channel.proto.ActionResponse; +import com.alibaba.arthas.channel.proto.ExecuteParams; +import com.alibaba.arthas.channel.proto.ExecuteResult; +import com.alibaba.arthas.channel.proto.RequestAction; +import com.alibaba.arthas.channel.proto.ResponseStatus; +import com.alibaba.arthas.channel.proto.ResultFormat; +import com.alibaba.arthas.channel.server.model.AgentVO; +import com.alibaba.arthas.channel.server.service.AgentManageService; +import com.alibaba.arthas.channel.server.service.ApiActionDelegateService; +import com.alibaba.fastjson.JSON; +import com.taobao.arthas.core.shell.term.impl.http.api.ApiAction; +import com.taobao.arthas.core.shell.term.impl.http.api.ApiRequest; +import com.taobao.arthas.core.shell.term.impl.http.api.ApiResponse; +import com.taobao.arthas.core.shell.term.impl.http.api.ApiState; +import com.taobao.arthas.core.util.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.HttpStatus; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.server.ResponseStatusException; +import reactor.core.publisher.Mono; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.TreeMap; + +/** + * Compatible with Arthas Http API (https://arthas.aliyun.com/doc/http-api.html) + * + * @author gongdewei 2021/5/24 + */ +@RestController +public class LegacyApiController { + + + private static final Logger logger = LoggerFactory.getLogger(LegacyApiController.class); + + @Autowired + private AgentManageService agentManageService; + + @Autowired + private ApiActionDelegateService apiActionDelegateService; + + @RequestMapping("/legacy_api/{agentId}") + public Mono process(@PathVariable String agentId, @RequestBody ApiRequest request) { + + try { + checkAgentExists(agentId); + + // set default exec timeout + if (request.getExecTimeout() == null) { + request.setExecTimeout(30000); + } + + Mono actionResponseMono = null; + ApiAction action = parseApiAction(request.getAction()); + // process pull results + switch (action) { + case PULL_RESULTS: + checkRequestId(request); + actionResponseMono = apiActionDelegateService.pullResults(agentId, request.getRequestId(), request.getExecTimeout()); + return convertToApiResponse(actionResponseMono); + } + + // process other actions + ActionRequest actionRequest = createActionRequest(agentId, request); + switch (actionRequest.getAction()) { + case EXECUTE: + actionResponseMono = apiActionDelegateService.execCommand(agentId, actionRequest); + return convertToApiResponse(actionResponseMono); + case ASYNC_EXECUTE: + actionResponseMono = apiActionDelegateService.asyncExecCommand(agentId, actionRequest); + return convertToApiResponse(actionResponseMono); + case INIT_SESSION: + actionResponseMono = apiActionDelegateService.initSession(agentId); + return convertToApiResponse(actionResponseMono); + case CLOSE_SESSION: + actionResponseMono = apiActionDelegateService.closeSession(agentId, request.getSessionId()); + return convertToApiResponse(actionResponseMono); + default: + throw new UnsupportedOperationException("unsupported action: " + actionRequest.getAction()); + } + } catch (Throwable e) { + logger.error("process request failed, agentId: {}, request: {}", agentId, request, e); + ApiResponse response = new ApiResponse(); + response.setState(ApiState.FAILED) + .setMessage("process request failed: " + e.getMessage()) + .setAgentId(agentId) + .setRequestId(request.getRequestId()) + .setSessionId(request.getSessionId()); + return Mono.just(response); + } + } + + private void checkRequestId(ApiRequest request) { + if (StringUtils.isBlank(request.getRequestId())) { + throw new IllegalArgumentException("Invalid request, the 'requestId' is required"); + } + } + + private Mono convertToApiResponse(Mono actionResponseMono) { + return actionResponseMono.flatMap((actionResponse) -> Mono.just(convertToApiResponse(actionResponse))); + } + + private ApiResponse convertToApiResponse(ActionResponse actionResponse) { + ApiResponse response = new ApiResponse(); + response.setAgentId(actionResponse.getAgentId()); + response.setState(convertToApiState(actionResponse.getStatus())); + if (StringUtils.hasText(actionResponse.getRequestId())) { + response.setRequestId(actionResponse.getRequestId()); + } + if (StringUtils.hasText(actionResponse.getMessage())) { + response.setMessage(actionResponse.getMessage()); + } + if (StringUtils.hasText(actionResponse.getSessionId())) { + response.setSessionId(actionResponse.getSessionId()); + } + if (actionResponse.hasExecuteResult()) { + String resultsJson = null; + ExecuteResult executeResult = actionResponse.getExecuteResult(); + if (executeResult.hasResultsJson()) { + resultsJson = executeResult.getResultsJson().getValue(); + } + Map body = new TreeMap(); + + // attributes that are not available +// body.put("command", commandLine); + //packing results +// body.put("jobId", job.id()); +// body.put("jobStatus", job.status()); +// body.put("timeExpired", timeExpired); +// if (timeExpired) { +// body.put("timeout", timeout); +// } + + // set result as list + body.put("results", parseJsonToList(resultsJson)); + // set results is a json string + //body.put("resultsJson", resultsJson); + + response.setBody(body); + } + return response; + } + + private List parseJsonToList(String resultsJson) { + return JSON.parseArray(resultsJson, Map.class); + } + + private ActionRequest createActionRequest(String agentId, ApiRequest request) { + ActionRequest.Builder actionRequest = ActionRequest.newBuilder() + .setAgentId(agentId) + .setAction(convertToRequestAction(request.getAction())) + .setExecuteParams(createExecuteParams(request)); + + if (StringUtils.hasText(request.getRequestId())) { + actionRequest.setRequestId(request.getRequestId()); + } + if (StringUtils.hasText(request.getSessionId())) { + actionRequest.setSessionId(request.getSessionId()); + } + + return actionRequest.build(); + } + + private ExecuteParams createExecuteParams(ApiRequest request) { + return ExecuteParams.newBuilder() + .setCommandLine(request.getCommand() != null ? request.getCommand() : "") + .setResultFormat(ResultFormat.JSON) + .setExecTimeout(request.getExecTimeout()) + .build(); + } + + private AgentVO checkAgentExists(String agentId) { + Optional optionalAgentVO = agentManageService.findAgentById(agentId).block(); + if (!optionalAgentVO.isPresent()) { + throw new ResponseStatusException(HttpStatus.NOT_FOUND, "Agent not found"); + } + return optionalAgentVO.get(); + } + + private ApiState convertToApiState(ResponseStatus status) { + switch (status) { + case SUCCEEDED: + case CONTINUOUS: + return ApiState.SUCCEEDED; + case REFUSED: + return ApiState.REFUSED; + case INTERRUPTED: + return ApiState.INTERRUPTED; + case FAILED: + case UNRECOGNIZED: + return ApiState.FAILED; + } + return ApiState.FAILED; + } + + private RequestAction convertToRequestAction(String actionStr) { + ApiAction action = parseApiAction(actionStr); + + switch (action) { + case INIT_SESSION: + return RequestAction.INIT_SESSION; + case CLOSE_SESSION: + return RequestAction.CLOSE_SESSION; + case EXEC: + return RequestAction.EXECUTE; + case ASYNC_EXEC: + return RequestAction.ASYNC_EXECUTE; + case INTERRUPT_JOB: + return RequestAction.INTERRUPT_JOB; + case PULL_RESULTS: + case JOIN_SESSION: + case SESSION_INFO: + default: + throw new IllegalArgumentException("unsupported action: " + actionStr); + } + } + + private ApiAction parseApiAction(String actionStr) { + ApiAction action; + try { + action = ApiAction.valueOf(actionStr.trim().toUpperCase()); + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException("unknown action: " + actionStr); + } + return action; + } +} diff --git a/core/src/main/java/com/taobao/arthas/core/channel/ChannelRequestHandler.java b/core/src/main/java/com/taobao/arthas/core/channel/ChannelRequestHandler.java index 089cc70f0..28e4c0c4d 100644 --- a/core/src/main/java/com/taobao/arthas/core/channel/ChannelRequestHandler.java +++ b/core/src/main/java/com/taobao/arthas/core/channel/ChannelRequestHandler.java @@ -384,7 +384,6 @@ public class ChannelRequestHandler implements ChannelClient.RequestListener { } private void processAsyncExec(ActionRequest request, Session session) { - //TODO ExecuteParams executeParams = request.getExecuteParams(); String commandLine = executeParams.getCommandLine(); @@ -627,7 +626,7 @@ public class ChannelRequestHandler implements ChannelClient.RequestListener { @Override public void appendResult(ResultModel result) { - //TODO 优化输出,适度合并 + //TODO 优化输出,合并小结果? List resultModels = new ArrayList(); resultModels.add(result); diff --git a/core/src/main/java/com/taobao/arthas/core/server/ArthasBootstrap.java b/core/src/main/java/com/taobao/arthas/core/server/ArthasBootstrap.java index 4db0337be..b07bbca83 100644 --- a/core/src/main/java/com/taobao/arthas/core/server/ArthasBootstrap.java +++ b/core/src/main/java/com/taobao/arthas/core/server/ArthasBootstrap.java @@ -448,7 +448,7 @@ public class ArthasBootstrap { //http api session manager sessionManager = new SessionManagerImpl(options, shellServer.getCommandManager(), shellServer.getJobController()); //http api handler - httpApiHandler = new HttpApiHandler(historyManager, sessionManager); + httpApiHandler = new HttpApiHandler(historyManager, sessionManager, configure); logger().info("as-server listening on network={};telnet={};http={};timeout={};", configure.getIp(), configure.getTelnetPort(), configure.getHttpPort(), options.getConnectionTimeout()); diff --git a/core/src/main/java/com/taobao/arthas/core/shell/term/impl/http/api/ApiRequest.java b/core/src/main/java/com/taobao/arthas/core/shell/term/impl/http/api/ApiRequest.java index 545c3165a..e57a68fd2 100644 --- a/core/src/main/java/com/taobao/arthas/core/shell/term/impl/http/api/ApiRequest.java +++ b/core/src/main/java/com/taobao/arthas/core/shell/term/impl/http/api/ApiRequest.java @@ -1,6 +1,5 @@ package com.taobao.arthas.core.shell.term.impl.http.api; -import java.util.Map; /** * Http Api request diff --git a/core/src/main/java/com/taobao/arthas/core/shell/term/impl/http/api/ApiResponse.java b/core/src/main/java/com/taobao/arthas/core/shell/term/impl/http/api/ApiResponse.java index e0d16a518..c5f15ab1d 100644 --- a/core/src/main/java/com/taobao/arthas/core/shell/term/impl/http/api/ApiResponse.java +++ b/core/src/main/java/com/taobao/arthas/core/shell/term/impl/http/api/ApiResponse.java @@ -1,11 +1,12 @@ package com.taobao.arthas.core.shell.term.impl.http.api; /** - * Http Api exception + * Http Api response * @author gongdewei 2020-03-19 */ public class ApiResponse { private String requestId; + private String agentId; private ApiState state; private String message; private String sessionId; @@ -67,6 +68,15 @@ public class ApiResponse { return this; } + public String getAgentId() { + return agentId; + } + + public ApiResponse setAgentId(String agentId) { + this.agentId = agentId; + return this; + } + public T getBody() { return body; } diff --git a/core/src/main/java/com/taobao/arthas/core/shell/term/impl/http/api/HttpApiHandler.java b/core/src/main/java/com/taobao/arthas/core/shell/term/impl/http/api/HttpApiHandler.java index 3256f9951..a8e443be3 100644 --- a/core/src/main/java/com/taobao/arthas/core/shell/term/impl/http/api/HttpApiHandler.java +++ b/core/src/main/java/com/taobao/arthas/core/shell/term/impl/http/api/HttpApiHandler.java @@ -6,6 +6,7 @@ import com.alibaba.fastjson.JSON; import com.taobao.arthas.common.ArthasConstants; import com.taobao.arthas.common.PidUtils; import com.taobao.arthas.core.command.model.*; +import com.taobao.arthas.core.config.Configure; import com.taobao.arthas.core.distribution.PackingResultDistributor; import com.taobao.arthas.core.distribution.ResultConsumer; import com.taobao.arthas.core.distribution.ResultDistributor; @@ -68,14 +69,20 @@ public class HttpApiHandler { private ArrayBlockingQueue byteBufPool = new ArrayBlockingQueue(poolSize); private ArrayBlockingQueue charsBufPool = new ArrayBlockingQueue(poolSize); private ArrayBlockingQueue bytesPool = new ArrayBlockingQueue(poolSize); + private final String agentId; - public HttpApiHandler(HistoryManager historyManager, SessionManager sessionManager) { + public HttpApiHandler(HistoryManager historyManager, SessionManager sessionManager, Configure configure) { this.historyManager = historyManager; this.sessionManager = sessionManager; commandManager = this.sessionManager.getCommandManager(); jobController = this.sessionManager.getJobController(); + agentId = configure.getAgentId(); //init buf pool + initBufPool(); + } + + public void initBufPool() { JsonUtils.setSerializeWriterBufferThreshold(jsonBufferSize); for (int i = 0; i < poolSize; i++) { byteBufPool.offer(Unpooled.buffer(jsonBufferSize)); @@ -107,7 +114,7 @@ public class HttpApiHandler { result = createResponse(ApiState.FAILED, "The request was not processed"); } result.setRequestId(requestId); - + result.setAgentId(agentId); //http response content ByteBuf content = null;