From d5362bae97c4ecd5f544f0cf4ddd3ef3b9f7bd39 Mon Sep 17 00:00:00 2001 From: gongdewei Date: Mon, 10 Jan 2022 14:14:57 +0800 Subject: [PATCH] change heartbeat to streaming method, fix grpc connections leaks in channel client --- channel/README.md | 22 ++++- .../arthas/channel/client/ChannelClient.java | 60 +++++++----- .../channel/client/ChannelClientTest.java | 15 +++ .../src/test/resources/logback-test.xml | 2 + .../src/main/proto/ArthasService.proto | 2 +- .../server/grpc/ArthasServiceGrpcImpl.java | 95 +++++++++++++------ .../server/ArthasServiceClientTest.java | 2 +- .../ArthasServiceServerJsonResultTest.java | 28 ++++-- .../server/ArthasServiceServerTest.java | 32 +++++-- 9 files changed, 181 insertions(+), 77 deletions(-) diff --git a/channel/README.md b/channel/README.md index 9694bc25e..53e22476a 100644 --- a/channel/README.md +++ b/channel/README.md @@ -107,7 +107,7 @@ http://localhost:8800/ /agent/{agentId}/exec ```shell -agent_id=a76da9861cfd +agent_id=becb81b8f33843ca932cd75ff9970318 curl -Ss -XPOST -H "Content-Type: application/json" http://localhost:8800/agent/${agent_id}/exec -d ' { "action":"EXECUTE", @@ -119,13 +119,27 @@ curl -Ss -XPOST -H "Content-Type: application/json" http://localhost:8800/agent/ ' ``` +返回结果: + +```json +{ + "agentId": "becb81b8f33843ca932cd75ff9970318", + "requestId": "LHY9DpPsISDu", + "status": "SUCCEEDED", + "sessionId": "9322bad2-33a8-4f7d-bafd-2f7701824bd2", + "executeResult": { + "resultsJson": "[{\"agentId\":\"becb81b8f33843ca932cd75ff9970318\",\"channelServer\":\"localhost:7700\",\"javaPid\":9347,\"jobId\":3,\"sessionId\":\"9322bad2-33a8-4f7d-bafd-2f7701824bd2\",\"tunnelConnected\":false,\"type\":\"session\"},{\"jobId\":3,\"statusCode\":0,\"type\":\"status\"}]" + } +} +``` + #### 4、异步执行命令 1) 异步执行命令并返回sse结果 /agent/{agentId}/sse_async_exec ```shell -agent_id=a76da9861cfd +agent_id=becb81b8f33843ca932cd75ff9970318 curl -Ss -XPOST -H "Content-Type: application/json" http://localhost:8800/agent/${agent_id}/sse_async_exec -d ' { "action":"ASYNC_EXECUTE", @@ -142,7 +156,7 @@ curl -Ss -XPOST -H "Content-Type: application/json" http://localhost:8800/agent/ /agent/{agentId}/results/{requestId} ```shell -agent_id=a76da9861cfd +agent_id=becb81b8f33843ca932cd75ff9970318 curl -Ss -XPOST -H "Content-Type: application/json" http://localhost:8800/agent/${agent_id}/async_exec -d ' { "action":"ASYNC_EXECUTE", @@ -157,7 +171,7 @@ curl -Ss -XPOST -H "Content-Type: application/json" http://localhost:8800/agent/ ```json { - "agentId": "a76da9861cfd", + "agentId": "becb81b8f33843ca932cd75ff9970318", "requestId": "my7yP6sU7PSe", "status": "CONTINUOUS", "sessionId": "..." diff --git a/channel/channel-client/src/main/java/com/alibaba/arthas/channel/client/ChannelClient.java b/channel/channel-client/src/main/java/com/alibaba/arthas/channel/client/ChannelClient.java index 795694da7..797874ba5 100644 --- a/channel/channel-client/src/main/java/com/alibaba/arthas/channel/client/ChannelClient.java +++ b/channel/channel-client/src/main/java/com/alibaba/arthas/channel/client/ChannelClient.java @@ -27,6 +27,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; /** * @author gongdewei 2020/8/14 @@ -34,6 +35,7 @@ import java.util.concurrent.TimeUnit; public class ChannelClient { private static final Logger logger = LoggerFactory.getLogger(ChannelClient.class); + private static final int MIN_INTERVAL = 2; private AgentInfoService agentInfoService; private RequestListener requestListener; @@ -41,10 +43,10 @@ public class ChannelClient { private volatile boolean isError; private String host; private int port; - private ArthasServiceGrpc.ArthasServiceStub arthasServiceStub; - private StreamObserver responseStreamObserver; - private ManagedChannel channel; - private ScheduledFuture reconnectFuture; + private volatile ArthasServiceGrpc.ArthasServiceStub arthasServiceStub; + private volatile StreamObserver responseStreamObserver; + private volatile ManagedChannel channel; + private volatile ScheduledFuture reconnectFuture; private String channelServerAddress; private int reconnectDelay = 5; private int heartbeatInterval = 5; @@ -55,6 +57,7 @@ public class ChannelClient { private List channelFeatures = Arrays.asList("WebConsole", "ExecuteCommand"); private long lastHeartbeatTime; private int workThreads = 2; + private volatile StreamObserver heartbeatRequestStreamObserver; public ChannelClient(String host, int port) { this.host = host; @@ -136,11 +139,11 @@ public class ChannelClient { } } - private void connect() throws Exception { + private synchronized void connect() throws Exception { logger.info("Connecting to channel server [{}:{}] ..", host, port); ManagedChannelBuilder channelBuilder = ManagedChannelBuilder.forAddress(host, port); //TODO support ssl & plain text - channelBuilder.usePlaintext(true); + channelBuilder.usePlaintext(); channel = channelBuilder.build(); //register @@ -235,14 +238,30 @@ public class ChannelClient { } }); + heartbeatRequestStreamObserver = arthasServiceStub.heartbeat(new StreamObserver() { + @Override + public void onNext(HeartbeatResponse value) { + logger.debug("heartbeat result: {}" , value); + } + + @Override + public void onError(Throwable t) { + onClientError("send heartbeat error" , t); + } + + @Override + public void onCompleted() { + } + }); + isError = false; agentInfoService.updateAgentStatus(AgentStatus.IN_SERVICE); - sendHeartbeat(arthasServiceStub); + sendHeartbeat(); logger.info("Channel client is ready."); } - private void sendHeartbeat(final ArthasServiceGrpc.ArthasServiceStub arthasServiceStub) { + void sendHeartbeat() { AgentInfo agentInfo = agentInfoService.getAgentInfo(); HeartbeatRequest heartbeatRequest = HeartbeatRequest.newBuilder() .setAgentId(agentInfo.getAgentId()) @@ -251,21 +270,7 @@ public class ChannelClient { .build(); logger.debug("sending heartbeat: {}", heartbeatRequest); - arthasServiceStub.heartbeat(heartbeatRequest, new StreamObserver() { - @Override - public void onNext(HeartbeatResponse value) { - logger.debug("heartbeat result: {}", value); - } - - @Override - public void onError(Throwable t) { - onClientError("send heartbeat error", t); - } - - @Override - public void onCompleted() { - } - }); + heartbeatRequestStreamObserver.onNext(heartbeatRequest); lastHeartbeatTime = System.currentTimeMillis(); } @@ -314,7 +319,7 @@ public class ChannelClient { long delta = System.currentTimeMillis() - lastHeartbeatTime; if (delta >= heartbeatInterval * 1000) { try { - sendHeartbeat(arthasServiceStub); + sendHeartbeat(); } catch (Throwable e) { logger.error("send heartbeat failure", e); } @@ -341,6 +346,9 @@ public class ChannelClient { } protected boolean isWellKnownError(Throwable ex) { + if (ex instanceof InterruptedException || ex instanceof TimeoutException) { + return true; + } String error = ex.toString(); if (error.contains("UNAVAILABLE: Channel shutdownNow invoked")) { return true; @@ -385,7 +393,7 @@ public class ChannelClient { * @param reconnectDelay */ public void setReconnectDelay(int reconnectDelay) { - this.reconnectDelay = reconnectDelay; + this.reconnectDelay = Math.max(MIN_INTERVAL, reconnectDelay); } public int getHeartbeatInterval() { @@ -397,7 +405,7 @@ public class ChannelClient { * @param heartbeatInterval */ public void setHeartbeatInterval(int heartbeatInterval) { - this.heartbeatInterval = heartbeatInterval; + this.heartbeatInterval = Math.max(MIN_INTERVAL, heartbeatInterval); } public ScheduledExecutorService getExecutorService() { diff --git a/channel/channel-client/src/test/java/com/alibaba/arthas/channel/client/ChannelClientTest.java b/channel/channel-client/src/test/java/com/alibaba/arthas/channel/client/ChannelClientTest.java index 1ed0158b0..62735f9ab 100644 --- a/channel/channel-client/src/test/java/com/alibaba/arthas/channel/client/ChannelClientTest.java +++ b/channel/channel-client/src/test/java/com/alibaba/arthas/channel/client/ChannelClientTest.java @@ -24,6 +24,21 @@ public class ChannelClientTest { channelClient.start(); + try { + Thread.sleep(2000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + // test heartbeat + try { + for (int i = 0; i < 1000; i++) { + channelClient.sendHeartbeat(); + Thread.sleep(100); + } + } catch (InterruptedException e) { + e.printStackTrace(); + } + System.in.read(); } diff --git a/channel/channel-client/src/test/resources/logback-test.xml b/channel/channel-client/src/test/resources/logback-test.xml index cd2b87e62..81c49cb86 100644 --- a/channel/channel-client/src/test/resources/logback-test.xml +++ b/channel/channel-client/src/test/resources/logback-test.xml @@ -13,4 +13,6 @@ + + \ No newline at end of file diff --git a/channel/channel-common/src/main/proto/ArthasService.proto b/channel/channel-common/src/main/proto/ArthasService.proto index dbeba1014..618a3c1a5 100644 --- a/channel/channel-common/src/main/proto/ArthasService.proto +++ b/channel/channel-common/src/main/proto/ArthasService.proto @@ -274,5 +274,5 @@ service ArthasService { /* * Report heartbeat to server from arthas client */ - rpc heartbeat(HeartbeatRequest) returns (HeartbeatResponse); + rpc heartbeat(stream HeartbeatRequest) returns (stream HeartbeatResponse); } diff --git a/channel/channel-server/src/main/java/com/alibaba/arthas/channel/server/grpc/ArthasServiceGrpcImpl.java b/channel/channel-server/src/main/java/com/alibaba/arthas/channel/server/grpc/ArthasServiceGrpcImpl.java index 2c1bf1f41..e263bd24f 100644 --- a/channel/channel-server/src/main/java/com/alibaba/arthas/channel/server/grpc/ArthasServiceGrpcImpl.java +++ b/channel/channel-server/src/main/java/com/alibaba/arthas/channel/server/grpc/ArthasServiceGrpcImpl.java @@ -165,32 +165,36 @@ public class ArthasServiceGrpcImpl extends ArthasServiceGrpc.ArthasServiceImplBa @Override public void register(AgentInfo request, StreamObserver responseObserver) { - long now = System.currentTimeMillis(); - Optional optionalAgentVO = agentManageService.findAgentById(request.getAgentId()).block(); - AgentVO agentVO; - if (optionalAgentVO.isPresent()) { - agentVO = optionalAgentVO.get(); - copyAgentVO(request, agentVO); - agentVO.setModifiedTime(now); - agentVO.setHeartbeatTime(now); - agentManageService.updateAgent(agentVO); - responseObserver.onNext(RegisterResult.newBuilder() - .setStatus(0) - .setMessage("Agent info has been updated: "+request.getAgentId()) - .build()); - } else { - agentVO = new AgentVO(); - copyAgentVO(request, agentVO); - agentVO.setCreatedTime(now); - agentVO.setModifiedTime(now); - agentVO.setHeartbeatTime(now); - agentManageService.addAgent(agentVO); - responseObserver.onNext(RegisterResult.newBuilder() - .setStatus(0) - .setMessage("Agent info has been added: "+request.getAgentId()) - .build()); + try { + long now = System.currentTimeMillis(); + Optional optionalAgentVO = agentManageService.findAgentById(request.getAgentId()).block(); + AgentVO agentVO; + if (optionalAgentVO.isPresent()) { + agentVO = optionalAgentVO.get(); + copyAgentVO(request, agentVO); + agentVO.setModifiedTime(now); + agentVO.setHeartbeatTime(now); + agentManageService.updateAgent(agentVO); + responseObserver.onNext(RegisterResult.newBuilder() + .setStatus(0) + .setMessage("Agent info has been updated: "+request.getAgentId()) + .build()); + } else { + agentVO = new AgentVO(); + copyAgentVO(request, agentVO); + agentVO.setCreatedTime(now); + agentVO.setModifiedTime(now); + agentVO.setHeartbeatTime(now); + agentManageService.addAgent(agentVO); + responseObserver.onNext(RegisterResult.newBuilder() + .setStatus(0) + .setMessage("Agent info has been added: "+request.getAgentId()) + .build()); + } + logger.info("register agent: "+agentVO.getAgentId()); + } finally { + responseObserver.onCompleted(); } - logger.info("register agent: "+agentVO.getAgentId()); } private void copyAgentVO(AgentInfo agentInfo, AgentVO agentVO) { @@ -208,19 +212,48 @@ public class ArthasServiceGrpcImpl extends ArthasServiceGrpc.ArthasServiceImplBa } @Override - public void heartbeat(HeartbeatRequest request, StreamObserver responseObserver) { + public StreamObserver heartbeat(StreamObserver responseObserver) { + return new StreamObserver() { + @Override + public void onNext(HeartbeatRequest heartbeatRequest) { + handleHeartbeat(heartbeatRequest, responseObserver); + } + + @Override + public void onError(Throwable t) { + //TODO 通知网络链路异常 + logger.error("An error occurred in send heartbean response stream", t); + } + + @Override + public void onCompleted() { + + } + }; + } + + void handleHeartbeat(HeartbeatRequest request, StreamObserver responseObserver) { Optional optionalAgentVO = agentManageService.findAgentById(request.getAgentId()).block(); if (!optionalAgentVO.isPresent()) { responseObserver.onNext(HeartbeatResponse.newBuilder() .setStatus(1001) - .setMessage("Agent not found: "+request.getAgentId()) + .setMessage("Agent not found: " + request.getAgentId()) .build()); return; } - agentBizSerivce.heartbeat(request.getAgentId(), request.getAgentStatus().name(), request.getAgentVersion()); - responseObserver.onNext(HeartbeatResponse.newBuilder() - .setStatus(0) - .build()); + try { + agentBizSerivce.heartbeat(request.getAgentId(), request.getAgentStatus().name(), request.getAgentVersion()); + responseObserver.onNext(HeartbeatResponse.newBuilder() + .setStatus(0) + .setMessage("heartbeat ok: " + request.getAgentId()) + .build()); + } catch (Exception e) { + logger.error("Heartbeat failure, agentId: " + request.getAgentId() + ", error: " + e.getMessage(), e); + responseObserver.onNext(HeartbeatResponse.newBuilder() + .setStatus(1002) + .setMessage("Heartbeat failure: " + request.getAgentId()) + .build()); + } } } diff --git a/channel/channel-server/src/test/java/com/alibaba/arthas/channel/server/ArthasServiceClientTest.java b/channel/channel-server/src/test/java/com/alibaba/arthas/channel/server/ArthasServiceClientTest.java index d26f89fb0..d718ef651 100644 --- a/channel/channel-server/src/test/java/com/alibaba/arthas/channel/server/ArthasServiceClientTest.java +++ b/channel/channel-server/src/test/java/com/alibaba/arthas/channel/server/ArthasServiceClientTest.java @@ -88,7 +88,7 @@ public class ArthasServiceClientTest { return; } System.out.println("start client .."); - ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 7700).usePlaintext(true).build(); + ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 7700).usePlaintext().build(); ArthasServiceGrpc.ArthasServiceStub arthasServiceStub = ArthasServiceGrpc.newStub(channel); final AtomicReference> actionResponseStreamObserverHolder = new AtomicReference>(); diff --git a/channel/channel-server/src/test/java/com/alibaba/arthas/channel/server/ArthasServiceServerJsonResultTest.java b/channel/channel-server/src/test/java/com/alibaba/arthas/channel/server/ArthasServiceServerJsonResultTest.java index dd6281a50..c6a76c55f 100644 --- a/channel/channel-server/src/test/java/com/alibaba/arthas/channel/server/ArthasServiceServerJsonResultTest.java +++ b/channel/channel-server/src/test/java/com/alibaba/arthas/channel/server/ArthasServiceServerJsonResultTest.java @@ -205,12 +205,28 @@ public class ArthasServiceServerJsonResultTest { } @Override - public void heartbeat(HeartbeatRequest heartbeatRequest, StreamObserver responseObserver) { - System.out.println("heartbeat: " + heartbeatRequest); - responseObserver.onNext(HeartbeatResponse.newBuilder() - .setStatus(0) - .build()); - responseObserver.onCompleted(); + public StreamObserver heartbeat(StreamObserver responseObserver) { + return new StreamObserver() { + @Override + public void onNext(HeartbeatRequest heartbeatRequest) { + System.out.println("heartbeat: " + heartbeatRequest); + responseObserver.onNext(HeartbeatResponse.newBuilder() + .setStatus(0) + .build()); + } + + @Override + public void onError(Throwable t) { + isError = true; + System.out.println("heartbeat onError"); + t.printStackTrace(); + } + + @Override + public void onCompleted() { + System.out.println("heartbeat onCompleted"); + } + }; } public StreamObserver getActionRequestStreamObserver() { diff --git a/channel/channel-server/src/test/java/com/alibaba/arthas/channel/server/ArthasServiceServerTest.java b/channel/channel-server/src/test/java/com/alibaba/arthas/channel/server/ArthasServiceServerTest.java index 6bd713561..f8ea2e49b 100644 --- a/channel/channel-server/src/test/java/com/alibaba/arthas/channel/server/ArthasServiceServerTest.java +++ b/channel/channel-server/src/test/java/com/alibaba/arthas/channel/server/ArthasServiceServerTest.java @@ -195,13 +195,13 @@ public class ArthasServiceServerTest { @Override public void onError(Throwable t) { isError = true; - System.out.println("onError"); + System.out.println("submitResponse onError"); t.printStackTrace(); } @Override public void onCompleted() { - System.out.println("onCompleted"); + System.out.println("submitResponse onCompleted"); } }; } @@ -217,12 +217,28 @@ public class ArthasServiceServerTest { } @Override - public void heartbeat(HeartbeatRequest heartbeatRequest, StreamObserver responseObserver) { - System.out.println("heartbeat: " + heartbeatRequest); - responseObserver.onNext(HeartbeatResponse.newBuilder() - .setStatus(0) - .build()); - responseObserver.onCompleted(); + public StreamObserver heartbeat(StreamObserver responseObserver) { + return new StreamObserver() { + @Override + public void onNext(HeartbeatRequest heartbeatRequest) { + System.out.println("heartbeat: " + heartbeatRequest); + responseObserver.onNext(HeartbeatResponse.newBuilder() + .setStatus(0) + .build()); + } + + @Override + public void onError(Throwable t) { + isError = true; + System.out.println("heartbeat onError"); + t.printStackTrace(); + } + + @Override + public void onCompleted() { + System.out.println("heartbeat onCompleted"); + } + }; } public StreamObserver getActionRequestStreamObserver() {