diff --git a/channel/channel-server-app/src/main/resources/application.properties b/channel/channel-server-app/src/main/resources/application.properties
index 3d09e8162..bd49b9086 100755
--- a/channel/channel-server-app/src/main/resources/application.properties
+++ b/channel/channel-server-app/src/main/resources/application.properties
@@ -15,6 +15,14 @@ arthas.channel.server.websocket.port=8801
# whether start agent cleaner
arthas.channel.server.agent-cleaner.enabled=true
+# message exchange topic survival time
+arthas.channel.server.message-exchange.topic-survival-time-mills=60000
+
+# message exchange topic capacity
+arthas.channel.server.message-exchange.topic-capacity=1000
+
+
+
# for all endpoints
management.endpoints.web.exposure.include=*
diff --git a/channel/channel-server-starter/src/main/java/com/alibaba/arthas/channel/server/autoconfigure/ChannelServerAutoConfiguration.java b/channel/channel-server-starter/src/main/java/com/alibaba/arthas/channel/server/autoconfigure/ChannelServerAutoConfiguration.java
index 9ab634495..66b3ee06c 100644
--- a/channel/channel-server-starter/src/main/java/com/alibaba/arthas/channel/server/autoconfigure/ChannelServerAutoConfiguration.java
+++ b/channel/channel-server-starter/src/main/java/com/alibaba/arthas/channel/server/autoconfigure/ChannelServerAutoConfiguration.java
@@ -131,8 +131,10 @@ public class ChannelServerAutoConfiguration {
@Bean(initMethod = "start", destroyMethod = "stop")
@ConditionalOnMissingBean
- public MessageExchangeService messageExchangeService() {
- return new MessageExchangeServiceImpl();
+ public MessageExchangeService messageExchangeService(ChannelServerProperties serverProperties) {
+ int capacity = serverProperties.getMessageExchange().getTopicCapacity();
+ int survivalTimeMills = serverProperties.getMessageExchange().getTopicSurvivalTimeMills();
+ return new MessageExchangeServiceImpl(capacity, survivalTimeMills);
}
}
@@ -148,8 +150,10 @@ public class ChannelServerAutoConfiguration {
@Bean
@ConditionalOnMissingBean
- public MessageExchangeService messageExchangeService() {
- return new RedisMessageExchangeServiceImpl();
+ public MessageExchangeService messageExchangeService(ChannelServerProperties serverProperties) {
+ int capacity = serverProperties.getMessageExchange().getTopicCapacity();
+ int survivalTimeMills = serverProperties.getMessageExchange().getTopicSurvivalTimeMills();
+ return new RedisMessageExchangeServiceImpl(capacity, survivalTimeMills);
}
@Bean
diff --git a/channel/channel-server-starter/src/main/java/com/alibaba/arthas/channel/server/autoconfigure/ChannelServerProperties.java b/channel/channel-server-starter/src/main/java/com/alibaba/arthas/channel/server/autoconfigure/ChannelServerProperties.java
index 1b0adf462..e97f425fb 100644
--- a/channel/channel-server-starter/src/main/java/com/alibaba/arthas/channel/server/autoconfigure/ChannelServerProperties.java
+++ b/channel/channel-server-starter/src/main/java/com/alibaba/arthas/channel/server/autoconfigure/ChannelServerProperties.java
@@ -8,6 +8,7 @@ public class ChannelServerProperties {
public static final String PREFIX = "arthas.channel.server";
private Server websocket = new Server();
private Server backend = new Server();
+ private MessageExchange messageExchange = new MessageExchange();
private AgentCleaner agentCleaner = new AgentCleaner();
@@ -35,6 +36,14 @@ public class ChannelServerProperties {
this.agentCleaner = agentCleaner;
}
+ public MessageExchange getMessageExchange() {
+ return messageExchange;
+ }
+
+ public void setMessageExchange(MessageExchange messageExchange) {
+ this.messageExchange = messageExchange;
+ }
+
public static class Server {
private String host;
private int port;
@@ -76,9 +85,9 @@ public class ChannelServerProperties {
public static class AgentCleaner {
private int cleanIntervalMills = 5000;
- private int outOfServiceTimeoutMills = 15*1000;
- private int downTimeoutMills = 30*1000;
- private int removingTimeoutMills = 1800*1000;
+ private int outOfServiceTimeoutMills = 15000;
+ private int downTimeoutMills = 30000;
+ private int removingTimeoutMills = 1800000;
private boolean enabled = true;
public int getCleanIntervalMills() {
@@ -122,4 +131,31 @@ public class ChannelServerProperties {
}
}
+ public static class MessageExchange {
+ private int topicSurvivalTimeMills = 60000;
+ private int topicCapacity = 1000;
+
+ public int getTopicSurvivalTimeMills() {
+ return topicSurvivalTimeMills;
+ }
+
+ /**
+ * message topic survival seconds
+ */
+ public void setTopicSurvivalTimeMills(int topicSurvivalTimeMills) {
+ this.topicSurvivalTimeMills = topicSurvivalTimeMills;
+ }
+
+ public int getTopicCapacity() {
+ return topicCapacity;
+ }
+
+ /**
+ * message queue capacity
+ */
+ public void setTopicCapacity(int topicCapacity) {
+ this.topicCapacity = topicCapacity;
+ }
+ }
+
}
diff --git a/channel/channel-server/pom.xml b/channel/channel-server/pom.xml
index 791db64cd..5271db3ee 100644
--- a/channel/channel-server/pom.xml
+++ b/channel/channel-server/pom.xml
@@ -69,8 +69,6 @@
com.taobao.arthas
arthas-core
${project.version}
- test
- true
arthas-tunnel-client
@@ -80,6 +78,46 @@
termd-core
com.alibaba.middleware
+
+ arthas-channel-client
+ com.taobao.arthas
+
+
+ arthas-memorycompiler
+ com.taobao.arthas
+
+
+ arthas-vmtool
+ com.taobao.arthas
+
+
+ ognl
+ ognl
+
+
+ bytekit-core
+ com.alibaba
+
+
+ cfr
+ org.benf
+
+
+ cli
+ com.alibaba.middleware
+
+
+ text-ui
+ com.taobao.text
+
+
+ rsyntaxtextarea
+ com.fifesoft
+
+
+ fastjson
+ com.alibaba
+
diff --git a/channel/channel-server/src/main/java/com/alibaba/arthas/channel/server/message/impl/MessageExchangeServiceImpl.java b/channel/channel-server/src/main/java/com/alibaba/arthas/channel/server/message/impl/MessageExchangeServiceImpl.java
index 1e9dd165d..9edd133b6 100644
--- a/channel/channel-server/src/main/java/com/alibaba/arthas/channel/server/message/impl/MessageExchangeServiceImpl.java
+++ b/channel/channel-server/src/main/java/com/alibaba/arthas/channel/server/message/impl/MessageExchangeServiceImpl.java
@@ -3,6 +3,7 @@ package com.alibaba.arthas.channel.server.message.impl;
import com.alibaba.arthas.channel.server.conf.ScheduledExecutorConfig;
import com.alibaba.arthas.channel.server.message.MessageExchangeException;
import com.alibaba.arthas.channel.server.message.MessageExchangeService;
+import com.alibaba.arthas.channel.server.message.topic.ActionRequestTopic;
import com.alibaba.arthas.channel.server.message.topic.Topic;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
@@ -37,15 +38,29 @@ public class MessageExchangeServiceImpl implements MessageExchangeService {
private ScheduledExecutorConfig executorServiceConfig;
private ScheduledFuture> scheduledFuture;
+ // topic survival time ms
+ private int topicSurvivalTimeMills = 60*1000;
+
+ // topic message queue capacity
+ private int topicCapacity = 1000;
+
+ public MessageExchangeServiceImpl() {
+ }
+
+ public MessageExchangeServiceImpl(int topicCapacity, int topicSurvivalTimeMills) {
+ this.topicCapacity = topicCapacity;
+ this.topicSurvivalTimeMills = topicSurvivalTimeMills;
+ }
+
@Override
public void start() throws MessageExchangeException {
scheduledFuture = executorServiceConfig.getExecutorService().scheduleWithFixedDelay(() -> {
try {
- cleanIdleTopics(10000);
+ cleanIdleTopics(topicSurvivalTimeMills);
} catch (Exception e) {
logger.error("clean idle topics failure", e);
}
- }, 5000, 5000, TimeUnit.MILLISECONDS);
+ }, 5, 5, TimeUnit.SECONDS);
}
@Override
@@ -102,7 +117,7 @@ public class MessageExchangeServiceImpl implements MessageExchangeService {
private TopicData getAndCheckTopicExists(Topic topic) throws MessageExchangeException {
TopicData topicData = topicMap.get(topic);
if (topicData == null) {
- throw new MessageExchangeException("topic is not exists");
+ throw new MessageExchangeException("topic is not exists: " + topic);
}
return topicData;
}
@@ -190,6 +205,9 @@ public class MessageExchangeServiceImpl implements MessageExchangeService {
long now = System.currentTimeMillis();
List topicDataList = new ArrayList<>(topicMap.values());
for (TopicData topicData : topicDataList) {
+ if (topicData.topic instanceof ActionRequestTopic) {
+ continue;
+ }
long idle = now - topicData.getLastActiveTime();
if (!topicData.isSubscribed() && idle > timeout) {
try {
@@ -202,7 +220,23 @@ public class MessageExchangeServiceImpl implements MessageExchangeService {
}
}
- static class TopicData {
+ public int getTopicSurvivalTimeMills() {
+ return topicSurvivalTimeMills;
+ }
+
+ public void setTopicSurvivalTimeMills(int topicSurvivalTimeMills) {
+ this.topicSurvivalTimeMills = topicSurvivalTimeMills;
+ }
+
+ public int getTopicCapacity() {
+ return topicCapacity;
+ }
+
+ public void setTopicCapacity(int topicCapacity) {
+ this.topicCapacity = topicCapacity;
+ }
+
+ class TopicData {
private BlockingQueue messageQueue;
private Topic topic;
private long createTime;
@@ -211,7 +245,7 @@ public class MessageExchangeServiceImpl implements MessageExchangeService {
public TopicData(Topic topic) {
this.topic = topic;
- messageQueue = new LinkedBlockingQueue(1000);
+ messageQueue = new LinkedBlockingQueue(topicCapacity);
createTime = System.currentTimeMillis();
lastActiveTime = createTime;
}
diff --git a/channel/channel-server/src/main/java/com/alibaba/arthas/channel/server/redis/RedisMessageExchangeServiceImpl.java b/channel/channel-server/src/main/java/com/alibaba/arthas/channel/server/redis/RedisMessageExchangeServiceImpl.java
index c9632ac81..a10c7b1ae 100644
--- a/channel/channel-server/src/main/java/com/alibaba/arthas/channel/server/redis/RedisMessageExchangeServiceImpl.java
+++ b/channel/channel-server/src/main/java/com/alibaba/arthas/channel/server/redis/RedisMessageExchangeServiceImpl.java
@@ -25,6 +25,19 @@ public class RedisMessageExchangeServiceImpl implements MessageExchangeService {
private static final String topicPrefix = "arthas:channel:topics:agent:";
+ // topic survival time ms
+ private int topicSurvivalTimeMills = 60*1000;
+
+ // topic message queue capacity
+ private int topicCapacity = 1000;
+
+ public RedisMessageExchangeServiceImpl() {
+ }
+
+ public RedisMessageExchangeServiceImpl(int topicSurvivalTimeMills, int topicCapacity) {
+ this.topicSurvivalTimeMills = topicSurvivalTimeMills;
+ this.topicCapacity = topicCapacity;
+ }
@Autowired
private ReactiveRedisTemplate redisTemplate;
@@ -70,9 +83,10 @@ public class RedisMessageExchangeServiceImpl implements MessageExchangeService {
public void pushMessage(Topic topic, byte[] messageBytes) throws MessageExchangeException {
String key = topic.getTopic();
redisTemplate.opsForList().leftPush(key, messageBytes).doOnSuccess(value -> {
- redisTemplate.expire(key, Duration.ofMillis(10000)).subscribe();
+ redisTemplate.expire(key, Duration.ofMillis(topicSurvivalTimeMills)).subscribe();
}).subscribe();
+ //TODO check topic capacity
}
@Override
@@ -117,4 +131,19 @@ public class RedisMessageExchangeServiceImpl implements MessageExchangeService {
}
+ public int getTopicSurvivalTimeMills() {
+ return topicSurvivalTimeMills;
+ }
+
+ public void setTopicSurvivalTimeMills(int topicSurvivalTimeMills) {
+ this.topicSurvivalTimeMills = topicSurvivalTimeMills;
+ }
+
+ public int getTopicCapacity() {
+ return topicCapacity;
+ }
+
+ public void setTopicCapacity(int topicCapacity) {
+ this.topicCapacity = topicCapacity;
+ }
}
diff --git a/channel/channel-server/src/main/java/com/alibaba/arthas/channel/server/web/LegacyApiController.java b/channel/channel-server/src/main/java/com/alibaba/arthas/channel/server/web/LegacyApiController.java
new file mode 100644
index 000000000..4b0943dec
--- /dev/null
+++ b/channel/channel-server/src/main/java/com/alibaba/arthas/channel/server/web/LegacyApiController.java
@@ -0,0 +1,237 @@
+package com.alibaba.arthas.channel.server.web;
+
+import com.alibaba.arthas.channel.proto.ActionRequest;
+import com.alibaba.arthas.channel.proto.ActionResponse;
+import com.alibaba.arthas.channel.proto.ExecuteParams;
+import com.alibaba.arthas.channel.proto.ExecuteResult;
+import com.alibaba.arthas.channel.proto.RequestAction;
+import com.alibaba.arthas.channel.proto.ResponseStatus;
+import com.alibaba.arthas.channel.proto.ResultFormat;
+import com.alibaba.arthas.channel.server.model.AgentVO;
+import com.alibaba.arthas.channel.server.service.AgentManageService;
+import com.alibaba.arthas.channel.server.service.ApiActionDelegateService;
+import com.alibaba.fastjson.JSON;
+import com.taobao.arthas.core.shell.term.impl.http.api.ApiAction;
+import com.taobao.arthas.core.shell.term.impl.http.api.ApiRequest;
+import com.taobao.arthas.core.shell.term.impl.http.api.ApiResponse;
+import com.taobao.arthas.core.shell.term.impl.http.api.ApiState;
+import com.taobao.arthas.core.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.HttpStatus;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+import org.springframework.web.server.ResponseStatusException;
+import reactor.core.publisher.Mono;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.TreeMap;
+
+/**
+ * Compatible with Arthas Http API (https://arthas.aliyun.com/doc/http-api.html)
+ *
+ * @author gongdewei 2021/5/24
+ */
+@RestController
+public class LegacyApiController {
+
+
+ private static final Logger logger = LoggerFactory.getLogger(LegacyApiController.class);
+
+ @Autowired
+ private AgentManageService agentManageService;
+
+ @Autowired
+ private ApiActionDelegateService apiActionDelegateService;
+
+ @RequestMapping("/legacy_api/{agentId}")
+ public Mono process(@PathVariable String agentId, @RequestBody ApiRequest request) {
+
+ try {
+ checkAgentExists(agentId);
+
+ // set default exec timeout
+ if (request.getExecTimeout() == null) {
+ request.setExecTimeout(30000);
+ }
+
+ Mono actionResponseMono = null;
+ ApiAction action = parseApiAction(request.getAction());
+ // process pull results
+ switch (action) {
+ case PULL_RESULTS:
+ checkRequestId(request);
+ actionResponseMono = apiActionDelegateService.pullResults(agentId, request.getRequestId(), request.getExecTimeout());
+ return convertToApiResponse(actionResponseMono);
+ }
+
+ // process other actions
+ ActionRequest actionRequest = createActionRequest(agentId, request);
+ switch (actionRequest.getAction()) {
+ case EXECUTE:
+ actionResponseMono = apiActionDelegateService.execCommand(agentId, actionRequest);
+ return convertToApiResponse(actionResponseMono);
+ case ASYNC_EXECUTE:
+ actionResponseMono = apiActionDelegateService.asyncExecCommand(agentId, actionRequest);
+ return convertToApiResponse(actionResponseMono);
+ case INIT_SESSION:
+ actionResponseMono = apiActionDelegateService.initSession(agentId);
+ return convertToApiResponse(actionResponseMono);
+ case CLOSE_SESSION:
+ actionResponseMono = apiActionDelegateService.closeSession(agentId, request.getSessionId());
+ return convertToApiResponse(actionResponseMono);
+ default:
+ throw new UnsupportedOperationException("unsupported action: " + actionRequest.getAction());
+ }
+ } catch (Throwable e) {
+ logger.error("process request failed, agentId: {}, request: {}", agentId, request, e);
+ ApiResponse response = new ApiResponse();
+ response.setState(ApiState.FAILED)
+ .setMessage("process request failed: " + e.getMessage())
+ .setAgentId(agentId)
+ .setRequestId(request.getRequestId())
+ .setSessionId(request.getSessionId());
+ return Mono.just(response);
+ }
+ }
+
+ private void checkRequestId(ApiRequest request) {
+ if (StringUtils.isBlank(request.getRequestId())) {
+ throw new IllegalArgumentException("Invalid request, the 'requestId' is required");
+ }
+ }
+
+ private Mono convertToApiResponse(Mono actionResponseMono) {
+ return actionResponseMono.flatMap((actionResponse) -> Mono.just(convertToApiResponse(actionResponse)));
+ }
+
+ private ApiResponse convertToApiResponse(ActionResponse actionResponse) {
+ ApiResponse response = new ApiResponse();
+ response.setAgentId(actionResponse.getAgentId());
+ response.setState(convertToApiState(actionResponse.getStatus()));
+ if (StringUtils.hasText(actionResponse.getRequestId())) {
+ response.setRequestId(actionResponse.getRequestId());
+ }
+ if (StringUtils.hasText(actionResponse.getMessage())) {
+ response.setMessage(actionResponse.getMessage());
+ }
+ if (StringUtils.hasText(actionResponse.getSessionId())) {
+ response.setSessionId(actionResponse.getSessionId());
+ }
+ if (actionResponse.hasExecuteResult()) {
+ String resultsJson = null;
+ ExecuteResult executeResult = actionResponse.getExecuteResult();
+ if (executeResult.hasResultsJson()) {
+ resultsJson = executeResult.getResultsJson().getValue();
+ }
+ Map body = new TreeMap();
+
+ // attributes that are not available
+// body.put("command", commandLine);
+ //packing results
+// body.put("jobId", job.id());
+// body.put("jobStatus", job.status());
+// body.put("timeExpired", timeExpired);
+// if (timeExpired) {
+// body.put("timeout", timeout);
+// }
+
+ // set result as list
+ body.put("results", parseJsonToList(resultsJson));
+ // set results is a json string
+ //body.put("resultsJson", resultsJson);
+
+ response.setBody(body);
+ }
+ return response;
+ }
+
+ private List