《Apache Dubbo微服务开发从入门到精通》——可视化监测服务状态——二、 微服务集群监控(3) https://developer.aliyun.com/article/1224284
聚合收集器
public class AggregateMetricsCollector implements MetricsCollector, MetricsListener { private int bucketNum; private int timeWindowSeconds; private final Map<MethodMetric, TimeWindowCounter> totalRequests = new ConcurrentHashMap<>(); private final Map<MethodMetric, TimeWindowCounter> succeedRequests = new ConcurrentHashMap<>(); private final Map<MethodMetric, TimeWindowCounter> failedRequests = new ConcurrentHashMap<>(); private final Map<MethodMetric, TimeWindowCounter> qps = new ConcurrentHashMap<>(); private final Map<MethodMetric, TimeWindowQuantile> rt = new ConcurrentHashMap<>(); private final ApplicationModel applicationModel; private static final Integer DEFAULT_COMPRESSION = 100; private static final Integer DEFAULT_BUCKET_NUM = 10; private static final Integer DEFAULT_TIME_WINDOW_SECONDS = 120; //在构造函数中解析配置信息 public AggregateMetricsCollector(ApplicationModel applicationModel) { this.applicationModel = applicationModel; ConfigManager configManager = applicationModel.getApplicationConfigManager(); MetricsConfig config = configManager.getMetrics().orElse(null); if (config != null && config.getAggregation() != null && Boolean.TRUE.equals(config.getAggregation().getEnabled())) { // only registered when aggregation is enabled. registerListener(); AggregationConfig aggregation = config.getAggregation(); this.bucketNum = aggregation.getBucketNum() == null ? DEFAULT_BUCKET_NUM : aggregation.getBucketNum(); this.timeWindowSeconds = aggregation.getTimeWindowSeconds() == null ? DEFAULT_TIME_WINDOW_SECONDS : aggregation.getTimeWindowSeconds(); } } }
如果开启了本地聚合,则通过spring的BeanFactory添加监听,将AggregateMetricsCollector与DefaultMetricsCollector绑定,实现一种生存者消费者的模式,DefaultMetricsCollector中使用监听器列表,方便扩展。
c) 指标聚合
滑动窗口
假设我们初始有6个bucket,每个窗口时间设置为2分钟,每次写入指标数据时,会将数据分别写入6个bucket内,每隔两分钟移动一个bucket并且清除原来bucket内的数据。读取指标时,读取当前current指向的bucket,以达到滑动窗口的效果。
具体如下图所示,实现了当前 bucket 内存储了配置中设置的bucket生命周期内的数据,即近期数据。
在每个bucket内,使用TDigest算法计算分位数指标。
注:
TDigest算法(极端分位精确度高,如p1 p99,中间分位精确度低,如p50),相关资料如下
• https://op8867555.github.io/posts/2018-04-09-tdigest.html
• https://blog.csdn.net/csdnnews/article/details/116246540
• 开源实现:https://github.com/tdunning/t-digest
代码实现如下,除了TimeWindowQuantile用来计算分位数指标外,另外提供了TimeWindowCounter来收集时间区间内的指标数量。
public class TimeWindowQuantile { private final double compression; private final TDigest[] ringBuffer; private int currentBucket; private long lastRotateTimestampMillis; private final long durationBetweenRotatesMillis; public TimeWindowQuantile(double compression, int bucketNum, int timeWindowSeconds) { this.compression = compression; this.ringBuffer = new TDigest[bucketNum]; for (int i = 0; i < bucketNum; i++) { this.ringBuffer[i] = TDigest.createDigest(compression); } this.currentBucket = 0; this.lastRotateTimestampMillis = System.currentTimeMillis(); this.durationBetweenRotatesMillis = TimeUnit.SECONDS.toMillis(timeWindowSeconds) / bucketNum; } public synchronized double quantile(double q) { TDigest currentBucket = rotate(); return currentBucket.quantile(q); } public synchronized void add(double value) { rotate(); for (TDigest bucket : ringBuffer) { bucket.add(value); } } private TDigest rotate() { long timeSinceLastRotateMillis = System.currentTimeMillis() - lastRotateTimestampMillis; while (timeSinceLastRotateMillis > durationBetweenRotatesMillis) { ringBuffer[currentBucket] = TDigest.createDigest(compression); if (++currentBucket >= ringBuffer.length) { currentBucket = 0; } timeSinceLastRotateMillis -= durationBetweenRotatesMillis; lastRotateTimestampMillis += durationBetweenRotatesMillis; } return ringBuffer[currentBucket]; } }
《Apache Dubbo微服务开发从入门到精通》——可视化监测服务状态——二、 微服务集群监控(5) https://developer.aliyun.com/article/1224282