Support /legacy_api, improve message cleanup

pull/1490/merge^2
gongdewei 4 years ago
parent d6fdd0a4e9
commit 11563c0454

@ -15,6 +15,14 @@ arthas.channel.server.websocket.port=8801
# whether start agent cleaner
arthas.channel.server.agent-cleaner.enabled=true
# message exchange topic survival time
arthas.channel.server.message-exchange.topic-survival-time-mills=60000
# message exchange topic capacity
arthas.channel.server.message-exchange.topic-capacity=1000
# for all endpoints
management.endpoints.web.exposure.include=*

@ -131,8 +131,10 @@ public class ChannelServerAutoConfiguration {
@Bean(initMethod = "start", destroyMethod = "stop")
@ConditionalOnMissingBean
public MessageExchangeService messageExchangeService() {
return new MessageExchangeServiceImpl();
public MessageExchangeService messageExchangeService(ChannelServerProperties serverProperties) {
int capacity = serverProperties.getMessageExchange().getTopicCapacity();
int survivalTimeMills = serverProperties.getMessageExchange().getTopicSurvivalTimeMills();
return new MessageExchangeServiceImpl(capacity, survivalTimeMills);
}
}
@ -148,8 +150,10 @@ public class ChannelServerAutoConfiguration {
@Bean
@ConditionalOnMissingBean
public MessageExchangeService messageExchangeService() {
return new RedisMessageExchangeServiceImpl();
public MessageExchangeService messageExchangeService(ChannelServerProperties serverProperties) {
int capacity = serverProperties.getMessageExchange().getTopicCapacity();
int survivalTimeMills = serverProperties.getMessageExchange().getTopicSurvivalTimeMills();
return new RedisMessageExchangeServiceImpl(capacity, survivalTimeMills);
}
@Bean

@ -8,6 +8,7 @@ public class ChannelServerProperties {
public static final String PREFIX = "arthas.channel.server";
private Server websocket = new Server();
private Server backend = new Server();
private MessageExchange messageExchange = new MessageExchange();
private AgentCleaner agentCleaner = new AgentCleaner();
@ -35,6 +36,14 @@ public class ChannelServerProperties {
this.agentCleaner = agentCleaner;
}
public MessageExchange getMessageExchange() {
return messageExchange;
}
public void setMessageExchange(MessageExchange messageExchange) {
this.messageExchange = messageExchange;
}
public static class Server {
private String host;
private int port;
@ -76,9 +85,9 @@ public class ChannelServerProperties {
public static class AgentCleaner {
private int cleanIntervalMills = 5000;
private int outOfServiceTimeoutMills = 15*1000;
private int downTimeoutMills = 30*1000;
private int removingTimeoutMills = 1800*1000;
private int outOfServiceTimeoutMills = 15000;
private int downTimeoutMills = 30000;
private int removingTimeoutMills = 1800000;
private boolean enabled = true;
public int getCleanIntervalMills() {
@ -122,4 +131,31 @@ public class ChannelServerProperties {
}
}
public static class MessageExchange {
private int topicSurvivalTimeMills = 60000;
private int topicCapacity = 1000;
public int getTopicSurvivalTimeMills() {
return topicSurvivalTimeMills;
}
/**
* message topic survival seconds
*/
public void setTopicSurvivalTimeMills(int topicSurvivalTimeMills) {
this.topicSurvivalTimeMills = topicSurvivalTimeMills;
}
public int getTopicCapacity() {
return topicCapacity;
}
/**
* message queue capacity
*/
public void setTopicCapacity(int topicCapacity) {
this.topicCapacity = topicCapacity;
}
}
}

@ -69,8 +69,6 @@
<groupId>com.taobao.arthas</groupId>
<artifactId>arthas-core</artifactId>
<version>${project.version}</version>
<scope>test</scope>
<optional>true</optional>
<exclusions>
<exclusion>
<artifactId>arthas-tunnel-client</artifactId>
@ -80,6 +78,46 @@
<artifactId>termd-core</artifactId>
<groupId>com.alibaba.middleware</groupId>
</exclusion>
<exclusion>
<artifactId>arthas-channel-client</artifactId>
<groupId>com.taobao.arthas</groupId>
</exclusion>
<exclusion>
<artifactId>arthas-memorycompiler</artifactId>
<groupId>com.taobao.arthas</groupId>
</exclusion>
<exclusion>
<artifactId>arthas-vmtool</artifactId>
<groupId>com.taobao.arthas</groupId>
</exclusion>
<exclusion>
<artifactId>ognl</artifactId>
<groupId>ognl</groupId>
</exclusion>
<exclusion>
<artifactId>bytekit-core</artifactId>
<groupId>com.alibaba</groupId>
</exclusion>
<exclusion>
<artifactId>cfr</artifactId>
<groupId>org.benf</groupId>
</exclusion>
<exclusion>
<artifactId>cli</artifactId>
<groupId>com.alibaba.middleware</groupId>
</exclusion>
<exclusion>
<artifactId>text-ui</artifactId>
<groupId>com.taobao.text</groupId>
</exclusion>
<exclusion>
<artifactId>rsyntaxtextarea</artifactId>
<groupId>com.fifesoft</groupId>
</exclusion>
<exclusion>
<artifactId>fastjson</artifactId>
<groupId>com.alibaba</groupId>
</exclusion>
</exclusions>
</dependency>

@ -3,6 +3,7 @@ package com.alibaba.arthas.channel.server.message.impl;
import com.alibaba.arthas.channel.server.conf.ScheduledExecutorConfig;
import com.alibaba.arthas.channel.server.message.MessageExchangeException;
import com.alibaba.arthas.channel.server.message.MessageExchangeService;
import com.alibaba.arthas.channel.server.message.topic.ActionRequestTopic;
import com.alibaba.arthas.channel.server.message.topic.Topic;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
@ -37,15 +38,29 @@ public class MessageExchangeServiceImpl implements MessageExchangeService {
private ScheduledExecutorConfig executorServiceConfig;
private ScheduledFuture<?> scheduledFuture;
// topic survival time ms
private int topicSurvivalTimeMills = 60*1000;
// topic message queue capacity
private int topicCapacity = 1000;
public MessageExchangeServiceImpl() {
}
public MessageExchangeServiceImpl(int topicCapacity, int topicSurvivalTimeMills) {
this.topicCapacity = topicCapacity;
this.topicSurvivalTimeMills = topicSurvivalTimeMills;
}
@Override
public void start() throws MessageExchangeException {
scheduledFuture = executorServiceConfig.getExecutorService().scheduleWithFixedDelay(() -> {
try {
cleanIdleTopics(10000);
cleanIdleTopics(topicSurvivalTimeMills);
} catch (Exception e) {
logger.error("clean idle topics failure", e);
}
}, 5000, 5000, TimeUnit.MILLISECONDS);
}, 5, 5, TimeUnit.SECONDS);
}
@Override
@ -102,7 +117,7 @@ public class MessageExchangeServiceImpl implements MessageExchangeService {
private TopicData getAndCheckTopicExists(Topic topic) throws MessageExchangeException {
TopicData topicData = topicMap.get(topic);
if (topicData == null) {
throw new MessageExchangeException("topic is not exists");
throw new MessageExchangeException("topic is not exists: " + topic);
}
return topicData;
}
@ -190,6 +205,9 @@ public class MessageExchangeServiceImpl implements MessageExchangeService {
long now = System.currentTimeMillis();
List<TopicData> topicDataList = new ArrayList<>(topicMap.values());
for (TopicData topicData : topicDataList) {
if (topicData.topic instanceof ActionRequestTopic) {
continue;
}
long idle = now - topicData.getLastActiveTime();
if (!topicData.isSubscribed() && idle > timeout) {
try {
@ -202,7 +220,23 @@ public class MessageExchangeServiceImpl implements MessageExchangeService {
}
}
static class TopicData {
public int getTopicSurvivalTimeMills() {
return topicSurvivalTimeMills;
}
public void setTopicSurvivalTimeMills(int topicSurvivalTimeMills) {
this.topicSurvivalTimeMills = topicSurvivalTimeMills;
}
public int getTopicCapacity() {
return topicCapacity;
}
public void setTopicCapacity(int topicCapacity) {
this.topicCapacity = topicCapacity;
}
class TopicData {
private BlockingQueue<byte[]> messageQueue;
private Topic topic;
private long createTime;
@ -211,7 +245,7 @@ public class MessageExchangeServiceImpl implements MessageExchangeService {
public TopicData(Topic topic) {
this.topic = topic;
messageQueue = new LinkedBlockingQueue<byte[]>(1000);
messageQueue = new LinkedBlockingQueue<byte[]>(topicCapacity);
createTime = System.currentTimeMillis();
lastActiveTime = createTime;
}

@ -25,6 +25,19 @@ public class RedisMessageExchangeServiceImpl implements MessageExchangeService {
private static final String topicPrefix = "arthas:channel:topics:agent:";
// topic survival time ms
private int topicSurvivalTimeMills = 60*1000;
// topic message queue capacity
private int topicCapacity = 1000;
public RedisMessageExchangeServiceImpl() {
}
public RedisMessageExchangeServiceImpl(int topicSurvivalTimeMills, int topicCapacity) {
this.topicSurvivalTimeMills = topicSurvivalTimeMills;
this.topicCapacity = topicCapacity;
}
@Autowired
private ReactiveRedisTemplate<String, byte[]> redisTemplate;
@ -70,9 +83,10 @@ public class RedisMessageExchangeServiceImpl implements MessageExchangeService {
public void pushMessage(Topic topic, byte[] messageBytes) throws MessageExchangeException {
String key = topic.getTopic();
redisTemplate.opsForList().leftPush(key, messageBytes).doOnSuccess(value -> {
redisTemplate.expire(key, Duration.ofMillis(10000)).subscribe();
redisTemplate.expire(key, Duration.ofMillis(topicSurvivalTimeMills)).subscribe();
}).subscribe();
//TODO check topic capacity
}
@Override
@ -117,4 +131,19 @@ public class RedisMessageExchangeServiceImpl implements MessageExchangeService {
}
public int getTopicSurvivalTimeMills() {
return topicSurvivalTimeMills;
}
public void setTopicSurvivalTimeMills(int topicSurvivalTimeMills) {
this.topicSurvivalTimeMills = topicSurvivalTimeMills;
}
public int getTopicCapacity() {
return topicCapacity;
}
public void setTopicCapacity(int topicCapacity) {
this.topicCapacity = topicCapacity;
}
}

@ -0,0 +1,237 @@
package com.alibaba.arthas.channel.server.web;
import com.alibaba.arthas.channel.proto.ActionRequest;
import com.alibaba.arthas.channel.proto.ActionResponse;
import com.alibaba.arthas.channel.proto.ExecuteParams;
import com.alibaba.arthas.channel.proto.ExecuteResult;
import com.alibaba.arthas.channel.proto.RequestAction;
import com.alibaba.arthas.channel.proto.ResponseStatus;
import com.alibaba.arthas.channel.proto.ResultFormat;
import com.alibaba.arthas.channel.server.model.AgentVO;
import com.alibaba.arthas.channel.server.service.AgentManageService;
import com.alibaba.arthas.channel.server.service.ApiActionDelegateService;
import com.alibaba.fastjson.JSON;
import com.taobao.arthas.core.shell.term.impl.http.api.ApiAction;
import com.taobao.arthas.core.shell.term.impl.http.api.ApiRequest;
import com.taobao.arthas.core.shell.term.impl.http.api.ApiResponse;
import com.taobao.arthas.core.shell.term.impl.http.api.ApiState;
import com.taobao.arthas.core.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.server.ResponseStatusException;
import reactor.core.publisher.Mono;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
/**
* Compatible with Arthas Http API (https://arthas.aliyun.com/doc/http-api.html)
*
* @author gongdewei 2021/5/24
*/
@RestController
public class LegacyApiController {
private static final Logger logger = LoggerFactory.getLogger(LegacyApiController.class);
@Autowired
private AgentManageService agentManageService;
@Autowired
private ApiActionDelegateService apiActionDelegateService;
@RequestMapping("/legacy_api/{agentId}")
public Mono<ApiResponse> process(@PathVariable String agentId, @RequestBody ApiRequest request) {
try {
checkAgentExists(agentId);
// set default exec timeout
if (request.getExecTimeout() == null) {
request.setExecTimeout(30000);
}
Mono<ActionResponse> actionResponseMono = null;
ApiAction action = parseApiAction(request.getAction());
// process pull results
switch (action) {
case PULL_RESULTS:
checkRequestId(request);
actionResponseMono = apiActionDelegateService.pullResults(agentId, request.getRequestId(), request.getExecTimeout());
return convertToApiResponse(actionResponseMono);
}
// process other actions
ActionRequest actionRequest = createActionRequest(agentId, request);
switch (actionRequest.getAction()) {
case EXECUTE:
actionResponseMono = apiActionDelegateService.execCommand(agentId, actionRequest);
return convertToApiResponse(actionResponseMono);
case ASYNC_EXECUTE:
actionResponseMono = apiActionDelegateService.asyncExecCommand(agentId, actionRequest);
return convertToApiResponse(actionResponseMono);
case INIT_SESSION:
actionResponseMono = apiActionDelegateService.initSession(agentId);
return convertToApiResponse(actionResponseMono);
case CLOSE_SESSION:
actionResponseMono = apiActionDelegateService.closeSession(agentId, request.getSessionId());
return convertToApiResponse(actionResponseMono);
default:
throw new UnsupportedOperationException("unsupported action: " + actionRequest.getAction());
}
} catch (Throwable e) {
logger.error("process request failed, agentId: {}, request: {}", agentId, request, e);
ApiResponse response = new ApiResponse();
response.setState(ApiState.FAILED)
.setMessage("process request failed: " + e.getMessage())
.setAgentId(agentId)
.setRequestId(request.getRequestId())
.setSessionId(request.getSessionId());
return Mono.just(response);
}
}
private void checkRequestId(ApiRequest request) {
if (StringUtils.isBlank(request.getRequestId())) {
throw new IllegalArgumentException("Invalid request, the 'requestId' is required");
}
}
private Mono<ApiResponse> convertToApiResponse(Mono<ActionResponse> actionResponseMono) {
return actionResponseMono.flatMap((actionResponse) -> Mono.just(convertToApiResponse(actionResponse)));
}
private ApiResponse convertToApiResponse(ActionResponse actionResponse) {
ApiResponse response = new ApiResponse();
response.setAgentId(actionResponse.getAgentId());
response.setState(convertToApiState(actionResponse.getStatus()));
if (StringUtils.hasText(actionResponse.getRequestId())) {
response.setRequestId(actionResponse.getRequestId());
}
if (StringUtils.hasText(actionResponse.getMessage())) {
response.setMessage(actionResponse.getMessage());
}
if (StringUtils.hasText(actionResponse.getSessionId())) {
response.setSessionId(actionResponse.getSessionId());
}
if (actionResponse.hasExecuteResult()) {
String resultsJson = null;
ExecuteResult executeResult = actionResponse.getExecuteResult();
if (executeResult.hasResultsJson()) {
resultsJson = executeResult.getResultsJson().getValue();
}
Map<String, Object> body = new TreeMap<String, Object>();
// attributes that are not available
// body.put("command", commandLine);
//packing results
// body.put("jobId", job.id());
// body.put("jobStatus", job.status());
// body.put("timeExpired", timeExpired);
// if (timeExpired) {
// body.put("timeout", timeout);
// }
// set result as list
body.put("results", parseJsonToList(resultsJson));
// set results is a json string
//body.put("resultsJson", resultsJson);
response.setBody(body);
}
return response;
}
private List<Map> parseJsonToList(String resultsJson) {
return JSON.parseArray(resultsJson, Map.class);
}
private ActionRequest createActionRequest(String agentId, ApiRequest request) {
ActionRequest.Builder actionRequest = ActionRequest.newBuilder()
.setAgentId(agentId)
.setAction(convertToRequestAction(request.getAction()))
.setExecuteParams(createExecuteParams(request));
if (StringUtils.hasText(request.getRequestId())) {
actionRequest.setRequestId(request.getRequestId());
}
if (StringUtils.hasText(request.getSessionId())) {
actionRequest.setSessionId(request.getSessionId());
}
return actionRequest.build();
}
private ExecuteParams createExecuteParams(ApiRequest request) {
return ExecuteParams.newBuilder()
.setCommandLine(request.getCommand() != null ? request.getCommand() : "")
.setResultFormat(ResultFormat.JSON)
.setExecTimeout(request.getExecTimeout())
.build();
}
private AgentVO checkAgentExists(String agentId) {
Optional<AgentVO> optionalAgentVO = agentManageService.findAgentById(agentId).block();
if (!optionalAgentVO.isPresent()) {
throw new ResponseStatusException(HttpStatus.NOT_FOUND, "Agent not found");
}
return optionalAgentVO.get();
}
private ApiState convertToApiState(ResponseStatus status) {
switch (status) {
case SUCCEEDED:
case CONTINUOUS:
return ApiState.SUCCEEDED;
case REFUSED:
return ApiState.REFUSED;
case INTERRUPTED:
return ApiState.INTERRUPTED;
case FAILED:
case UNRECOGNIZED:
return ApiState.FAILED;
}
return ApiState.FAILED;
}
private RequestAction convertToRequestAction(String actionStr) {
ApiAction action = parseApiAction(actionStr);
switch (action) {
case INIT_SESSION:
return RequestAction.INIT_SESSION;
case CLOSE_SESSION:
return RequestAction.CLOSE_SESSION;
case EXEC:
return RequestAction.EXECUTE;
case ASYNC_EXEC:
return RequestAction.ASYNC_EXECUTE;
case INTERRUPT_JOB:
return RequestAction.INTERRUPT_JOB;
case PULL_RESULTS:
case JOIN_SESSION:
case SESSION_INFO:
default:
throw new IllegalArgumentException("unsupported action: " + actionStr);
}
}
private ApiAction parseApiAction(String actionStr) {
ApiAction action;
try {
action = ApiAction.valueOf(actionStr.trim().toUpperCase());
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException("unknown action: " + actionStr);
}
return action;
}
}

@ -384,7 +384,6 @@ public class ChannelRequestHandler implements ChannelClient.RequestListener {
}
private void processAsyncExec(ActionRequest request, Session session) {
//TODO
ExecuteParams executeParams = request.getExecuteParams();
String commandLine = executeParams.getCommandLine();
@ -627,7 +626,7 @@ public class ChannelRequestHandler implements ChannelClient.RequestListener {
@Override
public void appendResult(ResultModel result) {
//TODO 优化输出,适度合并
//TODO 优化输出,合并小结果?
List<ResultModel> resultModels = new ArrayList<ResultModel>();
resultModels.add(result);

@ -448,7 +448,7 @@ public class ArthasBootstrap {
//http api session manager
sessionManager = new SessionManagerImpl(options, shellServer.getCommandManager(), shellServer.getJobController());
//http api handler
httpApiHandler = new HttpApiHandler(historyManager, sessionManager);
httpApiHandler = new HttpApiHandler(historyManager, sessionManager, configure);
logger().info("as-server listening on network={};telnet={};http={};timeout={};", configure.getIp(),
configure.getTelnetPort(), configure.getHttpPort(), options.getConnectionTimeout());

@ -1,6 +1,5 @@
package com.taobao.arthas.core.shell.term.impl.http.api;
import java.util.Map;
/**
* Http Api request

@ -1,11 +1,12 @@
package com.taobao.arthas.core.shell.term.impl.http.api;
/**
* Http Api exception
* Http Api response
* @author gongdewei 2020-03-19
*/
public class ApiResponse<T> {
private String requestId;
private String agentId;
private ApiState state;
private String message;
private String sessionId;
@ -67,6 +68,15 @@ public class ApiResponse<T> {
return this;
}
public String getAgentId() {
return agentId;
}
public ApiResponse<T> setAgentId(String agentId) {
this.agentId = agentId;
return this;
}
public T getBody() {
return body;
}

@ -6,6 +6,7 @@ import com.alibaba.fastjson.JSON;
import com.taobao.arthas.common.ArthasConstants;
import com.taobao.arthas.common.PidUtils;
import com.taobao.arthas.core.command.model.*;
import com.taobao.arthas.core.config.Configure;
import com.taobao.arthas.core.distribution.PackingResultDistributor;
import com.taobao.arthas.core.distribution.ResultConsumer;
import com.taobao.arthas.core.distribution.ResultDistributor;
@ -68,14 +69,20 @@ public class HttpApiHandler {
private ArrayBlockingQueue<ByteBuf> byteBufPool = new ArrayBlockingQueue<ByteBuf>(poolSize);
private ArrayBlockingQueue<char[]> charsBufPool = new ArrayBlockingQueue<char[]>(poolSize);
private ArrayBlockingQueue<byte[]> bytesPool = new ArrayBlockingQueue<byte[]>(poolSize);
private final String agentId;
public HttpApiHandler(HistoryManager historyManager, SessionManager sessionManager) {
public HttpApiHandler(HistoryManager historyManager, SessionManager sessionManager, Configure configure) {
this.historyManager = historyManager;
this.sessionManager = sessionManager;
commandManager = this.sessionManager.getCommandManager();
jobController = this.sessionManager.getJobController();
agentId = configure.getAgentId();
//init buf pool
initBufPool();
}
public void initBufPool() {
JsonUtils.setSerializeWriterBufferThreshold(jsonBufferSize);
for (int i = 0; i < poolSize; i++) {
byteBufPool.offer(Unpooled.buffer(jsonBufferSize));
@ -107,7 +114,7 @@ public class HttpApiHandler {
result = createResponse(ApiState.FAILED, "The request was not processed");
}
result.setRequestId(requestId);
result.setAgentId(agentId);
//http response content
ByteBuf content = null;

Loading…
Cancel
Save