|
|
@ -79,15 +79,18 @@ public class ArthasServiceGrpcImpl extends ArthasServiceGrpc.ArthasServiceImplBa
|
|
|
|
try {
|
|
|
|
try {
|
|
|
|
actionRequest = ActionRequest.parseFrom(messageBytes);
|
|
|
|
actionRequest = ActionRequest.parseFrom(messageBytes);
|
|
|
|
} catch (Throwable e) {
|
|
|
|
} catch (Throwable e) {
|
|
|
|
logger.error("parse action request message failure", e);
|
|
|
|
logger.error("parse request message failure", e);
|
|
|
|
return true;
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
|
try {
|
|
|
|
responseObserver.onNext(actionRequest);
|
|
|
|
responseObserver.onNext(actionRequest);
|
|
|
|
|
|
|
|
if (logger.isDebugEnabled()) {
|
|
|
|
|
|
|
|
logger.debug("sent request to agent, agentId: {}, request: {}", agentId, actionRequest);
|
|
|
|
|
|
|
|
}
|
|
|
|
return true;
|
|
|
|
return true;
|
|
|
|
} catch (Throwable e) {
|
|
|
|
} catch (Throwable e) {
|
|
|
|
logger.error("send action request message to arthas agent failure", e);
|
|
|
|
logger.error("send request message to arthas agent failure", e);
|
|
|
|
//TODO 如何通知请求来源方发送请求失败?通知打开的WebConsole
|
|
|
|
//TODO 如何通知请求来源方发送请求失败?通知打开的WebConsole
|
|
|
|
agentBizSerivce.compareAndUpdateAgentStatus(agentId, AgentStatus.IN_SERVICE, AgentStatus.OUT_OF_SERVICE);
|
|
|
|
agentBizSerivce.compareAndUpdateAgentStatus(agentId, AgentStatus.IN_SERVICE, AgentStatus.OUT_OF_SERVICE);
|
|
|
|
//responseObserver.onError(e);
|
|
|
|
//responseObserver.onError(e);
|
|
|
@ -123,20 +126,24 @@ public class ArthasServiceGrpcImpl extends ArthasServiceGrpc.ArthasServiceImplBa
|
|
|
|
GeneralResult.Builder resultBuilder = GeneralResult.newBuilder();
|
|
|
|
GeneralResult.Builder resultBuilder = GeneralResult.newBuilder();
|
|
|
|
ActionResponseTopic responseTopic = new ActionResponseTopic(agentId, actionResponse.getRequestId());
|
|
|
|
ActionResponseTopic responseTopic = new ActionResponseTopic(agentId, actionResponse.getRequestId());
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if (logger.isDebugEnabled()) {
|
|
|
|
|
|
|
|
logger.debug("receive response from agent, agentId:{}, response: {}", agentId, actionResponse);
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
|
try {
|
|
|
|
messageExchangeService.pushMessage(responseTopic, messageBytes);
|
|
|
|
messageExchangeService.pushMessage(responseTopic, messageBytes);
|
|
|
|
resultBuilder.setStatus(0);
|
|
|
|
resultBuilder.setStatus(0);
|
|
|
|
} catch (Throwable e) {
|
|
|
|
} catch (Throwable e) {
|
|
|
|
resultBuilder
|
|
|
|
resultBuilder
|
|
|
|
.setStatus(1000)
|
|
|
|
.setStatus(1000)
|
|
|
|
.setMessage("push message failure");
|
|
|
|
.setMessage("push response message failure");
|
|
|
|
logger.error("push action response message failure: "+responseTopic, e);
|
|
|
|
logger.error("push response message failure: "+responseTopic, e);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
|
try {
|
|
|
|
responseObserver.onNext(resultBuilder.build());
|
|
|
|
responseObserver.onNext(resultBuilder.build());
|
|
|
|
} catch (Throwable e) {
|
|
|
|
} catch (Throwable e) {
|
|
|
|
logger.error("send response result failure", e);
|
|
|
|
logger.error("send response ack result failure", e);
|
|
|
|
responseObserver.onError(e);
|
|
|
|
responseObserver.onError(e);
|
|
|
|
//TODO 通知网络链路异常
|
|
|
|
//TODO 通知网络链路异常
|
|
|
|
}
|
|
|
|
}
|
|
|
@ -145,11 +152,13 @@ public class ArthasServiceGrpcImpl extends ArthasServiceGrpc.ArthasServiceImplBa
|
|
|
|
@Override
|
|
|
|
@Override
|
|
|
|
public void onError(Throwable t) {
|
|
|
|
public void onError(Throwable t) {
|
|
|
|
//TODO 通知网络链路异常
|
|
|
|
//TODO 通知网络链路异常
|
|
|
|
|
|
|
|
logger.error("An error occurred in submit response stream", t);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
@Override
|
|
|
|
public void onCompleted() {
|
|
|
|
public void onCompleted() {
|
|
|
|
//TODO 通知网络链路异常
|
|
|
|
//TODO 通知网络链路异常
|
|
|
|
|
|
|
|
logger.error("submit response stream has completed");
|
|
|
|
}
|
|
|
|
}
|
|
|
|
};
|
|
|
|
};
|
|
|
|
}
|
|
|
|
}
|
|
|
|