support pull results by sessionId (compatible with Http API)

pull/1490/merge^2
gongdewei 4 years ago
parent f9d0950b7d
commit 4261eba5ba

@ -7,6 +7,7 @@ import com.alibaba.arthas.channel.proto.ExecuteResult;
import com.alibaba.arthas.channel.proto.RequestAction;
import com.alibaba.arthas.channel.proto.ResponseStatus;
import com.alibaba.arthas.channel.proto.ResultFormat;
import com.alibaba.arthas.channel.server.conf.ScheduledExecutorConfig;
import com.alibaba.arthas.channel.server.model.AgentVO;
import com.alibaba.arthas.channel.server.service.AgentManageService;
import com.alibaba.arthas.channel.server.service.ApiActionDelegateService;
@ -19,6 +20,7 @@ import com.taobao.arthas.core.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestBody;
@ -27,10 +29,14 @@ import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.server.ResponseStatusException;
import reactor.core.publisher.Mono;
import javax.annotation.PostConstruct;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
/**
* Compatible with Arthas Http API (https://arthas.aliyun.com/doc/http-api.html)
@ -43,12 +49,40 @@ public class LegacyApiController {
private static final Logger logger = LoggerFactory.getLogger(LegacyApiController.class);
// sessionId -> requestId, save last requestId of async_exec
private Map<String, AsyncExecRequestData> asyncExecRequestMap = new ConcurrentHashMap<>();
@Autowired
private AgentManageService agentManageService;
@Autowired
private ApiActionDelegateService apiActionDelegateService;
@Autowired
private ScheduledExecutorConfig executorServiceConfig;
private ScheduledFuture<?> scheduledFuture;
// topic survival time ms
@Value("${arthas.channel.server.message-exchange.topic-survival-time-mills:60000}")
private int topicSurvivalTimeMills;
public LegacyApiController() {
}
@PostConstruct
private void scheduleRequestDataCleanTask() {
if (scheduledFuture == null) {
scheduledFuture = executorServiceConfig.getExecutorService().scheduleWithFixedDelay(() -> {
try {
cleanIdleRequestData();
} catch (Exception e) {
logger.error("clean idle request data failure", e);
}
}, 10, 10, TimeUnit.SECONDS);
logger.info("Request data clean task is scheduled.");
}
}
@RequestMapping({"/legacy_api/{agentId}", "/legacy_api"})
public Mono<ApiResponse> process(@PathVariable(required = false) String agentId, @RequestBody ApiRequest request) {
@ -68,36 +102,53 @@ public class LegacyApiController {
request.setExecTimeout(30000);
}
Mono<ActionResponse> actionResponseMono = null;
ApiAction action = parseApiAction(request.getAction());
// process pull results
switch (action) {
case PULL_RESULTS:
if (StringUtils.isBlank(request.getRequestId())) {
if (StringUtils.isBlank(request.getSessionId())) {
throw new IllegalArgumentException("Invalid request to pull results, need to specify 'requestId' or 'sessionId'");
}
// find requestId by sessionId
String requestId = getAsyncExecRequestId(request);
if (requestId == null) {
throw new IllegalStateException("Find async exec requestId by sessionId failed, please specify the 'requestId' in ASYNC_EXEC result");
}
request.setRequestId(requestId);
}
checkRequestId(request);
actionResponseMono = apiActionDelegateService.pullResults(agentId, request.getRequestId(), request.getExecTimeout());
return convertToApiResponse(actionResponseMono);
return apiActionDelegateService.pullResults(agentId, request.getRequestId(), request.getExecTimeout())
.flatMap(this::transformToApiResponse);
}
// process other actions
ActionRequest actionRequest = createActionRequest(agentId, request);
switch (actionRequest.getAction()) {
case EXECUTE:
actionResponseMono = apiActionDelegateService.execCommand(agentId, actionRequest);
return convertToApiResponse(actionResponseMono);
return apiActionDelegateService.execCommand(agentId, actionRequest)
.flatMap(this::transformToApiResponse);
case ASYNC_EXECUTE:
actionResponseMono = apiActionDelegateService.asyncExecCommand(agentId, actionRequest);
return convertToApiResponse(actionResponseMono);
return apiActionDelegateService.asyncExecCommand(agentId, actionRequest)
.flatMap(this::transformToApiResponse)
.filter(apiResponse -> {
// save last requestId of async_exec
if (apiResponse.getSessionId() != null && apiResponse.getState() == ApiState.SUCCEEDED) {
saveAsyncExecRequest(apiResponse);
}
return true;
});
case INIT_SESSION:
actionResponseMono = apiActionDelegateService.initSession(agentId);
return convertToApiResponse(actionResponseMono);
return apiActionDelegateService.initSession(agentId)
.flatMap(this::transformToApiResponse);
case INTERRUPT_JOB:
checkSessionId(request);
actionResponseMono = apiActionDelegateService.interruptJob(agentId, request.getSessionId());
return convertToApiResponse(actionResponseMono);
return apiActionDelegateService.interruptJob(agentId, request.getSessionId())
.flatMap(this::transformToApiResponse);
case CLOSE_SESSION:
checkSessionId(request);
actionResponseMono = apiActionDelegateService.closeSession(agentId, request.getSessionId());
return convertToApiResponse(actionResponseMono);
return apiActionDelegateService.closeSession(agentId, request.getSessionId())
.flatMap(this::transformToApiResponse);
default:
throw new UnsupportedOperationException("unsupported action: " + actionRequest.getAction());
}
@ -113,6 +164,37 @@ public class LegacyApiController {
}
}
private void saveAsyncExecRequest(ApiResponse apiResponse) {
synchronized (asyncExecRequestMap) {
asyncExecRequestMap.put(apiResponse.getSessionId(), new AsyncExecRequestData(apiResponse.getAgentId(), apiResponse.getRequestId()));
}
}
private String getAsyncExecRequestId(ApiRequest request) {
synchronized (asyncExecRequestMap) {
AsyncExecRequestData requestData = asyncExecRequestMap.get(request.getSessionId());
if (requestData != null) {
requestData.time = System.currentTimeMillis();
return requestData.requestId;
}
return null;
}
}
private void cleanIdleRequestData() {
synchronized (asyncExecRequestMap) {
long currentTimeMillis = System.currentTimeMillis();
asyncExecRequestMap.entrySet().removeIf(entry -> {
long idleTime = currentTimeMillis - entry.getValue().time;
boolean outdated = idleTime > topicSurvivalTimeMills;
if (outdated) {
logger.info("Removing outdated request data, idle time: {}, sessionId: {}, request: {}", idleTime, entry.getKey(), entry.getValue());
}
return outdated;
});
}
}
private void checkSessionId(ApiRequest request) {
if (StringUtils.isBlank(request.getSessionId())) {
throw new IllegalArgumentException("Invalid request, the 'sessionId' is required");
@ -125,8 +207,8 @@ public class LegacyApiController {
}
}
private Mono<ApiResponse> convertToApiResponse(Mono<ActionResponse> actionResponseMono) {
return actionResponseMono.flatMap((actionResponse) -> Mono.just(convertToApiResponse(actionResponse)));
private Mono<ApiResponse> transformToApiResponse(ActionResponse actionResponse) {
return Mono.just(convertToApiResponse(actionResponse));
}
private ApiResponse convertToApiResponse(ActionResponse actionResponse) {
@ -253,4 +335,25 @@ public class LegacyApiController {
}
return action;
}
static class AsyncExecRequestData {
private String agentId;
private String requestId;
private long time;
public AsyncExecRequestData(String agentId, String requestId) {
this.agentId = agentId;
this.requestId = requestId;
time = System.currentTimeMillis();
}
@Override
public String toString() {
return "AsyncExecRequestData{" +
"agentId='" + agentId + '\'' +
", requestId='" + requestId + '\'' +
", time=" + time +
'}';
}
}
}

Loading…
Cancel
Save