From 303efc6cad5d5dc13cee6564180d9c777873d145 Mon Sep 17 00:00:00 2001 From: gongdewei Date: Mon, 10 Jan 2022 15:21:19 +0800 Subject: [PATCH] fix agent channel client stream leaks of 3.5.3 --- .../arthas/channel/client/ChannelClient.java | 6 ++++-- .../channel/common/ChannelFeatures.java | 19 +++++++++++++++++++ .../channel/common/ChannelVersions.java | 9 +++++++++ .../server/grpc/ArthasServiceGrpcImpl.java | 13 +++++++++---- 4 files changed, 41 insertions(+), 6 deletions(-) create mode 100644 channel/channel-common/src/main/java/com/alibaba/arthas/channel/common/ChannelFeatures.java create mode 100644 channel/channel-common/src/main/java/com/alibaba/arthas/channel/common/ChannelVersions.java diff --git a/channel/channel-client/src/main/java/com/alibaba/arthas/channel/client/ChannelClient.java b/channel/channel-client/src/main/java/com/alibaba/arthas/channel/client/ChannelClient.java index 797874ba5..aa94fa233 100644 --- a/channel/channel-client/src/main/java/com/alibaba/arthas/channel/client/ChannelClient.java +++ b/channel/channel-client/src/main/java/com/alibaba/arthas/channel/client/ChannelClient.java @@ -1,5 +1,7 @@ package com.alibaba.arthas.channel.client; +import com.alibaba.arthas.channel.common.ChannelFeatures; +import com.alibaba.arthas.channel.common.ChannelVersions; import com.alibaba.arthas.channel.proto.ActionRequest; import com.alibaba.arthas.channel.proto.ActionResponse; import com.alibaba.arthas.channel.proto.AgentInfo; @@ -53,8 +55,8 @@ public class ChannelClient { private EventLoopGroup group; //channel info - private String channelVersion = "1.0.0"; - private List channelFeatures = Arrays.asList("WebConsole", "ExecuteCommand"); + private String channelVersion = ChannelVersions.V_1_1_0; + private List channelFeatures = Arrays.asList(ChannelFeatures.WEB_CONSOLE, ChannelFeatures.EXECUTE_COMMAND); private long lastHeartbeatTime; private int workThreads = 2; private volatile StreamObserver heartbeatRequestStreamObserver; diff --git a/channel/channel-common/src/main/java/com/alibaba/arthas/channel/common/ChannelFeatures.java b/channel/channel-common/src/main/java/com/alibaba/arthas/channel/common/ChannelFeatures.java new file mode 100644 index 000000000..fbcb59ec5 --- /dev/null +++ b/channel/channel-common/src/main/java/com/alibaba/arthas/channel/common/ChannelFeatures.java @@ -0,0 +1,19 @@ +package com.alibaba.arthas.channel.common; + +public interface ChannelFeatures { + + /** + * Support execute command + * @see com.alibaba.arthas.channel.proto.RequestAction + * @see com.alibaba.arthas.channel.proto.ActionRequest + */ + String EXECUTE_COMMAND = "ExecuteCommand"; + + /** + * Support proxying web console + * @see com.alibaba.arthas.channel.proto.RequestAction + * @see com.alibaba.arthas.channel.proto.ActionRequest + */ + String WEB_CONSOLE = "WebConsole"; + +} \ No newline at end of file diff --git a/channel/channel-common/src/main/java/com/alibaba/arthas/channel/common/ChannelVersions.java b/channel/channel-common/src/main/java/com/alibaba/arthas/channel/common/ChannelVersions.java new file mode 100644 index 000000000..cf1193d4a --- /dev/null +++ b/channel/channel-common/src/main/java/com/alibaba/arthas/channel/common/ChannelVersions.java @@ -0,0 +1,9 @@ +package com.alibaba.arthas.channel.common; + +public interface ChannelVersions { + // first version + String V_1_0_0 = "1.0.0"; + + // support streaming heartbeat + String V_1_1_0 = "1.1.0"; +} \ No newline at end of file diff --git a/channel/channel-server/src/main/java/com/alibaba/arthas/channel/server/grpc/ArthasServiceGrpcImpl.java b/channel/channel-server/src/main/java/com/alibaba/arthas/channel/server/grpc/ArthasServiceGrpcImpl.java index e263bd24f..7003c2b31 100644 --- a/channel/channel-server/src/main/java/com/alibaba/arthas/channel/server/grpc/ArthasServiceGrpcImpl.java +++ b/channel/channel-server/src/main/java/com/alibaba/arthas/channel/server/grpc/ArthasServiceGrpcImpl.java @@ -16,6 +16,7 @@ import com.alibaba.arthas.channel.server.service.AgentBizSerivce; import com.alibaba.arthas.channel.server.service.AgentManageService; import com.alibaba.arthas.channel.server.message.MessageExchangeService; import io.grpc.stub.StreamObserver; +import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -171,7 +172,7 @@ public class ArthasServiceGrpcImpl extends ArthasServiceGrpc.ArthasServiceImplBa AgentVO agentVO; if (optionalAgentVO.isPresent()) { agentVO = optionalAgentVO.get(); - copyAgentVO(request, agentVO); + copyToAgentVO(request, agentVO); agentVO.setModifiedTime(now); agentVO.setHeartbeatTime(now); agentManageService.updateAgent(agentVO); @@ -181,7 +182,7 @@ public class ArthasServiceGrpcImpl extends ArthasServiceGrpc.ArthasServiceImplBa .build()); } else { agentVO = new AgentVO(); - copyAgentVO(request, agentVO); + copyToAgentVO(request, agentVO); agentVO.setCreatedTime(now); agentVO.setModifiedTime(now); agentVO.setHeartbeatTime(now); @@ -191,13 +192,13 @@ public class ArthasServiceGrpcImpl extends ArthasServiceGrpc.ArthasServiceImplBa .setMessage("Agent info has been added: "+request.getAgentId()) .build()); } - logger.info("register agent: "+agentVO.getAgentId()); + logger.info("register agent: "+request); } finally { responseObserver.onCompleted(); } } - private void copyAgentVO(AgentInfo agentInfo, AgentVO agentVO) { + private void copyToAgentVO(AgentInfo agentInfo, AgentVO agentVO) { agentVO.setAgentId(agentInfo.getAgentId()); agentVO.setAgentVersion(agentInfo.getAgentVersion()); agentVO.setAgentStatus(agentInfo.getAgentStatus().name()); @@ -217,6 +218,10 @@ public class ArthasServiceGrpcImpl extends ArthasServiceGrpc.ArthasServiceImplBa @Override public void onNext(HeartbeatRequest heartbeatRequest) { handleHeartbeat(heartbeatRequest, responseObserver); + if (StringUtils.equalsAnyIgnoreCase(heartbeatRequest.getAgentVersion(), "3.5.3")) { + //fix channel client stream leaks (version < 3.5.4) + responseObserver.onCompleted(); + } } @Override