From 4261eba5ba065620462c863806e5e03d099e1aa8 Mon Sep 17 00:00:00 2001 From: gongdewei Date: Tue, 25 May 2021 16:33:06 +0800 Subject: [PATCH] support pull results by sessionId (compatible with Http API) --- .../server/web/LegacyApiController.java | 133 ++++++++++++++++-- 1 file changed, 118 insertions(+), 15 deletions(-) diff --git a/channel/channel-server/src/main/java/com/alibaba/arthas/channel/server/web/LegacyApiController.java b/channel/channel-server/src/main/java/com/alibaba/arthas/channel/server/web/LegacyApiController.java index 2371075fb..51be8fd65 100644 --- a/channel/channel-server/src/main/java/com/alibaba/arthas/channel/server/web/LegacyApiController.java +++ b/channel/channel-server/src/main/java/com/alibaba/arthas/channel/server/web/LegacyApiController.java @@ -7,6 +7,7 @@ import com.alibaba.arthas.channel.proto.ExecuteResult; import com.alibaba.arthas.channel.proto.RequestAction; import com.alibaba.arthas.channel.proto.ResponseStatus; import com.alibaba.arthas.channel.proto.ResultFormat; +import com.alibaba.arthas.channel.server.conf.ScheduledExecutorConfig; import com.alibaba.arthas.channel.server.model.AgentVO; import com.alibaba.arthas.channel.server.service.AgentManageService; import com.alibaba.arthas.channel.server.service.ApiActionDelegateService; @@ -19,6 +20,7 @@ import com.taobao.arthas.core.util.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; import org.springframework.http.HttpStatus; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestBody; @@ -27,10 +29,14 @@ import org.springframework.web.bind.annotation.RestController; import org.springframework.web.server.ResponseStatusException; import reactor.core.publisher.Mono; +import javax.annotation.PostConstruct; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.TreeMap; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; /** * Compatible with Arthas Http API (https://arthas.aliyun.com/doc/http-api.html) @@ -43,12 +49,40 @@ public class LegacyApiController { private static final Logger logger = LoggerFactory.getLogger(LegacyApiController.class); + // sessionId -> requestId, save last requestId of async_exec + private Map asyncExecRequestMap = new ConcurrentHashMap<>(); + @Autowired private AgentManageService agentManageService; @Autowired private ApiActionDelegateService apiActionDelegateService; + @Autowired + private ScheduledExecutorConfig executorServiceConfig; + private ScheduledFuture scheduledFuture; + + // topic survival time ms + @Value("${arthas.channel.server.message-exchange.topic-survival-time-mills:60000}") + private int topicSurvivalTimeMills; + + public LegacyApiController() { + } + + @PostConstruct + private void scheduleRequestDataCleanTask() { + if (scheduledFuture == null) { + scheduledFuture = executorServiceConfig.getExecutorService().scheduleWithFixedDelay(() -> { + try { + cleanIdleRequestData(); + } catch (Exception e) { + logger.error("clean idle request data failure", e); + } + }, 10, 10, TimeUnit.SECONDS); + logger.info("Request data clean task is scheduled."); + } + } + @RequestMapping({"/legacy_api/{agentId}", "/legacy_api"}) public Mono process(@PathVariable(required = false) String agentId, @RequestBody ApiRequest request) { @@ -68,36 +102,53 @@ public class LegacyApiController { request.setExecTimeout(30000); } - Mono actionResponseMono = null; ApiAction action = parseApiAction(request.getAction()); // process pull results switch (action) { case PULL_RESULTS: + if (StringUtils.isBlank(request.getRequestId())) { + if (StringUtils.isBlank(request.getSessionId())) { + throw new IllegalArgumentException("Invalid request to pull results, need to specify 'requestId' or 'sessionId'"); + } + // find requestId by sessionId + String requestId = getAsyncExecRequestId(request); + if (requestId == null) { + throw new IllegalStateException("Find async exec requestId by sessionId failed, please specify the 'requestId' in ASYNC_EXEC result"); + } + request.setRequestId(requestId); + } checkRequestId(request); - actionResponseMono = apiActionDelegateService.pullResults(agentId, request.getRequestId(), request.getExecTimeout()); - return convertToApiResponse(actionResponseMono); + return apiActionDelegateService.pullResults(agentId, request.getRequestId(), request.getExecTimeout()) + .flatMap(this::transformToApiResponse); } // process other actions ActionRequest actionRequest = createActionRequest(agentId, request); switch (actionRequest.getAction()) { case EXECUTE: - actionResponseMono = apiActionDelegateService.execCommand(agentId, actionRequest); - return convertToApiResponse(actionResponseMono); + return apiActionDelegateService.execCommand(agentId, actionRequest) + .flatMap(this::transformToApiResponse); case ASYNC_EXECUTE: - actionResponseMono = apiActionDelegateService.asyncExecCommand(agentId, actionRequest); - return convertToApiResponse(actionResponseMono); + return apiActionDelegateService.asyncExecCommand(agentId, actionRequest) + .flatMap(this::transformToApiResponse) + .filter(apiResponse -> { + // save last requestId of async_exec + if (apiResponse.getSessionId() != null && apiResponse.getState() == ApiState.SUCCEEDED) { + saveAsyncExecRequest(apiResponse); + } + return true; + }); case INIT_SESSION: - actionResponseMono = apiActionDelegateService.initSession(agentId); - return convertToApiResponse(actionResponseMono); + return apiActionDelegateService.initSession(agentId) + .flatMap(this::transformToApiResponse); case INTERRUPT_JOB: checkSessionId(request); - actionResponseMono = apiActionDelegateService.interruptJob(agentId, request.getSessionId()); - return convertToApiResponse(actionResponseMono); + return apiActionDelegateService.interruptJob(agentId, request.getSessionId()) + .flatMap(this::transformToApiResponse); case CLOSE_SESSION: checkSessionId(request); - actionResponseMono = apiActionDelegateService.closeSession(agentId, request.getSessionId()); - return convertToApiResponse(actionResponseMono); + return apiActionDelegateService.closeSession(agentId, request.getSessionId()) + .flatMap(this::transformToApiResponse); default: throw new UnsupportedOperationException("unsupported action: " + actionRequest.getAction()); } @@ -113,6 +164,37 @@ public class LegacyApiController { } } + private void saveAsyncExecRequest(ApiResponse apiResponse) { + synchronized (asyncExecRequestMap) { + asyncExecRequestMap.put(apiResponse.getSessionId(), new AsyncExecRequestData(apiResponse.getAgentId(), apiResponse.getRequestId())); + } + } + + private String getAsyncExecRequestId(ApiRequest request) { + synchronized (asyncExecRequestMap) { + AsyncExecRequestData requestData = asyncExecRequestMap.get(request.getSessionId()); + if (requestData != null) { + requestData.time = System.currentTimeMillis(); + return requestData.requestId; + } + return null; + } + } + + private void cleanIdleRequestData() { + synchronized (asyncExecRequestMap) { + long currentTimeMillis = System.currentTimeMillis(); + asyncExecRequestMap.entrySet().removeIf(entry -> { + long idleTime = currentTimeMillis - entry.getValue().time; + boolean outdated = idleTime > topicSurvivalTimeMills; + if (outdated) { + logger.info("Removing outdated request data, idle time: {}, sessionId: {}, request: {}", idleTime, entry.getKey(), entry.getValue()); + } + return outdated; + }); + } + } + private void checkSessionId(ApiRequest request) { if (StringUtils.isBlank(request.getSessionId())) { throw new IllegalArgumentException("Invalid request, the 'sessionId' is required"); @@ -125,8 +207,8 @@ public class LegacyApiController { } } - private Mono convertToApiResponse(Mono actionResponseMono) { - return actionResponseMono.flatMap((actionResponse) -> Mono.just(convertToApiResponse(actionResponse))); + private Mono transformToApiResponse(ActionResponse actionResponse) { + return Mono.just(convertToApiResponse(actionResponse)); } private ApiResponse convertToApiResponse(ActionResponse actionResponse) { @@ -253,4 +335,25 @@ public class LegacyApiController { } return action; } + + static class AsyncExecRequestData { + private String agentId; + private String requestId; + private long time; + + public AsyncExecRequestData(String agentId, String requestId) { + this.agentId = agentId; + this.requestId = requestId; + time = System.currentTimeMillis(); + } + + @Override + public String toString() { + return "AsyncExecRequestData{" + + "agentId='" + agentId + '\'' + + ", requestId='" + requestId + '\'' + + ", time=" + time + + '}'; + } + } }