change heartbeat to streaming method, fix grpc connections leaks in channel client

pull/1490/head
gongdewei 3 years ago
parent 34ff805511
commit d5362bae97

@ -107,7 +107,7 @@ http://localhost:8800/
/agent/{agentId}/exec
```shell
agent_id=a76da9861cfd
agent_id=becb81b8f33843ca932cd75ff9970318
curl -Ss -XPOST -H "Content-Type: application/json" http://localhost:8800/agent/${agent_id}/exec -d '
{
"action":"EXECUTE",
@ -119,13 +119,27 @@ curl -Ss -XPOST -H "Content-Type: application/json" http://localhost:8800/agent/
'
```
返回结果:
```json
{
"agentId": "becb81b8f33843ca932cd75ff9970318",
"requestId": "LHY9DpPsISDu",
"status": "SUCCEEDED",
"sessionId": "9322bad2-33a8-4f7d-bafd-2f7701824bd2",
"executeResult": {
"resultsJson": "[{\"agentId\":\"becb81b8f33843ca932cd75ff9970318\",\"channelServer\":\"localhost:7700\",\"javaPid\":9347,\"jobId\":3,\"sessionId\":\"9322bad2-33a8-4f7d-bafd-2f7701824bd2\",\"tunnelConnected\":false,\"type\":\"session\"},{\"jobId\":3,\"statusCode\":0,\"type\":\"status\"}]"
}
}
```
#### 4、异步执行命令
1) 异步执行命令并返回sse结果
/agent/{agentId}/sse_async_exec
```shell
agent_id=a76da9861cfd
agent_id=becb81b8f33843ca932cd75ff9970318
curl -Ss -XPOST -H "Content-Type: application/json" http://localhost:8800/agent/${agent_id}/sse_async_exec -d '
{
"action":"ASYNC_EXECUTE",
@ -142,7 +156,7 @@ curl -Ss -XPOST -H "Content-Type: application/json" http://localhost:8800/agent/
/agent/{agentId}/results/{requestId}
```shell
agent_id=a76da9861cfd
agent_id=becb81b8f33843ca932cd75ff9970318
curl -Ss -XPOST -H "Content-Type: application/json" http://localhost:8800/agent/${agent_id}/async_exec -d '
{
"action":"ASYNC_EXECUTE",
@ -157,7 +171,7 @@ curl -Ss -XPOST -H "Content-Type: application/json" http://localhost:8800/agent/
```json
{
"agentId": "a76da9861cfd",
"agentId": "becb81b8f33843ca932cd75ff9970318",
"requestId": "my7yP6sU7PSe",
"status": "CONTINUOUS",
"sessionId": "..."

@ -27,6 +27,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* @author gongdewei 2020/8/14
@ -34,6 +35,7 @@ import java.util.concurrent.TimeUnit;
public class ChannelClient {
private static final Logger logger = LoggerFactory.getLogger(ChannelClient.class);
private static final int MIN_INTERVAL = 2;
private AgentInfoService agentInfoService;
private RequestListener requestListener;
@ -41,10 +43,10 @@ public class ChannelClient {
private volatile boolean isError;
private String host;
private int port;
private ArthasServiceGrpc.ArthasServiceStub arthasServiceStub;
private StreamObserver<ActionResponse> responseStreamObserver;
private ManagedChannel channel;
private ScheduledFuture<?> reconnectFuture;
private volatile ArthasServiceGrpc.ArthasServiceStub arthasServiceStub;
private volatile StreamObserver<ActionResponse> responseStreamObserver;
private volatile ManagedChannel channel;
private volatile ScheduledFuture<?> reconnectFuture;
private String channelServerAddress;
private int reconnectDelay = 5;
private int heartbeatInterval = 5;
@ -55,6 +57,7 @@ public class ChannelClient {
private List<String> channelFeatures = Arrays.asList("WebConsole", "ExecuteCommand");
private long lastHeartbeatTime;
private int workThreads = 2;
private volatile StreamObserver<HeartbeatRequest> heartbeatRequestStreamObserver;
public ChannelClient(String host, int port) {
this.host = host;
@ -136,11 +139,11 @@ public class ChannelClient {
}
}
private void connect() throws Exception {
private synchronized void connect() throws Exception {
logger.info("Connecting to channel server [{}:{}] ..", host, port);
ManagedChannelBuilder<?> channelBuilder = ManagedChannelBuilder.forAddress(host, port);
//TODO support ssl & plain text
channelBuilder.usePlaintext(true);
channelBuilder.usePlaintext();
channel = channelBuilder.build();
//register
@ -235,14 +238,30 @@ public class ChannelClient {
}
});
heartbeatRequestStreamObserver = arthasServiceStub.heartbeat(new StreamObserver<HeartbeatResponse>() {
@Override
public void onNext(HeartbeatResponse value) {
logger.debug("heartbeat result: {}" , value);
}
@Override
public void onError(Throwable t) {
onClientError("send heartbeat error" , t);
}
@Override
public void onCompleted() {
}
});
isError = false;
agentInfoService.updateAgentStatus(AgentStatus.IN_SERVICE);
sendHeartbeat(arthasServiceStub);
sendHeartbeat();
logger.info("Channel client is ready.");
}
private void sendHeartbeat(final ArthasServiceGrpc.ArthasServiceStub arthasServiceStub) {
void sendHeartbeat() {
AgentInfo agentInfo = agentInfoService.getAgentInfo();
HeartbeatRequest heartbeatRequest = HeartbeatRequest.newBuilder()
.setAgentId(agentInfo.getAgentId())
@ -251,21 +270,7 @@ public class ChannelClient {
.build();
logger.debug("sending heartbeat: {}", heartbeatRequest);
arthasServiceStub.heartbeat(heartbeatRequest, new StreamObserver<HeartbeatResponse>() {
@Override
public void onNext(HeartbeatResponse value) {
logger.debug("heartbeat result: {}", value);
}
@Override
public void onError(Throwable t) {
onClientError("send heartbeat error", t);
}
@Override
public void onCompleted() {
}
});
heartbeatRequestStreamObserver.onNext(heartbeatRequest);
lastHeartbeatTime = System.currentTimeMillis();
}
@ -314,7 +319,7 @@ public class ChannelClient {
long delta = System.currentTimeMillis() - lastHeartbeatTime;
if (delta >= heartbeatInterval * 1000) {
try {
sendHeartbeat(arthasServiceStub);
sendHeartbeat();
} catch (Throwable e) {
logger.error("send heartbeat failure", e);
}
@ -341,6 +346,9 @@ public class ChannelClient {
}
protected boolean isWellKnownError(Throwable ex) {
if (ex instanceof InterruptedException || ex instanceof TimeoutException) {
return true;
}
String error = ex.toString();
if (error.contains("UNAVAILABLE: Channel shutdownNow invoked")) {
return true;
@ -385,7 +393,7 @@ public class ChannelClient {
* @param reconnectDelay
*/
public void setReconnectDelay(int reconnectDelay) {
this.reconnectDelay = reconnectDelay;
this.reconnectDelay = Math.max(MIN_INTERVAL, reconnectDelay);
}
public int getHeartbeatInterval() {
@ -397,7 +405,7 @@ public class ChannelClient {
* @param heartbeatInterval
*/
public void setHeartbeatInterval(int heartbeatInterval) {
this.heartbeatInterval = heartbeatInterval;
this.heartbeatInterval = Math.max(MIN_INTERVAL, heartbeatInterval);
}
public ScheduledExecutorService getExecutorService() {

@ -24,6 +24,21 @@ public class ChannelClientTest {
channelClient.start();
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// test heartbeat
try {
for (int i = 0; i < 1000; i++) {
channelClient.sendHeartbeat();
Thread.sleep(100);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
System.in.read();
}

@ -13,4 +13,6 @@
<appender-ref ref="STDOUT" />
</root>
<logger name="com.alibaba.arthas.channel.client" level="DEBUG" />
</configuration>

@ -274,5 +274,5 @@ service ArthasService {
/*
* Report heartbeat to server from arthas client
*/
rpc heartbeat(HeartbeatRequest) returns (HeartbeatResponse);
rpc heartbeat(stream HeartbeatRequest) returns (stream HeartbeatResponse);
}

@ -165,32 +165,36 @@ public class ArthasServiceGrpcImpl extends ArthasServiceGrpc.ArthasServiceImplBa
@Override
public void register(AgentInfo request, StreamObserver<RegisterResult> responseObserver) {
long now = System.currentTimeMillis();
Optional<AgentVO> optionalAgentVO = agentManageService.findAgentById(request.getAgentId()).block();
AgentVO agentVO;
if (optionalAgentVO.isPresent()) {
agentVO = optionalAgentVO.get();
copyAgentVO(request, agentVO);
agentVO.setModifiedTime(now);
agentVO.setHeartbeatTime(now);
agentManageService.updateAgent(agentVO);
responseObserver.onNext(RegisterResult.newBuilder()
.setStatus(0)
.setMessage("Agent info has been updated: "+request.getAgentId())
.build());
} else {
agentVO = new AgentVO();
copyAgentVO(request, agentVO);
agentVO.setCreatedTime(now);
agentVO.setModifiedTime(now);
agentVO.setHeartbeatTime(now);
agentManageService.addAgent(agentVO);
responseObserver.onNext(RegisterResult.newBuilder()
.setStatus(0)
.setMessage("Agent info has been added: "+request.getAgentId())
.build());
try {
long now = System.currentTimeMillis();
Optional<AgentVO> optionalAgentVO = agentManageService.findAgentById(request.getAgentId()).block();
AgentVO agentVO;
if (optionalAgentVO.isPresent()) {
agentVO = optionalAgentVO.get();
copyAgentVO(request, agentVO);
agentVO.setModifiedTime(now);
agentVO.setHeartbeatTime(now);
agentManageService.updateAgent(agentVO);
responseObserver.onNext(RegisterResult.newBuilder()
.setStatus(0)
.setMessage("Agent info has been updated: "+request.getAgentId())
.build());
} else {
agentVO = new AgentVO();
copyAgentVO(request, agentVO);
agentVO.setCreatedTime(now);
agentVO.setModifiedTime(now);
agentVO.setHeartbeatTime(now);
agentManageService.addAgent(agentVO);
responseObserver.onNext(RegisterResult.newBuilder()
.setStatus(0)
.setMessage("Agent info has been added: "+request.getAgentId())
.build());
}
logger.info("register agent: "+agentVO.getAgentId());
} finally {
responseObserver.onCompleted();
}
logger.info("register agent: "+agentVO.getAgentId());
}
private void copyAgentVO(AgentInfo agentInfo, AgentVO agentVO) {
@ -208,19 +212,48 @@ public class ArthasServiceGrpcImpl extends ArthasServiceGrpc.ArthasServiceImplBa
}
@Override
public void heartbeat(HeartbeatRequest request, StreamObserver<HeartbeatResponse> responseObserver) {
public StreamObserver<HeartbeatRequest> heartbeat(StreamObserver<HeartbeatResponse> responseObserver) {
return new StreamObserver<HeartbeatRequest>() {
@Override
public void onNext(HeartbeatRequest heartbeatRequest) {
handleHeartbeat(heartbeatRequest, responseObserver);
}
@Override
public void onError(Throwable t) {
//TODO 通知网络链路异常
logger.error("An error occurred in send heartbean response stream", t);
}
@Override
public void onCompleted() {
}
};
}
void handleHeartbeat(HeartbeatRequest request, StreamObserver<HeartbeatResponse> responseObserver) {
Optional<AgentVO> optionalAgentVO = agentManageService.findAgentById(request.getAgentId()).block();
if (!optionalAgentVO.isPresent()) {
responseObserver.onNext(HeartbeatResponse.newBuilder()
.setStatus(1001)
.setMessage("Agent not found: "+request.getAgentId())
.setMessage("Agent not found: " + request.getAgentId())
.build());
return;
}
agentBizSerivce.heartbeat(request.getAgentId(), request.getAgentStatus().name(), request.getAgentVersion());
responseObserver.onNext(HeartbeatResponse.newBuilder()
.setStatus(0)
.build());
try {
agentBizSerivce.heartbeat(request.getAgentId(), request.getAgentStatus().name(), request.getAgentVersion());
responseObserver.onNext(HeartbeatResponse.newBuilder()
.setStatus(0)
.setMessage("heartbeat ok: " + request.getAgentId())
.build());
} catch (Exception e) {
logger.error("Heartbeat failure, agentId: " + request.getAgentId() + ", error: " + e.getMessage(), e);
responseObserver.onNext(HeartbeatResponse.newBuilder()
.setStatus(1002)
.setMessage("Heartbeat failure: " + request.getAgentId())
.build());
}
}
}

@ -88,7 +88,7 @@ public class ArthasServiceClientTest {
return;
}
System.out.println("start client ..");
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 7700).usePlaintext(true).build();
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 7700).usePlaintext().build();
ArthasServiceGrpc.ArthasServiceStub arthasServiceStub = ArthasServiceGrpc.newStub(channel);
final AtomicReference<StreamObserver<ActionResponse>> actionResponseStreamObserverHolder = new AtomicReference<StreamObserver<ActionResponse>>();

@ -205,12 +205,28 @@ public class ArthasServiceServerJsonResultTest {
}
@Override
public void heartbeat(HeartbeatRequest heartbeatRequest, StreamObserver<HeartbeatResponse> responseObserver) {
System.out.println("heartbeat: " + heartbeatRequest);
responseObserver.onNext(HeartbeatResponse.newBuilder()
.setStatus(0)
.build());
responseObserver.onCompleted();
public StreamObserver<HeartbeatRequest> heartbeat(StreamObserver<HeartbeatResponse> responseObserver) {
return new StreamObserver<HeartbeatRequest>() {
@Override
public void onNext(HeartbeatRequest heartbeatRequest) {
System.out.println("heartbeat: " + heartbeatRequest);
responseObserver.onNext(HeartbeatResponse.newBuilder()
.setStatus(0)
.build());
}
@Override
public void onError(Throwable t) {
isError = true;
System.out.println("heartbeat onError");
t.printStackTrace();
}
@Override
public void onCompleted() {
System.out.println("heartbeat onCompleted");
}
};
}
public StreamObserver<ActionRequest> getActionRequestStreamObserver() {

@ -195,13 +195,13 @@ public class ArthasServiceServerTest {
@Override
public void onError(Throwable t) {
isError = true;
System.out.println("onError");
System.out.println("submitResponse onError");
t.printStackTrace();
}
@Override
public void onCompleted() {
System.out.println("onCompleted");
System.out.println("submitResponse onCompleted");
}
};
}
@ -217,12 +217,28 @@ public class ArthasServiceServerTest {
}
@Override
public void heartbeat(HeartbeatRequest heartbeatRequest, StreamObserver<HeartbeatResponse> responseObserver) {
System.out.println("heartbeat: " + heartbeatRequest);
responseObserver.onNext(HeartbeatResponse.newBuilder()
.setStatus(0)
.build());
responseObserver.onCompleted();
public StreamObserver<HeartbeatRequest> heartbeat(StreamObserver<HeartbeatResponse> responseObserver) {
return new StreamObserver<HeartbeatRequest>() {
@Override
public void onNext(HeartbeatRequest heartbeatRequest) {
System.out.println("heartbeat: " + heartbeatRequest);
responseObserver.onNext(HeartbeatResponse.newBuilder()
.setStatus(0)
.build());
}
@Override
public void onError(Throwable t) {
isError = true;
System.out.println("heartbeat onError");
t.printStackTrace();
}
@Override
public void onCompleted() {
System.out.println("heartbeat onCompleted");
}
};
}
public StreamObserver<ActionRequest> getActionRequestStreamObserver() {

Loading…
Cancel
Save