From 3bf413b9c7bacf4b06282d1e8f211eb207f281a4 Mon Sep 17 00:00:00 2001 From: gongdewei Date: Thu, 21 May 2020 21:59:53 +0800 Subject: [PATCH] add result distributors and consumer for http api --- .../CompositeResultDistributor.java | 12 + .../core/distribution/DistributorOptions.java | 14 ++ .../PackingResultDistributor.java | 14 ++ .../core/distribution/ResultConsumer.java | 43 ++++ .../distribution/ResultConsumerHelper.java | 94 ++++++++ .../SharingResultDistributor.java | 31 +++ .../impl/CompositeResultDistributorImpl.java | 52 +++++ .../impl/PackingResultDistributorImpl.java | 43 ++++ .../distribution/impl/ResultConsumerImpl.java | 167 +++++++++++++ .../impl/SharingResultDistributorImpl.java | 220 ++++++++++++++++++ 10 files changed, 690 insertions(+) create mode 100644 core/src/main/java/com/taobao/arthas/core/distribution/CompositeResultDistributor.java create mode 100644 core/src/main/java/com/taobao/arthas/core/distribution/DistributorOptions.java create mode 100644 core/src/main/java/com/taobao/arthas/core/distribution/PackingResultDistributor.java create mode 100644 core/src/main/java/com/taobao/arthas/core/distribution/ResultConsumer.java create mode 100644 core/src/main/java/com/taobao/arthas/core/distribution/ResultConsumerHelper.java create mode 100644 core/src/main/java/com/taobao/arthas/core/distribution/SharingResultDistributor.java create mode 100644 core/src/main/java/com/taobao/arthas/core/distribution/impl/CompositeResultDistributorImpl.java create mode 100644 core/src/main/java/com/taobao/arthas/core/distribution/impl/PackingResultDistributorImpl.java create mode 100644 core/src/main/java/com/taobao/arthas/core/distribution/impl/ResultConsumerImpl.java create mode 100644 core/src/main/java/com/taobao/arthas/core/distribution/impl/SharingResultDistributorImpl.java diff --git a/core/src/main/java/com/taobao/arthas/core/distribution/CompositeResultDistributor.java b/core/src/main/java/com/taobao/arthas/core/distribution/CompositeResultDistributor.java new file mode 100644 index 000000000..dd4137d1b --- /dev/null +++ b/core/src/main/java/com/taobao/arthas/core/distribution/CompositeResultDistributor.java @@ -0,0 +1,12 @@ +package com.taobao.arthas.core.distribution; + +/** + * 复合结果分发器,将消息同时分发给其包含的多个真实分发器 + * @author gongdewei 2020/4/30 + */ +public interface CompositeResultDistributor extends ResultDistributor { + + void addDistributor(ResultDistributor distributor); + + void removeDistributor(ResultDistributor distributor); +} diff --git a/core/src/main/java/com/taobao/arthas/core/distribution/DistributorOptions.java b/core/src/main/java/com/taobao/arthas/core/distribution/DistributorOptions.java new file mode 100644 index 000000000..f7d7e59e9 --- /dev/null +++ b/core/src/main/java/com/taobao/arthas/core/distribution/DistributorOptions.java @@ -0,0 +1,14 @@ +package com.taobao.arthas.core.distribution; + +/** + * 命令结果分发器选项 + * @author gongdewei 2020/5/18 + */ +public class DistributorOptions { + + /** + * ResultConsumer的结果队列长度,用于控制内存缓存的命令结果数据量 + */ + public static int resultQueueSize = 50; + +} diff --git a/core/src/main/java/com/taobao/arthas/core/distribution/PackingResultDistributor.java b/core/src/main/java/com/taobao/arthas/core/distribution/PackingResultDistributor.java new file mode 100644 index 000000000..2e5e3b8b5 --- /dev/null +++ b/core/src/main/java/com/taobao/arthas/core/distribution/PackingResultDistributor.java @@ -0,0 +1,14 @@ +package com.taobao.arthas.core.distribution; + +import com.taobao.arthas.core.command.model.ResultModel; + +import java.util.List; + +public interface PackingResultDistributor extends ResultDistributor { + + /** + * Get results of command + */ + List getResults(); + +} diff --git a/core/src/main/java/com/taobao/arthas/core/distribution/ResultConsumer.java b/core/src/main/java/com/taobao/arthas/core/distribution/ResultConsumer.java new file mode 100644 index 000000000..e9d62f432 --- /dev/null +++ b/core/src/main/java/com/taobao/arthas/core/distribution/ResultConsumer.java @@ -0,0 +1,43 @@ +package com.taobao.arthas.core.distribution; + +import com.taobao.arthas.core.command.model.ResultModel; + +import java.util.List; + +/** + * Command result consumer + * @author gongdewei 2020-03-26 + */ +public interface ResultConsumer { + + /** + * Append the phased result to queue + * @param result a phased result of the command + * @return true means distribution success, return false means discard data + */ + boolean appendResult(ResultModel result); + + /** + * Retrieves and removes a pack of results from the head + * @return a pack of results + */ + List pollResults(); + + long getLastAccessTime(); + + void close(); + + boolean isClosed(); + + boolean isPolling(); + + String getConsumerId(); + + void setConsumerId(String consumerId); + + /** + * Retrieves the consumer's health status + * @return + */ + boolean isHealthy(); +} diff --git a/core/src/main/java/com/taobao/arthas/core/distribution/ResultConsumerHelper.java b/core/src/main/java/com/taobao/arthas/core/distribution/ResultConsumerHelper.java new file mode 100644 index 000000000..d123f9673 --- /dev/null +++ b/core/src/main/java/com/taobao/arthas/core/distribution/ResultConsumerHelper.java @@ -0,0 +1,94 @@ +package com.taobao.arthas.core.distribution; + +import com.alibaba.fastjson.JSON; +import com.taobao.arthas.core.command.model.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.reflect.Array; +import java.lang.reflect.Field; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; + +/** + * @author gongdewei 2020/5/18 + */ +public class ResultConsumerHelper { + + private static final Logger logger = LoggerFactory.getLogger(ResultConsumerHelper.class); + + private static Map> modelFieldMap = new ConcurrentHashMap>(); + + /** + * 估算命令执行结果的item数量 + * 注意:此方法调用频繁,不能产生内存碎片 + * @param model + * @return + */ + public static int getItemCount(ResultModel model) { + int count = processSpecialCommand(model); + if (count > 0) { + return count; + } + + //TODO 抽取ItemSet/ItemGroup接口,解决ClassSetVO/mbean等分组的情况 + + //缓存Field对象,避免产生内存碎片 + Class modelClass = model.getClass(); + List fields = modelFieldMap.get(modelClass.getName()); + if (fields == null) { + fields = new ArrayList(); + modelFieldMap.put(modelClass.getName(), fields); + Field[] declaredFields = modelClass.getDeclaredFields(); + for (int i = 0; i < declaredFields.length; i++) { + Field field = declaredFields[i]; + Class fieldClass = field.getType(); + if (Collection.class.isAssignableFrom(fieldClass) + || Map.class.isAssignableFrom(fieldClass) + || fieldClass.isArray() + /* || fieldClass == ClassSetVO.class*/) { + field.setAccessible(true); + fields.add(field); + } + } + } + + //获取item数量 + try { + for (int i = 0; i < fields.size(); i++) { + Field field = fields.get(i); + if (!field.isAccessible()) { + field.setAccessible(true); + } + Object value = field.get(model); + if (value != null) { + if (value instanceof Collection) { + return ((Collection) value).size(); + } else if (value.getClass().isArray()) { + return Array.getLength(value); + } else if (value instanceof Map) { + return ((Map) value).size(); +// } else if (value.getClass() == ClassSetVO.class) { +// return ((ClassSetVO) value).getClasses().size(); + } + } + } + } catch (Exception e) { + logger.error("get item count of result model failed, model: {}", JSON.toJSONString(model), e); + } + + return 1; + } + + private static int processSpecialCommand(ResultModel model) { +// if (model instanceof CatModel) { +// //特殊处理cat +// return ((CatModel) model).getContent().length()/100 + 1 ; +// } else if (model instanceof TraceModel) { +// //特殊处理trace +// return ((TraceModel) model).getNodeCount(); +// } + return 0; + } + +} diff --git a/core/src/main/java/com/taobao/arthas/core/distribution/SharingResultDistributor.java b/core/src/main/java/com/taobao/arthas/core/distribution/SharingResultDistributor.java new file mode 100644 index 000000000..2e7228fe1 --- /dev/null +++ b/core/src/main/java/com/taobao/arthas/core/distribution/SharingResultDistributor.java @@ -0,0 +1,31 @@ +package com.taobao.arthas.core.distribution; + +import java.util.List; + +public interface SharingResultDistributor extends ResultDistributor { + + /** + * Add consumer to sharing session + * @param consumer + */ + void addConsumer(ResultConsumer consumer); + + /** + * Remove consumer from sharing session + * @param consumer + */ + void removeConsumer(ResultConsumer consumer); + + /** + * Get all consumers of session + * @return + */ + List getConsumers(); + + /** + * Get consumer by id + * @param consumerId + * @return + */ + ResultConsumer getConsumer(String consumerId); +} diff --git a/core/src/main/java/com/taobao/arthas/core/distribution/impl/CompositeResultDistributorImpl.java b/core/src/main/java/com/taobao/arthas/core/distribution/impl/CompositeResultDistributorImpl.java new file mode 100644 index 000000000..eb254622a --- /dev/null +++ b/core/src/main/java/com/taobao/arthas/core/distribution/impl/CompositeResultDistributorImpl.java @@ -0,0 +1,52 @@ +package com.taobao.arthas.core.distribution.impl; + +import com.taobao.arthas.core.command.model.ResultModel; +import com.taobao.arthas.core.distribution.CompositeResultDistributor; +import com.taobao.arthas.core.distribution.ResultDistributor; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * 复合结果分发器,将消息同时分发给其包含的真实分发器 + * + * @author gongdewei 2020/4/30 + */ +public class CompositeResultDistributorImpl implements CompositeResultDistributor { + + private List distributors = Collections.synchronizedList(new ArrayList()); + + public CompositeResultDistributorImpl() { + } + + public CompositeResultDistributorImpl(ResultDistributor ... distributors) { + for (ResultDistributor distributor : distributors) { + this.addDistributor(distributor); + } + } + + @Override + public void addDistributor(ResultDistributor distributor) { + distributors.add(distributor); + } + + @Override + public void removeDistributor(ResultDistributor distributor) { + distributors.remove(distributor); + } + + @Override + public void appendResult(ResultModel result) { + for (ResultDistributor distributor : distributors) { + distributor.appendResult(result); + } + } + + @Override + public void close() { + for (ResultDistributor distributor : distributors) { + distributor.close(); + } + } +} diff --git a/core/src/main/java/com/taobao/arthas/core/distribution/impl/PackingResultDistributorImpl.java b/core/src/main/java/com/taobao/arthas/core/distribution/impl/PackingResultDistributorImpl.java new file mode 100644 index 000000000..9903b8e48 --- /dev/null +++ b/core/src/main/java/com/taobao/arthas/core/distribution/impl/PackingResultDistributorImpl.java @@ -0,0 +1,43 @@ +package com.taobao.arthas.core.distribution.impl; + +import com.alibaba.arthas.deps.org.slf4j.Logger; +import com.alibaba.arthas.deps.org.slf4j.LoggerFactory; +import com.alibaba.fastjson.JSON; +import com.taobao.arthas.core.command.model.ResultModel; +import com.taobao.arthas.core.distribution.PackingResultDistributor; +import com.taobao.arthas.core.shell.session.Session; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; + +public class PackingResultDistributorImpl implements PackingResultDistributor { + private static final Logger logger = LoggerFactory.getLogger(PackingResultDistributorImpl.class); + + private BlockingQueue resultQueue = new ArrayBlockingQueue(500); + private final Session session; + + public PackingResultDistributorImpl(Session session) { + this.session = session; + } + + @Override + public void appendResult(ResultModel result) { + if (!resultQueue.offer(result)) { + logger.warn("result queue is full: {}, discard later result: {}", resultQueue.size(), JSON.toJSONString(result)); + } + } + + @Override + public void close() { + } + + @Override + public List getResults() { + ArrayList results = new ArrayList(resultQueue.size()); + resultQueue.drainTo(results); + return results; + } + +} diff --git a/core/src/main/java/com/taobao/arthas/core/distribution/impl/ResultConsumerImpl.java b/core/src/main/java/com/taobao/arthas/core/distribution/impl/ResultConsumerImpl.java new file mode 100644 index 000000000..d3f3a9eab --- /dev/null +++ b/core/src/main/java/com/taobao/arthas/core/distribution/impl/ResultConsumerImpl.java @@ -0,0 +1,167 @@ +package com.taobao.arthas.core.distribution.impl; + +import com.alibaba.arthas.deps.org.slf4j.Logger; +import com.alibaba.arthas.deps.org.slf4j.LoggerFactory; +import com.alibaba.fastjson.JSON; +import com.taobao.arthas.core.command.model.ResultModel; +import com.taobao.arthas.core.distribution.DistributorOptions; +import com.taobao.arthas.core.distribution.ResultConsumer; +import com.taobao.arthas.core.distribution.ResultConsumerHelper; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; + +/** + * @author gongdewei 2020/3/27 + */ +public class ResultConsumerImpl implements ResultConsumer { + private static final Logger logger = LoggerFactory.getLogger(ResultConsumerImpl.class); + private BlockingQueue resultQueue; + private volatile long lastAccessTime; + private volatile boolean polling; + private ReentrantLock lock = new ReentrantLock(); + private int resultBatchSizeLimit = 20; + private int resultQueueSize = DistributorOptions.resultQueueSize; + private long pollTimeLimit = 2 * 1000; + private String consumerId; + private boolean closed; + private long sendingItemCount; + + public ResultConsumerImpl() { + lastAccessTime = System.currentTimeMillis(); + resultQueue = new ArrayBlockingQueue(resultQueueSize); + } + + @Override + public boolean appendResult(ResultModel result) { + //可能某些Consumer已经断开,不会再读取,这里不能堵塞! + boolean discard = false; + while (!resultQueue.offer(result)) { + ResultModel discardResult = resultQueue.poll(); + discard = true; + } + return !discard; + } + + @Override + public List pollResults() { + try { + lastAccessTime = System.currentTimeMillis(); + long accessTime = lastAccessTime; + if (lock.tryLock(500, TimeUnit.MILLISECONDS)) { + polling = true; + sendingItemCount = 0; + long firstResultTime = 0; + // sending delay: time elapsed after firstResultTime + long sendingDelay = 0; + // waiting time: time elapsed after access + long waitingTime = 0; + List sendingResults = new ArrayList(resultBatchSizeLimit); + + while (!closed + &&sendingResults.size() < resultBatchSizeLimit + && sendingDelay < 100 + && waitingTime < pollTimeLimit) { + ResultModel aResult = resultQueue.poll(100, TimeUnit.MILLISECONDS); + if (aResult != null) { + sendingResults.add(aResult); + //是否为第一次获取到数据 + if (firstResultTime == 0) { + firstResultTime = System.currentTimeMillis(); + } + //判断是否需要立即发送出去 + if (shouldFlush(sendingResults, aResult)) { + break; + } + } else { + if (firstResultTime > 0) { + //获取到部分数据后,队列已经取完,计算发送延时时间 + sendingDelay = System.currentTimeMillis() - firstResultTime; + } + //计算总共等待时间,长轮询最大等待时间 + waitingTime = System.currentTimeMillis() - accessTime; + } + } + + //resultQueue.drainTo(sendingResults, resultSizeLimit-sendingResults.size()); + if(logger.isDebugEnabled()) { + logger.debug("pollResults: {}, results: {}", sendingResults.size(), JSON.toJSONString(sendingResults)); + } + return sendingResults; + } + } catch (InterruptedException e) { + //e.printStackTrace(); + } finally { + if (lock.isHeldByCurrentThread()) { + lastAccessTime = System.currentTimeMillis(); + polling = false; + lock.unlock(); + } + } + return Collections.emptyList(); + } + + /** + * 估算对象数量及大小,判断是否需要立即发送出去 + * @param sendingResults + * @param last + * @return + */ + private boolean shouldFlush(List sendingResults, ResultModel last) { + //TODO 引入一个估算模型,每个model自统计对象数量 + sendingItemCount += ResultConsumerHelper.getItemCount(last); + return sendingItemCount >= 100; + } + + @Override + public boolean isHealthy() { + + return isPolling() + || resultQueue.size() < resultQueueSize + || System.currentTimeMillis() - lastAccessTime < 1000; + } + + @Override + public long getLastAccessTime() { + return lastAccessTime; + } + + @Override + public void close(){ + this.closed = true; + } + + @Override + public boolean isClosed() { + return closed; + } + + @Override + public boolean isPolling() { + return polling; + } + + public int getResultBatchSizeLimit() { + return resultBatchSizeLimit; + } + + public void setResultBatchSizeLimit(int resultBatchSizeLimit) { + this.resultBatchSizeLimit = resultBatchSizeLimit; + } + + @Override + public String getConsumerId() { + return consumerId; + } + + @Override + public void setConsumerId(String consumerId) { + this.consumerId = consumerId; + } + +} diff --git a/core/src/main/java/com/taobao/arthas/core/distribution/impl/SharingResultDistributorImpl.java b/core/src/main/java/com/taobao/arthas/core/distribution/impl/SharingResultDistributorImpl.java new file mode 100644 index 000000000..3ecd4ca2a --- /dev/null +++ b/core/src/main/java/com/taobao/arthas/core/distribution/impl/SharingResultDistributorImpl.java @@ -0,0 +1,220 @@ +package com.taobao.arthas.core.distribution.impl; + +import com.alibaba.arthas.deps.org.slf4j.Logger; +import com.alibaba.arthas.deps.org.slf4j.LoggerFactory; +import com.taobao.arthas.core.command.model.InputStatusModel; +import com.taobao.arthas.core.command.model.MessageModel; +import com.taobao.arthas.core.command.model.ResultModel; +import com.taobao.arthas.core.distribution.DistributorOptions; +import com.taobao.arthas.core.distribution.ResultConsumer; +import com.taobao.arthas.core.distribution.SharingResultDistributor; +import com.taobao.arthas.core.shell.session.Session; +import com.taobao.arthas.core.shell.system.Job; + +import java.util.List; +import java.util.UUID; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantLock; + +public class SharingResultDistributorImpl implements SharingResultDistributor { + private static final Logger logger = LoggerFactory.getLogger(SharingResultDistributorImpl.class); + + private List consumers = new CopyOnWriteArrayList(); + private BlockingQueue pendingResultQueue = new ArrayBlockingQueue(10); + private final Session session; + private Thread distributorThread; + private volatile boolean running; + private AtomicInteger consumerNumGenerator = new AtomicInteger(0); + + private SharingResultConsumerImpl sharingResultConsumer = new SharingResultConsumerImpl(); + + public SharingResultDistributorImpl(Session session) { + this.session = session; + this.running = true; + distributorThread = new Thread(new DistributorTask(), "ResultDistributor"); + distributorThread.start(); + } + + @Override + public void appendResult(ResultModel result) { + //要避免阻塞影响业务线程执行 + try { + if (!pendingResultQueue.offer(result, 100, TimeUnit.MILLISECONDS)) { + ResultModel discardResult = pendingResultQueue.poll(); + // 正常情况走不到这里,除非distribute 循环堵塞或异常终止 + // 输出队列满,终止当前执行的命令 + interruptJob("result queue is full: "+ pendingResultQueue.size()); + } + } catch (InterruptedException e) { + //ignore + } + } + + private void interruptJob(String message) { + Job job = session.getForegroundJob(); + if (job != null) { + logger.warn(message+", current job was interrupted.", job.id()); + job.interrupt(); + pendingResultQueue.offer(new MessageModel(message+", current job was interrupted.")); + } + } + + private void distribute() { + while (running) { + try { + ResultModel result = pendingResultQueue.poll(100, TimeUnit.MILLISECONDS); + if (result != null) { + sharingResultConsumer.appendResult(result); + //判断是否有至少一个consumer是健康的 + int healthCount = 0; + for (int i = 0; i < consumers.size(); i++) { + ResultConsumer consumer = consumers.get(i); + if(consumer.isHealthy()){ + healthCount += 1; + } + consumer.appendResult(result); + } + //所有consumer都不是健康状态,终止当前执行的命令 + if (healthCount == 0) { + interruptJob("all consumers are unhealthy"); + } + } + } catch (Throwable e) { + logger.warn("distribute result failed: " + e.getMessage(), e); + } + } + } + + @Override + public void close() { + this.running = false; + } + + @Override + public void addConsumer(ResultConsumer consumer) { + int consumerNo = consumerNumGenerator.incrementAndGet(); + String consumerId = UUID.randomUUID().toString().replaceAll("-", "") + "_" + consumerNo; + consumer.setConsumerId(consumerId); + + //将队列中的消息复制给新的消费者 + sharingResultConsumer.copyTo(consumer); + + consumers.add(consumer); + } + + @Override + public void removeConsumer(ResultConsumer consumer) { + consumers.remove(consumer); + consumer.close(); + } + + @Override + public List getConsumers() { + return consumers; + } + + @Override + public ResultConsumer getConsumer(String consumerId) { + for (int i = 0; i < consumers.size(); i++) { + ResultConsumer consumer = consumers.get(i); + if (consumer.getConsumerId().equals(consumerId)) { + return consumer; + } + } + return null; + } + + private class DistributorTask implements Runnable { + @Override + public void run() { + distribute(); + } + } + + private class SharingResultConsumerImpl implements ResultConsumer { + private BlockingQueue resultQueue = new ArrayBlockingQueue(DistributorOptions.resultQueueSize); + private ReentrantLock queueLock = new ReentrantLock(); + private InputStatusModel lastInputStatus; + + @Override + public boolean appendResult(ResultModel result) { + queueLock.lock(); + try { + //输入状态不入历史指令队列,复制时在最后发送 + if (result instanceof InputStatusModel) { + lastInputStatus = (InputStatusModel) result; + return true; + } + while (!resultQueue.offer(result)) { + ResultModel discardResult = resultQueue.poll(); + } + } finally { + if (queueLock.isHeldByCurrentThread()) { + queueLock.unlock(); + } + } + return true; + } + + public void copyTo(ResultConsumer consumer) { + //复制时加锁,避免消息顺序错乱,这里堵塞只影响分发线程,不会影响到业务线程 + queueLock.lock(); + try { + for (ResultModel result : resultQueue) { + consumer.appendResult(result); + } + //发送输入状态 + if (lastInputStatus != null) { + consumer.appendResult(lastInputStatus); + } + } finally { + if (queueLock.isHeldByCurrentThread()) { + queueLock.unlock(); + } + } + } + + @Override + public List pollResults() { + return null; + } + + @Override + public long getLastAccessTime() { + return 0; + } + + @Override + public void close() { + + } + + @Override + public boolean isClosed() { + return false; + } + + @Override + public boolean isPolling() { + return false; + } + + @Override + public String getConsumerId() { + return "shared-consumer"; + } + + @Override + public void setConsumerId(String consumerId) { + } + + @Override + public boolean isHealthy() { + return true; + } + } +}