add result distributors and consumer for http api

pull/1203/head
gongdewei 5 years ago
parent 78316f340d
commit 3bf413b9c7

@ -0,0 +1,12 @@
package com.taobao.arthas.core.distribution;
/**
*
* @author gongdewei 2020/4/30
*/
public interface CompositeResultDistributor extends ResultDistributor {
void addDistributor(ResultDistributor distributor);
void removeDistributor(ResultDistributor distributor);
}

@ -0,0 +1,14 @@
package com.taobao.arthas.core.distribution;
/**
*
* @author gongdewei 2020/5/18
*/
public class DistributorOptions {
/**
* ResultConsumer
*/
public static int resultQueueSize = 50;
}

@ -0,0 +1,14 @@
package com.taobao.arthas.core.distribution;
import com.taobao.arthas.core.command.model.ResultModel;
import java.util.List;
public interface PackingResultDistributor extends ResultDistributor {
/**
* Get results of command
*/
List<ResultModel> getResults();
}

@ -0,0 +1,43 @@
package com.taobao.arthas.core.distribution;
import com.taobao.arthas.core.command.model.ResultModel;
import java.util.List;
/**
* Command result consumer
* @author gongdewei 2020-03-26
*/
public interface ResultConsumer {
/**
* Append the phased result to queue
* @param result a phased result of the command
* @return true means distribution success, return false means discard data
*/
boolean appendResult(ResultModel result);
/**
* Retrieves and removes a pack of results from the head
* @return a pack of results
*/
List<ResultModel> pollResults();
long getLastAccessTime();
void close();
boolean isClosed();
boolean isPolling();
String getConsumerId();
void setConsumerId(String consumerId);
/**
* Retrieves the consumer's health status
* @return
*/
boolean isHealthy();
}

@ -0,0 +1,94 @@
package com.taobao.arthas.core.distribution;
import com.alibaba.fastjson.JSON;
import com.taobao.arthas.core.command.model.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.Array;
import java.lang.reflect.Field;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author gongdewei 2020/5/18
*/
public class ResultConsumerHelper {
private static final Logger logger = LoggerFactory.getLogger(ResultConsumerHelper.class);
private static Map<String, List<Field>> modelFieldMap = new ConcurrentHashMap<String, List<Field>>();
/**
* item
*
* @param model
* @return
*/
public static int getItemCount(ResultModel model) {
int count = processSpecialCommand(model);
if (count > 0) {
return count;
}
//TODO 抽取ItemSet/ItemGroup接口解决ClassSetVO/mbean等分组的情况
//缓存Field对象避免产生内存碎片
Class modelClass = model.getClass();
List<Field> fields = modelFieldMap.get(modelClass.getName());
if (fields == null) {
fields = new ArrayList<Field>();
modelFieldMap.put(modelClass.getName(), fields);
Field[] declaredFields = modelClass.getDeclaredFields();
for (int i = 0; i < declaredFields.length; i++) {
Field field = declaredFields[i];
Class<?> fieldClass = field.getType();
if (Collection.class.isAssignableFrom(fieldClass)
|| Map.class.isAssignableFrom(fieldClass)
|| fieldClass.isArray()
/* || fieldClass == ClassSetVO.class*/) {
field.setAccessible(true);
fields.add(field);
}
}
}
//获取item数量
try {
for (int i = 0; i < fields.size(); i++) {
Field field = fields.get(i);
if (!field.isAccessible()) {
field.setAccessible(true);
}
Object value = field.get(model);
if (value != null) {
if (value instanceof Collection) {
return ((Collection) value).size();
} else if (value.getClass().isArray()) {
return Array.getLength(value);
} else if (value instanceof Map) {
return ((Map) value).size();
// } else if (value.getClass() == ClassSetVO.class) {
// return ((ClassSetVO) value).getClasses().size();
}
}
}
} catch (Exception e) {
logger.error("get item count of result model failed, model: {}", JSON.toJSONString(model), e);
}
return 1;
}
private static int processSpecialCommand(ResultModel model) {
// if (model instanceof CatModel) {
// //特殊处理cat
// return ((CatModel) model).getContent().length()/100 + 1 ;
// } else if (model instanceof TraceModel) {
// //特殊处理trace
// return ((TraceModel) model).getNodeCount();
// }
return 0;
}
}

@ -0,0 +1,31 @@
package com.taobao.arthas.core.distribution;
import java.util.List;
public interface SharingResultDistributor extends ResultDistributor {
/**
* Add consumer to sharing session
* @param consumer
*/
void addConsumer(ResultConsumer consumer);
/**
* Remove consumer from sharing session
* @param consumer
*/
void removeConsumer(ResultConsumer consumer);
/**
* Get all consumers of session
* @return
*/
List<ResultConsumer> getConsumers();
/**
* Get consumer by id
* @param consumerId
* @return
*/
ResultConsumer getConsumer(String consumerId);
}

@ -0,0 +1,52 @@
package com.taobao.arthas.core.distribution.impl;
import com.taobao.arthas.core.command.model.ResultModel;
import com.taobao.arthas.core.distribution.CompositeResultDistributor;
import com.taobao.arthas.core.distribution.ResultDistributor;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
/**
*
*
* @author gongdewei 2020/4/30
*/
public class CompositeResultDistributorImpl implements CompositeResultDistributor {
private List<ResultDistributor> distributors = Collections.synchronizedList(new ArrayList<ResultDistributor>());
public CompositeResultDistributorImpl() {
}
public CompositeResultDistributorImpl(ResultDistributor ... distributors) {
for (ResultDistributor distributor : distributors) {
this.addDistributor(distributor);
}
}
@Override
public void addDistributor(ResultDistributor distributor) {
distributors.add(distributor);
}
@Override
public void removeDistributor(ResultDistributor distributor) {
distributors.remove(distributor);
}
@Override
public void appendResult(ResultModel result) {
for (ResultDistributor distributor : distributors) {
distributor.appendResult(result);
}
}
@Override
public void close() {
for (ResultDistributor distributor : distributors) {
distributor.close();
}
}
}

@ -0,0 +1,43 @@
package com.taobao.arthas.core.distribution.impl;
import com.alibaba.arthas.deps.org.slf4j.Logger;
import com.alibaba.arthas.deps.org.slf4j.LoggerFactory;
import com.alibaba.fastjson.JSON;
import com.taobao.arthas.core.command.model.ResultModel;
import com.taobao.arthas.core.distribution.PackingResultDistributor;
import com.taobao.arthas.core.shell.session.Session;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class PackingResultDistributorImpl implements PackingResultDistributor {
private static final Logger logger = LoggerFactory.getLogger(PackingResultDistributorImpl.class);
private BlockingQueue<ResultModel> resultQueue = new ArrayBlockingQueue<ResultModel>(500);
private final Session session;
public PackingResultDistributorImpl(Session session) {
this.session = session;
}
@Override
public void appendResult(ResultModel result) {
if (!resultQueue.offer(result)) {
logger.warn("result queue is full: {}, discard later result: {}", resultQueue.size(), JSON.toJSONString(result));
}
}
@Override
public void close() {
}
@Override
public List<ResultModel> getResults() {
ArrayList<ResultModel> results = new ArrayList<ResultModel>(resultQueue.size());
resultQueue.drainTo(results);
return results;
}
}

@ -0,0 +1,167 @@
package com.taobao.arthas.core.distribution.impl;
import com.alibaba.arthas.deps.org.slf4j.Logger;
import com.alibaba.arthas.deps.org.slf4j.LoggerFactory;
import com.alibaba.fastjson.JSON;
import com.taobao.arthas.core.command.model.ResultModel;
import com.taobao.arthas.core.distribution.DistributorOptions;
import com.taobao.arthas.core.distribution.ResultConsumer;
import com.taobao.arthas.core.distribution.ResultConsumerHelper;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
/**
* @author gongdewei 2020/3/27
*/
public class ResultConsumerImpl implements ResultConsumer {
private static final Logger logger = LoggerFactory.getLogger(ResultConsumerImpl.class);
private BlockingQueue<ResultModel> resultQueue;
private volatile long lastAccessTime;
private volatile boolean polling;
private ReentrantLock lock = new ReentrantLock();
private int resultBatchSizeLimit = 20;
private int resultQueueSize = DistributorOptions.resultQueueSize;
private long pollTimeLimit = 2 * 1000;
private String consumerId;
private boolean closed;
private long sendingItemCount;
public ResultConsumerImpl() {
lastAccessTime = System.currentTimeMillis();
resultQueue = new ArrayBlockingQueue<ResultModel>(resultQueueSize);
}
@Override
public boolean appendResult(ResultModel result) {
//可能某些Consumer已经断开不会再读取这里不能堵塞
boolean discard = false;
while (!resultQueue.offer(result)) {
ResultModel discardResult = resultQueue.poll();
discard = true;
}
return !discard;
}
@Override
public List<ResultModel> pollResults() {
try {
lastAccessTime = System.currentTimeMillis();
long accessTime = lastAccessTime;
if (lock.tryLock(500, TimeUnit.MILLISECONDS)) {
polling = true;
sendingItemCount = 0;
long firstResultTime = 0;
// sending delay: time elapsed after firstResultTime
long sendingDelay = 0;
// waiting time: time elapsed after access
long waitingTime = 0;
List<ResultModel> sendingResults = new ArrayList<ResultModel>(resultBatchSizeLimit);
while (!closed
&&sendingResults.size() < resultBatchSizeLimit
&& sendingDelay < 100
&& waitingTime < pollTimeLimit) {
ResultModel aResult = resultQueue.poll(100, TimeUnit.MILLISECONDS);
if (aResult != null) {
sendingResults.add(aResult);
//是否为第一次获取到数据
if (firstResultTime == 0) {
firstResultTime = System.currentTimeMillis();
}
//判断是否需要立即发送出去
if (shouldFlush(sendingResults, aResult)) {
break;
}
} else {
if (firstResultTime > 0) {
//获取到部分数据后,队列已经取完,计算发送延时时间
sendingDelay = System.currentTimeMillis() - firstResultTime;
}
//计算总共等待时间,长轮询最大等待时间
waitingTime = System.currentTimeMillis() - accessTime;
}
}
//resultQueue.drainTo(sendingResults, resultSizeLimit-sendingResults.size());
if(logger.isDebugEnabled()) {
logger.debug("pollResults: {}, results: {}", sendingResults.size(), JSON.toJSONString(sendingResults));
}
return sendingResults;
}
} catch (InterruptedException e) {
//e.printStackTrace();
} finally {
if (lock.isHeldByCurrentThread()) {
lastAccessTime = System.currentTimeMillis();
polling = false;
lock.unlock();
}
}
return Collections.emptyList();
}
/**
*
* @param sendingResults
* @param last
* @return
*/
private boolean shouldFlush(List<ResultModel> sendingResults, ResultModel last) {
//TODO 引入一个估算模型每个model自统计对象数量
sendingItemCount += ResultConsumerHelper.getItemCount(last);
return sendingItemCount >= 100;
}
@Override
public boolean isHealthy() {
return isPolling()
|| resultQueue.size() < resultQueueSize
|| System.currentTimeMillis() - lastAccessTime < 1000;
}
@Override
public long getLastAccessTime() {
return lastAccessTime;
}
@Override
public void close(){
this.closed = true;
}
@Override
public boolean isClosed() {
return closed;
}
@Override
public boolean isPolling() {
return polling;
}
public int getResultBatchSizeLimit() {
return resultBatchSizeLimit;
}
public void setResultBatchSizeLimit(int resultBatchSizeLimit) {
this.resultBatchSizeLimit = resultBatchSizeLimit;
}
@Override
public String getConsumerId() {
return consumerId;
}
@Override
public void setConsumerId(String consumerId) {
this.consumerId = consumerId;
}
}

@ -0,0 +1,220 @@
package com.taobao.arthas.core.distribution.impl;
import com.alibaba.arthas.deps.org.slf4j.Logger;
import com.alibaba.arthas.deps.org.slf4j.LoggerFactory;
import com.taobao.arthas.core.command.model.InputStatusModel;
import com.taobao.arthas.core.command.model.MessageModel;
import com.taobao.arthas.core.command.model.ResultModel;
import com.taobao.arthas.core.distribution.DistributorOptions;
import com.taobao.arthas.core.distribution.ResultConsumer;
import com.taobao.arthas.core.distribution.SharingResultDistributor;
import com.taobao.arthas.core.shell.session.Session;
import com.taobao.arthas.core.shell.system.Job;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
public class SharingResultDistributorImpl implements SharingResultDistributor {
private static final Logger logger = LoggerFactory.getLogger(SharingResultDistributorImpl.class);
private List<ResultConsumer> consumers = new CopyOnWriteArrayList<ResultConsumer>();
private BlockingQueue<ResultModel> pendingResultQueue = new ArrayBlockingQueue<ResultModel>(10);
private final Session session;
private Thread distributorThread;
private volatile boolean running;
private AtomicInteger consumerNumGenerator = new AtomicInteger(0);
private SharingResultConsumerImpl sharingResultConsumer = new SharingResultConsumerImpl();
public SharingResultDistributorImpl(Session session) {
this.session = session;
this.running = true;
distributorThread = new Thread(new DistributorTask(), "ResultDistributor");
distributorThread.start();
}
@Override
public void appendResult(ResultModel result) {
//要避免阻塞影响业务线程执行
try {
if (!pendingResultQueue.offer(result, 100, TimeUnit.MILLISECONDS)) {
ResultModel discardResult = pendingResultQueue.poll();
// 正常情况走不到这里除非distribute 循环堵塞或异常终止
// 输出队列满,终止当前执行的命令
interruptJob("result queue is full: "+ pendingResultQueue.size());
}
} catch (InterruptedException e) {
//ignore
}
}
private void interruptJob(String message) {
Job job = session.getForegroundJob();
if (job != null) {
logger.warn(message+", current job was interrupted.", job.id());
job.interrupt();
pendingResultQueue.offer(new MessageModel(message+", current job was interrupted."));
}
}
private void distribute() {
while (running) {
try {
ResultModel result = pendingResultQueue.poll(100, TimeUnit.MILLISECONDS);
if (result != null) {
sharingResultConsumer.appendResult(result);
//判断是否有至少一个consumer是健康的
int healthCount = 0;
for (int i = 0; i < consumers.size(); i++) {
ResultConsumer consumer = consumers.get(i);
if(consumer.isHealthy()){
healthCount += 1;
}
consumer.appendResult(result);
}
//所有consumer都不是健康状态终止当前执行的命令
if (healthCount == 0) {
interruptJob("all consumers are unhealthy");
}
}
} catch (Throwable e) {
logger.warn("distribute result failed: " + e.getMessage(), e);
}
}
}
@Override
public void close() {
this.running = false;
}
@Override
public void addConsumer(ResultConsumer consumer) {
int consumerNo = consumerNumGenerator.incrementAndGet();
String consumerId = UUID.randomUUID().toString().replaceAll("-", "") + "_" + consumerNo;
consumer.setConsumerId(consumerId);
//将队列中的消息复制给新的消费者
sharingResultConsumer.copyTo(consumer);
consumers.add(consumer);
}
@Override
public void removeConsumer(ResultConsumer consumer) {
consumers.remove(consumer);
consumer.close();
}
@Override
public List<ResultConsumer> getConsumers() {
return consumers;
}
@Override
public ResultConsumer getConsumer(String consumerId) {
for (int i = 0; i < consumers.size(); i++) {
ResultConsumer consumer = consumers.get(i);
if (consumer.getConsumerId().equals(consumerId)) {
return consumer;
}
}
return null;
}
private class DistributorTask implements Runnable {
@Override
public void run() {
distribute();
}
}
private class SharingResultConsumerImpl implements ResultConsumer {
private BlockingQueue<ResultModel> resultQueue = new ArrayBlockingQueue<ResultModel>(DistributorOptions.resultQueueSize);
private ReentrantLock queueLock = new ReentrantLock();
private InputStatusModel lastInputStatus;
@Override
public boolean appendResult(ResultModel result) {
queueLock.lock();
try {
//输入状态不入历史指令队列,复制时在最后发送
if (result instanceof InputStatusModel) {
lastInputStatus = (InputStatusModel) result;
return true;
}
while (!resultQueue.offer(result)) {
ResultModel discardResult = resultQueue.poll();
}
} finally {
if (queueLock.isHeldByCurrentThread()) {
queueLock.unlock();
}
}
return true;
}
public void copyTo(ResultConsumer consumer) {
//复制时加锁,避免消息顺序错乱,这里堵塞只影响分发线程,不会影响到业务线程
queueLock.lock();
try {
for (ResultModel result : resultQueue) {
consumer.appendResult(result);
}
//发送输入状态
if (lastInputStatus != null) {
consumer.appendResult(lastInputStatus);
}
} finally {
if (queueLock.isHeldByCurrentThread()) {
queueLock.unlock();
}
}
}
@Override
public List<ResultModel> pollResults() {
return null;
}
@Override
public long getLastAccessTime() {
return 0;
}
@Override
public void close() {
}
@Override
public boolean isClosed() {
return false;
}
@Override
public boolean isPolling() {
return false;
}
@Override
public String getConsumerId() {
return "shared-consumer";
}
@Override
public void setConsumerId(String consumerId) {
}
@Override
public boolean isHealthy() {
return true;
}
}
}
Loading…
Cancel
Save