fix agent channel client stream leaks of 3.5.3

pull/1490/head
gongdewei 3 years ago
parent 2102fc6929
commit 303efc6cad

@ -1,5 +1,7 @@
package com.alibaba.arthas.channel.client;
import com.alibaba.arthas.channel.common.ChannelFeatures;
import com.alibaba.arthas.channel.common.ChannelVersions;
import com.alibaba.arthas.channel.proto.ActionRequest;
import com.alibaba.arthas.channel.proto.ActionResponse;
import com.alibaba.arthas.channel.proto.AgentInfo;
@ -53,8 +55,8 @@ public class ChannelClient {
private EventLoopGroup group;
//channel info
private String channelVersion = "1.0.0";
private List<String> channelFeatures = Arrays.asList("WebConsole", "ExecuteCommand");
private String channelVersion = ChannelVersions.V_1_1_0;
private List<String> channelFeatures = Arrays.asList(ChannelFeatures.WEB_CONSOLE, ChannelFeatures.EXECUTE_COMMAND);
private long lastHeartbeatTime;
private int workThreads = 2;
private volatile StreamObserver<HeartbeatRequest> heartbeatRequestStreamObserver;

@ -0,0 +1,19 @@
package com.alibaba.arthas.channel.common;
public interface ChannelFeatures {
/**
* Support execute command
* @see com.alibaba.arthas.channel.proto.RequestAction
* @see com.alibaba.arthas.channel.proto.ActionRequest
*/
String EXECUTE_COMMAND = "ExecuteCommand";
/**
* Support proxying web console
* @see com.alibaba.arthas.channel.proto.RequestAction
* @see com.alibaba.arthas.channel.proto.ActionRequest
*/
String WEB_CONSOLE = "WebConsole";
}

@ -0,0 +1,9 @@
package com.alibaba.arthas.channel.common;
public interface ChannelVersions {
// first version
String V_1_0_0 = "1.0.0";
// support streaming heartbeat
String V_1_1_0 = "1.1.0";
}

@ -16,6 +16,7 @@ import com.alibaba.arthas.channel.server.service.AgentBizSerivce;
import com.alibaba.arthas.channel.server.service.AgentManageService;
import com.alibaba.arthas.channel.server.message.MessageExchangeService;
import io.grpc.stub.StreamObserver;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@ -171,7 +172,7 @@ public class ArthasServiceGrpcImpl extends ArthasServiceGrpc.ArthasServiceImplBa
AgentVO agentVO;
if (optionalAgentVO.isPresent()) {
agentVO = optionalAgentVO.get();
copyAgentVO(request, agentVO);
copyToAgentVO(request, agentVO);
agentVO.setModifiedTime(now);
agentVO.setHeartbeatTime(now);
agentManageService.updateAgent(agentVO);
@ -181,7 +182,7 @@ public class ArthasServiceGrpcImpl extends ArthasServiceGrpc.ArthasServiceImplBa
.build());
} else {
agentVO = new AgentVO();
copyAgentVO(request, agentVO);
copyToAgentVO(request, agentVO);
agentVO.setCreatedTime(now);
agentVO.setModifiedTime(now);
agentVO.setHeartbeatTime(now);
@ -191,13 +192,13 @@ public class ArthasServiceGrpcImpl extends ArthasServiceGrpc.ArthasServiceImplBa
.setMessage("Agent info has been added: "+request.getAgentId())
.build());
}
logger.info("register agent: "+agentVO.getAgentId());
logger.info("register agent: "+request);
} finally {
responseObserver.onCompleted();
}
}
private void copyAgentVO(AgentInfo agentInfo, AgentVO agentVO) {
private void copyToAgentVO(AgentInfo agentInfo, AgentVO agentVO) {
agentVO.setAgentId(agentInfo.getAgentId());
agentVO.setAgentVersion(agentInfo.getAgentVersion());
agentVO.setAgentStatus(agentInfo.getAgentStatus().name());
@ -217,6 +218,10 @@ public class ArthasServiceGrpcImpl extends ArthasServiceGrpc.ArthasServiceImplBa
@Override
public void onNext(HeartbeatRequest heartbeatRequest) {
handleHeartbeat(heartbeatRequest, responseObserver);
if (StringUtils.equalsAnyIgnoreCase(heartbeatRequest.getAgentVersion(), "3.5.3")) {
//fix channel client stream leaks (version < 3.5.4)
responseObserver.onCompleted();
}
}
@Override

Loading…
Cancel
Save